Skip to content

Commit 8c03d72

Browse files
artembilangaryrussell
authored andcommitted
INT-4510: Test no memory leak in FluxMessageCh
JIRA: https://jira.spring.io/browse/INT-4510 Related to reactor/reactor-core#1290 The `Flux.publish()` and subsequent `connect()` doesn't fuse a subscriber for the hooks flow is interrupted (complete, or error, or disconnect). In this case the `FluxMessageChannel.publishers` store is not cleared from the finished publishers * Add test-case to check the `FluxMessageChannel.publishers` store after finishing the stream * Add `hide()` operator with TODO to remove when an appropriate Reactor version is ready **Cherry-pick to 5.0.x**
1 parent 8b13b28 commit 8c03d72

File tree

2 files changed

+38
-2
lines changed

2 files changed

+38
-2
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ public void subscribeTo(Publisher<Message<?>> publisher) {
8484
.handle((message, sink) -> sink.next(send(message)))
8585
.errorStrategyContinue()
8686
.doOnComplete(() -> this.publishers.remove(publisher))
87+
.hide() // TODO remove after upgrade to Reactor 3.1.9.RELEASE or later
8788
.publish();
8889

8990
this.publishers.put(publisher, connectableFlux);

spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/FluxMessageChannelTests.java

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,8 +26,10 @@
2626

2727
import java.util.ArrayList;
2828
import java.util.List;
29+
import java.util.Map;
2930
import java.util.concurrent.CountDownLatch;
3031
import java.util.concurrent.TimeUnit;
32+
import java.util.stream.IntStream;
3133

3234
import org.junit.Test;
3335
import org.junit.runner.RunWith;
@@ -40,6 +42,10 @@
4042
import org.springframework.integration.channel.MessageChannelReactiveUtils;
4143
import org.springframework.integration.channel.QueueChannel;
4244
import org.springframework.integration.config.EnableIntegration;
45+
import org.springframework.integration.dsl.IntegrationFlow;
46+
import org.springframework.integration.dsl.MessageChannels;
47+
import org.springframework.integration.dsl.context.IntegrationFlowContext;
48+
import org.springframework.integration.test.util.TestUtils;
4349
import org.springframework.messaging.Message;
4450
import org.springframework.messaging.MessageChannel;
4551
import org.springframework.messaging.MessagingException;
@@ -53,6 +59,7 @@
5359

5460
/**
5561
* @author Artem Bilan
62+
*
5663
* @since 5.0
5764
*/
5865
@RunWith(SpringRunner.class)
@@ -68,8 +75,11 @@ public class FluxMessageChannelTests {
6875
@Autowired
6976
private PollableChannel errorChannel;
7077

78+
@Autowired
79+
private IntegrationFlowContext integrationFlowContext;
80+
7181
@Test
72-
public void testFluxMessageChannel() throws InterruptedException {
82+
public void testFluxMessageChannel() {
7383
QueueChannel replyChannel = new QueueChannel();
7484

7585
for (int i = 0; i < 10; i++) {
@@ -106,6 +116,31 @@ public void testMessageChannelReactiveAdaptation() throws InterruptedException {
106116
assertThat(results, contains("FOO", "BAR"));
107117
}
108118

119+
@Test
120+
public void testFluxMessageChannelCleanUp() throws InterruptedException {
121+
FluxMessageChannel flux = MessageChannels.flux().get();
122+
123+
CountDownLatch finishLatch = new CountDownLatch(1);
124+
125+
IntegrationFlow testFlow = f -> f
126+
.<String>split(__ -> Flux.fromStream(IntStream.range(0, 100).boxed()), null)
127+
.channel(flux)
128+
.aggregate(a -> a.releaseStrategy(m -> m.size() == 100))
129+
.handle(__ -> finishLatch.countDown());
130+
131+
IntegrationFlowContext.IntegrationFlowRegistration flowRegistration =
132+
this.integrationFlowContext.registration(testFlow)
133+
.register();
134+
135+
flowRegistration.getInputChannel().send(new GenericMessage<>("foo"));
136+
137+
assertTrue(finishLatch.await(10, TimeUnit.SECONDS));
138+
139+
assertTrue(TestUtils.getPropertyValue(flux, "publishers", Map.class).isEmpty());
140+
141+
flowRegistration.destroy();
142+
}
143+
109144
@Configuration
110145
@EnableIntegration
111146
public static class TestConfiguration {

0 commit comments

Comments
 (0)