Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster-graphql/dagster_graphql/test/utils.py
import sys | import sys | ||||
from dagster_graphql.implementation.context import DagsterGraphQLContext | from dagster_graphql.implementation.context import DagsterGraphQLContext | ||||
from dagster_graphql.schema import create_schema | from dagster_graphql.schema import create_schema | ||||
from graphql import graphql | from graphql import graphql | ||||
from dagster import check | from dagster import check | ||||
from dagster.cli.workspace import Workspace | from dagster.cli.workspace import Workspace | ||||
from dagster.core.code_pointer import CodePointer | from dagster.core.code_pointer import CodePointer | ||||
from dagster.core.definitions.reconstructable import ReconstructableRepository | |||||
from dagster.core.host_representation import RepositoryLocationHandle | from dagster.core.host_representation import RepositoryLocationHandle | ||||
from dagster.core.host_representation.handle import UserProcessApi | from dagster.core.host_representation.handle import UserProcessApi | ||||
from dagster.core.instance import DagsterInstance | from dagster.core.instance import DagsterInstance | ||||
from dagster.core.types.loadable_target_origin import LoadableTargetOrigin | from dagster.core.types.loadable_target_origin import LoadableTargetOrigin | ||||
def main_repo_location_name(): | |||||
return "test_location" | |||||
def main_repo_name(): | |||||
return "test_repo" | |||||
def execute_dagster_graphql(context, query, variables=None): | def execute_dagster_graphql(context, query, variables=None): | ||||
result = graphql( | result = graphql( | ||||
create_schema(), | create_schema(), | ||||
query, | query, | ||||
context_value=context, | context_value=context, | ||||
variable_values=variables, | variable_values=variables, | ||||
allow_subscriptions=True, | allow_subscriptions=True, | ||||
return_promise=False, | return_promise=False, | ||||
Show All 11 Lines | |||||
def execute_dagster_graphql_and_finish_runs(context, query, variables=None): | def execute_dagster_graphql_and_finish_runs(context, query, variables=None): | ||||
result = execute_dagster_graphql(context, query, variables) | result = execute_dagster_graphql(context, query, variables) | ||||
context.instance.run_launcher.join() | context.instance.run_launcher.join() | ||||
return result | return result | ||||
def define_context_for_file(python_file, fn_name, instance): | def define_in_process_context(python_file, fn_name, instance): | ||||
check.inst_param(instance, "instance", DagsterInstance) | check.inst_param(instance, "instance", DagsterInstance) | ||||
return DagsterGraphQLContext( | return DagsterGraphQLContext( | ||||
workspace=Workspace( | workspace=Workspace( | ||||
[ | [ | ||||
RepositoryLocationHandle.create_in_process_location( | RepositoryLocationHandle.create_in_process_location( | ||||
CodePointer.from_python_file(python_file, fn_name, None) | CodePointer.from_python_file(python_file, fn_name, None) | ||||
) | ) | ||||
] | ] | ||||
), | ), | ||||
instance=instance, | instance=instance, | ||||
) | ) | ||||
def define_out_of_process_context(python_file, fn_name, instance): | def define_out_of_process_context(python_file, fn_name, instance): | ||||
check.inst_param(instance, "instance", DagsterInstance) | check.inst_param(instance, "instance", DagsterInstance) | ||||
return DagsterGraphQLContext( | return DagsterGraphQLContext( | ||||
workspace=Workspace( | workspace=Workspace( | ||||
[ | [ | ||||
RepositoryLocationHandle.create_python_env_location( | RepositoryLocationHandle.create_python_env_location( | ||||
loadable_target_origin=LoadableTargetOrigin( | loadable_target_origin=LoadableTargetOrigin( | ||||
executable_path=sys.executable, python_file=python_file, attribute=fn_name, | executable_path=sys.executable, python_file=python_file, attribute=fn_name, | ||||
), | ), | ||||
location_name="test_location", | location_name=main_repo_location_name(), | ||||
user_process_api=UserProcessApi.CLI, | user_process_api=UserProcessApi.CLI, | ||||
) | ) | ||||
] | ] | ||||
), | ), | ||||
instance=instance, | instance=instance, | ||||
) | ) | ||||
def define_context_for_repository_yaml(path, instance): | |||||
check.inst_param(instance, "instance", DagsterInstance) | |||||
return DagsterGraphQLContext( | |||||
workspace=Workspace( | |||||
[ | |||||
RepositoryLocationHandle.create_in_process_location( | |||||
ReconstructableRepository.from_legacy_repository_yaml(path).pointer | |||||
) | |||||
] | |||||
), | |||||
instance=instance, | |||||
) | |||||
def infer_repository(graphql_context): | def infer_repository(graphql_context): | ||||
if len(graphql_context.repository_locations) == 1: | if len(graphql_context.repository_locations) == 1: | ||||
# This is to account for having a single in process repository | # This is to account for having a single in process repository | ||||
repository_location = graphql_context.repository_locations[0] | repository_location = graphql_context.repository_locations[0] | ||||
repositories = repository_location.get_repositories() | repositories = repository_location.get_repositories() | ||||
assert len(repositories) == 1 | assert len(repositories) == 1 | ||||
return next(iter(repositories.values())) | return next(iter(repositories.values())) | ||||
Show All 31 Lines |