Changeset View
Changeset View
Standalone View
Standalone View
examples/docs_snippets/docs_snippets/integrations/dbt.py
Show All 16 Lines | |||||
def scope_dbt_cli_run_specific_models(): | def scope_dbt_cli_run_specific_models(): | ||||
# start_marker_dbt_cli_run_specific_models | # start_marker_dbt_cli_run_specific_models | ||||
from dagster import pipeline | from dagster import pipeline | ||||
from dagster_dbt import dbt_cli_run | from dagster_dbt import dbt_cli_run | ||||
config = {"project-dir": "path/to/dbt/project", "models": ["tag:staging"]} | config = {"project-dir": "path/to/dbt/project", "models": ["tag:staging"]} | ||||
run_staging_models = dbt_cli_run.configured(config, name="run_staging_models") | run_staging_models = dbt_cli_run.configured( | ||||
config, name="run_staging_models" | |||||
) | |||||
@pipeline | @pipeline | ||||
def my_dbt_pipeline(): | def my_dbt_pipeline(): | ||||
run_staging_models() | run_staging_models() | ||||
# end_marker_dbt_cli_run_specific_models | # end_marker_dbt_cli_run_specific_models | ||||
def scope_dbt_cli_run_after_another_solid(): | def scope_dbt_cli_run_after_another_solid(): | ||||
# start_marker_dbt_cli_run_after_another_solid | # start_marker_dbt_cli_run_after_another_solid | ||||
from dagster import pipeline, solid | from dagster import pipeline, solid | ||||
from dagster_dbt import dbt_cli_run | from dagster_dbt import dbt_cli_run | ||||
config = {"project-dir": "path/to/dbt/project", "models": ["tag:staging"]} | config = {"project-dir": "path/to/dbt/project", "models": ["tag:staging"]} | ||||
run_staging_models = dbt_cli_run.configured(config, name="run_staging_models") | run_staging_models = dbt_cli_run.configured( | ||||
config, name="run_staging_models" | |||||
) | |||||
@solid | @solid | ||||
def do_something(context): | def do_something(context): | ||||
# solid logic here | # solid logic here | ||||
context.log.info("Executing necessary logic before dbt run") | context.log.info("Executing necessary logic before dbt run") | ||||
@pipeline | @pipeline | ||||
def my_dbt_pipeline(): | def my_dbt_pipeline(): | ||||
run_staging_models(start_after=do_something()) | run_staging_models(start_after=do_something()) | ||||
# end_marker_dbt_cli_run_after_another_solid | # end_marker_dbt_cli_run_after_another_solid | ||||
def scope_dbt_rpc_resource(): | def scope_dbt_rpc_resource(): | ||||
# start_marker_dbt_rpc_resource | # start_marker_dbt_rpc_resource | ||||
from dagster_dbt import dbt_rpc_resource | from dagster_dbt import dbt_rpc_resource | ||||
my_remote_rpc = dbt_rpc_resource.configured({"host": "80.80.80.80", "port": 8080}) | my_remote_rpc = dbt_rpc_resource.configured( | ||||
{"host": "80.80.80.80", "port": 8080} | |||||
) | |||||
# end_marker_dbt_rpc_resource | # end_marker_dbt_rpc_resource | ||||
def scope_dbt_rpc_run(): | def scope_dbt_rpc_run(): | ||||
from dagster_dbt import dbt_rpc_resource | from dagster_dbt import dbt_rpc_resource | ||||
my_remote_rpc = dbt_rpc_resource.configured({"host": "80.80.80.80", "port": 8080}) | my_remote_rpc = dbt_rpc_resource.configured( | ||||
{"host": "80.80.80.80", "port": 8080} | |||||
) | |||||
# start_marker_dbt_rpc_run | # start_marker_dbt_rpc_run | ||||
from dagster import ModeDefinition, pipeline | from dagster import ModeDefinition, pipeline | ||||
from dagster_dbt import dbt_rpc_run | from dagster_dbt import dbt_rpc_run | ||||
@pipeline(mode_defs=[ModeDefinition(resource_defs={"dbt_rpc": my_remote_rpc})]) | @pipeline( | ||||
mode_defs=[ModeDefinition(resource_defs={"dbt_rpc": my_remote_rpc})] | |||||
) | |||||
def my_dbt_pipeline(): | def my_dbt_pipeline(): | ||||
dbt_rpc_run() | dbt_rpc_run() | ||||
# end_marker_dbt_rpc_run | # end_marker_dbt_rpc_run | ||||
def scope_dbt_rpc_run_specific_models(): | def scope_dbt_rpc_run_specific_models(): | ||||
from dagster_dbt import dbt_rpc_resource | from dagster_dbt import dbt_rpc_resource | ||||
my_remote_rpc = dbt_rpc_resource.configured({"host": "80.80.80.80", "port": 8080}) | my_remote_rpc = dbt_rpc_resource.configured( | ||||
{"host": "80.80.80.80", "port": 8080} | |||||
) | |||||
# start_marker_dbt_rpc_run_specific_models | # start_marker_dbt_rpc_run_specific_models | ||||
from dagster import ModeDefinition, pipeline | from dagster import ModeDefinition, pipeline | ||||
from dagster_dbt import dbt_rpc_run | from dagster_dbt import dbt_rpc_run | ||||
run_staging_models = dbt_rpc_run.configured( | run_staging_models = dbt_rpc_run.configured( | ||||
{"models": ["tag:staging"]}, | {"models": ["tag:staging"]}, | ||||
name="run_staging_models", | name="run_staging_models", | ||||
) | ) | ||||
@pipeline(mode_defs=[ModeDefinition(resource_defs={"dbt_rpc": my_remote_rpc})]) | @pipeline( | ||||
mode_defs=[ModeDefinition(resource_defs={"dbt_rpc": my_remote_rpc})] | |||||
) | |||||
def my_dbt_pipeline(): | def my_dbt_pipeline(): | ||||
run_staging_models() | run_staging_models() | ||||
# end_marker_dbt_rpc_run_specific_models | # end_marker_dbt_rpc_run_specific_models | ||||
def scope_dbt_rpc_run_and_wait(): | def scope_dbt_rpc_run_and_wait(): | ||||
from dagster_dbt import dbt_rpc_resource | from dagster_dbt import dbt_rpc_resource | ||||
my_remote_rpc = dbt_rpc_resource.configured({"host": "80.80.80.80", "port": 8080}) | my_remote_rpc = dbt_rpc_resource.configured( | ||||
{"host": "80.80.80.80", "port": 8080} | |||||
) | |||||
# start_marker_dbt_rpc_run_and_wait | # start_marker_dbt_rpc_run_and_wait | ||||
from dagster import ModeDefinition, pipeline | from dagster import ModeDefinition, pipeline | ||||
from dagster_dbt import dbt_rpc_run_and_wait | from dagster_dbt import dbt_rpc_run_and_wait | ||||
@pipeline(mode_defs=[ModeDefinition(resource_defs={"dbt_rpc": my_remote_rpc})]) | @pipeline( | ||||
mode_defs=[ModeDefinition(resource_defs={"dbt_rpc": my_remote_rpc})] | |||||
) | |||||
def my_dbt_pipeline(): | def my_dbt_pipeline(): | ||||
dbt_rpc_run_and_wait() | dbt_rpc_run_and_wait() | ||||
# end_marker_dbt_rpc_run_and_wait | # end_marker_dbt_rpc_run_and_wait | ||||
def scope_dbt_cli_config_profile_and_target(): | def scope_dbt_cli_config_profile_and_target(): | ||||
PROFILE_NAME, TARGET_NAME = "", "" | PROFILE_NAME, TARGET_NAME = "", "" | ||||
▲ Show 20 Lines • Show All 57 Lines • ▼ Show 20 Lines | def scope_dbt_cli_config_disable_assets(): | ||||
# end_marker_dbt_cli_config_disable_assets | # end_marker_dbt_cli_config_disable_assets | ||||
def scope_dbt_rpc_resource_example(): | def scope_dbt_rpc_resource_example(): | ||||
HOST, PORT = "", "" | HOST, PORT = "", "" | ||||
# start_marker_dbt_rpc_resource_example | # start_marker_dbt_rpc_resource_example | ||||
from dagster_dbt import dbt_rpc_resource | from dagster_dbt import dbt_rpc_resource | ||||
custom_resource = dbt_rpc_resource.configured({"host": HOST, "post": PORT}) | custom_resource = dbt_rpc_resource.configured( | ||||
{"host": HOST, "post": PORT} | |||||
) | |||||
# end_marker_dbt_rpc_resource_example | # end_marker_dbt_rpc_resource_example | ||||
def scope_dbt_rpc_config_select_models(): | def scope_dbt_rpc_config_select_models(): | ||||
# start_marker_dbt_rpc_config_select_models | # start_marker_dbt_rpc_config_select_models | ||||
config = {"models": ["my_dbt_model+", "path.to.models", "tag:nightly"]} | config = {"models": ["my_dbt_model+", "path.to.models", "tag:nightly"]} | ||||
from dagster_dbt import dbt_rpc_run | from dagster_dbt import dbt_rpc_run | ||||
Show All 33 Lines |