Skip to content

Commit 3bb445e

Browse files
committed
Fix deprecations from Reactor
1 parent 498f42d commit 3bb445e

File tree

10 files changed

+48
-44
lines changed

10 files changed

+48
-44
lines changed

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,10 @@ ext {
8888
mysqlVersion = '8.0.20'
8989
pahoMqttClientVersion = '1.2.2'
9090
postgresVersion = '42.2.12'
91-
reactorVersion = 'Dysprosium-SR7'
91+
reactorVersion = '2020.0.0-M1'
9292
resilience4jVersion = '1.4.0'
9393
romeToolsVersion = '1.12.2'
94-
rsocketVersion = '1.0.0'
94+
rsocketVersion = '1.0.1'
9595
saajVersion = '1.5.2'
9696
servletApiVersion = '4.0.1'
9797
smackVersion = '4.3.4'

spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,11 @@
2424

2525
import reactor.core.Disposable;
2626
import reactor.core.Disposables;
27-
import reactor.core.publisher.EmitterProcessor;
2827
import reactor.core.publisher.Flux;
28+
import reactor.core.publisher.FluxIdentityProcessor;
2929
import reactor.core.publisher.FluxSink;
30-
import reactor.core.publisher.ReplayProcessor;
30+
import reactor.core.publisher.Processors;
31+
import reactor.core.publisher.Sinks;
3132
import reactor.core.scheduler.Schedulers;
3233

3334
/**
@@ -43,16 +44,17 @@
4344
public class FluxMessageChannel extends AbstractMessageChannel
4445
implements Publisher<Message<?>>, ReactiveStreamsSubscribableChannel {
4546

46-
private final EmitterProcessor<Message<?>> processor;
47+
private final FluxIdentityProcessor<Message<?>> processor;
4748

4849
private final FluxSink<Message<?>> sink;
4950

50-
private final ReplayProcessor<Boolean> subscribedSignal = ReplayProcessor.create(1);
51+
private final Sinks.StandaloneFluxSink<Boolean> subscribedSignal = Sinks.replay(1);
5152

5253
private final Disposable.Composite upstreamSubscriptions = Disposables.composite();
5354

55+
@SuppressWarnings("deprecation")
5456
public FluxMessageChannel() {
55-
this.processor = EmitterProcessor.create(1, false);
57+
this.processor = Processors.more().multicast(1, false);
5658
this.sink = this.processor.sink(FluxSink.OverflowStrategy.BUFFER);
5759
}
5860

@@ -67,16 +69,16 @@ protected boolean doSend(Message<?> message, long timeout) {
6769
@Override
6870
public void subscribe(Subscriber<? super Message<?>> subscriber) {
6971
this.processor
70-
.doFinally((s) -> this.subscribedSignal.onNext(this.processor.hasDownstreams()))
72+
.doFinally((s) -> this.subscribedSignal.next(this.processor.hasDownstreams()))
7173
.subscribe(subscriber);
72-
this.subscribedSignal.onNext(this.processor.hasDownstreams());
74+
this.subscribedSignal.next(this.processor.hasDownstreams());
7375
}
7476

7577
@Override
7678
public void subscribeTo(Publisher<? extends Message<?>> publisher) {
7779
this.upstreamSubscriptions.add(
7880
Flux.from(publisher)
79-
.delaySubscription(this.subscribedSignal.filter(Boolean::booleanValue).next())
81+
.delaySubscription(this.subscribedSignal.asFlux().filter(Boolean::booleanValue).next())
8082
.publishOn(Schedulers.boundedElastic())
8183
.doOnNext((message) -> {
8284
try {
@@ -91,7 +93,7 @@ public void subscribeTo(Publisher<? extends Message<?>> publisher) {
9193

9294
@Override
9395
public void destroy() {
94-
this.subscribedSignal.onNext(false);
96+
this.subscribedSignal.next(false);
9597
this.upstreamSubscriptions.dispose();
9698
this.processor.onComplete();
9799
super.destroy();

spring-integration-core/src/main/java/org/springframework/integration/util/IntegrationReactiveUtils.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,10 @@
3030
import org.springframework.messaging.PollableChannel;
3131
import org.springframework.messaging.SubscribableChannel;
3232

33-
import reactor.core.publisher.EmitterProcessor;
3433
import reactor.core.publisher.Flux;
34+
import reactor.core.publisher.FluxIdentityProcessor;
3535
import reactor.core.publisher.Mono;
36+
import reactor.core.publisher.Processors;
3637
import reactor.core.scheduler.Schedulers;
3738

3839
/**
@@ -101,7 +102,7 @@ public static <T> Flux<Message<T>> messageSourceToFlux(MessageSource<T> messageS
101102
* - a {@link org.springframework.integration.channel.FluxMessageChannel}
102103
* is returned as is because it is already a {@link Publisher};
103104
* - a {@link SubscribableChannel} is subscribed with a {@link MessageHandler}
104-
* for the {@link EmitterProcessor#onNext(Object)} which is returned from this method;
105+
* for the {@link FluxIdentityProcessor#onNext(Object)} which is returned from this method;
105106
* - a {@link PollableChannel} is wrapped into a {@link MessageSource} lambda and reuses
106107
* {@link #messageSourceToFlux(MessageSource)}.
107108
* @param messageChannel the {@link MessageChannel} to adapt.
@@ -127,7 +128,7 @@ else if (messageChannel instanceof PollableChannel) {
127128

128129
private static <T> Flux<Message<T>> adaptSubscribableChannelToPublisher(SubscribableChannel inputChannel) {
129130
return Flux.defer(() -> {
130-
EmitterProcessor<Message<T>> publisher = EmitterProcessor.create(1);
131+
FluxIdentityProcessor<Message<T>> publisher = Processors.more().multicast(1);
131132
@SuppressWarnings("unchecked")
132133
MessageHandler messageHandler = (message) -> publisher.onNext((Message<T>) message);
133134
inputChannel.subscribe(messageHandler);

spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/FluxMessageChannelTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -50,8 +50,8 @@
5050
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
5151

5252
import reactor.core.Disposable;
53-
import reactor.core.publisher.EmitterProcessor;
5453
import reactor.core.publisher.Flux;
54+
import reactor.core.publisher.FluxIdentityProcessor;
5555

5656
/**
5757
* @author Artem Bilan
@@ -141,7 +141,7 @@ void testFluxMessageChannelCleanUp() throws InterruptedException {
141141

142142
flowRegistration.destroy();
143143

144-
assertThat(TestUtils.getPropertyValue(flux, "processor", EmitterProcessor.class).isTerminated()).isTrue();
144+
assertThat(TestUtils.getPropertyValue(flux, "processor", FluxIdentityProcessor.class).isTerminated()).isTrue();
145145
}
146146

147147
@Configuration

spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/ReactiveStreamsConsumerTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,10 @@
5757
import org.springframework.messaging.ReactiveMessageHandler;
5858
import org.springframework.messaging.support.GenericMessage;
5959

60-
import reactor.core.publisher.EmitterProcessor;
6160
import reactor.core.publisher.Flux;
61+
import reactor.core.publisher.FluxIdentityProcessor;
6262
import reactor.core.publisher.Mono;
63+
import reactor.core.publisher.Processors;
6364
import reactor.test.StepVerifier;
6465
import reactor.util.Loggers;
6566

@@ -298,7 +299,7 @@ public void testReactiveStreamsConsumerViaConsumerEndpointFactoryBean() throws E
298299
public void testReactiveStreamsConsumerFluxMessageChannelReactiveMessageHandler() {
299300
FluxMessageChannel testChannel = new FluxMessageChannel();
300301

301-
EmitterProcessor<Message<?>> processor = EmitterProcessor.create(2, false);
302+
FluxIdentityProcessor<Message<?>> processor = Processors.more().multicast(2, false);
302303

303304
ReactiveMessageHandler messageHandler =
304305
m -> {

spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@
8484
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
8585

8686
import reactor.core.publisher.Mono;
87-
import reactor.core.publisher.MonoProcessor;
87+
import reactor.core.publisher.Sinks;
8888
import reactor.test.StepVerifier;
8989

9090
/**
@@ -241,7 +241,8 @@ public void testReactiveMessageHandler() {
241241
this.reactiveMessageHandlerChannel.send(new GenericMessage<>("test"));
242242

243243
StepVerifier.create(
244-
this.contextConfiguration.messageMonoProcessor
244+
this.contextConfiguration.messageMono
245+
.asMono()
245246
.map(Message::getPayload)
246247
.cast(String.class))
247248
.expectNext("test")
@@ -291,7 +292,7 @@ public MessageChannel routerChannel() {
291292
}
292293

293294
@Bean
294-
@Router(inputChannel = "routerChannel", channelMappings = { "true=odd", "false=filter" }, suffix = "Channel")
295+
@Router(inputChannel = "routerChannel", channelMappings = {"true=odd", "false=filter"}, suffix = "Channel")
295296
public MessageSelector router() {
296297
return new ExpressionEvaluatingSelector("payload % 2 == 0");
297298
}
@@ -373,7 +374,8 @@ public MessageHandler service() {
373374
@Filter(inputChannel = "skippedChannel5")
374375
@Profile("foo")
375376
public MessageHandler skippedMessageHandler() {
376-
return m -> { };
377+
return m -> {
378+
};
377379
}
378380

379381
@Bean
@@ -427,7 +429,7 @@ public Consumer<Message<?>> messageConsumerAsService() {
427429
return collector()::add;
428430
}
429431

430-
MonoProcessor<Message<?>> messageMonoProcessor = MonoProcessor.create();
432+
Sinks.StandaloneMonoSink<Message<?>> messageMono = Sinks.promise();
431433

432434
@Bean
433435
MessageChannel reactiveMessageHandlerChannel() {
@@ -438,8 +440,7 @@ MessageChannel reactiveMessageHandlerChannel() {
438440
@ServiceActivator(inputChannel = "reactiveMessageHandlerChannel")
439441
public ReactiveMessageHandler reactiveMessageHandlerService() {
440442
return (message) -> {
441-
messageMonoProcessor.onNext(message);
442-
messageMonoProcessor.onComplete();
443+
messageMono.success(message);
443444
return Mono.empty();
444445
};
445446
}

spring-integration-core/src/test/java/org/springframework/integration/config/xml/GatewayParserTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@
6262
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
6363

6464
import reactor.core.publisher.Mono;
65-
import reactor.core.publisher.MonoProcessor;
65+
import reactor.core.publisher.Sinks;
6666
import reactor.test.StepVerifier;
6767

6868
/**
@@ -88,14 +88,14 @@ public void testOneWay() {
8888
Message<?> result = channel.receive(10000);
8989
assertThat(result.getPayload()).isEqualTo("foo");
9090

91-
MonoProcessor<Object> defaultMethodHandler = MonoProcessor.create();
91+
Sinks.StandaloneMonoSink<Object> defaultMethodHandler = Sinks.promise();
9292

93-
this.errorChannel.subscribe(message -> defaultMethodHandler.onNext(message.getPayload()));
93+
this.errorChannel.subscribe(message -> defaultMethodHandler.success(message.getPayload()));
9494

9595
String defaultMethodPayload = "defaultMethodPayload";
9696
service.defaultMethodGateway(defaultMethodPayload);
9797

98-
StepVerifier.create(defaultMethodHandler)
98+
StepVerifier.create(defaultMethodHandler.asMono())
9999
.expectNext(defaultMethodPayload)
100100
.verifyComplete();
101101
}
@@ -423,7 +423,7 @@ public void setBeanName(String beanName) {
423423
}
424424

425425
@Override
426-
@SuppressWarnings({ "rawtypes", "unchecked" })
426+
@SuppressWarnings({"rawtypes", "unchecked"})
427427
public <T> Future<T> submit(Callable<T> task) {
428428
try {
429429
Future<?> result = super.submit(task);

spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/dsl/RSocketDslTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public IntegrationFlow rsocketUpperCaseFlow() {
112112
return IntegrationFlows
113113
.from(RSockets.inboundGateway("/uppercase")
114114
.interactionModels(RSocketInteractionModel.requestChannel))
115-
.<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toUpperCase))
115+
.<Flux<String>>handle((payload, headers) -> payload.map(String::toUpperCase), e -> e.async(true))
116116
.get();
117117
}
118118

spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/inbound/RSocketInboundGatewayIntegrationTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
import io.rsocket.transport.netty.server.TcpServerTransport;
5151
import reactor.core.publisher.Flux;
5252
import reactor.core.publisher.Mono;
53-
import reactor.core.publisher.MonoProcessor;
53+
import reactor.core.publisher.Sinks;
5454
import reactor.test.StepVerifier;
5555

5656
/**
@@ -93,7 +93,7 @@ static void tearDown() {
9393
@BeforeEach
9494
void setupTest(TestInfo testInfo) {
9595
if (testInfo.getDisplayName().startsWith("server")) {
96-
this.serverRsocketRequester = serverConfig.clientRequester.block(Duration.ofSeconds(10));
96+
this.serverRsocketRequester = serverConfig.clientRequester.asMono().block(Duration.ofSeconds(10));
9797
}
9898
else {
9999
this.clientRsocketRequester =
@@ -181,7 +181,7 @@ public Mono<String> echoTransformation(Flux<String> payload) {
181181
@EnableIntegration
182182
static class ServerConfig extends CommonConfig {
183183

184-
final MonoProcessor<RSocketRequester> clientRequester = MonoProcessor.create();
184+
final Sinks.StandaloneMonoSink<RSocketRequester> clientRequester = Sinks.promise();
185185

186186
@Bean
187187
public CloseableChannel rsocketServer() {
@@ -204,7 +204,7 @@ public ServerRSocketConnector serverRSocketConnector(ServerRSocketMessageHandler
204204

205205
@EventListener
206206
public void onApplicationEvent(RSocketConnectedEvent event) {
207-
this.clientRequester.onNext(event.getRequester());
207+
this.clientRequester.success(event.getRequester());
208208
}
209209

210210
}

spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGatewayIntegrationTests.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,7 @@
6666
import reactor.core.Disposable;
6767
import reactor.core.publisher.Flux;
6868
import reactor.core.publisher.Mono;
69-
import reactor.core.publisher.MonoProcessor;
70-
import reactor.core.publisher.ReplayProcessor;
69+
import reactor.core.publisher.Sinks;
7170
import reactor.test.StepVerifier;
7271

7372
/**
@@ -133,7 +132,7 @@ static void tearDown() {
133132
@BeforeEach
134133
void setupTest(TestInfo testInfo) {
135134
if (testInfo.getDisplayName().startsWith("server")) {
136-
this.serverRsocketRequester = serverController.clientRequester.block(Duration.ofSeconds(10));
135+
this.serverRsocketRequester = serverController.clientRequester.asMono().block(Duration.ofSeconds(10));
137136
}
138137
}
139138

@@ -158,7 +157,7 @@ private void fireAndForget(MessageChannel inputChannel, FluxMessageChannel resul
158157
.setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester)
159158
.build());
160159

161-
StepVerifier.create(controller.fireForgetPayloads)
160+
StepVerifier.create(controller.fireForgetPayloads.asFlux())
162161
.expectNext("Hello")
163162
.thenCancel()
164163
.verify();
@@ -537,13 +536,13 @@ public RSocketMessageHandler messageHandler() {
537536
@Controller
538537
static class TestController {
539538

540-
final ReplayProcessor<String> fireForgetPayloads = ReplayProcessor.create();
539+
final Sinks.StandaloneFluxSink<String> fireForgetPayloads = Sinks.replayAll();
541540

542-
final MonoProcessor<RSocketRequester> clientRequester = MonoProcessor.create();
541+
final Sinks.StandaloneMonoSink<RSocketRequester> clientRequester = Sinks.promise();
543542

544543
@MessageMapping("receive")
545544
void receive(String payload) {
546-
this.fireForgetPayloads.onNext(payload);
545+
this.fireForgetPayloads.next(payload);
547546
}
548547

549548
@MessageMapping("echo")
@@ -595,7 +594,7 @@ Mono<Void> handleExceptionWithVoidReturnValue(IllegalStateException ex) {
595594

596595
@ConnectMapping("clientConnect")
597596
void clientConnect(RSocketRequester requester) {
598-
this.clientRequester.onNext(requester);
597+
this.clientRequester.success(requester);
599598
}
600599

601600
}

0 commit comments

Comments
 (0)