Skip to content

Commit af5bcdf

Browse files
committed
Honor Emission result in FluxMessageChannel
* Implement `Emission` handling in the `FluxMessageChannel` and `IntegrationReactiveUtils` * Upgrade to Spring Kafka `2.6.0` * Fix R2DBC components for deprecation in the Spring Data R2DBC * Implement `StatementMapper.SelectSpec` for query expression * Clean up for some sporadic test failures
1 parent e4b8b32 commit af5bcdf

File tree

9 files changed

+134
-97
lines changed

9 files changed

+134
-97
lines changed

build.gradle

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ ext {
100100
soapVersion = '1.4.0'
101101
springAmqpVersion = project.hasProperty('springAmqpVersion') ? project.springAmqpVersion : '2.3.0-SNAPSHOT'
102102
springDataVersion = project.hasProperty('springDataVersion') ? project.springDataVersion : '2020.0.0-SNAPSHOT'
103-
springKafkaVersion = '2.6.0-SNAPSHOT'
103+
springKafkaVersion = '2.6.0'
104104
springRetryVersion = '1.3.0'
105105
springSecurityVersion = project.hasProperty('springSecurityVersion') ? project.springSecurityVersion : '5.4.0-RC1'
106106
springVersion = project.hasProperty('springVersion') ? project.springVersion : '5.3.0-SNAPSHOT'
@@ -199,7 +199,6 @@ configure(javaProjects) { subproject ->
199199
compileKotlin {
200200
kotlinOptions {
201201
jvmTarget = '1.8'
202-
allWarningsAsErrors = true
203202
}
204203
}
205204
compileTestKotlin {

spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2015-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,9 @@
1616

1717
package org.springframework.integration.channel;
1818

19+
import java.util.concurrent.TimeUnit;
20+
import java.util.concurrent.locks.LockSupport;
21+
1922
import org.reactivestreams.Publisher;
2023
import org.reactivestreams.Subscriber;
2124

@@ -50,16 +53,43 @@ public class FluxMessageChannel extends AbstractMessageChannel
5053

5154
private final Disposable.Composite upstreamSubscriptions = Disposables.composite();
5255

56+
private volatile boolean active = true;
57+
5358
public FluxMessageChannel() {
5459
this.sink = Sinks.many().multicast().onBackpressureBuffer(1, false);
5560
this.processor = FluxProcessor.fromSink(this.sink);
5661
}
5762

5863
@Override
5964
protected boolean doSend(Message<?> message, long timeout) {
60-
Assert.state(this.processor.hasDownstreams(),
65+
Assert.state(this.active && this.processor.hasDownstreams(),
6166
() -> "The [" + this + "] doesn't have subscribers to accept messages");
62-
return this.sink.tryEmitNext(message).hasSucceeded();
67+
long remainingTime = 0;
68+
if (timeout > 0) {
69+
remainingTime = timeout;
70+
}
71+
long parkTimeout = 10; // NOSONAR
72+
long parkTimeoutNs = TimeUnit.MILLISECONDS.toNanos(parkTimeout);
73+
while (this.active && !tryEmitMessage(message)) {
74+
if (timeout >= 0 && (remainingTime -= parkTimeout) <= 0) {
75+
return false;
76+
}
77+
LockSupport.parkNanos(parkTimeoutNs);
78+
}
79+
return true;
80+
}
81+
82+
private boolean tryEmitMessage(Message<?> message) {
83+
switch (this.sink.tryEmitNext(message)) {
84+
case FAIL_OVERFLOW:
85+
return false;
86+
case FAIL_TERMINATED:
87+
case FAIL_CANCELLED:
88+
throw new IllegalStateException("Cannot emit messages into the cancelled or terminated sink: "
89+
+ this.sink);
90+
default:
91+
return true;
92+
}
6393
}
6494

6595
@Override
@@ -89,6 +119,7 @@ public void subscribeTo(Publisher<? extends Message<?>> publisher) {
89119

90120
@Override
91121
public void destroy() {
122+
this.active = false;
92123
this.subscribedSignal.emitNext(false);
93124
this.upstreamSubscriptions.dispose();
94125
this.processor.onComplete();

spring-integration-core/src/main/java/org/springframework/integration/util/IntegrationReactiveUtils.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,11 +126,22 @@ else if (messageChannel instanceof PollableChannel) {
126126

127127
private static <T> Flux<Message<T>> adaptSubscribableChannelToPublisher(SubscribableChannel inputChannel) {
128128
return Flux.defer(() -> {
129-
Sinks.Many<Message<T>> sink = Sinks.many().multicast().onBackpressureBuffer(1);
130-
@SuppressWarnings("unchecked")
129+
Sinks.Many<Message<T>> sink = Sinks.many().unicast().onBackpressureError();
131130
MessageHandler messageHandler = (message) -> {
132-
while (!sink.tryEmitNext((Message<T>) message).hasSucceeded()) {
133-
LockSupport.parkNanos(100); // NOSONAR
131+
while (true) {
132+
@SuppressWarnings("unchecked")
133+
Sinks.Emission emission = sink.tryEmitNext((Message<T>) message);
134+
switch (emission) {
135+
case FAIL_OVERFLOW:
136+
LockSupport.parkNanos(1000); // NOSONAR
137+
break;
138+
case FAIL_TERMINATED:
139+
case FAIL_CANCELLED:
140+
throw new IllegalStateException("Cannot emit messages into the cancelled " +
141+
"or terminated sink for message channel: " + inputChannel);
142+
default:
143+
return;
144+
}
134145
}
135146
};
136147
inputChannel.subscribe(messageHandler);

spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/ReactiveStreamsConsumerTests.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import org.springframework.integration.config.ConsumerEndpointFactoryBean;
5252
import org.springframework.integration.endpoint.ReactiveStreamsConsumer;
5353
import org.springframework.integration.handler.MethodInvokingMessageHandler;
54-
import org.springframework.integration.test.condition.LogLevels;
5554
import org.springframework.messaging.Message;
5655
import org.springframework.messaging.MessageDeliveryException;
5756
import org.springframework.messaging.MessageHandler;
@@ -175,7 +174,6 @@ public void onComplete() {
175174
reactiveConsumer.stop();
176175
}
177176

178-
@LogLevels(level = "trace", categories = "org.springframework.integration")
179177
@Test
180178
@SuppressWarnings("unchecked")
181179
public void testReactiveStreamsConsumerPollableChannel() throws InterruptedException {
Original file line numberDiff line numberDiff line change
@@ -1,62 +1,60 @@
11
<?xml version="1.0" encoding="UTF-8"?>
22
<beans:beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3-
xmlns:int-file="http://www.springframework.org/schema/integration/file"
4-
xmlns:int="http://www.springframework.org/schema/integration"
5-
xmlns:beans="http://www.springframework.org/schema/beans"
6-
xmlns:context="http://www.springframework.org/schema/context"
7-
xsi:schemaLocation="http://www.springframework.org/schema/beans
3+
xmlns:int-file="http://www.springframework.org/schema/integration/file"
4+
xmlns:int="http://www.springframework.org/schema/integration"
5+
xmlns:beans="http://www.springframework.org/schema/beans"
6+
xsi:schemaLocation="http://www.springframework.org/schema/beans
87
https://www.springframework.org/schema/beans/spring-beans.xsd
9-
http://www.springframework.org/schema/context
10-
https://www.springframework.org/schema/context/spring-context.xsd
118
http://www.springframework.org/schema/integration
129
https://www.springframework.org/schema/integration/spring-integration.xsd
1310
http://www.springframework.org/schema/integration/file
1411
https://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
1512

16-
<int:channel id="inputChannelSaveToBaseDir" />
17-
<int:channel id="inputChannelSaveToBaseDirDeleteSource" />
18-
<int:channel id="inputChannelSaveToSubDir" />
19-
<int:channel id="inputChannelSaveToSubDirAutoCreateOff" />
20-
<int:channel id="inputChannelSaveToSubDirWrongExpression" />
21-
<int:channel id="inputChannelSaveToSubDirWithHeader" />
22-
<int:channel id="inputChannelSaveToSubDirWithFile" />
23-
<int:channel id="inputChannelSaveToSubDirEmptyStringExpression" />
13+
<int:channel id="inputChannelSaveToBaseDir"/>
14+
<int:channel id="inputChannelSaveToBaseDirDeleteSource"/>
15+
<int:channel id="inputChannelSaveToSubDir"/>
16+
<int:channel id="inputChannelSaveToSubDirAutoCreateOff"/>
17+
<int:channel id="inputChannelSaveToSubDirWrongExpression"/>
18+
<int:channel id="inputChannelSaveToSubDirWithHeader"/>
19+
<int:channel id="inputChannelSaveToSubDirWithFile"/>
20+
<int:channel id="inputChannelSaveToSubDirEmptyStringExpression"/>
2421

2522
<int-file:outbound-channel-adapter id="save-to-base-directory"
26-
channel="inputChannelSaveToBaseDir" auto-create-directory="true"
27-
filename-generator-expression="'foo.txt'" directory="target/base-directory" />
23+
channel="inputChannelSaveToBaseDir" auto-create-directory="true"
24+
filename-generator-expression="'foo.txt'" directory="target/base-directory"/>
2825

2926
<int-file:outbound-channel-adapter id="delete-source-files"
30-
channel="inputChannelSaveToBaseDirDeleteSource" auto-create-directory="true"
31-
delete-source-files="true" filename-generator-expression="'foo.txt'"
32-
directory="target/base-directory" />
27+
channel="inputChannelSaveToBaseDirDeleteSource" auto-create-directory="true"
28+
delete-source-files="true" filename-generator-expression="'foo.txt'"
29+
directory="target/base-directory"/>
3330

3431
<int-file:outbound-channel-adapter id="save-to-sub-directory-wrong-expression"
35-
channel="inputChannelSaveToSubDirWrongExpression" auto-create-directory="true"
36-
directory-expression="'target/base-directory/sub-directory/foo.txt'" />
32+
channel="inputChannelSaveToSubDirWrongExpression" auto-create-directory="true"
33+
directory-expression="'target/base-directory/sub-directory/foo.txt'"/>
3734

3835
<int-file:outbound-channel-adapter id="save-to-sub-directory-empty-string-expression"
39-
channel="inputChannelSaveToSubDirEmptyStringExpression" auto-create-directory="true"
40-
directory-expression="' '" />
36+
channel="inputChannelSaveToSubDirEmptyStringExpression"
37+
auto-create-directory="true"
38+
directory-expression="' '"/>
4139

4240
<int-file:outbound-channel-adapter id="save-to-sub-directory"
43-
channel="inputChannelSaveToSubDir" auto-create-directory="true"
44-
directory-expression="'target/base-directory/sub-directory'"
45-
filename-generator-expression="'foo.txt'" />
41+
channel="inputChannelSaveToSubDir" auto-create-directory="true"
42+
directory-expression="'target/base-directory/sub-directory'"
43+
filename-generator-expression="'foo.txt'"/>
4644

4745
<int-file:outbound-channel-adapter id="save-to-sub-directory-with-header"
48-
channel="inputChannelSaveToSubDirWithHeader" auto-create-directory="true"
49-
directory-expression="headers['myFileLocation']"
50-
filename-generator-expression="'foo.txt'" />
46+
channel="inputChannelSaveToSubDirWithHeader" auto-create-directory="true"
47+
directory-expression="headers['myFileLocation']"
48+
filename-generator-expression="'foo.txt'"/>
5149

5250
<int-file:outbound-channel-adapter id="save-to-sub-directory-auto-create-off"
53-
channel="inputChannelSaveToSubDirAutoCreateOff" auto-create-directory="false"
54-
directory-expression="'target/base-directory2/sub-directory2'"
55-
filename-generator-expression="'foo.txt'" />
51+
channel="inputChannelSaveToSubDirAutoCreateOff" auto-create-directory="false"
52+
directory-expression="'target/base-directory2/sub-directory2'"
53+
filename-generator-expression="'foo.txt'"/>
5654

5755
<int-file:outbound-channel-adapter id="save-to-sub-directory-with-file-expression"
58-
channel="inputChannelSaveToSubDirWithFile" auto-create-directory="true"
59-
directory-expression="headers['subDirectory']"
60-
filename-generator-expression="'foo.txt'" />
56+
channel="inputChannelSaveToSubDirWithFile" auto-create-directory="true"
57+
directory-expression="headers['subDirectory']"
58+
filename-generator-expression="'foo.txt'"/>
6159

6260
</beans:beans>

spring-integration-file/src/test/java/org/springframework/integration/file/FileOutboundChannelAdapterIntegrationTests.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@
2121

2222
import java.io.File;
2323
import java.io.FileOutputStream;
24+
import java.nio.charset.StandardCharsets;
2425

2526
import org.junit.jupiter.api.BeforeEach;
2627
import org.junit.jupiter.api.Test;
@@ -32,6 +33,7 @@
3233
import org.springframework.messaging.Message;
3334
import org.springframework.messaging.MessageChannel;
3435
import org.springframework.messaging.MessageHandlingException;
36+
import org.springframework.test.annotation.DirtiesContext;
3537
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
3638
import org.springframework.util.FileCopyUtils;
3739

@@ -41,10 +43,9 @@
4143
* @author Gary Russell
4244
*/
4345
@SpringJUnitConfig
46+
@DirtiesContext
4447
class FileOutboundChannelAdapterIntegrationTests {
4548

46-
static final String DEFAULT_ENCODING = "UTF-8";
47-
4849
static final String SAMPLE_CONTENT = "HelloWorld";
4950

5051
@Autowired
@@ -78,7 +79,7 @@ class FileOutboundChannelAdapterIntegrationTests {
7879
@BeforeEach
7980
void setUp(@TempDir File tmpDir) throws Exception {
8081
sourceFile = new File(tmpDir, "anyFile.txt");
81-
FileCopyUtils.copy(SAMPLE_CONTENT.getBytes(DEFAULT_ENCODING),
82+
FileCopyUtils.copy(SAMPLE_CONTENT.getBytes(StandardCharsets.UTF_8),
8283
new FileOutputStream(sourceFile, false));
8384
message = MessageBuilder.withPayload(sourceFile).build();
8485
}

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/StoredProcOutboundGatewayWithNamespaceIntegrationTests.java

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,30 +26,27 @@
2626
import java.util.concurrent.TimeUnit;
2727
import java.util.concurrent.atomic.AtomicInteger;
2828

29-
import org.junit.Before;
30-
import org.junit.Test;
31-
import org.junit.runner.RunWith;
29+
import org.junit.jupiter.api.BeforeEach;
30+
import org.junit.jupiter.api.Test;
3231

3332
import org.springframework.beans.factory.annotation.Autowired;
3433
import org.springframework.integration.annotation.ServiceActivator;
3534
import org.springframework.integration.jdbc.storedproc.CreateUser;
3635
import org.springframework.integration.jdbc.storedproc.User;
37-
import org.springframework.integration.support.MessageBuilder;
3836
import org.springframework.jdbc.core.JdbcTemplate;
3937
import org.springframework.messaging.Message;
4038
import org.springframework.messaging.MessageChannel;
4139
import org.springframework.messaging.PollableChannel;
40+
import org.springframework.messaging.support.GenericMessage;
4241
import org.springframework.test.annotation.DirtiesContext;
43-
import org.springframework.test.context.ContextConfiguration;
44-
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
42+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
4543

4644
/**
4745
* @author Gunnar Hillert
4846
* @author Artem Bilan
4947
*/
50-
@ContextConfiguration
51-
@RunWith(SpringJUnit4ClassRunner.class)
52-
@DirtiesContext // close at the end after class
48+
@SpringJUnitConfig
49+
@DirtiesContext
5350
public class StoredProcOutboundGatewayWithNamespaceIntegrationTests {
5451

5552
@Autowired
@@ -67,7 +64,7 @@ public class StoredProcOutboundGatewayWithNamespaceIntegrationTests {
6764
@Autowired
6865
PollableChannel replyChannel;
6966

70-
@Before
67+
@BeforeEach
7168
public void setUp() {
7269
this.jdbcTemplate.execute("delete from USERS");
7370
}
@@ -77,7 +74,7 @@ public void test() throws Exception {
7774

7875
createUser.createUser(new User("myUsername", "myPassword", "myEmail"));
7976

80-
List<Message<Collection<User>>> received = new ArrayList<Message<Collection<User>>>();
77+
List<Message<Collection<User>>> received = new ArrayList<>();
8178

8279
received.add(consumer.poll(2000));
8380

@@ -98,10 +95,9 @@ public void test() throws Exception {
9895

9996
}
10097

101-
@Test //INT-1029
102-
public void testStoredProcOutboundGatewayInsideChain() throws Exception {
103-
104-
Message<User> requestMessage = MessageBuilder.withPayload(new User("myUsername", "myPassword", "myEmail")).build();
98+
@Test
99+
public void testStoredProcOutboundGatewayInsideChain() {
100+
Message<User> requestMessage = new GenericMessage<>(new User("myUsername", "myPassword", "myEmail"));
105101

106102
storedProcOutboundGatewayInsideChain.send(requestMessage);
107103

@@ -127,35 +123,38 @@ static class Counter {
127123

128124
private final AtomicInteger count = new AtomicInteger();
129125

130-
public Integer next() throws InterruptedException {
126+
public Integer next() {
131127
if (count.get() > 2) {
132128
//prevent message overload
133129
return null;
134130
}
135131
return count.incrementAndGet();
136132
}
133+
137134
}
138135

139136

140137
static class Consumer {
141138

142-
private final BlockingQueue<Message<Collection<User>>> messages = new LinkedBlockingQueue<Message<Collection<User>>>();
139+
private final BlockingQueue<Message<Collection<User>>> messages = new LinkedBlockingQueue<>();
143140

144141
@ServiceActivator
145142
public void receive(Message<Collection<User>> message) {
146-
messages.add(message);
143+
this.messages.add(message);
147144
}
148145

149146
Message<Collection<User>> poll(long timeoutInMillis) throws InterruptedException {
150-
return messages.poll(timeoutInMillis, TimeUnit.MILLISECONDS);
147+
return this.messages.poll(timeoutInMillis, TimeUnit.MILLISECONDS);
151148
}
149+
152150
}
153151

154152
static class TestService {
155153

156154
public String quote(String s) {
157155
return "'" + s + "'";
158156
}
157+
159158
}
160159

161160
}

0 commit comments

Comments
 (0)