Skip to content

Commit 5d33b1b

Browse files
committed
bigquery: Make Query more configurable and add a Run method.
Change-Id: I05384196ef43489b3fd8d960e913bcb8c27bee44 Reviewed-on: https://code-review.googlesource.com/8154 Reviewed-by: Jonathan Amsterdam <[email protected]>
1 parent 07f82cd commit 5d33b1b

File tree

13 files changed

+715
-291
lines changed

13 files changed

+715
-291
lines changed

bigquery/bigquery.go

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -85,39 +85,6 @@ func (c *Client) Close() error {
8585
return nil
8686
}
8787

88-
// Query creates a query with string q. You may optionally set
89-
// DefaultProjectID and DefaultDatasetID on the returned query before using it.
90-
func (c *Client) Query(q string) *Query {
91-
return &Query{Q: q, client: c}
92-
}
93-
94-
// Read submits a query for execution and returns the results via an Iterator.
95-
//
96-
// Read uses a temporary table to hold the results of the query job.
97-
//
98-
// For more control over how a query is performed, don't use this method but
99-
// instead pass the Query as a Source to Client.Copy, and call Read on the
100-
// resulting Job.
101-
func (q *Query) Read(ctx context.Context, options ...ReadOption) (*Iterator, error) {
102-
dest := &Table{}
103-
job, err := q.client.Copy(ctx, dest, q, WriteTruncate)
104-
if err != nil {
105-
return nil, err
106-
}
107-
return job.Read(ctx, options...)
108-
}
109-
110-
// executeQuery submits a query for execution and returns the results via an Iterator.
111-
func (c *Client) executeQuery(ctx context.Context, q *Query, options ...ReadOption) (*Iterator, error) {
112-
dest := &Table{}
113-
job, err := c.Copy(ctx, dest, q, WriteTruncate)
114-
if err != nil {
115-
return nil, err
116-
}
117-
118-
return c.Read(ctx, job, options...)
119-
}
120-
12188
// Dataset creates a handle to a BigQuery dataset in the client's project.
12289
func (c *Client) Dataset(id string) *Dataset {
12390
return c.DatasetInProject(c.projectID, id)

bigquery/copy.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,7 @@ func (c *Copier) Run(ctx context.Context) (*Job, error) {
7070
},
7171
}
7272

73-
if c.JobID != "" {
74-
job.JobReference = &bq.JobReference{
75-
JobId: c.JobID,
76-
ProjectId: c.c.projectID,
77-
}
78-
}
73+
setJobRef(job, c.JobID, c.c.projectID)
7974

8075
conf.DestinationTable = c.Dst.tableRefProto()
8176
for _, t := range c.Srcs {

bigquery/extract.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,7 @@ func (e *Extractor) Run(ctx context.Context) (*Job, error) {
6161
},
6262
}
6363

64-
if e.JobID != "" {
65-
job.JobReference = &bq.JobReference{
66-
JobId: e.JobID,
67-
ProjectId: e.c.projectID,
68-
}
69-
}
64+
setJobRef(job, e.JobID, e.c.projectID)
7065

7166
e.Dst.customizeExtractDst(job.Configuration.Extract)
7267
e.Src.customizeExtractSrc(job.Configuration.Extract)

bigquery/integration_test.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -136,11 +136,10 @@ func TestIntegration(t *testing.T) {
136136
checkRead(table)
137137

138138
// Query the table.
139-
q := &Query{
140-
Q: "select name, num from t1",
141-
DefaultProjectID: projID,
142-
DefaultDatasetID: ds.id,
143-
}
139+
q := c.Query("select name, num from t1")
140+
q.DefaultProjectID = projID
141+
q.DefaultDatasetID = ds.id
142+
144143
checkRead(q)
145144

146145
// Query the long way.

bigquery/job.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,21 @@ type JobStatus struct {
7171
Errors []*Error
7272
}
7373

74+
// setJobRef initializes job's JobReference if given a non-empty jobID.
75+
// projectID must be non-empty.
76+
func setJobRef(job *bq.Job, jobID, projectID string) {
77+
if jobID == "" {
78+
return
79+
}
80+
// We don't check whether projectID is empty; the server will return an
81+
// error when it encounters the resulting JobReference.
82+
83+
job.JobReference = &bq.JobReference{
84+
JobId: jobID,
85+
ProjectId: projectID,
86+
}
87+
}
88+
7489
// jobOption is an Option which modifies a bq.Job proto.
7590
// This is used for configuring values that apply to all operations, such as setting a jobReference.
7691
type jobOption interface {

bigquery/legacy.go

Lines changed: 220 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,30 @@ func (c *Client) Read(ctx context.Context, src ReadSource, options ...ReadOption
5959
case *Job:
6060
return src.Read(ctx, options...)
6161
case *Query:
62-
// For compatibility, support Query values created by literal, rather
63-
// than Client.Query.
62+
// Query used not to contain a QueryConfig. By moving its
63+
// top-level fields down into a QueryConfig field, we break
64+
// code that uses a Query literal. If users make the minimal
65+
// change to fix this (e.g. moving the "Q" field into a nested
66+
// QueryConfig within the Query), they will end up with a Query
67+
// that has no Client. It's preferable to make Read continue
68+
// to work in this case too, at least until we delete Read
69+
// completely. So we copy QueryConfig into a Query with an
70+
// actual client.
6471
if src.client == nil {
65-
src.client = c
72+
src = &Query{
73+
client: c,
74+
QueryConfig: src.QueryConfig,
75+
Q: src.Q,
76+
DefaultProjectID: src.DefaultProjectID,
77+
DefaultDatasetID: src.DefaultDatasetID,
78+
}
6679
}
6780
return src.Read(ctx, options...)
81+
case *QueryConfig:
82+
// For compatibility, support QueryConfig values created by literal, rather
83+
// than Client.Query.
84+
q := &Query{client: c, QueryConfig: *src}
85+
return q.Read(ctx, options...)
6886
case *Table:
6987
return src.Read(ctx, options...)
7088
}
@@ -194,14 +212,42 @@ func (opt ignoreUnknownValues) customizeLoad(conf *bq.JobConfigurationLoad) {
194212
conf.IgnoreUnknownValues = true
195213
}
196214

215+
// CreateDisposition returns an Option that specifies the TableCreateDisposition to use.
216+
// Deprecated: use the CreateDisposition field in Query, CopyConfig or LoadConfig instead.
217+
func CreateDisposition(disp TableCreateDisposition) Option { return disp }
218+
219+
func (opt TableCreateDisposition) implementsOption() {}
220+
221+
func (opt TableCreateDisposition) customizeCopy(conf *bq.JobConfigurationTableCopy) {
222+
conf.CreateDisposition = string(opt)
223+
}
224+
197225
func (opt TableCreateDisposition) customizeLoad(conf *bq.JobConfigurationLoad) {
198226
conf.CreateDisposition = string(opt)
199227
}
200228

229+
func (opt TableCreateDisposition) customizeQuery(conf *bq.JobConfigurationQuery) {
230+
conf.CreateDisposition = string(opt)
231+
}
232+
233+
// WriteDisposition returns an Option that specifies the TableWriteDisposition to use.
234+
// Deprecated: use the WriteDisposition field in Query, CopyConfig or LoadConfig instead.
235+
func WriteDisposition(disp TableWriteDisposition) Option { return disp }
236+
237+
func (opt TableWriteDisposition) implementsOption() {}
238+
239+
func (opt TableWriteDisposition) customizeCopy(conf *bq.JobConfigurationTableCopy) {
240+
conf.WriteDisposition = string(opt)
241+
}
242+
201243
func (opt TableWriteDisposition) customizeLoad(conf *bq.JobConfigurationLoad) {
202244
conf.WriteDisposition = string(opt)
203245
}
204246

247+
func (opt TableWriteDisposition) customizeQuery(conf *bq.JobConfigurationQuery) {
248+
conf.WriteDisposition = string(opt)
249+
}
250+
205251
type extractOption interface {
206252
customizeExtract(conf *bq.JobConfigurationExtract)
207253
}
@@ -299,6 +345,9 @@ func (c *Client) Copy(ctx context.Context, dst Destination, src Source, options
299345
return c.cp(ctx, dst, src, options)
300346
case *Query:
301347
return c.query(ctx, dst, src, options)
348+
case *QueryConfig:
349+
q := &Query{QueryConfig: *src}
350+
return c.query(ctx, dst, q, options)
302351
}
303352
case *GCSReference:
304353
if src, ok := src.(*Table); ok {
@@ -322,3 +371,171 @@ type Destination interface {
322371
type ReadSource interface {
323372
implementsReadSource()
324373
}
374+
375+
type queryOption interface {
376+
customizeQuery(conf *bq.JobConfigurationQuery)
377+
}
378+
379+
// DisableQueryCache returns an Option that prevents results being fetched from the query cache.
380+
// If this Option is not used, results are fetched from the cache if they are available.
381+
// The query cache is a best-effort cache that is flushed whenever tables in the query are modified.
382+
// Cached results are only available when TableID is unspecified in the query's destination Table.
383+
// For more information, see https://cloud.google.com/bigquery/querying-data#querycaching
384+
//
385+
// Deprecated: use Query.DisableQueryCache instead.
386+
func DisableQueryCache() Option { return disableQueryCache{} }
387+
388+
type disableQueryCache struct{}
389+
390+
func (opt disableQueryCache) implementsOption() {}
391+
392+
func (opt disableQueryCache) customizeQuery(conf *bq.JobConfigurationQuery) {
393+
f := false
394+
conf.UseQueryCache = &f
395+
}
396+
397+
// DisableFlattenedResults returns an Option that prevents results being flattened.
398+
// If this Option is not used, results from nested and repeated fields are flattened.
399+
// DisableFlattenedResults implies AllowLargeResults
400+
// For more information, see https://cloud.google.com/bigquery/docs/data#nested
401+
// Deprecated: use Query.DisableFlattenedResults instead.
402+
func DisableFlattenedResults() Option { return disableFlattenedResults{} }
403+
404+
type disableFlattenedResults struct{}
405+
406+
func (opt disableFlattenedResults) implementsOption() {}
407+
408+
func (opt disableFlattenedResults) customizeQuery(conf *bq.JobConfigurationQuery) {
409+
f := false
410+
conf.FlattenResults = &f
411+
// DisableFlattenedResults implies AllowLargeResults
412+
allowLargeResults{}.customizeQuery(conf)
413+
}
414+
415+
// AllowLargeResults returns an Option that allows the query to produce arbitrarily large result tables.
416+
// The destination must be a table.
417+
// When using this option, queries will take longer to execute, even if the result set is small.
418+
// For additional limitations, see https://cloud.google.com/bigquery/querying-data#largequeryresults
419+
// Deprecated: use Query.AllowLargeResults instead.
420+
func AllowLargeResults() Option { return allowLargeResults{} }
421+
422+
type allowLargeResults struct{}
423+
424+
func (opt allowLargeResults) implementsOption() {}
425+
426+
func (opt allowLargeResults) customizeQuery(conf *bq.JobConfigurationQuery) {
427+
conf.AllowLargeResults = true
428+
}
429+
430+
// JobPriority returns an Option that causes a query to be scheduled with the specified priority.
431+
// The default priority is InteractivePriority.
432+
// For more information, see https://cloud.google.com/bigquery/querying-data#batchqueries
433+
// Deprecated: use Query.Priority instead.
434+
func JobPriority(priority string) Option { return jobPriority(priority) }
435+
436+
type jobPriority string
437+
438+
func (opt jobPriority) implementsOption() {}
439+
440+
func (opt jobPriority) customizeQuery(conf *bq.JobConfigurationQuery) {
441+
conf.Priority = string(opt)
442+
}
443+
444+
// MaxBillingTier returns an Option that sets the maximum billing tier for a Query.
445+
// Queries that have resource usage beyond this tier will fail (without
446+
// incurring a charge). If this Option is not used, the project default will be used.
447+
// Deprecated: use Query.MaxBillingTier instead.
448+
func MaxBillingTier(tier int) Option { return maxBillingTier(tier) }
449+
450+
type maxBillingTier int
451+
452+
func (opt maxBillingTier) implementsOption() {}
453+
454+
func (opt maxBillingTier) customizeQuery(conf *bq.JobConfigurationQuery) {
455+
tier := int64(opt)
456+
conf.MaximumBillingTier = &tier
457+
}
458+
459+
// MaxBytesBilled returns an Option that limits the number of bytes billed for
460+
// this job. Queries that would exceed this limit will fail (without incurring
461+
// a charge).
462+
// If this Option is not used, or bytes is < 1, the project default will be
463+
// used.
464+
// Deprecated: use Query.MaxBytesBilled instead.
465+
func MaxBytesBilled(bytes int64) Option { return maxBytesBilled(bytes) }
466+
467+
type maxBytesBilled int64
468+
469+
func (opt maxBytesBilled) implementsOption() {}
470+
471+
func (opt maxBytesBilled) customizeQuery(conf *bq.JobConfigurationQuery) {
472+
if opt >= 1 {
473+
conf.MaximumBytesBilled = int64(opt)
474+
}
475+
}
476+
477+
// QueryUseStandardSQL returns an Option that set the query to use standard SQL.
478+
// The default setting is false (using legacy SQL).
479+
// Deprecated: use Query.UseStandardSQL instead.
480+
func QueryUseStandardSQL() Option { return queryUseStandardSQL{} }
481+
482+
type queryUseStandardSQL struct{}
483+
484+
func (opt queryUseStandardSQL) implementsOption() {}
485+
486+
func (opt queryUseStandardSQL) customizeQuery(conf *bq.JobConfigurationQuery) {
487+
conf.UseLegacySql = false
488+
conf.ForceSendFields = append(conf.ForceSendFields, "UseLegacySql")
489+
}
490+
491+
func (c *Client) query(ctx context.Context, dst *Table, src *Query, options []Option) (*Job, error) {
492+
job, options := initJobProto(c.projectID, options)
493+
payload := &bq.JobConfigurationQuery{}
494+
495+
dst.customizeQueryDst(payload)
496+
497+
// QueryConfig now contains a Dst field. If it is set, it will override dst.
498+
// This should not affect existing client code which does not set QueryConfig.Dst.
499+
src.QueryConfig.customizeQuerySrc(payload)
500+
501+
// For compatability, allow some legacy fields to be set directly on the query.
502+
// TODO(jba): delete this code when deleting Client.Copy.
503+
if src.Q != "" {
504+
payload.Query = src.Q
505+
}
506+
if src.DefaultProjectID != "" || src.DefaultDatasetID != "" {
507+
payload.DefaultDataset = &bq.DatasetReference{
508+
DatasetId: src.DefaultDatasetID,
509+
ProjectId: src.DefaultProjectID,
510+
}
511+
}
512+
// end of compatability code.
513+
514+
for _, opt := range options {
515+
o, ok := opt.(queryOption)
516+
if !ok {
517+
return nil, fmt.Errorf("option (%#v) not applicable to dst/src pair: dst: %T ; src: %T", opt, dst, src)
518+
}
519+
o.customizeQuery(payload)
520+
}
521+
522+
job.Configuration = &bq.JobConfiguration{
523+
Query: payload,
524+
}
525+
j, err := c.service.insertJob(ctx, job, c.projectID)
526+
if err != nil {
527+
return nil, err
528+
}
529+
j.isQuery = true
530+
return j, nil
531+
}
532+
533+
// Read submits a query for execution and returns the results via an Iterator.
534+
// Deprecated: Call Read on the Job returned by Query.Run instead.
535+
func (q *Query) Read(ctx context.Context, options ...ReadOption) (*Iterator, error) {
536+
job, err := q.Run(ctx)
537+
if err != nil {
538+
return nil, err
539+
}
540+
return job.Read(ctx, options...)
541+
}

bigquery/load_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,14 +278,18 @@ func TestConfiguringLoader(t *testing.T) {
278278
want := defaultLoadJob()
279279
want.Configuration.Load.CreateDisposition = "CREATE_NEVER"
280280
want.Configuration.Load.WriteDisposition = "WRITE_TRUNCATE"
281+
want.JobReference = &bq.JobReference{
282+
JobId: "ajob",
283+
ProjectId: "project-id",
284+
}
281285

282286
loader := dst.LoaderFrom(src)
283287
loader.CreateDisposition = CreateNever
284288
loader.WriteDisposition = WriteTruncate
289+
loader.JobID = "ajob"
285290

286291
if _, err := loader.Run(context.Background()); err != nil {
287-
t.Errorf("err calling Loader.Run: %v", err)
288-
return
292+
t.Fatalf("err calling Loader.Run: %v", err)
289293
}
290294
if !reflect.DeepEqual(s.Job, want) {
291295
t.Errorf("loading: got:\n%v\nwant:\n%v", s.Job, want)

0 commit comments

Comments
 (0)