Reference
This project’s aim is to use the lightest-weight, lowest-overhead, lowest-latency method to achieve parallelism of arbitrary Python code, and to make it natively async for Trio. Given that Python (and CPython in particular) has ongoing difficulties parallelizing CPU-bound work in threads, this package dispatches synchronous function execution to subprocesses. However, this project is not fundamentally constrained by that, and will consider subinterpreters or any other avenue as they become available.
Running CPU-bound functions in parallel
The main interface for trio-parallel is run_sync():
- await trio_parallel.run_sync(sync_fn: Callable[..., T], *args, kill_on_cancel: bool = False, cancellable: bool = False, limiter: CapacityLimiter = None) -> T
Run sync_fn(*args) in a separate process and return/raise its outcome.
This function is intended to enable the following:
Circumventing the GIL to run CPU-bound functions in parallel
Making blocking APIs or infinite loops truly cancellable through SIGKILL/TerminateProcess without leaking resources
Protecting the main process from unstable/crashy code
Currently, this is a wrapping of multiprocessing.Process that follows the API of trio.to_thread.run_sync(). Other multiprocessing features may work but are not officially supported, and all the normal multiprocessing caveats apply. To customize worker behavior, use open_worker_context().
The underlying workers are cached LIFO and reused to minimize latency. Global state of the workers is not stable between and across calls.
- Parameters:
sync_fn – An importable or pickleable synchronous callable. See the multiprocessing documentation for a detailed explanation of limitations.
*args – Positional arguments to pass to sync_fn. If you need keyword arguments, use functools.partial().
kill_on_cancel (bool) – Whether to allow cancellation of this operation. Cancellation always involves abrupt termination of the worker process with SIGKILL/TerminateProcess. To obtain correct semantics with CTRL+C, SIGINT is ignored when raised in workers.
cancellable (bool) – Alias for kill_on_cancel. If both aliases are passed, Python’s or operator combines them.
limiter (None, or trio.CapacityLimiter) – An object used to limit the number of simultaneous processes. Most commonly this will be a CapacityLimiter, but any async context manager will work.
- Returns:
Any – Whatever sync_fn(*args) returns.
- Raises:
BaseException – Whatever sync_fn(*args) raises.
BrokenWorkerError – Indicates the worker died unexpectedly. Not encountered in normal use.
Note
trio_parallel.run_sync() does not work with functions defined at the REPL or in a Jupyter notebook cell due to the use of the multiprocessing spawn context… unless cloudpickle is also installed!
A minimal program that dispatches work with run_sync() looks like this:
import trio, trio_parallel
from operator import add

async def parallel_add():
    return await trio_parallel.run_sync(add, 1, 2)

# Guard against our workers trying to recursively start workers on startup
if __name__ == '__main__':
    assert add(1, 2) == trio.run(parallel_add) == 3
Just like that, you’ve dispatched a CPU-bound synchronous function to a worker subprocess and returned the result! However, only doing this much is a bit pointless; we are just expending the startup time of a whole Python process to achieve the same result that we could have gotten synchronously. To take advantage, some other task needs to be able to run concurrently:
import trio, trio_parallel, time

async def check_scheduling_latency():
    for _ in range(10):
        t0 = trio.current_time()
        await trio.lowlevel.checkpoint()
        print(trio.current_time() - t0)

async def amain():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(check_scheduling_latency)
        await trio_parallel.run_sync(time.sleep, 1)

if __name__ == "__main__":
    trio.run(amain)
The output of this script indicates that the Trio event loop is running smoothly. Still, this doesn’t demonstrate much advantage over trio.to_thread.run_sync(). You can see for yourself by substituting the function calls, since the call signatures are intentionally identical.
No, trio-parallel really shines when your function has significant CPU-intensive work that regularly involves the Python interpreter:
import trio, trio_parallel, time

def loop(i=0):
    deadline = time.perf_counter() + 1
    # Arbitrary CPU-bound work
    while time.perf_counter() < deadline:
        i += 1
    print("Loops completed:", i)

async def amain():
    async with trio.open_nursery() as nursery:
        for i in range(4):
            nursery.start_soon(trio_parallel.run_sync, loop)

if __name__ == "__main__":
    trio.run(amain)
This script should output a roughly equal number of loops completed for each process, as opposed to the lower and unbalanced number you might observe using threads.
As with Trio threads, these processes are cached to minimize latency and resource usage. Despite this, executing a function in a process can take orders of magnitude longer than in a thread when dealing with large arguments or a cold cache.
import trio, trio_parallel

async def amain():
    t0 = trio.current_time()
    await trio_parallel.run_sync(bool)
    t1 = trio.current_time()
    await trio_parallel.run_sync(bool)
    t2 = trio.current_time()
    await trio_parallel.run_sync(bytearray, 10**8)
    t3 = trio.current_time()
    print("Cold cache latency:", t1-t0)
    print("Warm cache latency:", t2-t1)
    print("IPC latency/MB:", (t3-t2)/10**2)

if __name__ == '__main__':
    trio.run(amain)
Therefore, we recommend avoiding worker process dispatch for synchronous functions with an expected duration of less than about 1 ms.
Controlling Concurrency
By default, trio-parallel will cache as many workers as the system has CPUs (as reported by os.cpu_count()), allowing fair, maximal, truly parallel dispatch of CPU-bound work in the vast majority of cases. There are two ways to modify this behavior. The first is the limiter argument of run_sync(), which permits you to limit the concurrency of a specific function dispatch. The second is to modify the default limiter, which will affect all run_sync() calls that do not pass their own limiter.
- trio_parallel.current_default_worker_limiter()
Get the default CapacityLimiter used by trio_parallel.run_sync().
The most common reason to call this would be if you want to modify its total_tokens attribute. This attribute is initialized to the number of CPUs reported by os.cpu_count().
Cancellation and Exceptions
Unlike threads, subprocesses are strongly isolated from the parent process, which allows two important features that cannot be portably implemented in threads:
Forceful cancellation: a deadlocked call or infinite loop can be cancelled by completely terminating the process.
Protection from errors: if a call segfaults or an extension module has an unrecoverable error, the worker may die but the main process will raise a normal Python exception.
Cancellation
Cancellation of trio_parallel.run_sync() is modeled after trio.to_thread.run_sync(), with a kill_on_cancel keyword argument that defaults to False. Entry is an unconditional checkpoint, i.e. regardless of the value of kill_on_cancel. The key difference in behavior comes upon cancellation when kill_on_cancel=True. A Trio thread will be abandoned to run in the background, while this package will kill the worker with SIGKILL/TerminateProcess:
import trio, trio_parallel, time

def hello_delayed_world():
    print("Hello")
    time.sleep(1.0)
    print("world!")

async def amain():
    # warm up thread/process caches
    await trio_parallel.run_sync(bool)
    await trio.to_thread.run_sync(bool)
    with trio.move_on_after(0.5):
        await trio_parallel.run_sync(hello_delayed_world, kill_on_cancel=True)
    with trio.move_on_after(0.5):
        await trio.to_thread.run_sync(hello_delayed_world, abandon_on_cancel=True)
    # grace period for abandoned thread
    await trio.sleep(0.6)

if __name__ == "__main__":
    trio.run(amain)
We recommend avoiding the kill_on_cancel feature if loss of intermediate results, writes to the filesystem, or shared memory writes may leave the larger system in an incoherent state.
Exceptions
- exception trio_parallel.BrokenWorkerError
Raised when a worker fails or dies unexpectedly.
This error is not typically encountered in normal use, and indicates a severe failure of either trio-parallel or the code that was executing in the worker. Some example failures may include segfaults, being killed by an external signal, or failing to cleanly shut down within a specified grace_period. (See configure_default_context() and open_worker_context().)
Signal Handling
This library configures worker processes to ignore SIGINT to have correct semantics when you hit CTRL+C, but all other signal handlers are left in Python’s default state. This can have surprising consequences if you handle signals in the main process, as the workers are in the same process group but do not share the same signal handlers. For example, suppose you handle SIGTERM in the main process to achieve a graceful shutdown of a service, and the signal is delivered to the whole process group. In that case, a spurious BrokenWorkerError will be raised by any running calls to run_sync(). You will either need to handle the exceptions, change the method you use to send signals, or configure the workers to handle signals at initialization using the tools in the next section.
Configuring workers
By default, trio_parallel.run_sync() draws workers from a global cache that is shared across sequential and between concurrent trio.run() calls, with workers’ lifetimes limited to the life of the main process. This can be configured with configure_default_context():
- trio_parallel.configure_default_context(idle_timeout=600.0, init=bool, retire=bool, grace_period=30.0, worker_type=WorkerType.SPAWN)
Configure the default WorkerContext parameters associated with run_sync().
- Parameters:
idle_timeout (float) – The time in seconds an idle worker will wait for a CPU-bound job before shutting down and releasing its own resources. Pass math.inf to wait forever. MUST be non-negative.
init (Callable[[], bool]) – An object to call within the worker before waiting for jobs. This is suitable for initializing worker state so that such stateful logic does not need to be included in functions passed to trio_parallel.run_sync(). MUST be callable without arguments.
retire (Callable[[], bool]) – An object to call within the worker after executing a CPU-bound job. The return value indicates whether the worker should be retired (shut down). By default, workers are never retired. Within a single worker, the process-global environment is stable between calls. Among other things, that means that storing state in global variables works. MUST be callable without arguments.
grace_period (float) – The time in seconds to wait in the atexit handler for workers to exit before issuing SIGKILL/TerminateProcess and raising BrokenWorkerError. Pass math.inf to wait forever. MUST be non-negative.
worker_type (WorkerType) – The kind of worker to create, see WorkerType.
- Raises:
RuntimeError – if this function is called outside the main thread.
This covers most use cases, but for the many edge cases, open_worker_context() yields a WorkerContext object whose WorkerContext.run_sync() method pulls workers from an isolated cache, with behavior specified by the arguments to open_worker_context(). It is only advised to use this if specific control over worker type, state, or lifetime is required in a subset of your application.
- async with trio_parallel.open_worker_context(idle_timeout=600.0, init=bool, retire=bool, grace_period=30.0, worker_type=WorkerType.SPAWN) as ctx
Create a new, customized worker context with isolated workers.
The context will automatically wait for any running workers to become idle when exiting the scope. Since this wait cannot be cancelled, it is more convenient to only pass the context object to tasks that cannot outlive the scope, for example, by using a Nursery.
- Parameters:
idle_timeout (float) – The time in seconds an idle worker will wait for a CPU-bound job before shutting down and releasing its own resources. Pass math.inf to wait forever. MUST be non-negative.
init (Callable[[], bool]) – An object to call within the worker before waiting for jobs. This is suitable for initializing worker state so that such stateful logic does not need to be included in functions passed to WorkerContext.run_sync(). MUST be callable without arguments.
retire (Callable[[], bool]) – An object to call within the worker after executing a CPU-bound job. The return value indicates whether the worker should be retired (shut down). By default, workers are never retired. Within a single worker, the process-global environment is stable between calls. Among other things, that means that storing state in global variables works. MUST be callable without arguments.
grace_period (float) – The time in seconds to wait in __aexit__ for workers to exit before issuing SIGKILL/TerminateProcess and raising BrokenWorkerError. Pass math.inf to wait forever. MUST be non-negative.
worker_type (WorkerType) – The kind of worker to create, see WorkerType.
- Raises:
ValueError | TypeError – if an invalid value is passed for an argument, such as a negative timeout.
BrokenWorkerError – if a worker does not shut down cleanly when exiting the scope.
Warning
The callables passed to retire MUST NOT raise! Doing so will result in a BrokenWorkerError at an indeterminate future WorkerContext.run_sync() call.
- class trio_parallel.WorkerContext
A reification of a context where workers have a custom configuration.
Instances of this class are to be created using open_worker_context(), and cannot be directly instantiated. The arguments to open_worker_context() that created an instance are available for inspection as read-only attributes.
This class provides a statistics() method, which returns an object with the following fields:
idle_workers: The number of live workers currently stored in the context’s cache.
running_workers: The number of workers currently executing jobs.
- await run_sync(sync_fn: Callable[..., T], *args, kill_on_cancel: bool = False, cancellable: bool = False, limiter: CapacityLimiter = None) -> T
Run sync_fn(*args) in a separate process and return/raise its outcome.
Behaves according to the customized attributes of the context. See trio_parallel.run_sync() for details.
- Raises:
trio.ClosedResourceError – if this method is run on a closed context
Alternatively, you can implicitly override the default context of run_sync() in any subset of the task tree using cache_scope(). This async context manager sets an internal TreeVar so that the current task and all nested subtasks operate using an internal, isolated WorkerContext, without having to manually pass a context object around.
- async with trio_parallel.cache_scope(idle_timeout=600.0, init=bool, retire=bool, grace_period=30.0, worker_type=WorkerType.SPAWN)
Override the configuration of trio_parallel.run_sync() in this task and all subtasks.
The internal WorkerContext is passed implicitly down the task tree and can be overridden by nested scopes. Explicit WorkerContext objects from open_worker_context() will not be overridden.
- Parameters:
idle_timeout (float) – The time in seconds an idle worker will wait for a CPU-bound job before shutting down and releasing its own resources. Pass math.inf to wait forever. MUST be non-negative.
init (Callable[[], bool]) – An object to call within the worker before waiting for jobs. This is suitable for initializing worker state so that such stateful logic does not need to be included in functions passed to trio_parallel.run_sync(). MUST be callable without arguments.
retire (Callable[[], bool]) – An object to call within the worker after executing a CPU-bound job. The return value indicates whether the worker should be retired (shut down). By default, workers are never retired. Within a single worker, the process-global environment is stable between calls. Among other things, that means that storing state in global variables works. MUST be callable without arguments.
grace_period (float) – The time in seconds to wait in __aexit__ for workers to exit before issuing SIGKILL/TerminateProcess and raising BrokenWorkerError. Pass math.inf to wait forever. MUST be non-negative.
worker_type (WorkerType) – The kind of worker to create, see WorkerType.
- Raises:
ValueError | TypeError – if an invalid value is passed for an argument, such as a negative timeout.
BrokenWorkerError – if a worker does not shut down cleanly when exiting the scope.
Warning
The callables passed to retire MUST NOT raise! Doing so will result in a BrokenWorkerError at an indeterminate future trio_parallel.run_sync() call.
One typical use case for configuring workers is to set a policy for taking a worker out of service. For this, use the retire argument. This example shows how to build (trivial) stateless and stateful worker retirement policies.
import trio, trio_parallel, os

def worker(i):
    print(i, "hello from", os.getpid())

def after_single_use():
    return True

WORKER_HAS_BEEN_USED = False

def after_dual_use():
    global WORKER_HAS_BEEN_USED
    if WORKER_HAS_BEEN_USED:
        return True  # retire
    else:
        WORKER_HAS_BEEN_USED = True
        return False  # don't retire... YET

async def amain():
    trio_parallel.current_default_worker_limiter().total_tokens = 4

    print("single use worker behavior:")
    async with trio_parallel.open_worker_context(retire=after_single_use) as ctx:
        async with trio.open_nursery() as nursery:
            for i in range(10):
                nursery.start_soon(ctx.run_sync, worker, i)

    print("dual use worker behavior:")
    async with trio_parallel.cache_scope(retire=after_dual_use):
        async with trio.open_nursery() as nursery:
            for i in range(10):
                nursery.start_soon(trio_parallel.run_sync, worker, i)

    print("default behavior:")
    async with trio.open_nursery() as nursery:
        for i in range(10):
            nursery.start_soon(trio_parallel.run_sync, worker, i)

if __name__ == "__main__":
    trio.run(amain)
A more realistic use-case might examine the worker process’s memory usage (e.g. with psutil) and retire if usage is too high.
If you are retiring workers frequently, like in the single-use case, a large amount of process startup overhead will be incurred with the default “spawn” worker type. If your platform supports it, an alternate WorkerType might cut that overhead down.
- class trio_parallel.WorkerType
An Enum of available kinds of workers.
Instances of this Enum can be passed to open_worker_context() or configure_default_context() to customize worker startup behavior.
Currently, these correspond to the values of multiprocessing.get_all_start_methods(), which vary by platform. WorkerType.SPAWN is the default and is supported on all platforms. WorkerType.FORKSERVER is available on POSIX platforms and could be an optimization if workers need to be killed/restarted often. WorkerType.FORK is available on POSIX for experimentation, but not recommended.
Internal Esoterica
You probably won’t use these… but create an issue if you do and need help!
- trio_parallel.default_context_statistics()
Return the statistics corresponding to the default context.
Because the default context used by trio_parallel.run_sync() is a private implementation detail, this function serves to provide public access to the default context statistics object.
Note
The statistics are only eventually consistent in the case of multiple Trio threads concurrently using trio_parallel.run_sync().