
Introduce Partition API
Closed, Public

Authored by sashank on Fri, Nov 15, 12:34 AM.

Details

Summary

This diff introduces a new API for defining pipeline partitions.

The partition axis is completely disjoint from the scheduling intervals. Each ScheduleDefinition can now specify a PartitionSelector that picks one partition from the set of pipeline partitions for a given scheduled execution. This leaves room for complicated selection logic (e.g. pick the next partition in reverse chronological order that does not have a successful run). It also allows for future plumbing to select multiple partitions, as long as they can be enqueued efficiently.
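To make the selection example above concrete, here is a minimal sketch of "newest partition without a successful run". It does not use the API from this diff: `Partition`, `select_latest_without_success`, and the `succeeded_names` set are hypothetical stand-ins, and the partition list is assumed to be ordered oldest to newest.

```python
from collections import namedtuple

# Hypothetical stand-in for the Partition type discussed in this diff.
Partition = namedtuple("Partition", "name value")


def select_latest_without_success(partitions, succeeded_names):
    """Return the newest partition with no successful run, or None.

    Assumes `partitions` is ordered oldest to newest and `succeeded_names`
    is a set of partition names that already have a successful run.
    """
    for partition in reversed(partitions):
        if partition.name not in succeeded_names:
            return partition
    return None


partitions = [
    Partition(name=day, value=day)
    for day in ("2019-11-13", "2019-11-14", "2019-11-15")
]
# The newest partition already succeeded, so the selector falls back one day.
selected = select_latest_without_success(partitions, {"2019-11-15"})
```

Returning `None` when every partition has succeeded leaves the "nothing to run" decision to the caller.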

This diff does not generate config (TBD), but it does add a tag dagster/partition, for run grouping.

Test Plan

unit

Diff Detail

Repository
R1 dagster
Branch
prha/partition_api
Lint
Lint OK
Unit
No Unit Test Coverage

Event Timeline

prha added inline comments.Wed, Nov 27, 12:26 AM
python_modules/dagster-graphql/dagster_graphql/implementation/execution.py
148

👍

157

Yep, eventually... Still need to figure out the right API for the dagster-graphql -> dagster interaction. I'll add some placeholder TODO / comments.

python_modules/dagster/dagster/core/definitions/partition.py
10

will change

19

will change

131

I don't know quite yet... I suppose the default would just use generators instead of lists?
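As a rough illustration of the "generators instead of lists" idea, the sketch below yields daily partitions lazily so a consumer can stop early. The function name `date_partitions` and the half-open date range are assumptions made for this example, not the code under review.

```python
import datetime
import itertools


def date_partitions(start, end):
    """Lazily yield one date per day in the half-open range [start, end)."""
    current = start
    while current < end:
        yield current
        current += datetime.timedelta(days=1)


# Only the first three partitions are ever computed here.
first_three = list(
    itertools.islice(
        date_partitions(datetime.date(2019, 11, 1), datetime.date(2019, 12, 1)), 3
    )
)
```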

schrockn added inline comments.Wed, Nov 27, 12:36 AM
examples/dagster_examples/experimental/sched.py
46

Makes sense. I just think having some helper where you don't have to import LastPartitionSelector for the common case will be good for easing into this part of the system

prha updated this revision to Diff 6945.Wed, Nov 27, 1:29 AM

update to comments, using namedtuple, adding default partition selector

schrockn resigned from this revision.Wed, Nov 27, 11:44 PM

resigning until sashank takes a looksie. if changes are substantial plz readd me

sashank commandeered this revision.Mon, Dec 2, 10:30 PM
sashank edited reviewers, added: prha; removed: sashank.
alangenfeld added inline comments.Mon, Dec 2, 11:07 PM
python_modules/dagster/dagster/core/definitions/schedule.py
46–138

in this current exploration of having partitions as their own top level concept - do schedules need to know about them at all? You could have something in the partitions domain that helps you fill out environment_dict_fn, should_execute, and to-be-added tags_fn to define a schedule based on a partition.

alangenfeld requested changes to this revision.Mon, Dec 2, 11:10 PM

to @sashank's queue

python_modules/dagster/dagster/core/definitions/schedule.py
46–138

in my opinion if you drop this scheduler portion the rest of the changes are good to land to start experimenting on

This revision now requires changes to proceed.Mon, Dec 2, 11:10 PM
prha added inline comments.Mon, Dec 2, 11:19 PM
python_modules/dagster/dagster/core/definitions/schedule.py
46–138

Without this, how would you schedule a partition-based pipeline run?

I agree that it's cleaner not to have overlaps with the environment_dict_fn, should_execute, etc...

sashank updated this revision to Diff 7058.Tue, Dec 3, 12:00 AM
  • Remove partition_environment_dict_fn
  • Readd partition argument to environment_dict_fn
  • Update existing example
  • Add value to Partition

@prha open to discussion on a better way to handle the dynamic function signature of environment_dict_fn based on whether the schedule has partitions.

sashank added inline comments.Tue, Dec 3, 12:08 AM
python_modules/dagster/dagster/core/definitions/schedule.py
46–138

I also think we need this to start experimenting w/ partition-based schedules

sashank updated this revision to Diff 7061.Tue, Dec 3, 12:18 AM

Update date_partition_range to store datetime objects

Don't need to block land on me - just throwing some ideas in the mix.

Just seems that you have so much information in the partition definition that it would be nice to create the schedule from that. You only need roughly name, clock rate, and partition selection on top of what you already have defined. ScheduleDefinition.for_partition also reasonable.

python_modules/dagster/dagster/core/definitions/schedule.py
46–138

Without this, how would you schedule a partition-based pipeline run?

I was imagining something like

log_date_set = PartitionSetDefinition(
    name='date_partitions',
    pipeline_name='log_partitions',
    partition_fn=date_partition_range(ten_days_ago),
    base_preset='default', # maybe base config instead?
    config_fn=partition_config_fn # merges over base preset
)

# creates a schedule that
#   fires at the cron_schedule rate
#   selects a partition
#   skips if that partition was already selected; otherwise
#   applies config from the partition selection atop the base preset and runs
log_date_partition = log_date_set.create_schedule(
    name='log_dates',
    cron_schedule='* * * * *',
    selector=... # means to select partition
)
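
One way the proposal above could be wired up is sketched below in standalone Python. Nothing here is dagster's implementation: `PartitionSet`, `Schedule`, and the callable `selector` are simplified, hypothetical stand-ins, and base-preset merging is left out. The only name taken from the review is the `dagster/partition` tag.

```python
from collections import namedtuple

# Hypothetical, simplified records; not dagster's own classes.
Partition = namedtuple("Partition", "name value")
Schedule = namedtuple(
    "Schedule",
    "name cron_schedule pipeline_name environment_dict_fn tags_fn should_execute",
)


class PartitionSet:
    def __init__(self, name, pipeline_name, partition_fn, config_fn):
        self.name = name
        self.pipeline_name = pipeline_name
        self.partition_fn = partition_fn  # returns the list of partitions
        self.config_fn = config_fn  # maps a partition to run config

    def create_schedule(self, name, cron_schedule, selector):
        """Build a schedule whose config and tags come from the selected partition."""

        def selected():
            return selector(self.partition_fn())

        return Schedule(
            name=name,
            cron_schedule=cron_schedule,
            pipeline_name=self.pipeline_name,
            environment_dict_fn=lambda: self.config_fn(selected()),
            tags_fn=lambda: {"dagster/partition": selected().name},
            # Skip the tick when the selector finds nothing to run.
            should_execute=lambda: selected() is not None,
        )


log_date_set = PartitionSet(
    name="date_partitions",
    pipeline_name="log_partitions",
    partition_fn=lambda: [
        Partition("2019-12-01", "2019-12-01"),
        Partition("2019-12-02", "2019-12-02"),
    ],
    config_fn=lambda partition: {"date": partition.value},
)
log_dates = log_date_set.create_schedule(
    name="log_dates",
    cron_schedule="0 0 * * *",
    selector=lambda partitions: partitions[-1] if partitions else None,
)
```

The schedule itself never learns about partitions in this sketch; it only receives three callables, which is the separation the comment argues for.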
prha added inline comments.Tue, Dec 3, 12:59 AM
python_modules/dagster/dagster/core/definitions/schedule.py
46–138

oh, i like that quite a lot!

alangenfeld requested changes to this revision.Thu, Dec 5, 6:55 PM

queue management

This revision now requires changes to proceed.Thu, Dec 5, 6:55 PM
sashank updated this revision to Diff 7136.Thu, Dec 5, 11:15 PM
  • Add environment_dict_fn_for_partition and tags_fn_for_partition to PartitionSet.
  • Add method to PartitionSet to create a ScheduleDefinition from a PartitionSet. This allowed us to move all the partition logic from start_scheduled_execution to environment_dict_fn_for_partition and tags_fn_for_partition.
  • Add tags_fn to ScheduleDefinition
  • Add deprecation warnings for environment_dict and tags arguments to ScheduleDefinition. We should remove them in 0.7.0 to simplify the API.
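
The deprecation described in the last bullet could look roughly like the sketch below, which uses the standard-library `warnings` module. The helper name `resolve_schedule_fns` and the warning messages are assumptions for illustration; the actual ScheduleDefinition code in this diff may differ.

```python
import warnings


def resolve_schedule_fns(environment_dict=None, tags=None,
                         environment_dict_fn=None, tags_fn=None):
    """Accept the old static arguments, warn, and wrap them as functions."""
    if environment_dict is not None:
        warnings.warn(
            "environment_dict is deprecated; pass environment_dict_fn instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        if environment_dict_fn is None:
            environment_dict_fn = lambda: environment_dict
    if tags is not None:
        warnings.warn(
            "tags is deprecated; pass tags_fn instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        if tags_fn is None:
            tags_fn = lambda: tags
    return environment_dict_fn, tags_fn


# Record warnings so the deprecation can be inspected instead of printed.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    env_fn, resolved_tags_fn = resolve_schedule_fns(environment_dict={"loggers": {}})
```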
sashank planned changes to this revision.Thu, Dec 5, 11:15 PM
alangenfeld resigned from this revision.Thu, Dec 5, 11:23 PM

ill let @prha take it from here

prha requested changes to this revision.Fri, Dec 6, 12:07 AM

some style nits, but also we should write some tests.

examples/dagster_examples/__init__.py
13 (On Diff #7136)

I'm wondering if we can avoid the _ argument to the user-defined function.

We don't really even expose DagsterInstance in the top-level dagster exports. This feels like something we can change down the road, but for now might be better to simplify the signature and not expose instance?

examples/dagster_examples/experimental/sched.py
44

let's drop these, and just use the default arg.

python_modules/dagster/dagster/core/definitions/partition.py
146

we might want to add some tests to this, I think...

python_modules/dagster/dagster/core/definitions/schedule.py
85

we should probably write some tests for this too...

sashank planned changes to this revision.Fri, Dec 6, 12:11 AM
sashank added inline comments.Fri, Dec 6, 12:36 AM
examples/dagster_examples/__init__.py
13 (On Diff #7136)

This is fair, but this would have the consequence of needing to remove instance as an argument from select_partition on a PartitionSelector.

We thread the instance down into select_partition to get the selected_partition to pass into environment_dict_fn_for_partition.

sashank added inline comments.Fri, Dec 6, 1:02 AM
examples/dagster_examples/__init__.py
13 (On Diff #7136)

I do agree that it doesn't make sense to have instance as an argument here, so I would lean towards removing it as a param from select_partition to do so.

Do you think it's more important that select_partition has access to the instance?

sashank updated this revision to Diff 7146.Fri, Dec 6, 1:10 AM
  • Fix validation on ScheduleDefinition
  • Throw DagsterInvariantViolationError if selected_partition == None
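
The second bullet amounts to a guard like the one sketched here. `InvariantViolationError` is a local stand-in for DagsterInvariantViolationError so the example runs without dagster installed, and `environment_dict_for_partition` is a hypothetical helper name.

```python
class InvariantViolationError(Exception):
    """Local stand-in for dagster's DagsterInvariantViolationError."""


def environment_dict_for_partition(partitions, selector, config_fn):
    """Select a partition and build its config, failing loudly on no selection."""
    selected_partition = selector(partitions)
    if selected_partition is None:
        raise InvariantViolationError(
            "The partition selector did not return a partition."
        )
    return config_fn(selected_partition)


config = environment_dict_for_partition(
    ["2019-12-05", "2019-12-06"],
    selector=lambda partitions: partitions[-1],
    config_fn=lambda partition: {"date": partition},
)
```

Comparing with `is None` rather than `== None` is the idiomatic Python spelling of the check.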
sashank planned changes to this revision.Fri, Dec 6, 1:10 AM
prha added inline comments.Fri, Dec 6, 1:12 AM
examples/dagster_examples/__init__.py
13 (On Diff #7136)

I think we can figure out a way to add that later if we need to. I'm fine removing it for now.

sashank updated this revision to Diff 7152. Edited Fri, Dec 6, 1:28 AM

Remove use of instance in environment_config_fn, tags_fn, and PartitionSelector

sashank planned changes to this revision.Fri, Dec 6, 1:28 AM
sashank updated this revision to Diff 7153.Fri, Dec 6, 1:57 AM

fix tests

sashank planned changes to this revision.Fri, Dec 6, 1:57 AM
sashank updated this revision to Diff 7211.Fri, Dec 6, 10:28 PM
sashank marked 5 inline comments as done.

Add tests

prha accepted this revision.Fri, Dec 6, 10:57 PM

Looks good! Once build issues are resolved (and one typo), let's land!

examples/dagster_examples/experimental/sched.py
36

should the schedule_name here be log_states?

This revision is now accepted and ready to land.Fri, Dec 6, 10:57 PM
sashank updated this revision to Diff 7215.Fri, Dec 6, 11:08 PM

make black

examples/dagster_examples/experimental/sched.py
36

yup good catch

python_modules/dagster/dagster/core/definitions/partition.py
146

Got rid of this method since we got rid of instance, and call select_partition directly now

python_modules/dagster/dagster/core/definitions/schedule.py
85

added

sashank updated this revision to Diff 7216.Fri, Dec 6, 11:09 PM

Fix typo

sashank updated this revision to Diff 7217.Fri, Dec 6, 11:11 PM

Remove instance parameter from dash_stats_datetime_partition_config function

Harbormaster failed remote builds in B5817: Diff 7216!
sashank updated this revision to Diff 7222.Fri, Dec 6, 11:28 PM

rebase on origin/master

This revision was automatically updated to reflect the committed changes.