Differential D8684 Diff 40955 examples/docs_snippets/docs_snippets/legacy/dagster_pandas_guide/custom_column_constraint_pipeline.py
Changeset View
Changeset View
Standalone View
Standalone View
examples/docs_snippets/docs_snippets/legacy/dagster_pandas_guide/custom_column_constraint_pipeline.py
Show All 14 Lines | |||||
class DivisibleByFiveConstraint(ColumnConstraint): | class DivisibleByFiveConstraint(ColumnConstraint): | ||||
def __init__(self): | def __init__(self): | ||||
message = "Value must be divisible by 5" | message = "Value must be divisible by 5" | ||||
super(DivisibleByFiveConstraint, self).__init__( | super(DivisibleByFiveConstraint, self).__init__( | ||||
error_description=message, markdown_description=message | error_description=message, markdown_description=message | ||||
) | ) | ||||
def validate(self, dataframe, column_name): | def validate(self, dataframe, column_name): | ||||
rows_with_unexpected_buckets = dataframe[dataframe[column_name].apply(lambda x: x % 5 != 0)] | rows_with_unexpected_buckets = dataframe[ | ||||
dataframe[column_name].apply(lambda x: x % 5 != 0) | |||||
] | |||||
if not rows_with_unexpected_buckets.empty: | if not rows_with_unexpected_buckets.empty: | ||||
raise ColumnConstraintViolationException( | raise ColumnConstraintViolationException( | ||||
constraint_name=self.name, | constraint_name=self.name, | ||||
constraint_description=self.error_description, | constraint_description=self.error_description, | ||||
column_name=column_name, | column_name=column_name, | ||||
offending_rows=rows_with_unexpected_buckets, | offending_rows=rows_with_unexpected_buckets, | ||||
) | ) | ||||
CustomTripDataFrame = create_dagster_pandas_dataframe_type( | CustomTripDataFrame = create_dagster_pandas_dataframe_type( | ||||
name="CustomTripDataFrame", | name="CustomTripDataFrame", | ||||
columns=[ | columns=[ | ||||
PandasColumn( | PandasColumn( | ||||
"amount_paid", | "amount_paid", | ||||
constraints=[ColumnDTypeInSetConstraint({"int64"}), DivisibleByFiveConstraint()], | constraints=[ | ||||
ColumnDTypeInSetConstraint({"int64"}), | |||||
DivisibleByFiveConstraint(), | |||||
], | |||||
) | ) | ||||
], | ], | ||||
) | ) | ||||
# end_custom_col | # end_custom_col | ||||
@solid( | @solid( | ||||
output_defs=[OutputDefinition(name="custom_trip_dataframe", dagster_type=CustomTripDataFrame)], | output_defs=[ | ||||
OutputDefinition( | |||||
name="custom_trip_dataframe", dagster_type=CustomTripDataFrame | |||||
) | |||||
], | |||||
) | ) | ||||
def load_custom_trip_dataframe() -> DataFrame: | def load_custom_trip_dataframe() -> DataFrame: | ||||
return read_csv( | return read_csv( | ||||
script_relative_path("./ebike_trips.csv"), | script_relative_path("./ebike_trips.csv"), | ||||
parse_dates=["start_time", "end_time"], | parse_dates=["start_time", "end_time"], | ||||
date_parser=lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S.%f"), | date_parser=lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S.%f"), | ||||
) | ) | ||||
@pipeline | @pipeline | ||||
def custom_column_constraint_pipeline(): | def custom_column_constraint_pipeline(): | ||||
load_custom_trip_dataframe() | load_custom_trip_dataframe() |