|
26 | 26 | import io.kafbat.ui.model.TopicCreationDTO; |
27 | 27 | import io.kafbat.ui.model.TopicUpdateDTO; |
28 | 28 | import java.time.Duration; |
| 29 | +import java.util.ArrayList; |
29 | 30 | import java.util.Collection; |
30 | 31 | import java.util.Collections; |
31 | 32 | import java.util.Comparator; |
@@ -288,6 +289,18 @@ private Map<TopicPartition, Optional<NewPartitionReassignment>> getPartitionsRea |
288 | 289 | Map<Integer, Integer> brokersUsage = getBrokersMap(cluster, currentAssignment); |
289 | 290 | int currentReplicationFactor = topic.getReplicationFactor(); |
290 | 291 |
|
| 292 | + // Get online nodes |
| 293 | + List<Integer> onlineNodes = statisticsCache.get(cluster).getClusterDescription().getNodes() |
| 294 | + .stream().map(Node::id).toList(); |
| 295 | + |
| 296 | + // Keep only online brokers in each partition's current replica assignment |
| 297 | + for (Map.Entry<Integer, List<Integer>> partitionEntry : currentAssignment.entrySet()) { |
| 298 | + partitionEntry.getValue().retainAll(onlineNodes); |
| 299 | + } |
| 300 | + |
| 301 | + brokersUsage.keySet().retainAll(onlineNodes); |
| 302 | + |
| 303 | + |
291 | 304 | // If we need to increase the replication factor |
292 | 305 | if (replicationFactorChange.getTotalReplicationFactor() > currentReplicationFactor) { |
293 | 306 | // For each partition |
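
The hunk above first restricts both the per-partition replica lists and the usage map to brokers that are currently online, so an offline broker can never be counted or selected during reassignment. A minimal, self-contained sketch of that `retainAll` pattern (the partition map and broker ids below are invented for illustration, not taken from the PR):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class RetainOnlineSketch {
  public static void main(String[] args) {
    // partition id -> mutable list of replica broker ids
    Map<Integer, List<Integer>> assignment = Map.of(
        0, new ArrayList<>(List.of(1, 2, 3)),
        1, new ArrayList<>(List.of(2, 3)));
    List<Integer> onlineNodes = List.of(1, 2); // broker 3 is offline

    // Drop offline brokers from every partition's replica list in place
    assignment.values().forEach(replicas -> replicas.retainAll(onlineNodes));

    System.out.println(assignment); // e.g. {0=[1, 2], 1=[2]}
  }
}
```
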
@@ -320,28 +333,35 @@ private Map<TopicPartition, Optional<NewPartitionReassignment>> getPartitionsRea |
320 | 333 | var partition = assignmentEntry.getKey(); |
321 | 334 | var brokers = assignmentEntry.getValue(); |
322 | 335 |
|
| 336 | + // If all of this partition's replicas are offline, fall back to the full online-node list |
| 337 | + if (brokers.isEmpty()) { |
| 338 | + brokers = new ArrayList<>(onlineNodes); |
| 339 | + } |
| 340 | + |
323 | 341 | // Get brokers list sorted by usage in reverse order |
324 | 342 | var brokersUsageList = brokersUsage.entrySet().stream() |
325 | 343 | .sorted(Map.Entry.comparingByValue(Comparator.reverseOrder())) |
326 | 344 | .map(Map.Entry::getKey) |
327 | 345 | .toList(); |
328 | 346 |
|
| 347 | + Integer leader = topic.getPartitions().get(partition).getLeader(); |
| 348 | + |
329 | 349 | // Iterate brokers and try to remove them from assignment |
330 | 350 | // while partition replicas count != requested replication factor |
331 | 351 | for (Integer broker : brokersUsageList) { |
| 352 | + if (brokers.size() == replicationFactorChange.getTotalReplicationFactor()) { |
| 353 | + break; |
| 354 | + } |
332 | 355 | // Check whether the broker is the leader of the partition |
333 | | - if (!topic.getPartitions().get(partition).getLeader() |
334 | | - .equals(broker)) { |
| 356 | + if (leader == null || !leader.equals(broker)) { |
335 | 357 | brokers.remove(broker); |
336 | 358 | brokersUsage.merge(broker, -1, Integer::sum); |
337 | 359 | } |
338 | | - if (brokers.size() == replicationFactorChange.getTotalReplicationFactor()) { |
339 | | - break; |
340 | | - } |
341 | 360 | } |
342 | 361 | if (brokers.size() != replicationFactorChange.getTotalReplicationFactor()) { |
343 | 362 | throw new ValidationException("Something went wrong while removing replicas"); |
344 | 363 | } |
| 364 | + currentAssignment.put(partition, brokers); |
345 | 365 | } |
346 | 366 | } else { |
347 | 367 | throw new ValidationException("Replication factor already equals requested"); |
@@ -374,7 +394,7 @@ private Map<Integer, Integer> getBrokersMap(KafkaCluster cluster, |
374 | 394 | c -> 0 |
375 | 395 | )); |
376 | 396 | currentAssignment.values().forEach(brokers -> brokers |
377 | | - .forEach(broker -> result.put(broker, result.get(broker) + 1))); |
| 397 | + .forEach(broker -> result.put(broker, result.getOrDefault(broker, 0) + 1))); |
378 | 398 |
|
379 | 399 | return result; |
380 | 400 | } |
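
The last hunk hardens `getBrokersMap`: now that offline brokers are pruned from the pre-seeded usage map, a broker id that still appears in the current assignment may be absent from `result`, and `result.get(broker) + 1` would unbox null and throw. `getOrDefault` tolerates the missing key. A runnable sketch of the difference (broker ids invented):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BrokerUsageSketch {
  public static void main(String[] args) {
    // Usage map pre-seeded only with online brokers
    Map<Integer, Integer> usage = new HashMap<>(Map.of(1, 0, 2, 0));
    List<Integer> replicas = List.of(1, 2, 3); // broker 3 is offline, not seeded

    for (Integer broker : replicas) {
      // usage.put(broker, usage.get(broker) + 1) would NPE for broker 3;
      // getOrDefault (or Map.merge) handles the absent key:
      usage.put(broker, usage.getOrDefault(broker, 0) + 1);
    }
    System.out.println(usage); // e.g. {1=1, 2=1, 3=1}
  }
}
```
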
|