Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/storage/runs/sql_run_storage.py
import logging | import logging | ||||
import zlib | import zlib | ||||
from abc import abstractmethod | from abc import abstractmethod | ||||
from collections import defaultdict | from collections import defaultdict | ||||
from datetime import datetime | |||||
from enum import Enum | from enum import Enum | ||||
import six | import six | ||||
import sqlalchemy as db | import sqlalchemy as db | ||||
from dagster import check | from dagster import check | ||||
from dagster.core.errors import DagsterRunAlreadyExists, DagsterSnapshotDoesNotExist | from dagster.core.errors import DagsterRunAlreadyExists, DagsterSnapshotDoesNotExist | ||||
from dagster.core.events import DagsterEvent, DagsterEventType | from dagster.core.events import DagsterEvent, DagsterEventType | ||||
from dagster.core.snap import ( | from dagster.core.snap import ( | ||||
ExecutionPlanSnapshot, | ExecutionPlanSnapshot, | ||||
PipelineSnapshot, | PipelineSnapshot, | ||||
create_execution_plan_snapshot_id, | create_execution_plan_snapshot_id, | ||||
create_pipeline_snapshot_id, | create_pipeline_snapshot_id, | ||||
) | ) | ||||
from dagster.core.storage.tags import ROOT_RUN_ID_TAG | from dagster.core.storage.tags import ROOT_RUN_ID_TAG | ||||
from dagster.serdes import deserialize_json_to_dagster_namedtuple, serialize_dagster_namedtuple | from dagster.serdes import deserialize_json_to_dagster_namedtuple, serialize_dagster_namedtuple | ||||
from dagster.seven import JSONDecodeError | from dagster.seven import JSONDecodeError, get_current_datetime_in_utc | ||||
from dagster.utils import merge_dicts | from dagster.utils import merge_dicts | ||||
from ..pipeline_run import PipelineRun, PipelineRunStatus, PipelineRunsFilter | from ..pipeline_run import PipelineRun, PipelineRunStatus, PipelineRunsFilter | ||||
from .base import RunStorage | from .base import RunStorage | ||||
from .schema import RunTagsTable, RunsTable, SnapshotsTable | from .schema import RunTagsTable, RunsTable, SnapshotsTable | ||||
class SnapshotType(Enum): | class SnapshotType(Enum): | ||||
▲ Show 20 Lines • Show All 89 Lines • ▼ Show 20 Lines | def handle_run_event(self, run_id, event): | ||||
with self.connect() as conn: | with self.connect() as conn: | ||||
conn.execute( | conn.execute( | ||||
RunsTable.update() # pylint: disable=no-value-for-parameter | RunsTable.update() # pylint: disable=no-value-for-parameter | ||||
.where(RunsTable.c.run_id == run_id) | .where(RunsTable.c.run_id == run_id) | ||||
.values( | .values( | ||||
status=new_pipeline_status.value, | status=new_pipeline_status.value, | ||||
run_body=serialize_dagster_namedtuple(run.with_status(new_pipeline_status)), | run_body=serialize_dagster_namedtuple(run.with_status(new_pipeline_status)), | ||||
update_timestamp=datetime.now(), | update_timestamp=get_current_datetime_in_utc(), | ||||
) | ) | ||||
) | ) | ||||
def _row_to_run(self, row): | def _row_to_run(self, row): | ||||
return deserialize_json_to_dagster_namedtuple(row[0]) | return deserialize_json_to_dagster_namedtuple(row[0]) | ||||
def _rows_to_runs(self, rows): | def _rows_to_runs(self, rows): | ||||
return list(map(self._row_to_run, rows)) | return list(map(self._row_to_run, rows)) | ||||
▲ Show 20 Lines • Show All 121 Lines • ▼ Show 20 Lines | def add_run_tags(self, run_id, new_tags): | ||||
with self.connect() as conn: | with self.connect() as conn: | ||||
conn.execute( | conn.execute( | ||||
RunsTable.update() # pylint: disable=no-value-for-parameter | RunsTable.update() # pylint: disable=no-value-for-parameter | ||||
.where(RunsTable.c.run_id == run_id) | .where(RunsTable.c.run_id == run_id) | ||||
.values( | .values( | ||||
run_body=serialize_dagster_namedtuple( | run_body=serialize_dagster_namedtuple( | ||||
run.with_tags(merge_dicts(current_tags, new_tags)) | run.with_tags(merge_dicts(current_tags, new_tags)) | ||||
), | ), | ||||
update_timestamp=datetime.now(), | update_timestamp=get_current_datetime_in_utc(), | ||||
) | ) | ||||
) | ) | ||||
current_tags_set = set(current_tags.keys()) | current_tags_set = set(current_tags.keys()) | ||||
new_tags_set = set(new_tags.keys()) | new_tags_set = set(new_tags.keys()) | ||||
existing_tags = current_tags_set & new_tags_set | existing_tags = current_tags_set & new_tags_set | ||||
added_tags = new_tags_set.difference(existing_tags) | added_tags = new_tags_set.difference(existing_tags) | ||||
▲ Show 20 Lines • Show All 294 Lines • Show Last 20 Lines |