Skip to content
This repository was archived by the owner on Oct 11, 2024. It is now read-only.

Commit 06c181c

Browse files
albrowfabioberger
andauthored
Improve algorithm for sharing old orders (#692)
* Remove old algorithm for order sharing via GossipSub * WIP initial implementation * Improve test for p2p/ordersync * Implement ordersync.Provider in core package * Make new core tests serial * Wait for nodes to exit in core_test * Try running new core tests without the race detector * Only run core serial tests in CI * Add more log spam * Log spam about number of orders requested/provided * More logging * log when orders are invalid * Add additional sleep statement * Change timing for test * Remove some logs * Increase ETH RPC rate limiting limits for tests * Re-enable all tests * Remove more old logs * Move ordersync to core package; create subprotocols * Use BlockchainLifecycle in new core tests * Update core.go with new ordersync logic * Fix some bugs and get integration tests passing * make core_test more robust * Add timeout for ordersync requests and responses * Fix failing tests * Wait for event to be sent before returning from ValidateAndStoreValidOrders * Increase test timeouts * Add manual delay in browser integraiton tests * Add note about timing hack in browser integration tests * Remove old files * Add a lot more comments * Break out of for loop in GetOrders when minPeers reached * Add appropriate peer score events * Add rate limiting * Remove old constants * Update core/ordersync/ordersync.go Co-Authored-By: Fabio B <[email protected]> * Update core/ordersync_subprotocols.go Co-Authored-By: Fabio B <[email protected]> * Update core/ordersync_subprotocols.go Co-Authored-By: Fabio B <[email protected]> * Add missing newline * Fix assertion in core_test.go * Rename some methods of ordersync.Subprotocol * Increase minPeers to 5 * Take filters into account in ordersync protocol * Update some comments * Add log message when receiving valid orders via ordersync * Update core/ordersync_subprotocols.go Co-Authored-By: Fabio B <[email protected]> * Use same log message for receiving new order from peer * Update core/ordersync_subprotocols.go Co-Authored-By: Fabio B <[email protected]> * Return errors in waitForResponse and waitForRequest * Add note to changelog Co-authored-by: Fabio B <[email protected]>
1 parent ca2cea7 commit 06c181c

File tree

20 files changed

+1029
-493
lines changed

20 files changed

+1029
-493
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ This changelog is a work in progress and may contain notes for versions which ha
77
### Breaking changes 🛠
88

99
- As a result of implementing custom order filters, some of the code Mesh uses under the hood to share orders with peers has changed. As a result this version of Mesh cannot share orders with any older versions and vice versa ([#630](https://github.com/0xProject/0x-mesh/pull/630)).
10+
- Implemented a new protocol for sharing existing orders with peers. This will drastically reduce bandwidth and CPU usage and increase the speed at which _new_ orders are propagated. ([#692](https://github.com/0xProject/0x-mesh/pull/692)).
1011
- Rename `RPC_ADDR` to `WS_RPC_ADDR` since we now support both WS and HTTP JSON-RPC endpoints. ([#658](https://github.com/0xProject/0x-mesh/pull/658))
1112

1213
### Features ✅

Makefile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,12 @@ test-all: test-go test-wasm-node test-wasm-browser
4040
.PHONY: test-go
4141
test-go: test-go-parallel test-go-serial test-browser-conversion
4242

43+
4344
.PHONY: test-go-parallel
4445
test-go-parallel:
4546
go test ./... -race -timeout 30s
4647

48+
4749
.PHONY: test-go-serial
4850
test-go-serial:
4951
go test ./zeroex/ordervalidator ./zeroex/orderwatch ./core -race -timeout 90s -p=1 --serial
@@ -54,7 +56,7 @@ test-browser-integration:
5456

5557
.PHONY: test-browser-conversion
5658
test-browser-conversion:
57-
go test ./browser/go/conversion-test -timeout 120s --enable-browser-conversion-tests -run BrowserConversions
59+
go test ./browser/go/conversion-test -timeout 185s --enable-browser-conversion-tests -run BrowserConversions
5860

5961
.PHONY: test-wasm-node
6062
test-wasm-node:

browser/go/conversion-test/conversion_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func TestBrowserConversions(t *testing.T) {
5050

5151
// Declare a context that will be used for all child processes, servers, and
5252
// other goroutines.
53-
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
53+
ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second)
5454
ctx, _ = chromedp.NewContext(ctx, chromedp.WithErrorf(t.Errorf))
5555
defer cancel()
5656

core/core.go

Lines changed: 56 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515

1616
"github.com/0xProject/0x-mesh/common/types"
1717
"github.com/0xProject/0x-mesh/constants"
18+
"github.com/0xProject/0x-mesh/core/ordersync"
1819
"github.com/0xProject/0x-mesh/db"
1920
"github.com/0xProject/0x-mesh/encoding"
2021
"github.com/0xProject/0x-mesh/ethereum"
@@ -61,6 +62,10 @@ const (
6162
// logStatsInterval is how often to log stats for this node.
6263
logStatsInterval = 5 * time.Minute
6364
version = "development"
65+
// ordersyncMinPeers is the minimum amount of peers to receive orders from
66+
// before considering the ordersync process finished.
67+
ordersyncMinPeers = 5
68+
paginationSubprotocolPerPage = 500
6469
)
6570

6671
// Note(albrow): The Config type is currently copied to browser/ts/index.ts. We
@@ -193,8 +198,8 @@ type App struct {
193198
idToSnapshotInfo map[string]snapshotInfo
194199
ethRPCRateLimiter ratelimit.RateLimiter
195200
ethRPCClient ethrpcclient.Client
196-
orderSelector *orderSelector
197201
db *meshdb.MeshDB
202+
ordersyncService *ordersync.Service
198203

199204
// started is closed to signal that the App has been started. Some methods
200205
// will block until after the App is started.
@@ -348,11 +353,6 @@ func New(config Config) (*App, error) {
348353

349354
// Initialize remaining fields.
350355
snapshotExpirationWatcher := expirationwatch.New()
351-
orderSelector := &orderSelector{
352-
topic: orderFilter.Topic(),
353-
nextOffset: 0,
354-
db: meshDB,
355-
}
356356

357357
app := &App{
358358
started: make(chan struct{}),
@@ -366,7 +366,6 @@ func New(config Config) (*App, error) {
366366
orderFilter: orderFilter,
367367
snapshotExpirationWatcher: snapshotExpirationWatcher,
368368
idToSnapshotInfo: map[string]snapshotInfo{},
369-
orderSelector: orderSelector,
370369
ethRPCRateLimiter: ethRPCRateLimiter,
371370
ethRPCClient: ethClient,
372371
db: meshDB,
@@ -476,6 +475,9 @@ func (app *App) Start(ctx context.Context) error {
476475
wg.Add(1)
477476
go func() {
478477
defer wg.Done()
478+
defer func() {
479+
log.Debug("closing app.db")
480+
}()
479481
<-innerCtx.Done()
480482
app.db.Close()
481483
}()
@@ -485,13 +487,19 @@ func (app *App) Start(ctx context.Context) error {
485487
wg.Add(1)
486488
go func() {
487489
defer wg.Done()
490+
defer func() {
491+
log.Debug("closing eth RPC rate limiter")
492+
}()
488493
ethRPCRateLimiterErrChan <- app.ethRPCRateLimiter.Start(innerCtx, rateLimiterCheckpointInterval)
489494
}()
490495

491496
// Set up the snapshot expiration watcher pruning logic
492497
wg.Add(1)
493498
go func() {
494499
defer wg.Done()
500+
defer func() {
501+
log.Debug("closing snapshot expiration watcher")
502+
}()
495503
ticker := time.NewTicker(expirationPollingInterval)
496504
for {
497505
select {
@@ -513,6 +521,9 @@ func (app *App) Start(ctx context.Context) error {
513521
wg.Add(1)
514522
go func() {
515523
defer wg.Done()
524+
defer func() {
525+
log.Debug("closing order watcher")
526+
}()
516527
log.Info("starting order watcher")
517528
orderWatcherErrChan <- app.orderWatcher.Watch(innerCtx)
518529
}()
@@ -528,6 +539,9 @@ func (app *App) Start(ctx context.Context) error {
528539
wg.Add(1)
529540
go func() {
530541
defer wg.Done()
542+
defer func() {
543+
log.Debug("closing block watcher")
544+
}()
531545
log.Info("starting block watcher")
532546
blockWatcherErrChan <- app.blockWatcher.Watch(innerCtx)
533547
}()
@@ -580,11 +594,31 @@ func (app *App) Start(ctx context.Context) error {
580594
return err
581595
}
582596

597+
// Register and start ordersync service.
598+
ordersyncSubprotocols := []ordersync.Subprotocol{
599+
NewFilteredPaginationSubprotocol(app, paginationSubprotocolPerPage),
600+
}
601+
app.ordersyncService = ordersync.New(innerCtx, app.node, ordersyncSubprotocols)
602+
orderSyncErrChan := make(chan error, 1)
603+
wg.Add(1)
604+
go func() {
605+
defer wg.Done()
606+
defer func() {
607+
log.Debug("closing ordersync service")
608+
}()
609+
if err := app.ordersyncService.GetOrders(innerCtx, ordersyncMinPeers); err != nil {
610+
orderSyncErrChan <- err
611+
}
612+
}()
613+
583614
// Start the p2p node.
584615
p2pErrChan := make(chan error, 1)
585616
wg.Add(1)
586617
go func() {
587618
defer wg.Done()
619+
defer func() {
620+
log.Debug("closing p2p node")
621+
}()
588622
addrs := app.node.Multiaddrs()
589623
log.WithFields(map[string]interface{}{
590624
"addresses": addrs,
@@ -594,6 +628,9 @@ func (app *App) Start(ctx context.Context) error {
594628
wg.Add(1)
595629
go func() {
596630
defer wg.Done()
631+
defer func() {
632+
log.Debug("closing new addrs checker")
633+
}()
597634
app.periodicallyCheckForNewAddrs(innerCtx, addrs)
598635
}()
599636

@@ -604,6 +641,9 @@ func (app *App) Start(ctx context.Context) error {
604641
wg.Add(1)
605642
go func() {
606643
defer wg.Done()
644+
defer func() {
645+
log.Debug("closing periodic stats logger")
646+
}()
607647
app.periodicallyLogStats(innerCtx)
608648
}()
609649

@@ -628,14 +668,20 @@ func (app *App) Start(ctx context.Context) error {
628668
return err
629669
}
630670
case err := <-blockWatcherErrChan:
631-
log.WithError(err).Error("block watcher exited with error")
632671
if err != nil {
672+
log.WithError(err).Error("block watcher exited with error")
633673
cancel()
634674
return err
635675
}
636676
case err := <-ethRPCRateLimiterErrChan:
637-
log.WithError(err).Error("ETH JSON-RPC ratelimiter exited with error")
638677
if err != nil {
678+
log.WithError(err).Error("ETH JSON-RPC ratelimiter exited with error")
679+
cancel()
680+
return err
681+
}
682+
case err := <-orderSyncErrChan:
683+
if err != nil {
684+
log.WithError(err).Error("ordersync service exited with error")
639685
cancel()
640686
return err
641687
}
@@ -644,6 +690,7 @@ func (app *App) Start(ctx context.Context) error {
644690
// Wait for all goroutines to exit. If we reached here it means we are done
645691
// and there are no errors.
646692
wg.Wait()
693+
log.Debug("app successfully closed")
647694
return nil
648695
}
649696

0 commit comments

Comments
 (0)