Skip to content

Commit 481d5eb

Browse files
committed
Fix new Sonar smells
* Use `tryEmitNext` on Reactor `Sink` since `emitNext` is deprecated * Add `MessageDeliveryException` emission when `send()` returns `false` in the `FluxMessageChannel` for `subscribeTo` provided `Publisher`
1 parent 383af8c commit 481d5eb

File tree

4 files changed

+13
-8
lines changed

4 files changed

+13
-8
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.reactivestreams.Subscriber;
2424

2525
import org.springframework.messaging.Message;
26+
import org.springframework.messaging.MessageDeliveryException;
2627
import org.springframework.util.Assert;
2728

2829
import reactor.core.Disposable;
@@ -71,7 +72,8 @@ protected boolean doSend(Message<?> message, long timeout) {
7172
long parkTimeout = 10; // NOSONAR
7273
long parkTimeoutNs = TimeUnit.MILLISECONDS.toNanos(parkTimeout);
7374
while (this.active && !tryEmitMessage(message)) {
74-
if (timeout >= 0 && (remainingTime -= parkTimeout) <= 0) {
75+
remainingTime -= parkTimeout;
76+
if (timeout >= 0 && remainingTime <= 0) {
7577
return false;
7678
}
7779
LockSupport.parkNanos(parkTimeoutNs);
@@ -95,9 +97,9 @@ private boolean tryEmitMessage(Message<?> message) {
9597
@Override
9698
public void subscribe(Subscriber<? super Message<?>> subscriber) {
9799
this.processor
98-
.doFinally((s) -> this.subscribedSignal.emitNext(this.processor.hasDownstreams()))
100+
.doFinally((s) -> this.subscribedSignal.tryEmitNext(this.processor.hasDownstreams()))
99101
.subscribe(subscriber);
100-
this.subscribedSignal.emitNext(this.processor.hasDownstreams());
102+
this.subscribedSignal.tryEmitNext(this.processor.hasDownstreams());
101103
}
102104

103105
@Override
@@ -108,7 +110,10 @@ public void subscribeTo(Publisher<? extends Message<?>> publisher) {
108110
.publishOn(Schedulers.boundedElastic())
109111
.doOnNext((message) -> {
110112
try {
111-
send(message);
113+
if (!send(message)) {
114+
throw new MessageDeliveryException(message,
115+
"Failed to send message to channel '" + this);
116+
}
112117
}
113118
catch (Exception ex) {
114119
logger.warn("Error during processing event: " + message, ex);
@@ -120,7 +125,7 @@ public void subscribeTo(Publisher<? extends Message<?>> publisher) {
120125
@Override
121126
public void destroy() {
122127
this.active = false;
123-
this.subscribedSignal.emitNext(false);
128+
this.subscribedSignal.tryEmitNext(false);
124129
this.upstreamSubscriptions.dispose();
125130
this.processor.onComplete();
126131
super.destroy();

spring-integration-core/src/main/java/org/springframework/integration/util/IntegrationReactiveUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public static <T> Flux<Message<T>> messageSourceToFlux(MessageSource<T> messageS
100100
* - a {@link org.springframework.integration.channel.FluxMessageChannel}
101101
* is returned as is because it is already a {@link Publisher};
102102
* - a {@link SubscribableChannel} is subscribed with a {@link MessageHandler}
103-
* for the {@link Sinks.Many#emitNext(Object)} which is returned from this method;
103+
* for the {@link Sinks.Many#tryEmitNext(Object)} which is returned from this method;
104104
* - a {@link PollableChannel} is wrapped into a {@link MessageSource} lambda and reuses
105105
* {@link #messageSourceToFlux(MessageSource)}.
106106
* @param messageChannel the {@link MessageChannel} to adapt.

spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/ReactiveStreamsConsumerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ public void testReactiveStreamsConsumerFluxMessageChannelReactiveMessageHandler(
301301

302302
ReactiveMessageHandler messageHandler =
303303
m -> {
304-
sink.emitNext(m);
304+
sink.tryEmitNext(m);
305305
return Mono.empty();
306306
};
307307

spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGatewayIntegrationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -544,7 +544,7 @@ static class TestController {
544544

545545
@MessageMapping("receive")
546546
void receive(String payload) {
547-
this.fireForgetPayloads.emitNext(payload);
547+
this.fireForgetPayloads.tryEmitNext(payload);
548548
}
549549

550550
@MessageMapping("echo")

0 commit comments

Comments
 (0)