docs/content/integrations/pandas.mdx
To create a custom `dagster_pandas` type, use `create_dagster_pandas_dataframe_type` and provide a list of `PandasColumn` objects which specify column-level schema and constraints. For example, we can construct a custom dataframe type to represent a set of e-bike trips in the following way:
```python file=/legacy/dagster_pandas_guide/core_trip_pipeline.py startafter=start_core_trip_pipeline_marker_0 endbefore=end_core_trip_pipeline_marker_0
TripDataFrame = create_dagster_pandas_dataframe_type(
    name="TripDataFrame",
    columns=[
        PandasColumn.integer_column("bike_id", min_value=0),
        PandasColumn.categorical_column(
            "color", categories={"red", "green", "blue"}
        ),
        PandasColumn.datetime_column(
            "start_time", min_datetime=datetime(year=2020, month=2, day=10)
        ),
        PandasColumn.datetime_column(
            "end_time", min_datetime=datetime(year=2020, month=2, day=10)
        ),
        PandasColumn.string_column("station"),
        PandasColumn.exists("amount_paid"),
        PandasColumn.boolean_column("was_member"),
    ],
)
```
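As a rough illustration of what this type check enforces at run time, a few of the same column rules can be restated in plain pandas. This sketch does not use `dagster_pandas` at all, and the helper name is ours:

```python
from datetime import datetime

import pandas as pd


def looks_like_trip_dataframe(df: pd.DataFrame) -> bool:
    # Restates three of the TripDataFrame column rules by hand:
    # bike_id >= 0, color drawn from a fixed category set, and
    # start_time no earlier than the configured minimum.
    return bool(
        (df["bike_id"] >= 0).all()
        and df["color"].isin({"red", "green", "blue"}).all()
        and (df["start_time"] >= datetime(2020, 2, 10)).all()
    )


trips = pd.DataFrame(
    {
        "bike_id": [1, 2],
        "color": ["red", "blue"],
        "start_time": pd.to_datetime(["2020-02-10", "2020-03-01"]),
    }
)
```

A frame that violates any rule (a negative `bike_id`, an unknown color) fails the check, which is what a failed Dagster type check surfaces during a run.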
Once our custom data type is defined, we can use it as the type declaration for the inputs and outputs of our solid:
```python file=/legacy/dagster_pandas_guide/core_trip_pipeline.py startafter=start_core_trip_pipeline_marker_1 endbefore=end_core_trip_pipeline_marker_1
@solid(
    output_defs=[
        OutputDefinition(name="trip_dataframe", dagster_type=TripDataFrame)
    ]
)
def load_trip_dataframe() -> DataFrame:
    return read_csv(
        script_relative_path("./ebike_trips.csv"),
        parse_dates=["start_time", "end_time"],
        date_parser=lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S.%f"),
    )
```
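To try the same parsing logic without `ebike_trips.csv` on disk, an inline CSV works just as well; the column layout below is assumed for illustration:

```python
from io import StringIO

import pandas as pd

csv_text = """bike_id,start_time,end_time
1,2020-02-10 00:00:00.000,2020-02-10 00:05:00.000
2,2020-02-11 09:30:00.000,2020-02-11 09:45:00.000
"""

# parse_dates turns both timestamp columns into datetime64 dtypes.
df = pd.read_csv(StringIO(csv_text), parse_dates=["start_time", "end_time"])
```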
Now that we have a custom dataframe type that performs schema validation during a pipeline run, we can also express dataframe-level constraints (e.g., number of rows or columns).

To do this, we provide a list of dataframe constraints to `create_dagster_pandas_dataframe_type`; for example, using `RowCountConstraint`. More information on the available constraints can be found in the `dagster_pandas` [API docs](/_apidocs/libraries/dagster_pandas).

This looks like:
```python file=/legacy/dagster_pandas_guide/shape_constrained_pipeline.py startafter=start_create_type endbefore=end_create_type
ShapeConstrainedTripDataFrame = create_dagster_pandas_dataframe_type(
    name="ShapeConstrainedTripDataFrame",
    dataframe_constraints=[RowCountConstraint(4)],
)
```
If we rerun the above example with this dataframe, nothing should change. However, if we pass in 100 to the row count constraint, we can watch our pipeline fail that type check.
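`RowCountConstraint` boils down to an exact row-count check. A hand-rolled equivalent (our own function, not the `dagster_pandas` implementation) makes the failure mode concrete:

```python
import pandas as pd


def check_row_count(df: pd.DataFrame, expected: int) -> None:
    # Mirrors the idea behind RowCountConstraint: raise when the row
    # count is wrong, the way a failed type check halts the pipeline.
    if len(df) != expected:
        raise ValueError(f"Expected {expected} rows, got {len(df)}")


trips = pd.DataFrame({"bike_id": [1, 2, 3, 4]})
check_row_count(trips, 4)  # a four-row frame passes
```

Calling `check_row_count(trips, 100)` raises, just as the pipeline fails its type check when the constraint expects 100 rows.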
## Dagster DataFrame Summary Statistics
Aside from constraint validation, `create_dagster_pandas_dataframe_type` also takes in a summary statistics function that emits `EventMetadataEntry` objects which are surfaced during pipeline runs. Since data systems seldom control the quality of the data they receive, it becomes important to monitor data as it flows through your systems. In complex pipelines, this can help debug and monitor data drift over time. Let's illustrate how this works in our example:
```python file=/legacy/dagster_pandas_guide/summary_stats_pipeline.py startafter=start_summary endbefore=end_summary
def compute_trip_dataframe_summary_statistics(dataframe):
    return {
        "min_start_time": min(dataframe["start_time"]).strftime("%Y-%m-%d"),
        "max_end_time": max(dataframe["end_time"]).strftime("%Y-%m-%d"),
        "num_unique_bikes": str(dataframe["bike_id"].nunique()),
        "n_rows": len(dataframe),
        "columns": str(dataframe.columns),
    }


SummaryStatsTripDataFrame = create_dagster_pandas_dataframe_type(
    name="SummaryStatsTripDataFrame",
    event_metadata_fn=compute_trip_dataframe_summary_statistics,
)
```
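Calling the metadata function directly shows the dictionary that gets attached to the run. The function is restated here so the snippet stands alone with plain pandas:

```python
import pandas as pd


def compute_trip_dataframe_summary_statistics(dataframe):
    # Same logic as above: a dict of human-readable summary stats.
    return {
        "min_start_time": min(dataframe["start_time"]).strftime("%Y-%m-%d"),
        "max_end_time": max(dataframe["end_time"]).strftime("%Y-%m-%d"),
        "num_unique_bikes": str(dataframe["bike_id"].nunique()),
        "n_rows": len(dataframe),
        "columns": str(dataframe.columns),
    }


trips = pd.DataFrame(
    {
        "bike_id": [1, 1, 2],
        "start_time": pd.to_datetime(["2020-02-10", "2020-02-11", "2020-02-12"]),
        "end_time": pd.to_datetime(["2020-02-10", "2020-02-11", "2020-02-13"]),
    }
)

stats = compute_trip_dataframe_summary_statistics(trips)
```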
Now if we run this pipeline in the Dagit playground:
<Image
  alt="tutorial1.png"
  src="/images/guides/dagster_pandas/tutorial1.png"
/>

We can also go beyond the built-in constraints and define our own by subclassing `ColumnConstraint` and implementing the `validate` method:

```python
class DivisibleByFiveConstraint(ColumnConstraint):
    def __init__(self):
        message = "Value must be divisible by 5"
        super(DivisibleByFiveConstraint, self).__init__(
            error_description=message, markdown_description=message
        )

    def validate(self, dataframe, column_name):
        rows_with_unexpected_buckets = dataframe[
            dataframe[column_name].apply(lambda x: x % 5 != 0)
        ]
        if not rows_with_unexpected_buckets.empty:
            raise ColumnConstraintViolationException(
                constraint_name=self.name,
                constraint_description=self.error_description,
                column_name=column_name,
                offending_rows=rows_with_unexpected_buckets,
            )


CustomTripDataFrame = create_dagster_pandas_dataframe_type(
    name="CustomTripDataFrame",
    columns=[
        PandasColumn(
            "amount_paid",
            constraints=[
                ColumnDTypeInSetConstraint({"int64"}),
                DivisibleByFiveConstraint(),
            ],
        )
    ],
)
```
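The heart of `validate` is its boolean filter, which can be exercised on its own with plain pandas (the helper name below is ours, and the exception handling is simplified away for illustration):

```python
import pandas as pd


def find_non_multiples_of_five(df: pd.DataFrame, column_name: str) -> pd.DataFrame:
    # The same filter the constraint uses: keep only the rows whose
    # value in the given column is not divisible by 5.
    return df[df[column_name].apply(lambda x: x % 5 != 0)]


payments = pd.DataFrame({"amount_paid": [5, 10, 12, 20]})
offending = find_non_multiples_of_five(payments, "amount_paid")
```

When `offending` is non-empty, the real constraint raises `ColumnConstraintViolationException` with those rows attached, which is how the failure is reported in Dagit.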