Skip to content

Commit e422c1d

Browse files
artembilangaryrussell
authored andcommitted
INT-4466: Do not force release groups if no match
JIRA: https://jira.spring.io/browse/INT-4466 When we schedule group for force complete in the `AbstractCorrelatingMessageHandler`, we don't track a group `timestamp` and its `lastModified` before the scheduled task. This way, in the cluster environment, we may schedule several tasks for different messages and the first started may release the group too early. Just because we extract a `MessageGroup` from the store already in the task per se. * Propagate the actual `timestamp` and `lastModified` from group before scheduling task. Compare these value with the actual group metadata in the `processForceRelease()` before performing real `forceRelease()`. This way we restore behavior before fixing memory leak, when we propagated full `MessageGroup` to the scheduled task * Implement `ResequencingMessageHandler.getComponentType()` for consistency * Remove unnecessary overhead with the `volatile` on many `AbstractCorrelatingMessageHandler` properties, which hardly ever can be changed at runtime **Cherry-pick to 5.0.x and 4.3.x**
1 parent 07b20c1 commit e422c1d

File tree

2 files changed

+30
-19
lines changed

2 files changed

+30
-19
lines changed

spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -96,39 +96,39 @@ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageP
9696

9797
private final MessageGroupProcessor outputProcessor;
9898

99-
private volatile MessageGroupStore messageStore;
99+
private MessageGroupStore messageStore;
100100

101-
private volatile CorrelationStrategy correlationStrategy;
101+
private CorrelationStrategy correlationStrategy;
102102

103-
private volatile ReleaseStrategy releaseStrategy;
103+
private ReleaseStrategy releaseStrategy;
104104

105-
private volatile MessageChannel discardChannel;
105+
private MessageChannel discardChannel;
106106

107-
private volatile String discardChannelName;
107+
private String discardChannelName;
108108

109109
private boolean sendPartialResultOnExpiry = false;
110110

111-
private volatile boolean sequenceAware = false;
111+
private boolean sequenceAware = false;
112112

113-
private volatile LockRegistry lockRegistry = new DefaultLockRegistry();
113+
private LockRegistry lockRegistry = new DefaultLockRegistry();
114114

115115
private boolean lockRegistrySet = false;
116116

117-
private volatile long minimumTimeoutForEmptyGroups;
117+
private long minimumTimeoutForEmptyGroups;
118118

119-
private volatile boolean releasePartialSequences;
119+
private boolean releasePartialSequences;
120120

121-
private volatile Expression groupTimeoutExpression;
121+
private Expression groupTimeoutExpression;
122122

123-
private volatile List<Advice> forceReleaseAdviceChain;
123+
private List<Advice> forceReleaseAdviceChain;
124124

125125
private MessageGroupProcessor forceReleaseProcessor = new ForceReleaseMessageGroupProcessor();
126126

127127
private EvaluationContext evaluationContext;
128128

129-
private volatile ApplicationEventPublisher applicationEventPublisher;
129+
private ApplicationEventPublisher applicationEventPublisher;
130130

131-
private volatile boolean expireGroupsUponTimeout = true;
131+
private boolean expireGroupsUponTimeout = true;
132132

133133
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
134134
CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
@@ -415,7 +415,7 @@ protected void handleMessageInternal(Message<?> message) throws Exception {
415415
if (this.releaseStrategy.canRelease(messageGroup)) {
416416
Collection<Message<?>> completedMessages = null;
417417
try {
418-
completedMessages = this.completeGroup(message, correlationKey, messageGroup);
418+
completedMessages = completeGroup(message, correlationKey, messageGroup);
419419
}
420420
finally {
421421
// Always clean up even if there was an exception
@@ -445,13 +445,15 @@ private void scheduleGroupToForceComplete(MessageGroup messageGroup) {
445445
if (groupTimeout != null && groupTimeout >= 0) {
446446
if (groupTimeout > 0) {
447447
final Object groupId = messageGroup.getGroupId();
448+
final long timestamp = messageGroup.getTimestamp();
449+
final long lastModified = messageGroup.getLastModified();
448450
ScheduledFuture<?> scheduledFuture = getTaskScheduler()
449451
.schedule(new Runnable() {
450452

451453
@Override
452454
public void run() {
453455
try {
454-
processForceRelease(groupId);
456+
processForceRelease(groupId, timestamp, lastModified);
455457
}
456458
catch (MessageDeliveryException e) {
457459
if (logger.isWarnEnabled()) {
@@ -480,9 +482,11 @@ private void scheduleGroupToForceComplete(Object groupId) {
480482
scheduleGroupToForceComplete(messageGroup);
481483
}
482484

483-
private void processForceRelease(Object groupId) {
485+
private void processForceRelease(Object groupId, long timestamp, long lastModified) {
484486
MessageGroup messageGroup = this.messageStore.getMessageGroup(groupId);
485-
this.forceReleaseProcessor.processMessageGroup(messageGroup);
487+
if (messageGroup.getTimestamp() == timestamp && messageGroup.getLastModified() == lastModified) {
488+
this.forceReleaseProcessor.processMessageGroup(messageGroup);
489+
}
486490
}
487491

488492
private void discardMessage(Message<?> message) {
@@ -546,9 +550,11 @@ protected void forceComplete(MessageGroup group) {
546550
}
547551
long lastModifiedNow = groupNow.getLastModified();
548552
int groupSize = groupNow.size();
553+
549554
if ((!groupNow.isComplete() || groupSize == 0)
550555
&& group.getLastModified() == lastModifiedNow
551556
&& group.getTimestamp() == groupNow.getTimestamp()) {
557+
552558
if (groupSize > 0) {
553559
if (this.releaseStrategy.canRelease(groupNow)) {
554560
completeGroup(correlationKey, groupNow);

spring-integration-core/src/main/java/org/springframework/integration/aggregator/ResequencingMessageHandler.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -65,6 +65,11 @@ public final void setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout) {
6565
super.setExpireGroupsUponTimeout(expireGroupsUponTimeout);
6666
}
6767

68+
@Override
69+
public String getComponentType() {
70+
return "resequencer";
71+
}
72+
6873
@Override
6974
protected boolean shouldCopyRequestHeaders() {
7075
return false;

0 commit comments

Comments
 (0)