Getting started¶
nextpipe is a Python framework for modeling and executing decision pipelines.
It allows you to define complex workflows with multiple steps, manage
dependencies between these steps, and integrate with external applications and
solvers.
You can find example workflows among the community apps. Their
names start with python-wf-* (wf stands for workflow). These examples can be
cloned using the Nextmv CLI tool.
Core Concepts¶
Workflows - FlowSpec¶
Reference
Find the reference for the FlowSpec class here.
A Workflow in nextpipe represents a complete pipeline or flow. It is defined
as a Python class that extends FlowSpec.
Steps - @step¶
Reference
Find the reference for the step decorator here.
Steps are the fundamental building blocks of a pipeline. Each step is a
function decorated with @step:
@step
def prepare(input: dict):
    """Prepares the data."""
    # Transform input data
    return transformed_data
Steps can process data, make API calls, or perform any computation needed in your pipeline.
Dependencies - @needs¶
Reference
Find the reference for the needs decorator here.
Steps can depend on the results of other steps. Use the @needs decorator to
specify dependencies:
@needs(predecessors=[prepare])
@step
def process(data: dict):
    """Process prepared data."""
    return processed_data
When a step has predecessors, the return values from those predecessors are
automatically passed as parameters to the step function. The parameters must
match the order of the predecessors listed in the @needs decorator. For
example:
@step
def step1(input: dict):
    return {"result1": "value1"}

@step
def step2(input: dict):
    return {"result2": "value2"}

@needs(predecessors=[step1, step2])
@step
def process_both(result_from_step1: dict, result_from_step2: dict):
    # result_from_step1 contains {"result1": "value1"}
    # result_from_step2 contains {"result2": "value2"}
    return combined_result
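The ordering rule can be illustrated with a small standalone model. This is only a sketch using the standard library: the `PREDECESSORS` registry, the toy `needs` decorator, and the `run` helper are hypothetical stand-ins written for this example, not nextpipe's implementation. It assumes that a step without predecessors receives the flow input.

```python
from typing import Callable, Optional

# Toy registry mapping each step function to its predecessors.
# Hypothetical stand-in for what @needs records; not nextpipe's data model.
PREDECESSORS: dict = {}


def needs(predecessors: list):
    """Toy decorator that records predecessors in declaration order."""
    def wrap(func: Callable) -> Callable:
        PREDECESSORS[func] = list(predecessors)
        return func
    return wrap


def run(func: Callable, flow_input: dict, cache: Optional[dict] = None):
    """Run a step after its predecessors, passing their results positionally."""
    cache = {} if cache is None else cache
    if func in cache:
        return cache[func]
    preds = PREDECESSORS.get(func, [])
    if preds:
        # Arguments follow the order of the predecessors list.
        args = [run(p, flow_input, cache) for p in preds]
    else:
        # Assumption of this toy model: root steps receive the flow input.
        args = [flow_input]
    cache[func] = func(*args)
    return cache[func]


def step1(input: dict):
    return {"result1": "value1"}


def step2(input: dict):
    return {"result2": "value2"}


@needs(predecessors=[step1, step2])
def process_both(result_from_step1: dict, result_from_step2: dict):
    # Merge the two predecessor results into one dictionary.
    return {**result_from_step1, **result_from_step2}


combined = run(process_both, {"hello": "world"})
print(combined)
```

Swapping the entries in the predecessors list would swap the arguments as well, which is why the parameter order has to mirror the list.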
External Applications - @app¶
Reference
Find the reference for the app decorator here.
nextpipe can integrate with external Nextmv applications using the @app
decorator:
@app(app_id="solver-app")
@needs(predecessors=[prepare])
@step
def solve():
    """Run external solver."""
    pass
Note that @app steps don't need a function body - the decorator handles
calling the external application.
Optional Steps - @optional¶
Reference
Find the reference for the optional decorator here.
The @optional decorator allows steps to be conditionally executed based on a
provided condition function:
@optional(condition=lambda step: some_condition)
@step
def conditional_step(data: dict):
    """This step only runs when the condition is True."""
    return processed_data
The condition function takes the step as a parameter and should return a boolean indicating whether the step should be executed.
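To make the role of the condition function concrete, here is a toy model built only on the standard library. The `optional` decorator below, the `SKIPPED` sentinel, and the `ENABLED_STEPS` set are hypothetical names for this sketch. In the sketch the "step" passed to the condition is the plain function, and a skipped step returns a sentinel; how nextpipe itself represents steps and handles skipped ones is not covered here.

```python
import functools
from typing import Callable

SKIPPED = object()  # toy sentinel marking a step that did not run


def optional(condition: Callable[[Callable], bool]):
    """Toy stand-in: run the step only when condition(step) is truthy."""
    def wrap(func: Callable) -> Callable:
        @functools.wraps(func)
        def runner(*args):
            # In this sketch the condition is evaluated on every call.
            return func(*args) if condition(func) else SKIPPED
        return runner
    return wrap


ENABLED_STEPS = {"enhance"}  # hypothetical configuration


@optional(condition=lambda step: step.__name__ in ENABLED_STEPS)
def enhance(data: dict) -> dict:
    return {**data, "enhanced": True}


@optional(condition=lambda step: step.__name__ in ENABLED_STEPS)
def audit(data: dict) -> dict:
    return {**data, "audited": True}


enhanced = enhance({"hello": "world!"})
audited = audit({"hello": "world!"})
```

Here `enhance` runs because its name is in `ENABLED_STEPS`, while `audit` is skipped.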
Repeated Execution - @repeat¶
Reference
Find the reference for the repeat decorator here.
The @repeat decorator makes a step execute multiple times:
@repeat(repetitions=3)
@step
def repeated_step(input: dict):
    """This step runs 3 times."""
    return processed_data
This is useful when you need to run the same step several times, for example with different random seeds or configurations.
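The random-seed use case might look like the following standalone sketch. The toy `repeat` decorator and the explicit `repetition` argument are assumptions of this example (nothing above says that nextpipe passes a repetition index to the step); the point is only to show why repeating a seeded computation is useful.

```python
import random


def repeat(repetitions: int):
    """Toy stand-in: call the function `repetitions` times and collect results."""
    def wrap(func):
        def runner(data: dict) -> list:
            return [func(data, repetition=i) for i in range(repetitions)]
        return runner
    return wrap


@repeat(repetitions=3)
def sample_cost(data: dict, repetition: int) -> dict:
    # A private generator per repetition keeps runs reproducible but distinct.
    rng = random.Random(data["base_seed"] + repetition)
    return {"repetition": repetition, "cost": rng.uniform(0, 100)}


runs = sample_cost({"base_seed": 42})
best = min(runs, key=lambda r: r["cost"])
```

Each repetition derives its own seed from a shared base seed, so rerunning the whole flow gives the same three results.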
Parallel Execution - @foreach¶
Reference
Find the reference for the foreach decorator here.
The @foreach decorator enables dynamic fanout, running a successor step for
each item in a list:
@foreach()
@step
def create_scenarios(data: dict):
    """Create multiple scenarios to solve."""
    return [scenario1, scenario2, scenario3]  # Each will be processed separately
Collecting Results - @join¶
Reference
Find the reference for the join decorator here.
The @join decorator collects results from previous steps into a list:
@needs(predecessors=[solve])
@join()
@step
def merge(results: list[dict]):
    """Merge results from multiple executions."""
    return merged_result
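The fan-out and join pattern described in the last two sections can be sketched with plain Python. This is a toy model, not nextpipe code: the thread pool stands in for whatever scheduling nextpipe uses, and the `demands` input and cost formula are made up for the example.

```python
from concurrent.futures import ThreadPoolExecutor


def create_scenarios(data: dict) -> list:
    """Fan-out source: returns one item per scenario."""
    return [
        {"name": f"scenario{i}", "demand": demand}
        for i, demand in enumerate(data["demands"], start=1)
    ]


def solve(scenario: dict) -> dict:
    """Successor step: runs once per item of the list."""
    return {"name": scenario["name"], "cost": scenario["demand"] * 2}


def merge(results: list) -> dict:
    """Join step: receives all results as a single list."""
    return {"total_cost": sum(r["cost"] for r in results), "runs": len(results)}


scenarios = create_scenarios({"demands": [10, 20, 30]})
with ThreadPoolExecutor() as pool:
    # pool.map returns results in the same order as the inputs.
    results = list(pool.map(solve, scenarios))
merged = merge(results)
print(merged)
```

The successor never sees the full list and the join step never sees a single item, which mirrors the roles of @foreach and @join.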
Dynamically customizing App Runs¶
Reference
Find the reference for the AppRunConfig class here.
The AppRunConfig class allows you to dynamically customize app runs.
In particular, when fanning out steps using the @foreach decorator, you can
return a list of AppRunConfig objects to specify a different configuration for
each run:
from nextpipe.schema import AppRunConfig

@foreach()
@step
def create_scenarios(data: dict):
    """Create multiple scenarios to solve with different configurations."""
    return [
        AppRunConfig(
            name="scenario1",
            input=data,
            options={
                "solve.duration": "5s",
            },
        ),
        AppRunConfig(
            name="scenario2",
            input=data,
            options={
                "solve.duration": "10s",
            },
        ),
    ]

@needs(predecessors=[create_scenarios])
@app(app_id="solver-app")
@step
def solve():
    """Run external solver for each scenario."""
    pass
Each AppRunConfig in the returned list is applied to its own run of the solve step.
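When the number of scenarios grows, the list can be generated rather than written out by hand. The sketch below shows the idea with a local dataclass, `RunConfigSketch`, which is a hypothetical stand-in that only mirrors the three fields used above (name, input, options) so that the example runs without nextpipe installed. In a real flow you would construct AppRunConfig objects instead.

```python
from dataclasses import dataclass, field


@dataclass
class RunConfigSketch:
    """Hypothetical stand-in mirroring the AppRunConfig fields used above."""
    name: str
    input: dict
    options: dict = field(default_factory=dict)


def create_scenarios(data: dict, durations: list) -> list:
    """Build one configuration per solve duration."""
    return [
        RunConfigSketch(
            name=f"scenario{i}",
            input=data,
            options={"solve.duration": duration},
        )
        for i, duration in enumerate(durations, start=1)
    ]


configs = create_scenarios({"hello": "world!"}, ["5s", "10s", "30s"])
for config in configs:
    print(config.name, config.options)
```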
Output & visualization¶
After running a nextpipe program, the output is composed of the following
components:
- nextpipe logs detailing the execution of the workflow, and information about the workflow diagram. All of this information is printed to stderr.
- The actual output of the workflow, printed to stdout.
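Because logs and results go to different streams, a calling program can parse the result without filtering out log lines. The following sketch demonstrates this with a stand-in child process. The inline `CHILD` script is hypothetical and only imitates the stream split (a log line on stderr, JSON on stdout); replace the command with your own workflow entry point.

```python
import json
import subprocess
import sys

# Hypothetical stand-in for a workflow program:
# a log line goes to stderr, the JSON result goes to stdout.
CHILD = r'''
import json, sys
print("[log] running step", file=sys.stderr)
print(json.dumps({"enhanced": True}))
'''

completed = subprocess.run(
    [sys.executable, "-c", CHILD],
    capture_output=True,  # capture stdout and stderr separately
    text=True,
    check=True,
)

result = json.loads(completed.stdout)  # only the workflow output
log_lines = completed.stderr.splitlines()  # only the logs
```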
nextpipe outputs a Mermaid diagram of the workflow. The diagram is shown as
source and as a link to the rendered diagram.
Consider the following output taken from the basic example:
[nextpipe] No application ID or run ID found, uplink is inactive.
[nextpipe] Flow: Workflow
[nextpipe] nextpipe: v0.2.2.dev0
[nextpipe] nextmv: 0.28.0
[nextpipe] Flow graph steps:
[nextpipe] Step:
[nextpipe] Definition: Step(prepare)
[nextpipe] Docstring: Prepares the data.
[nextpipe] Step:
[nextpipe] Definition: Step(solve, StepNeeds(prepare), StepRun(echo, , {}, InputType.JSON, False))
[nextpipe] Docstring: Runs the model.
[nextpipe] Step:
[nextpipe] Definition: Step(enhance, StepNeeds(solve))
[nextpipe] Docstring: Enhances the result.
[nextpipe] Mermaid diagram:
[nextpipe] graph LR
  prepare(prepare)
  prepare --> solve
  solve(solve)
  solve --> enhance
  enhance(enhance)
[nextpipe] Mermaid URL: https://mermaid.ink/svg/Z3JhcGggTFIKICBwcmVwYXJlKHByZXBhcmUpCiAgcHJlcGFyZSAtLT4gc29sdmUKICBzb2x2ZShzb2x2ZSkKICBzb2x2ZSAtLT4gZW5oYW5jZQogIGVuaGFuY2UoZW5oYW5jZSkK?theme=dark
[nextpipe] Running node prepare_0
[nextpipe] Running node solve_0
[nextpipe] Running node enhance_0
{
  "echo": {
    "data": {
      "hello": "world!",
      "prepared": true
    },
    "args": []
  },
  "enhanced": true
}
As you can observe from the output, nextpipe automatically generates a
Mermaid diagram to visualize the flow structure.
graph LR
  prepare(prepare)
  prepare --> solve
  solve(solve)
  solve --> enhance
  enhance(enhance)
The diagram can be viewed in a browser by following the link provided in the output. The diagram shows the flow of data between the steps.
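The link itself carries the diagram source. For the URL in the example output, the path segment after /svg/ is the Base64-encoded Mermaid text, so it can be decoded offline. The sketch below assumes this plain Base64 form; the helper name `mermaid_source_from_url` is made up for this example, and other encodings would need different handling.

```python
import base64
from urllib.parse import urlparse

# URL copied from the example output above.
MERMAID_URL = "https://mermaid.ink/svg/Z3JhcGggTFIKICBwcmVwYXJlKHByZXBhcmUpCiAgcHJlcGFyZSAtLT4gc29sdmUKICBzb2x2ZShzb2x2ZSkKICBzb2x2ZSAtLT4gZW5oYW5jZQogIGVuaGFuY2UoZW5oYW5jZSkK?theme=dark"


def mermaid_source_from_url(url: str) -> str:
    """Decode the diagram source from the last path segment of the URL."""
    payload = urlparse(url).path.rsplit("/", 1)[-1]
    # Restore Base64 padding in case it was stripped from the URL.
    payload += "=" * (-len(payload) % 4)
    return base64.urlsafe_b64decode(payload).decode("utf-8")


source = mermaid_source_from_url(MERMAID_URL)

# Extract the edges ("a --> b") from the decoded diagram.
edges = [
    tuple(part.strip() for part in line.split("-->"))
    for line in source.splitlines()
    if "-->" in line
]
print(source)
print(edges)
```

The decoded text matches the diagram source printed in the logs, and the extracted edges give the step order prepare, solve, enhance.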