Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/api/snapshot_schedule.py
from datetime import datetime | import pendulum | ||||
from dagster import check | from dagster import check | ||||
from dagster.core.host_representation.external_data import ( | from dagster.core.host_representation.external_data import ( | ||||
ExternalScheduleExecutionData, | ExternalScheduleExecutionData, | ||||
ExternalScheduleExecutionErrorData, | ExternalScheduleExecutionErrorData, | ||||
) | ) | ||||
from dagster.core.host_representation.handle import RepositoryHandle | from dagster.core.host_representation.handle import RepositoryHandle | ||||
from dagster.core.types.loadable_target_origin import LoadableTargetOrigin | from dagster.core.types.loadable_target_origin import LoadableTargetOrigin | ||||
from dagster.grpc.types import ExternalScheduleExecutionArgs, ScheduleExecutionDataMode | from dagster.grpc.types import ExternalScheduleExecutionArgs, ScheduleExecutionDataMode | ||||
from dagster.seven import get_timestamp_from_utc_datetime | |||||
from .utils import execute_unary_api_cli_command | from .utils import execute_unary_api_cli_command | ||||
def sync_get_external_schedule_execution_data( | def sync_get_external_schedule_execution_data( | ||||
instance, | instance, | ||||
repository_handle, | repository_handle, | ||||
schedule_name, | schedule_name, | ||||
schedule_execution_data_mode, | schedule_execution_data_mode, | ||||
scheduled_execution_datetime_utc, | scheduled_execution_datetime_utc, | ||||
): | ): | ||||
check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | ||||
check.str_param(schedule_name, "schedule_name") | check.str_param(schedule_name, "schedule_name") | ||||
check.opt_inst_param( | check.opt_inst_param( | ||||
scheduled_execution_datetime_utc, "scheduled_execution_datetime_utc", datetime | scheduled_execution_datetime_utc, "scheduled_execution_datetime_utc", pendulum.DateTime | ||||
) | ) | ||||
check.inst_param( | check.inst_param( | ||||
schedule_execution_data_mode, "schedule_execution_data_mode", ScheduleExecutionDataMode | schedule_execution_data_mode, "schedule_execution_data_mode", ScheduleExecutionDataMode | ||||
) | ) | ||||
origin = repository_handle.get_origin() | origin = repository_handle.get_origin() | ||||
return check.inst( | return check.inst( | ||||
execute_unary_api_cli_command( | execute_unary_api_cli_command( | ||||
origin.executable_path, | origin.executable_path, | ||||
"schedule_config", | "schedule_config", | ||||
ExternalScheduleExecutionArgs( | ExternalScheduleExecutionArgs( | ||||
repository_origin=origin, | repository_origin=origin, | ||||
instance_ref=instance.get_ref(), | instance_ref=instance.get_ref(), | ||||
schedule_name=schedule_name, | schedule_name=schedule_name, | ||||
scheduled_execution_timestamp_utc=( | scheduled_execution_timestamp_utc=( | ||||
get_timestamp_from_utc_datetime(scheduled_execution_datetime_utc) | scheduled_execution_datetime_utc.float_timestamp | ||||
if scheduled_execution_datetime_utc | if scheduled_execution_datetime_utc | ||||
else None | else None | ||||
), | ), | ||||
schedule_execution_data_mode=schedule_execution_data_mode, | schedule_execution_data_mode=schedule_execution_data_mode, | ||||
), | ), | ||||
), | ), | ||||
(ExternalScheduleExecutionData, ExternalScheduleExecutionErrorData), | (ExternalScheduleExecutionData, ExternalScheduleExecutionErrorData), | ||||
) | ) | ||||
Show All 28 Lines | def sync_get_external_schedule_execution_data_grpc( | ||||
repository_handle, | repository_handle, | ||||
schedule_name, | schedule_name, | ||||
schedule_execution_data_mode, | schedule_execution_data_mode, | ||||
scheduled_execution_datetime_utc, | scheduled_execution_datetime_utc, | ||||
): | ): | ||||
check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | ||||
check.str_param(schedule_name, "schedule_name") | check.str_param(schedule_name, "schedule_name") | ||||
check.opt_inst_param( | check.opt_inst_param( | ||||
scheduled_execution_datetime_utc, "scheduled_execution_datetime_utc", datetime | scheduled_execution_datetime_utc, "scheduled_execution_datetime_utc", pendulum.DateTime | ||||
) | ) | ||||
check.inst_param( | check.inst_param( | ||||
schedule_execution_data_mode, "schedule_execution_data_mode", ScheduleExecutionDataMode | schedule_execution_data_mode, "schedule_execution_data_mode", ScheduleExecutionDataMode | ||||
) | ) | ||||
origin = repository_handle.get_origin() | origin = repository_handle.get_origin() | ||||
return check.inst( | return check.inst( | ||||
api_client.external_schedule_execution( | api_client.external_schedule_execution( | ||||
external_schedule_execution_args=ExternalScheduleExecutionArgs( | external_schedule_execution_args=ExternalScheduleExecutionArgs( | ||||
repository_origin=origin, | repository_origin=origin, | ||||
instance_ref=instance.get_ref(), | instance_ref=instance.get_ref(), | ||||
schedule_name=schedule_name, | schedule_name=schedule_name, | ||||
scheduled_execution_timestamp_utc=( | scheduled_execution_timestamp_utc=( | ||||
get_timestamp_from_utc_datetime(scheduled_execution_datetime_utc) | scheduled_execution_datetime_utc.float_timestamp | ||||
if scheduled_execution_datetime_utc | if scheduled_execution_datetime_utc | ||||
else None | else None | ||||
), | ), | ||||
schedule_execution_data_mode=schedule_execution_data_mode, | schedule_execution_data_mode=schedule_execution_data_mode, | ||||
) | ) | ||||
), | ), | ||||
(ExternalScheduleExecutionData, ExternalScheduleExecutionErrorData), | (ExternalScheduleExecutionData, ExternalScheduleExecutionErrorData), | ||||
) | ) |