Skip to content

Commit 4dc87bc

Browse files
committed
add discovery-partition IT tests
1 parent 0221cd5 commit 4dc87bc

File tree

1 file changed

+30
-5
lines changed

1 file changed

+30
-5
lines changed

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.flink.util.CloseableIterator;
2929
import org.apache.flink.util.CollectionUtil;
3030
import org.junit.jupiter.api.BeforeAll;
31+
import org.junit.jupiter.api.Test;
3132
import org.junit.jupiter.params.ParameterizedTest;
3233
import org.junit.jupiter.params.provider.ValueSource;
3334

@@ -37,6 +38,7 @@
3738
import java.time.LocalDateTime;
3839
import java.time.ZoneId;
3940
import java.util.ArrayList;
41+
import java.util.LinkedList;
4042
import java.util.List;
4143
import java.util.Map;
4244
import java.util.stream.Collectors;
@@ -126,14 +128,14 @@ void testReadLogTableFullType(boolean isPartitioned) throws Exception {
126128

127129
@ParameterizedTest
128130
@ValueSource(booleans = {false, true})
129-
void testReadLogTableInStreamingMode(boolean isPartitioned) throws Exception {
131+
void testReadLogTableInStreamMode(boolean isPartitioned) throws Exception {
130132
// first of all, start tiering
131133
JobClient jobClient = buildTieringJob(execEnv);
132134

133135
String tableName = "stream_logTable_" + (isPartitioned ? "partitioned" : "non_partitioned");
134136

135137
TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
136-
List<Row> writtenRows = new ArrayList<>();
138+
List<Row> writtenRows = new LinkedList<>();
137139
long tableId = prepareLogTable(t1, DEFAULT_BUCKET_NUM, isPartitioned, writtenRows);
138140
// wait until records has been synced
139141
waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
@@ -145,8 +147,6 @@ void testReadLogTableInStreamingMode(boolean isPartitioned) throws Exception {
145147
streamTEnv.executeSql("select * from " + tableName).collect();
146148
assertResultsIgnoreOrder(
147149
actual, writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);
148-
// assertQueryResultExactOrder(streamTEnv, "select * from " + tableName,
149-
// writtenRows.stream().map(Row::toString).collect(Collectors.toList()));
150150

151151
// cancel database sync job
152152
jobClient.cancel().get();
@@ -161,14 +161,39 @@ void testReadLogTableInStreamingMode(boolean isPartitioned) throws Exception {
161161
actual, writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);
162162
}
163163

164+
@Test
165+
void testReadLogTableInStreamDiscoveryPartitionedMode() throws Exception {
166+
// first of all, start tiering
167+
JobClient jobClient = buildTieringJob(execEnv);
168+
169+
String tableName = "stream_logTable_discovery_partitioned";
170+
171+
TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
172+
List<Row> writtenRows = new LinkedList<>();
173+
long tableId = prepareLogTable(t1, DEFAULT_BUCKET_NUM, true, writtenRows);
174+
// wait until records has been synced
175+
waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, true);
176+
177+
CloseableIterator<Row> actual =
178+
streamTEnv.executeSql("select * from " + tableName).collect();
179+
assertResultsIgnoreOrder(
180+
actual, writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);
181+
182+
writtenRows.addAll(writeFullTypeRows(t1, 10, "2027"));
183+
actual = streamTEnv.executeSql("select * from " + tableName).collect();
184+
assertResultsIgnoreOrder(
185+
actual, writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);
186+
}
187+
164188
private long prepareLogTable(
165189
TablePath tablePath, int bucketNum, boolean isPartitioned, List<Row> flinkRows)
166190
throws Exception {
167191
// createFullTypeLogTable creates a datalake-enabled table with a partition column.
168192
long t1Id = createFullTypeLogTable(tablePath, bucketNum, isPartitioned);
169193
if (isPartitioned) {
170194
Map<Long, String> partitionNameById = waitUntilPartitions(tablePath);
171-
for (String partition : partitionNameById.values()) {
195+
for (String partition :
196+
partitionNameById.values().stream().sorted().collect(Collectors.toList())) {
172197
for (int i = 0; i < 3; i++) {
173198
flinkRows.addAll(writeFullTypeRows(tablePath, 10, partition));
174199
}

0 commit comments

Comments
 (0)