Page MenuHomeElementl

[RFC] time-based partitioned config and schedule from partitioned config
ClosedPublic

Authored by cdecarolis on Jul 2 2021, 4:53 PM.

Details

Summary

Depends on D8618.

This changes the way we expect users to build partitioned schedules. It makes @daily_schedule and the like obsolete.

The aim is to eliminate PartitionSetDefinitions as free-standing objects. Instead, a job can have partitioned config, which means that each run of the job corresponds to a particular partition, and the config for the job is based on that partition.

This means that the set of partitions is supplied when building the job, not when building the schedule that targets the job. In English, one might say "I have a job where each run fills in a days' worth of data in my table. I have a schedule that triggers that job at 7 am to process the prior days' worth of data."

Test Plan

still needs more tests

Diff Detail

Repository
R1 dagster
Lint
Lint Not Applicable
Unit
Tests Not Applicable

Event Timeline

python_modules/dagster/dagster_tests/core_tests/definitions_tests/test_partitioned_schedule.py
16

here's what it would look like to define a schedule for a partitioned job. this would replace daily_schedule.

Harbormaster returned this revision to the author for changes because remote builds failed.Jul 2 2021, 5:37 PM
Harbormaster failed remote builds in B33086: Diff 40784!
python_modules/dagster/dagster_tests/core_tests/definitions_tests/test_partitioned_schedule.py
17โ€“39

nit: i think its worth updating this test to reflect generating a config that actually can execute

I think moving the partitioned config space to the job instead of the partition set makes sense.

We've talked offline about tags for partition, and how we will lose that per-partition/execution flexibility (but that is probably acceptable).

What is the minimal viable daily schedule in this new api?

python_modules/dagster/dagster/core/definitions/partitioned_schedule.py
23

Strongly recommend not doing this even if the result is immutable (searching through docs did not yield any meaningful result). This will only be called once at definition, and if mutable will do the crazy counterintuitive python behavior thing. At minimum a comment, but should probably just be None and then conditionally called time(0, 0)

What is the minimal viable daily schedule in this new api?

The minimum viable daily partitioned schedule looks like this:

@daily_partitioned_config(start_date="2021-05-05")
def my_partitioned_config(_start, _end):
    return {}

my_schedule = schedule_from_partitions(my_graph.to_job(config=my_partitioned_config))

It's somewhat more verbose than the current API. IMO this reflects the complexity of what the user is trying to do. I.e. when someone needs a partitioned schedule, it's because, independent of their instigation policy, they need their job to be executable / backfill-able over a particular discrete set of values.

For users who just want to execute a job every day, we'd direct them to the @schedule API (maybe we could accept cron_schedule="@daily" if we don't already).

Thoughts on whether this is a regression?

python_modules/dagster/dagster/core/definitions/partitioned_schedule.py
23

๐Ÿ‘

cdecarolis added a reviewer: sandyryza.
cdecarolis added a subscriber: cdecarolis.

finishing touches + other partition windows

cdecarolis edited the summary of this revision. (Show Details)

Add monthly, weekly, and hourly, support time of day

dbt failures unrelated

cdecarolis retitled this revision from [RFC] daily_partitioned_config and schedule_from_partitions to [RFC] time-based partitioned config and schedule from partitioned config.Mon, Jul 12, 9:25 PM

Thanks a ton for picking this up. To make it accessible to users, it needs a few more things:

  • Docstrings for the _partitioned_config functions.
  • Experimental warnings.
  • Inclusion in the experimental API docs page.
  • Inclusion in the top-level public API.
This revision now requires changes to proceed.Tue, Jul 13, 5:43 PM

Add to experimental API docs, top level API, docstring pass

sandyryza added inline comments.
python_modules/dagster/dagster/core/definitions/time_window_partitions.py
85

I missed this last time - I don't think we should include any "execution_X" parameters inside these decorators. If you think about it, the goal of the partitioned config is just defining a set of values that can be used to parameterize the pipeline. It doesn't actually say anything about when the pipeline should be executed.

So execution_time, etc. should be supplied when defining the schedule (schedule_from_partitions), not when defining the PartitionedConfig. Does that make sense? Can talk about it on slack if easier.

94

to_job would be more informative than "a Job".

This revision now requires changes to proceed.Tue, Jul 13, 8:47 PM
python_modules/dagster/dagster/core/definitions/time_window_partitions.py
85

yup I see what you were saying. I was pretty sure the names were at least borked - that we would want to re-orient around partitions, but I think what you're saying makes more sense.

94

๐Ÿ‘

Move execution params to schedule creation fxn

I left a final comment that should be addressed before merging. Otherwise, looks great.

python_modules/dagster/dagster/core/definitions/partitioned_schedule.py
25

I think it's a little bit confusingly redundant to have both minute_of_hour and time_of_day. Preferable would be either:

  • Just time_of_day, and assert that hour is 0 if the partition cadence is hourly.
  • hour_of_day and minute_of_hour, both ints.

Gonna go with separate args for minute and hour to keep in theme.

This revision was not accepted when it landed; it landed in state Needs Review.Wed, Jul 14, 6:01 PM
This revision was automatically updated to reflect the committed changes.