@@ -493,10 +493,10 @@ public void assertUncompleted(String errorMsg) {
493
493
env .flop (errorMsg );
494
494
}
495
495
496
-
497
496
public void expectCompletion (long timeoutMillis , String errorMsg ) throws InterruptedException {
498
497
if (!isCompleted ()) {
499
498
T val = abq .poll (timeoutMillis , TimeUnit .MILLISECONDS );
499
+
500
500
if (val == null ) {
501
501
env .flop (String .format ("%s within %d ms" , errorMsg , timeoutMillis ));
502
502
} else {
@@ -528,25 +528,25 @@ public void complete() {
528
528
public T next (long timeoutMillis , String errorMsg ) throws InterruptedException {
529
529
Optional <T > value = abq .poll (timeoutMillis , TimeUnit .MILLISECONDS );
530
530
531
- if (value .isEmpty ()) {
532
- env .flop ("Expected element but got end-of-stream" );
533
- } else if (value .get () == null ) {
531
+ if (value == null ) {
534
532
env .flop (String .format ("%s within %d ms" , errorMsg , timeoutMillis ));
535
- } else {
533
+ } else if ( value . isDefined ()) {
536
534
return value .get ();
535
+ } else {
536
+ env .flop ("Expected element but got end-of-stream" );
537
537
}
538
538
539
539
return null ; // keep compiler happy
540
540
}
541
541
542
542
public Optional <T > nextOrEndOfStream (long timeoutMillis , String errorMsg ) throws InterruptedException {
543
543
Optional <T > value = abq .poll (timeoutMillis , TimeUnit .MILLISECONDS );
544
- if (value .isDefined ()) {
545
- return value ;
546
- } else {
544
+
545
+ if (value == null ) {
547
546
env .flop (String .format ("%s within %d ms" , errorMsg , timeoutMillis ));
548
- return null ; // keep compiler happy
549
547
}
548
+
549
+ return value ;
550
550
}
551
551
552
552
public List <T > nextN (int elements , long timeoutMillis , String errorMsg ) throws InterruptedException {
@@ -564,13 +564,11 @@ public List<T> nextN(int elements, long timeoutMillis, String errorMsg) throws I
564
564
public void expectCompletion (long timeoutMillis , String errorMsg ) throws InterruptedException {
565
565
Optional <T > value = abq .poll (timeoutMillis , TimeUnit .MILLISECONDS );
566
566
567
- if (value .isEmpty ()) {
568
- // ok
569
- } else if (value .get () == null ) {
567
+ if (value == null ) {
570
568
env .flop (String .format ("%s within %d ms" , errorMsg , timeoutMillis ));
571
- } else {
569
+ } else if ( value . isDefined ()) {
572
570
env .flop ("Expected end-of-stream but got " + value .get ());
573
- }
571
+ } // else, ok
574
572
}
575
573
576
574
void expectNone (long withinMillis , String errorMsgPrefix ) throws InterruptedException {
0 commit comments