
Flytekit Proof of Concept

Authored by jordanbramble on Apr 13 2020, 10:06 PM.



For an architecture diagram that explains the logic behind the DagsterFlyteSdkWorflow class please review.

This diff implements a first pass of the class shaded in blue, which inherits from a flytekit workflow. I have done this in an object-oriented fashion, but also provided a function that encapsulates everything. Please let me know if people would prefer something less object-oriented here.

Currently, this iteration bypasses Flyte's type system as well as its input and output promises. As a result, it only works when all of the input and output config is provided via Dagster. As long as that holds, I believe Flyte can run arbitrary configurations of our DAGs.

In a future diff we can map the input/output promises as well as types. The flyte team is willing to offer some guidance on working with their type system.

Test Plan

pytest python_modules/libraries/dagster-flyte/dagster_flyte_tests/

In order to test in kubernetes do the following:

  1. Follow the Installation guide here.
  2. minikube service --alsologtostderr -v=3 contour -n heptio-contour
  3. minikube ssh
  4. You will need to install git and potentially a text editor on your minikube instance.
  5. Copy up the examples directory contained in the dagster-flyte library.
  6. curl -X POST localhost:30081/api/v1/projects -d '{"project": {"id": "dagstertest", "name": "dagstertest"} }'
  7. make docker_build
  8. Run docker image ls and grab the ID of the image you just built.
  9. docker run --network host -e FLYTE_PLATFORM_URL='' {{ your docker image }} pyflyte -p dagstertest -d development -c sandbox.config register workflows
  10. Navigate to , find your workflow and launch it!

Diff Detail

R1 dagster
Automatic diff as part of commit; lint not applicable.
Automatic diff as part of commit; unit tests not applicable.

Event Timeline


flytekit version 0.6.2

removes rogue typehint

calls super in a python2.7 compatible way

add a CI test, that builds an example flyte container

jordanbramble added inline comments.Apr 28 2020, 6:41 PM

In a future diff, I will figure out something better here


I think the thing to do here is replace the wrapper function, with a class that inherits from SdkRunnableTask.

tabs instead of spaces in makefile :(

newline between commands

nate added inline comments.Apr 29 2020, 12:04 AM

hmmm what's this about?


Can we expose these SdkRunnableTask args to a Dagster user somehow? most likely should be done using solid tags to avoid any API changes on our side
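One way the tag idea could look, as a minimal sketch only: the "flyte/" prefix and the tag names below are hypothetical and are not an existing Dagster or flytekit convention. The helper just collects prefixed solid tags into a kwargs dict that could later be forwarded to the task constructor; values stay strings, as tags are, so any type conversion would still be needed.

```python
FLYTE_TAG_PREFIX = "flyte/"  # hypothetical prefix, chosen only for this sketch


def flyte_task_kwargs_from_tags(tags):
    """Collect tags that start with the prefix into a kwargs dict."""
    return {
        key[len(FLYTE_TAG_PREFIX):]: value
        for key, value in tags.items()
        if key.startswith(FLYTE_TAG_PREFIX)
    }


# Hypothetical solid tags; unprefixed tags are ignored.
solid_tags = {"flyte/retries": "3", "flyte/memory_request": "1Gi", "owner": "data-team"}
task_kwargs = flyte_task_kwargs_from_tags(solid_tags)
```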


Can you add this explanation as a comment in a docstring for this function?

Also worth using dagster.check_* to ensure the arguments to this function are as expected


yeah, see .buildkite/images/docker/test_project/Dockerfile - we should parametrize the Docker build for different versions of Python we use in CI (may need to skip py2 builds if flyte doesn't support py2)

If it's possible to re-use that image, would be great to just add some of this stuff to that image and use one, so that we can avoid doing another image build during CI


how does one vary this config for different environments? presumably we wouldn't be using minio in a prod context?

2 ↗(On Diff #12779)

we usually just use and not requirements, so you can put this requirement there—also, if we can leave the dep unpinned (or at least >=) that'd be preferable, so that we can be compatible w/ a range of flyte versions

jordanbramble added inline comments.Apr 29 2020, 3:43 AM

long term what I would like to do is have a flyte cluster running in GKE, that we can register as part of CI.


Flytekit accesses the name of the returned callable in order to construct a representation. Since I can't reasonably generate a unique name and signature for the function dynamically from the step key, I decided to reset the name.

Would it be helpful if I leave a comment here?
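For context, the name reset can be illustrated in plain Python without flytekit. This is only a sketch of the general technique: the factory name, the step keys, and the choice to replace "." with "_" are assumptions made for illustration, not what the diff does.

```python
def make_step_wrapper(step_key):
    """Return a callable for one step, renamed so each one has a distinct name."""

    def wrapper(*args, **kwargs):
        # Placeholder for executing the step identified by step_key.
        return step_key

    # Without this, every closure reports the name "wrapper", so anything that
    # derives an identity from __name__ could not tell the callables apart.
    wrapper.__name__ = step_key.replace(".", "_")
    return wrapper


# Hypothetical step keys.
wrappers = [make_step_wrapper(key) for key in ("add_one.compute", "add_two.compute")]
names = [w.__name__ for w in wrappers]
```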


Good idea, I think we could. Would you like to see that in this PR or in a follow-up?


I would expect a user to manually create this file for the specific workflow they are building. I provided this specific file as an example. It is not needed for CI (though I would like it to be in the future) and is interacted with via the pyflyte CLI.

adds instance checks to flyte compiler. Removes requirements.txt, and utilizes


In a future diff, I think the thing to do is replace this closure with an instantiation of a DagsterFlyteRunnable, that inherits from SdkRunnable and sets the name based on the step key.

jordanbramble retitled this revision from adds WIP bridge class for creating a flyte workflow that can be registered with a unit test that verifies class instantiation. to Flytekit Proof of Concept.Apr 29 2020, 4:00 AM
jordanbramble edited the summary of this revision.
schrockn requested changes to this revision.Apr 29 2020, 7:56 PM

Definitely a good start and thanks for setting up the CI.

let's definitely get that flytekit dep out of the core dagster dev-requirements file

16 ↗(On Diff #12819)

shouldn't need this here


maybe I don't understand the SdkWorkflow lifecycle well, but it definitely feels off to invoke all this heavyweight compute in the __init__

Looks like you need the execution_plan but I would defer the call to create_plan_iterator for sure

This revision now requires changes to proceed.Apr 29 2020, 7:56 PM

creates execution_plan and iterator when needed

  • exclude tests
  • install flytekit
add doc string
install dagster-flyte in make install_dev_python_modules
jordanbramble marked 4 inline comments as done.Apr 30 2020, 5:45 PM
jordanbramble added inline comments.

I have taken some of this out of the init, and moved it into get_flyte_sdk_workflow.

What may make sense is to also:

  1. remove the call to super().__init__()
  2. remove inheritance from SdkWorkflow
  3. create a build function that can be called by the user on demand
  4. build returns a SdkWorkflow
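The four steps above could be sketched roughly as follows. This is a plain-Python stand-in, not the class in this diff: LazyWorkflowBuilder and its attributes are hypothetical, and the returned dict stands in for an SdkWorkflow.

```python
class LazyWorkflowBuilder(object):
    """Hypothetical stand-in: stores inputs cheaply and builds only on demand."""

    def __init__(self, pipeline_name, step_keys):
        # Only keep references here; no plan creation or node construction.
        self.pipeline_name = pipeline_name
        self._step_keys = list(step_keys)
        self.build_count = 0

    def build(self):
        # The expensive work happens only when the caller asks for it.
        self.build_count += 1
        nodes = {key: "node for {}".format(key) for key in self._step_keys}
        return {"name": self.pipeline_name, "nodes": nodes}


builder = LazyWorkflowBuilder("demo_pipeline", ["a.compute", "b.compute"])
builds_before = builder.build_count  # nothing has been built yet
workflow = builder.build()
```

The trade-off is that the object is no longer itself a workflow, so callers have to call build() before registering anything.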
schrockn requested changes to this revision.Apr 30 2020, 5:53 PM

lifecycle for the plan iterator is still weird.


we should just create the plan iterator in here and do a for loop over it

This revision now requires changes to proceed.Apr 30 2020, 5:53 PM
jordanbramble marked an inline comment as done.
moves create_plan_iterator call into get_sdk_node. move the set up and construction of SdkWorkflow into __call__
jordanbramble marked an inline comment as done.May 1 2020, 1:35 AM
schrockn accepted this revision.May 1 2020, 1:45 AM

cool. let's merge on master and continue to iterate


ok for now but obviously we will need a real instance in order for this to work


let's get this merged, but i'm pretty sure the right place for the create_plan_iterator call will be within the wrapper

This revision is now accepted and ready to land.May 1 2020, 1:45 AM
add dagster-flyte to buildkite install test. parameterize python version in flyte example container
jordanbramble added inline comments.May 1 2020, 2:48 AM

I may be missing something here. How would that work? If we expect solids to map roughly to python_tasks, then we should have one python_task per solid. So if we are iterating through the execution_plan_iterator, each python_task will need to do its own work on it, correct? Wrapper is the function that Flyte will execute, so if we recreate the iterator on each invocation of wrapper, won't Flyte end up stepping through the same steps repeatedly instead of doing the work of the solid associated with that particular python_task?
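The concern can be modeled in plain Python. This is a toy sketch, not Dagster or Flyte code: PLAN, the step keys, and both factory functions are hypothetical. It contrasts a task that walks the whole plan on every invocation with one that walks the plan but acts only on its own step key.

```python
PLAN = ["load.compute", "transform.compute", "save.compute"]  # hypothetical step keys


def make_task_running_whole_plan(log):
    def wrapper():
        # Every task executes every step, so work is repeated per task.
        for key in PLAN:
            log.append(key)

    return wrapper


def make_task_for_step(step_key, log):
    def wrapper():
        # Iterate the plan inside the task, but act only on this task's step.
        for key in PLAN:
            if key == step_key:
                log.append(key)

    return wrapper


repeated_log, filtered_log = [], []
for key in PLAN:  # one task per step, each invoked once
    make_task_running_whole_plan(repeated_log)()
    make_task_for_step(key, filtered_log)()
```

In this model, creating the iterator inside the wrapper is only safe if each wrapper is bound to, and filters on, its own step key.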

remove dagster-flyte until it is in PyPI
jordanbramble marked an inline comment as done.May 1 2020, 2:57 AM
jordanbramble added inline comments.

I have parametrized this to run with different versions of Python. I think that this should be separate from the test images because this is meant to build a specific SdkWorkflow example, and eventually I would like CI to register to a k8s cluster. I am only running 3.6 in CI for now, in order to save build time.

jordanbramble marked an inline comment as done.

Only build flyte example once, for python 3.6

jordanbramble added inline comments.May 1 2020, 3:15 AM

Ahh, I noticed this same container was building every time for all versions of python. I've instead updated the pipeline to append this one single example build in 3.6 to the tests. This should speed up the build quite a bit I hope.

build_docker -> docker_build
This revision was automatically updated to reflect the committed changes.