41
41
import java .util .concurrent .Executors ;
42
42
import java .util .concurrent .TimeUnit ;
43
43
import java .util .stream .Collectors ;
44
+ import java .util .stream .StreamSupport ;
44
45
import org .apache .beam .sdk .coders .Coder ;
45
46
import org .apache .beam .sdk .coders .KvCoder ;
46
47
import org .apache .beam .sdk .coders .StringUtf8Coder ;
@@ -451,8 +452,6 @@ long flush(
451
452
DynamicMessage .parseFrom (
452
453
Preconditions .checkStateNotNull (appendClientInfo ).descriptor ,
453
454
protoBytes ));
454
- new BigQueryStorageApiInsertError (
455
- failedRow , error .getRowIndexToErrorMessage ().get (failedIndex ));
456
455
failedRowsReceiver .output (
457
456
new BigQueryStorageApiInsertError (
458
457
failedRow , error .getRowIndexToErrorMessage ().get (failedIndex )));
@@ -488,7 +487,7 @@ long flush(
488
487
"Append to stream {} by client #{} failed with error, operations will be retried. Details: {}" ,
489
488
streamName ,
490
489
clientNumber ,
491
- retrieveErrorDetails (failedContext ));
490
+ retrieveErrorDetails (contexts ));
492
491
invalidateWriteStream ();
493
492
appendFailures .inc ();
494
493
return RetryType .RETRY_ALL_OPERATIONS ;
@@ -501,13 +500,13 @@ long flush(
501
500
return inserts .getSerializedRowsCount ();
502
501
}
503
502
504
- String retrieveErrorDetails (AppendRowsContext failedContext ) {
505
- return (failedContext .getError () != null )
506
- ? Arrays . stream (
507
- Preconditions . checkStateNotNull ( failedContext . getError ()). getStackTrace () )
508
- . map ( StackTraceElement :: toString )
509
- . collect ( Collectors . joining ( " \n " ) )
510
- : "no execption" ;
503
+ String retrieveErrorDetails (Iterable < AppendRowsContext > failedContext ) {
504
+ return StreamSupport . stream (failedContext .spliterator (), false )
505
+ .< @ Nullable Throwable > map ( AppendRowsContext :: getError )
506
+ . filter ( err -> err != null )
507
+ . flatMap ( thrw -> Arrays . stream ( Preconditions . checkStateNotNull ( thrw ). getStackTrace ()) )
508
+ . map ( StackTraceElement :: toString )
509
+ . collect ( Collectors . joining ( " \n " )) ;
511
510
}
512
511
}
513
512
0 commit comments