Skip to content

Commit 7450432

Browse files
artembilangaryrussell
authored andcommitted
INT-4434: Allow to use sub-flows from beans
JIRA: https://jira.spring.io/browse/INT-4434 There were a restriction introduced since Java DSL `1.2` do not use `IntegrationFlow` beans for sub-flow definitions, e.g. in routers. It is considered as regression by community because it worked before in version `1.1` * Introduce `IntegrationFlow.getInputChannel()` to be able to bridge from the main flow to the flow which is treated as sub-flow. In most cases we talk about an independent bean for the `IntegrationFlow` which can be used as a stand along one and as a sub-flow in other flow **Cherry-pick to 5.0.x** Add JavaDocs to the `EndpointSpec.obtainInputChannelFromFlow()`
1 parent b1a3ca7 commit 7450432

File tree

16 files changed

+158
-109
lines changed

16 files changed

+158
-109
lines changed

spring-integration-core/src/main/java/org/springframework/integration/dsl/AbstractRouterSpec.java

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,10 +16,8 @@
1616

1717
package org.springframework.integration.dsl;
1818

19-
import org.springframework.integration.channel.DirectChannel;
2019
import org.springframework.integration.router.AbstractMessageRouter;
2120
import org.springframework.messaging.MessageChannel;
22-
import org.springframework.util.Assert;
2321

2422
/**
2523
* A {@link MessageHandlerSpec} for {@link AbstractMessageRouter}s.
@@ -64,7 +62,6 @@ public S applySequence(boolean applySequence) {
6462
* Specify a {@link MessageChannel} bean name as a default output from the router.
6563
* @param channelName the {@link MessageChannel} bean name.
6664
* @return the router spec.
67-
* @since 1.2
6865
* @see AbstractMessageRouter#setDefaultOutputChannelName(String)
6966
*/
7067
public S defaultOutputChannel(String channelName) {
@@ -76,7 +73,6 @@ public S defaultOutputChannel(String channelName) {
7673
* Specify a {@link MessageChannel} as a default output from the router.
7774
* @param channel the {@link MessageChannel} to use.
7875
* @return the router spec.
79-
* @since 1.2
8076
* @see AbstractMessageRouter#setDefaultOutputChannel(MessageChannel)
8177
*/
8278
public S defaultOutputChannel(MessageChannel channel) {
@@ -88,25 +84,16 @@ public S defaultOutputChannel(MessageChannel channel) {
8884
* Specify an {@link IntegrationFlow} as an output from the router when no any other mapping has matched.
8985
* @param subFlow the {@link IntegrationFlow} for default mapping.
9086
* @return the router spec.
91-
* @since 1.2
9287
*/
9388
public S defaultSubFlowMapping(IntegrationFlow subFlow) {
94-
Assert.notNull(subFlow, "'subFlow' must not be null");
95-
DirectChannel channel = new DirectChannel();
96-
IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(channel);
97-
subFlow.configure(flowBuilder);
98-
99-
this.componentsToRegister.put(flowBuilder, null);
100-
101-
return defaultOutputChannel(channel);
89+
return defaultOutputChannel(obtainInputChannelFromFlow(subFlow, false));
10290
}
10391

10492
/**
10593
* Make a default output mapping of the router to the parent flow.
10694
* Use the next, after router, parent flow {@link MessageChannel} as a
10795
* {@link AbstractMessageRouter#setDefaultOutputChannel(MessageChannel)} of this router.
10896
* @return the router spec.
109-
* @since 1.2
11097
*/
11198
public S defaultOutputToParentFlow() {
11299
this.defaultToParentFlow = true;

spring-integration-core/src/main/java/org/springframework/integration/dsl/EndpointSpec.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,8 +23,10 @@
2323
import org.springframework.beans.factory.BeanNameAware;
2424
import org.springframework.context.SmartLifecycle;
2525
import org.springframework.core.ResolvableType;
26+
import org.springframework.integration.channel.DirectChannel;
2627
import org.springframework.integration.endpoint.AbstractPollingEndpoint;
2728
import org.springframework.integration.scheduling.PollerMetadata;
29+
import org.springframework.messaging.MessageChannel;
2830
import org.springframework.messaging.MessageHandler;
2931
import org.springframework.util.Assert;
3032

@@ -141,4 +143,42 @@ protected void assertHandler() {
141143
Assert.state(this.handler != null, "'this.handler' must not be null.");
142144
}
143145

146+
/**
147+
* Try to get a {@link MessageChannel} as an input for the provided {@link IntegrationFlow}
148+
* or create one and wrap the provided flow to a new one.
149+
* @param subFlow the {@link IntegrationFlow} to extract input channel.
150+
* @return the input channel of the flow of create one
151+
* @since 5.0.4
152+
*/
153+
protected MessageChannel obtainInputChannelFromFlow(IntegrationFlow subFlow) {
154+
return obtainInputChannelFromFlow(subFlow, true);
155+
}
156+
157+
/**
158+
* Try to get a {@link MessageChannel} as an input for the provided {@link IntegrationFlow}
159+
* or create one and wrap the provided flow to a new one.
160+
* @param subFlow the {@link IntegrationFlow} to extract input channel.
161+
* @param evaluateInternalBuilder true if an internal {@link IntegrationFlowDefinition} should be
162+
* evaluated to an {@link IntegrationFlow} component or left as a builder in the {@link #componentsToRegister}
163+
* for future use-case. For example the builder is used for router configurations to retain beans
164+
* registration order for parent-child dependencies.
165+
* @return the input channel of the flow of create one
166+
* @since 5.0.4
167+
*/
168+
protected MessageChannel obtainInputChannelFromFlow(IntegrationFlow subFlow, boolean evaluateInternalBuilder) {
169+
Assert.notNull(subFlow, "'subFlow' must not be null");
170+
MessageChannel messageChannel = subFlow.getInputChannel();
171+
if (messageChannel == null) {
172+
messageChannel = new DirectChannel();
173+
IntegrationFlowDefinition<?> flowBuilder = IntegrationFlows.from(messageChannel);
174+
subFlow.configure(flowBuilder);
175+
this.componentsToRegister.put(evaluateInternalBuilder ? flowBuilder.get() : flowBuilder, null);
176+
}
177+
else {
178+
this.componentsToRegister.put(subFlow, null);
179+
}
180+
181+
return messageChannel;
182+
}
183+
144184
}

spring-integration-core/src/main/java/org/springframework/integration/dsl/EnricherSpec.java

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.util.function.Function;
2222

2323
import org.springframework.expression.Expression;
24-
import org.springframework.integration.channel.DirectChannel;
2524
import org.springframework.integration.config.ConsumerEndpointFactoryBean;
2625
import org.springframework.integration.expression.FunctionExpression;
2726
import org.springframework.integration.expression.ValueExpression;
@@ -48,10 +47,9 @@
4847
*/
4948
public class EnricherSpec extends ConsumerEndpointSpec<EnricherSpec, ContentEnricher> {
5049

51-
private final Map<String, Expression> propertyExpressions = new HashMap<String, Expression>();
50+
private final Map<String, Expression> propertyExpressions = new HashMap<>();
5251

53-
private final Map<String, HeaderValueMessageProcessor<?>> headerExpressions =
54-
new HashMap<String, HeaderValueMessageProcessor<?>>();
52+
private final Map<String, HeaderValueMessageProcessor<?>> headerExpressions = new HashMap<>();
5553

5654
EnricherSpec() {
5755
super(new ContentEnricher());
@@ -167,15 +165,7 @@ public <P> EnricherSpec requestPayload(Function<Message<P>, ?> requestPayloadFun
167165
* @return the enricher spec
168166
*/
169167
public EnricherSpec requestSubFlow(IntegrationFlow subFlow) {
170-
Assert.notNull(subFlow, "'subFlow' must not be null");
171-
172-
DirectChannel requestChannel = new DirectChannel();
173-
IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(requestChannel);
174-
subFlow.configure(flowBuilder);
175-
176-
this.componentsToRegister.put(flowBuilder.get(), null);
177-
178-
return requestChannel(requestChannel);
168+
return requestChannel(obtainInputChannelFromFlow(subFlow));
179169
}
180170

181171
/**
@@ -196,7 +186,7 @@ public EnricherSpec shouldClonePayload(boolean shouldClonePayload) {
196186
* @see ContentEnricher#setPropertyExpressions(Map)
197187
*/
198188
public <V> EnricherSpec property(String key, V value) {
199-
this.propertyExpressions.put(key, new ValueExpression<V>(value));
189+
this.propertyExpressions.put(key, new ValueExpression<>(value));
200190
return _this();
201191
}
202192

@@ -234,7 +224,7 @@ public <P> EnricherSpec propertyFunction(String key, Function<Message<P>, Object
234224
* @see ContentEnricher#setHeaderExpressions(Map)
235225
*/
236226
public <V> EnricherSpec header(String name, V value) {
237-
return this.header(name, value, null);
227+
return header(name, value, null);
238228
}
239229

240230
/**

spring-integration-core/src/main/java/org/springframework/integration/dsl/FilterEndpointSpec.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,10 +16,8 @@
1616

1717
package org.springframework.integration.dsl;
1818

19-
import org.springframework.integration.channel.DirectChannel;
2019
import org.springframework.integration.filter.MessageFilter;
2120
import org.springframework.messaging.MessageChannel;
22-
import org.springframework.util.Assert;
2321

2422
/**
2523
* A {@link ConsumerEndpointSpec} implementation for the {@link MessageFilter}.
@@ -87,12 +85,7 @@ public FilterEndpointSpec discardChannel(String discardChannelName) {
8785
* @return the endpoint spec.
8886
*/
8987
public FilterEndpointSpec discardFlow(IntegrationFlow discardFlow) {
90-
Assert.notNull(discardFlow, "'discardFlow' must not be null");
91-
DirectChannel channel = new DirectChannel();
92-
IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(channel);
93-
discardFlow.configure(flowBuilder);
94-
this.componentsToRegister.put(flowBuilder.get(), null);
95-
return discardChannel(channel);
88+
return discardChannel(obtainInputChannelFromFlow(discardFlow));
9689
}
9790

9891
/**

spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlow.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.integration.dsl;
1818

19+
import org.springframework.messaging.MessageChannel;
20+
1921
/**
2022
* The main Integration DSL abstraction.
2123
* <p>
@@ -80,4 +82,14 @@ public interface IntegrationFlow {
8082
*/
8183
void configure(IntegrationFlowDefinition<?> flow);
8284

85+
/**
86+
* Return the first {@link MessageChannel} component
87+
* which is essential a flow input channel.
88+
* @return the channel.
89+
* @since 5.0.4
90+
*/
91+
default MessageChannel getInputChannel() {
92+
return null;
93+
}
94+
8395
}

spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowAdapter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,12 @@ public final void configure(IntegrationFlowDefinition<?> flow) {
6868
this.targetIntegrationFlow = flow.get();
6969
}
7070

71+
@Override
72+
public MessageChannel getInputChannel() {
73+
assertTargetIntegrationFlow();
74+
return this.targetIntegrationFlow.getInputChannel();
75+
}
76+
7177
@Override
7278
public void start() {
7379
assertTargetIntegrationFlow();

spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowBeanPostProcessor.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,6 @@ public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
7474
this.beanFactory = (ConfigurableListableBeanFactory) beanFactory;
7575
}
7676

77-
@Override
78-
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
79-
return bean;
80-
}
81-
8277
@Override
8378
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
8479
if (bean instanceof StandardIntegrationFlow) {

spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -334,13 +334,27 @@ public B wireTap(MessageChannel wireTapChannel) {
334334
* @return the current {@link IntegrationFlowDefinition}.
335335
*/
336336
public B wireTap(IntegrationFlow flow, Consumer<WireTapSpec> wireTapConfigurer) {
337-
DirectChannel wireTapChannel = new DirectChannel();
338-
IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(wireTapChannel);
339-
flow.configure(flowBuilder);
340-
addComponent(flowBuilder.get());
337+
MessageChannel wireTapChannel = obtainInputChannelFromFlow(flow);
338+
341339
return wireTap(wireTapChannel, wireTapConfigurer);
342340
}
343341

342+
private MessageChannel obtainInputChannelFromFlow(IntegrationFlow flow) {
343+
Assert.notNull(flow, "'flow' must not be null");
344+
MessageChannel messageChannel = flow.getInputChannel();
345+
if (messageChannel == null) {
346+
messageChannel = new DirectChannel();
347+
IntegrationFlowDefinition<?> flowBuilder = IntegrationFlows.from(messageChannel);
348+
flow.configure(flowBuilder);
349+
addComponent(flowBuilder.get());
350+
}
351+
else {
352+
addComponent(flow);
353+
}
354+
355+
return messageChannel;
356+
}
357+
344358
/**
345359
* Populate the {@code Wire Tap} EI Pattern specific
346360
* {@link org.springframework.messaging.support.ChannelInterceptor} implementation
@@ -2164,11 +2178,8 @@ public B gateway(IntegrationFlow flow) {
21642178
* @return the current {@link IntegrationFlowDefinition}.
21652179
*/
21662180
public B gateway(IntegrationFlow flow, Consumer<GatewayEndpointSpec> endpointConfigurer) {
2167-
Assert.notNull(flow, "'flow' must not be null");
2168-
final DirectChannel requestChannel = new DirectChannel();
2169-
IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(requestChannel);
2170-
flow.configure(flowBuilder);
2171-
addComponent(flowBuilder.get());
2181+
MessageChannel requestChannel = obtainInputChannelFromFlow(flow);
2182+
21722183
return gateway(requestChannel, endpointConfigurer);
21732184
}
21742185

@@ -2700,9 +2711,9 @@ protected StandardIntegrationFlow get() {
27002711
if (this.integrationFlow == null) {
27012712
if (this.currentMessageChannel instanceof FixedSubscriberChannelPrototype) {
27022713
throw new BeanCreationException("The 'currentMessageChannel' (" + this.currentMessageChannel
2703-
+ ") is a prototype for FixedSubscriberChannel which can't be created without MessageHandler "
2714+
+ ") is a prototype for 'FixedSubscriberChannel' which can't be created without 'MessageHandler' "
27042715
+ "constructor argument. That means that '.fixedSubscriberChannel()' can't be the last "
2705-
+ "EIP-method in the IntegrationFlow definition.");
2716+
+ "EIP-method in the 'IntegrationFlow' definition.");
27062717
}
27072718

27082719
if (this.integrationComponents.size() == 1) {

spring-integration-core/src/main/java/org/springframework/integration/dsl/PublishSubscribeSpec.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
import java.util.Map;
2121
import java.util.concurrent.Executor;
2222

23+
import org.springframework.messaging.MessageChannel;
24+
import org.springframework.util.Assert;
25+
2326
/**
2427
* @author Artem Bilan
2528
*
@@ -42,11 +45,21 @@ public PublishSubscribeSpec id(String id) {
4245
return super.id(id);
4346
}
4447

45-
public PublishSubscribeSpec subscribe(IntegrationFlow flow) {
48+
public PublishSubscribeSpec subscribe(IntegrationFlow subFlow) {
49+
Assert.notNull(subFlow, "'subFlow' must not be null");
50+
4651
IntegrationFlowBuilder flowBuilder =
4752
IntegrationFlows.from(this.channel)
4853
.bridge();
49-
flow.configure(flowBuilder);
54+
55+
MessageChannel subFlowInput = subFlow.getInputChannel();
56+
57+
if (subFlowInput == null) {
58+
subFlow.configure(flowBuilder);
59+
}
60+
else {
61+
flowBuilder.channel(subFlowInput);
62+
}
5063
this.subscriberFlows.put(flowBuilder.get(), null);
5164
return _this();
5265
}

0 commit comments

Comments
 (0)