@@ -493,10 +493,10 @@ public void assertUncompleted(String errorMsg) {
493
493
env .flop (errorMsg );
494
494
}
495
495
496
-
497
496
public void expectCompletion (long timeoutMillis , String errorMsg ) throws InterruptedException {
498
497
if (!isCompleted ()) {
499
498
T val = abq .poll (timeoutMillis , TimeUnit .MILLISECONDS );
499
+
500
500
if (val == null ) {
501
501
env .flop (String .format ("%s within %d ms" , errorMsg , timeoutMillis ));
502
502
} else {
@@ -528,24 +528,25 @@ public void complete() {
528
528
public T next (long timeoutMillis , String errorMsg ) throws InterruptedException {
529
529
Optional <T > value = abq .poll (timeoutMillis , TimeUnit .MILLISECONDS );
530
530
531
- if (value .isEmpty ()) {
532
- env .flop ("Expected element but got end-of-stream" );
533
- } else if (value .get () == null ) {
531
+ if (value == null ) {
534
532
env .flop (String .format ("%s within %d ms" , errorMsg , timeoutMillis ));
535
- } else {
533
+ } else if ( value . isDefined ()) {
536
534
return value .get ();
535
+ } else {
536
+ env .flop ("Expected element but got end-of-stream" );
537
537
}
538
538
539
539
return null ; // keep compiler happy
540
540
}
541
541
542
542
public Optional <T > nextOrEndOfStream (long timeoutMillis , String errorMsg ) throws InterruptedException {
543
543
Optional <T > value = abq .poll (timeoutMillis , TimeUnit .MILLISECONDS );
544
- if (value .isDefined ()) {
545
- return value ;
546
- } else {
544
+
545
+ if (value == null ) {
547
546
env .flop (String .format ("%s within %d ms" , errorMsg , timeoutMillis ));
548
547
return null ; // keep compiler happy
548
+ } else {
549
+ return value ;
549
550
}
550
551
}
551
552
@@ -564,13 +565,11 @@ public List<T> nextN(int elements, long timeoutMillis, String errorMsg) throws I
564
565
public void expectCompletion (long timeoutMillis , String errorMsg ) throws InterruptedException {
565
566
Optional <T > value = abq .poll (timeoutMillis , TimeUnit .MILLISECONDS );
566
567
567
- if (value .isEmpty ()) {
568
- // ok
569
- } else if (value .get () == null ) {
568
+ if (value == null ) {
570
569
env .flop (String .format ("%s within %d ms" , errorMsg , timeoutMillis ));
571
- } else {
570
+ } else if ( value . isDefined ()) {
572
571
env .flop ("Expected end-of-stream but got " + value .get ());
573
- }
572
+ } // else, ok
574
573
}
575
574
576
575
void expectNone (long withinMillis , String errorMsgPrefix ) throws InterruptedException {
0 commit comments