Page Menu | Home | Elementl
Paste P150

parallel pipeline
Active | Public

Authored by catherinewu on Feb 5 2021, 11:54 PM.
from dagster import fs_io_manager, PresetDefinition, pipeline, solid, String, Field, ModeDefinition
@solid(
    config_schema={
        "table_name": Field(
            config=String,
            description="Name of the table this solid loads.",
            is_required=True,
        )
    }
)
def load_data(context):
    """Emit the table name supplied through this solid's run config.

    Args:
        context: Dagster solid execution context; only
            ``context.solid_config["table_name"]`` is read.

    Returns:
        The configured ``table_name`` string.
    """
    # No trailing comma: the original `return ...,` wrapped the value in a
    # one-element tuple, so every downstream solid received ('foo',) instead
    # of 'foo'.
    return context.solid_config["table_name"]
@solid
def process_data(_, table_name):
    """Placeholder processing step: hand the incoming table name on unchanged.

    The first (context) argument is unused.
    """
    processed = table_name
    return processed
@solid
def write_data(_, table_name):
    """Placeholder write step: return the incoming table name as-is.

    The first (context) argument is unused.
    """
    written = table_name
    return written
@solid
def fan_in(_, names):
    """Collect the fanned-in upstream outputs and return them unchanged.

    The first (context) argument is unused.
    """
    collected = names
    return collected
# Run config that selects the "multiprocess" executor and sets its
# "max_concurrent" option to 4; consumed by the "multiprocess" preset.
multiprocess_run_config = dict(
    execution=dict(
        multiprocess=dict(
            config=dict(max_concurrent=4),
        ),
    ),
)
@pipeline(
    preset_defs=[
        PresetDefinition(name="default"),
        PresetDefinition(name="multiprocess", run_config=multiprocess_run_config),
    ],
    mode_defs=[ModeDefinition(resource_defs={"io_manager": fs_io_manager})],
)
def parallel_pipeline():
    """Fan out one load -> process -> write chain per table, then fan in.

    For each of the hard-coded tables "foo", "bar" and "baz" this builds:
    a ``load_data`` copy configured with that table name (named
    ``load_data_<table>``), then aliased ``process_data_<table>`` and
    ``write_data_<table>`` solids. All the write outputs feed one ``fan_in``
    solid.
    """
    write_outputs = []
    for table in ("foo", "bar", "baz"):
        # Each table needs its own uniquely named solid instance, so the
        # configured/aliased names embed the table name.
        loaded = load_data.configured(
            name=f"load_data_{table}",
            config_or_config_fn={"table_name": table},
        )()
        processed = process_data.alias(f"process_data_{table}")(loaded)
        write_outputs.append(write_data.alias(f"write_data_{table}")(processed))
    fan_in(write_outputs)

Event Timeline

catherinewu created this object with visibility "Public (No Login Required)".
catherinewu changed the title of this paste from "parallel solids" to "parallel pipeline". Feb 6 2021, 12:36 AM
catherinewu edited the content of this paste. (Show Details)