Tasks (internal API)

class debusine.tasks.BaseExternalTask(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None)[source]

Bases: TaskFieldContainer, Generic

Base class for external tasks.

A BaseExternalTask object serves two purposes: encapsulating the logic of what needs to be done to execute the task on an external worker (cf. configure() and execute(), which run on the worker), and supporting the server’s task-specific logic, such as scheduling and UI presentation.

Scheduling support is a two-step process: collating metadata from each worker (with the analyze_worker() method, which runs on the worker) and then, based on this metadata, checking whether a task is suitable (with can_run_on(), which runs on the scheduler).

If tag-based scheduling is in use, that process is replaced by get_provided_worker_tags(), which runs on the worker and reports the worker tags it provides; these must match the tags that tasks require.

Concrete subclasses must implement:

  • run(execute_directory: Path) -> WorkRequestResults: Do the main work of the task.

Most concrete subclasses should also implement:

  • fetch_input(self, destination) -> bool: Download the needed artifacts into destination; fetch_artifact(artifact_id, destination) can be used to download them. (default: return True)

  • configure_for_execution(self, download_directory: Path) -> bool (default: return True)

  • check_directory_for_consistency_errors(self, build_directory: Path) -> list[str] (default: return an empty list, indicating no errors)

  • upload_artifacts(self, execute_directory: Path, *, execution_result: WorkRequestResults). The member variable self._source_artifacts_ids is set by fetch_input() and can be used to create the relations between uploaded artifacts and downloaded artifacts. (default: return True)

Most concrete task implementations should inherit from RunCommandTask instead.

TASK_TYPE: TaskTypes = 'Worker'

The worker type must be suitable for the task type. TaskTypes.WORKER requires an external worker; TaskTypes.SERVER requires a Celery worker; TaskTypes.SIGNING requires a signing worker.

TASK_VERSION: int | None = None

Must be overridden by child classes to document the current version of the task’s code. A task will only be scheduled on a worker if its task version is the same as the one running on the scheduler.

__init__(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None) None[source]

Initialize the task.

abort() None[source]

Mark the task as not needing to be executed. Once a task is aborted, this cannot be undone.

property aborted: bool

Return whether the task is aborted.

Tasks cannot transition from aborted -> not-aborted.
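The one-way nature of the flag can be pictured with a small stand-in class. This is only a sketch: AbortableStandIn and its _aborted attribute are hypothetical names, not debusine's implementation.

```python
class AbortableStandIn:
    """Hypothetical stand-in; not the real BaseExternalTask."""

    def __init__(self) -> None:
        self._aborted = False

    def abort(self) -> None:
        # Setting the flag is idempotent; nothing ever clears it.
        self._aborted = True

    @property
    def aborted(self) -> bool:
        # Read-only property: there is no setter to un-abort the task.
        return self._aborted


task = AbortableStandIn()
before = task.aborted
task.abort()
task.abort()  # a second call changes nothing
after = task.aborted
```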

classmethod analyze_worker() dict[str, Any][source]

Return dynamic metadata about the current worker.

This method is called on the worker to collect information about the worker. The information is stored as a set of key-value pairs in a dictionary.

That information is then reused on the scheduler to be fed to can_run_on() and determine if a task is suitable to be executed on the worker.

Derived objects can extend the behaviour by overriding the method, calling metadata = super().analyze_worker(), and then adding supplementary data in the dictionary.

To avoid conflicts on the names of the keys used by different tasks you should use key names obtained with self.prefix_with_task_name(...).

Returns:

a dictionary describing the worker.

Return type:

dict.
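The override pattern described above might look like the following sketch. StandInTask replaces the real base class so that the example runs on its own; LintLikeTask and the "available" key are hypothetical, and the base implementation is assumed to return an empty dictionary.

```python
from typing import Any


class StandInTask:
    """Hypothetical stand-in; not the real BaseExternalTask."""

    name = "standintask"

    @classmethod
    def prefix_with_task_name(cls, text: str) -> str:
        # Documented behaviour: the task name, a colon, then the text.
        return f"{cls.name}:{text}"

    @classmethod
    def analyze_worker(cls) -> dict[str, Any]:
        # Assumption: the base metadata is reduced to an empty dict here.
        return {}


class LintLikeTask(StandInTask):
    name = "lintlike"  # hypothetical task name

    @classmethod
    def analyze_worker(cls) -> dict[str, Any]:
        metadata = super().analyze_worker()
        # Hypothetical key, prefixed to avoid clashes with other tasks.
        metadata[cls.prefix_with_task_name("available")] = True
        return metadata


metadata = LintLikeTask.analyze_worker()
```

The prefixed key keeps each task's entries distinct once the metadata of all tasks is merged into one dictionary.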

append_to_log_file(filename: str, lines: list[str]) None[source]

Open log file and write contents into it.

Parameters:
  • filename – name of the log file, opened with self.open_debug_log_file(filename)

  • lines – lines to write to the log file

can_run_on(worker_metadata: dict[str, Any]) bool[source]

Check if the specified worker can run the task.

This method shall take its decision solely based on the supplied worker_metadata and on the configured task data (self.data).

The default implementation always returns True unless TASK_TYPE doesn’t match the worker type or there’s a mismatch between the TASK_VERSION on the scheduler side and on the worker side.

Derived objects can implement further checks by overriding the method in the following way:

if not super().can_run_on(worker_metadata):
    return False

if ...:
    return False

return True

Parameters:

worker_metadata (dict) – The metadata collected from the worker by running analyze_worker() on all the tasks on the worker under consideration.

Returns:

the boolean result of the check.

Return type:

bool.
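As a fuller illustration of that override pattern, here is a self-contained sketch. StandInTask is a hypothetical replacement for the base class (its type and version checks are omitted), and the "architecture" and "archspecific:architectures" keys are invented for the example.

```python
from typing import Any


class StandInTask:
    """Hypothetical stand-in; not the real BaseExternalTask."""

    def __init__(self, task_data: dict[str, Any]) -> None:
        self.data = task_data

    def can_run_on(self, worker_metadata: dict[str, Any]) -> bool:
        # Assumption: the real TASK_TYPE and TASK_VERSION checks are omitted.
        return True


class ArchSpecificTask(StandInTask):
    def can_run_on(self, worker_metadata: dict[str, Any]) -> bool:
        if not super().can_run_on(worker_metadata):
            return False

        # Decide only from the task data and the supplied worker metadata.
        wanted = self.data.get("architecture")
        supported = worker_metadata.get("archspecific:architectures", [])
        if wanted not in supported:
            return False

        return True


task = ArchSpecificTask({"architecture": "amd64"})
accepted = task.can_run_on({"archspecific:architectures": ["amd64", "i386"]})
rejected = task.can_run_on({"archspecific:architectures": ["arm64"]})
```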

check_directory_for_consistency_errors(build_directory: Path) list[str][source]

Return list of errors after doing the main work of the task.

static class_from_name(task_type: TaskTypes, task_name: str) type[BaseExternalTask[Any, Any]][source]

Return the class for task_name (case-insensitive).

Parameters:

task_type – type of task to look up

__init_subclass__() registers BaseExternalTask subclasses into BaseExternalTask._sub_tasks.
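The registration mechanism can be sketched as follows. This is a simplified, hypothetical model: RegistryBase is a stand-in, the registry is assumed to be keyed by the lowercased class name only, and the task_type dimension of the real lookup is left out.

```python
from typing import ClassVar


class RegistryBase:
    """Hypothetical stand-in registry; not the real BaseExternalTask."""

    _sub_tasks: ClassVar[dict[str, type["RegistryBase"]]] = {}

    def __init_subclass__(cls, **kwargs: object) -> None:
        super().__init_subclass__(**kwargs)
        # Assumption: the registry key is the lowercased class name.
        RegistryBase._sub_tasks[cls.__name__.lower()] = cls

    @staticmethod
    def class_from_name(task_name: str) -> type["RegistryBase"]:
        # Case-insensitive lookup; unknown names raise KeyError in this sketch.
        return RegistryBase._sub_tasks[task_name.lower()]


class Sbuild(RegistryBase):  # hypothetical subclass, registered on definition
    pass


found = RegistryBase.class_from_name("SBuild")
```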

cleanup() None[source]

Clean up after running the task.

compute_dynamic_data() DTD[source]

Build a dynamic task data structure for this task.

This method can only be called after all the task fields have been resolved.

Returns:

the newly created dynamic task data

compute_system_required_tags() set[str][source]

Compute the system set of task-required tags.

configure_for_execution(download_directory: Path) bool[source]

Configure task: set the variables needed by self._cmdline().

Called after the files are downloaded via fetch_input().

configure_server_access(debusine: Debusine) None[source]

Set the object to access the server.

data: TD

dynamic_data: DTD | None

dynamic_task_data_type: type[DTD]

Class used as the in-memory representation of dynamic task data.

execute() WorkRequestResults[source]

Call the _execute() method, upload debug artifacts.

See _execute() for more information.

Returns:

result of the _execute() method.

execute_logging_exceptions() WorkRequestResults[source]

Execute self.execute(), logging any raised exceptions.

fetch_artifact(artifact_id: int, destination: Path) ArtifactResponse[source]

Download artifact_id to destination.

Add artifact_id to self._source_artifacts_ids.

fetch_input(destination: Path) bool[source]

Download artifacts needed by the task, update self._source_artifacts_ids.

The task might use self.data.input to download the relevant artifacts.

The method self.fetch_artifact(artifact_id, destination) can be used to download the relevant artifacts and update self._source_artifacts_ids.

get_configuration_context() str | None[source]

Return the configuration context used to look up task configuration.

This method can only be called after all the task fields have been resolved.

get_input_artifacts_ids() list[int][source]

Return the list of input artifact IDs used by this task.

This refers to the artifacts actually used by the task. If dynamic_data is empty, this returns the empty list.

This is used by views to show what artifacts were used by a task. _source_artifacts_ids cannot be used for this purpose because it is only set during task execution.

classmethod get_provided_worker_tags() set[str][source]

Return the set of worker tags provided by this task class.

This method is called on the worker to collect information about the worker.

abstract get_subject() str | None[source]

Return the task subject used to look up task configuration.

This method can only be called after all the task fields have been resolved.

get_user_provided_tags() set[str][source]

Compute the user set of task-provided tags.

This method can only be called after all the task fields have been resolved.

inputs: dict[str, 'TaskInput[Any]'] = {}

Inputs defined in this task class

logger

A logging.Logger instance that can be used in child classes when you override methods to implement the task.

name: ClassVar[str]

open_debug_log_file(filename: str, *, mode: OpenTextModeWriting = 'a') TextIO[source]
open_debug_log_file(filename: str, *, mode: OpenBinaryModeWriting) BinaryIO

Open a temporary file and return it.

The files are always created in the same temporary directory; calling it twice with the same file name opens the same file.

The caller must call .close() when finished writing.
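A minimal sketch of this behaviour, together with append_to_log_file(), is shown below. DebugLogStandIn and its _debug_dir attribute are hypothetical, and the assumptions that one directory exists per task instance and that each line gets a trailing newline are not confirmed by this reference.

```python
import tempfile
from pathlib import Path
from typing import TextIO


class DebugLogStandIn:
    """Hypothetical stand-in for the debug-log helpers."""

    def __init__(self) -> None:
        # Assumption: one temporary directory per task instance.
        self._debug_dir = Path(tempfile.mkdtemp(prefix="debug-logs-"))

    def open_debug_log_file(self, filename: str, *, mode: str = "a") -> TextIO:
        # The same file name always maps to the same path.
        return open(self._debug_dir / filename, mode)

    def append_to_log_file(self, filename: str, lines: list[str]) -> None:
        # The context manager takes care of the required close().
        with self.open_debug_log_file(filename) as log:
            log.writelines(line + "\n" for line in lines)


task = DebugLogStandIn()
task.append_to_log_file("steps.log", ["fetch ok"])
task.append_to_log_file("steps.log", ["run ok"])
content = (task._debug_dir / "steps.log").read_text()
```

Because the default mode is 'a', the second call appends to the file created by the first one instead of truncating it.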

post_init() None[source]

Specific post-init code.

classmethod prefix_with_task_name(text: str) str[source]

Returns:

the text prefixed with the task name and a colon.

prepare_to_run(download_directory: Path, execute_directory: Path) None[source]

Prepare the execution environment to do the main work of the task.

abstract run(execute_directory: Path) WorkRequestResults[source]

Do the main work of the task.

task_data_type: type[TD]

Class used as the in-memory representation of task data.

upload_artifacts(execute_directory: Path, *, execution_result: WorkRequestResults) None[source]

Upload the artifacts for the task.

class debusine.tasks.BaseTaskWithExecutor(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None)[source]

Bases: BaseExternalTask, Generic

Base for tasks with executor capabilities.

Concrete subclasses must implement fetch_input(), configure_for_execution(), run(), check_directory_for_consistency_errors(), and upload_artifacts(), as documented by BaseExternalTask.

DEFAULT_BACKEND = 'unshare'

property backend: BackendType

Return the backend name to use.

build_architecture

Task build architecture

can_run_on(worker_metadata: dict[str, Any]) bool[source]

Check if the specified worker can run the task.

This method shall take its decision solely based on the supplied worker_metadata and on the configured task data (self.data).

The default implementation always returns True unless TASK_TYPE doesn’t match the worker type or there’s a mismatch between the TASK_VERSION on the scheduler side and on the worker side.

Derived objects can implement further checks by overriding the method in the following way:

if not super().can_run_on(worker_metadata):
    return False

if ...:
    return False

return True

Parameters:

worker_metadata (dict) – The metadata collected from the worker by running analyze_worker() on all the tasks on the worker under consideration.

Returns:

the boolean result of the check.

Return type:

bool.

cleanup() None[source]

Clean up after running the task.

Some tasks use the executor in upload_artifacts, so we clean up the executor here rather than in run().

compute_system_required_tags() set[str][source]

Compute the system set of task-required tags.

dynamic_task_data_type: type[DTD] = DTDE

Class used as the in-memory representation of dynamic task data.

get_package_version(package_name: str) Version[source]

Get the installed version of a package in the executor.

Raises:

subprocess.CalledProcessError – if the package is not installed (also logged to install.log).

inputs: dict[str, 'TaskInput[Any]'] = {'build_architecture': <debusine.tasks.inputs.inputs.BuildArchitectureInput object>}

Inputs defined in this task class

name: ClassVar[str] = 'basetaskwithexecutor'

prepare_to_run(download_directory: Path, execute_directory: Path) None[source]

Copy the download and execution directories into the executor.

run_executor_command(cmd: list[str], log_filename: str, run_as_root: bool = False, check: bool = True) None[source]

Run cmd within the executor, logging the output to log_filename.

task_data_type: type[TD] = TDE

Class used as the in-memory representation of task data.

class debusine.tasks.ExtraRepositoryMixin(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None)[source]

Bases: BaseExternalTask, Generic

Methods for configuring external APT repositories.

data: TDER

dynamic_task_data_type: type[DTD] = DTDER

Class used as the in-memory representation of dynamic task data.

extra_repositories

Extra repositories configured in task data

extra_repository_keys: list[Path]
extra_repository_lists: list[Path]
extra_repository_sources: list[Path]
inputs: dict[str, 'TaskInput[Any]'] = {'extra_repositories': <debusine.tasks.inputs.inputs.ExtraRepositoriesInput object>}

Inputs defined in this task class

name: ClassVar[str] = 'extrarepositorymixin'

post_init() None[source]

Specific post-init code.

supports_inline_signed_by(codename: str) bool[source]

Determine if codename supports inline signatures.

task_data_type: type[TD] = TDER

Class used as the in-memory representation of task data.

write_extra_repository_config(codename: str, destination: Path) None[source]

Write extra_repositories config to files in destination.

extra_repository_keys will be populated with keys to install into /etc/apt/keyrings, if needed.

extra_repository_sources will be populated with deb822 sources files.
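For readers unfamiliar with deb822 sources files, the sketch below writes one stanza using the field names from sources.list(5). It is not debusine's implementation: write_deb822_source, the repository dictionary keys, the file name pattern, and the example URL are all hypothetical.

```python
import tempfile
from pathlib import Path


def write_deb822_source(destination: Path, index: int, repo: dict[str, str]) -> Path:
    """Hypothetical helper; not write_extra_repository_config()."""
    stanza = [
        "Types: deb",
        f"URIs: {repo['url']}",
        f"Suites: {repo['suite']}",
        f"Components: {repo['components']}",
    ]
    if "keyring" in repo:
        # Path of a key file that would be installed under /etc/apt/keyrings.
        stanza.append(f"Signed-By: {repo['keyring']}")
    path = destination / f"extra_repository_{index}.sources"
    path.write_text("\n".join(stanza) + "\n")
    return path


with tempfile.TemporaryDirectory() as tmp:
    source_file = write_deb822_source(
        Path(tmp),
        0,
        {
            "url": "https://deb.example.org/debian",
            "suite": "bookworm",
            "components": "main",
            "keyring": "/etc/apt/keyrings/example.asc",
        },
    )
    text = source_file.read_text()
```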

class debusine.tasks.RunCommandTask(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None)[source]

Bases: BaseExternalTask, Generic

A BaseExternalTask that executes commands and uploads artifacts.

Concrete subclasses must implement:

  • _cmdline(self) -> list[str]

  • task_result(self, returncode: Optional[int], execute_directory: Path) -> WorkRequestResults (defaults to SUCCESS)

They must also implement configure_for_execution(), fetch_input(), check_directory_for_consistency_errors(), and upload_artifacts(), as documented by BaseTaskWithExecutor. (They do not need to implement run(), but may do so if they need to run multiple commands rather than just one.)

Use self.append_to_log_file() / self.open_debug_log_file() to provide information for the user (it will be available to the user as an artifact).

Command execution uses process groups to make sure that the command and any commands it spawns are finished, and cancels the execution of the command if BaseExternalTask.aborted is True.

Optionally, _cmdline_as_root() and _cmd_env() may be implemented to customize behaviour.

See the main entry point BaseExternalTask._execute() for details of the flow.

CAPTURE_OUTPUT_FILENAME: str | None = None

CMD_LOG_FILENAME = 'cmd-output.log'

CMD_LOG_SEPARATOR = '--------------------'

dynamic_task_data_type: type[DTD] = DTD

Class used as the in-memory representation of dynamic task data.

inputs: dict[str, 'TaskInput[Any]'] = {}

Inputs defined in this task class

name: ClassVar[str] = 'runcommandtask'

run(execute_directory: Path) WorkRequestResults[source]

Run a single command via the executor.

Note

If the member variable CAPTURE_OUTPUT_FILENAME is set, a file with that name is created containing the stdout of the command. Otherwise, the stdout of the command is saved in self.CMD_LOG_FILENAME.

run_cmd(cmd: list[str], working_directory: Path, *, env: dict[str, str] | None = None, run_as_root: bool = False, capture_stdout_filename: str | None = None) int | None[source]

Run cmd in working_directory, creating the self.CMD_LOG_FILENAME log file.

If BaseExternalTask.aborted is True, terminate the process.

Parameters:
  • cmd – command to execute with its arguments.

  • working_directory – working directory where the command is executed.

  • run_as_root – if True, run the command as root. Otherwise, the command runs as the worker’s user.

  • capture_stdout_filename – for some commands the useful output is what they print on stdout (e.g. lintian) rather than a set of files they generate (e.g. sbuild). If capture_stdout_filename is not None, stdout is saved into this file, which the caller can then use.

Returns:

returncode of the process or None if aborted
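The combination of process groups, abort handling, and stdout capture can be sketched with the standard library alone. This is a POSIX-only illustration under stated assumptions: run_in_process_group and its is_aborted callback are hypothetical, and stderr handling, environment, and root execution are left out.

```python
import os
import signal
import subprocess
import sys
import tempfile
from pathlib import Path
from typing import Callable, Optional


def run_in_process_group(
    cmd: list[str],
    working_directory: Path,
    is_aborted: Callable[[], bool],
    capture_stdout_filename: str = "cmd-output.log",
) -> Optional[int]:
    """Hypothetical helper (POSIX only); not debusine's run_cmd()."""
    with open(working_directory / capture_stdout_filename, "wb") as out:
        # start_new_session=True puts the child in its own process group,
        # whose id equals the child's pid.
        proc = subprocess.Popen(
            cmd, cwd=working_directory, stdout=out, start_new_session=True
        )
        while True:
            try:
                return proc.wait(timeout=0.1)
            except subprocess.TimeoutExpired:
                if is_aborted():
                    # Signal the whole group, including spawned children.
                    os.killpg(proc.pid, signal.SIGTERM)
                    proc.wait()
                    return None


with tempfile.TemporaryDirectory() as tmp:
    returncode = run_in_process_group(
        [sys.executable, "-c", "print('hello')"],
        Path(tmp),
        is_aborted=lambda: False,
        capture_stdout_filename="stdout.txt",
    )
    captured = (Path(tmp) / "stdout.txt").read_text()
```

Polling with a short timeout lets the loop notice an abort request without blocking until the command exits.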

task_data_type: type[TD] = TD

Class used as the in-memory representation of task data.

task_result(returncode: int | None, execute_directory: Path) WorkRequestResults[source]

Subclasses can evaluate whether the task was a success or a failure.

By default, return WorkRequestResults.SUCCESS. Subclasses can re-implement it.

Parameters:
  • returncode – return code of the command, or None if aborted

  • execute_directory – directory with the output of the task

Returns:

WorkRequestResults.
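A re-implementation might combine the return code with a check of the output directory, as in this sketch. ResultsStandIn replaces WorkRequestResults so the example is self-contained; its FAILURE member, the report.txt file name, and the policy itself are assumptions made for illustration.

```python
import tempfile
from enum import Enum
from pathlib import Path
from typing import Optional


class ResultsStandIn(Enum):
    """Stand-in for WorkRequestResults; FAILURE is an assumed member."""

    SUCCESS = "success"
    FAILURE = "failure"


def task_result(returncode: Optional[int], execute_directory: Path) -> ResultsStandIn:
    # Hypothetical policy: exit status 0 plus a non-empty report file.
    report = execute_directory / "report.txt"
    if returncode == 0 and report.is_file() and report.stat().st_size > 0:
        return ResultsStandIn.SUCCESS
    return ResultsStandIn.FAILURE


with tempfile.TemporaryDirectory() as tmp:
    directory = Path(tmp)
    missing_report = task_result(0, directory)
    (directory / "report.txt").write_text("all good\n")
    with_report = task_result(0, directory)
    aborted = task_result(None, directory)  # None models an aborted command
```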

class debusine.tasks.TaskConfigError(message: str | None, original_exception: Exception | None = None)[source]

Bases: Exception

Exception raised when there is an issue with a task configuration.

__init__(message: str | None, original_exception: Exception | None = None)[source]

Initialize the TaskConfigError.

Parameters:
  • message – human-readable message describing the error.

  • original_exception – the exception that triggered this error, if applicable. This is used to provide additional information.

add_parent_message(msg: str) None[source]

Prepend the error message with one from the containing scope.
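The idea of enriching an error as it propagates outwards can be sketched like this. ConfigErrorStandIn is a hypothetical stand-in for TaskConfigError, and joining the two messages with ": " is an assumption; the real formatting is not specified here.

```python
from typing import Optional


class ConfigErrorStandIn(Exception):
    """Hypothetical stand-in; not the real TaskConfigError."""

    def __init__(
        self, message: Optional[str], original_exception: Optional[Exception] = None
    ) -> None:
        super().__init__(message)
        self.message = message
        self.original_exception = original_exception

    def add_parent_message(self, msg: str) -> None:
        # Assumption: parent and child messages are joined with ": ".
        self.message = f"{msg}: {self.message}"

    def __str__(self) -> str:
        return self.message or ""


def parse_count(raw: str) -> int:
    try:
        return int(raw)
    except ValueError as exc:
        raise ConfigErrorStandIn("not an integer", original_exception=exc)


try:
    try:
        parse_count("eighty")
    except ConfigErrorStandIn as error:
        # The containing scope adds its own context, then re-raises.
        error.add_parent_message("in field 'count'")
        raise
except ConfigErrorStandIn as error:
    rendered = str(error)
    cause = error.original_exception
```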

debusine.tasks.ensure_artifact_categories(*, configuration_key: str, category: str, expected: Collection[ArtifactCategory]) None[source]

Validate that the artifact’s category is one of the expected categories.

Parameters:
  • configuration_key – Optional key to identify the source of the artifact. Provides additional context in error messages.

  • category – The category to validate.

  • expected – A collection of valid artifact categories.

debusine.tasks.ensure_collection_category(*, configuration_key: str, category: str, expected: CollectionCategory) None[source]

Validate that the collection’s category is as expected.

Parameters:
  • configuration_key – Optional key to identify the source of the artifact. Provides additional context in error messages.

  • category – The category to validate.

  • expected – The expected collection category.

debusine.tasks.analyze_external_worker_tasks() dict[str, Any][source]

Return dictionary with metadata for each BaseExternalTask.

Subclasses of BaseExternalTask get registered in BaseExternalTask._sub_tasks. Return a dictionary with the metadata of each of the subtasks.

This function is executed on the worker when submitting the dynamic metadata.

debusine.tasks.get_provided_external_worker_tags() set[str][source]

Return the set of worker tags provided by all available tasks.

This function is called on the worker to collect information about the worker.
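Both aggregation functions can be pictured as loops over the registered subclasses. The sketch below is a hypothetical model: TaskStandIn, Alpha, Beta, and the keys and tags they report are invented, and the real functions may structure their results differently.

```python
from typing import Any


class TaskStandIn:
    """Hypothetical stand-in registry; not the real BaseExternalTask."""

    _sub_tasks: dict[str, type["TaskStandIn"]] = {}

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        TaskStandIn._sub_tasks[cls.__name__.lower()] = cls

    @classmethod
    def analyze_worker(cls) -> dict[str, Any]:
        return {}

    @classmethod
    def get_provided_worker_tags(cls) -> set[str]:
        return set()


class Alpha(TaskStandIn):
    @classmethod
    def analyze_worker(cls) -> dict[str, Any]:
        return {"alpha:available": True}

    @classmethod
    def get_provided_worker_tags(cls) -> set[str]:
        return {"hypothetical:alpha"}


class Beta(TaskStandIn):
    @classmethod
    def analyze_worker(cls) -> dict[str, Any]:
        return {"beta:version": 2}


def analyze_all() -> dict[str, Any]:
    # Merge the metadata reported by every registered task class.
    metadata: dict[str, Any] = {}
    for task_class in TaskStandIn._sub_tasks.values():
        metadata.update(task_class.analyze_worker())
    return metadata


def provided_tags() -> set[str]:
    # Union of the tags reported by every registered task class.
    tags: set[str] = set()
    for task_class in TaskStandIn._sub_tasks.values():
        tags |= task_class.get_provided_worker_tags()
    return tags


metadata = analyze_all()
tags = provided_tags()
```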