|
25 | 25 | import reactor.core.publisher.Mono; |
26 | 26 | import reactor.core.scheduler.Schedulers; |
27 | 27 | import reactor.util.context.Context; |
| 28 | +import reactor.util.context.ContextView; |
28 | 29 | import reactor.util.function.Tuples; |
| 30 | +import reactor.util.retry.Retry; |
29 | 31 |
|
30 | 32 | import java.time.Duration; |
31 | | -import java.util.ArrayList; |
| 33 | +import java.util.LinkedList; |
32 | 34 | import java.util.List; |
33 | 35 | import java.util.concurrent.CompletableFuture; |
34 | 36 | import java.util.concurrent.CompletionStage; |
35 | 37 | import java.util.concurrent.ThreadLocalRandom; |
36 | 38 | import java.util.concurrent.TimeUnit; |
37 | | -import java.util.function.Function; |
38 | 39 | import java.util.function.Supplier; |
39 | 40 |
|
40 | 41 | import org.neo4j.driver.Logger; |
@@ -145,7 +146,7 @@ public <T> CompletionStage<T> retryAsync( Supplier<CompletionStage<T>> work ) |
145 | 146 | @Override |
146 | 147 | public <T> Publisher<T> retryRx( Publisher<T> work ) |
147 | 148 | { |
148 | | - return Flux.from( work ).retryWhen( retryRxCondition() ); |
| 149 | + return Flux.from( work ).retryWhen( exponentialBackoffRetryRx() ); |
149 | 150 | } |
150 | 151 |
|
151 | 152 | protected boolean canRetryOn( Throwable error ) |
@@ -177,48 +178,52 @@ private static Throwable extractPossibleTerminationCause( Throwable error ) |
177 | 178 | return error; |
178 | 179 | } |
179 | 180 |
|
180 | | - private Function<Flux<Throwable>,Publisher<Context>> retryRxCondition() |
| 181 | + private Retry exponentialBackoffRetryRx() |
181 | 182 | { |
182 | | - return errorCurrentAttempt -> errorCurrentAttempt.flatMap( e -> Mono.subscriberContext().map( ctx -> Tuples.of( e, ctx ) ) ).flatMap( t2 -> |
183 | | - { |
184 | | - |
185 | | - Throwable throwable = t2.getT1(); |
186 | | - Throwable error = extractPossibleTerminationCause( throwable ); |
187 | | - |
188 | | - Context ctx = t2.getT2(); |
189 | | - |
190 | | - List<Throwable> errors = ctx.getOrDefault( "errors", null ); |
191 | | - |
192 | | - long startTime = ctx.getOrDefault( "startTime", -1L ); |
193 | | - long nextDelayMs = ctx.getOrDefault( "nextDelayMs", initialRetryDelayMs ); |
194 | | - |
195 | | - if ( canRetryOn( error ) ) |
196 | | - { |
197 | | - long currentTime = clock.millis(); |
198 | | - if ( startTime == -1 ) |
| 183 | + return Retry.from( retrySignals -> retrySignals.flatMap( retrySignal -> Mono.deferContextual( |
| 184 | + contextView -> Mono.just( Tuples.of( retrySignal, contextView ) ) ) ).flatMap( |
| 185 | + tuple -> |
199 | 186 | { |
200 | | - startTime = currentTime; |
201 | | - } |
| 187 | + Throwable throwable = tuple.getT1().failure(); |
| 188 | + Throwable error = extractPossibleTerminationCause( throwable ); |
202 | 189 |
|
203 | | - long elapsedTime = currentTime - startTime; |
204 | | - if ( elapsedTime < maxRetryTimeMs ) |
205 | | - { |
206 | | - long delayWithJitterMs = computeDelayWithJitter( nextDelayMs ); |
207 | | - log.warn( "Reactive transaction failed and is scheduled to retry in " + delayWithJitterMs + "ms", error ); |
| 190 | + ContextView contextView = tuple.getT2(); |
| 191 | + List<Throwable> errors = contextView.getOrDefault( "errors", null ); |
208 | 192 |
|
209 | | - nextDelayMs = (long) (nextDelayMs * multiplier); |
210 | | - errors = recordError( error, errors ); |
| 193 | + long startTime = contextView.getOrDefault( "startTime", -1L ); |
| 194 | + long nextDelayMs = contextView.getOrDefault( "nextDelayMs", initialRetryDelayMs ); |
211 | 195 |
|
212 | | - // retry on netty event loop thread |
213 | | - EventExecutor eventExecutor = eventExecutorGroup.next(); |
214 | | - return Mono.just( ctx.put( "errors", errors ).put( "startTime", startTime ).put( "nextDelayMs", nextDelayMs ) ).delayElement( |
215 | | - Duration.ofMillis( delayWithJitterMs ), Schedulers.fromExecutorService( eventExecutor ) ); |
216 | | - } |
217 | | - } |
218 | | - addSuppressed( throwable, errors ); |
| 196 | + if ( canRetryOn( error ) ) |
| 197 | + { |
| 198 | + long currentTime = clock.millis(); |
| 199 | + if ( startTime == -1 ) |
| 200 | + { |
| 201 | + startTime = currentTime; |
| 202 | + } |
| 203 | + |
| 204 | + long elapsedTime = currentTime - startTime; |
| 205 | + if ( elapsedTime < maxRetryTimeMs ) |
| 206 | + { |
| 207 | + long delayWithJitterMs = computeDelayWithJitter( nextDelayMs ); |
| 208 | + log.warn( "Reactive transaction failed and is scheduled to retry in " + delayWithJitterMs + "ms", error ); |
| 209 | + |
| 210 | + nextDelayMs = (long) (nextDelayMs * multiplier); |
| 211 | + errors = recordError( error, errors ); |
| 212 | + |
| 213 | + // retry on netty event loop thread |
| 214 | + EventExecutor eventExecutor = eventExecutorGroup.next(); |
| 215 | + Context context = Context.of( |
| 216 | + "errors", errors, |
| 217 | + "startTime", startTime, |
| 218 | + "nextDelayMs", nextDelayMs |
| 219 | + ); |
| 220 | + return Mono.just( context ).delayElement( Duration.ofMillis( delayWithJitterMs ), Schedulers.fromExecutorService( eventExecutor ) ); |
| 221 | + } |
| 222 | + } |
| 223 | + addSuppressed( throwable, errors ); |
219 | 224 |
|
220 | | - return Mono.error( throwable ); |
221 | | - } ); |
| 225 | + return Mono.error( throwable ); |
| 226 | + } ) ); |
222 | 227 | } |
223 | 228 |
|
224 | 229 | private <T> void executeWorkInEventLoop( CompletableFuture<T> resultFuture, Supplier<CompletionStage<T>> work ) |
@@ -373,7 +378,7 @@ private static List<Throwable> recordError( Throwable error, List<Throwable> err |
373 | 378 | { |
374 | 379 | if ( errors == null ) |
375 | 380 | { |
376 | | - errors = new ArrayList<>(); |
| 381 | + errors = new LinkedList<>(); |
377 | 382 | } |
378 | 383 | errors.add( error ); |
379 | 384 | return errors; |
|
0 commit comments