Differential D8385 Diff 41667 python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py
from typing import Optional, Tuple | |||||
from dagster import check | from dagster import check | ||||
from dagster.core.storage.captured_log_manager import CapturedLogManager | |||||
from dagster.core.storage.compute_log_manager import ComputeIOType | from dagster.core.storage.compute_log_manager import ComputeIOType | ||||
from dagster.core.storage.pipeline_run import PipelineRunStatus | from dagster.core.storage.pipeline_run import PipelineRunStatus | ||||
from dagster.serdes import serialize_dagster_namedtuple | from dagster.serdes import serialize_dagster_namedtuple | ||||
from dagster.utils.error import serializable_error_info_from_exc_info | from dagster.utils.error import serializable_error_info_from_exc_info | ||||
from graphql.execution.base import ResolveInfo | from graphql.execution.base import ResolveInfo | ||||
from rx import Observable | from rx import Observable | ||||
from ..external import ( | from ..external import ( | ||||
▲ Show 20 Lines • Show All 138 Lines • ▼ Show 20 Lines | ).map( | ||||
lambda events: GraphenePipelineRunLogsSubscriptionSuccess( | lambda events: GraphenePipelineRunLogsSubscriptionSuccess( | ||||
run=GraphenePipelineRun(run), | run=GraphenePipelineRun(run), | ||||
messages=[from_event_record(event, run.pipeline_name) for event in events], | messages=[from_event_record(event, run.pipeline_name) for event in events], | ||||
) | ) | ||||
) | ) | ||||
def get_compute_log_observable(graphene_info, run_id, step_key, io_type, cursor=None): | def get_compute_log_observable(graphene_info, run_id, step_key, io_type, cursor=None): | ||||
from ...schema.logs.compute_logs import from_compute_log_file | from ...schema.logs.compute_logs import ( | ||||
from_compute_log_file, | |||||
captured_log_callback, | |||||
captured_log_update_to_graphene, | |||||
) | |||||
check.inst_param(graphene_info, "graphene_info", ResolveInfo) | check.inst_param(graphene_info, "graphene_info", ResolveInfo) | ||||
check.str_param(run_id, "run_id") | check.str_param(run_id, "run_id") | ||||
check.str_param(step_key, "step_key") | check.str_param(step_key, "step_key") | ||||
check.inst_param(io_type, "io_type", ComputeIOType) | check.inst_param(io_type, "io_type", ComputeIOType) | ||||
check.opt_str_param(cursor, "cursor") | check.opt_str_param(cursor, "cursor") | ||||
return graphene_info.context.instance.compute_log_manager.observable( | compute_log_manager = graphene_info.context.instance.compute_log_manager | ||||
run_id, step_key, io_type, cursor | |||||
).map(lambda update: from_compute_log_file(graphene_info, update)) | if isinstance(compute_log_manager, CapturedLogManager): | ||||
# The captured_log_callback sets up a loop that pushes compute log updates to the created | |||||
# observable, using the captured log manager API. The captured_log_update_to_graphene | |||||
# converts the captured log updates to the appropriate Graphene response objects. | |||||
on_captured_log_update = captured_log_callback( | |||||
compute_log_manager, step_key, run_id, io_type, cursor | |||||
) | |||||
observable = Observable.create(on_captured_log_update) # pylint: disable=E1101 | |||||
return observable.map(captured_log_update_to_graphene) | |||||
else: | |||||
# The legacy compute log manager implemented an `observable` method, which was convenient | |||||
# but strongly coupled the GraphQL response payload with the core compute log behavior. | |||||
return compute_log_manager.observable(run_id, step_key, io_type, cursor).map( | |||||
lambda update: from_compute_log_file(graphene_info, update) | |||||
) | |||||
@capture_error | @capture_error | ||||
def wipe_assets(graphene_info, asset_keys): | def wipe_assets(graphene_info, asset_keys): | ||||
from ...schema.roots.mutation import GrapheneAssetWipeSuccess | from ...schema.roots.mutation import GrapheneAssetWipeSuccess | ||||
instance = graphene_info.context.instance | instance = graphene_info.context.instance | ||||
instance.wipe_assets(asset_keys) | instance.wipe_assets(asset_keys) | ||||
return GrapheneAssetWipeSuccess(assetKeys=asset_keys) | return GrapheneAssetWipeSuccess(assetKeys=asset_keys) | ||||
alangenfeld: + comments |
+ comments