Skip to content

Commit 096bec5

Browse files
committed
Fix compatibility with latest Spring Data Geode
* Some configs and code style polishing in the Gemfire module tests and in the `DefaultHeaderChannelRegistry`
1 parent 1f05d3e commit 096bec5

File tree

7 files changed

+51
-46
lines changed

7 files changed

+51
-46
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/DefaultHeaderChannelRegistry.java

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2013-2016 the original author or authors.
2+
* Copyright 2013-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -29,6 +29,7 @@
2929
import org.springframework.integration.context.IntegrationObjectSupport;
3030
import org.springframework.integration.support.channel.BeanFactoryChannelResolver;
3131
import org.springframework.integration.support.channel.HeaderChannelRegistry;
32+
import org.springframework.lang.Nullable;
3233
import org.springframework.messaging.MessageChannel;
3334
import org.springframework.scheduling.TaskScheduler;
3435
import org.springframework.util.Assert;
@@ -42,6 +43,7 @@
4243
*
4344
* @author Gary Russell
4445
* @author Artem Bilan
46+
*
4547
* @since 3.0
4648
*
4749
*/
@@ -50,15 +52,15 @@ public class DefaultHeaderChannelRegistry extends IntegrationObjectSupport
5052

5153
private static final int DEFAULT_REAPER_DELAY = 60000;
5254

53-
protected final Map<String, MessageChannelWrapper> channels = new ConcurrentHashMap<String, DefaultHeaderChannelRegistry.MessageChannelWrapper>();
54-
5555
protected static final AtomicLong id = new AtomicLong();
5656

57+
protected final Map<String, MessageChannelWrapper> channels = new ConcurrentHashMap<>();
58+
5759
protected final String uuid = UUID.randomUUID().toString() + ":";
5860

59-
private volatile boolean removeOnGet;
61+
private boolean removeOnGet;
6062

61-
private volatile long reaperDelay;
63+
private long reaperDelay;
6264

6365
private volatile ScheduledFuture<?> reaperScheduledFuture;
6466

@@ -118,15 +120,17 @@ public final int size() {
118120
@Override
119121
protected void onInit() throws Exception {
120122
super.onInit();
121-
Assert.notNull(this.getTaskScheduler(), "a task scheduler is required");
123+
Assert.notNull(getTaskScheduler(), "a task scheduler is required");
122124
}
123125

124126
@Override
125127
public synchronized void start() {
126128
if (!this.running) {
127-
Assert.notNull(this.getTaskScheduler(), "a task scheduler is required");
128-
this.reaperScheduledFuture = this.getTaskScheduler().schedule(this,
129-
new Date(System.currentTimeMillis() + this.reaperDelay));
129+
Assert.notNull(getTaskScheduler(), "a task scheduler is required");
130+
this.reaperScheduledFuture =
131+
getTaskScheduler()
132+
.schedule(this, new Date(System.currentTimeMillis() + this.reaperDelay));
133+
130134
this.running = true;
131135
}
132136
}
@@ -142,7 +146,7 @@ public synchronized void stop() {
142146
}
143147

144148
public void stop(Runnable callback) {
145-
this.stop();
149+
stop();
146150
callback.run();
147151
}
148152

@@ -152,17 +156,17 @@ public boolean isRunning() {
152156
}
153157

154158
@Override
155-
public Object channelToChannelName(Object channel) {
159+
public Object channelToChannelName(@Nullable Object channel) {
156160
return channelToChannelName(channel, this.reaperDelay);
157161
}
158162

159163
@Override
160-
public Object channelToChannelName(Object channel, long timeToLive) {
164+
public Object channelToChannelName(@Nullable Object channel, long timeToLive) {
161165
if (!this.running && !this.explicitlyStopped && this.getTaskScheduler() != null) {
162166
start();
163167
}
164168
if (channel != null && channel instanceof MessageChannel) {
165-
String name = this.uuid + DefaultHeaderChannelRegistry.id.incrementAndGet();
169+
String name = this.uuid + id.incrementAndGet();
166170
this.channels.put(name, new MessageChannelWrapper((MessageChannel) channel,
167171
System.currentTimeMillis() + timeToLive));
168172
if (logger.isDebugEnabled()) {
@@ -176,7 +180,7 @@ public Object channelToChannelName(Object channel, long timeToLive) {
176180
}
177181

178182
@Override
179-
public MessageChannel channelNameToChannel(String name) {
183+
public MessageChannel channelNameToChannel(@Nullable String name) {
180184
if (name != null) {
181185
MessageChannelWrapper messageChannelWrapper;
182186
if (this.removeOnGet) {
@@ -188,6 +192,7 @@ public MessageChannel channelNameToChannel(String name) {
188192
if (logger.isDebugEnabled() && messageChannelWrapper != null) {
189193
logger.debug("Retrieved " + messageChannelWrapper.getChannel() + " with " + name);
190194
}
195+
191196
return messageChannelWrapper == null ? null : messageChannelWrapper.getChannel();
192197
}
193198
return null;
@@ -202,7 +207,8 @@ public synchronized void runReaper() {
202207
this.reaperScheduledFuture.cancel(true);
203208
this.reaperScheduledFuture = null;
204209
}
205-
this.run();
210+
211+
run();
206212
}
207213

208214
@Override
@@ -221,8 +227,10 @@ public synchronized void run() {
221227
iterator.remove();
222228
}
223229
}
224-
this.reaperScheduledFuture = this.getTaskScheduler().schedule(this,
225-
new Date(System.currentTimeMillis() + this.reaperDelay));
230+
this.reaperScheduledFuture =
231+
getTaskScheduler()
232+
.schedule(this, new Date(System.currentTimeMillis() + this.reaperDelay));
233+
226234
if (logger.isTraceEnabled()) {
227235
logger.trace("Reaper completed; channels size=" + this.channels.size());
228236
}

spring-integration-gemfire/src/test/java/org/springframework/integration/gemfire/config/xml/GemfireCqInboundChannelAdapterParserTests-context.xml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
11
<?xml version="1.0" encoding="UTF-8"?>
22
<beans xmlns="http://www.springframework.org/schema/beans"
33
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4-
xmlns:gfe="http://www.springframework.org/schema/gemfire"
54
xmlns:int="http://www.springframework.org/schema/integration"
65
xmlns:util="http://www.springframework.org/schema/util"
76
xmlns:int-gfe="http://www.springframework.org/schema/integration/gemfire"
87
xmlns:context="http://www.springframework.org/schema/context"
98
xsi:schemaLocation="http://www.springframework.org/schema/integration/gemfire
109
http://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire.xsd
11-
http://www.springframework.org/schema/gemfire
12-
http://www.springframework.org/schema/gemfire/spring-gemfire.xsd
1310
http://www.springframework.org/schema/integration
1411
http://www.springframework.org/schema/integration/spring-integration.xsd
1512
http://www.springframework.org/schema/beans

spring-integration-gemfire/src/test/java/org/springframework/integration/gemfire/inbound/CacheListeningMessageProducerTests.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,8 +28,8 @@
2828

2929
import org.springframework.beans.factory.BeanFactory;
3030
import org.springframework.data.gemfire.CacheFactoryBean;
31+
import org.springframework.data.gemfire.GenericRegionFactoryBean;
3132
import org.springframework.data.gemfire.RegionAttributesFactoryBean;
32-
import org.springframework.data.gemfire.RegionFactoryBean;
3333
import org.springframework.expression.spel.standard.SpelExpressionParser;
3434
import org.springframework.integration.channel.QueueChannel;
3535
import org.springframework.messaging.Message;
@@ -38,6 +38,7 @@
3838
* @author Mark Fisher
3939
* @author Gary Russell
4040
* @author Artem Bilan
41+
*
4142
* @since 2.1
4243
*/
4344
public class CacheListeningMessageProducerTests {
@@ -46,17 +47,15 @@ public class CacheListeningMessageProducerTests {
4647

4748
private static CacheFactoryBean cacheFactoryBean;
4849

49-
private static RegionFactoryBean<String, String> regionFactoryBean;
50+
private static GenericRegionFactoryBean<String, String> regionFactoryBean;
5051

5152
private static Region<String, String> region;
5253

5354
@BeforeClass
5455
public static void setup() throws Exception {
5556
cacheFactoryBean = new CacheFactoryBean();
5657

57-
regionFactoryBean = new RegionFactoryBean<String, String>() {
58-
59-
};
58+
regionFactoryBean = new GenericRegionFactoryBean<>();
6059
regionFactoryBean.setName("test.receiveNewValuePayloadForCreateEvent");
6160
regionFactoryBean.setCache(cacheFactoryBean.getObject());
6261
setRegionAttributes(regionFactoryBean);
@@ -72,7 +71,7 @@ public static void teardown() throws Exception {
7271
}
7372

7473
@Test
75-
public void receiveNewValuePayloadForCreateEvent() throws Exception {
74+
public void receiveNewValuePayloadForCreateEvent() {
7675
QueueChannel channel = new QueueChannel();
7776
CacheListeningMessageProducer producer = new CacheListeningMessageProducer(region);
7877
producer.setPayloadExpression(PARSER.parseExpression("key + '=' + newValue"));
@@ -91,7 +90,7 @@ public void receiveNewValuePayloadForCreateEvent() throws Exception {
9190
}
9291

9392
@Test
94-
public void receiveNewValuePayloadForUpdateEvent() throws Exception {
93+
public void receiveNewValuePayloadForUpdateEvent() {
9594
QueueChannel channel = new QueueChannel();
9695
CacheListeningMessageProducer producer = new CacheListeningMessageProducer(region);
9796
producer.setPayloadExpression(PARSER.parseExpression("newValue"));
@@ -114,7 +113,7 @@ public void receiveNewValuePayloadForUpdateEvent() throws Exception {
114113
}
115114

116115
@Test
117-
public void receiveOldValuePayloadForDestroyEvent() throws Exception {
116+
public void receiveOldValuePayloadForDestroyEvent() {
118117
QueueChannel channel = new QueueChannel();
119118
CacheListeningMessageProducer producer = new CacheListeningMessageProducer(region);
120119
producer.setSupportedEventTypes(EventType.DESTROYED);
@@ -136,7 +135,7 @@ public void receiveOldValuePayloadForDestroyEvent() throws Exception {
136135
}
137136

138137
@Test
139-
public void receiveOldValuePayloadForInvalidateEvent() throws Exception {
138+
public void receiveOldValuePayloadForInvalidateEvent() {
140139
QueueChannel channel = new QueueChannel();
141140
CacheListeningMessageProducer producer = new CacheListeningMessageProducer(region);
142141
producer.setSupportedEventTypes(EventType.INVALIDATED);
@@ -158,7 +157,9 @@ public void receiveOldValuePayloadForInvalidateEvent() throws Exception {
158157
}
159158

160159
@SuppressWarnings("unchecked")
161-
private static void setRegionAttributes(RegionFactoryBean<String, String> regionFactoryBean) throws Exception {
160+
private static void setRegionAttributes(GenericRegionFactoryBean<String, String> regionFactoryBean)
161+
throws Exception {
162+
162163
RegionAttributesFactoryBean attributesFactoryBean = new RegionAttributesFactoryBean();
163164
attributesFactoryBean.afterPropertiesSet();
164165
regionFactoryBean.setAttributes(attributesFactoryBean.getObject());

spring-integration-gemfire/src/test/java/org/springframework/integration/gemfire/inbound/GemfireInboundChannelAdapterTests-context.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
http://www.springframework.org/schema/beans
1111
http://www.springframework.org/schema/beans/spring-beans.xsd">
1212

13-
<gfe:cache id="gemfire-cache-2" use-bean-factory-locator="false"/>
13+
<gfe:cache id="gemfire-cache-2"/>
1414

1515
<gfe:local-region id="region1" cache-ref="gemfire-cache-2"/>
1616

spring-integration-gemfire/src/test/java/org/springframework/integration/gemfire/outbound/GemfireOutboundChannelAdapterTests-context.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
1010
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
1111

12-
<gfe:cache use-bean-factory-locator="false"/>
12+
<gfe:cache/>
1313

1414
<gfe:replicated-region id="region1"/>
1515

spring-integration-gemfire/src/test/java/org/springframework/integration/gemfire/store/GemfireMessageStoreTests.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -33,7 +33,7 @@
3333
import org.junit.Test;
3434

3535
import org.springframework.data.gemfire.CacheFactoryBean;
36-
import org.springframework.data.gemfire.RegionFactoryBean;
36+
import org.springframework.data.gemfire.GenericRegionFactoryBean;
3737
import org.springframework.integration.channel.DirectChannel;
3838
import org.springframework.integration.history.MessageHistory;
3939
import org.springframework.integration.store.MessageGroup;
@@ -48,6 +48,7 @@
4848
* @author David Turanski
4949
* @author Gary Russell
5050
* @author Artem Bilan
51+
*
5152
* @since 2.1
5253
*/
5354
public class GemfireMessageStoreTests {
@@ -57,7 +58,7 @@ public class GemfireMessageStoreTests {
5758
private static Region<Object, Object> region;
5859

5960
@Test
60-
public void addAndGetMessage() throws Exception {
61+
public void addAndGetMessage() {
6162
GemfireMessageStore store = new GemfireMessageStore(region);
6263
Message<?> message = MessageBuilder.withPayload("test").build();
6364
store.addMessage(message);
@@ -67,9 +68,7 @@ public void addAndGetMessage() throws Exception {
6768

6869
@Test
6970
public void testRegionConstructor() throws Exception {
70-
RegionFactoryBean<Object, Object> region = new RegionFactoryBean<Object, Object>() {
71-
72-
};
71+
GenericRegionFactoryBean<Object, Object> region = new GenericRegionFactoryBean<>();
7372
region.setName("someRegion");
7473
region.setCache(cacheFactoryBean.getObject());
7574
region.afterPropertiesSet();
@@ -81,10 +80,10 @@ public void testRegionConstructor() throws Exception {
8180
}
8281

8382
@Test
84-
public void testWithMessageHistory() throws Exception {
83+
public void testWithMessageHistory() {
8584
GemfireMessageStore store = new GemfireMessageStore(region);
8685

87-
Message<?> message = new GenericMessage<String>("Hello");
86+
Message<?> message = new GenericMessage<>("Hello");
8887
DirectChannel fooChannel = new DirectChannel();
8988
fooChannel.setBeanName("fooChannel");
9089
DirectChannel barChannel = new DirectChannel();
@@ -103,11 +102,11 @@ public void testWithMessageHistory() throws Exception {
103102
}
104103

105104
@Test
106-
public void testAddAndRemoveMessagesFromMessageGroup() throws Exception {
105+
public void testAddAndRemoveMessagesFromMessageGroup() {
107106
GemfireMessageStore messageStore = new GemfireMessageStore(region);
108107

109108
String groupId = "X";
110-
List<Message<?>> messages = new ArrayList<Message<?>>();
109+
List<Message<?>> messages = new ArrayList<>();
111110
for (int i = 0; i < 25; i++) {
112111
Message<String> message = MessageBuilder.withPayload("foo").setCorrelationId(groupId).build();
113112
messageStore.addMessagesToGroup(groupId, message);
@@ -121,11 +120,11 @@ public void testAddAndRemoveMessagesFromMessageGroup() throws Exception {
121120
}
122121

123122
@Test
124-
public void testAddAndRemoveMessagesFromMessageGroupWithPrefix() throws Exception {
123+
public void testAddAndRemoveMessagesFromMessageGroupWithPrefix() {
125124
GemfireMessageStore messageStore = new GemfireMessageStore(region, "foo_");
126125

127126
String groupId = "X";
128-
List<Message<?>> messages = new ArrayList<Message<?>>();
127+
List<Message<?>> messages = new ArrayList<>();
129128
for (int i = 0; i < 25; i++) {
130129
Message<String> message = MessageBuilder.withPayload("foo").setCorrelationId(groupId).build();
131130
messageStore.addMessagesToGroup(groupId, message);

spring-integration-gemfire/src/test/java/org/springframework/integration/gemfire/util/AggregatorWithGemfireLocksTests-context.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
http://www.springframework.org/schema/geode http://www.springframework.org/schema/gemfire/spring-geode.xsd
88
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd">
99

10-
<gfe:cache use-bean-factory-locator="false"/>
10+
<gfe:cache/>
1111

1212
<bean id="lockRegistry" class="org.springframework.integration.gemfire.util.GemfireLockRegistry">
1313
<constructor-arg ref="gemfireCache"/>

0 commit comments

Comments
 (0)