Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
335 changes: 335 additions & 0 deletions floodsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1063,3 +1063,338 @@ func TestImproperlySignedMessageRejected(t *testing.T) {
)
}
}

func TestSubscriptionJoinNotification(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

const numLateSubscribers = 10
const numHosts = 20
hosts := getNetHosts(t, ctx, numHosts)

psubs := getPubsubs(ctx, hosts)

msgs := make([]*Subscription, numHosts)
subPeersFound := make([]map[peer.ID]struct{}, numHosts)

// Have some peers subscribe earlier than other peers.
// This exercises whether we get subscription notifications from
// existing peers.
for i, ps := range psubs[numLateSubscribers:] {
subch, err := ps.Subscribe("foobar")
if err != nil {
t.Fatal(err)
}

msgs[i] = subch
}

connectAll(t, hosts)

time.Sleep(time.Millisecond * 100)

// Have the rest subscribe
for i, ps := range psubs[:numLateSubscribers] {
subch, err := ps.Subscribe("foobar")
if err != nil {
t.Fatal(err)
}

msgs[i+numLateSubscribers] = subch
}

wg := sync.WaitGroup{}
for i := 0; i < numHosts; i++ {
peersFound := make(map[peer.ID]struct{})
subPeersFound[i] = peersFound
sub := msgs[i]
wg.Add(1)
go func(peersFound map[peer.ID]struct{}) {
defer wg.Done()
for len(peersFound) < numHosts-1 {
event, err := sub.NextPeerEvent(ctx)
if err != nil {
t.Fatal(err)
}
if event.Type == PeerJoin {
peersFound[event.Peer] = struct{}{}
}
}
}(peersFound)
}

wg.Wait()
for _, peersFound := range subPeersFound {
if len(peersFound) != numHosts-1 {
t.Fatal("incorrect number of peers found")
}
}
}

func TestSubscriptionLeaveNotification(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

const numHosts = 20
hosts := getNetHosts(t, ctx, numHosts)

psubs := getPubsubs(ctx, hosts)

msgs := make([]*Subscription, numHosts)
subPeersFound := make([]map[peer.ID]struct{}, numHosts)

// Subscribe all peers and wait until they've all been found
for i, ps := range psubs {
subch, err := ps.Subscribe("foobar")
if err != nil {
t.Fatal(err)
}

msgs[i] = subch
}

connectAll(t, hosts)

time.Sleep(time.Millisecond * 100)

wg := sync.WaitGroup{}
for i := 0; i < numHosts; i++ {
peersFound := make(map[peer.ID]struct{})
subPeersFound[i] = peersFound
sub := msgs[i]
wg.Add(1)
go func(peersFound map[peer.ID]struct{}) {
defer wg.Done()
for len(peersFound) < numHosts-1 {
event, err := sub.NextPeerEvent(ctx)
if err != nil {
t.Fatal(err)
}
if event.Type == PeerJoin {
peersFound[event.Peer] = struct{}{}
}
}
}(peersFound)
}

wg.Wait()
for _, peersFound := range subPeersFound {
if len(peersFound) != numHosts-1 {
t.Fatal("incorrect number of peers found")
}
}

// Test removing peers and verifying that they cause events
msgs[1].Cancel()
hosts[2].Close()
psubs[0].BlacklistPeer(hosts[3].ID())

leavingPeers := make(map[peer.ID]struct{})
for len(leavingPeers) < 3 {
event, err := msgs[0].NextPeerEvent(ctx)
if err != nil {
t.Fatal(err)
}
if event.Type == PeerLeave {
leavingPeers[event.Peer] = struct{}{}
}
}

if _, ok := leavingPeers[hosts[1].ID()]; !ok {
t.Fatal(fmt.Errorf("canceling subscription did not cause a leave event"))
}
if _, ok := leavingPeers[hosts[2].ID()]; !ok {
t.Fatal(fmt.Errorf("closing host did not cause a leave event"))
}
if _, ok := leavingPeers[hosts[3].ID()]; !ok {
t.Fatal(fmt.Errorf("blacklisting peer did not cause a leave event"))
}
}

func TestSubscriptionManyNotifications(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

const topic = "foobar"

const numHosts = 35
hosts := getNetHosts(t, ctx, numHosts)

psubs := getPubsubs(ctx, hosts)

msgs := make([]*Subscription, numHosts)
subPeersFound := make([]map[peer.ID]struct{}, numHosts)

// Subscribe all peers except one and wait until they've all been found
for i := 1; i < numHosts; i++ {
subch, err := psubs[i].Subscribe(topic)
if err != nil {
t.Fatal(err)
}

msgs[i] = subch
}

connectAll(t, hosts)

time.Sleep(time.Millisecond * 100)

wg := sync.WaitGroup{}
for i := 1; i < numHosts; i++ {
peersFound := make(map[peer.ID]struct{})
subPeersFound[i] = peersFound
sub := msgs[i]
wg.Add(1)
go func(peersFound map[peer.ID]struct{}) {
defer wg.Done()
for len(peersFound) < numHosts-2 {
event, err := sub.NextPeerEvent(ctx)
if err != nil {
t.Fatal(err)
}
if event.Type == PeerJoin {
peersFound[event.Peer] = struct{}{}
}
}
}(peersFound)
}

wg.Wait()
for _, peersFound := range subPeersFound[1:] {
if len(peersFound) != numHosts-2 {
t.Fatalf("found %d peers, expected %d", len(peersFound), numHosts-2)
}
}

// Wait for remaining peer to find other peers
for len(psubs[0].ListPeers(topic)) < numHosts-1 {
time.Sleep(time.Millisecond * 100)
}

// Subscribe the remaining peer and check that all the events came through
sub, err := psubs[0].Subscribe(topic)
if err != nil {
t.Fatal(err)
}

msgs[0] = sub

peerState := readAllQueuedEvents(ctx, t, sub)

if len(peerState) != numHosts-1 {
t.Fatal("incorrect number of peers found")
}

for _, e := range peerState {
if e != PeerJoin {
t.Fatal("non Join event occurred")
}
}

// Unsubscribe all peers except one and check that all the events came through
for i := 1; i < numHosts; i++ {
msgs[i].Cancel()
}

// Wait for remaining peer to disconnect from the other peers
for len(psubs[0].ListPeers(topic)) != 0 {
time.Sleep(time.Millisecond * 100)
}

peerState = readAllQueuedEvents(ctx, t, sub)

if len(peerState) != numHosts-1 {
t.Fatal("incorrect number of peers found")
}

for _, e := range peerState {
if e != PeerLeave {
t.Fatal("non Leave event occurred")
}
}
}

func TestSubscriptionNotificationSubUnSub(t *testing.T) {
// Resubscribe and Unsubscribe a peers and check the state for consistency
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

const topic = "foobar"

const numHosts = 35
hosts := getNetHosts(t, ctx, numHosts)
psubs := getPubsubs(ctx, hosts)

for i := 1; i < numHosts; i++ {
connect(t, hosts[0], hosts[i])
}
time.Sleep(time.Millisecond * 100)

notifSubThenUnSub(ctx, t, topic, psubs)
}

func notifSubThenUnSub(ctx context.Context, t *testing.T, topic string,
psubs []*PubSub) {

ps := psubs[0]
msgs := make([]*Subscription, len(psubs))
checkSize := len(psubs) - 1

// Subscribe all peers to the topic
var err error
for i, ps := range psubs {
msgs[i], err = ps.Subscribe(topic)
if err != nil {
t.Fatal(err)
}
}

sub := msgs[0]

// Wait for the primary peer to be connected to the other peers
for len(ps.ListPeers(topic)) < checkSize {
time.Sleep(time.Millisecond * 100)
}

// Unsubscribe all peers except the primary
for i := 1; i < checkSize+1; i++ {
msgs[i].Cancel()
}

// Wait for the unsubscribe messages to reach the primary peer
for len(ps.ListPeers(topic)) < 0 {
time.Sleep(time.Millisecond * 100)
}

// read all available events and verify that there are no events to process
// this is because every peer that joined also left
peerState := readAllQueuedEvents(ctx, t, sub)

if len(peerState) != 0 {
for p, s := range peerState {
fmt.Println(p, s)
}
t.Fatalf("Received incorrect events. %d extra events", len(peerState))
}
}

func readAllQueuedEvents(ctx context.Context, t *testing.T, sub *Subscription) map[peer.ID]EventType {
peerState := make(map[peer.ID]EventType)
for {
ctx, _ := context.WithTimeout(ctx, time.Millisecond*100)
event, err := sub.NextPeerEvent(ctx)
if err == context.DeadlineExceeded {
break
} else if err != nil {
t.Fatal(err)
}

e, ok := peerState[event.Peer]
if !ok {
peerState[event.Peer] = event.Type
} else if e != event.Type {
delete(peerState, event.Peer)
}
}
return peerState
}
Loading