Differential D8385 Diff 41667 python_modules/dagster/dagster/daemon/run_coordinator/queued_run_coordinator_daemon.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/daemon/run_coordinator/queued_run_coordinator_daemon.py
import logging | |||||
import sys | import sys | ||||
import time | |||||
from collections import defaultdict | from collections import defaultdict | ||||
from dagster import DagsterEvent, DagsterEventType, check | from dagster import DagsterEvent, DagsterEventType, check | ||||
from dagster.core.events.log import EventLogEntry | |||||
from dagster.core.instance import DagsterInstance | from dagster.core.instance import DagsterInstance | ||||
from dagster.core.storage.pipeline_run import ( | from dagster.core.storage.pipeline_run import ( | ||||
IN_PROGRESS_RUN_STATUSES, | IN_PROGRESS_RUN_STATUSES, | ||||
PipelineRun, | PipelineRun, | ||||
PipelineRunStatus, | PipelineRunStatus, | ||||
PipelineRunsFilter, | PipelineRunsFilter, | ||||
) | ) | ||||
from dagster.core.storage.tags import PRIORITY_TAG | from dagster.core.storage.tags import PRIORITY_TAG | ||||
▲ Show 20 Lines • Show All 171 Lines • ▼ Show 20 Lines | def _dequeue_run(self, instance, run, workspace): | ||||
) | ) | ||||
) | ) | ||||
return | return | ||||
dequeued_event = DagsterEvent( | dequeued_event = DagsterEvent( | ||||
event_type_value=DagsterEventType.PIPELINE_DEQUEUED.value, | event_type_value=DagsterEventType.PIPELINE_DEQUEUED.value, | ||||
pipeline_name=run.pipeline_name, | pipeline_name=run.pipeline_name, | ||||
) | ) | ||||
event_record = EventLogEntry( | instance.report_dagster_event(dequeued_event, run) | ||||
message="", | |||||
user_message="", | |||||
level=logging.INFO, | |||||
pipeline_name=run.pipeline_name, | |||||
run_id=run.run_id, | |||||
error_info=None, | |||||
timestamp=time.time(), | |||||
dagster_event=dequeued_event, | |||||
) | |||||
instance.handle_new_event(event_record) | |||||
instance.launch_run(run.run_id, workspace) | instance.launch_run(run.run_id, workspace) |