Changeset View
Changeset View
Standalone View
Standalone View
examples/hacker_news/hacker_news/resources/parquet_io_manager.py
Show All 40 Lines | ): | ||||
yield EventMetadataEntry.path(path=path, label="path") | yield EventMetadataEntry.path(path=path, label="path") | ||||
def load_input(self, context) -> Union[pyspark.sql.DataFrame, str]: | def load_input(self, context) -> Union[pyspark.sql.DataFrame, str]: | ||||
# In this load_input function, we vary the behavior based on the type of the downstream input | # In this load_input function, we vary the behavior based on the type of the downstream input | ||||
path = self._get_path(context.upstream_output) | path = self._get_path(context.upstream_output) | ||||
if context.dagster_type.typing_type == pyspark.sql.DataFrame: | if context.dagster_type.typing_type == pyspark.sql.DataFrame: | ||||
# return pyspark dataframe | # return pyspark dataframe | ||||
return context.resources.pyspark.spark_session.read.parquet(path) | return context.resources.pyspark.spark_session.read.parquet(path) | ||||
elif context.dagster_type.key == "String": | elif context.dagster_type.typing_type == str: | ||||
# return path to parquet files | # return path to parquet files | ||||
return path | return path | ||||
return check.failed( | return check.failed( | ||||
f"Inputs of type {context.dagster_type} not supported. Please specify a valid type " | f"Inputs of type {context.dagster_type} not supported. Please specify a valid type " | ||||
"for this input either in the solid signature or on the corresponding InputDefinition." | "for this input either in the solid signature or on the corresponding InputDefinition." | ||||
) | ) | ||||
Show All 38 Lines |