Differential D6164 Diff 30381 python_modules/dagster-graphql/dagster_graphql/implementation/fetch_partition_sets.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster-graphql/dagster_graphql/implementation/fetch_partition_sets.py
import yaml | import yaml | ||||
from dagster import check | from dagster import check | ||||
from dagster.core.host_representation import ( | from dagster.core.host_representation import ( | ||||
ExternalPartitionConfigData, | ExternalPartitionConfigData, | ||||
ExternalPartitionExecutionErrorData, | ExternalPartitionExecutionErrorData, | ||||
ExternalPartitionNamesData, | ExternalPartitionNamesData, | ||||
ExternalPartitionSet, | ExternalPartitionSet, | ||||
ExternalPartitionTagsData, | ExternalPartitionTagsData, | ||||
RepositoryHandle, | RepositoryHandle, | ||||
RepositorySelector, | RepositorySelector, | ||||
) | ) | ||||
from dagster.core.storage.pipeline_run import PipelineRunsFilter | from dagster.core.storage.pipeline_run import PipelineRunsFilter | ||||
from dagster.core.storage.tags import PARTITION_NAME_TAG, PARTITION_SET_TAG, TagType, get_tag_type | from dagster.core.storage.tags import PARTITION_NAME_TAG, PARTITION_SET_TAG, TagType, get_tag_type | ||||
from graphql.execution.base import ResolveInfo | from graphql.execution.base import ResolveInfo | ||||
from .utils import capture_dauphin_error | from .utils import capture_error | ||||
@capture_error
def get_partition_sets_or_error(graphene_info, repository_selector, pipeline_name):
    """Return the partition sets of a repository that target ``pipeline_name``.

    The repository is resolved through ``graphene_info.context`` from the location and
    repository names carried by ``repository_selector``. Partition sets whose
    ``pipeline_name`` equals the requested one are ordered by
    ``(pipeline_name, mode, name)``, each is wrapped in a ``PartitionSet``, and the
    wrapped list is returned as the ``results`` of a ``PartitionSets`` object.
    """
    # Function-local import, as elsewhere in this module
    # (presumably to avoid an import cycle with the schema package -- TODO confirm).
    from ..schema.partition_sets import PartitionSet, PartitionSets

    check.inst_param(graphene_info, "graphene_info", ResolveInfo)
    check.inst_param(repository_selector, "repository_selector", RepositorySelector)
    check.str_param(pipeline_name, "pipeline_name")

    repo_location = graphene_info.context.get_repository_location(
        repository_selector.location_name
    )
    repository = repo_location.get_repository(repository_selector.repository_name)

    def _ordering(candidate):
        return (candidate.pipeline_name, candidate.mode, candidate.name)

    matching = [
        candidate
        for candidate in repository.get_external_partition_sets()
        if candidate.pipeline_name == pipeline_name
    ]

    wrapped = []
    for candidate in sorted(matching, key=_ordering):
        wrapped.append(
            PartitionSet(
                external_repository_handle=repository.handle,
                external_partition_set=candidate,
            )
        )

    return PartitionSets(results=wrapped)
@capture_error
def get_partition_set(graphene_info, repository_selector, partition_set_name):
    """Look up a single partition set by name within the selected repository.

    Returns a ``PartitionSet`` wrapping the first external partition set whose ``name``
    equals ``partition_set_name``, or a ``PartitionSetNotFoundError`` built from that
    name when the repository has no such partition set.
    """
    from ..schema.partition_sets import PartitionSet, PartitionSetNotFoundError

    check.inst_param(graphene_info, "graphene_info", ResolveInfo)
    check.inst_param(repository_selector, "repository_selector", RepositorySelector)
    check.str_param(partition_set_name, "partition_set_name")

    repo_location = graphene_info.context.get_repository_location(
        repository_selector.location_name
    )
    repository = repo_location.get_repository(repository_selector.repository_name)

    # First match wins, mirroring a linear scan that returns on the first hit.
    found = next(
        (
            candidate
            for candidate in repository.get_external_partition_sets()
            if candidate.name == partition_set_name
        ),
        None,
    )
    if found is None:
        return PartitionSetNotFoundError(partition_set_name)

    return PartitionSet(
        external_repository_handle=repository.handle,
        external_partition_set=found,
    )
@capture_error
def get_partition_by_name(graphene_info, repository_handle, partition_set, partition_name):
    """Build the ``Partition`` object for ``partition_name`` within ``partition_set``.

    The arguments are type-checked and then passed straight to the ``Partition``
    constructor; this function does not itself verify that the named partition exists.
    """
    from ..schema.partition_sets import Partition

    check.inst_param(graphene_info, "graphene_info", ResolveInfo)
    check.inst_param(repository_handle, "repository_handle", RepositoryHandle)
    check.inst_param(partition_set, "partition_set", ExternalPartitionSet)
    check.str_param(partition_name, "partition_name")

    partition = Partition(
        external_repository_handle=repository_handle,
        external_partition_set=partition_set,
        partition_name=partition_name,
    )
    return partition
def get_partition_config(graphene_info, repository_handle, partition_set_name, partition_name):
    """Fetch the run config of one partition and return it rendered as YAML.

    Asks ``graphene_info.context`` for the external partition config. When the answer is
    an ``ExternalPartitionConfigData``, its ``run_config`` is dumped with
    ``yaml.safe_dump`` (block style) into a ``PartitionRunConfig``. Any other answer is
    treated as an error payload and its ``error`` is wrapped in a ``PythonError``.
    """
    # NOTE(review): unlike the other public functions in this module, this one is not
    # decorated with the module's error-capturing decorator -- confirm that is intended.
    from ..schema.errors import PythonError
    from ..schema.partition_sets import PartitionRunConfig

    check.inst_param(repository_handle, "repository_handle", RepositoryHandle)
    check.str_param(partition_set_name, "partition_set_name")
    check.str_param(partition_name, "partition_name")

    config_or_error = graphene_info.context.get_external_partition_config(
        repository_handle, partition_set_name, partition_name,
    )

    if not isinstance(config_or_error, ExternalPartitionConfigData):
        return PythonError(config_or_error.error)

    rendered = yaml.safe_dump(config_or_error.run_config, default_flow_style=False)
    return PartitionRunConfig(yaml=rendered)
@capture_error
def get_partition_tags(graphene_info, repository_handle, partition_set_name, partition_name):
    """Fetch the tags of one partition, leaving out tags of hidden type.

    Asks ``graphene_info.context`` for the external partition tags. When the answer is an
    ``ExternalPartitionTagsData``, every tag whose type (per ``get_tag_type``) is not
    ``TagType.HIDDEN`` becomes a ``PipelineTag`` in the returned ``PartitionTags``. Any
    other answer is treated as an error payload and wrapped in a ``PythonError``.
    """
    from ..schema.errors import PythonError
    from ..schema.partition_sets import PartitionTags
    from ..schema.tags import PipelineTag

    check.inst_param(repository_handle, "repository_handle", RepositoryHandle)
    check.str_param(partition_set_name, "partition_set_name")
    check.str_param(partition_name, "partition_name")

    tags_or_error = graphene_info.context.get_external_partition_tags(
        repository_handle, partition_set_name, partition_name
    )

    if not isinstance(tags_or_error, ExternalPartitionTagsData):
        return PythonError(tags_or_error.error)

    visible_tags = []
    for key, value in tags_or_error.tags.items():
        if get_tag_type(key) != TagType.HIDDEN:
            visible_tags.append(PipelineTag(key=key, value=value))

    return PartitionTags(results=visible_tags)
@capture_error
def get_partitions(
    graphene_info, repository_handle, partition_set, cursor=None, limit=None, reverse=False
):
    """Return one page of the partitions belonging to ``partition_set``.

    Partition names are fetched through ``graphene_info.context`` and narrowed by
    ``_apply_cursor_limit_reverse`` using ``cursor``, ``limit`` and ``reverse``. Each
    remaining name becomes a ``Partition`` inside the returned ``Partitions`` object.
    If the fetch yields an ``ExternalPartitionExecutionErrorData`` instead of names, its
    ``error`` is returned wrapped in a ``PythonError``.
    """
    from ..schema.errors import PythonError
    from ..schema.partition_sets import Partition, Partitions

    check.inst_param(repository_handle, "repository_handle", RepositoryHandle)
    check.inst_param(partition_set, "partition_set", ExternalPartitionSet)

    names_or_error = graphene_info.context.get_external_partition_names(
        repository_handle, partition_set.name
    )

    if not isinstance(names_or_error, ExternalPartitionNamesData):
        # The only other payload expected from the names call is an execution error.
        assert isinstance(names_or_error, ExternalPartitionExecutionErrorData)
        return PythonError(names_or_error.error)

    page = _apply_cursor_limit_reverse(names_or_error.partition_names, cursor, limit, reverse)

    partitions = []
    for name in page:
        partitions.append(
            Partition(
                external_partition_set=partition_set,
                external_repository_handle=repository_handle,
                partition_name=name,
            )
        )
    return Partitions(results=partitions)
def _apply_cursor_limit_reverse(items, cursor, limit, reverse): | def _apply_cursor_limit_reverse(items, cursor, limit, reverse): | ||||
start = 0 | start = 0 | ||||
end = len(items) | end = len(items) | ||||
index = 0 | index = 0 | ||||
if cursor: | if cursor: | ||||
index = next((idx for (idx, item) in enumerate(items) if item == cursor), None) | index = next((idx for (idx, item) in enumerate(items) if item == cursor), None) | ||||
if reverse: | if reverse: | ||||
end = index | end = index | ||||
else: | else: | ||||
start = index + 1 | start = index + 1 | ||||
if limit: | if limit: | ||||
if reverse: | if reverse: | ||||
start = end - limit | start = end - limit | ||||
else: | else: | ||||
end = start + limit | end = start + limit | ||||
return items[max(start, 0) : end] | return items[max(start, 0) : end] | ||||
@capture_error
def get_partition_set_partition_statuses(graphene_info, repository_handle, partition_set_name):
    """Report, for every partition of a partition set, the status of its latest run.

    Partition names are fetched through ``graphene_info.context``. If that fetch does not
    yield an ``ExternalPartitionNamesData``, its ``error`` is returned wrapped in a
    ``PythonError``, the same way ``get_partitions`` treats the same call. Previously the
    error payload was read as if it carried ``partition_names``.

    Otherwise all runs tagged with this partition set are loaded from the instance and
    the first run seen per partition name is kept. The result is a ``PartitionStatuses``
    holding one ``PartitionStatus`` per partition name, with ``runStatus`` set to that
    run's status, or ``None`` when the partition has no run.
    """
    from ..schema.errors import PythonError
    from ..schema.partition_sets import PartitionStatus, PartitionStatuses

    check.inst_param(repository_handle, "repository_handle", RepositoryHandle)
    check.str_param(partition_set_name, "partition_set_name")

    result = graphene_info.context.get_external_partition_names(
        repository_handle, partition_set_name
    )

    # Bail out before querying run storage: an error payload has no partition_names.
    if not isinstance(result, ExternalPartitionNamesData):
        return PythonError(result.error)

    all_partition_set_runs = graphene_info.context.instance.get_runs(
        PipelineRunsFilter(tags={PARTITION_SET_TAG: partition_set_name})
    )

    runs_by_partition = {}
    for run in all_partition_set_runs:
        partition_name = run.tags.get(PARTITION_NAME_TAG)
        if not partition_name or partition_name in runs_by_partition:
            # all_partition_set_runs is in descending order by creation time, we should
            # ignore runs for the same partition if we've already considered the partition
            continue
        runs_by_partition[partition_name] = run

    statuses = []
    for partition_name in result.partition_names:
        latest_run = runs_by_partition.get(partition_name)
        statuses.append(
            PartitionStatus(
                id=f"{partition_set_name}:{partition_name}",
                partitionName=partition_name,
                runStatus=latest_run.status if latest_run else None,
            )
        )

    return PartitionStatuses(results=statuses)