Skip to content

Commit bb44bf3

Browse files
committed
1. Load lake splits only once; 2. Restore enumerator state (remaining lake splits and bucket offsets) from checkpoint
1 parent 2e89973 commit bb44bf3

File tree

7 files changed

+218
-36
lines changed

7 files changed

+218
-36
lines changed

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,11 @@ public LakeSplitGenerator(
7575
this.listPartitionSupplier = listPartitionSupplier;
7676
}
7777

78-
public List<SourceSplitBase> generateHybridLakeSplits(Map<Long, String> newPartitionNameById)
78+
public List<SourceSplitBase> generateHybridLakeSplits(
79+
Map<Long, String> newPartitionNameById,
80+
boolean loadLakeSplits,
81+
List<LakeSplit> remainingLakeSplits,
82+
Map<TableBucket, Long> tableBucketsOffsetState)
7983
throws Exception {
8084
// get the file store
8185
LakeSnapshot lakeSnapshotInfo =
@@ -84,17 +88,32 @@ public List<SourceSplitBase> generateHybridLakeSplits(Map<Long, String> newParti
8488
boolean isLogTable = !tableInfo.hasPrimaryKey();
8589
boolean isPartitioned = tableInfo.isPartitioned();
8690

87-
Map<String, Map<Integer, List<LakeSplit>>> lakeSplits =
88-
groupLakeSplits(
89-
lakeSource
90-
.createPlanner(
91-
(LakeSource.PlannerContext) lakeSnapshotInfo::getSnapshotId)
92-
.plan());
93-
94-
if (lakeSplits.isEmpty()) {
95-
return Collections.emptyList();
91+
Map<String, Map<Integer, List<LakeSplit>>> lakeSplits;
92+
if (remainingLakeSplits.isEmpty()) {
93+
if (loadLakeSplits) {
94+
lakeSplits = Collections.emptyMap();
95+
} else {
96+
lakeSplits =
97+
groupLakeSplits(
98+
lakeSource
99+
.createPlanner(
100+
(LakeSource.PlannerContext)
101+
lakeSnapshotInfo::getSnapshotId)
102+
.plan());
103+
}
104+
} else {
105+
lakeSplits = groupLakeSplits(remainingLakeSplits);
96106
}
97107

108+
// TODO: verify whether commenting this early-return out causes problems; check the unit test results
109+
// if (lakeSplits.isEmpty()) {
110+
// return Collections.emptyList();
111+
// }
112+
Map<TableBucket, Long> tableBucketsOffset = lakeSnapshotInfo.getTableBucketsOffset();
113+
if (!tableBucketsOffsetState.isEmpty()) {
114+
// TODO: tableBucketsOffsetState may never be populated; confirm it is assigned before use
115+
tableBucketsOffset = tableBucketsOffsetState;
116+
}
98117
if (isPartitioned) {
99118
Map<Long, String> partitionNameById;
100119
if (newPartitionNameById.isEmpty()) {
@@ -110,16 +129,13 @@ public List<SourceSplitBase> generateHybridLakeSplits(Map<Long, String> newParti
110129
}
111130

112131
return generatePartitionTableSplit(
113-
lakeSplits,
114-
isLogTable,
115-
lakeSnapshotInfo.getTableBucketsOffset(),
116-
partitionNameById);
132+
lakeSplits, isLogTable, tableBucketsOffset, partitionNameById);
117133
} else {
118134
Map<Integer, List<LakeSplit>> nonPartitionLakeSplits =
119135
lakeSplits.values().iterator().next();
120136
// non-partitioned table
121137
return generateNoPartitionedTableSplit(
122-
nonPartitionLakeSplits, isLogTable, lakeSnapshotInfo.getTableBucketsOffset());
138+
nonPartitionLakeSplits, isLogTable, tableBucketsOffset);
123139
}
124140
}
125141

@@ -224,10 +240,12 @@ private List<SourceSplitBase> generateSplit(
224240
Map<Integer, Long> bucketEndOffset) {
225241
List<SourceSplitBase> splits = new ArrayList<>();
226242
if (isLogTable) {
243+
int needInitOffsetBucketsNum = bucketCount;
227244
if (lakeSplits != null) {
228245
splits.addAll(toLakeSnapshotSplits(lakeSplits, partitionName, partitionId));
246+
needInitOffsetBucketsNum = lakeSplits.size();
229247
}
230-
for (int bucket = 0; bucket < bucketCount; bucket++) {
248+
for (int bucket = 0; bucket < needInitOffsetBucketsNum; bucket++) {
231249
TableBucket tableBucket =
232250
new TableBucket(tableInfo.getTableId(), partitionId, bucket);
233251
Long snapshotLogOffset = tableBucketSnapshotLogOffset.get(tableBucket);

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkSource.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,8 @@ public SplitEnumerator<SourceSplitBase, SourceEnumeratorState> restoreEnumerator
161161
splitEnumeratorContext,
162162
sourceEnumeratorState.getAssignedBuckets(),
163163
sourceEnumeratorState.getAssignedPartitions(),
164+
sourceEnumeratorState.getRemainingLakeSnapshotSplits(),
165+
sourceEnumeratorState.getTableBucketsOffset(),
164166
offsetsInitializer,
165167
scanPartitionDiscoveryIntervalMs,
166168
streaming,
@@ -175,7 +177,7 @@ public SimpleVersionedSerializer<SourceSplitBase> getSplitSerializer() {
175177

176178
@Override
177179
public SimpleVersionedSerializer<SourceEnumeratorState> getEnumeratorCheckpointSerializer() {
178-
return FlussSourceEnumeratorStateSerializer.INSTANCE;
180+
return new FlussSourceEnumeratorStateSerializer(lakeSource);
179181
}
180182

181183
@Override

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumerator.java

Lines changed: 64 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.alibaba.fluss.config.ConfigOptions;
2525
import com.alibaba.fluss.config.Configuration;
2626
import com.alibaba.fluss.flink.lake.LakeSplitGenerator;
27+
import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
2728
import com.alibaba.fluss.flink.source.enumerator.initializer.BucketOffsetsRetrieverImpl;
2829
import com.alibaba.fluss.flink.source.enumerator.initializer.NoStoppingOffsetsInitializer;
2930
import com.alibaba.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
@@ -111,6 +112,10 @@ public class FlinkSourceEnumerator
111112
/** buckets that have been assigned to readers. */
112113
private final Set<TableBucket> assignedTableBuckets;
113114

115+
private final List<LakeSnapshotSplit> remainingLakeSnapshotSplits;
116+
117+
private final Map<TableBucket, Long> tableBucketsOffset;
118+
114119
private final long scanPartitionDiscoveryIntervalMs;
115120

116121
private final boolean streaming;
@@ -129,6 +134,8 @@ public class FlinkSourceEnumerator
129134

130135
private boolean lakeEnabled = false;
131136

137+
private boolean loadLakeSplits = true;
138+
132139
private volatile boolean closed = false;
133140

134141
private final List<FieldEqual> partitionFilters;
@@ -177,6 +184,8 @@ public FlinkSourceEnumerator(
177184
context,
178185
Collections.emptySet(),
179186
Collections.emptyMap(),
187+
Collections.emptyList(),
188+
Collections.emptyMap(),
180189
startingOffsetsInitializer,
181190
scanPartitionDiscoveryIntervalMs,
182191
streaming,
@@ -192,6 +201,8 @@ public FlinkSourceEnumerator(
192201
SplitEnumeratorContext<SourceSplitBase> context,
193202
Set<TableBucket> assignedTableBuckets,
194203
Map<Long, String> assignedPartitions,
204+
List<LakeSnapshotSplit> remainingLakeSnapshotSplits,
205+
Map<TableBucket, Long> tableBucketsOffset,
195206
OffsetsInitializer startingOffsetsInitializer,
196207
long scanPartitionDiscoveryIntervalMs,
197208
boolean streaming,
@@ -206,6 +217,8 @@ public FlinkSourceEnumerator(
206217
this.assignedTableBuckets = new HashSet<>(assignedTableBuckets);
207218
this.startingOffsetsInitializer = startingOffsetsInitializer;
208219
this.assignedPartitions = new HashMap<>(assignedPartitions);
220+
this.remainingLakeSnapshotSplits = new ArrayList<>(remainingLakeSnapshotSplits);
221+
this.tableBucketsOffset = new HashMap<>(tableBucketsOffset);
209222
this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
210223
this.streaming = streaming;
211224
this.partitionFilters = checkNotNull(partitionFilters);
@@ -276,7 +289,14 @@ private void genHybridSplitsInBatchMode() {
276289

277290
private void genHybridSplitsInStreamNonPartitionedMode() {
278291
if (lakeEnabled) {
279-
context.callAsync(this::getLakeSplit, this::handleSplitsAdd);
292+
context.callAsync(
293+
() ->
294+
getLakeSplit(
295+
Collections.EMPTY_MAP,
296+
true,
297+
remainingLakeSnapshotSplits,
298+
tableBucketsOffset),
299+
this::handleSplitsAdd);
280300
} else {
281301
// init bucket splits and assign
282302
context.callAsync(this::initNonPartitionedSplits, this::handleSplitsAdd);
@@ -292,7 +312,15 @@ private void genHybridSplitsInStreamPartitionedMode(PartitionChange partitionCha
292312
Partition::getPartitionId,
293313
Partition::getPartitionName));
294314

295-
context.callAsync(() -> getLakeSplit(newPartitionsNameById), this::handleSplitsAdd);
315+
context.callAsync(
316+
() ->
317+
getLakeSplit(
318+
newPartitionsNameById,
319+
loadLakeSplits,
320+
remainingLakeSnapshotSplits,
321+
tableBucketsOffset),
322+
this::handleSplitsAdd);
323+
loadLakeSplits = false;
296324
} else {
297325
context.callAsync(
298326
() -> initPartitionedSplits(partitionChange.newPartitions),
@@ -535,7 +563,11 @@ private List<SourceSplitBase> getLogSplit(
535563
return splits;
536564
}
537565

538-
private List<SourceSplitBase> getLakeSplit(Map<Long, String> newPartitionsNameById)
566+
private List<SourceSplitBase> getLakeSplit(
567+
Map<Long, String> newPartitionsNameById,
568+
boolean loadLakeSplits,
569+
List<LakeSnapshotSplit> remainingLakeSnapshotSplits,
570+
Map<TableBucket, Long> tableBucketsOffset)
539571
throws Exception {
540572
LakeSplitGenerator lakeSplitGenerator =
541573
new LakeSplitGenerator(
@@ -546,11 +578,17 @@ private List<SourceSplitBase> getLakeSplit(Map<Long, String> newPartitionsNameBy
546578
stoppingOffsetsInitializer,
547579
tableInfo.getNumBuckets(),
548580
this::listPartitions);
549-
return lakeSplitGenerator.generateHybridLakeSplits(newPartitionsNameById);
581+
List<LakeSplit> remainingLakeSplits =
582+
remainingLakeSnapshotSplits.stream()
583+
.map(LakeSnapshotSplit::getLakeSplit)
584+
.collect(Collectors.toList());
585+
return lakeSplitGenerator.generateHybridLakeSplits(
586+
newPartitionsNameById, loadLakeSplits, remainingLakeSplits, tableBucketsOffset);
550587
}
551588

552589
private List<SourceSplitBase> getLakeSplit() throws Exception {
553-
return getLakeSplit(Collections.EMPTY_MAP);
590+
return getLakeSplit(
591+
Collections.EMPTY_MAP, true, Collections.emptyList(), Collections.emptyMap());
554592
}
555593

556594
private boolean ignoreTableBucket(TableBucket tableBucket) {
@@ -640,6 +678,19 @@ private void assignPendingSplits(Set<Integer> pendingReaders) {
640678
incrementalAssignment
641679
.computeIfAbsent(pendingReader, (ignored) -> new ArrayList<>())
642680
.addAll(pendingAssignmentForReader);
681+
remainingLakeSnapshotSplits.addAll(
682+
pendingAssignmentForReader.stream()
683+
.filter(SourceSplitBase::isLakeSplit)
684+
.map(x -> (LakeSnapshotSplit) x)
685+
.collect(Collectors.toList()));
686+
687+
Map<TableBucket, Long> tableBucketRecordsToSplit =
688+
remainingLakeSnapshotSplits.stream()
689+
.collect(
690+
Collectors.toMap(
691+
SourceSplitBase::getTableBucket,
692+
LakeSnapshotSplit::getRecordsToSplit));
693+
tableBucketsOffset.putAll(tableBucketRecordsToSplit);
643694

644695
// Mark pending bucket assignment as already assigned
645696
pendingAssignmentForReader.forEach(
@@ -658,6 +709,9 @@ private void assignPendingSplits(Set<Integer> pendingReaders) {
658709
"partition name shouldn't be null for the splits of partitioned table.");
659710
assignedPartitions.put(partitionId, partitionName);
660711
}
712+
if (split.isLakeSplit()) {
713+
remainingLakeSnapshotSplits.remove((LakeSnapshotSplit) split);
714+
}
661715
});
662716
}
663717
}
@@ -783,7 +837,11 @@ public void addReader(int subtaskId) {
783837
@Override
784838
public SourceEnumeratorState snapshotState(long checkpointId) {
785839
final SourceEnumeratorState enumeratorState =
786-
new SourceEnumeratorState(assignedTableBuckets, assignedPartitions);
840+
new SourceEnumeratorState(
841+
assignedTableBuckets,
842+
assignedPartitions,
843+
remainingLakeSnapshotSplits,
844+
tableBucketsOffset);
787845
LOG.debug("Source Checkpoint is {}", enumeratorState);
788846
return enumeratorState;
789847
}

0 commit comments

Comments
 (0)