Uplink Module

This module contains functionality for linking with external systems and services.

Module for communicating with the Nextmv platform.

This module provides functionality to report pipeline execution status to the Nextmv platform. It includes data classes that model the state of the pipeline graph and a client that updates this state in the platform.

CLASS DESCRIPTION
UplinkConfig

Configuration for the uplink client.

StepDTO

Data Transfer Object representing a pipeline step.

NodeDTO

Data Transfer Object representing a node in the pipeline graph.

FlowDTO

Data Transfer Object representing a flow graph.

FlowUpdateDTO

Data Transfer Object for updating a flow in the platform.

UplinkClient

Client for posting graph and node updates to the platform.

FUNCTION DESCRIPTION
ExcludeIfNone

Helper function for dataclasses_json to exclude None fields.

ENV_APP_ID module-attribute

ENV_APP_ID = 'NEXTMV_APP_ID'

Environment variable name for the application ID.

ENV_RUN_ID module-attribute

ENV_RUN_ID = 'NEXTMV_RUN_ID'

Environment variable name for the run ID.

ExcludeIfNone

ExcludeIfNone(value)

Determine if a value should be excluded from serialization.

This function is used as a helper for dataclasses_json to exclude None values during serialization.

PARAMETER DESCRIPTION

value

The value to check.

TYPE: Any

RETURNS DESCRIPTION
bool

True if the value is None and should be excluded, False otherwise.

Examples:

>>> ExcludeIfNone(None)
True
>>> ExcludeIfNone("something")
False
Source code in nextpipe/uplink.py
def ExcludeIfNone(value):
    """
    Determine if a value should be excluded from serialization.

    This function is used as a helper for dataclasses_json to exclude None values
    during serialization.

    Parameters
    ----------
    value : Any
        The value to check.

    Returns
    -------
    bool
        True if the value is None and should be excluded, False otherwise.

    Examples
    --------
    >>> ExcludeIfNone(None)
    True
    >>> ExcludeIfNone("something")
    False
    """

    return value is None
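
The following is a minimal, standard-library-only sketch of what an exclusion predicate like this achieves during serialization. It does not use dataclasses_json; the `Step` class, the `"exclude"` metadata key, and the `to_dict` helper are hypothetical stand-ins written for illustration only.

```python
from dataclasses import dataclass, field, fields
from typing import Any, Optional


def exclude_if_none(value: Any) -> bool:
    """Same predicate as ExcludeIfNone: True means 'drop this field'."""
    return value is None


@dataclass
class Step:
    """Hypothetical stand-in, not the library's StepDTO."""

    id: str
    # The predicate is attached as field metadata (illustrative key name).
    docs: Optional[str] = field(default=None, metadata={"exclude": exclude_if_none})


def to_dict(obj: Any) -> dict:
    """Serialize a dataclass, skipping fields whose exclude predicate fires."""
    result = {}
    for f in fields(obj):
        value = getattr(obj, f.name)
        exclude = f.metadata.get("exclude")
        if exclude is not None and exclude(value):
            continue  # the field is omitted entirely, not emitted as null
        result[f.name] = value
    return result


with_docs = to_dict(Step(id="prepare", docs="Prepares the input."))
without_docs = to_dict(Step(id="prepare"))
print(with_docs)
print(without_docs)
```

The point of the pattern is that optional fields left at None disappear from the payload instead of being sent as explicit nulls.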

FAILED_UPDATES_THRESHOLD module-attribute

FAILED_UPDATES_THRESHOLD = 10

Maximum number of consecutive failed updates before termination.

FlowDTO dataclass

FlowDTO(steps: list[StepDTO], nodes: list[NodeDTO])

Data Transfer Object representing a flow graph.

A FlowDTO represents a flow and, more importantly, its graph and state: the steps of the flow and the nodes with their current status.

PARAMETER DESCRIPTION

steps

Steps in the flow.

TYPE: list[StepDTO]

nodes

Nodes and their current state.

TYPE: list[NodeDTO]

nodes instance-attribute

nodes: list[NodeDTO]

Nodes and their current state.

steps instance-attribute

steps: list[StepDTO]

Steps in the flow.

FlowUpdateDTO dataclass

FlowUpdateDTO(
    pipeline_graph: FlowDTO, updated_at: str = None
)

Data Transfer Object for updating a flow in the platform.

A FlowUpdateDTO represents a flow in the platform, containing the pipeline graph and the timestamp of the update.

PARAMETER DESCRIPTION

pipeline_graph

The graph of the pipeline.

TYPE: FlowDTO

updated_at

Time of the update as an RFC3339 string. Will be set automatically.

TYPE: str DEFAULT: None

pipeline_graph instance-attribute

pipeline_graph: FlowDTO

The graph of the pipeline.

updated_at class-attribute instance-attribute

updated_at: str = None

Time of the update as an RFC3339 string. Will be set automatically.
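
For reference, one way to produce an RFC3339 timestamp with the standard library is sketched below. How the module itself fills in updated_at is not shown on this page, so the helper name `rfc3339_now` and the choice of UTC are assumptions made for illustration.

```python
from datetime import datetime, timezone


def rfc3339_now() -> str:
    """Return the current UTC time as an RFC 3339 compatible string."""
    # A timezone-aware datetime serializes with an explicit "+00:00" offset.
    return datetime.now(timezone.utc).isoformat()


stamp = rfc3339_now()
parsed = datetime.fromisoformat(stamp)  # round-trips back to an aware datetime
print(stamp)
```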

MAX_DOCS_LENGTH module-attribute

MAX_DOCS_LENGTH = 1000

Maximum length for documentation strings sent to the platform.

NodeDTO dataclass

NodeDTO(
    id: str,
    parent_id: str,
    predecessor_ids: list[str],
    status: str,
    run_id: str = None,
)

Data Transfer Object representing a node in the pipeline graph.

A NodeDTO represents a node in the pipeline execution graph, tracking its status, dependencies, and relationships.

PARAMETER DESCRIPTION

id

The ID of the node.

TYPE: str

parent_id

Parent step ID.

TYPE: str

predecessor_ids

Predecessor nodes via their IDs.

TYPE: list[str]

status

Status of the node.

TYPE: str

run_id

ID of the associated run, if any.

TYPE: str DEFAULT: None

id instance-attribute

id: str

The ID of the node.

parent_id instance-attribute

parent_id: str

Parent step ID.

predecessor_ids instance-attribute

predecessor_ids: list[str]

Predecessor nodes via their IDs.

run_id class-attribute instance-attribute

run_id: str = field(
    default=None, metadata=config(exclude=ExcludeIfNone)
)

ID of the associated run, if any.

status instance-attribute

status: str

Status of the node.

StepDTO dataclass

StepDTO(
    id: str,
    predecessors: list[str],
    docs: str = None,
    app_id: str = None,
)

Data Transfer Object representing a pipeline step.

A StepDTO represents a step in a pipeline, with its unique identifier, dependencies, documentation, and optional associated application.

PARAMETER DESCRIPTION

id

The ID of the step.

TYPE: str

predecessors

The IDs of the steps that this step depends on.

TYPE: list[str]

docs

The documentation string of the step.

TYPE: str DEFAULT: None

app_id

The ID of the app this step represents (if any).

TYPE: str DEFAULT: None

app_id class-attribute instance-attribute

app_id: str = field(
    default=None, metadata=config(exclude=ExcludeIfNone)
)

The ID of the app this step represents (if any).

docs class-attribute instance-attribute

docs: str = field(
    default=None, metadata=config(exclude=ExcludeIfNone)
)

The documentation string of the step.

id instance-attribute

id: str

The ID of the step.

predecessors instance-attribute

predecessors: list[str]

The IDs of the steps that this step depends on.
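
To show how the step, node, and flow objects relate, here is a small sketch using stand-in dataclasses that mirror the documented fields. The classes `Step`, `Node`, and `Flow` are not the library's DTOs, and the IDs and status strings (`"succeeded"`, `"running"`) are placeholders chosen for illustration; the values the platform actually accepts are not listed on this page.

```python
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class Step:
    """Stand-in mirroring the documented StepDTO fields."""

    id: str
    predecessors: list[str]
    docs: Optional[str] = None
    app_id: Optional[str] = None


@dataclass
class Node:
    """Stand-in mirroring the documented NodeDTO fields."""

    id: str
    parent_id: str
    predecessor_ids: list[str]
    status: str
    run_id: Optional[str] = None


@dataclass
class Flow:
    """Stand-in mirroring the documented FlowDTO fields."""

    steps: list[Step]
    nodes: list[Node]


# Two steps: "solve" runs after "prepare".
steps = [
    Step(id="prepare", predecessors=[]),
    Step(id="solve", predecessors=["prepare"], docs="Solves the model."),
]
# One node per step; each node points at its parent step via parent_id.
nodes = [
    Node(id="prepare_0", parent_id="prepare", predecessor_ids=[], status="succeeded"),
    Node(id="solve_0", parent_id="solve", predecessor_ids=["prepare_0"], status="running"),
]
flow = Flow(steps=steps, nodes=nodes)

# Sanity check: every node must reference an existing step.
step_ids = {s.id for s in flow.steps}
unknown_parents = [n.id for n in flow.nodes if n.parent_id not in step_ids]
print(asdict(flow))
```

Steps describe the static structure of the pipeline, while nodes carry the execution state for each step.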

UPDATE_INTERVAL module-attribute

UPDATE_INTERVAL = 5

Interval in seconds between update attempts.

UplinkClient

UplinkClient(client: Client, config: UplinkConfig)

Client for posting graph and node updates to the platform.

This class provides an interface to communicate with the Nextmv platform, posting updates about the pipeline execution status and graph structure.

PARAMETER DESCRIPTION

client

The Nextmv Cloud client.

TYPE: Client

config

The configuration for the uplink client.

TYPE: UplinkConfig

ATTRIBUTE DESCRIPTION
config

The configuration for the uplink client.

TYPE: UplinkConfig

inactive

Whether the client is inactive.

TYPE: bool

client

The Nextmv Cloud client.

TYPE: Client

flow

The current flow.

TYPE: dict or FlowUpdateDTO

changed

Whether the flow has changed and needs to be updated.

TYPE: bool

Initialize the UplinkClient.

PARAMETER DESCRIPTION

client

The Nextmv Cloud client.

TYPE: Client

config

The configuration for the uplink client. If None, configuration is loaded from environment variables.

TYPE: UplinkConfig

Notes

If no application ID or run ID is provided, the client will be marked as inactive and will not send any updates to the platform.

Source code in nextpipe/uplink.py
def __init__(self, client: Client, config: UplinkConfig):
    """
    Initialize the UplinkClient.

    Parameters
    ----------
    client : nextmv.cloud.Client
        The Nextmv Cloud client.
    config : UplinkConfig, optional
        The configuration for the uplink client. If None, configuration is
        loaded from environment variables.

    Notes
    -----
    If no application ID or run ID is provided, the client will be marked as
    inactive and will not send any updates to the platform.
    """

    if config is None:
        # Load config from environment
        config = UplinkConfig(
            application_id=os.environ.get(ENV_APP_ID),
            run_id=os.environ.get(ENV_RUN_ID),
        )
    self.config = config
    self.inactive = False
    if not self.config.application_id or not self.config.run_id:
        self.inactive = True
        self.terminated = True
        log_internal("No application ID or run ID found, uplink is inactive.")
    self.client = client
    self._lock = threading.Lock()
    self.flow = {}
    self.changed = False
    self._terminate = False
    self._terminated = False
    self._updates_failed = 0
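
The environment fallback and the inactive check above can be sketched in isolation as follows. `Config`, `config_from_env`, and `is_inactive` are hypothetical helpers that mirror the logic of the listing; only the two environment variable names come from the module.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

ENV_APP_ID = "NEXTMV_APP_ID"
ENV_RUN_ID = "NEXTMV_RUN_ID"


@dataclass
class Config:
    """Stand-in for UplinkConfig."""

    application_id: Optional[str]
    run_id: Optional[str]


def config_from_env(env: Mapping[str, str] = os.environ) -> Config:
    """Build a config from environment variables; missing ones become None."""
    return Config(application_id=env.get(ENV_APP_ID), run_id=env.get(ENV_RUN_ID))


def is_inactive(config: Config) -> bool:
    """Mirror of the check in __init__: both IDs must be non-empty."""
    return not config.application_id or not config.run_id


# Plain dicts are passed here so the example does not depend on the real environment.
complete = config_from_env({ENV_APP_ID: "my-app", ENV_RUN_ID: "run-123"})
partial = config_from_env({ENV_APP_ID: "my-app"})
print(is_inactive(complete), is_inactive(partial))
```

Note that an empty string counts as missing, because the check uses truthiness rather than a comparison with None.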

changed instance-attribute

changed = False

client instance-attribute

client = client

config instance-attribute

config = config

flow instance-attribute

flow = {}

inactive instance-attribute

inactive = False

run_async

run_async()

Start the uplink client in a separate thread.

This method starts the uplink client in a separate thread, which will post node updates to the platform until terminated. Updates are sent at regular intervals defined by UPDATE_INTERVAL.

If the client is inactive or already terminated, this method returns without starting a new thread.

Source code in nextpipe/uplink.py
def run_async(self):
    """
    Start the uplink client in a separate thread.

    This method starts the uplink client in a separate thread, which will
    post node updates to the platform until terminated. Updates are sent
    at regular intervals defined by UPDATE_INTERVAL.

    If the client is inactive or already terminated, this method returns
    without starting a new thread.
    """

    if self.inactive or self._terminate:
        return

    def run():
        while not self._terminate:
            # Post update, if any
            if self.changed:
                with self._lock:
                    try:
                        self._post_node_update()
                        self.changed = False
                    except Exception as e:
                        # Update failed, keep in pending
                        log_internal(f"Failed to post flow update (#{self._updates_failed}): {e}")
                        self._updates_failed += 1
                        if self._updates_failed > FAILED_UPDATES_THRESHOLD:
                            # Too many failed updates, terminate
                            self._terminate = True
            else:
                self._updates_failed = 0
            # Sleep
            time.sleep(UPDATE_INTERVAL)

        # Signal termination
        self._terminated = True

    threading.Thread(target=run).start()
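
The failure counting in the loop above is easy to misread, so here is a small replay of it without threads or sleeping. The function `iterations_until_termination` and the event labels `"fail"`, `"ok"`, and `"idle"` are hypothetical and only model the counter logic of the listing.

```python
FAILED_UPDATES_THRESHOLD = 10  # same value as the module constant


def iterations_until_termination(events):
    """Replay the loop's failure counter over a sequence of iterations.

    Each event is one loop iteration:
      "fail": an update was pending and posting raised an exception
      "ok":   an update was pending and posting succeeded
      "idle": nothing was pending
    Returns the 1-based iteration that sets the terminate flag, or None.
    """
    failed = 0
    for i, event in enumerate(events, start=1):
        if event == "idle":
            failed = 0  # the counter resets only when nothing is pending
        elif event == "fail":
            failed += 1
            if failed > FAILED_UPDATES_THRESHOLD:  # strictly greater than
                return i
        # "ok" leaves the counter untouched, as in the listing
    return None


eleven = iterations_until_termination(["fail"] * 11)
ten = iterations_until_termination(["fail"] * 10)
reset = iterations_until_termination(["fail"] * 10 + ["idle"] + ["fail"] * 10)
print(eleven, ten, reset)
```

Because the comparison is strict, the loop tolerates ten failures and stops on the eleventh. As the listing reads, a successful post does not reset the counter by itself; the reset happens on a later iteration in which no update is pending.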

submit_update

submit_update(flow: FlowUpdateDTO)

Post the full flow and its state to the platform.

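This method records the flow state as the pending update; the background thread started by run_async posts it to the Nextmv platform on its next cycle. Step documentation strings longer than MAX_DOCS_LENGTH characters are truncated in place and suffixed with "...". If the client is inactive or terminating, the call returns without doing anything.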

PARAMETER DESCRIPTION
flow

The flow to update.

TYPE: FlowUpdateDTO

RAISES DESCRIPTION
ValueError

If the flow is not a FlowUpdateDTO instance.

Source code in nextpipe/uplink.py
def submit_update(self, flow: FlowUpdateDTO):
    """
    Post the full flow and its state to the platform.

    This method submits the flow state to be updated in the Nextmv platform.
    It truncates documentation strings if they exceed the maximum length.

    Parameters
    ----------
    flow : FlowUpdateDTO
        The flow to update.

    Raises
    ------
    ValueError
        If the flow is not a FlowUpdateDTO instance.
    """

    if self.inactive or self._terminate:
        return
    if not isinstance(flow, FlowUpdateDTO):
        raise ValueError(f"Expected FlowUpdateDTO, got {type(flow)}")
    # Truncate docs to a maximum length
    for step in flow.pipeline_graph.steps:
        if step.docs and len(step.docs) > MAX_DOCS_LENGTH:
            step.docs = step.docs[:MAX_DOCS_LENGTH] + "..."
    # Inform the client about the new flow
    with self._lock:
        self.flow = flow
        self.changed = True
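
The truncation rule in the listing above can be isolated into a small helper to make its edge cases visible. `truncate_docs` is a hypothetical function name; the logic follows the loop in submit_update.

```python
from typing import Optional

MAX_DOCS_LENGTH = 1000  # same value as the module constant


def truncate_docs(docs: Optional[str]) -> Optional[str]:
    """Cut docs to MAX_DOCS_LENGTH characters and mark the cut with '...'."""
    if docs and len(docs) > MAX_DOCS_LENGTH:
        return docs[:MAX_DOCS_LENGTH] + "..."
    return docs  # None, empty, or short enough: returned unchanged


at_limit = truncate_docs("x" * 1000)
over_limit = truncate_docs("x" * 1001)
print(len(at_limit), len(over_limit))
```

The suffix is appended after the cut, so a truncated string is 1003 characters long rather than 1000.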

terminate

terminate()

Terminate the uplink client gracefully.

This method stops the uplink client's update thread and sends a final update to the platform if there are pending changes. It waits for the thread to terminate before returning.

If the client is inactive, this method returns without taking any action.

Source code in nextpipe/uplink.py
def terminate(self):
    """
    Terminate the uplink client gracefully.

    This method stops the uplink client's update thread and sends a final
    update to the platform if there are pending changes. It waits for the
    thread to terminate before returning.

    If the client is inactive, this method returns without taking any action.
    """

    if self.inactive:
        return

    # Terminate the client
    self._terminate = True
    while not self._terminated:
        time.sleep(0.1)

    # Send final update
    if self._updates_failed > 0:
        log_internal(f"Uplink client is terminating (failed updates: {self._updates_failed})")
    if self.changed:
        try:
            self._post_node_update()
        except Exception:
            pass
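
The overall lifecycle (start the thread, submit updates, terminate with a final flush) can be tried out with the toy model below. `MiniUplink` is not the library's client: the `post` callable stands in for the platform call, the interval is shortened, error handling is omitted, and the thread is joined instead of polling a flag.

```python
import threading
import time


class MiniUplink:
    """Toy model of the run_async / submit_update / terminate lifecycle."""

    def __init__(self, post, interval=0.01):
        self._post = post  # hypothetical callable standing in for the platform call
        self._interval = interval
        self._lock = threading.Lock()
        self._flow = None
        self._changed = False
        self._terminate = False
        self._thread = None

    def run_async(self):
        def run():
            while not self._terminate:
                with self._lock:
                    if self._changed:
                        self._post(self._flow)
                        self._changed = False
                time.sleep(self._interval)

        self._thread = threading.Thread(target=run)
        self._thread.start()

    def submit_update(self, flow):
        # Only record the latest state; the thread decides when to send it.
        with self._lock:
            self._flow = flow
            self._changed = True

    def terminate(self):
        self._terminate = True
        if self._thread is not None:
            self._thread.join()  # wait for the update loop to exit
        if self._changed:  # flush whatever is still pending
            self._post(self._flow)
            self._changed = False


posted = []
uplink = MiniUplink(post=posted.append)
uplink.run_async()
uplink.submit_update({"status": "running"})
uplink.submit_update({"status": "succeeded"})
uplink.terminate()
print(posted[-1])
```

Intermediate states may be skipped when updates arrive faster than the interval, but the last submitted state is always delivered, either by the thread or by the final flush. One difference worth noting: as the listing above reads, terminate waits on a flag that only the update thread sets, so on an active client it appears to rely on run_async having been called first; the toy model avoids this by checking whether a thread exists.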

terminated instance-attribute

terminated = True

Set to True during initialization only when the client is inactive (no application ID or run ID). This attribute is separate from the internal _terminated flag used by run_async and terminate.

UplinkConfig dataclass

UplinkConfig(application_id: str, run_id: str)

Configuration for the uplink client.

PARAMETER DESCRIPTION

application_id

The ID of the application.

TYPE: str

run_id

The ID of the run.

TYPE: str

application_id instance-attribute

application_id: str

The ID of the application.

run_id instance-attribute

run_id: str

The ID of the run.