
[sensors-5] Redefine sensors in terms of SensorTickData / SensorRunParams
ClosedPublic

Authored by prha on Tue, Nov 3, 11:01 PM.

Details

Summary

Alternative API for sensors, where a single sensor evaluation can launch multiple runs, each with unique run config and tags

Expose a namedtuple class SensorTickData that contains a list of SensorRunParams. SensorRunParams contains tags and run_config, which could then be reused for partition set definitions, schedules, etc.
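A minimal sketch of the proposed shape (the real classes live in python_modules/dagster/dagster/core/definitions/sensor.py and carry validation; execution_key comes up later in this thread, and the exact field names here are illustrative):

from collections import namedtuple

# Illustrative only -- the actual classes carry validation and defaults.
SensorRunParams = namedtuple("SensorRunParams", "run_config tags execution_key")
SensorTickData = namedtuple("SensorTickData", "run_params")


def evaluate_sensor_tick(sensor_fn, context):
    # A single evaluation produces one SensorTickData holding zero or more
    # SensorRunParams; each SensorRunParams parameterizes one launched run.
    return SensorTickData(run_params=list(sensor_fn(context)))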

Test Plan

bk

Diff Detail

Repository
R1 dagster
Lint
Automatic diff as part of commit; lint not applicable.
Unit
Automatic diff as part of commit; unit tests not applicable.

Event Timeline

prha requested review of this revision. Tue, Nov 3, 11:18 PM
prha retitled this revision from [sensors-5] redefine sensors to [sensors-5] RFC: redefine sensors. Wed, Nov 4, 2:07 AM
prha edited the summary of this revision.
prha added reviewers: dgibson, schrockn, sandyryza.

Here's a comparison of what it would look like to implement the "launch a run for each file in an S3 directory" use case. I think the sensors-5 version is considerably cleaner.

sensors-2:

def run_per_s3_file_config_fn(context):
    all_s3_files = get_all_s3_files_in_directory("some_dir")
    for file in all_s3_files:
        if not run_exists(context, file):
            return {"the_file": file}


@sensor(run_config_fn=run_per_s3_file_config_fn)
def run_per_s3_file(context):
    all_s3_files = get_all_s3_files_in_directory("some_dir")
    for file in all_s3_files:
        if not run_exists(context, file):
            return True

sensors-5:

@sensor
def run_per_s3_file(context):
    all_s3_files = get_all_s3_files_in_directory("some_dir")
    for file in all_s3_files:
        if not run_exists(context, file):
            yield {"the_file": file}

And here's a comparison of what it would look like to implement the "launch a run if there's something at this S3 path". I don't think the sensors-5 version is significantly messier.

sensors-2:

@sensor
def run_if_s3_file():
    return s3_file_exists("some_path")

sensors-5:

@sensor
def run_if_s3_file():
    if s3_file_exists("some_path"):
        yield {}

Another advantage of sensors-5 over sensors-2: If someone wants to have a sensor that can kick off a bunch of runs "at once", sensors-2 would require them to have a very fast tick. Maybe kicking off a ton of runs at once isn't something we want to support? Though I still think there are backfill approaches that could benefit from sensors kicking off a bunch of runs.

Preaching to the choir, @sandyryza

The only downside of this approach is that I prefer having distinct, named params instead of a dictionary with named keys. That would require another top-level object for folks to use (e.g. JobParams).

sensors-5 probably makes more sense for implementing schedules on top of this as well; we're definitely going to want the ability to kick off multiple runs per 'tick' for those.

expose JobConfig, so that the main sensor function returns a list of JobConfig
add scheduler logic for skipping duplicate execution_key job invocations

prha retitled this revision from [sensors-5] RFC: redefine sensors to [sensors-5] Redefine sensors in terms of JobConfig. Fri, Nov 6, 8:00 PM
prha edited the summary of this revision.

Sorry for entering the fray late if there has already been back and forth on the "JobConfig" naming. My two cents:

I think "Config" means something pretty specific in dagster already - i.e. something like "an object that can satisfy a config schema". If we can avoid diluting it, I think that's preferable. Was there pushback on "Params"? I liked that.

Are the objects that are yielded by sensors 1:1 with runs? If so, could we call it RunParams?

Yeah, I'm not too attached to JobConfig, just exploring the naming space. I like having the word Job in there, to cement the concept that these are the things job definitions return to parameterize launched runs. I guess I would prefer JobParams over RunParams.

My impression of the common usage of naming something XConfig or XParams is that X is the thing being configured or parameterized. E.g. "run config" is the configuration for a particular run.

If I'm understanding the meaning of job correctly, it's not the Job itself that is being configured. The sensor is the job, and the sensor itself isn't being configured or parameterized by the produced object.

I'm really confused by JobConfig. From my perspective the current JobConfig in the diff represents the parameters to a run. It represents "I have determined that in this tick there should be a run launched, and here are the parameters". JobConfig sounds like an argument that would be passed into the *job* definition.

I think it might be helpful to write proposed English-language descriptions of runs and jobs to make it clear what we are building, so that we are all 100% in agreement about the proposed ontology.

Your impression is correct. The current JobConfig is really a 1-to-1 mapping of the dynamically-resolved parameters required to launch a run (run_config, tags). It's a bit annoying to have both of these. Each sensor can resolve to a list of these parameters.

The early iterations of this diff (see https://dagster.phacility.com/D5026?id=25165) just had the sensor body return a list of dictionaries with run_config and tags as keys, but it feels awkward and error-prone to have the body return raw dicts.

I think the top-level options to explore are:

  1. Have sensor bodies return dictionaries and have very explicit error messages if they pass the wrong keys. This keeps tags/run_config separately defined, and both dynamically resolved.
  2. Rename JobConfig to something like RunParams, keeping tags/run_config separate, but introducing a noun that contains both. We can redefine the schedule definitions and the partition set definitions to use run_param_fn instead of each having individual run_config_fn, tag_fn, should_execute_fn hooks (see the sketch after this list). We should note that aside from mode, this is essentially a preset.
  3. Try to keep only a single concept of run_config and have tags be defined within the run_config. This might be conceptually cleaner, but has a bunch of far-ranging product implications (UI changes, may affect presets, etc).
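A hypothetical sketch of option 2 applied to a schedule, where a single run_params_fn replaces the separate run_config_fn / tag_fn / should_execute_fn hooks (RunParams, run_params_fn, and the helper are illustrative names, not APIs in this diff):

from collections import namedtuple

RunParams = namedtuple("RunParams", "run_config tags")


def daily_run_params_fn(date):
    # one hook decides whether to run and, if so, with what parameters
    if not data_is_ready_for(date):  # hypothetical helper
        return None                  # None => skip this tick
    return RunParams(
        run_config={"solids": {"process_date": {"config": str(date)}}},
        tags={"date": str(date)},
    )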

I think one way to explore this is to ask how this API is going to expand in the future. Use cases include:

  1. The user returning information about why a run was *not* scheduled
  2. The user returning information about a future potential run.

There may be others.

I would also consider how a user would describe, in an English sentence, what they are returning to their colleagues.

"At every tick a sensor returns instructions back to the system. This can include runs to be launched,"

or some other thing.

prha edited the summary of this revision.

renamed

prha retitled this revision from [sensors-5] Redefine sensors in terms of JobConfig to [sensors-5] Redefine sensors in terms of SensorTickData / SensorRunParams.

The big question I have is whether execution_key should be required. I would also love tests around failure recovery to make sure we keep the run-exactly-once guarantee no matter what, but I think this makes sense.

python_modules/dagster/dagster/core/definitions/sensor.py
29–30

Specifically, we promise that even if you return multiple runs with the same key, we will only launch exactly one run for that key, right? To recover from failures better?

57

any plans to add the cursor to this as well?

python_modules/dagster/dagster/core/storage/schedules/sql_schedule_storage.py
330

are we going to want a version of this that takes in multiple keys?

Just imagining that if sensors are returning lists of runs, each with an execution_key, we'll probably want to look for any ticks for all of those keys, and it might be nice to do that in one query rather than N queries?
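For illustration, a batched lookup might look something like this (a hypothetical method; the table and column names are stand-ins, not the actual schema in sql_schedule_storage.py):

from sqlalchemy import column, select, table

# Stand-in table definition; the real storage layer has its own schema.
job_ticks = table(
    "job_ticks",
    column("job_origin_id"),
    column("execution_key"),
)


def get_job_ticks_for_keys(conn, job_origin_id, execution_keys):
    # one IN-clause query instead of N point lookups
    query = (
        select([job_ticks])
        .where(job_ticks.c.job_origin_id == job_origin_id)
        .where(job_ticks.c.execution_key.in_(execution_keys))
    )
    return conn.execute(query).fetchall()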

Relatedly, is it that useful to have a has_job_tick function vs. using get_job_tick and returning the actual row?

python_modules/dagster/dagster/scheduler/sensor.py
356–357

As above, we could maybe do this in one SQL query vs. N; might be a premature optimization though.

358

I'm wondering if we should require an execution_key so that we can actually guarantee this type of check will be done for all sensors - otherwise correctness issues could happen and we will be blamed, fairly or not

363–369

do we also need a version of this check that also queries the run DB for a run tagged with that execution key? https://dagster.phacility.com/source/dagster/browse/master/python_modules/dagster/dagster/scheduler/scheduler.py$286-293

python_modules/dagster/dagster_tests/scheduler_tests/test_sensor_run.py
267–270

nice, this is good - I will sleep a lot better once there are also tests simulating failure at various states as well :)

This revision is now accepted and ready to land. Tue, Nov 17, 3:44 PM

update: add run tag checking to help with sensor failure recovery, add tests

prha requested review of this revision. Mon, Nov 23, 11:07 PM

This looks great. The only must-fix is changing .launch_run to .submit_run; everything else is just a suggestion.

python_modules/dagster/dagster/core/definitions/decorators/sensor.py
12–22

nit: I think you could take out the 'more flexible' part and just say what it can return

15

we could give a little more guidance here about why you might do this (to surface the reason your schedule didn't run)?

One thing that's not clear to me is if returning zero SensorRunParams is an option, or if you have to return a SensorSkipData if you're skipping
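To make the question concrete, the two shapes might look roughly like this (a sketch only; whether SensorSkipData carries a message, and its constructor shape, are assumptions):

# Alternative A: a tick with no work simply yields zero SensorRunParams
@sensor
def run_if_s3_file(context):
    if s3_file_exists("some_path"):
        yield SensorRunParams(run_config={}, tags={}, execution_key="some_path")


# Alternative B: an explicit skip, with a reason the UI could surface
@sensor
def run_if_s3_file_explicit(context):
    if not s3_file_exists("some_path"):
        return SensorSkipData("no file at some_path")  # shape illustrative
    return SensorTickData(
        run_params=[SensorRunParams(run_config={}, tags={}, execution_key="some_path")]
    )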

41

should we typecheck the items coming out as well?

python_modules/dagster/dagster/core/definitions/sensor.py
29

do you have a strong opinion on whether this should be optional? I'm still leaning towards making you specify it to give people a 'pit of success' (but allowing None if for some reason your sensor runs aren't dedupeable)

I think we could be super explicit here about what we mean by deduplication - the sensor will ensure that exactly one run happens for each execution_key, and will not launch runs if the same execution_key is returned again later
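A rough sketch of that contract from the daemon's side; has_tick_for_key and submit_run here are hypothetical helpers standing in for the actual storage/instance APIs:

def launch_runs_for_tick(instance, tick_data, has_tick_for_key, submit_run):
    # has_tick_for_key: lookup for execution_keys that already produced a run
    # on an earlier tick; submit_run: launches a single run for the params.
    launched_keys = set()
    for run_params in tick_data.run_params:
        key = run_params.execution_key
        if key is not None and (key in launched_keys or has_tick_for_key(key)):
            # exactly-once per execution_key: never launch a second run for a
            # key, even if the sensor returns it again on a later tick
            continue
        submit_run(instance, run_params)
        launched_keys.add(key)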

54

last_completed_time seems like a better name than last_evaluation_time (wrt failures)

110–116

ah I guess we typecheck here later, maybe ignore my previous suggestion

python_modules/dagster/dagster/core/host_representation/external_data.py
240–241

should we enforce here that they're mutually exclusive? (they are right?)

python_modules/dagster/dagster/core/storage/pipeline_run.py
309–316

This has the same problem as its schedule sibling: it doesn't uniquely identify the sensor :/ We should get the repository origin in here as well somehow (not blocking this diff).

312

schedule => sensor

python_modules/dagster/dagster/core/test_utils.py
349–351

nit: not sure this statement is necessarily broadly applicable to every test; we might just want to disable the tests that use this on Windows vs. relying on this returning an empty list (maybe I should have done that on the diff that added this comment)

python_modules/dagster/dagster/scheduler/sensor.py
207–213

Still think we should consider a single query here; this could get pretty slow for large sensors?

229

do we have a test that verifies that we recover when it fails partway through a multi-run loop (within a single sensor)? That's one case that couldn't happen in the previous scheduler tests

235

this should be submit_run (so that it can use the queuer if needed)

241–252

Check what the scheduler does for this part now; it's gotten simpler / more stuff is handled by instance.submit_run.

https://dagster.phacility.com/source/dagster/browse/master/python_modules/dagster/dagster/scheduler/scheduler.py$324-345

331–334

nit - tags_for_sensor could take in an optional execution_key arg as well

python_modules/dagster/dagster_tests/scheduler_tests/test_sensor_failure_recovery.py
33–35

do you need to freeze time for sensor tests? I thought most of that was to get croniter to actually launch the ticks as expected

maybe for the last_evaluation_time

172

seems like we should include the key in this message

This revision is now accepted and ready to land. Tue, Nov 24, 4:03 PM

dgibson

python_modules/dagster/dagster/core/definitions/sensor.py
29

I kind of agree that it should be required. The execution patterns and failure recovery behaviors are pretty weird without it. I can add a flag that overrides the check, but I'd prefer to do that in a separate diff for now, since I need to think through how this would work with schedules.

python_modules/dagster/dagster_tests/scheduler_tests/test_sensor_failure_recovery.py
33–35

for these tests, it's just to get the timestamp right in the logs.

enforce execution key parameter

prha marked 16 inline comments as done. Tue, Nov 24, 6:02 PM