Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/api/snapshot_schedule.py
from datetime import datetime | import pendulum | ||||
from dagster import check | from dagster import check | ||||
from dagster.core.host_representation.external_data import ( | from dagster.core.host_representation.external_data import ( | ||||
ExternalScheduleExecutionData, | ExternalScheduleExecutionData, | ||||
ExternalScheduleExecutionErrorData, | ExternalScheduleExecutionErrorData, | ||||
) | ) | ||||
from dagster.core.host_representation.handle import RepositoryHandle | from dagster.core.host_representation.handle import RepositoryHandle | ||||
from dagster.core.types.loadable_target_origin import LoadableTargetOrigin | from dagster.core.types.loadable_target_origin import LoadableTargetOrigin | ||||
from dagster.grpc.types import ExternalScheduleExecutionArgs, ScheduleExecutionDataMode | from dagster.grpc.types import ExternalScheduleExecutionArgs, ScheduleExecutionDataMode | ||||
from dagster.seven import get_timestamp_from_utc_datetime | |||||
from .utils import execute_unary_api_cli_command | from .utils import execute_unary_api_cli_command | ||||
def sync_get_external_schedule_execution_data( | def sync_get_external_schedule_execution_data( | ||||
instance, | instance, | ||||
repository_handle, | repository_handle, | ||||
schedule_name, | schedule_name, | ||||
schedule_execution_data_mode, | schedule_execution_data_mode, | ||||
scheduled_execution_datetime_utc, | scheduled_execution_time, | ||||
): | ): | ||||
check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | ||||
check.str_param(schedule_name, "schedule_name") | check.str_param(schedule_name, "schedule_name") | ||||
check.opt_inst_param( | check.opt_inst_param(scheduled_execution_time, "scheduled_execution_time", pendulum.Pendulum) | ||||
scheduled_execution_datetime_utc, "scheduled_execution_datetime_utc", datetime | |||||
) | |||||
check.inst_param( | check.inst_param( | ||||
schedule_execution_data_mode, "schedule_execution_data_mode", ScheduleExecutionDataMode | schedule_execution_data_mode, "schedule_execution_data_mode", ScheduleExecutionDataMode | ||||
) | ) | ||||
origin = repository_handle.get_origin() | origin = repository_handle.get_origin() | ||||
return check.inst( | return check.inst( | ||||
execute_unary_api_cli_command( | execute_unary_api_cli_command( | ||||
origin.executable_path, | origin.executable_path, | ||||
"schedule_config", | "schedule_config", | ||||
ExternalScheduleExecutionArgs( | ExternalScheduleExecutionArgs( | ||||
repository_origin=origin, | repository_origin=origin, | ||||
instance_ref=instance.get_ref(), | instance_ref=instance.get_ref(), | ||||
schedule_name=schedule_name, | schedule_name=schedule_name, | ||||
scheduled_execution_timestamp_utc=( | |||||
get_timestamp_from_utc_datetime(scheduled_execution_datetime_utc) | |||||
if scheduled_execution_datetime_utc | |||||
else None | |||||
), | |||||
schedule_execution_data_mode=schedule_execution_data_mode, | schedule_execution_data_mode=schedule_execution_data_mode, | ||||
scheduled_execution_timestamp=scheduled_execution_time.timestamp() | |||||
if scheduled_execution_time | |||||
else None, | |||||
scheduled_execution_timezone=scheduled_execution_time.timezone.name | |||||
if scheduled_execution_time | |||||
else None, | |||||
), | ), | ||||
), | ), | ||||
(ExternalScheduleExecutionData, ExternalScheduleExecutionErrorData), | (ExternalScheduleExecutionData, ExternalScheduleExecutionErrorData), | ||||
) | ) | ||||
def sync_get_external_schedule_execution_data_ephemeral_grpc( | def sync_get_external_schedule_execution_data_ephemeral_grpc( | ||||
instance, | instance, | ||||
repository_handle, | repository_handle, | ||||
schedule_name, | schedule_name, | ||||
schedule_execution_data_mode, | schedule_execution_data_mode, | ||||
scheduled_execution_datetime_utc, | scheduled_execution_time, | ||||
): | ): | ||||
from dagster.grpc.client import ephemeral_grpc_api_client | from dagster.grpc.client import ephemeral_grpc_api_client | ||||
origin = repository_handle.get_origin() | origin = repository_handle.get_origin() | ||||
with ephemeral_grpc_api_client( | with ephemeral_grpc_api_client( | ||||
LoadableTargetOrigin(executable_path=origin.executable_path) | LoadableTargetOrigin(executable_path=origin.executable_path) | ||||
) as api_client: | ) as api_client: | ||||
return sync_get_external_schedule_execution_data_grpc( | return sync_get_external_schedule_execution_data_grpc( | ||||
api_client, | api_client, | ||||
instance, | instance, | ||||
repository_handle, | repository_handle, | ||||
schedule_name, | schedule_name, | ||||
schedule_execution_data_mode, | schedule_execution_data_mode, | ||||
scheduled_execution_datetime_utc, | scheduled_execution_time, | ||||
) | ) | ||||
def sync_get_external_schedule_execution_data_grpc( | def sync_get_external_schedule_execution_data_grpc( | ||||
api_client, | api_client, | ||||
instance, | instance, | ||||
repository_handle, | repository_handle, | ||||
schedule_name, | schedule_name, | ||||
schedule_execution_data_mode, | schedule_execution_data_mode, | ||||
scheduled_execution_datetime_utc, | scheduled_execution_time, | ||||
): | ): | ||||
check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | ||||
check.str_param(schedule_name, "schedule_name") | check.str_param(schedule_name, "schedule_name") | ||||
check.opt_inst_param( | |||||
scheduled_execution_datetime_utc, "scheduled_execution_datetime_utc", datetime | |||||
) | |||||
check.inst_param( | check.inst_param( | ||||
schedule_execution_data_mode, "schedule_execution_data_mode", ScheduleExecutionDataMode | schedule_execution_data_mode, "schedule_execution_data_mode", ScheduleExecutionDataMode | ||||
) | ) | ||||
check.opt_inst_param(scheduled_execution_time, "scheduled_execution_time", pendulum.Pendulum) | |||||
origin = repository_handle.get_origin() | origin = repository_handle.get_origin() | ||||
return check.inst( | return check.inst( | ||||
api_client.external_schedule_execution( | api_client.external_schedule_execution( | ||||
external_schedule_execution_args=ExternalScheduleExecutionArgs( | external_schedule_execution_args=ExternalScheduleExecutionArgs( | ||||
repository_origin=origin, | repository_origin=origin, | ||||
instance_ref=instance.get_ref(), | instance_ref=instance.get_ref(), | ||||
schedule_name=schedule_name, | schedule_name=schedule_name, | ||||
scheduled_execution_timestamp_utc=( | |||||
get_timestamp_from_utc_datetime(scheduled_execution_datetime_utc) | |||||
if scheduled_execution_datetime_utc | |||||
else None | |||||
), | |||||
schedule_execution_data_mode=schedule_execution_data_mode, | schedule_execution_data_mode=schedule_execution_data_mode, | ||||
scheduled_execution_timestamp=scheduled_execution_time.timestamp() | |||||
if scheduled_execution_time | |||||
else None, | |||||
scheduled_execution_timezone=scheduled_execution_time.timezone.name | |||||
if scheduled_execution_time | |||||
else None, | |||||
) | ) | ||||
), | ), | ||||
(ExternalScheduleExecutionData, ExternalScheduleExecutionErrorData), | (ExternalScheduleExecutionData, ExternalScheduleExecutionErrorData), | ||||
) | ) |