
Introduce Partition API
ClosedPublic

Authored by sashank on Nov 15 2019, 12:34 AM.

Details

Summary

This diff introduces a new API for defining pipeline partitions.

The partition axis is completely disjoint from the scheduling intervals. Each ScheduleDefinition can now specify a PartitionSelector that selects one partition from the set of pipeline partitions for a given scheduled execution. This leaves room for complicated logic (e.g. pick the next partition in reverse chronological order that does not have a successful run). It also allows for future plumbing to select multiple partitions, as long as they can be enqueued efficiently.
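As an illustration of the selection logic described above, here is a minimal sketch of a selector that walks the partitions newest-first and returns the first one without a successful run. It does not use the API from this diff: `Partition`, `select_next_unfinished`, and `succeeded_names` are hypothetical stand-ins, and the set of successful runs is passed in directly rather than looked up.

```python
from collections import namedtuple

# Hypothetical stand-in for the Partition type discussed in this diff.
Partition = namedtuple('Partition', 'name value')


def select_next_unfinished(partitions, succeeded_names):
    """Return the newest partition with no successful run, or None.

    Assumes `partitions` is ordered oldest to newest and that
    `succeeded_names` holds the names of partitions that already ran
    successfully (how that set is obtained is out of scope here).
    """
    for partition in reversed(partitions):
        if partition.name not in succeeded_names:
            return partition
    return None


dates = ('2019-11-13', '2019-11-14', '2019-11-15')
partitions = [Partition(name=d, value=d) for d in dates]

# The newest partition already succeeded, so the one before it is picked.
selected = select_next_unfinished(partitions, succeeded_names={'2019-11-15'})
print(selected.name)
```

Keeping the selector a plain function of the partition list makes it easy to swap in other policies (oldest-first, last partition only) without touching the schedule itself.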

This diff does not generate config (TBD), but it does add a tag dagster/partition, for run grouping.

Test Plan

unit

Diff Detail

Repository
R1 dagster
Lint
Automatic diff as part of commit; lint not applicable.
Unit
Automatic diff as part of commit; unit tests not applicable.

Event Timeline

python_modules/dagster-graphql/dagster_graphql/implementation/execution.py
137

👍

146

Yep, eventually... Still need to figure out the right API for the dagster-graphql -> dagster interaction. I'll add some placeholder TODO / comments.

python_modules/dagster/dagster/core/definitions/partition.py
9

will change

18

will change

130

I don't know quite yet... I suppose the default would just use generators instead of lists?

examples/dagster_examples/experimental/sched.py
46

Makes sense. I just think having some helper where you don't have to import LastPartitionSelector for the common case will be good for easing into this part of the system

update to comments, using namedtuple, adding default partition selector

resigning until sashank takes a looksie. if changes are substantial plz readd me

sashank edited reviewers, added: prha; removed: sashank.
python_modules/dagster/dagster/core/definitions/schedule.py
51–143

in this current exploration of having partitions as their own top level concept - do schedules need to know about them at all? You could have something in the partitions domain that helps you fill out environment_dict_fn, should_execute, and to-be-added tags_fn to define a schedule based on a partition.

to @sashank's queue

python_modules/dagster/dagster/core/definitions/schedule.py
51–143

in my opinion if you drop this scheduler portion the rest of the changes are good to land to start experimenting on

This revision now requires changes to proceed. Dec 2 2019, 11:10 PM
python_modules/dagster/dagster/core/definitions/schedule.py
51–143

Without this, how would you schedule a partition-based pipeline run?

I agree that it's cleaner not to have overlaps with the environment_dict_fn, should_execute, etc...

  • Remove partition_environment_dict_fn
  • Readd partition argument to environment_dict_fn
  • Update existing example
  • Add value to Partition

@prha open to discussion on a better way to handle the dynamic function signature of environment_dict_fn based on whether the schedule has partitions.

python_modules/dagster/dagster/core/definitions/schedule.py
51–143

I also think we need this to start experimenting w/ partition-based schedules

Update date_partition_range to store datetime objects
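A rough sketch of what a date range helper that stores datetime objects could look like. This is not the implementation in the diff: `date_partition_range_sketch`, its `end` and `fmt` parameters, and the `Partition` tuple are all assumptions made for illustration. The only idea taken from the thread is that each partition keeps the datetime itself as its value, with the string form used just as a name.

```python
import datetime
from collections import namedtuple

# Hypothetical stand-in: `value` keeps the datetime, `name` is its string form.
Partition = namedtuple('Partition', 'name value')


def date_partition_range_sketch(start, end, fmt='%Y-%m-%d'):
    """Return a zero-argument function listing one partition per day in [start, end)."""

    def get_partitions():
        partitions = []
        for offset in range((end - start).days):
            day = start + datetime.timedelta(days=offset)
            partitions.append(Partition(name=day.strftime(fmt), value=day))
        return partitions

    return get_partitions


partition_fn = date_partition_range_sketch(
    datetime.datetime(2019, 12, 1), datetime.datetime(2019, 12, 4)
)
parts = partition_fn()
print([p.name for p in parts])
```

Storing the datetime rather than a preformatted string lets downstream config functions do date arithmetic or pick their own format.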

Don't need to block land on me - just throwing some ideas in the mix.

Just seems that you have so much information in the partition definition that it would be nice to create the schedule from that. You only need roughly name, clock rate, and partition selection on top of what you already have defined. ScheduleDefinition.for_partition also reasonable.

python_modules/dagster/dagster/core/definitions/schedule.py
51–143

Without this, how would you schedule a partition-based pipeline run?

I was imagining something like

log_date_set = PartitionSetDefinition(
    name='date_partitions',
    pipeline_name='log_partitions',
    partition_fn=date_partition_range(ten_days_ago),
    base_preset='default', # maybe base config instead?
    config_fn=partition_config_fn # merges over base preset
)

# creates a schedule that:
#   - fires at cron_schedule rate
#   - selects a partition
#   - skips if partition already selected otherwise
#   - applies config from partition selection atop base and runs
log_date_partition = log_date_set.create_schedule(
    name='log_dates',
    cron_schedule='* * * * *',
    selector=... # means to select partition
)
python_modules/dagster/dagster/core/definitions/schedule.py
51–143

oh, i like that quite a lot!

queue management

This revision now requires changes to proceed. Dec 5 2019, 6:55 PM
  • Add environment_dict_fn_for_partition and tags_fn_for_partition to PartitionSet.
  • Add method to PartitionSet to create a ScheduleDefinition from a PartitionSet. This allowed us to move all the partition logic from start_scheduled_execution to environment_dict_fn_for_partition and tags_fn_for_partition.
  • Add tags_fn to ScheduleDefinition
  • Add deprecation warnings for environment_dict and tags arguments to ScheduleDefinition. We should remove them in 0.7.0 to simplify the API.
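To make the shape of the update above concrete, here is a self-contained sketch of a partition set that builds a schedule whose `environment_dict_fn` and `tags_fn` delegate to the per-partition functions. It does not import dagster: `PartitionSetSketch`, `ScheduleSketch`, the `selector` callable, and the config dict shape are hypothetical, and the real signatures in the diff may differ. The `dagster/partition` tag key is the one named in the summary.

```python
from collections import namedtuple

# Hypothetical stand-ins; not the dagster classes from this diff.
Partition = namedtuple('Partition', 'name value')
ScheduleSketch = namedtuple(
    'ScheduleSketch', 'name cron_schedule environment_dict_fn tags_fn'
)


class PartitionSetSketch:
    def __init__(self, name, pipeline_name, partition_fn,
                 environment_dict_fn_for_partition, tags_fn_for_partition=None):
        self.name = name
        self.pipeline_name = pipeline_name
        self.partition_fn = partition_fn
        self.environment_dict_fn_for_partition = environment_dict_fn_for_partition
        # Default to no extra tags when the caller does not supply a function.
        self.tags_fn_for_partition = tags_fn_for_partition or (lambda partition: {})

    def create_schedule(self, name, cron_schedule, selector):
        """Build a schedule whose functions resolve the partition at call time."""

        def environment_dict_fn():
            partition = selector(self.partition_fn())
            return self.environment_dict_fn_for_partition(partition)

        def tags_fn():
            partition = selector(self.partition_fn())
            tags = dict(self.tags_fn_for_partition(partition))
            tags['dagster/partition'] = partition.name  # tag used for run grouping
            return tags

        return ScheduleSketch(name, cron_schedule, environment_dict_fn, tags_fn)


log_date_set = PartitionSetSketch(
    name='date_partitions',
    pipeline_name='log_partitions',
    partition_fn=lambda: [Partition('2019-12-04', '2019-12-04'),
                          Partition('2019-12-05', '2019-12-05')],
    # The config shape below is made up for the example.
    environment_dict_fn_for_partition=lambda p: {'partition_date': p.value},
)
schedule = log_date_set.create_schedule(
    name='log_dates', cron_schedule='* * * * *', selector=lambda ps: ps[-1]
)
print(schedule.environment_dict_fn(), schedule.tags_fn())
```

With this layout the schedule only carries a name, a cron rate, and a selection policy. Everything partition-specific lives on the partition set, which is the split suggested earlier in the thread.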

redfordnod

I'll let @prha take it from here

prha requested changes to this revision. Dec 6 2019, 12:07 AM

some style nits, but also we should write some tests.

examples/dagster_examples/__init__.py
13

I'm wondering if we can avoid the _ argument to the user-defined function.

We don't really even expose DagsterInstance in the top-level dagster exports. This feels like something we can change down the road, but for now might be better to simplify the signature and not expose instance?

examples/dagster_examples/experimental/sched.py
44

let's drop these, and just use the default arg.

python_modules/dagster/dagster/core/definitions/partition.py
145

we might want to add some tests to this, I think...

python_modules/dagster/dagster/core/definitions/schedule.py
81

we should probably write some tests for this too...

examples/dagster_examples/__init__.py
13

This is fair, but this would have the consequence of needing to remove instance as an argument from select_partition on a PartitionSelector.

We thread the instance down into select_partition to get the selected_partition to pass into environment_dict_fn_for_partition.

examples/dagster_examples/__init__.py
13

I do agree that it doesn't make sense to have instance as an argument here, so I would lean towards removing it as a param from select_partition to do so.

Do you think it's more important that select_partition has access to the instance?

  • Fix validation on ScheduleDefinition
  • Throw DagsterInvariantViolationError if selected_partition == None
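The None check in the list above could look roughly like the sketch below. To stay self-contained it raises a local stand-in exception rather than importing DagsterInvariantViolationError, and `resolve_partition` is a hypothetical helper name, not a function from the diff.

```python
class InvariantViolationSketch(Exception):
    """Local stand-in for DagsterInvariantViolationError (assumption)."""


def resolve_partition(selector, partitions):
    """Run the selector and fail loudly if it returns no partition."""
    selected_partition = selector(partitions)
    if selected_partition is None:
        raise InvariantViolationSketch(
            'The partition selector did not return a partition.'
        )
    return selected_partition


partitions = ['2019-12-04', '2019-12-05']
print(resolve_partition(lambda ps: ps[-1], partitions))

try:
    resolve_partition(lambda ps: None, partitions)
except InvariantViolationSketch as error:
    print('rejected:', error)
```

Raising at selection time keeps a missing partition from silently producing an empty environment dict further down.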
examples/dagster_examples/__init__.py
13

I think we can figure out a way to add that later if we need to. I'm fine removing it for now.

Remove use of instance in environment_config_fn, tags_fn, and PartitionSelector

sashank marked 5 inline comments as done.

Add tests

Looks good! Once build issues are resolved (and one typo), let's land!

raichu

examples/dagster_examples/experimental/sched.py
36

should the schedule_name here be log_states?

This revision is now accepted and ready to land. Dec 6 2019, 10:57 PM

make black

examples/dagster_examples/experimental/sched.py
36

yup good catch

python_modules/dagster/dagster/core/definitions/partition.py
145

Got rid of this method since we got rid of instance, and call select_partition directly now

python_modules/dagster/dagster/core/definitions/schedule.py
81

added

Remove instance parameter from dash_stats_datetime_partition_config function

This revision was automatically updated to reflect the committed changes.