Skip to content

Commit 9367d7f

Browse files
committed
Fix MessagingGateway for reactive reply type
* Upgrade dependencies to be ready for release
1 parent 3bb445e commit 9367d7f

File tree

5 files changed

+32
-28
lines changed

5 files changed

+32
-28
lines changed

build.gradle

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -45,24 +45,24 @@ ext {
4545
modifiedFiles =
4646
files(grgit.status().unstaged.modified).filter{ f -> f.name.endsWith('.java') || f.name.endsWith('.kt') }
4747

48-
activeMqVersion = '5.15.12'
49-
apacheSshdVersion = '2.4.0'
48+
activeMqVersion = '5.15.13'
49+
apacheSshdVersion = '2.5.0'
5050
aspectjVersion = '1.9.5'
5151
assertjVersion = '3.16.1'
5252
assertkVersion = '0.22'
5353
avroVersion = '1.9.2'
5454
awaitilityVersion = '4.0.3'
5555
commonsDbcp2Version = '2.7.0'
56-
commonsIoVersion = '2.6'
56+
commonsIoVersion = '2.7'
5757
commonsNetVersion = '3.6'
5858
curatorVersion = '4.3.0'
5959
derbyVersion = '10.14.2.0'
6060
ftpServerVersion = '1.1.1'
6161
googleJsr305Version = '3.0.2'
62-
groovyVersion = '3.0.3'
62+
groovyVersion = '3.0.4'
6363
hamcrestVersion = '2.2'
6464
hazelcastVersion = '4.0.1'
65-
hibernateVersion = '5.4.14.Final'
65+
hibernateVersion = '5.4.18.Final'
6666
hsqldbVersion = '2.5.0'
6767
h2Version = '1.4.200'
6868
jacksonVersion = '2.11.0'
@@ -77,19 +77,19 @@ ext {
7777
jschVersion = '0.1.55'
7878
jsonpathVersion = '2.4.0'
7979
junit4Version = '4.13'
80-
junitJupiterVersion = '5.6.2'
80+
junitJupiterVersion = '5.7.0-M1'
8181
jythonVersion = '2.7.2'
8282
kryoShadedVersion = '4.0.2'
83-
lettuceVersion = '5.3.0.RELEASE'
84-
log4jVersion = '2.13.2'
85-
micrometerVersion = '1.5.1'
83+
lettuceVersion = '6.0.0.M1'
84+
log4jVersion = '2.13.3'
85+
micrometerVersion = '1.5.2'
8686
mockitoVersion = '3.3.3'
87-
mongoDriverVersion = '4.0.3'
87+
mongoDriverVersion = '4.0.4'
8888
mysqlVersion = '8.0.20'
8989
pahoMqttClientVersion = '1.2.2'
90-
postgresVersion = '42.2.12'
90+
postgresVersion = '42.2.14'
9191
reactorVersion = '2020.0.0-M1'
92-
resilience4jVersion = '1.4.0'
92+
resilience4jVersion = '1.5.0'
9393
romeToolsVersion = '1.12.2'
9494
rsocketVersion = '1.0.1'
9595
saajVersion = '1.5.2'
@@ -102,7 +102,7 @@ ext {
102102
springRetryVersion = '1.3.0'
103103
springVersion = project.hasProperty('springVersion') ? project.springVersion : '5.3.0-SNAPSHOT'
104104
springWsVersion = '3.0.9.RELEASE'
105-
tomcatVersion = "9.0.35"
105+
tomcatVersion = "9.0.36"
106106
xstreamVersion = '1.4.12'
107107

108108
javaProjects = subprojects - project(':spring-integration-bom')
@@ -325,7 +325,7 @@ configure(javaProjects) { subproject ->
325325

326326
checkstyle {
327327
configDirectory.set(rootProject.file("src/checkstyle"))
328-
toolVersion = project.hasProperty('checkstyleVersion') ? project.checkstyleVersion : '8.32'
328+
toolVersion = project.hasProperty('checkstyleVersion') ? project.checkstyleVersion : '8.33'
329329
}
330330

331331
jar {
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
distributionBase=GRADLE_USER_HOME
22
distributionPath=wrapper/dists
3-
distributionUrl=https\://services.gradle.org/distributions/gradle-6.4.1-bin.zip
3+
distributionUrl=https\://services.gradle.org/distributions/gradle-6.5-bin.zip
44
zipStoreBase=GRADLE_USER_HOME
55
zipStorePath=wrapper/dists

spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -875,8 +875,7 @@ public boolean send(Message<?> message, long timeout) {
875875

876876
@Override
877877
public void subscribeTo(Publisher<? extends Message<?>> publisher) {
878-
this.replyMono.switchIfEmpty(Mono.from(publisher));
879-
this.replyMono.onComplete();
878+
publisher.subscribe(this.replyMono);
880879
}
881880

882881
}

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2019 the original author or authors.
2+
* Copyright 2014-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -299,21 +299,26 @@ else if (reply instanceof AbstractIntegrationMessageBuilder<?>) {
299299
}
300300

301301
private void doProduceOutput(Message<?> requestMessage, MessageHeaders requestHeaders, Object reply,
302-
@Nullable Object replyChannel) {
302+
@Nullable Object replyChannelArg) {
303+
304+
Object replyChannel = replyChannelArg;
305+
if (replyChannel == null) {
306+
replyChannel = getOutputChannel();
307+
}
303308

304309
if (this.async && (reply instanceof ListenableFuture<?> || reply instanceof Publisher<?>)) {
305-
MessageChannel messageChannel = getOutputChannel();
306-
if (reply instanceof ListenableFuture<?> ||
307-
!(messageChannel instanceof ReactiveStreamsSubscribableChannel)) {
308-
asyncNonReactiveReply(requestMessage, reply, replyChannel);
309-
}
310-
else {
311-
((ReactiveStreamsSubscribableChannel) messageChannel)
310+
if (reply instanceof Publisher<?> &&
311+
replyChannel instanceof ReactiveStreamsSubscribableChannel) {
312+
313+
((ReactiveStreamsSubscribableChannel) replyChannel)
312314
.subscribeTo(
313315
Flux.from((Publisher<?>) reply)
314316
.doOnError((ex) -> sendErrorMessage(requestMessage, ex))
315317
.map(result -> createOutputMessage(result, requestHeaders)));
316318
}
319+
else {
320+
asyncNonReactiveReply(requestMessage, reply, replyChannel);
321+
}
317322
}
318323
else {
319324
sendOutput(createOutputMessage(reply, requestHeaders), replyChannel, false);

spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/dsl/RSocketDslTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public IntegrationFlow rsocketUpperCaseFlow() {
112112
return IntegrationFlows
113113
.from(RSockets.inboundGateway("/uppercase")
114114
.interactionModels(RSocketInteractionModel.requestChannel))
115-
.<Flux<String>>handle((payload, headers) -> payload.map(String::toUpperCase), e -> e.async(true))
115+
.<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toUpperCase))
116116
.get();
117117
}
118118

0 commit comments

Comments
 (0)