Futured provides a consistent interface for concurrent functional programming in Python. It wraps any callable to return a
concurrent.futures.Future, wraps any async coroutine to return an
asyncio.Future, and provides concurrent iterators and context managers for futures.
Transform any callable into one which runs in a thread or process pool, and returns a future.
    from futured import threaded, processed
    import httpx

    fetch = threaded(httpx.Client().get)
    fetch(url)  # return Future

    fs = (fetch(url + path) for path in paths)
    threaded.results(fs)  # generate results from futures
    threaded.results(fs, timeout=...)  # generate results as completed

    fetch.map(urls)  # generate results in order
    fetch.map(urls, timeout=...)  # generate results as completed
    fetch.mapzip(urls)  # generate (url, result) pairs as completed
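For readers who want to see the underlying idea, here is a minimal standard-library sketch of wrapping a callable so that each call returns a concurrent.futures.Future. It is not futured's implementation; `threaded_sketch` and `square` are hypothetical names used only for illustration.

```python
from concurrent import futures


def threaded_sketch(func, max_workers=4):
    """Return a wrapper that submits func to a thread pool and returns a Future."""
    executor = futures.ThreadPoolExecutor(max_workers=max_workers)

    def submit(*args, **kwargs):
        return executor.submit(func, *args, **kwargs)

    return submit


def square(x):
    return x * x


square_async = threaded_sketch(square)
fs = [square_async(n) for n in range(5)]

# Results in submission order: block on each future in turn.
in_order = [f.result() for f in fs]

# Results as completed: the order depends on scheduling, so sort before comparing.
completed = sorted(f.result() for f in futures.as_completed(fs))
```

Blocking on each future preserves input order, while `futures.as_completed` yields whichever future finishes first. That is roughly the distinction the comments above draw between generating results in order and as completed.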
Thread and process pool executors may be used as context managers, customized with options, and reused with different callables.
    threaded(max_workers=...)(func, ...)
    processed(max_workers=...)(func, ...)
futured classes have a
waiting context manager which collects results from tasks. Futures can be registered at creation, or appended to the list of tasks.
    with threaded.waiting(*fs) as tasks:
        tasks.append(future)
    tasks  # list of completed results
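The pattern can be approximated with the standard library alone. The sketch below assumes a hypothetical `waiting_sketch` helper (not part of futured) that yields a list of futures and swaps in their results when the block exits.

```python
import contextlib
from concurrent import futures


@contextlib.contextmanager
def waiting_sketch(*fs):
    """Yield a list of futures; on normal exit, replace its contents with their results."""
    tasks = list(fs)
    yield tasks
    tasks[:] = [f.result() for f in tasks]  # blocks until every future is done


with futures.ThreadPoolExecutor() as executor:
    with waiting_sketch(executor.submit(pow, 2, 3)) as tasks:
        # Futures may also be appended inside the block.
        tasks.append(executor.submit(pow, 2, 4))

# After the block, tasks holds results rather than futures.
```

Mutating the list in place (`tasks[:] = ...`) is what lets the name bound by `as` refer to results after the block ends.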
futured classes provide a
tasks interface which generalizes
futures.wait, while allowing the set of tasks to be modified, e.g., for retries.
    threaded.tasks(fs, timeout=...)  # mutable set of running tasks which iterate as completed
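To illustrate why a mutable set of tasks is useful for retries, here is a sketch built directly on futures.wait. The `flaky` function and the retry limit of 3 are assumptions made for the example, and the loop is a stand-in for, not a copy of, what the tasks interface does.

```python
from concurrent import futures

attempts = {}


def flaky(key):
    """Hypothetical task that fails on its first call for each key."""
    attempts[key] = attempts.get(key, 0) + 1
    if attempts[key] == 1:
        raise RuntimeError(f"transient failure for {key}")
    return key.upper()


results = {}
with futures.ThreadPoolExecutor(max_workers=2) as executor:
    pending = {executor.submit(flaky, key): key for key in ["a", "b", "c"]}
    while pending:
        done, _ = futures.wait(pending, return_when=futures.FIRST_COMPLETED)
        for future in done:
            key = pending.pop(future)
            if future.exception() is None:
                results[key] = future.result()
            elif attempts[key] < 3:
                # Retry: add a fresh future to the set still being waited on.
                pending[executor.submit(flaky, key)] = key
```

Because `futures.wait` takes a snapshot of the futures it is given, the retry has to be handled by looping and calling it again. An interface that iterates over a live, modifiable set removes that bookkeeping.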
The same interface works for asyncio.
    from futured import asynced
    import httpx

    fetch = asynced(httpx.AsyncClient().get)
    fetch(url)  # return coroutine

    asynced.results(fs)  # generate results from futures
    asynced.results(fs, timeout=...)  # generate results as completed

    fetch.map(urls)  # generate results in order
    fetch.map(urls, timeout=...)  # generate results as completed
    fetch.mapzip(urls)  # generate (url, result) pairs as completed
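As a rough picture of what generating (key, result) pairs as completed means for coroutines, the following sketch uses only asyncio. `fetch_length` and `pairs_as_completed` are hypothetical names, and the sleep stands in for network latency.

```python
import asyncio


async def fetch_length(word):
    await asyncio.sleep(0.01 * len(word))  # stand-in for network latency
    return len(word)


async def pairs_as_completed(func, keys):
    """Collect (key, result) pairs in whatever order the calls finish."""

    async def call(key):
        return key, await func(key)

    coros = [call(key) for key in keys]
    return [await next_done for next_done in asyncio.as_completed(coros)]


pairs = asyncio.run(pairs_as_completed(fetch_length, ["ccc", "a", "bb"]))
```

Pairing each result with its key inside the coroutine is what makes completion order usable: the caller no longer needs input order to know which result belongs to which argument.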
asynced provides utilities for calling coroutines from a synchronous context.
waiting is similar to trio's nursery, but returns results from a synchronous with block.
    asynced.run(async_func, ...)  # call and run until complete
    asynced.run(async_gen, ...)  # call and run synchronous iterator
    with asynced.waiting(*fs) as tasks:  # concurrent coroutines completed in a block
        ...
    asynced.tasks(fs, timeout=...)  # mutable set of running tasks which iterate as completed
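The two behaviors described for run, running a coroutine to completion and iterating an async generator synchronously, can be sketched with plain asyncio. `iterate_sync`, `double`, and `countdown` are hypothetical names; this is one possible approach under those assumptions, not necessarily how asynced.run works.

```python
import asyncio


async def double(x):
    await asyncio.sleep(0)  # stand-in for real I/O
    return 2 * x


async def countdown(n):
    while n > 0:
        yield n
        n -= 1


def iterate_sync(agen):
    """Drive an async generator from synchronous code using a private event loop."""
    loop = asyncio.new_event_loop()
    try:
        while True:
            try:
                yield loop.run_until_complete(agen.__anext__())
            except StopAsyncIteration:
                break
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()


async def gather_all(*coros):
    return await asyncio.gather(*coros)


single = asyncio.run(double(21))  # call and run until complete
values = list(iterate_sync(countdown(3)))  # async generator consumed synchronously
batch = asyncio.run(gather_all(double(1), double(2)))  # concurrent coroutines
```

Each step of the generator is a separate `run_until_complete` call, so the caller can stop iterating early and the `finally` clause still closes the loop.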
futured wrappers can be used as decorators, but arguments can also be partially bound.
    @threaded
    def slow():
        ...

    fetch = threaded(httpx.Client().get, url)
    fetch(params=...)
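Partial binding of this kind resembles functools.partial combined with an executor. The sketch below assumes a hypothetical `bind_threaded` helper and a toy `greet` function; neither is part of futured.

```python
import functools
from concurrent import futures

executor = futures.ThreadPoolExecutor(max_workers=2)


def bind_threaded(func, *args, **kwargs):
    """Hypothetical helper: bind leading arguments, then submit each call to a thread pool."""
    bound = functools.partial(func, *args, **kwargs)
    return functools.partial(executor.submit, bound)


def greet(greeting, name, punctuation="!"):
    return f"{greeting}, {name}{punctuation}"


hello = bind_threaded(greet, "Hello")  # first positional argument bound up front
future = hello("world", punctuation="?")  # remaining arguments supplied per call
message = future.result()
executor.shutdown()
```

Binding the common arguments once keeps call sites short when only one parameter varies, such as a fixed URL with differing query parameters.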
Methods are supported, as well as a
decorated utility for automatically subclassing.
    from futured import decorated

    FutureClient = decorated(httpx.Client, request=threaded)

    # equivalent to
    class FutureClient(httpx.Client):
        request = threaded(httpx.Client.request)
command wraps subprocess.Popen to provide a
Future compatible interface.
    from futured import futured, command

    command('ls').result()  # return stdout or raises stderr
    command('ls').pipe('wc')  # pipes into next command, or | ('wc',... )
    for line in command('ls'):  # iterable lines
        ...
    command.coroutine('ls')  # return coroutine

    futured(command, 'ls')  # supports `map` interface
    asynced(command.coroutine, 'ls')  # supports `map` interface with timeout
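The idea of giving a subprocess a Future-like `result()` can be sketched with a small Popen subclass. `CommandSketch` is a hypothetical class for illustration only. It assumes that a nonzero exit status should raise subprocess.CalledProcessError carrying stderr, which may differ from the exception futured's command raises.

```python
import subprocess
import sys


class CommandSketch(subprocess.Popen):
    """Hypothetical Popen subclass exposing a Future-style result() method."""

    def __init__(self, *args):
        super().__init__(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)

    def result(self, timeout=None):
        """Wait for the process; return stdout, or raise with stderr on failure."""
        stdout, stderr = self.communicate(timeout=timeout)
        if self.returncode:
            raise subprocess.CalledProcessError(self.returncode, self.args, stdout, stderr)
        return stdout


# sys.executable is used instead of 'ls' so the sketch runs on any platform.
output = CommandSketch(sys.executable, "-c", "print('hello')").result()

try:
    CommandSketch(sys.executable, "-c", "import sys; sys.exit('boom')").result()
except subprocess.CalledProcessError as exc:
    error = exc.stderr
```

The process starts as soon as the object is created and `result()` only waits for it, which mirrors how a Future represents work already in progress.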
forked allows iteration in separate child processes.
    from futured import forked

    for value in forked(values, max_workers=...):
        # in a child process
        ...
    # in parent after children have exited
% pip install futured
100% branch coverage.
% pytest [--cov]