Decorators Module
This module contains decorator functions used in Nextpipe.
decorators
Decorators for defining pipeline steps and workflows.
This module provides decorators and helper classes for defining pipeline steps and their relationships in a workflow. These decorators are used to annotate functions that represent steps in a pipeline, and to define the order in which they are executed.
CLASS | DESCRIPTION |
---|---|
InputType | Enumeration of input types for application steps. |
StepType | Enumeration of step types. |
Step | Represents a step in a pipeline. |
Needs | Represents dependencies between steps. |
Optional | Represents an optional step condition. |
Repeat | Represents a repeating step. |
Foreach | Represents a step that fans out its output. |
Join | Represents a step that joins multiple inputs. |
App | Represents an external application step. |
FUNCTION | DESCRIPTION |
---|---|
step | Decorator to mark a function as a step in the pipeline. |
needs | Decorator to mark the predecessors of a step. |
optional | Decorator to mark a step as optional. |
repeat | Decorator to make a step be repeated a number of times. |
foreach | Decorator to perform a "fanout" operation. |
join | Decorator to perform a "join" operation. |
app | Decorator to mark a step as a Nextmv Application. |
App

    App(
        app_id: str,
        instance_id: str = "devint",
        input_type: InputType = JSON,
        parameters: dict[str, Any] = None,
        options: dict[str, Any] = None,
        full_result: bool = False,
        polling_options: Optional[PollingOptions] = _DEFAULT_POLLING_OPTIONS,
    )

Represents an external application step.
This class is used by the app decorator to specify an external Nextmv Application to run as part of the pipeline.
ATTRIBUTE | DESCRIPTION |
---|---|
app_id | The ID of the Nextmv Application to run. TYPE: str |
instance_id | The ID of the instance to run. TYPE: str |
options | The options to pass to the application. TYPE: dict[str, Any] |
input_type | The type of input to pass to the application (JSON or FILES). TYPE: InputType |
full_result | Whether to return the full result including metadata. TYPE: bool |
polling_options | Options for polling for the results of the app run. TYPE: Optional[PollingOptions] |
Initialize an App object.
PARAMETER | DESCRIPTION |
---|---|
app_id | The ID of the Nextmv Application to run. TYPE: str |
instance_id | The ID of the instance to run, by default "devint". TYPE: str |
input_type | The type of input to pass to the application, by default InputType.JSON. |
options | The options to pass to the application, by default None. TYPE: dict[str, Any] |
full_result | Whether to return the full result including metadata, by default False. TYPE: bool |
polling_options | Options for polling for the results of the app run, by default _DEFAULT_POLLING_OPTIONS. TYPE: Optional[PollingOptions] |
Source code in nextpipe/decorators.py
Foreach
Represents a step that fans out its output.
This class is used by the foreach decorator to indicate that a step's output should be spread across multiple instances of the successor step.
Initialize a Foreach object.
InputType
Bases: Enum
Enumeration of input types for application steps.
This enum defines the possible input types when using the app decorator.
ATTRIBUTE | DESCRIPTION |
---|---|
JSON | Indicates that the input to the application is in JSON format. |
FILES | Indicates that the input to the application consists of files. |
Join
Represents a step that joins multiple inputs.
This class is used by the join decorator to indicate that a step should receive the outputs of multiple predecessor steps as a list.
Initialize a Join object.
Needs

    Needs(predecessors: list[Callable])

Represents dependencies between steps.
This class is used by the needs decorator to specify which steps must be executed before a specific step.
ATTRIBUTE | DESCRIPTION |
---|---|
predecessors | The steps that must be executed before the decorated step. TYPE: list[Callable] |
Initialize a Needs object.
PARAMETER | DESCRIPTION |
---|---|
predecessors | The steps that must be executed before the decorated step. TYPE: list[Callable] |
Optional

    Optional(condition: callable)

Represents an optional step condition.
This class is used by the optional decorator to specify a condition under which a step should be executed.
ATTRIBUTE | DESCRIPTION |
---|---|
condition | A function that takes a step and returns a boolean indicating whether the step should be executed. TYPE: callable |
Initialize an Optional object.
PARAMETER | DESCRIPTION |
---|---|
condition | A function that takes a step and returns a boolean indicating whether the step should be executed. TYPE: callable |
Repeat

    Repeat(repetitions: int)

Represents a repeating step.
This class is used by the repeat decorator to specify how many times a step should be repeated.
ATTRIBUTE | DESCRIPTION |
---|---|
repetitions | The number of times to repeat the step. TYPE: int |
Initialize a Repeat object.
PARAMETER | DESCRIPTION |
---|---|
repetitions | The number of times to repeat the step. TYPE: int |
Step

    Step(function: callable)

Represents a step in a pipeline.
A step is a function that has been decorated with the @step decorator.
It can have additional properties set by other decorators like @needs, @optional, @repeat, @foreach, @join, or @app.
ATTRIBUTE | DESCRIPTION |
---|---|
function | The function that has been decorated as a step. TYPE: callable |
type | The type of step (DEFAULT or APP). |
run_ids | The IDs of the runs associated with this step. TYPE: list[str] |
_inputs | The inputs to the step. |
_output | The output of the step. |
Initialize a Step object.
PARAMETER | DESCRIPTION |
---|---|
function | The function that has been decorated as a step. TYPE: callable |
get_app_id
Get the ID of the Nextmv Application.
RETURNS | DESCRIPTION |
---|---|
str or None | The ID of the Nextmv Application, or None if the step is not a Nextmv Application step. |
get_id
Get the ID of the step.
RETURNS | DESCRIPTION |
---|---|
str | The name of the function that has been decorated as a step. |
get_repetitions
Get the number of times the step should be repeated.
RETURNS | DESCRIPTION |
---|---|
int | The number of times the step should be repeated, or 1 if the step should not be repeated. |
get_run_ids
Get the run IDs for this step.
RETURNS | DESCRIPTION |
---|---|
list[str] | The run IDs for this step. |
is_app
Check if the step is a Nextmv Application step.
RETURNS | DESCRIPTION |
---|---|
bool | True if the step is a Nextmv Application step, False otherwise. |
is_foreach
Check if the step is a foreach step.
RETURNS | DESCRIPTION |
---|---|
bool | True if the step is a foreach step, False otherwise. |
is_join
Check if the step is a join step.
RETURNS | DESCRIPTION |
---|---|
bool | True if the step is a join step, False otherwise. |
is_needs
Check if the step has predecessors.
RETURNS | DESCRIPTION |
---|---|
bool | True if the step has predecessors, False otherwise. |
is_repeat
Check if the step should be repeated.
RETURNS | DESCRIPTION |
---|---|
bool | True if the step should be repeated, False otherwise. |
set_run_ids

    set_run_ids(run_ids: list[str])

Set the run IDs for this step.
PARAMETER | DESCRIPTION |
---|---|
run_ids | The run IDs to set. TYPE: list[str] |
skip
Check if the step should be skipped.
RETURNS | DESCRIPTION |
---|---|
bool | True if the step should be skipped, False otherwise. |
StepType
app

    app(
        app_id: str,
        instance_id: str = "devint",
        parameters: dict[str, Any] = None,
        options: dict[str, Any] = None,
        input_type: InputType = JSON,
        full_result: bool = False,
        polling_options: Optional[PollingOptions] = _DEFAULT_POLLING_OPTIONS,
    )

Decorator to mark a step as a Nextmv Application (external application) step.
You can import the app decorator directly from nextpipe:

    from nextpipe import app

When this decorator is used, the step runs an external application with the specified options. You need a valid Nextmv account and Application before you can use this decorator, and the NEXTMV_API_KEY environment variable must be set.
PARAMETER | DESCRIPTION |
---|---|
app_id | The ID of the application to run. TYPE: str |
instance_id | The ID of the instance to run. Default is "devint". TYPE: str |
options | The options to pass to the application. This is a dictionary of parameter names and values. The values must be JSON serializable. TYPE: dict[str, Any] |
input_type | The type of input to pass to the application. This can be either JSON or FILES. Default is JSON. |
full_result | Whether to return the full result of the application run. If this is set to … TYPE: bool |
polling_options | Options for polling for the results of the app run. This is used to configure the polling behavior, such as the timeout and backoff options. Default (or when undefined) is the predefined options in the class itself. Please note that the … TYPE: Optional[PollingOptions] |
Example
In this example the step pre_process is executed first. After pre-processing is completed, the result is passed to the solve step. This step runs a Nextmv Application with the ID echo. The result of the application run is passed to the final step post_process, which post-processes the result.

    from typing import Any

    from nextpipe import FlowSpec, app, log, needs, step


    class Flow(FlowSpec):
        # Plain Python step: receives the flow input.
        @step
        def pre_process(input: dict[str, Any]) -> dict[str, Any]:
            log("You can pre-process your data here.")
            return input

        # App step: the body is empty because the Nextmv Application does the work.
        @app(app_id="echo")
        @needs(predecessors=[pre_process])
        @step
        def solve():
            pass

        # Final step: receives the result of the application run.
        @needs(predecessors=[solve])
        @step
        def post_process(result: dict[str, Any]) -> dict[str, Any]:
            log("You can post-process your data here.")
            return result


    data = {"foo": "bar"}
    flow = Flow("DecisionFlow", data)
    flow.run()
    log(flow.get_result(flow.post_process))

foreach
Decorator to perform a "fanout", which means creating multiple parallel steps out of a single step.
You can import the foreach decorator directly from nextpipe:

    from nextpipe import foreach

The decorated function should return a list. Each element of the list is consumed as an input by the successor step. When using this decorator, use parentheses without any parameters.
Example
In this example the step step2 is executed for each element in the list returned by step1. The input to step2 is the element of the list.

    from typing import Any

    from nextpipe import FlowSpec, foreach, log, needs, step


    class Flow(FlowSpec):
        # Each element of the returned list becomes one input to step2.
        @foreach()
        @step
        def step1() -> list[dict[str, Any]]:
            return [{"input": 1}, {"input": 2}, {"input": 3}]

        @needs(predecessors=[step1])
        @step
        def step2(data: dict) -> None:
            log(data)


    flow = Flow("DecisionFlow", None)
    flow.run()

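The fan-out behavior described above can be pictured without nextpipe at all. The following is a minimal stand-alone sketch, assuming a hypothetical helper named fan_out that is not part of nextpipe; it calls the successor once per list element, sequentially, whereas the real pipeline may run those calls as parallel steps.

```python
from typing import Any, Callable


def fan_out(
    producer: Callable[[], list[Any]],
    successor: Callable[[Any], Any],
) -> list[Any]:
    """Hypothetical helper: call `successor` once per element from `producer`."""
    return [successor(element) for element in producer()]


def step1() -> list[dict[str, Any]]:
    # Plays the role of the step decorated with @foreach().
    return [{"input": 1}, {"input": 2}, {"input": 3}]


def step2(data: dict[str, Any]) -> int:
    # Plays the role of the successor; it sees one element at a time.
    return data["input"] * 10


results = fan_out(step1, step2)
print(results)
```

The successor never sees the whole list, only one element per invocation, which is the property the foreach decorator is described as providing.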
join
Decorator to perform a "join", which means collecting the results of multiple parallel predecessor steps into a single step.
You can import the join decorator directly from nextpipe:

    from nextpipe import join

The outputs of the predecessor steps are received as a list. The order of the elements in the list is the same as the order of the predecessor steps. Unpack the list to obtain the results and process them as needed. When using this decorator, use parentheses without any parameters.
Example
In this example the step step3 is executed after step1 and step2. The input to step3 is a list containing the outputs of step1 and step2.

    from typing import Any

    from nextpipe import FlowSpec, join, log, needs, step


    class Flow(FlowSpec):
        @step
        def step1() -> dict[str, Any]:
            return {"input": 1}

        @step
        def step2() -> dict[str, Any]:
            return {"input": 2}

        # step3 receives [output of step1, output of step2] as a single list.
        @join()
        @needs(predecessors=[step1, step2])
        @step
        def step3(data: list[dict[str, Any]]) -> None:
            log(data)


    flow = Flow("DecisionFlow", None)
    flow.run()

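To make the ordering guarantee concrete, here is a small stand-alone sketch of a join. The helper run_join is hypothetical and is not nextpipe's implementation; it only illustrates that the joined step receives one list whose elements follow the order of the predecessors, and that the list can be unpacked.

```python
from typing import Any, Callable


def run_join(
    predecessors: list[Callable[[], Any]],
    joiner: Callable[[list[Any]], Any],
) -> Any:
    """Hypothetical helper: collect predecessor outputs in order, then join."""
    outputs = [predecessor() for predecessor in predecessors]
    return joiner(outputs)


def step1() -> dict[str, Any]:
    return {"input": 1}


def step2() -> dict[str, Any]:
    return {"input": 2}


def step3(data: list[dict[str, Any]]) -> int:
    # Unpack in the same order the predecessors were listed.
    first, second = data
    return first["input"] + second["input"] * 10


total = run_join([step1, step2], step3)
print(total)
```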
needs

    needs(predecessors: list[Callable])

Decorator to mark the predecessors of a step.
You can import the needs decorator directly from nextpipe:

    from nextpipe import needs

This is used to determine the order in which the steps are executed. The predecessors are the steps that must be executed before this step can run.
PARAMETER | DESCRIPTION |
---|---|
predecessors | The list of predecessors. TYPE: list[Callable] |
Example
In this example steps step1 and step2 are executed before step3.

    from nextpipe import FlowSpec, log, needs, step


    class Flow(FlowSpec):
        @step
        def step1() -> None:
            log("Execute step 1")

        @step
        def step2() -> None:
            log("Execute step 2")

        @needs(predecessors=[step1, step2])
        @step
        def step3() -> None:
            log("Execute step 3 after steps 1 and 2")


    flow = Flow("DecisionFlow", None)
    flow.run()

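Declaring predecessors amounts to describing a dependency graph, and a valid execution order is a topological order of that graph. The sketch below uses the standard library's graphlib.TopologicalSorter to show the idea; whether nextpipe computes its order this way is an assumption, and the step names are plain strings rather than real steps.

```python
from graphlib import TopologicalSorter

# Map each step name to the set of steps it needs (its predecessors).
dependencies: dict[str, set[str]] = {
    "step1": set(),
    "step2": set(),
    "step3": {"step1", "step2"},
}

# static_order() yields every node after all of its predecessors.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

The relative order of step1 and step2 is not fixed because neither depends on the other; only the position of step3 after both is guaranteed.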
optional
Decorator to mark a step as optional.
You can import the optional decorator directly from nextpipe:

    from nextpipe import optional

This is used to determine whether the step should be executed. The condition is a callable that takes the step as an argument and returns a boolean indicating whether the step should be executed. The condition is evaluated at runtime, so it can depend on the runtime state of the pipeline.
PARAMETER | DESCRIPTION |
---|---|
condition | The condition to evaluate. This is a callable that takes the step as an argument and returns a boolean indicating whether the step should be executed. TYPE: callable |
Example
In this example the step step1 is executed given that the condition is true.
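The shape of the condition callable can be illustrated without nextpipe itself. The sketch below uses a hypothetical stand-in class, SketchStep, which is not nextpipe's Step; it only mirrors the documented idea that the condition receives the step and that a false result means the step is skipped.

```python
from typing import Callable, Optional


class SketchStep:
    """Hypothetical stand-in for a pipeline step; not nextpipe's Step class."""

    def __init__(
        self,
        function: Callable[[], None],
        condition: Optional[Callable[["SketchStep"], bool]] = None,
    ) -> None:
        self.function = function
        self.condition = condition

    def get_id(self) -> str:
        # Mirrors the documented behavior: the ID is the function name.
        return self.function.__name__

    def skip(self) -> bool:
        # Skip only when a condition exists and evaluates to False.
        return self.condition is not None and not self.condition(self)


def step1() -> None:
    print("Execute optional step 1")


def step2() -> None:
    print("Execute optional step 2")


def only_step1(step: SketchStep) -> bool:
    return step.get_id() == "step1"


steps = [SketchStep(step1, only_step1), SketchStep(step2, only_step1)]
executed = [s.get_id() for s in steps if not s.skip()]
print(executed)
```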
repeat

    repeat(repetitions: int)

Decorator to repeat a step a number of times. The number of repetitions determines how many times the step is run.
You can import the repeat decorator directly from nextpipe:

    from nextpipe import repeat

PARAMETER | DESCRIPTION |
---|---|
repetitions | The number of times to repeat the step. TYPE: int |
Example
In this example the step step1 is repeated 3 times.
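As a rough stand-alone illustration of a parameterized decorator that records a repetition count, consider the sketch below. The names repeat_sketch and get_repetitions are hypothetical helpers written for this example, not nextpipe's implementation; the default of 1 follows the documented behavior of Step.get_repetitions for steps that are not repeated.

```python
from typing import Any, Callable


def repeat_sketch(repetitions: int) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Hypothetical decorator factory: tag a function with a repetition count."""

    def decorator(function: Callable[..., Any]) -> Callable[..., Any]:
        function.repetitions = repetitions  # store metadata on the function
        return function

    return decorator


def get_repetitions(function: Callable[..., Any]) -> int:
    # Functions without the tag run once.
    return getattr(function, "repetitions", 1)


@repeat_sketch(repetitions=3)
def step1() -> str:
    return "Hello, world."


def step2() -> str:
    return "Runs once."


outputs = [step1() for _ in range(get_repetitions(step1))]
print(len(outputs))
```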
step
Decorator to mark a function as a step in the pipeline.
You can import the step decorator directly from nextpipe:

    from nextpipe import step

This is the most basic decorator. It requires neither parameters nor parentheses.
Example
A simple example shows that a step is executed.
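A decorator that takes no parameters and no parentheses receives the function directly and can wrap it in an object. The sketch below shows that general Python pattern with hypothetical names (WrappedStep, step_sketch); it is an illustration of the mechanism, not nextpipe's actual step decorator.

```python
from typing import Any, Callable


class WrappedStep:
    """Hypothetical wrapper object; not nextpipe's Step class."""

    def __init__(self, function: Callable[..., Any]) -> None:
        self.function = function

    def get_id(self) -> str:
        # The ID is the name of the decorated function.
        return self.function.__name__

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        # Delegate to the original function.
        return self.function(*args, **kwargs)


def step_sketch(function: Callable[..., Any]) -> WrappedStep:
    # Used bare, as @step_sketch, so Python passes the function in directly.
    return WrappedStep(function)


@step_sketch
def my_step() -> str:
    return "Some code is executed here"


message = my_step()
print(my_step.get_id(), message)
```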