trio-parallel: CPU parallelism for Trio¶
Do you have CPU-bound work that just keeps slowing down your Trio event loop no matter what you try? Do you need to get all those cores humming at once? This is the library for you!
The aim of trio-parallel is to use the lightest-weight, lowest-overhead, lowest-latency method to achieve CPU parallelism of arbitrary Python code with a dead-simple API.
Resources¶
License | Documentation | Chat | Forum | Issues | Repository | Tests | Coverage | Style | Distribution
Example¶
import functools
import multiprocessing

import trio

import trio_parallel


def loop(n):
    # Arbitrary CPU-bound work
    for _ in range(n):
        pass
    print("Loops completed:", n)


async def amain():
    t0 = trio.current_time()
    async with trio.open_nursery() as nursery:
        # Do CPU-bound work in parallel
        for i in [6, 7, 8] * 4:
            nursery.start_soon(trio_parallel.run_sync, loop, 10 ** i)
        # Event loop remains responsive
        t1 = trio.current_time()
        await trio.sleep(0)
        print("Scheduling latency:", trio.current_time() - t1)
        # This job could take far too long, make it cancellable!
        nursery.start_soon(
            functools.partial(
                trio_parallel.run_sync, loop, 10 ** 20, cancellable=True
            )
        )
        await trio.sleep(2)
        # Only explicitly cancellable jobs are killed on cancel
        nursery.cancel_scope.cancel()
    print("Total runtime:", trio.current_time() - t0)


if __name__ == "__main__":
    multiprocessing.freeze_support()
    trio.run(amain)
Additional examples and the full API are available in the documentation.
Features¶
Bypasses the GIL for CPU-bound work
Minimal API complexity: looks and feels like Trio threads
Minimal internal complexity: no reliance on multiprocessing.Pool, ProcessPoolExecutor, or any background threads
Cross-platform
print just works
Seamless interoperation with coverage.py
Automatic LIFO caching of subprocesses
Cancel seriously misbehaving code via SIGKILL/TerminateProcess
Convert segfaults and other scary things to catchable errors
FAQ¶
How does trio-parallel run Python code in parallel?¶
Currently, this project is based on multiprocessing subprocesses and has all the usual multiprocessing caveats (freeze_support, pickleable objects only, executing the __main__ module).
The case for basing these workers on multiprocessing is that it keeps a lot of
complexity outside of the project while offering a set of quirks that users are
likely already familiar with.
The pickling limitations can be partially alleviated by installing cloudpickle.
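For instance, with cloudpickle installed, even a lambda, which the stock pickle module rejects, can be dispatched. A minimal sketch, not from the official docs:

import trio, trio_parallel


async def amain():
    # cloudpickle serializes the lambda by value, so this works
    # even though it is not importable from any module
    return await trio_parallel.run_sync(lambda x: x * 2, 21)


if __name__ == "__main__":
    print(trio.run(amain))  # 42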
Can I have my workers talk to each other?¶
This is currently possible through the use of multiprocessing.Manager, but we don’t and will not officially support it. This package focuses on providing a flat hierarchy of worker subprocesses to run synchronous, CPU-bound functions.
If you are looking to create a nested hierarchy of processes communicating
asynchronously with each other, while preserving the power, safety, and convenience of
structured concurrency, look into tractor.
Or, if you are looking for a more customized solution, try using trio.run_process
to spawn additional Trio runs and have them talk to each other over sockets.
Can I let my workers outlive the main Trio process?¶
No. Trio’s structured concurrency strictly bounds job runs to within a given trio.run call, while cached idle workers are shut down and killed if necessary by our atexit handler, so this use case is not supported.
How should I map a function over a collection of arguments?¶
This is fully possible but we leave the implementation of that up to you. Think of us as a loky for your joblib, but natively async and Trionic. We take care of the worker handling so that you can focus on the best concurrency for your application. That said, some example parallelism patterns can be found in the documentation.
You might also look into aiometer.
Contributing¶
If you notice any bugs, need any help, or want to contribute any code, GitHub issues and pull requests are very welcome! Please read the code of conduct.
Reference¶
This project’s aim is to use the lightest-weight, lowest-overhead, lowest-latency method to achieve parallelism of arbitrary Python code, and make it natively async for Trio. Given that Python (and CPython in particular) has ongoing difficulties parallelizing CPU-bound work in threads, this package dispatches synchronous function execution to subprocesses. However, this project is not fundamentally constrained by that, and will consider subinterpreters or any other avenue as they become available.
Running CPU-bound functions in parallel¶
The main interface for trio-parallel is run_sync():
- await trio_parallel.run_sync(sync_fn: Callable[[...], T], *args, cancellable: bool = False, limiter: CapacityLimiter | None = None) → T¶
Run sync_fn(*args) in a separate process and return/raise its outcome.
This function is intended to enable the following:
Circumventing the GIL to run CPU-bound functions in parallel
Making blocking APIs or infinite loops truly cancellable through SIGKILL/TerminateProcess without leaking resources
Protecting the main process from unstable/crashy code
Currently, this is a wrapping of multiprocessing.Process that follows the API of trio.to_thread.run_sync(). Other multiprocessing features may work but are not officially supported, and all the normal multiprocessing caveats apply. To customize worker behavior, use open_worker_context().
The underlying workers are cached LIFO and reused to minimize latency. Global state of the workers is not stable between and across calls.
- Parameters:
sync_fn – An importable or pickleable synchronous callable. See the multiprocessing documentation for a detailed explanation of limitations.
*args – Positional arguments to pass to sync_fn. If you need keyword arguments, use functools.partial().
cancellable (bool) – Whether to allow cancellation of this operation. Cancellation always involves abrupt termination of the worker process with SIGKILL/TerminateProcess. To obtain correct semantics with CTRL+C, SIGINT is ignored when raised in workers.
limiter (None, or trio.CapacityLimiter) – An object used to limit the number of simultaneous processes. Most commonly this will be a CapacityLimiter, but any async context manager will succeed.
- Returns:
Any – Whatever sync_fn(*args) returns.
- Raises:
BaseException – Whatever sync_fn(*args) raises.
BrokenWorkerError – Indicates the worker died unexpectedly. Not encountered in normal use.
Note
trio_parallel.run_sync() does not work with functions defined at the REPL or in a Jupyter notebook cell due to the use of the multiprocessing spawn context… unless cloudpickle is also installed!
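As an aside, a quick sketch of the functools.partial() pattern mentioned above (the greet function here is hypothetical):

import functools

import trio, trio_parallel


def greet(name, punctuation="!"):
    return "Hello, " + name + punctuation


async def amain():
    # Keyword arguments cannot be passed directly; bind them first
    return await trio_parallel.run_sync(
        functools.partial(greet, "world", punctuation="?")
    )


if __name__ == "__main__":
    print(trio.run(amain))  # Hello, world?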
A minimal program that dispatches work with run_sync() looks like this:
import trio, trio_parallel
from operator import add


async def parallel_add():
    return await trio_parallel.run_sync(add, 1, 2)


# Guard against our workers trying to recursively start workers on startup
if __name__ == '__main__':
    assert add(1, 2) == trio.run(parallel_add) == 3
Just like that, you’ve dispatched a CPU-bound synchronous function to a worker subprocess and returned the result! However, only doing this much is a bit pointless; we are just expending the startup time of a whole python process to achieve the same result that we could have gotten synchronously. To take advantage, some other task needs to be able to run concurrently:
import trio, trio_parallel, time


async def check_scheduling_latency():
    for _ in range(10):
        t0 = trio.current_time()
        await trio.lowlevel.checkpoint()
        print(trio.current_time() - t0)


async def amain():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(check_scheduling_latency)
        await trio_parallel.run_sync(time.sleep, 1)


if __name__ == "__main__":
    trio.run(amain)
The output of this script indicates that the Trio event loop is running smoothly. Still, this doesn’t demonstrate much advantage over trio.to_thread.run_sync(). You can see for yourself by substituting the function calls, since the call signatures are intentionally identical. No, trio-parallel really shines when your function has significant CPU-intensive work that regularly involves the python interpreter:
import trio, trio_parallel, time


def loop(i=0):
    deadline = time.perf_counter() + 1
    # Arbitrary CPU-bound work
    while time.perf_counter() < deadline:
        i += 1
    print("Loops completed:", i)


async def amain():
    async with trio.open_nursery() as nursery:
        for i in range(4):
            nursery.start_soon(trio_parallel.run_sync, loop)


if __name__ == "__main__":
    trio.run(amain)
This script should output a roughly equal number of loops completed for each process, as opposed to the lower and unbalanced number you might observe using threads.
As with Trio threads, these processes are cached to minimize latency and resource usage. Despite this, executing a function in a process can take orders of magnitude longer than in a thread when dealing with large arguments or a cold cache.
import trio, trio_parallel


async def amain():
    t0 = trio.current_time()
    await trio_parallel.run_sync(bool)
    t1 = trio.current_time()
    await trio_parallel.run_sync(bool)
    t2 = trio.current_time()
    await trio_parallel.run_sync(bytearray, 10 ** 8)
    t3 = trio.current_time()
    print("Cold cache latency:", t1 - t0)
    print("Warm cache latency:", t2 - t1)
    print("IPC latency/MB:", (t3 - t2) / 10 ** 2)


if __name__ == '__main__':
    trio.run(amain)
Therefore, we recommend avoiding worker process dispatch for synchronous functions with an expected duration of less than about 1 ms.
Controlling Concurrency¶
By default, trio-parallel will cache as many workers as the system has CPUs (as reported by os.cpu_count()), allowing fair, maximal, truly-parallel dispatch of CPU-bound work in the vast majority of cases. There are two ways to modify this behavior. The first is the limiter argument of run_sync(), which permits you to limit the concurrency of a specific function dispatch. In some cases, it may be useful to modify the default limiter, which will affect all run_sync() calls.
- trio_parallel.current_default_worker_limiter()¶
Get the default CapacityLimiter used by trio_parallel.run_sync().
The most common reason to call this would be if you want to modify its total_tokens attribute. This attribute is initialized to the number of CPUs reported by os.cpu_count().
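For example, a sketch that caps all run_sync() dispatches in a program at two concurrent workers:

import trio, trio_parallel


async def amain():
    # Affects every trio_parallel.run_sync() call that doesn't
    # pass its own limiter
    trio_parallel.current_default_worker_limiter().total_tokens = 2
    async with trio.open_nursery() as nursery:
        for _ in range(8):
            # At most two of these run in parallel now
            nursery.start_soon(trio_parallel.run_sync, sum, range(10 ** 6))


if __name__ == "__main__":
    trio.run(amain)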
Cancellation and Exceptions¶
Unlike threads, subprocesses are strongly isolated from the parent process, which allows two important features that cannot be portably implemented in threads:
Forceful cancellation: a deadlocked call or infinite loop can be cancelled by completely terminating the process.
Protection from errors: if a call segfaults or an extension module has an unrecoverable error, the worker may die but the main process will raise a normal Python exception.
Cancellation¶
Cancellation of trio_parallel.run_sync() is modeled after trio.to_thread.run_sync(), with a cancellable keyword argument that defaults to False. Entry is an unconditional checkpoint, i.e. regardless of the value of cancellable. The only difference in behavior comes upon cancellation when cancellable=True. A Trio thread will be abandoned to run in the background, while this package will kill the worker with SIGKILL/TerminateProcess:
import trio, trio_parallel, time


def hello_delayed_world():
    print("Hello")
    time.sleep(1.0)
    print("world!")


async def amain():
    # warm up thread/process caches
    await trio_parallel.run_sync(bool)
    await trio.to_thread.run_sync(bool)
    with trio.move_on_after(0.5):
        await trio_parallel.run_sync(hello_delayed_world, cancellable=True)
    with trio.move_on_after(0.5):
        await trio.to_thread.run_sync(hello_delayed_world, cancellable=True)
    # grace period for abandoned thread
    await trio.sleep(0.6)


if __name__ == '__main__':
    trio.run(amain)
We recommend avoiding the cancellation feature if loss of intermediate results, writes to the filesystem, or shared memory writes may leave the larger system in an incoherent state.
Exceptions¶
- exception trio_parallel.BrokenWorkerError¶
Raised when a worker fails or dies unexpectedly.
This error is not typically encountered in normal use, and indicates a severe failure of either trio-parallel or the code that was executing in the worker. Some example failures may include segfaults, being killed by an external signal, or failing to cleanly shut down within a specified grace_period. (See atexit_shutdown_grace_period() and open_worker_context().)
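To see the error in action, a minimal sketch that deliberately kills a worker mid-job (os.abort() stands in for a segfault or an external kill):

import os

import trio, trio_parallel


def crash():
    os.abort()  # hard, abnormal termination of the worker process


async def amain():
    try:
        await trio_parallel.run_sync(crash)
    except trio_parallel.BrokenWorkerError:
        print("the worker died, but the main process carries on")


if __name__ == "__main__":
    trio.run(amain)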
Signal Handling¶
This library configures worker processes to ignore SIGINT to have correct semantics when you hit CTRL+C, but all other signal handlers are left in python’s default state. This can have surprising consequences if you handle signals in the main process, as the workers are in the same process group but do not share the same signal handlers. For example, if you handle SIGTERM in the main process to achieve a graceful shutdown of a service, a spurious BrokenWorkerError will be raised at any running calls to run_sync(). You will either need to handle the exceptions, change the method you use to send signals, or configure the workers to handle signals at initialization using the tools in the next section.
Configuring workers¶
By default, trio_parallel.run_sync() draws workers from a global cache that is shared across sequential and between concurrent trio.run() calls, with workers’ lifetimes limited to the life of the main process. This can be configured with configure_default_context():
- trio_parallel.configure_default_context(idle_timeout=600.0, init=<class 'bool'>, retire=<class 'bool'>, grace_period=30.0, worker_type=WorkerType.SPAWN)¶
Configure the default WorkerContext parameters associated with run_sync.
- Parameters:
idle_timeout (float) – The time in seconds an idle worker will wait for a CPU-bound job before shutting down and releasing its own resources. Pass math.inf to wait forever. MUST be non-negative.
init (Callable[[], bool]) – An object to call within the worker before waiting for jobs. This is suitable for initializing worker state so that such stateful logic does not need to be included in functions passed to trio_parallel.run_sync(). MUST be callable without arguments.
retire (Callable[[], bool]) – An object to call within the worker after executing a CPU-bound job. The return value indicates whether the worker should be retired (shut down). By default, workers are never retired. The process-global environment is stable between calls. Among other things, that means that storing state in global variables works. MUST be callable without arguments.
grace_period (float) – The time in seconds to wait in the atexit handler for workers to exit before issuing SIGKILL/TerminateProcess and raising BrokenWorkerError. Pass math.inf to wait forever. MUST be non-negative.
worker_type (WorkerType) – The kind of worker to create, see WorkerType.
- Raises:
RuntimeError – if this function is called outside the main thread.
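For instance, following the Signal Handling discussion above, a sketch (the handler function is hypothetical) that has each worker ignore SIGTERM, so a graceful shutdown of the main process does not produce spurious BrokenWorkerError exceptions:

import signal

import trio, trio_parallel


def ignore_sigterm():
    # Runs once inside each worker before it accepts any jobs
    signal.signal(signal.SIGTERM, signal.SIG_IGN)


async def amain():
    await trio_parallel.run_sync(bool)  # workers start with init applied


if __name__ == "__main__":
    trio_parallel.configure_default_context(init=ignore_sigterm)
    trio.run(amain)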
This covers most use cases, but for the many edge cases, open_worker_context() yields a WorkerContext object on which WorkerContext.run_sync() pulls workers from an isolated cache with behavior specified by the class arguments. It is only advised to use this if specific control over worker type, state, or lifetime is required in a subset of your application.
- async with trio_parallel.open_worker_context(idle_timeout=600.0, init=<class 'bool'>, retire=<class 'bool'>, grace_period=30.0, worker_type=WorkerType.SPAWN) as ctx¶
Create a new, customized worker context with isolated workers.
The context will automatically wait for any running workers to become idle when exiting the scope. Since this wait cannot be cancelled, it is more convenient to only pass the context object to tasks that cannot outlive the scope, for example, by using a Nursery.
- Parameters:
idle_timeout (float) – The time in seconds an idle worker will wait for a CPU-bound job before shutting down and releasing its own resources. Pass math.inf to wait forever. MUST be non-negative.
init (Callable[[], bool]) – An object to call within the worker before waiting for jobs. This is suitable for initializing worker state so that such stateful logic does not need to be included in functions passed to WorkerContext.run_sync(). MUST be callable without arguments.
retire (Callable[[], bool]) – An object to call within the worker after executing a CPU-bound job. The return value indicates whether the worker should be retired (shut down). By default, workers are never retired. The process-global environment is stable between calls. Among other things, that means that storing state in global variables works. MUST be callable without arguments.
grace_period (float) – The time in seconds to wait in __aexit__ for workers to exit before issuing SIGKILL/TerminateProcess and raising BrokenWorkerError. Pass math.inf to wait forever. MUST be non-negative.
worker_type (WorkerType) – The kind of worker to create, see WorkerType.
- Raises:
ValueError | TypeError – if an invalid value is passed for an argument, such as a negative timeout.
BrokenWorkerError – if a worker does not shut down cleanly when exiting the scope.
Warning
The callables passed to retire MUST not raise! Doing so will result in a BrokenWorkerError at an indeterminate future WorkerContext.run_sync() call.
- class trio_parallel.WorkerContext¶
A reification of a context where workers have a custom configuration.
Instances of this class are to be created using open_worker_context(), and cannot be directly instantiated. The arguments to open_worker_context() that created an instance are available for inspection as read-only attributes.
This class provides a statistics() method, which returns an object with the following fields:
idle_workers: The number of live workers currently stored in the context’s cache.
running_workers: The number of workers currently executing jobs.
- await run_sync(sync_fn: Callable[[...], T], *args, cancellable: bool = False, limiter: CapacityLimiter = None) → T¶
Run sync_fn(*args) in a separate process and return/raise its outcome.
Behaves according to the customized attributes of the context. See trio_parallel.run_sync() for details.
- Raises:
trio.ClosedResourceError – if this method is run on a closed context
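Since the process-global environment is stable between calls, init pairs naturally with worker-global state. A sketch (expensive_setup and CACHE are hypothetical names):

import trio, trio_parallel

CACHE = None


def expensive_setup():
    # Runs once per worker; later jobs reuse CACHE for free
    global CACHE
    CACHE = sum(range(10 ** 6))


def job(i):
    return CACHE + i


async def amain():
    async with trio_parallel.open_worker_context(init=expensive_setup) as ctx:
        print(await ctx.run_sync(job, 1))


if __name__ == "__main__":
    trio.run(amain)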
One typical use case for configuring workers is to set a policy for taking a worker
out of service. For this, use the retire
argument. This example shows how to
build (trivial) stateless and stateful worker retirement policies.
import trio, trio_parallel, os


def worker(i):
    print(i, "hello from", os.getpid())


def after_single_use():
    return True


WORKER_HAS_BEEN_USED = False


def after_dual_use():
    global WORKER_HAS_BEEN_USED
    if WORKER_HAS_BEEN_USED:
        return True  # retire
    else:
        WORKER_HAS_BEEN_USED = True
        return False  # don't retire... YET


async def amain():
    trio_parallel.current_default_worker_limiter().total_tokens = 4
    print("single use worker behavior:")
    async with trio_parallel.open_worker_context(retire=after_single_use) as ctx:
        async with trio.open_nursery() as nursery:
            for i in range(10):
                nursery.start_soon(ctx.run_sync, worker, i)
    print("dual use worker behavior:")
    async with trio_parallel.open_worker_context(retire=after_dual_use) as ctx:
        async with trio.open_nursery() as nursery:
            for i in range(10):
                nursery.start_soon(ctx.run_sync, worker, i)
    print("default behavior:")
    async with trio.open_nursery() as nursery:
        for i in range(10):
            nursery.start_soon(trio_parallel.run_sync, worker, i)


if __name__ == "__main__":
    trio.run(amain)
A more realistic use-case might examine the worker process’s memory usage (e.g. with psutil) and retire if usage is too high.
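A sketch of such a policy (assumes psutil is installed; the threshold is arbitrary):

import psutil

import trio, trio_parallel

MAX_RSS = 2 * 2 ** 30  # 2 GiB, purely illustrative


def retire_if_bloated():
    # Retire this worker once its resident memory exceeds the threshold
    return psutil.Process().memory_info().rss > MAX_RSS


async def amain():
    async with trio_parallel.open_worker_context(retire=retire_if_bloated) as ctx:
        await ctx.run_sync(bytearray, 10 ** 8)


if __name__ == "__main__":
    trio.run(amain)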
If you are retiring workers frequently, like in the single-use case, a large amount
of process startup overhead will be incurred with the default worker type. If your
platform supports it, an alternate WorkerType
might cut that overhead down.
- class trio_parallel.WorkerType¶
An Enum of available kinds of workers.
Instances of this Enum can be passed to open_worker_context() or configure_default_context() to customize worker startup behavior.
Currently, these correspond to the values of multiprocessing.get_all_start_methods(), which vary by platform.
WorkerType.SPAWN is the default and is supported on all platforms.
WorkerType.FORKSERVER is available on POSIX platforms and could be an optimization if workers need to be killed/restarted often.
WorkerType.FORK is available on POSIX for experimentation, but not recommended.
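For example, a sketch combining single-use retirement with forkserver workers (POSIX only, hence the guard):

import multiprocessing

import trio, trio_parallel


def after_single_use():
    return True  # retire every worker after one job


async def amain():
    async with trio_parallel.open_worker_context(
        retire=after_single_use,
        worker_type=trio_parallel.WorkerType.FORKSERVER,
    ) as ctx:
        for i in range(4):
            await ctx.run_sync(print, "job", i)


if __name__ == "__main__":
    # FORKSERVER is only available on POSIX platforms
    if "forkserver" in multiprocessing.get_all_start_methods():
        trio.run(amain)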
Internal Esoterica¶
You probably won’t use these… but create an issue if you do and need help!
- trio_parallel.atexit_shutdown_grace_period(grace_period=30.0)¶
Set the default worker cache shutdown grace period.
DEPRECATION NOTICE: this function has been superseded by configure_default_context and will be removed in a future version.
You might need this if you have a long-running atexit function, such as those installed by coverage.py or viztracer. This only affects the atexit behavior of the default context corresponding to trio_parallel.run_sync(). Existing and future WorkerContext instances are unaffected.
- Parameters:
grace_period (float) – The time in seconds to wait for workers to exit before issuing SIGKILL/TerminateProcess and raising BrokenWorkerError. Pass math.inf to wait forever.
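Since this function is deprecated, prefer the equivalent configuration through its replacement (a sketch):

import trio_parallel

# Equivalent to atexit_shutdown_grace_period(60.0), minus the deprecation
trio_parallel.configure_default_context(grace_period=60.0)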
- trio_parallel.default_context_statistics()¶
Return the statistics corresponding to the default context.
Because the default context used by trio_parallel.run_sync is a private implementation detail, this function serves to provide public access to the default context statistics object.
Note
The statistics are only eventually consistent in the case of multiple trio threads concurrently using trio_parallel.run_sync.
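A minimal usage sketch:

import trio, trio_parallel


async def amain():
    await trio_parallel.run_sync(bool)
    stats = trio_parallel.default_context_statistics()
    print("idle:", stats.idle_workers, "running:", stats.running_workers)


if __name__ == "__main__":
    trio.run(amain)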
Example concurrency patterns¶
Parallel, ordered map and gather¶
import multiprocessing
import random

import trio

import trio_parallel


def twiddle(i):
    for j in range(50000):
        i *= random.choice((-1, 1))
    return i


async def parallel_map(fn, inputs, *args):
    results = [None] * len(inputs)

    async def worker(j, inp):
        results[j] = await trio_parallel.run_sync(fn, inp, *args)
        print(j, "done")

    async with trio.open_nursery() as nursery:
        for i, inp in enumerate(inputs):
            nursery.start_soon(worker, i, inp)
    return results


if __name__ == "__main__":
    multiprocessing.freeze_support()
    print(trio.run(parallel_map, twiddle, range(100)))
Async parallel processing pipeline¶
import binascii
import multiprocessing
import time
import secrets

import trio

import trio_parallel


async def to_process_map_as_completed(
    sync_fn,
    job_aiter,
    cancellable=False,
    limiter=None,
    *,
    task_status,
):
    if limiter is None:
        limiter = trio_parallel.current_default_worker_limiter()
    send_chan, recv_chan = trio.open_memory_channel(0)
    task_status.started(recv_chan)

    async def worker(job_item, task_status):
        # Backpressure: hold limiter for entire task to avoid
        # spawning too many workers
        async with limiter:
            task_status.started()
            result = await trio_parallel.run_sync(
                sync_fn,
                *job_item,
                cancellable=cancellable,
                limiter=trio.CapacityLimiter(1),
            )
            await send_chan.send(result)

    async with send_chan, trio.open_nursery() as nursery:
        async for job_item in job_aiter:
            await nursery.start(worker, job_item)


async def data_generator(*, task_status, limiter=None):
    send_chan, recv_chan = trio.open_memory_channel(0)
    task_status.started(recv_chan)
    if limiter is None:
        limiter = trio_parallel.current_default_worker_limiter()
    async with send_chan:
        for j in range(100):
            # Just pretend this is coming from disk or network
            data = secrets.token_hex()
            # Inputs MUST be throttled with the SAME limiter as
            # the rest of the steps of the pipeline
            async with limiter:
                await send_chan.send((j, data))


def clean_data(j, data):
    time.sleep(secrets.randbelow(2) / 20)
    return j, data.replace("deadbeef", "f00dbeef")


def load_data(j, data):
    time.sleep(secrets.randbelow(2) / 20)
    return j, binascii.unhexlify(data)


def compute(j, data):
    time.sleep(secrets.randbelow(2) / 20)
    n = 0
    for value in data:
        if value % 2:
            n += 1
    return j, n


async def amain():
    i = 1
    t0 = trio.current_time()
    async with trio.open_nursery() as nursery:
        data_aiter = await nursery.start(data_generator)
        clean_data_aiter = await nursery.start(
            to_process_map_as_completed,
            clean_data,
            data_aiter,
        )
        loaded_data_aiter = await nursery.start(
            to_process_map_as_completed,
            load_data,
            clean_data_aiter,
        )
        computational_result_aiter = await nursery.start(
            to_process_map_as_completed,
            compute,
            loaded_data_aiter,
        )
        async for result in computational_result_aiter:
            print(i, (trio.current_time() - t0) / i, *result)
            if result[1] <= 9:
                print("Winner! after ", trio.current_time() - t0, "seconds")
                nursery.cancel_scope.cancel()
            i += 1
    print("No extra-even bytestrings after ", trio.current_time() - t0, "seconds")


if __name__ == "__main__":
    multiprocessing.freeze_support()
    trio.run(amain)
Release history¶
trio-parallel 1.2.1 (2023-11-04)¶
Bugfixes¶
Resolved a deprecation warning on python 3.12. (#380)
Deprecations and Removals¶
Although python 3.7 has not been specifically broken, it is no longer tested in CI. (#389)
trio-parallel 1.2.0 (2022-10-29)¶
Features¶
The behavior of the default context is now fully configurable, superseding atexit_shutdown_grace_period. (#328)
Bugfixes¶
Use tblib lazily to pass tracebacks on user exceptions. Previously, tracebacks would only be passed on the built-in python exceptions. (#332)
trio-parallel 1.1.0 (2022-09-18)¶
Features¶
Bugfixes¶
Deprecations and Removals¶
Removed python 3.6 support (#236)
trio-parallel 1.0.0 (2021-12-04)¶
Bugfixes¶
Fixed a hang on failed worker subprocess spawns that mostly occurred upon accidental multiprocessing recursive spawn. (#167)
Fixed a hang on Windows when trying to use WorkerContext.run_sync() in sequential and concurrent Trio runs. (#171)
Improved Documentation¶
Revamped documentation with tested examples. (#168)
trio-parallel 1.0.0b0 (2021-11-12)¶
With this release I consider the project “feature complete”.
Features¶
Added an API to view statistics about a WorkerContext, specifically counting idle_workers and running_workers. (#155)
trio-parallel 1.0.0a2 (2021-10-08)¶
Features¶
Opportunistically use cloudpickle to serialize jobs and results. (#115)
Timeout arguments of open_worker_context(), idle_timeout and grace_period, now work like trio timeouts, accepting any non-negative float value. (#116)
Worker process startup is now faster, by importing trio lazily. (#117)
open_worker_context() now returns a context object that can be used to run functions explicitly in a certain context (WorkerContext.run_sync()) rather than implicitly altering the behavior of trio_parallel.run_sync(). (#127)
trio-parallel 1.0.0a1 (2021-09-05)¶
Features¶
Added configuration options for the grace periods permitted to worker caches upon shutdown. This includes a new keyword argument for open_worker_context() and a new top level function atexit_shutdown_grace_period(). (#108)
open_worker_context() gained a new argument, init, and retire is no longer called before the first job in the worker. (#110)
trio-parallel 1.0.0a0 (2021-07-22)¶
Features¶
The behavior and lifetime of worker processes can now be customized with the open_worker_context() context manager. (#19)
trio-parallel 0.5.1 (2021-05-05)¶
Bugfixes¶
Remove __version__ attribute to avoid crash on import when metadata is not available. (#55)
trio-parallel 0.5.0 (2021-05-02)¶
Features¶
trio_parallel.BrokenWorkerError now contains a reference to the underlying worker process, which can be inspected e.g. to handle specific exit codes. (#48)
Bugfixes¶
Misc¶
trio-parallel 0.4.0 (2021-03-25)¶
Bugfixes¶
Correctly handle the case where os.cpu_count returns None. (#32)
Ignore keyboard interrupt (SIGINT) in workers to ensure correct cancellation semantics and clean shutdown on CTRL+C. (#35)
Misc¶
trio-parallel 0.3.0 (2021-02-21)¶
Bugfixes¶
Fixed an underlying race condition in IPC. Not a critical bugfix, as it should not be triggered in practice. (#15)
Reduce the production of zombie children on Unix systems (#20)
Close internal race condition when waiting for subprocess exit codes on macOS. (#23)
Avoid a race condition leading to deadlocks when a worker process is killed right after receiving work. (#25)
Improved Documentation¶
Reorganized documentation for less redundancy and more clarity (#16)