Fanout Workflow Example
Tip
This example uses the echo app. Make sure to complete that tutorial first.
This more advanced example demonstrates dynamic fanout and joining of results. It creates multiple copies of the input data, configures each copy with different options, and runs the model on all of them in parallel. Finally, it collects the results and merges them into a single output.
This pattern is useful when you need to run the same model in parallel with different configurations or options.
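To make the pattern concrete before the nextpipe version, here is a minimal sketch of fan-out and join using only the Python standard library. It is not how nextpipe is implemented; `fake_solve` and `fan_out_and_join` are hypothetical names standing in for an app run and for the workflow.

```python
import copy
from concurrent.futures import ThreadPoolExecutor
from typing import Any


def fake_solve(config: dict[str, Any]) -> dict[str, Any]:
    # Hypothetical stand-in for an app run: echoes the input and its option.
    return {"data": config["input"], "args": [f"-param={config['param']}"]}


def fan_out_and_join(data: dict[str, Any], copies: int = 3) -> list[dict[str, Any]]:
    # Fan out: one independent deep copy of the input per configuration.
    configs = [{"input": copy.deepcopy(data), "param": i} for i in range(copies)]
    # Run all configurations in parallel; map() yields results in input order,
    # so collecting them into a list acts as the join.
    with ThreadPoolExecutor(max_workers=copies) as pool:
        return list(pool.map(fake_solve, configs))


results = fan_out_and_join({"hello": "world!"})
print(results[1]["args"])  # ['-param=1']
```

In the workflow below, the decorators take over both roles: the fanned-out step returns the list of configurations, and the joining step receives the collected results.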
import copy
import json
from typing import Any

import nextmv
from nextpipe import AppOption, AppRunConfig, FlowSpec, app, foreach, join, needs, step


class Workflow(FlowSpec):
    @foreach()  # Run the successor step for each item in the result list of this step
    @step
    def fanout(data: dict[str, Any]) -> list[AppRunConfig]:
        """
        Creates 3 copies of the input and configures them for 3 different app options.
        """
        inputs = [copy.deepcopy(data) for _ in range(3)]
        run_configs = [
            AppRunConfig(
                input=input,
                options=[AppOption("param", i)],
                name=f"run-{i}",
            )
            for i, input in enumerate(inputs)
        ]
        return run_configs

    @step
    def stats(data: dict[str, Any]) -> dict[str, Any]:
        """
        Calculates some statistics to put on the output as well.
        """
        return {"stats": {"count": len(json.dumps(data))}}

    @app(app_id="echo")
    @needs(predecessors=[fanout])
    @step
    def solve() -> dict[str, Any]:
        """
        Runs the model.
        """
        pass

    @needs(predecessors=[solve, stats])
    @join()  # Collect the results from the previous 'foreach' step and combine them into a list passed as the arg
    @step
    def merge(results: list[list[dict[str, Any]]]) -> dict[str, Any]:
        """Merges the results."""
        # Only return the second result from the merged results
        result, stats = results[1]
        return {
            "solve_result": result,
            "stats": stats,
        }


def main():
    """Runs the workflow."""
    # Load input data
    input = nextmv.load()

    # Run workflow
    workflow = Workflow("DecisionWorkflow", input.data)
    workflow.run()

    # Write the result
    result = workflow.get_result(workflow.merge)
    nextmv.write(result)


if __name__ == "__main__":
    main()
Run the example:
$ echo '{"hello": "world!"}' | python main.py
[nextpipe] No application ID or run ID found, uplink is inactive.
[nextpipe] Flow: Workflow
[nextpipe] nextpipe: v0.2.2.dev0
[nextpipe] nextmv: 0.28.0
[nextpipe] Flow graph steps:
[nextpipe] Step:
[nextpipe] Definition: Step(fanout)
[nextpipe] Docstring:
Creates 3 copies of the input and configures them for 3 different app parameters.
[nextpipe] Step:
[nextpipe] Definition: Step(stats)
[nextpipe] Docstring:
Calculates some statistics to put on the output as well.
[nextpipe] Step:
[nextpipe] Definition: Step(solve, StepNeeds(fanout), StepRun(echo, devint, {}, InputType.JSON, False))
[nextpipe] Docstring:
Runs the model.
[nextpipe] Step:
[nextpipe] Definition: Step(merge, StepNeeds(solve,stats))
[nextpipe] Docstring: Merges the results.
[nextpipe] Mermaid diagram:
[nextpipe] graph LR
  fanout{ }
  fanout(fanout)
  fanout -- foreach --> solve
  stats(stats)
  stats -- join --> merge
  solve(solve)
  solve -- join --> merge
  merge(merge)
[nextpipe] Mermaid URL: https://mermaid.ink/svg/Z3JhcGggTFIKICBmYW5vdXR7IH0KICBmYW5vdXQoZmFub3V0KQogIGZhbm91dCAtLSBmb3JlYWNoIC0tPiBzb2x2ZQogIHN0YXRzKHN0YXRzKQogIHN0YXRzIC0tIGpvaW4gLS0+IG1lcmdlCiAgc29sdmUoc29sdmUpCiAgc29sdmUgLS0gam9pbiAtLT4gbWVyZ2UKICBtZXJnZShtZXJnZSkK?theme=dark
[nextpipe] Running node fanout_0
[nextpipe] Running node stats_0
[nextpipe] Running node solve_0
[nextpipe] Running node solve_1
[nextpipe] Running node solve_2
[nextpipe] Running node merge_0
{
  "solve_result": {
    "options": {},
    "solution": {
      "echo": {
        "data": {
          "hello": "world!"
        },
        "args": [
          "-param=1"
        ]
      }
    },
    "statistics": {
      "run": {
        "duration": 0.00014638900756835938
      }
    },
    "assets": []
  },
  "stats": {
    "stats": {
      "count": 19
    }
  }
}
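The count of 19 in the output is the character length of the input after serialization with json.dumps, as computed by the stats step. A quick check with the same input used in the command above:

```python
import json

# Same input that is piped into main.py in the example run.
data = {"hello": "world!"}

# json.dumps uses ", " and ": " as default separators.
serialized = json.dumps(data)
print(serialized)       # {"hello": "world!"}
print(len(serialized))  # 19
```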
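The log shows that solve ran three times (solve_0, solve_1, solve_2), once for each run configuration returned by fanout. The @foreach decorator on fanout makes its successor step, solve, run in parallel for each item in the returned list, and the @join decorator on merge collects the results from the solve step into a list that is passed to merge as a single argument. Because merge returns only the second entry of that list, the final output contains the run with -param=1.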
The resulting Mermaid diagram for this flow looks like this:
graph LR
  fanout{ }
  fanout(fanout)
  fanout -- foreach --> solve
  stats(stats)
  stats -- join --> merge
  solve(solve)
  solve -- join --> merge
  merge(merge)
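The same diagram text is embedded in the Mermaid URL printed in the log: the last path segment is the Base64-encoded diagram source. The following sketch recovers it using only the standard library. It relies on that observation about this particular URL, not on any nextpipe API.

```python
import base64
from urllib.parse import urlparse

# URL copied from the "[nextpipe] Mermaid URL:" log line above.
url = "https://mermaid.ink/svg/Z3JhcGggTFIKICBmYW5vdXR7IH0KICBmYW5vdXQoZmFub3V0KQogIGZhbm91dCAtLSBmb3JlYWNoIC0tPiBzb2x2ZQogIHN0YXRzKHN0YXRzKQogIHN0YXRzIC0tIGpvaW4gLS0+IG1lcmdlCiAgc29sdmUoc29sdmUpCiAgc29sdmUgLS0gam9pbiAtLT4gbWVyZ2UKICBtZXJnZShtZXJnZSkK?theme=dark"

# Take the last path segment; urlparse keeps the query string separate.
encoded = urlparse(url).path.rsplit("/", 1)[-1]

# Decode the Base64 payload back into the Mermaid source text.
diagram = base64.b64decode(encoded).decode("utf-8")
print(diagram)
```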