Skip to content

Commit d378179

Browse files
authored
[hotfix] Fix unstable test IcebergTieringITCase (#1584)
* [hotfix] Fix unstable test IcebergTieringITCase * address comments
1 parent c1dc8e3 commit d378179

File tree

1 file changed

+23
-14
lines changed

1 file changed

+23
-14
lines changed

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444

4545
import java.io.IOException;
4646
import java.util.Arrays;
47+
import java.util.Collections;
4748
import java.util.List;
4849
import java.util.Map;
4950
import java.util.stream.Collectors;
@@ -55,6 +56,7 @@
5556

5657
/** Implementation of {@link LakeCommitter} for Iceberg. */
5758
public class IcebergLakeCommitter implements LakeCommitter<IcebergWriteResult, IcebergCommittable> {
59+
private static final String COMMITTER_USER = "commit-user";
5860

5961
private final Catalog icebergCatalog;
6062
private final Table icebergTable;
@@ -139,7 +141,7 @@ public long commit(IcebergCommittable committable, Map<String, String> snapshotP
139141

140142
private void addFlussProperties(
141143
SnapshotUpdate<?> operation, Map<String, String> snapshotProperties) {
142-
operation.set("commit-user", FLUSS_LAKE_TIERING_COMMIT_USER);
144+
operation.set(COMMITTER_USER, FLUSS_LAKE_TIERING_COMMIT_USER);
143145
for (Map.Entry<String, String> entry : snapshotProperties.entrySet()) {
144146
operation.set(entry.getKey(), entry.getValue());
145147
}
@@ -173,9 +175,19 @@ public CommittedLakeSnapshot getMissingLakeSnapshot(@Nullable Long latestLakeSna
173175
}
174176

175177
// Check if there's a gap between Fluss and Iceberg snapshots
176-
if (latestLakeSnapshotIdOfFluss != null
177-
&& latestLakeSnapshot.snapshotId() <= latestLakeSnapshotIdOfFluss) {
178-
return null;
178+
if (latestLakeSnapshotIdOfFluss != null) {
179+
Snapshot latestLakeSnapshotOfFluss = icebergTable.snapshot(latestLakeSnapshotIdOfFluss);
180+
if (latestLakeSnapshotOfFluss == null) {
181+
throw new IllegalStateException(
182+
"Referenced Fluss snapshot "
183+
+ latestLakeSnapshotIdOfFluss
184+
+ " not found in Iceberg table");
185+
}
186+
// note: we need to use sequence number to compare,
187+
// we can't use snapshot id as the snapshot id is not ordered
188+
if (latestLakeSnapshot.sequenceNumber() <= latestLakeSnapshotOfFluss.sequenceNumber()) {
189+
return null;
190+
}
179191
}
180192

181193
CommittedLakeSnapshot committedLakeSnapshot =
@@ -237,20 +249,17 @@ private Snapshot getCommittedLatestSnapshotOfLake(String commitUser) {
237249
icebergTable.refresh();
238250

239251
// Find the latest snapshot committed by Fluss
240-
Iterable<Snapshot> snapshots = icebergTable.snapshots();
241-
Snapshot latestFlussSnapshot = null;
242-
252+
List<Snapshot> snapshots = (List<Snapshot>) icebergTable.snapshots();
253+
// snapshots() returns snapshots in chronological order (oldest to newest), Reverse to find
254+
// most recent snapshot committed by Fluss
255+
Collections.reverse(snapshots);
243256
for (Snapshot snapshot : snapshots) {
244257
Map<String, String> summary = snapshot.summary();
245-
if (summary != null && commitUser.equals(summary.get("commit-user"))) {
246-
if (latestFlussSnapshot == null
247-
|| snapshot.snapshotId() > latestFlussSnapshot.snapshotId()) {
248-
latestFlussSnapshot = snapshot;
249-
}
258+
if (summary != null && commitUser.equals(summary.get(COMMITTER_USER))) {
259+
return snapshot;
250260
}
251261
}
252-
253-
return latestFlussSnapshot;
262+
return null;
254263
}
255264

256265
/** A {@link Listener} to listen the iceberg create snapshot event. */

0 commit comments

Comments
 (0)