Skip to content

Commit 6780bbd

Browse files
micheljungartembilan
authored andcommitted
GH-3367: Separate timeouts in BarrierMH
Fixes #3367 Introduce a `requestTimeout` and `triggerTimeout` for `BarrierMessageHandler` For instance, if an HTTP request sends a message to the barrier, it should time out after 1min if no trigger message is received. If the trigger message then arrives late and the HTTP request is no longer waiting, it shouldn't wait for 1min before discarding the request but do so immediately.
1 parent 3fb6567 commit 6780bbd

File tree

7 files changed

+113
-47
lines changed

7 files changed

+113
-47
lines changed

spring-integration-core/src/main/java/org/springframework/integration/aggregator/BarrierMessageHandler.java

Lines changed: 73 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2019 the original author or authors.
2+
* Copyright 2015-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -39,14 +39,15 @@
3939
* the timeout occurs. Only one thread with a particular correlation (result of invoking
4040
* the {@link CorrelationStrategy}) can be suspended at a time. If the inbound thread does
4141
* not arrive before the trigger thread, the latter is suspended until it does, or the
42-
* timeout occurs.
42+
* timeout occurs. Separate timeouts may be configured for request and trigger messages.
4343
* <p>
4444
* The default {@link CorrelationStrategy} is a {@link HeaderAttributeCorrelationStrategy}.
4545
* <p>
4646
* The default output processor is a {@link DefaultAggregatingMessageGroupProcessor}.
4747
*
4848
* @author Gary Russell
4949
* @author Artem Bilan
50+
* @author Michel Jung
5051
*
5152
* @since 4.2
5253
*/
@@ -57,7 +58,9 @@ public class BarrierMessageHandler extends AbstractReplyProducingMessageHandler
5758

5859
private final Map<Object, Thread> inProcess = new ConcurrentHashMap<>();
5960

60-
private final long timeout;
61+
private final long requestTimeout;
62+
63+
private final long triggerTimeout;
6164

6265
private final CorrelationStrategy correlationStrategy;
6366

@@ -70,48 +73,100 @@ public class BarrierMessageHandler extends AbstractReplyProducingMessageHandler
7073
/**
7174
* Construct an instance with the provided timeout and default correlation and
7275
* output strategies.
73-
* @param timeout the timeout in milliseconds.
76+
* @param timeout the timeout in milliseconds for both, request and trigger messages.
7477
*/
7578
public BarrierMessageHandler(long timeout) {
76-
this(timeout, new DefaultAggregatingMessageGroupProcessor());
79+
this(timeout, timeout);
7780
}
7881

7982
/**
8083
* Construct an instance with the provided timeout and output processor, and default
8184
* correlation strategy.
82-
* @param timeout the timeout in milliseconds.
85+
* @param timeout the timeout in milliseconds for both, request and trigger messages.
8386
* @param outputProcessor the output {@link MessageGroupProcessor}.
8487
*/
8588
public BarrierMessageHandler(long timeout, MessageGroupProcessor outputProcessor) {
86-
this(timeout, outputProcessor, null);
89+
this(timeout, timeout, outputProcessor);
8790
}
8891

8992
/**
9093
* Construct an instance with the provided timeout and correlation strategy, and default
9194
* output processor.
92-
* @param timeout the timeout in milliseconds.
95+
* @param timeout the timeout in milliseconds for both, request and trigger messages.
9396
* @param correlationStrategy the correlation strategy.
9497
*/
9598
public BarrierMessageHandler(long timeout, CorrelationStrategy correlationStrategy) {
96-
this(timeout, new DefaultAggregatingMessageGroupProcessor(), correlationStrategy);
99+
this(timeout, timeout, correlationStrategy);
97100
}
98101

99102
/**
100103
* Construct an instance with the provided timeout and output processor, and default
101104
* correlation strategy.
102-
* @param timeout the timeout in milliseconds.
105+
* @param timeout the timeout in milliseconds for both, request and trigger messages.
103106
* @param outputProcessor the output {@link MessageGroupProcessor}.
104107
* @param correlationStrategy the correlation strategy.
105108
*/
106109
public BarrierMessageHandler(long timeout, MessageGroupProcessor outputProcessor,
107110
CorrelationStrategy correlationStrategy) {
108111

112+
this(timeout, timeout, outputProcessor, correlationStrategy);
113+
}
114+
115+
/**
116+
* Construct an instance with the provided timeouts and default correlation and
117+
* output strategies.
118+
* @param requestTimeout the timeout in milliseconds when waiting for trigger message.
119+
* @param triggerTimeout the timeout in milliseconds when waiting for a request message.
120+
* @since 5.4
121+
*/
122+
public BarrierMessageHandler(long requestTimeout, long triggerTimeout) {
123+
this(requestTimeout, triggerTimeout, new DefaultAggregatingMessageGroupProcessor());
124+
}
125+
126+
/**
127+
* Construct an instance with the provided timeout and output processor, and default
128+
* correlation strategy.
129+
* @param requestTimeout the timeout in milliseconds when waiting for trigger message.
130+
* @param triggerTimeout the timeout in milliseconds when waiting for a request message.
131+
* @param outputProcessor the output {@link MessageGroupProcessor}.
132+
* @since 5.4
133+
*/
134+
public BarrierMessageHandler(long requestTimeout, long triggerTimeout, MessageGroupProcessor outputProcessor) {
135+
this(requestTimeout, triggerTimeout, outputProcessor, null);
136+
}
137+
138+
/**
139+
* Construct an instance with the provided timeout and correlation strategy, and default
140+
* output processor.
141+
* @param requestTimeout the timeout in milliseconds when waiting for trigger message.
142+
* @param triggerTimeout the timeout in milliseconds when waiting for a request message.
143+
* @param correlationStrategy the correlation strategy.
144+
* @since 5.4
145+
*/
146+
public BarrierMessageHandler(long requestTimeout, long triggerTimeout, CorrelationStrategy correlationStrategy) {
147+
this(requestTimeout, triggerTimeout, new DefaultAggregatingMessageGroupProcessor(), correlationStrategy);
148+
}
149+
150+
/**
151+
* Construct an instance with the provided timeout and output processor, and default
152+
* correlation strategy.
153+
* @param requestTimeout the timeout in milliseconds when waiting for trigger message.
154+
* @param triggerTimeout the timeout in milliseconds when waiting for a request message.
155+
* @param outputProcessor the output {@link MessageGroupProcessor}.
156+
* @param correlationStrategy the correlation strategy.
157+
* @since 5.4
158+
*/
159+
public BarrierMessageHandler(long requestTimeout, long triggerTimeout, MessageGroupProcessor outputProcessor,
160+
CorrelationStrategy correlationStrategy) {
161+
109162
Assert.notNull(outputProcessor, "'messageGroupProcessor' cannot be null");
110163
this.messageGroupProcessor = outputProcessor;
111-
this.correlationStrategy = (correlationStrategy == null
112-
? new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID)
113-
: correlationStrategy);
114-
this.timeout = timeout;
164+
this.correlationStrategy =
165+
correlationStrategy == null
166+
? new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID)
167+
: correlationStrategy;
168+
this.requestTimeout = requestTimeout;
169+
this.triggerTimeout = triggerTimeout;
115170
}
116171

117172
/**
@@ -163,12 +218,12 @@ protected Object handleRequestMessage(Message<?> requestMessage) {
163218
}
164219
Thread existing = this.inProcess.putIfAbsent(key, Thread.currentThread());
165220
if (existing != null) {
166-
throw new MessagingException(requestMessage, "Correlation key ("
167-
+ key + ") is already in use by " + existing.getName());
221+
throw new MessagingException(requestMessage,
222+
"Correlation key (" + key + ") is already in use by " + existing.getName());
168223
}
169224
SynchronousQueue<Message<?>> syncQueue = createOrObtainQueue(key);
170225
try {
171-
Message<?> releaseMessage = syncQueue.poll(this.timeout, TimeUnit.MILLISECONDS);
226+
Message<?> releaseMessage = syncQueue.poll(this.requestTimeout, TimeUnit.MILLISECONDS);
172227
if (releaseMessage != null) {
173228
return processRelease(key, requestMessage, releaseMessage);
174229
}
@@ -228,7 +283,7 @@ public void trigger(Message<?> message) {
228283
}
229284
SynchronousQueue<Message<?>> syncQueue = createOrObtainQueue(key);
230285
try {
231-
if (!syncQueue.offer(message, this.timeout, TimeUnit.MILLISECONDS)) {
286+
if (!syncQueue.offer(message, this.triggerTimeout, TimeUnit.MILLISECONDS)) {
232287
this.logger.error("Suspending thread timed out or did not arrive within timeout for: " + message);
233288
this.suspensions.remove(key);
234289
MessageChannel messageChannel = getDiscardChannel();

spring-integration-core/src/main/java/org/springframework/integration/config/xml/BarrierParser.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2019 the original author or authors.
2+
* Copyright 2015-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,16 +27,20 @@
2727
* Parser for {@code <int:barrier/>}.
2828
*
2929
* @author Gary Russell
30+
* @author Artem Bilan
3031
*
3132
* @since 4.2
3233
*/
3334
public class BarrierParser extends AbstractConsumerEndpointParser {
3435

3536
@Override
3637
protected BeanDefinitionBuilder parseHandler(Element element, ParserContext parserContext) {
37-
BeanDefinitionBuilder handlerBuilder = BeanDefinitionBuilder
38-
.genericBeanDefinition(BarrierMessageHandler.class);
38+
BeanDefinitionBuilder handlerBuilder = BeanDefinitionBuilder.genericBeanDefinition(BarrierMessageHandler.class);
3939
handlerBuilder.addConstructorArgValue(element.getAttribute("timeout"));
40+
String triggerTimeout = element.getAttribute("trigger-timeout");
41+
if (StringUtils.hasText(triggerTimeout)) {
42+
handlerBuilder.addConstructorArgValue(triggerTimeout);
43+
}
4044
String processor = element.getAttribute("output-processor");
4145
if (StringUtils.hasText(processor)) {
4246
handlerBuilder.addConstructorArgReference(processor);

spring-integration-core/src/main/resources/org/springframework/integration/config/spring-integration.xsd

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1736,8 +1736,7 @@
17361736
<xsd:complexType>
17371737
<xsd:choice minOccurs="0" maxOccurs="2">
17381738
<xsd:element name="transactional" type="transactionalType" minOccurs="0" maxOccurs="1"/>
1739-
<xsd:element name="request-handler-advice-chain" type="handlerAdviceChainType" minOccurs="0"
1740-
maxOccurs="1"/>
1739+
<xsd:element name="request-handler-advice-chain" type="handlerAdviceChainType" minOccurs="0"/>
17411740
<xsd:element ref="poller"/>
17421741
</xsd:choice>
17431742
<xsd:attributeGroup ref="inputOutputChannelGroup"/>
@@ -1751,7 +1750,15 @@
17511750
</xsd:documentation>
17521751
</xsd:annotation>
17531752
</xsd:attribute>
1754-
<xsd:attribute name="requires-reply" type="xsd:string" use="optional">
1753+
<xsd:attribute name="trigger-timeout" type="xsd:string">
1754+
<xsd:annotation>
1755+
<xsd:documentation>
1756+
The time in milliseconds to suspend the trigger thread.
1757+
If not provided a 'timeout' is used.
1758+
</xsd:documentation>
1759+
</xsd:annotation>
1760+
</xsd:attribute>
1761+
<xsd:attribute name="requires-reply" type="xsd:string">
17551762
<xsd:annotation>
17561763
<xsd:documentation>
17571764
Specify whether the barrier must return a non-null value. This value will be

spring-integration-core/src/test/java/org/springframework/integration/config/xml/BarrierParserTests-context.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111

1212
<int:barrier id="barrier1" input-channel="in" output-channel="out" correlation-strategy-expression="'foo'"
1313
requires-reply="true" discard-channel="discards"
14-
timeout="10000">
14+
timeout="10000"
15+
trigger-timeout="5000">
1516
<int:poller fixed-delay="100" />
1617
</int:barrier>
1718

spring-integration-core/src/test/java/org/springframework/integration/config/xml/BarrierParserTests.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2019 the original author or authors.
2+
* Copyright 2015-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,8 +18,7 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020

21-
import org.junit.Test;
22-
import org.junit.runner.RunWith;
21+
import org.junit.jupiter.api.Test;
2322

2423
import org.springframework.beans.factory.annotation.Autowired;
2524
import org.springframework.integration.aggregator.BarrierMessageHandler;
@@ -35,16 +34,16 @@
3534
import org.springframework.messaging.PollableChannel;
3635
import org.springframework.messaging.support.GenericMessage;
3736
import org.springframework.test.annotation.DirtiesContext;
38-
import org.springframework.test.context.ContextConfiguration;
39-
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
37+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
4038

4139
/**
4240
* @author Gary Russell
41+
* @author Artem Bilan
42+
*
4343
* @since 4.2
4444
*
4545
*/
46-
@ContextConfiguration
47-
@RunWith(SpringJUnit4ClassRunner.class)
46+
@SpringJUnitConfig
4847
@DirtiesContext
4948
public class BarrierParserTests {
5049

@@ -71,8 +70,8 @@ public class BarrierParserTests {
7170

7271
@Test
7372
public void parserTestsWithMessage() {
74-
this.in.send(new GenericMessage<String>("foo"));
75-
this.release.send(new GenericMessage<String>("bar"));
73+
this.in.send(new GenericMessage<>("foo"));
74+
this.release.send(new GenericMessage<>("bar"));
7675
Message<?> received = out.receive(10000);
7776
assertThat(received).isNotNull();
7877
this.barrier1.stop();
@@ -82,7 +81,8 @@ public void parserTestsWithMessage() {
8281
public void parserFieldPopulationTests() {
8382
BarrierMessageHandler handler = TestUtils.getPropertyValue(this.barrier1, "handler",
8483
BarrierMessageHandler.class);
85-
assertThat(TestUtils.getPropertyValue(handler, "timeout")).isEqualTo(10000L);
84+
assertThat(TestUtils.getPropertyValue(handler, "requestTimeout")).isEqualTo(10000L);
85+
assertThat(TestUtils.getPropertyValue(handler, "triggerTimeout")).isEqualTo(5000L);
8686
assertThat(TestUtils.getPropertyValue(handler, "requiresReply", Boolean.class)).isTrue();
8787
assertThat(TestUtils.getPropertyValue(this.barrier2, "handler.correlationStrategy"))
8888
.isInstanceOf(HeaderAttributeCorrelationStrategy.class);

src/reference/asciidoc/barrier.adoc

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,30 +3,27 @@
33

44
Sometimes, we need to suspend a message flow thread until some other asynchronous event occurs.
55
For example, consider an HTTP request that publishes a message to RabbitMQ.
6-
We might wish to not reply to the user until the RabbitMQ broker has issued an acknowledgment that the message was
7-
received.
6+
We might wish to not reply to the user until the RabbitMQ broker has issued an acknowledgment that the message was received.
87

98
In version 4.2, Spring Integration introduced the `<barrier/>` component for this purpose.
109
The underlying `MessageHandler` is the `BarrierMessageHandler`.
11-
This class also implements
12-
`MessageTriggerAction`, in which a message passed to the `trigger()` method releases a corresponding thread in the
13-
`handleRequestMessage()` method (if present).
10+
This class also implements `MessageTriggerAction`, in which a message passed to the `trigger()` method releases a corresponding thread in the `handleRequestMessage()` method (if present).
1411

1512
The suspended thread and trigger thread are correlated by invoking a `CorrelationStrategy` on the messages.
16-
When a message is sent to the `input-channel`, the thread is suspended for up to `timeout` milliseconds, waiting for
17-
a corresponding trigger message.
13+
When a message is sent to the `input-channel`, the thread is suspended for up to `requestTimeout` milliseconds, waiting for a corresponding trigger message.
1814
The default correlation strategy uses the `IntegrationMessageHeaderAccessor.CORRELATION_ID` header.
1915
When a trigger message arrives with the same correlation, the thread is released.
2016
The message sent to the `output-channel` after release is constructed by using a `MessageGroupProcessor`.
21-
By default, the message is a `Collection<?>` of the two payloads, and the headers are merged by using a
22-
`DefaultAggregatingMessageGroupProcessor`.
17+
By default, the message is a `Collection<?>` of the two payloads, and the headers are merged by using a `DefaultAggregatingMessageGroupProcessor`.
2318

24-
CAUTION: If the `trigger()` method is invoked first (or after the main thread times out), it is suspended for up to `timeout` waiting for the suspending message to arrive.
19+
CAUTION: If the `trigger()` method is invoked first (or after the main thread times out), it is suspended for up to `triggerTimeout` waiting for the suspending message to arrive.
2520
If you do not want to suspend the trigger thread, consider handing off to a `TaskExecutor` instead so that its thread is suspended instead.
2621

22+
NOTE: Prior version 5.4, there was only one `timeout` option for both request and trigger messages, but in some use-case it is better to have different timeouts for those actions.
23+
Therefore `requestTimeout` and `triggerTimeout` options have been introduced.
24+
2725
The `requires-reply` property determines the action to take if the suspended thread times out before the trigger message arrives.
28-
By default, it is `false`, which means the endpoint returns `null`, the flow ends, and the thread returns to the
29-
caller.
26+
By default, it is `false`, which means the endpoint returns `null`, the flow ends, and the thread returns to the caller.
3027
When `true`, a `ReplyRequiredException` is thrown.
3128

3229
You can call the `trigger()` method programmatically (obtain the bean reference by using the name, `barrier.handler` -- where `barrier` is the bean name of the barrier endpoint).

src/reference/asciidoc/whats-new.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ See <<./aggregator.adoc#aggregator-expiring-groups, Aggregator Expiring Groups>>
5858

5959
The legacy metrics that were replaced by Micrometer meters have been removed.
6060

61+
The <<./barrier.adoc#barrier,Thread Barrier>> has now two separate timeout options: `requestTimeout` and `triggerTimeout`.
62+
6163
[[x5.4-tcp]]
6264
=== TCP Changes
6365

0 commit comments

Comments
 (0)