2525import java .time .Duration ;
2626import java .util .ArrayList ;
2727import java .util .List ;
28- import java .util .function .BiFunction ;
2928
3029import org .springframework .core .convert .converter .Converter ;
3130import org .springframework .data .redis .connection .RedisStreamCommands .XClaimOptions ;
32- import org .springframework .data .redis .connection .convert .ListConverter ;
3331import org .springframework .data .redis .connection .stream .ByteRecord ;
3432import org .springframework .data .redis .connection .stream .Consumer ;
3533import org .springframework .data .redis .connection .stream .PendingMessagesSummary ;
5351@ SuppressWarnings ({ "rawtypes" })
5452class StreamConverters {
5553
56- private static final Converter <List <StreamMessage <byte [], byte []>>, List <RecordId >> MESSAGEs_TO_IDs = new ListConverter <>(
57- messageToIdConverter ());
58-
59- private static final BiFunction <List <PendingMessage >, String , org .springframework .data .redis .connection .stream .PendingMessages > PENDING_MESSAGES_CONVERTER = (
60- source , groupName ) -> {
61-
62-
63- List <org .springframework .data .redis .connection .stream .PendingMessage > messages = source .stream ()
64- .map (it -> {
65-
66- RecordId id = RecordId .of (it .getId ());
67- Consumer consumer = Consumer .from (groupName , it .getConsumer ());
68-
69- return new org .springframework .data .redis .connection .stream .PendingMessage (id , consumer ,
70- Duration .ofMillis (it .getMsSinceLastDelivery ()), it .getRedeliveryCount ());
71-
72- }).toList ();
73-
74- return new org .springframework .data .redis .connection .stream .PendingMessages (groupName , messages );
75-
76- };
77-
78- private static final BiFunction <PendingMessages , String , PendingMessagesSummary > PENDING_MESSAGES_SUMMARY_CONVERTER = (
79- source , groupName ) -> {
80-
81- org .springframework .data .domain .Range <String > range = source .getMessageIds ().isUnbounded ()
82- ? org .springframework .data .domain .Range .unbounded ()
83- : org .springframework .data .domain .Range .open (source .getMessageIds ().getLower ().getValue (),
84- source .getMessageIds ().getUpper ().getValue ());
85-
86- return new PendingMessagesSummary (groupName , source .getCount (), range , source .getConsumerMessageCount ());
87- };
88-
8954 /**
9055 * Convert {@link StreamReadOptions} to Lettuce's {@link XReadArgs}.
9156 *
@@ -126,7 +91,18 @@ static Converter<StreamMessage<byte[], byte[]>, RecordId> messageToIdConverter()
12691 */
12792 static org .springframework .data .redis .connection .stream .PendingMessages toPendingMessages (String groupName ,
12893 org .springframework .data .domain .Range <?> range , List <PendingMessage > source ) {
129- return PENDING_MESSAGES_CONVERTER .apply (source , groupName ).withinRange (range );
94+
95+ List <org .springframework .data .redis .connection .stream .PendingMessage > messages = source .stream ().map (it -> {
96+
97+ RecordId id = RecordId .of (it .getId ());
98+ Consumer consumer = Consumer .from (groupName , it .getConsumer ());
99+
100+ return new org .springframework .data .redis .connection .stream .PendingMessage (id , consumer ,
101+ Duration .ofMillis (it .getMsSinceLastDelivery ()), it .getRedeliveryCount ());
102+
103+ }).toList ();
104+
105+ return new org .springframework .data .redis .connection .stream .PendingMessages (groupName , messages ).withinRange (range );
130106 }
131107
132108 /**
@@ -138,7 +114,13 @@ static org.springframework.data.redis.connection.stream.PendingMessages toPendin
138114 * @since 2.3
139115 */
140116 static PendingMessagesSummary toPendingMessagesInfo (String groupName , PendingMessages source ) {
141- return PENDING_MESSAGES_SUMMARY_CONVERTER .apply (source , groupName );
117+
118+ org .springframework .data .domain .Range <String > range = source .getMessageIds ().isUnbounded ()
119+ ? org .springframework .data .domain .Range .unbounded ()
120+ : org .springframework .data .domain .Range .open (source .getMessageIds ().getLower ().getValue (),
121+ source .getMessageIds ().getUpper ().getValue ());
122+
123+ return new PendingMessagesSummary (groupName , source .getCount (), range , source .getConsumerMessageCount ());
142124 }
143125
144126 /**
0 commit comments