Skip to content

Commit 0d09e8d

Browse files
garyrussellartembilan
authored and committed
Support stateful retry
* Polishing - we don't need to keep RetryStates in a map - in this context it is just a holder for the message key. Also PR comments. * Doc Polishing.
1 parent 4505874 commit 0d09e8d

File tree

6 files changed

+244
-7
lines changed

6 files changed

+244
-7
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2017 the original author or authors.
2+
* Copyright 2014-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -67,6 +67,8 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe
6767

6868
private RecoveryCallback<? extends Object> recoveryCallback;
6969

70+
private Boolean statefulRetry;
71+
7072
private Boolean batchListener;
7173

7274
private ApplicationEventPublisher applicationEventPublisher;
@@ -143,6 +145,20 @@ public void setRecoveryCallback(RecoveryCallback<? extends Object> recoveryCallb
143145
this.recoveryCallback = recoveryCallback;
144146
}
145147

148+
/**
149+
* When using a {@link RetryTemplate}, set to true to enable stateful retry. Use in
150+
* conjunction with a
151+
* {@link org.springframework.kafka.listener.SeekToCurrentErrorHandler} when retry can
152+
* take excessive time; each failure goes back to the broker, to keep the Consumer
153+
* alive.
154+
* @param statefulRetry true to enable stateful retry.
155+
* @since 2.1.3
156+
*/
157+
public void setStatefulRetry(boolean statefulRetry) {
158+
this.statefulRetry = statefulRetry;
159+
}
160+
161+
146162
/**
147163
* Return true if this endpoint creates a batch listener.
148164
* @return true for a batch listener.
@@ -216,6 +232,9 @@ public C createListenerContainer(KafkaListenerEndpoint endpoint) {
216232
if (this.recoveryCallback != null) {
217233
aklEndpoint.setRecoveryCallback(this.recoveryCallback);
218234
}
235+
if (this.statefulRetry != null) {
236+
aklEndpoint.setStatefulRetry(this.statefulRetry);
237+
}
219238
if (this.batchListener != null) {
220239
aklEndpoint.setBatchListener(this.batchListener);
221240
}

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2017 the original author or authors.
2+
* Copyright 2014-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -88,6 +88,8 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>
8888

8989
private RecoveryCallback<? extends Object> recoveryCallback;
9090

91+
private boolean statefulRetry;
92+
9193
private boolean batchListener;
9294

9395
private KafkaTemplate<K, V> replyTemplate;
@@ -303,6 +305,23 @@ public void setRecoveryCallback(RecoveryCallback<? extends Object> recoveryCallb
303305
this.recoveryCallback = recoveryCallback;
304306
}
305307

308+
protected boolean isStatefulRetry() {
309+
return this.statefulRetry;
310+
}
311+
312+
/**
313+
* When using a {@link RetryTemplate}, set to true to enable stateful retry. Use in
314+
* conjunction with a
315+
* {@link org.springframework.kafka.listener.SeekToCurrentErrorHandler} when retry can
316+
* take excessive time; each failure goes back to the broker, to keep the Consumer
317+
* alive.
318+
* @param statefulRetry true to enable stateful retry.
319+
* @since 2.1.3
320+
*/
321+
public void setStatefulRetry(boolean statefulRetry) {
322+
this.statefulRetry = statefulRetry;
323+
}
324+
306325
@Override
307326
public String getClientIdPrefix() {
308327
return this.clientIdPrefix;
@@ -356,7 +375,7 @@ private void setupMessageListener(MessageListenerContainer container, MessageCon
356375
Assert.state(messageListener != null, "Endpoint [" + this + "] must provide a non null message listener");
357376
if (this.retryTemplate != null) {
358377
messageListener = new RetryingMessageListenerAdapter<>((MessageListener<K, V>) messageListener,
359-
this.retryTemplate, this.recoveryCallback);
378+
this.retryTemplate, this.recoveryCallback, this.statefulRetry);
360379
}
361380
if (this.recordFilterStrategy != null) {
362381
if (this.batchListener) {

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RetryingMessageListenerAdapter.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,6 +23,8 @@
2323
import org.springframework.kafka.listener.MessageListener;
2424
import org.springframework.kafka.support.Acknowledgment;
2525
import org.springframework.retry.RecoveryCallback;
26+
import org.springframework.retry.RetryState;
27+
import org.springframework.retry.support.DefaultRetryState;
2628
import org.springframework.retry.support.RetryTemplate;
2729
import org.springframework.util.Assert;
2830

@@ -56,6 +58,8 @@ public class RetryingMessageListenerAdapter<K, V>
5658
*/
5759
public static final String CONTEXT_RECORD = "record";
5860

61+
private boolean stateful;
62+
5963
/**
6064
* Construct an instance with the provided template and delegate. The exception will
6165
* be thrown to the container after retries are exhausted.
@@ -75,13 +79,38 @@ public RetryingMessageListenerAdapter(MessageListener<K, V> messageListener, Ret
7579
*/
7680
public RetryingMessageListenerAdapter(MessageListener<K, V> messageListener, RetryTemplate retryTemplate,
7781
RecoveryCallback<? extends Object> recoveryCallback) {
82+
this(messageListener, retryTemplate, recoveryCallback, false);
83+
}
84+
85+
/**
86+
* Construct an instance with the provided template, callback and delegate. When using
87+
* stateful retry, the retry context key is a concatenated String
88+
* {@code topic-partition-offset}. A
89+
* {@link org.springframework.kafka.listener.SeekToCurrentErrorHandler} is required in
90+
* the listener container because stateful retry will throw the exception to the
91+
* container for each delivery attempt.
92+
* @param messageListener the delegate listener.
93+
* @param retryTemplate the template.
94+
* @param recoveryCallback the recovery callback; if null, the exception will be
95+
* thrown to the container after retries are exhausted.
96+
* @param stateful true for stateful retry.
97+
* @since 2.1.3
98+
*/
99+
public RetryingMessageListenerAdapter(MessageListener<K, V> messageListener, RetryTemplate retryTemplate,
100+
RecoveryCallback<? extends Object> recoveryCallback, boolean stateful) {
101+
78102
super(messageListener, retryTemplate, recoveryCallback);
79103
Assert.notNull(messageListener, "'messageListener' cannot be null");
104+
this.stateful = stateful;
80105
}
81106

82107
@Override
83108
public void onMessage(final ConsumerRecord<K, V> record, final Acknowledgment acknowledgment,
84109
final Consumer<?, ?> consumer) {
110+
RetryState retryState = null;
111+
if (this.stateful) {
112+
retryState = new DefaultRetryState(record.topic() + "-" + record.partition() + "-" + record.offset());
113+
}
85114
getRetryTemplate().execute(context -> {
86115
context.setAttribute(CONTEXT_RECORD, record);
87116
switch (RetryingMessageListenerAdapter.this.delegateType) {
@@ -103,7 +132,7 @@ public void onMessage(final ConsumerRecord<K, V> record, final Acknowledgment ac
103132
}
104133
return null;
105134
},
106-
getRecoveryCallback());
135+
getRecoveryCallback(), retryState);
107136
}
108137

109138
/*
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.annotation;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import java.util.List;
22+
import java.util.Map;
23+
import java.util.concurrent.CountDownLatch;
24+
import java.util.concurrent.TimeUnit;
25+
26+
import org.apache.kafka.clients.consumer.Consumer;
27+
import org.apache.kafka.clients.consumer.ConsumerConfig;
28+
import org.apache.kafka.clients.consumer.ConsumerRecord;
29+
import org.junit.ClassRule;
30+
import org.junit.Test;
31+
import org.junit.runner.RunWith;
32+
33+
import org.springframework.beans.factory.annotation.Autowired;
34+
import org.springframework.context.annotation.Bean;
35+
import org.springframework.context.annotation.Configuration;
36+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
37+
import org.springframework.kafka.config.KafkaListenerContainerFactory;
38+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
39+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
40+
import org.springframework.kafka.core.KafkaTemplate;
41+
import org.springframework.kafka.core.ProducerFactory;
42+
import org.springframework.kafka.listener.MessageListenerContainer;
43+
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
44+
import org.springframework.kafka.test.rule.KafkaEmbedded;
45+
import org.springframework.kafka.test.utils.KafkaTestUtils;
46+
import org.springframework.retry.support.RetryTemplate;
47+
import org.springframework.test.annotation.DirtiesContext;
48+
import org.springframework.test.context.junit4.SpringRunner;
49+
50+
/**
51+
* @author Gary Russell
52+
* @since 2.1.3
53+
*
54+
*/
55+
@RunWith(SpringRunner.class)
56+
@DirtiesContext
57+
public class StatefulRetryTests {
58+
59+
private static final String DEFAULT_TEST_GROUP_ID = "statefulRetry";
60+
61+
@ClassRule
62+
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 1, "sr1");
63+
64+
@Autowired
65+
private Config config;
66+
67+
@Autowired
68+
private KafkaTemplate<Integer, String> template;
69+
70+
@Test
71+
public void testStatefulRetry() throws Exception {
72+
this.template.send("sr1", "foo");
73+
assertThat(this.config.latch1.await(10, TimeUnit.SECONDS)).isTrue();
74+
assertThat(this.config.latch2.await(10, TimeUnit.SECONDS)).isTrue();
75+
assertThat(this.config.seekPerformed).isTrue();
76+
}
77+
78+
@Configuration
79+
@EnableKafka
80+
public static class Config {
81+
82+
private final CountDownLatch latch1 = new CountDownLatch(3);
83+
84+
private final CountDownLatch latch2 = new CountDownLatch(1);
85+
86+
private boolean seekPerformed;
87+
88+
@Bean
89+
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
90+
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
91+
new ConcurrentKafkaListenerContainerFactory<>();
92+
factory.setConsumerFactory(consumerFactory());
93+
factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler() {
94+
95+
@Override
96+
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records,
97+
Consumer<?, ?> consumer, MessageListenerContainer container) {
98+
Config.this.seekPerformed = true;
99+
super.handle(thrownException, records, consumer, container);
100+
}
101+
102+
});
103+
factory.setStatefulRetry(true);
104+
factory.setRetryTemplate(new RetryTemplate());
105+
factory.setRecoveryCallback(c -> {
106+
this.latch2.countDown();
107+
return null;
108+
});
109+
return factory;
110+
}
111+
112+
@Bean
113+
public DefaultKafkaConsumerFactory<Integer, String> consumerFactory() {
114+
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
115+
}
116+
117+
@Bean
118+
public Map<String, Object> consumerConfigs() {
119+
Map<String, Object> consumerProps =
120+
KafkaTestUtils.consumerProps(DEFAULT_TEST_GROUP_ID, "false", embeddedKafka);
121+
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
122+
return consumerProps;
123+
}
124+
125+
@Bean
126+
public KafkaTemplate<Integer, String> template() {
127+
KafkaTemplate<Integer, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
128+
return kafkaTemplate;
129+
}
130+
131+
@Bean
132+
public ProducerFactory<Integer, String> producerFactory() {
133+
return new DefaultKafkaProducerFactory<>(producerConfigs());
134+
}
135+
136+
@Bean
137+
public Map<String, Object> producerConfigs() {
138+
return KafkaTestUtils.producerProps(embeddedKafka);
139+
}
140+
141+
@KafkaListener(id = "retry", topics = "sr1", groupId = "sr1")
142+
public void listen1(String in) {
143+
this.latch1.countDown();
144+
throw new RuntimeException("retry");
145+
}
146+
147+
}
148+
149+
}

src/reference/asciidoc/kafka.adoc

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1049,15 +1049,16 @@ When using `@KafkaListener`, set the `RecordFilterStrategy` (and optionally `ack
10491049

10501050
In addition, a `FilteringBatchMessageListenerAdapter` is provided, for when using a batch <<message-listeners, message listener>>.
10511051

1052+
[[retrying-deliveries]]
10521053
===== Retrying Deliveries
10531054

10541055
If your listener throws an exception, the default behavior is to invoke the `ErrorHandler`, if configured; otherwise, the exception is logged.
10551056

10561057
NOTE: Two error handler interfaces are provided, `ErrorHandler` and `BatchErrorHandler`; the appropriate type must be configured to match the <<message-listeners, Message Listener>>.
10571058

1058-
To retry deliveries, convenient listener adapters - `RetryingMessageListenerAdapter` and `RetryingAcknowledgingMessageListenerAdapter` are provided, depending on whether you are using a `MessageListener` or an `AcknowledgingMessageListener`.
1059+
To retry deliveries, a convenient listener adapter, `RetryingMessageListenerAdapter`, is provided.
10591060

1060-
These can be configured with a `RetryTemplate` and `RecoveryCallback<Void>` - see the https://github.com/spring-projects/spring-retry[spring-retry]
1061+
It can be configured with a `RetryTemplate` and `RecoveryCallback<Void>` - see the https://github.com/spring-projects/spring-retry[spring-retry]
10611062
project for information about these components.
10621063
If a recovery callback is not provided, the exception is thrown to the container after retries are exhausted.
10631064
In that case, the `ErrorHandler` will be invoked, if configured; otherwise, the exception is logged.
@@ -1073,6 +1074,23 @@ See its javadocs for more information.
10731074
A retry adapter is not provided for any of the batch <<message-listeners, message listeners>> because the framework has no knowledge of where, in a batch, the failure occurred.
10741075
Users who need retry capabilities when using a batch listener are advised to use a `RetryTemplate` within the listener itself.
10751076

1077+
[[stateful-retry]]
1078+
===== Stateful Retry
1079+
1080+
It is important to understand that the retry discussed above suspends the consumer thread (if a `BackOffPolicy` is used); there are no calls to `Consumer.poll()` during the retries.
1081+
Kafka has two properties to determine consumer health; the `session.timeout.ms` is used to determine if the consumer is active.
1082+
Since version `0.10.1.0` heartbeats are sent on a background thread so a slow consumer no longer affects that.
1083+
`max.poll.interval.ms` (default 5 minutes) is used to determine if a consumer appears to be hung (taking too long to process records from the last poll).
1084+
If the time between `poll()` s exceeds this, the broker will revoke the assigned partitions and perform a rebalance.
1085+
For lengthy retry sequences, with back off, this can easily happen.
1086+
1087+
Since _version 2.1.3_, you can avoid this problem by using stateful retry in conjunction with a `SeekToCurrentErrorHandler`.
1088+
In this case, each delivery attempt will throw the exception back to the container, the error handler will re-seek the unprocessed offsets, and the same message will be redelivered by the next `poll()`.
1089+
This avoids the problem of exceeding the `max.poll.interval.ms` property (as long as an individual delay between attempts does not exceed it).
1090+
So, when using an `ExponentialBackOffPolicy`, it's important to ensure that the `maxInterval` is comfortably less than the `max.poll.interval.ms` property.
1091+
To enable stateful retry, use the `RetryingMessageListenerAdapter` constructor that takes a `stateful` `boolean` argument (set it to `true`).
1092+
When configuring using the listener container factory (for `@KafkaListener` s), set the factory's `statefulRetry` property to `true`.
1093+
10761094
[[idle-containers]]
10771095
===== Detecting Idle and Non-Responsive Consumers
10781096

src/reference/asciidoc/whats-new.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ See <<serdes>> for more information.
1515
Container Error handlers are now provided for both record and batch listeners that treat any exceptions thrown by the listener as fatal; they stop the container.
1616
See <<annotation-error-handling>> for more information.
1717

18+
==== Stateful Retry
19+
20+
Starting with _version 2.1.3_, stateful retry can be configured; see <<stateful-retry>> for more information.
1821

1922
==== Client ID
2023

0 commit comments

Comments
 (0)