
DynamicOutput execution support
ClosedPublic

Authored by alangenfeld on Dec 11 2020, 12:06 AM.

Details

Summary

The big one, this introduces all the Unresolved pieces to support dynamic unfolding and makes execution work.

Interesting things to note:

  • This takes the short cut of not giving the snapshot full information, it just gives it a representation of the deps with the unresolved steps, so using the [?] step key form
  • ExecutionPlan updates its structures as it resolves, couldn't really think of a better way to do this.
  • The execution result objects are solid handle oriented, so this just takes the approach of collecting the mapped results in a similar way to how we do on the step that does the original mapping. Not the most elegant, but it works.

image.png (1×3 px, 523 KB)

Test Plan

added tests, will likely add more

Diff Detail

Repository
R1 dagster
Lint
Lint Passed
Unit
No Test Coverage

Event Timeline

alangenfeld edited the summary of this revision.

mappable -> dynamic

alangenfeld retitled this revision from MappableOutput execution support to DynamicOutput execution support. Dec 14 2020, 10:40 PM

Overall, looks amazing -- so excited for this!

I think a consolidated articulation of all the new classes, how they resolve to each other, and when the resolution happens (maybe by stepping through a dynamic pipeline execution step by step w/in excalidraw?) could be useful to future readers.

Small note: I think the code in master uses mapping_key but this diff uses map_key instead -- might want to standardize on one or the other. Same w/ some file names: "test_mappable_output.py", "mapping_tests/test_mapped_execution.py"

python_modules/dagster-graphql/dagster_graphql/schema/execution.py
122

"that has not yet resolved"? or "that has not yet been resolved"?

python_modules/dagster/dagster/core/execution/memoization.py
101

do we need this if statement? does pipeline_context.for_step(step) break for dynamic steps?

python_modules/dagster/dagster/core/execution/plan/active.py
36

could be nice to convey that only successfully yielded outputs are included. maybe "self._resolved_dynamic_outputs"? "self._successful_dynamic_outputs"?

118

enumerate(self._pending_resolve) since self._pending_resolve is already a list?

306

nice

python_modules/dagster/dagster/core/execution/plan/compute.py
42

should we check that exactly one of the step_inputs is type UnresolvedStepInput

python_modules/dagster/dagster/core/execution/plan/handle.py
49

should we add unit tests for DynamicStepHandle.to_key, DynamicStepHandle.unresolved_form, UnresolvedStepHandle.to_key, UnresolvedStepHandle.resolve, like test_step_handles for StepHandle.from_key

52

I might add a line clarifying when the UnresolvedStepHandle resolves into DynamicStepHandle (i.e. after the upstream dynamic step yields all of its dynamic outputs and the step succeeds, all downstream unresolved step handles resolve into dynamic step handles)
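A rough sketch of the resolution described above, and of the kind of unit test being asked for. The classes here are simplified stand-ins written for illustration, not the classes in this diff; the `name[mapping_key]` key layout is an assumption, and only the `[?]` unresolved form comes from the summary.

```python
from typing import List, NamedTuple


class DynamicStepHandle(NamedTuple):
    """Hypothetical stand-in: a step handle resolved for one mapping key."""

    solid_name: str
    mapping_key: str

    def to_key(self) -> str:
        # Assumed key layout: "<solid name>[<mapping key>]".
        return f"{self.solid_name}[{self.mapping_key}]"

    def unresolved_form(self) -> str:
        # The "[?]" form is the one mentioned in the summary.
        return f"{self.solid_name}[?]"


class UnresolvedStepHandle(NamedTuple):
    """Hypothetical stand-in: a handle whose mapping key is not yet known."""

    solid_name: str

    def to_key(self) -> str:
        return f"{self.solid_name}[?]"

    def resolve(self, mapping_key: str) -> DynamicStepHandle:
        return DynamicStepHandle(self.solid_name, mapping_key)


def resolve_all(
    handle: UnresolvedStepHandle, mapping_keys: List[str]
) -> List[DynamicStepHandle]:
    # Called once the upstream dynamic step has succeeded and all of its
    # mapping keys are known: one resolved handle per key.
    return [handle.resolve(key) for key in mapping_keys]
```

A test along these lines would check that resolving and then asking for the unresolved form round-trips back to the original key.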

python_modules/dagster/dagster/core/execution/plan/inputs.py
401

nit: step_output_handle=, input_def=, config_data=

444

nit: map_key for consistency

446

nit: step_output_handle=

python_modules/dagster/dagster/core/execution/plan/plan.py
496

this intersection is responsible for respecting the ExecutionPlan's step_handles_to_execute?

511

add a test for this section?

python_modules/dagster/dagster/core/execution/plan/step.py
227

does _create_step_outputs handle this?

python_modules/dagster/dagster/core/execution/results.py
428

what is the case when only one of output.mapping_key and step.get_mapping_key() is set?

python_modules/dagster/dagster_tests/core_tests/mapping_tests/test_mapped_execution.py
66

nice

sweet thanks for the feedback, unaddressed inline comments should be resolved by incoming diff w/ comments or changes.

python_modules/dagster/dagster/core/execution/plan/active.py
118

the list is to make a copy since we are mutating it while iterating over it, will leave a comment

124

^

python_modules/dagster/dagster/core/execution/plan/compute.py
42

you can have more than one in the case that you have a downstream dag from a dynamic output - from the regular outputs of those not-yet-resolved steps

the assertion at [1] I think gets at what you were thinking

python_modules/dagster/dagster/core/execution/plan/plan.py
511

this is exercised by the multiprocessing test - in each step execution subprocess, let me know if you had something more specific in mind

python_modules/dagster/dagster/core/execution/plan/step.py
183

[1]

227

good catch

python_modules/dagster/dagster/core/execution/results.py
428
  • output.mapping_key is set: this is a normal execution step with dynamic outputs
  • step.get_mapping_key() is set: this is a step downstream of a dynamic output

see [2] in https://dagster.phacility.com/D5612 for the case where they would both be set, which as that diff stands we *allow* but you would not be able to use the python api results object to interact with due to this invariant. I think the fix would probably be a double dict of step resolved mapping key -> output mapping key -> results
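The double dict idea could look roughly like the following. This is only a sketch of the data shape: `group_results` and the tuple layout of its input are made up for illustration and are not part of the results API.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, Tuple


def group_results(
    events: Iterable[Tuple[str, str, Any]]
) -> Dict[str, Dict[str, Any]]:
    """Group values as step mapping key -> output mapping key -> value.

    `events` is a hypothetical stream of
    (step_mapping_key, output_mapping_key, value) tuples.
    """
    grouped: Dict[str, Dict[str, Any]] = defaultdict(dict)
    for step_key, output_key, value in events:
        grouped[step_key][output_key] = value
    # Return plain dicts so a missing key raises KeyError instead of
    # silently creating an empty entry.
    return {step_key: dict(outputs) for step_key, outputs in grouped.items()}
```

With both levels keyed separately, a step that is itself downstream of a dynamic output and also yields dynamic outputs no longer collides on a single mapping key.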

alangenfeld marked 5 inline comments as done.

feedback

Thanks for your patience in terms of me getting to this:

A few high-level comments:

  1. Having the regex parsing in such a "core" function (ExecutionStep.from_key) gives me the willies. Even just renaming and moving it to make it clear that this should only be at public API ingress points would help. Since we are also in effect implementing a DSL that we will have to parse and support forever I think it is worth more thought and documentation.
  2. I'm having a bit of trouble with the resolved/unresolved/dynamic language. There are two layers (definition and execution step space) as well as a lifecycle within the execution step space. Right now DynamicOutputDefinitions lead to Unresolved Steps and Unresolved Step Outputs, which then lead to Dynamic Step Outputs on Compute Steps?
  3. I'm concerned about this move to make ExecutionPlans more mutable with the resolve method. I'd much rather see a lineage of immutable plans. This would also help clarify the current BS where "subplans" are actually just the entire plan with the "step_keys_to_execute" property.
python_modules/dagster-graphql/dagster_graphql/schema/execution.py
113

should we call this a dynamic step as well?

python_modules/dagster/dagster/core/execution/plan/active.py
117–125

you could use copy.copy and/or assign to a new intermediate variable to encode the intent of what is happening directly in code (and make it resilient to future changes)
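For reference, a minimal sketch of the intermediate-variable version of this pattern. The function and argument names are hypothetical; the point is only that the snapshot gets a name that states why the copy exists.

```python
from typing import Callable, List


def drain_resolvable(
    pending: List[str], is_ready: Callable[[str], bool]
) -> List[str]:
    """Remove the ready items from `pending` and return them in order."""
    resolved = []
    # Iterate over a snapshot: removing from `pending` while iterating
    # over `pending` itself would skip elements.
    snapshot = list(pending)
    for item in snapshot:
        if is_ready(item):
            pending.remove(item)
            resolved.append(item)
    return resolved
```

`copy.copy(pending)` would work equally well for the snapshot; `list(...)` additionally accepts any iterable.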

python_modules/dagster/dagster/core/execution/plan/compute.py
49

what do the step outputs on an *unresolved* step represent?

python_modules/dagster/dagster/core/execution/plan/handle.py
20–67

are we using this in programmatically significant ways? this is scary. i would rather make this feel less "first class" if this is just a discouraged translation layer. That might just be a naming thing. In a follow up we could move this to a standalone function to indicate that this should only be used at public API ingress points?

21–25

consider pre-compiling these regex'es
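A pre-compiled version might look like this. The pattern and the `parse_step_key` helper are assumptions made for illustration and are not the regexes in handle.py; only the `[?]` suffix comes from the summary.

```python
import re
from typing import Optional, Tuple

# Hypothetical pattern for keys such as "solid.compute[?]" or
# "solid.compute[some_key]"; compiled once at import time.
_DYNAMIC_KEY_RE = re.compile(r"^(?P<name>[^\[\]]+)\[(?P<mapping_key>[^\[\]]+)\]$")


def parse_step_key(key: str) -> Tuple[str, Optional[str]]:
    """Split a step key into (name, mapping_key).

    mapping_key is None for plain keys and "?" for unresolved ones.
    """
    match = _DYNAMIC_KEY_RE.match(key)
    if match is None:
        return key, None
    return match.group("name"), match.group("mapping_key")
```

Python's `re` module keeps a cache of recently compiled patterns, so the benefit of a module-level constant is mostly that the pattern has a name and a single definition.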

python_modules/dagster/dagster/core/execution/plan/inputs.py
374

FromResolved?

FromDynamicallyResolved?

There is a language issue going on here a bit which doesn't feel *quite* right

381

Ah it appears you have the same concern about what step output means in the unresolved context

python_modules/dagster/dagster/core/execution/plan/outputs.py
114

preview_form? hmmm

python_modules/dagster/dagster/core/execution/plan/plan.py
375–379

can we be more strict here and explicitly do an elif for the append case and then error on the else?

476–501

This is unfortunately mutative. What would this codepath look like if we instead froze step_dict and built up a lineage of execution plans so that we could save information about the compilation process?

508–510

"if it is an exact match, pull it in"

This comment is hard to parse

python_modules/dagster/dagster/core/execution/plan/step.py
16

what's this comment?

This revision now requires changes to proceed. Dec 21 2020, 3:51 PM
python_modules/dagster/dagster/core/execution/plan/plan.py
476–501

after exploring this one tough bit is external places like the context that hold a reference to the ExecutionPlan object, so likely any immutable record stack would have to happen internal to an object that maintains stable reference identity. Think this one might have to move to a follow up
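One possible shape for that follow-up, sketched under heavy assumptions: `PlanSnapshot` and `PlanHolder` are invented names, steps are reduced to plain string keys, and the `name[key]` layout is assumed. The holder keeps a stable identity for external references while each resolution produces a new immutable snapshot that points at its parent.

```python
from dataclasses import dataclass
from typing import List, Optional, Sequence, Tuple


@dataclass(frozen=True)
class PlanSnapshot:
    """Hypothetical immutable view of a plan's step keys."""

    steps: Tuple[str, ...]
    parent: Optional["PlanSnapshot"] = None

    def resolve(self, unresolved_key: str, mapping_keys: Sequence[str]) -> "PlanSnapshot":
        # Replace "name[?]" with one "name[key]" entry per mapping key.
        base = unresolved_key[: -len("[?]")]
        new_steps: List[str] = []
        for step in self.steps:
            if step == unresolved_key:
                new_steps.extend(f"{base}[{key}]" for key in mapping_keys)
            else:
                new_steps.append(step)
        return PlanSnapshot(tuple(new_steps), parent=self)


class PlanHolder:
    """Stable-identity wrapper that contexts could hold a reference to."""

    def __init__(self, initial: PlanSnapshot):
        self._current = initial

    @property
    def current(self) -> PlanSnapshot:
        return self._current

    def resolve(self, unresolved_key: str, mapping_keys: Sequence[str]) -> None:
        self._current = self._current.resolve(unresolved_key, mapping_keys)

    def lineage(self) -> List[PlanSnapshot]:
        # Newest snapshot first, walking back through parents.
        snapshots = []
        node: Optional[PlanSnapshot] = self._current
        while node is not None:
            snapshots.append(node)
            node = node.parent
        return snapshots
```

The earlier snapshots stay untouched, so the lineage records how the plan looked before each resolution.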

python_modules/dagster/dagster/core/execution/plan/handle.py
21–25

from some small research it looks like python caches them already

feedback - renames and cleanup

alangenfeld edited the summary of this revision.

more renaming, updated summary with excalidraw

fixup step_keys_to_execute for None values

picard_clap

Excellent. Really exciting move forward. I had some minor comments around places where you can be a little more defensive, but I think this ended up at a nice place

python_modules/dagster/dagster/core/execution/plan/active.py
118

copy thing worth a comment

python_modules/dagster/dagster/core/execution/plan/inputs.py
344

doing the top-level include here seems bad. I would dot into dagster.core.types or whatever it is

python_modules/dagster/dagster/core/execution/resolve_versions.py
100–103

would rather see if isinstance(step, AllUnresolved)

followed by an invariant check on ExecutionStep

to make sure we are defensive against adding additional subtypes
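A small sketch of that defensive shape. `ResolvedStep` and `UnresolvedStep` are placeholder classes for illustration, not the step types in this diff.

```python
class ResolvedStep:
    """Placeholder for a fully resolved execution step."""

    def __init__(self, key: str):
        self.key = key


class UnresolvedStep:
    """Placeholder for a step still waiting on dynamic outputs."""

    def __init__(self, key: str):
        self.key = key


def classify_step(step: object) -> str:
    # Handle the unresolved case explicitly first.
    if isinstance(step, UnresolvedStep):
        return "unresolved"
    # Then assert the only other type we expect, so that a newly added
    # step subtype fails loudly here instead of being treated as resolved.
    if not isinstance(step, ResolvedStep):
        raise TypeError(f"unexpected step type: {type(step).__name__}")
    return "resolved"
```

The same check could live in a shared helper so that each call site does not repeat it.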

139

similar thing here. helper function that does more checks would be good

This revision is now accepted and ready to land. Jan 7 2021, 9:31 PM
This revision was automatically updated to reflect the committed changes.