
Introduce Partition API
Changes Planned, Public

Authored by sashank on Fri, Nov 15, 12:34 AM.

Details

Summary

This diff introduces a new API for defining pipeline partitions.

The partition axis is completely disjoint from the scheduling intervals. Each ScheduleDefinition can now specify a PartitionSelector that selects one partition from the set of pipeline partitions for a given scheduled execution. This leaves room for complicated logic (e.g. pick the next partition in reverse chronological order that does not have a successful run). It also allows for future plumbing to select multiple partitions, as long as they can be enqueued efficiently.
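As a rough illustration of the "reverse chronological order, no successful run" selection logic mentioned above, here is a minimal sketch. It is not the code in this diff: `Partition`, `select_latest_unfinished`, and the `succeeded_names` set are hypothetical stand-ins, and run history is assumed to be available as a plain set of partition names.

```python
from collections import namedtuple
from datetime import date, timedelta

# Hypothetical stand-in for the diff's partition type.
Partition = namedtuple('Partition', 'name value')


def select_latest_unfinished(partitions, succeeded_names):
    """Return the newest partition with no successful run, or None."""
    for partition in sorted(partitions, key=lambda p: p.value, reverse=True):
        if partition.name not in succeeded_names:
            return partition
    return None


start = date(2019, 11, 1)
partitions = [
    Partition(name=(start + timedelta(days=i)).isoformat(), value=start + timedelta(days=i))
    for i in range(5)
]

# Assumption: the two newest partitions already have successful runs.
succeeded = {'2019-11-05', '2019-11-04'}
selected = select_latest_unfinished(partitions, succeeded)
```

Calling the selector on every clock tick would walk backwards through the unfinished partitions, one per tick.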

This diff does not generate config (TBD), but it does add a tag dagster/partition, for run grouping.

Test Plan

unit

Diff Detail

Repository
R1 dagster
Branch
prha/partition_api
Lint
Lint OK
Unit
No Unit Test Coverage

Event Timeline

prha edited the summary of this revision. Sat, Nov 23, 1:14 AM
prha edited reviewers, added: schrockn; removed: Restricted Project.
prha updated this revision to Diff 6821.Sat, Nov 23, 1:15 AM
prha edited the summary of this revision.

black

schrockn requested changes to this revision.Sun, Nov 24, 12:32 AM

This is super interesting. Just high level comments for now.

I think the biggest design decision that we need to suss out is what information gets encoded on the schedule versus pipeline versus not. The core decision is whether or not we formally model a partitioning scheme on the pipeline itself or just do this through the config system.

My initial reaction is that it seems strange to model the partition on the pipeline, and especially the exact set of partitions (start_date hard coded on a pipeline definition seems wrong). This limits the flexibility of the pipeline quite a bit. I could imagine running the same pipeline with slightly different config on multiple schedules (quarterly and monthly, for example). If we *need* access to the partition value for business logic, this could be done by having a resource. This also has the benefit of being strongly typed and modeled in the config system rather than just the tags. If someone is running ad-hoc jobs, the fact that they need to provide a partition would be clearly communicated. The resource could be responsible for adding the tag, maybe. But there are also good arguments for making it a first class citizen. I'm curious to hear you expand on the tradeoffs.

The way this would hook up into the scheduler: in the date partition case, the config generation function would have access to the effective date. If the python code itself required access to time, that date would in turn parameterize some sort of time resource; otherwise it stays just a config thing (that just creates s3 paths or something).

  • Re: selectors. It's unclear to me why the partition selection function should be part of the identity of the schedule definition. The way I model it in my head is that the schedule definition would define an API that allows you to create a set of partitions. Then there would be another API that allows one to enqueue a backfill (which is just a list of runs with different partition values).

    backfill_until(schedule_def, start_date)  # execute backfills in order from the present to the start date
    backfill_from(schedule_def, from_date)  # execute backfills from start date to present

However in reality this would be a pretty light wrapper around enqueueing a set of pipelines. The backfill functions just know how to poke at the schedule definitions to generate it. (btw the proposed api does imply moving start date to the ScheduleDefinition)

  • Materializing all the partitions every time in order to schedule a run might end up being costly/slow for long-running, frequently firing schedules. Especially true since you sort them. We may want to make special versions of these functions that iterate over one partition value at a time to make the first and last selector strategy cheap.
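The two bullets above can be sketched together: backfill helpers written as generators, so that partition values are produced one at a time instead of being materialized and sorted. This is only a sketch under assumptions. The signatures differ from the proposal (an explicit `today` replaces `schedule_def`), partitions are assumed to be daily, and `enqueue_backfill` with its `enqueue_run` callback is hypothetical.

```python
from datetime import date, timedelta


def backfill_until(start_date, today):
    """Yield daily partition dates from the present back to start_date."""
    current = today
    while current >= start_date:
        yield current
        current -= timedelta(days=1)


def backfill_from(from_date, today):
    """Yield daily partition dates from from_date forward to the present."""
    current = from_date
    while current <= today:
        yield current
        current += timedelta(days=1)


def enqueue_backfill(partition_dates, enqueue_run):
    """Enqueue one run per partition, tagged with its partition value."""
    for partition_date in partition_dates:
        enqueue_run({'dagster/partition': partition_date.isoformat()})


# Usage: collect the tags that would be attached to each enqueued run.
queued = []
enqueue_backfill(backfill_until(date(2019, 11, 20), date(2019, 11, 23)), queued.append)
```

Because both helpers are generators, a "first partition" strategy only ever computes a single value.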

I haven't been privy to all the conversations around this, so it's possible that I am just rehashing some tradeoff that has already been discussed.

examples/dagster_examples/experimental/repo.py
144

using "name" here seems weird since the common case is that it will be a value

python_modules/dagster-graphql/dagster_graphql/implementation/execution.py
158

we will want to push this into the core python API so that we can script partition backfills directly

python_modules/dagster-graphql/dagster_graphql_tests/graphql/setup.py
619

this name is concise but it threw me a little bit before I read through the whole diff. I thought you were creating a static set of partitions rather than a factory function that would be called whenever the getter is called. i still think i like this name in the concise/clarity tradeoff but partition_factory or something would be more obvious at first blush

This revision now requires changes to proceed.Sun, Nov 24, 12:32 AM
prha added a comment.Sun, Nov 24, 9:14 PM

Thanks for this feedback. It might be helpful to describe how I've been thinking about some of the specific decisions.

Re: defining partitions on pipelines vs defining partitions on the schedule:
Sashank's implementation had everything defined on the ScheduleDefinition, and seems in line with the approach you describe (see https://dagster.phacility.com/D1430?vs=on&id=6615#toc). But it seemed awkward to me to have to define a schedule just in order to define the set of partitions. It also meant that the ad-hoc execution of a partition through dagit would force everything to happen through schedules and to be partition-aware (see D1442). Upon discussing with Max/Nate/Alex, it really seemed like when I'm defining the pipeline, I already have an idea of how I'm partitioning my data. So it didn't seem onerous to have to define the partitions up front as well. I think the case of having different time intervals is a good one (e.g. by month, by week, by day). I was planning on having PipelineDefinition take in multiple partition_defs, but was punting that to a future diff, while I saw how that played out. I don't have 100% conviction that defining partitions on pipeline is right, but there are a lot of things that are nice about it.

Re: selectors:
My thought around selectors defined on the ScheduleDefinition was that the schedule determines the clock tick, but you could devise your own strategy to kick off a run for whatever partition(s) you wanted once that clock tick occurs. So you could actually define a backfill schedule by having a PartitionSelector that selects the next run for which you have no successful results (which would be possible to do because you would have access to the instance history). You then wouldn't need to support an explicit backfill api call. Having the schedule execute multiple partitions was something I was punting until we had the RunLauncher-like capability.

I've been keeping personal notes here: https://paper.dropbox.com/doc/Partitions--ApCMaX9T4tUeTK~_Mpq40869Ag-W63TFhiJ5a9qH8qmWp6w4

prha updated this revision to Diff 6939.Tue, Nov 26, 11:09 PM
  • move partition definitions to standalone repository yaml section
  • mapped config
schrockn added inline comments.Tue, Nov 26, 11:53 PM
examples/dagster_examples/experimental/sched.py
46

I don't really see a case where a ScheduleDefinition will define anything except a LastPartitionSelector, unless I'm not understanding something

python_modules/dagster-graphql/dagster_graphql/implementation/execution.py
149

catching this error seems bad, just let it escape?

python_modules/dagster/dagster/core/definitions/handle.py
27 ↗ (On Diff #6939)

i'm sorry you had to write this

python_modules/dagster/dagster/core/definitions/partition.py
10

generally do prefer doing the namedtuple inheritance thing even though it is a pain
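For readers unfamiliar with the pattern, "the namedtuple inheritance thing" usually means subclassing a namedtuple and validating arguments in `__new__`. The sketch below is illustrative only: the field names `value` and `name` and the defaulting rule are assumptions, not the definitions in partition.py.

```python
from collections import namedtuple


class Partition(namedtuple('_Partition', 'value name')):
    """Immutable partition record; field names here are illustrative."""

    def __new__(cls, value, name=None):
        if name is not None and not isinstance(name, str):
            raise TypeError('name must be a string')
        # Assumption: fall back to the string form of the value as the name.
        resolved_name = name if name is not None else str(value)
        return super(Partition, cls).__new__(cls, value, resolved_name)


partition = Partition(3)
```

The payoff is immutability, value-based equality, and a single place for argument checks, at the cost of the `__new__` boilerplate.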

19

same for these

131

do you have a plan in mind to make it so we don't have to materialize all partitions? there could be a lot of partitions given how generalized the abstraction is

prha added inline comments.Wed, Nov 27, 12:26 AM
examples/dagster_examples/experimental/sched.py
46

Discussed offline, but recording here:

I think it is nice having this layer of abstraction to select an arbitrary partition (or eventually multiple partitions) to operate on.

For example, I could have a partition selector that picks the last partition that does not have a successful run. Or I could pick a partition selector that selects all the partitions since the last clock tick (e.g. once a day, schedule all the hourly stats).
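The second example (a daily tick that schedules all the hourly partitions since the previous tick) might look roughly like the sketch below. The function name and the half-open window `[last_tick, now)` are assumptions made for illustration; nothing here is part of the diff.

```python
from datetime import datetime, timedelta


def hourly_partitions_since(last_tick, now):
    """Return hourly partition start times in the window [last_tick, now)."""
    # Round last_tick up to the next whole hour if it is not already on one.
    current = last_tick.replace(minute=0, second=0, microsecond=0)
    if current < last_tick:
        current += timedelta(hours=1)

    selected = []
    while current < now:
        selected.append(current)
        current += timedelta(hours=1)
    return selected


# A daily tick at midnight selects the 24 hourly partitions of the previous day.
selected = hourly_partitions_since(datetime(2019, 11, 26), datetime(2019, 11, 27))
```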

python_modules/dagster-graphql/dagster_graphql/implementation/execution.py
149

👍

158

Yep, eventually... Still need to figure out the right API for the dagster-graphql -> dagster interaction. I'll add some placeholder TODO / comments.

python_modules/dagster/dagster/core/definitions/partition.py
10

will change

19

will change

131

I don't know quite yet... I suppose the default would just use generators instead of lists?
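A minimal sketch of what "generators instead of lists" could mean for the first/last selector strategies. All names are hypothetical, and daily date partitions are assumed.

```python
from collections import deque
from datetime import date, timedelta


def iter_date_partitions(start, end):
    """Lazily yield one date per day from start to end, inclusive."""
    current = start
    while current <= end:
        yield current
        current += timedelta(days=1)


def first_partition(partitions):
    """Return the first partition without consuming the rest."""
    return next(iter(partitions), None)


def last_partition(partitions):
    """Return the last partition while holding only one item in memory."""
    tail = deque(partitions, maxlen=1)
    return tail[0] if tail else None


first = first_partition(iter_date_partitions(date(2019, 1, 1), date(2019, 12, 31)))
last = last_partition(iter_date_partitions(date(2019, 1, 1), date(2019, 12, 31)))
```

Note that `last_partition` still walks the whole sequence, so it saves memory but not time. Making "last" cheap would need a partition definition that can compute its final value directly, or iterate in reverse.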

schrockn added inline comments.Wed, Nov 27, 12:36 AM
examples/dagster_examples/experimental/sched.py
46

Makes sense. I just think having some helper where you don't have to import LastPartitionSelector for the common case will be good for easing into this part of the system

prha updated this revision to Diff 6945.Wed, Nov 27, 1:29 AM

update to comments, using namedtuple, adding default partition selector

schrockn resigned from this revision.Wed, Nov 27, 11:44 PM

resigning until sashank takes a looksie. if changes are substantial plz readd me

sashank commandeered this revision.Mon, Dec 2, 10:30 PM
sashank edited reviewers, added: prha; removed: sashank.
alangenfeld added inline comments.Mon, Dec 2, 11:07 PM
python_modules/dagster/dagster/core/definitions/schedule.py
44–136

in this current exploration of having partitions as their own top level concept - do schedules need to know about them at all? You could have something in the partitions domain that helps you fill out environment_dict_fn, should_execute, and to-be-added tags_fn to define a schedule based on a partition.

alangenfeld requested changes to this revision.Mon, Dec 2, 11:10 PM

to @sashank's queue

python_modules/dagster/dagster/core/definitions/schedule.py
44–136

in my opinion if you drop this scheduler portion the rest of the changes are good to land to start experimenting on

This revision now requires changes to proceed.Mon, Dec 2, 11:10 PM
prha added inline comments.Mon, Dec 2, 11:19 PM
python_modules/dagster/dagster/core/definitions/schedule.py
44–136

Without this, how would you schedule a partition-based pipeline run?

I agree that it's cleaner not to have overlaps with the environment_dict_fn, should_execute, etc...

sashank updated this revision to Diff 7058.Tue, Dec 3, 12:00 AM
  • Remove partition_environment_dict_fn
  • Readd partition argument to environment_dict_fn
  • Update existing example
  • Add value to Partition

@prha open to discussion on a better way to handle the dynamic function signature of environment_dict_fn based on whether the schedule has partitions.

sashank added inline comments.Tue, Dec 3, 12:08 AM
python_modules/dagster/dagster/core/definitions/schedule.py
44–136

I also think we need this to start experimenting w/ partition-based schedules

sashank updated this revision to Diff 7061.Tue, Dec 3, 12:18 AM

Update date_partition_range to store datetime objects

Don't need to block land on me - just throwing some ideas in the mix.

Just seems that you have so much information in the partition definition that it would be nice to create the schedule from that. You only need roughly name, clock rate, and partition selection on top of what you already have defined. ScheduleDefinition.for_partition also reasonable.

python_modules/dagster/dagster/core/definitions/schedule.py
44–136

Without this, how would you schedule a partition-based pipeline run?

I was imagining something like

log_date_set = PartitionSetDefinition(
    name='date_partitions',
    pipeline_name='log_partitions',
    partition_fn=date_partition_range(ten_days_ago),
    base_preset='default', # maybe base config instead?
    config_fn=partition_config_fn # merges over base preset
)

# creates a schedule that:
#   - fires at cron_schedule rate
#   - selects a partition
#   - skips if partition already selected, otherwise
#     applies config from partition selection atop base and runs
log_date_partition = log_date_set.create_schedule(
    name='log_dates',
    cron_schedule='* * * * *',
    selector=... # means to select partition
)
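To make the shape of this proposal concrete, here is a self-contained sketch using stand-in classes. Nothing below is dagster's API: `PartitionSetSketch`, the `Schedule` tuple, the `completed` set, and the shallow-merge behavior are all assumptions, and "skips if partition already selected" is read as "skip when the selected partition has already run".

```python
from collections import namedtuple

# Hypothetical stand-in for a schedule; not dagster's ScheduleDefinition.
Schedule = namedtuple(
    'Schedule', 'name cron_schedule environment_dict_fn tags_fn should_execute'
)


class PartitionSetSketch(object):
    def __init__(self, name, pipeline_name, partition_fn, base_config, config_fn):
        self.name = name
        self.pipeline_name = pipeline_name
        self.partition_fn = partition_fn  # returns the current list of partitions
        self.base_config = base_config  # assumed to be a plain dict
        self.config_fn = config_fn  # maps a partition to a config override dict

    def create_schedule(self, name, cron_schedule, selector, completed):
        def select():
            return selector(self.partition_fn())

        def environment_dict_fn():
            # Shallow merge: partition config wins over the base config.
            merged = dict(self.base_config)
            merged.update(self.config_fn(select()))
            return merged

        def tags_fn():
            return {'dagster/partition': str(select())}

        def should_execute():
            # Assumption: skip the tick when the selected partition already ran.
            return select() not in completed

        return Schedule(name, cron_schedule, environment_dict_fn, tags_fn, should_execute)


completed = set()
log_date_set = PartitionSetSketch(
    name='date_partitions',
    pipeline_name='log_partitions',
    partition_fn=lambda: ['2019-12-01', '2019-12-02'],
    base_config={'log_level': 'INFO'},
    config_fn=lambda partition: {'date': partition},
)
log_dates = log_date_set.create_schedule(
    name='log_dates',
    cron_schedule='* * * * *',
    selector=lambda partitions: partitions[-1],
    completed=completed,
)
```

With this split, the schedule itself never needs to know about partitions. It only sees three plain functions.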
prha added inline comments.Tue, Dec 3, 12:59 AM
python_modules/dagster/dagster/core/definitions/schedule.py
44–136

oh, i like that quite a lot!

alangenfeld requested changes to this revision.Thu, Dec 5, 6:55 PM

queue management

This revision now requires changes to proceed.Thu, Dec 5, 6:55 PM
sashank updated this revision to Diff 7136.Thu, Dec 5, 11:15 PM
  • Add environment_dict_fn_for_partition and tags_fn_for_partition to PartitionSet.
  • Add method to PartitionSet to create a ScheduleDefinition from a PartitionSet. This allowed us to move all the partition logic from start_scheduled_execution to environment_dict_fn_for_partition and tags_fn_for_partition.
  • Add tags_fn to ScheduleDefinition
  • Add deprecation warnings for environment_dict and tags arguments to ScheduleDefinition. We should remove them in 0.7.0 to simplify the API.
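The deprecation path in the last bullet could be handled along these lines, using the standard library `warnings` module. `ScheduleSketch` is a hypothetical stand-in and the warning messages are invented. The sketch only shows static arguments being wrapped into their function equivalents.

```python
import warnings


class ScheduleSketch(object):
    """Hypothetical stand-in showing only the deprecation path."""

    def __init__(self, name, environment_dict=None, environment_dict_fn=None,
                 tags=None, tags_fn=None):
        if environment_dict is not None:
            warnings.warn(
                'environment_dict is deprecated; pass environment_dict_fn instead',
                DeprecationWarning,
                stacklevel=2,
            )
        if tags is not None:
            warnings.warn(
                'tags is deprecated; pass tags_fn instead',
                DeprecationWarning,
                stacklevel=2,
            )
        self.name = name
        # Wrap static values so the rest of the code only deals with functions.
        self.environment_dict_fn = environment_dict_fn or (lambda: environment_dict or {})
        self.tags_fn = tags_fn or (lambda: tags or {})
```

`stacklevel=2` points the warning at the caller's line rather than at the constructor, which makes the deprecated call site easier to find.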
sashank planned changes to this revision.Thu, Dec 5, 11:15 PM
alangenfeld resigned from this revision.Thu, Dec 5, 11:23 PM

redfordnod

ill let @prha take it from here

prha requested changes to this revision.Fri, Dec 6, 12:07 AM

some style nits, but also we should write some tests.

examples/dagster_examples/__init__.py
13 ↗ (On Diff #7136)

I'm wondering if we can avoid the _ argument to the user-defined function.

We don't really even expose DagsterInstance in the top-level dagster exports. This feels like something we can change down the road, but for now might be better to simplify the signature and not expose instance?

examples/dagster_examples/experimental/sched.py
44

let's drop these, and just use the default arg.

python_modules/dagster/dagster/core/definitions/partition.py
146

we might want to add some tests to this, I think...

python_modules/dagster/dagster/core/definitions/schedule.py
82

we should probably write some tests for this too...

sashank planned changes to this revision.Fri, Dec 6, 12:11 AM
sashank added inline comments.Fri, Dec 6, 12:36 AM
examples/dagster_examples/__init__.py
13 ↗ (On Diff #7136)

This is fair, but this would have the consequence of needing to remove instance as an argument from select_partition on a PartitionSelector.

We thread the instance down into select_partition to get the selected_partition to pass into environment_dict_fn_for_partition.

sashank added inline comments.Fri, Dec 6, 1:02 AM
examples/dagster_examples/__init__.py
13 ↗ (On Diff #7136)

I do agree that it doesn't make sense to have instance as an argument here, so I would lean towards removing it as a param from select_partition to do so.

Do you think it's more important that select_partition has access to the instance?

sashank updated this revision to Diff 7146.Fri, Dec 6, 1:10 AM
  • Fix validation on ScheduleDefinition
  • Throw DagsterInvariantViolationError if selected_partition == None
sashank planned changes to this revision.Fri, Dec 6, 1:10 AM
prha added inline comments.Fri, Dec 6, 1:12 AM
examples/dagster_examples/__init__.py
13 ↗ (On Diff #7136)

I think we can figure out a way to add that later if we need to. I'm fine removing it for now.

sashank updated this revision to Diff 7152. Edited Fri, Dec 6, 1:28 AM

Remove use of instance in environment_config_fn, tags_fn, and PartitionSelector

sashank planned changes to this revision.Fri, Dec 6, 1:28 AM
sashank updated this revision to Diff 7153.Fri, Dec 6, 1:57 AM

fix tests

sashank planned changes to this revision.Fri, Dec 6, 1:57 AM