Differential D4918 Diff 25032 python_modules/dagster/dagster_tests/core_tests/launcher_tests/test_cli_api_running_multiprocessing.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster_tests/core_tests/launcher_tests/test_cli_api_running_multiprocessing.py
Show All 18 Lines | from dagster import ( | ||||
execute_pipeline, | execute_pipeline, | ||||
lambda_solid, | lambda_solid, | ||||
pipeline, | pipeline, | ||||
reconstructable, | reconstructable, | ||||
seven, | seven, | ||||
solid, | solid, | ||||
) | ) | ||||
from dagster.core.events import DagsterEventType | from dagster.core.events import DagsterEventType | ||||
from dagster.core.host_representation.handle import RepositoryHandle, RepositoryLocationHandle | from dagster.core.host_representation import ( | ||||
InProcessRepositoryLocationOrigin, | |||||
RepositoryHandle, | |||||
RepositoryLocationHandle, | |||||
) | |||||
from dagster.core.storage.pipeline_run import PipelineRunStatus | from dagster.core.storage.pipeline_run import PipelineRunStatus | ||||
from dagster.core.test_utils import instance_for_test | from dagster.core.test_utils import instance_for_test | ||||
from dagster.utils import file_relative_path, safe_tempfile_path | from dagster.utils import file_relative_path, safe_tempfile_path | ||||
from dagster.utils.hosted_user_process import external_pipeline_from_recon_pipeline | from dagster.utils.hosted_user_process import external_pipeline_from_recon_pipeline | ||||
@dagster_type_loader(String) | @dagster_type_loader(String) | ||||
def df_input_schema(_context, path): | def df_input_schema(_context, path): | ||||
▲ Show 20 Lines • Show All 112 Lines • ▼ Show 20 Lines | def test_works_in_memory(): | ||||
} | } | ||||
assert execute_pipeline(passing_pipeline, run_config).success | assert execute_pipeline(passing_pipeline, run_config).success | ||||
def _external_pipeline_from_def(pipeline_def, solid_selection=None): | def _external_pipeline_from_def(pipeline_def, solid_selection=None): | ||||
recon_pipeline = reconstructable(pipeline_def) | recon_pipeline = reconstructable(pipeline_def) | ||||
recon_repo = recon_pipeline.repository | recon_repo = recon_pipeline.repository | ||||
repo_def = recon_repo.get_definition() | repo_def = recon_repo.get_definition() | ||||
location_handle = RepositoryLocationHandle.create_in_process_location(recon_repo.pointer) | location_handle = RepositoryLocationHandle.create_from_repository_location_origin( | ||||
InProcessRepositoryLocationOrigin(recon_repo) | |||||
) | |||||
repository_handle = RepositoryHandle( | repository_handle = RepositoryHandle( | ||||
repository_name=repo_def.name, repository_location_handle=location_handle, | repository_name=repo_def.name, repository_location_handle=location_handle, | ||||
) | ) | ||||
return external_pipeline_from_recon_pipeline( | return external_pipeline_from_recon_pipeline( | ||||
reconstructable(pipeline_def), | reconstructable(pipeline_def), | ||||
solid_selection=solid_selection, | solid_selection=solid_selection, | ||||
repository_handle=repository_handle, | repository_handle=repository_handle, | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 220 Lines • Show Last 20 Lines |