Skip to content

Commit eb19eea

Browse files
authored
[server] Enhance ReplicaManager#getReplicaOrException to return more accurate error messages (#1573)
1 parent c57158c commit eb19eea

File tree

5 files changed

+157
-1
lines changed

5 files changed

+157
-1
lines changed

fluss-server/src/main/java/org/apache/fluss/server/metadata/ServerMetadataSnapshot.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.fluss.cluster.ServerNode;
2323
import org.apache.fluss.cluster.TabletServerInfo;
2424
import org.apache.fluss.metadata.PhysicalTablePath;
25+
import org.apache.fluss.metadata.TableBucket;
2526
import org.apache.fluss.metadata.TablePath;
2627

2728
import javax.annotation.Nullable;
@@ -179,4 +180,18 @@ public Map<Long, Map<Integer, BucketMetadata>> getBucketMetadataMapForPartitions
179180
public Map<Integer, ServerInfo> getAliveTabletServers() {
180181
return aliveTabletServers;
181182
}
183+
184+
public boolean contains(TableBucket tableBucket) {
185+
if (tableBucket.getPartitionId() == null) {
186+
return bucketMetadataMapForTables.containsKey(tableBucket.getTableId())
187+
&& bucketMetadataMapForTables
188+
.get(tableBucket.getTableId())
189+
.containsKey(tableBucket.getBucket());
190+
} else {
191+
return bucketMetadataMapForPartitions.containsKey(tableBucket.getPartitionId())
192+
&& bucketMetadataMapForPartitions
193+
.get(tableBucket.getPartitionId())
194+
.containsKey(tableBucket.getBucket());
195+
}
196+
}
182197
}

fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.fluss.cluster.ServerNode;
2222
import org.apache.fluss.cluster.TabletServerInfo;
2323
import org.apache.fluss.metadata.PhysicalTablePath;
24+
import org.apache.fluss.metadata.TableBucket;
2425
import org.apache.fluss.metadata.TableInfo;
2526
import org.apache.fluss.metadata.TablePath;
2627
import org.apache.fluss.server.coordinator.MetadataManager;
@@ -160,6 +161,10 @@ public PartitionMetadata getPartitionMetadata(PhysicalTablePath partitionPath) {
160161
}
161162
}
162163

164+
public boolean contains(TableBucket tableBucket) {
165+
return serverMetadataSnapshot.contains(tableBucket);
166+
}
167+
163168
public void updateClusterMetadata(ClusterMetadata clusterMetadata) {
164169
inLock(
165170
metadataLock,

fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1499,8 +1499,12 @@ public Replica getReplicaOrException(TableBucket tableBucket) {
14991499
HostedReplica replica = getReplica(tableBucket);
15001500
if (replica instanceof OnlineReplica) {
15011501
return ((OnlineReplica) replica).getReplica();
1502+
} else if (replica instanceof OfflineReplica) {
1503+
throw new StorageException(tableBucket + " is offline on server " + serverId);
1504+
} else if ((replica instanceof NoneReplica) && metadataCache.contains(tableBucket)) {
1505+
throw new NotLeaderOrFollowerException(
1506+
"server " + serverId + " is not leader or follower for " + tableBucket);
15021507
} else {
1503-
// TODO add metadata cache to judge.
15041508
throw new UnknownTableOrBucketException("Unknown table or bucket: " + tableBucket);
15051509
}
15061510
}

fluss-server/src/test/java/org/apache/fluss/server/metadata/TabletServerMetadataCacheTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.fluss.config.Configuration;
2424
import org.apache.fluss.exception.TableNotExistException;
2525
import org.apache.fluss.metadata.PhysicalTablePath;
26+
import org.apache.fluss.metadata.TableBucket;
2627
import org.apache.fluss.metadata.TableInfo;
2728
import org.apache.fluss.metadata.TablePath;
2829
import org.apache.fluss.server.coordinator.MetadataManager;
@@ -244,6 +245,62 @@ void testUpdateClusterMetadataRequest() {
244245
initialBucketMetadata);
245246
}
246247

248+
@Test
249+
void testContainsTableBucket() {
250+
tableMetadataList =
251+
Collections.singletonList(
252+
new TableMetadata(DATA1_TABLE_INFO, initialBucketMetadata));
253+
partitionMetadataList =
254+
Collections.singletonList(
255+
new PartitionMetadata(
256+
partitionTableId,
257+
partitionName1,
258+
partitionId1,
259+
initialBucketMetadata));
260+
serverMetadataCache.updateClusterMetadata(
261+
new ClusterMetadata(
262+
coordinatorServer,
263+
aliveTableServers,
264+
tableMetadataList,
265+
partitionMetadataList));
266+
267+
assertThat(
268+
serverMetadataCache.contains(
269+
new TableBucket(
270+
DATA1_TABLE_INFO.getTableId(),
271+
initialBucketMetadata.get(0).getBucketId())))
272+
.isTrue();
273+
assertThat(
274+
serverMetadataCache.contains(
275+
new TableBucket(
276+
DATA1_TABLE_INFO.getTableId(),
277+
1L,
278+
initialBucketMetadata.get(0).getBucketId())))
279+
.isFalse();
280+
assertThat(
281+
serverMetadataCache.contains(
282+
new TableBucket(DATA1_TABLE_INFO.getTableId(), Integer.MAX_VALUE)))
283+
.isFalse();
284+
285+
assertThat(
286+
serverMetadataCache.contains(
287+
new TableBucket(
288+
partitionTableId,
289+
partitionId1,
290+
initialBucketMetadata.get(0).getBucketId())))
291+
.isTrue();
292+
assertThat(
293+
serverMetadataCache.contains(
294+
new TableBucket(
295+
partitionTableId,
296+
initialBucketMetadata.get(0).getBucketId())))
297+
.isFalse();
298+
assertThat(
299+
serverMetadataCache.contains(
300+
new TableBucket(partitionTableId, partitionId1, Integer.MAX_VALUE)))
301+
.isFalse();
302+
}
303+
247304
private void assertTableMetadataEquals(
248305
long tableId,
249306
TableInfo expectedTableInfo,

fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@
2222
import org.apache.fluss.cluster.ServerType;
2323
import org.apache.fluss.exception.InvalidCoordinatorException;
2424
import org.apache.fluss.exception.InvalidRequiredAcksException;
25+
import org.apache.fluss.exception.NotLeaderOrFollowerException;
2526
import org.apache.fluss.exception.PartitionNotExistException;
2627
import org.apache.fluss.exception.TableNotExistException;
28+
import org.apache.fluss.exception.UnknownTableOrBucketException;
2729
import org.apache.fluss.metadata.PhysicalTablePath;
2830
import org.apache.fluss.metadata.Schema;
2931
import org.apache.fluss.metadata.TableBucket;
@@ -58,6 +60,7 @@
5860
import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
5961
import org.apache.fluss.server.log.FetchParams;
6062
import org.apache.fluss.server.log.ListOffsetsParam;
63+
import org.apache.fluss.server.metadata.BucketMetadata;
6164
import org.apache.fluss.server.metadata.ClusterMetadata;
6265
import org.apache.fluss.server.metadata.PartitionMetadata;
6366
import org.apache.fluss.server.metadata.ServerInfo;
@@ -1637,6 +1640,78 @@ void testUpdateMetadata() throws Exception {
16371640
+ "The latest known coordinator epoch is 2");
16381641
}
16391642

1643+
@Test
1644+
void testGetReplicaOrException() {
1645+
// 1. Test online replica
1646+
TableBucket tb = new TableBucket(DATA1_TABLE_ID, 1);
1647+
makeLogTableAsLeader(tb.getBucket());
1648+
assertThat(replicaManager.getReplicaOrException(tb).getTableBucket()).isEqualTo(tb);
1649+
1650+
// 2. Test offline replica
1651+
// TODO: Add test for offline replica
1652+
1653+
// 3. Test not leader or follower replica
1654+
Set<ServerInfo> tsServerInfoList =
1655+
new HashSet<>(
1656+
Arrays.asList(
1657+
new ServerInfo(
1658+
TABLET_SERVER_ID,
1659+
"rack0",
1660+
Endpoint.fromListenersString("CLIENT://localhost:90"),
1661+
ServerType.TABLET_SERVER),
1662+
new ServerInfo(
1663+
2,
1664+
"rack1",
1665+
Endpoint.fromListenersString("CLIENT://localhost:91"),
1666+
ServerType.TABLET_SERVER),
1667+
new ServerInfo(
1668+
3,
1669+
"rack2",
1670+
Endpoint.fromListenersString("CLIENT://localhost:92"),
1671+
ServerType.TABLET_SERVER)));
1672+
TablePath tablePath = TablePath.of("test_db_1", "test_get_replica_or_exception");
1673+
long tableId = 150004L;
1674+
TableInfo tableInfo =
1675+
TableInfo.of(
1676+
tablePath,
1677+
tableId,
1678+
1,
1679+
DATA1_TABLE_DESCRIPTOR,
1680+
System.currentTimeMillis(),
1681+
System.currentTimeMillis());
1682+
TableBucket tableBucket1 = new TableBucket(tableId, 1);
1683+
TableMetadata tableMetadata1 =
1684+
new TableMetadata(
1685+
tableInfo,
1686+
Collections.singletonList(
1687+
new BucketMetadata(
1688+
tableBucket1.getBucket(),
1689+
null,
1690+
null,
1691+
Arrays.asList(1, 2, 3))));
1692+
replicaManager.maybeUpdateMetadataCache(
1693+
0,
1694+
buildClusterMetadata(
1695+
null,
1696+
tsServerInfoList,
1697+
Collections.singletonList(tableMetadata1),
1698+
Collections.emptyList()));
1699+
assertThatThrownBy(() -> replicaManager.getReplicaOrException(tableBucket1))
1700+
.isInstanceOf(NotLeaderOrFollowerException.class);
1701+
// Online the replica
1702+
makeLogTableAsLeader(tableBucket1, false);
1703+
// Now it should return an online replica
1704+
assertThat(replicaManager.getReplicaOrException(tableBucket1).getTableBucket())
1705+
.isEqualTo(tableBucket1);
1706+
1707+
// 4. Test really non-exist replica
1708+
assertThatThrownBy(
1709+
() ->
1710+
replicaManager.getReplicaOrException(
1711+
new TableBucket(DATA1_TABLE_ID, Integer.MAX_VALUE)))
1712+
.isInstanceOf(UnknownTableOrBucketException.class);
1713+
}
1714+
16401715
private void assertReplicaEpochEquals(
16411716
Replica replica, boolean isLeader, int leaderEpoch, int bucketEpoch) {
16421717
assertThat(replica.isLeader()).isEqualTo(isLeader);

0 commit comments

Comments
 (0)