Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagit/dagit_tests/pipeline.py
import datetime | |||||
from dagster import ( | from dagster import ( | ||||
InputDefinition, | InputDefinition, | ||||
Int, | Int, | ||||
OutputDefinition, | OutputDefinition, | ||||
daily_schedule, | daily_schedule, | ||||
lambda_solid, | lambda_solid, | ||||
pipeline, | pipeline, | ||||
repository, | repository, | ||||
) | ) | ||||
from dagster.core.test_utils import today_at_midnight | |||||
@lambda_solid(input_defs=[InputDefinition("num", Int)], output_def=OutputDefinition(Int)) | @lambda_solid(input_defs=[InputDefinition("num", Int)], output_def=OutputDefinition(Int)) | ||||
def add_one(num): | def add_one(num): | ||||
return num + 1 | return num + 1 | ||||
@lambda_solid(input_defs=[InputDefinition("num", Int)], output_def=OutputDefinition(Int)) | @lambda_solid(input_defs=[InputDefinition("num", Int)], output_def=OutputDefinition(Int)) | ||||
def mult_two(num): | def mult_two(num): | ||||
return num * 2 | return num * 2 | ||||
@pipeline | @pipeline | ||||
def math(): | def math(): | ||||
return mult_two(num=add_one()) | return mult_two(num=add_one()) | ||||
@daily_schedule(pipeline_name="math", start_date=datetime.datetime.now()) | @daily_schedule( | ||||
pipeline_name="math", start_date=today_at_midnight(), | |||||
) | |||||
def my_schedule(_): | def my_schedule(_): | ||||
return {"solids": {"mult_two": {"inputs": {"num": {"value": 2}}}}} | return {"solids": {"mult_two": {"inputs": {"num": {"value": 2}}}}} | ||||
@repository | @repository | ||||
def test_repository(): | def test_repository(): | ||||
return [math] | return [math] |