Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,51 @@ The migration-verifier periodically persists its change stream’s resume token

The verifier has been observed handling test source write loads of 15,000 writes per second. Real-world performance will vary according to several factors, including network latency, cluster resources, and the verifier node’s resources.

## Change stream lag

Every time the verifier notices a change in a document, it schedules a recheck
of that document. If the changes happen faster than the verifier can schedule
rechecks, then the verifier “lags” the cluster. We measure that lag by
comparing the server-reported cluster time with the time of the most
recently-seen event.

If the lag exceeds a certain “comfortable” threshold, the verifier will warn
in the logs. High lag can cause either of these outcomes:

1. Once writes stop on the source (i.e., during the migration’s cutover),
you’ll have to wait for a longer-than-ideal time for the verifier to recheck
documents until its writes-off timestamp.

2. Sufficiently high verifier lag can exceed the server’s oplog capacity. If
this happens, verification will fail permanently, and you’ll have to restart
verification from the beginning.

### Mitigation

The following may help if you see warnings about change stream lag:

1. Scale up: Run the verifier on a more powerful host.

2. Reduce load: Disable nonessential applications during verification until cutover.

## Recheck generation size

Even if the change stream keeps up with the write load, the verifier may still recheck
the documents more slowly than writes happen on the source. If this happens, you’ll
see recheck generations grow over time.

Unlike change stream lag, this won’t actually endanger the verification. It will, though,
extend downtime during cutover because the final recheck generation will take longer than
it otherwise might.

### Mitigation

1. Scale up. (See above.)

2. Reduce load. (ditto)

3. Make the verifier compare document hashes rather than full documents. See below for details.

## Per-shard verification

If migrating shard-to-shard, you can also verify shard-to-shard to scale verification horizontally. Run 1 verifier per source shard. You can colocate all verifiers’ metadata on the same metadata cluster, but each verifier must use its own database (e.g., `verify90`, `verify1`, …). If that metadata cluster buckles under the load, consider splitting verification across multiple hosts.
Expand Down
75 changes: 75 additions & 0 deletions history/history.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package history

import (
"slices"
"sync"
"time"
)

// History stores an ordered list of entries, each with a TTL (time-to-live).
// Once an entry expires, it goes away.
//
// This facilitates computation of data flow rates across batches.
type History[T any] struct {
mu sync.RWMutex
ttl time.Duration
logs []Log[T]
}

// Log represents a single entry in a History.
type Log[T any] struct {
At time.Time
Datum T
}

// New creates & returns a new History.
func New[T any](ttl time.Duration) *History[T] {
return &History[T]{
ttl: ttl,
}
}

// Get returns a copy of the History’s (non-expired) elements.
func (h *History[T]) Get() []Log[T] {
h.mu.RLock()
defer h.mu.RUnlock()

now := time.Now()

return slices.Clone(h.logs[h.getFirstValidIdxWhileLocked(now):])
}

// Add augments the History’s Log list. It returns the list’s count of
// (non-expired) elements.
func (h *History[T]) Add(datum T) int {
h.mu.Lock()
defer h.mu.Unlock()

now := time.Now()

h.reapWhileLocked(now)

h.logs = append(h.logs, Log[T]{now, datum})

return len(h.logs)
}

// NB: If all entries are invalid this returns len(logs).
func (h *History[T]) getFirstValidIdxWhileLocked(now time.Time) int {
cutoff := now.Add(-h.ttl)

for i, logItem := range h.logs {
if logItem.At.Before(cutoff) {
continue
}

return i
}

// We only get here if all logs are stale.
return len(h.logs)
}

func (h *History[T]) reapWhileLocked(now time.Time) {
h.logs = h.logs[h.getFirstValidIdxWhileLocked(now):]
}
64 changes: 64 additions & 0 deletions history/history_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package history

import (
"slices"
"testing"
"time"

"github.com/10gen/migration-verifier/mslices"
"github.com/stretchr/testify/assert"
)

func TestHistory(t *testing.T) {
h := New[int](time.Hour)

assert.Equal(t, 1, h.Add(234))
assert.Equal(t, 2, h.Add(234))
assert.Equal(t, 3, h.Add(345))

got := h.Get()
times, data := splitLogs(got)
assert.True(
t,
slices.IsSortedFunc(times, time.Time.Compare),
"times should be increasing",
)
assert.Equal(t, mslices.Of(234, 234, 345), data, "data as expected")

got[0].Datum = 999
got = h.Get()
_, data = splitLogs(got)
assert.Equal(t, mslices.Of(234, 234, 345), data, "slice is copied")
}

func TestHistoryTTL(t *testing.T) {
h := New[int](time.Millisecond)

assert.Equal(t, 1, h.Add(234))
assert.Equal(t, 2, h.Add(234))
assert.Equal(t, 3, h.Add(345))

assert.Eventually(
t,
func() bool {
return len(h.Get()) == 0
},
time.Minute,
time.Millisecond,
"history should expire its entries",
)

assert.Equal(t, 1, h.Add(234), "new record should be the first")
}

func splitLogs[T any](in []Log[T]) ([]time.Time, []T) {
var times []time.Time
var data []T

for _, cur := range in {
times = append(times, cur.At)
data = append(data, cur.Datum)
}

return times, data
}
89 changes: 58 additions & 31 deletions internal/verifier/change_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,21 @@ import (
"fmt"
"time"

"github.com/10gen/migration-verifier/history"
"github.com/10gen/migration-verifier/internal/keystring"
"github.com/10gen/migration-verifier/internal/logger"
"github.com/10gen/migration-verifier/internal/retry"
"github.com/10gen/migration-verifier/internal/types"
"github.com/10gen/migration-verifier/internal/util"
"github.com/10gen/migration-verifier/mbson"
"github.com/10gen/migration-verifier/mslices"
"github.com/10gen/migration-verifier/msync"
"github.com/10gen/migration-verifier/option"
mapset "github.com/deckarep/golang-set/v2"
clone "github.com/huandu/go-clone/generic"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/samber/lo"
"github.com/samber/mo"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
Expand Down Expand Up @@ -93,42 +96,41 @@ type ChangeStreamReader struct {

startAtTs *primitive.Timestamp

lag *msync.TypedAtomic[option.Option[time.Duration]]
lag *msync.TypedAtomic[option.Option[time.Duration]]
batchSizeHistory *history.History[int]

onDDLEvent ddlEventHandling
}

func (verifier *Verifier) initializeChangeStreamReaders() {
verifier.srcChangeStreamReader = &ChangeStreamReader{
readerType: src,
logger: verifier.logger,
namespaces: verifier.srcNamespaces,
metaDB: verifier.metaClient.Database(verifier.metaDBName),
watcherClient: verifier.srcClient,
clusterInfo: *verifier.srcClusterInfo,
changeStreamRunning: false,
changeEventBatchChan: make(chan changeEventBatch),
writesOffTs: util.NewEventual[primitive.Timestamp](),
readerError: util.NewEventual[error](),
handlerError: util.NewEventual[error](),
doneChan: make(chan struct{}),
lag: msync.NewTypedAtomic(option.None[time.Duration]()),
}
verifier.dstChangeStreamReader = &ChangeStreamReader{
readerType: dst,
logger: verifier.logger,
namespaces: verifier.dstNamespaces,
metaDB: verifier.metaClient.Database(verifier.metaDBName),
watcherClient: verifier.dstClient,
clusterInfo: *verifier.dstClusterInfo,
changeStreamRunning: false,
changeEventBatchChan: make(chan changeEventBatch),
writesOffTs: util.NewEventual[primitive.Timestamp](),
readerError: util.NewEventual[error](),
handlerError: util.NewEventual[error](),
doneChan: make(chan struct{}),
lag: msync.NewTypedAtomic(option.None[time.Duration]()),
onDDLEvent: onDDLEventAllow,
srcReader := &ChangeStreamReader{
readerType: src,
namespaces: verifier.srcNamespaces,
watcherClient: verifier.srcClient,
clusterInfo: *verifier.srcClusterInfo,
}
verifier.srcChangeStreamReader = srcReader

dstReader := &ChangeStreamReader{
readerType: dst,
namespaces: verifier.dstNamespaces,
watcherClient: verifier.dstClient,
clusterInfo: *verifier.dstClusterInfo,
onDDLEvent: onDDLEventAllow,
}
verifier.dstChangeStreamReader = dstReader

// Common elements in both readers:
for _, csr := range mslices.Of(srcReader, dstReader) {
csr.logger = verifier.logger
csr.metaDB = verifier.metaClient.Database(verifier.metaDBName)
csr.changeEventBatchChan = make(chan changeEventBatch)
csr.writesOffTs = util.NewEventual[primitive.Timestamp]()
csr.readerError = util.NewEventual[error]()
csr.handlerError = util.NewEventual[error]()
csr.doneChan = make(chan struct{})
csr.lag = msync.NewTypedAtomic(option.None[time.Duration]())
csr.batchSizeHistory = history.New[int](time.Minute)
}
}

Expand Down Expand Up @@ -462,6 +464,8 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
return nil
}

csr.batchSizeHistory.Add(eventsRead)

if event, has := latestEvent.Get(); has {
csr.logger.Trace().
Stringer("changeStreamReader", csr).
Expand Down Expand Up @@ -786,6 +790,29 @@ func (csr *ChangeStreamReader) GetLag() option.Option[time.Duration] {
return csr.lag.Load()
}

func (csr *ChangeStreamReader) GetEventsPerSecond() option.Option[float64] {
logs := csr.batchSizeHistory.Get()
lastLog, hasLogs := lo.Last(logs)

if hasLogs && lastLog.At != logs[0].At {
span := lastLog.At.Sub(logs[0].At)

// Each log contains a time and a # of events that happened since
// the prior log. Thus, each log’s Datum is a count of events that
// happened before the timestamp. Since we want the # of events that
// happened between the first & last times, we only want events *after*
// the first time. Thus, we skip the first log entry here.
totalEvents := 0
for _, log := range logs[1:] {
totalEvents += log.Datum
}

return option.Some(util.DivideToF64(totalEvents, span.Seconds()))
}

return option.None[float64]()
}

func addTimestampToLogEvent(ts primitive.Timestamp, event *zerolog.Event) *zerolog.Event {
return event.
Any("timestamp", ts).
Expand Down
2 changes: 1 addition & 1 deletion internal/verifier/migration_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -1533,7 +1533,7 @@ func (verifier *Verifier) PrintVerificationSummary(ctx context.Context, genstatu
return
}

verifier.printChangeEventStatistics(strBuilder, reportGenStartTime)
verifier.printChangeEventStatistics(strBuilder)

// Only print the worker status table if debug logging is enabled.
if verifier.logger.Debug().Enabled() {
Expand Down
Loading