
executor-based IO management
Needs Revision · Public

Authored by sandyryza on Jul 2 2021, 9:33 PM.

Details

Summary

IOManager gets to decide how to store and load an output based on what executor is in use.
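
A minimal sketch of the shape this could take (the boundary-type enum, the branching, and the method bodies here are illustrative assumptions, not the actual diff):

import os
import pickle
import tempfile
from enum import Enum

class ExecutionBoundaryType(Enum):
    SHARED_PROCESS = 1     # every step runs in one process
    SHARED_FILESYSTEM = 2  # steps run in separate processes on one machine
    SHARED_NOTHING = 3     # steps run on separate machines

class DefaultIOManager:
    """Toggles storage strategy per load/store based on the executor's boundary type."""

    def __init__(self, boundary_type):
        self._boundary_type = boundary_type
        self._in_memory = {}
        self._base_dir = tempfile.mkdtemp()

    def handle_output(self, key, obj):
        if self._boundary_type == ExecutionBoundaryType.SHARED_PROCESS:
            self._in_memory[key] = obj
        elif self._boundary_type == ExecutionBoundaryType.SHARED_FILESYSTEM:
            with open(os.path.join(self._base_dir, key), "wb") as f:
                pickle.dump(obj, f)
        else:
            # defer to the user: cross-machine storage needs a remote io manager
            raise Exception("bring your own remote io manager for SHARED_NOTHING")

    def load_input(self, key):
        if self._boundary_type == ExecutionBoundaryType.SHARED_PROCESS:
            return self._in_memory[key]
        elif self._boundary_type == ExecutionBoundaryType.SHARED_FILESYSTEM:
            with open(os.path.join(self._base_dir, key), "rb") as f:
                return pickle.load(f)
        else:
            raise Exception("bring your own remote io manager for SHARED_NOTHING")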

Test Plan

take out intermediate storage config from multiprocessing test

Diff Detail

Repository
R1 dagster
Branch
executor-based-io (branched from master)
Lint
Lint Passed
Unit
No Test Coverage

Event Timeline

Harbormaster returned this revision to the author for changes because remote builds failed. Jul 2 2021, 10:41 PM
Harbormaster failed remote builds in B33156: Diff 40871!

I like this better than defaulting to fs io manager.

python_modules/dagster/dagster/core/definitions/executor.py
46

may be good to add some examples in a comment - my understanding is this would cover all cases where each step executes in its own process, e.g. dask_executor, celery_executor.

python_modules/dagster/dagster/core/storage/default_io_manager.py
22–27

iiuc, SHARED_NOTHING will reach here, and we defer to users to supply their own custom, remote io manager?

65

curious to hear people's thoughts on the future plan:
going down this path, if we eventually have executors on the instance, it looks like we'd be able to decide which io manager to create here instead of putting the switch logic in the DefaultIOManager, right?

python_modules/dagster/dagster/core/storage/default_io_manager.py
22–27

Exactly

65

I had been imagining that, even once we have executors on instance, we would still have the switching logic inside DefaultIOManager. The execution_boundary_type would just come from the instance-level executor instead of a job-level executor.

If we went the direction of having the executor decide which IOManager to create, I worry that we'd need to complicate the APIs for specifying IOManagers. E.g. each OutputDefinition might need a dictionary of executor types to io_manager_keys.
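
For concreteness, a hypothetical sketch of that complication (OutputDefinitionSketch and io_manager_keys_by_executor are made-up names for illustration, not a real dagster API):

from dataclasses import dataclass, field
from typing import Dict

@dataclass
class OutputDefinitionSketch:
    # hypothetical stand-in for dagster's OutputDefinition
    name: str
    # executor type name -> io_manager_key, chosen per executor
    io_manager_keys_by_executor: Dict[str, str] = field(default_factory=dict)

out = OutputDefinitionSketch(
    name="result",
    io_manager_keys_by_executor={
        "in_process": "mem_io_manager",
        "multiprocess": "fs_io_manager",
        "celery": "s3_io_manager",
    },
)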

Do you have advantages in mind for having the executor decide which IOManager to create?

Will have to update the checks in plan.py that currently compare against mem_io_manager with this approach, assuming we don't want to regress the retry experience from dagit.

python_modules/dagster/dagster/core/definitions/executor.py
43–46

this modeling feels a little odd to me - the boundary & shared_* naming. Something like this makes a lot of sense to me:

from enum import Enum

class SolidIsolationType(Enum):
    NONE = 1     # solids execute in the same process
    PROCESS = 2  # solids execute in separate processes on the same machine
    MACHINE = 3  # solids execute on separate (virtual) machines
    # future example:
    DYNAMIC = 4  # execution bundles happen on different machines, but solids in the same bundle execute in the same process

But I'm open to other ideas.

python_modules/dagster/dagster/core/storage/default_io_manager.py
45–65
  • I think we should drop the base_dir config here - if someone wants to control which directory outputs go in, they can just use fs_io_manager
  • Can we get the executor def property onto InitResourceContext so we can error earlier and avoid toggling behavior on every load/store? (rough sketch below)
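
A rough sketch of that second point, assuming InitResourceContext exposed an executor_def with an execution_boundary_type property (all names here are assumptions, not real dagster APIs):

from enum import Enum

class ExecutionBoundaryType(Enum):
    SHARED_PROCESS = 1
    SHARED_FILESYSTEM = 2
    SHARED_NOTHING = 3

class DefaultIOManager:
    def __init__(self, boundary_type):
        self._boundary_type = boundary_type

def init_default_io_manager(init_context):
    # assumed: InitResourceContext grows an `executor_def` with an
    # `execution_boundary_type` property
    boundary = init_context.executor_def.execution_boundary_type
    if boundary == ExecutionBoundaryType.SHARED_NOTHING:
        # fail fast at init instead of on the first load/store
        raise Exception(
            "default_io_manager cannot move outputs between machines; "
            "configure a remote io manager (e.g. s3) instead"
        )
    return DefaultIOManager(boundary)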
python_modules/dagster/dagster/core/definitions/executor.py
43–46

In the dynamic case, do you have thoughts on how the IO manager would make a decision about whether to use memory vs. disk to store an output? E.g. if you want to use memory for storing outputs that will be consumed by solids in your bundle and disk for storing outputs that will be consumed by solids outside your bundle.

I'm not tied to the naming at all, but I was imagining that the ExecutionBoundaryType describes the set of edges coming out of an output, not the solid itself. That way, when we eventually support dynamic execution, the executor could supply a SHARED_PROCESS execution_boundary_type for outputs that will only be consumed by in-bundle solids and SHARED_FILESYSTEM (or SHARED_MACHINE, or something) for outputs that will be consumed by some out-of-bundle solids.
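
A hypothetical sketch of that per-output framing - the executor labels each output based on where its consumers run (bundle_of and boundary_type_for_output are made-up helpers for illustration):

from enum import Enum

class ExecutionBoundaryType(Enum):
    SHARED_PROCESS = 1     # all consumers run in the producer's bundle
    SHARED_FILESYSTEM = 2  # some consumers run outside the producer's bundle

def boundary_type_for_output(bundle_of, producer, consumers):
    # bundle_of: assumed mapping from solid name to its execution bundle
    if all(bundle_of[c] == bundle_of[producer] for c in consumers):
        return ExecutionBoundaryType.SHARED_PROCESS
    return ExecutionBoundaryType.SHARED_FILESYSTEM

# e.g. with bundles {"a": 1, "b": 1, "c": 2}: the output of "a" consumed only
# by "b" stays in memory, while an output also consumed by "c" hits disk
bundles = {"a": 1, "b": 1, "c": 2}
assert boundary_type_for_output(bundles, "a", ["b"]) == ExecutionBoundaryType.SHARED_PROCESS
assert boundary_type_for_output(bundles, "a", ["b", "c"]) == ExecutionBoundaryType.SHARED_FILESYSTEM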

python_modules/dagster/dagster/core/definitions/executor.py
43–46

Ah I see. I think there are two problems here:

  • the static, definition/init-time handshake between execution and io management that attempts to ensure things will work at runtime and provide a good product experience (i.e. the current retry-button gating when we know it won't work)
  • a future capability for dynamic io management for dynamic physical execution plans

I am skeptical about trying to use the same thing for both, and about trying to model the latter without a more fleshed-out prototype. I expect we will need some handshaking around the dynamic stuff at init time, in support of the implementation that fires at runtime to control each load/store.

python_modules/dagster/dagster/core/storage/default_io_manager.py
65

Do you have advantages in mind for having the executor decide which IOManager to create?

I was mostly thinking about the error handling - if we initialize the right io manager instance inside the default_io_manager initialization, we can avoid the toggle logic at load/store time, so the error handling and stack trace could be much cleaner.

I guess my point was less about having the executor decide which io manager to create, and more about moving the io manager creation earlier to avoid the switch logic at run time.
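
A hypothetical sketch of that alternative - the switch runs once at init time and returns a concrete delegate, so the load/store paths contain no branching and stack traces point at the real implementation (names are assumptions):

import os
import pickle
import tempfile
from enum import Enum

class ExecutionBoundaryType(Enum):
    SHARED_PROCESS = 1
    SHARED_FILESYSTEM = 2

class InMemoryIOManager:
    def __init__(self):
        self._values = {}

    def handle_output(self, key, obj):
        self._values[key] = obj

    def load_input(self, key):
        return self._values[key]

class PickleIOManager:
    def __init__(self, base_dir):
        self._base_dir = base_dir

    def handle_output(self, key, obj):
        with open(os.path.join(self._base_dir, key), "wb") as f:
            pickle.dump(obj, f)

    def load_input(self, key):
        with open(os.path.join(self._base_dir, key), "rb") as f:
            return pickle.load(f)

def init_default_io_manager(boundary_type):
    # the switch happens exactly once, at init, not on every load/store
    if boundary_type == ExecutionBoundaryType.SHARED_PROCESS:
        return InMemoryIOManager()
    return PickleIOManager(tempfile.mkdtemp())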

I think this is directionally promising, but it has too many risky externalities to land in tomorrow's release.

It seems likely that this impl will not change behavior underneath people, so it should be able to go out in a minor release.

Queueing to focus on stuff for the release.

This revision now requires changes to proceed. Wed, Jul 7, 4:24 PM