This diff enables passing data objects by reference between solids, using the Address introduced in D4579 as the internal representation.
To pass data (i.e. set and get intermediates), we need:
- a write/read method (this diff)
- a path where the object will be loaded from or materialized to (D4579)
- the object
1) write/read method
write
- object_store.set_object(path, obj, serialization_strategy)
- dagster_type.materializer.materialize_runtime_values(context, config_value, obj)
read
- object_store.get_object(path, serialization_strategy)
- dagster_type.loader.construct_from_config_value(context, config_value)
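As a minimal sketch of the object_store write/read path, the pair below round-trips an object through set_object/get_object with a serialization_strategy that defaults to pickling. It assumes a hypothetical filesystem-backed store; `PickleSerializationStrategy` and `FilesystemObjectStore` are illustrative names, not Dagster's actual classes.

```python
import os
import pickle
import tempfile


class PickleSerializationStrategy:
    """Hypothetical strategy: plain pickling, i.e. the default write/read method."""

    def serialize(self, value, write_file_obj):
        pickle.dump(value, write_file_obj)

    def deserialize(self, read_file_obj):
        return pickle.load(read_file_obj)


class FilesystemObjectStore:
    """Hypothetical object store keyed by path (the Address(path) case)."""

    def set_object(self, path, obj, serialization_strategy):
        # Create parent directories so the object can be materialized to `path`.
        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
        with open(path, "wb") as f:
            serialization_strategy.serialize(obj, f)
        return path

    def get_object(self, path, serialization_strategy):
        # Load the object back from `path` with the same strategy.
        with open(path, "rb") as f:
            return serialization_strategy.deserialize(f)


store = FilesystemObjectStore()
strategy = PickleSerializationStrategy()
path = os.path.join(tempfile.mkdtemp(), "solid_1", "result")
store.set_object(path, {"rows": 3}, strategy)
loaded = store.get_object(path, strategy)
```

The materializer/loader path plays the same two roles (write and read), which is why pairing them mirrors serialize/deserialize on a strategy.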
So the processing priority could be:
- Use dagster_type.materializer/loader to handle set/get intermediates if a config_value is either configured in solid_config (see _create_output_materializations) or provided through Output(value, address=Address(config_value)) (see _set_intermediates).
  - Limitation 1: the materializer and loader have to be paired. I think pairing them makes sense, since they are essentially the serialization and deserialization steps of a serialization_strategy. This also means there's an opportunity to merge serialization_strategy with materializer/loader.
  - Limitation 2: the materializer/loader is tied to the dagster type. A user who wants to use a materializer/loader has to write their own custom dagster type, which introduces boilerplate and doesn't seem ideal.
    - I will address this later in the stack by allowing the dagster type and the materializer/loader to be separated. My rough thought is to put a custom write/read method on Output or OutputDefinition [TODO 1].
- Otherwise, use the object_store and Address(path) to handle intermediates, with serialization_strategy (or default pickling) as the write/read method. This ties back to the potential refactor mentioned above, but it doesn't seem critical, so I'd like to punt it to the next iteration.
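The priority above can be sketched as a small decision function. It is hypothetical: `resolve_write_method` is not part of Dagster, and checking the Output-provided config_value before the solid_config one is an assumption, since the description does not say which source wins when both are present.

```python
from typing import Any, Dict, Optional


def resolve_write_method(
    solid_config_outputs: Dict[str, Any],
    output_name: str,
    output_config_value: Optional[Any] = None,
) -> str:
    """Pick the write/read method for one output.

    solid_config_outputs: hypothetical {output_name: config_value} from solid_config.
    output_config_value: hypothetical config_value passed via
        Output(value, address=Address(config_value)).
    """
    # Assumption: a config_value on the Output takes precedence over solid_config.
    config_value = output_config_value
    if config_value is None:
        config_value = solid_config_outputs.get(output_name)
    if config_value is not None:
        # A config_value exists: route through dagster_type.materializer/loader.
        return "materializer/loader"
    # No config_value: fall back to object_store + Address(path),
    # using serialization_strategy or default pickling.
    return "object_store"
```

Keeping the choice in one function makes the fallback explicit: anything without a config_value goes through the object store.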
The set/get intermediate logic in intermediate_storage would then be:
- write(address, obj) -> set_object event, via either:
  - object_store.set_object(path, obj, serialization_strategy) [TODO 2: let users specify which object_store to use per output/input]
  - or dagster_type.materializer.materialize_runtime_values(context, config_value, obj), getting config_value from solid_config.outputs or from the Output of the current step
- read(address) -> (obj, get_object event), via either:
  - object_store.get_object(path, serialization_strategy) [TODO 2: let users specify which object_store to use per output/input]
  - or dagster_type.loader.construct_from_config_value(context, config_value), getting config_value from solid_config.outputs or from the Output of the source step
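A minimal sketch of the intermediate_storage write/read logic, assuming simplified hypothetical types (`Address`, `Event`, `InMemoryObjectStore`, `IntermediateStorage`) rather than the real Dagster classes. The `materialize` and `load` callables stand in for dagster_type.materializer.materialize_runtime_values and dagster_type.loader.construct_from_config_value, with the context argument omitted.

```python
import pickle
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, Optional, Tuple


class OpType(Enum):
    SET_OBJECT = "SET_OBJECT"
    GET_OBJECT = "GET_OBJECT"


@dataclass(frozen=True)
class Address:
    # Hypothetical stand-in for D4579's Address: a path or a config_value.
    path: Optional[str] = None
    config_value: Any = None


@dataclass(frozen=True)
class Event:
    op: OpType
    address: Address


class InMemoryObjectStore:
    # Hypothetical store; pickled bytes are kept in a dict keyed by path.
    def __init__(self):
        self._blobs: Dict[str, bytes] = {}

    def set_object(self, path, obj):
        self._blobs[path] = pickle.dumps(obj)  # default pickling

    def get_object(self, path):
        return pickle.loads(self._blobs[path])


class IntermediateStorage:
    def __init__(self, object_store, materialize=None, load=None):
        self.object_store = object_store
        self.materialize = materialize  # stands in for dagster_type.materializer
        self.load = load  # stands in for dagster_type.loader

    def write(self, address: Address, obj: Any) -> Event:
        if address.config_value is not None:
            self.materialize(address.config_value, obj)
        else:
            self.object_store.set_object(address.path, obj)
        return Event(OpType.SET_OBJECT, address)

    def read(self, address: Address) -> Tuple[Any, Event]:
        if address.config_value is not None:
            obj = self.load(address.config_value)
        else:
            obj = self.object_store.get_object(address.path)
        return obj, Event(OpType.GET_OBJECT, address)


external = {}  # stands in for an external location such as a table or file
storage = IntermediateStorage(
    InMemoryObjectStore(),
    materialize=lambda cfg, obj: external.__setitem__(cfg["key"], obj),
    load=lambda cfg: external[cfg["key"]],
)
set_event = storage.write(Address(path="run_1/solid_1/result"), [1, 2])
value, get_event = storage.read(Address(path="run_1/solid_1/result"))
storage.write(Address(config_value={"key": "table"}), "x")
```

Both branches return the same event shape, so callers can log set/get operations without caring which write/read method was used.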
2) cross-run intermediates based on events
Here we created new ObjectStoreOperation types:
- ObjectStoreOperationType.SET_EXTERNAL_OBJECT, for when an object is set externally (outside the intermediate dir)
- ObjectStoreOperationType.GET_EXTERNAL_OBJECT, for when an object is retrieved externally (outside the intermediate dir)
We enabled cross-run intermediate storage using event logs (see instance.get_external_intermediates): the events map step_output_handle -> address, which enables passing by reference between runs (e.g. re-execution).
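The event-based lookup can be sketched as a replay over a run's object store events. This is a hypothetical analogue of instance.get_external_intermediates: the event record shape, the `external_intermediates_from_events` name, and the (step_key, output_name) tuple used as a step_output_handle are all assumptions.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Dict, Iterable, Tuple

StepOutputHandle = Tuple[str, str]  # hypothetical: (step_key, output_name)


class ObjectStoreOperationType(Enum):
    # Member names from the description; values are illustrative.
    SET_OBJECT = "SET_OBJECT"
    GET_OBJECT = "GET_OBJECT"
    SET_EXTERNAL_OBJECT = "SET_EXTERNAL_OBJECT"  # set outside the intermediate dir
    GET_EXTERNAL_OBJECT = "GET_EXTERNAL_OBJECT"  # retrieved outside the intermediate dir


@dataclass(frozen=True)
class ObjectStoreEvent:
    # Hypothetical, simplified event log record.
    run_id: str
    step_output_handle: StepOutputHandle
    op: ObjectStoreOperationType
    address: str


def external_intermediates_from_events(
    events: Iterable[ObjectStoreEvent], run_id: str
) -> Dict[StepOutputHandle, str]:
    """Map step_output_handle -> address for objects a run set externally.

    Later events overwrite earlier ones, so the most recent address wins.
    """
    mapping: Dict[StepOutputHandle, str] = {}
    for event in events:
        if event.run_id == run_id and event.op is ObjectStoreOperationType.SET_EXTERNAL_OBJECT:
            mapping[event.step_output_handle] = event.address
    return mapping


events = [
    ObjectStoreEvent("run_1", ("solid_1.compute", "result"),
                     ObjectStoreOperationType.SET_EXTERNAL_OBJECT, "s3://dev-bucket/solid_1/result"),
    ObjectStoreEvent("run_1", ("solid_2.compute", "result"),
                     ObjectStoreOperationType.SET_OBJECT, "/tmp/intermediates/solid_2/result"),
]
# A re-execution of run_1 could load solid_1's output from this address instead of recomputing it.
parent_addresses = external_intermediates_from_events(events, "run_1")
```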
TODO
- Decouple dagster type and materializer/loader. My rough thought is to put custom write/read method on Output or OutputDefinition.
- Enable users to configure storage per output via object_store
  - Currently, users can configure system-level or pipeline-level storage, but there is no out-of-the-box support for storing intermediates in different storages within the same pipeline run.
  - For example, a user may want to persist the output of solid_1 to the local filesystem and upload the output of solid_2 to a dev S3 bucket.
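The per-output storage TODO could look roughly like the sketch below, which mirrors the solid_1/solid_2 example. Everything here is hypothetical: `PerOutputStorage` routes each (solid_name, output_name) to a configured store with a fallback default, and `FakeS3Store` records uploads in memory instead of talking to S3, so the example runs without credentials.

```python
import os
import pickle
import tempfile
from typing import Any, Dict, Tuple


class LocalFilesystemStore:
    # Hypothetical store that pickles objects under a base directory.
    def __init__(self, base_dir: str):
        self.base_dir = base_dir

    def set_object(self, key: str, obj: Any) -> str:
        path = os.path.join(self.base_dir, key)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "wb") as f:
            pickle.dump(obj, f)
        return path


class FakeS3Store:
    # Stand-in for an S3-backed store; keeps uploads in memory instead of calling S3.
    def __init__(self, bucket: str):
        self.bucket = bucket
        self.uploads: Dict[str, bytes] = {}

    def set_object(self, key: str, obj: Any) -> str:
        self.uploads[key] = pickle.dumps(obj)
        return f"s3://{self.bucket}/{key}"


class PerOutputStorage:
    def __init__(self, stores: Dict[Tuple[str, str], Any], default: Any):
        self.stores = stores  # {(solid_name, output_name): store}
        self.default = default  # used when an output has no explicit store

    def set_intermediate(self, solid_name: str, output_name: str, obj: Any) -> str:
        store = self.stores.get((solid_name, output_name), self.default)
        return store.set_object(f"{solid_name}/{output_name}", obj)


local = LocalFilesystemStore(tempfile.mkdtemp())
s3 = FakeS3Store("dev-bucket")
storage = PerOutputStorage({("solid_2", "result"): s3}, default=local)
p1 = storage.set_intermediate("solid_1", "result", 1)  # persisted locally
p2 = storage.set_intermediate("solid_2", "result", 2)  # "uploaded" to the dev bucket
```

Falling back to a default store keeps today's system-level or pipeline-level configuration working while letting individual outputs opt into a different store.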