Skip to content

Commit b21dc0d

Browse files
artembilangaryrussell
authored andcommitted
Fix IntegrationFlowContext concurrency issue
When we register `IntegrationFlow` s concurrently at runtime, we may end up with the problem when we register the same object with the same bean name, but in different places. Or when we turn off bean overriding, we end up with the exception that bean with the name already registered * Wrap `IntegrationFlow` bean registration in the `StandardIntegrationFlowContext` into the `Lock` when its bean name is generating * Make `StandardIntegrationFlowContext.registry` as `ConcurrentHashMap` to avoid `ConcurrentModificationException` during `put()` and `remove()` * Fix concurrency for beans registration with the generation names in the `IntegrationFlowBeanPostProcessor` using an `IntegrationFlow` id as a prefix for uniqueness. **Cherry-pick to 5.0.x** Fix generated bean name in the WebFluxDslTests Use only single `Lock` in the `StandardIntegrationFlowContext`: we don't need a fully blown `LockRegistry` there anymore since we have only one synchronization block there and it is always around the same type * Add `What's New` note, and mention changes in the `dsl.adoc` Minor doc polishing.
1 parent a750a78 commit b21dc0d

File tree

6 files changed

+160
-45
lines changed

6 files changed

+160
-45
lines changed

spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowBeanPostProcessor.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -123,21 +123,18 @@ private Object processStandardIntegrationFlow(StandardIntegrationFlow flow, Stri
123123
String id = endpointSpec.getId();
124124

125125
if (id == null) {
126-
id = generateBeanName(endpoint, entry.getValue());
126+
id = generateBeanName(endpoint, flowNamePrefix, entry.getValue());
127127
}
128128

129129
Collection<?> messageHandlers =
130130
this.beanFactory.getBeansOfType(messageHandler.getClass(), false, false)
131131
.values();
132132

133133
if (!messageHandlers.contains(messageHandler)) {
134-
String handlerBeanName = generateBeanName(messageHandler);
135-
String[] handlerAlias = new String[] { id + IntegrationConfigUtils.HANDLER_ALIAS_SUFFIX };
134+
String handlerBeanName = generateBeanName(messageHandler, flowNamePrefix);
136135

137136
registerComponent(messageHandler, handlerBeanName, flowBeanName);
138-
for (String alias : handlerAlias) {
139-
this.beanFactory.registerAlias(handlerBeanName, alias);
140-
}
137+
this.beanFactory.registerAlias(handlerBeanName, id + IntegrationConfigUtils.HANDLER_ALIAS_SUFFIX);
141138
}
142139

143140
registerComponent(endpoint, id, flowBeanName);
@@ -187,12 +184,13 @@ else if (component instanceof SourcePollingChannelAdapterSpec) {
187184
.values()
188185
.contains(o.getKey()))
189186
.forEach(o ->
190-
registerComponent(o.getKey(), generateBeanName(o.getKey(), o.getValue())));
187+
registerComponent(o.getKey(),
188+
generateBeanName(o.getKey(), flowNamePrefix, o.getValue())));
191189
}
192190
SourcePollingChannelAdapterFactoryBean pollingChannelAdapterFactoryBean = spec.get().getT1();
193191
String id = spec.getId();
194192
if (!StringUtils.hasText(id)) {
195-
id = generateBeanName(pollingChannelAdapterFactoryBean, entry.getValue());
193+
id = generateBeanName(pollingChannelAdapterFactoryBean, flowNamePrefix, entry.getValue());
196194
}
197195
registerComponent(pollingChannelAdapterFactoryBean, id, flowBeanName);
198196
targetIntegrationComponents.put(pollingChannelAdapterFactoryBean, id);
@@ -238,7 +236,7 @@ else if (component instanceof AnnotationGatewayProxyFactoryBean) {
238236
targetIntegrationComponents.put(component, gatewayId);
239237
}
240238
else {
241-
String generatedBeanName = generateBeanName(component, entry.getValue());
239+
String generatedBeanName = generateBeanName(component, flowNamePrefix, entry.getValue());
242240
registerComponent(component, generatedBeanName, flowBeanName);
243241
targetIntegrationComponents.put(component, generatedBeanName);
244242
}
@@ -335,19 +333,19 @@ private void registerComponent(Object component, String beanName, String parentN
335333
this.beanFactory.getBean(beanName);
336334
}
337335

338-
private String generateBeanName(Object instance) {
339-
return generateBeanName(instance, null);
336+
private String generateBeanName(Object instance, String prefix) {
337+
return generateBeanName(instance, prefix, null);
340338
}
341339

342-
private String generateBeanName(Object instance, String fallbackId) {
340+
private String generateBeanName(Object instance, String prefix, String fallbackId) {
343341
if (instance instanceof NamedComponent && ((NamedComponent) instance).getComponentName() != null) {
344342
return ((NamedComponent) instance).getComponentName();
345343
}
346344
else if (fallbackId != null) {
347345
return fallbackId;
348346
}
349347

350-
String generatedBeanName = instance.getClass().getName();
348+
String generatedBeanName = prefix + instance.getClass().getName();
351349
String id = generatedBeanName;
352350
int counter = -1;
353351
while (counter == -1 || this.beanFactory.containsBean(id)) {

spring-integration-core/src/main/java/org/springframework/integration/dsl/context/StandardIntegrationFlowContext.java

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
import java.util.Collections;
2121
import java.util.HashMap;
2222
import java.util.Map;
23+
import java.util.concurrent.ConcurrentHashMap;
24+
import java.util.concurrent.locks.Lock;
25+
import java.util.concurrent.locks.ReentrantLock;
2326

2427
import org.springframework.beans.BeansException;
2528
import org.springframework.beans.factory.BeanFactory;
@@ -46,7 +49,9 @@
4649
*/
4750
public final class StandardIntegrationFlowContext implements IntegrationFlowContext, BeanFactoryAware {
4851

49-
private final Map<String, IntegrationFlowRegistration> registry = new HashMap<>();
52+
private final Map<String, IntegrationFlowRegistration> registry = new ConcurrentHashMap<>();
53+
54+
private final Lock registerFlowsLock = new ReentrantLock();
5055

5156
private ConfigurableListableBeanFactory beanFactory;
5257

@@ -76,17 +81,29 @@ public StandardIntegrationFlowRegistrationBuilder registration(IntegrationFlow i
7681
private void register(StandardIntegrationFlowRegistrationBuilder builder) {
7782
IntegrationFlow integrationFlow = builder.integrationFlowRegistration.getIntegrationFlow();
7883
String flowId = builder.integrationFlowRegistration.getId();
79-
if (flowId == null) {
80-
flowId = generateBeanName(integrationFlow, null);
81-
builder.id(flowId);
84+
Lock registerBeanLock = null;
85+
try {
86+
if (flowId == null) {
87+
registerBeanLock = this.registerFlowsLock;
88+
registerBeanLock.lock();
89+
flowId = generateBeanName(integrationFlow, null);
90+
builder.id(flowId);
91+
}
92+
else if (this.registry.containsKey(flowId)) {
93+
throw new IllegalArgumentException("An IntegrationFlow '" + this.registry.get(flowId) +
94+
"' with flowId '" + flowId + "' is already registered.\n" +
95+
"An existing IntegrationFlowRegistration must be destroyed before overriding.");
96+
}
97+
98+
integrationFlow = (IntegrationFlow) registerBean(integrationFlow, flowId, null);
8299
}
83-
else if (this.registry.containsKey(flowId)) {
84-
throw new IllegalArgumentException("An IntegrationFlow '" + this.registry.get(flowId) +
85-
"' with flowId '" + flowId + "' is already registered.\n" +
86-
"An existing IntegrationFlowRegistration must be destroyed before overriding.");
100+
finally {
101+
if (registerBeanLock != null) {
102+
registerBeanLock.unlock();
103+
}
87104
}
88-
IntegrationFlow theFlow = (IntegrationFlow) registerBean(integrationFlow, flowId, null);
89-
builder.integrationFlowRegistration.setIntegrationFlow(theFlow);
105+
106+
builder.integrationFlowRegistration.setIntegrationFlow(integrationFlow);
90107

91108
final String theFlowId = flowId;
92109
builder.additionalBeans.forEach((key, value) -> registerBean(key, value, theFlowId));
@@ -133,19 +150,26 @@ public IntegrationFlowRegistration getRegistrationById(String flowId) {
133150
* @param flowId the bean name to destroy from
134151
*/
135152
@Override
136-
public synchronized void remove(String flowId) {
153+
public void remove(String flowId) {
137154
if (this.registry.containsKey(flowId)) {
138155
IntegrationFlowRegistration flowRegistration = this.registry.remove(flowId);
139156
flowRegistration.stop();
140157

141-
Arrays.stream(this.beanFactory.getDependentBeans(flowId))
142-
.forEach(((BeanDefinitionRegistry) this.beanFactory)::removeBeanDefinition);
158+
BeanDefinitionRegistry beanDefinitionRegistry = (BeanDefinitionRegistry) this.beanFactory;
143159

144-
((BeanDefinitionRegistry) this.beanFactory).removeBeanDefinition(flowId);
160+
Arrays.stream(this.beanFactory.getDependentBeans(flowId))
161+
.forEach(beanName -> {
162+
beanDefinitionRegistry.removeBeanDefinition(beanName);
163+
// TODO until https://jira.spring.io/browse/SPR-16837
164+
Arrays.asList(beanDefinitionRegistry.getAliases(beanName))
165+
.forEach(beanDefinitionRegistry::removeAlias);
166+
});
167+
168+
beanDefinitionRegistry.removeBeanDefinition(flowId);
145169
}
146170
else {
147-
throw new IllegalStateException("Only manually registered IntegrationFlows can be removed. "
148-
+ "But [" + flowId + "] ins't one of them.");
171+
throw new IllegalStateException("An IntegrationFlow with the id "
172+
+ "[" + flowId + "] doesn't exist in the registry.");
149173
}
150174
}
151175

0 commit comments

Comments
 (0)