Skip to content

Commit 55f4ad6

Browse files
authored
Merge pull request #184 from aschmahmann/feat/discovery
Add Discovery
2 parents aa9a875 + f9c26c2 commit 55f4ad6

File tree

14 files changed

+1740
-507
lines changed

14 files changed

+1740
-507
lines changed

comm.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import (
1919
// get the initial RPC containing all of our subscriptions to send to new peers
2020
func (p *PubSub) getHelloPacket() *RPC {
2121
var rpc RPC
22-
for t := range p.myTopics {
22+
for t := range p.mySubs {
2323
as := &pb.RPC_SubOpts{
2424
Topicid: proto.String(t),
2525
Subscribe: proto.Bool(true),

discovery.go

Lines changed: 336 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,336 @@
1+
package pubsub
2+
3+
import (
4+
"context"
5+
"math/rand"
6+
"time"
7+
8+
"github.com/libp2p/go-libp2p-core/discovery"
9+
"github.com/libp2p/go-libp2p-core/host"
10+
"github.com/libp2p/go-libp2p-core/peer"
11+
discimpl "github.com/libp2p/go-libp2p-discovery"
12+
)
13+
14+
var (
15+
// poll interval
16+
17+
// DiscoveryPollInitialDelay is how long the discovery system waits after it first starts before polling
18+
DiscoveryPollInitialDelay = 0 * time.Millisecond
19+
// DiscoveryPollInterval is approximately how long the discovery system waits in between checks for whether the
20+
// more peers are needed for any topic
21+
DiscoveryPollInterval = 1 * time.Second
22+
)
23+
24+
type DiscoverOpt func(*discoverOptions) error
25+
26+
type discoverOptions struct {
27+
connFactory BackoffConnectorFactory
28+
opts []discovery.Option
29+
}
30+
31+
func defaultDiscoverOptions() *discoverOptions {
32+
rng := rand.New(rand.NewSource(rand.Int63()))
33+
minBackoff, maxBackoff := time.Second*10, time.Hour
34+
cacheSize := 100
35+
dialTimeout := time.Minute * 2
36+
discoverOpts := &discoverOptions{
37+
connFactory: func(host host.Host) (*discimpl.BackoffConnector, error) {
38+
backoff := discimpl.NewExponentialBackoff(minBackoff, maxBackoff, discimpl.FullJitter, time.Second, 5.0, 0, rng)
39+
return discimpl.NewBackoffConnector(host, cacheSize, dialTimeout, backoff)
40+
},
41+
}
42+
43+
return discoverOpts
44+
}
45+
46+
// discover represents the discovery pipeline.
47+
// The discovery pipeline handles advertising and discovery of peers
48+
type discover struct {
49+
p *PubSub
50+
51+
// discovery assists in discovering and advertising peers for a topic
52+
discovery discovery.Discovery
53+
54+
// advertising tracks which topics are being advertised
55+
advertising map[string]context.CancelFunc
56+
57+
// discoverQ handles continuing peer discovery
58+
discoverQ chan *discoverReq
59+
60+
// ongoing tracks ongoing discovery requests
61+
ongoing map[string]struct{}
62+
63+
// done handles completion of a discovery request
64+
done chan string
65+
66+
// connector handles connecting to new peers found via discovery
67+
connector *discimpl.BackoffConnector
68+
69+
// options are the set of options to be used to complete struct construction in Start
70+
options *discoverOptions
71+
}
72+
73+
// MinTopicSize returns a function that checks if a router is ready for publishing based on the topic size.
74+
// The router ultimately decides the whether it is ready or not, the given size is just a suggestion.
75+
func MinTopicSize(size int) RouterReady {
76+
return func(rt PubSubRouter, topic string) (bool, error) {
77+
return rt.EnoughPeers(topic, size), nil
78+
}
79+
}
80+
81+
// Start attaches the discovery pipeline to a pubsub instance, initializes discovery and starts event loop
82+
func (d *discover) Start(p *PubSub, opts ...DiscoverOpt) error {
83+
if d.discovery == nil || p == nil {
84+
return nil
85+
}
86+
87+
d.p = p
88+
d.advertising = make(map[string]context.CancelFunc)
89+
d.discoverQ = make(chan *discoverReq, 32)
90+
d.ongoing = make(map[string]struct{})
91+
d.done = make(chan string)
92+
93+
conn, err := d.options.connFactory(p.host)
94+
if err != nil {
95+
return err
96+
}
97+
d.connector = conn
98+
99+
go d.discoverLoop()
100+
go d.pollTimer()
101+
102+
return nil
103+
}
104+
105+
func (d *discover) pollTimer() {
106+
select {
107+
case <-time.After(DiscoveryPollInitialDelay):
108+
case <-d.p.ctx.Done():
109+
return
110+
}
111+
112+
select {
113+
case d.p.eval <- d.requestDiscovery:
114+
case <-d.p.ctx.Done():
115+
return
116+
}
117+
118+
ticker := time.NewTicker(DiscoveryPollInterval)
119+
defer ticker.Stop()
120+
121+
for {
122+
select {
123+
case <-ticker.C:
124+
select {
125+
case d.p.eval <- d.requestDiscovery:
126+
case <-d.p.ctx.Done():
127+
return
128+
}
129+
case <-d.p.ctx.Done():
130+
return
131+
}
132+
}
133+
}
134+
135+
func (d *discover) requestDiscovery() {
136+
for t := range d.p.myTopics {
137+
if !d.p.rt.EnoughPeers(t, 0) {
138+
d.discoverQ <- &discoverReq{topic: t, done: make(chan struct{}, 1)}
139+
}
140+
}
141+
}
142+
143+
func (d *discover) discoverLoop() {
144+
for {
145+
select {
146+
case discover := <-d.discoverQ:
147+
topic := discover.topic
148+
149+
if _, ok := d.ongoing[topic]; ok {
150+
discover.done <- struct{}{}
151+
continue
152+
}
153+
154+
d.ongoing[topic] = struct{}{}
155+
156+
go func() {
157+
d.handleDiscovery(d.p.ctx, topic, discover.opts)
158+
select {
159+
case d.done <- topic:
160+
case <-d.p.ctx.Done():
161+
}
162+
discover.done <- struct{}{}
163+
}()
164+
case topic := <-d.done:
165+
delete(d.ongoing, topic)
166+
case <-d.p.ctx.Done():
167+
return
168+
}
169+
}
170+
}
171+
172+
// Advertise advertises this node's interest in a topic to a discovery service. Advertise is not thread-safe.
173+
func (d *discover) Advertise(topic string) {
174+
if d.discovery == nil {
175+
return
176+
}
177+
178+
advertisingCtx, cancel := context.WithCancel(d.p.ctx)
179+
180+
if _, ok := d.advertising[topic]; ok {
181+
cancel()
182+
return
183+
}
184+
d.advertising[topic] = cancel
185+
186+
go func() {
187+
next, err := d.discovery.Advertise(advertisingCtx, topic)
188+
if err != nil {
189+
log.Warningf("bootstrap: error providing rendezvous for %s: %s", topic, err.Error())
190+
}
191+
192+
t := time.NewTimer(next)
193+
for {
194+
select {
195+
case <-t.C:
196+
next, err = d.discovery.Advertise(advertisingCtx, topic)
197+
if err != nil {
198+
log.Warningf("bootstrap: error providing rendezvous for %s: %s", topic, err.Error())
199+
}
200+
t.Reset(next)
201+
case <-advertisingCtx.Done():
202+
t.Stop()
203+
return
204+
}
205+
}
206+
}()
207+
}
208+
209+
// StopAdvertise stops advertising this node's interest in a topic. StopAdvertise is not thread-safe.
210+
func (d *discover) StopAdvertise(topic string) {
211+
if d.discovery == nil {
212+
return
213+
}
214+
215+
if advertiseCancel, ok := d.advertising[topic]; ok {
216+
advertiseCancel()
217+
delete(d.advertising, topic)
218+
}
219+
}
220+
221+
// Discover searches for additional peers interested in a given topic
222+
func (d *discover) Discover(topic string, opts ...discovery.Option) {
223+
if d.discovery == nil {
224+
return
225+
}
226+
227+
d.discoverQ <- &discoverReq{topic, opts, make(chan struct{}, 1)}
228+
}
229+
230+
// Bootstrap attempts to bootstrap to a given topic. Returns true if bootstrapped successfully, false otherwise.
231+
func (d *discover) Bootstrap(ctx context.Context, topic string, ready RouterReady, opts ...discovery.Option) bool {
232+
if d.discovery == nil {
233+
return true
234+
}
235+
236+
t := time.NewTimer(time.Hour)
237+
if !t.Stop() {
238+
<-t.C
239+
}
240+
241+
for {
242+
// Check if ready for publishing
243+
bootstrapped := make(chan bool, 1)
244+
select {
245+
case d.p.eval <- func() {
246+
done, _ := ready(d.p.rt, topic)
247+
bootstrapped <- done
248+
}:
249+
if <-bootstrapped {
250+
return true
251+
}
252+
case <-d.p.ctx.Done():
253+
return false
254+
case <-ctx.Done():
255+
return false
256+
}
257+
258+
// If not ready discover more peers
259+
disc := &discoverReq{topic, opts, make(chan struct{}, 1)}
260+
select {
261+
case d.discoverQ <- disc:
262+
case <-d.p.ctx.Done():
263+
return false
264+
case <-ctx.Done():
265+
return false
266+
}
267+
268+
select {
269+
case <-disc.done:
270+
case <-d.p.ctx.Done():
271+
return false
272+
case <-ctx.Done():
273+
return false
274+
}
275+
276+
t.Reset(time.Millisecond * 100)
277+
select {
278+
case <-t.C:
279+
case <-d.p.ctx.Done():
280+
return false
281+
case <-ctx.Done():
282+
return false
283+
}
284+
}
285+
}
286+
287+
func (d *discover) handleDiscovery(ctx context.Context, topic string, opts []discovery.Option) {
288+
discoverCtx, cancel := context.WithTimeout(ctx, time.Second*10)
289+
defer cancel()
290+
291+
peerCh, err := d.discovery.FindPeers(discoverCtx, topic, opts...)
292+
if err != nil {
293+
log.Debugf("error finding peers for topic %s: %v", topic, err)
294+
return
295+
}
296+
297+
d.connector.Connect(ctx, peerCh)
298+
}
299+
300+
type discoverReq struct {
301+
topic string
302+
opts []discovery.Option
303+
done chan struct{}
304+
}
305+
306+
type pubSubDiscovery struct {
307+
discovery.Discovery
308+
opts []discovery.Option
309+
}
310+
311+
func (d *pubSubDiscovery) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) {
312+
return d.Discovery.Advertise(ctx, "floodsub:"+ns, append(opts, d.opts...)...)
313+
}
314+
315+
func (d *pubSubDiscovery) FindPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) {
316+
return d.Discovery.FindPeers(ctx, "floodsub:"+ns, append(opts, d.opts...)...)
317+
}
318+
319+
// WithDiscoveryOpts passes libp2p Discovery options into the PubSub discovery subsystem
320+
func WithDiscoveryOpts(opts ...discovery.Option) DiscoverOpt {
321+
return func(d *discoverOptions) error {
322+
d.opts = opts
323+
return nil
324+
}
325+
}
326+
327+
// BackoffConnectorFactory creates a BackoffConnector that is attached to a given host
328+
type BackoffConnectorFactory func(host host.Host) (*discimpl.BackoffConnector, error)
329+
330+
// WithDiscoverConnector adds a custom connector that deals with how the discovery subsystem connects to peers
331+
func WithDiscoverConnector(connFactory BackoffConnectorFactory) DiscoverOpt {
332+
return func(d *discoverOptions) error {
333+
d.connFactory = connFactory
334+
return nil
335+
}
336+
}

0 commit comments

Comments
 (0)