Changeset View
Changeset View
Standalone View
Standalone View
python_modules/libraries/dagster-k8s/dagster_k8s/launcher.py
Show First 20 Lines • Show All 80 Lines • ▼ Show 20 Lines | Args: | ||||
other Kubernetes resources the Job requires (such as the service account) must be | other Kubernetes resources the Job requires (such as the service account) must be | ||||
present in this namespace. Default: ``"default"`` | present in this namespace. Default: ``"default"`` | ||||
env_config_maps (Optional[List[str]]): A list of custom ConfigMapEnvSource names from which to | env_config_maps (Optional[List[str]]): A list of custom ConfigMapEnvSource names from which to | ||||
draw environment variables (using ``envFrom``) for the Job. Default: ``[]``. See: | draw environment variables (using ``envFrom``) for the Job. Default: ``[]``. See: | ||||
https://kubernetes.io/docs/tasks/inject-data-application/define-environment-variable-container/#define-an-environment-variable-for-a-container | https://kubernetes.io/docs/tasks/inject-data-application/define-environment-variable-container/#define-an-environment-variable-for-a-container | ||||
env_secrets (Optional[List[str]]): A list of custom Secret names from which to | env_secrets (Optional[List[str]]): A list of custom Secret names from which to | ||||
draw environment variables (using ``envFrom``) for the Job. Default: ``[]``. See: | draw environment variables (using ``envFrom``) for the Job. Default: ``[]``. See: | ||||
https://kubernetes.io/docs/tasks/inject-data-application/distribute-credentials-secure/#configure-all-key-value-pairs-in-a-secret-as-container-environment-variables | https://kubernetes.io/docs/tasks/inject-data-application/distribute-credentials-secure/#configure-all-key-value-pairs-in-a-secret-as-container-environment-variables | ||||
env_vars (Optional[List[str]]): A list of environment variables to inject into the Job. | |||||
Default: ``[]``. See: https://kubernetes.io/docs/tasks/inject-data-application/distribute-credentials-secure/#configure-all-key-value-pairs-in-a-secret-as-container-environment-variables | |||||
""" | """ | ||||
def __init__( | def __init__( | ||||
self, | self, | ||||
service_account_name, | service_account_name, | ||||
instance_config_map, | instance_config_map, | ||||
postgres_password_secret=None, | postgres_password_secret=None, | ||||
dagster_home=None, | dagster_home=None, | ||||
job_image=None, | job_image=None, | ||||
image_pull_policy=None, | image_pull_policy=None, | ||||
image_pull_secrets=None, | image_pull_secrets=None, | ||||
load_incluster_config=True, | load_incluster_config=True, | ||||
kubeconfig_file=None, | kubeconfig_file=None, | ||||
inst_data=None, | inst_data=None, | ||||
job_namespace="default", | job_namespace="default", | ||||
env_config_maps=None, | env_config_maps=None, | ||||
env_secrets=None, | env_secrets=None, | ||||
env_vars=None, | |||||
k8s_client_batch_api=None, | k8s_client_batch_api=None, | ||||
): | ): | ||||
self._inst_data = check.opt_inst_param(inst_data, "inst_data", ConfigurableClassData) | self._inst_data = check.opt_inst_param(inst_data, "inst_data", ConfigurableClassData) | ||||
self.job_namespace = check.str_param(job_namespace, "job_namespace") | self.job_namespace = check.str_param(job_namespace, "job_namespace") | ||||
self.load_incluster_config = load_incluster_config | self.load_incluster_config = load_incluster_config | ||||
self.kubeconfig_file = kubeconfig_file | self.kubeconfig_file = kubeconfig_file | ||||
if load_incluster_config: | if load_incluster_config: | ||||
Show All 21 Lines | ): | ||||
self.instance_config_map = check.str_param(instance_config_map, "instance_config_map") | self.instance_config_map = check.str_param(instance_config_map, "instance_config_map") | ||||
self.postgres_password_secret = check.opt_str_param( | self.postgres_password_secret = check.opt_str_param( | ||||
postgres_password_secret, "postgres_password_secret" | postgres_password_secret, "postgres_password_secret" | ||||
) | ) | ||||
self._env_config_maps = check.opt_list_param( | self._env_config_maps = check.opt_list_param( | ||||
env_config_maps, "env_config_maps", of_type=str | env_config_maps, "env_config_maps", of_type=str | ||||
) | ) | ||||
self._env_secrets = check.opt_list_param(env_secrets, "env_secrets", of_type=str) | self._env_secrets = check.opt_list_param(env_secrets, "env_secrets", of_type=str) | ||||
self._env_vars = check.opt_list_param(env_vars, "env_vars", of_type=str) | |||||
super().__init__() | super().__init__() | ||||
@property | @property | ||||
def image_pull_policy(self): | def image_pull_policy(self): | ||||
return self._image_pull_policy | return self._image_pull_policy | ||||
@property | @property | ||||
▲ Show 20 Lines • Show All 57 Lines • ▼ Show 20 Lines | def get_static_job_config(self): | ||||
), | ), | ||||
postgres_password_secret=check.opt_str_param( | postgres_password_secret=check.opt_str_param( | ||||
self.postgres_password_secret, "postgres_password_secret" | self.postgres_password_secret, "postgres_password_secret" | ||||
), | ), | ||||
env_config_maps=check.opt_list_param( | env_config_maps=check.opt_list_param( | ||||
self._env_config_maps, "env_config_maps", of_type=str | self._env_config_maps, "env_config_maps", of_type=str | ||||
), | ), | ||||
env_secrets=check.opt_list_param(self._env_secrets, "env_secrets", of_type=str), | env_secrets=check.opt_list_param(self._env_secrets, "env_secrets", of_type=str), | ||||
env_vars=check.opt_list_param(self._env_vars, "env_vars", of_type=str), | |||||
) | ) | ||||
return self._job_config | return self._job_config | ||||
def _get_grpc_job_config(self, job_image): | def _get_grpc_job_config(self, job_image): | ||||
return DagsterK8sJobConfig( | return DagsterK8sJobConfig( | ||||
job_image=check.str_param(job_image, "job_image"), | job_image=check.str_param(job_image, "job_image"), | ||||
dagster_home=check.str_param(self.dagster_home, "dagster_home"), | dagster_home=check.str_param(self.dagster_home, "dagster_home"), | ||||
image_pull_policy=check.str_param(self._image_pull_policy, "image_pull_policy"), | image_pull_policy=check.str_param(self._image_pull_policy, "image_pull_policy"), | ||||
image_pull_secrets=check.opt_list_param( | image_pull_secrets=check.opt_list_param( | ||||
self._image_pull_secrets, "image_pull_secrets", of_type=dict | self._image_pull_secrets, "image_pull_secrets", of_type=dict | ||||
), | ), | ||||
service_account_name=check.str_param( | service_account_name=check.str_param( | ||||
self._service_account_name, "service_account_name" | self._service_account_name, "service_account_name" | ||||
), | ), | ||||
instance_config_map=check.str_param(self.instance_config_map, "instance_config_map"), | instance_config_map=check.str_param(self.instance_config_map, "instance_config_map"), | ||||
postgres_password_secret=check.opt_str_param( | postgres_password_secret=check.opt_str_param( | ||||
self.postgres_password_secret, "postgres_password_secret" | self.postgres_password_secret, "postgres_password_secret" | ||||
), | ), | ||||
env_config_maps=check.opt_list_param( | env_config_maps=check.opt_list_param( | ||||
self._env_config_maps, "env_config_maps", of_type=str | self._env_config_maps, "env_config_maps", of_type=str | ||||
), | ), | ||||
env_secrets=check.opt_list_param(self._env_secrets, "env_secrets", of_type=str), | env_secrets=check.opt_list_param(self._env_secrets, "env_secrets", of_type=str), | ||||
env_vars=check.opt_list_param(self._env_vars, "env_vars", of_type=str), | |||||
) | ) | ||||
def launch_run(self, context: LaunchRunContext) -> None: | def launch_run(self, context: LaunchRunContext) -> None: | ||||
run = context.pipeline_run | run = context.pipeline_run | ||||
job_name = "dagster-run-{}".format(run.run_id) | job_name = "dagster-run-{}".format(run.run_id) | ||||
pod_name = job_name | pod_name = job_name | ||||
▲ Show 20 Lines • Show All 119 Lines • Show Last 20 Lines |