Skip to content

Commit d75dd09

Browse files
authored
GH-3194: Add generics to Amqp and Jms channel impls
Fixes #3194 For now we keep the hierarchy, but try to generalize channel specs a bit, so that ones who use `publishSubscribeChannel` can count on `BroadcastCapableChannel` as interface to utilize.
1 parent a8430c1 commit d75dd09

File tree

9 files changed

+51
-30
lines changed

9 files changed

+51
-30
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/Amqp.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
2323
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
2424
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
25+
import org.springframework.integration.amqp.channel.PollableAmqpChannel;
2526
import org.springframework.integration.amqp.inbound.AmqpMessageSource.AmqpAckCallbackFactory;
2627
import org.springframework.lang.Nullable;
2728

@@ -30,6 +31,7 @@
3031
*
3132
* @author Artem Bilan
3233
* @author Gary Russell
34+
* @author Artem Vozhdayenko
3335
*
3436
* @since 5.0
3537
*/
@@ -269,7 +271,7 @@ public static AmqpAsyncOutboundGatewaySpec asyncOutboundGateway(AsyncRabbitTempl
269271
* @param connectionFactory the connectionFactory.
270272
* @return the AmqpPollableMessageChannelSpec.
271273
*/
272-
public static AmqpPollableMessageChannelSpec<?> pollableChannel(ConnectionFactory connectionFactory) {
274+
public static AmqpPollableMessageChannelSpec<?, PollableAmqpChannel> pollableChannel(ConnectionFactory connectionFactory) {
273275
return pollableChannel(null, connectionFactory);
274276
}
275277

@@ -279,19 +281,19 @@ public static AmqpPollableMessageChannelSpec<?> pollableChannel(ConnectionFactor
279281
* @param connectionFactory the connectionFactory.
280282
* @return the AmqpPollableMessageChannelSpec.
281283
*/
282-
public static AmqpPollableMessageChannelSpec<?> pollableChannel(@Nullable String id,
284+
public static AmqpPollableMessageChannelSpec<?, PollableAmqpChannel> pollableChannel(@Nullable String id,
283285
ConnectionFactory connectionFactory) {
284286

285-
return new AmqpPollableMessageChannelSpec<>(connectionFactory)
286-
.id(id);
287+
AmqpPollableMessageChannelSpec<?, PollableAmqpChannel> spec = new AmqpPollableMessageChannelSpec<>(connectionFactory);
288+
return spec.id(id);
287289
}
288290

289291
/**
290292
* Create an initial AmqpMessageChannelSpec.
291293
* @param connectionFactory the connectionFactory.
292294
* @return the AmqpMessageChannelSpec.
293295
*/
294-
public static AmqpMessageChannelSpec<?> channel(ConnectionFactory connectionFactory) {
296+
public static AmqpMessageChannelSpec<?, ?> channel(ConnectionFactory connectionFactory) {
295297
return channel(null, connectionFactory);
296298
}
297299

@@ -301,7 +303,7 @@ public static AmqpMessageChannelSpec<?> channel(ConnectionFactory connectionFact
301303
* @param connectionFactory the connectionFactory.
302304
* @return the AmqpMessageChannelSpec.
303305
*/
304-
public static AmqpMessageChannelSpec<?> channel(@Nullable String id, ConnectionFactory connectionFactory) {
306+
public static AmqpMessageChannelSpec<?, ?> channel(@Nullable String id, ConnectionFactory connectionFactory) {
305307
return new AmqpMessageChannelSpec<>(connectionFactory)
306308
.id(id);
307309
}

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpMessageChannelSpec.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,12 @@
3939
*
4040
* @author Artem Bilan
4141
* @author Gary Russell
42+
* @author Artem Vozhdayenko
4243
*
4344
* @since 5.0
4445
*/
45-
public class AmqpMessageChannelSpec<S extends AmqpMessageChannelSpec<S>> extends AmqpPollableMessageChannelSpec<S> {
46+
public class AmqpMessageChannelSpec<S extends AmqpMessageChannelSpec<S, T>, T extends AbstractAmqpChannel>
47+
extends AmqpPollableMessageChannelSpec<S, T> {
4648

4749
protected final List<Advice> adviceChain = new LinkedList<>(); // NOSONAR
4850

@@ -215,7 +217,7 @@ public S batchSize(int batchSize) {
215217
}
216218

217219
@Override
218-
protected AbstractAmqpChannel doGet() {
220+
protected T doGet() {
219221
this.amqpChannelFactoryBean.setAdviceChain(this.adviceChain.toArray(new Advice[0]));
220222
return super.doGet();
221223
}

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpPollableMessageChannelSpec.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,12 @@
3636
*
3737
* @author Artem Bilan
3838
* @author Gary Russell
39+
* @author Artem Vozhdayenko
3940
*
4041
* @since 5.0
4142
*/
42-
public class AmqpPollableMessageChannelSpec<S extends AmqpPollableMessageChannelSpec<S>>
43-
extends MessageChannelSpec<S, AbstractAmqpChannel> {
43+
public class AmqpPollableMessageChannelSpec<S extends AmqpPollableMessageChannelSpec<S, T>, T extends AbstractAmqpChannel>
44+
extends MessageChannelSpec<S, T> {
4445

4546
protected final AmqpChannelFactoryBean amqpChannelFactoryBean; // NOSONAR final
4647

@@ -206,10 +207,11 @@ public S headersMappedLast(boolean headersLast) {
206207
}
207208

208209
@Override
209-
protected AbstractAmqpChannel doGet() {
210+
@SuppressWarnings("unchecked")
211+
protected T doGet() {
210212
Assert.notNull(getId(), "The 'id' or 'queueName' must be specified");
211213
try {
212-
this.channel = this.amqpChannelFactoryBean.getObject();
214+
this.channel = (T) this.amqpChannelFactoryBean.getObject();
213215
}
214216
catch (Exception e) {
215217
throw new BeanCreationException("Cannot create the AMQP MessageChannel", e);

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpPublishSubscribeMessageChannelSpec.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,19 @@
1818

1919
import org.springframework.amqp.core.FanoutExchange;
2020
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
21+
import org.springframework.integration.amqp.channel.PollableAmqpChannel;
2122

2223
/**
2324
* A {@link AmqpMessageChannelSpec} for
2425
* {@link org.springframework.integration.amqp.channel.PublishSubscribeAmqpChannel}s.
2526
*
2627
* @author Artem Bilan
28+
* @author Artem Vozhdayenko
29+
*
2730
* @since 5.0
2831
*/
2932
public class AmqpPublishSubscribeMessageChannelSpec
30-
extends AmqpMessageChannelSpec<AmqpPublishSubscribeMessageChannelSpec> {
33+
extends AmqpMessageChannelSpec<AmqpPublishSubscribeMessageChannelSpec, PollableAmqpChannel> {
3134

3235
protected AmqpPublishSubscribeMessageChannelSpec(ConnectionFactory connectionFactory) {
3336
super(connectionFactory);

spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/Jms.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2019 the original author or authors.
2+
* Copyright 2014-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
1919
import javax.jms.ConnectionFactory;
2020
import javax.jms.Destination;
2121

22+
import org.springframework.integration.jms.PollableJmsChannel;
2223
import org.springframework.jms.core.JmsTemplate;
2324
import org.springframework.jms.listener.AbstractMessageListenerContainer;
2425
import org.springframework.jms.listener.DefaultMessageListenerContainer;
@@ -29,6 +30,7 @@
2930
*
3031
* @author Artem Bilan
3132
* @author Gary Russell
33+
* @author Artem Vozhdayenko
3234
*
3335
* @since 5.0
3436
*/
@@ -39,7 +41,7 @@ public final class Jms {
3941
* @param connectionFactory the JMS ConnectionFactory to build on
4042
* @return the {@link JmsPollableMessageChannelSpec} instance
4143
*/
42-
public static JmsPollableMessageChannelSpec<?> pollableChannel(ConnectionFactory connectionFactory) {
44+
public static JmsPollableMessageChannelSpec<?, PollableJmsChannel> pollableChannel(ConnectionFactory connectionFactory) {
4345
return pollableChannel(null, connectionFactory);
4446
}
4547

@@ -49,18 +51,18 @@ public static JmsPollableMessageChannelSpec<?> pollableChannel(ConnectionFactory
4951
* @param connectionFactory the JMS ConnectionFactory to build on
5052
* @return the {@link JmsPollableMessageChannelSpec} instance
5153
*/
52-
public static JmsPollableMessageChannelSpec<?> pollableChannel(@Nullable String id,
54+
public static JmsPollableMessageChannelSpec<?, PollableJmsChannel> pollableChannel(@Nullable String id,
5355
ConnectionFactory connectionFactory) {
54-
55-
return new JmsPollableMessageChannelSpec<>(connectionFactory).id(id);
56+
JmsPollableMessageChannelSpec<?, PollableJmsChannel> spec = new JmsPollableMessageChannelSpec<>(connectionFactory);
57+
return spec.id(id);
5658
}
5759

5860
/**
5961
* The factory to produce a {@link JmsMessageChannelSpec}.
6062
* @param connectionFactory the JMS ConnectionFactory to build on
6163
* @return the {@link JmsMessageChannelSpec} instance
6264
*/
63-
public static JmsMessageChannelSpec<?> channel(ConnectionFactory connectionFactory) {
65+
public static JmsMessageChannelSpec<?, ?> channel(ConnectionFactory connectionFactory) {
6466
return channel(null, connectionFactory);
6567
}
6668

@@ -70,7 +72,7 @@ public static JmsMessageChannelSpec<?> channel(ConnectionFactory connectionFacto
7072
* @param connectionFactory the JMS ConnectionFactory to build on
7173
* @return the {@link JmsMessageChannelSpec} instance
7274
*/
73-
public static JmsMessageChannelSpec<?> channel(@Nullable String id, ConnectionFactory connectionFactory) {
75+
public static JmsMessageChannelSpec<?, ?> channel(@Nullable String id, ConnectionFactory connectionFactory) {
7476
return new JmsMessageChannelSpec<>(connectionFactory)
7577
.id(id);
7678
}
@@ -180,7 +182,7 @@ public static JmsInboundGatewaySpec<?> inboundGateway(AbstractMessageListenerCon
180182
* @param <C> the {@link AbstractMessageListenerContainer} inheritor type
181183
* @return the {@link JmsInboundGatewaySpec} instance
182184
*/
183-
@SuppressWarnings({ "rawtypes", "unchecked" })
185+
@SuppressWarnings({"rawtypes", "unchecked"})
184186
public static <C extends AbstractMessageListenerContainer>
185187
JmsInboundGatewaySpec.JmsInboundGatewayListenerContainerSpec<?, C> inboundGateway(
186188
ConnectionFactory connectionFactory, Class<C> containerClass) {
@@ -241,7 +243,7 @@ public static JmsMessageDrivenChannelAdapterSpec<?> messageDrivenChannelAdapter(
241243
* @param <C> the {@link AbstractMessageListenerContainer} inheritor type
242244
* @return the {@link JmsMessageDrivenChannelAdapterSpec} instance
243245
*/
244-
@SuppressWarnings({ "rawtypes", "unchecked" })
246+
@SuppressWarnings({"rawtypes", "unchecked"})
245247
public static <C extends AbstractMessageListenerContainer>
246248
JmsMessageDrivenChannelAdapterSpec.JmsMessageDrivenChannelAdapterListenerContainerSpec<?, C>
247249
messageDrivenChannelAdapter(ConnectionFactory connectionFactory, Class<C> containerClass) {

spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsMessageChannelSpec.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import javax.jms.ConnectionFactory;
2222

23+
import org.springframework.integration.jms.AbstractJmsChannel;
2324
import org.springframework.integration.jms.config.JmsChannelFactoryBean;
2425
import org.springframework.jms.listener.AbstractMessageListenerContainer;
2526
import org.springframework.transaction.PlatformTransactionManager;
@@ -33,9 +34,12 @@
3334
*
3435
* @author Artem Bilan
3536
* @author Gary Russell
37+
* @author Artem Vozhdayenko
38+
*
3639
* @since 5.0
3740
*/
38-
public class JmsMessageChannelSpec<S extends JmsMessageChannelSpec<S>> extends JmsPollableMessageChannelSpec<S> {
41+
public class JmsMessageChannelSpec<S extends JmsMessageChannelSpec<S, T>, T
42+
extends AbstractJmsChannel> extends JmsPollableMessageChannelSpec<S, T> {
3943

4044
protected JmsMessageChannelSpec(ConnectionFactory connectionFactory) {
4145
super(new JmsChannelFactoryBean(true), connectionFactory);

spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsPollableMessageChannelSpec.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,12 @@
3535
*
3636
* @author Artem Bilan
3737
* @author Gary Russell
38+
* @author Artem Vozhdayenko
3839
*
3940
* @since 5.0
4041
*/
41-
public class JmsPollableMessageChannelSpec<S extends JmsPollableMessageChannelSpec<S>>
42-
extends MessageChannelSpec<S, AbstractJmsChannel> {
42+
public class JmsPollableMessageChannelSpec<S extends JmsPollableMessageChannelSpec<S, T>, T extends AbstractJmsChannel>
43+
extends MessageChannelSpec<S, T> {
4344

4445
protected final JmsChannelFactoryBean jmsChannelFactoryBean; // NOSONAR - final
4546

@@ -218,9 +219,10 @@ public S sessionTransacted(boolean sessionTransacted) {
218219
}
219220

220221
@Override
221-
protected AbstractJmsChannel doGet() {
222+
@SuppressWarnings("unchecked")
223+
protected T doGet() {
222224
try {
223-
this.channel = this.jmsChannelFactoryBean.getObject();
225+
this.channel = (T) this.jmsChannelFactoryBean.getObject();
224226
}
225227
catch (Exception e) {
226228
throw new BeanCreationException("Cannot create the JMS MessageChannel", e);

spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsPublishSubscribeMessageChannelSpec.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,19 @@
1818

1919
import javax.jms.ConnectionFactory;
2020

21+
import org.springframework.integration.jms.SubscribableJmsChannel;
22+
2123
/**
2224
* A {@link JmsMessageChannelSpec} for a {@link org.springframework.integration.jms.SubscribableJmsChannel}
2325
* configured with a topic.
2426
*
2527
* @author Artem Bilan
28+
* @author Artem Vozhdayenko
29+
*
2630
* @since 5.0
2731
*/
2832
public class JmsPublishSubscribeMessageChannelSpec
29-
extends JmsMessageChannelSpec<JmsPublishSubscribeMessageChannelSpec> {
33+
extends JmsMessageChannelSpec<JmsPublishSubscribeMessageChannelSpec, SubscribableJmsChannel> {
3034

3135
protected JmsPublishSubscribeMessageChannelSpec(ConnectionFactory connectionFactory) {
3236
super(connectionFactory);

spring-integration-jms/src/test/java/org/springframework/integration/jms/dsl/JmsTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
* @author Artem Bilan
8484
* @author Gary Russell
8585
* @author Nasko Vasilev
86+
* @author Artem Vozhdayenko
8687
*
8788
* @since 5.0
8889
*/
@@ -371,8 +372,7 @@ public IntegrationFlow pubSubFlow() {
371372

372373
@Bean
373374
public BroadcastCapableChannel jmsPublishSubscribeChannel() {
374-
// TODO reconsider target generic type for channel implementation to return from this kind of specs
375-
return (BroadcastCapableChannel) Jms.publishSubscribeChannel(jmsConnectionFactory())
375+
return Jms.publishSubscribeChannel(jmsConnectionFactory())
376376
.destination("pubsub")
377377
.get();
378378
}

0 commit comments

Comments
 (0)