Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster-test/dagster_test/toys/log_spew.py
import time | import time | ||||
from dagster import InputDefinition, Output, OutputDefinition, pipeline, solid | from dagster import In, MultiOut, Out, Output, graph, op | ||||
def nonce_solid(name, n_inputs, n_outputs): | def nonce_op(name, n_inputs, n_outputs): | ||||
"""Creates a solid with the given number of (meaningless) inputs and outputs. | """Creates a op with the given number of (meaningless) inputs and outputs. | ||||
Config controls the behavior of the nonce solid.""" | Config controls the behavior of the nonce op.""" | ||||
@solid( | @op( | ||||
name=name, | name=name, | ||||
input_defs=[InputDefinition(name="input_{}".format(i)) for i in range(n_inputs)], | ins={f"input_{i}": In() for i in range(n_inputs)}, | ||||
output_defs=[OutputDefinition(name="output_{}".format(i)) for i in range(n_outputs)], | out=MultiOut(outs=[Out(name=f"output_{i}") for i in range(n_outputs)]), | ||||
) | ) | ||||
def solid_fn(context, **_kwargs): | def op_fn(context, **_kwargs): | ||||
for i in range(200): | for i in range(200): | ||||
time.sleep(0.02) | time.sleep(0.02) | ||||
if i % 1000 == 420: | if i % 1000 == 420: | ||||
context.log.error("Error message seq={i} from solid {name}".format(i=i, name=name)) | context.log.error(f"Error message seq={i} from op {name}") | ||||
elif i % 100 == 0: | elif i % 100 == 0: | ||||
context.log.warning( | context.log.warning(f"Warning message seq={i} from op {name}") | ||||
"Warning message seq={i} from solid {name}".format(i=i, name=name) | |||||
) | |||||
elif i % 10 == 0: | elif i % 10 == 0: | ||||
context.log.info("Info message seq={i} from solid {name}".format(i=i, name=name)) | context.log.info(f"Info message seq={i} from op {name}") | ||||
else: | else: | ||||
context.log.debug("Debug message seq={i} from solid {name}".format(i=i, name=name)) | context.log.debug(f"Debug message seq={i} from op {name}") | ||||
for i in range(n_outputs): | for i in range(n_outputs): | ||||
yield Output(value="foo", output_name="output_{}".format(i)) | yield Output(value="foo", output_name=f"output_{i}") | ||||
return solid_fn | return op_fn | ||||
@pipeline( | @graph(description="Demo graph that spits out different types of log messages to the event log.") | ||||
description="Demo pipeline that spits out different types of log messages to the event log." | def log_spew_graph(): | ||||
) | one_in_one_out = nonce_op("one_in_one_out", 1, 1) | ||||
def log_spew(): | two_in_one_out = nonce_op("two_in_one_out", 2, 1) | ||||
one_in_one_out = nonce_solid("one_in_one_out", 1, 1) | |||||
two_in_one_out = nonce_solid("two_in_one_out", 2, 1) | op_a = nonce_op("no_in_two_out", 0, 2).alias("op_a") | ||||
op_b = one_in_one_out.alias("op_b") | |||||
solid_a = nonce_solid("no_in_two_out", 0, 2).alias("solid_a") | op_c = nonce_op("one_in_two_out", 1, 2).alias("op_c") | ||||
solid_b = one_in_one_out.alias("solid_b") | op_d = two_in_one_out.alias("op_d") | ||||
solid_c = nonce_solid("one_in_two_out", 1, 2).alias("solid_c") | op_e = one_in_one_out.alias("op_e") | ||||
solid_d = two_in_one_out.alias("solid_d") | op_f = two_in_one_out.alias("op_f") | ||||
solid_e = one_in_one_out.alias("solid_e") | op_g = nonce_op("one_in_none_out", 1, 0).alias("op_g") | ||||
solid_f = two_in_one_out.alias("solid_f") | |||||
solid_g = nonce_solid("one_in_none_out", 1, 0).alias("solid_g") | a_0, a_1 = op_a() | ||||
b = op_b(input_0=a_0) | |||||
a_0, a_1 = solid_a() | c_0, _c_1 = op_c(input_0=a_1) | ||||
b = solid_b(input_0=a_0) | d = op_d(input_0=b, input_1=c_0) | ||||
c_0, _c_1 = solid_c(input_0=a_1) | e = op_e(input_0=c_0) | ||||
d = solid_d(input_0=b, input_1=c_0) | f = op_f(input_0=d, input_1=e) | ||||
e = solid_e(input_0=c_0) | op_g(input_0=f) | ||||
f = solid_f(input_0=d, input_1=e) | |||||
solid_g(input_0=f) | |||||
log_spew_job = log_spew_graph.to_job(name="log_spew_job") |