-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Use EmitterProcessor in the FluxMessageChannel #3104
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use EmitterProcessor in the FluxMessageChannel #3104
Conversation
|
@bsideup , Would you mind to take a look into this fix, please? Thanks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(here was a comment but I missed an important detail so I am updating it :D)
...tegration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java
Outdated
Show resolved
Hide resolved
...tegration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java
Outdated
Show resolved
Hide resolved
...tegration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java
Outdated
Show resolved
Hide resolved
fb52b63 to
e066b8c
Compare
|
I think I found a solution with the Stay tuned! Will come back to you today 😄 |
86bee7f to
cefe941
Compare
|
So, here you are a fix based on the |
| Mono.fromSupplier(this.subscribed::get) | ||
| .filter((subscribers) -> subscribers > 0) | ||
| .switchIfEmpty(this.subscriptionDelay.next())) | ||
| .subscribe(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the proper way would be to have a subscribeTo() to return Mono<Void> instead to let that caller to subscribe or compose further its reactive flow.
This way we would defer an emission from this source publisher as late as possible.
But I'd like to revise that stuff in a separate PR.
Thanks
1f1c315 to
f267182
Compare
|
@bsideup , gentle ping. 😄 |
13bd6a0 to
15d2628
Compare
|
OK! I have minimized the change for this PR as much as possible - 2 files. 😄 Looking forward for your feedback! |
15d2628 to
5d0fa19
Compare
|
OK. I pushed what we have discussed with some workaround for subscription race condition. Thank you for your feedback, @bsideup, looking forward for more! |
5d0fa19 to
b60bc55
Compare
|
@bsideup , so, I have rebased to Would be great to have some feedback before release this Wednesday. Thanks |
Fixes spring-projects#3107 The `MessagingGatewaySupport` has an `errorOnTimeout` option to throw a `MessageTimeoutException` when downstream reply doesn't come back in time for configured reply timeout * Expose an `errorOnTimeout` option as a `TcpInboundGateway` ctor property * Add new factory methods into a `Tcp` factory for Java DSL * Ensure a property works as expected in the `IpIntegrationTests` * Document a new option
The `EmitterProcessor` has a good logic to block upstream producer when its downstream subscriber cannot keep up with overproducing. * Rework `FluxMessageChannel` logic to rely on the `EmitterProcessor` instead of `Flux.create()` * Cancel `FluxMessageChannel` internal subscriptions in the `destroy()` * Fix `ReactiveStreamsTests.testFluxTransform()` for the splitter's delimiter * Ensure in the `FluxMessageChannelTests.testFluxMessageChannel` that we can have several concurrent subscribers to the `FluxMessageChannel`
* Change `subscribers` list into just `AtomicInteger` count marker * fix `DefaultSplitterTests` according a new logic in the `FluxMessageChannel`
…ssageChannel` to wait until this one subscribed. * Use an `EmitterProcessor` to catch subscriptions and pass them as a signal to delayed upstream publishers * Fix `FluxMessageChannelTests.testFluxMessageChannelCleanUp` to verify an actual property instead of removed. * Fix `RSocketOutboundGatewayIntegrationTests` for the proper subscription into a `FluxMessageChannel` before actual interaction with an RSocket gateway. This should help us also to avoid some race conditions in the future
…way" This reverts commit fa6119d.
…bedSignal`. This one is used `delaySubscription` for the upstream publishers * Use a `AtomicBoolean` for subscription state since `doOnSubscribe()` is called before `EmitterProcessor` adds subscribers for its `downstreams` * Use `publishOn(Schedulers.boundedElastic())` for upstream publishers to avoid blocking over there when our `EmitterProcessor` doesn't have enough demand * Refactor reactive tests to have a subscription into the `FluxMessageChannel` earlier than emission happens for it
instead of `doOnSubscribe` * Check for `this.processor.hasDownstreams()` before emitting such an event
f75e027 to
9a35815
Compare
|
Pushed the fix as we discussed in private chat. Thanks |
|
|
||
| this.publishers.values().forEach(ConnectableFlux::connect); | ||
| if (this.processor.hasDownstreams()) { | ||
| this.subscribedSignal.onNext(true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably this.subscribedSignal.onNext(this.processor.hasDownstreams()) would be better
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
He-he! True. Will fix soon.
…ribedSignal`. This way we are less vulnerable race conditions when subscribers are changed actively
|
We're done here with @bsideup . So, let us know if you are OK and we merge (or you). Thanks |
The
EmitterProcessorhas a good logic to block upstream producerwhen its downstream subscriber cannot keep up with overproducing.
FluxMessageChannellogic to rely on theEmitterProcessorinstead of
Flux.create()FluxMessageChannelinternal subscriptions in thedestroy()ReactiveStreamsTests.testFluxTransform()for the splitter'sdelimiter
FluxMessageChannelTests.testFluxMessageChannelthat we can have several concurrent subscribers to the
FluxMessageChannel