32
32
import com .google .cloud .bigquery .storage .v1 .BigQueryWriteSettings ;
33
33
import com .google .cloud .bigquery .storage .v1 .Exceptions ;
34
34
import com .google .cloud .bigquery .storage .v1 .Exceptions .AppendSerializationError ;
35
+ import com .google .cloud .bigquery .storage .v1 .Exceptions .MaximumRequestCallbackWaitTimeExceededException ;
35
36
import com .google .cloud .bigquery .storage .v1 .Exceptions .StorageException ;
37
+ import com .google .cloud .bigquery .storage .v1 .Exceptions .StreamWriterClosedException ;
36
38
import com .google .cloud .bigquery .storage .v1 .JsonStreamWriter ;
37
39
import com .google .cloud .bigquery .storage .v1 .TableName ;
38
40
import com .google .common .util .concurrent .MoreExecutors ;
@@ -150,7 +152,7 @@ private static class DataWriter {
150
152
151
153
private AtomicInteger recreateCount = new AtomicInteger (0 );
152
154
153
- public void initialize ( TableName parentTable )
155
+ private JsonStreamWriter createStreamWriter ( String tableName )
154
156
throws DescriptorValidationException , IOException , InterruptedException {
155
157
// Configure in-stream automatic retry settings.
156
158
// Error codes that are immediately retried:
@@ -165,32 +167,35 @@ public void initialize(TableName parentTable)
165
167
.setMaxRetryDelay (Duration .ofMinutes (1 ))
166
168
.build ();
167
169
168
- // Initialize client without settings, internally within stream writer a new client will be
169
- // created with full settings.
170
- client = BigQueryWriteClient .create ();
171
-
172
170
// Use the JSON stream writer to send records in JSON format. Specify the table name to write
173
171
// to the default stream.
174
172
// For more information about JsonStreamWriter, see:
175
173
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html
176
- streamWriter =
177
- JsonStreamWriter .newBuilder (parentTable .toString (), client )
178
- .setExecutorProvider (
179
- FixedExecutorProvider .create (Executors .newScheduledThreadPool (100 )))
180
- .setChannelProvider (
181
- BigQueryWriteSettings .defaultGrpcTransportProviderBuilder ()
182
- .setKeepAliveTime (org .threeten .bp .Duration .ofMinutes (1 ))
183
- .setKeepAliveTimeout (org .threeten .bp .Duration .ofMinutes (1 ))
184
- .setKeepAliveWithoutCalls (true )
185
- .setChannelsPerCpu (2 )
186
- .build ())
187
- .setEnableConnectionPool (true )
188
- // If value is missing in json and there is a default value configured on bigquery
189
- // column, apply the default value to the missing value field.
190
- .setDefaultMissingValueInterpretation (
191
- AppendRowsRequest .MissingValueInterpretation .DEFAULT_VALUE )
192
- .setRetrySettings (retrySettings )
193
- .build ();
174
+ return JsonStreamWriter .newBuilder (tableName , client )
175
+ .setExecutorProvider (FixedExecutorProvider .create (Executors .newScheduledThreadPool (100 )))
176
+ .setChannelProvider (
177
+ BigQueryWriteSettings .defaultGrpcTransportProviderBuilder ()
178
+ .setKeepAliveTime (org .threeten .bp .Duration .ofMinutes (1 ))
179
+ .setKeepAliveTimeout (org .threeten .bp .Duration .ofMinutes (1 ))
180
+ .setKeepAliveWithoutCalls (true )
181
+ .setChannelsPerCpu (2 )
182
+ .build ())
183
+ .setEnableConnectionPool (true )
184
+ // If value is missing in json and there is a default value configured on bigquery
185
+ // column, apply the default value to the missing value field.
186
+ .setDefaultMissingValueInterpretation (
187
+ AppendRowsRequest .MissingValueInterpretation .DEFAULT_VALUE )
188
+ .setRetrySettings (retrySettings )
189
+ .build ();
190
+ }
191
+
192
+ public void initialize (TableName parentTable )
193
+ throws DescriptorValidationException , IOException , InterruptedException {
194
+ // Initialize client without settings, internally within stream writer a new client will be
195
+ // created with full settings.
196
+ client = BigQueryWriteClient .create ();
197
+
198
+ streamWriter = createStreamWriter (parentTable .toString ());
194
199
}
195
200
196
201
public void append (AppendContext appendContext )
@@ -199,7 +204,7 @@ public void append(AppendContext appendContext)
199
204
if (!streamWriter .isUserClosed ()
200
205
&& streamWriter .isClosed ()
201
206
&& recreateCount .getAndIncrement () < MAX_RECREATE_COUNT ) {
202
- streamWriter = JsonStreamWriter . newBuilder (streamWriter .getStreamName (), client ). build ( );
207
+ streamWriter = createStreamWriter (streamWriter .getStreamName ());
203
208
this .error = null ;
204
209
}
205
210
// If earlier appends have failed, we need to reset before continuing.
@@ -282,6 +287,30 @@ public void onFailure(Throwable throwable) {
282
287
}
283
288
}
284
289
290
+ boolean resendRequest = false ;
291
+ if (throwable instanceof MaximumRequestCallbackWaitTimeExceededException ) {
292
+ resendRequest = true ;
293
+ } else if (throwable instanceof StreamWriterClosedException ) {
294
+ if (!parent .streamWriter .isUserClosed ()) {
295
+ resendRequest = true ;
296
+ }
297
+ }
298
+ if (resendRequest ) {
299
+ // Retry this request.
300
+ try {
301
+ this .parent .append (new AppendContext (appendContext .data ));
302
+ } catch (DescriptorValidationException e ) {
303
+ throw new RuntimeException (e );
304
+ } catch (IOException e ) {
305
+ throw new RuntimeException (e );
306
+ } catch (InterruptedException e ) {
307
+ throw new RuntimeException (e );
308
+ }
309
+ // Mark the existing attempt as done since we got a response for it
310
+ done ();
311
+ return ;
312
+ }
313
+
285
314
synchronized (this .parent .lock ) {
286
315
if (this .parent .error == null ) {
287
316
StorageException storageException = Exceptions .toStorageException (throwable );
0 commit comments