Implements Flyte Type System mapping
ClosedPublic

Authored by jordanbramble on Wed, May 13, 12:51 AM.

Details

Summary

This implements type system support for scalar types on inputs and outputs. It also adds a dependency on forge, a metaprogramming library that feels a bit out of date; however, I believe it will get us to a demo faster.

This executes multi-step DAGs in Flyte successfully and captures the inputs and output results.
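
For reference, the scalar mapping amounts to a small lookup table from dagster's builtin scalars to flytekit's Types. A minimal sketch (the table name, keying by dagster type key, and exact coverage are illustrative, and the import assumes the pre-1.0 flytekit SDK):

```
from flytekit.sdk.types import Types

# Illustrative scalar mapping, keyed by the dagster type's key.
# Anything not in this table has no Flyte equivalent yet.
DAGSTER_TO_FLYTE_SCALARS = {
    'Int': Types.Integer,
    'Float': Types.Float,
    'String': Types.String,
    'Bool': Types.Boolean,
}
```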

Test Plan

First, run Flyte

  1. $ cd python_modules/libraries/dagster-flyte/examples
  2. $ make docker_build
  3. $ docker run --network host -e FLYTE_PLATFORM_URL='127.0.0.1:30081' {{ your docker image }} pyflyte -p dagstertest -d development -c sandbox.config register workflows

Diff Detail

Repository
R1 dagster
Branch
flyte_inputs_outputs (branched from master)
Lint
Lint OK
Unit
No Unit Test Coverage

Event Timeline

jordanbramble marked 3 inline comments as done.

addresses feedback, uses key instead of display_name for type hashing

jordanbramble added inline comments.Fri, May 15, 7:27 PM
python_modules/libraries/dagster-flyte/dagster_flyte/flyte_compiler.py
244–249

should we throw an error if a result is returned that is not in the step_output_dict, or just ignore?

252–257

Yes, if you add inputs or outputs to the task, it will throw an error if those aren't also specified in the signature. So using *args, **kwargs is not sufficient here.

I think using forge is fine for a demo, but I should probably re-implement since this library hasn't been worked on in 2 years.
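
For context, the constraint is that Flyte introspects the wrapped function's signature, so the generated task needs real named parameters. A minimal sketch of a forge-free alternative using only the standard library (the helper name is hypothetical, and whether Flyte's introspection honors __signature__ is an assumption):

```
import inspect

def with_named_params(fn, param_names):
    """Advertise an explicit signature with the given parameter names so
    signature introspection sees real named inputs/outputs instead of
    *args/**kwargs."""
    def wrapper(**kwargs):
        return fn(**kwargs)

    wrapper.__name__ = fn.__name__
    # Keyword-only parameters, to match the kwargs-only call convention above.
    wrapper.__signature__ = inspect.Signature(
        [inspect.Parameter(name, inspect.Parameter.KEYWORD_ONLY)
         for name in param_names]
    )
    return wrapper
```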

python_modules/libraries/dagster-flyte/dagster_flyte_tests/test_dagster_flyte.py
11–23

Type annotations are required until we come up with an approach for supporting the _Any type; Flyte currently has no analogous type. For now this throws an error saying that the Any type cannot be used with Flyte.

I mean, it used to work. It seems like we should build on top of that rather than ripping it out.

schrockn requested changes to this revision.Fri, May 15, 8:06 PM

Summarize our convo:

  1. Need to call execute plan, not the execution function
  2. Should support any type in dagster, and bypass the flyte type system
  3. Always persist everything to our intermediate store, for simplicity.

Critical to be able to demonstrate dagit side-by-side with flyte in a prod context.

This revision now requires changes to proceed.Fri, May 15, 8:06 PM

To be clear on #2: you're not saying to bypass the Flyte type system entirely, which I was doing before. You're just saying that for non-scalar types we should preserve the original functionality and bypass it, correct? Ketan has said that we must have the type system on scalars for a demo, because caching is one of their top use cases. If you just mean still having a way to support non-scalar types by defaulting to what I used to do, I can totally do that.

Roger that on 1, 3.

addresses type system feedback, allows definition of arbitrary DAG, as well as inputs from multiple outputs

jordanbramble marked an inline comment as done.Wed, May 20, 8:49 AM
jordanbramble added inline comments.
python_modules/libraries/dagster-flyte/dagster_flyte/flyte_compiler.py
74

Flyte requires a type for the output of every task but has a separate concept of Outputs resulting at the end of a workflow. I currently have every task generate an Output.
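
Roughly, in the pre-1.0 flytekit SDK that looks like the sketch below; the decorator and class names follow flytekit 0.x docs, and the workflow is illustrative rather than what the compiler generates:

```
from flytekit.sdk.tasks import inputs, outputs, python_task
from flytekit.sdk.types import Types
from flytekit.sdk.workflow import workflow_class, Input, Output

@inputs(num=Types.Integer)
@outputs(result=Types.Integer)
@python_task
def add_one(wf_params, num, result):
    # Every task output is typed...
    result.set(num + 1)

@workflow_class
class ExampleWorkflow(object):
    start = Input(Types.Integer, default=1)
    added = add_one(num=start)
    # ...and workflow-level Outputs are declared separately.
    final = Output(added.outputs.result, sdk_type=Types.Integer)
```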

229

WIP. Still enabling use of the intermediate store/manager with the execution_plan_iterator.

258

I believe this is easy to handle.

273

This is important to call out. When a StepInput has a source type of Multiple Outputs, the StepOutputHandles in the source_handles can correspond to StepOutputs with different dagster_types. AFAICT, Flyte requires you to specify this as [Types.Integer]. This requires all elements of the list to have the same type.

We can definitely improve upon this. For now I am requiring the StepOutputs to be of the same type in order to use them with source_type of Multiple_Outputs as input to another task.
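
In sketch form, that restriction is just a homogeneity check before mapping onto a Flyte list type (the dict shape and attribute names here are assumptions, not the actual compiler code):

```
def single_dagster_type_key(source_handles, step_output_dict):
    """Fan-in inputs are only supported when every source output shares one
    dagster type, since Flyte expresses them as a homogeneous list type such
    as [Types.Integer]."""
    keys = {step_output_dict[handle].dagster_type.key for handle in source_handles}
    if len(keys) != 1:
        raise ValueError(
            'Inputs sourced from multiple outputs must share a single dagster '
            'type to map onto a Flyte list type; got {}'.format(sorted(keys))
        )
    return keys.pop()
```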

367

Flyte does not seem to have a Union type at this time that would make this possible.

python_modules/libraries/dagster-flyte/dagster_flyte_tests/test_dagster_flyte.py
36

would be good to test this with lambda_solid as well.

removes raising an error on caching without flyte types

  1. This should be able to compile arbitrary DAGs into a corresponding representation in Flyte.
  2. This also works with solids that have StepInputs with source_types from Multiple Outputs.
  3. There are a few type system quirks addressed in comments.
  4. I have addressed the Any type feedback. However see my note on this below.
  5. I am planning to have the execution_plan and Intermediate store work done Wednesday night.

Currently, if a dagster_type cannot be mapped to a Flyte type, we don't add that input/output to the task/workflow. Once the execution plan and intermediate store feedback is addressed, those cases will bypass the Flyte type system and rely purely on what is contained in the execution plan. However, I think there is a decent argument for requiring users to use type annotations: (a) flyte/flytekit users already have to type everything, and (b) these typed inputs/outputs are what caching and versioning rely on, which is a very important use case according to Ketan. That said, my current implementation should work fine.
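
The skip-if-unmappable behavior described above is roughly the following (a sketch; the attribute names on steps and step inputs are assumptions):

```
def flyte_inputs_for_step(step, scalar_table):
    """Build the Flyte input signature for one execution step, leaving out
    anything whose dagster type has no Flyte equivalent."""
    flyte_inputs = {}
    for step_input in step.step_inputs:
        flyte_type = scalar_table.get(step_input.dagster_type.key)
        if flyte_type is None:
            # Unmappable type: omit it from the Flyte signature; the value
            # still flows through the execution plan / intermediate store.
            continue
        flyte_inputs[step_input.name] = flyte_type
    return flyte_inputs
```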

Another thing that I haven't quite figured out: Flyte tasks should be independently executable. If we rely 100% on Flyte's type system for inputs and outputs and use the ExecutionStep's compute_fn, then we can achieve this pretty easily, though we would lose the use of the Dagit UI (for now). It seems that I will need to recreate some of the logic in inner_plan_iterator while having the wrapper invoke something similar to _dagster_event_sequence_for_step in execute_plan.py, after the inputs have been entered into the intermediate store. I don't think we will be able to independently execute these tasks in that case, though, will we?

In order for the Dagit UI to work with Flyte, we will need to either run the Dagit server inside the container that is sent to Flyte, or have the pipeline write to an external event log, correct?

jordanbramble added inline comments.Wed, May 20, 9:35 AM
python_modules/libraries/dagster-flyte/dagster_flyte/flyte_compiler.py
357

we should arguably pull some of these functions out of the FlyteCompiler and into utility functions.

schrockn requested changes to this revision.Wed, May 20, 2:51 PM

queue management. have some style feedback, but the big thing is getting execute_plan invoked rather than the compute fn

python_modules/libraries/dagster-flyte/dagster_flyte/flyte_compiler.py
43–49

generally prefer not doing real work in __init__ but nbd. would recommend constructing the execution plan elsewhere (e.g. in __call__)
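
i.e. something along these lines (a sketch; the class internals, the helper name, and the create_execution_plan import path are assumptions):

```
from dagster import create_execution_plan  # import path assumed for this era

class FlyteCompiler(object):
    def __init__(self, pipeline):
        # Keep __init__ cheap: just record the pipeline definition.
        self.pipeline = pipeline

    def __call__(self, **kwargs):
        # Do the real work lazily, when the compiler is invoked.
        execution_plan = create_execution_plan(self.pipeline)
        return self._build_flyte_workflow(execution_plan, **kwargs)

    def _build_flyte_workflow(self, execution_plan, **kwargs):
        # Hypothetical helper standing in for the actual compilation logic.
        raise NotImplementedError
```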

207–213

req'ing changes to do queue management. would like to get this implemented in terms of execution plan execution and then get this merged so we can iterate

280–283

recommend multiple except blocks instead of isinstance check
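
For illustration (the exception types and handlers here are stand-ins, not the code in the diff):

```
def resolve_output(name):        # stand-in
    return {'out': 1}[name]

def handle_missing_output(exc):  # stand-in
    print('missing output:', exc)

def handle_bad_value(exc):       # stand-in
    print('bad value:', exc)

# One except block plus an isinstance() check:
try:
    result = resolve_output('out')
except (KeyError, ValueError) as exc:
    if isinstance(exc, KeyError):
        handle_missing_output(exc)
    else:
        handle_bad_value(exc)

# Preferred: one except block per case.
try:
    result = resolve_output('out')
except KeyError as exc:
    handle_missing_output(exc)
except ValueError as exc:
    handle_bad_value(exc)
```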

340–347

a little odd that the inputs dictionary that you manage contains both Inputs and StepOutputHandles. seems like it should be two separate dicts that build the single flyte dict
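
i.e. roughly (names are placeholders):

```
# Keep the two kinds of sources in separate dicts...
config_inputs = {}     # input name -> Input fed from config
upstream_inputs = {}   # input name -> StepOutputHandle from another step

# ...and only merge them at the point where Flyte needs a single mapping.
flyte_inputs = dict(config_inputs)
flyte_inputs.update(upstream_inputs)
```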

python_modules/libraries/dagster-flyte/dagster_flyte_tests/test_dagster_flyte.py
36

lambda_solid is not materially different; don't worry about it

69

if you name "context" "_" you don't have to suppress the pylint warning
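
i.e. (a sketch using dagster's solid decorator):

```
from dagster import solid

@solid
def emit_one(_):
    # Unused context named "_": no pylint unused-argument suppression needed.
    return 1
```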

This revision now requires changes to proceed.Wed, May 20, 2:51 PM
jordanbramble marked an inline comment as done.

use execute plan and intermediates manager - save

jordanbramble marked 4 inline comments as done.Fri, May 22, 8:37 PM
jordanbramble added inline comments.
python_modules/libraries/dagster-flyte/dagster_flyte/flyte_compiler.py
184

@schrockn Flyte expects to be able to parameterize a task with multiple values of an input (through pyflyte or the Flyte console). For user inputs that originate from config (environment_dict, etc.), we cannot overwrite these in the StepInputs, since the config_data dict is ReadOnly. Would it make sense to create functionality for StepInput to update the values of its config data? If not, what might be another solution to enable this functionality in Flyte?

jordanbramble marked 2 inline comments as done.Fri, May 22, 8:37 PM

I am still testing this on multi-step pipelines on Flyte infra. Expect a few more updates.

jordanbramble added inline comments.Fri, May 22, 9:51 PM
python_modules/libraries/dagster-flyte/dagster_flyte/flyte_compiler.py
113

It looks like I can delete this, and replace with step_context.intermediates_manager.set_intermediate(). Will update this.

adds dagit to setup.py - save

remove unnecessary intermediates_manager creation - save

make flyte tox runs more verbose

remove dagster_aws from setup.py

  • running on AWS KOPS cluster
  • update output_name in compiler
  • remove test file
  • defaultdict for compute_dict
  • removes ECR specific config
jordanbramble edited the summary of this revision. (Show Details)Mon, May 25, 6:16 AM

I have this working with multi-step DAGs using execute plan while retrieving and storing values in the intermediate store.
I will test this with an S3 Intermediate Store. We should also figure out an architecture for allowing config data for StepInputs to be overridden by Flyte.
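
At a high level, the per-task wrapper described here does something like the sketch below. The import paths, the run-creation call, and the execute_plan signature are assumptions based on roughly this era of dagster, not the code in the diff:

```
from dagster import DagsterInstance
from dagster.core.execution.api import create_execution_plan, execute_plan

def run_step_in_flyte_task(pipeline_def, step_key, environment_dict):
    # Each Flyte task executes exactly one dagster execution step; values move
    # between tasks through the configured intermediate store.
    instance = DagsterInstance.get()
    execution_plan = create_execution_plan(pipeline_def, environment_dict)
    pipeline_run = instance.create_run_for_pipeline(
        pipeline_def=pipeline_def, environment_dict=environment_dict
    )
    return execute_plan(
        execution_plan.build_subset_plan([step_key]),
        instance=instance,
        pipeline_run=pipeline_run,
        environment_dict=environment_dict,
    )
```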

python_modules/libraries/dagster-flyte/dagster_flyte/flyte_compiler.py
184

I think this is the final thing we should add here.

304

Still debugging. This works fine with filesystem storage but the intermediate store values are empty when this is used for InMemory storage.

jordanbramble edited the summary of this revision. (Show Details)
  • fix docker_build.sh to pass CI
schrockn accepted this revision.Tue, May 26, 1:59 AM

cool. this is a great start. let's get this merged. really excited to chat with the flyte team this week!

python_modules/libraries/dagster-flyte/README.md
36

Flyte

python_modules/libraries/dagster-flyte/dagster_flyte/flyte_compiler.py
187–194

👍🏻

203

can you comment about what the "in kwargs" clause does here

also wrapping this in a function that has a helpful name would make this more clear

In reality these conditions look overly defensive and may in fact mask bugs. They look more like invariants to me

This revision is now accepted and ready to land.Tue, May 26, 1:59 AM
  • move task output assignment into separate function
jordanbramble marked an inline comment as done.Tue, May 26, 6:30 AM
jordanbramble added inline comments.
python_modules/libraries/dagster-flyte/dagster_flyte/flyte_compiler.py
203

Good feedback; this was masking bugs. I've removed it and broken the logic out into a separate function. If the output is contained in the step_output_dict but not in kwargs, then we should just fail with a KeyError. Flyte requires you to specify both the inputs and the outputs in the function signature.
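
In sketch form, the broken-out function is something like this (names and the shape of step_output_dict are illustrative):

```
def set_task_outputs(step_output_dict, kwargs):
    """Copy computed step output values onto Flyte's output parameters.

    No "name in kwargs" guard: if an output exists in the plan but Flyte did
    not pass a matching parameter, kwargs[name] raises KeyError and surfaces
    the bug instead of masking it."""
    for name, value in step_output_dict.items():
        kwargs[name].set(value)
```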

This revision was automatically updated to reflect the committed changes.