24
24
import org .apache .fluss .config .ConfigOptions ;
25
25
import org .apache .fluss .config .Configuration ;
26
26
import org .apache .fluss .flink .lake .LakeSplitGenerator ;
27
+ import org .apache .fluss .flink .lake .split .LakeSnapshotSplit ;
27
28
import org .apache .fluss .flink .source .enumerator .initializer .BucketOffsetsRetrieverImpl ;
28
29
import org .apache .fluss .flink .source .enumerator .initializer .NoStoppingOffsetsInitializer ;
29
30
import org .apache .fluss .flink .source .enumerator .initializer .OffsetsInitializer ;
@@ -111,6 +112,10 @@ public class FlinkSourceEnumerator
111
112
/** buckets that have been assigned to readers. */
112
113
private final Set <TableBucket > assignedTableBuckets ;
113
114
115
+ private final List <LakeSnapshotSplit > remainingLakeSnapshotSplits ;
116
+
117
+ private final Map <TableBucket , Long > tableBucketsOffset ;
118
+
114
119
private final long scanPartitionDiscoveryIntervalMs ;
115
120
116
121
private final boolean streaming ;
@@ -129,6 +134,8 @@ public class FlinkSourceEnumerator
129
134
130
135
private boolean lakeEnabled = false ;
131
136
137
+ private boolean loadLakeSplits = true ;
138
+
132
139
private volatile boolean closed = false ;
133
140
134
141
private final List <FieldEqual > partitionFilters ;
@@ -177,6 +184,8 @@ public FlinkSourceEnumerator(
177
184
context ,
178
185
Collections .emptySet (),
179
186
Collections .emptyMap (),
187
+ Collections .emptyList (),
188
+ Collections .emptyMap (),
180
189
startingOffsetsInitializer ,
181
190
scanPartitionDiscoveryIntervalMs ,
182
191
streaming ,
@@ -192,6 +201,8 @@ public FlinkSourceEnumerator(
192
201
SplitEnumeratorContext <SourceSplitBase > context ,
193
202
Set <TableBucket > assignedTableBuckets ,
194
203
Map <Long , String > assignedPartitions ,
204
+ List <LakeSnapshotSplit > remainingLakeSnapshotSplits ,
205
+ Map <TableBucket , Long > tableBucketsOffset ,
195
206
OffsetsInitializer startingOffsetsInitializer ,
196
207
long scanPartitionDiscoveryIntervalMs ,
197
208
boolean streaming ,
@@ -206,6 +217,8 @@ public FlinkSourceEnumerator(
206
217
this .assignedTableBuckets = new HashSet <>(assignedTableBuckets );
207
218
this .startingOffsetsInitializer = startingOffsetsInitializer ;
208
219
this .assignedPartitions = new HashMap <>(assignedPartitions );
220
+ this .remainingLakeSnapshotSplits = new ArrayList <>(remainingLakeSnapshotSplits );
221
+ this .tableBucketsOffset = new HashMap <>(tableBucketsOffset );
209
222
this .scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs ;
210
223
this .streaming = streaming ;
211
224
this .partitionFilters = checkNotNull (partitionFilters );
@@ -276,7 +289,14 @@ private void genHybridSplitsInBatchMode() {
276
289
277
290
private void genHybridSplitsInStreamNonPartitionedMode () {
278
291
if (lakeEnabled ) {
279
- context .callAsync (this ::getLakeSplit , this ::handleSplitsAdd );
292
+ context .callAsync (
293
+ () ->
294
+ getLakeSplit (
295
+ Collections .EMPTY_MAP ,
296
+ true ,
297
+ remainingLakeSnapshotSplits ,
298
+ tableBucketsOffset ),
299
+ this ::handleSplitsAdd );
280
300
} else {
281
301
// init bucket splits and assign
282
302
context .callAsync (this ::initNonPartitionedSplits , this ::handleSplitsAdd );
@@ -292,7 +312,15 @@ private void genHybridSplitsInStreamPartitionedMode(PartitionChange partitionCha
292
312
Partition ::getPartitionId ,
293
313
Partition ::getPartitionName ));
294
314
295
- context .callAsync (() -> getLakeSplit (newPartitionsNameById ), this ::handleSplitsAdd );
315
+ context .callAsync (
316
+ () ->
317
+ getLakeSplit (
318
+ newPartitionsNameById ,
319
+ loadLakeSplits ,
320
+ remainingLakeSnapshotSplits ,
321
+ tableBucketsOffset ),
322
+ this ::handleSplitsAdd );
323
+ loadLakeSplits = false ;
296
324
} else {
297
325
context .callAsync (
298
326
() -> initPartitionedSplits (partitionChange .newPartitions ),
@@ -535,7 +563,11 @@ private List<SourceSplitBase> getLogSplit(
535
563
return splits ;
536
564
}
537
565
538
- private List <SourceSplitBase > getLakeSplit (Map <Long , String > newPartitionsNameById )
566
+ private List <SourceSplitBase > getLakeSplit (
567
+ Map <Long , String > newPartitionsNameById ,
568
+ boolean loadLakeSplits ,
569
+ List <LakeSnapshotSplit > remainingLakeSnapshotSplits ,
570
+ Map <TableBucket , Long > tableBucketsOffset )
539
571
throws Exception {
540
572
LakeSplitGenerator lakeSplitGenerator =
541
573
new LakeSplitGenerator (
@@ -546,11 +578,17 @@ private List<SourceSplitBase> getLakeSplit(Map<Long, String> newPartitionsNameBy
546
578
stoppingOffsetsInitializer ,
547
579
tableInfo .getNumBuckets (),
548
580
this ::listPartitions );
549
- return lakeSplitGenerator .generateHybridLakeSplits (newPartitionsNameById );
581
+ List <LakeSplit > remainingLakeSplits =
582
+ remainingLakeSnapshotSplits .stream ()
583
+ .map (LakeSnapshotSplit ::getLakeSplit )
584
+ .collect (Collectors .toList ());
585
+ return lakeSplitGenerator .generateHybridLakeSplits (
586
+ newPartitionsNameById , loadLakeSplits , remainingLakeSplits , tableBucketsOffset );
550
587
}
551
588
552
589
private List <SourceSplitBase > getLakeSplit () throws Exception {
553
- return getLakeSplit (Collections .EMPTY_MAP );
590
+ return getLakeSplit (
591
+ Collections .EMPTY_MAP , true , Collections .emptyList (), Collections .emptyMap ());
554
592
}
555
593
556
594
private boolean ignoreTableBucket (TableBucket tableBucket ) {
@@ -640,6 +678,19 @@ private void assignPendingSplits(Set<Integer> pendingReaders) {
640
678
incrementalAssignment
641
679
.computeIfAbsent (pendingReader , (ignored ) -> new ArrayList <>())
642
680
.addAll (pendingAssignmentForReader );
681
+ remainingLakeSnapshotSplits .addAll (
682
+ pendingAssignmentForReader .stream ()
683
+ .filter (SourceSplitBase ::isLakeSplit )
684
+ .map (x -> (LakeSnapshotSplit ) x )
685
+ .collect (Collectors .toList ()));
686
+
687
+ Map <TableBucket , Long > tableBucketRecordsToSplit =
688
+ remainingLakeSnapshotSplits .stream ()
689
+ .collect (
690
+ Collectors .toMap (
691
+ SourceSplitBase ::getTableBucket ,
692
+ LakeSnapshotSplit ::getRecordsToSplit ));
693
+ tableBucketsOffset .putAll (tableBucketRecordsToSplit );
643
694
644
695
// Mark pending bucket assignment as already assigned
645
696
pendingAssignmentForReader .forEach (
@@ -658,6 +709,9 @@ private void assignPendingSplits(Set<Integer> pendingReaders) {
658
709
"partition name shouldn't be null for the splits of partitioned table." );
659
710
assignedPartitions .put (partitionId , partitionName );
660
711
}
712
+ if (split .isLakeSplit ()) {
713
+ remainingLakeSnapshotSplits .remove ((LakeSnapshotSplit ) split );
714
+ }
661
715
});
662
716
}
663
717
}
@@ -783,7 +837,11 @@ public void addReader(int subtaskId) {
783
837
@ Override
784
838
public SourceEnumeratorState snapshotState (long checkpointId ) {
785
839
final SourceEnumeratorState enumeratorState =
786
- new SourceEnumeratorState (assignedTableBuckets , assignedPartitions );
840
+ new SourceEnumeratorState (
841
+ assignedTableBuckets ,
842
+ assignedPartitions ,
843
+ remainingLakeSnapshotSplits ,
844
+ tableBucketsOffset );
787
845
LOG .debug ("Source Checkpoint is {}" , enumeratorState );
788
846
return enumeratorState ;
789
847
}
0 commit comments