Differential D4671 Diff 23887 python_modules/dagster/dagster/core/host_representation/repository_location.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/host_representation/repository_location.py
import datetime | |||||
from abc import ABCMeta, abstractmethod, abstractproperty | from abc import ABCMeta, abstractmethod, abstractproperty | ||||
from datetime import datetime | |||||
import pendulum | |||||
import six | import six | ||||
from dagster import check | from dagster import check | ||||
from dagster.api.snapshot_executable import ( | from dagster.api.snapshot_executable import ( | ||||
sync_get_external_executable_params, | sync_get_external_executable_params, | ||||
sync_get_external_executable_params_grpc, | sync_get_external_executable_params_grpc, | ||||
) | ) | ||||
from dagster.api.snapshot_execution_plan import sync_get_external_execution_plan_grpc | from dagster.api.snapshot_execution_plan import sync_get_external_execution_plan_grpc | ||||
▲ Show 20 Lines • Show All 47 Lines • ▼ Show 20 Lines | |||||
from dagster.grpc.types import ( | from dagster.grpc.types import ( | ||||
ExternalExecutableArgs, | ExternalExecutableArgs, | ||||
ExternalScheduleExecutionArgs, | ExternalScheduleExecutionArgs, | ||||
PartitionArgs, | PartitionArgs, | ||||
PartitionNamesArgs, | PartitionNamesArgs, | ||||
PartitionSetExecutionParamArgs, | PartitionSetExecutionParamArgs, | ||||
ScheduleExecutionDataMode, | ScheduleExecutionDataMode, | ||||
) | ) | ||||
from dagster.seven import get_timestamp_from_utc_datetime | |||||
from dagster.utils.hosted_user_process import external_repo_from_def, recon_repository_from_origin | from dagster.utils.hosted_user_process import external_repo_from_def, recon_repository_from_origin | ||||
from .selector import PipelineSelector | from .selector import PipelineSelector | ||||
class RepositoryLocation(six.with_metaclass(ABCMeta)): | class RepositoryLocation(six.with_metaclass(ABCMeta)): | ||||
""" | """ | ||||
A RepositoryLocation represents a target containing user code which has a set of Dagster | A RepositoryLocation represents a target containing user code which has a set of Dagster | ||||
▲ Show 20 Lines • Show All 85 Lines • ▼ Show 20 Lines | class RepositoryLocation(six.with_metaclass(ABCMeta)): | ||||
@abstractmethod | @abstractmethod | ||||
def get_external_schedule_execution_data( | def get_external_schedule_execution_data( | ||||
self, | self, | ||||
instance, | instance, | ||||
repository_handle, | repository_handle, | ||||
schedule_name, | schedule_name, | ||||
schedule_execution_data_mode, | schedule_execution_data_mode, | ||||
scheduled_execution_datetime_utc, | scheduled_execution_time, | ||||
): | ): | ||||
pass | pass | ||||
@abstractmethod | @abstractmethod | ||||
def get_external_executable_params(self, instance, repository_handle, name): | def get_external_executable_params(self, instance, repository_handle, name): | ||||
pass | pass | ||||
@abstractproperty | @abstractproperty | ||||
▲ Show 20 Lines • Show All 184 Lines • ▼ Show 20 Lines | def get_external_partition_names(self, repository_handle, partition_set_name): | ||||
return get_partition_names(args) | return get_partition_names(args) | ||||
def get_external_schedule_execution_data( | def get_external_schedule_execution_data( | ||||
self, | self, | ||||
instance, | instance, | ||||
repository_handle, | repository_handle, | ||||
schedule_name, | schedule_name, | ||||
schedule_execution_data_mode, | schedule_execution_data_mode, | ||||
scheduled_execution_datetime_utc, | scheduled_execution_time, | ||||
): | ): | ||||
check.inst_param(instance, "instance", DagsterInstance) | check.inst_param(instance, "instance", DagsterInstance) | ||||
check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | ||||
check.str_param(schedule_name, "schedule_name") | check.str_param(schedule_name, "schedule_name") | ||||
check.inst_param( | check.inst_param( | ||||
schedule_execution_data_mode, "schedule_execution_data_mode", ScheduleExecutionDataMode | schedule_execution_data_mode, "schedule_execution_data_mode", ScheduleExecutionDataMode | ||||
) | ) | ||||
check.opt_inst_param( | check.opt_inst_param( | ||||
scheduled_execution_datetime_utc, "scheduled_execution_datetime_utc", datetime | scheduled_execution_time, "scheduled_execution_time", pendulum.Pendulum | ||||
) | ) | ||||
repo_origin = repository_handle.get_origin() | repo_origin = repository_handle.get_origin() | ||||
args = ExternalScheduleExecutionArgs( | args = ExternalScheduleExecutionArgs( | ||||
instance_ref=instance.get_ref(), | instance_ref=instance.get_ref(), | ||||
repository_origin=repo_origin, | repository_origin=repo_origin, | ||||
schedule_name=schedule_name, | schedule_name=schedule_name, | ||||
scheduled_execution_timestamp_utc=( | |||||
get_timestamp_from_utc_datetime(scheduled_execution_datetime_utc) | |||||
if scheduled_execution_datetime_utc | |||||
else None | |||||
), | |||||
schedule_execution_data_mode=schedule_execution_data_mode, | schedule_execution_data_mode=schedule_execution_data_mode, | ||||
scheduled_execution_timestamp=scheduled_execution_time.timestamp() | |||||
if scheduled_execution_time | |||||
else None, | |||||
scheduled_execution_timezone=scheduled_execution_time.timezone.name | |||||
if scheduled_execution_time | |||||
else None, | |||||
) | ) | ||||
recon_repo = recon_repository_from_origin(repo_origin) | recon_repo = recon_repository_from_origin(repo_origin) | ||||
return get_external_schedule_execution(recon_repo, args) | return get_external_schedule_execution(recon_repo, args) | ||||
def get_external_executable_params(self, instance, repository_handle, name): | def get_external_executable_params(self, instance, repository_handle, name): | ||||
check.inst_param(instance, "instance", DagsterInstance) | check.inst_param(instance, "instance", DagsterInstance) | ||||
check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | ||||
check.str_param(name, "name") | check.str_param(name, "name") | ||||
▲ Show 20 Lines • Show All 150 Lines • ▼ Show 20 Lines | def get_external_partition_names(self, repository_handle, partition_set_name): | ||||
) | ) | ||||
def get_external_schedule_execution_data( | def get_external_schedule_execution_data( | ||||
self, | self, | ||||
instance, | instance, | ||||
repository_handle, | repository_handle, | ||||
schedule_name, | schedule_name, | ||||
schedule_execution_data_mode, | schedule_execution_data_mode, | ||||
scheduled_execution_datetime_utc, | scheduled_execution_time, | ||||
): | ): | ||||
check.inst_param(instance, "instance", DagsterInstance) | check.inst_param(instance, "instance", DagsterInstance) | ||||
check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | ||||
check.str_param(schedule_name, "schedule_name") | check.str_param(schedule_name, "schedule_name") | ||||
check.inst_param( | check.inst_param( | ||||
schedule_execution_data_mode, "schedule_execution_data_mode", ScheduleExecutionDataMode | schedule_execution_data_mode, "schedule_execution_data_mode", ScheduleExecutionDataMode | ||||
) | ) | ||||
check.opt_inst_param( | check.opt_inst_param( | ||||
scheduled_execution_datetime_utc, "scheduled_execution_datetime_utc", datetime | scheduled_execution_time, "scheduled_execution_time", datetime.datetime | ||||
) | ) | ||||
return sync_get_external_schedule_execution_data_grpc( | return sync_get_external_schedule_execution_data_grpc( | ||||
self._handle.client, | self._handle.client, | ||||
instance, | instance, | ||||
repository_handle, | repository_handle, | ||||
schedule_name, | schedule_name, | ||||
schedule_execution_data_mode, | schedule_execution_data_mode, | ||||
scheduled_execution_datetime_utc, | scheduled_execution_time, | ||||
) | ) | ||||
def get_external_executable_params(self, instance, repository_handle, name): | def get_external_executable_params(self, instance, repository_handle, name): | ||||
check.inst_param(instance, "instance", DagsterInstance) | check.inst_param(instance, "instance", DagsterInstance) | ||||
check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | ||||
check.str_param(name, "name") | check.str_param(name, "name") | ||||
return sync_get_external_executable_params_grpc( | return sync_get_external_executable_params_grpc( | ||||
self._handle.client, instance, repository_handle, name | self._handle.client, instance, repository_handle, name | ||||
▲ Show 20 Lines • Show All 143 Lines • ▼ Show 20 Lines | def get_external_partition_names(self, repository_handle, partition_set_name): | ||||
return sync_get_external_partition_names(repository_handle, partition_set_name) | return sync_get_external_partition_names(repository_handle, partition_set_name) | ||||
def get_external_schedule_execution_data( | def get_external_schedule_execution_data( | ||||
self, | self, | ||||
instance, | instance, | ||||
repository_handle, | repository_handle, | ||||
schedule_name, | schedule_name, | ||||
schedule_execution_data_mode, | schedule_execution_data_mode, | ||||
scheduled_execution_datetime_utc, | scheduled_execution_time, | ||||
): | ): | ||||
check.inst_param(instance, "instance", DagsterInstance) | check.inst_param(instance, "instance", DagsterInstance) | ||||
check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | ||||
check.str_param(schedule_name, "schedule_name") | check.str_param(schedule_name, "schedule_name") | ||||
check.inst_param( | check.inst_param( | ||||
schedule_execution_data_mode, "schedule_execution_data_mode", ScheduleExecutionDataMode | schedule_execution_data_mode, "schedule_execution_data_mode", ScheduleExecutionDataMode | ||||
) | ) | ||||
check.opt_inst_param( | check.opt_inst_param( | ||||
scheduled_execution_datetime_utc, "scheduled_execution_datetime_utc", datetime | scheduled_execution_time, "scheduled_execution_time", datetime.datetime | ||||
) | ) | ||||
return sync_get_external_schedule_execution_data( | return sync_get_external_schedule_execution_data( | ||||
instance, | instance, | ||||
repository_handle, | repository_handle, | ||||
schedule_name, | schedule_name, | ||||
schedule_execution_data_mode, | schedule_execution_data_mode, | ||||
scheduled_execution_datetime_utc, | scheduled_execution_time, | ||||
) | ) | ||||
def get_external_executable_params(self, instance, repository_handle, name): | def get_external_executable_params(self, instance, repository_handle, name): | ||||
check.inst_param(instance, "instance", DagsterInstance) | check.inst_param(instance, "instance", DagsterInstance) | ||||
check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | ||||
check.str_param(name, "name") | check.str_param(name, "name") | ||||
return sync_get_external_executable_params(instance, repository_handle, name) | return sync_get_external_executable_params(instance, repository_handle, name) | ||||
Show All 10 Lines |