|
16 | 16 |
|
17 | 17 | package org.springframework.integration.channel; |
18 | 18 |
|
19 | | -import java.util.concurrent.atomic.AtomicInteger; |
| 19 | +import java.util.concurrent.atomic.AtomicBoolean; |
20 | 20 |
|
21 | 21 | import org.reactivestreams.Publisher; |
22 | 22 | import org.reactivestreams.Subscriber; |
|
27 | 27 | import reactor.core.publisher.EmitterProcessor; |
28 | 28 | import reactor.core.publisher.Flux; |
29 | 29 | import reactor.core.publisher.FluxSink; |
30 | | -import reactor.core.publisher.Mono; |
| 30 | +import reactor.core.publisher.ReplayProcessor; |
| 31 | +import reactor.core.scheduler.Schedulers; |
31 | 32 |
|
32 | 33 | /** |
33 | 34 | * The {@link AbstractMessageChannel} implementation for the |
34 | 35 | * Reactive Streams {@link Publisher} based on the Project Reactor {@link Flux}. |
35 | 36 | * |
36 | 37 | * @author Artem Bilan |
37 | 38 | * @author Gary Russell |
| 39 | + * @author Sergei Egorov |
38 | 40 | * |
39 | 41 | * @since 5.0 |
40 | 42 | */ |
41 | 43 | public class FluxMessageChannel extends AbstractMessageChannel |
42 | 44 | implements Publisher<Message<?>>, ReactiveStreamsSubscribableChannel { |
43 | 45 |
|
44 | | - private final AtomicInteger subscribed = new AtomicInteger(); |
| 46 | + private final EmitterProcessor<Message<?>> processor; |
45 | 47 |
|
46 | | - private final EmitterProcessor<Message<?>> flux; |
| 48 | + private final FluxSink<Message<?>> sink; |
47 | 49 |
|
48 | | - private final EmitterProcessor<Integer> subscriptionDelay = EmitterProcessor.create(false); |
| 50 | + private final AtomicBoolean subscribed = new AtomicBoolean(); |
49 | 51 |
|
50 | | - private FluxSink<Message<?>> sink; |
| 52 | + private final ReplayProcessor<Boolean> subscribedSignal = ReplayProcessor.create(1); |
51 | 53 |
|
52 | 54 | public FluxMessageChannel() { |
53 | | - this.flux = EmitterProcessor.create(1, false); |
54 | | - this.sink = this.flux.sink(); |
| 55 | + this.processor = EmitterProcessor.create(1, false); |
| 56 | + this.sink = this.processor.sink(FluxSink.OverflowStrategy.BUFFER); |
| 57 | + this.subscribedSignal.doOnNext(this.subscribed::set).subscribe(); |
55 | 58 | } |
56 | 59 |
|
57 | 60 | @Override |
58 | 61 | protected boolean doSend(Message<?> message, long timeout) { |
59 | | - Assert.state(this.subscribed.get() > 0, |
| 62 | + Assert.state(this.subscribed.get(), |
60 | 63 | () -> "The [" + this + "] doesn't have subscribers to accept messages"); |
61 | 64 | this.sink.next(message); |
62 | 65 | return true; |
63 | 66 | } |
64 | 67 |
|
65 | 68 | @Override |
66 | 69 | public void subscribe(Subscriber<? super Message<?>> subscriber) { |
67 | | - this.flux.doFinally((signal) -> this.subscribed.decrementAndGet()) |
68 | | - .retry() |
| 70 | + this.processor.doOnSubscribe((s) -> this.subscribedSignal.onNext(true)) |
| 71 | + .doFinally((s) -> this.subscribedSignal.onNext(this.processor.hasDownstreams())) |
69 | 72 | .subscribe(subscriber); |
70 | | - this.subscriptionDelay.onNext(this.subscribed.incrementAndGet()); |
71 | 73 | } |
72 | 74 |
|
73 | 75 | @Override |
74 | 76 | public void subscribeTo(Publisher<? extends Message<?>> publisher) { |
75 | 77 | Flux.from(publisher) |
76 | | - .handle((message, sink) -> sink.next(send(message))) |
77 | | - .onErrorContinue((throwable, event) -> |
78 | | - logger.warn("Error during processing event: " + event, throwable)) |
79 | | - .delaySubscription( |
80 | | - Mono.fromSupplier(this.subscribed::get) |
81 | | - .filter((subscribers) -> subscribers > 0) |
82 | | - .switchIfEmpty(this.subscriptionDelay.next())) |
| 78 | + .delaySubscription(this.subscribedSignal.filter(Boolean::booleanValue).next()) |
| 79 | + .publishOn(Schedulers.boundedElastic()) |
| 80 | + .doOnNext((message) -> { |
| 81 | + try { |
| 82 | + send(message); |
| 83 | + } |
| 84 | + catch (Exception e) { |
| 85 | + logger.warn("Error during processing event: " + message, e); |
| 86 | + } |
| 87 | + }) |
83 | 88 | .subscribe(); |
84 | 89 | } |
85 | 90 |
|
86 | 91 | @Override |
87 | 92 | public void destroy() { |
88 | | - this.subscriptionDelay.onComplete(); |
89 | | - this.flux.onComplete(); |
| 93 | + this.subscribedSignal.onNext(false); |
| 94 | + this.processor.onComplete(); |
90 | 95 | super.destroy(); |
91 | 96 | } |
92 | 97 |
|
|
0 commit comments