Skip to content

Commit c57158c

Browse files
authored
[server] Accelerate deleting table or partition (#1081)
1 parent d378179 commit c57158c

File tree

4 files changed

+108
-48
lines changed

4 files changed

+108
-48
lines changed

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -700,7 +700,7 @@ private void onReplicaBecomeOffline(Set<TableBucketReplica> offlineReplicas) {
700700
private void processNewTabletServer(NewTabletServerEvent newTabletServerEvent) {
701701
// NOTE: we won't need to detect bounced tablet servers like Kafka as we won't
702702
// miss the event of tablet server un-register and register again since we can
703-
// listener the children created and deleted in zk node.
703+
// listen for children being created and deleted in the zk node.
704704

705705
// Also, Kafka uses the broker epoch so that it can reject the LeaderAndIsrRequest,
706706
// UpdateMetadataRequest and StopReplicaRequest

fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -414,16 +414,24 @@ private String stringifyReplica(TableBucketReplica replica) {
414414
private Map<TableBucketReplica, LeaderAndIsr> doRemoveReplicaFromIsr(
415415
Collection<TableBucketReplica> tableBucketReplicas) {
416416
Map<TableBucketReplica, LeaderAndIsr> adjustedLeaderAndIsr = new HashMap<>();
417+
Map<TableBucket, LeaderAndIsr> toUpdateLeaderAndIsrList = new HashMap<>();
417418
for (TableBucketReplica tableBucketReplica : tableBucketReplicas) {
418419
TableBucket tableBucket = tableBucketReplica.getTableBucket();
419420
int replicaId = tableBucketReplica.getReplica();
420-
Optional<LeaderAndIsr> optLeaderAndIsr =
421-
coordinatorContext.getBucketLeaderAndIsr(tableBucket);
422-
if (!optLeaderAndIsr.isPresent()) {
423-
// no leader and isr for this table bucket, skip
424-
continue;
421+
422+
LeaderAndIsr leaderAndIsr = null;
423+
if (toUpdateLeaderAndIsrList.get(tableBucket) != null) {
424+
leaderAndIsr = toUpdateLeaderAndIsrList.get(tableBucket);
425+
} else {
426+
Optional<LeaderAndIsr> optLeaderAndIsr =
427+
coordinatorContext.getBucketLeaderAndIsr(tableBucket);
428+
if (!optLeaderAndIsr.isPresent()) {
429+
// no leader and isr for this table bucket, skip
430+
continue;
431+
}
432+
leaderAndIsr = optLeaderAndIsr.get();
425433
}
426-
LeaderAndIsr leaderAndIsr = optLeaderAndIsr.get();
434+
427435
if (!leaderAndIsr.isr().contains(replicaId)) {
428436
// isr doesn't contain the replica, skip
429437
continue;
@@ -438,25 +446,21 @@ private Map<TableBucketReplica, LeaderAndIsr> doRemoveReplicaFromIsr(
438446
List<Integer> newIsr =
439447
leaderAndIsr.isr().size() == 1
440448
// don't remove the replica id from isr when isr size is 1,
441-
// if isr is empty, we can't elect leader any more
449+
// if isr is empty, we can't elect leader anymore
442450
? leaderAndIsr.isr()
443451
: leaderAndIsr.isr().stream()
444452
.filter(id -> id != replicaId)
445453
.collect(Collectors.toList());
446454
LeaderAndIsr adjustLeaderAndIsr = leaderAndIsr.newLeaderAndIsr(newLeader, newIsr);
447-
try {
448-
zooKeeperClient.updateLeaderAndIsr(tableBucket, adjustLeaderAndIsr);
449-
} catch (Exception e) {
450-
LOG.error(
451-
"Fail to update bucket LeaderAndIsr for table bucket {} of table {}.",
452-
tableBucket,
453-
coordinatorContext.getTablePathById(tableBucket.getTableId()),
454-
e);
455-
continue;
456-
}
457-
// update leader and isr
458-
coordinatorContext.putBucketLeaderAndIsr(tableBucket, adjustLeaderAndIsr);
459455
adjustedLeaderAndIsr.put(tableBucketReplica, adjustLeaderAndIsr);
456+
toUpdateLeaderAndIsrList.put(tableBucket, adjustLeaderAndIsr);
457+
}
458+
try {
459+
zooKeeperClient.batchUpdateLeaderAndIsr(toUpdateLeaderAndIsrList);
460+
toUpdateLeaderAndIsrList.forEach(coordinatorContext::putBucketLeaderAndIsr);
461+
return adjustedLeaderAndIsr;
462+
} catch (Exception e) {
463+
LOG.error("Fail to batch update bucket LeaderAndIsr.", e);
460464
}
461465
return adjustedLeaderAndIsr;
462466
}

fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,44 @@ void testDeleteReplicaStateChange() {
188188
}
189189
}
190190

191+
@Test
192+
void testOfflineReplicasShouldBeRemovedFromIsr() throws Exception {
193+
CoordinatorContext coordinatorContext = new CoordinatorContext();
194+
coordinatorContext.setLiveTabletServers(createServers(new int[] {0, 1, 2}));
195+
ReplicaStateMachine replicaStateMachine = createReplicaStateMachine(coordinatorContext);
196+
197+
// put the replica to online
198+
long tableId = 123L;
199+
coordinatorContext.putTableInfo(
200+
TableInfo.of(
201+
DATA1_TABLE_PATH,
202+
tableId,
203+
0,
204+
DATA1_TABLE_DESCRIPTOR,
205+
System.currentTimeMillis(),
206+
System.currentTimeMillis()));
207+
coordinatorContext.putTablePath(tableId, DATA1_TABLE_PATH);
208+
TableBucket tableBucket = new TableBucket(tableId, 0);
209+
List<TableBucketReplica> replicas = new ArrayList<>();
210+
for (int i = 0; i < 3; i++) {
211+
TableBucketReplica replica = new TableBucketReplica(tableBucket, i);
212+
coordinatorContext.putReplicaState(replica, OnlineReplica);
213+
replicas.add(replica);
214+
}
215+
// put leader and isr
216+
LeaderAndIsr leaderAndIsr = new LeaderAndIsr(0, 0, Arrays.asList(0, 1, 2), 0, 0);
217+
zookeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr);
218+
coordinatorContext.updateBucketReplicaAssignment(tableBucket, Arrays.asList(0, 1, 2));
219+
coordinatorContext.putBucketLeaderAndIsr(tableBucket, leaderAndIsr);
220+
221+
// set replicas 0, 1 and 2 to offline together. The result should be the same as taking
222+
// them offline one by one.
223+
replicaStateMachine.handleStateChanges(replicas, OfflineReplica);
224+
leaderAndIsr = coordinatorContext.getBucketLeaderAndIsr(tableBucket).get();
225+
assertThat(leaderAndIsr)
226+
.isEqualTo(new LeaderAndIsr(LeaderAndIsr.NO_LEADER, 0, Arrays.asList(2), 0, 3));
227+
}
228+
191229
@Test
192230
void testOfflineReplicaShouldBeRemovedFromIsr() throws Exception {
193231
CoordinatorContext coordinatorContext = new CoordinatorContext();

fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java

Lines changed: 46 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@
4949
import org.junit.jupiter.api.BeforeAll;
5050
import org.junit.jupiter.api.Test;
5151
import org.junit.jupiter.api.extension.RegisterExtension;
52+
import org.junit.jupiter.params.ParameterizedTest;
53+
import org.junit.jupiter.params.provider.ValueSource;
5254

5355
import java.util.ArrayList;
5456
import java.util.Arrays;
@@ -177,47 +179,63 @@ void testLeaderAndIsr() throws Exception {
177179
assertThat(zookeeperClient.getLeaderAndIsr(tableBucket)).isEmpty();
178180
}
179181

180-
@Test
181-
void testBatchCreateLeaderAndIsr() throws Exception {
182-
List<RegisterTableBucketLeadAndIsrInfo> noPartitionTableBucket = new ArrayList<>();
183-
// non-partition table
184-
List<LeaderAndIsr> noPartitionleaderAndIsrList = new ArrayList<>();
182+
@ParameterizedTest
183+
@ValueSource(booleans = {true, false})
184+
void testBatchCreateAndUpdateLeaderAndIsr(boolean isPartitionTable) throws Exception {
185+
List<RegisterTableBucketLeadAndIsrInfo> tableBucketInfo = new ArrayList<>();
186+
List<LeaderAndIsr> leaderAndIsrList = new ArrayList<>();
185187
for (int i = 0; i < 100; i++) {
186-
TableBucket tableBucket = new TableBucket(1, i);
188+
TableBucket tableBucket =
189+
isPartitionTable ? new TableBucket(1, 2L, i) : new TableBucket(1, i);
187190
LeaderAndIsr leaderAndIsr =
188191
new LeaderAndIsr(i, 10, Arrays.asList(i + 1, i + 2, i + 3), 100, 1000);
189-
noPartitionleaderAndIsrList.add(leaderAndIsr);
190-
noPartitionTableBucket.add(
191-
new RegisterTableBucketLeadAndIsrInfo(tableBucket, leaderAndIsr, null, null));
192+
leaderAndIsrList.add(leaderAndIsr);
193+
RegisterTableBucketLeadAndIsrInfo info =
194+
isPartitionTable
195+
? new RegisterTableBucketLeadAndIsrInfo(
196+
tableBucket, leaderAndIsr, "partition" + i, null)
197+
: new RegisterTableBucketLeadAndIsrInfo(
198+
tableBucket, leaderAndIsr, null, null);
199+
tableBucketInfo.add(info);
192200
}
193-
zookeeperClient.batchRegisterLeaderAndIsrForTablePartition(noPartitionTableBucket);
201+
// batch create
202+
zookeeperClient.batchRegisterLeaderAndIsrForTablePartition(tableBucketInfo);
194203

195204
for (int i = 0; i < 100; i++) {
205+
// each should be registered successfully
196206
Optional<LeaderAndIsr> optionalLeaderAndIsr =
197-
zookeeperClient.getLeaderAndIsr(noPartitionTableBucket.get(i).getTableBucket());
207+
zookeeperClient.getLeaderAndIsr(tableBucketInfo.get(i).getTableBucket());
198208
assertThat(optionalLeaderAndIsr.isPresent()).isTrue();
199-
assertThat(optionalLeaderAndIsr.get()).isIn(noPartitionleaderAndIsrList);
209+
assertThat(optionalLeaderAndIsr.get()).isIn(leaderAndIsrList);
200210
}
201211

202-
List<RegisterTableBucketLeadAndIsrInfo> partitionTableBucket = new ArrayList<>();
203-
// partition table
204-
List<LeaderAndIsr> partitionleaderAndIsrList = new ArrayList<>();
205-
for (int i = 0; i < 100; i++) {
206-
TableBucket tableBucket = new TableBucket(1, 2L, i);
207-
LeaderAndIsr leaderAndIsr =
208-
new LeaderAndIsr(i, 10, Arrays.asList(i + 1, i + 2, i + 3), 100, 1000);
209-
partitionleaderAndIsrList.add(leaderAndIsr);
210-
partitionTableBucket.add(
211-
new RegisterTableBucketLeadAndIsrInfo(
212-
tableBucket, leaderAndIsr, "partition" + i, null));
213-
}
214-
215-
zookeeperClient.batchRegisterLeaderAndIsrForTablePartition(partitionTableBucket);
212+
Map<TableBucket, LeaderAndIsr> updateMap =
213+
tableBucketInfo.stream()
214+
.collect(
215+
Collectors.toMap(
216+
RegisterTableBucketLeadAndIsrInfo::getTableBucket,
217+
RegisterTableBucketLeadAndIsrInfo::getLeaderAndIsr));
218+
List<LeaderAndIsr> leaderAndIsrUpdateList = new ArrayList<>();
219+
updateMap
220+
.entrySet()
221+
.forEach(
222+
entry -> {
223+
LeaderAndIsr originalLeaderAndIsr = entry.getValue();
224+
LeaderAndIsr adjustLeaderAndIsr =
225+
originalLeaderAndIsr.newLeaderAndIsr(
226+
LeaderAndIsr.NO_LEADER,
227+
originalLeaderAndIsr.isr().subList(0, 1));
228+
leaderAndIsrUpdateList.add(adjustLeaderAndIsr);
229+
entry.setValue(adjustLeaderAndIsr);
230+
});
231+
// batch update
232+
zookeeperClient.batchUpdateLeaderAndIsr(updateMap);
216233
for (int i = 0; i < 100; i++) {
234+
// each should be updated successfully
217235
Optional<LeaderAndIsr> optionalLeaderAndIsr =
218-
zookeeperClient.getLeaderAndIsr(partitionTableBucket.get(i).getTableBucket());
236+
zookeeperClient.getLeaderAndIsr(tableBucketInfo.get(i).getTableBucket());
219237
assertThat(optionalLeaderAndIsr.isPresent()).isTrue();
220-
assertThat(optionalLeaderAndIsr.get()).isIn(partitionleaderAndIsrList);
238+
assertThat(optionalLeaderAndIsr.get()).isIn(leaderAndIsrUpdateList);
221239
}
222240
}
223241

0 commit comments

Comments
 (0)