Skip to content

Commit 1cf9f5e

Browse files
authored
fix: prevent creating unnecessary client objects in multithreaded environments (#1757)
This prevents extra authentication and default location queries in multithreaded environments.
1 parent 27fac78 commit 1cf9f5e

File tree

2 files changed

+136
-103
lines changed

2 files changed

+136
-103
lines changed

bigframes/pandas/io/api.py

Lines changed: 34 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,7 @@
1515
from __future__ import annotations
1616

1717
import inspect
18+
import threading
1819
import typing
1920
from typing import (
2021
Any,
@@ -465,6 +466,8 @@ def from_glob_path(
465466

466467
from_glob_path.__doc__ = inspect.getdoc(bigframes.session.Session.from_glob_path)
467468

469+
_default_location_lock = threading.Lock()
470+
468471

469472
def _set_default_session_location_if_possible(query):
470473
# Set the location as per the query if this is the first query the user is
@@ -475,31 +478,34 @@ def _set_default_session_location_if_possible(query):
475478
# If query is a table name, then it would be the location of the table.
476479
# If query is a SQL with a table, then it would be table's location.
477480
# If query is a SQL with no table, then it would be the BQ default location.
478-
if (
479-
config.options.bigquery._session_started
480-
or config.options.bigquery.location
481-
or config.options.bigquery.use_regional_endpoints
482-
):
483-
return
484-
485-
clients_provider = bigframes.session.clients.ClientsProvider(
486-
project=config.options.bigquery.project,
487-
location=config.options.bigquery.location,
488-
use_regional_endpoints=config.options.bigquery.use_regional_endpoints,
489-
credentials=config.options.bigquery.credentials,
490-
application_name=config.options.bigquery.application_name,
491-
bq_kms_key_name=config.options.bigquery.kms_key_name,
492-
client_endpoints_override=config.options.bigquery.client_endpoints_override,
493-
)
494-
495-
bqclient = clients_provider.bqclient
496-
497-
if bigframes.session._io.bigquery.is_query(query):
498-
# Intentionally run outside of the session so that we can detect the
499-
# location before creating the session. Since it's a dry_run, labels
500-
# aren't necessary.
501-
job = bqclient.query(query, bigquery.QueryJobConfig(dry_run=True))
502-
config.options.bigquery.location = job.location
503-
else:
504-
table = bqclient.get_table(query)
505-
config.options.bigquery.location = table.location
481+
global _default_location_lock
482+
483+
with _default_location_lock:
484+
if (
485+
config.options.bigquery._session_started
486+
or config.options.bigquery.location
487+
or config.options.bigquery.use_regional_endpoints
488+
):
489+
return
490+
491+
clients_provider = bigframes.session.clients.ClientsProvider(
492+
project=config.options.bigquery.project,
493+
location=config.options.bigquery.location,
494+
use_regional_endpoints=config.options.bigquery.use_regional_endpoints,
495+
credentials=config.options.bigquery.credentials,
496+
application_name=config.options.bigquery.application_name,
497+
bq_kms_key_name=config.options.bigquery.kms_key_name,
498+
client_endpoints_override=config.options.bigquery.client_endpoints_override,
499+
)
500+
501+
bqclient = clients_provider.bqclient
502+
503+
if bigframes.session._io.bigquery.is_query(query):
504+
# Intentionally run outside of the session so that we can detect the
505+
# location before creating the session. Since it's a dry_run, labels
506+
# aren't necessary.
507+
job = bqclient.query(query, bigquery.QueryJobConfig(dry_run=True))
508+
config.options.bigquery.location = job.location
509+
else:
510+
table = bqclient.get_table(query)
511+
config.options.bigquery.location = table.location

bigframes/session/clients.py

Lines changed: 102 additions & 75 deletions
Original file line number | Diff line number | Diff line change
@@ -15,12 +15,12 @@
1515
"""Clients manages the connection to Google APIs."""
1616

1717
import os
18+
import threading
1819
import typing
1920
from typing import Optional
2021

2122
import google.api_core.client_info
2223
import google.api_core.client_options
23-
import google.api_core.exceptions
2424
import google.api_core.gapic_v1.client_info
2525
import google.auth.credentials
2626
import google.cloud.bigquery as bigquery
@@ -84,6 +84,9 @@ def __init__(
8484
if credentials is None:
8585
credentials, credentials_project = _get_default_credentials_with_project()
8686

87+
# Ensure an access token is available.
88+
credentials.refresh(google.auth.transport.requests.Request())
89+
8790
# Prefer the project in this order:
8891
# 1. Project explicitly specified by the user
8992
# 2. Project set in the environment
@@ -127,19 +130,30 @@ def __init__(
127130
self._client_endpoints_override = client_endpoints_override
128131

129132
# cloud clients initialized for lazy load
133+
self._bqclient_lock = threading.Lock()
130134
self._bqclient = None
135+
136+
self._bqconnectionclient_lock = threading.Lock()
131137
self._bqconnectionclient: Optional[
132138
google.cloud.bigquery_connection_v1.ConnectionServiceClient
133139
] = None
140+
141+
self._bqstoragereadclient_lock = threading.Lock()
134142
self._bqstoragereadclient: Optional[
135143
google.cloud.bigquery_storage_v1.BigQueryReadClient
136144
] = None
145+
146+
self._bqstoragewriteclient_lock = threading.Lock()
137147
self._bqstoragewriteclient: Optional[
138148
google.cloud.bigquery_storage_v1.BigQueryWriteClient
139149
] = None
150+
151+
self._cloudfunctionsclient_lock = threading.Lock()
140152
self._cloudfunctionsclient: Optional[
141153
google.cloud.functions_v2.FunctionServiceClient
142154
] = None
155+
156+
self._resourcemanagerclient_lock = threading.Lock()
143157
self._resourcemanagerclient: Optional[
144158
google.cloud.resourcemanager_v3.ProjectsClient
145159
] = None
@@ -166,6 +180,7 @@ def _create_bigquery_client(self):
166180
project=self._project,
167181
location=self._location,
168182
)
183+
169184
if self._bq_kms_key_name:
170185
# Note: Key configuration only applies automatically to load and query jobs, not copy jobs.
171186
encryption_config = bigquery.EncryptionConfiguration(
@@ -186,114 +201,126 @@ def _create_bigquery_client(self):
186201

187202
@property
188203
def bqclient(self):
189-
if not self._bqclient:
190-
self._bqclient = self._create_bigquery_client()
204+
with self._bqclient_lock:
205+
if not self._bqclient:
206+
self._bqclient = self._create_bigquery_client()
191207

192208
return self._bqclient
193209

194210
@property
195211
def bqconnectionclient(self):
196-
if not self._bqconnectionclient:
197-
bqconnection_options = None
198-
if "bqconnectionclient" in self._client_endpoints_override:
199-
bqconnection_options = google.api_core.client_options.ClientOptions(
200-
api_endpoint=self._client_endpoints_override["bqconnectionclient"]
201-
)
212+
with self._bqconnectionclient_lock:
213+
if not self._bqconnectionclient:
214+
bqconnection_options = None
215+
if "bqconnectionclient" in self._client_endpoints_override:
216+
bqconnection_options = google.api_core.client_options.ClientOptions(
217+
api_endpoint=self._client_endpoints_override[
218+
"bqconnectionclient"
219+
]
220+
)
202221

203-
bqconnection_info = google.api_core.gapic_v1.client_info.ClientInfo(
204-
user_agent=self._application_name
205-
)
206-
self._bqconnectionclient = (
207-
google.cloud.bigquery_connection_v1.ConnectionServiceClient(
208-
client_info=bqconnection_info,
209-
client_options=bqconnection_options,
210-
credentials=self._credentials,
222+
bqconnection_info = google.api_core.gapic_v1.client_info.ClientInfo(
223+
user_agent=self._application_name
224+
)
225+
self._bqconnectionclient = (
226+
google.cloud.bigquery_connection_v1.ConnectionServiceClient(
227+
client_info=bqconnection_info,
228+
client_options=bqconnection_options,
229+
credentials=self._credentials,
230+
)
211231
)
212-
)
213232

214233
return self._bqconnectionclient
215234

216235
@property
217236
def bqstoragereadclient(self):
218-
if not self._bqstoragereadclient:
219-
bqstorage_options = None
220-
if "bqstoragereadclient" in self._client_endpoints_override:
221-
bqstorage_options = google.api_core.client_options.ClientOptions(
222-
api_endpoint=self._client_endpoints_override["bqstoragereadclient"]
223-
)
224-
elif self._use_regional_endpoints:
225-
bqstorage_options = google.api_core.client_options.ClientOptions(
226-
api_endpoint=_BIGQUERYSTORAGE_REGIONAL_ENDPOINT.format(
227-
location=self._location
237+
with self._bqstoragereadclient_lock:
238+
if not self._bqstoragereadclient:
239+
bqstorage_options = None
240+
if "bqstoragereadclient" in self._client_endpoints_override:
241+
bqstorage_options = google.api_core.client_options.ClientOptions(
242+
api_endpoint=self._client_endpoints_override[
243+
"bqstoragereadclient"
244+
]
245+
)
246+
elif self._use_regional_endpoints:
247+
bqstorage_options = google.api_core.client_options.ClientOptions(
248+
api_endpoint=_BIGQUERYSTORAGE_REGIONAL_ENDPOINT.format(
249+
location=self._location
250+
)
228251
)
229-
)
230252

231-
bqstorage_info = google.api_core.gapic_v1.client_info.ClientInfo(
232-
user_agent=self._application_name
233-
)
234-
self._bqstoragereadclient = (
235-
google.cloud.bigquery_storage_v1.BigQueryReadClient(
236-
client_info=bqstorage_info,
237-
client_options=bqstorage_options,
238-
credentials=self._credentials,
253+
bqstorage_info = google.api_core.gapic_v1.client_info.ClientInfo(
254+
user_agent=self._application_name
255+
)
256+
self._bqstoragereadclient = (
257+
google.cloud.bigquery_storage_v1.BigQueryReadClient(
258+
client_info=bqstorage_info,
259+
client_options=bqstorage_options,
260+
credentials=self._credentials,
261+
)
239262
)
240-
)
241263

242264
return self._bqstoragereadclient
243265

244266
@property
245267
def bqstoragewriteclient(self):
246-
if not self._bqstoragewriteclient:
247-
bqstorage_options = None
248-
if "bqstoragewriteclient" in self._client_endpoints_override:
249-
bqstorage_options = google.api_core.client_options.ClientOptions(
250-
api_endpoint=self._client_endpoints_override["bqstoragewriteclient"]
251-
)
252-
elif self._use_regional_endpoints:
253-
bqstorage_options = google.api_core.client_options.ClientOptions(
254-
api_endpoint=_BIGQUERYSTORAGE_REGIONAL_ENDPOINT.format(
255-
location=self._location
268+
with self._bqstoragewriteclient_lock:
269+
if not self._bqstoragewriteclient:
270+
bqstorage_options = None
271+
if "bqstoragewriteclient" in self._client_endpoints_override:
272+
bqstorage_options = google.api_core.client_options.ClientOptions(
273+
api_endpoint=self._client_endpoints_override[
274+
"bqstoragewriteclient"
275+
]
276+
)
277+
elif self._use_regional_endpoints:
278+
bqstorage_options = google.api_core.client_options.ClientOptions(
279+
api_endpoint=_BIGQUERYSTORAGE_REGIONAL_ENDPOINT.format(
280+
location=self._location
281+
)
256282
)
257-
)
258283

259-
bqstorage_info = google.api_core.gapic_v1.client_info.ClientInfo(
260-
user_agent=self._application_name
261-
)
262-
self._bqstoragewriteclient = (
263-
google.cloud.bigquery_storage_v1.BigQueryWriteClient(
264-
client_info=bqstorage_info,
265-
client_options=bqstorage_options,
266-
credentials=self._credentials,
284+
bqstorage_info = google.api_core.gapic_v1.client_info.ClientInfo(
285+
user_agent=self._application_name
286+
)
287+
self._bqstoragewriteclient = (
288+
google.cloud.bigquery_storage_v1.BigQueryWriteClient(
289+
client_info=bqstorage_info,
290+
client_options=bqstorage_options,
291+
credentials=self._credentials,
292+
)
267293
)
268-
)
269294

270295
return self._bqstoragewriteclient
271296

272297
@property
273298
def cloudfunctionsclient(self):
274-
if not self._cloudfunctionsclient:
275-
functions_info = google.api_core.gapic_v1.client_info.ClientInfo(
276-
user_agent=self._application_name
277-
)
278-
self._cloudfunctionsclient = (
279-
google.cloud.functions_v2.FunctionServiceClient(
280-
client_info=functions_info,
281-
credentials=self._credentials,
299+
with self._cloudfunctionsclient_lock:
300+
if not self._cloudfunctionsclient:
301+
functions_info = google.api_core.gapic_v1.client_info.ClientInfo(
302+
user_agent=self._application_name
303+
)
304+
self._cloudfunctionsclient = (
305+
google.cloud.functions_v2.FunctionServiceClient(
306+
client_info=functions_info,
307+
credentials=self._credentials,
308+
)
282309
)
283-
)
284310

285311
return self._cloudfunctionsclient
286312

287313
@property
288314
def resourcemanagerclient(self):
289-
if not self._resourcemanagerclient:
290-
resourcemanager_info = google.api_core.gapic_v1.client_info.ClientInfo(
291-
user_agent=self._application_name
292-
)
293-
self._resourcemanagerclient = (
294-
google.cloud.resourcemanager_v3.ProjectsClient(
295-
credentials=self._credentials, client_info=resourcemanager_info
315+
with self._resourcemanagerclient_lock:
316+
if not self._resourcemanagerclient:
317+
resourcemanager_info = google.api_core.gapic_v1.client_info.ClientInfo(
318+
user_agent=self._application_name
319+
)
320+
self._resourcemanagerclient = (
321+
google.cloud.resourcemanager_v3.ProjectsClient(
322+
credentials=self._credentials, client_info=resourcemanager_info
323+
)
296324
)
297-
)
298325

299326
return self._resourcemanagerclient

0 commit comments

Comments
 (0)