Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/test_utils.py
import os | import os | ||||
import signal | import signal | ||||
import sys | import sys | ||||
import tempfile | import tempfile | ||||
import time | import time | ||||
from contextlib import contextmanager | from contextlib import contextmanager, nullcontext | ||||
import pendulum | import pendulum | ||||
import yaml | import yaml | ||||
from dagster import Shape, check, composite_solid, pipeline, solid | from dagster import Shape, check, composite_solid, pipeline, solid | ||||
from dagster.core.host_representation import ExternalPipeline | from dagster.core.host_representation import ExternalPipeline | ||||
from dagster.core.host_representation.origin import ExternalPipelineOrigin | from dagster.core.host_representation.origin import ExternalPipelineOrigin | ||||
from dagster.core.instance import DagsterInstance | from dagster.core.instance import DagsterInstance | ||||
from dagster.core.launcher import RunLauncher | from dagster.core.launcher import RunLauncher | ||||
▲ Show 20 Lines • Show All 63 Lines • ▼ Show 20 Lines | finally: | ||||
if value is None: | if value is None: | ||||
if key in os.environ: | if key in os.environ: | ||||
del os.environ[key] | del os.environ[key] | ||||
else: | else: | ||||
os.environ[key] = value | os.environ[key] = value | ||||
@contextmanager | @contextmanager | ||||
def instance_for_test(overrides=None): | def instance_for_test(overrides=None, set_dagster_home=True): | ||||
with tempfile.TemporaryDirectory() as temp_dir: | with tempfile.TemporaryDirectory() as temp_dir: | ||||
with instance_for_test_tempdir(temp_dir, overrides) as instance: | with instance_for_test_tempdir( | ||||
temp_dir, overrides, set_dagster_home=set_dagster_home | |||||
) as instance: | |||||
yield instance | yield instance | ||||
@contextmanager | @contextmanager | ||||
def instance_for_test_tempdir(temp_dir, overrides=None): | def instance_for_test_tempdir(temp_dir, overrides=None, set_dagster_home=True): | ||||
# If using the default run launcher, wait for any grpc processes that created runs | # If using the default run launcher, wait for any grpc processes that created runs | ||||
# during test disposal to finish, since they might also be using this instance's tempdir | # during test disposal to finish, since they might also be using this instance's tempdir | ||||
instance_overrides = merge_dicts( | instance_overrides = merge_dicts( | ||||
{ | { | ||||
"run_launcher": { | "run_launcher": { | ||||
"class": "DefaultRunLauncher", | "class": "DefaultRunLauncher", | ||||
"module": "dagster.core.launcher.default_run_launcher", | "module": "dagster.core.launcher.default_run_launcher", | ||||
"config": { | "config": { | ||||
"wait_for_processes": True, | "wait_for_processes": True, | ||||
}, | }, | ||||
} | } | ||||
}, | }, | ||||
(overrides if overrides else {}), | (overrides if overrides else {}), | ||||
) | ) | ||||
# Write any overrides to disk and set DAGSTER_HOME so that they will still apply when | # Write any overrides to disk and set DAGSTER_HOME so that they will still apply when | ||||
# DagsterInstance.get() is called from a different process | # DagsterInstance.get() is called from a different process | ||||
with environ({"DAGSTER_HOME": temp_dir}): | with environ({"DAGSTER_HOME": temp_dir}) if set_dagster_home else nullcontext(): | ||||
johann: ooh haven't used this before | |||||
with open(os.path.join(temp_dir, "dagster.yaml"), "w") as fd: | with open(os.path.join(temp_dir, "dagster.yaml"), "w") as fd: | ||||
yaml.dump(instance_overrides, fd, default_flow_style=False) | yaml.dump(instance_overrides, fd, default_flow_style=False) | ||||
with DagsterInstance.get() as instance: | with DagsterInstance.from_config(temp_dir) as instance: | ||||
try: | try: | ||||
yield instance | yield instance | ||||
except: | except: | ||||
sys.stderr.write( | sys.stderr.write( | ||||
"Test raised an exception, attempting to clean up instance:" | "Test raised an exception, attempting to clean up instance:" | ||||
+ serializable_error_info_from_exc_info(sys.exc_info()).to_string() | + serializable_error_info_from_exc_info(sys.exc_info()).to_string() | ||||
+ "\n" | + "\n" | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 285 Lines • Show Last 20 Lines |
ooh haven't used this before