Tasks (internal API)

Collection of tasks.

The debusine.tasks module hierarchy hosts a collection of BaseTask that are used by workers to fulfill debusine.db.models.WorkRequests sent by the debusine scheduler.

Creating a new task requires adding a new file containing a class inheriting from the BaseTask or RunCommandTask base class. The name of the class must be unique among all child classes.

A child class must, at the very least, override the BaseTask.execute() method.

class debusine.tasks.BaseExternalTask(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None)[source]

Bases: BaseTask[TD, DTD], Generic[TD, DTD]

A BaseTask that runs on an external worker.

TASK_TYPE: TaskTypes = 'Worker'

The worker type must be suitable for the task type. TaskTypes.WORKER requires an external worker; TaskTypes.SERVER requires a Celery worker; TaskTypes.SIGNING requires a signing worker.

__init__(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None) None[source]

Initialize the task.

configure_server_access(debusine: Debusine)[source]

Set the object to access the server.

dynamic_task_data_type

Class used as the in-memory representation of dynamic task data.

alias of TypeVar(‘DTD’, bound=BaseDynamicTaskData)

name: ClassVar[str] = 'baseexternaltask'
task_data_type

Class used as the in-memory representation of task data.

alias of TypeVar(‘TD’, bound=BaseTaskData)

class debusine.tasks.BaseTask(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None)[source]

Bases: Generic[TD, DTD]

Base class for tasks.

A BaseTask object serves two purpose: encapsulating the logic of what needs to be done to execute the task (cf configure() and execute() that are run on a worker), and supporting the scheduler by determining if a task is suitable for a given worker. That is done in a two-step process, collating metadata from each worker (with the analyze_worker() method that is run on a worker) and then, based on this metadata, see if a task is suitable (with can_run_on() that is executed on the scheduler).

Most concrete task implementations should inherit from RunCommandTask instead.

TASK_TYPE: TaskTypes

The worker type must be suitable for the task type. TaskTypes.WORKER requires an external worker; TaskTypes.SERVER requires a Celery worker; TaskTypes.SIGNING requires a signing worker.

TASK_VERSION: int | None = None

Must be overridden by child classes to document the current version of the task’s code. A task will only be scheduled on a worker if its task version is the same as the one running on the scheduler.

__init__(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None) None[source]

Initialize the task.

abort()[source]

Task does not need to be executed. Once aborted cannot be changed.

property aborted: bool

Return if the task is aborted.

Tasks cannot transition from aborted -> not-aborted.

classmethod analyze_worker() dict[str, Any][source]

Return dynamic metadata about the current worker.

This method is called on the worker to collect information about the worker. The information is stored as a set of key-value pairs in a dictionary.

That information is then reused on the scheduler to be fed to can_run_on() and determine if a task is suitable to be executed on the worker.

Derived objects can extend the behaviour by overriding the method, calling metadata = super().analyze_worker(), and then adding supplementary data in the dictionary.

To avoid conflicts on the names of the keys used by different tasks you should use key names obtained with self.prefix_with_task_name(...).

Returns:

a dictionary describing the worker.

Return type:

dict.

classmethod analyze_worker_all_tasks()[source]

Return dictionary with metadata for each task in BaseTask._sub_tasks.

Subclasses of BaseTask get registered in BaseTask._sub_tasks. Return a dictionary with the metadata of each of the subtasks.

This method is executed in the worker when submitting the dynamic metadata.

append_to_log_file(filename: str, lines: list[str]) None[source]

Open log file and write contents into it.

Parameters:
  • filename – use self.open_debug_log_file(filename)

  • lines – write contents to the logfile

can_run_on(worker_metadata: dict[str, Any]) bool[source]

Check if the specified worker can run the task.

This method shall take its decision solely based on the supplied worker_metadata and on the configured task data (self.data).

The default implementation always returns True unless TASK_TYPE doesn’t match the worker type or there’s a mismatch between the TASK_VERSION on the scheduler side and on the worker side.

Derived objects can implement further checks by overriding the method in the following way:

if not super().can_run_on(worker_metadata):
    return False

if ...:
    return False

return True
Parameters:

worker_metadata (dict) – The metadata collected from the worker by running analyze_worker() on all the tasks on the worker under consideration.

Returns:

the boolean result of the check.

Return type:

bool.

static class_from_name(task_type: TaskTypes, task_name: str) type[debusine.tasks._task.BaseTask[Any, Any]][source]

Return class for :param task_name (case-insensitive).

Parameters:

task_type – type of task to look up

__init_subclass__() registers BaseTask subclasses’ into BaseTask._sub_tasks.

compute_dynamic_data(task_database: TaskDatabaseInterface) Optional[DTD][source]

Compute dynamic data for this task.

This may involve resolving artifact lookups.

data: TD
dynamic_data: Optional[DTD]
dynamic_task_data_type: type[DTD]

Class used as the in-memory representation of dynamic task data.

execute() bool[source]

Call the _execute() method, upload debug artifacts.

See _execute() for more information.

Returns:

result of the _execute() method.

execute_logging_exceptions() bool[source]

Execute self.execute() logging any raised exceptions.

static is_valid_task_name(task_type: TaskTypes, task_name: str) bool[source]

Return True if task_name is registered (its class is imported).

static is_worker_task(task_name: str) bool[source]

Check if task_name is a task that can run on external workers.

logger

A logging.Logger instance that can be used in child classes when you override methods to implement the task.

name: ClassVar[str]
open_debug_log_file(filename: str, *, mode: OpenTextModeWriting = 'a') TextIO[source]
open_debug_log_file(filename: str, *, mode: OpenBinaryModeWriting) BinaryIO

Open a temporary file and return it.

The files are always for the same temporary directory, calling it twice with the same file name will open the same file.

The caller must call .close() when finished writing.

classmethod prefix_with_task_name(text: str) str[source]
Returns:

the text prefixed with the task name and a colon.

task_data_type: type[TD]

Class used as the in-memory representation of task data.

static task_names(task_type: TaskTypes) list[str][source]

Return list of sub-task names.

static worker_task_names() list[str][source]

Return list of sub-task names not of type TaskTypes.SERVER.

class debusine.tasks.BaseTaskWithExecutor(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None)[source]

Bases: BaseExternalTask[TDE, DTDE], Generic[TDE, DTDE]

Base for tasks with executor capabilities.

DEFAULT_BACKEND = 'unshare'
property backend: str

Return the backend name to use.

dynamic_task_data_type

Class used as the in-memory representation of dynamic task data.

alias of TypeVar(‘DTDE’, bound=BaseDynamicTaskDataWithExecutor)

get_environment_lookup() int | str | None[source]

Get the environment lookup for an executor-capable task.

This automatically fills in some additional constraints from the task data if needed.

name: ClassVar[str] = 'basetaskwithexecutor'
task_data_type

Class used as the in-memory representation of task data.

alias of TypeVar(‘TDE’, bound=BaseTaskDataWithExecutor)

class debusine.tasks.RunCommandTask(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None)[source]

Bases: BaseExternalTask[TD, DTD], Generic[TD, DTD]

A BaseTask that can execute commands and upload artifacts.

Concrete subclasses must implement:

  • configure_for_execution(self, download_directory: Path) -> bool

  • _cmdline(self) -> list[str]

  • upload_artifacts(self, directory: Path, \*, execution_success: bool). The member variable self._source_artifacts_ids is set by fetch_input() and can be used to create the relations between uploaded artifacts and downloaded artifacts.

  • fetch_input(self, destination) -> bool. Download the needed artifacts into destination. Suggestion: can use fetch_artifact(artifact, dir) to download them.

  • check_directory_for_consistency_errors(self, build_directory: Path) -> list[str] (defaults return an empty list: no-errors)

  • task_succeeded(self, returncode: Optional[int], execute_directory: Path) -> bool (defaults to True)

Use self.append_to_log_file() / self.open_debug_log_file() to provide information for the user (it will be available to the user as an artifact).

Command execution uses process groups to make sure that the command and possible spawned commands are finished, and cancels the execution of the command if BaseTask.aborted() is True.

See the main entry point _execute() for details of the RunCommandTask flow.

CAPTURE_OUTPUT_FILENAME: str | None = None
CMD_LOG_FILENAME = 'cmd-output.log'
CMD_LOG_SEPARATOR = '--------------------'
check_directory_for_consistency_errors(build_directory: Path) list[str][source]

Return list of errors after executing the command.

configure_for_execution(download_directory: Path) bool[source]

Configure task: set variables needed for the self._cmdline().

Called after the files are downloaded via fetch_input().

dynamic_task_data_type

Class used as the in-memory representation of dynamic task data.

alias of TypeVar(‘DTD’, bound=BaseDynamicTaskData)

fetch_artifact(artifact_id: int, destination: Path) ArtifactResponse[source]

Download artifact_id to destination.

Add artifact_id to self._source_artifacts_ids.

fetch_input(destination: Path) bool[source]

Download artifacts needed by the task, update self.source_artifacts_ids.

Task might use self.data.input to download the relevant artifacts.

The method self.fetch_artifact(artifact, destination) might be used to download the relevant artifacts and update self.source_artifacts_ids.

name: ClassVar[str] = 'runcommandtask'
run_cmd(cmd: list[str], working_directory: Path, *, run_as_root: bool = False, capture_stdout_filename: str | None = None) int | None[source]

Run cmd in working_directory. Create self.CMD_OUTPUT_FILE log file.

If BaseTask.aborted == True terminates the process.

Parameters:
  • cmd – command to execute with its arguments.

  • working_directory – working directory where the command is executed.

  • run_as_root – if True, run the command as root. Otherwise, the command runs as the worker’s user

  • capture_stdout_filename – for some commands the output of the command is the output of stdout (e.g. lintian) and not a set of files generated by the command (e.g. sbuild). If capture_stdout is not None, save the stdout into this file. The caller can then use it.

Returns:

returncode of the process or None if aborted

task_data_type

Class used as the in-memory representation of task data.

alias of TypeVar(‘TD’, bound=BaseTaskData)

task_succeeded(returncode: int | None, execute_directory: Path) bool[source]

Sub-tasks can evaluate if the task was a success or failure.

By default, return True (success). Sub-classes can re-implement it.

Parameters:
  • returncode – return code of the command, or None if aborted

  • execute_directory – directory with the output of the task

Returns:

True (if success) or False (if failure).

upload_artifacts(execute_directory: Path, *, execution_success: bool)[source]

Upload the artifacts for the task.

exception debusine.tasks.TaskConfigError(message: str, original_exception: Exception | None = None)[source]

Bases: Exception

Exception raised when there is an issue with a task configuration.

__init__(message: str, original_exception: Exception | None = None)[source]

Initialize the TaskConfigError.

Parameters:
  • message – human-readable message describing the error.

  • original_exception – the exception that triggered this error, if applicable. This is used to provide additional information.

Task to build Debian packages with sbuild.

This task implements the PackageBuild generic task for its task_data: https://freexian-team.pages.debian.net/debusine/reference/tasks.html#task-packagebuild

class debusine.tasks.sbuild.Sbuild(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None)[source]

Bases: SbuildValidatorMixin, RunCommandTask[SbuildData, SbuildDynamicData], BaseTaskWithExecutor[SbuildData, SbuildDynamicData]

Task implementing a Debian package build with sbuild.

TASK_VERSION: int | None = 1

Must be overridden by child classes to document the current version of the task’s code. A task will only be scheduled on a worker if its task version is the same as the one running on the scheduler.

__init__(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None) None[source]

Initialize the sbuild task.

classmethod analyze_worker()[source]

Report metadata for this task on this worker.

can_run_on(worker_metadata: dict[str, Any]) bool[source]

Check the specified worker can run the requested task.

property chroot_name: str

Build name of required chroot.

compute_dynamic_data(task_database: TaskDatabaseInterface) SbuildDynamicData[source]

Resolve artifact lookups for this task.

configure_for_execution(download_directory: Path) bool[source]

Configure Task: set variables needed for the build() step.

Return True if configuration worked, False, if there was a problem.

dynamic_task_data_type

alias of SbuildDynamicData

execute() bool[source]

Verify task can be executed and super().execute().

Raises:

TaskConfigError.

fetch_input(destination: Path) bool[source]

Download the source artifact.

name: ClassVar[str] = 'sbuild'
task_data_type

alias of SbuildData

upload_artifacts(directory: Path, *, execution_success: bool)[source]

Upload the artifacts from directory.

Parameters:
  • directory – directory containing the files that will be uploaded.

  • execution_success – if False skip uploading .changes and .deb/.udeb