Changeset View
Changeset View
Standalone View
Standalone View
python_modules/libraries/dagster-k8s/dagster_k8s/job.py
import hashlib | import hashlib | ||||
import json | import json | ||||
import os | |||||
import random | import random | ||||
import string | import string | ||||
from collections import namedtuple | from collections import namedtuple | ||||
from typing import List | |||||
import kubernetes | import kubernetes | ||||
from dagster import Array, Field, Noneable, StringSource | from dagster import Array, Field, Noneable, StringSource | ||||
from dagster import __version__ as dagster_version | from dagster import __version__ as dagster_version | ||||
from dagster import check | from dagster import check | ||||
from dagster.config.field_utils import Permissive, Shape | from dagster.config.field_utils import Permissive, Shape | ||||
from dagster.config.validate import validate_config | from dagster.config.validate import validate_config | ||||
from dagster.core.errors import DagsterInvalidConfigError | from dagster.core.errors import DagsterInvalidConfigError | ||||
▲ Show 20 Lines • Show All 161 Lines • ▼ Show 20 Lines | def get_job_name_from_run_id(run_id): | ||||
return "dagster-run-{}".format(run_id) | return "dagster-run-{}".format(run_id) | ||||
@whitelist_for_serdes | @whitelist_for_serdes | ||||
class DagsterK8sJobConfig( | class DagsterK8sJobConfig( | ||||
namedtuple( | namedtuple( | ||||
"_K8sJobTaskConfig", | "_K8sJobTaskConfig", | ||||
"job_image dagster_home image_pull_policy image_pull_secrets service_account_name " | "job_image dagster_home image_pull_policy image_pull_secrets service_account_name " | ||||
"instance_config_map postgres_password_secret env_config_maps env_secrets volume_mounts", | "instance_config_map postgres_password_secret env_config_maps env_secrets env_vars " | ||||
"volume_mounts", | |||||
) | ) | ||||
): | ): | ||||
"""Configuration parameters for launching Dagster Jobs on Kubernetes. | """Configuration parameters for launching Dagster Jobs on Kubernetes. | ||||
Params: | Params: | ||||
dagster_home (str): The location of DAGSTER_HOME in the Job container; this is where the | dagster_home (str): The location of DAGSTER_HOME in the Job container; this is where the | ||||
``dagster.yaml`` file will be mounted from the instance ConfigMap specified here. | ``dagster.yaml`` file will be mounted from the instance ConfigMap specified here. | ||||
image_pull_policy (Optional[str]): Allows the image pull policy to be overridden, e.g. to | image_pull_policy (Optional[str]): Allows the image pull policy to be overridden, e.g. to | ||||
Show All 16 Lines | Params: | ||||
password can be retrieved. Will be mounted and supplied as an environment variable to | password can be retrieved. Will be mounted and supplied as an environment variable to | ||||
the Job Pod. | the Job Pod. | ||||
env_config_maps (Optional[List[str]]): A list of custom ConfigMapEnvSource names from which to | env_config_maps (Optional[List[str]]): A list of custom ConfigMapEnvSource names from which to | ||||
draw environment variables (using ``envFrom``) for the Job. Default: ``[]``. See: | draw environment variables (using ``envFrom``) for the Job. Default: ``[]``. See: | ||||
https://kubernetes.io/docs/tasks/inject-data-application/define-environment-variable-container/#define-an-environment-variable-for-a-container | https://kubernetes.io/docs/tasks/inject-data-application/define-environment-variable-container/#define-an-environment-variable-for-a-container | ||||
env_secrets (Optional[List[str]]): A list of custom Secret names from which to | env_secrets (Optional[List[str]]): A list of custom Secret names from which to | ||||
draw environment variables (using ``envFrom``) for the Job. Default: ``[]``. See: | draw environment variables (using ``envFrom``) for the Job. Default: ``[]``. See: | ||||
https://kubernetes.io/docs/tasks/inject-data-application/distribute-credentials-secure/#configure-all-key-value-pairs-in-a-secret-as-container-environment-variables | https://kubernetes.io/docs/tasks/inject-data-application/distribute-credentials-secure/#configure-all-key-value-pairs-in-a-secret-as-container-environment-variables | ||||
env_vars (Optional[List[str]]): A list of environment variables to inject into the Job. | |||||
Default: ``[]``. See: https://kubernetes.io/docs/tasks/inject-data-application/distribute-credentials-secure/#configure-all-key-value-pairs-in-a-secret-as-container-environment-variables | |||||
job_image (Optional[str]): The docker image to use. The Job container will be launched with this | job_image (Optional[str]): The docker image to use. The Job container will be launched with this | ||||
image. Should not be specified if using userDeployments. | image. Should not be specified if using userDeployments. | ||||
""" | """ | ||||
def __new__( | def __new__( | ||||
cls, | cls, | ||||
job_image=None, | job_image=None, | ||||
dagster_home=None, | dagster_home=None, | ||||
image_pull_policy=None, | image_pull_policy=None, | ||||
image_pull_secrets=None, | image_pull_secrets=None, | ||||
service_account_name=None, | service_account_name=None, | ||||
instance_config_map=None, | instance_config_map=None, | ||||
postgres_password_secret=None, | postgres_password_secret=None, | ||||
env_config_maps=None, | env_config_maps=None, | ||||
env_secrets=None, | env_secrets=None, | ||||
env_vars=None, | |||||
volume_mounts=None, | volume_mounts=None, | ||||
): | ): | ||||
return super(DagsterK8sJobConfig, cls).__new__( | return super(DagsterK8sJobConfig, cls).__new__( | ||||
cls, | cls, | ||||
job_image=check.opt_str_param(job_image, "job_image"), | job_image=check.opt_str_param(job_image, "job_image"), | ||||
dagster_home=check.opt_str_param( | dagster_home=check.opt_str_param( | ||||
dagster_home, "dagster_home", default=DAGSTER_HOME_DEFAULT | dagster_home, "dagster_home", default=DAGSTER_HOME_DEFAULT | ||||
), | ), | ||||
image_pull_policy=check.opt_str_param(image_pull_policy, "image_pull_policy"), | image_pull_policy=check.opt_str_param(image_pull_policy, "image_pull_policy"), | ||||
image_pull_secrets=check.opt_list_param( | image_pull_secrets=check.opt_list_param( | ||||
image_pull_secrets, "image_pull_secrets", of_type=dict | image_pull_secrets, "image_pull_secrets", of_type=dict | ||||
), | ), | ||||
service_account_name=check.opt_str_param(service_account_name, "service_account_name"), | service_account_name=check.opt_str_param(service_account_name, "service_account_name"), | ||||
instance_config_map=check.str_param(instance_config_map, "instance_config_map"), | instance_config_map=check.str_param(instance_config_map, "instance_config_map"), | ||||
postgres_password_secret=check.opt_str_param( | postgres_password_secret=check.opt_str_param( | ||||
postgres_password_secret, "postgres_password_secret" | postgres_password_secret, "postgres_password_secret" | ||||
), | ), | ||||
env_config_maps=check.opt_list_param(env_config_maps, "env_config_maps", of_type=str), | env_config_maps=check.opt_list_param(env_config_maps, "env_config_maps", of_type=str), | ||||
env_secrets=check.opt_list_param(env_secrets, "env_secrets", of_type=str), | env_secrets=check.opt_list_param(env_secrets, "env_secrets", of_type=str), | ||||
env_vars=check.opt_list_param(env_vars, "env_secrets", of_type=str), | |||||
volume_mounts=check.opt_list_param(volume_mounts, "volume_mounts"), | volume_mounts=check.opt_list_param(volume_mounts, "volume_mounts"), | ||||
) | ) | ||||
@classmethod | @classmethod | ||||
def config_type(cls): | def config_type(cls): | ||||
"""Combined config type which includes both run launcher and pipeline run config.""" | """Combined config type which includes both run launcher and pipeline run config.""" | ||||
cfg_run_launcher = DagsterK8sJobConfig.config_type_run_launcher() | cfg_run_launcher = DagsterK8sJobConfig.config_type_run_launcher() | ||||
cfg_pipeline_run = DagsterK8sJobConfig.config_type_pipeline_run() | cfg_pipeline_run = DagsterK8sJobConfig.config_type_pipeline_run() | ||||
▲ Show 20 Lines • Show All 68 Lines • ▼ Show 20 Lines | def config_type_pipeline_run(cls, default_image_pull_policy=None): | ||||
), | ), | ||||
"env_secrets": Field( | "env_secrets": Field( | ||||
Noneable(Array(StringSource)), | Noneable(Array(StringSource)), | ||||
is_required=False, | is_required=False, | ||||
description="A list of custom Secret names from which to draw environment " | description="A list of custom Secret names from which to draw environment " | ||||
"variables (using ``envFrom``) for the Job. Default: ``[]``. See:" | "variables (using ``envFrom``) for the Job. Default: ``[]``. See:" | ||||
"https://kubernetes.io/docs/tasks/inject-data-application/distribute-credentials-secure/#configure-all-key-value-pairs-in-a-secret-as-container-environment-variables", | "https://kubernetes.io/docs/tasks/inject-data-application/distribute-credentials-secure/#configure-all-key-value-pairs-in-a-secret-as-container-environment-variables", | ||||
), | ), | ||||
"env_vars": Field( | |||||
Noneable(Array(str)), | |||||
is_required=False, | |||||
description="A list of environment variables to inject into the Job. " | |||||
"Default: ``[]``. See: " | |||||
"https://kubernetes.io/docs/tasks/inject-data-application/distribute-credentials-secure/#configure-all-key-value-pairs-in-a-secret-as-container-environment-variables", | |||||
), | |||||
} | } | ||||
@property | @property | ||||
def env(self) -> List[kubernetes.client.V1EnvVar]: | |||||
return [ | |||||
kubernetes.client.V1EnvVar(name=key, value=os.getenv(key)) | |||||
for key in (self.env_vars or []) | |||||
] | |||||
@property | |||||
def env_from_sources(self): | def env_from_sources(self): | ||||
"""This constructs a list of env_from sources. Along with a default base environment | """This constructs a list of env_from sources. Along with a default base environment | ||||
config map which we always load, the ConfigMaps and Secrets specified via | config map which we always load, the ConfigMaps and Secrets specified via | ||||
env_config_maps and env_secrets will be pulled into the job construction here. | env_config_maps and env_secrets will be pulled into the job construction here. | ||||
""" | """ | ||||
config_maps = [ | config_maps = [ | ||||
kubernetes.client.V1EnvFromSource( | kubernetes.client.V1EnvFromSource( | ||||
config_map_ref=kubernetes.client.V1ConfigMapEnvSource(name=config_map) | config_map_ref=kubernetes.client.V1ConfigMapEnvSource(name=config_map) | ||||
▲ Show 20 Lines • Show All 103 Lines • ▼ Show 20 Lines | if env_vars: | ||||
for key, value in env_vars.items(): | for key, value in env_vars.items(): | ||||
additional_k8s_env_vars.append(kubernetes.client.V1EnvVar(name=key, value=value)) | additional_k8s_env_vars.append(kubernetes.client.V1EnvVar(name=key, value=value)) | ||||
job_container = kubernetes.client.V1Container( | job_container = kubernetes.client.V1Container( | ||||
name=job_name, | name=job_name, | ||||
image=job_config.job_image, | image=job_config.job_image, | ||||
args=args, | args=args, | ||||
image_pull_policy=job_config.image_pull_policy, | image_pull_policy=job_config.image_pull_policy, | ||||
env=env + additional_k8s_env_vars, | env=env + job_config.env + additional_k8s_env_vars, | ||||
env_from=job_config.env_from_sources, | env_from=job_config.env_from_sources, | ||||
volume_mounts=[ | volume_mounts=[ | ||||
kubernetes.client.V1VolumeMount( | kubernetes.client.V1VolumeMount( | ||||
name="dagster-instance", | name="dagster-instance", | ||||
mount_path="{dagster_home}/dagster.yaml".format( | mount_path="{dagster_home}/dagster.yaml".format( | ||||
dagster_home=job_config.dagster_home | dagster_home=job_config.dagster_home | ||||
), | ), | ||||
sub_path="dagster.yaml", | sub_path="dagster.yaml", | ||||
▲ Show 20 Lines • Show All 95 Lines • Show Last 20 Lines |