
conditional outputs based on output events instead of has_intermediate
Closed, Public

Authored by yuhan on Nov 2 2020, 5:41 PM.

Details

Summary

Currently, steps are skipped if has_intermediate returns false for any inputs that come from optional outputs. (It's a little more complicated with fan-in, but not in a way that matters here).

With AssetStore, we're now supporting situations where steps write to locations that may already have data. This means that has_intermediate(run id, step output handle) is no longer synonymous with "did this step yield this output in this run?".

This diff decouples conditional execution from intermediate storage. With this diff, steps are skipped if any optional outputs have not been yielded by any completed upstream steps.
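
For reference, here is a minimal sketch of the conditional-output pattern this affects (2020-era solid/pipeline API; the solid and pipeline names are illustrative, not from this diff):

from dagster import Output, OutputDefinition, execute_pipeline, pipeline, solid

EMIT = False  # flip to True and `consume` runs instead of skipping


@solid(output_defs=[OutputDefinition(is_required=False)])
def maybe_emit(_context):
    if EMIT:
        yield Output([1, 2, 3])


@solid
def consume(_context, values):
    return len(values)


@pipeline
def conditional_pipeline():
    consume(maybe_emit())


if __name__ == "__main__":
    result = execute_pipeline(conditional_pipeline)
    # No upstream output was yielded, so `consume` should be skipped rather than failed.
    assert result.result_for_solid("consume").skipped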

A couple of nice things about this approach:

  • The logic for determining whether to skip a step is no longer spread between execute_step and ActiveExecution - it's now all in ActiveExecution.
  • We no longer launch celery tasks that spin up, discover their inputs are missing, emit a STEP_SKIPPED, and then spin down. We determine whether to skip the step before launching it.

https://github.com/dagster-io/dagster/issues/3298

As for Airflow's task-skipping: the skip logic now lives entirely in ActiveExecution, but Airflow has no run-level plan process. To maintain compatibility, we implement should_skip_step for both the python operator and the docker operator (via dagster api execute_step). It 1) checks whether any inputs come from optional outputs (this pre-check eliminates unnecessary DB calls), and 2) flags a skip if no upstream output events were yielded (this requires fetching event logs from the instance).
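
Roughly, that check could look like the following sketch. The helper name and parameters here are hypothetical and differ from the actual implementation in this diff; the event-log access uses DagsterInstance.all_logs and the STEP_OUTPUT event attributes from the 2020-era API:

from dagster.core.events import DagsterEventType


def should_skip_step(optional_source_handles, instance, run_id):
    # Hypothetical sketch, not the exact code in this diff.
    # optional_source_handles: the upstream (step_key, output_name) pairs feeding
    # this step that come from outputs marked is_required=False, derived from the
    # execution plan / pipeline definition.

    # 1) Pre-check: if no inputs come from optional outputs, never skip and never
    #    touch the DB.
    if not optional_source_handles:
        return False

    # 2) Otherwise, fetch this run's event log and collect the outputs that were
    #    actually yielded.
    yielded = set()
    for record in instance.all_logs(run_id):
        if not record.is_dagster_event:
            continue
        event = record.dagster_event
        if event.event_type == DagsterEventType.STEP_OUTPUT:
            handle = event.step_output_data.step_output_handle
            yielded.add((handle.step_key, handle.output_name))

    # Skip if any optional upstream output was not produced. (Fan-in inputs are
    # more forgiving - they only force a skip if *all* fanned-in sources are
    # missing - and are ignored in this simplified sketch.)
    return any(handle not in yielded for handle in optional_source_handles)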

Test Plan

bk

Diff Detail

Repository
R1 dagster
Lint: Not Applicable
Unit Tests: Not Applicable

Event Timeline

Harbormaster returned this revision to the author for changes because remote builds failed. Nov 4 2020, 4:51 PM
Harbormaster failed remote builds in B20777: Diff 25184!
Harbormaster returned this revision to the author for changes because remote builds failed. Nov 4 2020, 6:25 PM
Harbormaster failed remote builds in B20785: Diff 25193!
Harbormaster failed remote builds in B20786: Diff 25194!
Harbormaster returned this revision to the author for changes because remote builds failed. Nov 5 2020, 12:09 AM
Harbormaster failed remote builds in B20807: Diff 25223!
Harbormaster returned this revision to the author for changes because remote builds failed. Nov 5 2020, 1:27 AM
Harbormaster failed remote builds in B20811: Diff 25227!
Harbormaster returned this revision to the author for changes because remote builds failed. Nov 5 2020, 5:03 AM
Harbormaster failed remote builds in B20817: Diff 25235!
Harbormaster returned this revision to the author for changes because remote builds failed. Nov 5 2020, 4:36 PM
Harbormaster failed remote builds in B20822: Diff 25244!
sandyryza retitled this revision from "conditional outputs with based on output events instead of has_intermediate" to "conditional outputs based on output events instead of has_intermediate". Nov 5 2020, 5:07 PM

THIS DIFF BREAKS TASK-SKIPPING FOR AIRFLOW

Even if we are OK with losing this feature in our Airflow integration, the diff as-is would result in baffling errors for users who attempt to use conditional outputs, so at a minimum we should error out with a clear message.

As for whether or not it's OK to error out when conditional outputs are used with the Airflow integration - I'm not sure.

There is a real complexity cost to the check_conditional_outputs impl we would need for [1] & [2].

python_modules/dagster/dagster/core/execution/plan/active.py
169–171

nit: maybe some newlines / bullet points to make this easily parsable

python_modules/libraries/dagster-airflow/dagster_airflow/operators/docker_operator.py
225

[2]

would need to be a flag on this CLI call that invokes the same fn used in [1]

python_modules/libraries/dagster-airflow/dagster_airflow/operators/util.py
140–143

[1]

  • for each step in step_keys
    • look at the pipeline def to see if its inputs come from conditional outputs
    • if not, bail
    • else
      • fetch the output events for the step for this run_id from the event_log
      • raise a Skip if any are missing

I feel pretty strongly that we should not break the Airflow skip functionality. But maybe not for the reason you think. I think we should support orchestration clusters that do *not* have run-level processes. The current constraint that there *must* be a launcher is a historical oddity. IMO we should not constrain ourselves on that front.

This revision now requires changes to proceed. Nov 30 2020, 6:34 PM

I feel pretty strongly that we should not break the Airflow skip functionality. But maybe not for the reason you think. I think we should support orchestration clusters that do *not* have run-level processes. The current constraint that there *must* be a launcher is a historical oddity. IMO we should not constrain ourselves on that front.

Got it. That leaves two options:

  1. Go with the approach laid out in this diff for the celery and multiprocess executors, and build separate handling for orchestrators that don't have run-level processes (like Airflow).
  2. Stick with the existing approach in the codebase, but modify it to query the instance DB instead of invoking has_intermediate.

I haven't formed a strong opinion, but am partial to some of the advantages of the first option:

  1. The second option means every task needs access to the instance DB. This might be tough in managed environments like Databricks.
  2. The second option adds the overhead of querying the instance DB to every task launch.
  3. For any particular executor, the skip logic all lives in one place. For the multiprocess and celery executors, it lives inside ActiveExecution. For Airflow, it all lives inside the task. This makes it easier to answer "why did my task skip or not skip?"

Thoughts?

Yeah, I think in this case parallel implementations make sense. I just want to have the Airflow one around so we don't lose the memory of how to do it, plus it's a useful thing to support.

have we considered the option to implement a has_asset method on asset stores?
we already have it on versioned asset store https://sourcegraph.com/github.com/dagster-io/dagster/-/blob/python_modules/dagster/dagster/core/storage/asset_store.py#L317:9

how about making has_asset an abstract method on the base AssetStore? then we could easily make it work with optional outputs exactly like intermediate storage does. is there any context I've been missing?
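
To spell out the shape of that suggestion, it might look something like this sketch (method names and signatures approximate the 2020-era asset store interface; this is not the real class definition and not what this diff does):

from abc import ABC, abstractmethod


class AssetStore(ABC):
    # Sketch of the suggestion only - signatures are approximations.

    @abstractmethod
    def set_asset(self, context, obj):
        """Persist obj to the location described by context."""

    @abstractmethod
    def get_asset(self, context):
        """Load the object from the location described by context."""

    @abstractmethod
    def has_asset(self, context):
        """Return True if the location described by context already holds data.

        Making this abstract would let conditional execution ask the asset store
        directly, the way it currently asks intermediate storage via has_intermediate.
        """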

have we considered the option to implement a has_asset method on asset stores?

What I was worried about with a has_asset direction is that, with AssetStore, we're now supporting situations where steps write to locations that may already have data. This means that has_asset is no longer synonymous with "did this step yield this output in this run?" like it was in the intermediate store case.

An alternative approach to the one laid out in this diff could be to say that we don't support conditional execution out-of-the-box for all asset stores, and the user needs to do the record keeping in their has_asset method if they want it.

So I think it kind of comes down to a question of what the spirit is behind the conditional execution semantics.

Thoughts @schrockn @alangenfeld?

I want to clarify how conditional execution would work in this case:

import os
import pickle

# imports assumed for the 2020-era asset store API
from dagster import (
    ModeDefinition,
    Output,
    OutputDefinition,
    execute_pipeline,
    pipeline,
    seven,
    solid,
)
from dagster.core.storage.asset_store import custom_path_fs_asset_store


def test_asset_store_optional_output_path_exists():
    with seven.TemporaryDirectory() as tmpdir_dir:
        asset_store = custom_path_fs_asset_store.configured({"base_dir": tmpdir_dir})
        filepath = os.path.join(tmpdir_dir, "foo")
        # file exists already
        with open(filepath, "wb") as write_obj:
            pickle.dump([1], write_obj)

        assert os.path.exists(filepath)

        skip = True

        @solid(output_defs=[OutputDefinition(is_required=False, asset_metadata={"path": filepath})])
        def solid_a(_context):
            if not skip:
                yield Output([1, 2])

        @solid(output_defs=[OutputDefinition(asset_metadata={"path": "bar"})])
        def solid_b(_context, array):
            return array

        @pipeline(mode_defs=[ModeDefinition("local", resource_defs={"asset_store": asset_store})])
        def asset_pipeline_optional_output_path_exists():
            solid_b(solid_a())

        result = execute_pipeline(asset_pipeline_optional_output_path_exists)
        assert result.success

        # should solid_b be skipped?
        assert result.result_for_solid("solid_b").skipped

In the intermediate_storage world, execution would skip solid_b because solid_a's output wasn't there.
In the asset store world, although solid_a didn't yield an output, the file exists already - do we want solid_b to run or be skipped?

  • If we want solid_b to run, the has_asset approach would work.
  • If we'd like to skip solid_b - meaning skip whenever there's no corresponding output event in this pipeline run - we would need some kind of skipping logic other than path existence.

D5422 is an RFC for the sake of discussion.

So I think it kind of comes down to a question of what the spirit is behind the conditional execution semantics.

Yielding conditional outputs to skip downstream solids has been around since I started, but I think it is our answer to how to do skipping in a data-oriented orchestrator.

solid_a didn't yield an output, the file exists already, do we want solid_b to run or be skipped?

Unless we want to revisit the system as a whole, I think solid_b has to skip (unless it is using a fan-in input, in which case it only skips if all fanned-in entries skip).

An alternative approach to the one laid out in this diff could be to say that we don't support conditional execution out-of-the-box for all asset stores, and the user needs to do the record keeping in their has_asset method if they want it.

To me the trade-off is clear: I would much rather take on complexity in our airflow/dask integrations having to do extra work than a potential its-not-working case with no clear error message in user AssetStore implementations.

python_modules/libraries/dagster-airflow/dagster_airflow/operators/util.py
140–143

^ I think this is not that crazy, and the fact that you can bail without going to the DB in the vast majority of cases (where none of the upstream outputs are marked as not-required) makes it feel safe to me.

yuhan edited reviewers, added: sandyryza; removed: yuhan.

should_skip for Airflow task skipping (where there is no run-level plan process)

yuhan edited the test plan for this revision. (Show Details)

seems p good - maybe some tests that directly exercise should_skip? make sure to throw some composites & fan-in into the mix

python_modules/libraries/dagster-airflow/dagster_airflow/operators/util.py
143

Will there be a regression in terms of how informative our message is about why we're skipping?

147

Should we get rid of check_events_for_skips now?

python_modules/libraries/dagster-airflow/dagster_airflow/operators/util.py
143

nope, the current message is just "Dagster emitted skip event, skipping execution in Airflow"

147

it can still capture a STEP_SKIPPED event if there is one. I'm not sure if we want to treat that event as the source of truth for task skipping? - same case in the docker operator

should_skip -> should_skip_step + test fan-in & fan-out

@alangenfeld gooood call! caught a logic bug

(you may have got the same msg from another diff - plz ignore that one, it was a wip diff for bk testing)

LGTM! Looks like there's a lingering pdb.set_trace.

python_modules/dagster/dagster_tests/core_tests/execution_plan_tests/test_execution_plan.py
480

Seems like there's some debugging still in here?

This revision was not accepted when it landed; it landed in state Needs Review. Dec 9 2020, 6:19 PM
This revision was automatically updated to reflect the committed changes.
python_modules/dagster/dagster/cli/api.py
296

[1]

306–309

[1]

python_modules/libraries/dagster-airflow/dagster_airflow/operators/docker_operator.py
290

[2]