Differential D9079 Diff 42358 python_modules/dagster/dagster_tests/general_tests/grpc_tests/test_persistent.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster_tests/general_tests/grpc_tests/test_persistent.py
import os | import os | ||||
import re | import re | ||||
import subprocess | import subprocess | ||||
import uuid | import uuid | ||||
import pytest | import pytest | ||||
from dagster import seven | from dagster import seven | ||||
from dagster.core.errors import DagsterUserCodeProcessError | |||||
from dagster.core.host_representation.origin import ( | from dagster.core.host_representation.origin import ( | ||||
ExternalRepositoryOrigin, | ExternalRepositoryOrigin, | ||||
GrpcServerRepositoryLocationOrigin, | GrpcServerRepositoryLocationOrigin, | ||||
) | ) | ||||
from dagster.core.test_utils import environ, instance_for_test, new_cwd | from dagster.core.test_utils import environ, instance_for_test, new_cwd | ||||
from dagster.grpc.client import DagsterGrpcClient | from dagster.grpc.client import DagsterGrpcClient | ||||
from dagster.grpc.server import wait_for_grpc_server | from dagster.grpc.server import wait_for_grpc_server | ||||
from dagster.grpc.types import SensorExecutionArgs | from dagster.grpc.types import SensorExecutionArgs | ||||
from dagster.serdes import deserialize_json_to_dagster_namedtuple | from dagster.serdes import deserialize_json_to_dagster_namedtuple | ||||
from dagster.serdes.ipc import DagsterIPCProtocolError | |||||
from dagster.seven import get_system_temp_directory | from dagster.seven import get_system_temp_directory | ||||
from dagster.utils import file_relative_path, find_free_port | from dagster.utils import file_relative_path, find_free_port | ||||
from dagster.utils.error import SerializableErrorInfo | from dagster.utils.error import SerializableErrorInfo | ||||
def _get_ipc_output_file(): | def _get_ipc_output_file(): | ||||
return os.path.join( | return os.path.join( | ||||
get_system_temp_directory(), "grpc-server-startup-{uuid}".format(uuid=uuid.uuid4().hex) | get_system_temp_directory(), "grpc-server-startup-{uuid}".format(uuid=uuid.uuid4().hex) | ||||
) | ) | ||||
def test_ping(): | def test_ping(): | ||||
port = find_free_port() | port = find_free_port() | ||||
python_file = file_relative_path(__file__, "grpc_repo.py") | python_file = file_relative_path(__file__, "grpc_repo.py") | ||||
subprocess_args = [ | ipc_output_file = _get_ipc_output_file() | ||||
process = subprocess.Popen( | |||||
[ | |||||
"dagster", | "dagster", | ||||
"api", | "api", | ||||
"grpc", | "grpc", | ||||
"--port", | "--port", | ||||
str(port), | str(port), | ||||
"--python-file", | "--python-file", | ||||
python_file, | python_file, | ||||
] | "--ipc-output-file", | ||||
ipc_output_file, | |||||
process = subprocess.Popen(subprocess_args, stdout=subprocess.PIPE) | ], | ||||
stdout=subprocess.PIPE, | |||||
) | |||||
try: | try: | ||||
wait_for_grpc_server( | wait_for_grpc_server(process, ipc_output_file) | ||||
process, DagsterGrpcClient(port=port, host="localhost"), subprocess_args | |||||
) | |||||
assert DagsterGrpcClient(port=port).ping("foobar") == "foobar" | assert DagsterGrpcClient(port=port).ping("foobar") == "foobar" | ||||
finally: | finally: | ||||
process.terminate() | process.terminate() | ||||
def test_load_via_env_var(): | def test_load_via_env_var(): | ||||
port = find_free_port() | port = find_free_port() | ||||
python_file = file_relative_path(__file__, "grpc_repo.py") | python_file = file_relative_path(__file__, "grpc_repo.py") | ||||
subprocess_args = [ | with environ( | ||||
{"DAGSTER_CLI_API_GRPC_HOST": "localhost", "DAGSTER_CLI_API_GRPC_PORT": str(port)} | |||||
): | |||||
ipc_output_file = _get_ipc_output_file() | |||||
process = subprocess.Popen( | |||||
[ | |||||
"dagster", | "dagster", | ||||
"api", | "api", | ||||
"grpc", | "grpc", | ||||
"--python-file", | "--python-file", | ||||
python_file, | python_file, | ||||
] | "--ipc-output-file", | ||||
ipc_output_file, | |||||
with environ( | ], | ||||
{"DAGSTER_CLI_API_GRPC_HOST": "localhost", "DAGSTER_CLI_API_GRPC_PORT": str(port)} | |||||
): | |||||
process = subprocess.Popen( | |||||
subprocess_args, | |||||
stdout=subprocess.PIPE, | stdout=subprocess.PIPE, | ||||
) | ) | ||||
try: | try: | ||||
wait_for_grpc_server( | wait_for_grpc_server(process, ipc_output_file) | ||||
process, DagsterGrpcClient(port=port, host="localhost"), subprocess_args | |||||
) | |||||
assert DagsterGrpcClient(port=port).ping("foobar") == "foobar" | assert DagsterGrpcClient(port=port).ping("foobar") == "foobar" | ||||
finally: | finally: | ||||
process.terminate() | process.terminate() | ||||
def test_load_with_invalid_param(capfd): | def test_load_with_invalid_param(capfd): | ||||
port = find_free_port() | port = find_free_port() | ||||
python_file = file_relative_path(__file__, "grpc_repo.py") | python_file = file_relative_path(__file__, "grpc_repo.py") | ||||
subprocess_args = [ | ipc_output_file = _get_ipc_output_file() | ||||
process = subprocess.Popen( | |||||
[ | |||||
"dagster", | "dagster", | ||||
"api", | "api", | ||||
"grpc", | "grpc", | ||||
"--port", | "--port", | ||||
str(port), | str(port), | ||||
"--python-file", | "--python-file", | ||||
python_file, | python_file, | ||||
"--ipc-output-file", | |||||
ipc_output_file, | |||||
"--foo-param", | "--foo-param", | ||||
"bar_value", | "bar_value", | ||||
] | ], | ||||
process = subprocess.Popen( | |||||
subprocess_args, | |||||
stdout=subprocess.PIPE, | stdout=subprocess.PIPE, | ||||
) | ) | ||||
try: | try: | ||||
with pytest.raises( | with pytest.raises(DagsterIPCProtocolError): | ||||
Exception, | wait_for_grpc_server(process, ipc_output_file) | ||||
match='gRPC server exited with return code 2 while starting up with the command: "dagster api grpc --port', | |||||
): | |||||
wait_for_grpc_server( | |||||
process, DagsterGrpcClient(port=port, host="localhost"), subprocess_args | |||||
) | |||||
finally: | finally: | ||||
process.terminate() | process.terminate() | ||||
_, err = capfd.readouterr() | _, err = capfd.readouterr() | ||||
assert "no such optio" in err | |||||
assert "no such option" in err | |||||
def test_load_with_error(capfd): | def test_load_with_error(capfd): | ||||
port = find_free_port() | port = find_free_port() | ||||
python_file = file_relative_path(__file__, "grpc_repo_with_error.py") | python_file = file_relative_path(__file__, "grpc_repo_with_error.py") | ||||
subprocess_args = [ | ipc_output_file = _get_ipc_output_file() | ||||
process = subprocess.Popen( | |||||
[ | |||||
"dagster", | "dagster", | ||||
"api", | "api", | ||||
"grpc", | "grpc", | ||||
"--port", | "--port", | ||||
str(port), | str(port), | ||||
"--python-file", | "--python-file", | ||||
python_file, | python_file, | ||||
] | "--ipc-output-file", | ||||
ipc_output_file, | |||||
process = subprocess.Popen( | ], | ||||
subprocess_args, | |||||
stdout=subprocess.PIPE, | stdout=subprocess.PIPE, | ||||
) | ) | ||||
try: | try: | ||||
with pytest.raises(Exception): | with pytest.raises(DagsterUserCodeProcessError): | ||||
wait_for_grpc_server( | wait_for_grpc_server(process, ipc_output_file) | ||||
process, DagsterGrpcClient(port=port, host="localhost"), subprocess_args | |||||
) | |||||
process.wait() | process.wait() | ||||
_, err = capfd.readouterr() | _, err = capfd.readouterr() | ||||
assert "No module named" in err | assert "No module named" in err | ||||
finally: | finally: | ||||
if process.poll() is None: | if process.poll() is None: | ||||
process.terminate() | process.terminate() | ||||
Show All 16 Lines | else: | ||||
assert "No such file or directory" in err | assert "No such file or directory" in err | ||||
def test_load_with_empty_working_directory(capfd): | def test_load_with_empty_working_directory(capfd): | ||||
port = find_free_port() | port = find_free_port() | ||||
# File that will fail if working directory isn't set to default | # File that will fail if working directory isn't set to default | ||||
python_file = file_relative_path(__file__, "grpc_repo_with_local_import.py") | python_file = file_relative_path(__file__, "grpc_repo_with_local_import.py") | ||||
subprocess_args = [ | with new_cwd(os.path.dirname(__file__)): | ||||
ipc_output_file = _get_ipc_output_file() | |||||
process = subprocess.Popen( | |||||
[ | |||||
"dagster", | "dagster", | ||||
"api", | "api", | ||||
"grpc", | "grpc", | ||||
"--port", | "--port", | ||||
str(port), | str(port), | ||||
"--python-file", | "--python-file", | ||||
python_file, | python_file, | ||||
] | "--ipc-output-file", | ||||
ipc_output_file, | |||||
with new_cwd(os.path.dirname(__file__)): | ], | ||||
process = subprocess.Popen( | |||||
subprocess_args, | |||||
stdout=subprocess.PIPE, | stdout=subprocess.PIPE, | ||||
) | ) | ||||
try: | try: | ||||
wait_for_grpc_server( | wait_for_grpc_server(process, ipc_output_file) | ||||
process, DagsterGrpcClient(port=port, host="localhost"), subprocess_args | |||||
) | |||||
assert DagsterGrpcClient(port=port).ping("foobar") == "foobar" | assert DagsterGrpcClient(port=port).ping("foobar") == "foobar" | ||||
finally: | finally: | ||||
process.terminate() | process.terminate() | ||||
# indicating the working directory is empty fails | # indicating the working directory is empty fails | ||||
subprocess_args = [ | ipc_output_file = _get_ipc_output_file() | ||||
process = subprocess.Popen( | |||||
[ | |||||
"dagster", | "dagster", | ||||
"api", | "api", | ||||
"grpc", | "grpc", | ||||
"--port", | "--port", | ||||
str(port), | str(port), | ||||
"--python-file", | "--python-file", | ||||
python_file, | python_file, | ||||
"--empty-working-directory", | "--empty-working-directory", | ||||
] | "--ipc-output-file", | ||||
ipc_output_file, | |||||
process = subprocess.Popen( | ], | ||||
subprocess_args, | |||||
stdout=subprocess.PIPE, | stdout=subprocess.PIPE, | ||||
) | ) | ||||
try: | try: | ||||
with pytest.raises(Exception): | with pytest.raises(DagsterUserCodeProcessError): | ||||
wait_for_grpc_server( | wait_for_grpc_server(process, ipc_output_file) | ||||
process, DagsterGrpcClient(port=port, host="localhost"), subprocess_args | |||||
) | |||||
process.wait() | process.wait() | ||||
_, err = capfd.readouterr() | _, err = capfd.readouterr() | ||||
assert "No module named" in err | assert "No module named" in err | ||||
finally: | finally: | ||||
if process.poll() is None: | if process.poll() is None: | ||||
process.terminate() | process.terminate() | ||||
@pytest.mark.skipif(seven.IS_WINDOWS, reason="Crashes in subprocesses crash test runs on Windows") | @pytest.mark.skipif(seven.IS_WINDOWS, reason="Crashes in subprocesses crash test runs on Windows") | ||||
def test_crash_during_load(): | def test_crash_during_load(): | ||||
port = find_free_port() | port = find_free_port() | ||||
python_file = file_relative_path(__file__, "crashy_grpc_repo.py") | python_file = file_relative_path(__file__, "crashy_grpc_repo.py") | ||||
subprocess_args = [ | ipc_output_file = _get_ipc_output_file() | ||||
process = subprocess.Popen( | |||||
[ | |||||
"dagster", | "dagster", | ||||
"api", | "api", | ||||
"grpc", | "grpc", | ||||
"--port", | "--port", | ||||
str(port), | str(port), | ||||
"--python-file", | "--python-file", | ||||
python_file, | python_file, | ||||
] | "--ipc-output-file", | ||||
ipc_output_file, | |||||
process = subprocess.Popen( | ], | ||||
subprocess_args, | |||||
stdout=subprocess.PIPE, | stdout=subprocess.PIPE, | ||||
) | ) | ||||
try: | try: | ||||
with pytest.raises( | with pytest.raises( | ||||
Exception, | Exception, | ||||
match=re.escape( | match=re.escape("Process exited with return code 123 while waiting for events"), | ||||
'gRPC server exited with return code 123 while starting up with the command: "dagster api grpc --port' | |||||
), | |||||
): | ): | ||||
wait_for_grpc_server( | wait_for_grpc_server(process, ipc_output_file) | ||||
process, DagsterGrpcClient(port=port, host="localhost"), subprocess_args | |||||
) | |||||
finally: | finally: | ||||
if process.poll() is None: | if process.poll() is None: | ||||
process.terminate() | process.terminate() | ||||
def test_lazy_load_with_error(): | def test_lazy_load_with_error(): | ||||
port = find_free_port() | port = find_free_port() | ||||
python_file = file_relative_path(__file__, "grpc_repo_with_error.py") | python_file = file_relative_path(__file__, "grpc_repo_with_error.py") | ||||
subprocess_args = [ | ipc_output_file = _get_ipc_output_file() | ||||
process = subprocess.Popen( | |||||
[ | |||||
"dagster", | "dagster", | ||||
"api", | "api", | ||||
"grpc", | "grpc", | ||||
"--port", | "--port", | ||||
str(port), | str(port), | ||||
"--python-file", | "--python-file", | ||||
python_file, | python_file, | ||||
"--lazy-load-user-code", | "--lazy-load-user-code", | ||||
] | "--ipc-output-file", | ||||
ipc_output_file, | |||||
process = subprocess.Popen(subprocess_args, stdout=subprocess.PIPE) | ], | ||||
stdout=subprocess.PIPE, | |||||
) | |||||
try: | try: | ||||
wait_for_grpc_server( | wait_for_grpc_server(process, ipc_output_file) | ||||
process, DagsterGrpcClient(port=port, host="localhost"), subprocess_args | |||||
) | |||||
list_repositories_response = deserialize_json_to_dagster_namedtuple( | list_repositories_response = deserialize_json_to_dagster_namedtuple( | ||||
DagsterGrpcClient(port=port).list_repositories() | DagsterGrpcClient(port=port).list_repositories() | ||||
) | ) | ||||
assert isinstance(list_repositories_response, SerializableErrorInfo) | assert isinstance(list_repositories_response, SerializableErrorInfo) | ||||
assert "No module named" in list_repositories_response.message | assert "No module named" in list_repositories_response.message | ||||
finally: | finally: | ||||
process.terminate() | process.terminate() | ||||
def test_lazy_load_via_env_var(): | def test_lazy_load_via_env_var(): | ||||
with environ({"DAGSTER_CLI_API_GRPC_LAZY_LOAD_USER_CODE": "1"}): | with environ({"DAGSTER_CLI_API_GRPC_LAZY_LOAD_USER_CODE": "1"}): | ||||
port = find_free_port() | port = find_free_port() | ||||
python_file = file_relative_path(__file__, "grpc_repo_with_error.py") | python_file = file_relative_path(__file__, "grpc_repo_with_error.py") | ||||
subprocess_args = [ | ipc_output_file = _get_ipc_output_file() | ||||
process = subprocess.Popen( | |||||
[ | |||||
"dagster", | "dagster", | ||||
"api", | "api", | ||||
"grpc", | "grpc", | ||||
"--port", | "--port", | ||||
str(port), | str(port), | ||||
"--python-file", | "--python-file", | ||||
python_file, | python_file, | ||||
] | "--ipc-output-file", | ||||
ipc_output_file, | |||||
process = subprocess.Popen( | ], | ||||
subprocess_args, | |||||
stdout=subprocess.PIPE, | stdout=subprocess.PIPE, | ||||
) | ) | ||||
try: | try: | ||||
wait_for_grpc_server( | wait_for_grpc_server(process, ipc_output_file) | ||||
process, DagsterGrpcClient(port=port, host="localhost"), subprocess_args | |||||
) | |||||
list_repositories_response = deserialize_json_to_dagster_namedtuple( | list_repositories_response = deserialize_json_to_dagster_namedtuple( | ||||
DagsterGrpcClient(port=port).list_repositories() | DagsterGrpcClient(port=port).list_repositories() | ||||
) | ) | ||||
assert isinstance(list_repositories_response, SerializableErrorInfo) | assert isinstance(list_repositories_response, SerializableErrorInfo) | ||||
assert "No module named" in list_repositories_response.message | assert "No module named" in list_repositories_response.message | ||||
finally: | finally: | ||||
process.terminate() | process.terminate() | ||||
def test_streaming(): | def test_streaming(): | ||||
port = find_free_port() | port = find_free_port() | ||||
python_file = file_relative_path(__file__, "grpc_repo.py") | python_file = file_relative_path(__file__, "grpc_repo.py") | ||||
subprocess_args = [ | ipc_output_file = _get_ipc_output_file() | ||||
process = subprocess.Popen( | |||||
[ | |||||
"dagster", | "dagster", | ||||
"api", | "api", | ||||
"grpc", | "grpc", | ||||
"--port", | "--port", | ||||
str(port), | str(port), | ||||
"--python-file", | "--python-file", | ||||
python_file, | python_file, | ||||
] | "--ipc-output-file", | ||||
ipc_output_file, | |||||
process = subprocess.Popen( | ], | ||||
subprocess_args, | |||||
stdout=subprocess.PIPE, | stdout=subprocess.PIPE, | ||||
) | ) | ||||
try: | try: | ||||
wait_for_grpc_server( | wait_for_grpc_server(process, ipc_output_file) | ||||
process, DagsterGrpcClient(port=port, host="localhost"), subprocess_args | |||||
) | |||||
api_client = DagsterGrpcClient(port=port) | api_client = DagsterGrpcClient(port=port) | ||||
results = [result for result in api_client.streaming_ping(sequence_length=10, echo="foo")] | results = [result for result in api_client.streaming_ping(sequence_length=10, echo="foo")] | ||||
assert len(results) == 10 | assert len(results) == 10 | ||||
for sequence_number, result in enumerate(results): | for sequence_number, result in enumerate(results): | ||||
assert result["sequence_number"] == sequence_number | assert result["sequence_number"] == sequence_number | ||||
assert result["echo"] == "foo" | assert result["echo"] == "foo" | ||||
finally: | finally: | ||||
process.terminate() | process.terminate() | ||||
def test_sensor_timeout(): | def test_sensor_timeout(): | ||||
port = find_free_port() | port = find_free_port() | ||||
python_file = file_relative_path(__file__, "grpc_repo.py") | python_file = file_relative_path(__file__, "grpc_repo.py") | ||||
subprocess_args = [ | ipc_output_file = _get_ipc_output_file() | ||||
process = subprocess.Popen( | |||||
[ | |||||
"dagster", | "dagster", | ||||
"api", | "api", | ||||
"grpc", | "grpc", | ||||
"--port", | "--port", | ||||
str(port), | str(port), | ||||
"--python-file", | "--python-file", | ||||
python_file, | python_file, | ||||
] | "--ipc-output-file", | ||||
ipc_output_file, | |||||
process = subprocess.Popen( | ], | ||||
subprocess_args, | |||||
stdout=subprocess.PIPE, | stdout=subprocess.PIPE, | ||||
) | ) | ||||
try: | try: | ||||
wait_for_grpc_server( | wait_for_grpc_server(process, ipc_output_file) | ||||
process, DagsterGrpcClient(port=port, host="localhost"), subprocess_args | |||||
) | |||||
client = DagsterGrpcClient(port=port) | client = DagsterGrpcClient(port=port) | ||||
with instance_for_test() as instance: | with instance_for_test() as instance: | ||||
repo_origin = ExternalRepositoryOrigin( | repo_origin = ExternalRepositoryOrigin( | ||||
repository_location_origin=GrpcServerRepositoryLocationOrigin( | repository_location_origin=GrpcServerRepositoryLocationOrigin( | ||||
port=port, host="localhost" | port=port, host="localhost" | ||||
), | ), | ||||
repository_name="bar_repo", | repository_name="bar_repo", | ||||
Show All 27 Lines |