Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions crdt.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ var (
// all replicas and to retrieve payloads broadcasted.
type Broadcaster interface {
// Send payload to other replicas.
Broadcast([]byte) error
Broadcast(context.Context, []byte) error
// Obtain the next payload received from the network.
Next() ([]byte, error)
Next(context.Context) ([]byte, error)
}

// A SessionDAGService is a Sessions-enabled DAGService. This type of DAG-Service
Expand Down Expand Up @@ -351,7 +351,7 @@ func (store *Datastore) handleNext(ctx context.Context) {
default:
}

data, err := store.broadcaster.Next()
data, err := store.broadcaster.Next(ctx)
if err != nil {
if err == ErrNoMoreBroadcast || ctx.Err() != nil {
return
Expand Down Expand Up @@ -1301,7 +1301,7 @@ func (store *Datastore) broadcast(ctx context.Context, cids []cid.Cid) error {
return err
}

err = store.broadcaster.Broadcast(bcastBytes)
err = store.broadcaster.Broadcast(ctx, bcastBytes)
if err != nil {
return errors.Wrapf(err, "error broadcasting %s", cids)
}
Expand Down
6 changes: 4 additions & 2 deletions crdt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func newBroadcasters(t testing.TB, n int) ([]*mockBroadcaster, context.CancelFun
return broadcasters, cancel
}

func (mb *mockBroadcaster) Broadcast(data []byte) error {
func (mb *mockBroadcaster) Broadcast(ctx context.Context, data []byte) error {
var wg sync.WaitGroup

randg := rand.New(rand.NewSource(time.Now().UnixNano()))
Expand Down Expand Up @@ -155,10 +155,12 @@ func (mb *mockBroadcaster) Broadcast(data []byte) error {
return nil
}

func (mb *mockBroadcaster) Next() ([]byte, error) {
func (mb *mockBroadcaster) Next(ctx context.Context) ([]byte, error) {
select {
case data := <-mb.myChan:
return data, nil
case <-ctx.Done():
return nil, ErrNoMoreBroadcast
case <-mb.ctx.Done():
return nil, ErrNoMoreBroadcast
}
Expand Down
2 changes: 2 additions & 0 deletions pubsub_broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
)

var _ Broadcaster = (*PubSubBroadcaster)(nil)

// PubSubBroadcaster implements a Broadcaster using libp2p PubSub.
type PubSubBroadcaster struct {
ctx context.Context
Expand Down