Reference

This project’s aim is to use the lightest-weight, lowest-overhead, lowest-latency method to achieve parallelism of arbitrary Python code, and to make it natively async for Trio. Given that Python (and CPython in particular) has ongoing difficulties parallelizing CPU-bound work in threads, this package dispatches synchronous function execution to subprocesses. However, this project is not fundamentally constrained to that approach, and will consider subinterpreters or any other avenue as they become available.

Running CPU-bound functions in parallel

The main interface for trio-parallel is run_sync():

await trio_parallel.run_sync(sync_fn: Callable[[...], T], *args, cancellable: bool = False, limiter: CapacityLimiter | None = None) -> T

Run sync_fn(*args) in a separate process and return/raise its outcome.

This function is intended to enable the following:

  • Circumventing the GIL to run CPU-bound functions in parallel

  • Making blocking APIs or infinite loops truly cancellable through SIGKILL/TerminateProcess without leaking resources

  • Protecting the main process from unstable/crashy code

Currently, this is a wrapping of multiprocessing.Process that follows the API of trio.to_thread.run_sync(). Other multiprocessing features may work but are not officially supported, and all the normal multiprocessing caveats apply. To customize worker behavior, use open_worker_context().

The underlying workers are cached LIFO and reused to minimize latency. Global state of the workers is not stable between and across calls.

Parameters:
  • sync_fn – An importable or pickleable synchronous callable. See the multiprocessing documentation for detailed explanation of limitations.

  • *args – Positional arguments to pass to sync_fn. If you need keyword arguments, use functools.partial().

  • cancellable (bool) – Whether to allow cancellation of this operation. Cancellation always involves abrupt termination of the worker process with SIGKILL/TerminateProcess. To obtain correct semantics with CTRL+C, SIGINT is ignored when raised in workers.

  • limiter (None, or trio.CapacityLimiter) – An object used to limit the number of simultaneous processes. Most commonly this will be a CapacityLimiter, but any async context manager will succeed.

Returns:

Any – Whatever sync_fn(*args) returns.

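Raises:

BrokenWorkerError – if the worker fails or dies unexpectedly. Not typically encountered in normal use.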

Note

trio_parallel.run_sync() does not work with functions defined at the REPL or in a Jupyter notebook cell due to the use of the multiprocessing spawn context… unless cloudpickle is also installed!

A minimal program that dispatches work with run_sync() looks like this:

import trio, trio_parallel
from operator import add

async def parallel_add():
    return await trio_parallel.run_sync(add, 1, 2)

# Guard against our workers trying to recursively start workers on startup
if __name__ == '__main__':
    assert add(1, 2) == trio.run(parallel_add) == 3

Just like that, you’ve dispatched a CPU-bound synchronous function to a worker subprocess and returned the result! However, doing only this much is a bit pointless; we are just paying the startup cost of a whole Python process to achieve the same result that we could have gotten synchronously. To take advantage of the worker, some other task needs to be able to run concurrently:

import trio, trio_parallel, time

async def check_scheduling_latency():
    for _ in range(10):
        t0 = trio.current_time()
        await trio.lowlevel.checkpoint()
        print(trio.current_time() - t0)

async def amain():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(check_scheduling_latency)
        await trio_parallel.run_sync(time.sleep, 1)

if __name__ == "__main__":
    trio.run(amain)

The output of this script indicates that the Trio event loop is running smoothly. Still, this doesn’t demonstrate much advantage over trio.to_thread.run_sync(). You can see for yourself by substituting the function calls, since the call signatures are intentionally identical.

No, trio-parallel really shines when your function has significant CPU-intensive work that regularly involves the Python interpreter:

import trio, trio_parallel, time

def loop(i=0):
    deadline = time.perf_counter() + 1
    # Arbitrary CPU-bound work
    while time.perf_counter() < deadline:
        i += 1
    print("Loops completed:", i)

async def amain():
    async with trio.open_nursery() as nursery:
        for _ in range(4):
            nursery.start_soon(trio_parallel.run_sync, loop)

if __name__ == "__main__":
    trio.run(amain)

This script should output a roughly equal number of loops completed for each process, as opposed to the lower and unbalanced number you might observe using threads.

As with Trio threads, these processes are cached to minimize latency and resource usage. Despite this, executing a function in a process can take orders of magnitude longer than in a thread when dealing with large arguments or a cold cache.

import trio, trio_parallel

async def amain():
    t0 = trio.current_time()
    await trio_parallel.run_sync(bool)
    t1 = trio.current_time()
    await trio_parallel.run_sync(bool)
    t2 = trio.current_time()
    await trio_parallel.run_sync(bytearray, 10**8)
    t3 = trio.current_time()
    print("Cold cache latency:", t1-t0)
    print("Warm cache latency:", t2-t1)
    print("IPC latency/MB:", (t3-t2)/10**2)

if __name__ == '__main__':
    trio.run(amain)

Therefore, we recommend avoiding worker process dispatch for synchronous functions with an expected duration of less than about 1 ms.

Controlling Concurrency

By default, trio-parallel will cache as many workers as the system has CPUs (as reported by os.cpu_count()), allowing fair, maximal, truly-parallel dispatch of CPU-bound work in the vast majority of cases. There are two ways to modify this behavior. The first is the limiter argument of run_sync(), which permits you to limit the concurrency of a specific function dispatch. The second is to modify the default limiter, which affects all run_sync() calls that do not pass their own limiter.

trio_parallel.current_default_worker_limiter()

Get the default CapacityLimiter used by trio_parallel.run_sync().

The most common reason to call this would be if you want to modify its total_tokens attribute. This attribute is initialized to the number of CPUs reported by os.cpu_count().

Cancellation and Exceptions

Unlike threads, subprocesses are strongly isolated from the parent process, which allows two important features that cannot be portably implemented in threads:

  • Forceful cancellation: a deadlocked call or infinite loop can be cancelled by completely terminating the process.

  • Protection from errors: if a call segfaults or an extension module has an unrecoverable error, the worker may die but the main process will raise a normal Python exception.

Cancellation

Cancellation of trio_parallel.run_sync() is modeled after trio.to_thread.run_sync(), with a cancellable keyword argument that defaults to False. Entry is an unconditional checkpoint, i.e. regardless of the value of cancellable. The only difference in behavior comes upon cancellation when cancellable=True. A Trio thread will be abandoned to run in the background while this package will kill the worker with SIGKILL/TerminateProcess:

import trio, trio_parallel, time

def hello_delayed_world():
    print("Hello")
    time.sleep(1.0)
    print("world!")

async def amain():
    # warm up thread/process caches
    await trio_parallel.run_sync(bool)
    await trio.to_thread.run_sync(bool)

    with trio.move_on_after(0.5):
        await trio_parallel.run_sync(hello_delayed_world, cancellable=True)

    with trio.move_on_after(0.5):
        await trio.to_thread.run_sync(hello_delayed_world, cancellable=True)

    # grace period for abandoned thread
    await trio.sleep(0.6)

if __name__ == '__main__':
    trio.run(amain)

We recommend avoiding the cancellation feature if loss of intermediate results, writes to the filesystem, or shared memory writes may leave the larger system in an incoherent state.

Exceptions

exception trio_parallel.BrokenWorkerError

Raised when a worker fails or dies unexpectedly.

This error is not typically encountered in normal use, and indicates a severe failure of either trio-parallel or the code that was executing in the worker. Some example failures may include segfaults, being killed by an external signal, or failing to cleanly shut down within a specified grace_period. (See atexit_shutdown_grace_period() and open_worker_context().)

Signal Handling

This library configures worker processes to ignore SIGINT to have correct semantics when you hit CTRL+C, but all other signal handlers are left in Python’s default state. This can have surprising consequences if you handle signals in the main process, as the workers are in the same process group but do not share the same signal handlers. For example, if you handle SIGTERM in the main process to achieve a graceful shutdown of a service, a spurious BrokenWorkerError will be raised from any running calls to run_sync(). You will either need to handle the exceptions, change the method you use to send signals, or configure the workers to handle signals at initialization using the tools in the next section.

Configuring workers

By default, trio_parallel.run_sync() draws workers from a global cache that is shared across sequential and between concurrent trio.run() calls, with workers’ lifetimes limited to the life of the main process. This can be configured with configure_default_context():

trio_parallel.configure_default_context(idle_timeout=600.0, init=<class 'bool'>, retire=<class 'bool'>, grace_period=30.0, worker_type=WorkerType.SPAWN)

Configure the default WorkerContext parameters associated with run_sync.

Parameters:
  • idle_timeout (float) – The time in seconds an idle worker will wait for a CPU-bound job before shutting down and releasing its own resources. Pass math.inf to wait forever. MUST be non-negative.

  • init (Callable[[], bool]) – An object to call within the worker before waiting for jobs. This is suitable for initializing worker state so that such stateful logic does not need to be included in functions passed to trio_parallel.run_sync(). MUST be callable without arguments.

  • retire (Callable[[], bool]) – An object to call within the worker after executing a CPU-bound job. The return value indicates whether the worker should be retired (shut down). By default, workers are never retired. The process-global environment is stable between calls. Among other things, that means that storing state in global variables works. MUST be callable without arguments.

  • grace_period (float) – The time in seconds to wait in the atexit handler for workers to exit before issuing SIGKILL/TerminateProcess and raising BrokenWorkerError. Pass math.inf to wait forever. MUST be non-negative.

  • worker_type (WorkerType) – The kind of worker to create, see WorkerType.

Raises:

RuntimeError – if this function is called outside the main thread.

Warning

This function is meant to be used once before any usage of run_sync. Doing otherwise may (on POSIX) result in workers being leaked until the main process ends, or (on Win32) having no effect until the next trio.run!

This covers most use cases, but for the many edge cases, open_worker_context() yields a WorkerContext object on which WorkerContext.run_sync() pulls workers from an isolated cache with behavior specified by the class arguments. It is only advised to use this if specific control over worker type, state, or lifetime is required in a subset of your application.

async with trio_parallel.open_worker_context(idle_timeout=600.0, init=<class 'bool'>, retire=<class 'bool'>, grace_period=30.0, worker_type=WorkerType.SPAWN) as ctx

Create a new, customized worker context with isolated workers.

The context will automatically wait for any running workers to become idle when exiting the scope. Since this wait cannot be cancelled, it is more convenient to only pass the context object to tasks that cannot outlive the scope, for example, by using a Nursery.

Parameters:
  • idle_timeout (float) – The time in seconds an idle worker will wait for a CPU-bound job before shutting down and releasing its own resources. Pass math.inf to wait forever. MUST be non-negative.

  • init (Callable[[], bool]) – An object to call within the worker before waiting for jobs. This is suitable for initializing worker state so that such stateful logic does not need to be included in functions passed to WorkerContext.run_sync(). MUST be callable without arguments.

  • retire (Callable[[], bool]) – An object to call within the worker after executing a CPU-bound job. The return value indicates whether the worker should be retired (shut down). By default, workers are never retired. The process-global environment is stable between calls. Among other things, that means that storing state in global variables works. MUST be callable without arguments.

  • grace_period (float) – The time in seconds to wait in __aexit__ for workers to exit before issuing SIGKILL/TerminateProcess and raising BrokenWorkerError. Pass math.inf to wait forever. MUST be non-negative.

  • worker_type (WorkerType) – The kind of worker to create, see WorkerType.

Raises:
  • ValueError | TypeError – if an invalid value is passed for an argument, such as a negative timeout.

  • BrokenWorkerError – if a worker does not shut down cleanly when exiting the scope.

Warning

The callable passed to retire MUST NOT raise! Doing so will result in a BrokenWorkerError at an indeterminate future WorkerContext.run_sync() call.

class trio_parallel.WorkerContext

A reification of a context where workers have a custom configuration.

Instances of this class are to be created using open_worker_context(), and cannot be directly instantiated. The arguments to open_worker_context() that created an instance are available for inspection as read-only attributes.

This class provides a statistics() method, which returns an object with the following fields:

  • idle_workers: The number of live workers currently stored in the context’s cache.

  • running_workers: The number of workers currently executing jobs.

await run_sync(sync_fn: Callable[[...], T], *args, cancellable: bool = False, limiter: CapacityLimiter = None) -> T

Run sync_fn(*args) in a separate process and return/raise its outcome.

Behaves according to the customized attributes of the context. See trio_parallel.run_sync() for details.

Raises:

trio.ClosedResourceError – if this method is run on a closed context

One typical use case for configuring workers is to set a policy for taking a worker out of service. For this, use the retire argument. This example shows how to build (trivial) stateless and stateful worker retirement policies.

import trio, trio_parallel, os


def worker(i):
    print(i, "hello from", os.getpid())


def after_single_use():
    return True


WORKER_HAS_BEEN_USED = False


def after_dual_use():
    global WORKER_HAS_BEEN_USED
    if WORKER_HAS_BEEN_USED:
        return True  # retire
    else:
        WORKER_HAS_BEEN_USED = True
        return False  # don't retire... YET


async def amain():
    trio_parallel.current_default_worker_limiter().total_tokens = 4

    print("single use worker behavior:")
    async with trio_parallel.open_worker_context(retire=after_single_use) as ctx:
        async with trio.open_nursery() as nursery:
            for i in range(10):
                nursery.start_soon(ctx.run_sync, worker, i)

    print("dual use worker behavior:")
    async with trio_parallel.open_worker_context(retire=after_dual_use) as ctx:
        async with trio.open_nursery() as nursery:
            for i in range(10):
                nursery.start_soon(ctx.run_sync, worker, i)

    print("default behavior:")
    async with trio.open_nursery() as nursery:
        for i in range(10):
            nursery.start_soon(trio_parallel.run_sync, worker, i)


if __name__ == "__main__":
    trio.run(amain)

A more realistic use-case might examine the worker process’s memory usage (e.g. with psutil) and retire if usage is too high.

If you are retiring workers frequently, like in the single-use case, a large amount of process startup overhead will be incurred with the default worker type. If your platform supports it, an alternate WorkerType might cut that overhead down.

class trio_parallel.WorkerType

An Enum of available kinds of workers.

Instances of this Enum can be passed to open_worker_context() or configure_default_context() to customize worker startup behavior.

Currently, these correspond to the values of multiprocessing.get_all_start_methods(), which vary by platform. WorkerType.SPAWN is the default and is supported on all platforms. WorkerType.FORKSERVER is available on POSIX platforms and could be an optimization if workers need to be killed/restarted often. WorkerType.FORK is available on POSIX for experimentation, but not recommended.

Internal Esoterica

You probably won’t use these… but create an issue if you do and need help!

trio_parallel.atexit_shutdown_grace_period(grace_period=30.0)

Set the default worker cache shutdown grace period.

DEPRECATION NOTICE: this function has been superseded by configure_default_context() and will be removed in a future version.

You might need this if you have a long-running atexit function, such as those installed by coverage.py or viztracer. This only affects the atexit behavior of the default context corresponding to trio_parallel.run_sync(). Existing and future WorkerContext instances are unaffected.

Parameters:

grace_period (float) – The time in seconds to wait for workers to exit before issuing SIGKILL/TerminateProcess and raising BrokenWorkerError. Pass math.inf to wait forever.

trio_parallel.default_context_statistics()

Return the statistics corresponding to the default context.

Because the default context used by trio_parallel.run_sync is a private implementation detail, this function serves to provide public access to the default context statistics object.

Note

The statistics are only eventually consistent in the case of multiple trio threads concurrently using trio_parallel.run_sync.