
RFC: fix io manager optional fan in
ClosedPublic

Authored by yuhan on Jan 7 2021, 9:07 AM.

Details

Summary

https://github.com/dagster-io/dagster/issues/3506

Found this bug when test_pipeline_execution.py::test_multi_dep_optional failed after we switched the default from intermediate storage to the mem object manager.

The failure occurs because https://sourcegraph.com/github.com/dagster-io/dagster/-/blob/python_modules/dagster/dagster/core/execution/plan/inputs.py#L334:16 uses can_load_input_object to determine whether to skip a source of a multi-source (fan-in) input.

Test Plan

unit

Diff Detail

Repository
R1 dagster
Lint
Lint Not Applicable
Unit
Tests Not Applicable

Event Timeline

yuhan retitled this revision from [bug] object manager fan in optional upstream to RFC: add `can_load ` to `InputManager` - fix object manager optional fan in. Jan 7 2021, 9:27 AM
yuhan edited the summary of this revision.
yuhan requested review of this revision. Jan 7 2021, 9:38 AM

Oy - good catch on this one.

I think we have the same issues here that we had with the step-skipping, right? I.e. there's a difference between "can load" and "upstream step produced an output for this run". A fixed location asset might be loadable, but might not have been outputted by an upstream step in this run. We could end up with a situation where:

  • can_load is True for every source handle that a step depends on.
  • The step is still skipped, because none of the upstream steps yielded an Output.

This is maybe not the best from a performance perspective, but an alternative could be for the step to fetch events from the instance when there's fan-in and some of the inputs are optional? In the future, we could find some way for the plan process to pass this information to the step on launch?
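The distinction between "can load" and "upstream step produced an output for this run" can be illustrated with a minimal sketch. Everything here is a hypothetical stand-in (FixedLocationStore, RunEvents, and their methods are not Dagster APIs); it only assumes a store whose location does not depend on the run id.

```python
from dataclasses import dataclass, field


@dataclass
class FixedLocationStore:
    """Hypothetical store: values live at a location independent of the run id."""

    data: dict = field(default_factory=dict)

    def can_load(self, step_key: str) -> bool:
        # Only answers "is there something at this location?"
        return step_key in self.data


@dataclass
class RunEvents:
    """Hypothetical record of which steps yielded an Output in each run."""

    outputs_by_run: dict = field(default_factory=dict)

    def record_output(self, run_id: str, step_key: str) -> None:
        self.outputs_by_run.setdefault(run_id, set()).add(step_key)

    def produced_output(self, run_id: str, step_key: str) -> bool:
        return step_key in self.outputs_by_run.get(run_id, set())


store = FixedLocationStore()
events = RunEvents()

# Run 1: "upstream" yields an Output, which is written to the fixed location.
store.data["upstream"] = 1
events.record_output("run_1", "upstream")

# Run 2: "upstream" yields nothing, but the value from run 1 is still there.
loadable = store.can_load("upstream")
produced = events.produced_output("run_2", "upstream")
```

In run 2 the value is loadable even though the upstream step yielded nothing, which is why the two checks can disagree.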

AssetStoreOperation.serializable

yuhan planned changes to this revision. Jan 7 2021, 8:22 PM
  • can_load is True for every source handle that a step depends on.
  • The step is still skipped, because none of the upstream steps yielded an Output.

checking only the latter should be enough for optional inputs

python_modules/dagster/dagster/core/execution/plan/inputs.py
326–332

This is where the step skipping check happens.

I believe we want to check for the output event rather than the presence of the output object. To do that, we need to know whether the upstream step was skipped, i.e. whether it yielded any output event. That leads us to either:

  1. ask the instance to find out based on events
  2. figure out some way to pass that info through context (e.g. context.step) to avoid a db call. It'd be tricky because ExecutionStep is locked at run time. Or maybe we can somehow inspect the event stream during core_dagster_event_sequence_for_step, which could be risky too because it's on the very critical path
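Option 1 could look roughly like the sketch below. It is an illustration only: EventRecord is a simplified hypothetical shape, the "STEP_OUTPUT" string is assumed as the marker for an output event, and step_has_output here is a standalone function rather than the method discussed in this revision.

```python
from typing import Iterable, NamedTuple


class EventRecord(NamedTuple):
    # Hypothetical, simplified event shape; real event records carry more fields.
    event_type: str
    step_key: str


def step_has_output(records: Iterable[EventRecord], step_key: str) -> bool:
    """Return True if the step emitted at least one output event in this run."""
    return any(
        record.event_type == "STEP_OUTPUT" and record.step_key == step_key
        for record in records
    )


# Assumed event log for a single run.
run_log = [
    EventRecord("STEP_START", "ret_one"),
    EventRecord("STEP_OUTPUT", "ret_one"),
    EventRecord("STEP_SUCCESS", "ret_one"),
    EventRecord("STEP_START", "skip"),
    EventRecord("STEP_SUCCESS", "skip"),  # ran, but yielded no Output
]
```

The check depends only on events from the current run, so a stale value at a fixed location cannot make a skipped step look like it produced output.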
python_modules/dagster/dagster/core/execution/plan/inputs.py
326–332

as far as I can tell, asking the instance is probably the best we can do here. ideally, perhaps we'd find some way to pass it in when we launch the step, but I think that would require some fairly big architectural changes.

python_modules/dagster/dagster/core/execution/plan/inputs.py
326–332

updated with the instance call approach (see instance.step_has_output) and tracked the ideal way in an issue https://github.com/dagster-io/dagster/issues/3511

not happy with this fix :( but it seems to be the best we could do atm.

python_modules/dagster/dagster/core/instance/__init__.py
914 ↗(On Diff #29044)

@sandyryza
a minor perf improvement might be to have an in-memory instance dict that keeps track of skipped steps per run_id to avoid db interaction. but I don't think it's worth much, since we already limit the number of calls by checking for fan-in and optional output.

yuhan retitled this revision from RFC: add `can_load ` to `InputManager` - fix object manager optional fan in to RFC: instance.step_has_output - fix object manager optional fan in. Jan 8 2021, 1:14 AM
yuhan edited the summary of this revision.

lgtm! a little hacky, but difficult to think of a better approach.

@alangenfeld - do you have any reservations?

This revision is now accepted and ready to land. Jan 8 2021, 1:55 AM

ooof this sucks

if we are going to go down this path, can_load_input_object is entirely contained in inputs.py so we can refactor things to avoid repeated fetches of the event logs for each entry in the fan in, and also contain this crime to one spot in code

This revision now requires changes to proceed. Jan 8 2021, 6:41 PM
python_modules/dagster/dagster/core/execution/plan/inputs.py
326–327

this is the only callsite

check event logs only once per fan-in

python_modules/dagster/dagster/core/execution/plan/inputs.py
326–327

the check now happens in self._sources_to_skip, and we make a single instance call per fan-in input.

an early return when all sources are required doesn't work for a pipeline like collect([ret_one(), echo(echo(skip()))]), where collect's direct sources are all required but its transitive dependencies are not.
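A rough sketch of the "one log fetch per fan-in input" idea follows. It is not the actual _sources_to_skip implementation: the standalone sources_to_skip function, the (event_type, step_key) tuple shape, the "STEP_OUTPUT" marker, and the step key "echo_2" are all assumptions made for illustration.

```python
from typing import Callable, List, Set, Tuple

# Hypothetical simplified event shape: (event_type, step_key).
Event = Tuple[str, str]


def sources_to_skip(
    source_steps: List[str], fetch_logs: Callable[[], List[Event]]
) -> Set[str]:
    """Return the fan-in sources that produced no output, reading the log once."""
    steps_with_output = {
        step_key
        for event_type, step_key in fetch_logs()
        if event_type == "STEP_OUTPUT"
    }
    return {step for step in source_steps if step not in steps_with_output}


calls = {"count": 0}


def fetch_logs() -> List[Event]:
    # Stand-in for the single instance call; counts how often it is invoked.
    calls["count"] += 1
    # skip() yielded nothing, so the echo chain was skipped and logged no output.
    return [("STEP_OUTPUT", "ret_one")]


skipped = sources_to_skip(["ret_one", "echo_2"], fetch_logs)
```

The log is fetched once regardless of how many sources the fan-in has, and the second source is skipped even though its own output is declared as required.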

python_modules/dagster/dagster/core/execution/plan/inputs.py
326–327

was doing _sources_to_skip in https://dagster.phacility.com/D5855?id=29104, which pre-checked whether all sources are required so we could avoid an unnecessary instance call.
but it wouldn't work for cases like https://sourcegraph.com/github.com/dagster-io/dagster/-/blob/python_modules/dagster/dagster_tests/core_tests/test_pipeline_execution.py#L1046:9 where the fan-in step's transitive dependencies are optional but its direct sources are required.

so we either look all the way up the graph from each entry of the fan-in step to find out whether there's any optional step output, or we simply make an instance call for every fan-in step input. here I chose the latter.
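To make the trade-off concrete, here is a minimal sketch of what the "look all the way up" alternative would involve for collect([ret_one(), echo(echo(skip()))]). The graph dictionaries, the step key "echo_2", and both helper functions are hypothetical and exist only for this illustration.

```python
from typing import Dict, List

# Hypothetical description of the example graph: step -> direct upstream steps.
UPSTREAM: Dict[str, List[str]] = {
    "collect": ["ret_one", "echo_2"],
    "echo_2": ["echo"],
    "echo": ["skip"],
    "ret_one": [],
    "skip": [],
}

# Hypothetical flag: whether each step's output is declared optional.
OUTPUT_IS_OPTIONAL: Dict[str, bool] = {
    "collect": False,
    "ret_one": False,
    "echo": False,
    "echo_2": False,
    "skip": True,
}


def direct_sources_all_required(step: str) -> bool:
    """The pre-check that only looks at the fan-in step's direct sources."""
    return all(not OUTPUT_IS_OPTIONAL[source] for source in UPSTREAM[step])


def any_optional_ancestor(step: str) -> bool:
    """Walk every transitive upstream step looking for an optional output."""
    stack = list(UPSTREAM[step])
    seen = set()
    while stack:
        current = stack.pop()
        if current in seen:
            continue
        seen.add(current)
        if OUTPUT_IS_OPTIONAL[current]:
            return True
        stack.extend(UPSTREAM[current])
    return False
```

For "collect", the direct-source pre-check reports that everything is required while the full walk finds the optional output of "skip", so the pre-check alone would wrongly short-circuit.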

comments inline - I think this will be much clearer if all of this complexity is brought into one spot with some comments

python_modules/dagster/dagster/core/execution/plan/inputs.py
154–155

given [1] and [2] as the only other implementations, what do you think about consolidating all of this into [3] and removing can_load_input_object from the base class?

247–248

[1]

273–274

[1]

308–318

[2] i expect this is never invoked

320–337

[3]

python_modules/dagster/dagster/core/instance/__init__.py
912–920 ↗(On Diff #29127)

i would prefer if we not add this function to the instance, and just invoke all_logs from the callsite

I have a good feeling that this will feel better all pulled in to one spot

This revision now requires changes to proceed. Jan 8 2021, 11:00 PM

object manager -> io manager rebase

yuhan retitled this revision from RFC: instance.step_has_output - fix object manager optional fan in to RFC: fix io manager optional fan in. Jan 9 2021, 10:35 PM
This revision is now accepted and ready to land. Jan 11 2021, 4:59 PM
This revision was automatically updated to reflect the committed changes.