@@ -180,20 +180,23 @@ void serverEcho() {
180180 private void echo (MessageChannel inputChannel , FluxMessageChannel resultChannel ,
181181 RSocketRequester rsocketRequester ) {
182182
183+ StepVerifier verifier =
184+ StepVerifier .create (
185+ Flux .from (resultChannel )
186+ .map (Message ::getPayload )
187+ .cast (String .class ))
188+ .expectNext ("Hello" )
189+ .thenCancel ()
190+ .verifyLater ();
191+
183192 inputChannel .send (
184193 MessageBuilder .withPayload ("Hello" )
185194 .setHeader (ROUTE_HEADER , "echo" )
186195 .setHeader (COMMAND_HEADER , RSocketOutboundGateway .Command .requestResponse )
187196 .setHeader (RSocketRequesterMethodArgumentResolver .RSOCKET_REQUESTER_HEADER , rsocketRequester )
188197 .build ());
189198
190- StepVerifier .create (
191- Flux .from (resultChannel )
192- .map (Message ::getPayload )
193- .cast (String .class ))
194- .expectNext ("Hello" )
195- .thenCancel ()
196- .verify ();
199+ verifier .verify ();
197200 }
198201
199202 @ Test
@@ -209,20 +212,23 @@ void serverEchoAsync() {
209212 private void echoAsync (MessageChannel inputChannel , FluxMessageChannel resultChannel ,
210213 RSocketRequester rsocketRequester ) {
211214
215+ StepVerifier verifier =
216+ StepVerifier .create (
217+ Flux .from (resultChannel )
218+ .map (Message ::getPayload )
219+ .cast (String .class ))
220+ .expectNext ("Hello async" )
221+ .thenCancel ()
222+ .verifyLater ();
223+
212224 inputChannel .send (
213225 MessageBuilder .withPayload ("Hello" )
214226 .setHeader (ROUTE_HEADER , "echo-async" )
215227 .setHeader (COMMAND_HEADER , RSocketOutboundGateway .Command .requestResponse )
216228 .setHeader (RSocketRequesterMethodArgumentResolver .RSOCKET_REQUESTER_HEADER , rsocketRequester )
217229 .build ());
218230
219- StepVerifier .create (
220- Flux .from (resultChannel )
221- .map (Message ::getPayload )
222- .cast (String .class ))
223- .expectNext ("Hello async" )
224- .thenCancel ()
225- .verify ();
231+ verifier .verify ();
226232 }
227233
228234 @ Test
@@ -238,29 +244,25 @@ void serverEchoStream() {
238244 private void echoStream (MessageChannel inputChannel , FluxMessageChannel resultChannel ,
239245 RSocketRequester rsocketRequester ) {
240246
247+ @ SuppressWarnings ("unchecked" )
248+ StepVerifier verifier =
249+ StepVerifier .create (
250+ Flux .from (resultChannel )
251+ .next ()
252+ .map (Message ::getPayload )
253+ .flatMapMany ((payload ) -> (Flux <String >) payload ))
254+ .expectNext ("Hello 0" ).expectNextCount (6 ).expectNext ("Hello 7" )
255+ .thenCancel ()
256+ .verifyLater ();
257+
241258 inputChannel .send (
242259 MessageBuilder .withPayload ("Hello" )
243260 .setHeader (ROUTE_HEADER , "echo-stream" )
244261 .setHeader (COMMAND_HEADER , RSocketOutboundGateway .Command .requestStreamOrChannel )
245262 .setHeader (RSocketRequesterMethodArgumentResolver .RSOCKET_REQUESTER_HEADER , rsocketRequester )
246263 .build ());
247264
248- Message <?> resultMessage =
249- Flux .from (resultChannel )
250- .blockFirst ();
251-
252- assertThat (resultMessage )
253- .isNotNull ()
254- .extracting (Message ::getPayload )
255- .isInstanceOf (Flux .class );
256-
257- @ SuppressWarnings ("unchecked" )
258- Flux <String > resultStream = (Flux <String >) resultMessage .getPayload ();
259- StepVerifier .create (resultStream )
260- .expectNext ("Hello 0" ).expectNextCount (6 ).expectNext ("Hello 7" )
261- .thenCancel ()
262- .verify ();
263-
265+ verifier .verify ();
264266 }
265267
266268 @ Test
@@ -276,28 +278,25 @@ void serverEchoChannel() {
276278 private void echoChannel (MessageChannel inputChannel , FluxMessageChannel resultChannel ,
277279 RSocketRequester rsocketRequester ) {
278280
281+ @ SuppressWarnings ("unchecked" )
282+ StepVerifier verifier =
283+ StepVerifier .create (
284+ Flux .from (resultChannel )
285+ .next ()
286+ .map (Message ::getPayload )
287+ .flatMapMany ((payload ) -> (Flux <String >) payload ))
288+ .expectNext ("Hello 1 async" ).expectNextCount (8 ).expectNext ("Hello 10 async" )
289+ .thenCancel ()
290+ .verifyLater ();
291+
279292 inputChannel .send (
280293 MessageBuilder .withPayload (Flux .range (1 , 10 ).map (i -> "Hello " + i ))
281294 .setHeader (ROUTE_HEADER , "echo-channel" )
282295 .setHeader (COMMAND_HEADER , RSocketOutboundGateway .Command .requestStreamOrChannel )
283296 .setHeader (RSocketRequesterMethodArgumentResolver .RSOCKET_REQUESTER_HEADER , rsocketRequester )
284297 .build ());
285298
286- Message <?> resultMessage =
287- Flux .from (resultChannel )
288- .blockFirst ();
289-
290- assertThat (resultMessage )
291- .isNotNull ()
292- .extracting (Message ::getPayload )
293- .isInstanceOf (Flux .class );
294-
295- @ SuppressWarnings ("unchecked" )
296- Flux <String > resultStream = (Flux <String >) resultMessage .getPayload ();
297- StepVerifier .create (resultStream )
298- .expectNext ("Hello 1 async" ).expectNextCount (8 ).expectNext ("Hello 10 async" )
299- .thenCancel ()
300- .verify ();
299+ verifier .verify ();
301300 }
302301
303302
@@ -314,26 +313,21 @@ void serverVoidReturnValue() {
314313 private void voidReturnValue (MessageChannel inputChannel , FluxMessageChannel resultChannel ,
315314 RSocketRequester rsocketRequester ) {
316315
316+ StepVerifier verifier =
317+ StepVerifier .create (resultChannel )
318+ .expectSubscription ()
319+ .expectNoEvent (Duration .ofMillis (100 ))
320+ .thenCancel ()
321+ .verifyLater ();
322+
317323 inputChannel .send (
318324 MessageBuilder .withPayload ("Hello" )
319325 .setHeader (ROUTE_HEADER , "void-return-value" )
320- .setHeader (COMMAND_HEADER , RSocketOutboundGateway .Command .requestStreamOrChannel )
326+ .setHeader (COMMAND_HEADER , RSocketOutboundGateway .Command .requestResponse )
321327 .setHeader (RSocketRequesterMethodArgumentResolver .RSOCKET_REQUESTER_HEADER , rsocketRequester )
322328 .build ());
323329
324- Message <?> resultMessage =
325- Flux .from (resultChannel )
326- .blockFirst ();
327-
328- assertThat (resultMessage )
329- .isNotNull ()
330- .extracting (Message ::getPayload )
331- .isInstanceOf (Flux .class );
332-
333- Flux <?> resultStream = (Flux <?>) resultMessage .getPayload ();
334- StepVerifier .create (resultStream )
335- .expectComplete ()
336- .verify ();
330+ verifier .verify ();
337331 }
338332
339333 @ Test
@@ -349,26 +343,21 @@ void serverVoidReturnValueFromExceptionHandler() {
349343 private void voidReturnValueFromExceptionHandler (MessageChannel inputChannel , FluxMessageChannel resultChannel ,
350344 RSocketRequester rsocketRequester ) {
351345
346+ StepVerifier verifier =
347+ StepVerifier .create (resultChannel )
348+ .expectSubscription ()
349+ .expectNoEvent (Duration .ofMillis (100 ))
350+ .thenCancel ()
351+ .verifyLater ();
352+
352353 inputChannel .send (
353354 MessageBuilder .withPayload ("bad" )
354355 .setHeader (ROUTE_HEADER , "void-return-value" )
355- .setHeader (COMMAND_HEADER , RSocketOutboundGateway .Command .requestStreamOrChannel )
356+ .setHeader (COMMAND_HEADER , RSocketOutboundGateway .Command .requestResponse )
356357 .setHeader (RSocketRequesterMethodArgumentResolver .RSOCKET_REQUESTER_HEADER , rsocketRequester )
357358 .build ());
358359
359- Message <?> resultMessage =
360- Flux .from (resultChannel )
361- .blockFirst ();
362-
363- assertThat (resultMessage )
364- .isNotNull ()
365- .extracting (Message ::getPayload )
366- .isInstanceOf (Flux .class );
367-
368- Flux <?> resultStream = (Flux <?>) resultMessage .getPayload ();
369- StepVerifier .create (resultStream )
370- .expectComplete ()
371- .verify ();
360+ verifier .verify ();
372361 }
373362
374363 @ Test
@@ -384,20 +373,23 @@ void serverHandleWithThrownException() {
384373 private void handleWithThrownException (MessageChannel inputChannel , FluxMessageChannel resultChannel ,
385374 RSocketRequester rsocketRequester ) {
386375
376+ StepVerifier verifier =
377+ StepVerifier .create (
378+ Flux .from (resultChannel )
379+ .map (Message ::getPayload )
380+ .cast (String .class ))
381+ .expectNext ("Invalid input error handled" )
382+ .thenCancel ()
383+ .verifyLater ();
384+
387385 inputChannel .send (
388386 MessageBuilder .withPayload ("a" )
389387 .setHeader (ROUTE_HEADER , "thrown-exception" )
390388 .setHeader (COMMAND_HEADER , RSocketOutboundGateway .Command .requestResponse )
391389 .setHeader (RSocketRequesterMethodArgumentResolver .RSOCKET_REQUESTER_HEADER , rsocketRequester )
392390 .build ());
393391
394- StepVerifier .create (
395- Flux .from (resultChannel )
396- .map (Message ::getPayload )
397- .cast (String .class ))
398- .expectNext ("Invalid input error handled" )
399- .thenCancel ()
400- .verify ();
392+ verifier .verify ();
401393 }
402394
403395 @ Test
@@ -413,20 +405,23 @@ void serverHandleWithErrorSignal() {
413405 private void handleWithErrorSignal (MessageChannel inputChannel , FluxMessageChannel resultChannel ,
414406 RSocketRequester rsocketRequester ) {
415407
408+ StepVerifier verifier =
409+ StepVerifier .create (
410+ Flux .from (resultChannel )
411+ .map (Message ::getPayload )
412+ .cast (String .class ))
413+ .expectNext ("Invalid input error handled" )
414+ .thenCancel ()
415+ .verifyLater ();
416+
416417 inputChannel .send (
417418 MessageBuilder .withPayload ("a" )
418419 .setHeader (ROUTE_HEADER , "error-signal" )
419420 .setHeader (COMMAND_HEADER , RSocketOutboundGateway .Command .requestResponse )
420421 .setHeader (RSocketRequesterMethodArgumentResolver .RSOCKET_REQUESTER_HEADER , rsocketRequester )
421422 .build ());
422423
423- StepVerifier .create (
424- Flux .from (resultChannel )
425- .map (Message ::getPayload )
426- .cast (String .class ))
427- .expectNext ("Invalid input error handled" )
428- .thenCancel ()
429- .verify ();
424+ verifier .verify ();
430425 }
431426
432427 @ Test
0 commit comments