ref: #133
Here's some code we're trying internally to make larger downloads possible and faster.
Let me know your thoughts (anyone is welcome to take this and turn it into a PR if they'd like, similar to bulk upload).
import logging
import os
import tempfile
import uuid

import pandas as pd
from google.cloud import bigquery, storage
from google.cloud.bigquery.job import ExtractJobConfig

logger = logging.getLogger(__name__)

# Project defaults to the GOOGLE_CLOUD_PROJECT environment variable
GOOGLE_CLOUD_PROJECT = os.environ.get('GOOGLE_CLOUD_PROJECT')

# Map if_exists values to BigQuery write dispositions
if_exists_map = {
    'append': 'WRITE_APPEND',
    'fail': 'WRITE_EMPTY',
    'replace': 'WRITE_TRUNCATE',
}


# https://stackoverflow.com/questions/14622526
def create_from_query(query, dataset, table, block=False, if_exists='fail',
                      project=None, credentials=None):
    """
    Create a BigQuery table from a query.

    Parameters
    ----------
    query : str
        SQL-like query to return data values
    dataset : str
        dataset id
    table : str
        name of table
    block : bool (default False)
        if True, wait for the query job to finish before returning
    if_exists : str (default 'fail')
        append - Specifies that rows may be appended to an existing table
        fail - Specifies that the output table must be empty
        replace - Specifies that write should replace a table
    project : str (defaults to env var GOOGLE_CLOUD_PROJECT)
        Google BigQuery Account project ID.
    credentials : GoogleCredentials (optional)
        Credentials used to authenticate the BigQuery client

    Returns
    -------
    job : google.cloud.bigquery.job.QueryJob
        The submitted QueryJob
    """
    client = bigquery.Client(project=project, credentials=credentials)
    if dataset not in [x.dataset_id for x in client.list_datasets()]:
        # https://github.com/GoogleCloudPlatform/google-cloud-python/issues/4930
        dataset_ref = bigquery.Dataset(client.dataset(dataset))
        client.create_dataset(dataset_ref)
    config = bigquery.job.QueryJobConfig()
    config.use_legacy_sql = False
    config.allow_large_results = True
    config.destination = client.dataset(dataset).table(table)
    config.write_disposition = if_exists_map[if_exists]
    job_id = uuid.uuid4().hex[:10]
    job = client.query(query=query, job_id=job_id, job_config=config)
    if block:
        job.result()  # wait for the query job to finish
    return job
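
# Example call (a rough sketch; 'my_table' is a placeholder name):
#
#   job = create_from_query(
#       "SELECT 1 AS x", dataset='temp_pandas', table='my_table', block=True)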
def export_table_to_gcs(
    dataset,
    table,
    timeout_in_seconds=600,
    bucket=None,
    blob=None,
    zipped=True,
):
    """
    Export a table to GCS. Returns a tuple of (bucket, blob).
    """
    client = bigquery.Client(GOOGLE_CLOUD_PROJECT)
    table_ref = client.dataset(dataset).table(table)
    job_config = ExtractJobConfig()
    job_config.compression = 'GZIP' if zipped else 'NONE'
    bucket = bucket or '{}-temp'.format(client.project)
    blob = blob or '{}/{}.csv'.format(dataset, table)
    if zipped and not blob.endswith('.gz'):
        blob += '.gz'
    destination_uri = 'gs://{}/{}'.format(bucket, blob)
    extract_job = client.extract_table(
        table_ref, destination_uri, job_config=job_config)
    extract_job.result(timeout=timeout_in_seconds)
    logger.info('Exported {}.{} -> {}'.format(dataset, table, destination_uri))
    return bucket, blob
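
# Example call (sketch): export an existing table to the default
# '<project>-temp' bucket as gzipped CSV; 'my_table' is a placeholder name.
#
#   bucket, blob = export_table_to_gcs('temp_pandas', 'my_table')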
DEFAULT_BUCKET = '{}-temp'.format(GOOGLE_CLOUD_PROJECT)


def read_table(
    dataset,
    table,
    project=None,
    credentials=None,
    timeout_in_seconds=600,
    bucket=DEFAULT_BUCKET,
):
    """
    Read an entire table from BigQuery into a DataFrame via a GCS export.
    """
    storage_client = storage.Client(project, credentials)
    prefix = 'gbq-exports/{}/{}/'.format(dataset, table)
    bucket = storage_client.get_bucket(bucket)
    # Clear out any blobs left over from a previous export of this table
    for old_blob in bucket.list_blobs(prefix=prefix):
        old_blob.delete()
        logger.info('Old Blob Deleted: {}'.format(old_blob.name))
    bq = bigquery.Client(project=project, credentials=credentials)
    table_ref = bq.dataset(dataset).table(table)
    conf = ExtractJobConfig()
    conf.compression = 'GZIP'
    # The trailing wildcard lets BigQuery shard large exports across files
    extract_job = bq.extract_table(
        table_ref,
        'gs://{}/{}*.csv.gz'.format(bucket.name, prefix),
        job_config=conf)
    extract_job.result(timeout=timeout_in_seconds)
    frames = []
    path = tempfile.mkdtemp()
    for blob in bucket.list_blobs(prefix=prefix):
        filename = '{}/{}'.format(path, blob.name.replace('/', '__'))
        with open(filename, 'wb') as f:
            blob.download_to_file(f)
        frames.append(pd.read_csv(filename))
        blob.delete()
        logger.info('Processed: {}'.format(blob.name))
    return pd.concat(frames)
def read_gbq_bulk(
    query,
    project=None,
    bucket=None,
    dataset='temp_pandas',
    credentials=None,
):
    """
    Run a query into a temporary destination table, read the results back
    into a DataFrame via a GCS export, then drop the temporary table.
    """
    table_name = uuid.uuid4().hex[:6]
    create_from_query(
        query=query,
        dataset=dataset,
        table=table_name,
        project=project,
        credentials=credentials,
        block=True,
    )
    df = read_table(
        dataset=dataset,
        table=table_name,
        project=project,
        credentials=credentials,
        bucket=bucket or DEFAULT_BUCKET,
    )
    # Drop the temporary destination table now that the data is local
    bq = bigquery.Client(project=project, credentials=credentials)
    table = bq.dataset(dataset).table(table_name)
    bq.delete_table(table)
    return df
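
For reference, a minimal usage sketch. It assumes application default credentials, a project set via GOOGLE_CLOUD_PROJECT, and an existing '<project>-temp' GCS bucket; the public-data query below is just a placeholder:

df = read_gbq_bulk(
    "SELECT name, year, number "
    "FROM `bigquery-public-data.usa_names.usa_1910_2013` "
    "WHERE year >= 2000",
    dataset='temp_pandas',
)
print(df.shape)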