Changeset View
Changeset View
Standalone View
Standalone View
python_modules/libraries/dagster-dbt/dagster_dbt/cli/solids.py
from typing import Dict | from typing import Dict | ||||
from dagster import ( | from dagster import ( | ||||
AssetMaterialization, | AssetMaterialization, | ||||
Bool, | |||||
EventMetadataEntry, | EventMetadataEntry, | ||||
InputDefinition, | InputDefinition, | ||||
Noneable, | Noneable, | ||||
Nothing, | Nothing, | ||||
Output, | Output, | ||||
OutputDefinition, | OutputDefinition, | ||||
Permissive, | Permissive, | ||||
StringSource, | StringSource, | ||||
▲ Show 20 Lines • Show All 134 Lines • ▼ Show 20 Lines | config_schema={ | ||||
default_value=False, | default_value=False, | ||||
), | ), | ||||
"fail-fast": Field( | "fail-fast": Field( | ||||
config=bool, | config=bool, | ||||
description="Stop execution upon a first failure. (--fail-fast)", | description="Stop execution upon a first failure. (--fail-fast)", | ||||
is_required=False, | is_required=False, | ||||
default_value=False, | default_value=False, | ||||
), | ), | ||||
"yield_materializations": Field( | |||||
config=Bool, | |||||
is_required=False, | |||||
default_value=True, | |||||
description=( | |||||
"If True, materializations corresponding to the results of the dbt operation will " | |||||
"be yielded when the solid executes. Default: True" | |||||
), | |||||
), | |||||
}, | }, | ||||
tags={"kind": "dbt"}, | tags={"kind": "dbt"}, | ||||
) | ) | ||||
@experimental | @experimental | ||||
def dbt_cli_run(context) -> DbtCliOutput: | def dbt_cli_run(context) -> DbtCliOutput: | ||||
"""This solid executes ``dbt run`` via the dbt CLI.""" | """This solid executes ``dbt run`` via the dbt CLI.""" | ||||
from ..utils import generate_materializations | |||||
cli_output = execute_cli( | cli_output = execute_cli( | ||||
context.solid_config["dbt_executable"], | context.solid_config["dbt_executable"], | ||||
command=("run",), | command=("run",), | ||||
flags_dict=passthrough_flags_only( | flags_dict=passthrough_flags_only( | ||||
context.solid_config, ("threads", "models", "exclude", "full-refresh", "fail-fast") | context.solid_config, ("threads", "models", "exclude", "full-refresh", "fail-fast") | ||||
), | ), | ||||
log=context.log, | log=context.log, | ||||
warn_error=context.solid_config["warn-error"], | warn_error=context.solid_config["warn-error"], | ||||
ignore_handled_error=context.solid_config["ignore_handled_error"], | ignore_handled_error=context.solid_config["ignore_handled_error"], | ||||
) | ) | ||||
run_results = parse_run_results(context.solid_config["project-dir"]) | run_results = parse_run_results(context.solid_config["project-dir"]) | ||||
cli_output = {**run_results, **cli_output} | cli_output_dict = {**run_results, **cli_output} | ||||
cli_output = DbtCliOutput.from_dict(cli_output_dict) | |||||
if context.solid_config["yield_materializations"]: | |||||
for materialization in generate_materializations(cli_output): | |||||
yield materialization | |||||
yield AssetMaterialization( | yield AssetMaterialization( | ||||
asset_key="dbt_run_cli_output", | asset_key="dbt_run_cli_output", | ||||
description="Output from the CLI execution of `dbt run`.", | description="Output from the CLI execution of `dbt run`.", | ||||
metadata_entries=[EventMetadataEntry.json(cli_output, label="CLI Output")], | metadata_entries=[EventMetadataEntry.json(cli_output_dict, label="CLI Output")], | ||||
) | ) | ||||
yield Output(DbtCliOutput.from_dict(cli_output)) | yield Output(cli_output) | ||||
@solid( | @solid( | ||||
description="A solid to invoke dbt test via CLI.", | description="A solid to invoke dbt test via CLI.", | ||||
input_defs=[InputDefinition(name="start_after", dagster_type=Nothing)], | input_defs=[InputDefinition(name="start_after", dagster_type=Nothing)], | ||||
output_defs=[OutputDefinition(name="result", dagster_type=DbtCliOutput)], | output_defs=[OutputDefinition(name="result", dagster_type=DbtCliOutput)], | ||||
config_schema={ | config_schema={ | ||||
**CLI_CONFIG_SCHEMA, | **CLI_CONFIG_SCHEMA, | ||||
Show All 31 Lines | config_schema={ | ||||
description="The dbt models to run.", | description="The dbt models to run.", | ||||
), | ), | ||||
"exclude": Field( | "exclude": Field( | ||||
config=Noneable([str]), | config=Noneable([str]), | ||||
default_value=None, | default_value=None, | ||||
is_required=False, | is_required=False, | ||||
description="The dbt models to exclude.", | description="The dbt models to exclude.", | ||||
), | ), | ||||
"yield_materializations": Field( | |||||
config=Bool, is_required=False, default_value=True, description="FIXME" | |||||
), | |||||
}, | }, | ||||
tags={"kind": "dbt"}, | tags={"kind": "dbt"}, | ||||
) | ) | ||||
@experimental | @experimental | ||||
def dbt_cli_test(context) -> DbtCliOutput: | def dbt_cli_test(context) -> DbtCliOutput: | ||||
"""This solid executes ``dbt test`` via the dbt CLI.""" | """This solid executes ``dbt test`` via the dbt CLI.""" | ||||
cli_output = execute_cli( | cli_output = execute_cli( | ||||
context.solid_config["dbt_executable"], | context.solid_config["dbt_executable"], | ||||
command=("test",), | command=("test",), | ||||
flags_dict=passthrough_flags_only( | flags_dict=passthrough_flags_only( | ||||
context.solid_config, ("data", "schema", "fail-fast", "threads", "models", "exclude") | context.solid_config, ("data", "schema", "fail-fast", "threads", "models", "exclude") | ||||
), | ), | ||||
log=context.log, | log=context.log, | ||||
warn_error=context.solid_config["warn-error"], | warn_error=context.solid_config["warn-error"], | ||||
ignore_handled_error=context.solid_config["ignore_handled_error"], | ignore_handled_error=context.solid_config["ignore_handled_error"], | ||||
) | ) | ||||
run_results = parse_run_results(context.solid_config["project-dir"]) | run_results = parse_run_results(context.solid_config["project-dir"]) | ||||
cli_output = {**run_results, **cli_output} | cli_output = {**run_results, **cli_output} | ||||
if context.solid_config["yield_materializations"]: | |||||
yield AssetMaterialization( | yield AssetMaterialization( | ||||
asset_key="dbt_test_cli_output", | asset_key="dbt_test_cli_output", | ||||
description="Output from the CLI execution of `dbt test`.", | description="Output from the CLI execution of `dbt test`.", | ||||
metadata_entries=[EventMetadataEntry.json(cli_output, label="CLI Output")], | metadata_entries=[EventMetadataEntry.json(cli_output, label="CLI Output")], | ||||
) | ) | ||||
yield Output(DbtCliOutput.from_dict(cli_output)) | yield Output(DbtCliOutput.from_dict(cli_output)) | ||||
@solid( | @solid( | ||||
description="A solid to invoke dbt snapshot via CLI.", | description="A solid to invoke dbt snapshot via CLI.", | ||||
input_defs=[InputDefinition(name="start_after", dagster_type=Nothing)], | input_defs=[InputDefinition(name="start_after", dagster_type=Nothing)], | ||||
output_defs=[OutputDefinition(name="result", dagster_type=Dict)], | output_defs=[OutputDefinition(name="result", dagster_type=Dict)], | ||||
Show All 15 Lines | config_schema={ | ||||
description="The dbt models to run.", | description="The dbt models to run.", | ||||
), | ), | ||||
"exclude": Field( | "exclude": Field( | ||||
config=Noneable([str]), | config=Noneable([str]), | ||||
default_value=None, | default_value=None, | ||||
is_required=False, | is_required=False, | ||||
description="The dbt models to exclude.", | description="The dbt models to exclude.", | ||||
), | ), | ||||
"yield_materializations": Field( | |||||
config=Bool, is_required=False, default_value=True, description="FIXME" | |||||
), | |||||
}, | }, | ||||
tags={"kind": "dbt"}, | tags={"kind": "dbt"}, | ||||
) | ) | ||||
@experimental | @experimental | ||||
def dbt_cli_snapshot(context) -> Dict: | def dbt_cli_snapshot(context) -> Dict: | ||||
"""This solid executes ``dbt snapshot`` via the dbt CLI.""" | """This solid executes ``dbt snapshot`` via the dbt CLI.""" | ||||
cli_output = execute_cli( | cli_output = execute_cli( | ||||
context.solid_config["dbt_executable"], | context.solid_config["dbt_executable"], | ||||
command=("snapshot",), | command=("snapshot",), | ||||
flags_dict=passthrough_flags_only(context.solid_config, ("threads", "models", "exclude")), | flags_dict=passthrough_flags_only(context.solid_config, ("threads", "models", "exclude")), | ||||
log=context.log, | log=context.log, | ||||
warn_error=context.solid_config["warn-error"], | warn_error=context.solid_config["warn-error"], | ||||
ignore_handled_error=context.solid_config["ignore_handled_error"], | ignore_handled_error=context.solid_config["ignore_handled_error"], | ||||
) | ) | ||||
if context.solid_config["yield_materializations"]: | |||||
yield AssetMaterialization( | yield AssetMaterialization( | ||||
asset_key="dbt_snapshot_cli_output", | asset_key="dbt_snapshot_cli_output", | ||||
description="Output from the CLI execution of `dbt snapshot`.", | description="Output from the CLI execution of `dbt snapshot`.", | ||||
metadata_entries=[EventMetadataEntry.json(cli_output, label="CLI Output")], | metadata_entries=[EventMetadataEntry.json(cli_output, label="CLI Output")], | ||||
) | ) | ||||
yield Output(cli_output) | yield Output(cli_output) | ||||
@solid( | @solid( | ||||
description="A solid to invoke dbt run-operation via CLI.", | description="A solid to invoke dbt run-operation via CLI.", | ||||
input_defs=[InputDefinition(name="start_after", dagster_type=Nothing)], | input_defs=[InputDefinition(name="start_after", dagster_type=Nothing)], | ||||
output_defs=[OutputDefinition(name="result", dagster_type=Dict)], | output_defs=[OutputDefinition(name="result", dagster_type=Dict)], | ||||
Show All 10 Lines | config_schema={ | ||||
config=Permissive({}), | config=Permissive({}), | ||||
is_required=False, | is_required=False, | ||||
description=( | description=( | ||||
"Supply arguments to the macro. This dictionary will be mapped to the keyword " | "Supply arguments to the macro. This dictionary will be mapped to the keyword " | ||||
"arguments defined in the selected macro. This argument should be a dictionary, " | "arguments defined in the selected macro. This argument should be a dictionary, " | ||||
"eg. {'my_variable': 'my_value'}" | "eg. {'my_variable': 'my_value'}" | ||||
), | ), | ||||
), | ), | ||||
"yield_materializations": Field( | |||||
config=Bool, is_required=False, default_value=True, description="FIXME" | |||||
), | |||||
}, | }, | ||||
tags={"kind": "dbt"}, | tags={"kind": "dbt"}, | ||||
) | ) | ||||
@experimental | @experimental | ||||
def dbt_cli_run_operation(context) -> Dict: | def dbt_cli_run_operation(context) -> Dict: | ||||
"""This solid executes ``dbt run-operation`` via the dbt CLI.""" | """This solid executes ``dbt run-operation`` via the dbt CLI.""" | ||||
cli_output = execute_cli( | cli_output = execute_cli( | ||||
context.solid_config["dbt_executable"], | context.solid_config["dbt_executable"], | ||||
command=("run-operation", context.solid_config["macro"]), | command=("run-operation", context.solid_config["macro"]), | ||||
flags_dict=passthrough_flags_only(context.solid_config, ("args",)), | flags_dict=passthrough_flags_only(context.solid_config, ("args",)), | ||||
log=context.log, | log=context.log, | ||||
warn_error=context.solid_config["warn-error"], | warn_error=context.solid_config["warn-error"], | ||||
ignore_handled_error=context.solid_config["ignore_handled_error"], | ignore_handled_error=context.solid_config["ignore_handled_error"], | ||||
) | ) | ||||
if context.solid_config["yield_materializations"]: | |||||
yield AssetMaterialization( | yield AssetMaterialization( | ||||
asset_key="dbt_run_operation_cli_output", | asset_key="dbt_run_operation_cli_output", | ||||
description="Output from the CLI execution of `dbt run-operation`.", | description="Output from the CLI execution of `dbt run-operation`.", | ||||
metadata_entries=[EventMetadataEntry.json(cli_output, label="CLI Output")], | metadata_entries=[EventMetadataEntry.json(cli_output, label="CLI Output")], | ||||
) | ) | ||||
yield Output(cli_output) | yield Output(cli_output) | ||||
@solid( | @solid( | ||||
description="A solid to invoke dbt source snapshot-freshness via CLI.", | description="A solid to invoke dbt source snapshot-freshness via CLI.", | ||||
input_defs=[InputDefinition(name="start_after", dagster_type=Nothing)], | input_defs=[InputDefinition(name="start_after", dagster_type=Nothing)], | ||||
output_defs=[OutputDefinition(name="result", dagster_type=Dict)], | output_defs=[OutputDefinition(name="result", dagster_type=Dict)], | ||||
Show All 17 Lines | config_schema={ | ||||
config=Noneable(int), | config=Noneable(int), | ||||
default_value=None, | default_value=None, | ||||
is_required=False, | is_required=False, | ||||
description=( | description=( | ||||
"Specify number of threads to use while executing models. Overrides " | "Specify number of threads to use while executing models. Overrides " | ||||
"settings in profiles.yml." | "settings in profiles.yml." | ||||
), | ), | ||||
), | ), | ||||
"yield_materializations": Field( | |||||
config=Bool, is_required=False, default_value=True, description="FIXME" | |||||
), | |||||
}, | }, | ||||
tags={"kind": "dbt"}, | tags={"kind": "dbt"}, | ||||
) | ) | ||||
@experimental | @experimental | ||||
def dbt_cli_snapshot_freshness(context) -> Dict: | def dbt_cli_snapshot_freshness(context) -> Dict: | ||||
"""This solid executes ``dbt source snapshot-freshness`` via the dbt CLI.""" | """This solid executes ``dbt source snapshot-freshness`` via the dbt CLI.""" | ||||
cli_output = execute_cli( | cli_output = execute_cli( | ||||
context.solid_config["dbt_executable"], | context.solid_config["dbt_executable"], | ||||
command=("source", "snapshot-freshness"), | command=("source", "snapshot-freshness"), | ||||
flags_dict=passthrough_flags_only(context.solid_config, ("select", "output", "threads")), | flags_dict=passthrough_flags_only(context.solid_config, ("select", "output", "threads")), | ||||
log=context.log, | log=context.log, | ||||
warn_error=context.solid_config["warn-error"], | warn_error=context.solid_config["warn-error"], | ||||
ignore_handled_error=context.solid_config["ignore_handled_error"], | ignore_handled_error=context.solid_config["ignore_handled_error"], | ||||
) | ) | ||||
if context.solid_config["yield_materializations"]: | |||||
yield AssetMaterialization( | yield AssetMaterialization( | ||||
asset_key="dbt_source_snapshot-freshness_cli_output", | asset_key="dbt_source_snapshot-freshness_cli_output", | ||||
description="Output from the CLI execution of `dbt source snapshot-freshness`.", | description="Output from the CLI execution of `dbt source snapshot-freshness`.", | ||||
metadata_entries=[EventMetadataEntry.json(cli_output, label="CLI Output")], | metadata_entries=[EventMetadataEntry.json(cli_output, label="CLI Output")], | ||||
) | ) | ||||
yield Output(cli_output) | yield Output(cli_output) | ||||
@solid( | @solid( | ||||
description="A solid to invoke dbt compile via CLI.", | description="A solid to invoke dbt compile via CLI.", | ||||
input_defs=[InputDefinition(name="start_after", dagster_type=Nothing)], | input_defs=[InputDefinition(name="start_after", dagster_type=Nothing)], | ||||
output_defs=[OutputDefinition(name="result", dagster_type=Dict)], | output_defs=[OutputDefinition(name="result", dagster_type=Dict)], | ||||
▲ Show 20 Lines • Show All 49 Lines • ▼ Show 20 Lines | config_schema={ | ||||
config=bool, | config=bool, | ||||
description=( | description=( | ||||
"If specified, DBT will drop incremental models and fully-recalculate " | "If specified, DBT will drop incremental models and fully-recalculate " | ||||
"the incremental table from the model definition. (--full-refresh)" | "the incremental table from the model definition. (--full-refresh)" | ||||
), | ), | ||||
is_required=False, | is_required=False, | ||||
default_value=False, | default_value=False, | ||||
), | ), | ||||
"yield_materializations": Field( | |||||
config=Bool, is_required=False, default_value=True, description="FIXME" | |||||
), | |||||
}, | }, | ||||
tags={"kind": "dbt"}, | tags={"kind": "dbt"}, | ||||
) | ) | ||||
@experimental | @experimental | ||||
def dbt_cli_compile(context) -> Dict: | def dbt_cli_compile(context) -> Dict: | ||||
"""This solid executes ``dbt compile`` via the dbt CLI.""" | """This solid executes ``dbt compile`` via the dbt CLI.""" | ||||
cli_output = execute_cli( | cli_output = execute_cli( | ||||
context.solid_config["dbt_executable"], | context.solid_config["dbt_executable"], | ||||
Show All 11 Lines | cli_output = execute_cli( | ||||
"full-refresh", | "full-refresh", | ||||
), | ), | ||||
), | ), | ||||
log=context.log, | log=context.log, | ||||
warn_error=context.solid_config["warn-error"], | warn_error=context.solid_config["warn-error"], | ||||
ignore_handled_error=context.solid_config["ignore_handled_error"], | ignore_handled_error=context.solid_config["ignore_handled_error"], | ||||
) | ) | ||||
if context.solid_config["yield_materializations"]: | |||||
yield AssetMaterialization( | yield AssetMaterialization( | ||||
asset_key="dbt_compile_cli_output", | asset_key="dbt_compile_cli_output", | ||||
description="Output from the CLI execution of `dbt compile`.", | description="Output from the CLI execution of `dbt compile`.", | ||||
metadata_entries=[EventMetadataEntry.json(cli_output, label="CLI Output")], | metadata_entries=[EventMetadataEntry.json(cli_output, label="CLI Output")], | ||||
) | ) | ||||
yield Output(cli_output) | yield Output(cli_output) |