Skip to content

Commit d4dfb45

Browse files
authored
[Iceberg/lake] Iceberg supports append-only non-partitioned table (#1524)
1 parent 49d6ae8 commit d4dfb45

File tree

26 files changed

+1505
-43
lines changed

26 files changed

+1505
-43
lines changed

fluss-common/src/main/java/com/alibaba/fluss/lake/writer/LakeTieringFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
@PublicEvolving
3838
public interface LakeTieringFactory<WriteResult, CommittableT> extends Serializable {
3939

40+
String FLUSS_LAKE_TIERING_COMMIT_USER = "__fluss_lake_tiering";
41+
4042
/**
4143
* Creates a lake writer to write Fluss's rows to Paimon/Iceberg rows.
4244
*

fluss-common/src/test/java/com/alibaba/fluss/lake/lakestorage/LakeStorageTest.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
/*
2-
* Copyright (c) 2025 Alibaba Group Holding Ltd.
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
38
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9+
* http://www.apache.org/licenses/LICENSE-2.0
910
*
1011
* Unless required by applicable law or agreed to in writing, software
1112
* distributed under the License is distributed on an "AS IS" BASIS,

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
203203
Long tieringEpoch = tieringTableEpochs.remove(failedTableId);
204204
LOG.info(
205205
"Tiering table {} is failed, fail reason is {}.",
206-
tieringEpoch,
206+
failedTableId,
207207
failedEvent.failReason());
208208
if (tieringEpoch == null) {
209209
// shouldn't happen, warn it

fluss-lake/fluss-lake-iceberg/pom.xml

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,87 @@
3838
<artifactId>iceberg-core</artifactId>
3939
<version>${iceberg.version}</version>
4040
</dependency>
41+
<dependency>
42+
<groupId>org.apache.iceberg</groupId>
43+
<artifactId>iceberg-data</artifactId>
44+
<version>${iceberg.version}</version>
45+
</dependency>
46+
<dependency>
47+
<groupId>org.apache.iceberg</groupId>
48+
<artifactId>iceberg-parquet</artifactId>
49+
<version>${iceberg.version}</version>
50+
</dependency>
4151
<dependency>
4252
<groupId>com.alibaba.fluss</groupId>
4353
<artifactId>fluss-common</artifactId>
4454
<version>${project.version}</version>
4555
</dependency>
56+
<dependency>
57+
<groupId>com.alibaba.fluss</groupId>
58+
<artifactId>fluss-flink-common</artifactId>
59+
<version>${project.version}</version>
60+
<scope>test</scope>
61+
</dependency>
62+
<dependency>
63+
<groupId>org.apache.hadoop</groupId>
64+
<artifactId>hadoop-mapreduce-client-core</artifactId>
65+
<version>2.8.5</version>
66+
<scope>test</scope>
67+
</dependency>
68+
69+
<dependency>
70+
<groupId>org.apache.hadoop</groupId>
71+
<artifactId>hadoop-hdfs-client</artifactId>
72+
<version>${fluss.hadoop.version}</version>
73+
<scope>provided</scope>
74+
</dependency>
75+
<dependency>
76+
<groupId>org.apache.hadoop</groupId>
77+
<artifactId>hadoop-common</artifactId>
78+
<scope>provided</scope>
79+
<exclusions>
80+
<exclusion>
81+
<artifactId>avro</artifactId>
82+
<groupId>org.apache.avro</groupId>
83+
</exclusion>
84+
<exclusion>
85+
<artifactId>log4j</artifactId>
86+
<groupId>log4j</groupId>
87+
</exclusion>
88+
<exclusion>
89+
<artifactId>slf4j-log4j12</artifactId>
90+
<groupId>org.slf4j</groupId>
91+
</exclusion>
92+
<exclusion>
93+
<groupId>ch.qos.reload4j</groupId>
94+
<artifactId>reload4j</artifactId>
95+
</exclusion>
96+
<exclusion>
97+
<groupId>org.slf4j</groupId>
98+
<artifactId>slf4j-reload4j</artifactId>
99+
</exclusion>
100+
<exclusion>
101+
<artifactId>jdk.tools</artifactId>
102+
<groupId>jdk.tools</groupId>
103+
</exclusion>
104+
<exclusion>
105+
<artifactId>protobuf-java</artifactId>
106+
<groupId>com.google.protobuf</groupId>
107+
</exclusion>
108+
<exclusion>
109+
<artifactId>commons-io</artifactId>
110+
<groupId>commons-io</groupId>
111+
</exclusion>
112+
</exclusions>
113+
</dependency>
114+
115+
<!-- Flink test dependency -->
116+
<dependency>
117+
<groupId>com.alibaba.fluss</groupId>
118+
<artifactId>fluss-flink-${flink.major.version}</artifactId>
119+
<scope>test</scope>
120+
</dependency>
121+
46122
</dependencies>
47123

48124
<build>

fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
/*
2-
* Copyright (c) 2025 Alibaba Group Holding Ltd.
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
38
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9+
* http://www.apache.org/licenses/LICENSE-2.0
910
*
1011
* Unless required by applicable law or agreed to in writing, software
1112
* distributed under the License is distributed on an "AS IS" BASIS,

fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalog.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,11 @@ private Catalog createIcebergCatalog(Configuration configuration) {
8585
String catalogName = icebergProps.getOrDefault("name", "fluss-iceberg-catalog");
8686

8787
return buildIcebergCatalog(
88-
catalogName, icebergProps, null // Optional: pass Hadoop configuration if available
89-
);
88+
catalogName,
89+
icebergProps, // todo: current is an empty configuration, need to init from env or
90+
// fluss
91+
// configurations
92+
new org.apache.hadoop.conf.Configuration());
9093
}
9194

9295
@Override

fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeStorage.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.alibaba.fluss.lake.iceberg;
1919

2020
import com.alibaba.fluss.config.Configuration;
21+
import com.alibaba.fluss.lake.iceberg.tiering.IcebergLakeTieringFactory;
2122
import com.alibaba.fluss.lake.lakestorage.LakeStorage;
2223
import com.alibaba.fluss.lake.source.LakeSource;
2324
import com.alibaba.fluss.lake.writer.LakeTieringFactory;
@@ -34,7 +35,7 @@ public IcebergLakeStorage(Configuration configuration) {
3435

3536
@Override
3637
public LakeTieringFactory<?, ?> createLakeTieringFactory() {
37-
throw new UnsupportedOperationException("Not implemented");
38+
return new IcebergLakeTieringFactory(icebergConfig);
3839
}
3940

4041
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.alibaba.fluss.lake.iceberg.tiering;
20+
21+
import com.alibaba.fluss.record.LogRecord;
22+
import com.alibaba.fluss.row.InternalRow;
23+
import com.alibaba.fluss.types.BigIntType;
24+
import com.alibaba.fluss.types.BinaryType;
25+
import com.alibaba.fluss.types.BooleanType;
26+
import com.alibaba.fluss.types.DataType;
27+
import com.alibaba.fluss.types.DateType;
28+
import com.alibaba.fluss.types.DecimalType;
29+
import com.alibaba.fluss.types.DoubleType;
30+
import com.alibaba.fluss.types.FloatType;
31+
import com.alibaba.fluss.types.IntType;
32+
import com.alibaba.fluss.types.LocalZonedTimestampType;
33+
import com.alibaba.fluss.types.RowType;
34+
import com.alibaba.fluss.types.SmallIntType;
35+
import com.alibaba.fluss.types.StringType;
36+
import com.alibaba.fluss.types.TimeType;
37+
import com.alibaba.fluss.types.TimestampType;
38+
import com.alibaba.fluss.types.TinyIntType;
39+
import com.alibaba.fluss.utils.DateTimeUtils;
40+
41+
import org.apache.iceberg.Schema;
42+
import org.apache.iceberg.data.Record;
43+
import org.apache.iceberg.types.Types;
44+
45+
import java.nio.ByteBuffer;
46+
import java.time.Instant;
47+
import java.time.OffsetDateTime;
48+
import java.time.ZoneOffset;
49+
import java.util.Map;
50+
51+
import static com.alibaba.fluss.utils.Preconditions.checkState;
52+
53+
/**
54+
* Wrap Fluss {@link LogRecord} as Iceberg {@link Record}.
55+
*
56+
* <p>todo: refactor to implement ParquetWriters, OrcWriters, AvroWriters just like Flink & Spark
57+
* write to iceberg for higher performance
58+
*/
59+
public class FlussRecordAsIcebergRecord implements Record {
60+
61+
// Lake table for iceberg will append three system columns: __bucket, __offset,__timestamp
62+
private static final int LAKE_ICEBERG_SYSTEM_COLUMNS = 3;
63+
64+
private LogRecord logRecord;
65+
private final int bucket;
66+
private final Schema icebergSchema;
67+
private final RowType flussRowType;
68+
69+
// the origin row fields in fluss, excluding the system columns in iceberg
70+
private int originRowFieldCount;
71+
private InternalRow internalRow;
72+
73+
public FlussRecordAsIcebergRecord(int bucket, Schema icebergSchema, RowType flussRowType) {
74+
this.bucket = bucket;
75+
this.icebergSchema = icebergSchema;
76+
this.flussRowType = flussRowType;
77+
}
78+
79+
public void setFlussRecord(LogRecord logRecord) {
80+
this.logRecord = logRecord;
81+
this.internalRow = logRecord.getRow();
82+
this.originRowFieldCount = internalRow.getFieldCount();
83+
checkState(
84+
originRowFieldCount
85+
== icebergSchema.asStruct().fields().size() - LAKE_ICEBERG_SYSTEM_COLUMNS,
86+
"The Iceberg table fields count must equals to LogRecord's fields count.");
87+
}
88+
89+
@Override
90+
public Types.StructType struct() {
91+
return icebergSchema.asStruct();
92+
}
93+
94+
@Override
95+
public Object getField(String name) {
96+
return icebergSchema;
97+
}
98+
99+
@Override
100+
public void setField(String name, Object value) {
101+
throw new UnsupportedOperationException("method setField is not supported.");
102+
}
103+
104+
@Override
105+
public Object get(int pos) {
106+
// firstly, for system columns
107+
if (pos == originRowFieldCount) {
108+
// bucket column
109+
return bucket;
110+
} else if (pos == originRowFieldCount + 1) {
111+
// log offset column
112+
return logRecord.logOffset();
113+
} else if (pos == originRowFieldCount + 2) {
114+
// timestamp column
115+
return getTimestampLtz(logRecord.timestamp());
116+
}
117+
118+
// handle normal columns
119+
if (internalRow.isNullAt(pos)) {
120+
return null;
121+
}
122+
123+
DataType dataType = flussRowType.getTypeAt(pos);
124+
if (dataType instanceof BooleanType) {
125+
return internalRow.getBoolean(pos);
126+
} else if (dataType instanceof TinyIntType) {
127+
return (int) internalRow.getByte(pos);
128+
} else if (dataType instanceof SmallIntType) {
129+
return internalRow.getShort(pos);
130+
} else if (dataType instanceof IntType) {
131+
return internalRow.getInt(pos);
132+
} else if (dataType instanceof BigIntType) {
133+
return internalRow.getLong(pos);
134+
} else if (dataType instanceof FloatType) {
135+
return internalRow.getFloat(pos);
136+
} else if (dataType instanceof DoubleType) {
137+
return internalRow.getDouble(pos);
138+
} else if (dataType instanceof StringType) {
139+
return internalRow.getString(pos).toString();
140+
} else if (dataType instanceof BinaryType) {
141+
// Iceberg's Record interface expects ByteBuffer for binary types.
142+
return ByteBuffer.wrap(internalRow.getBytes(pos));
143+
} else if (dataType instanceof DecimalType) {
144+
// Iceberg expects BigDecimal for decimal types.
145+
DecimalType decimalType = (DecimalType) dataType;
146+
return internalRow
147+
.getDecimal(pos, decimalType.getPrecision(), decimalType.getScale())
148+
.toBigDecimal();
149+
} else if (dataType instanceof LocalZonedTimestampType) {
150+
// Iceberg expects OffsetDateTime for timestamp with local timezone.
151+
return getTimestampLtz(
152+
internalRow
153+
.getTimestampLtz(
154+
pos, ((LocalZonedTimestampType) dataType).getPrecision())
155+
.toInstant());
156+
} else if (dataType instanceof TimestampType) {
157+
// Iceberg expects LocalDateType for timestamp without local timezone.
158+
return internalRow
159+
.getTimestampNtz(pos, ((TimestampType) dataType).getPrecision())
160+
.toLocalDateTime();
161+
} else if (dataType instanceof DateType) {
162+
return DateTimeUtils.toLocalDate(internalRow.getInt(pos));
163+
} else if (dataType instanceof TimeType) {
164+
return DateTimeUtils.toLocalTime(internalRow.getInt(pos));
165+
}
166+
throw new UnsupportedOperationException(
167+
"Unsupported data type conversion for Fluss type: "
168+
+ dataType.getClass().getName());
169+
}
170+
171+
private OffsetDateTime getTimestampLtz(long timestamp) {
172+
return OffsetDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneOffset.UTC);
173+
}
174+
175+
private OffsetDateTime getTimestampLtz(Instant instant) {
176+
return OffsetDateTime.ofInstant(instant, ZoneOffset.UTC);
177+
}
178+
179+
@Override
180+
public Record copy() {
181+
throw new UnsupportedOperationException("method copy is not supported.");
182+
}
183+
184+
@Override
185+
public Record copy(Map<String, Object> overwriteValues) {
186+
throw new UnsupportedOperationException("method copy is not supported.");
187+
}
188+
189+
@Override
190+
public int size() {
191+
return icebergSchema.asStruct().fields().size();
192+
}
193+
194+
@Override
195+
public <T> T get(int pos, Class<T> javaClass) {
196+
Object value = get(pos);
197+
if (value == null || javaClass.isInstance(value)) {
198+
return javaClass.cast(value);
199+
} else {
200+
throw new IllegalStateException(
201+
"Not an instance of " + javaClass.getName() + ": " + value);
202+
}
203+
}
204+
205+
@Override
206+
public <T> void set(int pos, T value) {
207+
throw new UnsupportedOperationException("method set is not supported.");
208+
}
209+
}

0 commit comments

Comments
 (0)