
Consolidate bay bike csv files into one csv file and upload to bucket resource
ClosedPublic

Authored by themissinghlink on Nov 8 2019, 11:19 PM.

Details

Summary

Problem

We have roughly two dozen csv files sitting in a directory. We want to consolidate them into one big csv file and upload it to a bucket.

Solution

  • Wrote consolidate_csv_files and upload_file_to_bucket solids.
  • Extended the pipeline to use these solids and hooked in local-mode resources.
  • Defined a poor man's AbstractBucket interface and a LocalBucket resource (see the sketch below).
  • Defined a GCS bucket resource for prod mode.
  • Updated the unit tests; they pass.
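
For orientation, a minimal sketch of the bucket abstraction described above, under the assumption that it follows the copyfile-based shape discussed in the review comments below; the method names set_object and get_object come from the diff, while the constructor argument and directory handling here are illustrative.

import os
from abc import ABC, abstractmethod
from shutil import copyfile


class AbstractBucket(ABC):
    # py3-only ABC spelling for brevity; the diff itself uses a py2/py3
    # compatible metaclass (see the six.with_metaclass note in the review).

    @abstractmethod
    def set_object(self, key):
        '''Store the local file at key in the bucket and return its location.'''

    @abstractmethod
    def get_object(self, key):
        '''Return the location of the object stored under key.'''


class LocalBucket(AbstractBucket):
    # Mimics a cloud bucket with a local directory so the pipeline can run in
    # local mode and in tests.
    def __init__(self, bucket_name):
        self.bucket_name = bucket_name
        if not os.path.exists(bucket_name):
            os.makedirs(bucket_name)

    def set_object(self, key):
        # Flattening with os.path.basename is what the review flags below for
        # keys like foo/bar/baz.
        destination = os.path.join(self.bucket_name, os.path.basename(key))
        copyfile(key, destination)
        return destination

    def get_object(self, key):
        return os.path.join(self.bucket_name, os.path.basename(key))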

Proof This Works

Use the following config in prod mode in dagit and check the dagster-scratch bucket for the consolidated.csv file!

resources:
  bucket:
    config:
      bucket_name: dagster-scratch-ccdfe1e
solids:
  consolidate_csv_files:
    inputs:
      source_dir:
        value: ./data
      target:
        value: ./data/consolidated.csv
  download_zipfiles_from_urls:
    inputs:
      base_url:
        value: 'https://s3.amazonaws.com/baywheels-data'
      chunk_size:
        value: 8192
      file_names:
          - value: 201801-fordgobike-tripdata.csv.zip
          - value: 201802-fordgobike-tripdata.csv.zip
          - value: 201803-fordgobike-tripdata.csv.zip
          - value: 201804-fordgobike-tripdata.csv.zip
          - value: 201805-fordgobike-tripdata.csv.zip
          - value: 201806-fordgobike-tripdata.csv.zip
          - value: 201807-fordgobike-tripdata.csv.zip
          - value: 201808-fordgobike-tripdata.csv.zip
          - value: 201809-fordgobike-tripdata.csv.zip
          - value: 201810-fordgobike-tripdata.csv.zip
          - value: 201811-fordgobike-tripdata.csv.zip
          - value: 201812-fordgobike-tripdata.csv.zip
          - value: 201901-fordgobike-tripdata.csv.zip
          - value: 201902-fordgobike-tripdata.csv.zip
          - value: 201903-fordgobike-tripdata.csv.zip
          - value: 201904-fordgobike-tripdata.csv.zip
          - value: 201905-baywheels-tripdata.csv.zip
          - value: 201906-baywheels-tripdata.csv.zip
          - value: 201907-baywheels-tripdata.csv.zip
          - value: 201908-baywheels-tripdata.csv.zip
          - value: 201909-baywheels-tripdata.csv.zip
      target_dir:
        value: /tmp
  unzip_files:
    inputs:
      source_dir:
        value: /tmp
      target_dir:
        value: ./data
Test Plan

unit

Diff Detail

Repository
R1 dagster
Lint
Automatic diff as part of commit; lint not applicable.
Unit
Automatic diff as part of commit; unit tests not applicable.

Event Timeline

themissinghlink edited the summary of this revision. Nov 8 2019, 11:26 PM
themissinghlink added a reviewer: Restricted Project.
themissinghlink edited the summary of this revision.
themissinghlink removed a reviewer: Restricted Project. Nov 8 2019, 11:28 PM
  • added tests to actually check data in bucket
  • fixed p35 compat issue with path strings
  • realized that this was all encapsulated in pipeline tests
themissinghlink added a reviewer: Restricted Project. Nov 11 2019, 5:01 PM
themissinghlink edited the summary of this revision. Nov 11 2019, 5:31 PM
max requested changes to this revision. Nov 11 2019, 8:54 PM
max added a subscriber: max.

i'd like to make sure this will work on windows

examples/dagster_examples/bay_bikes/resources.py
13

you're looking for six.with_metaclass
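
For reference, a minimal sketch of the pattern being pointed at here, declaring the abstract base class in a py2/py3-compatible way:

import six
from abc import ABCMeta, abstractmethod


class AbstractBucket(six.with_metaclass(ABCMeta)):
    # six.with_metaclass builds a temporary base class so the same declaration
    # works under both the py2 and py3 metaclass syntaxes.
    @abstractmethod
    def set_object(self, key):
        pass

    @abstractmethod
    def get_object(self, key):
        pass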

24

i don't think this will work with s3 keys like foo/bar/baz -- os.path.basename('foo/bar/baz') == 'baz'

33

this is a nice idea but won't work on windows

examples/dagster_examples/bay_bikes/solids.py
51–100

very cool, almost feels like we should have a wrapper for this sort of thing

59

fwiw i would make expected_delimiter a config option rather than an input -- consider the range of options available on pd.read_csv, were we to write this as more of a library solid
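
For illustration, a hedged sketch of the config-vs-input split being suggested; read_csv_file is a hypothetical solid name, and the config_schema spelling below is the modern dagster API rather than the 0.6.x API this diff targets:

import pandas as pd
from dagster import Field, solid


@solid(config_schema={'delimiter': Field(str, default_value=',')})
def read_csv_file(context, csv_path):
    # The path is data flowing between solids (an input); the delimiter is an
    # operator knob that will almost never come from an upstream solid, so it
    # lives in config.
    return pd.read_csv(csv_path, sep=context.solid_config['delimiter'])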

72

my gut is that typically we want s3 to be the resource and the bucket to be configured, but i have a hard time expressing why i feel that way. it feels like we are likely to use many buckets in the course of a typical pipeline.

examples/dagster_examples_tests/bay_bikes_tests/test_pipelines.py
6–7

might these be better modeled using a test preset on the pipeline?

This revision now requires changes to proceed. Nov 11 2019, 8:54 PM
  • WIP-just to get help from max
examples/dagster_examples/bay_bikes/resources.py
13

YES this is exactly what I was looking for! Thanks Max

24

I'm confused by what you mean here. I think the intention is for this to be used only to have a local directory mimic an s3 bucket for testing or local iteration across modes.

33

damn. Good call.

examples/dagster_examples/bay_bikes/solids.py
59

good call! Is the intuition that configs affect the control flow of function bodies, whereas inputs are the minimum viable set of data needed for the computation?

72

I guess I kind of liked the idea of having to define my bucket dependencies explicitly. I have seldom seen cases where I am using more than 2-3 buckets in any given pipeline without being insanely wasteful.

  • added tests to actually check data in bucket
  • fixed p35 compat issue with path strings
  • realized that this was all encapsulated in pipeline tests
  • WIP-just to get help from max
  • was playing around with connection pools in requests but am facing issues so reverting back
  • whoops now I reverted
  • added support for gcp bucket and went back to requests because urllib3 did not come packaged with ssl certificate issues
  • addressed review feedback
  • fixed linter bugs
  • apply arc lint fixes
examples/dagster_examples_tests/bay_bikes_tests/test_pipelines.py
6–7

Could you elaborate more on this? Unsure what you mean. I def agree that the above fixtures are probably overkill, but intrigued to find a better way!

themissinghlink edited the summary of this revision. Nov 12 2019, 12:59 AM
themissinghlink edited the summary of this revision.
  • added gcp support to examples
max requested changes to this revision. Nov 12 2019, 7:03 PM

requesting changes on this to fix the implementation of the local bucket to work with s3 keys like foo/bar/baz

examples/dagster_examples/bay_bikes/resources.py
24
def set_object(self, obj):
    destination = os.path.join(self.bucket_object, os.path.basename(obj))
    copyfile(obj, destination)
    return destination

this won't work with a key like 'foo/bar', because os.path.basename('foo/bar') is 'bar' -- when we call get_object, the object won't be where we expect it to be

52

ok. you could also just put a file at the destination path which told you where to look for the original object, etc., if you want to maintain the lightweight aspect of the symlink implementation
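
A hypothetical sketch of that pointer-file idea, assuming the same LocalBucket shape as the copyfile version; the attribute name bucket_name is illustrative:

import os


def set_object(self, key):
    # Instead of copying the file (or symlinking, which is unreliable on
    # Windows), write a small text file at the destination recording where
    # the original object lives; get_object can then follow the pointer.
    destination = os.path.join(self.bucket_name, os.path.basename(key))
    with open(destination, 'w') as pointer_file:
        pointer_file.write(os.path.abspath(key))
    return destination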

examples/dagster_examples/bay_bikes/solids.py
59

it's delicate, but this is exactly the example i chose in the tutorial to try to illustrate the difference. the delimiter will almost never come from an upstream solid.

https://dagster.readthedocs.io/en/0.6.3/sections/learn/tutorial/config.html

72

suppose you are pulling files from one bucket, doing something to them, and putting them into another bucket.

examples/dagster_examples_tests/bay_bikes_tests/test_pipelines.py
6–7

presumably users will frequently want these config settings in order to run local tests -- if they are a preset on the pipeline, then they can select them in dagit, etc.
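
A hedged sketch of what that could look like; PresetDefinition is spelled here with the modern run_config keyword, which differs from the 0.6.x API, and the mode name and bucket config are illustrative:

from dagster import PresetDefinition


# Baking the local test run config into a preset (passed to @pipeline via
# preset_defs) makes it selectable in dagit and reusable from the pipeline
# tests, instead of living only in test fixtures.
local_test_preset = PresetDefinition(
    name='local_test',
    mode='local',
    run_config={
        'resources': {'bucket': {'config': {'bucket_name': './bucket'}}},
    },
)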

This revision now requires changes to proceed. Nov 12 2019, 7:03 PM
examples/dagster_examples/bay_bikes/resources.py
52

Ah, mocking symlinks, that's a smart idea! Will throw an implementation up.

examples/dagster_examples/bay_bikes/solids.py
59

makes sense!

72

wouldn't that just be 2 bucket resources? I just feel the following reads a lot better:

@solid(required_resource_keys={'source_bucket', 'sink_bucket'})
def transform_and_move(context):
    data = read_files_from_bucket(context.resources.source_bucket)
    upload_to_new_bucket(data, context.resources.sink_bucket)

than:

@solid(required_resource_keys={'s3'})
def transform_and_move(context):
    source_bucket = context.resources.s3.Bucket('source_bucket_name')
    sink_bucket = context.resources.s3.Bucket('sink_bucket_name')  # I always forget the syntax but you know what I mean
    data = read_files_from_bucket(source_bucket)
    upload_to_new_bucket(data, sink_bucket)

Essentially it comes down to whether you want the complexity to live with your solids or your resources. I would much rather have extra resources, because it means my solids can exclusively express business logic, which is their purpose.

  • added tests to actually check data in bucket
  • fixed p35 compat issue with path strings
  • realized that this was all encapsulated in pipeline tests
  • WIP-just to get help from max
  • was playing around with connection pools in requests but am facing issues so reverting back
  • whoops now I reverted
  • added support for gcp bucket and went back to requests because urllib3 did not come packaged with ssl certificate issues
  • addressed review feedback
  • fixed linter bugs
  • apply arc lint fixes
  • added gcp support to examples
  • made revision fixes
  • used mount resource and refactored code
  • made resource changes to clean up the gcp behavior
  • forgot to add new lines to env files
max accepted this revision. Nov 14 2019, 12:07 AM

not sure about this bucket api, please see comments. why not set_object(key, obj) and get_object(key)? maybe there's a missing upload_file api -- right now there's an odd asymmetry where we have set_object: str -> None and get_object: str -> blob (rather than set_object taking a blob and get_object giving one back)
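
A hypothetical sketch of the more symmetric shape being suggested, where the key is explicit on both sides and set_object takes the payload instead of deriving the key from a local path:

from abc import ABC, abstractmethod


class AbstractBucket(ABC):
    @abstractmethod
    def set_object(self, key, obj):
        '''Store the blob obj under key.'''

    @abstractmethod
    def get_object(self, key):
        '''Return the blob previously stored under key.'''

    @abstractmethod
    def upload_file(self, local_path, key):
        '''Convenience wrapper: read the file at local_path and store it under key.'''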

examples/dagster_examples/bay_bikes/resources.py
90

this seems like a difficult API. suppose I have two files on my file system at /foo/bar/baz and at /quux/qux/baz. If I call set_object('/foo/bar/baz') and then set_object('/quux/qux/baz'), won't the second file overwrite the first file?

examples/dagster_examples/bay_bikes/solids.py
72

seems to me like solids are hard to reuse in this approach, but ok

This revision is now accepted and ready to land. Nov 14 2019, 12:07 AM
examples/dagster_examples/bay_bikes/resources.py
90

So this is intended, but maybe I am missing the forest for the trees here. Here is what is currently happening: csv files are landing in some directory on the file system (let's call it PATH).

If I create a blob with the key that is passed into set_key, then in the UI it looks like <bucket-name>/PATH/file_name.csv. This is a bit awkward, since I would just like to throw things into a bucket. Maybe I specify the bucket key to upload the object to and change the interface to something like set_key(self, from_key, to_key). How would that feel?
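
A hypothetical sketch of that interface change against a GCS-style bucket handle (treating bucket_object as a google-cloud-storage Bucket is an assumption here):

def set_key(self, from_key, to_key):
    # The caller chooses the key the object lands under in the bucket,
    # independent of where the file happens to live on the local file system.
    blob = self.bucket_object.blob(to_key)
    blob.upload_from_filename(from_key)
    return to_key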

examples/dagster_examples/bay_bikes/solids.py
72

OOOOOH I see what you are saying now. That's absolutely right. However, the tricky part is that instead of creating an abstract bucket interface, I have to go a level higher and create an abstract resource manager which provisions buckets and saves stuff to them. I actually like this idea now, but it would be quite involved. To keep this diff from blowing up further, can I address it in a later revision?

  • redesigned abstract bucket resource into a transformer resource to more cleanly match the semantics of what it was doing
  • fixed linter bug