Differential D5198 Diff 26024 python_modules/dagster/dagster/core/storage/event_log/sql_event_log.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/storage/event_log/sql_event_log.py
Show First 20 Lines • Show All 206 Lines • ▼ Show 20 Lines | def get_stats_for_run(self, run_id): | ||||
materializations=counts.get(DagsterEventType.STEP_MATERIALIZATION.value, 0), | materializations=counts.get(DagsterEventType.STEP_MATERIALIZATION.value, 0), | ||||
expectations=counts.get(DagsterEventType.STEP_EXPECTATION_RESULT.value, 0), | expectations=counts.get(DagsterEventType.STEP_EXPECTATION_RESULT.value, 0), | ||||
start_time=datetime_as_float(start_time) if start_time else None, | start_time=datetime_as_float(start_time) if start_time else None, | ||||
end_time=datetime_as_float(end_time) if end_time else None, | end_time=datetime_as_float(end_time) if end_time else None, | ||||
) | ) | ||||
except (seven.JSONDecodeError, check.CheckError) as err: | except (seven.JSONDecodeError, check.CheckError) as err: | ||||
six.raise_from(DagsterEventLogInvalidForRun(run_id=run_id), err) | six.raise_from(DagsterEventLogInvalidForRun(run_id=run_id), err) | ||||
def get_step_stats_for_run(self, run_id, step_keys=None): | def get_step_stats_for_run(self, run_id, step_keys=None): | ||||
check.str_param(run_id, "run_id") | check.str_param(run_id, "run_id") | ||||
check.opt_list_param(step_keys, "step_keys", of_type=str) | check.opt_list_param(step_keys, "step_keys", of_type=str) | ||||
STEP_STATS_EVENT_TYPES = [ | STEP_STATS_EVENT_TYPES = [ | ||||
DagsterEventType.STEP_START.value, | DagsterEventType.STEP_START.value, | ||||
DagsterEventType.STEP_SUCCESS.value, | DagsterEventType.STEP_SUCCESS.value, | ||||
DagsterEventType.STEP_SKIPPED.value, | DagsterEventType.STEP_SKIPPED.value, | ||||
DagsterEventType.STEP_FAILURE.value, | DagsterEventType.STEP_FAILURE.value, | ||||
DagsterEventType.STEP_RESTARTED.value, | DagsterEventType.STEP_RESTARTED.value, | ||||
sashank: Don't need this here since we don't use it | |||||
] | ] | ||||
by_step_query = ( | by_step_query = ( | ||||
db.select( | db.select( | ||||
[ | [ | ||||
SqlEventLogStorageTable.c.step_key, | SqlEventLogStorageTable.c.step_key, | ||||
SqlEventLogStorageTable.c.dagster_event_type, | SqlEventLogStorageTable.c.dagster_event_type, | ||||
db.func.max(SqlEventLogStorageTable.c.timestamp).label("timestamp"), | db.func.max(SqlEventLogStorageTable.c.timestamp).label("timestamp"), | ||||
db.func.count(SqlEventLogStorageTable.c.id).label("count"), | |||||
Not Done Inline Actions[1] prha: [1] | |||||
] | ] | ||||
) | ) | ||||
.where(SqlEventLogStorageTable.c.run_id == run_id) | .where(SqlEventLogStorageTable.c.run_id == run_id) | ||||
.where(SqlEventLogStorageTable.c.step_key != None) | .where(SqlEventLogStorageTable.c.step_key != None) | ||||
.where(SqlEventLogStorageTable.c.dagster_event_type.in_(STEP_STATS_EVENT_TYPES)) | .where(SqlEventLogStorageTable.c.dagster_event_type.in_(STEP_STATS_EVENT_TYPES)) | ||||
) | ) | ||||
if step_keys: | if step_keys: | ||||
by_step_query = by_step_query.where(SqlEventLogStorageTable.c.step_key.in_(step_keys)) | by_step_query = by_step_query.where(SqlEventLogStorageTable.c.step_key.in_(step_keys)) | ||||
by_step_query = by_step_query.group_by( | by_step_query = by_step_query.group_by( | ||||
SqlEventLogStorageTable.c.step_key, SqlEventLogStorageTable.c.dagster_event_type, | SqlEventLogStorageTable.c.step_key, SqlEventLogStorageTable.c.dagster_event_type, | ||||
) | ) | ||||
with self.connect(run_id) as conn: | with self.connect(run_id) as conn: | ||||
results = conn.execute(by_step_query).fetchall() | results = conn.execute(by_step_query).fetchall() | ||||
by_step_key = defaultdict(dict) | by_step_key = defaultdict(dict) | ||||
for result in results: | for result in results: | ||||
step_key = result.step_key | step_key = result.step_key | ||||
if result.dagster_event_type == DagsterEventType.STEP_START.value: | if result.dagster_event_type == DagsterEventType.STEP_START.value: | ||||
by_step_key[step_key]["start_time"] = ( | by_step_key[step_key]["start_time"] = ( | ||||
datetime_as_float(result.timestamp) if result.timestamp else None | datetime_as_float(result.timestamp) if result.timestamp else None | ||||
) | ) | ||||
by_step_key[step_key]["attempts"] = 1 | by_step_key[step_key]["attempts"] = by_step_key[step_key].get("attempts", 0) + 1 | ||||
if result.dagster_event_type == DagsterEventType.STEP_RESTARTED.value: | |||||
by_step_key[step_key]["attempts"] = ( | |||||
# In case we see step restarted events but not a step started event, we want to | |||||
# only count the restarted events, since the attempt count represents | |||||
# the number of times we have successfully started running the step | |||||
by_step_key[step_key].get("attempts", 0) | |||||
+ result.count | |||||
) | |||||
if result.dagster_event_type == DagsterEventType.STEP_FAILURE.value: | if result.dagster_event_type == DagsterEventType.STEP_FAILURE.value: | ||||
by_step_key[step_key]["end_time"] = ( | by_step_key[step_key]["end_time"] = ( | ||||
datetime_as_float(result.timestamp) if result.timestamp else None | datetime_as_float(result.timestamp) if result.timestamp else None | ||||
) | ) | ||||
by_step_key[step_key]["status"] = StepEventStatus.FAILURE | by_step_key[step_key]["status"] = StepEventStatus.FAILURE | ||||
if result.dagster_event_type == DagsterEventType.STEP_SUCCESS.value: | if result.dagster_event_type == DagsterEventType.STEP_SUCCESS.value: | ||||
by_step_key[step_key]["end_time"] = ( | by_step_key[step_key]["end_time"] = ( | ||||
datetime_as_float(result.timestamp) if result.timestamp else None | datetime_as_float(result.timestamp) if result.timestamp else None | ||||
) | ) | ||||
by_step_key[step_key]["status"] = StepEventStatus.SUCCESS | by_step_key[step_key]["status"] = StepEventStatus.SUCCESS | ||||
if result.dagster_event_type == DagsterEventType.STEP_SKIPPED.value: | if result.dagster_event_type == DagsterEventType.STEP_SKIPPED.value: | ||||
by_step_key[step_key]["end_time"] = ( | by_step_key[step_key]["end_time"] = ( | ||||
datetime_as_float(result.timestamp) if result.timestamp else None | datetime_as_float(result.timestamp) if result.timestamp else None | ||||
) | ) | ||||
by_step_key[step_key]["status"] = StepEventStatus.SKIPPED | by_step_key[step_key]["status"] = StepEventStatus.SKIPPED | ||||
materializations = defaultdict(list) | materializations = defaultdict(list) | ||||
expectation_results = defaultdict(list) | expectation_results = defaultdict(list) | ||||
raw_event_query = ( | raw_event_query = ( | ||||
db.select([SqlEventLogStorageTable.c.event]) | db.select([SqlEventLogStorageTable.c.event]) | ||||
.where(SqlEventLogStorageTable.c.run_id == run_id) | .where(SqlEventLogStorageTable.c.run_id == run_id) | ||||
.where(SqlEventLogStorageTable.c.step_key != None) | .where(SqlEventLogStorageTable.c.step_key != None) | ||||
.where( | .where( | ||||
SqlEventLogStorageTable.c.dagster_event_type.in_( | SqlEventLogStorageTable.c.dagster_event_type.in_( | ||||
[ | [ | ||||
DagsterEventType.STEP_RESTARTED.value, | |||||
DagsterEventType.STEP_MATERIALIZATION.value, | DagsterEventType.STEP_MATERIALIZATION.value, | ||||
Not Done Inline Actions — prha: we don't need this... we can just add the count to the by_step_query in line 232, right? | |||||
Done Inline Actions — sashank: unfortunately that query does a group by event type, so all the restart events get grouped. We need to be able to count each restart event. | |||||
Done Inline Actions — sashank: which is why it's down here with materializations and step expectation results, where you can also have > 1 of the event type per step key. | |||||
Not Done Inline Actions — prha: But couldn't you add a count to the selection and just use that? e.g.
```
by_step_query = (
    db.select(
        [
            SqlEventLogStorageTable.c.step_key,
            SqlEventLogStorageTable.c.dagster_event_type,
            db.func.max(SqlEventLogStorageTable.c.timestamp).label("timestamp"),
            db.func.count(SqlEventLogStorageTable.c.id).label("count"),
        ]
    )
    .where(SqlEventLogStorageTable.c.run_id == run_id)
    .where(SqlEventLogStorageTable.c.step_key != None)
    .where(SqlEventLogStorageTable.c.dagster_event_type.in_(STEP_STATS_EVENT_TYPES))
)
``` | |||||
Done Inline Actions — sashank: good call | |||||
DagsterEventType.STEP_EXPECTATION_RESULT.value, | DagsterEventType.STEP_EXPECTATION_RESULT.value, | ||||
] | ] | ||||
) | ) | ||||
) | ) | ||||
.order_by(SqlEventLogStorageTable.c.id.asc()) | .order_by(SqlEventLogStorageTable.c.id.asc()) | ||||
) | ) | ||||
if step_keys: | |||||
raw_event_query = raw_event_query.where( | |||||
SqlEventLogStorageTable.c.step_key.in_(step_keys) | |||||
) | |||||
with self.connect(run_id) as conn: | with self.connect(run_id) as conn: | ||||
results = conn.execute(raw_event_query).fetchall() | results = conn.execute(raw_event_query).fetchall() | ||||
try: | try: | ||||
for (json_str,) in results: | for (json_str,) in results: | ||||
event = check.inst_param( | event = check.inst_param( | ||||
deserialize_json_to_dagster_namedtuple(json_str), "event", EventRecord | deserialize_json_to_dagster_namedtuple(json_str), "event", EventRecord | ||||
) | ) | ||||
if event.dagster_event.event_type == DagsterEventType.STEP_RESTARTED: | if event.dagster_event.event_type == DagsterEventType.STEP_MATERIALIZATION: | ||||
Done Inline Actions — alangenfeld: should we assume 1 instead of 0 here? I'm still not sure how we got a restart without a start event. | |||||
Not Done Inline Actions — alangenfeld: maybe do 1, maybe leave a comment? | |||||
by_step_key[event.step_key]["attempts"] = ( | |||||
by_step_key[event.step_key].get("attempts") + 1 | |||||
) | |||||
elif event.dagster_event.event_type == DagsterEventType.STEP_MATERIALIZATION: | |||||
materializations[event.step_key].append( | materializations[event.step_key].append( | ||||
event.dagster_event.event_specific_data.materialization | event.dagster_event.event_specific_data.materialization | ||||
) | ) | ||||
elif event.dagster_event.event_type == DagsterEventType.STEP_EXPECTATION_RESULT: | elif event.dagster_event.event_type == DagsterEventType.STEP_EXPECTATION_RESULT: | ||||
expectation_results[event.step_key].append( | expectation_results[event.step_key].append( | ||||
event.dagster_event.event_specific_data.expectation_result | event.dagster_event.event_specific_data.expectation_result | ||||
) | ) | ||||
except (seven.JSONDecodeError, check.CheckError) as err: | except (seven.JSONDecodeError, check.CheckError) as err: | ||||
six.raise_from(DagsterEventLogInvalidForRun(run_id=run_id), err) | six.raise_from(DagsterEventLogInvalidForRun(run_id=run_id), err) | ||||
return [ | return [ | ||||
RunStepKeyStatsSnapshot( | RunStepKeyStatsSnapshot( | ||||
run_id=run_id, | run_id=run_id, | ||||
step_key=step_key, | step_key=step_key, | ||||
status=value.get("status"), | status=value.get("status"), | ||||
start_time=value.get("start_time"), | start_time=value.get("start_time"), | ||||
end_time=value.get("end_time"), | end_time=value.get("end_time"), | ||||
materializations=materializations.get(step_key), | materializations=materializations.get(step_key), | ||||
expectation_results=expectation_results.get(step_key), | expectation_results=expectation_results.get(step_key), | ||||
attempts=value.get("attempts"), | attempts=value.get("attempts"), | ||||
) | ) | ||||
for step_key, value in by_step_key.items() | for step_key, value in by_step_key.items() | ||||
] | ] | ||||
Done Inline Actions — alangenfeld: hmm | |||||
def wipe(self): | def wipe(self): | ||||
"""Clears the event log storage.""" | """Clears the event log storage.""" | ||||
# Should be overridden by SqliteEventLogStorage and other storages that shard based on | # Should be overridden by SqliteEventLogStorage and other storages that shard based on | ||||
# run_id | # run_id | ||||
# https://stackoverflow.com/a/54386260/324449 | # https://stackoverflow.com/a/54386260/324449 | ||||
with self.connect() as conn: | with self.connect() as conn: | ||||
conn.execute(SqlEventLogStorageTable.delete()) # pylint: disable=no-value-for-parameter | conn.execute(SqlEventLogStorageTable.delete()) # pylint: disable=no-value-for-parameter | ||||
▲ Show 20 Lines • Show All 352 Lines • Show Last 20 Lines |
Don't need this here since we don't use it