
step-skipping bug - remove StepExecutionSkipped
ClosedPublic

Authored by yuhan on Mar 25 2021, 12:13 AM.

Details

Summary

This diff removes the StepExecutionSkipped flag and leaves it to the airflow+docker call site to decide whether a step should be skipped. In D4997, we introduced StepExecutionSkipped, a serializable object (not a dagster event), which the dagster api execute_step CLI outputs to signal that a step needs to be skipped.

Problem:
https://dagster.slack.com/archives/C014N0PK37E/p1616620216068400
with a k8s+celery executor, a user hit:

Execution of pipeline "my_pipeline" failed. An exception was thrown during execution.
AttributeError: 'StepExecutionSkipped' object has no attribute 'is_step_failure'
Stack Trace:
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster/core/execution/api.py", line 754, in pipeline_execution_iterator
    if event.is_step_failure:
An exception was thrown during execution that is likely a framework error, rather than an error in user code.
AttributeError: 'StepExecutionSkipped' object has no attribute 'is_step_failure'
Stack Trace:
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster/grpc/impl.py", line 76, in core_execute_run
    yield from execute_run_iterator(recon_pipeline, pipeline_run, instance)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster/core/execution/api.py", line 829, in __iter__
    yield from self.iterator(
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/dagster/core/execution/api.py", line 754, in pipeline_execution_iterator
    if event.is_step_failure:

Diagnosis:
because we parse everything sent back as dagster events, the core path breaks as above when dagster api execute_step outputs StepExecutionSkipped, which is not a dagster event.

The root problem is that the normal execution flow of dagster api execute_step should not be flagging this at all. It should only be hit in the airflow + docker case: each dagster step maps to an airflow task and airflow+docker uses the execute_step cli path, so skipping a step means skipping an airflow task, and there is no run-level plan process to control the skipping logic.
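The failure mode described in the diagnosis can be reproduced in isolation. The sketch below is only an illustration: `FakeDagsterEvent`, `FakeStepExecutionSkipped`, and `count_failures` are hypothetical stand-ins, not dagster classes or functions. It shows how a single non-event object in a stream that is assumed to contain only events triggers the same kind of AttributeError as in the trace.

```python
from dataclasses import dataclass


@dataclass
class FakeDagsterEvent:
    """Hypothetical stand-in for a dagster event (not the real class)."""

    step_key: str
    is_step_failure: bool = False


@dataclass
class FakeStepExecutionSkipped:
    """Hypothetical stand-in for the serializable, non-event skip marker."""

    step_key: str


def count_failures(events):
    # Same shape as the check in the stack trace: every item in the
    # stream is assumed to expose `is_step_failure`.
    return sum(1 for event in events if event.is_step_failure)


# A stream that mixes real events with the skip marker.
stream = [FakeDagsterEvent("a"), FakeStepExecutionSkipped("b")]

error_message = None
try:
    count_failures(stream)
except AttributeError as exc:
    # The marker has no `is_step_failure`, so the consumer breaks.
    error_message = str(exc)
```

Keeping the marker out of the event stream, as this diff does, means the consumer never has to special-case it.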

Test Plan

bk

Diff Detail

Repository
R1 dagster
Lint
Lint Not Applicable
Unit
Tests Not Applicable

Event Timeline

yuhan retitled this revision from airflow skip to step-skipping bug - remove StepExecutionSkipped. Mar 25 2021, 12:49 AM
yuhan edited the summary of this revision.
yuhan edited the test plan for this revision.
yuhan added reviewers: alangenfeld, sandyryza.
yuhan requested review of this revision. Mar 25 2021, 1:08 AM

does the test we have cover the case where we have steps downstream of the skip that should also skip?

python_modules/libraries/dagster-airflow/dagster_airflow/operators/docker_operator.py
294

we should "check for skip" before we try to do the execute right?

307–308

comments - maybe refactor into a method

309

does this ever fire now? can't hurt to keep it i suppose

to your queue

This revision now requires changes to proceed. Mar 25 2021, 2:31 PM
yuhan edited the summary of this revision.

up

some other logic in this operator and the python operator also seems a bit suspect to me, e.g. we don't pass in solids_to_execute or steps_to_execute to pipeline_run, check_events_for_skips, etc.

wanted to get this into the release and i will read more closely and see if i can clean up these operators a bit more afterwards

test "have steps downstream of the skip that should also skip" in a follow up diff
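For the follow-up test, the expected set of skipped steps could be computed with something like the sketch below. It is not taken from dagster: `propagate_skips` and the dependency-dict shape are hypothetical, and it assumes every upstream output is a required input, so a step skips as soon as any of its upstream steps skipped.

```python
def propagate_skips(upstream, skipped):
    """Return every step that ends up skipped.

    upstream: dict mapping each step key to the list of step keys it
              depends on (hypothetical shape, for illustration only).
    skipped:  iterable of step keys that skipped on their own.
    """
    result = set(skipped)
    changed = True
    # Repeat until no new step is marked, so skips travel any number
    # of hops downstream regardless of dict ordering.
    while changed:
        changed = False
        for step, parents in upstream.items():
            if step not in result and any(p in result for p in parents):
                result.add(step)
                changed = True
    return result


# "c" depends on "b", which depends on "a"; "d" is independent.
example_upstream = {"a": [], "b": ["a"], "c": ["b"], "d": []}
example_skipped = propagate_skips(example_upstream, {"a"})
```

In the airflow case each of these steps is its own task, so a test would check that the tasks for `b` and `c` are also skipped when `a` is.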

python_modules/libraries/dagster-airflow/dagster_airflow/operators/docker_operator.py
307–308

it takes many different args and the one in the python operator is a bit different too, so not sure if it's worth refactoring into a method.

execute_raw spins up a docker container so it seems like we need to call execute_raw before raising the skip

yuhan marked an inline comment as not done. Mar 25 2021, 7:32 PM
yuhan added inline comments.
python_modules/libraries/dagster-airflow/dagster_airflow/operators/docker_operator.py
294

self.recon_repo would be None if we execute it afterwards - i *think* it's bc it includes spinning up the docker container which needs to happen before self.recon_repo exists

technically the order doesn't matter when there's a skip exception raised, bc the whole thing will only complete if events are successfully returned - when it should skip, it will raise AirflowSkipException and not return the events.

but again, there are still some paths that are a bit mysterious to me - i need to understand them better to make the right call.

python_modules/libraries/dagster-airflow/dagster_airflow/operators/docker_operator.py
96

*

This revision is now accepted and ready to land. Mar 25 2021, 8:35 PM
yuhan marked an inline comment as not done.
  • move skipping logic before execute_raw
  • update docker tests
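The ordering in the update above, with the skip check ahead of the expensive execution, can be sketched roughly as follows. This is not the operator's code: `SkipTask` is a stand-in for airflow's AirflowSkipException so the example runs without airflow installed, and `run_step`, `should_skip`, and `execute` are hypothetical names.

```python
class SkipTask(Exception):
    """Stand-in for airflow.exceptions.AirflowSkipException (assumption)."""


def run_step(step_key, should_skip, execute):
    """Decide on the skip at the call site, before executing anything."""
    if should_skip(step_key):
        # Raising here means `execute` (e.g. spinning up a container)
        # is never reached for a skipped step.
        raise SkipTask(f"skipping step {step_key}")
    return execute(step_key)


executed = []


def fake_execute(step_key):
    # Records the call so we can see whether execution happened.
    executed.append(step_key)
    return [f"event for {step_key}"]


events = run_step("a", lambda key: False, fake_execute)

was_skipped = False
try:
    run_step("b", lambda key: True, fake_execute)
except SkipTask:
    was_skipped = True
```

With this ordering a skipped step never pays the cost of execution, and no skip marker needs to travel back through the event stream.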
yuhan added inline comments.
python_modules/libraries/dagster-airflow/dagster_airflow/operators/docker_operator.py
294

for posterity: all i said above wasn't true. the problem was in the test_dagster_docker_operator.py where none of the tests had recon_repo