Skip to content

Commit 9ffbf7e

Browse files
committed
[flink] Union read in stream mode should support
1 parent 7937996 commit 9ffbf7e

File tree

10 files changed

+346
-34
lines changed

10 files changed

+346
-34
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,12 @@
3232
import org.apache.fluss.lake.source.LakeSplit;
3333
import org.apache.fluss.metadata.MergeEngineType;
3434
import org.apache.fluss.metadata.TablePath;
35+
import org.apache.fluss.predicate.GreaterOrEqual;
36+
import org.apache.fluss.predicate.LeafPredicate;
3537
import org.apache.fluss.predicate.Predicate;
3638
import org.apache.fluss.predicate.PredicateBuilder;
39+
import org.apache.fluss.row.TimestampLtz;
40+
import org.apache.fluss.types.DataTypes;
3741
import org.apache.fluss.types.RowType;
3842

3943
import org.apache.flink.annotation.VisibleForTesting;
@@ -86,7 +90,9 @@
8690
import static org.apache.fluss.flink.utils.PushdownUtils.ValueConversion.FLINK_INTERNAL_VALUE;
8791
import static org.apache.fluss.flink.utils.PushdownUtils.ValueConversion.FLUSS_INTERNAL_VALUE;
8892
import static org.apache.fluss.flink.utils.PushdownUtils.extractFieldEquals;
93+
import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
8994
import static org.apache.fluss.utils.Preconditions.checkNotNull;
95+
import static org.apache.fluss.utils.Preconditions.checkState;
9096

9197
/** Flink table source to scan Fluss data. */
9298
public class FlinkTableSource
@@ -258,19 +264,32 @@ public boolean isBounded() {
258264
flussRowType = flussRowType.project(projectedFields);
259265
}
260266
OffsetsInitializer offsetsInitializer;
267+
boolean enableLakeSource = lakeSource != null;
261268
switch (startupOptions.startupMode) {
262269
case EARLIEST:
263270
offsetsInitializer = OffsetsInitializer.earliest();
264271
break;
265272
case LATEST:
266273
offsetsInitializer = OffsetsInitializer.latest();
274+
// since it's scan from latest, don't consider lake data
275+
enableLakeSource = false;
267276
break;
268277
case FULL:
269278
offsetsInitializer = OffsetsInitializer.full();
270279
break;
271280
case TIMESTAMP:
272281
offsetsInitializer =
273282
OffsetsInitializer.timestamp(startupOptions.startupTimestampMs);
283+
if (hasPrimaryKey()) {
284+
// currently, for primary key table, don't consider lake data
285+
// when read from a given timestamp, todo: consider support it?
286+
enableLakeSource = false;
287+
} else {
288+
if (enableLakeSource) {
289+
enableLakeSource =
290+
pushTimeStampFilterToLakeSource(lakeSource, flussRowType);
291+
}
292+
}
274293
break;
275294
default:
276295
throw new IllegalArgumentException(
@@ -290,7 +309,7 @@ public boolean isBounded() {
290309
new RowDataDeserializationSchema(),
291310
streaming,
292311
partitionFilters,
293-
lakeSource);
312+
enableLakeSource ? lakeSource : null);
294313

295314
if (!streaming) {
296315
// return a bounded source provide to make planner happy,
@@ -321,6 +340,34 @@ public boolean isBounded() {
321340
}
322341
}
323342

343+
private boolean pushTimeStampFilterToLakeSource(
344+
LakeSource<?> lakeSource, RowType flussRowType) {
345+
// will push timestamp to lake
346+
Predicate timestampFilter =
347+
new LeafPredicate(
348+
GreaterOrEqual.INSTANCE,
349+
DataTypes.TIMESTAMP_LTZ(),
350+
// the timestamp index
351+
flussRowType.getFieldCount() + 2,
352+
TIMESTAMP_COLUMN_NAME,
353+
Collections.singletonList(
354+
TimestampLtz.fromEpochMillis(startupOptions.startupTimestampMs)));
355+
List<Predicate> acceptedPredicates =
356+
lakeSource
357+
.withFilters(Collections.singletonList(timestampFilter))
358+
.acceptedPredicates();
359+
if (acceptedPredicates.isEmpty()) {
360+
LOG.warn(
361+
"The lake source doesn't accept the filter {}, won't read data from lake.",
362+
timestampFilter);
363+
return false;
364+
}
365+
checkState(
366+
acceptedPredicates.size() == 1
367+
&& acceptedPredicates.get(0).equals(timestampFilter));
368+
return true;
369+
}
370+
324371
@Override
325372
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
326373
LookupNormalizer lookupNormalizer =

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/LakeSourceUtils.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,26 @@
2626
import org.apache.fluss.lake.source.LakeSplit;
2727
import org.apache.fluss.metadata.TablePath;
2828

29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
import javax.annotation.Nullable;
33+
2934
import java.util.Map;
3035

3136
import static org.apache.fluss.utils.Preconditions.checkNotNull;
3237

3338
/** Utils for create lake source. */
3439
public class LakeSourceUtils {
3540

41+
public static final Logger LOG = LoggerFactory.getLogger(LakeSourceUtils.class);
42+
43+
/**
44+
* Returns the lake source of the given table, or null when the lake storage doesn't support
45+
* creating a lake source.
46+
*/
3647
@SuppressWarnings("unchecked")
48+
@Nullable
3749
public static LakeSource<LakeSplit> createLakeSource(
3850
TablePath tablePath, Map<String, String> properties) {
3951
Map<String, String> catalogProperties =
@@ -47,6 +59,13 @@ public static LakeSource<LakeSplit> createLakeSource(
4759
LakeStoragePlugin lakeStoragePlugin =
4860
LakeStoragePluginSetUp.fromDataLakeFormat(dataLake, null);
4961
LakeStorage lakeStorage = checkNotNull(lakeStoragePlugin).createLakeStorage(lakeConfig);
50-
return (LakeSource<LakeSplit>) lakeStorage.createLakeSource(tablePath);
62+
try {
63+
return (LakeSource<LakeSplit>) lakeStorage.createLakeSource(tablePath);
64+
} catch (UnsupportedOperationException e) {
65+
LOG.info(
66+
"method createLakeSource throw UnsupportedOperationException for datalake format {}, return null as lakeSource to disable reading from lake source.",
67+
dataLake);
68+
return null;
69+
}
5170
}
5271
}

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import java.util.ArrayList;
2525
import java.util.List;
26+
import java.util.stream.Collectors;
2627

2728
import static org.assertj.core.api.Assertions.assertThat;
2829

@@ -31,6 +32,14 @@ public class FlinkRowAssertionsUtils {
3132

3233
private FlinkRowAssertionsUtils() {}
3334

35+
public static void assertRowResultsIgnoreOrder(
36+
CloseableIterator<Row> actual, List<Row> expectedRows, boolean closeIterator) {
37+
assertResultsIgnoreOrder(
38+
actual,
39+
expectedRows.stream().map(Row::toString).collect(Collectors.toList()),
40+
closeIterator);
41+
}
42+
3443
public static void assertResultsIgnoreOrder(
3544
CloseableIterator<Row> iterator, List<String> expected, boolean closeIterator) {
3645
List<String> actual = collectRowsWithTimeout(iterator, expected.size(), closeIterator);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.fluss.lake.paimon.flink;
20+
21+
import org.apache.fluss.client.ConnectionFactory;
22+
import org.apache.fluss.config.ConfigOptions;
23+
import org.apache.fluss.config.Configuration;
24+
import org.apache.fluss.config.MemorySize;
25+
import org.apache.fluss.lake.paimon.testutils.FlinkPaimonTieringTestBase;
26+
import org.apache.fluss.metadata.TableBucket;
27+
import org.apache.fluss.metadata.TablePath;
28+
import org.apache.fluss.row.InternalRow;
29+
import org.apache.fluss.server.replica.Replica;
30+
import org.apache.fluss.server.testutils.FlussClusterExtension;
31+
import org.apache.fluss.utils.clock.ManualClock;
32+
33+
import org.apache.flink.core.execution.JobClient;
34+
import org.apache.flink.table.api.EnvironmentSettings;
35+
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
36+
import org.apache.flink.types.Row;
37+
import org.apache.flink.util.CloseableIterator;
38+
import org.junit.jupiter.api.BeforeAll;
39+
import org.junit.jupiter.api.BeforeEach;
40+
import org.junit.jupiter.api.Test;
41+
import org.junit.jupiter.api.extension.RegisterExtension;
42+
43+
import java.time.Duration;
44+
import java.util.ArrayList;
45+
import java.util.List;
46+
import java.util.stream.Collectors;
47+
48+
import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil;
49+
import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
50+
import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertRowResultsIgnoreOrder;
51+
import static org.apache.fluss.testutils.DataTestUtils.row;
52+
53+
/** The ITCase for Flink union read from a timestamp. */
54+
class FlinkUnionReadFromTimestampITCase extends FlinkPaimonTieringTestBase {
55+
56+
private static final ManualClock CLOCK = new ManualClock();
57+
58+
@RegisterExtension
59+
public static FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
60+
FlussClusterExtension.builder()
61+
.setClusterConf(initConfig())
62+
.setNumOfTabletServers(3)
63+
.setClock(CLOCK)
64+
.build();
65+
66+
private StreamTableEnvironment streamTEnv;
67+
68+
protected static Configuration initConfig() {
69+
Configuration configuration = FlinkPaimonTieringTestBase.initConfig();
70+
// set file size to 10b to make log segment roll frequently
71+
configuration.set(ConfigOptions.LOG_SEGMENT_FILE_SIZE, MemorySize.parse("10b"));
72+
configuration.set(ConfigOptions.REMOTE_LOG_TASK_INTERVAL_DURATION, Duration.ofMillis(100));
73+
return configuration;
74+
}
75+
76+
@BeforeAll
77+
static void beforeAll() {
78+
clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
79+
conn = ConnectionFactory.createConnection(clientConf);
80+
admin = conn.getAdmin();
81+
paimonCatalog = getPaimonCatalog();
82+
}
83+
84+
@BeforeEach
85+
public void beforeEach() {
86+
super.beforeEach();
87+
buildStreamTEnv();
88+
}
89+
90+
@Override
91+
protected FlussClusterExtension getFlussClusterExtension() {
92+
return FLUSS_CLUSTER_EXTENSION;
93+
}
94+
95+
@Test
96+
void testUnionReadFromTimestamp() throws Exception {
97+
// first of all, start tiering
98+
JobClient jobClient = buildTieringJob(execEnv);
99+
try {
100+
String tableName = "logTable_read_timestamp";
101+
TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
102+
long tableId = createLogTable(tablePath, 1);
103+
TableBucket t1Bucket = new TableBucket(tableId, 0);
104+
105+
List<Row> rows = new ArrayList<>();
106+
for (int i = 0; i < 10; i++) {
107+
rows.addAll(writeRows(tablePath, 3));
108+
// each round advance 1s to make sure each round of writing has
109+
// different timestamp
110+
CLOCK.advanceTime(Duration.ofSeconds(1));
111+
}
112+
assertReplicaStatus(t1Bucket, rows.size());
113+
114+
Replica t1Replica = FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(t1Bucket);
115+
116+
// wait util only 2(default keep 2 segments in local) log segments in local
117+
waitUtil(
118+
() -> t1Replica.getLogTablet().logSegments().size() == 2,
119+
Duration.ofMinutes(1),
120+
"Fail to wait util only 2 segments in local.");
121+
122+
// advance 10 days to mock remote log ttl
123+
CLOCK.advanceTime(Duration.ofDays(10));
124+
// wait util remote log ttl, should can't fetch from remote log for offset 10
125+
waitUtil(
126+
() -> !t1Replica.getLogTablet().canFetchFromRemoteLog(10),
127+
Duration.ofMinutes(1),
128+
"Fail to wait log offset 10 ttl from remote log.");
129+
130+
// verify scan from timestamp 0, should read full data
131+
assertRowResultsIgnoreOrder(
132+
streamTEnv
133+
.executeSql(
134+
"select * from "
135+
+ tableName
136+
+ " /*+ OPTIONS('scan.startup.mode' = 'timestamp',\n"
137+
+ "'scan.startup.timestamp' = '0') */")
138+
.collect(),
139+
rows,
140+
true);
141+
142+
// verify scan from timestamp 2000, shouldn't read the rows written in first two
143+
// rounds,
144+
CloseableIterator<Row> actualRows =
145+
streamTEnv
146+
.executeSql(
147+
"select * from "
148+
+ tableName
149+
+ " /*+ OPTIONS('scan.startup.mode' = 'timestamp',\n"
150+
+ "'scan.startup.timestamp' = '2000') */")
151+
.collect();
152+
List<Row> expectedRows = rows.stream().skip(2 * 3).collect(Collectors.toList());
153+
assertRowResultsIgnoreOrder(actualRows, expectedRows, true);
154+
155+
// verify scan from earliest
156+
assertRowResultsIgnoreOrder(
157+
streamTEnv
158+
.executeSql(
159+
"select * from "
160+
+ tableName
161+
+ " /*+ OPTIONS('scan.startup.mode' = 'earliest') */")
162+
.collect(),
163+
rows,
164+
true);
165+
166+
} finally {
167+
jobClient.cancel();
168+
}
169+
}
170+
171+
private List<Row> writeRows(TablePath tablePath, int rows) throws Exception {
172+
List<InternalRow> writtenRows = new ArrayList<>();
173+
List<Row> flinkRow = new ArrayList<>();
174+
for (int i = 0; i < rows; i++) {
175+
writtenRows.add(row(i, "v" + i));
176+
flinkRow.add(Row.of(i, "v" + i));
177+
}
178+
writeRows(tablePath, writtenRows, true);
179+
return flinkRow;
180+
}
181+
182+
private void buildStreamTEnv() {
183+
String bootstrapServers = String.join(",", clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
184+
// create table environment
185+
streamTEnv = StreamTableEnvironment.create(execEnv, EnvironmentSettings.inStreamingMode());
186+
// crate catalog using sql
187+
streamTEnv.executeSql(
188+
String.format(
189+
"create catalog %s with ('type' = 'fluss', '%s' = '%s')",
190+
CATALOG_NAME, BOOTSTRAP_SERVERS.key(), bootstrapServers));
191+
streamTEnv.executeSql("use catalog " + CATALOG_NAME);
192+
streamTEnv.executeSql("use " + DEFAULT_DB);
193+
}
194+
}

0 commit comments

Comments
 (0)