Skip to content

Commit c0a3262

Browse files
committed
Introduce IntegrationPattern abstraction
* Add `IntegrationPattern` contract to implement on the target components which represent particular EIP * Add `IntegrationPatternType` with an internal `IntegrationPatternCategory` to return from the component implementing `IntegrationPattern` * Parse `IntegrationPatternType` in the `IntegrationNode` for potential use on the UI for drawing a particular icon * More pattern representations * Clean up Checkstyle * Fix JavaDocs * Add `integrationPatternCategory` assertion into the `IntegrationGraphServerTests` * Add more IntegrationPattern implementations * Provide some delegation and overriding logic whenever we have components wrapping * Fix unused imports * Add `inbound_gateway` pattern indicator * Add conditional on `expectReply` to indicate a component as an `IntegrationPatternType.outbound_channel_adapter` or `IntegrationPatternType.outbound_gateway` * Make some code clean up in affected classes * Add a `gateway` type for `@MessagingGateway` * Comment the reason for `outbound_gateway` type in the `AbstractReplyProducingMessageHandler` * Bump `IntegrationGraphServer.GRAPH_VERSION` * Add new attributes into graph sample in the `graph.adoc` * Document an `IntegrationPattern` * Apply changes for version 5.3 * Rebased into `5.3-WIP` * Add a `whats-new.adoc` note about an `IntegrationPattern`
1 parent 9dc3519 commit c0a3262

File tree

45 files changed

+667
-110
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+667
-110
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/AmqpOutboundEndpoint.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
3131
import org.springframework.amqp.support.converter.MessageConverter;
3232
import org.springframework.context.Lifecycle;
33+
import org.springframework.integration.IntegrationPatternType;
3334
import org.springframework.integration.MessageTimeoutException;
3435
import org.springframework.integration.amqp.support.MappingUtils;
3536
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
@@ -96,6 +97,10 @@ public String getComponentType() {
9697
return this.expectReply ? "amqp:outbound-gateway" : "amqp:outbound-channel-adapter";
9798
}
9899

100+
@Override
101+
public IntegrationPatternType getIntegrationPatternType() {
102+
return this.expectReply ? super.getIntegrationPatternType() : IntegrationPatternType.outbound_channel_adapter;
103+
}
99104

100105
@Override
101106
public RabbitTemplate getRabbitTemplate() {
@@ -168,6 +173,7 @@ private void waitForConfirm(Message<?> requestMessage, CorrelationData correlati
168173

169174
private void send(String exchangeName, String routingKey,
170175
final Message<?> requestMessage, CorrelationData correlationData) {
176+
171177
if (this.rabbitTemplate != null) {
172178
MessageConverter converter = this.rabbitTemplate.getMessageConverter();
173179
org.springframework.amqp.core.Message amqpMessage = MappingUtils.mapMessage(requestMessage, converter,
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration;
18+
19+
/**
20+
* Indicates that a component implements some Enterprise Integration Pattern.
21+
*
22+
* @author Artem Bilan
23+
*
24+
* @since 5.3
25+
*
26+
* @see IntegrationPatternType
27+
* @see <a href="https://www.enterpriseintegrationpatterns.com/patterns/messaging">EIP official site</a>
28+
*/
29+
public interface IntegrationPattern {
30+
31+
/**
32+
* Return a pattern type this component implements.
33+
* @return the {@link IntegrationPatternType} this component implements.
34+
*/
35+
IntegrationPatternType getIntegrationPatternType();
36+
37+
}
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration;
18+
19+
import java.util.Arrays;
20+
import java.util.Set;
21+
import java.util.stream.Collectors;
22+
23+
/**
24+
* The Enterprise Integration Pattern types.
25+
* Used to indicate which pattern a target component implements.
26+
*
27+
* @author Artem Bilan
28+
*
29+
* @since 5.3
30+
*/
31+
public enum IntegrationPatternType {
32+
33+
message_channel(IntegrationPatternCategory.messaging_channel),
34+
35+
publish_subscribe_channel(IntegrationPatternCategory.messaging_channel),
36+
37+
executor_channel(IntegrationPatternCategory.messaging_channel),
38+
39+
pollable_channel(IntegrationPatternCategory.messaging_channel),
40+
41+
reactive_channel(IntegrationPatternCategory.messaging_channel),
42+
43+
null_channel(IntegrationPatternCategory.messaging_channel),
44+
45+
bridge(IntegrationPatternCategory.messaging_endpoint),
46+
47+
service_activator(IntegrationPatternCategory.messaging_endpoint),
48+
49+
outbound_channel_adapter(IntegrationPatternCategory.messaging_endpoint),
50+
51+
inbound_channel_adapter(IntegrationPatternCategory.messaging_endpoint),
52+
53+
outbound_gateway(IntegrationPatternCategory.messaging_endpoint),
54+
55+
inbound_gateway(IntegrationPatternCategory.messaging_endpoint),
56+
57+
gateway(IntegrationPatternCategory.messaging_endpoint),
58+
59+
splitter(IntegrationPatternCategory.message_routing),
60+
61+
transformer(IntegrationPatternCategory.message_transformation),
62+
63+
header_enricher(IntegrationPatternCategory.message_transformation),
64+
65+
filter(IntegrationPatternCategory.message_routing),
66+
67+
content_enricher(IntegrationPatternCategory.message_transformation),
68+
69+
header_filter(IntegrationPatternCategory.message_transformation),
70+
71+
claim_check_in(IntegrationPatternCategory.message_transformation),
72+
73+
claim_check_out(IntegrationPatternCategory.message_transformation),
74+
75+
aggregator(IntegrationPatternCategory.message_routing),
76+
77+
resequencer(IntegrationPatternCategory.message_routing),
78+
79+
barrier(IntegrationPatternCategory.message_routing),
80+
81+
chain(IntegrationPatternCategory.message_routing),
82+
83+
scatter_gather(IntegrationPatternCategory.message_routing),
84+
85+
delayer(IntegrationPatternCategory.message_routing),
86+
87+
control_bus(IntegrationPatternCategory.system_management),
88+
89+
router(IntegrationPatternCategory.message_routing),
90+
91+
recipient_list_router(IntegrationPatternCategory.message_routing);
92+
93+
94+
private final IntegrationPatternCategory patternCategory;
95+
96+
IntegrationPatternType(IntegrationPatternCategory patternCategory) {
97+
this.patternCategory = patternCategory;
98+
}
99+
100+
public IntegrationPatternCategory getPatternCategory() {
101+
return this.patternCategory;
102+
}
103+
104+
/**
105+
* The Enterprise Integration Pattern categories.
106+
* Used to indicate which pattern category a target component belongs.
107+
*/
108+
public enum IntegrationPatternCategory {
109+
110+
messaging_channel(
111+
message_channel,
112+
publish_subscribe_channel,
113+
executor_channel,
114+
pollable_channel,
115+
reactive_channel,
116+
null_channel),
117+
118+
messaging_endpoint(
119+
service_activator,
120+
outbound_channel_adapter,
121+
inbound_channel_adapter,
122+
outbound_gateway,
123+
inbound_gateway,
124+
gateway,
125+
bridge),
126+
127+
message_routing(
128+
splitter,
129+
filter,
130+
aggregator,
131+
resequencer,
132+
barrier,
133+
chain,
134+
scatter_gather,
135+
delayer,
136+
router,
137+
recipient_list_router),
138+
139+
message_transformation(
140+
transformer,
141+
header_enricher,
142+
content_enricher,
143+
header_filter,
144+
claim_check_in,
145+
claim_check_out),
146+
147+
system_management(control_bus);
148+
149+
private final IntegrationPatternType[] patternTypes;
150+
151+
IntegrationPatternCategory(IntegrationPatternType... patternTypes) {
152+
this.patternTypes = patternTypes;
153+
}
154+
155+
public Set<IntegrationPatternType> getPatternTypes() {
156+
return Arrays.stream(this.patternTypes).collect(Collectors.toSet());
157+
}
158+
159+
}
160+
161+
}

spring-integration-core/src/main/java/org/springframework/integration/aggregator/AggregatingMessageHandler.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.util.Collection;
2020

21+
import org.springframework.integration.IntegrationPatternType;
2122
import org.springframework.integration.store.MessageGroup;
2223
import org.springframework.integration.store.MessageGroupStore;
2324
import org.springframework.integration.store.SimpleMessageStore;
@@ -40,6 +41,7 @@ public class AggregatingMessageHandler extends AbstractCorrelatingMessageHandler
4041

4142
public AggregatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
4243
CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
44+
4345
super(processor, store, correlationStrategy, releaseStrategy);
4446
}
4547

@@ -53,15 +55,18 @@ public AggregatingMessageHandler(MessageGroupProcessor processor) {
5355

5456
/**
5557
* Will set the 'expireGroupsUponCompletion' flag.
56-
*
5758
* @param expireGroupsUponCompletion true when groups should be expired on completion.
58-
*
5959
* @see #afterRelease
6060
*/
6161
public void setExpireGroupsUponCompletion(boolean expireGroupsUponCompletion) {
6262
this.expireGroupsUponCompletion = expireGroupsUponCompletion;
6363
}
6464

65+
@Override
66+
public IntegrationPatternType getIntegrationPatternType() {
67+
return IntegrationPatternType.aggregator;
68+
}
69+
6570
@Override
6671
protected boolean isExpireGroupsUponCompletion() {
6772
return this.expireGroupsUponCompletion;

spring-integration-core/src/main/java/org/springframework/integration/aggregator/BarrierMessageHandler.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.concurrent.TimeUnit;
2323

2424
import org.springframework.integration.IntegrationMessageHeaderAccessor;
25+
import org.springframework.integration.IntegrationPatternType;
2526
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
2627
import org.springframework.integration.handler.DiscardingMessageHandler;
2728
import org.springframework.integration.handler.MessageTriggerAction;
@@ -149,6 +150,11 @@ public String getComponentType() {
149150
return "barrier";
150151
}
151152

153+
@Override
154+
public IntegrationPatternType getIntegrationPatternType() {
155+
return IntegrationPatternType.barrier;
156+
}
157+
152158
@Override
153159
protected Object handleRequestMessage(Message<?> requestMessage) {
154160
Object key = this.correlationStrategy.getCorrelationKey(requestMessage);

spring-integration-core/src/main/java/org/springframework/integration/aggregator/FluxAggregatorMessageHandler.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import org.springframework.context.Lifecycle;
2525
import org.springframework.integration.IntegrationMessageHeaderAccessor;
26+
import org.springframework.integration.IntegrationPatternType;
2627
import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel;
2728
import org.springframework.integration.handler.AbstractMessageProducingHandler;
2829
import org.springframework.messaging.Message;
@@ -216,6 +217,16 @@ public void setWindowConfigurer(Function<Flux<Message<?>>, Flux<Flux<Message<?>>
216217
this.windowConfigurer = windowConfigurer;
217218
}
218219

220+
@Override
221+
public String getComponentType() {
222+
return "flux-aggregator";
223+
}
224+
225+
@Override
226+
public IntegrationPatternType getIntegrationPatternType() {
227+
return IntegrationPatternType.aggregator;
228+
}
229+
219230
@Override
220231
public void start() {
221232
if (this.subscribed.compareAndSet(false, true)) {

spring-integration-core/src/main/java/org/springframework/integration/aggregator/ResequencingMessageHandler.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.util.Collection;
2020

21+
import org.springframework.integration.IntegrationPatternType;
2122
import org.springframework.integration.store.MessageGroup;
2223
import org.springframework.integration.store.MessageGroupStore;
2324
import org.springframework.integration.store.SimpleMessageStore;
@@ -35,16 +36,15 @@
3536
*/
3637
public class ResequencingMessageHandler extends AbstractCorrelatingMessageHandler {
3738

38-
public ResequencingMessageHandler(MessageGroupProcessor processor,
39-
MessageGroupStore store, CorrelationStrategy correlationStrategy,
40-
ReleaseStrategy releaseStrategy) {
39+
public ResequencingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
40+
CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
41+
4142
super(processor, store, correlationStrategy, releaseStrategy);
4243
this.setExpireGroupsUponTimeout(false);
4344
}
4445

4546

46-
public ResequencingMessageHandler(MessageGroupProcessor processor,
47-
MessageGroupStore store) {
47+
public ResequencingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store) {
4848
super(processor, store);
4949
this.setExpireGroupsUponTimeout(false);
5050
}
@@ -56,10 +56,8 @@ public ResequencingMessageHandler(MessageGroupProcessor processor) {
5656
}
5757

5858
/**
59-
* {@inheritDoc}
60-
*
61-
* (overridden to false for a resequencer so late messages are immediately discarded rather
62-
* than waiting for the next timeout)
59+
* Overridden to false for a resequencer so late messages are immediately discarded rather
60+
* than waiting for the next timeout
6361
*/
6462
@Override
6563
public final void setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout) {
@@ -71,6 +69,11 @@ public String getComponentType() {
7169
return "resequencer";
7270
}
7371

72+
@Override
73+
public IntegrationPatternType getIntegrationPatternType() {
74+
return IntegrationPatternType.resequencer;
75+
}
76+
7477
@Override
7578
protected boolean shouldCopyRequestHeaders() {
7679
return false;

spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractExecutorChannel.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.List;
2323
import java.util.concurrent.Executor;
2424

25+
import org.springframework.integration.IntegrationPatternType;
2526
import org.springframework.integration.dispatcher.AbstractDispatcher;
2627
import org.springframework.integration.support.MessagingExceptionWrapper;
2728
import org.springframework.lang.Nullable;
@@ -128,6 +129,11 @@ public boolean hasExecutorInterceptors() {
128129
return this.executorInterceptorsSize > 0;
129130
}
130131

132+
@Override
133+
public IntegrationPatternType getIntegrationPatternType() {
134+
return IntegrationPatternType.executor_channel;
135+
}
136+
131137
protected class MessageHandlingTask implements Runnable {
132138

133139
private final MessageHandlingRunnable delegate;

0 commit comments

Comments
 (0)