Differential D6164 Diff 30546 python_modules/dagster-graphql/dagster_graphql/implementation/fetch_solids.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster-graphql/dagster_graphql/implementation/fetch_solids.py
from collections import OrderedDict, defaultdict | from collections import OrderedDict, defaultdict | ||||
from dagster import check | from dagster import check | ||||
from dagster.core.host_representation import ExternalRepository | from dagster.core.host_representation import ExternalRepository | ||||
from dagster_graphql.schema.pipelines import DauphinPipeline | |||||
from dagster_graphql.schema.solids import ( | |||||
DauphinSolidInvocationSite, | |||||
DauphinUsedSolid, | |||||
build_dauphin_solid_handles, | |||||
) | |||||
def get_solid(repo, name): | def get_solid(repo, name): | ||||
return get_used_solid_map(repo)[name] | return get_used_solid_map(repo)[name] | ||||
def get_solids(repo): | def get_solids(repo): | ||||
return get_used_solid_map(repo).values() | return get_used_solid_map(repo).values() | ||||
def get_used_solid_map(repo): | def get_used_solid_map(repo): | ||||
from ..schema.pipelines.pipeline import GraphenePipeline | |||||
from ..schema.solids import build_solid_handles | |||||
from ..schema.used_solid import ( | |||||
GrapheneSolidInvocationSite, | |||||
GrapheneUsedSolid, | |||||
) | |||||
check.inst_param(repo, "repo", ExternalRepository) | check.inst_param(repo, "repo", ExternalRepository) | ||||
inv_by_def_name = defaultdict(list) | inv_by_def_name = defaultdict(list) | ||||
definitions = [] | definitions = [] | ||||
for external_pipeline in repo.get_all_external_pipelines(): | for external_pipeline in repo.get_all_external_pipelines(): | ||||
for handle in build_dauphin_solid_handles( | for handle in build_solid_handles(external_pipeline, external_pipeline.dep_structure_index): | ||||
external_pipeline, external_pipeline.dep_structure_index | definition = handle.solid.get_solid_definition() | ||||
): | |||||
definition = handle.solid.get_dauphin_solid_definition() | |||||
if definition.name not in inv_by_def_name: | if definition.name not in inv_by_def_name: | ||||
definitions.append(definition) | definitions.append(definition) | ||||
inv_by_def_name[definition.name].append( | inv_by_def_name[definition.name].append( | ||||
DauphinSolidInvocationSite( | GrapheneSolidInvocationSite( | ||||
pipeline=DauphinPipeline(external_pipeline), solidHandle=handle, | pipeline=GraphenePipeline(external_pipeline), solidHandle=handle, | ||||
) | ) | ||||
) | ) | ||||
return OrderedDict( | return OrderedDict( | ||||
( | ( | ||||
definition.name, | definition.name, | ||||
DauphinUsedSolid( | GrapheneUsedSolid( | ||||
definition=definition, | definition=definition, | ||||
invocations=sorted( | invocations=sorted( | ||||
inv_by_def_name[definition.name], | inv_by_def_name[definition.name], | ||||
key=lambda i: i.solidHandle.handleID.to_string(), | key=lambda i: i.solidHandle.handleID.to_string(), | ||||
), | ), | ||||
), | ), | ||||
) | ) | ||||
for definition in sorted(definitions, key=lambda d: d.name) | for definition in sorted(definitions, key=lambda d: d.name) | ||||
) | ) |