Skip to content

Commit 88c7646

Browse files
artembilangaryrussell
authored andcommitted
INT-4507: Use UNLINK in RedisMessageStore if any
JIRA: https://jira.spring.io/browse/INT-4507 * Add `RedisUtils` with the `isUnlinkAvailable()` to check the Redis server version to be sure that `UNLINK` is available or not * Use `RedisUtils.isUnlinkAvailable()` in the `RedisMessageStore` and `RedisLockRegistry` when a removal functionality is performed * Add `AbstractKeyValueMessageStore.doRemoveAll(Collection<Object> ids)` for optimization * Implement `doRemoveAll()` in the `RedisMessageStore` and `GemfireMessageStore` **Cherry-pick to 5.0.x**
1 parent d61cd79 commit 88c7646

File tree

6 files changed

+137
-20
lines changed

6 files changed

+137
-20
lines changed

spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,8 +20,10 @@
2020
import java.util.Collection;
2121
import java.util.HashSet;
2222
import java.util.Iterator;
23+
import java.util.List;
2324
import java.util.Set;
2425
import java.util.UUID;
26+
import java.util.stream.Collectors;
2527

2628
import org.springframework.jmx.export.annotation.ManagedAttribute;
2729
import org.springframework.messaging.Message;
@@ -243,11 +245,21 @@ public void removeMessagesFromGroup(Object groupId, Collection<Message<?>> messa
243245
if (mgm != null) {
244246
Assert.isInstanceOf(MessageGroupMetadata.class, mgm);
245247
MessageGroupMetadata messageGroupMetadata = (MessageGroupMetadata) mgm;
246-
for (Message<?> messageToRemove : messages) {
247-
UUID messageId = messageToRemove.getHeaders().getId();
248-
messageGroupMetadata.remove(messageId);
249-
doRemove(this.messagePrefix + messageId);
250-
}
248+
249+
List<UUID> ids =
250+
messages.stream()
251+
.map(messageToRemove -> messageToRemove.getHeaders().getId())
252+
.collect(Collectors.toList());
253+
254+
messageGroupMetadata.removeAll(ids);
255+
256+
List<Object> messageIds =
257+
ids.stream()
258+
.map(id -> this.messagePrefix + id)
259+
.collect(Collectors.toList());
260+
261+
doRemoveAll(messageIds);
262+
251263
messageGroupMetadata.setLastModified(System.currentTimeMillis());
252264
doStore(this.groupPrefix + groupId, messageGroupMetadata);
253265
}
@@ -275,10 +287,13 @@ public void removeMessageGroup(Object groupId) {
275287
Assert.isInstanceOf(MessageGroupMetadata.class, mgm);
276288
MessageGroupMetadata messageGroupMetadata = (MessageGroupMetadata) mgm;
277289

278-
Iterator<UUID> messageIds = messageGroupMetadata.messageIdIterator();
279-
while (messageIds.hasNext()) {
280-
removeMessage(messageIds.next());
281-
}
290+
List<Object> messageIds =
291+
messageGroupMetadata.getMessageIds()
292+
.stream()
293+
.map(id -> this.messagePrefix + id)
294+
.collect(Collectors.toList());
295+
296+
doRemoveAll(messageIds);
282297
}
283298
}
284299

@@ -325,7 +340,7 @@ public Message<?> getOneMessageFromGroup(Object groupId) {
325340
@Override
326341
public Collection<Message<?>> getMessagesForGroup(Object groupId) {
327342
MessageGroupMetadata groupMetadata = getGroupMetadata(groupId);
328-
ArrayList<Message<?>> messages = new ArrayList<Message<?>>();
343+
ArrayList<Message<?>> messages = new ArrayList<>();
329344
if (groupMetadata != null) {
330345
Iterator<UUID> messageIds = groupMetadata.messageIdIterator();
331346
while (messageIds.hasNext()) {
@@ -345,7 +360,7 @@ public Iterator<MessageGroup> iterator() {
345360
}
346361

347362
private Collection<String> normalizeKeys(Collection<String> keys) {
348-
Set<String> normalizedKeys = new HashSet<String>();
363+
Set<String> normalizedKeys = new HashSet<>();
349364
for (Object key : keys) {
350365
String strKey = (String) key;
351366
if (strKey.startsWith(this.groupPrefix)) {
@@ -378,6 +393,8 @@ public int messageGroupSize(Object groupId) {
378393

379394
protected abstract Object doRemove(Object id);
380395

396+
protected abstract void doRemoveAll(Collection<Object> ids);
397+
381398
protected abstract Collection<?> doListKeys(String keyPattern);
382399

383400
private final class MessageGroupIterator implements Iterator<MessageGroup> {

spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupMetadata.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.integration.store;
1818

1919
import java.io.Serializable;
20+
import java.util.Collection;
2021
import java.util.Iterator;
2122
import java.util.LinkedList;
2223
import java.util.List;
@@ -39,7 +40,7 @@ public class MessageGroupMetadata implements Serializable {
3940

4041
private static final long serialVersionUID = 1L;
4142

42-
private List<UUID> messageIds = new LinkedList<UUID>();
43+
private List<UUID> messageIds = new LinkedList<>();
4344

4445
private long timestamp;
4546

@@ -68,6 +69,10 @@ public void remove(UUID messageId) {
6869
this.messageIds.remove(messageId);
6970
}
7071

72+
public void removeAll(Collection<UUID> messageIds) {
73+
this.messageIds.removeAll(messageIds);
74+
}
75+
7176
boolean add(UUID messageId) {
7277
return !this.messageIds.contains(messageId) && this.messageIds.add(messageId);
7378
}

spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/store/GemfireMessageStore.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -96,6 +96,11 @@ protected Object doRemove(Object id) {
9696
return this.messageStoreRegion.remove(id);
9797
}
9898

99+
@Override
100+
protected void doRemoveAll(Collection<Object> ids) {
101+
this.messageStoreRegion.removeAll(ids);
102+
}
103+
99104
@Override
100105
protected Collection<?> doListKeys(String keyPattern) {
101106
Assert.hasText(keyPattern, "'keyPattern' must not be empty");

spring-integration-redis/src/main/java/org/springframework/integration/redis/store/RedisMessageStore.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2007-2017 the original author or authors.
2+
* Copyright 2007-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@
2626
import org.springframework.data.redis.serializer.RedisSerializer;
2727
import org.springframework.data.redis.serializer.SerializationException;
2828
import org.springframework.data.redis.serializer.StringRedisSerializer;
29+
import org.springframework.integration.redis.util.RedisUtils;
2930
import org.springframework.integration.store.AbstractKeyValueMessageStore;
3031
import org.springframework.integration.store.MessageGroupStore;
3132
import org.springframework.integration.store.MessageStore;
@@ -66,7 +67,7 @@ public RedisMessageStore(RedisConnectionFactory connectionFactory) {
6667
*/
6768
public RedisMessageStore(RedisConnectionFactory connectionFactory, String prefix) {
6869
super(prefix);
69-
this.redisTemplate = new RedisTemplate<Object, Object>();
70+
this.redisTemplate = new RedisTemplate<>();
7071
this.redisTemplate.setConnectionFactory(connectionFactory);
7172
this.redisTemplate.setKeySerializer(new StringRedisSerializer());
7273
this.redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());
@@ -130,11 +131,26 @@ protected Object doRemove(Object id) {
130131
Assert.notNull(id, "'id' must not be null");
131132
Object removedObject = this.doRetrieve(id);
132133
if (removedObject != null) {
133-
this.redisTemplate.delete(id);
134+
if (RedisUtils.isUnlinkAvailable(this.redisTemplate)) {
135+
this.redisTemplate.unlink(id);
136+
}
137+
else {
138+
this.redisTemplate.delete(id);
139+
}
134140
}
135141
return removedObject;
136142
}
137143

144+
@Override
145+
protected void doRemoveAll(Collection<Object> ids) {
146+
if (RedisUtils.isUnlinkAvailable(this.redisTemplate)) {
147+
this.redisTemplate.unlink(ids);
148+
}
149+
else {
150+
this.redisTemplate.delete(ids);
151+
}
152+
}
153+
138154
@Override
139155
protected Collection<?> doListKeys(String keyPattern) {
140156
Assert.hasText(keyPattern, "'keyPattern' must not be empty");

spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -302,11 +302,10 @@ public void unlock() {
302302
}
303303
try {
304304
if (Thread.currentThread().isInterrupted()) {
305-
RedisLockRegistry.this.executor.execute(() ->
306-
RedisLockRegistry.this.redisTemplate.delete(this.lockKey));
305+
RedisLockRegistry.this.executor.execute(this::removeLockKey);
307306
}
308307
else {
309-
RedisLockRegistry.this.redisTemplate.delete(this.lockKey);
308+
removeLockKey();
310309
}
311310

312311
if (logger.isDebugEnabled()) {
@@ -321,6 +320,15 @@ public void unlock() {
321320
}
322321
}
323322

323+
private void removeLockKey() {
324+
if (RedisUtils.isUnlinkAvailable(RedisLockRegistry.this.redisTemplate)) {
325+
RedisLockRegistry.this.redisTemplate.unlink(this.lockKey);
326+
}
327+
else {
328+
RedisLockRegistry.this.redisTemplate.delete(this.lockKey);
329+
}
330+
}
331+
324332
@Override
325333
public Condition newCondition() {
326334
throw new UnsupportedOperationException("Conditions are not supported");
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.redis.util;
18+
19+
import java.util.Properties;
20+
21+
import org.springframework.data.redis.core.RedisCallback;
22+
import org.springframework.data.redis.core.RedisOperations;
23+
24+
/**
25+
* A set of utility methods for common Redis functions.
26+
*
27+
* @author Artem Bilan
28+
*
29+
* @since 5.1
30+
*/
31+
public final class RedisUtils {
32+
33+
private static final String SECTION = "server";
34+
35+
private static final String VERSION_PROPERTY = "redis_version";
36+
37+
private static final int MAJOR_VERSION_TO_COMPARE = 4;
38+
39+
private static Boolean unlinkAvailable;
40+
41+
/**
42+
* Perform an {@code INFO} command on the provided {@link RedisOperations} to check
43+
* the Redis server version to be sure that {@code UNLINK} is available or not.
44+
* @param redisOperations the {@link RedisOperations} to perform {@code INFO} command.
45+
* @return true or false if {@code UNLINK} Redis command is available or not.
46+
* @throws IllegalStateException when {@code INFO} returns null from the Redis.
47+
*/
48+
public static boolean isUnlinkAvailable(RedisOperations<?, ?> redisOperations) {
49+
if (unlinkAvailable == null) {
50+
Properties info = redisOperations.execute(
51+
(RedisCallback<Properties>) connection -> connection.serverCommands().info(SECTION));
52+
if (info != null) {
53+
int majorVersion = Integer.parseInt(info.getProperty(VERSION_PROPERTY).split("\\.")[0]);
54+
unlinkAvailable = majorVersion >= MAJOR_VERSION_TO_COMPARE;
55+
}
56+
else {
57+
throw new IllegalStateException("The INFO command cannot be used in pipeline/transaction.");
58+
}
59+
}
60+
return unlinkAvailable;
61+
}
62+
63+
private RedisUtils() {
64+
}
65+
66+
}

0 commit comments

Comments
 (0)