Skip to content

Commit 0b6d3ea

Browse files
Sedosabenpankow
andauthored
[dagster-fivetran]: Add op_tags to build_fivetran_assets (#12474)
### Summary & Motivation Dagster supports the addition of custom k8s configurations to ops and jobs via tags. In an environment where a custom configuration is required to be attached e.g. Kubernetes Pods, the current `build_fivetran_assets` can't support attaching these tags to the underlying Op, causing the job to fail. In this PR I'm adding an optional `op_tags` parameter to support use case ### How I Tested These Changes Adding a unit test to asset op_tags are as expected --------- Co-authored-by: Ben Pankow <[email protected]>
1 parent dd3cf26 commit 0b6d3ea

File tree

2 files changed

+13
-1
lines changed

2 files changed

+13
-1
lines changed

python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_defs.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ def _build_fivetran_assets(
4343
resource_defs: Optional[Mapping[str, ResourceDefinition]] = None,
4444
group_name: Optional[str] = None,
4545
infer_missing_tables: bool = False,
46+
op_tags: Optional[Mapping[str, Any]] = None,
4647
) -> Sequence[AssetsDefinition]:
4748
asset_key_prefix = check.opt_sequence_param(asset_key_prefix, "asset_key_prefix", of_type=str)
4849

@@ -69,6 +70,7 @@ def _build_fivetran_assets(
6970
compute_kind="fivetran",
7071
resource_defs=resource_defs,
7172
group_name=group_name,
73+
op_tags=op_tags,
7274
)
7375
def _assets(context):
7476
fivetran_output = context.resources.fivetran.sync_and_poll(
@@ -118,6 +120,7 @@ def build_fivetran_assets(
118120
metadata_by_table_name: Optional[Mapping[str, MetadataUserInput]] = None,
119121
group_name: Optional[str] = None,
120122
infer_missing_tables: bool = False,
123+
op_tags: Optional[Mapping[str, Any]] = None,
121124
) -> Sequence[AssetsDefinition]:
122125
"""
123126
Build a set of assets for a given Fivetran connector.
@@ -147,6 +150,10 @@ def build_fivetran_assets(
147150
in destination_tables even if they are not present in the Fivetran sync output. This is useful
148151
in cases where Fivetran does not sync any data for a table and therefore does not include it
149152
in the sync output API response.
153+
op_tags (Optional[Dict[str, Any]]):
154+
A dictionary of tags for the op that computes the asset. Frameworks may expect and
155+
require certain metadata to be attached to a op. Values that are not strings will be
156+
json encoded and must meet the criteria that json.loads(json.dumps(value)) == value.
150157
151158
**Examples:**
152159
@@ -193,6 +200,7 @@ def build_fivetran_assets(
193200
metadata_by_table_name=metadata_by_table_name,
194201
group_name=group_name,
195202
infer_missing_tables=infer_missing_tables,
203+
op_tags=op_tags,
196204
)
197205

198206

python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_asset_defs.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ def test_fivetran_group_label(group_name, expected_group_name):
5656
(["schema1.tracked", "does.not_exist"], True, False),
5757
],
5858
)
59-
def test_fivetran_asset_run(tables, infer_missing_tables, should_error, schema_prefix):
59+
@pytest.mark.parametrize("op_tags", [None, {"key1": "value1"}])
60+
def test_fivetran_asset_run(tables, infer_missing_tables, should_error, schema_prefix, op_tags):
6061
ft_resource = fivetran_resource.configured({"api_key": "foo", "api_secret": "bar"})
6162
final_data = {"succeeded_at": "2021-01-01T02:00:00.0Z"}
6263
api_prefix = f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION_PATH}{FIVETRAN_CONNECTOR_PATH}{DEFAULT_CONNECTOR_ID}"
@@ -70,12 +71,15 @@ def test_fivetran_asset_run(tables, infer_missing_tables, should_error, schema_p
7071
poll_interval=0.1,
7172
poll_timeout=10,
7273
infer_missing_tables=infer_missing_tables,
74+
op_tags=op_tags,
7375
)
7476

7577
# expect the multi asset to have one asset key and one output for each specified asset key
7678
assert fivetran_assets[0].keys == {AssetKey(table.split(".")) for table in tables}
7779
assert len(fivetran_assets[0].op.output_defs) == len(tables)
7880

81+
assert fivetran_assets[0].op.tags == {**{"kind": "fivetran"}, **(op_tags or {})}
82+
7983
fivetran_assets_job = build_assets_job(
8084
name="fivetran_assets_job",
8185
assets=fivetran_assets,

0 commit comments

Comments
 (0)