Skip to content

Commit d25e4fa

Browse files
[cli] Implement dagster definitions validate (#23692)
## Summary & Motivation This PR implements `dagster definitions validate`, a command to validate the definitions of a Dagster project. --- ### Expected behavior: Have a command to validate the definitions of a Dagster project. A behavior similar to [Definitions.validate_loadable](https://github.com/dagster-io/dagster/blob/e755c47bc8e1f596fdb4c4d68551732ee656dcfb/python_modules/dagster/dagster/_core/definitions/definitions_class.py#L632-L643) is expected. Example: ``` > dagster definitions validate -f path/to/my/definitions.py Validating definitions in path/to/my/definitions.py ... > dagster definitions validate -m my_dagster_project.definitions Validating definitions in my_dagster_project.definitions. ... > dagster definitions validate Validating definitions in ... ... ``` ### Input: Similar to `dagster dev` as described [here](https://docs.dagster.io/guides/running-dagster-locally#locating-your-code) - the definitions can be validated: - from a file, specified as a command line argument, eg. `definitions.py` - from a module, specified as a command line argument, eg. `my_dagster_project.definitions` - without command line arguments, read from `pyproject.toml` or `workspace.yml`. ### Output: Initially as logs in stdout/stderr. ### In scope: - Support the validation of Dagster definitions defined in a `Definitions` object. - Implement a behavior similar to `Definitions.validate_loadable` where the enclosed definitions will be loadable by Dagster: - No assets have conflicting keys. - No jobs, sensors, or schedules have conflicting names. - All asset jobs can be resolved. - All resource requirements are satisfied. ### Out of scope: - Support the validation of Dagster definitions defined in a `Repository` object. - See comment [here](https://dagsterlabs.slack.com/archives/C06C9QVJBUL/p1723762251730029?thread_ts=1723761841.463129&cid=C06C9QVJBUL) - Validate definitions dependencies across code locations - See thread [here](https://dagsterlabs.slack.com/archives/C06C9QVJBUL/p1723642633214359) - Could be added in a next iteration. ### Potential improvement and next steps: - Support usage in CI/CD - Support output as JSON - Support definitions constraints - eg. Sensors' `minimum_interval_seconds` should always be of a at least 120 seconds. - Support validation of definitions with dependencies across code locations. ## How I Tested These Changes Tested locally BK with additional tests ## Changelog Add the `dagster definitions validate` command to Dagster CLI. This command validates if Dagster definitions are loadable. - [x] `NEW` _(added new feature or capability)_ - [ ] `BUGFIX` _(fixed a bug)_ - [ ] `DOCS` _(added or updated documentation)_
1 parent 215d515 commit d25e4fa

File tree

13 files changed

+197
-0
lines changed

13 files changed

+197
-0
lines changed

python_modules/dagster/dagster/_cli/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from dagster._cli.asset import asset_cli
55
from dagster._cli.code_server import code_server_cli
66
from dagster._cli.debug import debug_cli
7+
from dagster._cli.definitions import definitions_cli
78
from dagster._cli.dev import dev_command
89
from dagster._cli.instance import instance_cli
910
from dagster._cli.job import job_cli
@@ -27,6 +28,7 @@ def create_dagster_cli():
2728
"project": project_cli,
2829
"dev": dev_command,
2930
"code-server": code_server_cli,
31+
"definitions": definitions_cli,
3032
}
3133

3234
@click.group(
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import logging
2+
import sys
3+
4+
import click
5+
6+
from dagster import __version__ as dagster_version
7+
from dagster._cli.job import apply_click_params
8+
from dagster._cli.utils import get_possibly_temporary_instance_for_cli
9+
from dagster._cli.workspace.cli_target import (
10+
ClickArgValue,
11+
get_workspace_from_kwargs,
12+
python_file_option,
13+
python_module_option,
14+
workspace_option,
15+
)
16+
from dagster._utils.log import configure_loggers
17+
18+
19+
@click.group(name="definitions")
20+
def definitions_cli():
21+
"""Commands for working with Dagster definitions."""
22+
23+
24+
def validate_command_options(f):
25+
return apply_click_params(
26+
f,
27+
workspace_option(),
28+
python_file_option(allow_multiple=True),
29+
python_module_option(allow_multiple=True),
30+
)
31+
32+
33+
@validate_command_options
34+
@click.option(
35+
"--log-level",
36+
help="Set the log level for dagster services.",
37+
show_default=True,
38+
default="info",
39+
type=click.Choice(["critical", "error", "warning", "info", "debug"], case_sensitive=False),
40+
)
41+
@click.option(
42+
"--log-format",
43+
type=click.Choice(["colored", "json", "rich"], case_sensitive=False),
44+
show_default=True,
45+
required=False,
46+
default="colored",
47+
help="Format of the logs for dagster services",
48+
)
49+
@definitions_cli.command(
50+
name="validate",
51+
help="Validate if Dagster definitions are loadable.",
52+
)
53+
def definitions_validate_command(log_level: str, log_format: str, **kwargs: ClickArgValue):
54+
configure_loggers(formatter=log_format, log_level=log_level.upper())
55+
logger = logging.getLogger("dagster")
56+
57+
logger.info("Starting validation...")
58+
with get_possibly_temporary_instance_for_cli(
59+
"dagster definitions validate", logger=logger
60+
) as instance:
61+
with get_workspace_from_kwargs(
62+
instance=instance, version=dagster_version, kwargs=kwargs
63+
) as workspace:
64+
invalid = any(
65+
entry
66+
for entry in workspace.get_code_location_entries().values()
67+
if entry.load_error
68+
)
69+
for code_location, entry in workspace.get_code_location_entries().items():
70+
if entry.load_error:
71+
logger.error(
72+
f"Validation failed for code location {code_location} with exception: "
73+
f"{entry.load_error.message}."
74+
)
75+
else:
76+
logger.info(f"Validation successful for code location {code_location}.")
77+
logger.info("Ending validation...")
78+
sys.exit(0) if not invalid else sys.exit(1)

python_modules/dagster/dagster_tests/cli_tests/command_tests/definitions_command_projects/empty_project/empty_project/__init__.py

Whitespace-only changes.

python_modules/dagster/dagster_tests/cli_tests/command_tests/definitions_command_projects/invalid_project/invalid_project/__init__.py

Whitespace-only changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
from dagster import Definitions, asset, define_asset_job
2+
3+
4+
@asset(name="my_asset")
5+
def my_first_asset() -> None: ...
6+
7+
8+
@asset(name="my_asset")
9+
def my_second_asset() -> None: ...
10+
11+
12+
my_job = define_asset_job(name="my_job", selection="my_asset")
13+
14+
defs = Definitions(assets=[my_first_asset, my_second_asset], jobs=[my_job])
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
[tool.dagster]
2+
module_name = "invalid_project.definitions"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
load_from:
2+
- python_file:
3+
relative_path: invalid_project/definitions.py
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
[tool.dagster]
2+
module_name = "valid_project.definitions"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from dagster import Definitions, asset, define_asset_job
2+
3+
4+
@asset
5+
def my_asset() -> None: ...
6+
7+
8+
my_job = define_asset_job(name="my_job", selection="my_asset")
9+
10+
defs = Definitions(assets=[my_asset], jobs=[my_job])
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from dagster import Definitions, asset, define_asset_job
2+
3+
4+
@asset
5+
def my_other_asset() -> None: ...
6+
7+
8+
my_other_job = define_asset_job(name="my_other_job", selection="my_other_asset")
9+
10+
defs = Definitions(assets=[my_other_asset], jobs=[my_other_job])
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
load_from:
2+
- python_file:
3+
relative_path: valid_project/definitions.py
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
from typing import Optional, Sequence
2+
3+
import pytest
4+
from click.testing import CliRunner
5+
from dagster._cli.definitions import definitions_validate_command
6+
from dagster._utils import file_relative_path
7+
8+
EMPTY_PROJECT_PATH = file_relative_path(__file__, "definitions_command_projects/empty_project")
9+
VALID_PROJECT_PATH = file_relative_path(__file__, "definitions_command_projects/valid_project")
10+
INVALID_PROJECT_PATH = file_relative_path(__file__, "definitions_command_projects/invalid_project")
11+
12+
13+
def invoke_validate(options: Optional[Sequence[str]] = None):
14+
runner = CliRunner()
15+
return runner.invoke(definitions_validate_command, options)
16+
17+
18+
def test_empty_project(monkeypatch):
19+
with monkeypatch.context() as m:
20+
m.chdir(EMPTY_PROJECT_PATH)
21+
result = invoke_validate()
22+
assert result.exit_code == 2
23+
assert (
24+
"Error: No arguments given and no [tool.dagster] block in pyproject.toml found."
25+
in result.output
26+
)
27+
28+
29+
@pytest.mark.parametrize(
30+
"options",
31+
[
32+
[],
33+
["-f", "valid_project/definitions.py"],
34+
["-f", "valid_project/definitions.py"],
35+
["-m", "valid_project.definitions"],
36+
["-w", "workspace.yaml"],
37+
],
38+
)
39+
def test_valid_project(options, monkeypatch):
40+
with monkeypatch.context() as m:
41+
m.chdir(VALID_PROJECT_PATH)
42+
result = invoke_validate(options=options)
43+
assert result.exit_code == 0
44+
assert "Validation successful" in result.output
45+
46+
47+
def test_valid_project_with_multiple_definitions_files(monkeypatch):
48+
with monkeypatch.context() as m:
49+
m.chdir(VALID_PROJECT_PATH)
50+
options = ["-f", "valid_project/definitions.py", "-f", "valid_project/more_definitions.py"]
51+
result = invoke_validate(options=options)
52+
assert result.exit_code == 0
53+
assert "Validation successful for code location definitions.py." in result.output
54+
assert "Validation successful for code location more_definitions.py." in result.output
55+
56+
57+
@pytest.mark.parametrize(
58+
"options",
59+
[
60+
[],
61+
["-f", "invalid_project/definitions.py"],
62+
["-m", "invalid_project.definitions"],
63+
["-w", "workspace.yaml"],
64+
],
65+
)
66+
def test_invalid_project(options, monkeypatch):
67+
with monkeypatch.context() as m:
68+
m.chdir(INVALID_PROJECT_PATH)
69+
result = invoke_validate(options=options)
70+
assert result.exit_code == 1
71+
assert "Validation failed" in result.output
72+
assert "Duplicate asset key: AssetKey(['my_asset'])" in result.output

0 commit comments

Comments
 (0)