Changeset View
Changeset View
Standalone View
Standalone View
python_modules/libraries/dagster-airflow/dagster_airflow/cli.py
import os | import os | ||||
from datetime import datetime, timedelta | from datetime import timedelta | ||||
import click | import click | ||||
import six | import six | ||||
import yaml | import yaml | ||||
from dagster import check, seven | from dagster import check, seven | ||||
from dagster.cli.load_handle import recon_repo_for_cli_args | from dagster.cli.load_handle import recon_repo_for_cli_args | ||||
from dagster.seven import get_current_datetime_in_utc | |||||
from dagster.utils import load_yaml_from_glob_list | from dagster.utils import load_yaml_from_glob_list | ||||
from dagster.utils.indenting_printer import IndentingStringIoPrinter | from dagster.utils.indenting_printer import IndentingStringIoPrinter | ||||
def construct_environment_yaml(preset_name, config, pipeline_name, module_name): | def construct_environment_yaml(preset_name, config, pipeline_name, module_name): | ||||
# Load environment dict from either a preset or yaml file globs | # Load environment dict from either a preset or yaml file globs | ||||
if preset_name: | if preset_name: | ||||
if config: | if config: | ||||
Show All 18 Lines | if "storage" not in run_config: | ||||
run_config["storage"] = { | run_config["storage"] = { | ||||
"filesystem": {"config": {"base_dir": six.ensure_str(dagster_tmp_path)}} | "filesystem": {"config": {"base_dir": six.ensure_str(dagster_tmp_path)}} | ||||
} | } | ||||
return run_config | return run_config | ||||
def construct_scaffolded_file_contents(module_name, pipeline_name, run_config): | def construct_scaffolded_file_contents(module_name, pipeline_name, run_config): | ||||
yesterday = datetime.now() - timedelta(1) | yesterday = get_current_datetime_in_utc() - timedelta(1) | ||||
dgibson: i don't know enough about airflow to understand the implications of this change | |||||
printer = IndentingStringIoPrinter(indent_level=4) | printer = IndentingStringIoPrinter(indent_level=4) | ||||
printer.line("'''") | printer.line("'''") | ||||
printer.line( | printer.line( | ||||
"The airflow DAG scaffold for {module_name}.{pipeline_name}".format( | "The airflow DAG scaffold for {module_name}.{pipeline_name}".format( | ||||
module_name=module_name, pipeline_name=pipeline_name | module_name=module_name, pipeline_name=pipeline_name | ||||
) | ) | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 119 Lines • Show Last 20 Lines |
i don't know enough about airflow to understand the implications of this change