Skip to content

Commit dbbfc79

Browse files
MehulBatra and luoyuxia authored
[lake/iceberg] Introduce IcebergRewriteDataFiles to compact files (#1552)
--------- Co-authored-by: luoyuxia <[email protected]>
1 parent 3d95c40 commit dbbfc79

17 files changed

+1073
-164
lines changed

fluss-lake/fluss-lake-iceberg/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
<name>Fluss : Lake : Iceberg</name>
3232

3333
<packaging>jar</packaging>
34-
3534
<dependencies>
3635
<dependency>
3736
<groupId>org.apache.iceberg</groupId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.fluss.lake.iceberg.maintenance;
20+
21+
import org.apache.fluss.lake.iceberg.tiering.writer.TaskWriterFactory;
22+
import org.apache.fluss.metadata.TableBucket;
23+
24+
import org.apache.iceberg.BaseCombinedScanTask;
25+
import org.apache.iceberg.CombinedScanTask;
26+
import org.apache.iceberg.ContentScanTask;
27+
import org.apache.iceberg.DataFile;
28+
import org.apache.iceberg.FileScanTask;
29+
import org.apache.iceberg.Table;
30+
import org.apache.iceberg.data.IcebergGenericReader;
31+
import org.apache.iceberg.data.Record;
32+
import org.apache.iceberg.expressions.Expression;
33+
import org.apache.iceberg.io.CloseableIterable;
34+
import org.apache.iceberg.io.TaskWriter;
35+
import org.apache.iceberg.io.WriteResult;
36+
import org.apache.iceberg.util.BinPacking;
37+
import org.slf4j.Logger;
38+
import org.slf4j.LoggerFactory;
39+
40+
import javax.annotation.Nullable;
41+
42+
import java.io.IOException;
43+
import java.nio.ByteBuffer;
44+
import java.nio.ByteOrder;
45+
import java.util.ArrayList;
46+
import java.util.Arrays;
47+
import java.util.Collections;
48+
import java.util.Comparator;
49+
import java.util.List;
50+
import java.util.stream.Collectors;
51+
52+
import static org.apache.fluss.lake.iceberg.utils.IcebergConversions.toFilterExpression;
53+
import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
54+
import static org.apache.fluss.utils.Preconditions.checkState;
55+
56+
/**
57+
* Concrete implementation for Fluss's Iceberg integration. Handles bin-packing compaction of small
58+
* files into larger ones.
59+
*/
60+
public class IcebergRewriteDataFiles {
61+
62+
private static final Logger LOG = LoggerFactory.getLogger(IcebergRewriteDataFiles.class);
63+
64+
private static final int MIN_FILES_TO_COMPACT = 3;
65+
66+
private final Table table;
67+
private final String partition;
68+
private final TableBucket bucket;
69+
private final Expression filter;
70+
private long targetSizeInBytes = 128 * 1024 * 1024; // 128MB default
71+
72+
public IcebergRewriteDataFiles(Table table, @Nullable String partition, TableBucket bucket) {
73+
this.table = table;
74+
this.partition = partition;
75+
this.bucket = bucket;
76+
this.filter = toFilterExpression(table, partition, bucket.getBucket());
77+
}
78+
79+
public IcebergRewriteDataFiles targetSizeInBytes(long targetSize) {
80+
this.targetSizeInBytes = targetSize;
81+
return this;
82+
}
83+
84+
private List<CombinedScanTask> planRewriteFileGroups() throws IOException {
85+
List<FileScanTask> fileScanTasks = new ArrayList<>();
86+
try (CloseableIterable<FileScanTask> tasks =
87+
table.newScan().includeColumnStats().filter(filter).ignoreResiduals().planFiles()) {
88+
tasks.forEach(fileScanTasks::add);
89+
}
90+
91+
// the files < targetSizeInBytes is less than MIN_FILES_TO_COMPACT, don't compact
92+
if (fileScanTasks.stream()
93+
.filter(fileScanTask -> fileScanTask.length() < targetSizeInBytes)
94+
.count()
95+
< MIN_FILES_TO_COMPACT) {
96+
// return empty file group
97+
return Collections.emptyList();
98+
}
99+
100+
// then, pack the fileScanTasks into compaction units which contains compactable
101+
// fileScanTasks, after compaction, we want to it still keep order by __offset column,
102+
// so, let's first sort by __offset column
103+
int offsetFieldId = table.schema().findField(OFFSET_COLUMN_NAME).fieldId();
104+
fileScanTasks.sort(sortFileScanTask(offsetFieldId));
105+
106+
// do package now
107+
BinPacking.ListPacker<FileScanTask> packer =
108+
new BinPacking.ListPacker<>(targetSizeInBytes, 1, false);
109+
return packer.pack(fileScanTasks, ContentScanTask::length).stream()
110+
.filter(tasks -> tasks.size() > 1)
111+
.map(BaseCombinedScanTask::new)
112+
.collect(Collectors.toList());
113+
}
114+
115+
private Comparator<FileScanTask> sortFileScanTask(int sortFiledId) {
116+
return (f1, f2) -> {
117+
ByteBuffer buffer1 =
118+
f1.file()
119+
.lowerBounds()
120+
.get(sortFiledId)
121+
.order(ByteOrder.LITTLE_ENDIAN)
122+
.rewind();
123+
long offset1 = buffer1.getLong();
124+
ByteBuffer buffer2 =
125+
f2.file()
126+
.lowerBounds()
127+
.get(sortFiledId)
128+
.order(ByteOrder.LITTLE_ENDIAN)
129+
.rewind();
130+
long offset2 = buffer2.getLong();
131+
return Long.compare(offset1, offset2);
132+
};
133+
}
134+
135+
@Nullable
136+
public RewriteDataFileResult execute() {
137+
try {
138+
// plan the file groups to be rewrite
139+
List<CombinedScanTask> tasksToRewrite = planRewriteFileGroups();
140+
if (tasksToRewrite.isEmpty()) {
141+
return null;
142+
}
143+
LOG.info("Start to rewrite files {}.", tasksToRewrite);
144+
List<DataFile> deletedDataFiles = new ArrayList<>();
145+
List<DataFile> addedDataFiles = new ArrayList<>();
146+
for (CombinedScanTask combinedScanTask : tasksToRewrite) {
147+
addedDataFiles.addAll(rewriteFileGroup(combinedScanTask));
148+
deletedDataFiles.addAll(
149+
combinedScanTask.files().stream()
150+
.map(ContentScanTask::file)
151+
.collect(Collectors.toList()));
152+
}
153+
LOG.info("Finish rewriting files from {} to {}.", deletedDataFiles, addedDataFiles);
154+
return new RewriteDataFileResult(deletedDataFiles, addedDataFiles);
155+
} catch (Exception e) {
156+
throw new RuntimeException(
157+
String.format("Fail to compact bucket %s of table %s.", bucket, table.name()),
158+
e);
159+
}
160+
}
161+
162+
private List<DataFile> rewriteFileGroup(CombinedScanTask combinedScanTask) throws IOException {
163+
try (CloseableIterable<Record> records = readDataFile(combinedScanTask);
164+
TaskWriter<Record> taskWriter =
165+
TaskWriterFactory.createTaskWriter(table, partition, bucket.getBucket())) {
166+
for (Record record : records) {
167+
taskWriter.write(record);
168+
}
169+
WriteResult rewriteResult = taskWriter.complete();
170+
checkState(
171+
rewriteResult.deleteFiles().length == 0,
172+
"the delete files should be empty, but got "
173+
+ Arrays.toString(rewriteResult.deleteFiles()));
174+
return Arrays.asList(rewriteResult.dataFiles());
175+
}
176+
}
177+
178+
private CloseableIterable<Record> readDataFile(CombinedScanTask combinedScanTask) {
179+
IcebergGenericReader reader = new IcebergGenericReader(table.newScan(), true);
180+
return reader.open(combinedScanTask);
181+
}
182+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.fluss.lake.iceberg.maintenance;
20+
21+
import org.apache.iceberg.DataFile;
22+
23+
import java.io.Serializable;
24+
import java.util.List;
25+
26+
/** The result for rewrite iceberg data files. */
27+
public class RewriteDataFileResult implements Serializable {
28+
29+
private static final long serialVersionUID = 1L;
30+
31+
private final List<DataFile> deletedDataFiles;
32+
private final List<DataFile> addedDataFiles;
33+
34+
public RewriteDataFileResult(List<DataFile> deletedDataFiles, List<DataFile> addedDataFiles) {
35+
this.deletedDataFiles = deletedDataFiles;
36+
this.addedDataFiles = addedDataFiles;
37+
}
38+
39+
public List<DataFile> deletedDataFiles() {
40+
return deletedDataFiles;
41+
}
42+
43+
public List<DataFile> addedDataFiles() {
44+
return addedDataFiles;
45+
}
46+
47+
@Override
48+
public String toString() {
49+
return "RewriteDataFileResult{"
50+
+ "deletedDataFiles="
51+
+ deletedDataFiles
52+
+ ", addedDataFiles="
53+
+ addedDataFiles
54+
+ '}';
55+
}
56+
}

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergCommittable.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.fluss.lake.iceberg.tiering;
1919

20+
import org.apache.fluss.lake.iceberg.maintenance.RewriteDataFileResult;
21+
2022
import org.apache.iceberg.DataFile;
2123
import org.apache.iceberg.DeleteFile;
2224

@@ -32,9 +34,15 @@ public class IcebergCommittable implements Serializable {
3234
private final List<DataFile> dataFiles;
3335
private final List<DeleteFile> deleteFiles;
3436

35-
private IcebergCommittable(List<DataFile> dataFiles, List<DeleteFile> deleteFiles) {
37+
private final List<RewriteDataFileResult> rewriteDataFiles;
38+
39+
private IcebergCommittable(
40+
List<DataFile> dataFiles,
41+
List<DeleteFile> deleteFiles,
42+
List<RewriteDataFileResult> rewriteDataFiles) {
3643
this.dataFiles = dataFiles;
3744
this.deleteFiles = deleteFiles;
45+
this.rewriteDataFiles = rewriteDataFiles;
3846
}
3947

4048
public List<DataFile> getDataFiles() {
@@ -45,6 +53,10 @@ public List<DeleteFile> getDeleteFiles() {
4553
return deleteFiles;
4654
}
4755

56+
public List<RewriteDataFileResult> rewriteDataFileResults() {
57+
return rewriteDataFiles;
58+
}
59+
4860
public static Builder builder() {
4961
return new Builder();
5062
}
@@ -57,6 +69,8 @@ public static class Builder {
5769
private final List<DataFile> dataFiles = new ArrayList<>();
5870
private final List<DeleteFile> deleteFiles = new ArrayList<>();
5971

72+
private final List<RewriteDataFileResult> rewriteDataFileResults = new ArrayList<>();
73+
6074
public Builder addDataFile(DataFile dataFile) {
6175
this.dataFiles.add(dataFile);
6276
return this;
@@ -67,18 +81,28 @@ public Builder addDeleteFile(DeleteFile deleteFile) {
6781
return this;
6882
}
6983

84+
public Builder addRewriteDataFileResult(RewriteDataFileResult rewriteDataFileResult) {
85+
this.rewriteDataFileResults.add(rewriteDataFileResult);
86+
return this;
87+
}
88+
7089
public IcebergCommittable build() {
71-
return new IcebergCommittable(new ArrayList<>(dataFiles), new ArrayList<>(deleteFiles));
90+
return new IcebergCommittable(
91+
new ArrayList<>(dataFiles),
92+
new ArrayList<>(deleteFiles),
93+
rewriteDataFileResults);
7294
}
7395
}
7496

7597
@Override
7698
public String toString() {
7799
return "IcebergCommittable{"
78100
+ "dataFiles="
79-
+ dataFiles.size()
101+
+ dataFiles
80102
+ ", deleteFiles="
81-
+ deleteFiles.size()
103+
+ deleteFiles
104+
+ "rewriteDataFiles = "
105+
+ rewriteDataFiles
82106
+ '}';
83107
}
84108
}

0 commit comments

Comments
 (0)