Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.fluss.flink.sink;

import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.flink.sink.serializer.RowDataSerializationSchema;
import org.apache.fluss.flink.sink.writer.FlinkSinkWriter;
Expand Down Expand Up @@ -307,8 +309,28 @@ public Optional<Long> executeDeletion() {
/**
 * Applies a row-level DELETE to this table sink.
 *
 * <p>Only primary-key tables support DELETE: the returned {@link RowLevelDeleteInfo} asks the
 * planner to provide exactly the primary-key columns ({@code requiredColumns}) and declares
 * {@code DELETED_ROWS} mode, i.e. the sink receives the rows to delete one by one.
 *
 * @param rowLevelModificationScanContext scan context from the planner; may be {@code null} and
 *     is currently unused
 * @return delete info restricting required columns to the primary key
 * @throws UnsupportedOperationException if the table is a log table (no primary key)
 */
@Override
public RowLevelDeleteInfo applyRowLevelDelete(
        @Nullable RowLevelModificationScanContext rowLevelModificationScanContext) {
    validateUpdatableAndDeletable();
    Map<String, DataType> primaryKeyDataTypes = getPrimaryKeyDataTypes();
    // An empty primary key means this is a log table, which cannot be deleted from.
    if (primaryKeyDataTypes.isEmpty()) {
        throw new UnsupportedOperationException(
                String.format(
                        "Table %s is a Log Table. Log Table doesn't support DELETE statements.",
                        tablePath));
    }
    return new RowLevelDeleteInfo() {
        @Override
        public Optional<List<Column>> requiredColumns() {
            // Only the primary-key columns are needed to identify the rows to delete.
            return Optional.of(
                    primaryKeyDataTypes.entrySet().stream()
                            .map(pkDataType ->
                                    Column.physical(
                                            pkDataType.getKey(), pkDataType.getValue()))
                            .collect(Collectors.toList()));
        }

        @Override
        public RowLevelDeleteMode getRowLevelDeleteMode() {
            // Upstream delivers the concrete rows to delete (not "remaining rows").
            return RowLevelDeleteMode.DELETED_ROWS;
        }
    };
}

@Override
Expand Down Expand Up @@ -362,6 +384,14 @@ private void validateUpdatableAndDeletable() {
}
}

/**
 * Returns the table's primary-key columns as a map from field name to Flink {@link DataType}.
 *
 * <p>The map preserves the declared primary-key column order (via {@link LinkedHashMap}) so
 * that consumers such as {@code requiredColumns()} observe a deterministic ordering. Returns an
 * empty map for log tables, which have no primary key.
 *
 * @return ordered field-name to data-type map of the primary-key columns; empty if none
 */
private Map<String, DataType> getPrimaryKeyDataTypes() {
    // LinkedHashMap keeps primary-key declaration order; HashMap would make the
    // column order handed to the planner nondeterministic.
    Map<String, DataType> pkDataTypes = new LinkedHashMap<>();
    for (int index : primaryKeyIndexes) {
        pkDataTypes.put(
                tableRowType.getFieldNames().get(index),
                DataTypeUtils.toInternalDataType(tableRowType.getTypeAt(index)));
    }
    return pkDataTypes;
}

private Map<Integer, LogicalType> getPrimaryKeyTypes() {
Map<Integer, LogicalType> pkTypes = new HashMap<>();
for (int index : primaryKeyIndexes) {
Expand Down
Loading