Changeset View
Changeset View
Standalone View
Standalone View
python_modules/libraries/dagster-aws/dagster_aws/ecs/launcher.py
Show First 20 Lines • Show All 126 Lines • ▼ Show 20 Lines | def launch_run(self, context: LaunchRunContext) -> None: | ||||
self.ecs.tag_resource(resourceArn=arn, tags=self._ecs_tags(run.run_id)) | self.ecs.tag_resource(resourceArn=arn, tags=self._ecs_tags(run.run_id)) | ||||
self._instance.report_engine_event( | self._instance.report_engine_event( | ||||
message=f"Launching run in task {arn} on cluster {metadata.cluster}", | message=f"Launching run in task {arn} on cluster {metadata.cluster}", | ||||
pipeline_run=run, | pipeline_run=run, | ||||
cls=self.__class__, | cls=self.__class__, | ||||
) | ) | ||||
def can_terminate(self, run_id): | def can_terminate(self, run_id): | ||||
arn = self._instance.get_run_by_id(run_id).tags.get("ecs/task_arn") | tags = self._instance.get_run_by_id(run_id).tags | ||||
if arn: | arn = tags.get("ecs/task_arn") | ||||
cluster = self._task_metadata().cluster | cluster = tags.get("ecs/cluster") | ||||
if arn and cluster: | |||||
status = self.ecs.describe_tasks(tasks=[arn], cluster=cluster)["tasks"][0]["lastStatus"] | status = self.ecs.describe_tasks(tasks=[arn], cluster=cluster)["tasks"][0]["lastStatus"] | ||||
dgibson: should we check that ["tasks"] is non-empty too to avoid an index error | |||||
Done Inline ActionsI haven't been able to reproduce a case where the arn and cluster tags both exist, the request succeeds, but the tasks key is empty. I'm not entirely sure what we'd do in that case - we could retry (but for how long?) or we could return False (but that might not actually be accurate). I think for now, I'd prefer to keep it as is and if we start to see this bubble up, we can reconsider. jordansanders: I haven't been able to reproduce a case where the arn and cluster tags both exist, the request… | |||||
Done Inline ActionsActually, maybe I'll just return False. jordansanders: Actually, maybe I'll just return False. | |||||
if status != "STOPPED": | if status != "STOPPED": | ||||
return True | return True | ||||
return False | return False | ||||
def terminate(self, run_id): | def terminate(self, run_id): | ||||
cluster = self._task_metadata().cluster | tags = self._instance.get_run_by_id(run_id).tags | ||||
arn = self._instance.get_run_by_id(run_id).tags.get("ecs/task_arn") | arn = tags.get("ecs/task_arn") | ||||
status = self.ecs.describe_tasks(tasks=[arn], cluster=cluster)["tasks"][0]["lastStatus"] | cluster = tags.get("ecs/cluster") | ||||
status = ( | |||||
self.ecs.describe_tasks(tasks=[arn], cluster=cluster) | |||||
.get("tasks", [{}])[0] | |||||
.get("lastStatus") | |||||
) | |||||
if status == "STOPPED": | if status == "STOPPED": | ||||
return False | return False | ||||
self.ecs.stop_task(task=arn, cluster=cluster) | self.ecs.stop_task(task=arn, cluster=cluster) | ||||
return True | return True | ||||
def _task_definition(self, metadata, image): | def _task_definition(self, metadata, image): | ||||
""" | """ | ||||
▲ Show 20 Lines • Show All 114 Lines • Show Last 20 Lines |
should we check that ["tasks"] is non-empty too to avoid an index error