Skip to content

Commit 8cf6ff0

Browse files
authored
AMQP: Support CorrelationData Message Headers
In preparation for spring-attic/spring-cloud-stream-binder-rabbit#303 Now that `CorrelationData` has a `Future<?>`, users might simply add correlation data in a header and not receive confirm/return messages. - No longer require channels for returns and confirms - don't build the confirm message if there are no channels - reduce the log level for no channels to DEBUG - complete the user's future when a message is returned (async GW) * Fix typo.
1 parent f552294 commit 8cf6ff0

File tree

6 files changed

+161
-28
lines changed

6 files changed

+161
-28
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/AbstractAmqpOutboundEndpoint.java

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363
public abstract class AbstractAmqpOutboundEndpoint extends AbstractReplyProducingMessageHandler
6464
implements Lifecycle {
6565

66-
private static final UUID NO_ID = new UUID(0L, 0L);
66+
private static final String NO_ID = new UUID(0L, 0L).toString();
6767

6868
private String exchangeName;
6969

@@ -548,20 +548,33 @@ public boolean isRunning() {
548548

549549
protected CorrelationData generateCorrelationData(Message<?> requestMessage) {
550550
CorrelationData correlationData = null;
551+
UUID uuid = requestMessage.getHeaders().getId();
552+
String messageId;
553+
if (uuid == null) {
554+
messageId = NO_ID;
555+
}
556+
else {
557+
messageId = uuid.toString();
558+
}
551559
if (this.correlationDataGenerator != null) {
552-
UUID messageId = requestMessage.getHeaders().getId();
553-
if (messageId == null) {
554-
messageId = NO_ID;
555-
}
556560
Object userData = this.correlationDataGenerator.processMessage(requestMessage);
557561
if (userData != null) {
558-
correlationData = new CorrelationDataWrapper(messageId.toString(), userData, requestMessage);
562+
correlationData = new CorrelationDataWrapper(messageId, userData, requestMessage);
559563
}
560564
else {
561565
this.logger.debug("'confirmCorrelationExpression' resolved to 'null'; "
562566
+ "no publisher confirm will be sent to the ack or nack channel");
563567
}
564568
}
569+
if (correlationData == null) {
570+
Object correlation = requestMessage.getHeaders().get(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION);
571+
if (correlation instanceof CorrelationData) {
572+
correlationData = (CorrelationData) correlation;
573+
}
574+
if (correlationData != null) {
575+
correlationData = new CorrelationDataWrapper(messageId, correlationData, requestMessage);
576+
}
577+
}
565578
return correlationData;
566579
}
567580

@@ -652,19 +665,21 @@ protected void handleConfirm(CorrelationData correlationData, boolean ack, Strin
652665
return;
653666
}
654667
Object userCorrelationData = wrapper.getUserData();
655-
Message<?> confirmMessage;
656-
confirmMessage = buildConfirmMessage(ack, cause, wrapper, userCorrelationData);
657-
if (ack && getConfirmAckChannel() != null) {
658-
sendOutput(confirmMessage, getConfirmAckChannel(), true);
659-
}
660-
else if (!ack && getConfirmNackChannel() != null) {
661-
sendOutput(confirmMessage, getConfirmNackChannel(), true);
668+
MessageChannel ackChannel = getConfirmAckChannel();
669+
if (ack && ackChannel != null) {
670+
sendOutput(buildConfirmMessage(ack, cause, wrapper, userCorrelationData), ackChannel, true);
662671
}
663672
else {
664-
if (logger.isInfoEnabled()) {
665-
logger.info("Nowhere to send publisher confirm "
666-
+ (ack ? "ack" : "nack") + " for "
667-
+ userCorrelationData);
673+
MessageChannel nackChannel = getConfirmNackChannel();
674+
if (!ack && nackChannel != null) {
675+
sendOutput(buildConfirmMessage(ack, cause, wrapper, userCorrelationData), nackChannel, true);
676+
}
677+
else {
678+
if (logger.isDebugEnabled()) {
679+
logger.debug("Nowhere to send publisher confirm "
680+
+ (ack ? "ack" : "nack") + " for "
681+
+ userCorrelationData);
682+
}
668683
}
669684
}
670685
}

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/AsyncAmqpOutboundGateway.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@
2222
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
2323
import org.springframework.amqp.rabbit.AsyncRabbitTemplate.RabbitMessageFuture;
2424
import org.springframework.amqp.rabbit.connection.CorrelationData;
25+
import org.springframework.amqp.rabbit.connection.CorrelationData.Confirm;
2526
import org.springframework.amqp.rabbit.core.RabbitTemplate;
2627
import org.springframework.amqp.support.converter.MessageConverter;
2728
import org.springframework.integration.amqp.support.MappingUtils;
2829
import org.springframework.integration.handler.ReplyRequiredException;
2930
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
3031
import org.springframework.messaging.Message;
32+
import org.springframework.messaging.MessageChannel;
3133
import org.springframework.messaging.MessageHandlingException;
3234
import org.springframework.messaging.MessagingException;
3335
import org.springframework.util.Assert;
@@ -87,20 +89,23 @@ protected Object handleRequestMessage(Message<?> requestMessage) {
8789
addDelayProperty(requestMessage, amqpMessage);
8890
RabbitMessageFuture future = this.template.sendAndReceive(generateExchangeName(requestMessage),
8991
generateRoutingKey(requestMessage), amqpMessage);
90-
future.addCallback(new FutureCallback(requestMessage));
9192
CorrelationData correlationData = generateCorrelationData(requestMessage);
9293
if (correlationData != null && future.getConfirm() != null) {
9394
future.getConfirm().addCallback(new CorrelationCallback(correlationData, future));
9495
}
96+
future.addCallback(new FutureCallback(requestMessage, correlationData));
9597
return null;
9698
}
9799

98100
private final class FutureCallback implements ListenableFutureCallback<org.springframework.amqp.core.Message> {
99101

100102
private final Message<?> requestMessage;
101103

102-
FutureCallback(Message<?> requestMessage) {
104+
private final CorrelationDataWrapper correlationData;
105+
106+
FutureCallback(Message<?> requestMessage, CorrelationData correlationData) {
103107
this.requestMessage = requestMessage;
108+
this.correlationData = (CorrelationDataWrapper) correlationData;
104109
}
105110

106111
@Override
@@ -141,18 +146,21 @@ public void onFailure(Throwable ex) {
141146
}
142147
}
143148
if (ex instanceof AmqpMessageReturnedException) {
144-
if (getReturnChannel() == null) {
145-
logger.error("Returned message received and no return channel "
146-
+ ((AmqpMessageReturnedException) ex).getReturnedMessage());
147-
}
148-
else {
149-
AmqpMessageReturnedException amre = (AmqpMessageReturnedException) ex;
149+
AmqpMessageReturnedException amre = (AmqpMessageReturnedException) ex;
150+
MessageChannel returnChannel = getReturnChannel();
151+
if (returnChannel != null) {
150152
Message<?> returnedMessage = buildReturnedMessage(
151153
new ReturnedMessage(amre.getReturnedMessage(), amre.getReplyCode(), amre.getReplyText(),
152154
amre.getExchange(), amre.getRoutingKey()),
153155
AsyncAmqpOutboundGateway.this.messageConverter);
154-
sendOutput(returnedMessage, getReturnChannel(), true);
156+
sendOutput(returnedMessage, returnChannel, true);
155157
}
158+
this.correlationData.setReturnedMessage(amre.getReturnedMessage());
159+
/*
160+
* Complete the user's future (if present) since the async template will only complete
161+
* once, successfully, or with a failure.
162+
*/
163+
this.correlationData.getFuture().set(new Confirm(true, null));
156164
}
157165
else {
158166
sendErrorMessage(this.requestMessage, exceptionToSend);

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/AmqpOutboundEndpointTests2.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019 the original author or authors.
2+
* Copyright 2019-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
2020
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2121

2222
import java.util.Collections;
23+
import java.util.concurrent.TimeUnit;
2324

2425
import org.junit.jupiter.api.Test;
2526

@@ -29,16 +30,19 @@
2930
import org.springframework.amqp.core.QueueBuilder.Overflow;
3031
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
3132
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
33+
import org.springframework.amqp.rabbit.connection.CorrelationData;
3234
import org.springframework.amqp.rabbit.core.RabbitAdmin;
3335
import org.springframework.amqp.rabbit.core.RabbitTemplate;
3436
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
3537
import org.springframework.amqp.rabbit.junit.RabbitAvailableCondition;
38+
import org.springframework.amqp.support.AmqpHeaders;
3639
import org.springframework.beans.factory.annotation.Autowired;
3740
import org.springframework.context.annotation.Bean;
3841
import org.springframework.context.annotation.Configuration;
3942
import org.springframework.integration.amqp.dsl.Amqp;
4043
import org.springframework.integration.config.EnableIntegration;
4144
import org.springframework.integration.dsl.IntegrationFlow;
45+
import org.springframework.integration.support.MessageBuilder;
4246
import org.springframework.messaging.MessageHandlingException;
4347
import org.springframework.messaging.support.GenericMessage;
4448
import org.springframework.test.annotation.DirtiesContext;
@@ -74,6 +78,17 @@ void testWithReturn(@Autowired IntegrationFlow flow) {
7478
.isEqualTo("Message was returned by the broker");
7579
}
7680

81+
@Test
82+
void testReturnConfirmNoChannels(@Autowired IntegrationFlow flow2) throws Exception {
83+
CorrelationData corr = new CorrelationData("foo");
84+
flow2.getInputChannel().send(MessageBuilder.withPayload("test")
85+
.setHeader("rk", "junkjunk")
86+
.setHeader(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, corr)
87+
.build());
88+
assertThat(corr.getFuture().get(10, TimeUnit.SECONDS).isAck()).isTrue();
89+
assertThat(corr.getReturnedMessage()).isNotNull();
90+
}
91+
7792
@Test
7893
@DisabledIf("#{systemEnvironment['TRAVIS'] ?: false}")
7994
// needs RabbitMQ 3.7
@@ -106,6 +121,13 @@ public IntegrationFlow flow(RabbitTemplate template) {
106121
.waitForConfirm(true));
107122
}
108123

124+
@Bean
125+
public IntegrationFlow flow2(RabbitTemplate template) {
126+
return f -> f.handle(Amqp.outboundAdapter(template)
127+
.exchangeName("")
128+
.routingKeyFunction(msg -> msg.getHeaders().get("rk", String.class)));
129+
}
130+
109131
@Bean
110132
public CachingConnectionFactory cf() {
111133
CachingConnectionFactory ccf = new CachingConnectionFactory(

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/AsyncAmqpGatewayTests.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -37,6 +37,7 @@
3737
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
3838
import org.springframework.amqp.rabbit.AsyncRabbitTemplate.RabbitMessageFuture;
3939
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
40+
import org.springframework.amqp.rabbit.connection.CorrelationData;
4041
import org.springframework.amqp.rabbit.core.RabbitTemplate;
4142
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
4243
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
@@ -49,6 +50,7 @@
4950
import org.springframework.integration.amqp.support.NackedAmqpMessageException;
5051
import org.springframework.integration.amqp.support.ReturnedAmqpMessageException;
5152
import org.springframework.integration.channel.DirectChannel;
53+
import org.springframework.integration.channel.NullChannel;
5254
import org.springframework.integration.channel.QueueChannel;
5355
import org.springframework.integration.support.MessageBuilder;
5456
import org.springframework.integration.test.condition.LogLevels;
@@ -220,4 +222,38 @@ void testConfirmsAndReturns() throws Exception {
220222
ccf.destroy();
221223
}
222224

225+
@Test
226+
void confirmsAndReturnsNoChannels() throws Exception {
227+
CachingConnectionFactory ccf = new CachingConnectionFactory("localhost");
228+
ccf.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
229+
ccf.setPublisherReturns(true);
230+
RabbitTemplate template = new RabbitTemplate(ccf);
231+
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(ccf);
232+
container.setBeanName("replyContainer");
233+
container.setQueueNames("asyncRQ1");
234+
container.afterPropertiesSet();
235+
container.start();
236+
AsyncRabbitTemplate asyncTemplate = new AsyncRabbitTemplate(template, container);
237+
asyncTemplate.setEnableConfirms(true);
238+
asyncTemplate.setMandatory(true);
239+
240+
AsyncAmqpOutboundGateway gateway = new AsyncAmqpOutboundGateway(asyncTemplate);
241+
gateway.setOutputChannel(new NullChannel());
242+
gateway.setExchangeName("");
243+
gateway.setRoutingKey("noRoute");
244+
gateway.setBeanFactory(mock(BeanFactory.class));
245+
gateway.afterPropertiesSet();
246+
gateway.start();
247+
248+
CorrelationData corr = new CorrelationData("foo");
249+
gateway.handleMessage(MessageBuilder.withPayload("test")
250+
.setHeader(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, corr)
251+
.build());
252+
assertThat(corr.getFuture().get(10, TimeUnit.SECONDS).isAck()).isTrue();
253+
assertThat(corr.getReturnedMessage()).isNotNull();
254+
255+
asyncTemplate.stop();
256+
ccf.destroy();
257+
}
258+
223259
}

0 commit comments

Comments
 (0)