Skip to content

Commit 8ff355a

Browse files
authored
support setting decoder concurrency for TransactionPayloadEvent binlog (#1064)
1 parent dec4081 commit 8ff355a

File tree

3 files changed

+13
-1
lines changed

3 files changed

+13
-1
lines changed

replication/binlogsyncer.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,10 @@ type BinlogSyncerConfig struct {
134134
// Only works with MariaDB flavor.
135135
FillZeroLogPos bool
136136

137+
// PayloadDecoderConcurrency is used to control concurrency for decoding TransactionPayloadEvent.
138+
// Default 0, this will be set to GOMAXPROCS.
139+
PayloadDecoderConcurrency int
140+
137141
// SynchronousEventHandler is used for synchronous event handling.
138142
// This should not be used together with StartBackupWithHandler.
139143
// If this is not nil, GetEvent does not need to be called.

replication/parser.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ type BinlogParser struct {
4040
ignoreJSONDecodeErr bool
4141
verifyChecksum bool
4242

43+
payloadDecoderConcurrency int
44+
4345
rowsEventDecodeFunc func(*RowsEvent, []byte) error
4446

4547
tableMapOptionalMetaDecodeFunc func([]byte) error
@@ -215,6 +217,10 @@ func (p *BinlogParser) SetFlavor(flavor string) {
215217
p.flavor = flavor
216218
}
217219

220+
func (p *BinlogParser) SetPayloadDecoderConcurrency(concurrency int) {
221+
p.payloadDecoderConcurrency = concurrency
222+
}
223+
218224
func (p *BinlogParser) SetRowsEventDecodeFunc(rowsEventDecodeFunc func(*RowsEvent, []byte) error) {
219225
p.rowsEventDecodeFunc = rowsEventDecodeFunc
220226
}
@@ -456,6 +462,7 @@ func (p *BinlogParser) newRowsEvent(h *EventHeader) *RowsEvent {
456462
func (p *BinlogParser) newTransactionPayloadEvent() *TransactionPayloadEvent {
457463
e := &TransactionPayloadEvent{}
458464
e.format = *p.format
465+
e.concurrency = p.payloadDecoderConcurrency
459466

460467
return e
461468
}

replication/transaction_payload_event.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ const (
2828

2929
type TransactionPayloadEvent struct {
3030
format FormatDescriptionEvent
31+
concurrency int
3132
Size uint64
3233
UncompressedSize uint64
3334
CompressionType uint64
@@ -103,7 +104,7 @@ func (e *TransactionPayloadEvent) decodePayload() error {
103104
e.CompressionType, e.compressionType())
104105
}
105106

106-
decoder, err := zstd.NewReader(nil, zstd.WithDecoderConcurrency(0))
107+
decoder, err := zstd.NewReader(nil, zstd.WithDecoderConcurrency(e.concurrency))
107108
if err != nil {
108109
return err
109110
}

0 commit comments

Comments
 (0)