This is basically a complete overhaul of how asset lineage works. Names and organization are a little messy around the edges at the moment, but this is just a proof of concept.
Organizing the code this way has two major benefits:
- All the logic for determining the parents of what I'm calling an "AssetLineageNode" now lives in one place, the AssetDependencyGraph class. For now, these nodes are just solid inputs/outputs, but in the future they could include things generated within the solid body.
- Transitive lineage can now be calculated through arbitrarily many links (even with dynamic outputs in the mix! even with execution happening in multiple processes!). Those last two bits were the hard parts.
If we only ever needed to calculate transitive lineage on a pipeline that is being executed in a single process, things are actually fairly simple. As the pipeline runs, we build up this AssetDependencyGraph by adding step inputs/outputs as we encounter them.
When adding a step_input, we first check whether it encodes any direct lineage info (basically saying "I am reading from X asset"). If it does, we store that as the lineage info for the input node. Then we look at all the upstream output nodes for this input node (which we know will already have been processed, because the pipeline executes in topological order) and take the union of their lineage as the set of dependencies for this node.
When adding a step_output, we again check whether it encodes any direct lineage info (saying "I am writing to X asset"). The process is similar to the one for input nodes, but this time the parent nodes for the output are all of the input nodes of the step that the output belongs to (this is the place where we could swap in more customizable logic). Because we process inputs before outputs, all the parent nodes are guaranteed to exist in the AssetDependencyGraph when we look them up.
Now, whenever we want to know what set of assets any node in the graph depends on (as long as it has been processed), we can just do a lookup.
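To make the single-process case concrete, here is a minimal sketch of the bookkeeping described above. It is not the code in this PR: `LineageGraphSketch`, `NodeLineage`, the tuple-shaped node keys, and the method signatures are all hypothetical names chosen for illustration. It also assumes that a node passes both its own direct asset and its inherited assets on to its children.

```python
from dataclasses import dataclass
from typing import Dict, FrozenSet, Iterable, Optional, Tuple

# Hypothetical node key: (step_key, "input" or "output", name).
NodeKey = Tuple[str, str, str]


@dataclass(frozen=True)
class NodeLineage:
    direct: Optional[str]      # asset this node reads from / writes to, if any
    upstream: FrozenSet[str]   # assets reached transitively through parent nodes

    def propagated(self) -> FrozenSet[str]:
        # Assumption: children inherit the node's own asset plus everything upstream.
        own = frozenset([self.direct]) if self.direct is not None else frozenset()
        return self.upstream | own


class LineageGraphSketch:
    """Toy stand-in for AssetDependencyGraph (illustrative only)."""

    def __init__(self) -> None:
        self._nodes: Dict[NodeKey, NodeLineage] = {}

    def _add(self, key: NodeKey, direct: Optional[str], parents: Iterable[NodeKey]) -> None:
        upstream: FrozenSet[str] = frozenset()
        for parent in parents:
            # A KeyError here would mean a parent was not processed first,
            # i.e. the topological-order guarantee was violated.
            upstream |= self._nodes[parent].propagated()
        self._nodes[key] = NodeLineage(direct, upstream)

    def add_step_input(self, step_key: str, name: str,
                       upstream_outputs: Iterable[NodeKey],
                       reads_asset: Optional[str] = None) -> None:
        # Parents of an input are the upstream output nodes feeding it.
        self._add((step_key, "input", name), reads_asset, upstream_outputs)

    def add_step_output(self, step_key: str, name: str,
                        step_input_names: Iterable[str],
                        writes_asset: Optional[str] = None) -> None:
        # Parents of an output are all inputs of the same step.
        parents = [(step_key, "input", n) for n in step_input_names]
        self._add((step_key, "output", name), writes_asset, parents)

    def upstream_assets(self, key: NodeKey) -> FrozenSet[str]:
        # Plain lookup once the node has been processed.
        return self._nodes[key].upstream


graph = LineageGraphSketch()
graph.add_step_input("load", "path", upstream_outputs=[], reads_asset="raw_table")
graph.add_step_output("load", "result", step_input_names=["path"])
graph.add_step_input("transform", "df", upstream_outputs=[("load", "output", "result")])
graph.add_step_output("transform", "out", step_input_names=["df"], writes_asset="clean_table")
```

In this toy run, the `transform` output writes to `clean_table` and its upstream set contains `raw_table`, picked up through two links (input, then upstream output, then that step's input).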
The problem comes in when you are executing steps in multiple processes. Obviously, they can't share this in-memory AssetDependencyGraph. However, all a step really needs in order to calculate its lineage is the lineage of its upstream outputs. So as long as we can somehow pass along the upstream output lineage for any given step, we can reconstruct enough of the AssetDependencyGraph to make things work.
This is where KnownState comes in. Right now, I'm storing the propagated lineage information of every processed output node in this known state object. This is overkill: with a bit of cleverness, we can delete entries from this state as upstream outputs become irrelevant (concretely, when all of their downstream inputs have been processed). This optimization should keep the object at a reasonable size (it might be a little rough to be serializing thousands of these lineage info things).
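One way the pruning idea could look, as a rough sketch only: `LineageStateSketch`, its string node ids, and the JSON serialization are hypothetical and are not the KnownState object in this PR. The sketch also assumes the downstream inputs of each output are known up front, which does not hold as-is for dynamic outputs.

```python
import json
from typing import Dict, Iterable, List, Set


class LineageStateSketch:
    """Toy serializable store of propagated output lineage (illustrative only)."""

    def __init__(self, downstream_inputs: Dict[str, Set[str]]) -> None:
        # output id -> ids of downstream inputs that have not been processed yet
        self._pending: Dict[str, Set[str]] = {
            out: set(ins) for out, ins in downstream_inputs.items()
        }
        # output id -> sorted list of asset keys (lists keep this JSON-friendly)
        self._lineage: Dict[str, List[str]] = {}

    def record_output(self, output_id: str, assets: Iterable[str]) -> None:
        # Only store lineage that some downstream input still needs.
        if self._pending.get(output_id):
            self._lineage[output_id] = sorted(assets)

    def mark_input_processed(self, output_id: str, input_id: str) -> None:
        pending = self._pending.get(output_id, set())
        pending.discard(input_id)
        if not pending:
            # Every consumer has read this lineage, so it can be dropped.
            self._lineage.pop(output_id, None)

    def serialize(self) -> str:
        # This string is what would be handed to another process.
        return json.dumps(self._lineage, sort_keys=True)


state = LineageStateSketch({"load.result": {"a.df", "b.df"}})
state.record_output("load.result", {"raw_table"})
state.mark_input_processed("load.result", "a.df")
after_first = json.loads(state.serialize())   # entry still present, b.df needs it
state.mark_input_processed("load.result", "b.df")
after_second = json.loads(state.serialize())  # entry pruned
```

Tracking the set of pending consumers per output keeps the deletion rule local: an entry goes away exactly when its last downstream input has been processed, so the serialized state only grows with the number of outputs that are still "live".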
I added some fields to the handled_output event in order to allow the ActiveExecution object to build this information up, but it would also be possible to move this into its own separate event.