Skip to content

Commit d0603a8

Browse files
committed
Fix IntegrationFlowContext concurrency issue
When we register `IntegrationFlow` s concurrently at runtime, we may end up with the problem when we register the same object with the same bean name, but in different places. Or when we turn off bean overriding, we end up with the exception that bean with the name already registered * Wrap `IntegrationFlow` bean registration in the `StandardIntegrationFlowContext` into the `Lock` when its bean name is generating * Make `StandardIntegrationFlowContext.registry` as `ConcurrentHashMap` to avoid `ConcurrentModificationException` during `put()` and `remove()` * Fix concurrency for beans registration with the generation names in the `IntegrationFlowBeanPostProcessor` using an `IntegrationFlow` id as a prefix for uniqueness. **Cherry-pick to 5.0.x** Fix generated bean name in the WebFluxDslTests Use only single `Lock` in the `StandardIntegrationFlowContext`: we don't need a fully blown `LockRegistry` there anymore since we have only one synchronization block there and it is always around the same type * Add `What's New` note, and mention changes in the `dsl.adoc` Minor doc polishing. # Conflicts: # spring-integration-core/src/main/java/org/springframework/integration/dsl/context/StandardIntegrationFlowContext.java # spring-integration-core/src/test/java/org/springframework/integration/dsl/manualflow/ManualFlowTests.java # src/reference/asciidoc/dsl.adoc # src/reference/asciidoc/whats-new.adoc
1 parent 566b4b8 commit d0603a8

File tree

6 files changed

+162
-45
lines changed

6 files changed

+162
-45
lines changed

spring-integration-core/src/main/java/org/springframework/integration/config/dsl/IntegrationFlowBeanPostProcessor.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -128,21 +128,18 @@ private Object processStandardIntegrationFlow(StandardIntegrationFlow flow, Stri
128128
String id = endpointSpec.getId();
129129

130130
if (id == null) {
131-
id = generateBeanName(endpoint, entry.getValue());
131+
id = generateBeanName(endpoint, flowNamePrefix, entry.getValue());
132132
}
133133

134134
Collection<?> messageHandlers =
135135
this.beanFactory.getBeansOfType(messageHandler.getClass(), false, false)
136136
.values();
137137

138138
if (!messageHandlers.contains(messageHandler)) {
139-
String handlerBeanName = generateBeanName(messageHandler);
140-
String[] handlerAlias = new String[] { id + IntegrationConfigUtils.HANDLER_ALIAS_SUFFIX };
139+
String handlerBeanName = generateBeanName(messageHandler, flowNamePrefix);
141140

142141
registerComponent(messageHandler, handlerBeanName, flowBeanName);
143-
for (String alias : handlerAlias) {
144-
this.beanFactory.registerAlias(handlerBeanName, alias);
145-
}
142+
this.beanFactory.registerAlias(handlerBeanName, id + IntegrationConfigUtils.HANDLER_ALIAS_SUFFIX);
146143
}
147144

148145
registerComponent(endpoint, id, flowBeanName);
@@ -192,12 +189,13 @@ else if (component instanceof SourcePollingChannelAdapterSpec) {
192189
.values()
193190
.contains(o.getKey()))
194191
.forEach(o ->
195-
registerComponent(o.getKey(), generateBeanName(o.getKey(), o.getValue())));
192+
registerComponent(o.getKey(),
193+
generateBeanName(o.getKey(), flowNamePrefix, o.getValue())));
196194
}
197195
SourcePollingChannelAdapterFactoryBean pollingChannelAdapterFactoryBean = spec.get().getT1();
198196
String id = spec.getId();
199197
if (!StringUtils.hasText(id)) {
200-
id = generateBeanName(pollingChannelAdapterFactoryBean, entry.getValue());
198+
id = generateBeanName(pollingChannelAdapterFactoryBean, flowNamePrefix, entry.getValue());
201199
}
202200
registerComponent(pollingChannelAdapterFactoryBean, id, flowBeanName);
203201
targetIntegrationComponents.put(pollingChannelAdapterFactoryBean, id);
@@ -243,7 +241,7 @@ else if (component instanceof AnnotationGatewayProxyFactoryBean) {
243241
targetIntegrationComponents.put(component, gatewayId);
244242
}
245243
else {
246-
String generatedBeanName = generateBeanName(component, entry.getValue());
244+
String generatedBeanName = generateBeanName(component, flowNamePrefix, entry.getValue());
247245
registerComponent(component, generatedBeanName, flowBeanName);
248246
targetIntegrationComponents.put(component, generatedBeanName);
249247
}
@@ -307,19 +305,19 @@ private void registerComponent(Object component, String beanName, String parentN
307305
this.beanFactory.getBean(beanName);
308306
}
309307

310-
private String generateBeanName(Object instance) {
311-
return generateBeanName(instance, null);
308+
private String generateBeanName(Object instance, String prefix) {
309+
return generateBeanName(instance, prefix, null);
312310
}
313311

314-
private String generateBeanName(Object instance, String fallbackId) {
312+
private String generateBeanName(Object instance, String prefix, String fallbackId) {
315313
if (instance instanceof NamedComponent && ((NamedComponent) instance).getComponentName() != null) {
316314
return ((NamedComponent) instance).getComponentName();
317315
}
318316
else if (fallbackId != null) {
319317
return fallbackId;
320318
}
321319

322-
String generatedBeanName = instance.getClass().getName();
320+
String generatedBeanName = prefix + instance.getClass().getName();
323321
String id = generatedBeanName;
324322
int counter = -1;
325323
while (counter == -1 || this.beanFactory.containsBean(id)) {

spring-integration-core/src/main/java/org/springframework/integration/dsl/context/IntegrationFlowContext.java

Lines changed: 41 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
import java.util.Collections;
2121
import java.util.HashMap;
2222
import java.util.Map;
23+
import java.util.concurrent.ConcurrentHashMap;
24+
import java.util.concurrent.locks.Lock;
25+
import java.util.concurrent.locks.ReentrantLock;
2326

2427
import org.springframework.beans.BeansException;
2528
import org.springframework.beans.factory.BeanFactory;
@@ -65,7 +68,9 @@
6568
*/
6669
public final class IntegrationFlowContext implements BeanFactoryAware {
6770

68-
private final Map<String, IntegrationFlowRegistration> registry = new HashMap<>();
71+
private final Map<String, IntegrationFlowRegistration> registry = new ConcurrentHashMap<>();
72+
73+
private final Lock registerFlowsLock = new ReentrantLock();
6974

7075
private ConfigurableListableBeanFactory beanFactory;
7176

@@ -91,20 +96,33 @@ public IntegrationFlowRegistrationBuilder registration(IntegrationFlow integrati
9196
return new IntegrationFlowRegistrationBuilder(integrationFlow);
9297
}
9398

99+
94100
private void register(IntegrationFlowRegistrationBuilder builder) {
95101
IntegrationFlow integrationFlow = builder.integrationFlowRegistration.getIntegrationFlow();
96102
String flowId = builder.integrationFlowRegistration.getId();
97-
if (flowId == null) {
98-
flowId = generateBeanName(integrationFlow, null);
99-
builder.id(flowId);
103+
Lock registerBeanLock = null;
104+
try {
105+
if (flowId == null) {
106+
registerBeanLock = this.registerFlowsLock;
107+
registerBeanLock.lock();
108+
flowId = generateBeanName(integrationFlow, null);
109+
builder.id(flowId);
110+
}
111+
else if (this.registry.containsKey(flowId)) {
112+
throw new IllegalArgumentException("An IntegrationFlow '" + this.registry.get(flowId) +
113+
"' with flowId '" + flowId + "' is already registered.\n" +
114+
"An existing IntegrationFlowRegistration must be destroyed before overriding.");
115+
}
116+
117+
integrationFlow = (IntegrationFlow) registerBean(integrationFlow, flowId, null);
100118
}
101-
else if (this.registry.containsKey(flowId)) {
102-
throw new IllegalArgumentException("An IntegrationFlow '" + this.registry.get(flowId) +
103-
"' with flowId '" + flowId + "' is already registered.\n" +
104-
"An existing IntegrationFlowRegistration must be destroyed before overriding.");
119+
finally {
120+
if (registerBeanLock != null) {
121+
registerBeanLock.unlock();
122+
}
105123
}
106-
IntegrationFlow theFlow = (IntegrationFlow) registerBean(integrationFlow, flowId, null);
107-
builder.integrationFlowRegistration.setIntegrationFlow(theFlow);
124+
125+
builder.integrationFlowRegistration.setIntegrationFlow(integrationFlow);
108126

109127
final String theFlowId = flowId;
110128
builder.additionalBeans.forEach((key, value) -> registerBean(key, value, theFlowId));
@@ -149,19 +167,26 @@ public IntegrationFlowRegistration getRegistrationById(String flowId) {
149167
* for provided {@code flowId} and clean up all the local cache for it.
150168
* @param flowId the bean name to destroy from
151169
*/
152-
public synchronized void remove(String flowId) {
170+
public void remove(String flowId) {
153171
if (this.registry.containsKey(flowId)) {
154172
IntegrationFlowRegistration flowRegistration = this.registry.remove(flowId);
155173
flowRegistration.stop();
156174

157-
Arrays.stream(this.beanFactory.getDependentBeans(flowId))
158-
.forEach(((BeanDefinitionRegistry) this.beanFactory)::removeBeanDefinition);
175+
BeanDefinitionRegistry beanDefinitionRegistry = (BeanDefinitionRegistry) this.beanFactory;
159176

160-
((BeanDefinitionRegistry) this.beanFactory).removeBeanDefinition(flowId);
177+
Arrays.stream(this.beanFactory.getDependentBeans(flowId))
178+
.forEach(beanName -> {
179+
beanDefinitionRegistry.removeBeanDefinition(beanName);
180+
// TODO until https://jira.spring.io/browse/SPR-16837
181+
Arrays.asList(beanDefinitionRegistry.getAliases(beanName))
182+
.forEach(beanDefinitionRegistry::removeAlias);
183+
});
184+
185+
beanDefinitionRegistry.removeBeanDefinition(flowId);
161186
}
162187
else {
163-
throw new IllegalStateException("Only manually registered IntegrationFlows can be removed. "
164-
+ "But [" + flowId + "] ins't one of them.");
188+
throw new IllegalStateException("An IntegrationFlow with the id "
189+
+ "[" + flowId + "] doesn't exist in the registry.");
165190
}
166191
}
167192

0 commit comments

Comments
 (0)