
Commit dac5c20

Document share consumer poison message protection and delivery count
Add comprehensive documentation on KIP-932's broker-side poison message protection mechanism. Clarifies that delivery count is broker-internal and not exposed to applications. Includes:

- How delivery count works and configuration via group.share.delivery.attempt.limit
- Retry strategy recommendations using RELEASE/REJECT/ACCEPT acknowledgment types
- Example showing proper error handling patterns for transient vs permanent failures

Addresses common questions about redelivery semantics in share consumers.

Signed-off-by: Soby Chacko <[email protected]>
1 parent 44fb1ac commit dac5c20

1 file changed: +91 −0 lines changed


spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc

@@ -667,6 +667,96 @@ public void validateData(ConsumerRecord<String, String> record, ShareAcknowledgm
}
----

[[share-poison-message-protection]]
== Poison Message Protection and Delivery Count

KIP-932 includes broker-side poison message protection to prevent unprocessable records from being endlessly redelivered.

=== How It Works

Every time a record is acquired by a consumer in a share group, the broker increments an internal delivery count.
The first acquisition sets the delivery count to 1, and each subsequent acquisition increments it.
When the delivery count reaches the configured limit (default: 5), the record moves to the **Archived** state and is no longer eligible for delivery attempts.
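
The state transitions described above can be sketched as a toy model.
This is a hypothetical class, not broker or Spring Kafka code; the real logic is entirely broker-internal, but the sketch shows how releasing a record past the attempt limit leads to archival:

[source,java]
----
import java.util.HashMap;
import java.util.Map;

// Illustrative model only: class, enum, and method names are hypothetical.
// The real delivery count and record states live inside the broker.
public class DeliveryCountModel {

    public enum RecordState { AVAILABLE, ACQUIRED, ARCHIVED }

    private final int attemptLimit; // models group.share.delivery.attempt.limit
    private final Map<Long, Integer> deliveryCounts = new HashMap<>(); // offset -> count

    public DeliveryCountModel(int attemptLimit) {
        this.attemptLimit = attemptLimit;
    }

    // Acquisition increments the delivery count (first acquisition sets it to 1).
    public RecordState acquire(long offset) {
        deliveryCounts.merge(offset, 1, Integer::sum);
        return RecordState.ACQUIRED;
    }

    // RELEASE normally makes the record available again, but once the
    // delivery count has reached the limit, the record is archived instead.
    public RecordState release(long offset) {
        return deliveryCounts.getOrDefault(offset, 0) >= attemptLimit
                ? RecordState.ARCHIVED
                : RecordState.AVAILABLE;
    }
}
----

With a limit of 2, for example, the second `release` of the same record yields `ARCHIVED` rather than `AVAILABLE`.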

=== Configuration

The maximum delivery attempts can be configured per share group using the Admin API:

[source,java]
----
private void configureMaxDeliveryAttempts(String bootstrapServers, String groupId) throws Exception {
    Map<String, Object> adminProps = new HashMap<>();
    adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    try (Admin admin = Admin.create(adminProps)) {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);

        // The default is 5; adjust based on your retry tolerance
        ConfigEntry maxAttempts = new ConfigEntry("group.share.delivery.attempt.limit", "10");

        Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
                configResource, List.of(new AlterConfigOp(maxAttempts, AlterConfigOp.OpType.SET))
        );

        admin.incrementalAlterConfigs(configs).all().get();
    }
}
----
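
The same setting can typically also be applied from the command line.
The exact flags below are an assumption based on Kafka's dynamic group configuration tooling; verify them against your Kafka version's `kafka-configs.sh` help output:

[source,shell]
----
# Hypothetical CLI equivalent of the Admin API call above (check your
# distribution's kafka-configs.sh before relying on these flags).
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter --entity-type groups --entity-name my-share-group \
  --add-config group.share.delivery.attempt.limit=10
----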

[IMPORTANT]
====
**Delivery Count is Not Exposed to Applications**

The delivery count is maintained internally by the broker and is **not exposed to consumer applications**.
This is an intentional design decision in KIP-932.
The delivery count is approximate and serves as a poison message protection mechanism, not a precise redelivery counter.
Applications cannot query or access this value through any API.

For application-level retry logic, use the acknowledgment types:

* `RELEASE` - Make the record available for redelivery (contributes to the delivery count)
* `REJECT` - Mark as permanently failed (does not cause redelivery)
* `ACCEPT` - Successfully processed (does not cause redelivery)

The broker automatically prevents endless redelivery once `group.share.delivery.attempt.limit` is reached, moving the record to the Archived state.
====
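
Because the broker's count is invisible, an application that wants its own (approximate) attempt count must track it itself.
A minimal sketch of such a helper follows; `LocalAttemptTracker` is a hypothetical class, not a Spring Kafka or Kafka API, and its counts are local to one process, so they reset on restart and only approximate the broker's internal count:

[source,java]
----
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical application-side attempt tracker (not a framework API).
public class LocalAttemptTracker {

    private final Map<String, Integer> attempts = new ConcurrentHashMap<>();

    private static String key(String topic, int partition, long offset) {
        return topic + "-" + partition + "@" + offset;
    }

    // Record one local processing attempt and return the new total for this record.
    public int recordAttempt(String topic, int partition, long offset) {
        return attempts.merge(key(topic, partition, offset), 1, Integer::sum);
    }

    // Clear tracking once the record is accepted or rejected.
    public void clear(String topic, int partition, long offset) {
        attempts.remove(key(topic, partition, offset));
    }
}
----

In a listener, you could call `recordAttempt` before processing and switch from `release()` to `reject()` once the local count passes your own threshold, independently of the broker-side limit.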

=== Retry Strategy Recommendations

[source,java]
----
@KafkaListener(topics = "orders", containerFactory = "explicitShareKafkaListenerContainerFactory")
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment ack) {
    try {
        // Attempt to process the order
        orderService.process(record.value());
        ack.acknowledge(); // ACCEPT - successfully processed
    }
    catch (TransientException e) {
        // Temporary failure (network issue, service unavailable, etc.)
        // Release the record for redelivery; the broker will retry up to
        // group.share.delivery.attempt.limit times
        logger.warn("Transient error processing order, will retry: {}", e.getMessage());
        ack.release(); // RELEASE - make available for retry
    }
    catch (ValidationException e) {
        // Permanent semantic error (invalid data format, business rule violation, etc.)
        // Do not retry - this record will never succeed
        logger.error("Invalid order data, rejecting: {}", e.getMessage());
        ack.reject(); // REJECT - permanent failure, do not retry
    }
    catch (Exception e) {
        // Unknown error - typically safer to reject to avoid infinite loops,
        // but release instead if you suspect the failure is transient
        logger.error("Unexpected error processing order, rejecting: {}", e.getMessage());
        ack.reject(); // REJECT - avoid poison message loops
    }
}
----

The broker's poison message protection ensures that, even if you always use `RELEASE` for errors, records won't be retried endlessly: they are automatically archived after reaching the delivery attempt limit.

[[share-differences-from-regular-consumers]]
== Differences from Regular Consumers

@@ -678,6 +768,7 @@ Share consumers differ from regular consumers in several key ways:
4. **Record-Level Acknowledgment**: Supports explicit acknowledgment with `ACCEPT`, `RELEASE`, and `REJECT` types
5. **Different Group Management**: Share groups use different coordinator protocols
6. **No Batch Processing**: Share consumers process records individually, not in batches
7. **Broker-Side Retry Management**: Delivery count tracking and poison message protection are managed by the broker, not exposed to applications

[[share-limitations-and-considerations]]
== Limitations and Considerations
