
enable setting a step selection on run requests
Needs Revision · Public

Authored by sandyryza on Feb 6 2021, 7:16 PM.

Details

Summary

Some uses -

  • My pipeline includes a step that processes a big dataset and then a step that joins it with a smaller dataset. I want the second step to run every hour, but I only need the first step to run once a day.
  • My pipeline has a step for each file in a directory. I want to run that step each time the file changes.

This essentially offers a path to memoization that doesn't require adding new abstractions.
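
As a rough sketch of the second use case, a sensor could yield a RunRequest whose step_selection covers only the step for the file that changed (the directory, pipeline name, and step name below are made up for illustration; step_selection on RunRequest is what this diff adds):

import os

from dagster import RunRequest, sensor

WATCH_DIR = "/data/incoming"  # hypothetical directory of per-file inputs

@sensor(pipeline_name="my_pipeline")
def changed_file_sensor(_context):
    for filename in os.listdir(WATCH_DIR):
        mtime = os.path.getmtime(os.path.join(WATCH_DIR, filename))
        # run_key de-duplicates: a new request only fires when the mtime changes
        yield RunRequest(
            run_key=f"{filename}:{mtime}",
            run_config={},
            # only execute the step that processes this particular file
            step_selection=[f"process_{filename.split('.')[0]}"],
        )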

Test Plan

bk

Diff Detail

Repository
R1 dagster
Branch
sensor-step-selection (branched from master)
Lint
Lint Passed
Unit
No Test Coverage

Event Timeline

Nice, seems reasonable. The only question I have is: if you put in an invalid step selection, is that being validated, and is the error surfaced at the right place? Presumably it'd be the execution plan stage, which I believe we handle pretty well currently (surfacing it as a tick error, similar to if you misconfigured the run config).

Re: your TODO, getting this in for schedules should be close to identical code, I think/hope. Probably worth doing in fairly short order for consistency?

Will let @prha take a look too

prha requested changes to this revision. Feb 8 2021, 4:43 PM

Yeah, because RunRequest can be a common API between schedules and sensors now, we should have both be threaded through, preferably in the same commit.

+1 to dgibson's question about surfacing the appropriate error message on invalid step selections.

Returning to your queue.

This revision now requires changes to proceed. Feb 8 2021, 4:43 PM
python_modules/dagster/dagster/core/definitions/job.py:71

I think this should be on the other python execution APIs as well, and would be great if we could do "retry-from-failure" via this entry point.

Not for this diff, but just to test the waters - what do folks think if the type of this was

step_selection: Optional[Union[List[str], StepSelection]]

and you could do something like

step_selection=StartFromFailures(previous_run_id=...)

(with better names)
or

step_selection=SkipWhatsMemoized()
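
Purely as a sketch of what that could look like (every name here is a placeholder, per the "with better names" caveat above):

from dataclasses import dataclass
from typing import List, Optional, Union

@dataclass
class StartFromFailures:
    # re-run only the steps that did not succeed in a previous run
    previous_run_id: str

@dataclass
class SkipWhatsMemoized:
    # skip any step whose outputs are already memoized
    pass

# the declarative spec type floated in the signature above
StepSelection = Union[StartFromFailures, SkipWhatsMemoized]

def resolve_step_selection(
    selection: Optional[Union[List[str], StepSelection]],
) -> Optional[List[str]]:
    # hypothetical resolution from a declarative spec down to concrete step keys
    if selection is None or isinstance(selection, list):
        return selection
    if isinstance(selection, StartFromFailures):
        raise NotImplementedError("would look up unfinished steps of previous_run_id")
    raise NotImplementedError("would inspect the execution plan for unmemoized outputs")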

@alangenfeld -

I think this should be on the other python execution APIs as well, and would be great if we could do "retry-from-failure" via this entry point.

So I think this raises an interesting question, which is basically: where should interesting step selection logic live? A thesis of this diff is kind of that it can live in the sensor/schedule itself.

I.e. the sensor/schedule body itself could include:

remaining_steps = instance.get_unfinished_steps(previous_run_id=...)
yield RunRequest(step_selection=remaining_steps)

or:

execution_plan = create_execution_plan(my_pipeline)
missing_data_steps = steps_with_unmemoized_outputs(execution_plan)
yield RunRequest(step_selection=missing_data_steps)

This is attractive to me because it's one less place users need to think about their code executing. Thoughts?

I think the main benefit of a step_selection=DeclarativeSpec(...) approach is that we can capture the intent, which I think can lead to richer product experiences.

For example to reach parity with the existing dagit retry-from-failure experience, the RunRequest would have to include the right dagster/... tags to associate the run with its previous run and origin run.
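
For instance, a sketch of what that RunRequest might have to carry (get_unfinished_steps is the hypothetical helper from above, and whether these exact tag keys are all dagit needs for parity is an assumption on my part):

from dagster import RunRequest

def retry_run_request(instance, failed_run):
    # hypothetical helper from the snippet above
    remaining_steps = instance.get_unfinished_steps(previous_run_id=failed_run.run_id)
    return RunRequest(
        run_key=f"retry-{failed_run.run_id}",
        step_selection=remaining_steps,
        # tags dagit uses to tie a retry back to its parent and origin runs
        tags={
            "dagster/parent_run_id": failed_run.run_id,
            "dagster/root_run_id": failed_run.root_run_id or failed_run.run_id,
        },
    )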

python_modules/dagster/dagster/scheduler/sensor.py:373

Not sure if it has any impact, but it may be best to leave this as None if there was no step selection.

Just noting that this will continue the pattern of intermixing step and solid selections in our public APIs

This now works on schedules as well as sensors

Just noting that this will continue the pattern of intermixing step and solid selections in our public APIs

Yeah, there's a lot to think through about our current situation and how we can move forward to something better.

I think there are two fundamental problems at play:

  • The degree to which we communicate / hide ExecutionPlan & ExecutionStep as concepts to users
  • The subtlety between two different but similar capabilities we offer
    • modifying a target Pipeline (orchestration graph) by choosing a subset of it
      • config schema / behavior change coming from nodes with inputs that are no longer satisfied
    • modifying a target ExecutionPlan (execution graph) by choosing a subset of it
      • only the whitelisted subset of nodes execute, no other behavior changes

I think there is a lot of valuable utility in supporting this subtle distinction, but the way it manifests in the system today is not great. I think this is due to the current state of things, outlined below.

Current State

We only really treat the "execution graph" as tangible once execution starts; the ways of manipulating it are all centered on re-execution.
The python APIs allow manipulating a target (orchestration graph) before passing it in, or passing in the arguments that do the manipulation (a rough sketch of both paths follows the list below).

  • python APIs
    • orchestration graph modification
      • execute_pipeline takes solid_selection
      • schedules, sensors, preset definitions take solid_selection
      • PipelineDefinition has get_pipeline_subset_def
      • IPipeline has subset_for_execution (which ends up calling get_pipeline_subset_def)
    • execution graph modification
      • reexecute_pipeline takes step_selection
      • create_execution_plan takes step_keys_to_execute
  • Dagit
    • orchestration graph modification
      • in the playground - selector syntax / visual selection
    • execution graph modification
      • when viewing a completed execution - selector syntax / from-failure retry option
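
To make the contrast concrete, a rough sketch of both paths through the python APIs listed above (my_pipeline, the selections, and the step key are illustrative):

from dagster import DagsterInstance, create_execution_plan, execute_pipeline, reexecute_pipeline

instance = DagsterInstance.get()

# orchestration graph modification: subset the pipeline definition itself
result = execute_pipeline(my_pipeline, solid_selection=["add_one*"], instance=instance)

# execution graph modification: keep the full definition, whitelist steps from a prior run
reexecute_pipeline(
    my_pipeline,
    parent_run_id=result.run_id,
    step_selection=["add_one.compute"],
    instance=instance,
)

# execution graph modification at plan-construction time
create_execution_plan(my_pipeline, step_keys_to_execute=["add_one.compute"])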

Moving Forward

Abstractly I think it makes sense to take execution graph modifications in the execution APIs, and perform orchestration graph transformations on the graph before passing them in.

# executes x and downstream, x inputs loaded same as if upstreams executed
execute(target_graph, selection=["x*"]) 

# executes x and downstream, need to define how to get x's inputs (trigger root_input_managers)
execute(target_graph.subset(["x*"]))

but even that is pretty subtle against today's set of abstractions. I think it would likely feel better if we had more weight around the act of binding, a la "executable":

# just fires whitelisted nodes from full graph
full_exe = executable(target_graph, resources={...}, run_config={...})
execute(full_exe, selection=["x*"]) 


# only need to provide resources needed for nodes in subset
# run_config defines x inputs
target_exe = executable(target_graph, subset_selection=["x*"], resources={...}, run_config={...})
execute(target_exe)

Just wanted to write that all out since it's been on my mind lately.

As for this diff, I think this is functionality we want to add. I do think it's confusing that you have solid_selection on ScheduleDefinition and then step_selection on RunRequest. Is an overview docs entry on the difference sufficient mitigation? Are there better names for the args that would help?

We only really treat the "execution graph" as tangible once execution starts.

An exception to this is backfills - i.e. we let you launch a backfill over a step selection.

My broad take on the solid selection vs. step selection issue is that we should heavily deemphasize solid subselection. The best way to think about solid selection is that it's creating a new pipeline definition that happens to be very similar to its parent pipeline definition. It's a fairly heavy-weight thing to do, and I struggle to come up with situations where it's the right thing to do - maybe unit-testing a subgraph?
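
(For concreteness, that unit-testing case would look something like this, with the pipeline, solid names, and config made up:)

from dagster import execute_pipeline

# runs a two-solid subset as if it were its own, smaller pipeline definition;
# any input previously fed by an upstream solid now has to come from run_config
result = execute_pipeline(
    my_pipeline,
    solid_selection=["parse_file", "validate_rows"],
    run_config=subset_run_config,  # illustrative config supplying the detached inputs
)
assert result.success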

So the direction I tend towards would be to de-emphasize solid selection everywhere and start adding in step selection everywhere.

@alangenfeld @max - in your minds, is resolving the solid selection / step selection issue something we should do before merging this?

My broad take on the solid selection vs. step selection issue is that we should heavily deemphasize solid subselection

I agree, though I still worry about the user experience of someone trying these execution selection patterns and hitting issues with any run-scoped IO managers they have. Solvable problems, though.

in your minds, is resolving the solid selection / step selection issue something we should do before merging this?

I doubt this is done without further iteration, but I don't think we need to block it. If no one else chimes in, I will accept.

For example, I feel something like execution_selection is a better name for this stuff, since it skates around the existence of execution "steps", which we don't communicate effectively. That said, we can rename step_selection later, since there are other python APIs that will need to change as well. RunRequest is serialized, not persisted, so the rename should be lower cost.

if you put in an invalid step selection, is that being validated / error surfaced at the right place?

Add a test for this - unless I'm mistaken, the current tests only cover the happy path.

Back to your queue for a pass on the bad-selection test and manual verification in dagit.

This revision now requires changes to proceed. Feb 22 2021, 9:15 PM