Skip to content

Commit 87c8e47

Browse files
authored
GH-3192: pub-sub DSL for broker-backed channels (#3193)
* GH-3192: pub-sub DSL for broker-backed channels Fixes #3192 * Introduce a `BroadcastCapableChannel` abstract to indicate those `SubscribableChannel` implementations which can provide a pub-sub functionality * Implement a `BroadcastCapableChannel` in broker-baked channels with pub-sub option * Introduce a `BaseIntegrationFlowDefinition.publishSubscribeChannel()` based on the `BroadcastCapableChannel` and `BroadcastPublishSubscribeSpec` to let to configure sub-flow subscribers in fluent manner * * Add some JavaDocs and document new feature * * Show the channel bean definition in the doc * Fix typo
1 parent 7dbdbde commit 87c8e47

File tree

11 files changed

+265
-63
lines changed

11 files changed

+265
-63
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/channel/PublishSubscribeAmqpChannel.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,22 +25,25 @@
2525
import org.springframework.amqp.core.Queue;
2626
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
2727
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
28+
import org.springframework.integration.channel.BroadcastCapableChannel;
2829
import org.springframework.integration.dispatcher.AbstractDispatcher;
2930
import org.springframework.integration.dispatcher.BroadcastingDispatcher;
3031

3132
/**
33+
* The {@link AbstractSubscribableAmqpChannel} extension for pub-sub semantics based on the {@link FanoutExchange}.
34+
*
3235
* @author Mark Fisher
3336
* @author Gary Russell
3437
* @author Artem Bilan
3538
*
3639
* @since 2.1
3740
*/
38-
public class PublishSubscribeAmqpChannel extends AbstractSubscribableAmqpChannel {
39-
40-
private volatile FanoutExchange exchange;
41+
public class PublishSubscribeAmqpChannel extends AbstractSubscribableAmqpChannel implements BroadcastCapableChannel {
4142

4243
private final Queue queue = new AnonymousQueue();
4344

45+
private volatile FanoutExchange exchange;
46+
4447
private volatile Binding binding;
4548

4649
/**
@@ -53,6 +56,7 @@ public class PublishSubscribeAmqpChannel extends AbstractSubscribableAmqpChannel
5356
*/
5457
public PublishSubscribeAmqpChannel(String channelName, AbstractMessageListenerContainer container,
5558
AmqpTemplate amqpTemplate) {
59+
5660
super(channelName, container, amqpTemplate, true);
5761
}
5862

@@ -69,6 +73,7 @@ public PublishSubscribeAmqpChannel(String channelName, AbstractMessageListenerCo
6973
*/
7074
public PublishSubscribeAmqpChannel(String channelName, AbstractMessageListenerContainer container,
7175
AmqpTemplate amqpTemplate, AmqpHeaderMapper outboundMapper, AmqpHeaderMapper inboundMapper) {
76+
7277
super(channelName, container, amqpTemplate, true, outboundMapper, inboundMapper);
7378
}
7479

@@ -104,7 +109,7 @@ protected String obtainQueueName(String channelName) {
104109
@Override
105110
protected AbstractDispatcher createDispatcher() {
106111
BroadcastingDispatcher broadcastingDispatcher = new BroadcastingDispatcher(true);
107-
broadcastingDispatcher.setBeanFactory(this.getBeanFactory());
112+
broadcastingDispatcher.setBeanFactory(getBeanFactory());
108113
return broadcastingDispatcher;
109114
}
110115

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.channel;
18+
19+
import org.springframework.messaging.SubscribableChannel;
20+
21+
/**
22+
* A {@link SubscribableChannel} variant for implementations with broadcasting capabilities.
23+
*
24+
* @author Artem Bilan
25+
*
26+
* @since 5.3
27+
*/
28+
public interface BroadcastCapableChannel extends SubscribableChannel {
29+
30+
/**
31+
* Return a state of this channel in regards of broadcasting capabilities.
32+
* @return the state of this channel in regards of broadcasting capabilities.
33+
*/
34+
default boolean isBroadcast() {
35+
return true;
36+
}
37+
38+
}

spring-integration-core/src/main/java/org/springframework/integration/channel/PublishSubscribeChannel.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
* @author Gary Russell
3636
* @author Artem Bilan
3737
*/
38-
public class PublishSubscribeChannel extends AbstractExecutorChannel {
38+
public class PublishSubscribeChannel extends AbstractExecutorChannel implements BroadcastCapableChannel {
3939

4040
private ErrorHandler errorHandler;
4141

@@ -45,6 +45,14 @@ public class PublishSubscribeChannel extends AbstractExecutorChannel {
4545

4646
private int minSubscribers;
4747

48+
/**
49+
* Create a PublishSubscribeChannel that will invoke the handlers in the
50+
* message sender's thread.
51+
*/
52+
public PublishSubscribeChannel() {
53+
this(null);
54+
}
55+
4856
/**
4957
* Create a PublishSubscribeChannel that will use an {@link Executor}
5058
* to invoke the handlers. If this is null, each invocation will occur in
@@ -56,14 +64,6 @@ public PublishSubscribeChannel(@Nullable Executor executor) {
5664
this.dispatcher = new BroadcastingDispatcher(executor);
5765
}
5866

59-
/**
60-
* Create a PublishSubscribeChannel that will invoke the handlers in the
61-
* message sender's thread.
62-
*/
63-
public PublishSubscribeChannel() {
64-
this(null);
65-
}
66-
6767

6868
@Override
6969
public String getComponentType() {
@@ -135,7 +135,7 @@ public void setMinSubscribers(int minSubscribers) {
135135
@Override
136136
public final void onInit() {
137137
super.onInit();
138-
BeanFactory beanFactory = this.getBeanFactory();
138+
BeanFactory beanFactory = getBeanFactory();
139139
BroadcastingDispatcher dispatcherToUse = getDispatcher();
140140
if (this.executor != null) {
141141
Assert.state(dispatcherToUse.getHandlerCount() == 0,

spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.springframework.expression.Expression;
3636
import org.springframework.expression.spel.standard.SpelExpressionParser;
3737
import org.springframework.integration.aggregator.AggregatingMessageHandler;
38+
import org.springframework.integration.channel.BroadcastCapableChannel;
3839
import org.springframework.integration.channel.DirectChannel;
3940
import org.springframework.integration.channel.FixedSubscriberChannel;
4041
import org.springframework.integration.channel.FluxMessageChannel;
@@ -294,6 +295,25 @@ public B publishSubscribeChannel(Executor executor,
294295
return addComponents(spec.getComponentsToRegister()).channel(spec);
295296
}
296297

298+
/**
299+
* The {@link BroadcastCapableChannel} {@link #channel}
300+
* method specific implementation to allow the use of the 'subflow' subscriber capability.
301+
* @param broadcastCapableChannel the {@link BroadcastCapableChannel} to subscriber sub-flows to.
302+
* @param publishSubscribeChannelConfigurer the {@link Consumer} to specify
303+
* {@link BroadcastPublishSubscribeSpec} 'subflow' definitions.
304+
* @return the current {@link BaseIntegrationFlowDefinition}.
305+
* @since 5.3
306+
*/
307+
public B publishSubscribeChannel(BroadcastCapableChannel broadcastCapableChannel,
308+
Consumer<BroadcastPublishSubscribeSpec> publishSubscribeChannelConfigurer) {
309+
310+
Assert.notNull(publishSubscribeChannelConfigurer, "'publishSubscribeChannelConfigurer' must not be null");
311+
BroadcastPublishSubscribeSpec spec = new BroadcastPublishSubscribeSpec(broadcastCapableChannel);
312+
publishSubscribeChannelConfigurer.accept(spec);
313+
return addComponents(spec.getComponentsToRegister())
314+
.channel(broadcastCapableChannel);
315+
}
316+
297317
/**
298318
* Populate the {@code Wire Tap} EI Pattern specific
299319
* {@link org.springframework.messaging.support.ChannelInterceptor} implementation
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.dsl;
18+
19+
import java.util.LinkedHashMap;
20+
import java.util.Map;
21+
22+
import org.springframework.integration.channel.BroadcastCapableChannel;
23+
import org.springframework.messaging.MessageChannel;
24+
import org.springframework.util.Assert;
25+
26+
/**
27+
* An {@link IntegrationComponentSpec} for configuring sub-flow subscribers on the
28+
* provided {@link BroadcastCapableChannel}.
29+
*
30+
* @author Artem Bilan
31+
* @author Gary Russell
32+
*
33+
* @since 5.3
34+
*/
35+
public class BroadcastPublishSubscribeSpec
36+
extends IntegrationComponentSpec<BroadcastPublishSubscribeSpec, BroadcastCapableChannel>
37+
implements ComponentsRegistration {
38+
39+
private final Map<Object, String> subscriberFlows = new LinkedHashMap<>();
40+
41+
private int order;
42+
43+
protected BroadcastPublishSubscribeSpec(BroadcastCapableChannel channel) {
44+
Assert.state(channel.isBroadcast(),
45+
() -> "the " + channel +
46+
" must be in the 'broadcast' state for using from this 'BroadcastPublishSubscribeSpec'");
47+
this.target = channel;
48+
}
49+
50+
/**
51+
* Configure a {@link IntegrationFlow} to configure as a subscriber
52+
* for the current {@link BroadcastCapableChannel}.
53+
* @param subFlow the {@link IntegrationFlow} to configure as a subscriber
54+
* for the current {@link BroadcastCapableChannel}.
55+
* @return the current spec
56+
*/
57+
public BroadcastPublishSubscribeSpec subscribe(IntegrationFlow subFlow) {
58+
Assert.notNull(subFlow, "'subFlow' must not be null");
59+
60+
IntegrationFlowBuilder flowBuilder =
61+
IntegrationFlows.from(this.target)
62+
.bridge(consumer -> consumer.order(this.order++));
63+
64+
MessageChannel subFlowInput = subFlow.getInputChannel();
65+
66+
if (subFlowInput == null) {
67+
subFlow.configure(flowBuilder);
68+
}
69+
else {
70+
flowBuilder.channel(subFlowInput);
71+
}
72+
this.subscriberFlows.put(flowBuilder.get(), null);
73+
return _this();
74+
}
75+
76+
@Override
77+
public Map<Object, String> getComponentsToRegister() {
78+
return this.subscriberFlows;
79+
}
80+
81+
}

spring-integration-core/src/main/java/org/springframework/integration/dsl/PublishSubscribeSpec.java

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,26 +21,27 @@
2121
import java.util.concurrent.Executor;
2222

2323
import org.springframework.lang.Nullable;
24-
import org.springframework.messaging.MessageChannel;
25-
import org.springframework.util.Assert;
2624

2725
/**
26+
* The {@link PublishSubscribeChannelSpec} extension to configure as a general flow callback for sub-flows
27+
* as subscribers.
28+
*
2829
* @author Artem Bilan
2930
* @author Gary Russell
3031
*
3132
* @since 5.0
3233
*/
3334
public class PublishSubscribeSpec extends PublishSubscribeChannelSpec<PublishSubscribeSpec> {
3435

35-
private final Map<Object, String> subscriberFlows = new LinkedHashMap<>();
36-
37-
private int order;
36+
private final BroadcastPublishSubscribeSpec delegate;
3837

3938
protected PublishSubscribeSpec() {
39+
this.delegate = new BroadcastPublishSubscribeSpec(this.channel);
4040
}
4141

4242
protected PublishSubscribeSpec(@Nullable Executor executor) {
4343
super(executor);
44+
this.delegate = new BroadcastPublishSubscribeSpec(this.channel);
4445
}
4546

4647
@Override
@@ -49,29 +50,15 @@ public PublishSubscribeSpec id(String id) { // NOSONAR - not useless, increases
4950
}
5051

5152
public PublishSubscribeSpec subscribe(IntegrationFlow subFlow) {
52-
Assert.notNull(subFlow, "'subFlow' must not be null");
53-
54-
IntegrationFlowBuilder flowBuilder =
55-
IntegrationFlows.from(this.channel)
56-
.bridge(consumer -> consumer.order(this.order++));
57-
58-
MessageChannel subFlowInput = subFlow.getInputChannel();
59-
60-
if (subFlowInput == null) {
61-
subFlow.configure(flowBuilder);
62-
}
63-
else {
64-
flowBuilder.channel(subFlowInput);
65-
}
66-
this.subscriberFlows.put(flowBuilder.get(), null);
53+
this.delegate.subscribe(subFlow);
6754
return _this();
6855
}
6956

7057
@Override
7158
public Map<Object, String> getComponentsToRegister() {
7259
Map<Object, String> objects = new LinkedHashMap<>();
7360
objects.putAll(super.getComponentsToRegister());
74-
objects.putAll(this.subscriberFlows);
61+
objects.putAll(this.delegate.getComponentsToRegister());
7562
return objects;
7663
}
7764

0 commit comments

Comments
 (0)