Tasks

A task is a unit of code, usually a function, to be executed in the background on remote workers.

To define a task:

from spinach import Tasks

tasks = Tasks()

@tasks.task(name='add')
def add(a, b):
    print('Computed {} + {} = {}'.format(a, b, a + b))

Note

The args and kwargs of a task must be JSON serializable.
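Since task arguments must be JSON serializable, it can help to check them before scheduling a job. The sketch below uses only the standard library json module; is_json_serializable is a hypothetical helper written for this example and is not part of Spinach.

```python
import json
from datetime import datetime, timezone
from typing import Any


def is_json_serializable(*args: Any, **kwargs: Any) -> bool:
    """Return True if args and kwargs can be encoded by json.dumps (hypothetical helper)."""
    try:
        json.dumps({"args": args, "kwargs": kwargs})
    except (TypeError, ValueError):  # unsupported type or circular reference
        return False
    return True


print(is_json_serializable(1, 2))                                # True
print(is_json_serializable(when=datetime.now(timezone.utc)))     # False: datetime is not JSON
print(is_json_serializable(when=datetime.now(timezone.utc).isoformat()))  # True: pass a string
```

With the standard json module, a tuple is encoded as a JSON array and decodes as a list, so a task should not rely on receiving the exact container type it was given.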

Retries

Spinach distinguishes two kinds of tasks: those that can safely be retried (idempotent tasks) and those that cannot (non-idempotent tasks). Since Spinach cannot guess whether a task's code is safe to run multiple times, the task must be annotated when it is created.

Note

Whether a task is retryable affects the behavior of its jobs not only when normal errors occur during their execution, but also when a worker dies catastrophically (power outage, OOM killed…).

Non-Retryable Tasks

Spinach assumes that by default tasks are not safe to be retried (tasks are assumed to have side effects).

These tasks are defined with max_retries=0 (the default):

@tasks.task(name='foo')
def foo(a, b):
    pass

Non-retryable tasks use at-most-once delivery:
  • it is guaranteed that the job will not run multiple times
  • it is guaranteed that the job will not run simultaneously in multiple workers
  • the job is not automatically retried in case of errors
  • the job may never even start in very rare conditions

Retryable Tasks

Idempotent tasks can be executed multiple times without changing the result beyond the initial execution. It is a nice property to have, and most tasks should strive to be idempotent so that they can recover gracefully from errors.
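To make the distinction concrete, here is a plain-Python illustration (not Spinach code) using two hypothetical functions and a dict standing in for real storage: setting a value is idempotent, incrementing it is not. Only the first kind is a safe candidate for a positive max_retries.

```python
# A toy in-memory "database" standing in for real storage.
balances = {"alice": 100}


def set_balance(account: str, amount: int) -> None:
    """Idempotent: running it twice leaves the same state as running it once."""
    balances[account] = amount


def add_to_balance(account: str, amount: int) -> None:
    """Not idempotent: every extra run changes the result again."""
    balances[account] += amount


# Simulate a job being executed twice, as can happen with at-least-once delivery.
set_balance("alice", 150)
set_balance("alice", 150)
print(balances["alice"])  # 150, same as a single run

add_to_balance("alice", 50)
add_to_balance("alice", 50)
print(balances["alice"])  # 250 instead of the intended 200
```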

Retryable tasks are defined with a positive max_retries value:

@tasks.task(name='foo', max_retries=10)
def foo(a, b):
    pass

Retryable tasks use at-least-once delivery:
  • the job is automatically retried, up to max_retries times, in case of errors
  • the job may be executed more than once
  • the job may be executed simultaneously in multiple workers in very rare conditions
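The two delivery guarantees above differ in when a job is removed from the broker relative to running it. The toy model below is hypothetical and does not reproduce Spinach's internals; it simulates a worker that crashes once in the middle of a job and reports how many times the job started and whether it completed.

```python
from typing import Tuple


def deliver(at_least_once: bool, worker_crashes_once: bool) -> Tuple[int, bool]:
    """Toy model: return (times the job started, whether it completed)."""
    broker = ["job"]  # the broker's copy of the job
    started = 0
    crash_pending = worker_crashes_once
    while broker:
        if not at_least_once:
            broker.pop()  # at-most-once: removed *before* running
        started += 1
        if crash_pending:
            crash_pending = False
            if not at_least_once:
                return started, False  # the job is lost and never retried
            continue  # still in the broker, so it gets redelivered
        if at_least_once:
            broker.pop()  # at-least-once: removed only *after* success
        return started, True
    return started, False


for mode in (False, True):
    label = "at-least-once" if mode else "at-most-once"
    print(label, deliver(mode, worker_crashes_once=True))
# at-most-once (1, False): started once, never completed
# at-least-once (2, True): ran twice, but completed
```

This is why only idempotent tasks should be retryable: the second run in the at-least-once case must be harmless.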

When a worker dies catastrophically, it is detected as dead after 30 minutes of inactivity, and the retryable jobs it was running are rescheduled automatically.
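The recovery rule can be modelled in a few lines. This is a hypothetical sketch of the behavior described, not Spinach's implementation: how Spinach actually tracks worker activity is not covered here, so the last_seen mapping and the job dicts are assumptions made for illustration.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List

# Inactivity threshold taken from the paragraph above.
DEAD_AFTER = timedelta(minutes=30)


def find_dead_workers(last_seen: Dict[str, datetime], now: datetime) -> List[str]:
    """Return IDs of workers inactive for longer than DEAD_AFTER (hypothetical model)."""
    return sorted(w for w, seen in last_seen.items() if now - seen > DEAD_AFTER)


def jobs_to_reschedule(running: Dict[str, List[dict]], dead: List[str]) -> List[dict]:
    """Keep only retryable jobs (max_retries > 0) that were running on dead workers."""
    return [job for w in dead for job in running.get(w, []) if job["max_retries"] > 0]


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
last_seen = {"w1": now - timedelta(minutes=5), "w2": now - timedelta(minutes=45)}
running = {
    "w1": [{"task": "add", "max_retries": 10}],
    "w2": [{"task": "foo", "max_retries": 10}, {"task": "bar", "max_retries": 0}],
}
dead = find_dead_workers(last_seen, now)
print(dead)                               # ['w2']
print(jobs_to_reschedule(running, dead))  # [{'task': 'foo', 'max_retries': 10}]
```

The non-retryable job "bar" is not rescheduled, which matches the at-most-once guarantee.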

Retrying

A retryable task is retried when it raises an unexpected exception:

@tasks.task(name='foo', max_retries=10)
def foo(a, b):
    items = [0, 1, 2]
    print(items[100])  # Raises IndexError

To allow the system to recover gracefully, a default backoff strategy is applied.

spinach.utils.exponential_backoff(attempt: int, cap: int = 1200) → datetime.timedelta

Calculate a delay to retry using an exponential backoff algorithm.

It is an exponential backoff with random jitter, which prevents jobs that failed together from all being retried at the same time. It is a good fit for most applications.

Parameters:
  • attempt – the number of attempts made
  • cap – maximum delay, defaults to 20 minutes
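To illustrate the idea, here is a standalone sketch of an exponential backoff with random jitter. The base of 3 seconds and the "equal jitter" variant (half of the delay fixed, half random) are assumptions chosen for the example; refer to spinach.utils.exponential_backoff for the library's actual behavior.

```python
import random
from datetime import timedelta


def backoff_with_jitter(attempt: int, cap: int = 1200, base: int = 3) -> timedelta:
    """Exponential backoff with "equal jitter" (a sketch, not Spinach's code).

    The un-jittered delay doubles with every attempt (base * 2**attempt seconds)
    until it reaches cap. The returned delay is drawn uniformly from the upper
    half of that value, so jobs that failed together spread out over time.
    """
    ceiling = min(base * 2 ** attempt, cap)
    return timedelta(seconds=ceiling / 2 + random.uniform(0, ceiling / 2))


for attempt in range(1, 6):
    print(attempt, backoff_with_jitter(attempt))
```

Keeping half of the delay fixed guarantees a minimum wait, while the random half provides the spreading that prevents retry storms.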

For finer control, a task can raise a RetryException to specify precisely when it should be retried:

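from datetime import datetime, timedelta, timezone

from spinach import RetryException

@tasks.task(name='foo', max_retries=10)
def foo(a, b):
    status_code = call_api(a, b)  # hypothetical helper returning an HTTP status code
    if status_code == 429:
        # The remote service asks to slow down: retry in 10 minutes
        raise RetryException(
            'Should retry in 10 minutes',
            at=datetime.now(tz=timezone.utc) + timedelta(minutes=10)
        )

class spinach.task.RetryException(message, at: Optional[datetime.datetime] = None)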

Exception raised in a task to indicate that the job should be retried.

Even if this exception is raised, the max_retries defined in the task still applies.

Parameters: at – Optional date at which the job should be retried. If it is not given, the job will be retried after a randomized exponential backoff. It is advised to pass a timezone-aware datetime to lift any ambiguity. However, if a timezone-naive datetime is given, it will be assumed to contain UTC time.
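The rule for the at argument, where a naive datetime is read as UTC, can be expressed as the sketch below. as_utc is a hypothetical helper written for illustration; it also shows why passing a naive local time such as datetime.now() can shift the retry on a machine that is not running in UTC.

```python
from datetime import datetime, timedelta, timezone


def as_utc(at: datetime) -> datetime:
    """Read naive datetimes as UTC and convert aware ones to UTC (hypothetical helper)."""
    if at.tzinfo is None:
        return at.replace(tzinfo=timezone.utc)
    return at.astimezone(timezone.utc)


naive = datetime(2024, 1, 1, 12, 0)                 # no tzinfo: read as 12:00 UTC
utc_plus_one = timezone(timedelta(hours=1))
aware = datetime(2024, 1, 1, 12, 0, tzinfo=utc_plus_one)  # 12:00 at UTC+01:00

print(as_utc(naive))  # 2024-01-01 12:00:00+00:00
print(as_utc(aware))  # 2024-01-01 11:00:00+00:00
```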

A task can also raise an AbortException to short-circuit the retry mechanism:

class spinach.task.AbortException

Exception raised in a task to indicate that the job should NOT be retried.

If this exception is raised, all retry attempts are stopped immediately.
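Putting these rules together, the decision taken after a task raises can be modelled as below. The exception classes are local stand-ins so that the snippet runs without Spinach, and next_step is a hypothetical function: it summarizes the documented rules (AbortException stops retries, max_retries always applies, RetryException's at overrides the backoff) rather than reproducing Spinach's internals.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


class RetryException(Exception):
    """Local stand-in with the same constructor as spinach.task.RetryException."""

    def __init__(self, message: str, at: Optional[datetime] = None):
        super().__init__(message)
        self.at = at


class AbortException(Exception):
    """Local stand-in for spinach.task.AbortException."""


def next_step(exc: Exception, retries_done: int, max_retries: int) -> str:
    """Summarize what happens to a job whose task raised exc (illustrative model)."""
    if isinstance(exc, AbortException):
        return "failed: aborted"          # all retry attempts stop immediately
    if retries_done >= max_retries:
        return "failed: no retries left"  # max_retries applies even to RetryException
    if isinstance(exc, RetryException) and exc.at is not None:
        return "retry at " + exc.at.isoformat()
    return "retry after randomized exponential backoff"


now = datetime(2024, 1, 1, tzinfo=timezone.utc)
print(next_step(IndexError("list index out of range"), 0, 10))
print(next_step(RetryException("429", at=now + timedelta(minutes=10)), 0, 10))
print(next_step(RetryException("429"), 10, 10))
print(next_step(AbortException(), 0, 10))
print(next_step(IndexError(), 0, 0))  # non-retryable task: max_retries=0
```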

Limiting task concurrency

If a task is idempotent, it may also limit the number of its jobs running concurrently across all workers. Such tasks are defined with a positive max_concurrency value (max_retries must be set as well):

@tasks.task(name='foo', max_retries=10, max_concurrency=1)
def foo(a, b):
    pass

With this definition, no more than one job of this task will ever be running at a time, no matter how many are queued and waiting to run.
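The effect of max_concurrency can be pictured as a per-task counter of running jobs. The class below is a toy, single-process model: in Spinach the limit holds across all workers, whereas here one Counter stands in for that shared state, which is an assumption made for illustration.

```python
from collections import Counter


class ConcurrencyLimiter:
    """Toy model of max_concurrency: caps the running jobs of each task name."""

    def __init__(self, limits: dict):
        self.limits = limits      # task name -> max_concurrency
        self.running = Counter()  # task name -> jobs currently running

    def try_start(self, task: str) -> bool:
        limit = self.limits.get(task)
        if limit is not None and self.running[task] >= limit:
            return False  # the job stays queued until a slot frees up
        self.running[task] += 1
        return True

    def finish(self, task: str) -> None:
        self.running[task] -= 1


limiter = ConcurrencyLimiter({"foo": 1})
print(limiter.try_start("foo"))  # True: the first job starts
print(limiter.try_start("foo"))  # False: a second job must wait
limiter.finish("foo")
print(limiter.try_start("foo"))  # True: the slot is free again
```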

Periodic tasks

Tasks marked as periodic get automatically scheduled. To run a task every 5 seconds:

from datetime import timedelta

from spinach import Engine, MemoryBroker

spin = Engine(MemoryBroker())
every_5_sec = timedelta(seconds=5)


@spin.task(name='make_coffee', periodicity=every_5_sec)
def make_coffee():
    print("Making coffee...")


print('Starting workers, ^C to quit')
spin.start_workers()

Periodic tasks are scheduled by the workers themselves, so there is no need to run an additional process just for that. Running multiple workers on multiple machines is fine and will not result in duplicated tasks.

Periodic tasks run at most every period. If the system scheduling periodic tasks gets delayed, nothing compensates for the time lost. This has the added benefit of periodic tasks not being scheduled if all the workers are down for a prolonged amount of time. When they get back online, workers won’t have a storm of periodic tasks to execute.
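The difference between skipping lost time and compensating for it is easy to quantify. Both functions below are hypothetical models written for this example, not Spinach's scheduler; they count how many jobs would be scheduled when workers come back after a one-hour outage with a 5-second period.

```python
from datetime import datetime, timedelta, timezone


def jobs_without_catch_up(last_run: datetime, now: datetime, period: timedelta) -> int:
    """At most one job per check, however long the scheduler was delayed."""
    return 1 if now - last_run >= period else 0


def jobs_with_catch_up(last_run: datetime, now: datetime, period: timedelta) -> int:
    """A compensating scheduler would replay every missed period."""
    return (now - last_run) // period


every_5_sec = timedelta(seconds=5)
last_run = datetime(2024, 1, 1, 8, 0, tzinfo=timezone.utc)
back_online = last_run + timedelta(hours=1)  # workers were down for an hour

print(jobs_without_catch_up(last_run, back_online, every_5_sec))  # 1
print(jobs_with_catch_up(last_run, back_online, every_5_sec))     # 720
```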

Tasks Registry

Before being attached to a Spinach Engine, tasks are created inside a Tasks registry.

Attaching tasks to a Tasks registry instead of directly to the Engine makes it possible to compose large applications from smaller units independent from each other, the same way a Django project is composed of many small Django apps.

This may seem cumbersome for trivial applications, like the examples in this documentation or some single-module projects, so those can create tasks directly on the Engine using:

import time

from spinach import Engine, MemoryBroker

spin = Engine(MemoryBroker())

@spin.task(name='fast')
def fast():
    time.sleep(1)

Note

Creating tasks directly in the Engine is a bit like creating a Flask app globally instead of using an app factory: it works until a change introduces a circular import. Its usage should really be limited to tiny projects.

class spinach.task.Tasks(queue: Optional[str] = None, max_retries: Optional[numbers.Number] = None, periodicity: Optional[datetime.timedelta] = None, max_concurrency: Optional[int] = None)

Registry for tasks to be used by Spinach.

Parameters:
  • queue – default queue for tasks
  • max_retries – default retry policy for tasks
  • periodicity – for periodic tasks, delay between executions as a timedelta
  • max_concurrency – maximum number of simultaneous Jobs that can be started for this Task. Requires max_retries to also be set.
add(func: Callable, name: Optional[str] = None, queue: Optional[str] = None, max_retries: Optional[numbers.Number] = None, periodicity: Optional[datetime.timedelta] = None, max_concurrency: Optional[int] = None)

Register a task function.

Parameters:
  • func – a callable to be executed
  • name – name of the task, used later to schedule jobs
  • queue – queue of the task, the default is used if not provided
  • max_retries – maximum number of retries, the default is used if not provided
  • periodicity – for periodic tasks, delay between executions as a timedelta
  • max_concurrency – maximum number of simultaneous Jobs that can be started for this Task. Requires max_retries to also be set.
>>> tasks = Tasks()
>>> tasks.add(lambda x: x, name='do_nothing')
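The "default is used if not provided" rule shared by Tasks, add() and task() can be sketched as follows. resolve_option and check_concurrency are hypothetical helpers, the queue names are arbitrary, and raising ValueError when max_concurrency is set without max_retries is an assumption about how such a requirement would be enforced.

```python
from datetime import timedelta
from typing import Optional


def resolve_option(task_value, registry_default):
    """A value given for the task wins; otherwise the registry default applies."""
    return task_value if task_value is not None else registry_default


def check_concurrency(max_retries: Optional[int], max_concurrency: Optional[int]) -> None:
    """max_concurrency requires a retryable task (assumed to raise ValueError)."""
    if max_concurrency is not None and not max_retries:
        raise ValueError("max_concurrency requires max_retries to be set")


registry_defaults = {"queue": "low", "max_retries": 3, "periodicity": None}
task_options = {"queue": "emails", "max_retries": None, "periodicity": timedelta(minutes=1)}

resolved = {k: resolve_option(task_options[k], v) for k, v in registry_defaults.items()}
print(resolved["queue"], resolved["max_retries"])  # emails 3
check_concurrency(resolved["max_retries"], max_concurrency=1)  # passes silently
```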
schedule(task: Union[str, Callable, spinach.task.Task], *args, **kwargs) → Job

Schedule a job to be executed as soon as possible.

Parameters:
  • task – the task or its name to execute in the background
  • args – args to be passed to the task function
  • kwargs – kwargs to be passed to the task function
Returns:

The Job that was created and scheduled.

This method can only be used once tasks have been attached to a Spinach Engine.

schedule_at(task: Union[str, Callable, spinach.task.Task], at: datetime.datetime, *args, **kwargs) → Job

Schedule a job to be executed in the future.

Parameters:
  • task – the task or its name to execute in the background
  • at – Date at which the job should start. It is advised to pass a timezone-aware datetime to lift any ambiguity. However, if a timezone-naive datetime is given, it will be assumed to contain UTC time.
  • args – args to be passed to the task function
  • kwargs – kwargs to be passed to the task function
Returns:

The Job that was created and scheduled.

This method can only be used once tasks have been attached to a Spinach Engine.

schedule_batch(batch: Batch) → Iterable[Job]

Schedule many jobs at once.

Scheduling jobs in batches makes it possible to enqueue them quickly by avoiding round-trips to the broker.

Parameters: batch – Batch instance containing jobs to schedule
Returns: The Jobs that were created and scheduled.
task(func: Optional[Callable] = None, name: Optional[str] = None, queue: Optional[str] = None, max_retries: Optional[numbers.Number] = None, periodicity: Optional[datetime.timedelta] = None, max_concurrency: Optional[int] = None)

Decorator to register a task function.

Parameters:
  • name – name of the task, used later to schedule jobs
  • queue – queue of the task, the default is used if not provided
  • max_retries – maximum number of retries, the default is used if not provided
  • periodicity – for periodic tasks, delay between executions as a timedelta
  • max_concurrency – maximum number of simultaneous Jobs that can be started for this Task. Requires max_retries to also be set.
>>> tasks = Tasks()
>>> @tasks.task(name='foo')
... def foo():
...     pass

Batch

class spinach.task.Batch

Container allowing to schedule many jobs at once.

Batching the scheduling of jobs avoids many round-trips to the broker, reducing the overhead and the chance of errors associated with network calls.

In this example, 100 jobs are sent to Redis in one call:

>>> from spinach import Batch
>>> batch = Batch()
>>> for i in range(100):
...     batch.schedule('compute', i)
...
>>> spin.schedule_batch(batch)

Once the Batch is passed to the Engine, it should be disposed of and not be reused.
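The saving comes from the number of calls to the broker, not the number of jobs. The CountingBroker below is a hypothetical toy (it is not Spinach's broker interface) that counts how many round-trips 100 jobs cost with and without batching.

```python
class CountingBroker:
    """Fake broker that only counts network round-trips (illustration only)."""

    def __init__(self):
        self.round_trips = 0
        self.queue = []

    def enqueue(self, jobs):
        self.round_trips += 1  # one network call, whatever the number of jobs
        self.queue.extend(jobs)


one_by_one = CountingBroker()
for i in range(100):
    one_by_one.enqueue([("compute", i)])

batched = CountingBroker()
batched.enqueue([("compute", i) for i in range(100)])

print(one_by_one.round_trips, batched.round_trips)  # 100 1
print(one_by_one.queue == batched.queue)            # True: same jobs either way
```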

schedule(task: Union[str, Callable, spinach.task.Task], *args, **kwargs)

Add to the batch a job to be executed as soon as possible.

Parameters:
  • task – the task or its name to execute in the background
  • args – args to be passed to the task function
  • kwargs – kwargs to be passed to the task function
schedule_at(task: Union[str, Callable, spinach.task.Task], at: datetime.datetime, *args, **kwargs)

Add to the batch a job to be executed in the future.

Parameters:
  • task – the task or its name to execute in the background
  • at – Date at which the job should start. It is advised to pass a timezone-aware datetime to lift any ambiguity. However, if a timezone-naive datetime is given, it will be assumed to contain UTC time.
  • args – args to be passed to the task function
  • kwargs – kwargs to be passed to the task function