40
40
import java .io .IOException ;
41
41
import java .util .Map ;
42
42
import java .util .concurrent .Phaser ;
43
+ import java .util .concurrent .atomic .AtomicInteger ;
43
44
import javax .annotation .concurrent .GuardedBy ;
44
45
import org .json .JSONArray ;
45
46
import org .json .JSONObject ;
@@ -123,6 +124,7 @@ private static class AppendContext {
123
124
private static class DataWriter {
124
125
125
126
private static final int MAX_RETRY_COUNT = 3 ;
127
+ private static final int MAX_RECREATE_COUNT = 3 ;
126
128
private static final ImmutableList <Code > RETRIABLE_ERROR_CODES =
127
129
ImmutableList .of (
128
130
Code .INTERNAL ,
@@ -140,6 +142,8 @@ private static class DataWriter {
140
142
@ GuardedBy ("lock" )
141
143
private RuntimeException error = null ;
142
144
145
+ private AtomicInteger recreateCount = new AtomicInteger (0 );
146
+
143
147
public void initialize (TableName parentTable )
144
148
throws DescriptorValidationException , IOException , InterruptedException {
145
149
// Use the JSON stream writer to send records in JSON format. Specify the table name to write
@@ -151,8 +155,17 @@ public void initialize(TableName parentTable)
151
155
}
152
156
153
157
public void append (AppendContext appendContext )
154
- throws DescriptorValidationException , IOException {
158
+ throws DescriptorValidationException , IOException , InterruptedException {
155
159
synchronized (this .lock ) {
160
+ if (!streamWriter .isUserClosed ()
161
+ && streamWriter .isClosed ()
162
+ && recreateCount .getAndIncrement () < MAX_RECREATE_COUNT ) {
163
+ streamWriter =
164
+ JsonStreamWriter .newBuilder (
165
+ streamWriter .getStreamName (), BigQueryWriteClient .create ())
166
+ .build ();
167
+ this .error = null ;
168
+ }
156
169
// If earlier appends have failed, we need to reset before continuing.
157
170
if (this .error != null ) {
158
171
throw this .error ;
@@ -194,6 +207,7 @@ public AppendCompleteCallback(DataWriter parent, AppendContext appendContext) {
194
207
195
208
public void onSuccess (AppendRowsResponse response ) {
196
209
System .out .format ("Append success\n " );
210
+ this .parent .recreateCount .set (0 );
197
211
done ();
198
212
}
199
213
@@ -241,6 +255,8 @@ public void onFailure(Throwable throwable) {
241
255
throw new RuntimeException (e );
242
256
} catch (IOException e ) {
243
257
throw new RuntimeException (e );
258
+ } catch (InterruptedException e ) {
259
+ throw new RuntimeException (e );
244
260
}
245
261
}
246
262
// Mark the existing attempt as done since we got a response for it
0 commit comments