Differential D4743 Diff 24684 integration_tests/test_suites/celery-k8s-integration-test-suite/test_user_code_deployments.py
Changeset View
Changeset View
Standalone View
Standalone View
integration_tests/test_suites/celery-k8s-integration-test-suite/test_user_code_deployments.py
import datetime | import datetime | ||||
import json | import json | ||||
import sys | import sys | ||||
import kubernetes | import kubernetes | ||||
import pytest | import pytest | ||||
from dagster.seven import get_current_datetime_in_utc | |||||
from dagster_k8s.test import wait_for_job_and_get_raw_logs | from dagster_k8s.test import wait_for_job_and_get_raw_logs | ||||
from kubernetes.stream import stream | from kubernetes.stream import stream | ||||
from marks import mark_user_code_deployment | from marks import mark_user_code_deployment | ||||
# This test spins up a user code deployment, and then executes a launch pipeline command in the | # This test spins up a user code deployment, and then executes a launch pipeline command in the | ||||
# dagit pod to trigger a pipeline run | # dagit pod to trigger a pipeline run | ||||
@mark_user_code_deployment | @mark_user_code_deployment | ||||
▲ Show 20 Lines • Show All 61 Lines • ▼ Show 20 Lines | stream( | ||||
stdin=False, | stdin=False, | ||||
stdout=True, | stdout=True, | ||||
tty=False, | tty=False, | ||||
_preload_content=False, | _preload_content=False, | ||||
) | ) | ||||
runmaster_job_name = None | runmaster_job_name = None | ||||
timeout = datetime.timedelta(0, 90) | timeout = datetime.timedelta(0, 90) | ||||
start_time = datetime.datetime.now() | start_time = get_current_datetime_in_utc() | ||||
while datetime.datetime.now() < start_time + timeout and not runmaster_job_name: | while get_current_datetime_in_utc() < start_time + timeout and not runmaster_job_name: | ||||
jobs = batch_api.list_namespaced_job(namespace=namespace) | jobs = batch_api.list_namespaced_job(namespace=namespace) | ||||
runmaster_job_list = list( | runmaster_job_list = list( | ||||
filter(lambda item: "dagster-run-" in item.metadata.name, jobs.items) | filter(lambda item: "dagster-run-" in item.metadata.name, jobs.items) | ||||
) | ) | ||||
if len(runmaster_job_list) > 0: | if len(runmaster_job_list) > 0: | ||||
runmaster_job_name = runmaster_job_list[0].metadata.name | runmaster_job_name = runmaster_job_list[0].metadata.name | ||||
assert runmaster_job_name | assert runmaster_job_name | ||||
result = wait_for_job_and_get_raw_logs(job_name=runmaster_job_name, namespace=namespace) | result = wait_for_job_and_get_raw_logs(job_name=runmaster_job_name, namespace=namespace) | ||||
assert "PIPELINE_SUCCESS" in result, "no match, result: {}".format(result) | assert "PIPELINE_SUCCESS" in result, "no match, result: {}".format(result) |