Skip to content

Commit 348cbfc

Browse files
authored
Bulk load CDK: log diagnostics when unflushed states at end of sync (#68668)
1 parent 99658b9 commit 348cbfc

File tree

5 files changed

+33
-2
lines changed

5 files changed

+33
-2
lines changed

airbyte-cdk/bulk/changelog.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## Version 0.1.58
2+
3+
load cdk: log unflushed state diagnostic info
14
## Version 0.1.57
25

36
**Extract CDK**

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/dataflow/pipeline/PipelineRunner.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ class PipelineRunner(
6767
if (store.hasStates()) {
6868
val stateException =
6969
IllegalStateException("Sync completed, but unflushed states were detected.")
70-
log.info { "Destination Pipeline Completed — Exceptionally: $stateException" }
70+
store.logStateInfo()
71+
log.error { "Destination Pipeline Completed — Exceptionally: $stateException" }
7172
throw stateException
7273
}
7374

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/dataflow/state/StateHistogramStore.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,14 @@ class StateHistogramStore {
3232
return expectedCount == flushedCount
3333
}
3434

35+
// mirrors isComplete. Purely for debugging purposes.
36+
fun whyIsStateIncomplete(key: StateKey): String {
37+
val expectedCount = expected.get(key)
38+
val partitionFlushCounts = key.partitionKeys.map { flushed.get(it) ?: 0 }
39+
val flushedCount = partitionFlushCounts.sum()
40+
return "expectedCount $expectedCount does not equal flushedCount $flushedCount (by partition: $partitionFlushCounts)"
41+
}
42+
3543
fun remove(key: StateKey): Long? {
3644
key.partitionKeys.forEach { flushed.remove(it) }
3745
return expected.remove(key)

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/dataflow/state/StateStore.kt

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,23 @@ class StateStore(
4747
}
4848

4949
fun hasStates(): Boolean = states.isNotEmpty()
50+
51+
/** Log some information about why state messages cannot be flushed. */
52+
// implementation-wise, this is largely just mirroring getNextComplete(),
53+
// except it logs things instead of actually doing anything.
54+
fun logStateInfo() {
55+
val message = StringBuilder("State diagnostic information:")
56+
val key = states.firstKey()
57+
val state = states.get(key)
58+
message.append("\nFirst state key: $key (full state message: $state)")
59+
if (key.id != stateSequence.get()) {
60+
message.append("\nKey ID did not match state sequence ($stateSequence.get()")
61+
} else if (!histogramStore.isComplete(key)) {
62+
message.append(
63+
"\nhistogram store says key is incomplete: ${histogramStore.whyIsStateIncomplete(key)}"
64+
)
65+
}
66+
log.info { message }
67+
return
68+
}
5069
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.1.57
1+
version=0.1.58

0 commit comments

Comments
 (0)