11/*
2- * Copyright 2014-2017 the original author or authors.
2+ * Copyright 2014-2018 the original author or authors.
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
1616
1717package org .springframework .integration .amqp .dsl ;
1818
19+ import static org .hamcrest .Matchers .instanceOf ;
1920import static org .junit .Assert .assertEquals ;
2021import static org .junit .Assert .assertNotNull ;
22+ import static org .junit .Assert .assertNull ;
2123import static org .junit .Assert .assertSame ;
24+ import static org .junit .Assert .assertThat ;
2225import static org .junit .Assert .assertTrue ;
2326
27+ import java .util .concurrent .atomic .AtomicReference ;
28+
2429import org .junit .jupiter .api .AfterAll ;
2530import org .junit .jupiter .api .Test ;
2631
3742import org .springframework .amqp .rabbit .junit .RabbitAvailableCondition ;
3843import org .springframework .amqp .rabbit .listener .DirectMessageListenerContainer ;
3944import org .springframework .amqp .rabbit .listener .SimpleMessageListenerContainer ;
45+ import org .springframework .amqp .rabbit .listener .exception .ListenerExecutionFailedException ;
46+ import org .springframework .amqp .support .converter .MessageConversionException ;
47+ import org .springframework .amqp .support .converter .SimpleMessageConverter ;
4048import org .springframework .beans .factory .annotation .Autowired ;
4149import org .springframework .beans .factory .annotation .Qualifier ;
50+ import org .springframework .context .ConfigurableApplicationContext ;
4251import org .springframework .context .Lifecycle ;
4352import org .springframework .context .annotation .Bean ;
4453import org .springframework .context .annotation .Configuration ;
4554import org .springframework .integration .amqp .channel .AbstractAmqpChannel ;
4655import org .springframework .integration .amqp .inbound .AmqpInboundGateway ;
4756import org .springframework .integration .amqp .support .AmqpHeaderMapper ;
57+ import org .springframework .integration .amqp .support .AmqpMessageHeaderErrorMessageStrategy ;
4858import org .springframework .integration .amqp .support .DefaultAmqpHeaderMapper ;
4959import org .springframework .integration .channel .QueueChannel ;
5060import org .springframework .integration .config .EnableIntegration ;
5161import org .springframework .integration .dsl .IntegrationFlow ;
5262import org .springframework .integration .dsl .IntegrationFlowBuilder ;
5363import org .springframework .integration .dsl .IntegrationFlows ;
5464import org .springframework .integration .support .MessageBuilder ;
65+ import org .springframework .integration .support .StringObjectMapBuilder ;
5566import org .springframework .integration .test .util .TestUtils ;
5667import org .springframework .messaging .Message ;
5768import org .springframework .messaging .MessageChannel ;
6778 */
6879@ SpringJUnitConfig
6980@ RabbitAvailable (queues = { "amqpOutboundInput" , "amqpReplyChannel" , "asyncReplies" ,
70- "defaultReplyTo" , "si.dsl.test" , "testTemplateChannelTransacted" })
81+ "defaultReplyTo" , "si.dsl.test" , "si.dsl.exception.test.dlq" ,
82+ "si.dsl.conv.exception.test.dlq" , "testTemplateChannelTransacted" })
7183@ DirtiesContext
7284public class AmqpTests {
7385
@@ -92,8 +104,10 @@ public class AmqpTests {
92104 private Lifecycle asyncOutboundGateway ;
93105
94106 @ AfterAll
95- public static void tearDown () {
96- RabbitAvailableCondition .getBrokerRunning ().removeTestQueues ();
107+ public static void tearDown (ConfigurableApplicationContext context ) {
108+ context .stop (); // prevent queues from being redeclared after deletion
109+ RabbitAvailableCondition .getBrokerRunning ().removeTestQueues ("si.dsl.exception.test" ,
110+ "si.dsl.conv.exception.test" );
97111 }
98112
99113 @ Test
@@ -173,6 +187,33 @@ public void testAmqpAsyncOutboundGatewayFlow() throws Exception {
173187 this .asyncOutboundGateway .stop ();
174188 }
175189
190+ @ Autowired
191+ private AtomicReference <ListenerExecutionFailedException > lefe ;
192+
193+ @ Autowired
194+ private AtomicReference <?> raw ;
195+
196+
197+ @ Test
198+ public void testInboundMessagingExceptionFlow () {
199+ this .amqpTemplate .convertAndSend ("si.dsl.exception.test" , "foo" );
200+ assertNotNull (this .amqpTemplate .receive ("si.dsl.exception.test.dlq" , 30_000 ));
201+ assertNull (this .lefe .get ());
202+ assertNotNull (this .raw .get ());
203+ this .raw .set (null );
204+ }
205+
206+ @ Test
207+ public void testInboundConversionExceptionFlow () {
208+ this .amqpTemplate .convertAndSend ("si.dsl.conv.exception.test" , "foo" );
209+ assertNotNull (this .amqpTemplate .receive ("si.dsl.conv.exception.test.dlq" , 30_000 ));
210+ assertNotNull (this .lefe .get ());
211+ assertThat (this .lefe .get ().getCause (), instanceOf (MessageConversionException .class ));
212+ assertNotNull (this .raw .get ());
213+ this .raw .set (null );
214+ this .lefe .set (null );
215+ }
216+
176217 @ Autowired
177218 private AbstractAmqpChannel unitChannel ;
178219
@@ -277,6 +318,84 @@ public IntegrationFlow amqpInboundFlow(ConnectionFactory rabbitConnectionFactory
277318 .get ();
278319 }
279320
321+ @ Bean
322+ public AtomicReference <ListenerExecutionFailedException > lefe () {
323+ return new AtomicReference <>();
324+ }
325+
326+ @ Bean
327+ public AtomicReference <org .springframework .amqp .core .Message > raw () {
328+ return new AtomicReference <>();
329+ }
330+
331+ @ Bean
332+ public Queue exQueue () {
333+ return new Queue ("si.dsl.exception.test" , true , false , false ,
334+ new StringObjectMapBuilder ()
335+ .put ("x-dead-letter-exchange" , "" )
336+ .put ("x-dead-letter-routing-key" , exDLQ ().getName ())
337+ .get ());
338+ }
339+
340+ @ Bean
341+ public Queue exDLQ () {
342+ return new Queue ("si.dsl.exception.test.dlq" );
343+ }
344+
345+ @ Bean
346+ public IntegrationFlow inboundWithExceptionFlow (ConnectionFactory cf ) {
347+ return IntegrationFlows .from (Amqp .inboundAdapter (cf , exQueue ())
348+ .configureContainer (c -> c .defaultRequeueRejected (false ))
349+ .errorChannel ("errors.input" ))
350+ .handle (m -> {
351+ throw new RuntimeException ("fail" );
352+ })
353+ .get ();
354+ }
355+
356+ @ Bean
357+ public IntegrationFlow errors () {
358+ return f -> f .handle (m -> {
359+ raw ().set (m .getHeaders ().get (AmqpMessageHeaderErrorMessageStrategy .AMQP_RAW_MESSAGE ,
360+ org .springframework .amqp .core .Message .class ));
361+ if (m .getPayload () instanceof ListenerExecutionFailedException ) {
362+ lefe ().set ((ListenerExecutionFailedException ) m .getPayload ());
363+ }
364+ throw (RuntimeException ) m .getPayload ();
365+ });
366+ }
367+
368+ @ Bean
369+ public Queue exConvQueue () {
370+ return new Queue ("si.dsl.conv.exception.test" , true , false , false ,
371+ new StringObjectMapBuilder ()
372+ .put ("x-dead-letter-exchange" , "" )
373+ .put ("x-dead-letter-routing-key" , exConvDLQ ().getName ())
374+ .get ());
375+ }
376+
377+ @ Bean
378+ public Queue exConvDLQ () {
379+ return new Queue ("si.dsl.conv.exception.test.dlq" );
380+ }
381+
382+ @ Bean
383+ public IntegrationFlow inboundWithConvExceptionFlow (ConnectionFactory cf ) {
384+ return IntegrationFlows .from (Amqp .inboundAdapter (cf , exConvQueue ())
385+ .configureContainer (c -> c .defaultRequeueRejected (false ))
386+ .messageConverter (new SimpleMessageConverter () {
387+
388+ @ Override
389+ public Object fromMessage (org .springframework .amqp .core .Message message )
390+ throws MessageConversionException {
391+ throw new MessageConversionException ("fail" );
392+ }
393+
394+ })
395+ .errorChannel ("errors.input" ))
396+ .get ();
397+ }
398+
280399 @ Bean
281400 public Queue asyncReplies () {
282401 return new Queue ("asyncReplies" );
0 commit comments