Skip to content

Commit 538cfe4

Browse files
authored
GH-3172: Support consumer-side batching
Resolves #3172 When the listener container supports creating batches of consumed messages, present the batch as the message payload - either a `List<Message<?>>` or `List<SomePayload>`. * Add 'since' to new method.
1 parent 867a8cf commit 538cfe4

File tree

14 files changed

+525
-39
lines changed

14 files changed

+525
-39
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ ext {
9090
rsocketVersion = '1.0.0-RC6'
9191
servletApiVersion = '4.0.1'
9292
smackVersion = '4.3.4'
93-
springAmqpVersion = project.hasProperty('springAmqpVersion') ? project.springAmqpVersion : '2.2.3.RELEASE'
93+
springAmqpVersion = project.hasProperty('springAmqpVersion') ? project.springAmqpVersion : '2.2.4.BUILD-SNAPSHOT'
9494
springDataVersion = project.hasProperty('springDataVersion') ? project.springDataVersion : 'Neumann-BUILD-SNAPSHOT'
9595
springSecurityVersion = project.hasProperty('springSecurityVersion') ? project.springSecurityVersion : '5.3.0.RC1'
9696
springRetryVersion = '1.2.5.RELEASE'

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/config/AbstractAmqpInboundAdapterParser.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ protected void doParse(Element element, ParserContext parserContext, BeanDefinit
119119
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "error-channel");
120120
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "auto-startup");
121121
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "phase");
122+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "batch-mode");
122123
configureChannels(element, parserContext, builder);
123124
AbstractBeanDefinition adapterBeanDefinition = builder.getRawBeanDefinition();
124125
adapterBeanDefinition.setResource(parserContext.getReaderContext().getResource());

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpInboundChannelAdapterSMLCSpec.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.function.Consumer;
2020

2121
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
22+
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.BatchMode;
2223

2324
/**
2425
* Spec for an inbound channel adapter with a {@link SimpleMessageListenerContainer}.
@@ -41,4 +42,15 @@ public AmqpInboundChannelAdapterSMLCSpec configureContainer(Consumer<SimpleMessa
4142
return this;
4243
}
4344

45+
/**
46+
* Set the {@link BatchMode} to use when the container is configured to support
47+
* batching consumed records.
48+
* @param batchMode the batch mode.
49+
* @return the spec.
50+
* @since 5.3
51+
*/
52+
public AmqpInboundChannelAdapterSMLCSpec batchMode(BatchMode batchMode) {
53+
this.target.setBatchMode(batchMode);
54+
return this;
55+
}
4456
}

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/SimpleMessageListenerContainerSpec.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,4 +119,15 @@ public SimpleMessageListenerContainerSpec batchSize(int batchSize) {
119119
return this;
120120
}
121121

122+
/**
123+
* Set to true to enable batching of consumed messages.
124+
* @param enabled true to enable.
125+
* @return the spec.
126+
* @since 5.3
127+
*/
128+
public SimpleMessageListenerContainerSpec consumerBatchEnabled(boolean enabled) {
129+
this.listenerContainer.setConsumerBatchEnabled(enabled);
130+
return this;
131+
}
132+
122133
}

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundChannelAdapter.java

Lines changed: 177 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.integration.amqp.inbound;
1818

1919
import java.util.ArrayList;
20+
import java.util.HashMap;
2021
import java.util.List;
2122
import java.util.Map;
2223
import java.util.concurrent.atomic.AtomicInteger;
@@ -26,6 +27,7 @@
2627
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
2728
import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
2829
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
30+
import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessageListener;
2931
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
3032
import org.springframework.amqp.support.AmqpHeaders;
3133
import org.springframework.amqp.support.converter.MessageConversionException;
@@ -42,6 +44,7 @@
4244
import org.springframework.integration.endpoint.MessageProducerSupport;
4345
import org.springframework.integration.support.ErrorMessageUtils;
4446
import org.springframework.retry.RecoveryCallback;
47+
import org.springframework.retry.RetryOperations;
4548
import org.springframework.retry.support.RetrySynchronizationManager;
4649
import org.springframework.retry.support.RetryTemplate;
4750
import org.springframework.util.Assert;
@@ -61,6 +64,25 @@
6164
public class AmqpInboundChannelAdapter extends MessageProducerSupport implements
6265
OrderlyShutdownCapable {
6366

67+
/**
68+
* Defines the payload type when the listener container is configured with consumerBatchEnabled.
69+
*/
70+
public enum BatchMode {
71+
72+
/**
73+
* Payload is a {@code List<Message<?>>} where each element is a message is
74+
* converted from the Spring AMQP Message.
75+
*/
76+
MESSAGES,
77+
78+
/**
79+
* Payload is a {@code List<?>} where each element is the converted body of the
80+
* Spring AMQP Message.
81+
*/
82+
EXTRACT_PAYLOADS
83+
84+
}
85+
6486
private static final ThreadLocal<AttributeAccessor> ATTRIBUTES_HOLDER = new ThreadLocal<>();
6587

6688
private final AbstractMessageListenerContainer messageListenerContainer;
@@ -77,6 +99,8 @@ public class AmqpInboundChannelAdapter extends MessageProducerSupport implements
7799

78100
private boolean bindSourceMessage;
79101

102+
private BatchMode batchMode = BatchMode.MESSAGES;
103+
80104
public AmqpInboundChannelAdapter(AbstractMessageListenerContainer listenerContainer) {
81105
Assert.notNull(listenerContainer, "listenerContainer must not be null");
82106
Assert.isNull(listenerContainer.getMessageListener(),
@@ -124,8 +148,9 @@ public void setRecoveryCallback(RecoveryCallback<?> recoveryCallback) {
124148
}
125149

126150
/**
127-
* Set a batching strategy to use when de-batching messages.
128-
* Default is {@link SimpleBatchingStrategy}.
151+
* Set a batching strategy to use when de-batching messages created by a batching
152+
* producer (such as the BatchingRabbitTemplate). Default is
153+
* {@link SimpleBatchingStrategy}.
129154
* @param batchingStrategy the strategy.
130155
* @since 5.2
131156
*/
@@ -144,6 +169,17 @@ public void setBindSourceMessage(boolean bindSourceMessage) {
144169
this.bindSourceMessage = bindSourceMessage;
145170
}
146171

172+
/**
173+
* When the listener container is configured with consumerBatchEnabled, set the payload
174+
* type for messages generated for the batches. Default is {@link BatchMode#MESSAGES}.
175+
* @param batchMode the batch mode.
176+
* @since 5.3
177+
*/
178+
public void setBatchMode(BatchMode batchMode) {
179+
Assert.notNull(batchMode, "'batchMode' cannot be null");
180+
this.batchMode = batchMode;
181+
}
182+
147183
@Override
148184
public String getComponentType() {
149185
return "amqp:inbound-channel-adapter";
@@ -156,7 +192,13 @@ protected void onInit() {
156192
+ "provided; use an 'ErrorMessageSendingRecoverer' in the 'recoveryCallback' property to "
157193
+ "send an error message when retries are exhausted");
158194
}
159-
Listener messageListener = new Listener();
195+
Listener messageListener;
196+
if (this.messageListenerContainer.isConsumerBatchEnabled()) {
197+
messageListener = new BatchListener();
198+
}
199+
else {
200+
messageListener = new Listener();
201+
}
160202
this.messageListenerContainer.setMessageListener(messageListener);
161203
this.messageListenerContainer.afterPropertiesSet();
162204
super.onInit();
@@ -193,7 +235,7 @@ public int afterShutdown() {
193235
* @param message the Spring Messaging message to use.
194236
* @since 4.3.10
195237
*/
196-
private void setAttributesIfNecessary(Message amqpMessage, org.springframework.messaging.Message<?> message) {
238+
private void setAttributesIfNecessary(Object amqpMessage, org.springframework.messaging.Message<?> message) {
197239
boolean needHolder = getErrorChannel() != null && this.retryTemplate == null;
198240
boolean needAttributes = needHolder || this.retryTemplate != null;
199241
if (needHolder) {
@@ -223,37 +265,48 @@ protected AttributeAccessor getErrorMessageAttributes(org.springframework.messag
223265

224266
protected class Listener implements ChannelAwareMessageListener {
225267

268+
protected final MessageConverter converter = AmqpInboundChannelAdapter.this.messageConverter; // NOSONAR
269+
270+
protected final boolean manualAcks = AcknowledgeMode.MANUAL ==
271+
AmqpInboundChannelAdapter.this.messageListenerContainer.getAcknowledgeMode(); // NNOSONAR
272+
273+
protected final RetryOperations retryOps = AmqpInboundChannelAdapter.this.retryTemplate; // NOSONAR
274+
275+
protected final RecoveryCallback<?> recoverer = AmqpInboundChannelAdapter.this.recoveryCallback; // NOSONAR
276+
277+
protected Listener() {
278+
}
279+
226280
@Override
227281
public void onMessage(final Message message, final Channel channel) {
228-
boolean retryDisabled = AmqpInboundChannelAdapter.this.retryTemplate == null;
229282
try {
230-
if (retryDisabled) {
283+
if (this.retryOps == null) {
231284
createAndSend(message, channel);
232285
}
233286
else {
234287
final org.springframework.messaging.Message<Object> toSend = createMessage(message, channel);
235-
AmqpInboundChannelAdapter.this.retryTemplate.execute(
288+
this.retryOps.execute(
236289
context -> {
237290
StaticMessageHeaderAccessor.getDeliveryAttempt(toSend).incrementAndGet();
238291
setAttributesIfNecessary(message, toSend);
239292
sendMessage(toSend);
240293
return null;
241-
}, AmqpInboundChannelAdapter.this.recoveryCallback);
294+
}, this.recoverer);
242295
}
243296
}
244297
catch (MessageConversionException e) {
245298
if (getErrorChannel() != null) {
246299
setAttributesIfNecessary(message, null);
247300
getMessagingTemplate()
248301
.send(getErrorChannel(), buildErrorMessage(null,
249-
EndpointUtils.errorMessagePayload(message, channel, isManualAck(), e)));
302+
EndpointUtils.errorMessagePayload(message, channel, this.manualAcks, e)));
250303
}
251304
else {
252305
throw e;
253306
}
254307
}
255308
finally {
256-
if (retryDisabled) {
309+
if (this.retryOps == null) {
257310
ATTRIBUTES_HOLDER.remove();
258311
}
259312
}
@@ -265,38 +318,139 @@ private void createAndSend(Message message, Channel channel) {
265318
sendMessage(messagingMessage);
266319
}
267320

268-
private org.springframework.messaging.Message<Object> createMessage(Message message, Channel channel) {
321+
protected org.springframework.messaging.Message<Object> createMessage(Message message, Channel channel) {
322+
Object payload = convertPayload(message);
323+
Map<String, Object> headers = AmqpInboundChannelAdapter.this.headerMapper
324+
.toHeadersFromRequest(message.getMessageProperties());
325+
if (AmqpInboundChannelAdapter.this.bindSourceMessage) {
326+
headers.put(IntegrationMessageHeaderAccessor.SOURCE_DATA, message);
327+
}
328+
long deliveryTag = message.getMessageProperties().getDeliveryTag();
329+
return finalize(channel, payload, headers, deliveryTag);
330+
}
331+
332+
protected Object convertPayload(Message message) {
269333
Object payload;
270334
if (AmqpInboundChannelAdapter.this.batchingStrategy.canDebatch(message.getMessageProperties())) {
271335
List<Object> payloads = new ArrayList<>();
272336
AmqpInboundChannelAdapter.this.batchingStrategy.deBatch(message, fragment -> payloads
273-
.add(AmqpInboundChannelAdapter.this.messageConverter.fromMessage(fragment)));
337+
.add(this.converter.fromMessage(fragment)));
274338
payload = payloads;
275339
}
276340
else {
277-
payload = AmqpInboundChannelAdapter.this.messageConverter.fromMessage(message);
341+
payload = this.converter.fromMessage(message);
278342
}
279-
Map<String, Object> headers = AmqpInboundChannelAdapter.this.headerMapper
280-
.toHeadersFromRequest(message.getMessageProperties());
281-
if (isManualAck()) {
282-
headers.put(AmqpHeaders.DELIVERY_TAG, message.getMessageProperties().getDeliveryTag());
343+
return payload;
344+
}
345+
346+
protected org.springframework.messaging.Message<Object> finalize(Channel channel, Object payload,
347+
Map<String, Object> headers, long deliveryTag) {
348+
349+
if (this.manualAcks) {
350+
headers.put(AmqpHeaders.DELIVERY_TAG, deliveryTag);
283351
headers.put(AmqpHeaders.CHANNEL, channel);
284352
}
285353
if (AmqpInboundChannelAdapter.this.retryTemplate != null) {
286354
headers.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, new AtomicInteger());
287355
}
288-
if (AmqpInboundChannelAdapter.this.bindSourceMessage) {
289-
headers.put(IntegrationMessageHeaderAccessor.SOURCE_DATA, message);
290-
}
291356
return getMessageBuilderFactory()
292357
.withPayload(payload)
293358
.copyHeaders(headers)
294359
.build();
295360
}
296361

297-
private boolean isManualAck() {
298-
return AcknowledgeMode.MANUAL ==
299-
AmqpInboundChannelAdapter.this.messageListenerContainer.getAcknowledgeMode();
362+
}
363+
364+
protected class BatchListener extends Listener implements ChannelAwareBatchMessageListener {
365+
366+
private final boolean batchModeMessages = BatchMode.MESSAGES.equals(AmqpInboundChannelAdapter.this.batchMode);
367+
368+
@Override
369+
public void onMessageBatch(List<Message> messages, Channel channel) {
370+
List<?> converted;
371+
if (this.batchModeMessages) {
372+
converted = convertMessages(messages, channel);
373+
}
374+
else {
375+
converted = convertPayloads(messages, channel);
376+
}
377+
if (converted != null) {
378+
org.springframework.messaging.Message<?> message = finalize(channel, converted, new HashMap<>(),
379+
messages.get(messages.size() - 1).getMessageProperties().getDeliveryTag());
380+
try {
381+
if (this.retryOps == null) {
382+
setAttributesIfNecessary(messages, message);
383+
sendMessage(message);
384+
}
385+
else {
386+
this.retryOps.execute(
387+
context -> {
388+
StaticMessageHeaderAccessor.getDeliveryAttempt(message).incrementAndGet();
389+
if (this.batchModeMessages) {
390+
@SuppressWarnings("unchecked")
391+
List<org.springframework.messaging.Message<?>> payloads =
392+
(List<org.springframework.messaging.Message<?>>) message.getPayload();
393+
payloads.forEach(payload -> StaticMessageHeaderAccessor
394+
.getDeliveryAttempt(payload).incrementAndGet());
395+
}
396+
setAttributesIfNecessary(messages, message);
397+
sendMessage(message);
398+
return null;
399+
}, this.recoverer);
400+
}
401+
}
402+
finally {
403+
if (this.retryOps == null) {
404+
ATTRIBUTES_HOLDER.remove();
405+
}
406+
}
407+
}
408+
}
409+
410+
private List<org.springframework.messaging.Message<?>> convertMessages(List<Message> messages,
411+
Channel channel) {
412+
413+
List<org.springframework.messaging.Message<?>> converted = new ArrayList<>();
414+
try {
415+
messages.forEach(message -> {
416+
converted.add(createMessage(message, channel));
417+
});
418+
return converted;
419+
}
420+
catch (MessageConversionException e) {
421+
if (getErrorChannel() != null) {
422+
setAttributesIfNecessary(messages, null);
423+
getMessagingTemplate()
424+
.send(getErrorChannel(), buildErrorMessage(null,
425+
EndpointUtils.errorMessagePayload(messages, channel, this.manualAcks, e)));
426+
}
427+
else {
428+
throw e;
429+
}
430+
}
431+
return null;
432+
}
433+
434+
private List<?> convertPayloads(List<Message> messages, Channel channel) {
435+
List<Object> converted = new ArrayList<>();
436+
try {
437+
messages.forEach(message -> {
438+
converted.add(this.converter.fromMessage(message));
439+
});
440+
return converted;
441+
}
442+
catch (MessageConversionException e) {
443+
if (getErrorChannel() != null) {
444+
setAttributesIfNecessary(messages, null);
445+
getMessagingTemplate()
446+
.send(getErrorChannel(), buildErrorMessage(null,
447+
EndpointUtils.errorMessagePayload(messages, channel, this.manualAcks, e)));
448+
}
449+
else {
450+
throw e;
451+
}
452+
}
453+
return null;
300454
}
301455

302456
}

0 commit comments

Comments
 (0)