
Lakehouse renovation

Authored by sandyryza on May 13 2020, 8:58 PM.



A rough start. simple_pyspark_lakehouse/ is the place to look to get a flavor for the API.

What a lakehouse is
A lakehouse is composed of:

  • Assets, each of which addresses an object in some durable store.
  • ComputedAssets, which are Assets with functions to derive them from other assets.
  • Storage defs, which are ResourceDefinitions, each of which defines a durable store that assets can live in.
  • TypeStoragePolicies, each of which defines how to translate between a storage def and an in-memory type.
  • PresetDefinitions

Some differences between the lakehouse model and the vanilla dagster model:

  • Unlike solids, assets know where they live.
  • Unlike solids, assets know what other artifacts they depend on - there's no separate step of hooking up inputs to outputs.
  • Unlike solids, assets don't know how to save or load their inputs or outputs. Saving and loading are a separate layer.
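To make the model concrete, here is a minimal plain-Python sketch of the core objects. The names follow the description above, but the attributes and signatures are illustrative stand-ins, not the actual lakehouse API:

```python
from typing import Callable, Sequence, Tuple


class Asset:
    """Addresses an object in some durable store."""

    def __init__(self, storage_key: str, path: Tuple[str, ...]):
        self.storage_key = storage_key  # which storage def the asset lives in
        self.path = path  # address within that storage


class ComputedAsset(Asset):
    """An Asset plus a function that derives it from other assets."""

    def __init__(self, storage_key: str, path: Tuple[str, ...],
                 input_assets: Sequence[Asset], compute_fn: Callable):
        super().__init__(storage_key, path)
        # dependencies are declared on the asset itself -- no separate
        # step of hooking up inputs to outputs
        self.input_assets = list(input_assets)
        self.compute_fn = compute_fn


# a source asset and an asset computed from it (hypothetical keys/paths)
raw_events = Asset(storage_key="filesystem", path=("raw", "events"))
daily_counts = ComputedAsset(
    storage_key="filesystem",
    path=("agg", "daily_counts"),
    input_assets=[raw_events],
    compute_fn=lambda events: len(events),
)
```

Note that saving and loading appear nowhere here — per the last bullet, persistence belongs to the storage defs and TypeStoragePolicies, not to the assets.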

The biggest piece that's missing from this revision is asset typing and metadata, e.g. defining the columns on a table artifact.

Interop with solids
This PR includes an experimental "SolidAsset" that lets an Asset be populated via a solid. This was inspired by trying to get bay_bikes working on lakehouse/.

Solid/asset-level I/O configurability
The lakehouse makes the storage definitions and TypeStoragePolicies responsible for all decisions about how to persist assets, and leaves no agency to individual solids in configuring where their inputs and outputs live.

If we can hold this line, I think it makes everyone's life a whole lot simpler. A big advantage of a lakehouse is cutting down on tables named things like "users_sandy_test_7".

That said, in development, it's often desirable for a pipeline to get its inputs from a production environment, but write its outputs / intermediates to a development environment. It might be worth supporting this use case directly, e.g. by enabling users to specify a parent lakehouse_environment when generating pipelines.
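The parent-environment idea could be sketched like this — a hypothetical helper, with `resolve_storage`, the environment names, and the "outputs this run produces" set all invented for illustration:

```python
def resolve_storage(asset_key, outputs_this_run, dev_env="dev", parent_env="prod"):
    """Pick which environment an asset is read from or written to.

    Assets produced by this run live in the dev environment; everything
    else is read from the parent (e.g. production) environment.
    """
    return dev_env if asset_key in outputs_this_run else parent_env


# a dev run that computes daily_counts reads raw_events from prod
outputs_this_run = {"daily_counts"}
prod_read = resolve_storage("raw_events", outputs_this_run)
dev_write = resolve_storage("daily_counts", outputs_this_run)
```

This keeps the "no per-solid I/O config" line intact: the split is decided once, at the environment level, when the pipeline is generated.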

Test Plan

none yet

Diff Detail

R1 dagster
Automatic diff as part of commit; lint not applicable.
Automatic diff as part of commit; unit tests not applicable.

Event Timeline

Harbormaster returned this revision to the author for changes because remote builds failed. May 20 2020, 5:50 PM
Harbormaster failed remote builds in B11626: Diff 14293!
Harbormaster returned this revision to the author for changes because remote builds failed. May 21 2020, 12:59 AM
Harbormaster failed remote builds in B11693: Diff 14378!
  • get examples tests passing
Harbormaster returned this revision to the author for changes because remote builds failed. May 21 2020, 4:41 PM
Harbormaster failed remote builds in B11718: Diff 14405!
Harbormaster returned this revision to the author for changes because remote builds failed. May 21 2020, 5:23 PM
Harbormaster failed remote builds in B11731: Diff 14418!
  • artifact -> asset
  • simplify environments
  • cleanup
  • take out fancy type annotations
  • saver_loader -> adapter
  • tests and doc
  • get examples tests passing
  • remove from Dockerfile
  • simple_pyspark_lakehouse -> simple_lakehouse
  • messages for saver_loader check invariants
  • test_house
  • get example working
Harbormaster returned this revision to the author for changes because remote builds failed. May 21 2020, 7:06 PM
Harbormaster failed remote builds in B11744: Diff 14436!
  • black and update examples snapshots


  • derived assets --> computed assets
  • instead of kwargs_deps call it input_assets. may also change type signature
  • eliminate environment. instead have a helper function that creates a mode and a preset simultaneously. create_lakehouse_preset or similar

consider storage policy instead of storage adapter

ya i think those last few naming bits are the only issues i can see

This revision now requires changes to proceed. May 22 2020, 5:38 PM

nick and alex naming feedback

Ok this is looking awesome. Most of my feedback is pretty minor. I'm req'ing changes mostly to get two questions answered:

  1. I'm confused about the PythonObjectDagsterType. Want to make sure i understand the system.
  2. Concerned about intermediates with your autogenerated composite.

Thanks for bearing with all the feedback. So excited about this direction.


so input_assets get correlated to arguments just via ordering i presume


Field is superfluous btw. Can just be

@resource(config={'bucket': str, 'prefix': str})


Actually let me use this new crazy suggest edit feature?!


checks could be nice throughout



can you make a comment as to what is going on here?


could be interesting to push boto3 interactions behind a resource for testability


not sure how i feel about operator overloading for this. Seems like explicitness would be better. Unless I am missing something


why this invariant? seems like it could be an arbitrary type?


I think this would be more clear if you stored the sole output definition (e.g. self._output_def) in __init__. I was a bit confused until I went back up to __init__ and saw the invariant around number of outputs.


could be nice to have this incantation in one shared place. This is the third instance of it in the codebase


ah here's the answer to my question. this is worth documenting for sure. makes a bunch of sense though. I think this is a good compromise


a concrete code sample in docs would be very useful


hmmm seem to be generating a lot of intermediates


So is this going to be a dataframe or a pointer to a dataframe? If this is the dataframe itself, then the use of the composite is, I think, problematic, because we will be writing out the dataframes a lot


output_saver seems like a vestige of old naming

This revision now requires changes to proceed. May 26 2020, 2:34 AM

@schrockn thanks for all the feedback. All reasonable. Regarding your main questions - it looks like they're about the SolidAsset, which I threw in on Friday and is a bit half-baked. It could make most sense to leave that for a separate revision?

Answers to your questions though:

I'm confused about the PythonObjectDagsterType. Want to make sure i understand the system.

With PythonObjectDagsterType, we can get the Python type from the dagster type and verify at definition time that we have a TypeStoragePolicy for it (which is what we do for ComputedAssets). This is of course limiting - e.g. it means we can't use PandasDataFrameDagsterType. It's probably more consistent with the rest of Dagster not to be that strict - I'll make it more permissive unless you object.
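The definition-time check being described can be sketched as follows — `validate_computed_asset` and the policy shape are illustrative, not the code in this diff:

```python
class TypeStoragePolicy:
    """Maps an in-memory Python type to save/load logic for one storage."""

    def __init__(self, python_type):
        self.python_type = python_type


def validate_computed_asset(output_python_type, policies):
    """Fail fast at definition time if no policy can persist the output type.

    This is only possible when the dagster type exposes its underlying
    Python type (as types built with PythonObjectDagsterType do); opaque
    types like PandasDataFrameDagsterType can't be checked this way.
    """
    if not any(p.python_type is output_python_type for p in policies):
        raise TypeError(
            "no TypeStoragePolicy registered for %s"
            % output_python_type.__name__
        )


policies = [TypeStoragePolicy(dict)]
validate_computed_asset(dict, policies)  # ok: a policy covers dict
try:
    validate_computed_asset(list, policies)
except TypeError:
    pass  # caught at definition time, not mid-run
```

Making the system "more permissive" would mean skipping this check when the Python type isn't recoverable, deferring the failure to save/load time.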

Concerned about intermediates with your autogenerated composite.

Yeah, I had that concern as well. Do you have any thoughts on an approach? I considered introspecting the solid's compute_fn and constructing a new one, but that wouldn't work for composite solids. As you brought up, if we could pass around a pointer, that would help, but Spark doesn't make that easy (DataFrames are backed by Java objects so pretty sure can't be pickled). A third idea would be to give users some utilities that make it easy to define a solid that writes to a Lakehouse storage using a Lakehouse TypeStoragePolicy - this would be more performant but a little less elegant.
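The third idea — utilities for writing to Lakehouse storage from inside a solid — might look roughly like this; `InMemoryStorage`, `save_to_lakehouse`, and the save function are all hypothetical stand-ins:

```python
class InMemoryStorage:
    """Stand-in for a storage def; real ones would hit S3, a warehouse, etc."""

    def __init__(self):
        self.objects = {}


def save_to_lakehouse(storage, policy_save_fn, path, obj):
    """Persist obj at path via the storage's TypeStoragePolicy save logic.

    The solid then passes the path (a pointer) downstream instead of the
    object itself, so intermediates aren't re-materialized at each boundary.
    """
    policy_save_fn(storage, path, obj)
    return path


storage = InMemoryStorage()
save = lambda s, p, o: s.objects.__setitem__(p, o)  # toy policy save fn
pointer = save_to_lakehouse(storage, save, ("agg", "counts"), {"n": 3})
```

This trades elegance for performance, as noted: the solid author has to call the utility explicitly rather than letting the framework handle persistence.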

It feels like, in an ideal world, solid boundaries wouldn't always need to require materializations? I think I remember you or Alex bringing this up before - would require some way for users to dictate that a subdag of solids all execute in the same process. Obviously out of scope for this PR, but if that's on the horizon it could solve this problem.


Oops this is a relic from some wacky sugar I was playing around with. Taking it out.

sandyryza marked 10 inline comments as done.
  • more lakehouse

remove unrelated changes and fix tests

Ok great. make sure to look at (and ideally test) that bug where (i think) you are assigning a tuple to a string


so we are making lakehouse py3 only?


not sure we want to be assigning to so many globals in an example


eliminate duplicate in docs


eek this looks like a nasty bug

sandyryza added inline comments.

as discussed on slack, will handle py2 support in a separate revision.


wrapped in a function


It's a subtle non-dupe - the first gives input_assets as a list, and the second gives it as a dict.


if I'm not misunderstanding what you observed, I believe this is doing the right thing - path is a Tuple[str, ...], which is checked in ComputedAsset's __init__. lakehouse/ tests both specifying a path and not specifying a path.

arguably it might be simpler for path to be a string - the reason it's a tuple is so that the storage definition can decide how it resolves to a path in the specific storage - e.g. databases do namespacing differently than filesystems do.
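For example, the same path tuple could resolve differently per storage — two hypothetical resolvers, one filesystem-flavored and one database-flavored:

```python
def filesystem_path(root, path_tuple):
    """A filesystem storage joins the tuple into a directory path."""
    return "/".join((root,) + tuple(path_tuple))


def database_table(path_tuple):
    """A database storage treats the tuple as a schema-qualified name."""
    return ".".join(path_tuple)


path = ("analytics", "daily_counts")
fs_location = filesystem_path("/data", path)   # "/data/analytics/daily_counts"
db_location = database_table(path)             # "analytics.daily_counts"
```

Keeping path as a tuple lets each storage def own this resolution; a plain string would bake one namespacing convention into the asset itself.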




oic. maybe call it path_tuple or something

sandyryza marked an inline comment as done.

fewer globals

@alangenfeld you requested changes on an earlier version - do you still have any reservations?

fine place to start - some edges to polish as we use it a bit and understand the ergonomics better


if you had a long set of assets & corresponding inputs, ordering-only matching could be a little dicey

wonder if some amount of prefix match would be a reasonable restriction


how does this look in dagit with these __ prefixed names?

This revision is now accepted and ready to land. May 27 2020, 9:44 PM

input_assets accepts either a list or a dict. once you have a lot, a dict is probably more clear/less error-prone.
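One way to support both forms is to normalize to a dict keyed by the compute function's parameter names — a sketch using `inspect.signature`; the actual handling in the diff may differ:

```python
import inspect


def normalize_input_assets(compute_fn, input_assets):
    """Accept input_assets as a list (matched to parameters by position) or
    as a dict (matched by parameter name); return a name -> asset mapping."""
    params = list(inspect.signature(compute_fn).parameters)
    if isinstance(input_assets, dict):
        unknown = set(input_assets) - set(params)
        if unknown:
            raise ValueError("unknown parameters: %s" % sorted(unknown))
        return dict(input_assets)
    if len(input_assets) != len(params):
        raise ValueError("wrong number of input assets for compute function")
    return dict(zip(params, input_assets))


def make_report(raw_events, calendar):  # a stand-in compute function
    ...


by_order = normalize_input_assets(make_report, ["a1", "a2"])
by_name = normalize_input_assets(make_report, {"calendar": "a2", "raw_events": "a1"})
```

The dict form also makes the reviewer's concern above moot: with many inputs, matching by name can't silently pair the wrong asset with the wrong argument.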


This revision was automatically updated to reflect the committed changes.