Skip to content

Commit 22f34b9

Browse files
author
Liebing
committed
[WIP] Rescale bucket
1 parent c1dc8e3 commit 22f34b9

File tree

31 files changed

+1412
-16
lines changed

31 files changed

+1412
-16
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,25 @@ CompletableFuture<Void> createTable(
234234
*/
235235
CompletableFuture<List<String>> listTables(String databaseName);
236236

237+
/**
238+
* Alter a table.
239+
*
240+
* <p>The following exceptions can be anticipated when calling {@code get()} on returned future.
241+
*
242+
* <ul>
243+
* <li>{@link DatabaseNotExistException} when the database does not exist.
244+
* <li>{@link TableNotExistException} when the table does not exist, if ignoreIfNotExists is
245+
* false.
246+
* </ul>
247+
*
248+
* @param tablePath The table path of the table.
249+
* @param tableDescriptor The table descriptor.
250+
* @param ignoreIfNotExists if it is true, do nothing if table does not exist. If false, throw a
251+
* TableNotExistException.
252+
*/
253+
CompletableFuture<Void> alterTable(
254+
TablePath tablePath, TableDescriptor tableDescriptor, boolean ignoreIfNotExists);
255+
237256
/**
238257
* List all partitions in the given table in fluss cluster asynchronously.
239258
*

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.fluss.rpc.gateway.AdminGateway;
4242
import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
4343
import org.apache.fluss.rpc.gateway.TabletServerGateway;
44+
import org.apache.fluss.rpc.messages.AlterTableRequest;
4445
import org.apache.fluss.rpc.messages.CreateAclsRequest;
4546
import org.apache.fluss.rpc.messages.CreateDatabaseRequest;
4647
import org.apache.fluss.rpc.messages.CreateTableRequest;
@@ -235,6 +236,19 @@ public CompletableFuture<Void> createTable(
235236
return gateway.createTable(request).thenApply(r -> null);
236237
}
237238

239+
@Override
240+
public CompletableFuture<Void> alterTable(
241+
TablePath tablePath, TableDescriptor tableDescriptor, boolean ignoreIfNotExists) {
242+
tablePath.validate();
243+
AlterTableRequest request = new AlterTableRequest();
244+
request.setTableJson(tableDescriptor.toJsonBytes())
245+
.setIgnoreIfNotExists(ignoreIfNotExists)
246+
.setTablePath()
247+
.setDatabaseName(tablePath.getDatabaseName())
248+
.setTableName(tablePath.getTableName());
249+
return gateway.alterTable(request).thenApply(r -> null);
250+
}
251+
238252
@Override
239253
public CompletableFuture<TableInfo> getTableInfo(TablePath tablePath) {
240254
GetTableInfoRequest request = new GetTableInfoRequest();

fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,13 @@ public static void waitAllReplicasReady(long tableId, int expectBucketCount) {
235235
}
236236
}
237237

238+
public static void waitAllReplicasReady(long tableId, long partitionId, int expectBucketCount) {
239+
for (int i = 0; i < expectBucketCount; i++) {
240+
FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(
241+
new TableBucket(tableId, partitionId, i));
242+
}
243+
}
244+
238245
protected static void verifyRows(
239246
RowType rowType,
240247
Map<Long, List<InternalRow>> actualRows,

fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import org.apache.fluss.metadata.TablePath;
6161
import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
6262
import org.apache.fluss.server.kv.snapshot.KvSnapshotHandle;
63+
import org.apache.fluss.server.zk.data.TableAssignment;
6364
import org.apache.fluss.types.DataTypes;
6465

6566
import org.junit.jupiter.api.BeforeEach;
@@ -197,6 +198,150 @@ void testGetTableInfoAndSchema() throws Exception {
197198
.isBetween(timestampBeforeCreate, timestampAfterCreate);
198199
}
199200

201+
@Test
202+
void testAlterTable() throws Exception {
203+
// create table
204+
TablePath tablePath = TablePath.of("test_db", "alter_table_1");
205+
admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get();
206+
207+
TableInfo tableInfo = admin.getTableInfo(tablePath).get();
208+
209+
TableDescriptor existingTableDescriptor = tableInfo.toTableDescriptor();
210+
Map<String, String> updateProperties =
211+
new HashMap<>(existingTableDescriptor.getProperties());
212+
Map<String, String> updateCustomProperties =
213+
new HashMap<>(existingTableDescriptor.getCustomProperties());
214+
updateProperties.put("table.datalake.enabled", "true");
215+
updateCustomProperties.put("table.datalake.enabled", "true");
216+
217+
TableDescriptor newTableDescriptor =
218+
TableDescriptor.builder()
219+
.schema(existingTableDescriptor.getSchema())
220+
.comment(existingTableDescriptor.getComment().orElse("test table"))
221+
.partitionedBy(existingTableDescriptor.getPartitionKeys())
222+
.distributedBy(
223+
existingTableDescriptor
224+
.getTableDistribution()
225+
.get()
226+
.getBucketCount()
227+
.orElse(3),
228+
existingTableDescriptor.getBucketKeys())
229+
.properties(updateProperties)
230+
.customProperties(updateCustomProperties)
231+
.build();
232+
// alter table
233+
admin.alterTable(tablePath, newTableDescriptor, false).get();
234+
235+
TableInfo alteredTableInfo = admin.getTableInfo(tablePath).get();
236+
TableDescriptor alteredTableDescriptor = alteredTableInfo.toTableDescriptor();
237+
assertThat(alteredTableDescriptor).isEqualTo(newTableDescriptor);
238+
239+
// throw exception if table not exist
240+
assertThatThrownBy(
241+
() ->
242+
admin.alterTable(
243+
TablePath.of("test_db", "alter_table_not_exist"),
244+
newTableDescriptor,
245+
false)
246+
.get())
247+
.cause()
248+
.isInstanceOf(TableNotExistException.class);
249+
250+
// throw exception if database not exist
251+
assertThatThrownBy(
252+
() ->
253+
admin.alterTable(
254+
TablePath.of(
255+
"test_db_not_exist",
256+
"alter_table_not_exist"),
257+
newTableDescriptor,
258+
false)
259+
.get())
260+
.cause()
261+
.isInstanceOf(DatabaseNotExistException.class);
262+
}
263+
264+
@Test
265+
void testAlterTableBucket() throws Exception {
266+
// create table
267+
TablePath tablePath = TablePath.of("test_db", "alter_table_bucket");
268+
admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get();
269+
270+
TableInfo tableInfo = admin.getTableInfo(tablePath).get();
271+
272+
TableAssignment tableAssignment =
273+
FLUSS_CLUSTER_EXTENSION
274+
.getZooKeeperClient()
275+
.getTableAssignment(tableInfo.getTableId())
276+
.get();
277+
System.out.println(tableAssignment);
278+
279+
TableDescriptor existingTableDescriptor = tableInfo.toTableDescriptor();
280+
281+
TableDescriptor newTableDescriptor =
282+
TableDescriptor.builder()
283+
.schema(existingTableDescriptor.getSchema())
284+
.comment(existingTableDescriptor.getComment().orElse("test table"))
285+
.partitionedBy(existingTableDescriptor.getPartitionKeys())
286+
.distributedBy(
287+
existingTableDescriptor
288+
.getTableDistribution()
289+
.get()
290+
.getBucketCount()
291+
.get()
292+
+ 1,
293+
existingTableDescriptor.getBucketKeys())
294+
.properties(existingTableDescriptor.getProperties())
295+
.customProperties(existingTableDescriptor.getCustomProperties())
296+
.build();
297+
// alter table
298+
admin.alterTable(tablePath, newTableDescriptor, false).get();
299+
300+
TableInfo alteredTableInfo = admin.getTableInfo(tablePath).get();
301+
TableDescriptor alteredTableDescriptor = alteredTableInfo.toTableDescriptor();
302+
// assertThat(alteredTableDescriptor).isEqualTo(newTableDescriptor);
303+
304+
TableAssignment tableAssignment1 =
305+
FLUSS_CLUSTER_EXTENSION
306+
.getZooKeeperClient()
307+
.getTableAssignment(tableInfo.getTableId())
308+
.get();
309+
System.out.println(tableAssignment1);
310+
311+
// TODO: test partitioned table
312+
}
313+
314+
@Test
315+
void testAlterTableBucketForPrimaryKeyTable() throws Exception {
316+
// create table
317+
TablePath tablePath = TablePath.of("test_db", "alter_table_bucket");
318+
admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get();
319+
320+
TableInfo tableInfo = admin.getTableInfo(tablePath).get();
321+
TableDescriptor existingTableDescriptor = tableInfo.toTableDescriptor();
322+
323+
TableDescriptor newTableDescriptor =
324+
TableDescriptor.builder()
325+
.schema(existingTableDescriptor.getSchema())
326+
.comment(existingTableDescriptor.getComment().orElse("test table"))
327+
.partitionedBy(existingTableDescriptor.getPartitionKeys())
328+
.distributedBy(
329+
existingTableDescriptor
330+
.getTableDistribution()
331+
.get()
332+
.getBucketCount()
333+
.get()
334+
+ 1,
335+
existingTableDescriptor.getBucketKeys())
336+
.properties(existingTableDescriptor.getProperties())
337+
.customProperties(existingTableDescriptor.getCustomProperties())
338+
.build();
339+
// alter table
340+
assertThatThrownBy(() -> admin.alterTable(tablePath, newTableDescriptor, false).get())
341+
.rootCause()
342+
.isInstanceOf(InvalidTableException.class);
343+
}
344+
200345
@Test
201346
void testCreateInvalidDatabaseAndTable() {
202347
assertThatThrownBy(

fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,10 @@
6565
import java.time.Duration;
6666
import java.util.ArrayList;
6767
import java.util.Collections;
68+
import java.util.HashSet;
6869
import java.util.Iterator;
6970
import java.util.List;
71+
import java.util.Set;
7072
import java.util.concurrent.CompletableFuture;
7173

7274
import static org.apache.fluss.client.table.scanner.batch.BatchScanUtils.collectRows;
@@ -724,6 +726,90 @@ void verifyAppendOrPut(boolean append, String logFormat, @Nullable String kvForm
724726
}
725727
}
726728

729+
@Test
730+
void testAppendWithAlterTableBucket() throws Exception {
731+
TableDescriptor data1TableDescriptor =
732+
TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(1).build();
733+
createTable(DATA1_TABLE_PATH, data1TableDescriptor, false);
734+
735+
Configuration clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
736+
// use round-robin bucket assigner, so that we can append data to all buckets
737+
clientConf.set(
738+
ConfigOptions.CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER,
739+
ConfigOptions.NoKeyAssigner.ROUND_ROBIN);
740+
Connection conn = ConnectionFactory.createConnection(clientConf);
741+
long tableId;
742+
int expectedRowCount = 10;
743+
try (Table table = conn.getTable(DATA1_TABLE_PATH)) {
744+
tableId = table.getTableInfo().getTableId();
745+
AppendWriter appendWriter = table.newAppend().createWriter();
746+
747+
for (int i = 0; i < expectedRowCount; i++) {
748+
GenericRow row = row(i, "a");
749+
appendWriter.append(row).get();
750+
}
751+
appendWriter.flush();
752+
753+
try (LogScanner logScanner = createLogScanner(table)) {
754+
subscribeFromBeginning(logScanner, table);
755+
756+
int count = 0;
757+
Set<TableBucket> allBuckets = new HashSet<>();
758+
while (count < expectedRowCount) {
759+
ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
760+
allBuckets.addAll(scanRecords.buckets());
761+
for (ScanRecord ignored : scanRecords) {
762+
count++;
763+
}
764+
}
765+
// now we only have 1 bucket
766+
assertThat(allBuckets).containsExactly(new TableBucket(tableId, 0));
767+
assertThat(count).isEqualTo(expectedRowCount);
768+
}
769+
}
770+
conn.close();
771+
772+
// alter table bucket from 1 to 2
773+
TableDescriptor data2TableDescriptor =
774+
TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(2).build();
775+
admin.alterTable(DATA1_TABLE_PATH, data2TableDescriptor, false);
776+
777+
// wait until new bucket replicas are ready
778+
waitAllReplicasReady(tableId, 2);
779+
780+
// reestablish connection to force update table meta
781+
conn = ConnectionFactory.createConnection(clientConf);
782+
try (Table table = conn.getTable(DATA1_TABLE_PATH)) {
783+
AppendWriter appendWriter = table.newAppend().createWriter();
784+
785+
for (int i = 0; i < expectedRowCount; i++) {
786+
appendWriter.append(row(i, "a")).get();
787+
}
788+
appendWriter.flush();
789+
790+
try (LogScanner logScanner = createLogScanner(table)) {
791+
subscribeFromBeginning(logScanner, table);
792+
793+
int count = 0;
794+
Set<TableBucket> allBuckets = new HashSet<>();
795+
while (count < expectedRowCount * 2) {
796+
ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
797+
allBuckets.addAll(scanRecords.buckets());
798+
for (ScanRecord ignored : scanRecords) {
799+
count++;
800+
}
801+
}
802+
// now we have 2 buckets and use round-robin bucket assigner when write rows, so
803+
// rows will come from
804+
// both buckets
805+
assertThat(allBuckets)
806+
.contains(new TableBucket(tableId, 0), new TableBucket(tableId, 1));
807+
assertThat(count).isEqualTo(expectedRowCount * 2);
808+
}
809+
}
810+
conn.close();
811+
}
812+
727813
@ParameterizedTest
728814
@ValueSource(strings = {"INDEXED", "ARROW"})
729815
void testAppendAndProject(String format) throws Exception {

0 commit comments

Comments
 (0)