Currently, a step is skipped if has_intermediate returns false for any of its inputs that come from optional outputs. (Fan-in makes this slightly more complicated, but not in a way that matters here.)
With AssetStore, we're now supporting situations where steps write to locations that may already have data. This means that has_intermediate(run id, step output handle) is no longer synonymous with "did this step yield this output in this run?".
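To make the mismatch concrete, here is a minimal sketch. `PathKeyedStore` and the `warehouse/table_a` location are hypothetical stand-ins, not the AssetStore API; the sketch only assumes a store keyed by location rather than by (run id, step output handle).

```python
# Hypothetical stand-in for a store keyed by storage location,
# not by (run_id, step_output_handle).
class PathKeyedStore:
    def __init__(self):
        self._data = {}

    def write(self, path, value):
        self._data[path] = value

    def has(self, path):
        return path in self._data


store = PathKeyedStore()

# An earlier run: the step yields its optional output and writes it
# to a fixed location.
store.write("warehouse/table_a", [1, 2, 3])

# The current run: the step does NOT yield the optional output,
# so nothing is written and nothing is recorded as yielded.
yielded_in_current_run = set()

# A storage-existence check still finds the data left by the earlier run.
storage_says_present = store.has("warehouse/table_a")
actually_yielded = "table_a" in yielded_in_current_run
```

Here the storage check reports data even though the output was never yielded in the current run, which is why it cannot drive skipping on its own.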
This diff decouples conditional execution from intermediate storage. With this diff, a step is skipped if any of its inputs comes from an optional output that the completed upstream step did not yield.
A couple of nice things about this approach:
- The logic for determining whether to skip a step is no longer spread between execute_step and ActiveExecution - it's now all in ActiveExecution.
- We no longer launch Celery tasks that spin up, discover their inputs are missing, emit a STEP_SKIPPED event, and then spin down. Instead, we determine whether to skip a step before launching it.
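The skip rule described above can be sketched roughly as follows. This is an illustration under simplifying assumptions, not the ActiveExecution implementation: `Step`, `should_skip`, and the `completed`/`yielded` sets are hypothetical names, and fan-in and skip propagation through required outputs are left out.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set, Tuple

OutputHandle = Tuple[str, str]  # (step_key, output_name); hypothetical shape


@dataclass
class Step:
    key: str
    inputs: List[OutputHandle] = field(default_factory=list)
    optional_outputs: Set[str] = field(default_factory=set)


def should_skip(
    step: Step,
    steps_by_key: Dict[str, Step],
    completed: Set[str],
    yielded: Set[OutputHandle],
) -> bool:
    """Decide from recorded yields only; storage is never consulted."""
    for upstream_key, output_name in step.inputs:
        upstream = steps_by_key[upstream_key]
        if output_name not in upstream.optional_outputs:
            continue  # required outputs do not trigger a skip in this sketch
        if upstream_key in completed and (upstream_key, output_name) not in yielded:
            return True
    return False


steps = {
    "branch": Step("branch", optional_outputs={"left", "right"}),
    "on_left": Step("on_left", inputs=[("branch", "left")]),
    "on_right": Step("on_right", inputs=[("branch", "right")]),
}
completed = {"branch"}
yielded = {("branch", "left")}  # "branch" only yielded its "left" output

skip_left = should_skip(steps["on_left"], steps, completed, yielded)
skip_right = should_skip(steps["on_right"], steps, completed, yielded)
```

Because the decision depends only on which outputs were yielded, it can be made before any task for `on_right` is launched.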
Airflow's task skipping needs separate handling: the skipping logic now lives entirely in ActiveExecution, but Airflow has no run-level plan process in which to run it. To maintain compatibility, we implement should_skip_step for both the Python operator and the Docker operator (via the Dagster API execute_step). The check (1) determines whether any of the step's inputs come from optional outputs, a pre-check that lets us avoid unnecessary DB calls, and (2) signals a skip if no upstream output events were yielded for those inputs, which requires fetching event logs from the instance.
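The two-phase check might look something like the sketch below. The signature, the `FakeInstance` class, and the event dictionaries are assumptions made for illustration; they are not the actual should_skip_step signature or the instance's event log API.

```python
class FakeInstance:
    """Hypothetical stand-in for an instance that serves a run's event log."""

    def __init__(self, events):
        self._events = events
        self.fetch_count = 0  # counts simulated DB round trips

    def output_events_for_run(self, run_id):
        self.fetch_count += 1
        return [e for e in self._events if e["run_id"] == run_id]


def should_skip_step_sketch(step_inputs, optional_outputs, instance, run_id):
    # (1) Pre-check: if no input comes from an optional output,
    # return without touching the event log.
    optional_inputs = [h for h in step_inputs if h in optional_outputs]
    if not optional_inputs:
        return False

    # (2) Fetch the run's output events and skip if any optional
    # upstream output was never yielded.
    yielded = {
        (e["step_key"], e["output_name"])
        for e in instance.output_events_for_run(run_id)
    }
    return any(h not in yielded for h in optional_inputs)


instance = FakeInstance(
    [{"run_id": "r1", "step_key": "branch", "output_name": "left"}]
)
optional = {("branch", "left"), ("branch", "right")}

no_optional = should_skip_step_sketch([("load", "out")], optional, instance, "r1")
fetches_after_precheck = instance.fetch_count
missing = should_skip_step_sketch([("branch", "right")], optional, instance, "r1")
present = should_skip_step_sketch([("branch", "left")], optional, instance, "r1")
```

The first call returns without fetching any events, which is the saving the pre-check is meant to provide.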