Asyncio

Spinach allows tasks to be defined and run as asyncio coroutines. In this mode the worker is a single thread that runs all tasks asynchronously. This allows for greater concurrency as well as compatibility with the asyncio ecosystem.

Creating async tasks

To define an asynchronous task, just prefix its definition with the async keyword:

import asyncio

@spin.task(name='compute')
async def compute(a, b):
    await asyncio.sleep(1)
    print(f'Computed {a} + {b} = {a + b}')

To run the workers in asynchronous mode, pass the AsyncioWorkers class to start_workers:

from spinach import AsyncioWorkers

spin.start_workers(number=256, workers_class=AsyncioWorkers)

When using the asyncio workers, the number argument can be set quite high because each worker is just a coroutine, consuming a negligible amount of resources.
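To illustrate why a high worker count is cheap, here is a stdlib-only sketch (the `worker` coroutine is a hypothetical stand-in, not part of Spinach) showing 256 coroutines running concurrently on a single thread:

```python
import asyncio

async def worker(i):
    # Each coroutine is just a Python object waiting on the event
    # loop; it does not consume a thread or process of its own.
    await asyncio.sleep(0.01)
    return i

async def main():
    # All 256 coroutines run concurrently on one thread.
    return await asyncio.gather(*(worker(i) for i in range(256)))

results = asyncio.run(main())
print(len(results))  # 256
```

Since the coroutines all sleep concurrently, the whole run takes roughly as long as a single worker would.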

Scheduling jobs

Because only the workers are asyncio-aware internally, jobs are still sent to Redis using a blocking socket. This means that to schedule jobs from asynchronous code, care must be taken to send them from outside the event loop. This can be achieved using asyncio.to_thread:

await asyncio.to_thread(spin.schedule, compute, 2, 4)
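The following stdlib-only sketch shows why the thread hop matters: `schedule_job` is a hypothetical stand-in for a blocking call like `spin.schedule`, and offloading it with asyncio.to_thread lets several such calls overlap instead of freezing the event loop:

```python
import asyncio
import time

def schedule_job(job_id):
    # Stand-in for a blocking Redis round-trip made by spin.schedule.
    time.sleep(0.1)
    return job_id

async def main():
    start = time.perf_counter()
    # Each blocking call runs in the default thread pool, so the
    # event loop stays free and the five calls overlap.
    results = await asyncio.gather(
        *(asyncio.to_thread(schedule_job, i) for i in range(5))
    )
    elapsed = time.perf_counter() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
```

Run sequentially on the loop, the five calls would take about half a second; offloaded to threads they complete in roughly the time of one.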

Code scheduling a lot of jobs should use batches to improve performance.
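The benefit of batching can be sketched with the stdlib alone (`send_batch` is a hypothetical stand-in for `spin.schedule_batch`, not the Spinach API): one thread hop and one blocking round-trip cover every job in the batch, instead of one per job:

```python
import asyncio
import time

def send_batch(jobs):
    # Stand-in for spin.schedule_batch: a single blocking
    # round-trip schedules the whole batch.
    time.sleep(0.1)
    return len(jobs)

async def main():
    jobs = [('compute', a, a + 1) for a in range(100)]
    # One asyncio.to_thread call for 100 jobs, rather than 100
    # separate thread hops and network calls.
    return await asyncio.to_thread(send_batch, jobs)

scheduled = asyncio.run(main())
print(scheduled)  # 100
```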

Example

import aiohttp
from spinach import Engine, MemoryBroker, Batch, AsyncioWorkers

spin = Engine(MemoryBroker())


@spin.task(name='get_pokemon_name')
async def get_pokemon_name(pokemon_id: int):
    """Call an HTTP API to retrieve a pokemon name by its ID."""
    url = f'https://pokeapi.co/api/v2/pokemon/{pokemon_id}'
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            pokemon = await response.json()

    print(f'Pokemon #{pokemon_id} is {pokemon["name"]}')


# Schedule a batch of 150 jobs to retrieve the names of the
# first 150 Pokémon.
batch = Batch()
for pokemon_id in range(1, 151):
    batch.schedule(get_pokemon_name, pokemon_id)
spin.schedule_batch(batch)

# Start the asyncio workers and process the tasks
spin.start_workers(
    number=256,
    workers_class=AsyncioWorkers,
    stop_when_queue_empty=True
)

Note

If an application defines both sync and async tasks, each kind of task should go in its own queue so that sync tasks are picked up by threaded workers and async tasks by asyncio workers.

Note

Not all contrib integrations may work with asynchronous workers.