Differential D8684 Diff 40813 examples/docs_snippets/docs_snippets/legacy/dagster_pandas_guide/shape_constrained_pipeline.py
Changeset View
Changeset View
Standalone View
Standalone View
examples/docs_snippets/docs_snippets/legacy/dagster_pandas_guide/shape_constrained_pipeline.py
from datetime import datetime | from datetime import datetime | ||||
from dagster import OutputDefinition, pipeline, solid | from dagster import OutputDefinition, pipeline, solid | ||||
from dagster.utils import script_relative_path | from dagster.utils import script_relative_path | ||||
from dagster_pandas import RowCountConstraint, create_dagster_pandas_dataframe_type | from dagster_pandas import ( | ||||
RowCountConstraint, | |||||
create_dagster_pandas_dataframe_type, | |||||
) | |||||
from pandas import DataFrame, read_csv | from pandas import DataFrame, read_csv | ||||
# start_create_type | # start_create_type | ||||
ShapeConstrainedTripDataFrame = create_dagster_pandas_dataframe_type( | ShapeConstrainedTripDataFrame = create_dagster_pandas_dataframe_type( | ||||
name="ShapeConstrainedTripDataFrame", dataframe_constraints=[RowCountConstraint(4)] | name="ShapeConstrainedTripDataFrame", | ||||
dataframe_constraints=[RowCountConstraint(4)], | |||||
) | ) | ||||
# end_create_type | # end_create_type | ||||
@solid( | @solid( | ||||
output_defs=[ | output_defs=[ | ||||
OutputDefinition( | OutputDefinition( | ||||
name="shape_constrained_trip_dataframe", dagster_type=ShapeConstrainedTripDataFrame | name="shape_constrained_trip_dataframe", | ||||
dagster_type=ShapeConstrainedTripDataFrame, | |||||
) | ) | ||||
] | ] | ||||
) | ) | ||||
def load_shape_constrained_trip_dataframe() -> DataFrame: | def load_shape_constrained_trip_dataframe() -> DataFrame: | ||||
return read_csv( | return read_csv( | ||||
script_relative_path("./ebike_trips.csv"), | script_relative_path("./ebike_trips.csv"), | ||||
parse_dates=["start_time", "end_time"], | parse_dates=["start_time", "end_time"], | ||||
date_parser=lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S.%f"), | date_parser=lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S.%f"), | ||||
) | ) | ||||
@pipeline | @pipeline | ||||
def shape_constrained_pipeline(): | def shape_constrained_pipeline(): | ||||
load_shape_constrained_trip_dataframe() | load_shape_constrained_trip_dataframe() |