Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/execution/compute_logs.py
import hashlib | |||||
import io | import io | ||||
import os | import os | ||||
import subprocess | import subprocess | ||||
import sys | import sys | ||||
import tempfile | import tempfile | ||||
import time | import time | ||||
import uuid | import uuid | ||||
import warnings | import warnings | ||||
from contextlib import contextmanager | from contextlib import contextmanager | ||||
from typing import List | |||||
from dagster.core.execution import poll_compute_logs, watch_orphans | from dagster.core.execution import poll_compute_logs, watch_orphans | ||||
from dagster.core.execution.plan.plan import ExecutionStep | |||||
from dagster.serdes.ipc import interrupt_ipc_subprocess, open_ipc_subprocess | from dagster.serdes.ipc import interrupt_ipc_subprocess, open_ipc_subprocess | ||||
from dagster.seven import IS_WINDOWS, wait_for_process | from dagster.seven import IS_WINDOWS, json, wait_for_process | ||||
from dagster.utils import ensure_file | from dagster.utils import ensure_file | ||||
WIN_PY36_COMPUTE_LOG_DISABLED_MSG = """\u001b[33mWARNING: Compute log capture is disabled for the current environment. Set the environment variable `PYTHONLEGACYWINDOWSSTDIO` to enable.\n\u001b[0m""" | WIN_PY36_COMPUTE_LOG_DISABLED_MSG = """\u001b[33mWARNING: Compute log capture is disabled for the current environment. Set the environment variable `PYTHONLEGACYWINDOWSSTDIO` to enable.\n\u001b[0m""" | ||||
@contextmanager | @contextmanager | ||||
def redirect_to_file(stream, filepath): | def redirect_to_file(stream, filepath): | ||||
with open(filepath, "a+", buffering=1) as file_stream: | with open(filepath, "a+", buffering=1) as file_stream: | ||||
▲ Show 20 Lines • Show All 137 Lines • ▼ Show 20 Lines | except io.UnsupportedOperation: | ||||
# UnsupportedOperation if `fileno` is accessed. We need to make sure we do not error out, | # UnsupportedOperation if `fileno` is accessed. We need to make sure we do not error out, | ||||
# or tests will fail | # or tests will fail | ||||
return None | return None | ||||
if isinstance(fd, int): | if isinstance(fd, int): | ||||
return fd | return fd | ||||
return None | return None | ||||
def compute_log_key_for_steps(steps: List[ExecutionStep]) -> str: | |||||
# Construct a log_key for a list of execution steps (from an ExecutionPlan). Used to construct | |||||
# the log_key for a set of steps (e.g. when capturing the compute logs for all the steps | |||||
# executed from within a process) | |||||
alangenfeld: mypy & comment | |||||
if len(steps) == 1: | |||||
return steps[0].key | |||||
return hashlib.md5(json.dumps([step.key for step in steps]).encode("utf-8")).hexdigest() |
mypy & comment