Skip to content

Utilities Module

This module contains utility functions and helpers used throughout the Nextpipe library.

utils

Utility functions for nextpipe.

This module provides utility functions used throughout the nextpipe framework for logging, function wrapping, and AST parsing.

FUNCTION DESCRIPTION
log

Log a message with step context.

log_internal

Log an internal nextpipe message.

wrap_func

Wrap a function to unpack arguments.

convert_to_string_values

Convert all dictionary values to strings.

get_ast_root

Find the AST root of a given object.

THREAD_NAME_PREFIX module-attribute

THREAD_NAME_PREFIX = 'nextpipe-'

str: Prefix for thread names created by nextpipe.

convert_to_string_values

convert_to_string_values(
    input_dict: dict[str, Any],
) -> dict[str, str]

Converts all values of the given dictionary to strings.

This utility is useful when working with configuration objects where string representation of values is required.

PARAMETER DESCRIPTION

input_dict

The dictionary with values to convert.

TYPE: dict[str, Any]

RETURNS DESCRIPTION
dict[str, str]

A new dictionary with the same keys but all values converted to strings.

Examples:

>>> from nextpipe.utils import convert_to_string_values
>>> d = {'a': 1, 'b': 2.0, 'c': True}
>>> convert_to_string_values(d)
{'a': '1', 'b': '2.0', 'c': 'True'}
Source code in nextpipe/utils.py
def convert_to_string_values(input_dict: dict[str, Any]) -> dict[str, str]:
    """
    Converts all values of the given dictionary to strings.

    This utility is useful when working with configuration objects where
    string representation of values is required.

    Parameters
    ----------
    input_dict : dict[str, Any]
        The dictionary with values to convert.

    Returns
    -------
    dict[str, str]
        A new dictionary with the same keys but all values converted to strings.

    Examples
    --------
    >>> from nextpipe.utils import convert_to_string_values
    >>> d = {'a': 1, 'b': 2.0, 'c': True}
    >>> convert_to_string_values(d)
    {'a': '1', 'b': '2.0', 'c': 'True'}
    """

    return {key: str(value) for key, value in input_dict.items()}

get_ast_root

get_ast_root(obj: object) -> ClassDef

Find the root AST of the given object.

This function determines whether the code is running in a Jupyter notebook or a normal Python module, and calls the appropriate helper function to find the AST node representing the class definition of the provided object.

PARAMETER DESCRIPTION

obj

The object whose class definition AST node to find.

TYPE: object

RETURNS DESCRIPTION
ClassDef

The AST node representing the class definition.

RAISES DESCRIPTION
ValueError

If running in a notebook and the class definition cannot be found.

IndexError

If not running in a notebook and the class definition cannot be found.

Examples:

>>> from nextpipe.utils import get_ast_root
>>> class Example:
...     pass
>>> obj = Example()
>>> root = get_ast_root(obj)
>>> isinstance(root, ast.ClassDef)
True
>>> root.name
'Example'
Source code in nextpipe/utils.py
def get_ast_root(obj: object) -> ast.ClassDef:
    """
    Find the root AST of the given object.

    This function determines whether the code is running in a Jupyter notebook
    or a normal Python module, and calls the appropriate helper function to
    find the AST node representing the class definition of the provided object.

    Parameters
    ----------
    obj : object
        The object whose class definition AST node to find.

    Returns
    -------
    ast.ClassDef
        The AST node representing the class definition.

    Raises
    ------
    ValueError
        If running in a notebook and the class definition cannot be found.
    IndexError
        If not running in a notebook and the class definition cannot be found.

    Examples
    --------
    >>> from nextpipe.utils import get_ast_root
    >>> class Example:
    ...     pass
    >>> obj = Example()
    >>> root = get_ast_root(obj)
    >>> isinstance(root, ast.ClassDef)
    True
    >>> root.name
    'Example'
    """

    if __is_running_in_notebook():
        return __get_notebook_ast_root(obj)
    else:
        return __get_normal_ast_root(obj)

log

log(message: str) -> None

Logs a message using stderr. Furthermore, prepends the name of the calling function if it is a step.

You can import the log function directly from nextpipe:

from nextpipe import log

This function is useful for debugging and providing execution information during pipeline runs.

PARAMETER DESCRIPTION

message

The message to log.

TYPE: str

Examples:

>>> from nextpipe import log
>>> log("Processing data")
Processing data
>>> # In a step function, it would show:
>>> # [step_name] Processing data
Source code in nextpipe/utils.py
def log(message: str) -> None:
    """
    Logs a message using stderr. Furthermore, prepends the name of the calling function if it is a step.

    You can import the `log` function directly from `nextpipe`:

    ```python
    from nextpipe import log
    ```

    This function is useful for debugging and providing execution information during pipeline runs.

    Parameters
    ----------
    message : str
        The message to log.

    Examples
    --------
    >>> from nextpipe import log
    >>> log("Processing data")
    Processing data
    >>> # In a step function, it would show:
    >>> # [step_name] Processing data
    """

    step_name = __get_step_name()
    if step_name:
        print(f"[{step_name}] {message}", file=sys.stderr)
    else:
        print(message, file=sys.stderr)

log_internal

log_internal(message: str) -> None

Logs a message using stderr.

This function is used internally by the nextpipe framework to log messages with a consistent prefix.

PARAMETER DESCRIPTION

message

The message to log.

TYPE: str

Examples:

>>> from nextpipe.utils import log_internal
>>> log_internal("Pipeline initialized")
[nextpipe] Pipeline initialized
Source code in nextpipe/utils.py
def log_internal(message: str) -> None:
    """
    Logs a message using stderr.

    This function is used internally by the nextpipe framework to log messages
    with a consistent prefix.

    Parameters
    ----------
    message : str
        The message to log.

    Examples
    --------
    >>> from nextpipe.utils import log_internal
    >>> log_internal("Pipeline initialized")
    [nextpipe] Pipeline initialized
    """

    print(f"[nextpipe] {message}", file=sys.stderr)

wrap_func

wrap_func(function)

Wraps the given function in a new function that unpacks the arguments given as a tuple.

This utility is used internally by nextpipe to pass arguments to functions that are executed in threads.

PARAMETER DESCRIPTION

function

The function to wrap.

TYPE: callable

RETURNS DESCRIPTION
callable

A new function that unpacks its arguments and calls the original function.

Examples:

>>> from nextpipe.utils import wrap_func
>>> def add(a, b):
...     return a + b
>>> wrapped = wrap_func(add)
>>> wrapped(([1, 2], {}))
3
Source code in nextpipe/utils.py
def wrap_func(function):
    """
    Wraps the given function in a new function that unpacks the arguments given as a tuple.

    This utility is used internally by nextpipe to pass arguments to functions
    that are executed in threads.

    Parameters
    ----------
    function : callable
        The function to wrap.

    Returns
    -------
    callable
        A new function that unpacks its arguments and calls the original function.

    Examples
    --------
    >>> from nextpipe.utils import wrap_func
    >>> def add(a, b):
    ...     return a + b
    >>> wrapped = wrap_func(add)
    >>> wrapped(([1, 2], {}))
    3
    """

    @wraps(function)
    def func_wrapper(args):
        return function(*args[0], **args[1])

    return func_wrapper