Skip to content

Commit 929505e

Browse files
artembilangaryrussell
authored andcommitted
INT-4466: Do not force release groups if no match
JIRA: https://jira.spring.io/browse/INT-4466 When we schedule group for force complete in the `AbstractCorrelatingMessageHandler`, we don't track a group `timestamp` and its `lastModified` before the scheduled task. This way, in the cluster environment, we may schedule several tasks for different messages and the first started may release the group too early. Just because we extract a `MessageGroup` from the store already in the task per se. * Propagate the actual `timestamp` and `lastModified` from group before scheduling task. Compare these value with the actual group metadata in the `processForceRelease()` before performing real `forceRelease()`. This way we restore behavior before fixing memory leak, when we propagated full `MessageGroup` to the scheduled task * Implement `ResequencingMessageHandler.getComponentType()` for consistency * Remove unnecessary overhead with the `volatile` on many `AbstractCorrelatingMessageHandler` properties, which hardly ever can be changed at runtime **Cherry-pick to 5.0.x and 4.3.x**
1 parent eced157 commit 929505e

File tree

2 files changed

+32
-20
lines changed

2 files changed

+32
-20
lines changed

spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -88,6 +88,7 @@
8888
* @author David Liu
8989
* @author Enrique Rodriguez
9090
* @author Meherzad Lahewala
91+
*
9192
* @since 2.0
9293
*/
9394
public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageProducingHandler
@@ -103,41 +104,41 @@ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageP
103104

104105
private MessageGroupProcessor outputProcessor;
105106

106-
private volatile MessageGroupStore messageStore;
107+
private MessageGroupStore messageStore;
107108

108-
private volatile CorrelationStrategy correlationStrategy;
109+
private CorrelationStrategy correlationStrategy;
109110

110-
private volatile ReleaseStrategy releaseStrategy;
111+
private ReleaseStrategy releaseStrategy;
111112

112-
private volatile boolean releaseStrategySet;
113+
private boolean releaseStrategySet;
113114

114-
private volatile MessageChannel discardChannel;
115+
private MessageChannel discardChannel;
115116

116-
private volatile String discardChannelName;
117+
private String discardChannelName;
117118

118119
private boolean sendPartialResultOnExpiry = false;
119120

120-
private volatile boolean sequenceAware = false;
121+
private boolean sequenceAware = false;
121122

122-
private volatile LockRegistry lockRegistry = new DefaultLockRegistry();
123+
private LockRegistry lockRegistry = new DefaultLockRegistry();
123124

124125
private boolean lockRegistrySet = false;
125126

126-
private volatile long minimumTimeoutForEmptyGroups;
127+
private long minimumTimeoutForEmptyGroups;
127128

128-
private volatile boolean releasePartialSequences;
129+
private boolean releasePartialSequences;
129130

130-
private volatile Expression groupTimeoutExpression;
131+
private Expression groupTimeoutExpression;
131132

132-
private volatile List<Advice> forceReleaseAdviceChain;
133+
private List<Advice> forceReleaseAdviceChain;
133134

134135
private MessageGroupProcessor forceReleaseProcessor = new ForceReleaseMessageGroupProcessor();
135136

136137
private EvaluationContext evaluationContext;
137138

138-
private volatile ApplicationEventPublisher applicationEventPublisher;
139+
private ApplicationEventPublisher applicationEventPublisher;
139140

140-
private volatile boolean expireGroupsUponTimeout = true;
141+
private boolean expireGroupsUponTimeout = true;
141142

142143
private volatile boolean running;
143144

@@ -436,7 +437,7 @@ protected void handleMessageInternal(Message<?> message) throws Exception {
436437
if (this.releaseStrategy.canRelease(messageGroup)) {
437438
Collection<Message<?>> completedMessages = null;
438439
try {
439-
completedMessages = this.completeGroup(message, correlationKey, messageGroup);
440+
completedMessages = completeGroup(message, correlationKey, messageGroup);
440441
}
441442
finally {
442443
// Possible clean (implementation dependency) up
@@ -520,10 +521,12 @@ private void scheduleGroupToForceComplete(MessageGroup messageGroup) {
520521
if (groupTimeout != null && groupTimeout >= 0) {
521522
if (groupTimeout > 0) {
522523
final Object groupId = messageGroup.getGroupId();
524+
final long timestamp = messageGroup.getTimestamp();
525+
final long lastModified = messageGroup.getLastModified();
523526
ScheduledFuture<?> scheduledFuture = getTaskScheduler()
524527
.schedule(() -> {
525528
try {
526-
processForceRelease(groupId);
529+
processForceRelease(groupId, timestamp, lastModified);
527530
}
528531
catch (MessageDeliveryException e) {
529532
if (AbstractCorrelatingMessageHandler.this.logger.isWarnEnabled()) {
@@ -550,9 +553,11 @@ private void scheduleGroupToForceComplete(Object groupId) {
550553
scheduleGroupToForceComplete(messageGroup);
551554
}
552555

553-
private void processForceRelease(Object groupId) {
556+
private void processForceRelease(Object groupId, long timestamp, long lastModified) {
554557
MessageGroup messageGroup = this.messageStore.getMessageGroup(groupId);
555-
this.forceReleaseProcessor.processMessageGroup(messageGroup);
558+
if (messageGroup.getTimestamp() == timestamp && messageGroup.getLastModified() == lastModified) {
559+
this.forceReleaseProcessor.processMessageGroup(messageGroup);
560+
}
556561
}
557562

558563
private void discardMessage(Message<?> message) {
@@ -616,9 +621,11 @@ protected void forceComplete(MessageGroup group) {
616621
}
617622
long lastModifiedNow = groupNow.getLastModified();
618623
int groupSize = groupNow.size();
624+
619625
if ((!groupNow.isComplete() || groupSize == 0)
620626
&& group.getLastModified() == lastModifiedNow
621627
&& group.getTimestamp() == groupNow.getTimestamp()) {
628+
622629
if (groupSize > 0) {
623630
if (this.releaseStrategy.canRelease(groupNow)) {
624631
completeGroup(correlationKey, groupNow);

spring-integration-core/src/main/java/org/springframework/integration/aggregator/ResequencingMessageHandler.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -66,6 +66,11 @@ public final void setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout) {
6666
super.setExpireGroupsUponTimeout(expireGroupsUponTimeout);
6767
}
6868

69+
@Override
70+
public String getComponentType() {
71+
return "resequencer";
72+
}
73+
6974
@Override
7075
protected boolean shouldCopyRequestHeaders() {
7176
return false;

0 commit comments

Comments
 (0)