Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster-test/dagster_test/toys/log_s3.py
from dagster import AssetKey, AssetMaterialization, EventMetadata, Field, Output, pipeline, solid | from dagster import AssetKey, AssetMaterialization, EventMetadata, Field, Output, graph, op | ||||
@solid( | @op( | ||||
config_schema={ | config_schema={ | ||||
"bucket": Field(str, is_required=True), | "bucket": Field(str, is_required=True), | ||||
"s3_key": Field(str, is_required=True), | "s3_key": Field(str, is_required=True), | ||||
} | } | ||||
) | ) | ||||
def read_s3_key(context): | def read_s3_key(context): | ||||
s3_key = context.solid_config["s3_key"] | s3_key = context.solid_config["s3_key"] | ||||
bucket = context.solid_config["bucket"] | bucket = context.solid_config["bucket"] | ||||
path = f"s3://{bucket}/{s3_key}" | path = f"s3://{bucket}/{s3_key}" | ||||
context.log.info(f"Found file {path}") | context.log.info(f"Found file {path}") | ||||
yield AssetMaterialization( | yield AssetMaterialization( | ||||
asset_key=AssetKey(["log_s3", path]), | asset_key=AssetKey(["log_s3", path]), | ||||
metadata={"S3 path": EventMetadata.url(path)}, | metadata={"S3 path": EventMetadata.url(path)}, | ||||
) | ) | ||||
yield Output(path) | yield Output(path) | ||||
@pipeline(description="Demo pipeline that spits out some file info, given a path") | @graph(description="Demo pipeline that spits out some file info, given a path") | ||||
def log_s3_pipeline(): | def log_s3_graph(): | ||||
read_s3_key() | read_s3_key() | ||||
log_s3_job = log_s3_graph.to_job(name="log_s3_job") |