Skip to content

Commit 6591ce9

Browse files
committed
Fix ReactiveStreamsConsumer for plain subscriber
https://build.spring.io/browse/INT-MASTERSPRING40-978 Investigate a behaviour for `ReactiveStreamsConsumer` when we use a plain `Subscriber` directly instead of `doOn...` callbacks. It looks like there is some race condition when the data can be consumed upstream, but there is no consumer downstream ready yet
1 parent 538cfe4 commit 6591ce9

File tree

1 file changed

+13
-12
lines changed

1 file changed

+13
-12
lines changed

spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveStreamsConsumer.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -162,22 +162,23 @@ protected void doStart() {
162162
this.lifecycleDelegate.start();
163163
}
164164

165-
Flux<?> flux = null;
166165
if (this.reactiveMessageHandler != null) {
167-
flux = Flux.from(this.publisher)
168-
.flatMap(this.reactiveMessageHandler::handleMessage);
169-
}
170-
else if (this.subscriber != null) {
171-
flux = Flux.from(this.publisher)
172-
.doOnSubscribe(this.subscriber::onSubscribe)
173-
.doOnComplete(this.subscriber::onComplete)
174-
.doOnNext(this.subscriber::onNext);
175-
}
176-
if (flux != null) {
177166
this.subscription =
178-
flux.onErrorContinue((ex, data) -> this.errorHandler.handleError(ex))
167+
Flux.from(this.publisher)
168+
.flatMap(this.reactiveMessageHandler::handleMessage)
169+
.onErrorContinue(this::onError)
179170
.subscribe();
180171
}
172+
else if (this.subscriber != null) {
173+
Flux.from(this.publisher)
174+
.doOnSubscribe((subs) -> this.subscription = subs::cancel)
175+
.onErrorContinue(this::onError)
176+
.subscribe(this.subscriber);
177+
}
178+
}
179+
180+
private void onError(Throwable ex, Object data) {
181+
this.errorHandler.handleError(ex);
181182
}
182183

183184
@Override

0 commit comments

Comments
 (0)