Skip to content

Commit c94eaf7

Browse files
artembilangaryrussell
authored andcommitted
Fix IntegrationReactiveUtils
The `Mono.doOnSuccess()` is always called for completed `MonoSink` even if the value is `null` * Fix `IntegrationReactiveUtils.messageSourceToFlux()` to check a message for `null` before calling `AckUtils.autoAck()` * Add an `logger.error()` message when `doOnError()` is invoked for that `Mono` **Cherry-pick to 5.3.x**
1 parent e714310 commit c94eaf7

File tree

1 file changed

+10
-2
lines changed

1 file changed

+10
-2
lines changed

spring-integration-core/src/main/java/org/springframework/integration/util/IntegrationReactiveUtils.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import java.time.Duration;
2020
import java.util.concurrent.locks.LockSupport;
2121

22+
import org.apache.commons.logging.Log;
23+
import org.apache.commons.logging.LogFactory;
2224
import org.reactivestreams.Publisher;
2325

2426
import org.springframework.integration.StaticMessageHeaderAccessor;
@@ -45,6 +47,8 @@
4547
*/
4648
public final class IntegrationReactiveUtils {
4749

50+
private static final Log logger = LogFactory.getLog(IntegrationReactiveUtils.class);
51+
4852
/**
4953
* The subscriber context entry for {@link Flux#delayElements}
5054
* from the {@link Mono#repeatWhenEmpty(java.util.function.Function)}.
@@ -76,14 +80,18 @@ public static <T> Flux<Message<T>> messageSourceToFlux(MessageSource<T> messageS
7680
return Mono.
7781
<Message<T>>create(monoSink ->
7882
monoSink.onRequest(value -> monoSink.success(messageSource.receive())))
79-
.doOnSuccess((message) ->
80-
AckUtils.autoAck(StaticMessageHeaderAccessor.getAcknowledgmentCallback(message)))
83+
.doOnSuccess((message) -> {
84+
if (message != null) {
85+
AckUtils.autoAck(StaticMessageHeaderAccessor.getAcknowledgmentCallback(message));
86+
}
87+
})
8188
.doOnError(MessagingException.class,
8289
(ex) -> {
8390
Message<?> failedMessage = ex.getFailedMessage();
8491
if (failedMessage != null) {
8592
AckUtils.autoNack(StaticMessageHeaderAccessor.getAcknowledgmentCallback(failedMessage));
8693
}
94+
logger.error("Error from Flux for : " + messageSource, ex);
8795
})
8896
.subscribeOn(Schedulers.boundedElastic())
8997
.repeatWhenEmpty((repeat) ->

0 commit comments

Comments
 (0)