Skip to content

Commit a4ee944

Browse files
committed
cmd/blsync, beacon/light: standalone beacon light sync tool
1 parent 3a5acee commit a4ee944

23 files changed

+5114
-9
lines changed

beacon/light/api/light_api.go

Lines changed: 675 additions & 0 deletions
Large diffs are not rendered by default.

beacon/light/api/syncer.go

Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
// Copyright 2022 The go-ethereum Authors
2+
// This file is part of the go-ethereum library.
3+
//
4+
// The go-ethereum library is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Lesser General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// The go-ethereum library is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Lesser General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Lesser General Public License
15+
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
16+
17+
package api
18+
19+
import (
20+
"context"
21+
"errors"
22+
"time"
23+
24+
"github.com/ethereum/go-ethereum/beacon/light/sync"
25+
"github.com/ethereum/go-ethereum/beacon/light/types"
26+
"github.com/ethereum/go-ethereum/common"
27+
"github.com/ethereum/go-ethereum/common/lru"
28+
"github.com/ethereum/go-ethereum/log"
29+
)
30+
31+
const (
32+
headPollFrequency = time.Millisecond * 200
33+
headPollCount = 50
34+
maxRequest = 8
35+
)
36+
37+
// committee update syncing is initiated in each period for each syncPeriodOffsets[i]
38+
// when slot (period+1)*params.SyncPeriodLength+syncPeriodOffsets[i] has been reached.
39+
// This ensures that a close-to-best update for each period can be synced and
40+
// propagated well in advance before the next period begins but later (when it's
41+
// very unlikely that even a reorg could change the given period) the absolute
42+
// best update will also be propagated if it's different from the previous one.
43+
var syncPeriodOffsets = []int{-256, -16, 64}
44+
45+
// CommitteeSyncer syncs committee updates and signed heads from BeaconLightApi
46+
// to CommitteeTracker
47+
type CommitteeSyncer struct {
48+
api *BeaconLightApi
49+
50+
genesisData sync.GenesisData
51+
checkpointPeriod uint64
52+
checkpointCommittee []byte
53+
committeeTracker *sync.CommitteeTracker
54+
55+
lastAdvertisedPeriod uint64
56+
lastPeriodOffset int
57+
58+
updateCache *lru.Cache[uint64, types.LightClientUpdate]
59+
committeeCache *lru.Cache[uint64, []byte]
60+
closeCh chan struct{}
61+
stopFn func()
62+
}
63+
64+
// NewCommitteeSyncer creates a new CommitteeSyncer
65+
// Note: genesisData is only needed when light syncing (using GetInitData for bootstrap)
66+
func NewCommitteeSyncer(api *BeaconLightApi, genesisData sync.GenesisData) *CommitteeSyncer {
67+
return &CommitteeSyncer{
68+
api: api,
69+
genesisData: genesisData,
70+
closeCh: make(chan struct{}),
71+
updateCache: lru.NewCache[uint64, types.LightClientUpdate](maxRequest),
72+
committeeCache: lru.NewCache[uint64, []byte](maxRequest),
73+
}
74+
}
75+
76+
// Start starts the syncing of the given CommitteeTracker
77+
func (cs *CommitteeSyncer) Start(committeeTracker *sync.CommitteeTracker) {
78+
cs.committeeTracker = committeeTracker
79+
committeeTracker.SyncWithPeer(cs, nil)
80+
stopFn := cs.api.StartHeadListener(
81+
func(slot uint64, blockRoot common.Hash) {
82+
cs.updateCache.Purge()
83+
cs.committeeCache.Purge()
84+
cs.syncUpdates(slot, false)
85+
}, func(signedHead sync.SignedHead) {
86+
if cs.committeeTracker.AddSignedHeads(cs, []sync.SignedHead{signedHead}) != nil {
87+
cs.syncUpdates(signedHead.Header.Slot, true)
88+
if err := cs.committeeTracker.AddSignedHeads(cs, []sync.SignedHead{signedHead}); err != nil {
89+
log.Error("Error adding new signed head", "error", err)
90+
}
91+
}
92+
}, func(err error) {
93+
log.Warn("Head event stream error", "err", err)
94+
})
95+
cs.stopFn = stopFn
96+
}
97+
98+
// Stop stops the syncing process
99+
func (cs *CommitteeSyncer) Stop() {
100+
cs.committeeTracker.Disconnect(cs)
101+
close(cs.closeCh)
102+
if cs.stopFn != nil {
103+
cs.stopFn()
104+
}
105+
}
106+
107+
// syncUpdates checks whether one of the syncPeriodOffsets for the latest period
108+
// has been reached by the current head and initiates az update sync if necessary.
109+
// If retry is true then syncing is tried again even if no new syncing offset
110+
// point has been reached.
111+
func (cs *CommitteeSyncer) syncUpdates(slot uint64, retry bool) {
112+
nextPeriod := types.PeriodOfSlot(slot + uint64(-syncPeriodOffsets[0]))
113+
if nextPeriod == 0 {
114+
return
115+
}
116+
var (
117+
nextPeriodStart = types.PeriodStart(nextPeriod)
118+
lastPeriod = nextPeriod - 1
119+
offset = 1
120+
)
121+
for offset < len(syncPeriodOffsets) && slot >= nextPeriodStart+uint64(syncPeriodOffsets[offset]) {
122+
offset++
123+
}
124+
if (retry || lastPeriod != cs.lastAdvertisedPeriod || offset != cs.lastPeriodOffset) && cs.syncUpdatesUntil(lastPeriod) {
125+
cs.lastAdvertisedPeriod, cs.lastPeriodOffset = lastPeriod, offset
126+
}
127+
}
128+
129+
// syncUpdatesUntil queries committee updates that the tracker does not have or
130+
// might have improved since the last query and advertises them to the tracker.
131+
// The tracker can then fetch the actual updates and committees via GetBestCommitteeProofs.
132+
func (cs *CommitteeSyncer) syncUpdatesUntil(lastPeriod uint64) bool {
133+
ptr := int(types.MaxUpdateInfoLength)
134+
if lastPeriod+1 < uint64(ptr) {
135+
ptr = int(lastPeriod + 1)
136+
}
137+
var (
138+
updateInfo = &types.UpdateInfo{
139+
AfterLastPeriod: lastPeriod + 1,
140+
Scores: make(types.UpdateScores, ptr),
141+
}
142+
localNextPeriod = cs.committeeTracker.NextPeriod()
143+
period = lastPeriod
144+
)
145+
for {
146+
remoteUpdate, err := cs.getBestUpdate(period)
147+
if err != nil {
148+
break
149+
}
150+
ptr--
151+
updateInfo.Scores[ptr] = remoteUpdate.Score()
152+
if ptr == 0 || period == 0 {
153+
break
154+
}
155+
if period < localNextPeriod {
156+
localUpdate := cs.committeeTracker.GetBestUpdate(period)
157+
if localUpdate == nil || localUpdate.NextSyncCommitteeRoot == remoteUpdate.NextSyncCommitteeRoot {
158+
break
159+
}
160+
}
161+
period--
162+
}
163+
updateInfo.Scores = updateInfo.Scores[ptr:]
164+
log.Info("Fetched committee updates", "localNext", localNextPeriod, "count", len(updateInfo.Scores))
165+
if len(updateInfo.Scores) == 0 {
166+
log.Error("Could not fetch last committee update")
167+
return false
168+
}
169+
select {
170+
case <-cs.committeeTracker.SyncWithPeer(cs, updateInfo):
171+
localNextPeriod = cs.committeeTracker.NextPeriod()
172+
if localNextPeriod <= lastPeriod {
173+
log.Error("Failed to sync all API committee updates", "local next period", localNextPeriod, "remote next period", lastPeriod+1)
174+
}
175+
case <-cs.closeCh:
176+
return false
177+
}
178+
return true
179+
}
180+
181+
// GetBestCommitteeProofs fetches updates and committees for the specified periods
182+
func (cs *CommitteeSyncer) GetBestCommitteeProofs(ctx context.Context, req types.CommitteeRequest) (types.CommitteeReply, error) {
183+
reply := types.CommitteeReply{
184+
Updates: make([]types.LightClientUpdate, len(req.UpdatePeriods)),
185+
Committees: make([][]byte, len(req.CommitteePeriods)),
186+
}
187+
var err error
188+
for i, period := range req.UpdatePeriods {
189+
if reply.Updates[i], err = cs.getBestUpdate(period); err != nil {
190+
return types.CommitteeReply{}, err
191+
}
192+
}
193+
for i, period := range req.CommitteePeriods {
194+
if reply.Committees[i], err = cs.getCommittee(period); err != nil {
195+
return types.CommitteeReply{}, err
196+
}
197+
}
198+
return reply, nil
199+
}
200+
201+
// CanRequest returns true if a request for the given amount of items can be processed
202+
func (cs *CommitteeSyncer) CanRequest(updateCount, committeeCount int) bool {
203+
return updateCount <= maxRequest && committeeCount <= maxRequest
204+
}
205+
206+
// getBestUpdate returns the best update for the given period
207+
func (cs *CommitteeSyncer) getBestUpdate(period uint64) (types.LightClientUpdate, error) {
208+
if c, ok := cs.updateCache.Get(period); ok {
209+
return c, nil
210+
}
211+
update, _, err := cs.getBestUpdateAndCommittee(period)
212+
return update, err
213+
}
214+
215+
// getCommittee returns the committee for the given period
216+
// Note: cannot return committee altair fork period; this is always same as the
217+
// committee of the next period
218+
func (cs *CommitteeSyncer) getCommittee(period uint64) ([]byte, error) {
219+
if period == 0 {
220+
return nil, errors.New("no committee available for period 0")
221+
}
222+
if cs.checkpointCommittee != nil && period == cs.checkpointPeriod {
223+
return cs.checkpointCommittee, nil
224+
}
225+
if c, ok := cs.committeeCache.Get(period); ok {
226+
return c, nil
227+
}
228+
_, committee, err := cs.getBestUpdateAndCommittee(period - 1)
229+
return committee, err
230+
}
231+
232+
// getBestUpdateAndCommittee fetches the best update for period and corresponding
233+
// committee for period+1 and caches the results until a new head is received by
234+
// headPollLoop
235+
func (cs *CommitteeSyncer) getBestUpdateAndCommittee(period uint64) (types.LightClientUpdate, []byte, error) {
236+
update, committee, err := cs.api.GetBestUpdateAndCommittee(period)
237+
if err != nil {
238+
return types.LightClientUpdate{}, nil, err
239+
}
240+
cs.updateCache.Add(period, update)
241+
cs.committeeCache.Add(period+1, committee)
242+
return update, committee, nil
243+
}
244+
245+
// GetInitData fetches the bootstrap data and returns LightClientInitData (the
246+
// corresponding committee is stored so that a subsequent GetBestCommitteeProofs
247+
// can return it when requested)
248+
func (cs *CommitteeSyncer) GetInitData(ctx context.Context, checkpoint common.Hash) (types.Header, sync.LightClientInitData, error) {
249+
if cs.genesisData == (sync.GenesisData{}) {
250+
return types.Header{}, sync.LightClientInitData{}, errors.New("missing genesis data")
251+
}
252+
header, checkpointData, committee, err := cs.api.GetCheckpointData(ctx, checkpoint)
253+
if err != nil {
254+
return types.Header{}, sync.LightClientInitData{}, err
255+
}
256+
cs.checkpointPeriod, cs.checkpointCommittee = checkpointData.Period, committee
257+
return header, sync.LightClientInitData{GenesisData: cs.genesisData, CheckpointData: checkpointData}, nil
258+
}
259+
260+
// ProtocolError is called by the tracker when the BeaconLightApi has provided
261+
// wrong committee updates or signed heads
262+
func (cs *CommitteeSyncer) ProtocolError(description string) {
263+
log.Error("Beacon node API data source delivered wrong reply", "error", description)
264+
}

0 commit comments

Comments
 (0)