Skip to content

Commit 90e4c54

Browse files
artembilangaryrussell
authored andcommitted
INT-4317: JMS: dynamic deliverMode and timeToLive (#2561)
* INT-4317: JMS: dynamic deliverMode and timeToLive JIRA: https://jira.spring.io/browse/INT-4317 * Add `deliveryModeExpression` and `timeToLiveExpression` properties to the `JmsSendingMessageHandler` and expose them in the Java DSL and XML components * Add `setMapInboundDeliveryMode()` and `setMapInboundExpiration()` `boolean` properties (default `false`) to the `DefaultJmsHeaderMapper` for transferring `JMSDeliveryMode` and `JMSExpiration` into appropriate `JmsHeaders.DELIVERY_MODE` and `JmsHeaders.EXPIRATION` headers * Upgrade to latest Kotlin and AssertK * Upgrade to Kotlin 1.2.61
1 parent 21338d1 commit 90e4c54

File tree

14 files changed

+361
-55
lines changed

14 files changed

+361
-55
lines changed

build.gradle

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
buildscript {
2-
ext.kotlinVersion = '1.2.51'
2+
ext.kotlinVersion = '1.2.61'
33
repositories {
44
maven { url 'https://repo.spring.io/plugins-release' }
55
}
@@ -88,7 +88,7 @@ subprojects { subproject ->
8888
apacheSshdVersion = '1.7.0'
8989
aspectjVersion = '1.9.0'
9090
assertjVersion = '3.9.1'
91-
assertkVersion = '0.10'
91+
assertkVersion = '0.12'
9292
boonVersion = '0.34'
9393
commonsDbcp2Version = '2.2.0'
9494
commonsIoVersion = '2.6'
@@ -178,9 +178,7 @@ subprojects { subproject ->
178178
testRuntime "org.apache.logging.log4j:log4j-slf4j-impl:$log4jVersion"
179179
testRuntime "org.apache.logging.log4j:log4j-jcl:$log4jVersion"
180180

181-
testCompile("com.willowtreeapps.assertk:assertk:$assertkVersion") {
182-
exclude group: 'org.jetbrains.kotlin', module: 'kotlin-reflect'
183-
}
181+
testCompile("com.willowtreeapps.assertk:assertk-jvm:$assertkVersion")
184182

185183
testCompile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlinVersion"
186184
testRuntime "org.jetbrains.kotlin:kotlin-reflect:$kotlinVersion"

spring-integration-jms/src/main/java/org/springframework/integration/jms/DefaultJmsHeaderMapper.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -68,15 +68,35 @@ public class DefaultJmsHeaderMapper extends JmsHeaderMapper {
6868

6969
private volatile boolean mapInboundPriority = true;
7070

71+
private volatile boolean mapInboundDeliveryMode = false;
72+
73+
private volatile boolean mapInboundExpiration = false;
74+
7175
/**
7276
* Suppress the mapping of inbound priority by using this setter with 'false'.
73-
*
7477
* @param mapInboundPriority 'false' to suppress mapping the inbound priority.
7578
*/
7679
public void setMapInboundPriority(boolean mapInboundPriority) {
7780
this.mapInboundPriority = mapInboundPriority;
7881
}
7982

83+
/**
84+
* Map the inbound {@code deliveryMode} by using this setter with 'true'.
85+
* @param mapInboundDeliveryMode 'true' to map the inbound delivery mode.
86+
* @since 5.1
87+
*/
88+
public void setMapInboundDeliveryMode(boolean mapInboundDeliveryMode) {
89+
this.mapInboundDeliveryMode = mapInboundDeliveryMode;
90+
}
91+
/**
92+
* Map the inbound {@code expiration} by using this setter with 'true'.
93+
* @param mapInboundExpiration 'true' to map the inbound expiration.
94+
* @since 5.1
95+
*/
96+
public void setMapInboundExpiration(boolean mapInboundExpiration) {
97+
this.mapInboundExpiration = mapInboundExpiration;
98+
}
99+
80100
/**
81101
* Specify a prefix to be appended to the integration message header name
82102
* for any JMS property that is being mapped into the MessageHeaders.
@@ -248,6 +268,22 @@ public Map<String, Object> toHeaders(javax.jms.Message jmsMessage) {
248268
this.logger.info("failed to read JMSPriority property, skipping", e);
249269
}
250270
}
271+
if (this.mapInboundDeliveryMode) {
272+
try {
273+
headers.put(JmsHeaders.DELIVERY_MODE, jmsMessage.getJMSDeliveryMode());
274+
}
275+
catch (Exception e) {
276+
this.logger.info("failed to read JMSDeliveryMode property, skipping", e);
277+
}
278+
}
279+
if (this.mapInboundExpiration) {
280+
try {
281+
headers.put(JmsHeaders.EXPIRATION, jmsMessage.getJMSExpiration());
282+
}
283+
catch (Exception e) {
284+
this.logger.info("failed to read JMSExpiration property, skipping", e);
285+
}
286+
}
251287
Enumeration<?> jmsPropertyNames = jmsMessage.getPropertyNames();
252288
if (jmsPropertyNames != null) {
253289
while (jmsPropertyNames.hasMoreElements()) {

spring-integration-jms/src/main/java/org/springframework/integration/jms/DynamicJmsTemplate.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,16 @@ public long getReceiveTimeout() {
7171
return (receiveTimeout != null) ? receiveTimeout : super.getReceiveTimeout();
7272
}
7373

74+
@Override
75+
public int getDeliveryMode() {
76+
Integer deliveryMode = DynamicJmsTemplateProperties.getDeliveryMode();
77+
return (deliveryMode != null) ? deliveryMode : super.getDeliveryMode();
78+
}
79+
80+
@Override
81+
public long getTimeToLive() {
82+
Long timeToLive = DynamicJmsTemplateProperties.getTimeToLive();
83+
return (timeToLive != null) ? timeToLive : super.getTimeToLive();
84+
}
85+
7486
}

spring-integration-jms/src/main/java/org/springframework/integration/jms/DynamicJmsTemplateProperties.java

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2011 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,15 +18,24 @@
1818

1919
/**
2020
* @author Mark Fisher
21+
* @author Artem Bilan
22+
*
2123
* @since 2.0.2
2224
*/
2325
abstract class DynamicJmsTemplateProperties {
2426

25-
private static final ThreadLocal<Integer> priorityHolder = new ThreadLocal<Integer>();
27+
private static final ThreadLocal<Integer> priorityHolder = new ThreadLocal<>();
28+
29+
private static final ThreadLocal<Long> receiveTimeoutHolder = new ThreadLocal<>();
30+
31+
private static final ThreadLocal<Integer> deliverModeHolder = new ThreadLocal<>();
2632

27-
private static final ThreadLocal<Long> receiveTimeoutHolder = new ThreadLocal<Long>();
33+
private static final ThreadLocal<Long> timeToLiveHolder = new ThreadLocal<>();
2834

2935

36+
private DynamicJmsTemplateProperties() {
37+
}
38+
3039
public static Integer getPriority() {
3140
return priorityHolder.get();
3241
}
@@ -51,4 +60,28 @@ public static void clearReceiveTimeout() {
5160
receiveTimeoutHolder.remove();
5261
}
5362

63+
public static Integer getDeliveryMode() {
64+
return deliverModeHolder.get();
65+
}
66+
67+
public static void setDeliveryMode(Integer deliveryMode) {
68+
deliverModeHolder.set(deliveryMode);
69+
}
70+
71+
public static void clearDeliveryMode() {
72+
deliverModeHolder.remove();
73+
}
74+
75+
public static Long getTimeToLive() {
76+
return timeToLiveHolder.get();
77+
}
78+
79+
public static void setTimeToLive(Long timeToLive) {
80+
timeToLiveHolder.set(timeToLive);
81+
}
82+
83+
public static void clearTimeToLive() {
84+
timeToLiveHolder.remove();
85+
}
86+
5487
}

spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsSendingMessageHandler.java

Lines changed: 85 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,11 +17,12 @@
1717
package org.springframework.integration.jms;
1818

1919
import javax.jms.Destination;
20-
import javax.jms.JMSException;
2120

2221
import org.springframework.core.convert.ConversionService;
22+
import org.springframework.expression.EvaluationContext;
2323
import org.springframework.expression.Expression;
24-
import org.springframework.integration.IntegrationMessageHeaderAccessor;
24+
import org.springframework.integration.StaticMessageHeaderAccessor;
25+
import org.springframework.integration.expression.ExpressionUtils;
2526
import org.springframework.integration.handler.AbstractMessageHandler;
2627
import org.springframework.integration.handler.ExpressionEvaluatingMessageProcessor;
2728
import org.springframework.jms.core.JmsTemplate;
@@ -35,20 +36,27 @@
3536
*
3637
* @author Mark Fisher
3738
* @author Oleg Zhurakousky
39+
* @author Artem Bilan
3840
*/
3941
public class JmsSendingMessageHandler extends AbstractMessageHandler {
4042

4143
private final JmsTemplate jmsTemplate;
4244

43-
private volatile Destination destination;
45+
private Destination destination;
4446

45-
private volatile String destinationName;
47+
private String destinationName;
4648

47-
private volatile JmsHeaderMapper headerMapper = new DefaultJmsHeaderMapper();
49+
private JmsHeaderMapper headerMapper = new DefaultJmsHeaderMapper();
4850

49-
private volatile boolean extractPayload = true;
51+
private boolean extractPayload = true;
5052

51-
private volatile ExpressionEvaluatingMessageProcessor<?> destinationExpressionProcessor;
53+
private ExpressionEvaluatingMessageProcessor<?> destinationExpressionProcessor;
54+
55+
private Expression deliveryModeExpression;
56+
57+
private Expression timeToLiveExpression;
58+
59+
private EvaluationContext evaluationContext;
5260

5361

5462
public JmsSendingMessageHandler(JmsTemplate jmsTemplate) {
@@ -75,22 +83,61 @@ public void setDestinationExpression(Expression destinationExpression) {
7583
}
7684

7785
public void setHeaderMapper(JmsHeaderMapper headerMapper) {
86+
Assert.notNull(headerMapper, "'headerMapper' cannot be null");
7887
this.headerMapper = headerMapper;
7988
}
8089

8190
/**
8291
* Specify whether the payload should be extracted from each integration
8392
* Message to be used as the JMS Message body.
84-
*
8593
* <p>The default value is <code>true</code>. To force passing of the full
8694
* Spring Integration Message instead, set this to <code>false</code>.
87-
*
8895
* @param extractPayload true to extract the payload.
8996
*/
9097
public void setExtractPayload(boolean extractPayload) {
9198
this.extractPayload = extractPayload;
9299
}
93100

101+
/**
102+
* Specify a SpEL expression to evaluate a {@code deliveryMode} for the JMS message to send.
103+
* This option is applied only of QoS is enabled on the {@link JmsTemplate}.
104+
* @param deliveryModeExpression to use
105+
* @since 5.1
106+
* @see #setDeliveryModeExpression(Expression)
107+
*/
108+
public void setDeliveryModeExpressionString(String deliveryModeExpression) {
109+
setDeliveryModeExpression(EXPRESSION_PARSER.parseExpression(deliveryModeExpression));
110+
}
111+
112+
/**
113+
* Specify a SpEL expression to evaluate a {@code deliveryMode} for the JMS message to send.
114+
* This option is applied only of QoS is enabled on the {@link JmsTemplate}.
115+
* @param deliveryModeExpression to use
116+
* @since 5.1
117+
*/
118+
public void setDeliveryModeExpression(Expression deliveryModeExpression) {
119+
this.deliveryModeExpression = deliveryModeExpression;
120+
}
121+
122+
/**
123+
* Specify a SpEL expression to evaluate a {@code timeToLive} for the JMS message to send.
124+
* @param timeToLiveExpression to use
125+
* @since 5.1
126+
* @see #setTimeToLiveExpression(Expression)
127+
*/
128+
public void setTimeToLiveExpressionString(String timeToLiveExpression) {
129+
setTimeToLiveExpression(EXPRESSION_PARSER.parseExpression(timeToLiveExpression));
130+
}
131+
132+
/**
133+
* Specify a SpEL expression to evaluate a {@code timeToLive} for the JMS message to send.
134+
* @param timeToLiveExpression to use
135+
* @since 5.1
136+
*/
137+
public void setTimeToLiveExpression(Expression timeToLiveExpression) {
138+
this.timeToLiveExpression = timeToLiveExpression;
139+
}
140+
94141
@Override
95142
public String getComponentType() {
96143
return "jms:outbound-channel-adapter";
@@ -105,22 +152,42 @@ protected void onInit() {
105152
this.destinationExpressionProcessor.setConversionService(conversionService);
106153
}
107154
}
155+
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
108156
}
109157

110158
@Override
111-
protected void handleMessageInternal(final Message<?> message) throws Exception {
112-
if (message == null) {
113-
throw new IllegalArgumentException("message must not be null");
114-
}
159+
protected void handleMessageInternal(final Message<?> message) {
115160
Object destination = this.determineDestination(message);
116161
Object objectToSend = (this.extractPayload) ? message.getPayload() : message;
117162
MessagePostProcessor messagePostProcessor = new HeaderMappingMessagePostProcessor(message, this.headerMapper);
163+
164+
if (this.jmsTemplate instanceof DynamicJmsTemplate && this.jmsTemplate.isExplicitQosEnabled()) {
165+
Integer priority = StaticMessageHeaderAccessor.getPriority(message);
166+
if (priority != null) {
167+
DynamicJmsTemplateProperties.setPriority(priority);
168+
}
169+
if (this.deliveryModeExpression != null) {
170+
Integer deliveryMode =
171+
this.deliveryModeExpression.getValue(this.evaluationContext, message, Integer.class);
172+
173+
if (deliveryMode != null) {
174+
DynamicJmsTemplateProperties.setDeliveryMode(deliveryMode);
175+
}
176+
}
177+
if (this.timeToLiveExpression != null) {
178+
Long timeToLive = this.timeToLiveExpression.getValue(this.evaluationContext, message, Long.class);
179+
if (timeToLive != null) {
180+
DynamicJmsTemplateProperties.setTimeToLive(timeToLive);
181+
}
182+
}
183+
}
118184
try {
119-
DynamicJmsTemplateProperties.setPriority(new IntegrationMessageHeaderAccessor(message).getPriority());
120-
this.send(destination, objectToSend, messagePostProcessor);
185+
send(destination, objectToSend, messagePostProcessor);
121186
}
122187
finally {
123188
DynamicJmsTemplateProperties.clearPriority();
189+
DynamicJmsTemplateProperties.clearDeliveryMode();
190+
DynamicJmsTemplateProperties.clearTimeToLive();
124191
}
125192
}
126193

@@ -167,10 +234,11 @@ private static final class HeaderMappingMessagePostProcessor implements MessageP
167234
}
168235

169236
@Override
170-
public javax.jms.Message postProcessMessage(javax.jms.Message jmsMessage) throws JMSException {
237+
public javax.jms.Message postProcessMessage(javax.jms.Message jmsMessage) {
171238
this.headerMapper.fromHeaders(this.integrationMessage.getHeaders(), jmsMessage);
172239
return jmsMessage;
173240
}
241+
174242
}
175243

176244
}

0 commit comments

Comments
 (0)