Complex Workflow Example¶
Tip
This example requires you to subscribe to the following Nextmv Marketplace apps, and name them as follows:
- Nextmv Routing:
routing-nextroute
- OR-Tools Routing:
routing-ortools
- PyVroom Routing:
routing-pyvroom
This example combines several concepts to create a more complex workflow that:
- Integrates multiple solvers (Nextroute, OR-Tools, PyVroom) to solve the same routing problem.
- Creates parallel execution paths with different data transformations.
- Uses advanced features including:
- Repeated execution (
@repeat
) - Data transformation (
convert
step) - Dependencies between steps (
@needs
) - External solver integration (
@app
) - Compares and selects results from multiple solvers to find the optimal solution.
The complexity comes from orchestrating multiple solvers in parallel with different data preparation requirements and aggregating their results into a single optimal solution.
import copy
from typing import Any
import nextmv
from nextpipe import FlowSpec, app, log, needs, repeat, step
class Workflow(FlowSpec):
@step
def prepare(input: dict[str, Any]) -> dict[str, Any]:
"""Prepares the data."""
return input
@needs(predecessors=[prepare])
@step
def convert(input: dict[str, Any]) -> dict[str, Any]:
"""Converts the data."""
clone = copy.deepcopy(input)
if "defaults" in clone and "stops" in clone["defaults"] and "quantity" in clone["defaults"]["stops"]:
clone["defaults"]["stops"]["quantity"] *= -1
for stop in clone["stops"]:
if "quantity" in stop:
stop["quantity"] *= -1
return clone
@repeat(repetitions=2)
@app(app_id="routing-nextroute", instance_id="latest")
@needs(predecessors=[prepare])
@step
def run_nextroute() -> list[dict[str, Any]]:
"""Runs the model."""
pass
@app(app_id="routing-ortools", instance_id="latest")
@needs(predecessors=[convert])
@step
def run_ortools() -> dict[str, Any]:
"""Runs the model."""
pass
@app(app_id="routing-pyvroom", instance_id="latest")
@needs(predecessors=[convert])
@step
def run_pyvroom() -> dict[str, Any]:
"""Runs the model."""
pass
@needs(predecessors=[run_nextroute, run_ortools, run_pyvroom])
@step
def pick_best(
results_nextroute: list[dict[str, Any]],
result_ortools: dict[str, Any],
result_pyvroom: dict[str, Any],
):
"""Aggregates the results."""
results = results_nextroute + [result_ortools, result_pyvroom]
best_solution_idx = min(
range(len(results)),
key=lambda i: results[i]["statistics"]["result"]["value"],
)
values = [result["statistics"]["result"]["value"] for result in results]
values.sort()
log(f"Values: {values}")
# For test stability reasons, we always return the or-tools result
_ = results.pop(best_solution_idx)
return result_ortools
def main():
"""Runs the workflow."""
# Load input data
input = nextmv.load()
# Run workflow
workflow = Workflow("DecisionWorkflow", input.data)
workflow.run()
# Write the result
result = workflow.get_result(workflow.pick_best)
nextmv.write(result)
if __name__ == "__main__":
main()
To run the example, we are going to use an example JSON file from a GitHub
Gist. We are going to use the curl
command to download the JSON file and pipe it
to the main.py
script. The JSON file contains a set of stops and vehicles for
the routing problem.
$ curl "https://gist.githubusercontent.com/merschformann/a90959b87d1360b604e4a9f6457340ca/raw/661e631376bdf78a07548a3cd136c1fc6e47c639/muenster.json" | python main.py
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 4928k 100 4928k 0 0 19.1M 0 --:--:-- --:--:-- --:--:-- 19.1M
[nextpipe] No application ID or run ID found, uplink is inactive.
[nextpipe] Flow: Workflow
[nextpipe] nextpipe: v0.2.2.dev0
[nextpipe] nextmv: 0.28.0
[nextpipe] Flow graph steps:
[nextpipe] Step:
[nextpipe] Definition: Step(prepare)
[nextpipe] Docstring: Prepares the data.
[nextpipe] Step:
[nextpipe] Definition: Step(convert, StepNeeds(prepare))
[nextpipe] Docstring: Converts the data.
[nextpipe] Step:
[nextpipe] Definition: Step(run_nextroute, StepNeeds(prepare), StepRepeat(2), StepRun(routing-nextroute, latest, {}, InputType.JSON, False))
[nextpipe] Docstring: Runs the model.
[nextpipe] Step:
[nextpipe] Definition: Step(run_ortools, StepNeeds(convert), StepRun(routing-ortools, latest, {}, InputType.JSON, False))
[nextpipe] Docstring: Runs the model.
[nextpipe] Step:
[nextpipe] Definition: Step(run_pyvroom, StepNeeds(convert), StepRun(routing-pyvroom, latest, {}, InputType.JSON, False))
[nextpipe] Docstring: Runs the model.
[nextpipe] Step:
[nextpipe] Definition: Step(pick_best, StepNeeds(run_nextroute,run_ortools,run_pyvroom))
[nextpipe] Docstring: Aggregates the results.
[nextpipe] Mermaid diagram:
[nextpipe] graph LR
prepare(prepare)
prepare --> convert
prepare --> run_nextroute
convert(convert)
convert --> run_ortools
convert --> run_pyvroom
run_nextroute{ }
run_nextroute_join{ }
run_nextroute_0(run_nextroute_0)
run_nextroute --> run_nextroute_0
run_nextroute_0 --> run_nextroute_join
run_nextroute_1(run_nextroute_1)
run_nextroute --> run_nextroute_1
run_nextroute_1 --> run_nextroute_join
run_nextroute_join --> pick_best
run_ortools(run_ortools)
run_ortools --> pick_best
run_pyvroom(run_pyvroom)
run_pyvroom --> pick_best
pick_best(pick_best)
[nextpipe] Mermaid URL: https://mermaid.ink/svg/Z3JhcGggTFIKICBwcmVwYXJlKHByZXBhcmUpCiAgcHJlcGFyZSAtLT4gY29udmVydAogIHByZXBhcmUgLS0+IHJ1bl9uZXh0cm91dGUKICBjb252ZXJ0KGNvbnZlcnQpCiAgY29udmVydCAtLT4gcnVuX29ydG9vbHMKICBjb252ZXJ0IC0tPiBydW5fcHl2cm9vbQogIHJ1bl9uZXh0cm91dGV7IH0KICBydW5fbmV4dHJvdXRlX2pvaW57IH0KICBydW5fbmV4dHJvdXRlXzAocnVuX25leHRyb3V0ZV8wKQogIHJ1bl9uZXh0cm91dGUgLS0+IHJ1bl9uZXh0cm91dGVfMAogIHJ1bl9uZXh0cm91dGVfMCAtLT4gcnVuX25leHRyb3V0ZV9qb2luCiAgcnVuX25leHRyb3V0ZV8xKHJ1bl9uZXh0cm91dGVfMSkKICBydW5fbmV4dHJvdXRlIC0tPiBydW5fbmV4dHJvdXRlXzEKICBydW5fbmV4dHJvdXRlXzEgLS0+IHJ1bl9uZXh0cm91dGVfam9pbgogIHJ1bl9uZXh0cm91dGVfam9pbiAtLT4gcGlja19iZXN0CiAgcnVuX29ydG9vbHMocnVuX29ydG9vbHMpCiAgcnVuX29ydG9vbHMgLS0+IHBpY2tfYmVzdAogIHJ1bl9weXZyb29tKHJ1bl9weXZyb29tKQogIHJ1bl9weXZyb29tIC0tPiBwaWNrX2Jlc3QKICBwaWNrX2Jlc3QocGlja19iZXN0KQo=?theme=dark
[nextpipe] Running node prepare_0
[nextpipe] Running node run_nextroute_0
[nextpipe] Running node run_nextroute_1
[nextpipe] Running node convert_0
[nextpipe] Running node run_pyvroom_0
[nextpipe] Running node run_ortools_0
[nextpipe] Running node pick_best_0
[pick_best_0] Values: [31559.0, 32951.17999815941, 32999.0, 33122.33599805832]
{
"options": {
"input": "",
"output": "",
"duration": 30
},
"solution": {
"vehicles": [
{
"id": "vehicle-1",
"route_travel_distance": 1063,
"route": [
{
"stop": {
"location": {
"lon": 7.62558,
"lat": 51.96223
},
"id": "vehicle-1_start"
}
},
...
... (truncated for brevity)
...
],
"unplanned": []
},
"statistics": {
"run": {
"duration": 29.18671941757202
},
"result": {
"value": 32999.0,
"custom": {
"solution_found": true,
"activated_vehicles": 20,
"max_route_duration": 5067,
"max_stops_in_vehicle": 27,
"min_stops_in_vehicle": 0
}
},
"schema": "v1"
},
"assets": []
}
Here is the resulting Mermaid diagram of the workflow:
graph LR
prepare(prepare)
prepare --> convert
prepare --> run_nextroute
convert(convert)
convert --> run_ortools
convert --> run_pyvroom
run_nextroute{ }
run_nextroute_join{ }
run_nextroute_0(run_nextroute_0)
run_nextroute --> run_nextroute_0
run_nextroute_0 --> run_nextroute_join
run_nextroute_1(run_nextroute_1)
run_nextroute --> run_nextroute_1
run_nextroute_1 --> run_nextroute_join
run_nextroute_join --> pick_best
run_ortools(run_ortools)
run_ortools --> pick_best
run_pyvroom(run_pyvroom)
run_pyvroom --> pick_best
pick_best(pick_best)