RFC: fix result.result_for_solid when using mem_io_manager + default to mem_io_manager
ClosedPublic

Authored by yuhan on Jan 6 2021, 8:34 AM.

Details

Summary

This diff does two things:

  1. Fixes result.result_for_solid when using mem_io_manager: https://github.com/dagster-io/dagster/issues/3508

    RFC: because io managers are resources, the default mem_io_manager won't work for things like result.result_for_solid. At [2], output_value reconstructs the pipeline context, which re-initializes the resources. The io_manager that output_value sees is therefore a brand new instance of InMemoryIOManager, and the in-memory values stored during the run are lost.
  2. Defaults to mem_io_manager

    Switches the adapting logic to check "is default intermediate storage" instead of "is default io manager", so that mem_io_manager is used as the default rather than the adapted in-memory intermediate storage.

I couldn't separate these two changes because this check will always switch to intermediate storage if mem_io_manager is used.
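
To make the failure mode in item 1 concrete, here is a minimal sketch in plain Python. It does not use dagster: InMemoryStore and init_resources are hypothetical stand-ins for the in-memory io manager and for resource initialization, and only mimic the "new instance on every context construction" behavior described above.

```python
# Stand-in classes for illustration only; these are not dagster's real types.
class InMemoryStore:
    """Plays the role of an in-memory io manager: values live on the instance."""

    def __init__(self):
        self.values = {}

    def handle_output(self, key, obj):
        self.values[key] = obj

    def load_input(self, key):
        return self.values[key]


def init_resources():
    """Plays the role of resource initialization: fresh instances on every call."""
    return {"io_manager": InMemoryStore()}


# Execution: the solid's output is stored on the first io manager instance.
run_resources = init_resources()
run_resources["io_manager"].handle_output(("my_solid", "result"), 5)

# Reading the output later reconstructs the context, which re-initializes resources.
reconstructed = init_resources()
value_found = ("my_solid", "result") in reconstructed["io_manager"].values
```

In this sketch value_found ends up False: the value still exists on the instance used during the run, but the reconstructed instance starts out empty.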

Test Plan

bk

Diff Detail

Repository
R1 dagster
Lint
Lint Passed
Unit
No Test Coverage

Event Timeline

Harbormaster returned this revision to the author for changes because remote builds failed. Jan 6 2021, 8:52 AM
Harbormaster failed remote builds in B23705: Diff 28828!
yuhan requested review of this revision. Jan 6 2021, 5:15 PM
yuhan retitled this revision from intermediate-storage-deprecation default to mem_object_manager to RFC: intermediate-storage-deprecation default to mem_object_manager.
yuhan edited the summary of this revision. (Show Details)
yuhan added inline comments.
python_modules/dagster/dagster/core/execution/results.py
560

[2]

python_modules/dagster/dagster/core/storage/mem_object_manager.py
13 ↗(On Diff #28828)

[1]

python_modules/dagster/dagster/core/execution/results.py
560

I'm curious whether y'all think this could be a problem for other types of resources (i.e. reconstructing them all isn't the right approach), or whether the way we conceptualize resources has fundamentally changed now that we have moved previously separate concepts into resources. I'd imagine that as we make more and more things resources, this will be a problem in other cases as well.

yuhan added inline comments.
python_modules/dagster/dagster/core/execution/results.py
560

IMO it is more a question of how we view resources conceptually. My understanding of the current resources is that they are all definitions, meaning that every time we run a pipeline or reconstruct the context, we get a new instance from the resource definition.

I think it could potentially be a problem if we push more usage onto resources, especially in cases where users want to keep state inside a resource instance across pipeline runs.

@sandyryza @schrockn thoughts?

python_modules/dagster/dagster/core/execution/results.py
560

I think the core problem here is that reconstruct_context is a flawed concept.

For testability right now, I think we should use a pattern like this: https://github.com/dagster-io/dagster/discussions/3329#discussioncomment-146413 where you pass in the resource instance in a way that is accessible during the computation.

If I were to go back in time, the in-memory execution variants would look something like:

with create_context(...) as context:
    run_graph(context, graph, ....)
    assert context.resources.whatever

or something similar.

In short: I don't think we should try to make in-memory instances work with reconstruct_context.

python_modules/dagster/dagster/core/execution/results.py
560

Strongly agree that the context manager approach would be the ideal.

That said, I'm concerned about breaking users who are doing something like:

from dagster import execute_pipeline, pipeline, solid

@solid
def my_solid(_):
    return 5

@pipeline
def my_pipeline():
    my_solid()

result = execute_pipeline(my_pipeline)
assert result.result_for_solid("my_solid").output_value() == 5

E.g. I suspect we do this in tests all over the place.

oof ok I didn't think about that deeply enough. that definitely needs to work

Not ideal, but a solution could be global state: a singleton dict shared by all instances of the mem object manager. Its entries would be keyed on run ID to avoid collisions. One issue would be memory leaks, since nothing would remove a run's entries once the run is done.

workaround prototype: resource_instances_not_to_reinit to exclude certain resources during reconstruct_context

yuhan retitled this revision from RFC: intermediate-storage-deprecation default to mem_object_manager to RFC: fix result.result_for_solid when using mem_object_manager + default to mem_object_manager. Jan 7 2021, 8:45 PM
yuhan edited the summary of this revision. (Show Details)
yuhan added a reviewer: schrockn.
python_modules/dagster/dagster/core/execution/resources_init.py
107

Should we just do a more hacky check here instead of threading through resource_instances_not_to_reinit? If we are only going to use this for the mem_io_manager then it is more surgical just to hardcode a check here

python_modules/dagster/dagster/core/execution/resources_init.py
107

We will still need to thread the mem_io_manager object through, though. So instead of resource_instances_not_to_reinit, which requires adding the do_not_reinit arg on ResourceDefinition (hacky too), we would only pass mem_io_manager_instance through?

python_modules/dagster/dagster/core/execution/resources_init.py
107

ah i misread this. yeah you need this. hmmmm

depends on D5865
separate an obvious bug fix from this hacky diff

So just brainstorming here a bit. Another approach might be special-casing at a different spot.

Namely we could search all the resources where PipelineExecutionResult is constructed in execute_run and pass the mem_io_manager directly. And then output_value could first check to see if that is set and then only if it is *not* fall back to the reconstruct_context codepath. This would compartmentalize the extra complexity to PipelineExecutionResult and SolidExecutionResult rather than adding to the already complex context initialization codepath.
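
As a sketch of the shape this could take (plain Python, not dagster's actual classes): DictStore and ResultSketch below are hypothetical stand-ins for the in-memory io manager and the result object, and reconstruct_context is a stand-in callable that returns freshly initialized resources.

```python
class DictStore:
    """Hypothetical stand-in for an in-memory io manager."""

    def __init__(self):
        self.values = {}

    def handle_output(self, key, obj):
        self.values[key] = obj

    def load_input(self, key):
        return self.values[key]


class ResultSketch:
    """Stand-in result object that prefers a directly passed io manager."""

    def __init__(self, reconstruct_context, mem_io_manager=None):
        self._reconstruct_context = reconstruct_context
        self._mem_io_manager = mem_io_manager

    def output_value(self, key):
        # Use the live in-memory instance when one was passed in.
        if self._mem_io_manager is not None:
            return self._mem_io_manager.load_input(key)
        # Otherwise fall back to reconstructing the context (fresh resources).
        resources = self._reconstruct_context()
        return resources["io_manager"].load_input(key)


def reconstruct_context():
    """Stand-in for context reconstruction: always builds new instances."""
    return {"io_manager": DictStore()}


store = DictStore()
store.handle_output("my_solid.result", 5)
with_store = ResultSketch(reconstruct_context, mem_io_manager=store)
without_store = ResultSketch(reconstruct_context)
```

Here with_store.output_value("my_solid.result") reads from the instance used during the run, while without_store goes through the fallback and raises KeyError because the reconstructed store is empty.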

Just bouncing back to your queue to make sure you see this.

Thoughts?

python_modules/dagster/dagster/core/definitions/resource.py
74 ↗(On Diff #29002)

"do_not_reinit" rather than "_do_not_reinit"

python_modules/dagster/dagster/core/execution/context_creation_pipeline.py
455

eliminate dict in comment

python_modules/dagster/dagster/core/execution/results.py
560–562

we can remove FIXME correct?

This revision now requires changes to proceed. Jan 8 2021, 12:49 PM

Namely we could search all the resources where PipelineExecutionResult is constructed in execute_run and pass the mem_io_manager directly.

^this part sounds good to me.

And then output_value could first check to see if that is set and then only if it is *not* fall back to the reconstruct_context codepath.

maybe a better way to go is

  1. pass in mem_io_manager_dict as hardcoded_resource_instances to PipelineExecutionResult
  2. reconstruct_context takes an optional hardcoded_resource_instances arg, and resource init skips re-creating the given instances, i.e. we keep doing [3] without asking users to pre-define do_not_reinit

return PipelineExecutionResult(
    pipeline.get_definition(),
    pipeline_run.run_id,
    event_list,
    lambda hardcoded_resource_instances: scoped_pipeline_context(
        execution_plan,
        pipeline_run.run_config,
        pipeline_run,
        instance,
        intermediate_storage=pipeline_context.intermediate_storage,
    ),
    hardcoded_resource_instances,
)

the benefits of this are

  1. we don't need a hacky do_not_reinit
  2. output_value can do reconstruct_context(hardcoded_resources)
  3. users can do the following in tests:

with reconstruct_context(hardcoded_resources) as context:
    assert context.resources.whatever

downside: this still complicates the context initialization codepath
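
A minimal sketch of the "skip re-creating the given instances" step, assuming resource init can be reduced to calling one factory per resource name. init_resources, make_store, and make_client are hypothetical names for illustration, not dagster functions.

```python
def init_resources(resource_fns, hardcoded_resource_instances=None):
    """Build resources, reusing any instance that the caller passed in."""
    overrides = hardcoded_resource_instances or {}
    resources = {}
    for name, resource_fn in resource_fns.items():
        if name in overrides:
            # Reuse the live instance instead of calling the factory again.
            resources[name] = overrides[name]
        else:
            resources[name] = resource_fn()
    return resources


calls = []  # records which factories actually ran


def make_store():
    calls.append("io_manager")
    return {}


def make_client():
    calls.append("client")
    return object()


# The store that was populated during the run is handed back in on reconstruction.
live_store = {("my_solid", "result"): 5}
resources = init_resources(
    {"io_manager": make_store, "client": make_client},
    hardcoded_resource_instances={"io_manager": live_store},
)
```

Only the resources that are not overridden get re-initialized, so the in-memory values survive while everything else behaves as before.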

python_modules/dagster/dagster/core/execution/resources_init.py
107–111

[3]

yuhan marked an inline comment as done.

clean up do_not_reinit
resource_instances_not_to_reinit -> hardcoded_resource_instances

object manager -> io manager rebase

yuhan retitled this revision from RFC: fix result.result_for_solid when using mem_object_manager + default to mem_object_manager to RFC: fix result.result_for_solid when using mem_io_manager + default to mem_io_manager. Jan 9 2021, 10:36 PM
yuhan edited the summary of this revision. (Show Details)

"hardcoded_resource_instances" is a pretty rough name since they aren't really hardcoded. they are not fixed in code. maybe "resource_instances_to_override" or something?

Also I went through the codepath to see what we do now and oh boy is the intermediate storage override hidden in a strange place. It would be awesome if you could refactor that in a follow up.

Accepting but please strongly consider rename!

python_modules/dagster/dagster/core/execution/context_creation_pipeline.py
206–209

wow this predates you but this is super confusing.

intermediate_storage = intermediate_storage if intermediate_storage else create_intermediate_storage(context_creation_data, scoped_resources_builder)

would make so much more sense. the argument is also misnamed in create_intermediate_storage adding to the confusion

347

yeah "intermediate_storage_data" needs to be renamed too

This revision is now accepted and ready to land. Jan 12 2021, 1:43 PM

resource_instances_to_override

python_modules/dagster/dagster/core/execution/context_creation_pipeline.py
206–209

will investigate in a followup after the release-blocking diffs are done