Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/utils/test/__init__.py
Show All 17 Lines | from dagster import ( | ||||
check, | check, | ||||
execute_pipeline, | execute_pipeline, | ||||
lambda_solid, | lambda_solid, | ||||
seven, | seven, | ||||
) | ) | ||||
from dagster.core.definitions.logger import LoggerDefinition | from dagster.core.definitions.logger import LoggerDefinition | ||||
from dagster.core.definitions.pipeline_base import InMemoryPipeline | from dagster.core.definitions.pipeline_base import InMemoryPipeline | ||||
from dagster.core.definitions.resource import ScopedResourcesBuilder | from dagster.core.definitions.resource import ScopedResourcesBuilder | ||||
from dagster.core.definitions.solid import ISolidDefinition | from dagster.core.definitions.solid import NodeDefinition | ||||
from dagster.core.execution.api import create_execution_plan, scoped_pipeline_context | from dagster.core.execution.api import create_execution_plan, scoped_pipeline_context | ||||
from dagster.core.execution.context_creation_pipeline import ( | from dagster.core.execution.context_creation_pipeline import ( | ||||
SystemPipelineExecutionContext, | SystemPipelineExecutionContext, | ||||
construct_execution_context_data, | construct_execution_context_data, | ||||
create_context_creation_data, | create_context_creation_data, | ||||
create_executor, | create_executor, | ||||
create_log_manager, | create_log_manager, | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 253 Lines • ▼ Show 20 Lines | Args: | ||||
execution, as a dict. | execution, as a dict. | ||||
raise_on_error (Optional[bool]): Whether or not to raise exceptions when they occur. | raise_on_error (Optional[bool]): Whether or not to raise exceptions when they occur. | ||||
Defaults to ``True``, since this is the most useful behavior in test. | Defaults to ``True``, since this is the most useful behavior in test. | ||||
Returns: | Returns: | ||||
Union[CompositeSolidExecutionResult, SolidExecutionResult]: The result of executing the | Union[CompositeSolidExecutionResult, SolidExecutionResult]: The result of executing the | ||||
solid. | solid. | ||||
""" | """ | ||||
check.inst_param(solid_def, "solid_def", ISolidDefinition) | check.inst_param(solid_def, "solid_def", NodeDefinition) | ||||
check.opt_inst_param(mode_def, "mode_def", ModeDefinition) | check.opt_inst_param(mode_def, "mode_def", ModeDefinition) | ||||
input_values = check.opt_dict_param(input_values, "input_values", key_type=str) | input_values = check.opt_dict_param(input_values, "input_values", key_type=str) | ||||
solid_defs = [solid_def] | solid_defs = [solid_def] | ||||
def create_value_solid(input_name, input_value): | def create_value_solid(input_name, input_value): | ||||
@lambda_solid(name=input_name) | @lambda_solid(name=input_name) | ||||
def input_solid(): | def input_solid(): | ||||
return input_value | return input_value | ||||
▲ Show 20 Lines • Show All 63 Lines • ▼ Show 20 Lines | with yield_empty_pipeline_context() as pipeline_context: | ||||
"Type checks can only return TypeCheck. Type {type_name} returned {value}.".format( | "Type checks can only return TypeCheck. Type {type_name} returned {value}.".format( | ||||
type_name=dagster_type.name, value=repr(type_check) | type_name=dagster_type.name, value=repr(type_check) | ||||
) | ) | ||||
) | ) | ||||
return type_check | return type_check | ||||
@contextmanager | @contextmanager | ||||
def restore_directory(src): | def copy_directory(src): | ||||
with seven.TemporaryDirectory() as temp_dir: | with seven.TemporaryDirectory() as temp_dir: | ||||
dst = os.path.join(temp_dir, os.path.basename(src)) | dst = os.path.join(temp_dir, os.path.basename(src)) | ||||
shutil.copytree(src, dst) | shutil.copytree(src, dst) | ||||
try: | yield dst | ||||
yield | |||||
finally: | |||||
shutil.rmtree(src) | |||||
shutil.copytree(dst, src) | |||||
class FilesystemTestScheduler(Scheduler, ConfigurableClass): | class FilesystemTestScheduler(Scheduler, ConfigurableClass): | ||||
"""This class is used in dagster core and dagster_graphql to test the scheduler's interactions | """This class is used in dagster core and dagster_graphql to test the scheduler's interactions | ||||
with schedule storage, which are implemented in the methods defined on the base Scheduler class. | with schedule storage, which are implemented in the methods defined on the base Scheduler class. | ||||
Therefore, the following methods used to actually schedule jobs (e.g. create and remove cron jobs | Therefore, the following methods used to actually schedule jobs (e.g. create and remove cron jobs | ||||
on a cron tab) are left unimplemented. | on a cron tab) are left unimplemented. | ||||
""" | """ | ||||
Show All 36 Lines |