28
28
import org .apache .flink .util .CloseableIterator ;
29
29
import org .apache .flink .util .CollectionUtil ;
30
30
import org .junit .jupiter .api .BeforeAll ;
31
+ import org .junit .jupiter .api .Test ;
31
32
import org .junit .jupiter .params .ParameterizedTest ;
32
33
import org .junit .jupiter .params .provider .ValueSource ;
33
34
37
38
import java .time .LocalDateTime ;
38
39
import java .time .ZoneId ;
39
40
import java .util .ArrayList ;
41
+ import java .util .LinkedList ;
40
42
import java .util .List ;
41
43
import java .util .Map ;
42
44
import java .util .stream .Collectors ;
@@ -126,14 +128,14 @@ void testReadLogTableFullType(boolean isPartitioned) throws Exception {
126
128
127
129
@ ParameterizedTest
128
130
@ ValueSource (booleans = {false , true })
129
- void testReadLogTableInStreamingMode (boolean isPartitioned ) throws Exception {
131
+ void testReadLogTableInStreamMode (boolean isPartitioned ) throws Exception {
130
132
// first of all, start tiering
131
133
JobClient jobClient = buildTieringJob (execEnv );
132
134
133
135
String tableName = "stream_logTable_" + (isPartitioned ? "partitioned" : "non_partitioned" );
134
136
135
137
TablePath t1 = TablePath .of (DEFAULT_DB , tableName );
136
- List <Row > writtenRows = new ArrayList <>();
138
+ List <Row > writtenRows = new LinkedList <>();
137
139
long tableId = prepareLogTable (t1 , DEFAULT_BUCKET_NUM , isPartitioned , writtenRows );
138
140
// wait until records has been synced
139
141
waitUntilBucketSynced (t1 , tableId , DEFAULT_BUCKET_NUM , isPartitioned );
@@ -145,8 +147,6 @@ void testReadLogTableInStreamingMode(boolean isPartitioned) throws Exception {
145
147
streamTEnv .executeSql ("select * from " + tableName ).collect ();
146
148
assertResultsIgnoreOrder (
147
149
actual , writtenRows .stream ().map (Row ::toString ).collect (Collectors .toList ()), true );
148
- // assertQueryResultExactOrder(streamTEnv, "select * from " + tableName,
149
- // writtenRows.stream().map(Row::toString).collect(Collectors.toList()));
150
150
151
151
// can database sync job
152
152
jobClient .cancel ().get ();
@@ -161,14 +161,39 @@ void testReadLogTableInStreamingMode(boolean isPartitioned) throws Exception {
161
161
actual , writtenRows .stream ().map (Row ::toString ).collect (Collectors .toList ()), true );
162
162
}
163
163
164
+ @ Test
165
+ void testReadLogTableInStreamDiscoveryPartitionedMode () throws Exception {
166
+ // first of all, start tiering
167
+ JobClient jobClient = buildTieringJob (execEnv );
168
+
169
+ String tableName = "stream_logTable_discovery_partitioned" ;
170
+
171
+ TablePath t1 = TablePath .of (DEFAULT_DB , tableName );
172
+ List <Row > writtenRows = new LinkedList <>();
173
+ long tableId = prepareLogTable (t1 , DEFAULT_BUCKET_NUM , true , writtenRows );
174
+ // wait until records has been synced
175
+ waitUntilBucketSynced (t1 , tableId , DEFAULT_BUCKET_NUM , true );
176
+
177
+ CloseableIterator <Row > actual =
178
+ streamTEnv .executeSql ("select * from " + tableName ).collect ();
179
+ assertResultsIgnoreOrder (
180
+ actual , writtenRows .stream ().map (Row ::toString ).collect (Collectors .toList ()), true );
181
+
182
+ writtenRows .addAll (writeFullTypeRows (t1 , 10 , "2027" ));
183
+ actual = streamTEnv .executeSql ("select * from " + tableName ).collect ();
184
+ assertResultsIgnoreOrder (
185
+ actual , writtenRows .stream ().map (Row ::toString ).collect (Collectors .toList ()), true );
186
+ }
187
+
164
188
private long prepareLogTable (
165
189
TablePath tablePath , int bucketNum , boolean isPartitioned , List <Row > flinkRows )
166
190
throws Exception {
167
191
// createFullTypeLogTable creates a datalake-enabled table with a partition column.
168
192
long t1Id = createFullTypeLogTable (tablePath , bucketNum , isPartitioned );
169
193
if (isPartitioned ) {
170
194
Map <Long , String > partitionNameById = waitUntilPartitions (tablePath );
171
- for (String partition : partitionNameById .values ()) {
195
+ for (String partition :
196
+ partitionNameById .values ().stream ().sorted ().collect (Collectors .toList ())) {
172
197
for (int i = 0 ; i < 3 ; i ++) {
173
198
flinkRows .addAll (writeFullTypeRows (tablePath , 10 , partition ));
174
199
}
0 commit comments