Tasks

Collection of tasks.

The debusine.tasks module hierarchy hosts a collection of Task that are used by workers to fulfill WorkRequest sent by the debusine scheduler.

Creating a new task requires adding a new file containing a class inheriting from the Task base class. The name of the class must be unique among all child classes.

A child class must, at the very least, override the Task.execute() method.

class debusine.tasks.Task[source]

Base class for tasks.

A Task object serves two purpose: encapsulating the logic of what needs to be done to execute the task (cf configure() and execute() that are run on a worker), and supporting the scheduler by determining if a task is suitable for a given worker. That is done in a two-step process, collating metadata from each worker (with the analyze_worker() method that is run on a worker) and then, based on this metadata, see if a task is suitable (with can_run_on() that is executed on the scheduler).

TASK_DATA_SCHEMA = {}

Can be overridden to enable jsonschema validation of the task_data parameter passed to configure().

TASK_VERSION = None

Must be overridden by child classes to document the current version of the task’s code. A task will only be scheduled on a worker if its task version is the same as the one running on the scheduler.

__init__()[source]

Initialize the task.

abort()[source]

Task does not need to be executed. Once aborted cannot be changed.

property aborted

Return if the task is aborted.

Tasks cannot transition from aborted -> not-aborted.

analyze_worker()dict[source]

Return dynamic metadata about the current worker.

This method is called on the worker to collect information about the worker. The information is stored as a set of key-value pairs in a dictionary.

That information is then reused on the scheduler to be fed to can_run_on() and determine if a task is suitable to be executed on the worker.

Derived objects can extend the behaviour by overriding the method, calling metadata = super().analyze_worker(), and then adding supplementary data in the dictionary.

To avoid conflicts on the names of the keys used by different tasks you should use key names obtained with self.prefix_with_task_name(...).

Returns

a dictionary describing the worker.

Return type

dict.

classmethod analyze_worker_all_tasks()[source]

Return dictionary with metadata for each task in Task._sub_tasks.

Subclasses of Task get registered in Task._sub_tasks. Return a dictionary with the metadata of each of the subtasks.

This method is executed in the worker when submitting the dynamic metadata.

can_run_on(worker_metadata: dict)bool[source]

Check if the specified worker can run the task.

This method shall take its decision solely based on the supplied worker_metadata and on the configured task data (self.data).

The default implementation returns always True except if there’s a mismatch between the :py:attribute:TASK_VERSION on the scheduler side and on the worker side.

Derived objects can implement further checks by overriding the method in the following way:

if not super().can_run_on(worker_metadata):
    return False

if ...:
    return False

return True
Parameters

worker_metadata (dict) – The metadata collected from the worker by running analyze_worker() on all the tasks on the worker under consideration.

Returns

the boolean result of the check.

Return type

bool.

static class_from_name(sub_task_class_name: str) → Type[debusine.tasks._task.Task][source]

Return class for :param sub_task_class_name (case-insensitive).

__init_subclass__() registers Task subclasses’ into Task._sub_tasks.

configure(task_data)[source]

Configure the task with the supplied task_data.

The supplied data is first validated against the JSON schema defined in the TASK_DATA_SCHEMA class attribute. If validation fails, a TaskConfigError is raised. Otherwise, the supplied task_data is stored in the data attribute.

Derived objects can extend the behaviour by overriding the method and calling super().configure(task_data) however the extra checks must not access any resource of the worker as the method can also be executed on the server when it tries to schedule work requests.

Parameters

task_data (dict) – The supplied data describing the task.

Raises

TaskConfigError – if the JSON schema is not respected.

configure_server_access(debusine: debusine.client.debusine.Debusine)[source]

Set the object to access the server.

data

Validated task data submitted through configure().

execute()bool[source]

Execute the requested task.

The task must first have been configured. It is allowed to take as much time as required. This method will only be run on a worker. It is thus allowed to access resources local to the worker.

It is recommended to fail early by raising a :py:exc:TaskConfigError if the parameters of the task let you anticipate that it has no chance of completing successfully.

Returns

True to indicate success, False for a failure.

Return type

bool.

Raises

TaskConfigError – if the parameters of the work request are incompatible with the worker.

execute_logging_exceptions()bool[source]

Execute self.execute() logging any raised exceptions.

static is_valid_task_name(task_name)bool[source]

Return True if task_name is registered (its class is imported).

logger

A logging.Logger instance that can be used in child classes when you override methods to implement the task.

name

The name of the task. It is computed by __init__() by converting the class name to lowercase.

prefix_with_task_name(text: str)str[source]
Returns

the text prefixed with the task name and a colon.

static task_names()list[source]

Return list of sub-task names.

exception debusine.tasks.TaskConfigError[source]

Halt the task due to invalid configuration.