pyspark on EMR


Putting this up for discussion.

This is a (rough) cut at the thing @sashank, @max and I were discussing on Friday, where we use a decorator to express that a pyspark solid's compute function should behave differently when shipping compute to a remote EMR cluster.

EMR must run pyspark workloads with yarn specified as the master URI. To run this way, Spark must have access to a local Hadoop/Spark/YARN install on the machine running spark-submit or calling SparkSession.builder.getOrCreate(), and this local installation must be properly configured with core-site.xml and the other Hadoop XML files that define the DNS names and hosts in the YARN cluster.

This is easy when invoking a job directly on the EMR master (which is already configured) or through the EMR boto3 APIs, but it is not something you can reasonably set up locally: getting everything configured correctly is very difficult, and the result is fragile.

Because of this, there isn't a way to tell EMR to "run just this one solid"—EMR expects us to invoke spark-submit <options> foo.py either in a shell command on the master or via the EMR APIs, and it expects foo.py to be a self-contained pyspark application.

Given this, there is no obvious way to have pyspark solids structured like we've done historically, because the locally-constructed SparkSession won't be capable of interacting with a remote EMR cluster.
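For reference, the EMR API path described above can be sketched roughly as follows. This is a minimal illustration and not the code in this diff: the function names, the default region, and the S3 URIs are hypothetical, and the step uses the standard command-runner.jar mechanism to run spark-submit on the master.

```python
def build_spark_submit_step(main_s3_uri, py_files_s3_uri=None, name="pyspark step"):
    """Build an EMR step definition that runs spark-submit on the EMR master.

    Hypothetical helper: the S3 URIs are supplied by the caller and are
    assumed to point at an already-uploaded main file and code archive.
    """
    args = ["spark-submit", "--master", "yarn", "--deploy-mode", "cluster"]
    if py_files_s3_uri:
        # Archive of supporting Python code distributed to the Spark workers.
        args += ["--py-files", py_files_s3_uri]
    args.append(main_s3_uri)
    return {
        "Name": name,
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {"Jar": "command-runner.jar", "Args": args},
    }


def submit_step(cluster_id, step, region_name="us-west-2"):
    """Submit the step to a running cluster and return the new step id."""
    import boto3  # imported lazily so building a step does not require boto3

    client = boto3.client("emr", region_name=region_name)
    response = client.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step])
    return response["StepIds"][0]
```

Note that the unit of work here is a whole Python file, which is why a single solid cannot be handed to EMR directly.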


We have a few options that I can think of so far:

  1. Open all requisite ports between local machine and EMR cluster for Spark/YARN/Hadoop to communicate. This is wildly insecure (YARN does no authentication by default, so anyone who can reach the cluster can then submit work to it), so it is not a real option. It would also require the full local Spark/YARN/Hadoop installation and XML configs, which are a huge burden to get right; imagine having to reconfigure everything for every cluster you talk to in the ephemeral case.
  2. Maintain SSH tunnels between the local host running dagster and EMR cluster. This is very fragile, as the job will immediately be killed if your networking changes, and keeping SSH tunnels healthy is annoying (I've been down this route, it's a pain). Moreover, it would require configuring the local installations as in (1).
  3. Give up on the tight dagster <> pyspark integration, and just have separate orchestration and compute. This is the norm today in Airflow, where you'd have a Pyspark operator that subprocess.Popens a spark-submit my-pyspark-code.py, and my-pyspark-code.py is a Python file that lives separately from the orchestration Python. The EMR APIs effectively work the same way; see an example here: https://stackoverflow.com/a/37117296/11295366.
  4. Structure Pyspark solids so that they execute differently on different Spark targets. Locally, we can just assume we can grab a SparkSession and go; for submitting the work to EMR, we should wrap execution of the solid into a separate Python file and submit ourselves as a workload to the EMR APIs. EMR expects (1) a main Python file, and (2) a zip archive of associated Python code you'd like to distribute to the Spark workers in the cluster. The EMR APIs will then effectively shell out to spark-submit locally on the EMR master, where the Spark installation is configured, such that instantiating a SparkSession will work fine.
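To make option (4) concrete, here is a minimal sketch of a decorator that routes a compute function to different Spark targets. Everything here is hypothetical and for illustration only (spark_compute, LocalSparkTarget, EmrSparkTarget are not names from this diff or from dagster), and the EMR target only records what it would submit instead of talking to AWS.

```python
import functools


class LocalSparkTarget:
    """Hypothetical target: run the compute function in-process."""

    def run(self, fn, *args, **kwargs):
        return fn(*args, **kwargs)


class EmrSparkTarget:
    """Hypothetical target: record what would be shipped to EMR."""

    def __init__(self):
        self.submitted = []

    def run(self, fn, *args, **kwargs):
        # A real implementation would generate a main file that imports and
        # calls fn, zip the surrounding code, upload both to S3, and submit
        # an EMR step. Here we only note which function would be shipped.
        self.submitted.append((fn.__module__, fn.__name__))
        return None


def spark_compute(fn):
    """Wrap fn so its first argument selects where it executes."""

    @functools.wraps(fn)
    def wrapper(target, *args, **kwargs):
        return target.run(fn, *args, **kwargs)

    return wrapper


@spark_compute
def count_words(text):
    # Stand-in for a pyspark computation; uses no Spark APIs.
    return len(text.split())
```

Calling count_words(LocalSparkTarget(), "a b c") runs the function directly, while passing an EmrSparkTarget defers it; the body of the compute function is the same in both cases.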

This diff implements a version of (4) above; tracking follow-up issues:

1. The overriding of compute behavior based on where we're executing feels janky, and it feels like there's a weird mix of mode/resource/compute override going on here.
2. The zip archive bundling seems prone to issues/fragile; I'm building a zip of the Python code I find in the current directory and shipping it to S3; I worry that the from ... import ... to gain access to the invoking pipeline will be easy to get wrong. -> will eventually support making this pluggable
3. There's no guarantee we're going to be able to communicate back to the invoking Dagster instance on the host that kicked off the EMR job, rehydrate resources properly, or generally play nice with the rest of Dagster at all. Ideally we should probably assume we can't, and just have the invoking Dagster parent process read back any metadata from the task execution via the EMR APIs.
4. This bundles setting up the environment and execution into one step, which should be broken apart.
5. Need to catch mode mismatches per Alex's comment -> no longer an issue given the switch to resource-based implementation
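As an illustration of the zip bundling mentioned in the list above, a minimal sketch using only the standard library might look like this. The helper name build_py_zip is hypothetical and this is not the exact code in the diff; uploading the result to S3 is left out.

```python
import os
import zipfile


def build_py_zip(source_dir, zip_path):
    """Zip every .py file under source_dir, with paths relative to source_dir.

    Keeping paths relative to source_dir is what lets
    `from package.module import ...` resolve on the cluster, so the choice
    of source_dir determines which imports will work remotely.
    """
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, _dirs, files in os.walk(source_dir):
            for fname in sorted(files):
                if not fname.endswith(".py"):
                    continue  # skip data files, bytecode, the archive itself
                full_path = os.path.join(root, fname)
                zf.write(full_path, os.path.relpath(full_path, source_dir))
    return zip_path
```

Running this from the wrong directory silently produces an archive whose module paths don't match the imports in the main file, which is the fragility described in item 2.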

Test Plan: manual

Reviewers: alangenfeld, max, schrockn

Reviewed By: schrockn

Subscribers: schrockn, themissinghlink, alangenfeld, max, sashank

Differential Revision: https://dagster.phacility.com/D1349


nate authored on Nov 3 2019, 5:39 PM