Asyncio
Spinach allows you to define and run tasks as asyncio coroutines. In this mode the worker is a single thread that runs all tasks asynchronously. This allows for greater concurrency as well as compatibility with the asyncio ecosystem.
Creating async tasks
To define an asynchronous task, just prefix its definition with the async keyword:
import asyncio

@spin.task(name='compute')
async def compute(a, b):
    await asyncio.sleep(1)
    print('Computed {} + {} = {}'.format(a, b, a + b))
To run the workers in asynchronous mode, pass the AsyncioWorkers class to start_workers:
from spinach import AsyncioWorkers
spin.start_workers(number=256, workers_class=AsyncioWorkers)
When using the asyncio workers, the number argument can be set quite high because each worker is just a coroutine, consuming a negligible amount of resources.
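To get a feel for why a high number is affordable, the following plain-asyncio sketch starts a large number of coroutines that each wait one tenth of a second. It does not use Spinach and makes no claim about how AsyncioWorkers is implemented; the coroutine count and delay are arbitrary. Because all the coroutines wait concurrently on a single thread, the whole run takes roughly as long as one wait rather than the sum of all of them.

```python
import asyncio
import time


async def fake_worker(worker_id: int, delay: float) -> int:
    # Stand-in for an I/O-bound task: awaiting sleep yields to the event loop,
    # so every other coroutine can make progress in the meantime.
    await asyncio.sleep(delay)
    return worker_id


async def run_many(number: int, delay: float) -> tuple[int, float]:
    start = time.perf_counter()
    # All coroutines are scheduled on the same event loop, in a single thread.
    results = await asyncio.gather(*(fake_worker(i, delay) for i in range(number)))
    elapsed = time.perf_counter() - start
    return len(results), elapsed


if __name__ == '__main__':
    count, elapsed = asyncio.run(run_many(number=10_000, delay=0.1))
    print(f'{count} coroutines finished in {elapsed:.2f}s')
```

Thread-based workers would instead need one operating system thread per concurrent task, which is why the asyncio mode tolerates much larger worker counts for I/O-bound work.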
Scheduling jobs
Because only the workers are asyncio-aware internally, jobs are still sent to Redis using a blocking socket. This means that when scheduling jobs from asynchronous code, care must be taken to send them from outside the event loop; otherwise the blocking call would stall every coroutine running on that loop. This can be achieved using asyncio.to_thread (available since Python 3.9):
await asyncio.to_thread(spin.schedule, compute, 2, 4)
Code that schedules many jobs should group them in a Batch and send it with spin.schedule_batch to improve performance.
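The sketch below illustrates both points with plain asyncio and a hypothetical BlockingBroker class standing in for a blocking Redis connection. Its schedule and schedule_batch methods are invented for illustration and only sleep to mimic a network round trip; they are not Spinach's implementation. Each call is pushed to a thread with asyncio.to_thread, so a heartbeat coroutine keeps running while the blocking calls are in flight, and sending the jobs as one batch costs a single round trip instead of one per job.

```python
import asyncio
import time


class BlockingBroker:
    """Hypothetical stand-in for a broker that talks to Redis over a blocking socket."""

    def __init__(self, round_trip: float = 0.01):
        self.round_trip = round_trip
        self.round_trips = 0
        self.jobs = []

    def _send(self, jobs):
        # time.sleep blocks the calling thread, just like a blocking socket would.
        time.sleep(self.round_trip)
        self.round_trips += 1
        self.jobs.extend(jobs)

    def schedule(self, task_name, *args):
        self._send([(task_name, args)])

    def schedule_batch(self, jobs):
        self._send(list(jobs))


async def heartbeat(ticks: list, stop: asyncio.Event):
    # Keeps recording timestamps as long as the event loop is not blocked.
    while not stop.is_set():
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.005)


async def main(number: int = 50):
    broker = BlockingBroker()
    ticks, stop = [], asyncio.Event()
    beat = asyncio.create_task(heartbeat(ticks, stop))

    # One job at a time: each call runs in a worker thread, one round trip per job.
    for i in range(number):
        await asyncio.to_thread(broker.schedule, 'compute', i, i)
    single_trips = broker.round_trips

    # Batched: build all the jobs first, then send them in a single round trip.
    batch = [('compute', (i, i)) for i in range(number)]
    await asyncio.to_thread(broker.schedule_batch, batch)
    batch_trips = broker.round_trips - single_trips

    stop.set()
    await beat
    return single_trips, batch_trips, len(broker.jobs), len(ticks)


if __name__ == '__main__':
    single, batched, total_jobs, ticks = asyncio.run(main())
    print(f'one-by-one: {single} round trips, batch: {batched} round trip(s)')
    print(f'{total_jobs} jobs sent, heartbeat ticked {ticks} times meanwhile')
```

With Spinach itself, the two patterns would presumably combine as await asyncio.to_thread(spin.schedule_batch, batch).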
Example
import aiohttp
from spinach import Engine, MemoryBroker, Batch, AsyncioWorkers

spin = Engine(MemoryBroker())


@spin.task(name='get_pokemon_name')
async def get_pokemon_name(pokemon_id: int):
    """Call an HTTP API to retrieve a pokemon name by its ID."""
    url = f'https://pokeapi.co/api/v2/pokemon/{pokemon_id}'
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            pokemon = await response.json()
    print(f'Pokemon #{pokemon_id} is {pokemon["name"]}')


# Schedule a batch of 150 tasks to retrieve the name of the
# first 150 pokemons.
batch = Batch()
for pokemon_id in range(1, 151):
    batch.schedule(get_pokemon_name, pokemon_id)
spin.schedule_batch(batch)

# Start the asyncio workers and process the tasks
spin.start_workers(
    number=256,
    workers_class=AsyncioWorkers,
    stop_when_queue_empty=True
)
Note
If an application defines both sync and async tasks, each kind of task should go in its own queue so that sync tasks are picked up by threaded workers and async tasks by asyncio workers.
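One way to keep the two kinds of tasks apart is to derive the queue name from the function itself. The helper below is a hypothetical sketch using only the standard library: queue_for, resize_image and the queue names 'sync-tasks' and 'async-tasks' are made up for illustration, and inspect.iscoroutinefunction is what distinguishes an async def function from a regular one. How the resulting name is handed to Spinach when declaring tasks and starting workers should be checked against the Spinach API reference.

```python
import asyncio
import inspect

# Hypothetical queue names, chosen for this example only.
SYNC_QUEUE = 'sync-tasks'
ASYNC_QUEUE = 'async-tasks'


def queue_for(func) -> str:
    """Return the queue a task function should be routed to (hypothetical helper)."""
    # Coroutine functions (declared with async def) need asyncio workers;
    # everything else is left to the threaded workers.
    if inspect.iscoroutinefunction(func):
        return ASYNC_QUEUE
    return SYNC_QUEUE


def resize_image(path):
    # Hypothetical blocking task, suited to threaded workers.
    return path


async def compute(a, b):
    # Async task from this page, suited to asyncio workers.
    await asyncio.sleep(1)
    return a + b


if __name__ == '__main__':
    for func in (resize_image, compute):
        print(f'{func.__name__} -> {queue_for(func)}')
```

The check only works on the original function; a wrapper that hides the async def (for example a plain decorator returning a regular function) would make an async task look synchronous.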
Note
Some contrib integrations may not work with asynchronous workers.