Skip to content
This repository was archived by the owner on Aug 23, 2019. It is now read-only.

Commit 8fe69a5

Browse files
committed
feat: handle active/passive relays
1 parent ad93dfc commit 8fe69a5

File tree

3 files changed

+108
-69
lines changed

3 files changed

+108
-69
lines changed

src/circuit/dialer.js

Lines changed: 39 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ class Dialer {
5757
log.err(err)
5858
return cb(err)
5959
}
60+
6061
dstConn.setInnerConn(conn)
6162
cb(null, dstConn)
6263
})
@@ -75,37 +76,29 @@ class Dialer {
7576
cb = once(cb || (() => {}))
7677

7778
if (!this.relayPeers.get(this.utils.getB58String(peer))) {
78-
return this._dialRelay(peer, (err, streamHandler) => {
79-
if (err) {
80-
return log.err(err)
81-
}
82-
83-
streamHandler.write(proto.CircuitRelay.encode({
79+
let streamHandler
80+
waterfall([
81+
(wCb) => this._dialRelay(peer, wCb),
82+
(sh, wCb) => {
83+
streamHandler = sh
84+
wCb()
85+
},
86+
(wCb) => streamHandler.write(proto.CircuitRelay.encode({
8487
type: proto.CircuitRelay.Type.CAN_HOP
85-
}), (err) => {
86-
if (err) {
87-
log.err(err)
88-
return cb(err)
89-
}
88+
}), wCb),
89+
(wCb) => streamHandler.read(wCb),
90+
(msg, wCb) => {
91+
const response = proto.CircuitRelay.decode(msg)
9092

91-
streamHandler.read((err, msg) => {
92-
if (err) {
93-
log.err(err)
94-
return cb(err)
95-
}
96-
97-
const response = proto.CircuitRelay.decode(msg)
98-
99-
if (response.code !== proto.CircuitRelay.Status.SUCCESS) {
100-
return log(`HOP not supported, skipping - ${this.utils.getB58String(peer)}`)
101-
}
93+
if (response.code !== proto.CircuitRelay.Status.SUCCESS) {
94+
return log(`HOP not supported, skipping - ${this.utils.getB58String(peer)}`)
95+
}
10296

103-
log(`HOP supported adding as relay - ${this.utils.getB58String(peer)}`)
104-
this.relayPeers.set(this.utils.getB58String(peer), peer)
105-
cb(null)
106-
})
107-
})
108-
})
97+
log(`HOP supported adding as relay - ${this.utils.getB58String(peer)}`)
98+
this.relayPeers.set(this.utils.getB58String(peer), peer)
99+
wCb(null)
100+
}
101+
], cb)
109102
}
110103

111104
return cb(null)
@@ -177,14 +170,19 @@ class Dialer {
177170
dstMa = multiaddr(dstMa)
178171

179172
const srcMas = this.swarm._peerInfo.multiaddrs.toArray()
173+
let streamHandler
180174
waterfall([
181175
(cb) => {
182176
if (relay instanceof Connection) {
183177
return cb(null, new StreamHandler(relay))
184178
}
185179
return this._dialRelay(this.utils.peerInfoFromMa(relay), cb)
186180
},
187-
(streamHandler, cb) => {
181+
(sh, cb) => {
182+
streamHandler = sh
183+
cb(null)
184+
},
185+
(cb) => {
188186
log(`negotiating relay for peer ${dstMa.getPeerId()}`)
189187
streamHandler.write(
190188
proto.CircuitRelay.encode({
@@ -197,35 +195,21 @@ class Dialer {
197195
id: PeerId.createFromB58String(dstMa.getPeerId()).id,
198196
addrs: [dstMa.buffer]
199197
}
200-
}),
201-
(err) => {
202-
if (err) {
203-
log.err(err)
204-
return cb(err)
205-
}
206-
207-
cb(null, streamHandler)
208-
})
198+
}), cb)
209199
},
210-
(streamHandler, cb) => {
211-
streamHandler.read((err, msg) => {
212-
if (err) {
213-
log.err(err)
214-
return cb(err)
215-
}
216-
217-
const message = proto.CircuitRelay.decode(msg)
218-
if (message.type !== proto.CircuitRelay.Type.STATUS) {
219-
return cb(new Error(`Got invalid message type - ` +
220-
`expected ${proto.CircuitRelay.Type.STATUS} got ${message.type}`))
221-
}
200+
(cb) => streamHandler.read(cb),
201+
(msg, cb) => {
202+
const message = proto.CircuitRelay.decode(msg)
203+
if (message.type !== proto.CircuitRelay.Type.STATUS) {
204+
return cb(new Error(`Got invalid message type - ` +
205+
`expected ${proto.CircuitRelay.Type.STATUS} got ${message.type}`))
206+
}
222207

223-
if (message.code !== proto.CircuitRelay.Status.SUCCESS) {
224-
return cb(new Error(`Got ${message.code} error code trying to dial over relay`))
225-
}
208+
if (message.code !== proto.CircuitRelay.Status.SUCCESS) {
209+
return cb(new Error(`Got ${message.code} error code trying to dial over relay`))
210+
}
226211

227-
cb(null, new Connection(streamHandler.rest()))
228-
})
212+
cb(null, new Connection(streamHandler.rest()))
229213
}
230214
], callback)
231215
}

src/circuit/hop.js

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,15 @@ class Hop extends EE {
6565
return this.utils.writeResponse(streamHandler, proto.CircuitRelay.Status.SUCCESS)
6666
}
6767

68-
if (message.dstPeer.id.toString() === this.peerInfo.id.toB58String()) {
68+
const srcPeerId = PeerId.createFromBytes(message.dstPeer.id)
69+
if (srcPeerId.toB58String() === this.peerInfo.id.toB58String()) {
6970
return this.utils.writeResponse(streamHandler, proto.CircuitRelay.Status.HOP_CANT_RELAY_TO_SELF)
7071
}
7172

73+
const dstPeerId = PeerId.createFromBytes(message.dstPeer.id).toB58String()
7274
if (!message.dstPeer.addrs.length) {
7375
// TODO: use encapsulate here
74-
const addr = multiaddr(`/p2p-circuit/ipfs/${PeerId.createFromBytes(message.dstPeer.id).toB58String()}`).buffer
76+
const addr = multiaddr(`/p2p-circuit/ipfs/${dstPeerId}`).buffer
7577
message.dstPeer.addrs.push(addr)
7678
}
7779

@@ -80,6 +82,10 @@ class Hop extends EE {
8082
return log(err)
8183
}
8284

85+
if (!this.swarm.conns[dstPeerId] && !this.active) {
86+
return this.utils.writeResponse(streamHandler, proto.CircuitRelay.Status.HOP_NO_CONN_TO_DST)
87+
}
88+
8389
return this._circuit(streamHandler.rest(), message, (err) => {
8490
if (err) {
8591
log.err(err)

test/hop.spec.js

Lines changed: 61 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,15 @@ const sinon = require('sinon')
1717
const expect = require('chai').expect
1818

1919
describe('relay', function () {
20-
describe(`handle circuit requests`, function () {
20+
describe(`should handle circuit requests`, function () {
2121
let relay
2222
let swarm
2323
let fromConn
2424
let stream
2525
let shake
2626

2727
beforeEach(function (done) {
28-
stream = handshake({timeout: 1000 * 60})
28+
stream = handshake({ timeout: 1000 * 60 })
2929
shake = stream.handshake
3030
fromConn = new Connection(stream)
3131
fromConn.setPeerInfo(new PeerInfo(PeerId.createFromB58String('QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA')))
@@ -38,14 +38,16 @@ describe('relay', function () {
3838
swarm = {
3939
_peerInfo: peer,
4040
conns: {
41-
QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE: new Connection()
41+
QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE: new Connection(),
42+
QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA: new Connection(),
43+
QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy: new Connection()
4244
}
4345
}
4446

4547
cb()
4648
}
4749
], () => {
48-
relay = new Hop(swarm, {enabled: true})
50+
relay = new Hop(swarm, { enabled: true })
4951
relay._circuit = sinon.stub()
5052
relay._circuit.callsArg(2, null, new Connection())
5153
done()
@@ -56,15 +58,15 @@ describe('relay', function () {
5658
relay._circuit.reset()
5759
})
5860

59-
it(`handle a valid circuit request`, function (done) {
61+
it(`should handle a valid circuit request`, function (done) {
6062
let relayMsg = {
6163
type: proto.CircuitRelay.Type.HOP,
6264
srcPeer: {
63-
id: Buffer.from(`QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`),
65+
id: PeerId.createFromB58String(`QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`).id,
6466
addrs: [multiaddr(`/ipfs/QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`).buffer]
6567
},
6668
dstPeer: {
67-
id: Buffer.from(`QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`),
69+
id: PeerId.createFromB58String(`QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`).id,
6870
addrs: [multiaddr(`/ipfs/QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`).buffer]
6971
}
7072
}
@@ -77,15 +79,62 @@ describe('relay', function () {
7779
relay.handle(relayMsg, new StreamHandler(fromConn))
7880
})
7981

82+
it(`should handle a request to passive circuit`, function (done) {
83+
let relayMsg = {
84+
type: proto.CircuitRelay.Type.HOP,
85+
srcPeer: {
86+
id: PeerId.createFromB58String(`QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`).id,
87+
addrs: [multiaddr(`/ipfs/QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`).buffer]
88+
},
89+
dstPeer: {
90+
id: PeerId.createFromB58String(`QmYJjAri5soV8RbeQcHaYYcTAYTET17QTvcoFMyKvRDTXe`).id,
91+
addrs: [multiaddr(`/ipfs/QmYJjAri5soV8RbeQcHaYYcTAYTET17QTvcoFMyKvRDTXe`).buffer]
92+
}
93+
}
94+
95+
relay.active = false
96+
lp.decodeFromReader(shake, (err, msg) => {
97+
expect(err).to.be.null
98+
99+
const response = proto.CircuitRelay.decode(msg)
100+
expect(response.code).to.equal(proto.CircuitRelay.Status.HOP_NO_CONN_TO_DST)
101+
expect(response.type).to.equal(proto.CircuitRelay.Type.STATUS)
102+
done()
103+
})
104+
105+
relay.handle(relayMsg, new StreamHandler(fromConn))
106+
})
107+
108+
it(`should handle a request to active circuit`, function (done) {
109+
let relayMsg = {
110+
type: proto.CircuitRelay.Type.HOP,
111+
srcPeer: {
112+
id: PeerId.createFromB58String(`QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`).id,
113+
addrs: [multiaddr(`/ipfs/QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`).buffer]
114+
},
115+
dstPeer: {
116+
id: PeerId.createFromB58String(`QmYJjAri5soV8RbeQcHaYYcTAYTET17QTvcoFMyKvRDTXe`).id,
117+
addrs: [multiaddr(`/ipfs/QmYJjAri5soV8RbeQcHaYYcTAYTET17QTvcoFMyKvRDTXe`).buffer]
118+
}
119+
}
120+
121+
relay.active = true
122+
relay.on('circuit:success', () => {
123+
expect(relay._circuit.calledWith(sinon.match.any, relayMsg)).to.be.ok
124+
done()
125+
})
126+
relay.handle(relayMsg, new StreamHandler(fromConn))
127+
})
128+
80129
it(`not dial to self`, function (done) {
81130
let relayMsg = {
82131
type: proto.CircuitRelay.Type.HOP,
83132
srcPeer: {
84-
id: Buffer.from(`QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`),
133+
id: PeerId.createFromB58String(`QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`).id,
85134
addrs: [multiaddr(`/ipfs/QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`).buffer]
86135
},
87136
dstPeer: {
88-
id: Buffer.from(`QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy`),
137+
id: PeerId.createFromB58String(`QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy`).id,
89138
addrs: [multiaddr(`/ipfs/QmQvM2mpqkjyXWbTHSUidUAWN26GgdMphTh9iGDdjgVXCy`).buffer]
90139
}
91140
}
@@ -110,7 +159,7 @@ describe('relay', function () {
110159
addrs: [`sdfkjsdnfkjdsb`]
111160
},
112161
dstPeer: {
113-
id: Buffer.from(`QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`),
162+
id: PeerId.createFromB58String(`QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`).id,
114163
addrs: [multiaddr(`/ipfs/QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`).buffer]
115164
}
116165
}
@@ -131,11 +180,11 @@ describe('relay', function () {
131180
let relayMsg = {
132181
type: proto.CircuitRelay.Type.HOP,
133182
srcPeer: {
134-
id: Buffer.from(`QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`),
183+
id: PeerId.createFromB58String(`QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`).id,
135184
addrs: [multiaddr(`/ipfs/QmQWqGdndSpAkxfk8iyiJyz3XXGkrDNujvc8vEst3baubA`).buffer]
136185
},
137186
dstPeer: {
138-
id: `sdfkjsdnfkjdsb`,
187+
id: PeerId.createFromB58String(`QmSswe1dCFRepmhjAMR5VfHeokGLcvVggkuDJm7RMfJSrE`).id,
139188
addrs: [`sdfkjsdnfkjdsb`]
140189
}
141190
}

0 commit comments

Comments
 (0)