Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster-test/dagster_test/toys/many_events.py
from dagster import ( | from dagster import ( | ||||
AssetMaterialization, | AssetMaterialization, | ||||
EventMetadata, | EventMetadata, | ||||
ExpectationResult, | ExpectationResult, | ||||
InputDefinition, | In, | ||||
Nothing, | Nothing, | ||||
Out, | |||||
Output, | Output, | ||||
OutputDefinition, | |||||
file_relative_path, | file_relative_path, | ||||
pipeline, | graph, | ||||
solid, | op, | ||||
) | ) | ||||
MARKDOWN_EXAMPLE = "markdown_example.md" | MARKDOWN_EXAMPLE = "markdown_example.md" | ||||
raw_files = [ | raw_files = [ | ||||
"raw_file_users", | "raw_file_users", | ||||
"raw_file_groups", | "raw_file_groups", | ||||
"raw_file_events", | "raw_file_events", | ||||
"raw_file_friends", | "raw_file_friends", | ||||
"raw_file_pages", | "raw_file_pages", | ||||
"raw_file_fans", | "raw_file_fans", | ||||
"raw_file_event_admins", | "raw_file_event_admins", | ||||
"raw_file_group_admins", | "raw_file_group_admins", | ||||
] | ] | ||||
def create_raw_file_solid(name): | def create_raw_file_solid(name): | ||||
def do_expectation(_context, _value): | @op( | ||||
return ExpectationResult( | |||||
success=True, | |||||
label="output_table_exists", | |||||
description="Checked {name} exists".format(name=name), | |||||
) | |||||
@solid( | |||||
name=name, | name=name, | ||||
description="Inject raw file for input to table {} and do expectation on output".format( | description=f"Inject raw file for input to table {name} and do expectation on output", | ||||
name | |||||
), | |||||
) | ) | ||||
def raw_file_solid(_context): | def raw_file_solid(): | ||||
yield AssetMaterialization( | yield AssetMaterialization( | ||||
asset_key="table_info", | asset_key="table_info", | ||||
metadata={"table_path": EventMetadata.path("/path/to/{}.raw".format(name))}, | metadata={"table_path": EventMetadata.path("/path/to/{}.raw".format(name))}, | ||||
) | ) | ||||
yield do_expectation(_context, name) | yield ExpectationResult( | ||||
success=True, | |||||
label="output_table_exists", | |||||
description=f"Checked {name} exists", | |||||
) | |||||
yield Output(name) | yield Output(name) | ||||
return raw_file_solid | return raw_file_solid | ||||
raw_tables = [ | raw_tables = [ | ||||
"raw_users", | "raw_users", | ||||
"raw_groups", | "raw_groups", | ||||
Show All 9 Lines | |||||
def create_raw_file_solids(): | def create_raw_file_solids(): | ||||
return list(map(create_raw_file_solid, raw_files)) | return list(map(create_raw_file_solid, raw_files)) | ||||
def input_name_for_raw_file(raw_file): | def input_name_for_raw_file(raw_file): | ||||
return raw_file + "_ready" | return raw_file + "_ready" | ||||
@solid( | @op( | ||||
input_defs=[InputDefinition("start", Nothing)], | ins={"start": In(dagster_type=Nothing)}, | ||||
output_defs=[OutputDefinition(Nothing)], | out=Out(dagster_type=Nothing), | ||||
description="Load a bunch of raw tables from corresponding files", | description="Load a bunch of raw tables from corresponding files", | ||||
) | ) | ||||
def many_table_materializations(_context): | def many_table_materializations(): | ||||
with open(file_relative_path(__file__, MARKDOWN_EXAMPLE), "r") as f: | with open(file_relative_path(__file__, MARKDOWN_EXAMPLE), "r") as f: | ||||
md_str = f.read() | md_str = f.read() | ||||
for table in raw_tables: | for table in raw_tables: | ||||
yield AssetMaterialization( | yield AssetMaterialization( | ||||
asset_key="table_info", | asset_key="table_info", | ||||
metadata={ | metadata={ | ||||
"table_name": table, | "table_name": table, | ||||
"table_path": EventMetadata.path(f"/path/to/{table}"), | "table_path": EventMetadata.path(f"/path/to/{table}"), | ||||
"table_data": {"name": table}, | "table_data": {"name": table}, | ||||
"table_name_big": EventMetadata.url(f"https://bigty.pe/{table}"), | "table_name_big": EventMetadata.url(f"https://bigty.pe/{table}"), | ||||
"table_blurb": EventMetadata.md(md_str), | "table_blurb": EventMetadata.md(md_str), | ||||
"big_int": 29119888133298982934829348, | "big_int": 29119888133298982934829348, | ||||
"float_nan": float("nan"), | "float_nan": float("nan"), | ||||
}, | }, | ||||
) | ) | ||||
@solid( | @op( | ||||
input_defs=[InputDefinition("start", Nothing)], | ins={"start": In(dagster_type=Nothing)}, | ||||
output_defs=[OutputDefinition(Nothing)], | out=Out(dagster_type=Nothing), | ||||
description="This simulates a solid that would wrap something like dbt, " | description="This simulates a solid that would wrap something like dbt, " | ||||
"where it emits a bunch of tables and then say an expectation on each table, " | "where it emits a bunch of tables and then say an expectation on each table, " | ||||
"all in one solid", | "all in one solid", | ||||
) | ) | ||||
def many_materializations_and_passing_expectations(_context): | def many_materializations_and_passing_expectations(): | ||||
tables = [ | tables = [ | ||||
"users", | "users", | ||||
"groups", | "groups", | ||||
"events", | "events", | ||||
"friends", | "friends", | ||||
"pages", | "pages", | ||||
"fans", | "fans", | ||||
"event_admins", | "event_admins", | ||||
"group_admins", | "group_admins", | ||||
] | ] | ||||
for table in tables: | for table in tables: | ||||
yield AssetMaterialization( | yield AssetMaterialization( | ||||
asset_key="table_info", | asset_key="table_info", | ||||
metadata={ | metadata={ | ||||
"table_path": EventMetadata.path(f"/path/to/{table}.raw"), | "table_path": EventMetadata.path(f"/path/to/{table}.raw"), | ||||
}, | }, | ||||
) | ) | ||||
yield ExpectationResult( | yield ExpectationResult( | ||||
success=True, | success=True, | ||||
label="{table}.row_count".format(table=table), | label=f"{table}.row_count", | ||||
description="Row count passed for {table}".format(table=table), | description=f"Row count passed for {table}", | ||||
) | ) | ||||
@solid( | @op( | ||||
input_defs=[InputDefinition("start", Nothing)], | ins={"start": In(dagster_type=Nothing)}, | ||||
output_defs=[], | out=Out(dagster_type=Nothing), | ||||
description="A solid that just does a couple inline expectations, one of which fails", | description="A solid that just does a couple inline expectations, one of which fails", | ||||
) | ) | ||||
def check_users_and_groups_one_fails_one_succeeds(_context): | def check_users_and_groups_one_fails_one_succeeds(): | ||||
yield ExpectationResult( | yield ExpectationResult( | ||||
success=True, | success=True, | ||||
label="user_expectations", | label="user_expectations", | ||||
description="Battery of expectations for user", | description="Battery of expectations for user", | ||||
metadata={ | metadata={ | ||||
"table_summary": { | "table_summary": { | ||||
"columns": { | "columns": { | ||||
"name": {"nulls": 0, "empty": 0, "values": 123, "average_length": 3.394893}, | "name": {"nulls": 0, "empty": 0, "values": 123, "average_length": 3.394893}, | ||||
Show All 13 Lines | yield ExpectationResult( | ||||
"name": {"nulls": 1, "empty": 0, "values": 122, "average_length": 3.394893}, | "name": {"nulls": 1, "empty": 0, "values": 122, "average_length": 3.394893}, | ||||
"time_created": {"nulls": 1, "empty": 2, "values": 120, "average": 1231283}, | "time_created": {"nulls": 1, "empty": 2, "values": 120, "average": 1231283}, | ||||
} | } | ||||
} | } | ||||
}, | }, | ||||
) | ) | ||||
@solid( | @op( | ||||
input_defs=[InputDefinition("start", Nothing)], | ins={"start": In(dagster_type=Nothing)}, | ||||
output_defs=[], | out=Out(dagster_type=Nothing), | ||||
description="A solid that just does a couple inline expectations", | description="A solid that just does a couple inline expectations", | ||||
) | ) | ||||
def check_admins_both_succeed(_context): | def check_admins_both_succeed(): | ||||
yield ExpectationResult(success=True, label="Group admins check out") | yield ExpectationResult(success=True, label="Group admins check out") | ||||
yield ExpectationResult(success=True, label="Event admins check out") | yield ExpectationResult(success=True, label="Event admins check out") | ||||
@pipeline( | @graph( | ||||
description=( | description=( | ||||
"Demo pipeline that yields AssetMaterializations and ExpectationResults, along with the " | "Demo pipeline that yields AssetMaterializations and ExpectationResults, along with the " | ||||
"various forms of metadata that can be attached to them." | "various forms of metadata that can be attached to them." | ||||
) | ) | ||||
) | ) | ||||
def many_events(): | def many_events_graph(): | ||||
raw_files_solids = [raw_file_solid() for raw_file_solid in create_raw_file_solids()] | raw_files_solids = [raw_file_solid() for raw_file_solid in create_raw_file_solids()] | ||||
mtm = many_table_materializations(raw_files_solids) | mtm = many_table_materializations(raw_files_solids) | ||||
mmape = many_materializations_and_passing_expectations(mtm) | mmape = many_materializations_and_passing_expectations(mtm) | ||||
check_users_and_groups_one_fails_one_succeeds(mmape) | check_users_and_groups_one_fails_one_succeeds(mmape) | ||||
check_admins_both_succeed(mmape) | check_admins_both_succeed(mmape) | ||||
many_events_job = many_events_graph.to_job(name="many_events_job") |