Changeset View
Changeset View
Standalone View
Standalone View
docs/content/concepts/solids-pipelines/pipelines.mdx
Show First 20 Lines • Show All 332 Lines • ▼ Show 20 Lines | |||||
from typing import List | from typing import List | ||||
from dagster import Field, pipeline, solid | from dagster import Field, pipeline, solid | ||||
from dagster.experimental import DynamicOutput, DynamicOutputDefinition | from dagster.experimental import DynamicOutput, DynamicOutputDefinition | ||||
from dagster.utils import file_relative_path | from dagster.utils import file_relative_path | ||||
@solid( | @solid( | ||||
config_schema={"path": Field(str, default_value=file_relative_path(__file__, "sample"))}, | config_schema={ | ||||
"path": Field( | |||||
str, default_value=file_relative_path(__file__, "sample") | |||||
) | |||||
}, | |||||
output_defs=[DynamicOutputDefinition(str)], | output_defs=[DynamicOutputDefinition(str)], | ||||
) | ) | ||||
def files_in_directory(context): | def files_in_directory(context): | ||||
path = context.solid_config["path"] | path = context.solid_config["path"] | ||||
dirname, _, filenames = next(os.walk(path)) | dirname, _, filenames = next(os.walk(path)) | ||||
for file in filenames: | for file in filenames: | ||||
yield DynamicOutput( | yield DynamicOutput( | ||||
value=os.path.join(dirname, file), | value=os.path.join(dirname, file), | ||||
Show All 29 Lines | |||||
If you need to model an explicit ordering dependency, you can use the <PyObject object="Nothing"/> Dagster type on the input definition of the downstream solid. This type specifies that you are passing "nothing" via Dagster between the solids, while still uses inputs and outputs to model the dependency between the two solids. | If you need to model an explicit ordering dependency, you can use the <PyObject object="Nothing"/> Dagster type on the input definition of the downstream solid. This type specifies that you are passing "nothing" via Dagster between the solids, while still uses inputs and outputs to model the dependency between the two solids. | ||||
```python file=/concepts/solids_pipelines/order_based_dependency_pipeline.py startafter=start_marker endbefore=end_marker | ```python file=/concepts/solids_pipelines/order_based_dependency_pipeline.py startafter=start_marker endbefore=end_marker | ||||
from dagster import InputDefinition, Nothing, pipeline, solid | from dagster import InputDefinition, Nothing, pipeline, solid | ||||
@solid | @solid | ||||
def create_table_1() -> Nothing: | def create_table_1() -> Nothing: | ||||
get_database_connection().execute("create table_1 as select * from some_source_table") | get_database_connection().execute( | ||||
"create table_1 as select * from some_source_table" | |||||
) | |||||
@solid(input_defs=[InputDefinition("start", Nothing)]) | @solid(input_defs=[InputDefinition("start", Nothing)]) | ||||
def create_table_2(): | def create_table_2(): | ||||
get_database_connection().execute("create table_2 as select * from table_1") | get_database_connection().execute( | ||||
"create table_2 as select * from table_1" | |||||
) | |||||
@pipeline | @pipeline | ||||
def nothing_dependency_pipeline(): | def nothing_dependency_pipeline(): | ||||
create_table_2(create_table_1()) | create_table_2(create_table_1()) | ||||
``` | ``` | ||||
In this example, `create_table_1` returns an output of type <PyObject object="Nothing"/>, and `create_table_2` has an input of type `Nothing`. This lets us connect them in the pipeline definition so that `create_table_2` executes only after `create_table_1` successfully executes. | In this example, `create_table_1` returns an output of type <PyObject object="Nothing"/>, and `create_table_2` has an input of type `Nothing`. This lets us connect them in the pipeline definition so that `create_table_2` executes only after `create_table_1` successfully executes. | ||||
▲ Show 20 Lines • Show All 105 Lines • Show Last 20 Lines |