Differential D8637 Diff 40863 python_modules/libraries/dagster-aws/dagster_aws_tests/ecs_tests/test_launcher.py
(In the changeset below, lines prefixed with + were added in this diff and lines prefixed with - were removed; unprefixed lines are unchanged context.)

# pylint: disable=redefined-outer-name, protected-access, unused-argument
+from contextlib import contextmanager

import boto3
import pytest
from dagster.core.definitions.reconstructable import ReconstructableRepository
from dagster.core.host_representation.origin import InProcessRepositoryLocationOrigin
from dagster.core.test_utils import instance_for_test

from . import repo

[... 43 unchanged lines collapsed ...]

def stub_aws(ecs, ec2, monkeypatch):
    # This only works because our launcher happens to use a client for ecs and
    # a resource for ec2 - if that were to change or if new aws objects were to
    # be introduced, this fixture would need to be refactored.
    monkeypatch.setattr(boto3, "client", lambda *args, **kwargs: ecs)
    monkeypatch.setattr(boto3, "resource", lambda *args, **kwargs: ec2)

@pytest.fixture
-def instance(stub_aws, task, monkeypatch, requests_mock):
+def stub_ecs_metadata(task, monkeypatch, requests_mock):

[inline comment] dgibson: name's a bit vague, what kind of metadata?
container_uri = "http://metadata_host" | container_uri = "http://metadata_host" | ||||
monkeypatch.setenv("ECS_CONTAINER_METADATA_URI_V4", container_uri) | monkeypatch.setenv("ECS_CONTAINER_METADATA_URI_V4", container_uri) | ||||
container = task["containers"][0]["name"] | container = task["containers"][0]["name"] | ||||
requests_mock.get(container_uri, json={"Name": container}) | requests_mock.get(container_uri, json={"Name": container}) | ||||
task_uri = container_uri + "/task" | task_uri = container_uri + "/task" | ||||
requests_mock.get( | requests_mock.get( | ||||
task_uri, | task_uri, | ||||
json={ | json={ | ||||
"Cluster": task["clusterArn"], | "Cluster": task["clusterArn"], | ||||
"TaskARN": task["taskArn"], | "TaskARN": task["taskArn"], | ||||
}, | }, | ||||
) | ) | ||||
overrides = {"run_launcher": {"module": "dagster_aws.ecs", "class": "EcsRunLauncher"}} | |||||
with instance_for_test(overrides) as instance: | |||||
yield instance | @pytest.fixture | ||||
def instance_cm(stub_aws, stub_ecs_metadata): | |||||
@contextmanager | |||||
def cm(config=None): | |||||
overrides = { | |||||
"run_launcher": { | |||||
"module": "dagster_aws.ecs", | |||||
"class": "EcsRunLauncher", | |||||
"config": {**(config or {})}, | |||||
} | |||||
} | |||||
with instance_for_test(overrides) as dagster_instance: | |||||
yield dagster_instance | |||||
return cm | |||||

[inline comment — Not Done] dgibson: this pattern confuses me / seems wrong at first glance. You have delayed cleaning up the tempdir, but you haven't delayed cleaning up the instance, which also does cleanup / shuts things down besides the tempdir. In general it's not safe to use an instance (or a context manager object in general) after it has gone through its exit method, which seems like it will happen once the instance is returned. Maybe there's some fixture magic that I'm not seeing or misunderstanding?
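An illustrative note on the question above (not from the changeset): with a generator-based contextlib.contextmanager, returning cm from the fixture neither enters nor exits instance_for_test; its cleanup runs only when the caller's with block exits. A minimal sketch, using a hypothetical fake_instance_for_test that stands in for instance_for_test:

from contextlib import contextmanager


@contextmanager
def fake_instance_for_test():
    # Stand-in for instance_for_test: the dict plays the role of the instance.
    instance = {"disposed": False}
    try:
        yield instance
    finally:
        instance["disposed"] = True  # cleanup runs only when the with block exits


def instance_cm_like():
    # Mirrors the instance_cm fixture: the factory just returns a context
    # manager; nothing has been entered or exited at return time.
    @contextmanager
    def cm():
        with fake_instance_for_test() as inst:
            yield inst

    return cm


cm = instance_cm_like()
with cm() as inst:
    assert inst["disposed"] is False  # still live inside the caller's with block
assert inst["disposed"] is True  # torn down only after the block exits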

+@pytest.fixture
+def instance(instance_cm):
+    with instance_cm() as dagster_instance:
+        yield dagster_instance


@pytest.fixture
def pipeline():
    return repo.pipeline


@pytest.fixture
def external_pipeline(image):
    with InProcessRepositoryLocationOrigin(
        ReconstructableRepository.for_file(
            repo.__file__, repo.repository.__name__, container_image=image
        ),
    ).create_location() as location:
        yield location.get_repository(repo.repository.__name__).get_full_external_pipeline(
            repo.pipeline.__name__
        )

-@pytest.fixture
-def run(instance, pipeline):
-    return instance.create_run_for_pipeline(pipeline)


def test_launching(
-    ecs, instance, run, external_pipeline, subnet, network_interface, image, environment
+    ecs, instance, pipeline, external_pipeline, subnet, network_interface, image, environment
):
+    run = instance.create_run_for_pipeline(pipeline)
    assert not run.tags
    initial_task_definitions = ecs.list_task_definitions()["taskDefinitionArns"]
    initial_tasks = ecs.list_tasks()["taskArns"]
    instance.launch_run(run.run_id, external_pipeline)
    # A new task definition is created
    task_definitions = ecs.list_task_definitions()["taskDefinitionArns"]
    assert len(task_definitions) == len(initial_task_definitions) + 1

[... 31 unchanged lines collapsed ...]
):
    overrides = task["overrides"]["containerOverrides"]
    assert len(overrides) == 1
    override = overrides[0]
    assert override["name"] == "run"
    assert "execute_run" in override["command"]
    assert run.run_id in str(override["command"])

-def test_termination(instance, run, external_pipeline):
+def test_termination(instance, pipeline, external_pipeline):
+    run = instance.create_run_for_pipeline(pipeline)
    assert not instance.run_launcher.can_terminate(run.run_id)
    instance.launch_run(run.run_id, external_pipeline)
    assert instance.run_launcher.can_terminate(run.run_id)
    assert instance.run_launcher.terminate(run.run_id)
    assert not instance.run_launcher.can_terminate(run.run_id)
    assert not instance.run_launcher.terminate(run.run_id)