docs/content/integrations/dbt.mdx
**Example:** The solid `dbt_cli_run` is configured to run specific models in your dbt project. This is similar to invoking `dbt run --models tag:staging`.
```python file=/integrations/dbt.py startafter=start_marker_dbt_cli_run_specific_models endbefore=end_marker_dbt_cli_run_specific_models dedent=4
from dagster import pipeline
from dagster_dbt import dbt_cli_run

config = {"project-dir": "path/to/dbt/project", "models": ["tag:staging"]}
run_staging_models = dbt_cli_run.configured(
    config, name="run_staging_models"
)


@pipeline
def my_dbt_pipeline():
    run_staging_models()
```
In the code snippet above, the config `"models"` takes a list of strings. The string `"tag:staging"` uses [dbt's node selection syntax](https://docs.getdbt.com/reference/node-selection/syntax) to select the models tagged `"staging"`. The linked dbt documentation covers the full syntax.
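To make the comparison with the command line concrete, here is a minimal sketch of how such a config dictionary could map onto a `dbt run` invocation. The helper `build_dbt_run_command` is hypothetical and written only for illustration; it is not how `dagster_dbt` builds its command internally, and it handles only the two keys used above.

```python
import shlex


def build_dbt_run_command(config):
    """Translate a config dict into an illustrative `dbt run` argument list."""
    args = ["dbt", "run"]
    # A scalar option such as "project-dir" becomes `--project-dir <value>`.
    if "project-dir" in config:
        args += ["--project-dir", config["project-dir"]]
    # All selectors in "models" follow a single `--models` flag.
    if config.get("models"):
        args += ["--models", *config["models"]]
    return args


config = {"project-dir": "path/to/dbt/project", "models": ["tag:staging"]}
# Print the command as a shell-style string for inspection.
print(shlex.join(build_dbt_run_command(config)))
```

Adding more selectors to the `"models"` list simply appends them after `--models`, which mirrors how dbt accepts several space-separated selectors on the command line.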
**Example:** As in the previous example, the solid `dbt_cli_run` runs specific models, but here it is set up to run only after another solid has executed. In your pipeline definition, you can pass the output of an upstream solid to the `start_after` argument of your dbt solid, which models that dependency for you. See the documentation for [Order Based Dependencies](/concepts/solids-pipelines/pipelines#order-based-dependencies-nothing-dependencies) for more info.
```python file=/integrations/dbt.py startafter=start_marker_dbt_cli_run_after_another_solid endbefore=end_marker_dbt_cli_run_after_another_solid dedent=4
from dagster import pipeline, solid
from dagster_dbt import dbt_cli_run

config = {"project-dir": "path/to/dbt/project", "models": ["tag:staging"]}
run_staging_models = dbt_cli_run.configured(
    config, name="run_staging_models"
)


@solid
def do_something(context):
    # solid logic here
    context.log.info("Executing necessary logic before dbt run")


@pipeline
def my_dbt_pipeline():
Show All 10 Lines | |||||
Your dbt RPC server can be running locally or remotely. To use the dbt RPC solids in your Dagster pipeline, you will need to create a resource for your dbt RPC server. To learn more about Dagster resources, visit the [Resources Overview](/concepts/modes-resources).

`dagster_dbt.dbt_rpc_resource` can be configured with your specific host and port.
```python file=/integrations/dbt.py startafter=start_marker_dbt_rpc_resource endbefore=end_marker_dbt_rpc_resource dedent=4
from dagster_dbt import dbt_rpc_resource

my_remote_rpc = dbt_rpc_resource.configured(
    {"host": "80.80.80.80", "port": 8080}
)
```
For convenience during local development, you may also use `dagster_dbt.local_dbt_rpc_resource`, which is preconfigured for a dbt RPC server that is running on `http://localhost:8580`.

Here are some examples of how to send dbt commands to a dbt RPC server with a solid.

**Example:** The solid `dbt_rpc_run` will send a request to run your entire dbt project when you don't use any solid configuration.
```python file=/integrations/dbt.py startafter=start_marker_dbt_rpc_run endbefore=end_marker_dbt_rpc_run dedent=4
from dagster import ModeDefinition, pipeline
from dagster_dbt import dbt_rpc_run


@pipeline(
    mode_defs=[ModeDefinition(resource_defs={"dbt_rpc": my_remote_rpc})]
)
def my_dbt_pipeline():
    dbt_rpc_run()
```
The code snippet above shows a Dagster pipeline with a single solid `dbt_rpc_run`. The solid `dbt_rpc_run` has a required resource key `"dbt_rpc"`. So, any pipeline that uses `dbt_rpc_run` will need a [ModeDefinition](/concepts/modes-resources) that defines a resource under the key `"dbt_rpc"`.
**Example:** The solid `dbt_rpc_run` is configured to run specific models in a dbt project. This is similar to having `"params": {"models": "tag:staging"}` in your dbt RPC request body.
```python file=/integrations/dbt.py startafter=start_marker_dbt_rpc_run_specific_models endbefore=end_marker_dbt_rpc_run_specific_models dedent=4
from dagster import ModeDefinition, pipeline
from dagster_dbt import dbt_rpc_run

run_staging_models = dbt_rpc_run.configured(
    {"models": ["tag:staging"]},
    name="run_staging_models",
)


@pipeline(
    mode_defs=[ModeDefinition(resource_defs={"dbt_rpc": my_remote_rpc})]
)
def my_dbt_pipeline():
    run_staging_models()
```
Note that the solid above will NOT wait until the dbt RPC server has finished executing your request. Instead, it returns immediately with a request token from the dbt RPC server. If you want the solid to wait until execution is finished, use `dagster_dbt.dbt_rpc_run_and_wait` instead.
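For reference, the request body mentioned in the example above can be pictured with a small standalone sketch. It assumes the dbt RPC server accepts JSON-RPC 2.0 requests with a `run` method and that model selectors are sent as a single space-separated string; the helper `build_rpc_run_request` is hypothetical and is not part of `dagster_dbt`.

```python
import json
import uuid


def build_rpc_run_request(models=None):
    """Build an illustrative JSON-RPC 2.0 body for a dbt RPC `run` call."""
    params = {}
    if models:
        # Assumption: selectors travel as one space-separated string.
        params["models"] = " ".join(models)
    return {
        "jsonrpc": "2.0",
        "method": "run",
        "id": str(uuid.uuid4()),  # any unique request id
        "params": params,
    }


body = build_rpc_run_request(["tag:staging"])
print(json.dumps(body, indent=2))
```

Calling the helper without arguments leaves `"params"` empty, which corresponds to running the entire project.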
**Example:** The solid `dbt_rpc_run_and_wait` will send a request to run specific models in a dbt project and then poll the dbt RPC server until it has finished executing your request.
```python file=/integrations/dbt.py startafter=start_marker_dbt_rpc_run_and_wait endbefore=end_marker_dbt_rpc_run_and_wait dedent=4
from dagster import ModeDefinition, pipeline
from dagster_dbt import dbt_rpc_run_and_wait


@pipeline(
    mode_defs=[ModeDefinition(resource_defs={"dbt_rpc": my_remote_rpc})]
)
def my_dbt_pipeline():
    dbt_rpc_run_and_wait()
```
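The difference between returning a request token immediately and waiting for completion can be sketched without any dbt or Dagster dependency. Everything below is hypothetical: `FakeRpcServer` stands in for a dbt RPC server, and `run_and_wait` only illustrates the submit-then-poll pattern; it is not the implementation of `dbt_rpc_run_and_wait`.

```python
import time


class FakeRpcServer:
    """Hypothetical stand-in for a dbt RPC server; finishes after N polls."""

    def __init__(self, polls_until_done=3):
        self._remaining = {}
        self._polls_until_done = polls_until_done
        self._next_token = 0

    def submit(self, method):
        # Return immediately with a request token, like the non-waiting solid.
        self._next_token += 1
        token = f"token-{self._next_token}"
        self._remaining[token] = self._polls_until_done
        return token

    def poll(self, token):
        # Report "running" until the simulated work is done.
        self._remaining[token] -= 1
        return "success" if self._remaining[token] <= 0 else "running"


def run_and_wait(server, method, interval=0.01, max_polls=100):
    """Submit a request, then poll until it leaves the 'running' state."""
    token = server.submit(method)
    for _ in range(max_polls):
        state = server.poll(token)
        if state != "running":
            return token, state
        time.sleep(interval)
    raise TimeoutError(f"request {token} still running after {max_polls} polls")


token, state = run_and_wait(FakeRpcServer(), "run")
print(token, state)
```

The `max_polls` bound is a design choice for the sketch: it keeps a stuck request from blocking the caller forever.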
## Use dbt Cloud in a Dagster pipeline

`dagster_dbt` currently does not provide solids for invoking dbt commands via dbt Cloud. However, this use case is possible by writing your own solid to create and start Jobs via [the dbt Cloud API](https://docs.getdbt.com/docs/dbt-cloud/cloud-api). For more details about each HTTP endpoint, [visit the official documentation for the dbt Cloud API](https://docs.getdbt.com/dbt-cloud/api).
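As a rough starting point for such a custom solid, the sketch below only builds the HTTP request that would start a job and does not send it. The endpoint path, the `Token` authorization scheme, and the `cause` field are assumptions based on version 2 of the dbt Cloud API and should be checked against the official documentation; `build_trigger_job_request` and all of its argument values are hypothetical.

```python
import json
import urllib.request


def build_trigger_job_request(account_id, job_id, api_token, cause):
    """Build (but do not send) a POST request that starts a dbt Cloud job."""
    # Assumed v2 endpoint layout; verify against the dbt Cloud API docs.
    url = (
        "https://cloud.getdbt.com/api/v2/"
        f"accounts/{account_id}/jobs/{job_id}/run/"
    )
    payload = json.dumps({"cause": cause}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=payload,
        headers={
            "Authorization": f"Token {api_token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


request = build_trigger_job_request(
    account_id=123,
    job_id=456,
    api_token="my-api-token",  # placeholder, not a real credential
    cause="Triggered from a Dagster pipeline",
)
print(request.get_method(), request.full_url)
```

Inside a solid, the body would pass this request to `urllib.request.urlopen` (or an HTTP client of your choice) and read the API token from solid config or a resource rather than hard-coding it.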
▲ Show 20 Lines • Show All 67 Lines • ▼ Show 20 Lines | |||||
custom_solid = dbt_cli_run.configured(config, name="custom_solid")
```
**dbt RPC: Configure a remote dbt RPC resource**
```python file=/integrations/dbt.py startafter=start_marker_dbt_rpc_resource_example endbefore=end_marker_dbt_rpc_resource_example dedent=4
from dagster_dbt import dbt_rpc_resource

# HOST and PORT are placeholders for your dbt RPC server's address.
custom_resource = dbt_rpc_resource.configured(
    {"host": HOST, "port": PORT}
)
```
**dbt RPC: Select specific models to run**
```python file=/integrations/dbt.py startafter=start_marker_dbt_rpc_config_select_models endbefore=end_marker_dbt_rpc_config_select_models dedent=4
config = {"models": ["my_dbt_model+", "path.to.models", "tag:nightly"]}

from dagster_dbt import dbt_rpc_run
▲ Show 20 Lines • Show All 54 Lines • Show Last 20 Lines |