
address-configurable-intermediate-storage-3 cross-run external_intermediates + pass by reference
Abandoned, Public

Authored by yuhan on Oct 7 2020, 12:33 AM.

Details

Summary

This diff enables passing data objects by reference between solids, using the Address introduced in D4579 as the internal representation.

To pass data (i.e., to set and get intermediates), we need three pieces of information:

  1. a write/read method (this diff)
  2. a path where the object will be loaded from or materialized to (D4579)
  3. the object

1) write/read method
write

  • object_store.set_object(path, obj, serialization_strategy)
  • dagster_type.materializer.materialize_runtime_values(context, config_value, obj)

read

  • object_store.get_object(path, serialization_strategy)
  • dagster_type.loader.construct_from_config_value(context, config_value)
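A minimal, self-contained sketch of the two write/read routes listed above. `PickleStrategy`, `InMemoryObjectStore`, `JsonFileMaterializer`, `JsonFileLoader`, and the unused `context` argument are hypothetical stand-ins that only mirror the method names in this summary (`set_object`/`get_object`, `materialize_runtime_values`/`construct_from_config_value`); they are not Dagster's actual classes or signatures.

```python
import json
import os
import pickle
import tempfile
from typing import Any, Dict


class PickleStrategy:
    """Hypothetical stand-in for a serialization_strategy: default pickling."""

    def serialize(self, obj: Any) -> bytes:
        return pickle.dumps(obj)

    def deserialize(self, data: bytes) -> Any:
        return pickle.loads(data)


class InMemoryObjectStore:
    """Hypothetical object store keyed by path (route: object_store + serialization_strategy)."""

    def __init__(self) -> None:
        self._blobs: Dict[str, bytes] = {}

    def set_object(self, path: str, obj: Any, serialization_strategy: PickleStrategy) -> None:
        self._blobs[path] = serialization_strategy.serialize(obj)

    def get_object(self, path: str, serialization_strategy: PickleStrategy) -> Any:
        return serialization_strategy.deserialize(self._blobs[path])


class JsonFileMaterializer:
    """Hypothetical materializer (route: dagster type); config_value says where to write."""

    def materialize_runtime_values(self, context: Any, config_value: Dict[str, str], obj: Any) -> None:
        with open(config_value["path"], "w") as f:
            json.dump(obj, f)


class JsonFileLoader:
    """Hypothetical loader (route: dagster type); config_value says where to read from."""

    def construct_from_config_value(self, context: Any, config_value: Dict[str, str]) -> Any:
        with open(config_value["path"]) as f:
            return json.load(f)


if __name__ == "__main__":
    store = InMemoryObjectStore()
    store.set_object("run_1/solid_a/result", {"rows": 3}, PickleStrategy())
    print(store.get_object("run_1/solid_a/result", PickleStrategy()))

    target = os.path.join(tempfile.mkdtemp(), "out.json")
    JsonFileMaterializer().materialize_runtime_values(None, {"path": target}, [1, 2, 3])
    print(JsonFileLoader().construct_from_config_value(None, {"path": target}))
```

The object-store route is addressed by a path, while the materializer/loader route is addressed by a config_value; that difference is what the Address abstraction has to cover.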

So the processing priority could be:

  1. Use dagster_type.materializer/loader to handle set/get intermediates if config_value is either configured in solid_config (see _create_output_materializations) or provided through Output(value, address=Address(config_value)) (see _set_intermediates).
    • Limitation 1: the materializer and loader have to be paired. I think pairing them makes sense because they are essentially the serialization and deserialization halves of a serialization_strategy, which also means there's an opportunity to merge serialization_strategy and materializer/loader.
    • Limitation 2: materializer/loader is tied to the dagster type. A user who wants to use a materializer/loader would need to write their own custom dagster type, which introduces boilerplate and doesn't seem ideal.
      • Will address this later in the stack by allowing the dagster type and materializer/loader to be separated. My rough thought is to put a custom write/read method on Output or OutputDefinition [TODO 1].
  2. Otherwise, use object_store and Address(path) to handle intermediates, with serialization_strategy or default pickling as the write/read method. This also relates to the potential refactor mentioned above, but it doesn't seem critical, so I'd like to punt it to the next iteration.
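The pairing argument in limitation 1 and the fallback in point 2 can be sketched together: if a write method and its read method are always built from one serialize/deserialize pair, a materializer/loader and a serialization strategy become the same kind of object. `IOPair`, `io_pair_from_serializer`, `DEFAULT_IO`, and `choose_io` are hypothetical names, and both routes are simplified to reading and writing a local file path; this is not how Dagster structures these types.

```python
import os
import pickle
import tempfile
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass(frozen=True)
class IOPair:
    """A write method and its matching read method, always supplied together."""
    write: Callable[[str, Any], None]
    read: Callable[[str], Any]


def io_pair_from_serializer(
    dumps: Callable[[Any], bytes], loads: Callable[[bytes], Any]
) -> IOPair:
    """Build file-based write/read from a serialize/deserialize pair (e.g. a serialization strategy)."""

    def write(path: str, obj: Any) -> None:
        with open(path, "wb") as f:
            f.write(dumps(obj))

    def read(path: str) -> Any:
        with open(path, "rb") as f:
            return loads(f.read())

    return IOPair(write=write, read=read)


# Fallback route (point 2): default pickling.
DEFAULT_IO = io_pair_from_serializer(pickle.dumps, pickle.loads)


def choose_io(config_value: Optional[dict], type_io: Optional[IOPair]) -> IOPair:
    """Point 1 wins when a config_value is present and the type supplies a pair; else point 2."""
    if config_value is not None and type_io is not None:
        return type_io
    return DEFAULT_IO
```

Bundling both callables in one frozen dataclass makes an unpaired materializer or loader unrepresentable, which is the property limitation 1 relies on.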

The set/get intermediate logic in intermediate_storage would then be:

  • write(address, obj) -> set_object event, via either
    • object_store.set_object(path, obj, serialization_strategy) [TODO 2: user can specify which object_store to use per output/input]
    • or dagster_type.materializer.materialize_runtime_values(context, config_value, obj) (get config_value via solid_config.outputs or Output of the current step)
  • read(address) -> obj and a get_object event, via either
    • object_store.get_object(path, serialization_strategy) [TODO 2: user can specify which object_store to use per output/input]
    • or dagster_type.loader.construct_from_config_value(context, config_value) (get config_value via solid_config.outputs or Output of the source step)
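A toy version of write/read that returns an event on every call and checks config_value before falling back to the path, following the priority described earlier. `Address`, `Event`, and `ToyIntermediateStorage` are hypothetical stand-ins (not the Address from D4579), and the materializer/loader route is simulated with a dictionary rather than a real dagster type.

```python
import pickle
from dataclasses import dataclass
from typing import Any, Dict, Optional, Tuple


@dataclass(frozen=True)
class Address:
    """Hypothetical: either an object-store path or a materializer/loader config_value."""
    path: Optional[str] = None
    config_value: Optional[str] = None


@dataclass(frozen=True)
class Event:
    """Hypothetical object-store operation event."""
    op: str  # "SET_OBJECT" or "GET_OBJECT"
    address: Address


class ToyIntermediateStorage:
    def __init__(self) -> None:
        self._object_store: Dict[str, bytes] = {}  # object_store route: path -> pickled bytes
        self._materialized: Dict[str, Any] = {}    # simulated materializer/loader target

    def write(self, address: Address, obj: Any) -> Event:
        if address.config_value is not None:
            self._materialized[address.config_value] = obj  # materializer route first
        elif address.path is not None:
            self._object_store[address.path] = pickle.dumps(obj)  # then object_store
        else:
            raise ValueError("address needs a path or a config_value")
        return Event("SET_OBJECT", address)

    def read(self, address: Address) -> Tuple[Any, Event]:
        if address.config_value is not None:
            obj = self._materialized[address.config_value]  # loader route first
        elif address.path is not None:
            obj = pickle.loads(self._object_store[address.path])
        else:
            raise ValueError("address needs a path or a config_value")
        return obj, Event("GET_OBJECT", address)
```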

2) cross-run intermediates based on events
Here we create new ObjectStoreOperationType values:

  • ObjectStoreOperationType.SET_EXTERNAL_OBJECT when an object is set externally (outside the intermediate dir)
  • ObjectStoreOperationType.GET_EXTERNAL_OBJECT when an object is retrieved externally (outside the intermediate dir)

We enable cross-run intermediate storage by using event logs (see instance.get_external_intermediates) to map step_output_handle -> address, which enables passing by reference between runs (e.g., re-execution).
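One way to derive the step_output_handle -> address mapping from event logs, sketched under the assumption that events are plain records carrying a run_id, an operation type, a handle, and an address string. `get_external_intermediates` below is a hypothetical stand-in for instance.get_external_intermediates (whose real signature is not shown in this diff), and the enum is a local copy rather than Dagster's ObjectStoreOperationType.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Dict, Iterable, NamedTuple


class ObjectStoreOperationType(Enum):
    # Local stand-in; only the two *_EXTERNAL_OBJECT members are named in this diff.
    SET_OBJECT = "SET_OBJECT"
    GET_OBJECT = "GET_OBJECT"
    SET_EXTERNAL_OBJECT = "SET_EXTERNAL_OBJECT"
    GET_EXTERNAL_OBJECT = "GET_EXTERNAL_OBJECT"


class StepOutputHandle(NamedTuple):
    step_key: str
    output_name: str


@dataclass(frozen=True)
class ObjectStoreEvent:
    run_id: str
    op: ObjectStoreOperationType
    handle: StepOutputHandle
    address: str


def get_external_intermediates(
    events: Iterable[ObjectStoreEvent], run_id: str
) -> Dict[StepOutputHandle, str]:
    """Map step_output_handle -> address for objects the run set outside the intermediate dir.

    Events are scanned in order, so if an output was set twice the later address wins.
    """
    mapping: Dict[StepOutputHandle, str] = {}
    for event in events:
        if event.run_id == run_id and event.op is ObjectStoreOperationType.SET_EXTERNAL_OBJECT:
            mapping[event.handle] = event.address
    return mapping
```

A re-execution run could look up its parent run's mapping and read each recorded address instead of recomputing the step.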

TODO

  1. Decouple dagster types from materializers/loaders. My rough thought is to put a custom write/read method on Output or OutputDefinition.
  2. Enable users to configure storage per output via object_store
    • Currently, users can configure system-level or pipeline-level storage, but there is no out-of-the-box support for storing intermediates in different storage within the same pipeline run.
    • For example, a user might want to persist the output of solid_1 to the local filesystem and upload the output of solid_2 to a dev S3 bucket.
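A sketch of what per-output storage could look like for the example above, with the output of solid_1 going to the local filesystem and the output of solid_2 going to a dev bucket. All classes are hypothetical; `FakeBucketStore` keeps objects in memory and only formats an `s3://` address, so no S3 client is involved.

```python
import os
import pickle
import tempfile
from typing import Any, Dict, Tuple


class LocalFsStore:
    """Persists pickled objects under a base directory."""

    def __init__(self, base_dir: str) -> None:
        self.base_dir = base_dir

    def set_object(self, key: str, obj: Any) -> str:
        path = os.path.join(self.base_dir, key)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "wb") as f:
            pickle.dump(obj, f)
        return path

    def get_object(self, key: str) -> Any:
        with open(os.path.join(self.base_dir, key), "rb") as f:
            return pickle.load(f)


class FakeBucketStore:
    """In-memory stand-in for a dev S3 bucket (no network calls)."""

    def __init__(self, bucket: str) -> None:
        self.bucket = bucket
        self.blobs: Dict[str, bytes] = {}

    def set_object(self, key: str, obj: Any) -> str:
        self.blobs[key] = pickle.dumps(obj)
        return f"s3://{self.bucket}/{key}"

    def get_object(self, key: str) -> Any:
        return pickle.loads(self.blobs[key])


class PerOutputStorage:
    """Routes each (solid, output) pair to its own store, falling back to a default store."""

    def __init__(self, default_store: Any, overrides: Dict[Tuple[str, str], Any]) -> None:
        self.default_store = default_store
        self.overrides = overrides

    def _store_for(self, solid: str, output: str) -> Any:
        return self.overrides.get((solid, output), self.default_store)

    def set_output(self, run_id: str, solid: str, output: str, obj: Any) -> str:
        key = f"{run_id}/{solid}/{output}"
        return self._store_for(solid, output).set_object(key, obj)

    def get_output(self, run_id: str, solid: str, output: str) -> Any:
        return self._store_for(solid, output).get_object(f"{run_id}/{solid}/{output}")
```

Keying overrides by (solid, output) keeps the pipeline-level default intact while letting individual outputs opt into a different store.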
Test Plan

bk

Diff Detail

Repository: R1 dagster
Lint: Lint OK
Unit: No Unit Test Coverage

Event Timeline

yuhan retitled this revision from input/output-address-operation-3 cross-run external_intermediates + pass by reference to address-configurable-intermediate-storage-3 cross-run external_intermediates + pass by reference.
yuhan edited the summary of this revision.
yuhan added reviewers: sandyryza, cdecarolis.
python_modules/dagster/dagster/core/execution/memoization.py, lines 65–66: TODO

python_modules/dagster/dagster/core/storage/intermediate_storage.py, lines 91–92: TODO

Harbormaster returned this revision to the author for changes because remote builds failed. (Oct 7 2020, 12:50 AM)
Harbormaster failed remote builds in B19241: Diff 23378!

Set external_intermediates on the intermediate storage at init time (for immutability).
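A minimal sketch of setting external_intermediates once at init time: the storage copies the mapping it is given and exposes a read-only view through `types.MappingProxyType`, so neither the caller nor later code can mutate it. The class name and the (step_key, output_name) handle shape are hypothetical.

```python
from types import MappingProxyType
from typing import Mapping, Optional, Tuple

StepOutputHandle = Tuple[str, str]  # hypothetical shape: (step_key, output_name)


class IntermediateStorageWithExternals:
    """Receives external_intermediates once, at construction, and exposes a read-only view."""

    def __init__(self, external_intermediates: Mapping[StepOutputHandle, str]) -> None:
        # Copy first so later changes to the caller's dict cannot leak in, then wrap read-only.
        self._external_intermediates = MappingProxyType(dict(external_intermediates))

    @property
    def external_intermediates(self) -> Mapping[StepOutputHandle, str]:
        return self._external_intermediates

    def address_for(self, handle: StepOutputHandle) -> Optional[str]:
        return self._external_intermediates.get(handle)
```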

Harbormaster returned this revision to the author for changes because remote builds failed. (Oct 7 2020, 6:22 AM)
Harbormaster failed remote builds in B19248: Diff 23387!
yuhan requested review of this revision. (Oct 7 2020, 6:41 AM)