Skip to content

Commit bc1b51f

Browse files
authored
Restrict WAL usage to node.go (#4201)
* Revert "synchronize access to wal storage from chain and node (#3919)" This reverts commit 2678417. Signed-off-by: Yacov Manevich <[email protected]> * Restrict WAL usage to node.go This commit moves the WAL sync from the goroutine of the chain, to the goroutine that performs all the rest of the WAL operations, in order to prevent concurrent invocation of the WAL which is not thread safe. Signed-off-by: Yacov Manevich <[email protected]> --------- Signed-off-by: Yacov Manevich <[email protected]>
1 parent 4a179d3 commit bc1b51f

File tree

3 files changed

+20
-10
lines changed

3 files changed

+20
-10
lines changed

orderer/consensus/etcdraft/chain.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1179,10 +1179,8 @@ func (c *Chain) apply(ents []raftpb.Entry) {
11791179
continue
11801180
}
11811181

1182-
// persist the WAL entries into disk
1183-
c.Node.storage.WALSyncC <- struct{}{}
1184-
11851182
c.confState = *c.Node.ApplyConfChange(cc)
1183+
11861184
switch cc.Type {
11871185
case raftpb.ConfChangeAddNode:
11881186
c.logger.Infof("Applied config change to add node %d, current nodes in channel: %+v", cc.NodeID, c.confState.Voters)

orderer/consensus/etcdraft/node.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ func (n *node) run(campaign bool) {
153153

154154
// skip empty apply
155155
if len(rd.CommittedEntries) != 0 || rd.SoftState != nil {
156+
n.maybeSyncWAL(rd.CommittedEntries)
156157
n.chain.applyC <- apply{rd.CommittedEntries, rd.SoftState}
157158
}
158159

@@ -171,11 +172,6 @@ func (n *node) run(campaign bool) {
171172
// to the followers and them writing to their disks. Check 10.2.1 in thesis
172173
n.send(rd.Messages)
173174

174-
case <-n.storage.WALSyncC:
175-
if err := n.storage.Sync(); err != nil {
176-
n.logger.Warnf("Failed to sync raft log, error: %s", err)
177-
}
178-
179175
case <-n.chain.haltC:
180176
raftTicker.Stop()
181177
n.Stop()
@@ -224,6 +220,24 @@ func (n *node) send(msgs []raftpb.Message) {
224220
}
225221
}
226222

223+
func (n *node) maybeSyncWAL(entries []raftpb.Entry) {
224+
allNormal := true
225+
for _, entry := range entries {
226+
if entry.Type == raftpb.EntryNormal {
227+
continue
228+
}
229+
allNormal = false
230+
}
231+
232+
if allNormal {
233+
return
234+
}
235+
236+
if err := n.storage.Sync(); err != nil {
237+
n.logger.Errorf("Failed to sync raft log, error: %s", err)
238+
}
239+
}
240+
227241
// abdicateLeadership picks a node that is recently active, and attempts to transfer leadership to it.
228242
// Blocks until leadership transfer happens or when a timeout expires.
229243
// Returns error upon failure.

orderer/consensus/etcdraft/storage.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ type RaftStorage struct {
5959

6060
// a queue that keeps track of indices of snapshots on disk
6161
snapshotIndex []uint64
62-
WALSyncC chan struct{}
6362
}
6463

6564
// CreateStorage attempts to create a storage to persist etcd/raft data.
@@ -114,7 +113,6 @@ func CreateStorage(
114113
walDir: walDir,
115114
snapDir: snapDir,
116115
snapshotIndex: ListSnapshots(lg, snapDir),
117-
WALSyncC: make(chan struct{}),
118116
}, nil
119117
}
120118

0 commit comments

Comments
 (0)