Source code for debusine.tasks._task

# Copyright 2021-2022 The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.
"""
Collection of tasks.

The debusine.tasks module hierarchy hosts a collection of :class:`Task` subclasses
that are used by workers to fulfill the WorkRequests sent by the debusine scheduler.

Creating a new task requires adding a new file containing a class inheriting
from the :class:`Task` base class. The name of the class must be unique among
all child classes.

A child class must, at the very least, override the :py:meth:`Task.execute`
method.
"""
import logging
from typing import Optional, Type

import jsonschema

from debusine.client.debusine import Debusine


class TaskConfigError(Exception):
    """Halt the task due to invalid configuration."""


class TaskMissingFilesError(Exception):
    """Task did not generate the expected output files."""


class Task:
    """
    Base class for tasks.

    A Task object serves two purposes: encapsulating the logic of what needs
    to be done to execute the task (cf. :py:meth:`configure` and
    :py:meth:`execute`, which are run on a worker), and supporting the
    scheduler by determining if a task is suitable for a given worker. That
    is done in a two-step process: collating metadata from each worker (with
    the :py:meth:`analyze_worker` method that is run on a worker) and then,
    based on this metadata, checking whether a task is suitable (with
    :py:meth:`can_run_on`, which is executed on the scheduler).
    """

    #: Must be overridden by child classes to document the current version of
    #: the task's code. A task will only be scheduled on a worker if its task
    #: version is the same as the one running on the scheduler.
    TASK_VERSION = None

    #: Can be overridden to enable jsonschema validation of the ``task_data``
    #: parameter passed to :py:meth:`configure`.
    TASK_DATA_SCHEMA = {}

    _sub_tasks = {}

    def __init_subclass__(cls, **kwargs):
        """
        Register the subclass into Task._sub_tasks.

        Used by Task.class_from_name() to return the class given the name.
        """
        super().__init_subclass__(**kwargs)
        sub_task_name_lowercase = cls.__name__.lower()

        # The same sub-task could register twice (but assert that it is the
        # *same* class, not a different sub-task whose name differs only in
        # capitalisation)
        assert (
            sub_task_name_lowercase not in cls._sub_tasks
            or cls._sub_tasks[sub_task_name_lowercase] == cls
        )

        cls._sub_tasks[sub_task_name_lowercase] = cls

    def __init__(self):
        """Initialize the task."""
        #: Validated task data submitted through :py:meth:`configure`.
        self.data = None

        #: The name of the task. It is computed by :py:meth:`__init__` by
        #: converting the class name to lowercase.
        self.name = self.__class__.__name__.lower()

        #: A :class:`logging.Logger` instance that can be used in child
        #: classes when you override methods to implement the task.
        self.logger = logging.getLogger("debusine.tasks")

        # Task is aborted: the task does not need to be executed, and can be
        # stopped if it is already running
        self._aborted = False

        self.debusine: Optional[Debusine] = None

    def configure_server_access(self, debusine: Debusine):
        """Set the object to access the server."""
        self.debusine = debusine

    def prefix_with_task_name(self, text: str) -> str:
        """:return: the ``text`` prefixed with the task name and a colon."""
        return f"{self.name}:{text}"

    def analyze_worker(self) -> dict:
        """
        Return dynamic metadata about the current worker.

        This method is called on the worker to collect information about the
        worker. The information is stored as a set of key-value pairs in a
        dictionary.

        That information is then reused on the scheduler, where it is fed to
        :py:meth:`can_run_on` to determine whether a task is suitable to be
        executed on the worker.

        Derived objects can extend the behaviour by overriding the method,
        calling ``metadata = super().analyze_worker()``, and then adding
        supplementary data to the dictionary.

        To avoid conflicts between the key names used by different tasks, use
        key names obtained with ``self.prefix_with_task_name(...)``.

        :return: a dictionary describing the worker.
        :rtype: dict.
        """
        version_key_name = self.prefix_with_task_name("version")

        return {
            version_key_name: self.TASK_VERSION,
        }
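
    # Sketch of how a derived task might extend this method, following the
    # ``prefix_with_task_name`` convention (the ``available`` key is invented
    # for this illustration):
    #
    #     def analyze_worker(self) -> dict:
    #         metadata = super().analyze_worker()
    #         metadata[self.prefix_with_task_name("available")] = True
    #         return metadata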

    @classmethod
    def analyze_worker_all_tasks(cls):
        """
        Return a dictionary with the metadata of each task in Task._sub_tasks.

        Subclasses of Task get registered in Task._sub_tasks. Return a
        dictionary with the metadata of each of these sub-tasks.

        This method is executed on the worker when submitting the dynamic
        metadata.
        """
        metadata = {}

        for task_class in cls._sub_tasks.values():
            task = task_class()
            metadata.update(task.analyze_worker())

        return metadata

    def can_run_on(self, worker_metadata: dict) -> bool:
        """
        Check if the specified worker can run the task.

        This method shall take its decision solely based on the supplied
        ``worker_metadata`` and on the configured task data (``self.data``).

        The default implementation always returns True, except when there is
        a mismatch between the :py:attr:`TASK_VERSION` on the scheduler side
        and on the worker side.

        Derived objects can implement further checks by overriding the method
        in the following way::

            if not super().can_run_on(worker_metadata):
                return False

            if ...:
                return False

            return True

        :param dict worker_metadata: The metadata collected from the worker
          by running :py:meth:`analyze_worker` on all the tasks on the worker
          under consideration.
        :return: the boolean result of the check.
        :rtype: bool.
        """
        version_key_name = self.prefix_with_task_name("version")
        if worker_metadata.get(version_key_name) != self.TASK_VERSION:
            return False

        return True

    def configure(self, task_data):
        """
        Configure the task with the supplied ``task_data``.

        The supplied data is first validated against the JSON schema defined
        in the :py:attr:`TASK_DATA_SCHEMA` class attribute. If the validation
        fails, a :py:exc:`TaskConfigError` is raised. Otherwise, the supplied
        ``task_data`` is stored in the :py:attr:`data` attribute.

        Derived objects can extend the behaviour by overriding the method and
        calling ``super().configure(task_data)``; however, the extra checks
        must not access any resource of the worker, as the method can also be
        executed on the server when it tries to schedule work requests.

        :param dict task_data: The supplied data describing the task.
        :raises TaskConfigError: if the JSON schema is not respected.
        """
        try:
            jsonschema.validate(task_data, self.TASK_DATA_SCHEMA)
        except jsonschema.ValidationError as exc:
            raise TaskConfigError(exc.message)

        self.data = task_data
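
    # Validation sketch (the ``Noop`` sub-task and its ``result`` field are
    # the invented example from the top of this file):
    #
    #     task = Noop()
    #     task.configure({"result": True})   # validated, stored in task.data
    #     task.configure({"result": "yes"})  # raises TaskConfigError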

    def execute_logging_exceptions(self) -> bool:
        """Execute self.execute(), logging any raised exceptions."""
        try:
            return self.execute()
        except Exception as exc:
            self.logger.exception("Exception in Task %s", self.name)
            raise exc

    def execute(self) -> bool:
        """
        Execute the requested task.

        The task must first have been configured. It is allowed to take as
        much time as required.

        This method will only be run on a worker. It is thus allowed to
        access resources local to the worker.

        It is recommended to fail early, by raising a
        :py:exc:`TaskConfigError`, if the parameters of the task let you
        anticipate that it has no chance of completing successfully.

        :return: True to indicate success, False for a failure.
        :rtype: bool.
        :raises TaskConfigError: if the parameters of the work request are
          incompatible with the worker.
        """
        raise NotImplementedError()

    def abort(self):
        """
        Mark the task as aborted: it no longer needs to be executed.

        Once aborted, a task cannot be un-aborted.
        """
        self._aborted = True

    @property
    def aborted(self) -> bool:
        """
        Return whether the task has been aborted.

        Tasks cannot transition from aborted to not-aborted.
        """
        return self._aborted

    @staticmethod
    def class_from_name(sub_task_class_name: str) -> Type["Task"]:
        """
        Return the task class for ``sub_task_class_name`` (case-insensitive).

        __init_subclass__() registers Task subclasses into Task._sub_tasks.
        """
        sub_task_class_name_lowercase = sub_task_class_name.lower()

        if sub_task_class_name_lowercase in Task._sub_tasks:
            return Task._sub_tasks[sub_task_class_name_lowercase]

        raise ValueError(
            f"'{sub_task_class_name_lowercase}' is not a registered task_name"
        )
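
    # Lookup sketch (``Noop`` as in the illustrative example near the top of
    # this file):
    #
    #     Task.class_from_name("NOOP")     # returns the Noop class
    #     Task.class_from_name("unknown")  # raises ValueError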

    @staticmethod
    def is_valid_task_name(task_name) -> bool:
        """Return True if task_name is registered (its class is imported)."""
        return task_name.lower() in Task._sub_tasks

    @staticmethod
    def task_names() -> list[str]:
        """Return the sorted list of registered sub-task names."""
        return sorted(Task._sub_tasks)
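
# Illustrative end-to-end flow on a worker (not part of this module; ``Noop``
# and its task data are the invented example from the top of this file, and
# ``debusine`` stands for an already-configured ``Debusine`` client):
#
#     task_class = Task.class_from_name("noop")
#     task = task_class()
#     task.configure({"result": True})
#     task.configure_server_access(debusine)
#     if not task.aborted:
#         success = task.execute_logging_exceptions()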