Skip to content

Commit e6489ab

Browse files
garyrussellartembilan
authored andcommitted
INT-4526: Fix Channel Interceptor NonNullApi Call
JIRA: https://jira.spring.io/browse/INT-4526 - `Message<?>` cannot be null in `postReceive()`. Polishing - PR Comments * Polishing code style * Fix `PollableJmsChannel` * Increase receive timeout in the `PollableJmsChannel`
1 parent 4b9eaeb commit e6489ab

File tree

7 files changed

+107
-92
lines changed

7 files changed

+107
-92
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/channel/PollableAmqpChannel.java

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -52,7 +52,7 @@ public class PollableAmqpChannel extends AbstractAmqpChannel
5252

5353
private final String channelName;
5454

55-
private volatile Queue queue;
55+
private Queue queue;
5656

5757
private volatile int executorInterceptorsSize;
5858

@@ -188,30 +188,33 @@ protected Message<?> doReceive(Long timeout) {
188188
}
189189
}
190190
Object object = performReceive(timeout);
191+
Message<?> message = null;
191192
if (object == null) {
192193
if (isLoggingEnabled() && logger.isTraceEnabled()) {
193194
logger.trace("postReceive on channel '" + this + "', message is null");
194195
}
195-
return null;
196-
}
197-
if (countsEnabled) {
198-
getMetrics().afterReceive();
199-
counted = true;
200-
}
201-
Message<?> message;
202-
if (object instanceof Message<?>) {
203-
message = (Message<?>) object;
204196
}
205197
else {
206-
message = getMessageBuilderFactory()
207-
.withPayload(object)
208-
.build();
209-
}
210-
if (isLoggingEnabled() && logger.isDebugEnabled()) {
211-
logger.debug("postReceive on channel '" + this + "', message: " + message);
198+
if (countsEnabled) {
199+
getMetrics().afterReceive();
200+
counted = true;
201+
}
202+
if (object instanceof Message<?>) {
203+
message = (Message<?>) object;
204+
}
205+
else {
206+
message = getMessageBuilderFactory()
207+
.withPayload(object)
208+
.build();
209+
}
210+
if (isLoggingEnabled() && logger.isDebugEnabled()) {
211+
logger.debug("postReceive on channel '" + this + "', message: " + message);
212+
}
212213
}
213214
if (interceptorStack != null) {
214-
message = interceptorList.postReceive(message, this);
215+
if (message != null) {
216+
message = interceptorList.postReceive(message, this);
217+
}
215218
interceptorList.afterReceiveCompletion(message, this, null, interceptorStack);
216219
}
217220
return message;

spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractPollableChannel.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public Message<?> receive(long timeout) {
108108
return null;
109109
}
110110
}
111-
Message<?> message = this.doReceive(timeout);
111+
Message<?> message = doReceive(timeout);
112112
if (countsEnabled && message != null) {
113113
if (getMetricsCaptor() != null) {
114114
incrementReceiveCounter();
@@ -123,7 +123,9 @@ else if (logger.isTraceEnabled()) {
123123
logger.trace("postReceive on channel '" + this + "', message is null");
124124
}
125125
if (!CollectionUtils.isEmpty(interceptorStack)) {
126-
message = interceptorList.postReceive(message, this);
126+
if (message != null) {
127+
message = interceptorList.postReceive(message, this);
128+
}
127129
interceptorList.afterReceiveCompletion(message, this, null, interceptorStack);
128130
}
129131
return message;

spring-integration-core/src/test/java/org/springframework/integration/channel/interceptor/ChannelInterceptorTests.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -208,29 +208,23 @@ public void testPreReceiveInterceptorReturnsFalse() {
208208

209209
@Test
210210
public void testPostReceiveInterceptor() {
211-
final AtomicInteger invokedCount = new AtomicInteger();
212211
final AtomicInteger messageCount = new AtomicInteger();
213212
channel.addInterceptor(new ChannelInterceptor() {
214213

215214
@Override
216215
public Message<?> postReceive(Message<?> message, MessageChannel channel) {
217216
assertNotNull(channel);
218217
assertSame(ChannelInterceptorTests.this.channel, channel);
219-
if (message != null) {
220-
messageCount.incrementAndGet();
221-
}
222-
invokedCount.incrementAndGet();
218+
messageCount.incrementAndGet();
223219
return message;
224220
}
225221

226222
});
227223
channel.receive(0);
228-
assertEquals(1, invokedCount.get());
229224
assertEquals(0, messageCount.get());
230225
channel.send(new GenericMessage<String>("test"));
231226
Message<?> result = channel.receive(0);
232227
assertNotNull(result);
233-
assertEquals(2, invokedCount.get());
234228
assertEquals(1, messageCount.get());
235229
}
236230

spring-integration-core/src/test/java/org/springframework/integration/gateway/GatewayInterfaceTests.java

Lines changed: 43 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,8 @@ public class GatewayInterfaceTests {
155155

156156
@Test
157157
public void testWithServiceSuperclassAnnotatedMethod() throws Exception {
158-
ConfigurableApplicationContext ac = new ClassPathXmlApplicationContext("GatewayInterfaceTests-context.xml", this.getClass());
158+
ConfigurableApplicationContext ac = new ClassPathXmlApplicationContext("GatewayInterfaceTests-context.xml", this
159+
.getClass());
159160
DirectChannel channel = ac.getBean("requestChannelFoo", DirectChannel.class);
160161
final Method fooMethod = Foo.class.getMethod("foo", String.class);
161162
final AtomicBoolean called = new AtomicBoolean();
@@ -180,7 +181,8 @@ public void testWithServiceSuperclassAnnotatedMethod() throws Exception {
180181

181182
@Test
182183
public void testWithServiceSuperclassAnnotatedMethodOverridePE() throws Exception {
183-
ConfigurableApplicationContext ac = new ClassPathXmlApplicationContext("GatewayInterfaceTests2-context.xml", this.getClass());
184+
ConfigurableApplicationContext ac = new ClassPathXmlApplicationContext("GatewayInterfaceTests2-context.xml", this
185+
.getClass());
184186
DirectChannel channel = ac.getBean("requestChannelFoo", DirectChannel.class);
185187
final Method fooMethod = Foo.class.getMethod("foo", String.class);
186188
final AtomicBoolean called = new AtomicBoolean();
@@ -202,7 +204,8 @@ public void testWithServiceSuperclassAnnotatedMethodOverridePE() throws Exceptio
202204

203205
@Test
204206
public void testWithServiceAnnotatedMethod() {
205-
ConfigurableApplicationContext ac = new ClassPathXmlApplicationContext("GatewayInterfaceTests-context.xml", this.getClass());
207+
ConfigurableApplicationContext ac = new ClassPathXmlApplicationContext("GatewayInterfaceTests-context.xml", this
208+
.getClass());
206209
DirectChannel channel = ac.getBean("requestChannelBar", DirectChannel.class);
207210
MessageHandler handler = mock(MessageHandler.class);
208211
channel.subscribe(handler);
@@ -214,17 +217,17 @@ public void testWithServiceAnnotatedMethod() {
214217

215218
@Test
216219
public void testWithServiceSuperclassUnAnnotatedMethod() throws Exception {
217-
ConfigurableApplicationContext ac = new ClassPathXmlApplicationContext("GatewayInterfaceTests-context.xml", this.getClass());
220+
ConfigurableApplicationContext ac = new ClassPathXmlApplicationContext("GatewayInterfaceTests-context.xml", this
221+
.getClass());
218222
DirectChannel channel = ac.getBean("requestChannelBaz", DirectChannel.class);
219223
final Method bazMethod = Foo.class.getMethod("baz", String.class);
220224
final AtomicBoolean called = new AtomicBoolean();
221225
MessageHandler handler = message -> {
222-
assertThat((String) message.getHeaders().get("name"), equalTo("overrideGlobal"));
223-
assertThat(
224-
(String) message.getHeaders().get("string"),
226+
assertThat(message.getHeaders().get("name"), equalTo("overrideGlobal"));
227+
assertThat(message.getHeaders().get("string"),
225228
equalTo("public abstract void org.springframework.integration.gateway.GatewayInterfaceTests$Foo.baz(java.lang.String)"));
226-
assertThat((Method) message.getHeaders().get("object"), equalTo(bazMethod));
227-
assertThat((String) message.getPayload(), equalTo("hello"));
229+
assertThat(message.getHeaders().get("object"), equalTo(bazMethod));
230+
assertThat(message.getPayload(), equalTo("hello"));
228231
called.set(true);
229232
};
230233
channel.subscribe(handler);
@@ -236,17 +239,17 @@ public void testWithServiceSuperclassUnAnnotatedMethod() throws Exception {
236239

237240
@Test
238241
public void testWithServiceUnAnnotatedMethodGlobalHeaderDoesntOverride() throws Exception {
239-
ConfigurableApplicationContext ac = new ClassPathXmlApplicationContext("GatewayInterfaceTests-context.xml", this.getClass());
242+
ConfigurableApplicationContext ac = new ClassPathXmlApplicationContext("GatewayInterfaceTests-context.xml", this
243+
.getClass());
240244
DirectChannel channel = ac.getBean("requestChannelBaz", DirectChannel.class);
241245
final Method quxMethod = Bar.class.getMethod("qux", String.class, String.class);
242246
final AtomicBoolean called = new AtomicBoolean();
243247
MessageHandler handler = message -> {
244-
assertThat((String) message.getHeaders().get("name"), equalTo("arg1"));
245-
assertThat(
246-
(String) message.getHeaders().get("string"),
248+
assertThat(message.getHeaders().get("name"), equalTo("arg1"));
249+
assertThat(message.getHeaders().get("string"),
247250
equalTo("public abstract void org.springframework.integration.gateway.GatewayInterfaceTests$Bar.qux(java.lang.String,java.lang.String)"));
248-
assertThat((Method) message.getHeaders().get("object"), equalTo(quxMethod));
249-
assertThat((String) message.getPayload(), equalTo("hello"));
251+
assertThat(message.getHeaders().get("object"), equalTo(quxMethod));
252+
assertThat(message.getPayload(), equalTo("hello"));
250253
called.set(true);
251254
};
252255
channel.subscribe(handler);
@@ -258,7 +261,8 @@ public void testWithServiceUnAnnotatedMethodGlobalHeaderDoesntOverride() throws
258261

259262
@Test
260263
public void testWithServiceCastAsSuperclassAnnotatedMethod() {
261-
ConfigurableApplicationContext ac = new ClassPathXmlApplicationContext("GatewayInterfaceTests-context.xml", this.getClass());
264+
ConfigurableApplicationContext ac = new ClassPathXmlApplicationContext("GatewayInterfaceTests-context.xml", this
265+
.getClass());
262266
DirectChannel channel = ac.getBean("requestChannelFoo", DirectChannel.class);
263267
MessageHandler handler = mock(MessageHandler.class);
264268
channel.subscribe(handler);
@@ -270,7 +274,8 @@ public void testWithServiceCastAsSuperclassAnnotatedMethod() {
270274

271275
@Test
272276
public void testWithServiceCastAsSuperclassUnAnnotatedMethod() {
273-
ConfigurableApplicationContext ac = new ClassPathXmlApplicationContext("GatewayInterfaceTests-context.xml", this.getClass());
277+
ConfigurableApplicationContext ac = new ClassPathXmlApplicationContext("GatewayInterfaceTests-context.xml", this
278+
.getClass());
274279
DirectChannel channel = ac.getBean("requestChannelBaz", DirectChannel.class);
275280
MessageHandler handler = mock(MessageHandler.class);
276281
channel.subscribe(handler);
@@ -281,8 +286,9 @@ public void testWithServiceCastAsSuperclassUnAnnotatedMethod() {
281286
}
282287

283288
@Test
284-
public void testWithServiceHashcode() throws Exception {
285-
ConfigurableApplicationContext ac = new ClassPathXmlApplicationContext("GatewayInterfaceTests-context.xml", this.getClass());
289+
public void testWithServiceHashcode() {
290+
ConfigurableApplicationContext ac = new ClassPathXmlApplicationContext("GatewayInterfaceTests-context.xml", this
291+
.getClass());
286292
DirectChannel channel = ac.getBean("requestChannelBaz", DirectChannel.class);
287293
MessageHandler handler = mock(MessageHandler.class);
288294
channel.subscribe(handler);
@@ -294,7 +300,8 @@ public void testWithServiceHashcode() throws Exception {
294300

295301
@Test
296302
public void testWithServiceToString() {
297-
ConfigurableApplicationContext ac = new ClassPathXmlApplicationContext("GatewayInterfaceTests-context.xml", this.getClass());
303+
ConfigurableApplicationContext ac = new ClassPathXmlApplicationContext("GatewayInterfaceTests-context.xml", this
304+
.getClass());
298305
DirectChannel channel = ac.getBean("requestChannelBaz", DirectChannel.class);
299306
MessageHandler handler = mock(MessageHandler.class);
300307
channel.subscribe(handler);
@@ -306,7 +313,8 @@ public void testWithServiceToString() {
306313

307314
@Test
308315
public void testWithServiceEquals() throws Exception {
309-
ConfigurableApplicationContext ac = new ClassPathXmlApplicationContext("GatewayInterfaceTests-context.xml", this.getClass());
316+
ConfigurableApplicationContext ac = new ClassPathXmlApplicationContext("GatewayInterfaceTests-context.xml", this
317+
.getClass());
310318
DirectChannel channel = ac.getBean("requestChannelBaz", DirectChannel.class);
311319
MessageHandler handler = mock(MessageHandler.class);
312320
channel.subscribe(handler);
@@ -326,7 +334,8 @@ public void testWithServiceEquals() throws Exception {
326334

327335
@Test
328336
public void testWithServiceGetClass() {
329-
ConfigurableApplicationContext ac = new ClassPathXmlApplicationContext("GatewayInterfaceTests-context.xml", this.getClass());
337+
ConfigurableApplicationContext ac = new ClassPathXmlApplicationContext("GatewayInterfaceTests-context.xml", this
338+
.getClass());
330339
DirectChannel channel = ac.getBean("requestChannelBaz", DirectChannel.class);
331340
MessageHandler handler = mock(MessageHandler.class);
332341
channel.subscribe(handler);
@@ -343,7 +352,8 @@ public void testWithServiceAsNotAnInterface() {
343352

344353
@Test
345354
public void testWithCustomMapper() {
346-
ConfigurableApplicationContext ac = new ClassPathXmlApplicationContext("GatewayInterfaceTests-context.xml", this.getClass());
355+
ConfigurableApplicationContext ac = new ClassPathXmlApplicationContext("GatewayInterfaceTests-context.xml", this
356+
.getClass());
347357
DirectChannel channel = ac.getBean("requestChannelBaz", DirectChannel.class);
348358
final AtomicBoolean called = new AtomicBoolean();
349359
MessageHandler handler = message -> {
@@ -358,14 +368,14 @@ public void testWithCustomMapper() {
358368
}
359369

360370
@Test
361-
public void testLateReply() throws Exception {
371+
public void testLateReply() {
362372
ConfigurableApplicationContext ac = new ClassPathXmlApplicationContext("GatewayInterfaceTests-context.xml",
363373
this.getClass());
364374
Bar baz = ac.getBean(Bar.class);
365375
String reply = baz.lateReply("hello", 1000, 0);
366376
assertNull(reply);
367377
PollableChannel errorChannel = ac.getBean("errorChannel", PollableChannel.class);
368-
Message<?> receive = errorChannel.receive(5000);
378+
Message<?> receive = errorChannel.receive(10000);
369379
assertNotNull(receive);
370380
MessagingException messagingException = (MessagingException) receive.getPayload();
371381
assertThat(messagingException.getMessage(),
@@ -375,7 +385,7 @@ public void testLateReply() throws Exception {
375385

376386
@Test
377387
public void testInt2634() {
378-
Map<Object, Object> param = Collections.<Object, Object>singletonMap(1, 1);
388+
Map<Object, Object> param = Collections.singletonMap(1, 1);
379389
Object result = this.int2634Gateway.test2(param);
380390
assertEquals(param, result);
381391

@@ -405,7 +415,7 @@ public void testExecs() throws Exception {
405415

406416
ListenableFuture<Thread> result2 = this.execGateway.test2(Thread.currentThread());
407417
final CountDownLatch latch = new CountDownLatch(1);
408-
final AtomicReference<Thread> thread = new AtomicReference<Thread>();
418+
final AtomicReference<Thread> thread = new AtomicReference<>();
409419
result2.addCallback(new ListenableFutureCallback<Thread>() {
410420

411421
@Override
@@ -528,22 +538,26 @@ public interface Bar extends Foo {
528538
void bar(String payload);
529539

530540
void qux(String payload, @Header("name") String nameHeader);
541+
531542
}
532543

533544
public static class NotAnInterface {
534545

535546
public void fail(String payload) {
536547
}
548+
537549
}
538550

539551
public interface Baz {
540552

541553
void baz(String payload);
554+
542555
}
543556

544557
public interface NoArgumentsGateway {
545558

546559
String pullData();
560+
547561
}
548562

549563
public static class BazMapper implements MethodArgsMessageMapper {
@@ -601,7 +615,7 @@ public Message<?> preSend(Message<?> message, MessageChannel channel) {
601615
Object payload;
602616
if (Thread.currentThread().equals(message.getPayload())) {
603617
// running on calling thread - need to return a Future.
604-
payload = new AsyncResult<Thread>(Thread.currentThread());
618+
payload = new AsyncResult<>(Thread.currentThread());
605619
}
606620
else {
607621
payload = Thread.currentThread();
@@ -633,6 +647,7 @@ public MessageHandler autoCreateServiceActivator() {
633647
public GatewayProxyFactoryBean annotationGatewayProxyFactoryBean() {
634648
return new AnnotationGatewayProxyFactoryBean(GatewayByAnnotationGPFB.class);
635649
}
650+
636651
}
637652

638653
@MessagingGateway

0 commit comments

Comments
 (0)