
pyspark on EMR
ClosedPublic

Authored by nate on Nov 3 2019, 5:51 PM.

Details

Summary
Putting this up for discussion.

This is a (rough) first cut at what @sashank, @max, and I were discussing on Friday: using a decorator to express that a pyspark solid's compute function should behave differently when shipping compute to a remote EMR cluster.

Context
EMR must run pyspark workloads with yarn specified as the master URI. To run this way, Spark must have access to a local Hadoop/Spark/YARN install on the machine running spark-submit or calling SparkSession.builder.getOrCreate(), and this local installation must be properly configured with core-site.xml, etc. along with all other Hadoop XML files that define the DNS names and hosts in the YARN cluster.
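
For reference, a minimal sketch of the kind of session construction in question (the app name here is made up):

from pyspark.sql import SparkSession

# Building a session against YARN only works if the machine running this code has a
# local Hadoop/Spark install whose core-site.xml, yarn-site.xml, etc. point at the cluster.
spark = (
    SparkSession.builder
    .master('yarn')
    .appName('example-emr-workload')
    .getOrCreate()
)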

This is easy when invoking a job directly on the EMR master (already configured), or through the EMR boto3 APIs, but not something you can set up locally (very difficult to get everything configured correctly, and fragile).

Because of this, there isn't a way to tell EMR to "run just this one solid"—EMR expects us to invoke spark-submit <options> foo.py either in a shell command on the master or via the EMR APIs, and it expects foo.py to be a self-contained pyspark application.

Given this, there is no obvious way to have pyspark solids structured like we've done historically, because the locally-constructed SparkSession won't be capable of interacting with a remote EMR cluster.

Options

We have a few options that I can think of so far:

  1. Open all requisite ports between the local machine and the EMR cluster so Spark/YARN/Hadoop can communicate. This is wildly insecure (YARN has no security; anyone could then submit work to the YARN cluster), so it's not a real option. It would also require the full local Spark/YARN/Hadoop installation and XML configs, which is a huge burden to get configured right—imagine how you'd have to reconfigure everything for every cluster you talk to in the ephemeral case.
  2. Maintain SSH tunnels between the local host running dagster and the EMR cluster. This is very fragile (the job is immediately killed if your networking changes), and healthy SSH tunnels are annoying to maintain (I've been down this route, it's a pain). Moreover, it would require configuring the local installations as in (1).
  3. Give up on the tight dagster <> pyspark integration, and just have separate orchestration and compute—this is the norm today in Airflow, where you'd have a Pyspark operator that subprocess.Popens a spark-submit my-pyspark-code.py, and my-pyspark-code.py is a Python file that lives separately from the orchestration Python. The EMR APIs effectively work the same way; see an example here: https://stackoverflow.com/a/37117296/11295366.
  4. Structure Pyspark solids so that they execute differently on different Spark targets. Locally, we can just assume we can grab a SparkSession and go; for submitting the work to EMR, we wrap execution of the solid into a separate Python file and submit ourselves as a workload via the EMR APIs (see the sketch below). EMR expects (1) a main Python file and (2) a zip archive of associated Python code to distribute to the Spark workers in the cluster. The EMR APIs then effectively shell out to spark-submit locally on the EMR master, where the Spark installation is already configured, so instantiating a SparkSession there works fine.
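
To make (4) concrete, a rough sketch of what submitting that workload through the EMR APIs looks like with boto3 (cluster ID, bucket, and file names are hypothetical placeholders; the actual plumbing in this diff differs):

import boto3

emr = boto3.client('emr', region_name='us-west-1')

# EMR runs this step on the master node, where Spark/YARN are already configured,
# so the spark-submit invocation there can use yarn as the master without issue.
emr.add_job_flow_steps(
    JobFlowId='j-XXXXXXXXXXXXX',  # hypothetical cluster ID
    Steps=[{
        'Name': 'pyspark solid',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'spark-submit',
                '--master', 'yarn',
                '--py-files', 's3://my-staging-bucket/my_pyspark_project.zip',
                's3://my-staging-bucket/main.py',
            ],
        },
    }],
)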

Notes
This diff implements a version of (4) above; tracking follow-up issues:

1. The overriding of compute behavior based on where we're executing feels janky, and there's a weird mix of mode/resource/compute override going on here.
2. The zip archive bundling seems fragile and prone to issues: I'm building a zip of the Python code I find in the current directory and shipping it to S3 (see the sketch after this list); I worry that the from ... import ... needed to gain access to the invoking pipeline will be easy to get wrong. -> will eventually support making this pluggable
3. There's no guarantee we're going to be able to communicate back to the invoking Dagster instance on the host that kicked off the EMR job, rehydrate resources properly, or generally play nice with the rest of Dagster at all. Ideally we should probably assume we can't, and just use the EMR APIs in the invoking Dagster parent process to read back any metadata from the task execution via the EMR APIs.
4. This bundles setting up the environment and execution into one step, which should be broken apart.
5. Need to catch mode mismatches per Alex's comment -> no longer an issue given the switch to resource-based implementation
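
As a reference for (2), a rough sketch of the kind of bundling involved (bucket name and paths here are hypothetical; the real implementation may differ):

import os
import zipfile

import boto3

def build_and_upload_archive(source_dir, bucket, key):
    # Zip every .py file under source_dir so the EMR step can hand the archive
    # to spark-submit via --py-files.
    archive_path = '/tmp/pyspark_code.zip'
    with zipfile.ZipFile(archive_path, 'w') as zf:
        for root, _, files in os.walk(source_dir):
            for name in files:
                if name.endswith('.py'):
                    path = os.path.join(root, name)
                    zf.write(path, arcname=os.path.relpath(path, source_dir))
    boto3.client('s3').upload_file(archive_path, bucket, key)

# e.g. build_and_upload_archive('.', 'my-staging-bucket', 'my_pyspark_project.zip')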

Test Plan

manual

Diff Detail

Repository
R1 dagster
Lint
Automatic diff as part of commit; lint not applicable.
Unit
Automatic diff as part of commit; unit tests not applicable.

Event Timeline

nate added inline comments.Nov 11 2019, 7:43 PM
python_modules/libraries/dagster-aws/dagster_aws/emr/targets.py
109 ↗(On Diff #6416)

yeah I think so, I can't imagine anyone would create this file on their own machine for any reason? but I will document this more here and add a link to the EMR docs

python_modules/libraries/dagster-aws/dagster_aws_tests/emr_tests/test_pyspark.py
8–19

damn, I intended to deal with this last night and totally forgot about it. will think about how to handle this

nate edited the summary of this revision. (Show Details)Nov 11 2019, 7:44 PM
alangenfeld added inline comments.Nov 11 2019, 7:45 PM
python_modules/libraries/dagster-aws/dagster_aws_tests/emr_tests/test_pyspark.py
8–19

you might be able to do something cute with resources / required_resource_keys

nate edited the summary of this revision. (Show Details)Nov 11 2019, 7:45 PM
alangenfeld added inline comments.Nov 11 2019, 7:51 PM
python_modules/libraries/dagster-aws/dagster_aws_tests/emr_tests/test_pyspark.py
8–19

another consideration is how often do you want to declare the targets? should you have to do it at every @pyspark_solid?

nate planned changes to this revision.Nov 12 2019, 8:00 PM

Going to move this to resources

nate updated this revision to Diff 6490.Nov 13 2019, 12:50 AM

update to resource-based implementation

nate edited the summary of this revision. (Show Details)Nov 13 2019, 1:02 AM
nate updated this revision to Diff 6499.Nov 13 2019, 5:27 PM

fixes for using Spark configs, etc.

Nice, this reminds me of some of the lakehouse stuff - I sense a pattern will continue emerging around these higher-level solid/resource relationship setups.

There's no guarantee we're going to be able to communicate back to the invoking Dagster instance on the host that kicked off the EMR job, rehydrate resources properly, or generally play nice with the rest of Dagster at all. Ideally we should probably assume we can't, and just use the EMR APIs in the invoking Dagster parent process to read back any metadata from the task execution via the EMR APIs.

I wonder if we could execute the pyspark solid body in some sort of "clean room" env to ensure it doesn't try to reach out to any external resources. Might be a good test util in the least.

python_modules/libraries/dagster-pyspark/dagster_pyspark_tests/test_decorators.py
22–30

pyspark_resource arg could theoretically be handled by our so-far unused resource_mapper_fn utility. I don't think we've actually exposed it in our composition function api yet.

if we do want to keep this way of doing it as well - probably best to name it a little more clearly, e.g. pyspark_resource_key

@themissinghlink have you worked with pyspark much? Do you have thoughts here?

btw i plan on looking at this and will do so tmrw (friday) am

nate updated this revision to Diff 6559.Nov 14 2019, 9:07 PM

rebase

nate edited reviewers, added: alangenfeld, max, schrockn; removed: Restricted Project.Nov 19 2019, 4:23 AM

Reading your summary I definitely vote in favor of approach 4

do you want to land this stuff in a beta subfolder and remove the breaking changes?

alangenfeld requested changes to this revision.Nov 20 2019, 3:55 PM
alangenfeld added inline comments.
python_modules/libraries/dagster-aws/dagster_aws/emr/utils.py
68–70

where did elasticmapreduce come from? Should this key be configurable?

python_modules/libraries/dagster-pyspark/dagster_pyspark/resources.py
39

Default -> Local? just wondering if there is a more meaningful name to give this thing

python_modules/libraries/dagster-pyspark/dagster_pyspark_tests/test_decorators.py
22–30

^

This revision now requires changes to proceed.Nov 20 2019, 3:55 PM
nate updated this revision to Diff 6808.Nov 22 2019, 11:46 PM
nate marked 7 inline comments as done.

comments

python_modules/libraries/dagster-aws/dagster_aws/emr/utils.py
68–70

I'm just going to remove this until we're ready to build out a "real" log ingestion route here; the EMR and S3 APIs are annoyingly different in how they behave, and this function isn't called right now

python_modules/libraries/dagster-pyspark/dagster_pyspark/resources.py
39

yeah I was struggling with this too, agree Default feels questionable. Local would be a little bit of a misnomer because although you're ultimately kicking things off locally, it's entirely possible your local Spark installation is configured to talk to a remote cluster. Will think about it some more

python_modules/libraries/dagster-pyspark/dagster_pyspark_tests/test_decorators.py
22–30

Yes, pyspark_resource_key sounds right to me—updating. From spelunking around it looks like wiring through resource_mapper_fn all the way to the decorators is a bigger project, so I'll leave that out of this diff—but agree this would be a nice use case for it

nate updated this revision to Diff 6816.Nov 23 2019, 12:55 AM

fix test

nate added a comment.Nov 23 2019, 12:56 AM

@alangenfeld re:

do you want to land this stuff in a beta subfolder and remove the breaking changes?

with the latest round, this is now at a point where there are no breaking changes. I could go either way on the beta folder—let's discuss on Monday?

nate updated this revision to Diff 6853.Nov 25 2019, 2:22 AM

fix tests

schrockn requested changes to this revision.Nov 25 2019, 3:22 AM

This is getting very exciting and really starting to realize our vision of having solids be pure business logic layers where the dagster ecosystem helps out a *ton* with surrounding infrastructure. It's actually quite shocking how much work it takes to get pyspark running on EMR now that it is laid out all in one diff.

We are going to need to implement this soon:

https://github.com/dagster-io/dagster/issues/1937

python_modules/libraries/dagster-aws/dagster_aws/emr/utils.py
41

plopping in a dict like this works? i would imagine there has to be some escaping that needs to be done or something

51–52

seems potentially dangerous?

I would assume that on successful execution the zip file gets deleted, and its existing would indicate something went wrong.

54–61

I'm a bit confused how this is going to work. What about installed modules? Maybe I just don't understand pyspark

python_modules/libraries/dagster-aws/dagster_aws_tests/emr_tests/test_pyspark.py
14

a helper property spark_context might be nice instead of having to dot through spark_session and also remember that sparkContext is camelCase
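
Something like this, presumably (sketch only; assumes the resource object holds a spark_session attribute):

class PySparkResource(object):
    def __init__(self, spark_session):
        self.spark_session = spark_session

    @property
    def spark_context(self):
        # convenience accessor so callers don't have to remember the camelCase attribute
        return self.spark_session.sparkContext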

python_modules/libraries/dagster-aws/setup.py
44

this dependency feels wrong. this will require anyone using aws for any reason to install spark transitively, which is quite expensive.

python_modules/libraries/dagster-pyspark/dagster_pyspark/decorators.py
57

shouldn't this be PySparkResourceDefinition rather than DefaultPySparkResource?

python_modules/libraries/dagster-pyspark/dagster_pyspark/resources.py
40

LocallyConfiguredPySparkResource? SystemPySparkResource?

This revision now requires changes to proceed.Nov 25 2019, 3:22 AM

I would have expected something more like what's described here: http://blog.thehumangeo.com/amazon-emr-spark-python3.html, which uploads a requirements.txt and installs it
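
(For reference, that approach boils down to installing dependencies via a bootstrap action at cluster creation time; a rough sketch with hypothetical paths, not what this diff does:)

import boto3

emr = boto3.client('emr', region_name='us-west-1')

# The bootstrap script (stored in S3) would download requirements.txt and
# run pip install -r on every node before any steps execute.
emr.run_job_flow(
    Name='pyspark-cluster',
    ReleaseLabel='emr-5.28.0',
    Instances={'MasterInstanceType': 'm4.large', 'SlaveInstanceType': 'm4.large', 'InstanceCount': 3},
    BootstrapActions=[{
        'Name': 'install python deps',
        'ScriptBootstrapAction': {'Path': 's3://my-staging-bucket/install_requirements.sh'},
    }],
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole',
)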

nate updated this revision to Diff 6872.Nov 25 2019, 9:41 PM
nate marked 7 inline comments as done.

comments

nate added inline comments.Nov 25 2019, 9:43 PM
python_modules/libraries/dagster-aws/dagster_aws/emr/utils.py
41

yeah I *think* Python will do the right thing with __repr__ - for the test included, this environment_dict serializes to:

environment_dict={'resources': {'pyspark': {'config': {'pipeline_file': '/Users/nate/src/dagster/python_modules/libraries/dagster-aws/dagster_aws_tests/emr_tests/test_pyspark.py', 'pipeline_fn_name': 'pipe', 'job_flow_id': 'j-J9WKVEQ27KTJB', 'staging_bucket': 'dagster-scratch-80542c2', 'region_name': 'us-west-1'}}}},

I can think about other test cases to try to exercise this more though, it definitely stands out to me as potentially fragile
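
For what it's worth, the property being relied on is just that repr() of a dict of plain literals is valid Python source; a minimal illustration of the round-trip (not the actual template code):

import ast

environment_dict = {'resources': {'pyspark': {'config': {'region_name': 'us-west-1'}}}}

# repr() of nested dicts/strings/numbers parses back to an equal value, so it can be
# pasted into the generated main.py; anything without a literal repr would break this.
generated_line = 'environment_dict = ' + repr(environment_dict)
assert ast.literal_eval(repr(environment_dict)) == environment_dict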

51–52

yeah this shouldn't be here, vestigial leftovers from early testing—removing

54–61

Yeah this is gross but as far as I can tell this is what pyspark expects (along with requirements separately installed via requirements.txt). So:

# Folder: my_pyspark_project/
# a.py
def foo():
    print(1)

# b.py
def bar():
    print(2)

# main.py
from a import foo
from b import bar

foo()
bar()

Then, zip up my_pyspark_project/ as my_pyspark_project.zip. Run spark-submit --py-files my_pyspark_project.zip main.py and this will print 1, 2.

I added a comment including this, let me know if more would be helpful

python_modules/libraries/dagster-aws/dagster_aws_tests/emr_tests/test_pyspark.py
14

good call, done

python_modules/libraries/dagster-aws/setup.py
44

fair point, moved to extras_require

python_modules/libraries/dagster-pyspark/dagster_pyspark/decorators.py
57

yes, definitely should be, thanks for catching. I wasn't executing this code path in tests, added tests which cover this for both the default pyspark and the EMR pyspark case.

python_modules/libraries/dagster-pyspark/dagster_pyspark/resources.py
40

I like System__

schrockn accepted this revision.Nov 25 2019, 9:53 PM

This is looking like a great place to start. Seems like the location of the requirements file should be configurable to me

python_modules/libraries/dagster-aws/dagster_aws/emr/resources.py
123

seems like this should be configurable

This is really cool. I have a small nit, but you were right, that main.py file is really cool!

python_modules/libraries/dagster-aws/dagster_aws/emr/utils.py
11

I think we should throw this into a separate file and read it in. It makes it easier to read and maintain later. Just make it a main.py.template file
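
i.e., roughly something like this (sketch; the template name and fields are hypothetical):

import os

# Read the spark-submit entry-point template that ships next to this module and
# fill in the pipeline-specific values before uploading the result to S3.
template_path = os.path.join(os.path.dirname(__file__), 'main.py.template')
with open(template_path) as f:
    main_py_source = f.read().format(
        pipeline_file='test_pyspark.py',  # hypothetical values
        pipeline_fn_name='pipe',
        environment_dict=repr({'resources': {}}),
    )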

nate updated this revision to Diff 6904.Nov 26 2019, 12:59 AM

comments and other fixes

nate planned changes to this revision.Nov 26 2019, 1:01 AM

There are still some lingering issues w/ multiple pyspark solids on EMR; I need to debug more tomorrow

python_modules/libraries/dagster-aws/dagster_aws/emr/resources.py
123

yeah agree, made this a config

nate updated this revision to Diff 6906.Nov 26 2019, 3:50 AM

tests

This revision was not accepted when it landed; it landed in state Needs Review.Nov 26 2019, 4:05 AM
Closed by commit R1:1e02665ca7b2: pyspark on EMR (authored by nate).
This revision was automatically updated to reflect the committed changes.