1616
1717package rx .operators ;
1818
19+ import java .util .Random ;
1920import static org .junit .Assert .assertEquals ;
2021import static org .junit .Assert .assertTrue ;
2122import static org .junit .Assert .fail ;
@@ -239,12 +240,16 @@ public void testConcurrencyAndSerialization() throws InterruptedException {
239240 @ Override
240241 public Observable <String > call (final GroupedObservable <Boolean , GroupedObservable <String , Integer >> outerGroup ) {
241242 return outerGroup .flatMap (new Func1 <GroupedObservable <String , Integer >, Observable <String >>() {
242-
243243 @ Override
244244 public Observable <String > call (final GroupedObservable <String , Integer > innerGroup ) {
245245 final AtomicInteger threadsPerGroup = new AtomicInteger ();
246246 return innerGroup .take (100 ).map (new Func1 <Integer , String >() {
247-
247+ final ThreadLocal <Random > tlr = new ThreadLocal <Random >() {
248+ @ Override
249+ protected Random initialValue () {
250+ return new Random ();
251+ }
252+ };
248253 @ Override
249254 public String call (Integer i ) {
250255 int outerThreadCount = outerThreads .incrementAndGet ();
@@ -256,7 +261,11 @@ public String call(Integer i) {
256261 throw new RuntimeException ("more than 1 thread for this group [" + innerGroup .getKey () + "]: " + innerThreadCount + " (before)" );
257262 }
258263 try {
264+ // give the other threads a shot.
265+ Thread .sleep (tlr .get ().nextInt (10 ) + 1 );
259266 return (outerGroup .getKey () ? "Even" : "Odd " ) + " => from source: " + innerGroup .getKey () + " Value: " + i ;
267+ } catch (InterruptedException ex ) {
268+ throw new RuntimeException ("Interrupted [" + innerGroup .getKey () + "]: " + i );
260269 } finally {
261270 int outerThreadCountAfter = outerThreads .decrementAndGet ();
262271 setMaxConcurrency (maxOuterConcurrency , outerThreadCountAfter );
0 commit comments