Skip to content

Commit dfa8e22

Browse files
authored
feat(bigquery/storage/managedwriter): support default value controls (#8686)
* feat(bigquery/storage/managedwriter): support default value controls

In terms of public surface, this PR adds new options to control how missing values are interpreted when writing.

For ManagedStream instantiation, the options are:
* WithDefaultMissingValueInterpretation (blanket setting for all columns)
* WithMissingValueInterpretations (per-column settings)

To support updates, these are added as AppendOptions:
* UpdateDefaultMissingValueInterpretation
* UpdateMissingValueInterpretations

Implementation-wise, this PR rips out the previous schema-specific versioner and expands the concept to a versioned AppendRowsRequest template. This more general mechanism allows us to version all settings that manifest as request fields in the AppendRowsRequest.
1 parent 3e5ba24 commit dfa8e22

14 files changed

+792
-207
lines changed

bigquery/storage/managedwriter/appendresult.go

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ type pendingWrite struct {
166166
// likely outcome when processing requests and it allows us to be efficient on send.
167167
// We retain the additional information to build the complete request in the related fields.
168168
req *storagepb.AppendRowsRequest
169-
descVersion *descriptorVersion // schema at time of creation
169+
reqTmpl *versionedTemplate // request template at time of creation
170170
traceID string
171171
writeStreamID string
172172

@@ -188,21 +188,21 @@ type pendingWrite struct {
188188
// to the pending results for later consumption. The provided context is
189189
// embedded in the pending write, as the write may be retried and we want
190190
// to respect the original context for expiry/cancellation etc.
191-
func newPendingWrite(ctx context.Context, src *ManagedStream, req *storagepb.AppendRowsRequest, curDescVersion *descriptorVersion, writeStreamID, traceID string) *pendingWrite {
191+
func newPendingWrite(ctx context.Context, src *ManagedStream, req *storagepb.AppendRowsRequest, reqTmpl *versionedTemplate, writeStreamID, traceID string) *pendingWrite {
192192
pw := &pendingWrite{
193193
writer: src,
194194
result: newAppendResult(),
195195
reqCtx: ctx,
196196

197-
req: req,
198-
descVersion: curDescVersion,
197+
req: req, // minimal req, typically just row data
198+
reqTmpl: reqTmpl, // remainder of templated request
199199
writeStreamID: writeStreamID,
200200
traceID: traceID,
201201
}
202202
// Compute the approx size for flow control purposes.
203203
pw.reqSize = proto.Size(pw.req) + len(writeStreamID) + len(traceID)
204-
if pw.descVersion != nil {
205-
pw.reqSize += proto.Size(pw.descVersion.descriptorProto)
204+
if pw.reqTmpl != nil {
205+
pw.reqSize += proto.Size(pw.reqTmpl.tmpl)
206206
}
207207
return pw
208208
}
@@ -221,33 +221,22 @@ func (pw *pendingWrite) markDone(resp *storagepb.AppendRowsResponse, err error)
221221
close(pw.result.ready)
222222
// Cleanup references remaining on the write explicitly.
223223
pw.req = nil
224-
pw.descVersion = nil
224+
pw.reqTmpl = nil
225225
pw.writer = nil
226226
pw.reqCtx = nil
227227
}
228228

229229
func (pw *pendingWrite) constructFullRequest(addTrace bool) *storagepb.AppendRowsRequest {
230230
req := &storagepb.AppendRowsRequest{}
231+
if pw.reqTmpl != nil {
232+
req = proto.Clone(pw.reqTmpl.tmpl).(*storagepb.AppendRowsRequest)
233+
}
231234
if pw.req != nil {
232-
req = proto.Clone(pw.req).(*storagepb.AppendRowsRequest)
235+
proto.Merge(req, pw.req)
233236
}
234237
if addTrace {
235238
req.TraceId = buildTraceID(&streamSettings{TraceID: pw.traceID})
236239
}
237240
req.WriteStream = pw.writeStreamID
238-
if pw.descVersion != nil {
239-
ps := &storagepb.ProtoSchema{
240-
ProtoDescriptor: pw.descVersion.descriptorProto,
241-
}
242-
if pr := req.GetProtoRows(); pr != nil {
243-
pr.WriterSchema = ps
244-
} else {
245-
req.Rows = &storagepb.AppendRowsRequest_ProtoRows{
246-
ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
247-
WriterSchema: ps,
248-
},
249-
}
250-
}
251-
}
252241
return req
253242
}

bigquery/storage/managedwriter/appendresult_test.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,8 @@ func TestPendingWrite(t *testing.T) {
132132
func TestPendingWrite_ConstructFullRequest(t *testing.T) {
133133

134134
testDP := &descriptorpb.DescriptorProto{Name: proto.String("foo")}
135-
testDV := newDescriptorVersion(testDP)
135+
testTmpl := newVersionedTemplate().revise(reviseProtoSchema(testDP))
136+
136137
testEmptyTraceID := buildTraceID(&streamSettings{})
137138

138139
for _, tc := range []struct {
@@ -144,7 +145,7 @@ func TestPendingWrite_ConstructFullRequest(t *testing.T) {
144145
{
145146
desc: "nil request",
146147
pw: &pendingWrite{
147-
descVersion: testDV,
148+
reqTmpl: testTmpl,
148149
},
149150
want: &storagepb.AppendRowsRequest{
150151
Rows: &storagepb.AppendRowsRequest_ProtoRows{
@@ -159,8 +160,8 @@ func TestPendingWrite_ConstructFullRequest(t *testing.T) {
159160
{
160161
desc: "empty req w/trace",
161162
pw: &pendingWrite{
162-
req: &storagepb.AppendRowsRequest{},
163-
descVersion: testDV,
163+
req: &storagepb.AppendRowsRequest{},
164+
reqTmpl: testTmpl,
164165
},
165166
addTrace: true,
166167
want: &storagepb.AppendRowsRequest{
@@ -177,8 +178,8 @@ func TestPendingWrite_ConstructFullRequest(t *testing.T) {
177178
{
178179
desc: "basic req",
179180
pw: &pendingWrite{
180-
req: &storagepb.AppendRowsRequest{},
181-
descVersion: testDV,
181+
req: &storagepb.AppendRowsRequest{},
182+
reqTmpl: testTmpl,
182183
},
183184
want: &storagepb.AppendRowsRequest{
184185
Rows: &storagepb.AppendRowsRequest_ProtoRows{
@@ -194,7 +195,7 @@ func TestPendingWrite_ConstructFullRequest(t *testing.T) {
194195
desc: "everything w/trace",
195196
pw: &pendingWrite{
196197
req: &storagepb.AppendRowsRequest{},
197-
descVersion: testDV,
198+
reqTmpl: testTmpl,
198199
traceID: "foo",
199200
writeStreamID: "streamid",
200201
},

bigquery/storage/managedwriter/client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient
151151
id: newUUID(writerIDPrefix),
152152
c: c,
153153
streamSettings: defaultStreamSettings(),
154+
curTemplate: newVersionedTemplate(),
154155
}
155156
// apply writer options.
156157
for _, opt := range opts {

bigquery/storage/managedwriter/connection.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -376,8 +376,22 @@ func (co *connection) lockingAppend(pw *pendingWrite) error {
376376
// Additionally, we check multiplex status as schema changes for explicit streams
377377
// require reconnect, whereas multiplex does not.
378378
forceReconnect := false
379-
if pw.writer != nil && pw.descVersion != nil && pw.descVersion.isNewer(pw.writer.curDescVersion) {
380-
pw.writer.curDescVersion = pw.descVersion
379+
promoted := false
380+
if pw.writer != nil && pw.reqTmpl != nil {
381+
if !pw.reqTmpl.Compatible(pw.writer.curTemplate) {
382+
if pw.writer.curTemplate == nil {
383+
// promote because there's no current template
384+
pw.writer.curTemplate = pw.reqTmpl
385+
promoted = true
386+
} else {
387+
if pw.writer.curTemplate.versionTime.Before(pw.reqTmpl.versionTime) {
388+
pw.writer.curTemplate = pw.reqTmpl
389+
promoted = true
390+
}
391+
}
392+
}
393+
}
394+
if promoted {
381395
if co.optimizer == nil {
382396
forceReconnect = true
383397
} else {

bigquery/storage/managedwriter/integration_test.go

Lines changed: 94 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,9 @@ func TestIntegration_ManagedWriter(t *testing.T) {
259259
t.Run("TestLargeInsertWithRetry", func(t *testing.T) {
260260
testLargeInsertWithRetry(ctx, t, mwClient, bqClient, dataset)
261261
})
262-
262+
t.Run("DefaultValueHandling", func(t *testing.T) {
263+
testDefaultValueHandling(ctx, t, mwClient, bqClient, dataset)
264+
})
263265
})
264266
}
265267

@@ -1262,6 +1264,97 @@ func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bq
12621264
)
12631265
}
12641266

1267+
func testDefaultValueHandling(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset, opts ...WriterOption) {
1268+
testTable := dataset.Table(tableIDs.New())
1269+
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.DefaultValueSchema}); err != nil {
1270+
t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
1271+
}
1272+
1273+
m := &testdata.DefaultValuesPartialSchema{
1274+
// We only populate the id, as remaining fields are used to test default values.
1275+
Id: proto.String("someval"),
1276+
}
1277+
var data []byte
1278+
var err error
1279+
if data, err = proto.Marshal(m); err != nil {
1280+
t.Fatalf("failed to marshal test row data")
1281+
}
1282+
descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
1283+
1284+
// setup a new stream.
1285+
opts = append(opts, WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)))
1286+
opts = append(opts, WithSchemaDescriptor(descriptorProto))
1287+
ms, err := mwClient.NewManagedStream(ctx, opts...)
1288+
if err != nil {
1289+
t.Fatalf("NewManagedStream: %v", err)
1290+
}
1291+
validateTableConstraints(ctx, t, bqClient, testTable, "before send",
1292+
withExactRowCount(0))
1293+
1294+
var result *AppendResult
1295+
1296+
// Send one row, verify default values were set as expected.
1297+
1298+
result, err = ms.AppendRows(ctx, [][]byte{data})
1299+
if err != nil {
1300+
t.Errorf("append failed: %v", err)
1301+
}
1302+
// Wait for the result to indicate ready, then validate.
1303+
_, err = result.GetResult(ctx)
1304+
if err != nil {
1305+
t.Errorf("error on append: %v", err)
1306+
}
1307+
1308+
validateTableConstraints(ctx, t, bqClient, testTable, "after first row",
1309+
withExactRowCount(1),
1310+
withNonNullCount("id", 1),
1311+
withNullCount("strcol_withdef", 1),
1312+
withNullCount("intcol_withdef", 1),
1313+
withNullCount("otherstr_withdef", 0)) // not part of partial schema
1314+
1315+
// Change default MVI to use default values.
1316+
// We expect the fields in the partial schema to leverage default values rather than nulls.
1317+
// The fields outside the partial schema continue to obey default values.
1318+
result, err = ms.AppendRows(ctx, [][]byte{data}, UpdateDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_DEFAULT_VALUE))
1319+
if err != nil {
1320+
t.Errorf("append failed: %v", err)
1321+
}
1322+
// Wait for the result to indicate ready, then validate.
1323+
_, err = result.GetResult(ctx)
1324+
if err != nil {
1325+
t.Errorf("error on append: %v", err)
1326+
}
1327+
1328+
validateTableConstraints(ctx, t, bqClient, testTable, "after second row (default mvi is DEFAULT_VALUE)",
1329+
withExactRowCount(2),
1330+
withNullCount("strcol_withdef", 1), // doesn't increment, as it gets default value
1331+
withNullCount("intcol_withdef", 1)) // doesn't increment, as it gets default value
1332+
1333+
// Change per-column MVI so that strcol_withdef uses null values instead of its default value.
1334+
result, err = ms.AppendRows(ctx, [][]byte{data},
1335+
UpdateMissingValueInterpretations(map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{
1336+
"strcol_withdef": storagepb.AppendRowsRequest_NULL_VALUE,
1337+
}))
1338+
if err != nil {
1339+
t.Errorf("append failed: %v", err)
1340+
}
1341+
// Wait for the result to indicate ready, then validate.
1342+
_, err = result.GetResult(ctx)
1343+
if err != nil {
1344+
t.Errorf("error on append: %v", err)
1345+
}
1346+
1347+
validateTableConstraints(ctx, t, bqClient, testTable, "after third row (explicit column mvi)",
1348+
withExactRowCount(3),
1349+
withNullCount("strcol_withdef", 2), // increments as it's null for this column
1350+
withNullCount("intcol_withdef", 1), // doesn't increment, still default value
1351+
withNonNullCount("otherstr_withdef", 3), // not part of descriptor, always gets default value
1352+
withNullCount("otherstr", 3), // not part of descriptor, always gets null
1353+
withNullCount("strcol", 3), // no default value defined, always gets null
1354+
withNullCount("intcol", 3), // no default value defined, always gets null
1355+
)
1356+
}
1357+
12651358
func TestIntegration_DetectProjectID(t *testing.T) {
12661359
ctx := context.Background()
12671360
testCreds := testutil.Credentials(ctx)

bigquery/storage/managedwriter/managed_stream.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,9 @@ type ManagedStream struct {
7878

7979
streamSettings *streamSettings
8080
// retains the current versioned request template (including the schema descriptor) for the stream.
81-
curDescVersion *descriptorVersion
82-
c *Client
83-
retry *statelessRetryer
81+
curTemplate *versionedTemplate
82+
c *Client
83+
retry *statelessRetryer
8484

8585
// writer state
8686
mu sync.Mutex
@@ -298,13 +298,20 @@ func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ...
298298
return nil, err
299299
}
300300
// Ensure we build the request and pending write with a consistent request template version.
301-
curSchemaVersion := ms.curDescVersion
301+
curTemplate := ms.curTemplate
302302
req := ms.buildRequest(data)
303-
pw := newPendingWrite(ctx, ms, req, curSchemaVersion, ms.streamSettings.streamID, ms.streamSettings.TraceID)
303+
pw := newPendingWrite(ctx, ms, req, curTemplate, ms.streamSettings.streamID, ms.streamSettings.TraceID)
304304
// apply AppendOption opts
305305
for _, opt := range opts {
306306
opt(pw)
307307
}
308+
// Post-request fixup after options are applied.
309+
if pw.reqTmpl != nil {
310+
if pw.reqTmpl.tmpl != nil {
311+
// MVIs must be set on each request, but _default_ MVIs persist across the stream lifetime. Sigh.
312+
pw.req.MissingValueInterpretations = pw.reqTmpl.tmpl.GetMissingValueInterpretations()
313+
}
314+
}
308315

309316
// Call the underlying append. The stream has it's own retained context and will surface expiry on
310317
// it's own, but we also need to respect any deadline for the provided context.

bigquery/storage/managedwriter/managed_stream_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ func TestManagedStream_RequestOptimization(t *testing.T) {
110110
}
111111
ms.streamSettings.streamID = "FOO"
112112
ms.streamSettings.TraceID = "TRACE"
113-
ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{})
113+
ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
114114

115115
fakeData := [][]byte{
116116
[]byte("foo"),
@@ -191,7 +191,7 @@ func TestManagedStream_FlowControllerFailure(t *testing.T) {
191191
router.conn.fc = newFlowController(1, 0)
192192
router.conn.fc.acquire(ctx, 0)
193193

194-
ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{})
194+
ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
195195

196196
fakeData := [][]byte{
197197
[]byte("foo"),
@@ -236,7 +236,7 @@ func TestManagedStream_AppendWithDeadline(t *testing.T) {
236236
t.Errorf("addWriter: %v", err)
237237
}
238238
conn := router.conn
239-
ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{})
239+
ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
240240

241241
fakeData := [][]byte{
242242
[]byte("foo"),
@@ -293,7 +293,7 @@ func TestManagedStream_ContextExpiry(t *testing.T) {
293293
ctx: ctx,
294294
streamSettings: defaultStreamSettings(),
295295
}
296-
ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{})
296+
ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
297297
if err := pool.addWriter(ms); err != nil {
298298
t.Errorf("addWriter: %v", err)
299299
}
@@ -316,7 +316,7 @@ func TestManagedStream_ContextExpiry(t *testing.T) {
316316
cancel()
317317

318318
// First, append with an invalid context.
319-
pw := newPendingWrite(cancelCtx, ms, fakeReq, ms.curDescVersion, "", "")
319+
pw := newPendingWrite(cancelCtx, ms, fakeReq, ms.curTemplate, "", "")
320320
err := ms.appendWithRetry(pw)
321321
if err != context.Canceled {
322322
t.Errorf("expected cancelled context error, got: %v", err)
@@ -457,7 +457,7 @@ func TestManagedStream_LeakingGoroutines(t *testing.T) {
457457
ctx: ctx,
458458
streamSettings: defaultStreamSettings(),
459459
}
460-
ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{})
460+
ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
461461
if err := pool.addWriter(ms); err != nil {
462462
t.Errorf("addWriter: %v", err)
463463
}
@@ -509,7 +509,7 @@ func TestManagedStream_LeakingGoroutinesReconnect(t *testing.T) {
509509
retry: newStatelessRetryer(),
510510
}
511511
ms.retry.maxAttempts = 4
512-
ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{})
512+
ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
513513
if err := pool.addWriter(ms); err != nil {
514514
t.Errorf("addWriter: %v", err)
515515
}
@@ -575,7 +575,7 @@ func TestManagedWriter_CancellationDuringRetry(t *testing.T) {
575575
streamSettings: defaultStreamSettings(),
576576
retry: newStatelessRetryer(),
577577
}
578-
ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{})
578+
ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
579579
if err := pool.addWriter(ms); err != nil {
580580
t.Errorf("addWriter: %v", err)
581581
}
@@ -624,7 +624,7 @@ func TestManagedStream_Closure(t *testing.T) {
624624
streamSettings: defaultStreamSettings(),
625625
}
626626
ms.ctx, ms.cancel = context.WithCancel(pool.ctx)
627-
ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{})
627+
ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
628628
if err := pool.addWriter(ms); err != nil {
629629
t.Errorf("addWriter A: %v", err)
630630
}

0 commit comments

Comments
 (0)