Reference
This project’s aim is to use the lightest-weight, lowest-overhead, lowest-latency method to achieve parallelism of arbitrary Python code, and to make it natively async for Trio. Given that Python (and CPython in particular) has ongoing difficulties parallelizing CPU-bound work in threads, this package dispatches synchronous function execution to subprocesses. However, this project is not fundamentally constrained by that, and it will consider subinterpreters, or any other avenue, as they become available.
Running CPU-bound functions in parallel
The main interface for trio-parallel is run_sync():
-
await trio_parallel.run_sync(sync_fn, *args, cancellable=False, limiter=None)
Run sync_fn(*args) in a separate process and return/raise its outcome.
This function is intended to enable the following:
- Circumventing the GIL to run CPU-bound functions in parallel
- Making blocking APIs or infinite loops truly cancellable through SIGKILL/TerminateProcess without leaking resources
- Protecting the main process from unstable/crashy code
Currently, this is a wrapping of multiprocessing.Process that follows the API of trio.to_thread.run_sync(). Other multiprocessing features may work but are not officially supported, and all the normal multiprocessing caveats apply. To customize worker behavior, use open_worker_context().
The underlying workers are cached LIFO and reused to minimize latency. Global state of the workers is not stable between and across calls.
- Parameters
sync_fn – An importable or pickleable synchronous callable. See the multiprocessing documentation for a detailed explanation of limitations.
*args – Positional arguments to pass to sync_fn. If you need keyword arguments, use functools.partial().
cancellable (bool) – Whether to allow cancellation of this operation. Cancellation always involves abrupt termination of the worker process with SIGKILL/TerminateProcess. To obtain correct semantics with CTRL+C, SIGINT is ignored when raised in workers.
limiter (None, or trio.CapacityLimiter) – An object used to limit the number of simultaneous processes. Most commonly this will be a CapacityLimiter, but any async context manager will succeed.
- Returns
Any – Whatever sync_fn(*args) returns.
- Raises
BrokenWorkerError – if the worker dies or is killed while running sync_fn.
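For example, since run_sync() only forwards positional arguments, a keyword argument must be bound ahead of time with functools.partial(), as noted above. A minimal sketch (greet is a hypothetical function used only for illustration):

import functools
import trio, trio_parallel

def greet(name, punctuation="!"):
    return "Hello, " + name + punctuation

async def kwargs_example():
    # functools.partial() binds the keyword argument before dispatch
    fn = functools.partial(greet, punctuation="?")
    return await trio_parallel.run_sync(fn, "world")

if __name__ == "__main__":
    assert trio.run(kwargs_example) == "Hello, world?"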
A minimal program that dispatches work with run_sync() looks like this:
import trio, trio_parallel
from operator import add

async def parallel_add():
    return await trio_parallel.run_sync(add, 1, 2)

# Guard against our workers trying to recursively start workers on startup
if __name__ == '__main__':
    assert add(1, 2) == trio.run(parallel_add) == 3
Just like that, you’ve dispatched a CPU-bound synchronous function to a worker subprocess and returned the result! However, only doing this much is a bit pointless; we are just expending the startup time of a whole Python process to achieve the same result that we could have gotten synchronously. To take advantage, some other task needs to be able to run concurrently:
import trio, trio_parallel, time

async def check_scheduling_latency():
    for _ in range(10):
        t0 = trio.current_time()
        await trio.lowlevel.checkpoint()
        print(trio.current_time() - t0)

async def amain():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(check_scheduling_latency)
        await trio_parallel.run_sync(time.sleep, 1)

if __name__ == "__main__":
    trio.run(amain)
The output of this script indicates that the Trio event loop is running smoothly. Still, this doesn’t demonstrate much advantage over trio.to_thread.run_sync(). You can see for yourself by substituting the function calls, since the call signatures are intentionally identical.
No, trio-parallel really shines when your function has significant CPU-intensive work that regularly involves the Python interpreter:
import trio, trio_parallel, time

def loop(i=0):
    deadline = time.perf_counter() + 1
    # Arbitrary CPU-bound work
    while time.perf_counter() < deadline:
        i += 1
    print("Loops completed:", i)

async def amain():
    async with trio.open_nursery() as nursery:
        for i in range(4):
            nursery.start_soon(trio_parallel.run_sync, loop)

if __name__ == "__main__":
    trio.run(amain)
This script should output a roughly equal number of loops completed for each process,
as opposed to the lower and unbalanced number you might observe using threads.
As with Trio threads, these processes are cached to minimize latency and resource
usage. Despite this, executing a function in a process can take orders of magnitude
longer than in a thread when dealing with large arguments or a cold cache.
import trio, trio_parallel

async def amain():
    t0 = trio.current_time()
    await trio_parallel.run_sync(bool)
    t1 = trio.current_time()
    await trio_parallel.run_sync(bool)
    t2 = trio.current_time()
    await trio_parallel.run_sync(bytearray, 10**8)
    t3 = trio.current_time()
    print("Cold cache latency:", t1-t0)
    print("Warm cache latency:", t2-t1)
    print("IPC latency/MB:", (t3-t2)/10**2)

if __name__ == '__main__':
    trio.run(amain)
Therefore, we recommend avoiding worker process dispatch
for synchronous functions with an expected duration of less than about 1 ms.
Controlling Concurrency
By default, trio-parallel will cache as many workers as the system has CPUs (as reported by os.cpu_count()), allowing fair, maximal, truly-parallel dispatch of CPU-bound work in the vast majority of cases. There are two ways to modify this behavior. The first is the limiter argument of run_sync(), which permits you to limit the concurrency of a specific function dispatch. In some cases, it may be useful to modify the default limiter, which will affect all run_sync() calls.
-
trio_parallel.current_default_worker_limiter()
Get the default CapacityLimiter used by trio_parallel.run_sync().
The most common reason to call this would be if you want to modify its total_tokens attribute. This attribute is initialized to the number of CPUs reported by os.cpu_count().
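For example, the two approaches might be combined like this (a sketch; the token counts are arbitrary, and functools.partial() is used because nursery.start_soon() only forwards positional arguments):

import functools
import trio, trio_parallel, time

async def amain():
    # Raise the global default shared by all run_sync() calls...
    trio_parallel.current_default_worker_limiter().total_tokens = 8
    # ...while capping this particular batch of dispatches at 2 at a time
    limiter = trio.CapacityLimiter(2)
    async with trio.open_nursery() as nursery:
        for _ in range(4):
            nursery.start_soon(functools.partial(
                trio_parallel.run_sync, time.sleep, 1, limiter=limiter))

if __name__ == "__main__":
    trio.run(amain)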
Cancellation and Exceptions
Unlike threads, subprocesses are strongly isolated from the parent process, which allows two important features that cannot be portably implemented in threads:
- Forceful cancellation: a deadlocked call or infinite loop can be cancelled by completely terminating the process.
- Protection from errors: if a call segfaults or an extension module has an unrecoverable error, the worker may die but the main process will raise a normal Python exception.
Cancellation
Cancellation of trio_parallel.run_sync() is modeled after trio.to_thread.run_sync(), with a cancellable keyword argument that defaults to False. Entry is an unconditional checkpoint, i.e. regardless of the value of cancellable. The only difference in behavior comes upon cancellation when cancellable=True. A Trio thread will be abandoned to run in the background, while this package will kill the worker with SIGKILL/TerminateProcess:
import trio, trio_parallel, time

def hello_delayed_world():
    print("Hello")
    time.sleep(1.0)
    print("world!")

async def amain():
    # warm up thread/process caches
    await trio_parallel.run_sync(bool)
    await trio.to_thread.run_sync(bool)
    with trio.move_on_after(0.5):
        await trio_parallel.run_sync(hello_delayed_world, cancellable=True)
    with trio.move_on_after(0.5):
        await trio.to_thread.run_sync(hello_delayed_world, cancellable=True)
    # grace period for abandoned thread
    await trio.sleep(0.6)

if __name__ == '__main__':
    trio.run(amain)
We recommend avoiding the cancellation feature if loss of intermediate results, writes to the filesystem, or shared-memory writes may leave the larger system in an incoherent state.
Exceptions
-
exception trio_parallel.BrokenWorkerError
Raised when a worker fails or dies unexpectedly.
This error is not typically encountered in normal use, and indicates a severe failure of either trio-parallel or the code that was executing in the worker. Some example failures may include segfaults, being killed by an external signal, or failing to cleanly shut down within a specified grace_period. (See atexit_shutdown_grace_period() and open_worker_context().)
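A sketch of catching this error, using os._exit() to make the worker die abruptly mid-job (a stand-in for a real segfault or external kill; that such a death surfaces as BrokenWorkerError follows from the description above):

import os
import trio, trio_parallel

def crash():
    # Abruptly terminate the worker process without any cleanup
    os._exit(1)

async def amain():
    try:
        await trio_parallel.run_sync(crash)
    except trio_parallel.BrokenWorkerError:
        print("the worker died, but the main process carries on")

if __name__ == "__main__":
    trio.run(amain)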
Configuring workers
By default, trio_parallel.run_sync() draws workers from a global cache that is shared across sequential and between concurrent trio.run() calls, with workers’ lifetimes limited to the life of the main process. This covers most use cases, but for the many edge cases, open_worker_context() yields a WorkerContext object on which WorkerContext.run_sync() pulls workers from an isolated cache with behavior specified by the class arguments. It is only advised to use this if specific control over worker type, state, or lifetime is required.
-
async with trio_parallel.open_worker_context(idle_timeout=600.0, init=<class 'bool'>, retire=<class 'bool'>, grace_period=30.0, worker_type=WorkerType.SPAWN) as ctx
Create a new, customized worker context with isolated workers.
The context will automatically wait for any running workers to become idle when exiting the scope. Since this wait cannot be cancelled, it is more convenient to only pass the context object to tasks that cannot outlive the scope, for example, by using a Nursery.
- Parameters
idle_timeout (float) – The time in seconds an idle worker will wait for a CPU-bound job before shutting down and releasing its own resources. Pass math.inf to wait forever. MUST be non-negative.
init (Callable[[], bool]) – An object to call within the worker before waiting for jobs. This is suitable for initializing worker state so that such stateful logic does not need to be included in functions passed to WorkerContext.run_sync(). MUST be callable without arguments.
retire (Callable[[], bool]) – An object to call within the worker after executing a CPU-bound job. The return value indicates whether the worker should be retired (shut down). By default, workers are never retired. The process-global environment is stable between calls. Among other things, that means that storing state in global variables works. MUST be callable without arguments.
grace_period (float) – The time in seconds to wait when closing for workers to exit before issuing SIGKILL/TerminateProcess and raising BrokenWorkerError. Pass math.inf to wait forever. MUST be non-negative.
worker_type (WorkerType) – The kind of worker to create; see WorkerType.
- Raises
ValueError | TypeError – if an invalid value is passed for an argument, such as a negative timeout.
BrokenWorkerError – if a worker does not shut down cleanly when exiting the scope.
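A sketch of the init hook: because the process-global environment is stable between calls, state built once in init (DATA here is a hypothetical module-level variable) remains visible to later jobs in the same worker:

import os
import trio, trio_parallel

DATA = None  # hypothetical worker-global state

def init():
    global DATA
    # Expensive one-time setup, e.g. loading a model or opening a connection
    DATA = {"pid": os.getpid()}

def job():
    return DATA["pid"]

async def amain():
    async with trio_parallel.open_worker_context(init=init) as ctx:
        print("worker pid:", await ctx.run_sync(job))

if __name__ == "__main__":
    trio.run(amain)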
-
class trio_parallel.WorkerContext
A reification of a context where workers have a custom configuration.
Instances of this class are to be created using open_worker_context(), and cannot be directly instantiated. The arguments to open_worker_context() that created an instance are available for inspection as read-only attributes.
This class provides a statistics() method, which returns an object whose fields describe the workers of this context.
-
await run_sync(sync_fn, *args, cancellable=False, limiter=None)
Run sync_fn(*args) in a separate process and return/raise its outcome. Behaves according to the customized attributes of the context. See trio_parallel.run_sync() for details.
- Raises
trio.ClosedResourceError – if this method is run on a closed context
One typical use case for configuring workers is to set a policy for taking a worker out of service. For this, use the retire argument. This example shows how to build (trivial) stateless and stateful worker retirement policies.
import trio, trio_parallel, os

def worker(i):
    print(i, "hello from", os.getpid())

def after_single_use():
    return True

WORKER_HAS_BEEN_USED = False

def after_dual_use():
    global WORKER_HAS_BEEN_USED
    if WORKER_HAS_BEEN_USED:
        return True  # retire
    else:
        WORKER_HAS_BEEN_USED = True
        return False  # don't retire... YET

async def amain():
    trio_parallel.current_default_worker_limiter().total_tokens = 4
    print("single use worker behavior:")
    async with trio_parallel.open_worker_context(retire=after_single_use) as ctx:
        async with trio.open_nursery() as nursery:
            for i in range(10):
                nursery.start_soon(ctx.run_sync, worker, i)
    print("dual use worker behavior:")
    async with trio_parallel.open_worker_context(retire=after_dual_use) as ctx:
        async with trio.open_nursery() as nursery:
            for i in range(10):
                nursery.start_soon(ctx.run_sync, worker, i)
    print("default behavior:")
    async with trio.open_nursery() as nursery:
        for i in range(10):
            nursery.start_soon(trio_parallel.run_sync, worker, i)

if __name__ == "__main__":
    trio.run(amain)
A more realistic use-case might examine the worker process’s memory usage (e.g. with
psutil) and retire if usage is too high.
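A minimal sketch of such a retire callback, assuming the third-party psutil package is installed (retire_if_bloated and MAX_RSS are hypothetical names, and the 2 GB threshold is arbitrary):

import os
import psutil  # third-party; assumed installed

MAX_RSS = 2 * 10**9  # hypothetical budget: retire workers above ~2 GB

def retire_if_bloated():
    # Runs inside the worker after each job; returning True retires it
    rss = psutil.Process(os.getpid()).memory_info().rss
    return rss > MAX_RSS

Pass it to open_worker_context(retire=retire_if_bloated), just like the trivial policies above.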
If you are retiring workers frequently, like in the single-use case, a large amount
of process startup overhead will be incurred with the default worker type. If your
platform supports it, an alternate WorkerType
might cut that overhead down.
-
class trio_parallel.WorkerType
An Enum of available kinds of workers.
Instances of this Enum can be passed to open_worker_context() to customize worker startup behavior. Currently, these correspond to the values of multiprocessing.get_all_start_methods(), which vary by platform.
- WorkerType.SPAWN is the default and is supported on all platforms.
- WorkerType.FORKSERVER is available on POSIX platforms and could be an optimization if workers need to be killed/restarted often.
- WorkerType.FORK is available on POSIX for experimentation, but not recommended.
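For example, a context that retires workers often might opt into the forkserver method on a POSIX platform (a sketch; whether it actually reduces overhead depends on your workload):

import trio, trio_parallel

async def amain():
    # POSIX-only: forkserver workers skip most interpreter startup
    async with trio_parallel.open_worker_context(
            worker_type=trio_parallel.WorkerType.FORKSERVER) as ctx:
        await ctx.run_sync(bool)

if __name__ == "__main__":
    trio.run(amain)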
Internal Esoterica
You probably won’t use these… but create an issue if you do and need help!
-
trio_parallel.atexit_shutdown_grace_period(grace_period=-1.0)
Return and optionally set the default worker cache shutdown grace period.
You might need this if you have a long-running atexit function, such as those installed by coverage.py or viztracer.
This only affects the atexit behavior of the default context corresponding to trio_parallel.run_sync(). Existing and future WorkerContext instances are unaffected.
- Parameters
grace_period (float) – The time in seconds to wait for workers to exit before issuing SIGKILL/TerminateProcess and raising BrokenWorkerError. Pass math.inf to wait forever. Pass a negative value or no argument to return the current value without modifying it.
- Returns
float – The current grace period in seconds.
Note
This function is subject to threading race conditions.
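Typical usage is a simple get or set before trio.run() (the 60-second value here is arbitrary):

import trio_parallel

# Read the current grace period without modifying it
print(trio_parallel.atexit_shutdown_grace_period())
# Extend it to accommodate a slow atexit hook such as coverage.py
trio_parallel.atexit_shutdown_grace_period(60.0)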
-
trio_parallel.default_context_statistics()
Return the statistics corresponding to the default context.
Because the default context used by trio_parallel.run_sync is a private implementation detail, this function serves to provide public access to the default context statistics object.
Note
The statistics are only eventually consistent in the case of multiple trio threads concurrently using trio_parallel.run_sync.
Release history
trio-parallel 1.0.0 (2021-12-04)
Bugfixes
Fixed a hang on failed worker subprocess spawns that mostly occurred upon
accidental multiprocessing recursive spawn. (#167)
Fixed a hang on Windows when trying to use WorkerContext.run_sync() in sequential and concurrent Trio runs. (#171)
trio-parallel 1.0.0b0 (2021-11-12)
With this release I consider the project “feature complete”.
trio-parallel 1.0.0a2 (2021-10-08)
trio-parallel 1.0.0a1 (2021-09-05)
trio-parallel 1.0.0a0 (2021-07-22)
trio-parallel 0.5.1 (2021-05-05)
trio-parallel 0.5.0 (2021-05-02)
Bugfixes
Workers are now fully synchronized with only pipe/channel-like objects, making it impossible to leak semaphores. (#33)
Fix a regression of a rare race condition where idle workers shut down cleanly but appear broken. (#43)
Ensure a clean worker shutdown if IPC pipes are closed (#51)
trio-parallel 0.4.0 (2021-03-25)
trio-parallel 0.3.0 (2021-02-21)
Bugfixes
Fixed an underlying race condition in IPC. Not a critical bugfix, as it should not be triggered in practice. (#15)
Reduce the production of zombie children on Unix systems (#20)
Close internal race condition when waiting for subprocess exit codes on macOS. (#23)
Avoid a race condition leading to deadlocks when a worker process is killed right after receiving work. (#25)
trio-parallel 0.2.0 (2021-02-02)
Bugfixes
Changed subprocess context to explicitly always spawn new processes (#5)
Changed synchronization scheme to achieve full passing tests on:
- Windows, Linux, MacOS
- CPython 3.6, 3.7, 3.8, 3.9
- Pypy 3.6, 3.7, 3.7-nightly
Note: Pypy on Windows is not supported here or by Trio (#10)