Page MenuHomeElementl

RFC: fix result.result_for_solid when using mem_io_manager + default to mem_io_manager
ClosedPublic

Authored by yuhan on Jan 6 2021, 8:34 AM.

Details

Summary

this diff does two things

  1. https://github.com/dagster-io/dagster/issues/3508

    RFC: because io managers are resources, the default mem_io_manager won't work for things like result.result_for_solid. it's because [2] output_value will reconstruct the pipeline context, where resources get re-initiated, which means we would lose the in-memory values in the mem_io_manager as the io_manager in output_value is going to be a brand new instance of the InMemoryioManager.
  1. default to mem_io_manager

    switches the logic to check "is default intermediate storage" instead of "is default io manager" in the adapting logic - so we would be using mem_io_manager as the default rather than the adapted mem intermediate storage.

couldn't separate these two because this check will always switch to use intermediate storage if mem_io_manager is used

Test Plan

bk

Diff Detail

Repository
R1 dagster
Branch
yuhan/no-asset-store
Lint
Lint Passed
Unit
No Test Coverage

Event Timeline

Harbormaster returned this revision to the author for changes because remote builds failed.Jan 6 2021, 8:52 AM
Harbormaster failed remote builds in B23705: Diff 28828!
yuhan requested review of this revision.Jan 6 2021, 5:15 PM
yuhan retitled this revision from intermediate-storage-deprecation default to mem_object_manager to RFC: intermediate-storage-deprecation default to mem_object_manager.
yuhan edited the summary of this revision. (Show Details)
yuhan added inline comments.
python_modules/dagster/dagster/core/execution/results.py
521

[2]

python_modules/dagster/dagster/core/storage/mem_object_manager.py
13 ↗(On Diff #28828)

[1]

python_modules/dagster/dagster/core/execution/results.py
521

I'm curious as to whether yall think this could potentially be a problem for other types of resources (ie reconstructing them all isn't the right approach), or if the way we conceptualized resources has fundementally changed as a result of us moving previously separate concepts to be resources. I'd imagine as we make more and more stuff resources, this might be a problem in other cases as well.

yuhan added inline comments.
python_modules/dagster/dagster/core/execution/results.py
521

IMO it is more of the question how we view resources conceptually. My understanding to the current resources is that they are all definitions, meaning every time we run a pipeline or reconstruct the context, it would be a new instance of a resource definition.

I think it could potentially be a problem if we push more usage to resources, esp in some cases users may want to keep the states inside a resource instance cross pipeline runs.

@sandyryza @schrockn thoughts?

python_modules/dagster/dagster/core/execution/results.py
521

I think the core problem here is the reconstruct_context is a flawed concept.

In order to do testability right now I think we should do a pattern like this: https://github.com/dagster-io/dagster/discussions/3329#discussioncomment-146413 where you pass in the resource instance in a way that is accessible during the computation.

If I were to go back in time, the in-memory execution variants would look something like:

with create_context(...) as context:
   run_graph(context, graph, ....)
   assert context.resources.whatever

or something similar.

In short: I don't think we should try to make in memory instances work with reconstruct_context

python_modules/dagster/dagster/core/execution/results.py
521

Strongly agree that the context manager approach would be the ideal.

That said, I'm concerned about breaking users who are doing something like:

@solid
def my_solid(_):
    return 5

@pipeline
def my_pipeline():
    my_solid()

result = execute_pipeline(my_pipeline)
assert result.result_for_solid("my_solid").output_value() == 5

E.g. I suspect we do this in tests all over the place.

oof ok I didn't think about that deeply enough. that definitely needs to work

Not ideal, but a solution could be global state - a singleton dict shared by all instances of mem object manager. Its entries would be keyed on run ID to avoid collisions. An issue would be memory leaking.

workaround prototype: resource_instances_not_to_reinit to exclude certain resources during reconstruct_context

yuhan retitled this revision from RFC: intermediate-storage-deprecation default to mem_object_manager to RFC: fix result.result_for_solid when using mem_object_manager + default to mem_object_manager.Jan 7 2021, 8:45 PM
yuhan edited the summary of this revision. (Show Details)
yuhan added a reviewer: schrockn.
python_modules/dagster/dagster/core/execution/resources_init.py
107

Should we just do a more hacky check here instead of threading through resource_instances_not_to_reinit? If we are only going to use this for the mem_io_manager then it is more surgical just to hardcode a check here

python_modules/dagster/dagster/core/execution/resources_init.py
107

we will still need to thread the mem_io_manager object through tho. so it would be, instead of resource_instances_not_to_reinit which requires adding the do_not_reinit arg on ResourceDefinition (hacky too), we only pass mem_io_manager_instance through?

python_modules/dagster/dagster/core/execution/resources_init.py
107

ah i misread this. yeah you need this. hmmmm

depends on D5865
separate a obvious bug fix from this hacky diff

So just brainstorming here a bit. Another approach might be special-casing at a different spot.

Namely we could search all the resources where PipelineExecutionResult is constructed in execute_run and pass the mem_io_manager directly. And then output_value could first check to see if that is set and then only if it is *not* fall back to the reconstruct_context codepath. This would compartmentalize the extra complexity to PipelineExecutionResult' and SolidExecutionResult` rather than adding to the already complex context initialization codepath.

Just bouncing back to your queue to make sure you see this.

Thoughts?

python_modules/dagster/dagster/core/definitions/resource.py
74

"do_not_reinit" rather than "_do_not_reinit"

python_modules/dagster/dagster/core/execution/context_creation_pipeline.py
455

eliminate dict in comment

python_modules/dagster/dagster/core/execution/results.py
518–520

we can remove FIXME correct?

This revision now requires changes to proceed.Jan 8 2021, 12:49 PM

Namely we could search all the resources where PipelineExecutionResult is constructed in execute_run and pass the mem_io_manager directly.

^this part sounds good to me.

And then output_value could first check to see if that is set and then only if it is *not* fall back to the reconstruct_context codepath.

maybe a better way to go is

  1. pass in mem_io_manager_dict as hardcoded_resource_instances to PipelineExecutionResult
  2. reconstruct_context takes an optional hardcoded_resource_instances arg and resource init would skip re-create those given instances - i.e. so we keep doing [3] without asking users to pre-define do_not_reinit
return PipelineExecutionResult(
    pipeline.get_definition(),
    pipeline_run.run_id,
    event_list,
    lambda hardcoded_resource_instances: scoped_pipeline_context(
        execution_plan,
        pipeline_run.run_config,
        pipeline_run,
        instance,
        intermediate_storage=pipeline_context.intermediate_storage,
    ),
   hardcoded_resource_instances,
)

the benefit of this is

  1. we dont need a hacky do_not_reinit
  2. output_value can do reconstruct_context(hardcoded_resources)
  3. users can do in tests
with reconstruct_context(hardcoded_resources) as context:
   assert context.resources.whatever

downside: still complicates the context initialization codepath

python_modules/dagster/dagster/core/execution/resources_init.py
107–111

[3]

yuhan marked an inline comment as done.

clean up do_not_reinit
resource_instances_not_to_reinit -> hardcoded_resource_instances

object manager -> io manager rebase

yuhan retitled this revision from RFC: fix result.result_for_solid when using mem_object_manager + default to mem_object_manager to RFC: fix result.result_for_solid when using mem_io_manager + default to mem_io_manager.Jan 9 2021, 10:36 PM
yuhan edited the summary of this revision. (Show Details)

"hardcoded_resource_instances" is a pretty rough name since they aren't really hardcoded. they are not fixed in code. maybe "resource_instances_to_override" or something?

Also I went through the codepath so see what we do now and oh boy is the intermediate storage override hidden in a strange place. It would be awesome if you could refactor that in a follow up.

Accepting but please strongly consider rename!

python_modules/dagster/dagster/core/execution/context_creation_pipeline.py
206–209

wow this predates you but this is super confusing.

intermediate_storage = intermediate_storage if intermediate_storage else create_intermediate_storage(context_creation_data, scoped_resources_builder)

would make so much more sense. the argument is also misnamed in create_intermediate_storage adding to the confusion

347

yeah "intermediate_storage_data" needs to be renamed too

This revision is now accepted and ready to land.Jan 12 2021, 1:43 PM

resource_instances_to_override

python_modules/dagster/dagster/core/execution/context_creation_pipeline.py
206–209

will investigate in a followup after the release-blocking diffs are done