
Commit 405ceb0

[flink] Union read in stream mode should support
1 parent 7937996 commit 405ceb0

File tree: 10 files changed (+335, -28 lines)


fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java

Lines changed: 52 additions & 1 deletion
@@ -32,8 +32,12 @@
 import org.apache.fluss.lake.source.LakeSplit;
 import org.apache.fluss.metadata.MergeEngineType;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.predicate.GreaterOrEqual;
+import org.apache.fluss.predicate.LeafPredicate;
 import org.apache.fluss.predicate.Predicate;
 import org.apache.fluss.predicate.PredicateBuilder;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.types.DataTypes;
 import org.apache.fluss.types.RowType;
 
 import org.apache.flink.annotation.VisibleForTesting;
@@ -86,7 +90,9 @@
 import static org.apache.fluss.flink.utils.PushdownUtils.ValueConversion.FLINK_INTERNAL_VALUE;
 import static org.apache.fluss.flink.utils.PushdownUtils.ValueConversion.FLUSS_INTERNAL_VALUE;
 import static org.apache.fluss.flink.utils.PushdownUtils.extractFieldEquals;
+import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
+import static org.apache.fluss.utils.Preconditions.checkState;
 
 /** Flink table source to scan Fluss data. */
 public class FlinkTableSource
@@ -258,19 +264,34 @@ public boolean isBounded() {
             flussRowType = flussRowType.project(projectedFields);
         }
         OffsetsInitializer offsetsInitializer;
+        boolean enableLakeSource = lakeSource != null;
         switch (startupOptions.startupMode) {
             case EARLIEST:
                 offsetsInitializer = OffsetsInitializer.earliest();
                 break;
             case LATEST:
                 offsetsInitializer = OffsetsInitializer.latest();
+                // since the scan starts from latest, don't consider lake data
+                enableLakeSource = false;
                 break;
             case FULL:
                 offsetsInitializer = OffsetsInitializer.full();
                 break;
             case TIMESTAMP:
                 offsetsInitializer =
                         OffsetsInitializer.timestamp(startupOptions.startupTimestampMs);
+                if (hasPrimaryKey()) {
+                    // Currently, for primary key tables, we do not consider lake data
+                    // when reading from a given timestamp, because that would require
+                    // reading the changelog of the primary key table.
+                    // TODO: consider supporting it using Paimon changelog data?
+                    enableLakeSource = false;
+                } else {
+                    if (enableLakeSource) {
+                        enableLakeSource =
+                                pushTimeStampFilterToLakeSource(lakeSource, flussRowType);
+                    }
+                }
                 break;
             default:
                 throw new IllegalArgumentException(
@@ -290,7 +311,7 @@ public boolean isBounded() {
                         new RowDataDeserializationSchema(),
                         streaming,
                         partitionFilters,
-                        lakeSource);
+                        enableLakeSource ? lakeSource : null);
 
         if (!streaming) {
             // return a bounded source provide to make planner happy,
@@ -321,6 +342,36 @@ public boolean isBounded() {
         }
     }
 
+    private boolean pushTimeStampFilterToLakeSource(
+            LakeSource<?> lakeSource, RowType flussRowType) {
+        // Push the startup timestamp down to the lake. The lake table has three additional
+        // system columns (__bucket, __offset, __timestamp), so __timestamp sits at index
+        // fieldCount + 2 in the lake table.
+        final int timestampFieldIndex = flussRowType.getFieldCount() + 2;
+        Predicate timestampFilter =
+                new LeafPredicate(
+                        GreaterOrEqual.INSTANCE,
+                        DataTypes.TIMESTAMP_LTZ(),
+                        timestampFieldIndex,
+                        TIMESTAMP_COLUMN_NAME,
+                        Collections.singletonList(
+                                TimestampLtz.fromEpochMillis(startupOptions.startupTimestampMs)));
+        List<Predicate> acceptedPredicates =
+                lakeSource
+                        .withFilters(Collections.singletonList(timestampFilter))
+                        .acceptedPredicates();
+        if (acceptedPredicates.isEmpty()) {
+            LOG.warn(
+                    "The lake source doesn't accept the filter {}, won't read data from lake.",
+                    timestampFilter);
+            return false;
+        }
+        checkState(
+                acceptedPredicates.size() == 1
+                        && acceptedPredicates.get(0).equals(timestampFilter));
+        return true;
+    }
+
     @Override
     public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
         LookupNormalizer lookupNormalizer =
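
To summarize the streaming-startup change above in one place, here is a hedged sketch of the decision it implements. The class and method names (UnionReadDecisionSketch, shouldUnionReadLake) and the string startup-mode values are illustrative only, the import path for LakeSource is assumed from the imports visible in this diff, and the timestamp case glosses over the filter-acceptance check that pushTimeStampFilterToLakeSource performs.

import org.apache.fluss.lake.source.LakeSource;

import javax.annotation.Nullable;

public class UnionReadDecisionSketch {

    static boolean shouldUnionReadLake(
            @Nullable LakeSource<?> lakeSource, String startupMode, boolean hasPrimaryKey) {
        if (lakeSource == null) {
            // the lake format provides no LakeSource, so there is nothing to union
            return false;
        }
        switch (startupMode) {
            case "earliest":
            case "full":
                return true;
            case "latest":
                // reading only new records never needs historical lake data
                return false;
            case "timestamp":
                // primary key tables would need the lake changelog, which is not supported yet;
                // log tables additionally require the lake source to accept the pushed
                // __timestamp >= startupTimestampMs filter
                return !hasPrimaryKey;
            default:
                return false;
        }
    }
}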

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/LakeSourceUtils.java

Lines changed: 20 additions & 1 deletion
@@ -26,14 +26,26 @@
 import org.apache.fluss.lake.source.LakeSplit;
 import org.apache.fluss.metadata.TablePath;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
 import java.util.Map;
 
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
 
 /** Utils for create lake source. */
 public class LakeSourceUtils {
 
+    public static final Logger LOG = LoggerFactory.getLogger(LakeSourceUtils.class);
+
+    /**
+     * Return the lake source of the given table. Return null when the lake storage doesn't
+     * support creating a lake source.
+     */
     @SuppressWarnings("unchecked")
+    @Nullable
     public static LakeSource<LakeSplit> createLakeSource(
             TablePath tablePath, Map<String, String> properties) {
         Map<String, String> catalogProperties =
@@ -47,6 +59,13 @@ public static LakeSource<LakeSplit> createLakeSource(
         LakeStoragePlugin lakeStoragePlugin =
                 LakeStoragePluginSetUp.fromDataLakeFormat(dataLake, null);
         LakeStorage lakeStorage = checkNotNull(lakeStoragePlugin).createLakeStorage(lakeConfig);
-        return (LakeSource<LakeSplit>) lakeStorage.createLakeSource(tablePath);
+        try {
+            return (LakeSource<LakeSplit>) lakeStorage.createLakeSource(tablePath);
+        } catch (UnsupportedOperationException e) {
+            LOG.info(
+                    "method createLakeSource throw UnsupportedOperationException for datalake format {}, return null as lakeSource to disable reading from lake source.",
+                    dataLake);
+            return null;
+        }
     }
 }
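
For illustration, an assumed sketch of the contract the new try/catch relies on: this is not a Fluss class, and the import path for LakeSource is inferred from the imports in this diff. A lake storage whose format cannot yet be read as a lake source signals this by throwing UnsupportedOperationException from createLakeSource, and the utility above maps that to a null lake source, which in turn disables union read.

import org.apache.fluss.lake.source.LakeSource;
import org.apache.fluss.metadata.TablePath;

public class NoSourceLakeStorageSketch {

    // Hypothetical example of a lake storage without source support: throwing here is the
    // signal that LakeSourceUtils.createLakeSource converts into a null return value.
    public LakeSource<?> createLakeSource(TablePath tablePath) {
        throw new UnsupportedOperationException(
                "this lake format does not provide a lake source yet");
    }
}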

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java

Lines changed: 9 additions & 0 deletions
@@ -23,6 +23,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -31,6 +32,14 @@ public class FlinkRowAssertionsUtils {
 
     private FlinkRowAssertionsUtils() {}
 
+    public static void assertRowResultsIgnoreOrder(
+            CloseableIterator<Row> actual, List<Row> expectedRows, boolean closeIterator) {
+        assertResultsIgnoreOrder(
+                actual,
+                expectedRows.stream().map(Row::toString).collect(Collectors.toList()),
+                closeIterator);
+    }
+
     public static void assertResultsIgnoreOrder(
             CloseableIterator<Row> iterator, List<String> expected, boolean closeIterator) {
         List<String> actual = collectRowsWithTimeout(iterator, expected.size(), closeIterator);
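
A minimal usage sketch of the new assertRowResultsIgnoreOrder helper. The class name, the rows, and the source of the iterator are illustrative; in the test below, the iterator comes from StreamTableEnvironment.executeSql(...).collect().

import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import java.util.Arrays;
import java.util.List;

import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertRowResultsIgnoreOrder;

class AssertRowsUsageSketch {

    static void verify(CloseableIterator<Row> actual) {
        // expected rows are compared by their Row.toString() form, ignoring order;
        // the final argument closes the iterator once the expected rows have been collected
        List<Row> expected = Arrays.asList(Row.of(0, "v0"), Row.of(1, "v1"), Row.of(2, "v2"));
        assertRowResultsIgnoreOrder(actual, expected, true);
    }
}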

FlinkUnionReadFromTimestampITCase.java (new file)

Lines changed: 190 additions & 0 deletions

@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.paimon.flink;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.MemorySize;
+import org.apache.fluss.lake.paimon.testutils.FlinkPaimonTieringTestBase;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.server.replica.Replica;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+import org.apache.fluss.utils.clock.ManualClock;
+
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil;
+import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
+import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertRowResultsIgnoreOrder;
+import static org.apache.fluss.testutils.DataTestUtils.row;
+
+/** The ITCase for Flink union read from a timestamp. */
+class FlinkUnionReadFromTimestampITCase extends FlinkPaimonTieringTestBase {
+
+    private static final ManualClock CLOCK = new ManualClock();
+
+    @RegisterExtension
+    public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+            FlussClusterExtension.builder()
+                    .setClusterConf(initConfig())
+                    .setNumOfTabletServers(3)
+                    .setClock(CLOCK)
+                    .build();
+
+    private StreamTableEnvironment streamTEnv;
+
+    protected static Configuration initConfig() {
+        Configuration configuration = FlinkPaimonTieringTestBase.initConfig();
+        // set the segment file size to 10 bytes to make log segments roll frequently
+        configuration.set(ConfigOptions.LOG_SEGMENT_FILE_SIZE, MemorySize.parse("10b"));
+        configuration.set(ConfigOptions.REMOTE_LOG_TASK_INTERVAL_DURATION, Duration.ofMillis(100));
+        return configuration;
+    }
+
+    @BeforeAll
+    static void beforeAll() {
+        FlinkPaimonTieringTestBase.beforeAll(FLUSS_CLUSTER_EXTENSION.getClientConfig());
+    }
+
+    @BeforeEach
+    public void beforeEach() {
+        super.beforeEach();
+        buildStreamTEnv();
+    }
+
+    @Override
+    protected FlussClusterExtension getFlussClusterExtension() {
+        return FLUSS_CLUSTER_EXTENSION;
+    }
+
+    @Test
+    void testUnionReadFromTimestamp() throws Exception {
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+        try {
+            String tableName = "logTable_read_timestamp";
+            TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
+            long tableId = createLogTable(tablePath, 1);
+            TableBucket t1Bucket = new TableBucket(tableId, 0);
+
+            List<Row> rows = new ArrayList<>();
+            for (int i = 0; i < 10; i++) {
+                rows.addAll(writeRows(tablePath, 3));
+                // advance 1s after each round so that each round of writes
+                // has a different timestamp
+                CLOCK.advanceTime(Duration.ofSeconds(1));
+            }
+            assertReplicaStatus(t1Bucket, rows.size());
+
+            Replica t1Replica = FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(t1Bucket);
+
+            // wait until only 2 log segments remain in local (by default 2 segments are kept local)
+            waitUtil(
+                    () -> t1Replica.getLogTablet().logSegments().size() == 2,
+                    Duration.ofMinutes(1),
+                    "Fail to wait util only 2 segments in local.");
+
+            // advance 10 days to mock remote log ttl
+            CLOCK.advanceTime(Duration.ofDays(10));
+            // wait until the remote log ttl expires; offset 10 should no longer be
+            // fetchable from the remote log
+            waitUtil(
+                    () -> !t1Replica.getLogTablet().canFetchFromRemoteLog(10),
+                    Duration.ofMinutes(1),
+                    "Fail to wait log offset 10 ttl from remote log.");
+
+            // verify scan from timestamp 0; it should read the full data
+            assertRowResultsIgnoreOrder(
+                    streamTEnv
+                            .executeSql(
+                                    "select * from "
+                                            + tableName
+                                            + " /*+ OPTIONS('scan.startup.mode' = 'timestamp',\n"
+                                            + "'scan.startup.timestamp' = '0') */")
+                            .collect(),
+                    rows,
+                    true);
+
+            // verify scan from timestamp 2000; it shouldn't read the rows written in the
+            // first two rounds
+            CloseableIterator<Row> actualRows =
+                    streamTEnv
+                            .executeSql(
+                                    "select * from "
+                                            + tableName
+                                            + " /*+ OPTIONS('scan.startup.mode' = 'timestamp',\n"
+                                            + "'scan.startup.timestamp' = '2000') */")
+                            .collect();
+            List<Row> expectedRows = rows.stream().skip(2 * 3).collect(Collectors.toList());
+            assertRowResultsIgnoreOrder(actualRows, expectedRows, true);
+
+            // verify scan from earliest
+            assertRowResultsIgnoreOrder(
+                    streamTEnv
+                            .executeSql(
+                                    "select * from "
+                                            + tableName
+                                            + " /*+ OPTIONS('scan.startup.mode' = 'earliest') */")
+                            .collect(),
+                    rows,
+                    true);
+
+        } finally {
+            jobClient.cancel();
+        }
+    }
+
+    private List<Row> writeRows(TablePath tablePath, int rows) throws Exception {
+        List<InternalRow> writtenRows = new ArrayList<>();
+        List<Row> flinkRow = new ArrayList<>();
+        for (int i = 0; i < rows; i++) {
+            writtenRows.add(row(i, "v" + i));
+            flinkRow.add(Row.of(i, "v" + i));
+        }
+        writeRows(tablePath, writtenRows, true);
+        return flinkRow;
+    }
+
+    private void buildStreamTEnv() {
+        String bootstrapServers = String.join(",", clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
+        // create table environment
+        streamTEnv = StreamTableEnvironment.create(execEnv, EnvironmentSettings.inStreamingMode());
+        // create catalog using sql
+        streamTEnv.executeSql(
+                String.format(
+                        "create catalog %s with ('type' = 'fluss', '%s' = '%s')",
+                        CATALOG_NAME, BOOTSTRAP_SERVERS.key(), bootstrapServers));
+        streamTEnv.executeSql("use catalog " + CATALOG_NAME);
+        streamTEnv.executeSql("use " + DEFAULT_DB);
+    }
+}
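
Stepping back from the test harness, the end-user pattern this test exercises looks roughly like the sketch below. The catalog name, bootstrap address, and table name are placeholders; the option keys are taken verbatim from the test above, and the option key for the bootstrap servers is resolved via BOOTSTRAP_SERVERS.key() rather than hard-coded.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;

public class UnionReadUsageSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv =
                StreamTableEnvironment.create(env, EnvironmentSettings.inStreamingMode());

        // placeholder catalog pointing at a Fluss cluster with lake tiering enabled
        tEnv.executeSql(
                String.format(
                        "create catalog my_fluss with ('type' = 'fluss', '%s' = 'localhost:9123')",
                        BOOTSTRAP_SERVERS.key()));
        tEnv.executeSql("use catalog my_fluss");

        // streaming scan starting from a timestamp; with this commit, for log tables such a
        // scan can also include tiered lake data (union read) instead of only the Fluss log
        try (CloseableIterator<Row> it =
                tEnv.executeSql(
                                "select * from my_table /*+ OPTIONS("
                                        + "'scan.startup.mode' = 'timestamp', "
                                        + "'scan.startup.timestamp' = '0') */")
                        .collect()) {
            while (it.hasNext()) {
                System.out.println(it.next());
            }
        }
    }
}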
