Differential D4743 Diff 24684 python_modules/dagster/dagster/core/storage/event_log/sql_event_log.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/storage/event_log/sql_event_log.py
import logging | import logging | ||||
from abc import abstractmethod | from abc import abstractmethod | ||||
from collections import defaultdict | from collections import defaultdict | ||||
from datetime import datetime | from datetime import datetime | ||||
import six | import six | ||||
import sqlalchemy as db | import sqlalchemy as db | ||||
from dagster import check, seven | from dagster import check, seven | ||||
from dagster.core.definitions.events import AssetKey, Materialization | from dagster.core.definitions.events import AssetKey, Materialization | ||||
from dagster.core.errors import DagsterEventLogInvalidForRun | from dagster.core.errors import DagsterEventLogInvalidForRun | ||||
from dagster.core.events import DagsterEventType | from dagster.core.events import DagsterEventType | ||||
from dagster.core.events.log import EventRecord | from dagster.core.events.log import EventRecord | ||||
from dagster.core.execution.plan.objects import StepOutputHandle | from dagster.core.execution.plan.objects import StepOutputHandle | ||||
from dagster.core.execution.stats import RunStepKeyStatsSnapshot, StepEventStatus | from dagster.core.execution.stats import RunStepKeyStatsSnapshot, StepEventStatus | ||||
from dagster.serdes import deserialize_json_to_dagster_namedtuple, serialize_dagster_namedtuple | from dagster.serdes import deserialize_json_to_dagster_namedtuple, serialize_dagster_namedtuple | ||||
from dagster.seven import get_utc_timezone | |||||
from dagster.utils import datetime_as_float, utc_datetime_from_timestamp | from dagster.utils import datetime_as_float, utc_datetime_from_timestamp | ||||
from ..pipeline_run import PipelineRunStatsSnapshot | from ..pipeline_run import PipelineRunStatsSnapshot | ||||
from .base import AssetAwareEventLogStorage, EventLogStorage | from .base import AssetAwareEventLogStorage, EventLogStorage | ||||
from .migration import migrate_asset_key_data | from .migration import migrate_asset_key_data | ||||
from .schema import AssetKeyTable, SecondaryIndexMigrationTable, SqlEventLogStorageTable | from .schema import AssetKeyTable, SecondaryIndexMigrationTable, SqlEventLogStorageTable | ||||
from .version_addresses import get_addresses_for_step_output_versions_helper | from .version_addresses import get_addresses_for_step_output_versions_helper | ||||
SECONDARY_INDEX_ASSET_KEY = "asset_key_table" | SECONDARY_INDEX_ASSET_KEY = "asset_key_table" | ||||
REINDEX_DATA_MIGRATIONS = { | REINDEX_DATA_MIGRATIONS = { | ||||
SECONDARY_INDEX_ASSET_KEY: migrate_asset_key_data, | SECONDARY_INDEX_ASSET_KEY: migrate_asset_key_data, | ||||
} | } | ||||
utc = get_utc_timezone() | |||||
dgibson: So this adds get_utc_timezone calls in a bunch of new places - should we settle on using… | |||||
Done Inline Actionsi don't understand the logic behind get_utc_timezone at all -- i would define a single constant like dagster.seven.UTC given my druthers max: i don't understand the logic behind `get_utc_timezone` at all -- i would define a single… | |||||
class SqlEventLogStorage(EventLogStorage): | class SqlEventLogStorage(EventLogStorage): | ||||
"""Base class for SQL backed event log storages. | """Base class for SQL backed event log storages. | ||||
""" | """ | ||||
@abstractmethod | @abstractmethod | ||||
def connect(self, run_id=None): | def connect(self, run_id=None): | ||||
"""Context manager yielding a connection. | """Context manager yielding a connection. | ||||
▲ Show 20 Lines • Show All 163 Lines • ▼ Show 20 Lines | def get_stats_for_run(self, run_id): | ||||
) | ) | ||||
return PipelineRunStatsSnapshot( | return PipelineRunStatsSnapshot( | ||||
run_id=run_id, | run_id=run_id, | ||||
steps_succeeded=counts.get(DagsterEventType.STEP_SUCCESS.value, 0), | steps_succeeded=counts.get(DagsterEventType.STEP_SUCCESS.value, 0), | ||||
steps_failed=counts.get(DagsterEventType.STEP_FAILURE.value, 0), | steps_failed=counts.get(DagsterEventType.STEP_FAILURE.value, 0), | ||||
materializations=counts.get(DagsterEventType.STEP_MATERIALIZATION.value, 0), | materializations=counts.get(DagsterEventType.STEP_MATERIALIZATION.value, 0), | ||||
expectations=counts.get(DagsterEventType.STEP_EXPECTATION_RESULT.value, 0), | expectations=counts.get(DagsterEventType.STEP_EXPECTATION_RESULT.value, 0), | ||||
start_time=datetime_as_float(start_time) if start_time else None, | start_time=datetime_as_float(start_time.replace(tzinfo=utc)) | ||||
end_time=datetime_as_float(end_time) if end_time else None, | if start_time | ||||
else None, | |||||
end_time=datetime_as_float(end_time.replace(tzinfo=utc)) if end_time else None, | |||||
) | ) | ||||
except (seven.JSONDecodeError, check.CheckError) as err: | except (seven.JSONDecodeError, check.CheckError) as err: | ||||
six.raise_from(DagsterEventLogInvalidForRun(run_id=run_id), err) | six.raise_from(DagsterEventLogInvalidForRun(run_id=run_id), err) | ||||
def get_step_stats_for_run(self, run_id, step_keys=None): | def get_step_stats_for_run(self, run_id, step_keys=None): | ||||
check.str_param(run_id, "run_id") | check.str_param(run_id, "run_id") | ||||
check.opt_list_param(step_keys, "step_keys", of_type=str) | check.opt_list_param(step_keys, "step_keys", of_type=str) | ||||
▲ Show 20 Lines • Show All 421 Lines • Show Last 20 Lines |
So this adds get_utc_timezone calls in a bunch of new places - should we settle on using pendulum timezones instead since that is what get_current_datetime_in_utc uses, so that there's a single timezone representation in the codebase?