Skip to content

Commit da5d002

Browse files
garyrussellartembilan
authored andcommitted
AMQP: Batch mode option to receive element headers
In preparation for spring-attic/spring-cloud-stream-binder-rabbit#290 SCSt is not set up to receive a `Message<Message<?>>`. Similar to the kafka endpoint, add the batched headers in a header.
1 parent 094eb2e commit da5d002

File tree

4 files changed

+36
-5
lines changed

4 files changed

+36
-5
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundChannelAdapter.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.springframework.integration.context.OrderlyShutdownCapable;
4444
import org.springframework.integration.endpoint.MessageProducerSupport;
4545
import org.springframework.integration.support.ErrorMessageUtils;
46+
import org.springframework.lang.Nullable;
4647
import org.springframework.messaging.MessageChannel;
4748
import org.springframework.retry.RecoveryCallback;
4849
import org.springframework.retry.RetryOperations;
@@ -65,6 +66,12 @@
6566
public class AmqpInboundChannelAdapter extends MessageProducerSupport implements
6667
OrderlyShutdownCapable {
6768

69+
/**
70+
* Header containing {@code List<Map<String, Object>} headers when batch mode
71+
* is {@link BatchMode#EXTRACT_PAYLOADS_WITH_HEADERS}.
72+
*/
73+
public static final String CONSOLIDATED_HEADERS = AmqpHeaders.PREFIX + "batchedHeaders";
74+
6875
/**
6976
* Defines the payload type when the listener container is configured with consumerBatchEnabled.
7077
*/
@@ -80,7 +87,14 @@ public enum BatchMode {
8087
* Payload is a {@code List<?>} where each element is the converted body of the
8188
* Spring AMQP Message.
8289
*/
83-
EXTRACT_PAYLOADS
90+
EXTRACT_PAYLOADS,
91+
92+
/**
93+
* Payload is a {@code List<?>} where each element is the converted body of the
94+
* Spring AMQP Message. The headers for each message are provided in a header
95+
* {@link AmqpInboundChannelAdapter#CONSOLIDATED_HEADERS}.
96+
*/
97+
EXTRACT_PAYLOADS_WITH_HEADERS
8498

8599
}
86100

@@ -332,7 +346,7 @@ protected org.springframework.messaging.Message<Object> createMessageFromAmqp(Me
332346
headers.put(IntegrationMessageHeaderAccessor.SOURCE_DATA, message);
333347
}
334348
long deliveryTag = message.getMessageProperties().getDeliveryTag();
335-
return createMessageFromPayload(payload, channel, headers, deliveryTag);
349+
return createMessageFromPayload(payload, channel, headers, deliveryTag, null);
336350
}
337351

338352
protected Object convertPayload(Message message) {
@@ -350,7 +364,8 @@ protected Object convertPayload(Message message) {
350364
}
351365

352366
protected org.springframework.messaging.Message<Object> createMessageFromPayload(Object payload,
353-
Channel channel, Map<String, Object> headers, long deliveryTag) {
367+
Channel channel, Map<String, Object> headers, long deliveryTag,
368+
@Nullable List<Map<String, Object>> listHeaders) {
354369

355370
if (this.manualAcks) {
356371
headers.put(AmqpHeaders.DELIVERY_TAG, deliveryTag);
@@ -359,6 +374,9 @@ protected org.springframework.messaging.Message<Object> createMessageFromPayload
359374
if (AmqpInboundChannelAdapter.this.retryTemplate != null) {
360375
headers.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, new AtomicInteger());
361376
}
377+
if (listHeaders != null) {
378+
headers.put(CONSOLIDATED_HEADERS, listHeaders);
379+
}
362380
return getMessageBuilderFactory()
363381
.withPayload(payload)
364382
.copyHeaders(headers)
@@ -374,16 +392,23 @@ protected class BatchListener extends Listener implements ChannelAwareBatchMessa
374392
@Override
375393
public void onMessageBatch(List<Message> messages, Channel channel) {
376394
List<?> converted;
395+
List<Map<String, Object>> headers = null;
377396
if (this.batchModeMessages) {
378397
converted = convertMessages(messages, channel);
379398
}
380399
else {
381400
converted = convertPayloads(messages, channel);
401+
if (BatchMode.EXTRACT_PAYLOADS_WITH_HEADERS.equals(AmqpInboundChannelAdapter.this.batchMode)) {
402+
List<Map<String, Object>> listHeaders = new ArrayList<>();
403+
messages.forEach(msg -> listHeaders.add(AmqpInboundChannelAdapter.this.headerMapper
404+
.toHeadersFromRequest(msg.getMessageProperties())));
405+
headers = listHeaders;
406+
}
382407
}
383408
if (converted != null) {
384409
org.springframework.messaging.Message<?> message =
385410
createMessageFromPayload(converted, channel, new HashMap<>(),
386-
messages.get(messages.size() - 1).getMessageProperties().getDeliveryTag());
411+
messages.get(messages.size() - 1).getMessageProperties().getDeliveryTag(), headers);
387412
try {
388413
if (this.retryOps == null) {
389414
setAttributesIfNecessary(messages, message);

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/InboundEndpointTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,7 @@ public void testConsumerBatchExtract() throws Exception {
449449
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(container);
450450
QueueChannel out = new QueueChannel();
451451
adapter.setOutputChannel(out);
452-
adapter.setBatchMode(BatchMode.EXTRACT_PAYLOADS);
452+
adapter.setBatchMode(BatchMode.EXTRACT_PAYLOADS_WITH_HEADERS);
453453
adapter.afterPropertiesSet();
454454
ChannelAwareBatchMessageListener listener = (ChannelAwareBatchMessageListener) container.getMessageListener();
455455
MessageProperties messageProperties = new MessageProperties();
@@ -461,6 +461,8 @@ public void testConsumerBatchExtract() throws Exception {
461461
Message<?> received = out.receive(0);
462462
assertThat(received).isNotNull();
463463
assertThat(((List<String>) received.getPayload())).contains("test1", "test2");
464+
assertThat(received.getHeaders().get(AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS, List.class))
465+
.hasSize(2);
464466
}
465467

466468
@SuppressWarnings({ "unchecked" })

src/reference/asciidoc/amqp.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ See the https://docs.spring.io/spring-amqp/reference/html/[Spring AMQP Reference
171171
<27> When the container's `consumerBatchEnabled` is `true`, determines how the adapter presents the batch of messages in the message payload.
172172
When set to `MESSAGES` (default), the payload is a `List<Message<?>>` where each message has headers mapped from the incoming AMQP `Message` and the payload is the converted `body`.
173173
When set to `EXTRACT_PAYLOADS`, the payload is a `List<?>` where the elements are converted from the AMQP `Message` body.
174+
`EXTRACT_PAYLOADS_WITH_HEADERS` is similar to `EXTRACT_PAYLOADS` but, in addition, the headers from each message are mapped from the `MessageProperties` into a `List<Map<String, Object>` at the corresponding index; the header name is `AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS`.
174175
====
175176

176177
[NOTE]

src/reference/asciidoc/whats-new.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,3 +65,6 @@ See <<./rmi.adoc#rmi, RMI Support>> for more information.
6565

6666
The outbound endpoints now have a new mechanism for handling publisher confirms and returns.
6767
See <<./amqp.adoc#alternative-confirms-returns,Alternative Mechanism for Publisher Confirms and Returns>> for more information.
68+
69+
A new `BatchMode.EXTRACT_PAYLOAD_WITH_HEADERS` is supported by the `AmqpInboundChannelAdapter`.
70+
See <<./amqp.adoc#amqp-inbound-channel-adapter,Inbound Channel Adapter>> for more information.

0 commit comments

Comments
 (0)