
pyspark on EMR
Needs Review · Public

Authored by natekupp on Sun, Nov 3, 5:51 PM.

Details

Reviewers
None
Group Reviewers
Restricted Project
Summary

Summary
Putting this up for discussion.

This is a (rough) cut at the thing @sashank, @max and I were discussing on Friday, where we use a decorator to express that a pyspark solid's compute function should behave differently when shipping compute to a remote EMR cluster.

Context
EMR must run pyspark workloads with yarn specified as the master URI. To run this way, Spark needs access to a local Hadoop/Spark/YARN installation on the machine running spark-submit or calling SparkSession.builder.getOrCreate(), and that local installation must be properly configured with core-site.xml and the other Hadoop XML files that define the DNS names and hosts of the YARN cluster.
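
For concreteness, this is roughly the session construction in question; a minimal sketch, with a placeholder app name:

  # What a locally-run pyspark solid effectively does today. For master("yarn") to
  # resolve the cluster, HADOOP_CONF_DIR/YARN_CONF_DIR must point at a directory
  # containing core-site.xml, yarn-site.xml, etc. for the target cluster.
  from pyspark.sql import SparkSession

  spark = (
      SparkSession.builder
      .master("yarn")                     # EMR requires YARN as the master
      .appName("example_pyspark_solid")   # placeholder name
      .getOrCreate()                      # fails without a configured local Hadoop/YARN install
  )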

This is easy when invoking a job directly on the EMR master (which is already configured) or through the EMR boto3 APIs, but it is not something you can set up locally; getting everything configured correctly is very difficult and fragile.

Because of this, there isn't a way to tell EMR to "run just this one solid"—EMR expects us to invoke spark-submit <options> foo.py either in a shell command on the master or via the EMR APIs, and it expects foo.py to be a self-contained pyspark application.

Given this, there is no obvious way to have pyspark solids structured like we've done historically, because the locally-constructed SparkSession won't be capable of interacting with a remote EMR cluster.

Options

There are a few options I can think of so far:

  1. Open all requisite ports between the local machine and the EMR cluster so Spark/YARN/Hadoop can communicate. This is wildly insecure (YARN has no security, so anyone could submit work to the YARN cluster), so it is not a real option. It would also require the full local Spark/YARN/Hadoop installation and XML configs, which is a huge burden to get configured right—imagine reconfiguring everything for every cluster you talk to in the ephemeral case.
  2. Maintain SSH tunnels between the local host running dagster and the EMR cluster. This is very fragile (the job is immediately killed if your networking changes), and keeping SSH tunnels healthy is a pain (I've been down this route). Moreover, it would require configuring the local installations as in (1).
  3. Give up on the tight dagster <> pyspark integration and keep orchestration and compute separate—this is the norm today in Airflow, where you'd have a Pyspark operator that subprocess.Popens a spark-submit my-pyspark-code.py, and my-pyspark-code.py is a Python file that lives apart from the orchestration Python. The EMR APIs effectively work the same way; see an example here: https://stackoverflow.com/a/37117296/11295366.
  4. Structure pyspark solids so that they execute differently against different Spark targets. Locally, we can assume we can grab a SparkSession and go; to submit the work to EMR, we wrap execution of the solid into a separate Python file and submit ourselves as a workload via the EMR APIs. EMR expects (1) a main Python file and (2) a zip archive of associated Python code to distribute to the Spark workers in the cluster. The EMR APIs then effectively shell out to spark-submit locally on the EMR master, where the Spark installation is configured, so instantiating a SparkSession works fine there (see the sketch after this list).
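
For reference, a minimal sketch of what the submission path in (4) looks like against the EMR APIs via boto3; the cluster ID, bucket, and file names are placeholders:

  # Sketch only: submit a self-contained pyspark application as an EMR step.
  import boto3

  emr = boto3.client("emr", region_name="us-west-2")

  response = emr.add_job_flow_steps(
      JobFlowId="j-XXXXXXXXXXXXX",  # placeholder EMR cluster ID
      Steps=[
          {
              "Name": "pyspark solid",
              "ActionOnFailure": "CANCEL_AND_WAIT",
              "HadoopJarStep": {
                  "Jar": "command-runner.jar",
                  "Args": [
                      "spark-submit",
                      "--deploy-mode", "cluster",
                      "--py-files", "s3://my-bucket/code.zip",  # (2) zip of associated Python code
                      "s3://my-bucket/main.py",                 # (1) the main Python file
                  ],
              },
          }
      ],
  )
  step_id = response["StepIds"][0]  # poll describe_step with this to track completion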

Notes
This diff implements a version of (4) above; tracking follow-up issues:

1. The overriding of compute behavior based on where we're executing feels janky, and it feels like there's a weird mix of mode/resource/compute override going on here.
2. The zip archive bundling seems fragile and prone to issues; I'm building a zip of the Python code I find in the current directory and shipping it to S3, and I worry that the from ... import ... needed to gain access to the invoking pipeline will be easy to get wrong (a rough sketch of the bundling step follows this list). -> will eventually support making this pluggable
3. There's no guarantee we're going to be able to communicate back to the invoking Dagster instance on the host that kicked off the EMR job, rehydrate resources properly, or generally play nice with the rest of Dagster at all. Ideally we should probably assume we can't, and just use the EMR APIs in the invoking Dagster parent process to read back any metadata from the task execution.
4. This bundles environment setup and execution into one step; these should be broken apart.
5. Need to catch mode mismatches per Alex's comment -> no longer an issue given the switch to resource-based implementation
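
A rough sketch of the bundling step described in note 2, assuming a simple walk of the current directory; bucket and paths are placeholders:

  # Sketch only: zip the Python code found under source_dir and ship it to S3 so
  # EMR can distribute it to the Spark workers via --py-files.
  import os
  import zipfile

  import boto3


  def build_and_upload_archive(source_dir, bucket, key, archive_path="/tmp/code.zip"):
      with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as zf:
          for root, _, files in os.walk(source_dir):
              for name in files:
                  if name.endswith(".py"):
                      full_path = os.path.join(root, name)
                      # preserve relative paths so imports resolve the same way on the workers
                      zf.write(full_path, os.path.relpath(full_path, source_dir))

      boto3.client("s3").upload_file(archive_path, bucket, key)
      return "s3://{}/{}".format(bucket, key)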

Test Plan

manual

Diff Detail

Repository
R1 dagster
Branch
pyspark_emr
Lint
Lint OK
Unit
No Unit Test Coverage

Event Timeline

natekupp created this revision.Sun, Nov 3, 5:51 PM
Harbormaster failed remote builds in B4956: Diff 6172!
natekupp edited the summary of this revision. (Show Details)Sun, Nov 3, 6:39 PM
natekupp added a reviewer: Restricted Project.
natekupp added subscribers: sashank, max.
natekupp updated this revision to Diff 6182.Mon, Nov 4, 5:55 PM
natekupp edited the summary of this revision. (Show Details)

up

max added a comment.Wed, Nov 6, 5:19 PM

I think this is a little overcomplicated -- we could let people select between spark targets using a selector config field on the solid, and if we want them to be able to add their own targets, expose an optional facility on the decorator to do that -- but ideally, most users should be able to write @pyspark_solid and let the config editor take it from there.

python_modules/dagster/dagster/core/definitions/decorators.py
154 ↗(On Diff #6182)

why do we need this separate class? I feel like all of this could just be done using a config selector to pick between emr/local/...
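
For reference, a rough sketch of the selector shape being suggested, assuming dagster's Selector config type; the solid and field names are illustrative:

  # Illustrative only: pick the spark target via a config selector rather than a
  # separate target class. Field names are made up for the example.
  from dagster import Field, Selector, String, solid


  @solid(
      config=Field(
          Selector(
              {
                  "local": {},
                  "emr": {
                      "cluster_id": Field(String),
                      "staging_bucket": Field(String),
                  },
              }
          )
      )
  )
  def example_pyspark_solid(context):
      (target,) = context.solid_config.keys()  # "local" or "emr"
      ...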

natekupp updated this revision to Diff 6412.Sun, Nov 10, 11:50 PM

Taking another pass and cleaning things up

natekupp retitled this revision from RFC: pyspark on EMR to pyspark on EMR.Mon, Nov 11, 5:30 AM
alangenfeld added inline comments.
python_modules/libraries/dagster-aws/dagster_aws/emr/targets.py
109 ↗(On Diff #6416)

is this sufficient?

python_modules/libraries/dagster-aws/dagster_aws_tests/emr_tests/test_pyspark.py
8–19

when would we catch incompatible mode names?

natekupp added inline comments.Mon, Nov 11, 7:43 PM
python_modules/libraries/dagster-aws/dagster_aws/emr/targets.py
109 ↗(On Diff #6416)

yeah, I think so; I can't imagine anyone would create this file on their own machine for any reason? But I will document this more here and add a link to the EMR docs

python_modules/libraries/dagster-aws/dagster_aws_tests/emr_tests/test_pyspark.py
8–19

damn, I intended to deal with this last night and totally forgot about it. will think about how to handle this

natekupp edited the summary of this revision. (Show Details)Mon, Nov 11, 7:44 PM
alangenfeld added inline comments.Mon, Nov 11, 7:45 PM
python_modules/libraries/dagster-aws/dagster_aws_tests/emr_tests/test_pyspark.py
8–19

you might be able to do something cute with resources / required_resource_keys

natekupp edited the summary of this revision. (Show Details)Mon, Nov 11, 7:45 PM
alangenfeld added inline comments.Mon, Nov 11, 7:51 PM
python_modules/libraries/dagster-aws/dagster_aws_tests/emr_tests/test_pyspark.py
8–19

another consideration is how often do you want to declare the targets? should you have to do it at every @pyspark_solid?

natekupp planned changes to this revision.Tue, Nov 12, 8:00 PM

Going to move this to resources

natekupp updated this revision to Diff 6490.Wed, Nov 13, 12:50 AM

update to resource-based implementation

natekupp edited the summary of this revision. (Show Details)Wed, Nov 13, 1:02 AM
natekupp updated this revision to Diff 6499.Wed, Nov 13, 5:27 PM

fixes for using Spark configs, etc.

Nice, this reminds me of some of the lakehouse stuff - I sense a pattern will continue emerging around these higher-level solid/resource relationship setups.

There's no guarantee we're going to be able to communicate back to the invoking Dagster instance on the host that kicked off the EMR job, rehydrate resources properly, or generally play nice with the rest of Dagster at all. Ideally we should probably assume we can't, and just use the EMR APIs in the invoking Dagster parent process to read back any metadata from the task execution via the EMR APIs.

I wonder if we could execute the pyspark solid body in some sort of "clean room" env to ensure it doesn't try to reach out to any external resources. Might be a good test util in the least.

python_modules/libraries/dagster-pyspark/dagster_pyspark_tests/test_decorators.py
22–30

pyspark_resource arg could theoretically be handled by our so-far unused resource_mapper_fn utility. I don't think we've actually exposed it in our composition function api yet.

if we do want to keep this approach as well, it's probably best to name it a little more clearly: pyspark_resource_key

@themissinghlink have you worked with pyspark much? Do you have thoughts here?

btw i plan on looking at this and will do so tmrw (friday) am