Differential D4718 Diff 24113 integration_tests/python_modules/dagster-k8s-test-infra/dagster_k8s_test_infra/helm.py
Changeset View
Changeset View
Standalone View
Standalone View
integration_tests/python_modules/dagster-k8s-test-infra/dagster_k8s_test_infra/helm.py
Show First 20 Lines • Show All 58 Lines • ▼ Show 20 Lines | |||||
@pytest.fixture(scope="session") | @pytest.fixture(scope="session") | ||||
def helm_namespace( | def helm_namespace( | ||||
cluster_provider, request | cluster_provider, request | ||||
): # pylint: disable=unused-argument, redefined-outer-name | ): # pylint: disable=unused-argument, redefined-outer-name | ||||
yield _helm_namespace_helper(helm_chart, request) | yield _helm_namespace_helper(helm_chart, request) | ||||
@pytest.fixture(scope="session") | |||||
def helm_namespace_for_k8s_run_launcher( | |||||
cluster_provider, request | |||||
): # pylint: disable=unused-argument, redefined-outer-name | |||||
yield _helm_namespace_helper(helm_chart_for_k8s_run_launcher, request) | |||||
@contextmanager | @contextmanager | ||||
def test_namespace(should_cleanup=True): | def test_namespace(should_cleanup=True): | ||||
# Will be something like dagster-test-3fcd70 to avoid ns collisions in shared test environment | # Will be something like dagster-test-3fcd70 to avoid ns collisions in shared test environment | ||||
namespace = get_test_namespace() | namespace = get_test_namespace() | ||||
print("--- \033[32m:k8s: Creating test namespace %s\033[0m" % namespace) | print("--- \033[32m:k8s: Creating test namespace %s\033[0m" % namespace) | ||||
kube_api = kubernetes.client.CoreV1Api() | kube_api = kubernetes.client.CoreV1Api() | ||||
▲ Show 20 Lines • Show All 96 Lines • ▼ Show 20 Lines | try: | ||||
while dagit_pod is None: | while dagit_pod is None: | ||||
pods = kube_api.list_namespaced_pod(namespace=namespace) | pods = kube_api.list_namespaced_pod(namespace=namespace) | ||||
pod_names = [p.metadata.name for p in pods.items if "dagit" in p.metadata.name] | pod_names = [p.metadata.name for p in pods.items if "dagit" in p.metadata.name] | ||||
if pod_names: | if pod_names: | ||||
dagit_pod = pod_names[0] | dagit_pod = pod_names[0] | ||||
time.sleep(1) | time.sleep(1) | ||||
# Wait for Celery worker queues to become ready | # Wait for Celery worker queues to become ready | ||||
print("Waiting for celery workers") | |||||
pods = kubernetes.client.CoreV1Api().list_namespaced_pod(namespace=namespace) | pods = kubernetes.client.CoreV1Api().list_namespaced_pod(namespace=namespace) | ||||
pod_names = [p.metadata.name for p in pods.items if "celery-workers" in p.metadata.name] | pod_names = [p.metadata.name for p in pods.items if "celery-workers" in p.metadata.name] | ||||
if helm_config.get("celery", {}).get("enabled"): | |||||
print("Waiting for celery workers") | |||||
for pod_name in pod_names: | for pod_name in pod_names: | ||||
print("Waiting for Celery worker pod %s" % pod_name) | print("Waiting for Celery worker pod %s" % pod_name) | ||||
wait_for_pod(pod_name, namespace=namespace) | wait_for_pod(pod_name, namespace=namespace) | ||||
else: | |||||
assert ( | |||||
len(pod_names) == 0 | |||||
), "celery-worker pods {pod_names} exists when celery is not enabled.".format( | |||||
pod_names=pod_names | |||||
) | |||||
if helm_config.get("userDeployments") and helm_config.get("userDeployments", {}).get( | if helm_config.get("userDeployments") and helm_config.get("userDeployments", {}).get( | ||||
"enabled" | "enabled" | ||||
): | ): | ||||
# Wait for user code deployments to be ready | # Wait for user code deployments to be ready | ||||
print("Waiting for user code deployments") | print("Waiting for user code deployments") | ||||
pods = kubernetes.client.CoreV1Api().list_namespaced_pod(namespace=namespace) | pods = kubernetes.client.CoreV1Api().list_namespaced_pod(namespace=namespace) | ||||
pod_names = [ | pod_names = [ | ||||
▲ Show 20 Lines • Show All 49 Lines • ▼ Show 20 Lines | helm_config = { | ||||
}, | }, | ||||
"startupProbe": { | "startupProbe": { | ||||
"tcpSocket": {"port": "flower"}, | "tcpSocket": {"port": "flower"}, | ||||
"failureThreshold": 6, | "failureThreshold": 6, | ||||
"periodSeconds": 10, | "periodSeconds": 10, | ||||
}, | }, | ||||
}, | }, | ||||
"celery": { | "celery": { | ||||
"enabled": True, | |||||
"image": {"repository": repository, "tag": tag, "pullPolicy": pull_policy}, | "image": {"repository": repository, "tag": tag, "pullPolicy": pull_policy}, | ||||
# https://github.com/dagster-io/dagster/issues/2671 | # https://github.com/dagster-io/dagster/issues/2671 | ||||
# 'extraWorkerQueues': [{'name': 'extra-queue-1', 'replicaCount': 1},], | # 'extraWorkerQueues': [{'name': 'extra-queue-1', 'replicaCount': 1},], | ||||
"livenessProbe": { | "livenessProbe": { | ||||
"initialDelaySeconds": 15, | "initialDelaySeconds": 15, | ||||
"periodSeconds": 10, | "periodSeconds": 10, | ||||
"timeoutSeconds": 10, | "timeoutSeconds": 10, | ||||
"successThreshold": 1, | "successThreshold": 1, | ||||
"failureThreshold": 3, | "failureThreshold": 3, | ||||
}, | }, | ||||
}, | }, | ||||
"scheduler": {"k8sEnabled": "true", "schedulerNamespace": namespace}, | "scheduler": {"k8sEnabled": "true", "schedulerNamespace": namespace}, | ||||
"serviceAccount": {"name": "dagit-admin"}, | "serviceAccount": {"name": "dagit-admin"}, | ||||
"postgresqlPassword": "test", | "postgresqlPassword": "test", | ||||
"postgresqlDatabase": "test", | "postgresqlDatabase": "test", | ||||
"postgresqlUser": "test", | "postgresqlUser": "test", | ||||
} | } | ||||
with _helm_chart_helper(namespace, should_cleanup, helm_config): | with _helm_chart_helper(namespace, should_cleanup, helm_config): | ||||
yield | yield | ||||
@contextmanager | @contextmanager | ||||
def helm_chart_for_k8s_run_launcher(namespace, docker_image, should_cleanup=True): | |||||
check.str_param(namespace, "namespace") | |||||
check.str_param(docker_image, "docker_image") | |||||
check.bool_param(should_cleanup, "should_cleanup") | |||||
repository, tag = docker_image.split(":") | |||||
pull_policy = image_pull_policy() | |||||
helm_config = { | |||||
"dagit": { | |||||
"image": {"repository": repository, "tag": tag, "pullPolicy": pull_policy}, | |||||
"env": {"TEST_SET_ENV_VAR": "test_dagit_env_var"}, | |||||
"env_config_maps": [TEST_CONFIGMAP_NAME], | |||||
"env_secrets": [TEST_SECRET_NAME], | |||||
"livenessProbe": { | |||||
"tcpSocket": {"port": "http"}, | |||||
"periodSeconds": 20, | |||||
"failureThreshold": 3, | |||||
}, | |||||
"startupProbe": { | |||||
"tcpSocket": {"port": "http"}, | |||||
"failureThreshold": 6, | |||||
"periodSeconds": 10, | |||||
}, | |||||
}, | |||||
"celery": {"enabled": False}, | |||||
"scheduler": {"k8sEnabled": "true", "schedulerNamespace": namespace}, | |||||
"serviceAccount": {"name": "dagit-admin"}, | |||||
"postgresqlPassword": "test", | |||||
"postgresqlDatabase": "test", | |||||
"postgresqlUser": "test", | |||||
} | |||||
with _helm_chart_helper(namespace, should_cleanup, helm_config): | |||||
yield | |||||
@contextmanager | |||||
def helm_chart_for_user_deployments(namespace, docker_image, should_cleanup=True): | def helm_chart_for_user_deployments(namespace, docker_image, should_cleanup=True): | ||||
check.str_param(namespace, "namespace") | check.str_param(namespace, "namespace") | ||||
check.str_param(docker_image, "docker_image") | check.str_param(docker_image, "docker_image") | ||||
check.bool_param(should_cleanup, "should_cleanup") | check.bool_param(should_cleanup, "should_cleanup") | ||||
repository, tag = docker_image.split(":") | repository, tag = docker_image.split(":") | ||||
pull_policy = image_pull_policy() | pull_policy = image_pull_policy() | ||||
helm_config = { | helm_config = { | ||||
Show All 37 Lines | helm_config = { | ||||
}, | }, | ||||
"startupProbe": { | "startupProbe": { | ||||
"tcpSocket": {"port": "flower"}, | "tcpSocket": {"port": "flower"}, | ||||
"failureThreshold": 6, | "failureThreshold": 6, | ||||
"periodSeconds": 10, | "periodSeconds": 10, | ||||
}, | }, | ||||
}, | }, | ||||
"celery": { | "celery": { | ||||
"enabled": True, | |||||
"image": {"repository": repository, "tag": tag, "pullPolicy": pull_policy}, | "image": {"repository": repository, "tag": tag, "pullPolicy": pull_policy}, | ||||
# https://github.com/dagster-io/dagster/issues/2671 | # https://github.com/dagster-io/dagster/issues/2671 | ||||
# 'extraWorkerQueues': [{'name': 'extra-queue-1', 'replicaCount': 1},], | # 'extraWorkerQueues': [{'name': 'extra-queue-1', 'replicaCount': 1},], | ||||
"livenessProbe": { | "livenessProbe": { | ||||
"initialDelaySeconds": 15, | "initialDelaySeconds": 15, | ||||
"periodSeconds": 10, | "periodSeconds": 10, | ||||
"timeoutSeconds": 10, | "timeoutSeconds": 10, | ||||
"successThreshold": 1, | "successThreshold": 1, | ||||
Show All 16 Lines |