Tasks

A task is a unit of code, usually a function, to be executed in the background on remote workers.

To define a task:

from spinach import Tasks

tasks = Tasks()

@tasks.task(name='add')
def add(a, b):
    print('Computed {} + {} = {}'.format(a, b, a + b))

Note

The args and kwargs of a task must be JSON serializable.
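One way to catch non-serializable arguments early is to try encoding them with the standard json module before scheduling a job. The sketch below uses a hypothetical helper, is_json_serializable, that is not part of Spinach. Values such as datetime objects or sets fail this check and should be converted first (for example to an ISO 8601 string or a list).

```python
import json
from typing import Any


def is_json_serializable(*args: Any, **kwargs: Any) -> bool:
    """Return True if the given args and kwargs can be encoded by json.dumps.

    Hypothetical helper: it only mirrors the "must be JSON serializable" rule,
    it does not call into Spinach.
    """
    try:
        json.dumps({'args': args, 'kwargs': kwargs})
    except (TypeError, ValueError):
        # TypeError: unsupported type (set, datetime, custom object...)
        # ValueError: e.g. circular references
        return False
    return True
```

Note that tuples pass the check but are decoded as lists on the worker side.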

Retries

Spinach knows two kinds of tasks: those that can be retried safely (idempotent tasks) and those that cannot (non-idempotent tasks). Since Spinach cannot guess whether a task's code is safe to run multiple times, each task must be annotated when it is created.

Non-Retriable Tasks

By default, Spinach assumes that tasks are not safe to retry (tasks are assumed to have side effects).

These tasks are defined with max_retries=0 (the default):

@tasks.task(name='foo')
def foo(a, b):
    pass

These tasks:
  • use at-most-once delivery: the job may never even start
  • are not automatically retried in case of errors

Retriable Tasks

Idempotent tasks can be executed multiple times without changing the result beyond the initial application. This is a valuable property: most tasks should strive to be idempotent so that they can recover gracefully from errors.
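To make the distinction concrete, here is a minimal, library-independent sketch. The functions set_email and add_credit are hypothetical and operate on plain dictionaries standing in for a database. Setting a value is idempotent, while incrementing one is not, which is exactly what matters when a job may be delivered more than once.

```python
from typing import Dict


def set_email(db: Dict[str, str], user: str, email: str) -> None:
    """Idempotent: running it twice leaves the same state as running it once."""
    db[user] = email


def add_credit(db: Dict[str, int], user: str, amount: int) -> None:
    """Not idempotent: every extra run changes the result again."""
    db[user] = db.get(user, 0) + amount


profiles: Dict[str, str] = {}
balances: Dict[str, int] = {}
for _ in range(2):  # simulate the same job being executed twice
    set_email(profiles, 'alice', 'alice@example.com')
    add_credit(balances, 'alice', 10)

print(profiles)  # {'alice': 'alice@example.com'}
print(balances)  # {'alice': 20}, the credit was applied twice
```

A task like add_credit can often be made idempotent by recording a unique operation identifier and skipping operations that were already applied.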

Retriable tasks are defined with a positive max_retries value:

@tasks.task(name='foo', max_retries=10)
def foo(a, b):
    pass

These tasks:
  • use at-least-once delivery: the job may be executed more than once
  • are automatically retried, up to max_retries times, in case of errors
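The two delivery guarantees can be illustrated with a toy model of a job that is acknowledged either before or after it runs, on a worker that crashes once mid-run. This is a hypothetical simulation of the general idea, not how Spinach brokers are implemented: acknowledging before running loses the job on a crash (at-most-once), while acknowledging after running causes it to be redelivered and started again (at-least-once).

```python
from typing import List


def deliver(ack_before_run: bool, crash_during_run: bool) -> List[str]:
    """Toy model of one job delivered to a worker that may crash once.

    The job stays in the simulated queue until it is acknowledged ("ack").
    If the worker crashes before acknowledging, the job is delivered again.
    """
    events: List[str] = []
    queued = True
    crashed = False
    while queued:
        if ack_before_run:
            # At-most-once: the job leaves the queue before it runs.
            queued = False
            events.append('ack')
        events.append('start')
        if crash_during_run and not crashed:
            crashed = True
            events.append('crash')
            continue  # redelivered only if it is still queued
        events.append('finish')
        if not ack_before_run:
            # At-least-once: the job leaves the queue only after it finished.
            queued = False
            events.append('ack')
    return events


print(deliver(ack_before_run=True, crash_during_run=True))
# ['ack', 'start', 'crash']: the job never finishes
print(deliver(ack_before_run=False, crash_during_run=True))
# ['start', 'crash', 'start', 'finish', 'ack']: the job starts twice
```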

Retrying

A retriable task is retried when it raises an unexpected exception during its execution:

@tasks.task(name='foo', max_retries=10)
def foo(a, b):
    l = [0, 1, 2]
    print(l[100])  # Raises IndexError

To allow the system to recover gracefully, a default backoff strategy is applied.

spinach.utils.exponential_backoff(attempt: int, cap: int = 1200) → datetime.timedelta

Calculate a delay to retry using an exponential backoff algorithm.

It is an exponential backoff with random jitter, which prevents jobs that failed at the same time from all being retried at the same time. It is a good fit for most applications.

Parameters:
  • attempt – the number of attempts made
  • cap – maximum delay in seconds, defaults to 1200 (20 minutes)
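The general technique can be sketched as follows. This is a hypothetical reimplementation, not the code of spinach.utils.exponential_backoff: the base of 3 seconds and the "equal jitter" variant (a random delay in the upper half of the current ceiling) are assumptions chosen for illustration. Only the overall shape, an exponentially growing delay capped at cap seconds with randomness added, follows the description above.

```python
import random
from datetime import timedelta


def backoff_with_jitter(attempt: int, cap: int = 1200, base: int = 3) -> timedelta:
    """Capped exponential backoff with "equal jitter" (hypothetical sketch).

    The ceiling grows as base * 2**attempt seconds until it reaches cap.
    The returned delay is drawn uniformly from the upper half of that
    ceiling, so retries of jobs that failed together are spread out.
    """
    ceiling = min(base * 2 ** attempt, cap)
    half = ceiling / 2
    return timedelta(seconds=half + random.uniform(0, half))


for attempt in range(5):
    print(attempt, round(backoff_with_jitter(attempt).total_seconds(), 1))
```

Keeping half of the ceiling fixed guarantees a minimum wait between attempts, while the random half prevents synchronized retry storms.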

For more explicit control, a task can also raise a RetryException, which specifies precisely when it should be retried:

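from datetime import datetime, timedelta, timezone

from spinach import RetryException

@tasks.task(name='foo', max_retries=10)
def foo(a, b):
    status_code = call_api(a, b)  # hypothetical helper performing an HTTP request
    if status_code == 429:
        # Too Many Requests: ask Spinach to retry this job in 10 minutes
        raise RetryException(
            'Should retry in 10 minutes',
            at=datetime.now(tz=timezone.utc) + timedelta(minutes=10)
        )
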
class spinach.task.RetryException(message, at: Optional[datetime.datetime] = None)

Exception raised in a task to indicate that the job should be retried.

Even if this exception is raised, the max_retries defined in the task still applies.

Parameters:
  • at – optional date at which the job should be retried. If it is not given, the job will be retried after a randomized exponential backoff. It is advised to pass a timezone-aware datetime to lift any ambiguity. However, if a timezone-naive datetime is given, it will be assumed to contain UTC time.
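The rule that max_retries still applies when a task explicitly requests a retry can be pictured with a small toy model. Everything here is hypothetical: RetryRequested stands in for RetryException, run_with_retries is not a Spinach function, and delays between attempts are ignored. A job is attempted at most 1 + max_retries times, whatever the reason for the failure.

```python
from typing import Callable, List


class RetryRequested(Exception):
    """Hypothetical stand-in for spinach.task.RetryException in this toy model."""


def run_with_retries(func: Callable[[int], str], max_retries: int) -> List[str]:
    """Toy model: run func until it succeeds or max_retries is exhausted.

    func receives the attempt number (0 is the first run). Any exception,
    including an explicit RetryRequested, uses up one retry.
    """
    outcomes: List[str] = []
    for attempt in range(max_retries + 1):
        try:
            outcomes.append(func(attempt))
            return outcomes
        except Exception as exc:  # every exception counts as a failed attempt
            outcomes.append(f'failed: {exc}')
    return outcomes


def rate_limited(attempt: int) -> str:
    # Pretend a remote API answers 429 Too Many Requests on the first two runs.
    if attempt < 2:
        raise RetryRequested('429')
    return 'ok'


print(run_with_retries(rate_limited, max_retries=10))
# ['failed: 429', 'failed: 429', 'ok']
print(run_with_retries(rate_limited, max_retries=1))
# ['failed: 429', 'failed: 429']: retries exhausted before success
```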

A task can also raise an AbortException for short-circuit behavior:

class spinach.task.AbortException

Periodic tasks

Tasks marked as periodic get automatically scheduled. To run a task every 5 seconds:

from datetime import timedelta

from spinach import Engine, MemoryBroker

spin = Engine(MemoryBroker())
every_5_sec = timedelta(seconds=5)


@spin.task(name='make_coffee', periodicity=every_5_sec)
def make_coffee():
    print("Making coffee...")


print('Starting workers, ^C to quit')
spin.start_workers()

Periodic tasks are scheduled by the workers themselves, so there is no need to run an additional process just for that. Having multiple workers on multiple machines is fine and does not result in duplicated tasks.

Periodic tasks run at most once per period. If the system scheduling periodic tasks gets delayed, nothing compensates for the lost time. This has the added benefit that periodic tasks are not piled up if all the workers are down for a prolonged amount of time: when they get back online, workers won’t have a storm of periodic tasks to execute.
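The "no catch-up" behavior can be simulated with a simplified model. The function runs_without_catch_up is hypothetical and does not reproduce Spinach's scheduler: it assumes workers check once per second and that the next run is always computed from the last actual run. With a 5 second period and workers down from second 10 to second 70, a single run happens when they come back instead of a burst of missed runs.

```python
from datetime import datetime, timedelta
from typing import List


def runs_without_catch_up(start: datetime, end: datetime, period: timedelta,
                          down_from: datetime, down_until: datetime) -> List[datetime]:
    """Simulate a periodic task whose missed periods are simply dropped.

    Workers check every second. While they are down
    (down_from <= t < down_until) nothing runs. The next run is computed
    from the last actual run, so no backlog accumulates during downtime.
    """
    runs: List[datetime] = []
    next_run = start
    t = start
    while t < end:
        workers_up = not (down_from <= t < down_until)
        if workers_up and t >= next_run:
            runs.append(t)
            next_run = t + period
        t += timedelta(seconds=1)
    return runs


start = datetime(2024, 1, 1)
runs = runs_without_catch_up(start, start + timedelta(seconds=100),
                             timedelta(seconds=5),
                             start + timedelta(seconds=10),
                             start + timedelta(seconds=70))
print([int((r - start).total_seconds()) for r in runs])
# [0, 5, 70, 75, 80, 85, 90, 95]
```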

Tasks Registry

Before being attached to a Spinach Engine, tasks are created inside a Tasks registry.

Attaching tasks to a Tasks registry instead of directly to the Engine makes it possible to compose large applications from smaller units that are independent from each other, the same way a Django project is composed of many small Django apps.

This may seem cumbersome for trivial applications, like the examples in this documentation or some single-module projects, so these can create tasks directly on the Engine:

import time

from spinach import Engine, MemoryBroker

spin = Engine(MemoryBroker())

@spin.task(name='fast')
def fast():
    time.sleep(1)

Note

Creating tasks directly in the Engine is a bit like creating a Flask app globally instead of using an app factory: it works until a change introduces a circular import. Its usage should really be limited to tiny projects.
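The idea behind registries can be shown with a deliberately tiny, library-independent model. ToyRegistry and ToyEngine are hypothetical classes, not spinach.Tasks or spinach.Engine, and attach_tasks here is only a name chosen for the sketch. Each "app" owns a registry and never imports the engine; the engine is assembled in one place, which is what avoids circular imports.

```python
from typing import Callable, Dict


class ToyRegistry:
    """Hypothetical, minimal stand-in for a task registry (not spinach.Tasks)."""

    def __init__(self) -> None:
        self.tasks: Dict[str, Callable] = {}

    def task(self, name: str) -> Callable[[Callable], Callable]:
        def decorator(func: Callable) -> Callable:
            self.tasks[name] = func
            return func
        return decorator


class ToyEngine:
    """Hypothetical engine that only learns about tasks when registries are attached."""

    def __init__(self) -> None:
        self.tasks: Dict[str, Callable] = {}

    def attach_tasks(self, registry: ToyRegistry) -> None:
        for name in registry.tasks:
            if name in self.tasks:
                raise ValueError(f'Task {name!r} is already attached')
        self.tasks.update(registry.tasks)


# Each "app" module defines its own registry and never imports the engine.
billing = ToyRegistry()
emails = ToyRegistry()


@billing.task(name='charge')
def charge(amount: int) -> str:
    return f'charged {amount}'


@emails.task(name='send_welcome')
def send_welcome(user: str) -> str:
    return f'welcome {user}'


# Only the entry point of the project knows about every app.
engine = ToyEngine()
engine.attach_tasks(billing)
engine.attach_tasks(emails)
```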

class spinach.task.Tasks(queue: Optional[str] = None, max_retries: Optional[numbers.Number] = None, periodicity: Optional[datetime.timedelta] = None)

Registry for tasks to be used by Spinach.

Parameters:
  • queue – default queue for tasks
  • max_retries – default retry policy for tasks
  • periodicity – for periodic tasks, delay between executions as a timedelta

add(func: Callable, name: Optional[str] = None, queue: Optional[str] = None, max_retries: Optional[numbers.Number] = None, periodicity: Optional[datetime.timedelta] = None)

Register a task function.

Parameters:
  • func – a callable to be executed
  • name – name of the task, used later to schedule jobs
  • queue – queue of the task, the default is used if not provided
  • max_retries – maximum number of retries, the default is used if not provided
  • periodicity – for periodic tasks, delay between executions as a timedelta
>>> tasks = Tasks()
>>> tasks.add(lambda x: x, name='do_nothing')

schedule(task: Union[str, Callable, spinach.task.Task], *args, **kwargs)

Schedule a job to be executed as soon as possible.

Parameters:
  • task – the task or its name to execute in the background
  • args – args to be passed to the task function
  • kwargs – kwargs to be passed to the task function

This method can only be used once tasks have been attached to a Spinach Engine.

schedule_at(task: Union[str, Callable, spinach.task.Task], at: datetime.datetime, *args, **kwargs)

Schedule a job to be executed in the future.

Parameters:
  • task – the task or its name to execute in the background
  • at – date at which the job should start. It is advised to pass a timezone-aware datetime to lift any ambiguity. However, if a timezone-naive datetime is given, it will be assumed to contain UTC time.
  • args – args to be passed to the task function
  • kwargs – kwargs to be passed to the task function

This method can only be used once tasks have been attached to a Spinach Engine.
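The rule for naive datetimes can be expressed as a small normalization helper. as_utc is hypothetical and only illustrates the documented behavior (naive means UTC); it is not taken from Spinach. It shows that a naive 12:00 and an aware 14:00 at UTC+2 designate the same instant under that rule.

```python
from datetime import datetime, timedelta, timezone


def as_utc(at: datetime) -> datetime:
    """Interpret naive datetimes as UTC and convert aware ones to UTC."""
    if at.tzinfo is None:
        # Naive: assume the wall-clock time is already expressed in UTC.
        return at.replace(tzinfo=timezone.utc)
    return at.astimezone(timezone.utc)


naive = datetime(2024, 1, 1, 12, 0)
aware = datetime(2024, 1, 1, 14, 0, tzinfo=timezone(timedelta(hours=2)))
print(as_utc(naive) == as_utc(aware))  # True
```

Passing an aware datetime such as datetime.now(tz=timezone.utc) + timedelta(hours=1) avoids relying on this assumption altogether.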

schedule_batch(batch: spinach.task.Batch)

Schedule many jobs at once.

Scheduling jobs in batches enqueues them faster by avoiding round-trips to the broker.

Parameters:
  • batch – Batch instance containing jobs to schedule

task(func: Optional[Callable] = None, name: Optional[str] = None, queue: Optional[str] = None, max_retries: Optional[numbers.Number] = None, periodicity: Optional[datetime.timedelta] = None)

Decorator to register a task function.

Parameters:
  • name – name of the task, used later to schedule jobs
  • queue – queue of the task, the default is used if not provided
  • max_retries – maximum number of retries, the default is used if not provided
  • periodicity – for periodic tasks, delay between executions as a timedelta
>>> tasks = Tasks()
>>> @tasks.task(name='foo')
... def foo():
...     pass

Batch

class spinach.task.Batch

Container for scheduling many jobs at once.

Batching the scheduling of jobs avoids many round-trips to the broker, reducing both the overhead and the chance of errors associated with network calls.

In this example 100 jobs are sent to Redis in one call:

>>> batch = Batch()
>>> for i in range(100):
...     batch.schedule('compute', i)
...
>>> spin.schedule_batch(batch)

Once the Batch is passed to the Engine, it should be disposed of and not reused.

schedule(task: Union[str, Callable, spinach.task.Task], *args, **kwargs)

Add to the batch a job to be executed as soon as possible.

Parameters:
  • task – the task or its name to execute in the background
  • args – args to be passed to the task function
  • kwargs – kwargs to be passed to the task function

schedule_at(task: Union[str, Callable, spinach.task.Task], at: datetime.datetime, *args, **kwargs)

Add a job to be executed in the future to the batch.

Parameters:
  • task – the task or its name to execute in the background
  • at – date at which the job should start. It is advised to pass a timezone-aware datetime to lift any ambiguity. However, if a timezone-naive datetime is given, it will be assumed to contain UTC time.
  • args – args to be passed to the task function
  • kwargs – kwargs to be passed to the task function
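The saving that Batch provides can be pictured with a toy broker that counts network round-trips. CountingBroker is hypothetical and unrelated to Spinach's real brokers; it simply treats each call to enqueue as one round-trip. Sending 100 jobs one by one costs 100 round-trips, while sending them as one batch costs a single round-trip, as in the Batch example above.

```python
from typing import Any, List, Tuple

# A job is modeled as (task name, positional args).
Job = Tuple[str, Tuple[Any, ...]]


class CountingBroker:
    """Hypothetical broker that counts network round-trips."""

    def __init__(self) -> None:
        self.round_trips = 0
        self.queue: List[Job] = []

    def enqueue(self, jobs: List[Job]) -> None:
        # One call to this method stands for one network round-trip.
        self.round_trips += 1
        self.queue.extend(jobs)


one_by_one = CountingBroker()
for i in range(100):
    one_by_one.enqueue([('compute', (i,))])

batched = CountingBroker()
batched.enqueue([('compute', (i,)) for i in range(100)])

print(one_by_one.round_trips, batched.round_trips)  # 100 1
```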