Skip to content

Commit 6d6f53f

Browse files
rohan mukeshartembilan
authored andcommitted
Fix r2bdc componentType; Redis Streams ack issue
The R2DBC channel adapters have wrong componentType * Fix R2DBC component type to not mention `reactive-`: they are reactive by R2DBC definition The `ReactiveRedisStreamMessageProducer` calls ack() explicitly when populates `IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK` header, but must provide an implementation of `SimpleAcknowledgment` instead * Fix `ReactiveRedisStreamMessageProducer` to populate the proper `IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK` header and in the proper moment * Fix `ReactiveRedisStreamMessageProducerTests` to verify that `autoAck=false` works as expected together with the `IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK` header functionality
1 parent 94e6521 commit 6d6f53f

File tree

5 files changed

+107
-10
lines changed

5 files changed

+107
-10
lines changed

spring-integration-r2dbc/src/main/java/org/springframework/integration/r2dbc/outbound/R2dbcMessageHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public void setCriteriaExpression(Expression criteriaExpression) {
112112

113113
@Override
114114
public String getComponentType() {
115-
return "r2dbc:reactive-outbound-channel-adapter";
115+
return "r2dbc:outbound-channel-adapter";
116116
}
117117

118118
@Override

spring-integration-r2dbc/src/test/java/org/springframework/integration/r2dbc/inbound/R2dbcMessageSourceTests.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,12 @@ public void setup() {
8989
.verifyComplete());
9090
}
9191

92+
@Test
93+
public void validateComponentType() {
94+
assertThat(this.defaultR2dbcMessageSource.getComponentType())
95+
.isEqualTo("r2dbc:inbound-channel-adapter");
96+
}
97+
9298
@Test
9399
public void validateSuccessfulQueryWithoutSettingExpectedElement() {
94100
this.entityTemplate.insert(new Person("Bob", 35))

spring-integration-r2dbc/src/test/java/org/springframework/integration/r2dbc/outbound/R2dbcMessageHandlerTests.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.integration.r2dbc.outbound;
1818

19+
import static org.assertj.core.api.Assertions.assertThat;
1920

2021
import java.time.Duration;
2122
import java.util.Arrays;
@@ -115,6 +116,11 @@ public void validateMessageHandlingWithInsertQueryCollection() {
115116

116117
}
117118

119+
@Test
120+
public void validateComponentType() {
121+
assertThat(this.r2dbcMessageHandler.getComponentType()).isEqualTo("r2dbc:outbound-channel-adapter");
122+
}
123+
118124
@Test
119125
public void validateMessageHandlingWithDefaultUpdateCollection() {
120126
Message<Person> message = MessageBuilder.withPayload(createPerson("Bob", 35)).build();

spring-integration-redis/src/main/java/org/springframework/integration/redis/inbound/ReactiveRedisStreamMessageProducer.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.springframework.data.redis.serializer.RedisSerializationContext;
2929
import org.springframework.data.redis.stream.StreamReceiver;
3030
import org.springframework.integration.IntegrationMessageHeaderAccessor;
31+
import org.springframework.integration.acks.SimpleAcknowledgment;
3132
import org.springframework.integration.endpoint.MessageProducerSupport;
3233
import org.springframework.integration.redis.support.RedisHeaders;
3334
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
@@ -48,6 +49,7 @@
4849
*
4950
* @author Attoumane Ahamadi
5051
* @author Artem Bilan
52+
* @author Rohan Mukesh
5153
*
5254
* @since 5.4
5355
*/
@@ -217,10 +219,12 @@ protected void doStart() {
217219
.setHeader(RedisHeaders.STREAM_MESSAGE_ID, event.getId())
218220
.setHeader(RedisHeaders.CONSUMER_GROUP, this.consumerGroup)
219221
.setHeader(RedisHeaders.CONSUMER, this.consumerName);
220-
if (!this.autoAck) {
222+
if (!this.autoAck && this.consumerGroup != null) {
221223
builder.setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK,
222-
this.reactiveStreamOperations.acknowledge(this.consumerGroup, event)
223-
.subscribe());
224+
(SimpleAcknowledgment) () ->
225+
this.reactiveStreamOperations
226+
.acknowledge(this.consumerGroup, event)
227+
.subscribe());
224228
}
225229
return builder.build();
226230
});

spring-integration-redis/src/test/java/org/springframework/integration/redis/inbound/ReactiveRedisStreamMessageProducerTests.java

Lines changed: 87 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.assertj.core.api.Assertions.assertThat;
2020

2121
import java.time.Duration;
22+
import java.util.concurrent.atomic.AtomicReference;
2223

2324
import org.junit.After;
2425
import org.junit.Before;
@@ -28,10 +29,15 @@
2829
import org.springframework.beans.factory.annotation.Autowired;
2930
import org.springframework.context.annotation.Bean;
3031
import org.springframework.context.annotation.Configuration;
32+
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
3133
import org.springframework.data.redis.connection.stream.ReadOffset;
34+
import org.springframework.data.redis.connection.stream.StreamInfo;
3235
import org.springframework.data.redis.core.ReactiveRedisTemplate;
3336
import org.springframework.data.redis.serializer.RedisSerializationContext;
3437
import org.springframework.data.redis.stream.StreamReceiver;
38+
import org.springframework.integration.IntegrationMessageHeaderAccessor;
39+
import org.springframework.integration.StaticMessageHeaderAccessor;
40+
import org.springframework.integration.acks.SimpleAcknowledgment;
3541
import org.springframework.integration.channel.FluxMessageChannel;
3642
import org.springframework.integration.handler.ReactiveMessageHandlerAdapter;
3743
import org.springframework.integration.redis.outbound.ReactiveRedisStreamMessageHandler;
@@ -46,11 +52,13 @@
4652
import org.springframework.test.context.junit4.SpringRunner;
4753

4854
import reactor.core.publisher.Flux;
55+
import reactor.core.publisher.Mono;
4956
import reactor.test.StepVerifier;
5057

5158
/**
5259
* @author Attoumane Ahamadi
5360
* @author Artem Bilan
61+
* @author Rohan Mukesh
5462
*
5563
* @since 5.4
5664
*/
@@ -76,6 +84,16 @@ public class ReactiveRedisStreamMessageProducerTests extends RedisAvailableTests
7684

7785
@Before
7886
public void delKey() {
87+
this.template.hasKey(STREAM_KEY)
88+
.filter(Boolean::booleanValue)
89+
.flatMapMany(b ->
90+
this.template.opsForStream()
91+
.groups(STREAM_KEY)
92+
.map(StreamInfo.XInfoGroup::groupName)
93+
.flatMap(groupName ->
94+
this.template.opsForStream()
95+
.destroyGroup(STREAM_KEY, groupName)))
96+
.blockLast();
7997
this.template.delete(STREAM_KEY).block();
8098
}
8199

@@ -90,10 +108,11 @@ public void testConsumerGroupCreation() {
90108
this.redisStreamMessageProducer.setCreateConsumerGroup(true);
91109
this.redisStreamMessageProducer.setConsumerName(CONSUMER);
92110
this.redisStreamMessageProducer.afterPropertiesSet();
93-
this.redisStreamMessageProducer.start();
94111

95112
Flux.from(this.fluxMessageChannel).subscribe();
96113

114+
this.redisStreamMessageProducer.start();
115+
97116
this.template.opsForStream()
98117
.groups(STREAM_KEY)
99118
.next()
@@ -147,19 +166,81 @@ public void testReadingMessageAsConsumerInConsumerGroup() {
147166

148167
this.redisStreamMessageProducer.setCreateConsumerGroup(false);
149168
this.redisStreamMessageProducer.setConsumerName(CONSUMER);
169+
this.redisStreamMessageProducer.setReadOffset(ReadOffset.latest());
150170
this.redisStreamMessageProducer.afterPropertiesSet();
151171
this.redisStreamMessageProducer.start();
152172

173+
StepVerifier stepVerifier =
174+
Flux.from(this.fluxMessageChannel)
175+
.as(StepVerifier::create)
176+
.assertNext(message -> {
177+
assertThat(message.getPayload()).isEqualTo(person);
178+
assertThat(message.getHeaders()).containsKeys(RedisHeaders.CONSUMER_GROUP, RedisHeaders.CONSUMER);
179+
})
180+
.thenCancel()
181+
.verifyLater();
182+
153183
this.messageHandler.handleMessage(new GenericMessage<>(person));
154184

155-
Flux.from(this.fluxMessageChannel)
185+
stepVerifier.verify(Duration.ofSeconds(10));
186+
}
187+
188+
@Test
189+
@RedisAvailable
190+
public void testReadingPendingMessageWithNoAutoACK() {
191+
Address address = new Address("Winterfell, Westeros");
192+
Person person = new Person(address, "John Snow");
193+
194+
this.template.opsForStream()
195+
.createGroup(STREAM_KEY, this.redisStreamMessageProducer.getBeanName())
156196
.as(StepVerifier::create)
157-
.assertNext(message -> {
158-
assertThat(message.getPayload()).isEqualTo(person);
159-
assertThat(message.getHeaders()).containsKeys(RedisHeaders.CONSUMER_GROUP, RedisHeaders.CONSUMER);
160-
})
197+
.assertNext(message -> assertThat(message).isEqualTo("OK"))
161198
.thenCancel()
162199
.verify(Duration.ofSeconds(10));
200+
201+
this.redisStreamMessageProducer.setCreateConsumerGroup(false);
202+
this.redisStreamMessageProducer.setAutoAck(false);
203+
this.redisStreamMessageProducer.setConsumerName(CONSUMER);
204+
this.redisStreamMessageProducer.setReadOffset(ReadOffset.latest());
205+
this.redisStreamMessageProducer.afterPropertiesSet();
206+
this.redisStreamMessageProducer.start();
207+
208+
AtomicReference<SimpleAcknowledgment> acknowledgmentReference = new AtomicReference<>();
209+
210+
StepVerifier stepVerifier =
211+
Flux.from(this.fluxMessageChannel)
212+
.as(StepVerifier::create)
213+
.assertNext(message -> {
214+
assertThat(message.getPayload()).isEqualTo(person);
215+
assertThat(message.getHeaders())
216+
.containsKeys(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK);
217+
acknowledgmentReference.set(StaticMessageHeaderAccessor.getAcknowledgment(message));
218+
})
219+
.thenCancel()
220+
.verifyLater();
221+
222+
this.messageHandler.handleMessage(new GenericMessage<>(person));
223+
224+
stepVerifier.verify(Duration.ofSeconds(10));
225+
226+
Mono<PendingMessagesSummary> pending =
227+
template.opsForStream()
228+
.pending(STREAM_KEY, this.redisStreamMessageProducer.getBeanName());
229+
230+
StepVerifier.create(pending)
231+
.assertNext(pendingMessagesSummary ->
232+
assertThat(pendingMessagesSummary.getTotalPendingMessages()).isEqualTo(1))
233+
.verifyComplete();
234+
235+
acknowledgmentReference.get().acknowledge();
236+
237+
Mono<PendingMessagesSummary> pendingZeroMessage = template.opsForStream().pending(STREAM_KEY,
238+
this.redisStreamMessageProducer.getBeanName());
239+
240+
StepVerifier.create(pendingZeroMessage)
241+
.assertNext(pendingMessagesSummary ->
242+
assertThat(pendingMessagesSummary.getTotalPendingMessages()).isEqualTo(0))
243+
.verifyComplete();
163244
}
164245

165246
@Configuration

0 commit comments

Comments
 (0)