28
28
import uuid
29
29
from copy import deepcopy
30
30
from datetime import datetime , timedelta
31
+ from functools import cached_property
31
32
from typing import TYPE_CHECKING , Any , Iterable , Mapping , NoReturn , Sequence , Union , cast
32
33
33
34
from aiohttp import ClientSession as ClientSession
@@ -103,14 +104,49 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
103
104
conn_type = "gcpbigquery"
104
105
hook_name = "Google Bigquery"
105
106
107
+ @classmethod
108
+ def get_connection_form_widgets (cls ) -> dict [str , Any ]:
109
+ """Return connection widgets to add to connection form."""
110
+ from flask_appbuilder .fieldwidgets import BS3TextFieldWidget
111
+ from flask_babel import lazy_gettext
112
+ from wtforms import validators
113
+ from wtforms .fields .simple import BooleanField , StringField
114
+
115
+ from airflow .www .validators import ValidJson
116
+
117
+ connection_form_widgets = super ().get_connection_form_widgets ()
118
+ connection_form_widgets ["use_legacy_sql" ] = BooleanField (lazy_gettext ("Use Legacy SQL" ), default = True )
119
+ connection_form_widgets ["location" ] = StringField (
120
+ lazy_gettext ("Location" ), widget = BS3TextFieldWidget ()
121
+ )
122
+ connection_form_widgets ["priority" ] = StringField (
123
+ lazy_gettext ("Priority" ),
124
+ default = "INTERACTIVE" ,
125
+ widget = BS3TextFieldWidget (),
126
+ validators = [validators .AnyOf (["INTERACTIVE" , "BATCH" ])],
127
+ )
128
+ connection_form_widgets ["api_resource_configs" ] = StringField (
129
+ lazy_gettext ("API Resource Configs" ), widget = BS3TextFieldWidget (), validators = [ValidJson ()]
130
+ )
131
+ connection_form_widgets ["labels" ] = StringField (
132
+ lazy_gettext ("Labels" ), widget = BS3TextFieldWidget (), validators = [ValidJson ()]
133
+ )
134
+ connection_form_widgets ["labels" ] = StringField (
135
+ lazy_gettext ("Labels" ), widget = BS3TextFieldWidget (), validators = [ValidJson ()]
136
+ )
137
+ return connection_form_widgets
138
+
139
+ @classmethod
140
+ def get_ui_field_behaviour (cls ) -> dict [str , Any ]:
141
+ """Return custom field behaviour."""
142
+ return super ().get_ui_field_behaviour ()
143
+
106
144
def __init__ (
107
145
self ,
108
- gcp_conn_id : str = GoogleBaseHook .default_conn_name ,
109
146
use_legacy_sql : bool = True ,
110
147
location : str | None = None ,
111
148
priority : str = "INTERACTIVE" ,
112
149
api_resource_configs : dict | None = None ,
113
- impersonation_chain : str | Sequence [str ] | None = None ,
114
150
impersonation_scopes : str | Sequence [str ] | None = None ,
115
151
labels : dict | None = None ,
116
152
** kwargs ,
@@ -120,18 +156,25 @@ def __init__(
120
156
"The `delegate_to` parameter has been deprecated before and finally removed in this version"
121
157
" of Google Provider. You MUST convert it to `impersonate_chain`"
122
158
)
123
- super ().__init__ (
124
- gcp_conn_id = gcp_conn_id ,
125
- impersonation_chain = impersonation_chain ,
126
- )
127
- self .use_legacy_sql = use_legacy_sql
128
- self .location = location
129
- self .priority = priority
159
+ super ().__init__ (** kwargs )
160
+ self .use_legacy_sql : bool = self ._get_field ("use_legacy_sql" , use_legacy_sql )
161
+ self .location : str | None = self ._get_field ("location" , location )
162
+ self .priority : str = self ._get_field ("priority" , priority )
130
163
self .running_job_id : str | None = None
131
- self .api_resource_configs : dict = api_resource_configs or {}
132
- self .labels = labels
133
- self .credentials_path = "bigquery_hook_credentials.json"
134
- self .impersonation_scopes = impersonation_scopes
164
+ self .api_resource_configs : dict = self ._get_field ("api_resource_configs" , api_resource_configs or {})
165
+ self .labels = self ._get_field ("labels" , labels or {})
166
+ self .impersonation_scopes : str | Sequence [str ] | None = impersonation_scopes
167
+
168
+ @cached_property
169
+ @deprecated (
170
+ reason = (
171
+ "`BigQueryHook.credentials_path` property is deprecated and will be removed in the future. "
172
+ "This property used for obtaining credentials path but no longer in actual use. "
173
+ ),
174
+ category = AirflowProviderDeprecationWarning ,
175
+ )
176
+ def credentials_path (self ) -> str :
177
+ return "bigquery_hook_credentials.json"
135
178
136
179
def get_conn (self ) -> BigQueryConnection :
137
180
"""Get a BigQuery PEP 249 connection object."""
@@ -172,18 +215,17 @@ def get_uri(self) -> str:
172
215
"""Override from ``DbApiHook`` for ``get_sqlalchemy_engine()``."""
173
216
return f"bigquery://{ self .project_id } "
174
217
175
- def get_sqlalchemy_engine (self , engine_kwargs = None ):
218
+ def get_sqlalchemy_engine (self , engine_kwargs : dict | None = None ):
176
219
"""Create an SQLAlchemy engine object.
177
220
178
221
:param engine_kwargs: Kwargs used in :func:`~sqlalchemy.create_engine`.
179
222
"""
180
223
if engine_kwargs is None :
181
224
engine_kwargs = {}
182
- extras = self .get_connection (self .gcp_conn_id ).extra_dejson
183
- credentials_path = get_field (extras , "key_path" )
225
+ credentials_path = get_field (self .extras , "key_path" )
184
226
if credentials_path :
185
227
return create_engine (self .get_uri (), credentials_path = credentials_path , ** engine_kwargs )
186
- keyfile_dict = get_field (extras , "keyfile_dict" )
228
+ keyfile_dict = get_field (self . extras , "keyfile_dict" )
187
229
if keyfile_dict :
188
230
keyfile_content = keyfile_dict if isinstance (keyfile_dict , dict ) else json .loads (keyfile_dict )
189
231
return create_engine (self .get_uri (), credentials_info = keyfile_content , ** engine_kwargs )
0 commit comments