Commit 1b1e36f
Use cache to reduce redundant database calls (#1488)
We were calling the DB unnecessarily in various places to fetch the same information. These lookups can be cached; as the logs below show, the number of connection calls drops from 7 to 1.

*Before 1*:

```
[2022-12-23 02:09:16,425] {dag.py:3622} INFO - Running task top_five_animations
[2022-12-23 02:09:16,438] {taskinstance.py:1511} INFO - Exporting the following env vars: ...
[2022-12-23 02:09:16,439] {base.py:73} INFO - Using connection ID 'sqlite_default' for task execution.
[2022-12-23 02:09:16,440] {base.py:73} INFO - Using connection ID 'sqlite_default' for task execution.
[2022-12-23 02:09:16,440] {base_decorator.py:124} INFO - Returning table Table(name='top_animation', conn_id='sqlite_default', metadata=Metadata(schema=None, database=None), columns=[], temp=False, uri='astro://@?table=top_animation', extra={})
[2022-12-23 02:09:16,440] {base_decorator.py:124} INFO - Returning table Table(name='top_animation', conn_id='sqlite_default', metadata=Metadata(schema=None, database=None), columns=[], temp=False, uri='astro://@?table=top_animation', extra={})
[2022-12-23 02:09:16,441] {base.py:73} INFO - Using connection ID 'sqlite_default' for task execution.
[2022-12-23 02:09:16,442] {base.py:73} INFO - Using connection ID 'sqlite_default' for task execution.
[2022-12-23 02:09:16,445] {base.py:73} INFO - Using connection ID 'sqlite_default' for task execution.
[2022-12-23 02:09:16,450] {base.py:73} INFO - Using connection ID 'sqlite_default' for task execution.
[2022-12-23 02:09:16,450] {base.py:73} INFO - Using connection ID 'sqlite_default' for task execution.
[2022-12-23 02:09:16,461] {taskinstance.py:1322} INFO - Marking task as SUCCESS. dag_id=calculate_popular_movies, task_id=top_five_animations, execution_date=20221223T020915, start_date=, end_date=20221223T020916
[2022-12-23 02:09:16,461] {taskinstance.py:1322} INFO - Marking task as SUCCESS. dag_id=calculate_popular_movies, task_id=top_five_animations, execution_date=20221223T020915, start_date=, end_date=20221223T020916
[2022-12-23 02:09:16,464] {dag.py:3626} INFO - top_five_animations ran successfully!
[2022-12-23 02:09:16,464] {dag.py:3629} INFO - *****************************************************
[2022-12-23 02:09:16,465] {dagrun.py:606} INFO - Marking run <DagRun calculate_popular_movies @ 2022-12-23T02:09:15.324979+00:00: manual__2022-12-23T02:09:15.324979+00:00, state:running, queued_at: None. externally triggered: False> successful
```

*After 1*:

```
[2022-12-23 02:20:18,669] {dag.py:3622} INFO - Running task top_five_animations
[2022-12-23 02:20:18,680] {taskinstance.py:1511} INFO - Exporting the following env vars: ...
[2022-12-23 02:20:18,681] {base_decorator.py:124} INFO - Returning table Table(name='top_animation', conn_id='sqlite_default', metadata=Metadata(schema=None, database=None), columns=[], temp=False, uri='astro://@?table=top_animation', extra={})
[2022-12-23 02:20:18,681] {base_decorator.py:124} INFO - Returning table Table(name='top_animation', conn_id='sqlite_default', metadata=Metadata(schema=None, database=None), columns=[], temp=False, uri='astro://@?table=top_animation', extra={})
[2022-12-23 02:20:18,686] {base.py:73} INFO - Using connection ID 'sqlite_default' for task execution.
[2022-12-23 02:20:18,708] {taskinstance.py:1322} INFO - Marking task as SUCCESS. dag_id=calculate_popular_movies, task_id=top_five_animations, execution_date=20221223T022017, start_date=, end_date=20221223T022018
[2022-12-23 02:20:18,708] {taskinstance.py:1322} INFO - Marking task as SUCCESS. dag_id=calculate_popular_movies, task_id=top_five_animations, execution_date=20221223T022017, start_date=, end_date=20221223T022018
[2022-12-23 02:20:18,711] {dag.py:3626} INFO - top_five_animations ran successfully!
[2022-12-23 02:20:18,711] {dag.py:3629} INFO - *****************************************************
[2022-12-23 02:20:18,713] {dagrun.py:606} INFO - Marking run <DagRun calculate_popular_movies @ 2022-12-23T02:20:17.396648+00:00: manual__2022-12-23T02:20:17.396648+00:00, state:running, queued_at: None. externally triggered: False> successful
[2022-12-23 02:20:18,715] {dagrun.py:657} INFO - DagRun Finished: dag_id=calculate_popular_movies, execution_date=2022-12-23T02:20:17.396648+00:00, run_id=manual__2022-12-23T02:20:17.396648+00:00, run_start_date=2022-12-23 02:20:17.396648+00:00, run_end_date=2022-12-23 02:20:18.713514+00:00, run_duration=1.316866, state=success, external_trigger=False, run_type=manual, data_interval_start=2022-12-23T02:20:17.396648+00:00, data_interval_end=2022-12-23T02:20:17.396648+00:00, dag_hash=None
```

*Before 2*:

```
[2022-12-23 01:55:54,386] {load_file.py:92} INFO - Loading https://raw.githubusercontent.com/astronomer/astro-sdk/main/tests/data/imdb_v2.csv into TempTable(name='_tmp_ztujoeesefaqclout728qnyomrc96suvgsntxnen11z4n40ia9wd99roe', conn_id='sqlite_default', metadata=Metadata(schema=None, database=None), columns=[], temp=True)
...
[2022-12-23 01:55:54,388] {base.py:73} INFO - Using connection ID 'sqlite_default' for task execution.
[2022-12-23 01:55:54,393] {base.py:73} INFO - Using connection ID 'sqlite_default' for task execution.
[2022-12-23 01:55:54,499] {base.py:73} INFO - Using connection ID 'sqlite_default' for task execution.
[2022-12-23 01:55:54,507] {base.py:499} INFO - Loading file(s) with Pandas...
[2022-12-23 01:55:54,606] {base.py:73} INFO - Using connection ID 'sqlite_default' for task execution.
[2022-12-23 01:55:54,658] {load_file.py:124} INFO - Completed loading the data into TempTable(name='_tmp_ztujoeesefaqclout728qnyomrc96suvgsntxnen11z4n40ia9wd99roe', conn_id='sqlite_default', metadata=Metadata(schema=None, database=None), columns=[], temp=True).
[2022-12-23 01:55:54,663] {taskinstance.py:1322} INFO - Marking task as SUCCESS. dag_id=calculate_popular_movies, task_id=imdb_movies, execution_date=20221223T015554, start_date=, end_date=20221223T015554
```

*After 2*:

```
[2022-12-23 01:56:37,620] {load_file.py:92} INFO - Loading https://raw.githubusercontent.com/astronomer/astro-sdk/main/tests/data/imdb_v2.csv into TempTable(name='_tmp_rnagpj5gmps5a3oplvlwvlmv6u918qw21inanpxg2j56lo725mrzgp9jo', conn_id='sqlite_default', metadata=Metadata(schema=None, database=None), columns=[], temp=True)
...
[2022-12-23 01:56:37,621] {base.py:73} INFO - Using connection ID 'sqlite_default' for task execution.
[2022-12-23 01:56:37,625] {base.py:73} INFO - Using connection ID 'sqlite_default' for task execution.
[2022-12-23 01:56:37,730] {base.py:501} INFO - Loading file(s) with Pandas...
[2022-12-23 01:56:37,881] {load_file.py:124} INFO - Completed loading the data into TempTable(name='_tmp_rnagpj5gmps5a3oplvlwvlmv6u918qw21inanpxg2j56lo725mrzgp9jo', conn_id='sqlite_default', metadata=Metadata(schema=None, database=None), columns=[], temp=True).
[2022-12-23 01:56:37,886] {taskinstance.py:1322} INFO - Marking task as SUCCESS. dag_id=calculate_popular_movies, task_id=imdb_movies, execution_date=20221223T015637, start_date=, end_date=20221223T015637
```
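For illustration only (not code from this commit, hypothetical names): the pattern behind the fix is to memoize the connection-type lookup so repeated calls with the same `conn_id` hit the metadata database only once.

```python
# Minimal sketch of the caching pattern; the real implementation is in
# astro/databases/__init__.py (see the diff below).
from functools import lru_cache

calls = {"count": 0}


@lru_cache(maxsize=None)  # equivalent to functools.cache on Python 3.9+
def get_conn_type(conn_id: str) -> str:
    calls["count"] += 1  # stands in for the Airflow metadata-DB round trip
    return {"sqlite_default": "sqlite"}[conn_id]


for _ in range(7):
    get_conn_type("sqlite_default")

assert calls["count"] == 1  # seven lookups, one actual call
```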
1 parent 001f36c commit 1b1e36f

30 files changed: +118 −113 lines

python-sdk/dev/scripts/pre_commit_context_typing_compat.py (+1 −1)

```diff
@@ -12,7 +12,7 @@

 SOURCES_ROOT = Path(__file__).parents[2]
 ASTRO_ROOT = SOURCES_ROOT / "src" / "astro"
-TYPING_COMPAT_PATH = "python-sdk/src/astro/utils/typing_compat.py"
+TYPING_COMPAT_PATH = "python-sdk/src/astro/utils/compat/typing.py"


 class ImportCrawler(NodeVisitor):
```

python-sdk/pyproject.toml (+2 −1)

```diff
@@ -24,7 +24,8 @@ dependencies = [
     "python-frontmatter",
     "smart-open",
     "SQLAlchemy>=1.3.18",
-    "apache-airflow-providers-common-sql"
+    "apache-airflow-providers-common-sql",
+    "cached_property>=1.5.0;python_version<='3.7'"
 ]

 keywords = ["airflow", "provider", "astronomer", "sql", "decorator", "task flow", "elt", "etl", "dag"]
```

python-sdk/src/astro/databases/__init__.py (+11 −5)

```diff
@@ -5,6 +5,7 @@
 from typing import TYPE_CHECKING

 from astro.options import LoadOptionsList
+from astro.utils.compat.functools import cache
 from astro.utils.path import get_class_name, get_dict_with_module_names_to_dot_notations

 if TYPE_CHECKING:  # pragma: no cover
@@ -34,13 +35,18 @@ def create_database(
     :param conn_id: Database connection ID in Airflow
     :param table: (optional) The Table object
     """
-    from airflow.hooks.base import BaseHook
-
-    conn_type = BaseHook.get_connection(conn_id).conn_type
-    module_path = CONN_TYPE_TO_MODULE_PATH[conn_type]
-    module = importlib.import_module(module_path)
+    module = importlib.import_module(_get_conn(conn_id))
     class_name = get_class_name(module_ref=module, suffix="Database")
     database_class = getattr(module, class_name)
     load_options = load_options_list and load_options_list.get(database_class)
     database: BaseDatabase = database_class(conn_id, table, load_options=load_options)
     return database
+
+
+@cache
+def _get_conn(conn_id: str) -> str:
+    from airflow.hooks.base import BaseHook
+
+    conn_type = BaseHook.get_connection(conn_id).conn_type
+    module_path = CONN_TYPE_TO_MODULE_PATH[conn_type]
+    return module_path
```
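Because `_get_conn` is memoized with `@cache`, repeated `create_database` calls for the same `conn_id` resolve the connection type through `BaseHook.get_connection` only once per worker process; this is what collapses the repeated "Using connection ID ..." log lines shown above. The cache lives for the lifetime of the process, so a connection whose type changes mid-process would not be picked up until restart.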

python-sdk/src/astro/databases/aws/redshift.py (+3 −2)

```diff
@@ -34,6 +34,7 @@
 from astro.options import LoadOptions
 from astro.settings import REDSHIFT_SCHEMA
 from astro.table import BaseTable, Metadata, Table
+from astro.utils.compat.functools import cached_property

 DEFAULT_CONN_ID = RedshiftSQLHook.default_conn_name
 NATIVE_PATHS_SUPPORTED_FILE_TYPES = {
@@ -89,7 +90,7 @@ def __init__(
     def sql_type(self):
         return "redshift"

-    @property
+    @cached_property
     def hook(self) -> RedshiftSQLHook:
         """Retrieve Airflow hook to interface with the Redshift database."""
         kwargs = {}
@@ -100,7 +101,7 @@ def hook(self) -> RedshiftSQLHook:
             kwargs.update({"schema": self.table.metadata.database})
         return RedshiftSQLHook(redshift_conn_id=self.conn_id, use_legacy_sql=False, **kwargs)

-    @property
+    @cached_property
     def sqlalchemy_engine(self) -> Engine:
         """Return SQAlchemy engine."""
         uri = self.hook.get_uri()
```

python-sdk/src/astro/databases/base.py (+3 −2)

```diff
@@ -36,6 +36,7 @@
 from astro.options import LoadOptions
 from astro.settings import LOAD_FILE_ENABLE_NATIVE_FALLBACK, LOAD_TABLE_AUTODETECT_ROWS_COUNT, SCHEMA
 from astro.table import BaseTable, Metadata
+from astro.utils.compat.functools import cached_property


 class BaseDatabase(ABC):
@@ -85,7 +86,7 @@ def __repr__(self):
     def sql_type(self):
         raise NotImplementedError

-    @property
+    @cached_property
     def hook(self) -> DbApiHook:
         """Return an instance of the database-specific Airflow hook."""
         raise NotImplementedError
@@ -95,7 +96,7 @@ def connection(self) -> sqlalchemy.engine.base.Connection:
         """Return a Sqlalchemy connection object for the given database."""
         return self.sqlalchemy_engine.connect()

-    @property
+    @cached_property
     def sqlalchemy_engine(self) -> sqlalchemy.engine.base.Engine:
         """Return Sqlalchemy engine."""
         return self.hook.get_sqlalchemy_engine()  # type: ignore[no-any-return]
```
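As a quick illustration of the effect of this change (toy class, not SDK code): `@cached_property` computes the value on first access and then serves it from the instance's `__dict__`, so each database object builds its hook and engine once instead of on every attribute access.

```python
# Toy illustration: @cached_property evaluates once per instance.
from functools import cached_property


class Demo:
    def __init__(self) -> None:
        self.builds = 0

    @cached_property
    def hook(self) -> str:
        self.builds += 1  # stands in for constructing an Airflow hook
        return "hook"


d = Demo()
d.hook
d.hook
assert d.builds == 1  # built once, then read from d.__dict__
```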

python-sdk/src/astro/databases/google/bigquery.py (+3 −2)

```diff
@@ -48,6 +48,7 @@
 from astro.options import LoadOptions
 from astro.settings import BIGQUERY_SCHEMA, BIGQUERY_SCHEMA_LOCATION
 from astro.table import BaseTable, Metadata
+from astro.utils.compat.functools import cached_property

 DEFAULT_CONN_ID = BigQueryHook.default_conn_name
 NATIVE_PATHS_SUPPORTED_FILE_TYPES = {
@@ -119,12 +120,12 @@ def __init__(
     def sql_type(self) -> str:
         return "bigquery"

-    @property
+    @cached_property
     def hook(self) -> BigQueryHook:
         """Retrieve Airflow hook to interface with the BigQuery database."""
         return BigQueryHook(gcp_conn_id=self.conn_id, use_legacy_sql=False, location=BIGQUERY_SCHEMA_LOCATION)

-    @property
+    @cached_property
     def sqlalchemy_engine(self) -> Engine:
         """Return SQAlchemy engine."""
         uri = self.hook.get_uri()
```

python-sdk/src/astro/databases/postgres.py (+2 −1)

```diff
@@ -15,6 +15,7 @@
 from astro.options import LoadOptions
 from astro.settings import POSTGRES_SCHEMA
 from astro.table import BaseTable, Metadata
+from astro.utils.compat.functools import cached_property

 DEFAULT_CONN_ID = PostgresHook.default_conn_name

@@ -43,7 +44,7 @@ def __init__(
     def sql_type(self) -> str:
         return "postgresql"

-    @property
+    @cached_property
     def hook(self) -> PostgresHook:
         """Retrieve Airflow hook to interface with the Postgres database."""
         conn = PostgresHook(postgres_conn_id=self.conn_id).get_connection(self.conn_id)
```

python-sdk/src/astro/databases/snowflake.py (+2 −1)

```diff
@@ -41,6 +41,7 @@
 from astro.options import SnowflakeLoadOptions
 from astro.settings import LOAD_TABLE_AUTODETECT_ROWS_COUNT, SNOWFLAKE_SCHEMA
 from astro.table import BaseTable, Metadata
+from astro.utils.compat.functools import cached_property

 DEFAULT_CONN_ID = SnowflakeHook.default_conn_name

@@ -267,7 +268,7 @@ def __init__(
             raise ValueError("Error: Requires a SnowflakeLoadOptions")
         self.load_options: SnowflakeLoadOptions | None = load_options

-    @property
+    @cached_property
     def hook(self) -> SnowflakeHook:
         """Retrieve Airflow hook to interface with the snowflake database."""
         kwargs = {}
```

python-sdk/src/astro/databases/sqlite.py (+3 −2)

```diff
@@ -11,6 +11,7 @@
 from astro.databases.base import BaseDatabase
 from astro.options import LoadOptions
 from astro.table import BaseTable, Metadata
+from astro.utils.compat.functools import cached_property

 DEFAULT_CONN_ID = SqliteHook.default_conn_name

@@ -35,12 +36,12 @@ def __init__(
     def sql_type(self) -> str:
         return "sqlite"

-    @property
+    @cached_property
     def hook(self) -> SqliteHook:
         """Retrieve Airflow hook to interface with the Sqlite database."""
         return SqliteHook(sqlite_conn_id=self.conn_id)

-    @property
+    @cached_property
     def sqlalchemy_engine(self) -> Engine:
         """Return SQAlchemy engine."""
         # Airflow uses sqlite3 library and not SqlAlchemy for SqliteHook
```

python-sdk/src/astro/files/operators/files.py (+1 −1)

```diff
@@ -7,7 +7,7 @@

 from astro.files.base import File
 from astro.files.locations import create_file_location
-from astro.utils.typing_compat import Context
+from astro.utils.compat.typing import Context


 class ListFileOperator(BaseOperator):
```

python-sdk/src/astro/sql/operators/append.py (+1 −1)

```diff
@@ -9,7 +9,7 @@
 from astro.databases import create_database
 from astro.sql.operators.base_operator import AstroSQLBaseOperator
 from astro.table import BaseTable
-from astro.utils.typing_compat import Context
+from astro.utils.compat.typing import Context


 class AppendOperator(AstroSQLBaseOperator):
```

python-sdk/src/astro/sql/operators/base_decorator.py (+1 −1)

```diff
@@ -13,8 +13,8 @@
 from astro.databases.base import BaseDatabase
 from astro.sql.operators.upstream_task_mixin import UpstreamTaskMixin
 from astro.table import BaseTable, Table
+from astro.utils.compat.typing import Context
 from astro.utils.table import find_first_table
-from astro.utils.typing_compat import Context


 class BaseSQLDecoratedOperator(UpstreamTaskMixin, DecoratedOperator):
```

python-sdk/src/astro/sql/operators/cleanup.py (+1 −1)

```diff
@@ -26,7 +26,7 @@
 from astro.sql.operators.dataframe import DataframeOperator
 from astro.sql.operators.load_file import LoadFileOperator
 from astro.table import BaseTable, TempTable
-from astro.utils.typing_compat import Context
+from astro.utils.compat.typing import Context

 OPERATOR_CLASSES_WITH_TABLE_OUTPUT = (
     DataframeOperator,
```

python-sdk/src/astro/sql/operators/data_validations/ColumnCheckOperator.py (+1 −1)

```diff
@@ -8,7 +8,7 @@

 from astro.databases import create_database
 from astro.table import BaseTable
-from astro.utils.typing_compat import Context
+from astro.utils.compat.typing import Context


 class ColumnCheckOperator(SQLColumnCheckOperator):
```

python-sdk/src/astro/sql/operators/dataframe.py (+1 −1)

```diff
@@ -21,9 +21,9 @@
 from astro.files import File
 from astro.sql.operators.base_operator import AstroSQLBaseOperator
 from astro.sql.table import BaseTable, Table
+from astro.utils.compat.typing import Context
 from astro.utils.dataframe import convert_columns_names_capitalization
 from astro.utils.table import find_first_table
-from astro.utils.typing_compat import Context


 def _get_dataframe(
```

python-sdk/src/astro/sql/operators/drop.py (+1 −1)

```diff
@@ -8,7 +8,7 @@
 from astro.databases import create_database
 from astro.sql.operators.base_operator import AstroSQLBaseOperator
 from astro.table import BaseTable
-from astro.utils.typing_compat import Context
+from astro.utils.compat.typing import Context


 class DropTableOperator(AstroSQLBaseOperator):
```

python-sdk/src/astro/sql/operators/export_to_file.py (+1 −1)

```diff
@@ -12,7 +12,7 @@
 from astro.files import File
 from astro.sql.operators.base_operator import AstroSQLBaseOperator
 from astro.table import BaseTable, Table
-from astro.utils.typing_compat import Context
+from astro.utils.compat.typing import Context


 class ExportToFileOperator(AstroSQLBaseOperator):
```

python-sdk/src/astro/sql/operators/load_file.py (+1 −1)

```diff
@@ -17,7 +17,7 @@
 from astro.settings import LOAD_FILE_ENABLE_NATIVE_FALLBACK
 from astro.sql.operators.base_operator import AstroSQLBaseOperator
 from astro.table import BaseTable
-from astro.utils.typing_compat import Context
+from astro.utils.compat.typing import Context


 class LoadFileOperator(AstroSQLBaseOperator):
```

python-sdk/src/astro/sql/operators/merge.py (+1 −1)

```diff
@@ -10,7 +10,7 @@
 from astro.databases import create_database
 from astro.sql.operators.base_operator import AstroSQLBaseOperator
 from astro.table import BaseTable
-from astro.utils.typing_compat import Context
+from astro.utils.compat.typing import Context


 class MergeOperator(AstroSQLBaseOperator):
```

python-sdk/src/astro/sql/operators/raw_sql.py (+1 −1)

```diff
@@ -20,7 +20,7 @@
 from astro import settings
 from astro.exceptions import IllegalLoadToDatabaseException
 from astro.sql.operators.base_decorator import BaseSQLDecoratedOperator
-from astro.utils.typing_compat import Context
+from astro.utils.compat.typing import Context


 class RawSQLOperator(BaseSQLDecoratedOperator):
```

python-sdk/src/astro/sql/operators/transform.py (+4 −2)

```diff
@@ -14,7 +14,7 @@
 from sqlalchemy.sql.functions import Function

 from astro.sql.operators.base_decorator import BaseSQLDecoratedOperator
-from astro.utils.typing_compat import Context
+from astro.utils.compat.typing import Context


 class TransformOperator(BaseSQLDecoratedOperator):
@@ -60,7 +60,9 @@ def execute(self, context: Context):
             parameters=self.parameters,
         )
         # TODO: remove pushing to XCom once we update the airflow version.
-        context["ti"].xcom_push(key="output_table_row_count", value=str(self.output_table.row_count))
+        context["ti"].xcom_push(
+            key="output_table_row_count", value=str(self.database_impl.row_count(self.output_table))
+        )
         context["ti"].xcom_push(key="output_table_conn_id", value=str(self.output_table.conn_id))
         return self.output_table
```

python-sdk/src/astro/utils/compat/__init__.py

Whitespace-only changes.

python-sdk/src/astro/utils/compat/functools.py (+18, new file; name inferred from the `astro.utils.compat.functools` imports above)

```diff
@@ -0,0 +1,18 @@
+from __future__ import annotations
+
+import sys
+
+if sys.version_info >= (3, 8):
+    from functools import cached_property
+else:
+    from cached_property import cached_property
+
+if sys.version_info >= (3, 9):
+    from functools import cache
+else:
+    from functools import lru_cache
+
+    cache = lru_cache(maxsize=None)
+
+
+__all__ = ["cache", "cached_property"]
```

python-sdk/src/astro/utils/table.py (+1 −1)

```diff
@@ -6,7 +6,7 @@
 from airflow.models.xcom_arg import XComArg

 from astro.sql.table import BaseTable
-from astro.utils.typing_compat import Context
+from astro.utils.compat.typing import Context


 def _have_same_conn_id(tables: list[BaseTable]) -> bool:
```

python-sdk/tests/databases/test_bigquery.py (+19)

```diff
@@ -10,6 +10,7 @@

 from astro import settings
 from astro.databases.google.bigquery import BigqueryDatabase, S3ToBigqueryDataTransfer
+from astro.exceptions import DatabaseCustomError
 from astro.files import File
 from astro.table import TEMP_PREFIX, Metadata, Table

@@ -139,3 +140,21 @@ def mock_get_dataset(dataset_id):

     db = BigqueryDatabase(table=source_table, conn_id="test_conn")
     assert db.populate_table_metadata(input_table) == returned_table
+
+
+@mock.patch("astro.databases.google.bigquery.BigqueryDatabase.hook")
+def test_get_project_id_raise_exception(mock_hook):
+    """
+    Test that get_project_id raises DatabaseCustomError when the hook's
+    project_id cannot be read.
+    """
+
+    class CustomAttributeError:
+        def __str__(self):
+            raise AttributeError
+
+    database = BigqueryDatabase()
+    mock_hook.project_id = CustomAttributeError()
+
+    with pytest.raises(DatabaseCustomError):
+        database.get_project_id(target_table=Table())
```
