
[core] remove EventSink
Closed, Public

Authored by alangenfeld on Tue, Sep 10, 8:22 PM.

Details

Reviewers
schrockn
Group Reviewers
Restricted Project
Commits
R1:46ad4b2c243c: [core] remove EventSink
Summary

EventSink has been rendered unnecessary by DagsterInstance; this diff takes the steps necessary to remove it.

  • replace the use of EventSink for communicating events with the event store, accessed via the instance
  • simplify the pipeline run observable to avoid issues caused by gevent cooperative multitasking not working
Test Plan

unit tests (some should be added for dagstermill)
run the sleepy_pipeline in dagit in single- and multi-process mode and ensure logs are streamed back correctly

Diff Detail

Repository
R1 dagster
Lint
Automatic diff as part of commit; lint not applicable.
Unit
Automatic diff as part of commit; unit tests not applicable.

Event Timeline

alangenfeld created this revision. Tue, Sep 10, 8:22 PM
alangenfeld retitled this revision from [WIP] remove EventSink to [core] remove EventSink. Wed, Sep 11, 6:20 PM
alangenfeld edited the summary of this revision.
alangenfeld edited the test plan for this revision.
alangenfeld added a reviewer: Restricted Project.
alangenfeld updated this revision to Diff 4612. Wed, Sep 11, 7:05 PM
alangenfeld edited the summary of this revision.

rebase

Harbormaster failed remote builds in B3702: Diff 4612!
schrockn accepted this revision. Wed, Sep 11, 10:23 PM
This revision is now accepted and ready to land. Wed, Sep 11, 10:23 PM

take short path to moving run creation

alangenfeld updated this revision to Diff 4659. Thu, Sep 12, 5:29 PM

incorporate execute_run changes

alangenfeld updated this revision to Diff 4682. Thu, Sep 12, 9:07 PM

restack on sqlite event storage

max added a subscriber: max. Thu, Sep 12, 9:51 PM
max added inline comments.
python_modules/dagster-graphql/dagster_graphql/implementation/execution.py
244

curious how this will scale

python_modules/dagster-graphql/dagster_graphql/implementation/pipeline_execution_manager.py
147

I think that this new code path will reintroduce the spurious pipeline start / pipeline success / pipeline failure events we were previously trying to avoid

281–282

what is going on here

python_modules/dagster-graphql/dagster_graphql/implementation/pipeline_run_storage.py
15–16

is the idea here just to not debounce events at all?

python_modules/dagster-graphql/dagster_graphql_tests/test_multiprocessing.py
98–99

undebug

python_modules/dagster/dagster/core/instance/__init__.py
59

undebug

python_modules/dagstermill/dagstermill/manager.py
133

can you say more

dagstermill fixes

alangenfeld updated this revision to Diff 4719. Fri, Sep 13, 5:27 PM

dask fixes

alangenfeld added inline comments. Fri, Sep 13, 6:09 PM
python_modules/dagster-graphql/dagster_graphql/implementation/pipeline_execution_manager.py
147

execute_run_iterator is the same impl as execute_pipeline_iterator

python_modules/dagster-graphql/dagster_graphql/implementation/pipeline_run_storage.py
15–16

ya - if it's a problem, we can add it back in a non-gevent-based way
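
For illustration only, one non-gevent way to get similar behavior is to batch events behind a plain lock and a time check. This is a sketch under assumptions, not code from this diff: `EventBatcher`, its `callback`, `interval`, and `clock` parameters are all hypothetical names, and strictly speaking it batches events at most once per interval rather than waiting for a quiet period.

```python
import threading
import time


class EventBatcher:
    """Hypothetical helper (not part of dagster): buffers events and passes
    them to `callback` as a list, at most once per `interval` seconds."""

    def __init__(self, callback, interval=0.1, clock=time.monotonic):
        self._callback = callback
        self._interval = interval
        self._clock = clock  # injectable so the behavior can be tested
        self._lock = threading.Lock()
        self._buffer = []
        self._last_flush = clock()

    def add(self, event):
        # Buffer the event; deliver the batch only if the interval has elapsed.
        with self._lock:
            self._buffer.append(event)
            if self._clock() - self._last_flush < self._interval:
                return
            batch = self._take()
        # Call back outside the lock so a slow consumer does not block producers.
        self._callback(batch)

    def flush(self):
        # Deliver whatever is buffered, e.g. when the run finishes.
        with self._lock:
            batch = self._take()
        if batch:
            self._callback(batch)

    def _take(self):
        # Caller must hold the lock.
        batch, self._buffer = self._buffer, []
        self._last_flush = self._clock()
        return batch
```

A caller would need to invoke `flush()` at the end of a run so trailing events are not left in the buffer.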

alangenfeld updated this revision to Diff 4734. Fri, Sep 13, 6:36 PM

remove dupe test

alangenfeld updated this revision to Diff 4735. Fri, Sep 13, 7:03 PM

enforce mode in PipelineRun

alangenfeld updated this revision to Diff 4736. Fri, Sep 13, 7:16 PM

default mode

This revision was automatically updated to reflect the committed changes.
prha added a subscriber: prha. Fri, Sep 13, 10:59 PM

I'm getting a bunch of serdes warnings about tags not being set, and my old runs are not showing up after this landed... I wonder if we could support some kind of serialization defaults so that folks don't lose their history every time we update the pipeline run signature.

Output:

Could not load pipeline run from /tmp/prha/history/runs/f903c719-a421-44a3-b73c-6d5c2b30a05a.json, continuing.
  Original exception: TypeError: __new__() missing 1 required positional argument: 'tags'
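
A minimal sketch of what "serialization defaults" could look like, assuming the run is a namedtuple-style record as the `__new__()` error suggests. `RunRecord`, its fields, and `load_run` are hypothetical stand-ins, not dagster's actual PipelineRun or serdes code. The idea is that newly added fields get defaults in `__new__`, so records written before the field existed still load.

```python
from collections import namedtuple


class RunRecord(namedtuple("_RunRecord", "pipeline_name run_id mode tags")):
    """Hypothetical stand-in for a stored run; not dagster's PipelineRun."""

    def __new__(cls, pipeline_name, run_id, mode="default", tags=None):
        # Fields added after the first release carry defaults, so older
        # stored records that lack them can still be constructed.
        return super(RunRecord, cls).__new__(
            cls, pipeline_name, run_id, mode, tags if tags is not None else {}
        )


def load_run(stored):
    """Build a RunRecord from a stored dict, ignoring keys it does not know."""
    known = {k: v for k, v in stored.items() if k in RunRecord._fields}
    return RunRecord(**known)


# A record written before `mode` and `tags` were part of the signature.
old_record = {"pipeline_name": "sleepy_pipeline", "run_id": "abc"}
run = load_run(old_record)
```

With this shape, adding a field only breaks old history when the field has no sensible default, which would then call for an explicit migration.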