
Flow Module

This module contains functionality related to workflow definition and execution.

flow

Flow module for defining and executing decision pipelines.

This module provides classes and functions for creating, configuring, and executing decision pipelines.

CLASS DESCRIPTION
FlowStep

Represents a step in a workflow pipeline.

FlowNode

Represents a specific instance of a step in execution.

FlowSpec

Defines a complete workflow specification.

FlowGraph

Represents the directed acyclic graph (DAG) of steps in a flow.

StepVisitor

AST visitor for finding step functions in a flow class.

Runner

Handles the execution of a flow graph.

The module also defines status constants used throughout pipeline execution.

FlowGraph

FlowGraph(flow_spec: FlowSpec)

Represents the directed acyclic graph (DAG) of steps in a flow.

You can import the FlowGraph class directly from nextpipe:

from nextpipe import FlowGraph

A FlowGraph contains all steps of a flow and their connections, forming a DAG. It provides methods for converting the graph to different representations.
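As a rough illustration of the DAG terms used here, the following plain-Python sketch derives the start steps (steps with no predecessors) and one valid execution order from a predecessor map. The step names are hypothetical and the sketch does not use nextpipe. The ordering uses Kahn's algorithm, which is not necessarily how FlowGraph builds or orders its steps internally.

```python
from collections import deque

# Hypothetical flow: each step name maps to the names of the steps it depends on.
predecessors: dict[str, list[str]] = {
    "prepare": [],
    "solve_a": ["prepare"],
    "solve_b": ["prepare"],
    "pick_best": ["solve_a", "solve_b"],
}


def start_steps(preds: dict[str, list[str]]) -> list[str]:
    """Steps with no predecessors, analogous to FlowGraph.start_steps."""
    return [name for name, ps in preds.items() if not ps]


def topological_order(preds: dict[str, list[str]]) -> list[str]:
    """Kahn's algorithm; raises ValueError if the graph contains a cycle (is not a DAG)."""
    remaining = {name: len(ps) for name, ps in preds.items()}
    successors: dict[str, list[str]] = {name: [] for name in preds}
    for name, ps in preds.items():
        for p in ps:
            successors[p].append(name)
    queue = deque(start_steps(preds))
    order: list[str] = []
    while queue:
        name = queue.popleft()
        order.append(name)
        for s in successors[name]:
            remaining[s] -= 1
            if remaining[s] == 0:  # all predecessors of s are now ordered
                queue.append(s)
    if len(order) != len(preds):
        raise ValueError("cycle detected: the step graph is not a DAG")
    return order


print(start_steps(predecessors))        # ['prepare']
print(topological_order(predecessors))  # ['prepare', 'solve_a', 'solve_b', 'pick_best']
```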

PARAMETER DESCRIPTION

flow_spec

The flow specification this graph belongs to.

TYPE: FlowSpec

ATTRIBUTE DESCRIPTION
flow_spec

The flow specification this graph belongs to.

TYPE: FlowSpec

steps

List of all steps in the graph.

TYPE: list[FlowStep]

steps_by_definition

Dictionary mapping step definitions to FlowStep objects.

TYPE: dict

start_steps

List of steps with no predecessors (starting points).

TYPE: list[FlowStep]

Initialize a FlowGraph.

Source code in nextpipe/flow.py
def __init__(self, flow_spec: FlowSpec):
    """
    Initialize a FlowGraph.

    Parameters
    ----------
    flow_spec : FlowSpec
        The flow specification this graph belongs to.
    """
    self.flow_spec = flow_spec
    self.__create_graph(flow_spec)
    self.__debug_print()
    # Create a Mermaid diagram of the graph and log it
    mermaid = self._to_mermaid()
    utils.log_internal("Mermaid diagram:")
    utils.log_internal(mermaid)
    mermaid_url = f"https://mermaid.ink/svg/{base64.b64encode(mermaid.encode('utf8')).decode('ascii')}?theme=dark"
    utils.log_internal(f"Mermaid URL: {mermaid_url}")
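The Mermaid URL logged above is the diagram text, base64-encoded and appended to https://mermaid.ink/svg/. A standalone sketch of that encoding step follows. The mermaid_ink_url helper, its theme parameter, and the sample diagram are illustrative assumptions: the source hard-codes theme=dark, and the real output of _to_mermaid() may look different.

```python
import base64


def mermaid_ink_url(diagram: str, theme: str = "dark") -> str:
    """Hypothetical helper: encode Mermaid text the same way FlowGraph.__init__ does."""
    encoded = base64.b64encode(diagram.encode("utf8")).decode("ascii")
    return f"https://mermaid.ink/svg/{encoded}?theme={theme}"


# A made-up two-step diagram; _to_mermaid() may produce different text.
diagram = "graph LR\n  prepare --> solve"
url = mermaid_ink_url(diagram)
print(url)
```

Like the source, this uses standard base64 rather than the URL-safe variant, so the encoded part may contain + or / characters.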

flow_spec instance-attribute

flow_spec = flow_spec

get_step

get_step(definition: Step) -> FlowStep

Get a FlowStep by its definition.

PARAMETER DESCRIPTION
definition

Step definition to look up.

TYPE: Step

RETURNS DESCRIPTION
FlowStep

The FlowStep matching the given definition.

Source code in nextpipe/flow.py
def get_step(self, definition: decorators.Step) -> FlowStep:
    """
    Get a FlowStep by its definition.

    Parameters
    ----------
    definition : decorators.Step
        Step definition to look up.

    Returns
    -------
    FlowStep
        The FlowStep matching the given definition.
    """

    return self.steps_by_definition[definition]
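Because get_step indexes steps_by_definition directly, looking up a definition that is not part of the graph raises KeyError rather than returning None. The sketch below shows that behavior with a hypothetical MiniGraph stand-in and string keys in place of real Step definitions.

```python
class MiniGraph:
    """Hypothetical stand-in for FlowGraph that keeps only the lookup table."""

    def __init__(self, steps_by_definition: dict):
        self.steps_by_definition = steps_by_definition

    def get_step(self, definition):
        # Same pattern as FlowGraph.get_step: a plain dict lookup,
        # so an unknown definition raises KeyError.
        return self.steps_by_definition[definition]


graph = MiniGraph({"prepare_def": "FlowStep(prepare)"})
print(graph.get_step("prepare_def"))  # FlowStep(prepare)
try:
    graph.get_step("missing_def")
except KeyError as err:
    print(f"unknown step definition: {err}")
```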

FlowNode

FlowNode(parent: FlowStep, index: int)

Represents a specific instance of a step in execution.

A FlowNode is created for each execution instance of a FlowStep. For example, when using foreach or repeat decorators, multiple nodes will be created for a single step.
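The node-per-instance relationship can be sketched without nextpipe. In Runner.run, one FlowNode is created for each input the runner prepares for a step, and each node's id is the step id followed by the node index. SketchNode and make_nodes below are hypothetical simplified stand-ins, and the list of inputs is invented for the example.

```python
from typing import Any

STATUS_PENDING = "pending"


class SketchNode:
    """Hypothetical, simplified stand-in for FlowNode."""

    def __init__(self, step_id: str, index: int):
        self.index = index
        # Mirrors FlowNode.__init__: "<step id>_<index>"
        self.id = f"{step_id}_{index}"
        self.status = STATUS_PENDING
        self.result: Any = None


def make_nodes(step_id: str, inputs: list[Any]) -> list[SketchNode]:
    """One node per prepared input, as in a foreach-style fan-out."""
    return [SketchNode(step_id, i) for i, _ in enumerate(inputs)]


nodes = make_nodes("solve", [{"seed": 1}, {"seed": 2}, {"seed": 3}])
print([n.id for n in nodes])  # ['solve_0', 'solve_1', 'solve_2']
```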

PARAMETER DESCRIPTION

parent

The parent step this node belongs to.

TYPE: FlowStep

index

The index of this node within the parent step.

TYPE: int

ATTRIBUTE DESCRIPTION
parent

The parent step this node belongs to.

TYPE: FlowStep

index

The index of this node within the parent step.

TYPE: int

id

Unique identifier for this node.

TYPE: str

status

Current execution status (pending, running, succeeded, failed).

TYPE: str

error

Error message if the node execution failed.

TYPE: str or None

predecessors

Nodes that this node depends on.

TYPE: list[FlowNode]

run_id

ID of the application run if this is an app step.

TYPE: str or None

result

Result data from this node's execution.

TYPE: Any

done

Whether this node has completed execution.

TYPE: bool

cancel

Flag to indicate if this node's execution should be cancelled.

TYPE: bool

Initialize a FlowNode.

Source code in nextpipe/flow.py
def __init__(self, parent: FlowStep, index: int):
    """
    Initialize a FlowNode.

    Parameters
    ----------
    parent : FlowStep
        The parent step this node belongs to.
    index : int
        The index of this node within the parent step.
    """

    self.parent = parent
    self.index = index
    self.id = f"{parent.definition.get_id()}_{index}"
    self.status: str = STATUS_PENDING
    self.error: str = None
    self.predecessors: list[FlowNode] = []
    self.run_id: str = None
    self.result: Any = None
    self.done: bool = False
    self.cancel: bool = False

cancel instance-attribute

cancel: bool = False

done instance-attribute

done: bool = False

error instance-attribute

error: str = None

id instance-attribute

id = f'{parent.definition.get_id()}_{index}'

index instance-attribute

index = index

parent instance-attribute

parent = parent

predecessors instance-attribute

predecessors: list[FlowNode] = []

result instance-attribute

result: Any = None

run_id instance-attribute

run_id: str = None

status instance-attribute

status: str = STATUS_PENDING

FlowSpec

FlowSpec(
    name: str,
    input: dict,
    conf: Optional[Configuration] = None,
    client: Optional[Client] = None,
    uplink_config: Optional[UplinkConfig] = None,
)

Defines a complete workflow specification.

You can import the FlowSpec class directly from nextpipe:

from nextpipe import FlowSpec

FlowSpec is the main class to define a workflow. Users typically inherit from this class and decorate methods with @step to define the workflow steps.

PARAMETER DESCRIPTION

name

Name of the flow specification.

TYPE: str

input

Input data for the flow.

TYPE: dict

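conf

Configuration for the flow. Defaults to None, in which case a default Configuration() is created.

TYPE: Optional[Configuration] DEFAULT: None

client

Nextmv client for API access. Defaults to None, in which case a new Client() is created.

TYPE: Optional[Client] DEFAULT: None

uplink_config

Configuration for the uplink client that reports the flow graph and its progress to the Nextmv platform. Defaults to None.

TYPE: Optional[UplinkConfig] DEFAULT: None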

ATTRIBUTE DESCRIPTION
name

Name of the flow specification.

TYPE: str

config

Configuration for the flow.

TYPE: Configuration

client

Nextmv client for API access.

TYPE: Client

uplink

Client for communicating with the Nextmv platform.

TYPE: UplinkClient

graph

Graph representing the workflow.

TYPE: FlowGraph

input

Input data for the flow.

TYPE: dict

runner

Runner responsible for executing the flow.

TYPE: Runner

Examples:

from nextpipe import FlowSpec, needs, step

class MyFlow(FlowSpec):
    @step
    def step1(input_data: dict):
        # Process the flow input
        return {"processed": input_data["value"]}

    @needs(predecessors=[step1])
    @step
    def step2(step1_result: dict):
        # Further processing of step1's output
        return {"result": step1_result["processed"] * 2}

# Create and run the flow
flow = MyFlow("my-flow", {"value": 5})
flow.run()

Initialize a FlowSpec.

Source code in nextpipe/flow.py
def __init__(
    self,
    name: str,
    input: dict,
    conf: Optional[config.Configuration] = None,
    client: Optional[Client] = None,
    uplink_config: Optional[uplink.UplinkConfig] = None,
):
    """
    Initialize a FlowSpec.

    Parameters
    ----------
    name : str
        Name of the flow specification.
    input : dict
        Input data for the flow.
    conf : Optional[config.Configuration], optional
        Configuration for the flow, by default None.
    client : Optional[Client], optional
        Nextmv client for API access, by default None.
    uplink_config : Optional[uplink.UplinkConfig], optional
        Configuration for uplink, by default None.
    """

    self.name = name
    self.config = config.Configuration() if conf is None else conf
    self.client = Client() if client is None else client
    self.uplink = uplink.UplinkClient(self.client, uplink_config)
    # Create the graph
    self.graph = FlowGraph(self)
    # Inform platform about the graph
    self.uplink.submit_update(self.graph._to_uplink_dto())
    # Prepare for running the flow
    self.input = input
    self.runner = Runner(
        self,
        self.graph,
        self.config,
        self.uplink,
    )

client instance-attribute

client = Client() if client is None else client

config instance-attribute

config = Configuration() if conf is None else conf

get_result

get_result(step: callable) -> Union[object, None]

Get the result of a step.

PARAMETER DESCRIPTION
step

The step function to get the result for.

TYPE: callable

RETURNS DESCRIPTION
Union[object, None]

The result of the step, or None if the step is not done. If the step has multiple nodes, a list of results is returned.

RAISES DESCRIPTION
Exception

If the provided function does not have a step decorator.

Examples:

# Assuming a flow with a step called 'process_data'
result = flow.get_result(flow.process_data)

Source code in nextpipe/flow.py
def get_result(self, step: callable) -> Union[object, None]:
    """
    Get the result of a step.

    Parameters
    ----------
    step : callable
        The step function to get the result for.

    Returns
    -------
    Union[object, None]
        The result of the step, or None if the step is not done.
        If the step has multiple nodes, a list of results is returned.

    Raises
    ------
    Exception
        If the provided function does not have a step decorator.

    Examples
    --------
    ```python
    # Assuming a flow with a step called 'process_data'
    result = flow.get_result(flow.process_data)
    ```
    """
    if not hasattr(step, "step"):
        raise Exception(f"Step {step} does not have a step decorator.")
    s = self.graph.get_step(step.step)
    if not s.done:
        return None
    return [n.result for n in s.nodes] if len(s.nodes) > 1 else s.nodes[0].result
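The return-value rule at the end of get_result can be isolated in a small helper. collect_result is hypothetical and only mirrors the last three lines of the source: None while the step is not done, a list when the step has several nodes, and the bare result when it has exactly one.

```python
from typing import Any, Optional


def collect_result(done: bool, node_results: list[Any]) -> Optional[Any]:
    """Hypothetical helper mirroring the return rule of FlowSpec.get_result."""
    if not done:
        return None
    # Several nodes (e.g. a fanned-out step): return all results as a list.
    # Exactly one node: return its result directly, unwrapped.
    return list(node_results) if len(node_results) > 1 else node_results[0]


print(collect_result(False, [1, 2]))     # None
print(collect_result(True, [{"x": 1}]))  # {'x': 1}
print(collect_result(True, [1, 2, 3]))   # [1, 2, 3]
```

Note that None is returned both for a step that has not finished and for a step whose single node produced None (a skipped step, for example, gets a dummy node with result None), so the return value alone cannot tell these cases apart.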

graph instance-attribute

graph = FlowGraph(self)

input instance-attribute

input = input

name instance-attribute

name = name

run

run()

Run the flow.

This method starts the flow execution and blocks until it completes or fails with an exception.

Source code in nextpipe/flow.py
def run(self):
    """
    Run the flow.

    This method starts the flow execution and blocks until it completes
    or fails with an exception.
    """

    self.runner.run()

runner instance-attribute

runner = Runner(self, graph, config, uplink)

uplink instance-attribute

uplink = UplinkClient(client, uplink_config)

FlowStep

FlowStep(
    step_function: callable,
    step_definition: Step,
    docstring: str,
)

Represents a step in a workflow pipeline.

A FlowStep is created from a function decorated with @step and maintains information about its position in the flow graph, including predecessors, successors, and execution nodes.

PARAMETER DESCRIPTION

step_function

The AST function node representing the step function.

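TYPE: callable (in practice an ast.FunctionDef node, as passed by StepVisitor.visit_FunctionDef)

step_definition

The step decorator instance that contains the step's configuration.

TYPE: Step

docstring

The docstring of the step function.

TYPE: str

ATTRIBUTE DESCRIPTION
step_function

The AST function node representing the step function.

TYPE: callable (in practice an ast.FunctionDef node, as passed by StepVisitor.visit_FunctionDef)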

definition

The step decorator instance.

TYPE: Step

docstring

The docstring of the step function.

TYPE: str

lock

Thread lock for this step.

TYPE: Lock

done

Whether this step has completed execution.

TYPE: bool

successors

Steps that depend on this step.

TYPE: list[FlowStep]

predecessors

Steps that this step depends on.

TYPE: list[FlowStep]

nodes

Execution nodes for this step.

TYPE: list[FlowNode]

Initialize a FlowStep.

Source code in nextpipe/flow.py
def __init__(
    self,
    step_function: callable,
    step_definition: decorators.Step,
    docstring: str,
):
    """
    Initialize a FlowStep.

    Parameters
    ----------
    step_function : callable
        The AST function node representing the step function.
    step_definition : decorators.Step
        The step decorator instance that contains the step's configuration.
    docstring : str
        The docstring of the step function.
    """

    self.step_function = step_function
    self.definition = step_definition
    self.docstring = docstring
    self.lock = threading.Lock()
    self.done = False
    self.successors: list[FlowStep] = []
    self.predecessors: list[FlowStep] = []
    self.nodes: list[FlowNode] = []

definition instance-attribute

definition = step_definition

docstring instance-attribute

docstring = docstring

done instance-attribute

done = False

lock instance-attribute

lock = Lock()

nodes instance-attribute

nodes: list[FlowNode] = []

predecessors instance-attribute

predecessors: list[FlowStep] = []

step_function instance-attribute

step_function = step_function

successors instance-attribute

successors: list[FlowStep] = []

Runner

Runner(
    spec: FlowSpec,
    graph: FlowGraph,
    config: Configuration,
    uplink: UplinkClient,
)

Handles the execution of a flow graph.

This class is responsible for preparing inputs for steps, creating execution nodes, dispatching jobs to the thread pool, and monitoring execution progress.
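The dispatch-and-record-failure pattern can be sketched with the standard library. This is not nextpipe's implementation: concurrent.futures.ThreadPoolExecutor stands in for nextpipe's threads.Pool, and SketchRunner, _job, run_step, and the statuses dictionary are hypothetical. Only the thread count and the fail, fail_reason, and lock_fail fields are taken from Runner.__init__; how the real jobs record failures is an assumption.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

STATUS_SUCCEEDED = "succeeded"
STATUS_FAILED = "failed"


class SketchRunner:
    """Hypothetical runner; ThreadPoolExecutor replaces nextpipe's threads.Pool."""

    def __init__(self, thread_count: int):
        self.pool = ThreadPoolExecutor(max_workers=thread_count)
        self.fail = False
        self.fail_reason = None
        self.lock_fail = threading.Lock()
        self.statuses: dict[str, str] = {}

    def _job(self, node_id: str, func, value):
        try:
            result = func(value)
            self.statuses[node_id] = STATUS_SUCCEEDED
            return result
        except Exception as exc:
            self.statuses[node_id] = STATUS_FAILED
            # Record only the first failure, guarded by a lock like Runner.lock_fail.
            with self.lock_fail:
                if not self.fail:
                    self.fail = True
                    self.fail_reason = f"{node_id}: {exc}"
            return None

    def run_step(self, step_id: str, func, inputs: list) -> list:
        # One job per input, with node ids following the "<step>_<index>" scheme.
        futures = [
            self.pool.submit(self._job, f"{step_id}_{i}", func, v)
            for i, v in enumerate(inputs)
        ]
        return [f.result() for f in futures]


runner = SketchRunner(thread_count=2)
print(runner.run_step("square", lambda x: x * x, [1, 2, 3]))  # [1, 4, 9]
runner.run_step("invert", lambda x: 1 / x, [0])
print(runner.fail, runner.fail_reason)  # True invert_0: division by zero
runner.pool.shutdown()
```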

PARAMETER DESCRIPTION

spec

The flow specification being executed.

TYPE: FlowSpec

graph

The flow graph to execute.

TYPE: FlowGraph

config

Configuration for the runner.

TYPE: Configuration

uplink

Client for communicating with the Nextmv platform.

TYPE: UplinkClient

ATTRIBUTE DESCRIPTION
spec

The flow specification being executed.

TYPE: FlowSpec

graph

The flow graph to execute.

TYPE: FlowGraph

uplink

Client for communicating with the Nextmv platform.

TYPE: UplinkClient

pool

Thread pool for executing steps.

TYPE: Pool

jobs

List of jobs.

TYPE: list

node_idxs

Dictionary of node indices.

TYPE: dict

fail

Whether the flow has failed.

TYPE: bool

fail_reason

Reason for the flow failure, if any.

TYPE: str or None

lock_fail

Lock for updating fail state.

TYPE: Lock

lock_running

Lock for updating running steps.

TYPE: Lock

Initialize a Runner.

Source code in nextpipe/flow.py
def __init__(
    self,
    spec: FlowSpec,
    graph: FlowGraph,
    config: config.Configuration,
    uplink: uplink.UplinkClient,
):
    """
    Initialize a Runner.

    Parameters
    ----------
    spec : FlowSpec
        The flow specification being executed.
    graph : FlowGraph
        The flow graph to execute.
    config : config.Configuration
        Configuration for the runner.
    uplink : uplink.UplinkClient
        Client for communicating with the Nextmv platform.
    """

    self.spec = spec
    self.graph = graph
    self.uplink = uplink
    self.pool = threads.Pool(config.thread_count)
    self.jobs = []
    self.node_idxs = {}
    self.fail = False
    self.fail_reason = None
    self.lock_fail = threading.Lock()
    self.lock_running = threading.Lock()

fail instance-attribute

fail = False

fail_reason instance-attribute

fail_reason = None

graph instance-attribute

graph = graph

jobs instance-attribute

jobs = []

lock_fail instance-attribute

lock_fail = Lock()

lock_running instance-attribute

lock_running = Lock()

node_idxs instance-attribute

node_idxs = {}

pool instance-attribute

pool = Pool(thread_count)

run

run()

Run the flow.

This method starts the uplink communication, executes the flow steps in the correct order based on dependencies, and handles failures.

RAISES DESCRIPTION
RuntimeError

If the flow execution fails.

Source code in nextpipe/flow.py
def run(self):
    """
    Run the flow.

    This method starts the uplink communication, executes the flow steps
    in the correct order based on dependencies, and handles failures.

    Raises
    ------
    RuntimeError
        If the flow execution fails.
    """

    # Start communicating updates to the platform
    try:
        self.uplink.submit_update(self.graph._to_uplink_dto())
        self.uplink.run_async()
    except Exception as e:
        self.uplink.terminate()
        utils.log_internal(f"Failed to update graph with platform: {e}")

    # Start running the flow
    open_steps: set[FlowStep] = set(self.graph.start_steps)
    running_steps: set[FlowStep] = set()
    closed_steps: set[FlowStep] = set()

    # Run the steps in parallel
    while open_steps or running_steps:
        while True:
            # Get the first step from the open steps which has all its predecessors done
            step = next(iter(filter(lambda n: all(p in closed_steps for p in n.predecessors), open_steps)), None)
            if step is None:
                # No more steps to run at this point. Wait for the remaining tasks to finish.
                break
            open_steps.remove(step)
            # Skip the step if it is optional and the condition is not met
            if step.definition.skip():
                utils.log_internal(f"Skipping step {step.definition.get_id()}")
                # Create dummy node
                node = FlowNode(step, 0)
                node.status = STATUS_SUCCEEDED
                node.result = None
                step.nodes.append(node)
                closed_steps.add(step)
                open_steps.update(step.successors)
                self.uplink.submit_update(self.graph._to_uplink_dto())
                continue
            # Run the node asynchronously
            with self.lock_running:
                running_steps.add(step)
            inputs = self.__prepare_inputs(step)
            for i, input in enumerate(inputs):
                node = FlowNode(step, i)
                job = self.__create_job(node, input)
                self.pool.run(job)
                step.nodes.append(node)
                self.uplink.submit_update(self.graph._to_uplink_dto())

        # Wait until at least one task is done
        task_done = False
        while not task_done:
            time.sleep(0.1)
            # Check if any steps are done, if not, keep waiting
            done_steps = []
            with self.lock_running:
                done_steps = [step for step in running_steps if step.done]
                task_done = True
            for step in done_steps:
                # Remove step and mark successors as ready by adding them to the open list.
                with self.lock_running:
                    running_steps.remove(step)
                closed_steps.add(step)
                open_steps.update(step.successors)
            # Raise an exception if the flow failed
            with self.lock_fail:
                if self.fail:
                    # Issue cancel to all nodes
                    for step in running_steps:
                        for node in step.nodes:
                            node.cancel = True
                            node.status = STATUS_FAILED
                    # Submitting the final state and terminating uplink causes the last
                    # update to be sent to the platform (reflecting the final state).
                    self.uplink.submit_update(self.graph._to_uplink_dto())
                    self.uplink.terminate()  # This will issue the final update.
                    raise RuntimeError(f"Flow failed: {self.fail_reason}")

    # Terminate uplink
    self.uplink.terminate()
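The open/closed bookkeeping in Runner.run can be followed more easily in a sequential sketch. This simplification has no threads, no running set, and no uplink. A hypothetical skipped set replaces step.definition.skip(), and the step names are made up. As in the source, a step becomes eligible once all of its predecessors are closed, and closing a step (by running or skipping it) adds its successors to the open set.

```python
def run_order(
    predecessors: dict[str, list[str]],
    skipped: frozenset[str] = frozenset(),
) -> list[tuple[str, str]]:
    """Sequential sketch of Runner.run's scheduling; returns (step, "run" | "skip") pairs."""
    successors: dict[str, list[str]] = {s: [] for s in predecessors}
    for s, ps in predecessors.items():
        for p in ps:
            successors[p].append(s)

    open_steps = {s for s, ps in predecessors.items() if not ps}  # start steps
    closed_steps: set[str] = set()
    log: list[tuple[str, str]] = []
    while open_steps:
        # Steps whose predecessors are all closed (sorted for a deterministic demo).
        ready = sorted(s for s in open_steps if all(p in closed_steps for p in predecessors[s]))
        if not ready:
            raise RuntimeError("no runnable step: unmet dependencies")
        step = ready[0]
        open_steps.remove(step)
        log.append((step, "skip" if step in skipped else "run"))
        # Runner.run closes a skipped step immediately and a run step once its nodes finish;
        # here both close at once because nothing runs concurrently.
        closed_steps.add(step)
        open_steps.update(successors[step])
    return log


flow = {"prepare": [], "solve_a": ["prepare"], "solve_b": ["prepare"], "pick": ["solve_a", "solve_b"]}
print(run_order(flow, frozenset({"solve_b"})))
# [('prepare', 'run'), ('solve_a', 'run'), ('solve_b', 'skip'), ('pick', 'run')]
```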

spec instance-attribute

spec = spec

uplink instance-attribute

uplink = uplink

STATUS_FAILED module-attribute

STATUS_FAILED = 'failed'

Status constant indicating a failed step or node.

STATUS_PENDING module-attribute

STATUS_PENDING = 'pending'

Status constant indicating a pending step or node.

STATUS_RUNNING module-attribute

STATUS_RUNNING = 'running'

Status constant indicating a running step or node.

STATUS_SUCCEEDED module-attribute

STATUS_SUCCEEDED = 'succeeded'

Status constant indicating a successfully completed step or node.
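The four constants are plain strings, so they can be compared and counted directly. In the sketch below they are redefined locally so it runs without nextpipe installed. In real code they would be imported from nextpipe.flow, where they are defined as module attributes. TERMINAL_STATUSES, summarize, and is_finished are hypothetical helpers, and treating succeeded and failed as final states is an assumption based on how Runner.run sets them.

```python
STATUS_PENDING = "pending"
STATUS_RUNNING = "running"
STATUS_SUCCEEDED = "succeeded"
STATUS_FAILED = "failed"

# Hypothetical: statuses after which a node is assumed not to change again.
TERMINAL_STATUSES = frozenset({STATUS_SUCCEEDED, STATUS_FAILED})


def summarize(statuses: list[str]) -> dict[str, int]:
    """Count nodes per status, e.g. for a progress line."""
    counts = {s: 0 for s in (STATUS_PENDING, STATUS_RUNNING, STATUS_SUCCEEDED, STATUS_FAILED)}
    for s in statuses:
        counts[s] += 1
    return counts


def is_finished(statuses: list[str]) -> bool:
    """True when every node has reached a terminal status."""
    return all(s in TERMINAL_STATUSES for s in statuses)


print(summarize([STATUS_SUCCEEDED, STATUS_RUNNING, STATUS_SUCCEEDED]))
# {'pending': 0, 'running': 1, 'succeeded': 2, 'failed': 0}
```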

StepVisitor

StepVisitor(steps: list[FlowStep], flow_class: type)

Bases: ast.NodeVisitor

AST visitor for finding step functions in a flow class.

This visitor traverses the abstract syntax tree of a flow class and identifies all methods decorated with @step.

PARAMETER DESCRIPTION

steps

List to collect found steps.

TYPE: list[FlowStep]

flow_class

The flow class to visit.

TYPE: type

ATTRIBUTE DESCRIPTION
steps

List to collect found steps.

TYPE: list[FlowStep]

flow_class

The flow class being visited.

TYPE: type

Initialize a StepVisitor.

Source code in nextpipe/flow.py
def __init__(self, steps: list[FlowStep], flow_class: type):
    """
    Initialize a StepVisitor.

    Parameters
    ----------
    steps : list[FlowStep]
        List to collect found steps.
    flow_class : type
        The flow class to visit.
    """

    self.steps = steps
    self.flow_class = flow_class
    super().__init__()

flow_class instance-attribute

flow_class = flow_class

steps instance-attribute

steps = steps

visit_FunctionDef

visit_FunctionDef(step_function)

Visit a function definition node in the AST.

If the function has a step decorator, it will be added to the steps list.

PARAMETER DESCRIPTION
step_function

The function definition node.

TYPE: FunctionDef

Source code in nextpipe/flow.py
def visit_FunctionDef(self, step_function):
    """
    Visit a function definition node in the AST.

    If the function has a step decorator, it will be added to the steps list.

    Parameters
    ----------
    step_function : ast.FunctionDef
        The function definition node.
    """

    func = getattr(self.flow_class, step_function.name)
    if hasattr(func, "is_step"):
        self.steps.append(FlowStep(step_function, func.step, func.__doc__))
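The same AST-based discovery can be reproduced with the standard ast module. In this sketch, the step decorator is a hypothetical minimal marker that only sets is_step; nextpipe's real @step does more. StepFinder collects method names rather than FlowStep objects. The class source is kept in a string and executed so the sketch runs anywhere. A real caller would more likely parse the class's source text (for example via inspect.getsource, which needs a source file), but that is an assumption about how the AST is obtained.

```python
import ast
import textwrap


def step(func):
    """Hypothetical minimal marker decorator (nextpipe's @step does more)."""
    func.is_step = True
    return func


class StepFinder(ast.NodeVisitor):
    """Collects names of methods on flow_class that carry the is_step marker."""

    def __init__(self, flow_class: type):
        self.flow_class = flow_class
        self.found: list[str] = []
        super().__init__()

    def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
        # Look the function up on the class, as StepVisitor does, then check the marker.
        func = getattr(self.flow_class, node.name, None)
        if func is not None and hasattr(func, "is_step"):
            self.found.append(node.name)


FLOW_SOURCE = textwrap.dedent('''
    class DemoFlow:
        @step
        def prepare(input: dict):
            return input

        def helper(self):
            return None

        @step
        def solve(input: dict):
            return input
''')

namespace = {"step": step}
exec(FLOW_SOURCE, namespace)  # defines DemoFlow with the marker decorator applied
finder = StepFinder(namespace["DemoFlow"])
finder.visit(ast.parse(FLOW_SOURCE))
print(finder.found)  # ['prepare', 'solve']
```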