Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/cli/api.py
from __future__ import print_function | from __future__ import print_function | ||||
import os | import os | ||||
import signal | import signal | ||||
import sys | import sys | ||||
import threading | |||||
import time | import time | ||||
import warnings | |||||
from collections import namedtuple | from collections import namedtuple | ||||
from contextlib import contextmanager | from contextlib import contextmanager | ||||
import click | import click | ||||
from dagster import check, seven | from dagster import check, seven | ||||
from dagster.cli.workspace.cli_target import ( | from dagster.cli.workspace.cli_target import ( | ||||
get_repository_location_from_kwargs, | get_repository_location_from_kwargs, | ||||
▲ Show 20 Lines • Show All 308 Lines • ▼ Show 20 Lines | @click.command( | ||||
name="execute_run_with_structured_logs", | name="execute_run_with_structured_logs", | ||||
help=( | help=( | ||||
"[INTERNAL] This is an internal utility. Users should generally not invoke this command " | "[INTERNAL] This is an internal utility. Users should generally not invoke this command " | ||||
"interactively." | "interactively." | ||||
), | ), | ||||
) | ) | ||||
@click.argument("input_json", type=click.STRING) | @click.argument("input_json", type=click.STRING) | ||||
def execute_run_with_structured_logs_command(input_json): | def execute_run_with_structured_logs_command(input_json): | ||||
try: | |||||
signal.signal(signal.SIGTERM, signal.getsignal(signal.SIGINT)) | signal.signal(signal.SIGTERM, signal.getsignal(signal.SIGINT)) | ||||
except ValueError: | |||||
warnings.warn( | |||||
( | |||||
"Unexpected error attempting to manage signal handling on thread {thread_name}. " | |||||
"You should not invoke this API (execute_run_with_structured_logs) from threads " | |||||
"other than the main thread." | |||||
).format(thread_name=threading.current_thread().name) | |||||
) | |||||
args = check.inst(deserialize_json_to_dagster_namedtuple(input_json), ExecuteRunArgs) | args = check.inst(deserialize_json_to_dagster_namedtuple(input_json), ExecuteRunArgs) | ||||
recon_pipeline = recon_pipeline_from_origin(args.pipeline_origin) | recon_pipeline = recon_pipeline_from_origin(args.pipeline_origin) | ||||
with ( | with ( | ||||
DagsterInstance.from_ref(args.instance_ref) if args.instance_ref else DagsterInstance.get() | DagsterInstance.from_ref(args.instance_ref) if args.instance_ref else DagsterInstance.get() | ||||
) as instance: | ) as instance: | ||||
buffer = [] | buffer = [] | ||||
▲ Show 20 Lines • Show All 60 Lines • ▼ Show 20 Lines | @click.command( | ||||
name="execute_step_with_structured_logs", | name="execute_step_with_structured_logs", | ||||
help=( | help=( | ||||
"[INTERNAL] This is an internal utility. Users should generally not invoke this command " | "[INTERNAL] This is an internal utility. Users should generally not invoke this command " | ||||
"interactively." | "interactively." | ||||
), | ), | ||||
) | ) | ||||
@click.argument("input_json", type=click.STRING) | @click.argument("input_json", type=click.STRING) | ||||
def execute_step_with_structured_logs_command(input_json): | def execute_step_with_structured_logs_command(input_json): | ||||
try: | |||||
signal.signal(signal.SIGTERM, signal.getsignal(signal.SIGINT)) | signal.signal(signal.SIGTERM, signal.getsignal(signal.SIGINT)) | ||||
except ValueError: | |||||
warnings.warn( | |||||
( | |||||
"Unexpected error attempting to manage signal handling on thread {thread_name}. " | |||||
"You should not invoke this API (execute_step_with_structured_logs) from threads " | |||||
"other than the main thread." | |||||
).format(thread_name=threading.current_thread().name) | |||||
) | |||||
args = check.inst(deserialize_json_to_dagster_namedtuple(input_json), ExecuteStepArgs) | args = check.inst(deserialize_json_to_dagster_namedtuple(input_json), ExecuteStepArgs) | ||||
with ( | with ( | ||||
DagsterInstance.from_ref(args.instance_ref) if args.instance_ref else DagsterInstance.get() | DagsterInstance.from_ref(args.instance_ref) if args.instance_ref else DagsterInstance.get() | ||||
) as instance: | ) as instance: | ||||
pipeline_run = instance.get_run_by_id(args.pipeline_run_id) | pipeline_run = instance.get_run_by_id(args.pipeline_run_id) | ||||
recon_pipeline = recon_pipeline_from_origin(args.pipeline_origin) | recon_pipeline = recon_pipeline_from_origin(args.pipeline_origin) | ||||
▲ Show 20 Lines • Show All 395 Lines • Show Last 20 Lines |