Page MenuHomePhabricator

(An Untitled Masterwork)
ActivePublic

Authored by alangenfeld on Jul 31 2019, 5:20 PM.
def ALTERANATE_bash_solid_composition(job_descriptor):
    """Build a composite solid that runs a Flowkeeper job's bash command.

    The descriptor is wrapped in a ``FlowkeeperJob`` and three definitions are
    created, all named after ``job.get_id()``:

    * ``<id>_bash`` renders the job's command template with the solid's
      inputs, runs the result as a subprocess and returns its exit status.
    * ``<id>_verify`` receives that status and yields one ``Output`` per job
      output.
    * ``<id>`` is the composite solid that feeds ``_bash`` into ``_check``.

    Args:
        job_descriptor: Passed unchanged to ``FlowkeeperJob``; its format is
            not visible from this function.

    Returns:
        The ``@composite_solid``-decorated ``_job`` definition.

    Raises:
        KeyError: If an input/output ``type`` is missing from ``fk_type_map``
            or the job has no ``'command'`` entry.
    """
    # Imported locally so this block does not rely on a file-level import.
    import shlex

    job = FlowkeeperJob(job_descriptor)

    input_definitions = [
        InputDefinition(name=i.get_id(), dagster_type=fk_type_map[i.type])
        for i in job.get_inputs()
    ]
    output_definitions = [
        OutputDefinition(name=o.get_id(), dagster_type=fk_type_map[o.type])
        for o in job.get_outputs()
    ]

    # Keep the raw command text for the description and compile the template
    # once here rather than on every solid execution.
    command_source = job.get_job()['command']
    command_template = jinja2.Template(command_source)

    @solid(
        name=job.get_id() + '_bash',
        description='invokes bash command {}'.format(command_source),
        input_defs=input_definitions,
        output_defs=[OutputDefinition(int, 'status')],
    )
    def _bash(context, **kwargs):
        # The solid's inputs are the template variables.
        # NOTE(review): the original author was guessing at this too - confirm
        # that input names match the placeholders used in the command.
        rendered = command_template.render(**kwargs)
        # shlex keeps quoted arguments together; a plain str.split() would not.
        command = shlex.split(rendered)
        process = subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
        )
        with process.stdout:
            log_subprocess_output(context, process.stdout)
        # Output has been drained above, so wait() only collects the exit code.
        return process.wait()

    @solid(
        name=job.get_id() + '_verify',
        input_defs=[InputDefinition('status', int)],
        output_defs=output_definitions,
    )
    def _check(context, status):
        # TODO(review): `status` is not inspected yet; decide how a non-zero
        # exit code should be reported before relying on these outputs.
        for o in job.get_outputs():
            # TODO(review): no per-output check happens yet; each output is
            # whatever turn_job_desc_in_to_output_path returns for it.
            value = turn_job_desc_in_to_output_path(o)
            yield Output(value, o.get_id())

    @composite_solid(
        name=job.get_id(),
        input_defs=input_definitions,
        output_defs=output_definitions,
    )
    def _job(**inputs):
        # NOTE(review): confirm that composite_solid accepts a **kwargs
        # composition function for dynamically generated input definitions.
        s = _bash(**inputs)
        return _check(s)

    return _job

Event Timeline

alangenfeld created this object with visibility "Public (No Login Required)".