Skip to content

Commit a6f3e48

Browse files
committed
optimize enumerator state for lake source
1 parent e277372 commit a6f3e48

File tree

14 files changed

+250
-281
lines changed

14 files changed

+250
-281
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.fluss.exception.InvalidReplicationFactorException;
3232
import org.apache.fluss.exception.InvalidTableException;
3333
import org.apache.fluss.exception.KvSnapshotNotExistException;
34+
import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
3435
import org.apache.fluss.exception.NonPrimaryKeyTableException;
3536
import org.apache.fluss.exception.PartitionAlreadyExistsException;
3637
import org.apache.fluss.exception.PartitionNotExistException;
@@ -383,6 +384,7 @@ CompletableFuture<KvSnapshotMetadata> getKvSnapshotMetadata(
383384
*
384385
* <ul>
385386
* <li>{@link TableNotExistException} if the table does not exist.
387+
* <li>{@link LakeTableSnapshotNotExistException} if no lake snapshot exists.
386388
* </ul>
387389
*
388390
* @param tablePath the table path of the table.

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java

Lines changed: 38 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.fluss.client.admin.Admin;
2121
import org.apache.fluss.client.metadata.LakeSnapshot;
22+
import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
2223
import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
2324
import org.apache.fluss.flink.lake.split.LakeSnapshotSplit;
2425
import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
@@ -29,6 +30,7 @@
2930
import org.apache.fluss.metadata.PartitionInfo;
3031
import org.apache.fluss.metadata.TableBucket;
3132
import org.apache.fluss.metadata.TableInfo;
33+
import org.apache.fluss.utils.ExceptionUtils;
3234

3335
import javax.annotation.Nullable;
3436

@@ -75,59 +77,43 @@ public LakeSplitGenerator(
7577
this.listPartitionSupplier = listPartitionSupplier;
7678
}
7779

78-
public List<SourceSplitBase> generateHybridLakeSplits(
79-
Map<Long, String> newPartitionNameById,
80-
boolean loadLakeSplits,
81-
List<LakeSplit> remainingLakeSplits,
82-
Map<TableBucket, Long> tableBucketsOffsetState)
83-
throws Exception {
84-
// get the file store
85-
LakeSnapshot lakeSnapshotInfo =
86-
flussAdmin.getLatestLakeSnapshot(tableInfo.getTablePath()).get();
80+
/**
81+
* Returns a list of hybrid lake snapshot {@link
82+
* LakeSnapshotAndFlussLogSplit} and the corresponding Fluss {@link LogSplit} based on the lake
83+
* snapshot. Return null if no lake snapshot exists.
84+
*/
85+
@Nullable
86+
public List<SourceSplitBase> generateHybridLakeFlussSplits() throws Exception {
87+
LakeSnapshot lakeSnapshotInfo;
88+
try {
89+
lakeSnapshotInfo = flussAdmin.getLatestLakeSnapshot(tableInfo.getTablePath()).get();
90+
} catch (Exception exception) {
91+
if (ExceptionUtils.stripExecutionException(exception)
92+
instanceof LakeTableSnapshotNotExistException) {
93+
return null;
94+
}
95+
throw exception;
96+
}
8797

8898
boolean isLogTable = !tableInfo.hasPrimaryKey();
8999
boolean isPartitioned = tableInfo.isPartitioned();
90100

91-
Map<String, Map<Integer, List<LakeSplit>>> lakeSplits;
92-
if (remainingLakeSplits.isEmpty()) {
93-
if (loadLakeSplits) {
94-
lakeSplits =
95-
groupLakeSplits(
96-
lakeSource
97-
.createPlanner(
98-
(LakeSource.PlannerContext)
99-
lakeSnapshotInfo::getSnapshotId)
100-
.plan());
101-
} else {
102-
lakeSplits = Collections.emptyMap();
103-
}
104-
} else {
105-
lakeSplits = groupLakeSplits(remainingLakeSplits);
106-
}
101+
Map<String, Map<Integer, List<LakeSplit>>> lakeSplits =
102+
groupLakeSplits(
103+
lakeSource
104+
.createPlanner(
105+
(LakeSource.PlannerContext) lakeSnapshotInfo::getSnapshotId)
106+
.plan());
107107

108-
// TODO check whether commenting this out causes problems; verify via unit test results
109-
// if (lakeSplits.isEmpty()) {
110-
// return Collections.emptyList();
111-
// }
112108
Map<TableBucket, Long> tableBucketsOffset = lakeSnapshotInfo.getTableBucketsOffset();
113-
if (!tableBucketsOffsetState.isEmpty()) {
114-
// TODO tableBucketsOffsetState 未赋值
115-
tableBucketsOffset = tableBucketsOffsetState;
116-
}
117109
if (isPartitioned) {
118-
Map<Long, String> partitionNameById;
119-
if (newPartitionNameById.isEmpty()) {
120-
Set<PartitionInfo> partitionInfos = listPartitionSupplier.get();
121-
partitionNameById =
122-
partitionInfos.stream()
123-
.collect(
124-
Collectors.toMap(
125-
PartitionInfo::getPartitionId,
126-
PartitionInfo::getPartitionName));
127-
} else {
128-
partitionNameById = newPartitionNameById;
129-
}
130-
110+
Set<PartitionInfo> partitionInfos = listPartitionSupplier.get();
111+
Map<Long, String> partitionNameById =
112+
partitionInfos.stream()
113+
.collect(
114+
Collectors.toMap(
115+
PartitionInfo::getPartitionId,
116+
PartitionInfo::getPartitionName));
131117
return generatePartitionTableSplit(
132118
lakeSplits, isLogTable, tableBucketsOffset, partitionNameById);
133119
} else {
@@ -157,8 +143,7 @@ private List<SourceSplitBase> generatePartitionTableSplit(
157143
Map<String, Map<Integer, List<LakeSplit>>> lakeSplits,
158144
boolean isLogTable,
159145
Map<TableBucket, Long> tableBucketSnapshotLogOffset,
160-
Map<Long, String> partitionNameById)
161-
throws Exception {
146+
Map<Long, String> partitionNameById) {
162147
List<SourceSplitBase> splits = new ArrayList<>();
163148
Map<String, Long> flussPartitionIdByName =
164149
partitionNameById.entrySet().stream()
@@ -240,12 +225,10 @@ private List<SourceSplitBase> generateSplit(
240225
Map<Integer, Long> bucketEndOffset) {
241226
List<SourceSplitBase> splits = new ArrayList<>();
242227
if (isLogTable) {
243-
int needInitOffsetBucketsNum = bucketCount;
244228
if (lakeSplits != null) {
245229
splits.addAll(toLakeSnapshotSplits(lakeSplits, partitionName, partitionId));
246-
needInitOffsetBucketsNum = lakeSplits.size();
247230
}
248-
for (int bucket = 0; bucket < needInitOffsetBucketsNum; bucket++) {
231+
for (int bucket = 0; bucket < bucketCount; bucket++) {
249232
TableBucket tableBucket =
250233
new TableBucket(tableInfo.getTableId(), partitionId, bucket);
251234
Long snapshotLogOffset = tableBucketSnapshotLogOffset.get(tableBucket);
@@ -292,11 +275,14 @@ private List<SourceSplitBase> toLakeSnapshotSplits(
292275
@Nullable String partitionName,
293276
@Nullable Long partitionId) {
294277
List<SourceSplitBase> splits = new ArrayList<>();
278+
// we may have multiple table buckets; so we need to
279+
// introduce an index to make split unique
280+
int index = 0;
295281
for (LakeSplit lakeSplit :
296282
lakeSplits.values().stream().flatMap(List::stream).collect(Collectors.toList())) {
297283
TableBucket tableBucket =
298284
new TableBucket(tableInfo.getTableId(), partitionId, lakeSplit.bucket());
299-
splits.add(new LakeSnapshotSplit(tableBucket, partitionName, lakeSplit));
285+
splits.add(new LakeSnapshotSplit(tableBucket, partitionName, lakeSplit, index++));
300286
}
301287
return splits;
302288
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,9 @@ public LakeSplitSerializer(SimpleVersionedSerializer<LakeSplit> sourceSplitSeria
4848

4949
public void serialize(DataOutputSerializer out, SourceSplitBase split) throws IOException {
5050
if (split instanceof LakeSnapshotSplit) {
51-
byte[] serializeBytes =
52-
sourceSplitSerializer.serialize(((LakeSnapshotSplit) split).getLakeSplit());
51+
LakeSnapshotSplit lakeSplit = (LakeSnapshotSplit) split;
52+
out.writeInt(lakeSplit.getSplitIndex());
53+
byte[] serializeBytes = sourceSplitSerializer.serialize(lakeSplit.getLakeSplit());
5354
out.writeInt(serializeBytes.length);
5455
out.write(serializeBytes);
5556
} else if (split instanceof LakeSnapshotAndFlussLogSplit) {
@@ -89,12 +90,13 @@ public SourceSplitBase deserialize(
8990
DataInputDeserializer input)
9091
throws IOException {
9192
if (splitKind == LAKE_SNAPSHOT_SPLIT_KIND) {
93+
int splitIndex = input.readInt();
9294
byte[] serializeBytes = new byte[input.readInt()];
9395
input.read(serializeBytes);
94-
LakeSplit fileStoreSourceSplit =
96+
LakeSplit lakeSplit =
9597
sourceSplitSerializer.deserialize(
9698
sourceSplitSerializer.getVersion(), serializeBytes);
97-
return new LakeSnapshotSplit(tableBucket, partition, fileStoreSourceSplit);
99+
return new LakeSnapshotSplit(tableBucket, partition, lakeSplit, splitIndex);
98100
} else if (splitKind == LAKE_SNAPSHOT_FLUSS_LOG_SPLIT_KIND) {
99101
List<LakeSplit> lakeSplits = null;
100102
if (input.readBoolean()) {

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotAndFlussLogSplit.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,4 +107,23 @@ public String splitId() {
107107
public List<LakeSplit> getLakeSplits() {
108108
return lakeSnapshotSplits;
109109
}
110+
111+
@Override
112+
public String toString() {
113+
return "LakeSnapshotAndFlussLogSplit{"
114+
+ "lakeSnapshotSplits="
115+
+ lakeSnapshotSplits
116+
+ ", recordOffset="
117+
+ recordOffset
118+
+ ", startingOffset="
119+
+ startingOffset
120+
+ ", stoppingOffset="
121+
+ stoppingOffset
122+
+ ", tableBucket="
123+
+ tableBucket
124+
+ ", partitionName='"
125+
+ partitionName
126+
+ '\''
127+
+ '}';
128+
}
110129
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotSplit.java

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,18 +32,25 @@ public class LakeSnapshotSplit extends SourceSplitBase {
3232

3333
private final long recordsToSplit;
3434

35+
private final int splitIndex;
36+
3537
public LakeSnapshotSplit(
36-
TableBucket tableBucket, @Nullable String partitionName, LakeSplit lakeSplit) {
37-
this(tableBucket, partitionName, lakeSplit, 0);
38+
TableBucket tableBucket,
39+
@Nullable String partitionName,
40+
LakeSplit lakeSplit,
41+
int splitIndex) {
42+
this(tableBucket, partitionName, lakeSplit, splitIndex, 0);
3843
}
3944

4045
public LakeSnapshotSplit(
4146
TableBucket tableBucket,
4247
@Nullable String partitionName,
4348
LakeSplit lakeSplit,
49+
int splitIndex,
4450
long recordsToSplit) {
4551
super(tableBucket, partitionName);
4652
this.lakeSplit = lakeSplit;
53+
this.splitIndex = splitIndex;
4754
this.recordsToSplit = recordsToSplit;
4855
}
4956

@@ -55,14 +62,20 @@ public long getRecordsToSplit() {
5562
return recordsToSplit;
5663
}
5764

65+
public int getSplitIndex() {
66+
return splitIndex;
67+
}
68+
5869
@Override
5970
public String splitId() {
6071
return toSplitId(
61-
"lake-snapshot-",
62-
new TableBucket(
63-
tableBucket.getTableId(),
64-
tableBucket.getPartitionId(),
65-
lakeSplit.bucket()));
72+
"lake-snapshot-",
73+
new TableBucket(
74+
tableBucket.getTableId(),
75+
tableBucket.getPartitionId(),
76+
lakeSplit.bucket()))
77+
+ "-"
78+
+ splitIndex;
6679
}
6780

6881
@Override
@@ -74,4 +87,21 @@ public boolean isLakeSplit() {
7487
public byte splitKind() {
7588
return LAKE_SNAPSHOT_SPLIT_KIND;
7689
}
90+
91+
@Override
92+
public String toString() {
93+
return "LakeSnapshotSplit{"
94+
+ "lakeSplit="
95+
+ lakeSplit
96+
+ ", recordsToSplit="
97+
+ recordsToSplit
98+
+ ", splitIndex="
99+
+ splitIndex
100+
+ ", tableBucket="
101+
+ tableBucket
102+
+ ", partitionName='"
103+
+ partitionName
104+
+ '\''
105+
+ '}';
106+
}
77107
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotSplitState.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public SourceSplitBase toSourceSplit() {
4343
split.getTableBucket(),
4444
split.getPartitionName(),
4545
split.getLakeSplit(),
46+
split.getSplitIndex(),
4647
recordsToSplit);
4748
}
4849
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,7 @@ public SplitEnumerator<SourceSplitBase, SourceEnumeratorState> restoreEnumerator
161161
splitEnumeratorContext,
162162
sourceEnumeratorState.getAssignedBuckets(),
163163
sourceEnumeratorState.getAssignedPartitions(),
164-
sourceEnumeratorState.getRemainingLakeSnapshotSplits(),
165-
sourceEnumeratorState.getTableBucketsOffset(),
164+
sourceEnumeratorState.getRemainingHybridLakeFlussSplits(),
166165
offsetsInitializer,
167166
scanPartitionDiscoveryIntervalMs,
168167
streaming,

0 commit comments

Comments
 (0)