Changeset View
Changeset View
Standalone View
Standalone View
python_modules/libraries/dagster-ge/dagster_ge/factory.py
import datetime | |||||
import great_expectations as ge | import great_expectations as ge | ||||
from dagster_pandas import DataFrame | from dagster_pandas import DataFrame | ||||
from great_expectations.core import convert_to_json_serializable | from great_expectations.core import convert_to_json_serializable | ||||
from great_expectations.render.page_renderer_util import ( | from great_expectations.render.page_renderer_util import ( | ||||
render_multiple_validation_result_pages_markdown, | render_multiple_validation_result_pages_markdown, | ||||
) | ) | ||||
from dagster import ( | from dagster import ( | ||||
EventMetadataEntry, | EventMetadataEntry, | ||||
ExpectationResult, | ExpectationResult, | ||||
InputDefinition, | InputDefinition, | ||||
Noneable, | Noneable, | ||||
Output, | Output, | ||||
OutputDefinition, | OutputDefinition, | ||||
StringSource, | StringSource, | ||||
check, | check, | ||||
resource, | resource, | ||||
solid, | solid, | ||||
) | ) | ||||
from dagster.seven import get_current_datetime_in_utc | |||||
@resource(config_schema={"ge_root_dir": Noneable(StringSource)})
def ge_data_context(context):
    """Dagster resource that yields a Great Expectations ``DataContext``.

    The ``ge_root_dir`` resource config value selects how the context is
    built:

    * ``None`` -- ``DataContext`` is constructed with no arguments.
    * anything else -- the value is forwarded as ``context_root_dir``.

    Args:
        context: Resource init context; only ``context.resource_config`` is
            read here.

    Yields:
        The constructed ``ge.data_context.DataContext`` instance.
    """
    root_dir = context.resource_config["ge_root_dir"]

    # Explicit root directory configured: hand it straight to the DataContext.
    if root_dir is not None:
        yield ge.data_context.DataContext(context_root_dir=root_dir)
        return

    # No root directory configured: construct with the library defaults.
    yield ge.data_context.DataContext()
▲ Show 20 Lines • Show All 69 Lines • ▼ Show 20 Lines | def ge_validation_solid(context, dataset): | ||||
context.log.warning( | context.log.warning( | ||||
"`datasource` field of `batch_kwargs` will be ignored; use the `datasource_name` " | "`datasource` field of `batch_kwargs` will be ignored; use the `datasource_name` " | ||||
"parameter of the solid factory instead." | "parameter of the solid factory instead." | ||||
) | ) | ||||
final_batch_kwargs["datasource"] = datasource_name | final_batch_kwargs["datasource"] = datasource_name | ||||
batch = data_context.get_batch(final_batch_kwargs, suite) | batch = data_context.get_batch(final_batch_kwargs, suite) | ||||
run_id = { | run_id = { | ||||
"run_name": datasource_name + " run", | "run_name": datasource_name + " run", | ||||
"run_time": datetime.datetime.utcnow(), | "run_time": get_current_datetime_in_utc(), | ||||
} | } | ||||
results = data_context.run_validation_operator( | results = data_context.run_validation_operator( | ||||
validation_operator, assets_to_validate=[batch], run_id=run_id | validation_operator, assets_to_validate=[batch], run_id=run_id | ||||
) | ) | ||||
res = convert_to_json_serializable(results.list_validation_results())[0] | res = convert_to_json_serializable(results.list_validation_results())[0] | ||||
md_str = render_multiple_validation_result_pages_markdown( | md_str = render_multiple_validation_result_pages_markdown( | ||||
validation_operator_result=results, run_info_at_end=True, | validation_operator_result=results, run_info_at_end=True, | ||||
) | ) | ||||
meta_stats = EventMetadataEntry.md(md_str=md_str, label="Expectation Results") | meta_stats = EventMetadataEntry.md(md_str=md_str, label="Expectation Results") | ||||
yield ExpectationResult( | yield ExpectationResult( | ||||
success=res["success"], metadata_entries=[meta_stats,], | success=res["success"], metadata_entries=[meta_stats,], | ||||
) | ) | ||||
yield Output(res) | yield Output(res) | ||||
return ge_validation_solid | return ge_validation_solid |