Differential D8675 Diff 40779 python_modules/dagster-graphql/dagster_graphql/schema/pipelines/pipeline.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster-graphql/dagster_graphql/schema/pipelines/pipeline.py
from functools import lru_cache | |||||
import graphene | import graphene | ||||
import yaml | import yaml | ||||
from dagster import check | from dagster import check | ||||
from dagster.core.events import StepMaterializationData | from dagster.core.events import StepMaterializationData | ||||
from dagster.core.events.log import EventLogEntry | from dagster.core.events.log import EventLogEntry | ||||
from dagster.core.host_representation.external import ExternalExecutionPlan, ExternalPipeline | from dagster.core.host_representation.external import ExternalExecutionPlan, ExternalPipeline | ||||
from dagster.core.host_representation.external_data import ExternalPresetData | from dagster.core.host_representation.external_data import ExternalPresetData | ||||
from dagster.core.host_representation.represented import RepresentedPipeline | from dagster.core.host_representation.represented import RepresentedPipeline | ||||
Show All 21 Lines | from ..logs.events import ( | ||||
GraphenePipelineRunStepStats, | GraphenePipelineRunStepStats, | ||||
GrapheneStepMaterializationEvent, | GrapheneStepMaterializationEvent, | ||||
) | ) | ||||
from ..paging import GrapheneCursor | from ..paging import GrapheneCursor | ||||
from ..repository_origin import GrapheneRepositoryOrigin | from ..repository_origin import GrapheneRepositoryOrigin | ||||
from ..schedules.schedules import GrapheneSchedule | from ..schedules.schedules import GrapheneSchedule | ||||
from ..sensors import GrapheneSensor | from ..sensors import GrapheneSensor | ||||
from ..solids import ( | from ..solids import ( | ||||
GrapheneGraphDefinition, | |||||
GrapheneSolid, | GrapheneSolid, | ||||
GrapheneSolidContainer, | |||||
GrapheneSolidHandle, | GrapheneSolidHandle, | ||||
build_solid_handles, | build_solid_handles, | ||||
build_solids, | build_solids, | ||||
) | ) | ||||
from ..tags import GrapheneAssetTag, GraphenePipelineTag | from ..tags import GrapheneAssetTag, GraphenePipelineTag | ||||
from ..util import non_null_list | from ..util import non_null_list | ||||
from .mode import GrapheneMode | from .mode import GrapheneMode | ||||
from .pipeline_ref import GraphenePipelineReference | from .pipeline_ref import GraphenePipelineReference | ||||
▲ Show 20 Lines • Show All 243 Lines • ▼ Show 20 Lines | class GrapheneIPipelineSnapshotMixin: | ||||
description = graphene.String() | description = graphene.String() | ||||
id = graphene.NonNull(graphene.ID) | id = graphene.NonNull(graphene.ID) | ||||
pipeline_snapshot_id = graphene.NonNull(graphene.String) | pipeline_snapshot_id = graphene.NonNull(graphene.String) | ||||
dagster_types = non_null_list(GrapheneDagsterType) | dagster_types = non_null_list(GrapheneDagsterType) | ||||
dagster_type_or_error = graphene.Field( | dagster_type_or_error = graphene.Field( | ||||
graphene.NonNull(GrapheneDagsterTypeOrError), | graphene.NonNull(GrapheneDagsterTypeOrError), | ||||
dagsterTypeName=graphene.Argument(graphene.NonNull(graphene.String)), | dagsterTypeName=graphene.Argument(graphene.NonNull(graphene.String)), | ||||
) | ) | ||||
solids = non_null_list(GrapheneSolid) | graph = non_null_list(GrapheneGraphDefinition) | ||||
modes = non_null_list(GrapheneMode) | modes = non_null_list(GrapheneMode) | ||||
solid_handles = graphene.Field( | |||||
non_null_list(GrapheneSolidHandle), parentHandleID=graphene.String() | |||||
) | |||||
solid_handle = graphene.Field( | |||||
GrapheneSolidHandle, | |||||
handleID=graphene.Argument(graphene.NonNull(graphene.String)), | |||||
) | |||||
tags = non_null_list(GraphenePipelineTag) | tags = non_null_list(GraphenePipelineTag) | ||||
runs = graphene.Field( | runs = graphene.Field( | ||||
non_null_list(GraphenePipelineRun), | non_null_list(GraphenePipelineRun), | ||||
cursor=graphene.String(), | cursor=graphene.String(), | ||||
limit=graphene.Int(), | limit=graphene.Int(), | ||||
) | ) | ||||
schedules = non_null_list(GrapheneSchedule) | schedules = non_null_list(GrapheneSchedule) | ||||
sensors = non_null_list(GrapheneSensor) | sensors = non_null_list(GrapheneSensor) | ||||
Show All 9 Lines | def resolve_pipeline_snapshot_id(self, _graphene_info): | ||||
return self.get_represented_pipeline().identifying_pipeline_snapshot_id | return self.get_represented_pipeline().identifying_pipeline_snapshot_id | ||||
def resolve_id(self, _graphene_info): | def resolve_id(self, _graphene_info): | ||||
return self.get_represented_pipeline().identifying_pipeline_snapshot_id | return self.get_represented_pipeline().identifying_pipeline_snapshot_id | ||||
def resolve_name(self, _graphene_info): | def resolve_name(self, _graphene_info): | ||||
return self.get_represented_pipeline().name | return self.get_represented_pipeline().name | ||||
def resolve_graph(self, _graphene_info): | |||||
return GrapheneGraphDefinition( | |||||
self.get_represented_pipeline(), self.get_represented_pipeline().graph_name | |||||
) | |||||
def resolve_description(self, _graphene_info): | def resolve_description(self, _graphene_info): | ||||
return self.get_represented_pipeline().description | return self.get_represented_pipeline().description | ||||
def resolve_dagster_types(self, _graphene_info): | def resolve_dagster_types(self, _graphene_info): | ||||
represented_pipeline = self.get_represented_pipeline() | represented_pipeline = self.get_represented_pipeline() | ||||
return sorted( | return sorted( | ||||
list( | list( | ||||
map( | map( | ||||
Show All 15 Lines | def resolve_dagster_type_or_error(self, _graphene_info, **kwargs): | ||||
GrapheneDagsterTypeNotFoundError(dagster_type_name=type_name) | GrapheneDagsterTypeNotFoundError(dagster_type_name=type_name) | ||||
) | ) | ||||
return to_dagster_type( | return to_dagster_type( | ||||
represented_pipeline.pipeline_snapshot, | represented_pipeline.pipeline_snapshot, | ||||
represented_pipeline.get_dagster_type_by_name(type_name).key, | represented_pipeline.get_dagster_type_by_name(type_name).key, | ||||
) | ) | ||||
def resolve_solids(self, _graphene_info): | |||||
represented_pipeline = self.get_represented_pipeline() | |||||
return build_solids( | |||||
represented_pipeline, | |||||
represented_pipeline.dep_structure_index, | |||||
) | |||||
def resolve_modes(self, _graphene_info): | def resolve_modes(self, _graphene_info): | ||||
represented_pipeline = self.get_represented_pipeline() | represented_pipeline = self.get_represented_pipeline() | ||||
return [ | return [ | ||||
GrapheneMode( | GrapheneMode( | ||||
represented_pipeline.config_schema_snapshot, | represented_pipeline.config_schema_snapshot, | ||||
represented_pipeline.identifying_pipeline_snapshot_id, | represented_pipeline.identifying_pipeline_snapshot_id, | ||||
mode_def_snap, | mode_def_snap, | ||||
) | ) | ||||
for mode_def_snap in sorted( | for mode_def_snap in sorted( | ||||
represented_pipeline.mode_def_snaps, key=lambda item: item.name | represented_pipeline.mode_def_snaps, key=lambda item: item.name | ||||
) | ) | ||||
] | ] | ||||
def resolve_solid_handle(self, _graphene_info, handleID): | |||||
return _get_solid_handles(self.get_represented_pipeline()).get(handleID) | |||||
def resolve_solid_handles(self, _graphene_info, **kwargs): | |||||
handles = _get_solid_handles(self.get_represented_pipeline()) | |||||
parentHandleID = kwargs.get("parentHandleID") | |||||
if parentHandleID == "": | |||||
handles = {key: handle for key, handle in handles.items() if not handle.parent} | |||||
elif parentHandleID is not None: | |||||
handles = { | |||||
key: handle | |||||
for key, handle in handles.items() | |||||
if handle.parent and handle.parent.handleID.to_string() == parentHandleID | |||||
} | |||||
return [handles[key] for key in sorted(handles)] | |||||
def resolve_tags(self, _graphene_info): | def resolve_tags(self, _graphene_info): | ||||
represented_pipeline = self.get_represented_pipeline() | represented_pipeline = self.get_represented_pipeline() | ||||
return [ | return [ | ||||
GraphenePipelineTag(key=key, value=value) | GraphenePipelineTag(key=key, value=value) | ||||
for key, value in represented_pipeline.pipeline_snapshot.tags.items() | for key, value in represented_pipeline.pipeline_snapshot.tags.items() | ||||
] | ] | ||||
def resolve_solidSelection(self, _graphene_info): | def resolve_solidSelection(self, _graphene_info): | ||||
Show All 37 Lines | class GrapheneIPipelineSnapshot(graphene.Interface): | ||||
name = graphene.NonNull(graphene.String) | name = graphene.NonNull(graphene.String) | ||||
description = graphene.String() | description = graphene.String() | ||||
pipeline_snapshot_id = graphene.NonNull(graphene.String) | pipeline_snapshot_id = graphene.NonNull(graphene.String) | ||||
dagster_types = non_null_list(GrapheneDagsterType) | dagster_types = non_null_list(GrapheneDagsterType) | ||||
dagster_type_or_error = graphene.Field( | dagster_type_or_error = graphene.Field( | ||||
graphene.NonNull(GrapheneDagsterTypeOrError), | graphene.NonNull(GrapheneDagsterTypeOrError), | ||||
dagsterTypeName=graphene.Argument(graphene.NonNull(graphene.String)), | dagsterTypeName=graphene.Argument(graphene.NonNull(graphene.String)), | ||||
) | ) | ||||
solids = non_null_list(GrapheneSolid) | |||||
modes = non_null_list(GrapheneMode) | modes = non_null_list(GrapheneMode) | ||||
solid_handles = graphene.Field( | |||||
non_null_list(GrapheneSolidHandle), parentHandleID=graphene.String() | |||||
) | |||||
solid_handle = graphene.Field( | |||||
GrapheneSolidHandle, | |||||
handleID=graphene.Argument(graphene.NonNull(graphene.String)), | |||||
) | |||||
tags = non_null_list(GraphenePipelineTag) | tags = non_null_list(GraphenePipelineTag) | ||||
class Meta: | class Meta: | ||||
name = "IPipelineSnapshot" | name = "IPipelineSnapshot" | ||||
class GraphenePipelinePreset(graphene.ObjectType): | class GraphenePipelinePreset(graphene.ObjectType): | ||||
name = graphene.NonNull(graphene.String) | name = graphene.NonNull(graphene.String) | ||||
Show All 40 Lines | class GraphenePipeline(GrapheneIPipelineSnapshotMixin, graphene.ObjectType): | ||||
presets = non_null_list(GraphenePipelinePreset) | presets = non_null_list(GraphenePipelinePreset) | ||||
runs = graphene.Field( | runs = graphene.Field( | ||||
non_null_list(GraphenePipelineRun), | non_null_list(GraphenePipelineRun), | ||||
cursor=graphene.String(), | cursor=graphene.String(), | ||||
limit=graphene.Int(), | limit=graphene.Int(), | ||||
) | ) | ||||
class Meta: | class Meta: | ||||
interfaces = (GrapheneSolidContainer, GrapheneIPipelineSnapshot) | interfaces = (GrapheneIPipelineSnapshot,) | ||||
name = "Pipeline" | name = "Pipeline" | ||||
def __init__(self, external_pipeline): | def __init__(self, external_pipeline): | ||||
super().__init__() | super().__init__() | ||||
self._external_pipeline = check.inst_param( | self._external_pipeline = check.inst_param( | ||||
external_pipeline, "external_pipeline", ExternalPipeline | external_pipeline, "external_pipeline", ExternalPipeline | ||||
) | ) | ||||
def resolve_id(self, _graphene_info): | def resolve_id(self, _graphene_info): | ||||
return self._external_pipeline.get_external_origin_id() | return self._external_pipeline.get_external_origin_id() | ||||
def get_represented_pipeline(self): | def get_represented_pipeline(self): | ||||
return self._external_pipeline | return self._external_pipeline | ||||
def resolve_presets(self, _graphene_info): | def resolve_presets(self, _graphene_info): | ||||
return [ | return [ | ||||
GraphenePipelinePreset(preset, self._external_pipeline.name) | GraphenePipelinePreset(preset, self._external_pipeline.name) | ||||
for preset in sorted(self._external_pipeline.active_presets, key=lambda item: item.name) | for preset in sorted(self._external_pipeline.active_presets, key=lambda item: item.name) | ||||
] | ] | ||||
@lru_cache(maxsize=32) | |||||
def _get_solid_handles(represented_pipeline): | |||||
check.inst_param(represented_pipeline, "represented_pipeline", RepresentedPipeline) | |||||
return { | |||||
str(item.handleID): item | |||||
for item in build_solid_handles( | |||||
represented_pipeline, represented_pipeline.dep_structure_index | |||||
) | |||||
} | |||||
class GraphenePipelineRunOrError(graphene.Union): | class GraphenePipelineRunOrError(graphene.Union): | ||||
class Meta: | class Meta: | ||||
types = (GraphenePipelineRun, GraphenePipelineRunNotFoundError, GraphenePythonError) | types = (GraphenePipelineRun, GraphenePipelineRunNotFoundError, GraphenePythonError) | ||||
name = "PipelineRunOrError" | name = "PipelineRunOrError" |