Threads Module¶
This module contains threading and concurrency functionality.
threads
¶
Thread management utilities for parallel job execution.
This module provides classes for managing multithreaded job execution.
CLASS | DESCRIPTION |
---|---|
Job |
A callable job that can be submitted to a thread pool. |
Pool |
A thread pool for executing jobs in parallel. |
The module also provides a thread local storage variable for thread-specific data. |
|
Job
¶
Job(
target: Callable,
start_callback: Callable,
done_callback: Callable,
args: Optional[list] = None,
name: str = None,
reference: Any = None,
)
A callable job that can be submitted to a thread pool.
This class represents a job that can be executed by a worker thread in a thread pool. It wraps a target callable with arguments and callback functions that are executed when the job starts and completes.
PARAMETER | DESCRIPTION |
---|---|
|
The function to be executed by the job.
TYPE:
|
|
Function to be called when the job starts. Takes the job as a parameter.
TYPE:
|
|
Function to be called when the job completes. Takes the job as a parameter.
TYPE:
|
|
Arguments to be passed to the target function, by default None.
TYPE:
|
|
Name for the job, by default None.
TYPE:
|
|
Reference to any object associated with this job, by default None.
TYPE:
|
ATTRIBUTE | DESCRIPTION |
---|---|
target |
The function to be executed by the job.
TYPE:
|
start_callback |
Function to be called when the job starts.
TYPE:
|
done_callback |
Function to be called when the job completes.
TYPE:
|
args |
Arguments to be passed to the target function.
TYPE:
|
name |
Name for the job.
TYPE:
|
reference |
Reference to any object associated with this job.
TYPE:
|
done |
Whether the job has completed.
TYPE:
|
result |
The result of the job execution.
TYPE:
|
error |
Any exception that occurred during job execution.
TYPE:
|
Examples:
>>> def my_task(x, y):
... return x + y
>>> def on_start(job):
... print(f"Starting job {job.name}")
>>> def on_done(job):
... print(f"Job {job.name} completed with result: {job.result}")
>>> job = Job(
... target=my_task,
... start_callback=on_start,
... done_callback=on_done,
... args=[5, 10],
... name="addition_job"
... )
Source code in nextpipe/threads.py
run
¶
Execute the job's target function.
Executes the target function with the provided arguments (if any) and sets the done flag to True. The result of the execution is stored in the result attribute.
Source code in nextpipe/threads.py
Pool
¶
Pool(max_threads: int = 0)
A thread pool for executing jobs in parallel.
This class provides a thread pool for executing jobs in parallel. The number of threads is limited by the max_threads parameter. Jobs are queued and executed as threads become available.
PARAMETER | DESCRIPTION |
---|---|
|
Maximum number of threads to use, by default 0 (uses CPU count).
TYPE:
|
ATTRIBUTE | DESCRIPTION |
---|---|
max_threads |
Maximum number of threads in the pool.
TYPE:
|
counter |
Counter used to assign unique IDs to threads.
TYPE:
|
waiting |
Dictionary of jobs waiting to be executed, keyed by thread ID.
TYPE:
|
running |
Dictionary of running thread objects, keyed by thread ID.
TYPE:
|
error |
Any exception that occurred during job execution.
TYPE:
|
lock |
Lock for thread synchronization.
TYPE:
|
cond |
Condition variable for thread synchronization.
TYPE:
|
Examples:
>>> def task(x):
... return x * 2
>>> def on_start(job):
... print(f"Starting job")
>>> def on_done(job):
... print(f"Result: {job.result}")
>>> pool = Pool(max_threads=4)
>>> for i in range(10):
... job = Job(
... target=task,
... start_callback=on_start,
... done_callback=on_done,
... args=[i]
... )
... pool.run(job)
>>> pool.join() # Wait for all jobs to finish
Initialize a thread pool.
PARAMETER | DESCRIPTION |
---|---|
|
Maximum number of threads to use, by default 0. If <= 0, uses the number of CPU cores.
TYPE:
|
Source code in nextpipe/threads.py
join
¶
Wait for all jobs to finish.
This method blocks until all submitted jobs have completed execution. It's useful to ensure all parallelized work is completed before proceeding.
Examples:
>>> pool = Pool(max_threads=4)
>>> # Submit jobs to the pool
>>> for i in range(10):
... job = Job(...)
... pool.run(job)
>>> # Wait for all jobs to complete
>>> pool.join()
>>> print("All jobs completed")
Source code in nextpipe/threads.py
run
¶
Submit a job to the thread pool for execution.
The job is first placed in the waiting queue and then assigned to a worker thread when one becomes available. If all threads are busy, the method will wait until a thread becomes available.
PARAMETER | DESCRIPTION |
---|---|
|
The job to be executed.
TYPE:
|
Notes
Jobs are executed in the order they are submitted, but the completion order may vary depending on execution time.
Source code in nextpipe/threads.py
wait_one
¶
Wait for one job to finish.
This method blocks until at least one job completes execution. It's useful when you want to process completed jobs as they finish without waiting for all jobs to complete.
Source code in nextpipe/threads.py
thread_local
module-attribute
¶
Thread-local storage.
This variable provides thread-specific storage that can be used to store data that is specific to a particular thread.