Changeset View
Changeset View
Standalone View
Standalone View
python_modules/libraries/dagster-k8s/dagster_k8s/executor.py
import os | import os | ||||
import kubernetes | import kubernetes | ||||
from dagster import Field, StringSource, executor | from dagster import Field, StringSource, executor | ||||
from dagster.core.definitions.executor import multiple_process_executor_requirements | from dagster.core.definitions.executor import multiple_process_executor_requirements | ||||
from dagster.core.events import DagsterEvent, DagsterEventType | from dagster.core.events import DagsterEvent, DagsterEventType, EngineEventData, EventMetadataEntry | ||||
from dagster.core.execution.plan.objects import StepFailureData | from dagster.core.execution.plan.objects import StepFailureData | ||||
from dagster.core.execution.retries import get_retries_config | from dagster.core.execution.retries import get_retries_config | ||||
from dagster.core.executor.base import Executor | from dagster.core.executor.base import Executor | ||||
from dagster.core.executor.init import InitExecutorContext | from dagster.core.executor.init import InitExecutorContext | ||||
from dagster.core.executor.step_delegating import StepDelegatingExecutor | from dagster.core.executor.step_delegating import StepDelegatingExecutor | ||||
from dagster.core.executor.step_delegating.step_handler import StepHandler | from dagster.core.executor.step_delegating.step_handler import StepHandler | ||||
from dagster.core.executor.step_delegating.step_handler.base import StepHandlerContext | from dagster.core.executor.step_delegating.step_handler.base import StepHandlerContext | ||||
from dagster.serdes.serdes import serialize_dagster_namedtuple | from dagster.serdes.serdes import serialize_dagster_namedtuple | ||||
from dagster.utils import frozentags, merge_dicts | from dagster.utils import frozentags, merge_dicts | ||||
from dagster.utils.backcompat import experimental | |||||
from .job import ( | from .job import ( | ||||
DagsterK8sJobConfig, | DagsterK8sJobConfig, | ||||
construct_dagster_k8s_job, | construct_dagster_k8s_job, | ||||
get_k8s_job_name, | get_k8s_job_name, | ||||
get_user_defined_k8s_config, | get_user_defined_k8s_config, | ||||
) | ) | ||||
from .utils import delete_job | from .utils import delete_job | ||||
@experimental | |||||
class K8sStepHandler(StepHandler): | class K8sStepHandler(StepHandler): | ||||
@property | @property | ||||
def name(self): | def name(self): | ||||
return "K8sStepHandler" | return "K8sStepHandler" | ||||
def __init__( | def __init__( | ||||
self, | self, | ||||
job_config: DagsterK8sJobConfig, | job_config: DagsterK8sJobConfig, | ||||
job_namespace: str, | job_namespace: str, | ||||
): | ): | ||||
super().__init__() | super().__init__() | ||||
self._job_config = job_config | self._job_config = job_config | ||||
self._job_namespace = job_namespace | self._job_namespace = job_namespace | ||||
def launch_step(self, step_handler_context: StepHandlerContext): | def launch_step(self, step_handler_context: StepHandlerContext): | ||||
events = [] | |||||
assert ( | assert ( | ||||
len(step_handler_context.execute_step_args.step_keys_to_execute) == 1 | len(step_handler_context.execute_step_args.step_keys_to_execute) == 1 | ||||
), "Launching multiple steps is not currently supported" | ), "Launching multiple steps is not currently supported" | ||||
step_key = step_handler_context.execute_step_args.step_keys_to_execute[0] | step_key = step_handler_context.execute_step_args.step_keys_to_execute[0] | ||||
k8s_name_key = get_k8s_job_name( | k8s_name_key = get_k8s_job_name( | ||||
step_handler_context.execute_step_args.pipeline_run_id, | step_handler_context.execute_step_args.pipeline_run_id, | ||||
step_key, | step_key, | ||||
Show All 16 Lines | def launch_step(self, step_handler_context: StepHandlerContext): | ||||
job = construct_dagster_k8s_job( | job = construct_dagster_k8s_job( | ||||
job_config, | job_config, | ||||
args, | args, | ||||
job_name, | job_name, | ||||
get_user_defined_k8s_config(frozentags()), | get_user_defined_k8s_config(frozentags()), | ||||
pod_name, | pod_name, | ||||
) | ) | ||||
events.append( | |||||
DagsterEvent( | |||||
event_type_value=DagsterEventType.ENGINE_EVENT.value, | |||||
pipeline_name=step_handler_context.execute_step_args.pipeline_origin.pipeline_name, | |||||
message=f"Executing step {step_key} in Kubernetes job {job_name}", | |||||
event_specific_data=EngineEventData( | |||||
[ | |||||
EventMetadataEntry.text(step_key, "Step key"), | |||||
EventMetadataEntry.text(job_name, "Kubernetes Job name"), | |||||
], | |||||
), | |||||
) | |||||
) | |||||
kubernetes.config.load_incluster_config() | kubernetes.config.load_incluster_config() | ||||
kubernetes.client.BatchV1Api().create_namespaced_job( | kubernetes.client.BatchV1Api().create_namespaced_job( | ||||
body=job, namespace=self._job_namespace | body=job, namespace=self._job_namespace | ||||
) | ) | ||||
return [] | return events | ||||
def check_step_health(self, step_handler_context: StepHandlerContext): | def check_step_health(self, step_handler_context: StepHandlerContext): | ||||
assert ( | assert ( | ||||
len(step_handler_context.execute_step_args.step_keys_to_execute) == 1 | len(step_handler_context.execute_step_args.step_keys_to_execute) == 1 | ||||
), "Launching multiple steps is not currently supported" | ), "Launching multiple steps is not currently supported" | ||||
step_key = step_handler_context.execute_step_args.step_keys_to_execute[0] | step_key = step_handler_context.execute_step_args.step_keys_to_execute[0] | ||||
k8s_name_key = get_k8s_job_name( | k8s_name_key = get_k8s_job_name( | ||||
▲ Show 20 Lines • Show All 74 Lines • Show Last 20 Lines |