Tasks (internal API)

Collection of tasks.

The debusine.tasks module hierarchy hosts a collection of BaseTask subclasses that workers use to fulfill the debusine.db.models.WorkRequest objects sent by the debusine scheduler.

Creating a new task requires adding a new file containing a class inheriting from the BaseTask or RunCommandTask base class. The name of the class must be unique among all child classes.

A child class must, at the very least, override the BaseTask.execute() method.

class debusine.tasks.AssembleSignedSource(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None)[source]

Bases: RunCommandTask[AssembleSignedSourceData, AssembleSignedSourceDynamicData], BaseTaskWithExecutor[AssembleSignedSourceData, AssembleSignedSourceDynamicData]

Task to assemble a signed source package.

TASK_VERSION: int | None = 1

Must be overridden by child classes to document the current version of the task’s code. A task will only be scheduled on a worker if its task version is the same as the one running on the scheduler.

__init__(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None) None[source]

Initialize the task.

can_run_on(worker_metadata: dict[str, Any]) bool[source]

Check if the specified worker can run the task.

cleanup() None[source]

Clean up after running the task.

compute_dynamic_data(task_database: TaskDatabaseInterface) AssembleSignedSourceDynamicData[source]

Resolve artifact lookups for this task.

configure_for_execution(download_directory: Path) bool[source]

Configure task: ensure that the executor has dpkg-dev installed.

dynamic_task_data_type

alias of AssembleSignedSourceDynamicData

fetch_input(destination: Path) bool[source]

Download the required artifacts.

get_label() str[source]

Return the task label.

name: ClassVar[str] = 'assemblesignedsource'
run(execute_directory: Path) bool[source]

Do the main assembly work.

task_data_type

alias of AssembleSignedSourceData

upload_artifacts(execute_directory: Path, *, execution_success: bool) None[source]

Upload artifacts for the task.

class debusine.tasks.Autopkgtest(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None)[source]

Bases: ExtraRepositoryMixin[AutopkgtestData, AutopkgtestDynamicData], RunCommandTask[AutopkgtestData, AutopkgtestDynamicData], BaseTaskWithExecutor[AutopkgtestData, AutopkgtestDynamicData]

Task to use autopkgtest in debusine.

ARTIFACT_DIR = 'artifact-dir'
SUMMARY_FILE = 'artifact-dir/summary'
TASK_VERSION: int | None = 1

Must be overridden by child classes to document the current version of the task’s code. A task will only be scheduled on a worker if its task version is the same as the one running on the scheduler.

__init__(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None) None[source]

Initialize object.

classmethod analyze_worker() dict[str, Any][source]

Report metadata for this task on this worker.

can_run_on(worker_metadata: dict[str, Any]) bool[source]

Check if the specified worker can run the task.

check_directory_for_consistency_errors(build_directory: Path) list[str][source]

Return an error if the autopkgtest ARTIFACT_DIR/summary file does not exist.

compute_dynamic_data(task_database: TaskDatabaseInterface) AutopkgtestDynamicData[source]

Resolve artifact lookups for this task.

configure_for_execution(download_directory: Path) bool[source]

Gather information used later on (_cmdline(), upload_artifacts()).

dynamic_task_data_type

alias of AutopkgtestDynamicData

fetch_input(destination: Path) bool[source]

Download the required artifacts.

get_label() str[source]

Return the task label.

name: ClassVar[str] = 'autopkgtest'
task_data_type

alias of AutopkgtestData

task_succeeded(returncode: int | None, execute_directory: Path) bool[source]

Parse the summary file and return success.

Use self.data.fail_on.
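The summary check can be sketched roughly as follows. This is not the debusine implementation: the FailOnSketch field names (failed_test, flaky_test, skipped_test) and the summary line layout ("<test name> <RESULT> [details]") are assumptions made for illustration.

```python
from dataclasses import dataclass


@dataclass
class FailOnSketch:
    """Assumed shape of self.data.fail_on (field names are assumptions)."""

    failed_test: bool = True
    flaky_test: bool = False
    skipped_test: bool = False


def summary_succeeded(summary_text: str, fail_on: FailOnSketch) -> bool:
    """Return False if any result selected by fail_on appears in the summary."""
    for line in summary_text.splitlines():
        fields = line.split(maxsplit=2)
        if len(fields) < 2:
            continue  # blank or malformed line
        result = fields[1]
        if result == "FAIL" and fail_on.failed_test:
            return False
        if result == "FLAKY" and fail_on.flaky_test:
            return False
        if result == "SKIP" and fail_on.skipped_test:
            return False
    return True


# Made-up summary contents for the example.
summary = (
    "smoke PASS\n"
    "upgrade SKIP test needs network access\n"
    "network FAIL non-zero exit status 1\n"
)
strict = summary_succeeded(summary, FailOnSketch())
lenient = summary_succeeded(summary, FailOnSketch(failed_test=False))
```

With the default settings the FAIL line makes the run unsuccessful; turning failed_test off lets the same summary pass.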

upload_artifacts(execute_directory: Path, *, execution_success: bool) None[source]

Upload AutopkgtestArtifact with the files, data and relationships.

class debusine.tasks.BaseExternalTask(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None)[source]

Bases: BaseTask[TD, DTD], Generic[TD, DTD]

A BaseTask that runs on an external worker.

Concrete subclasses must implement:

  • run(execute_directory: Path) -> bool: Do the main work of the task.

Most concrete subclasses should also implement:

  • fetch_input(self, destination) -> bool. Download the needed artifacts into destination; fetch_artifact(artifact_id, destination) can be used to download them. (default: return True)

  • configure_for_execution(self, download_directory: Path) -> bool (default: return True)

  • check_directory_for_consistency_errors(self, build_directory: Path) -> list[str] (default: return an empty list, indicating no errors)

  • upload_artifacts(self, directory: Path, *, execution_success: bool). The member variable self._source_artifacts_ids is set by fetch_input() and can be used to create the relations between uploaded artifacts and downloaded artifacts. (default: return True)
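A rough sketch of how these hooks might fit together. ExternalTaskSketch is a stand-in written for this example, not debusine's BaseExternalTask, and the call order in execute_sketch() is an assumption inferred from the docstrings (configure_for_execution() runs after fetch_input()); the real flow lives in BaseTask._execute().

```python
import tempfile
from pathlib import Path


class ExternalTaskSketch:
    """Stand-in for BaseExternalTask (hypothetical, not the debusine class)."""

    def fetch_input(self, destination: Path) -> bool:
        return True  # default: nothing to download

    def configure_for_execution(self, download_directory: Path) -> bool:
        return True  # default: nothing to configure

    def run(self, execute_directory: Path) -> bool:
        raise NotImplementedError  # concrete subclasses must implement this

    def check_directory_for_consistency_errors(
        self, build_directory: Path
    ) -> list[str]:
        return []  # default: no errors

    def upload_artifacts(
        self, execute_directory: Path, *, execution_success: bool
    ) -> None:
        pass  # default: nothing to upload

    def execute_sketch(self) -> bool:
        """Assumed call order; see BaseTask._execute() for the real one."""
        with tempfile.TemporaryDirectory() as dl, tempfile.TemporaryDirectory() as ex:
            download_dir, execute_dir = Path(dl), Path(ex)
            if not self.fetch_input(download_dir):
                return False
            if not self.configure_for_execution(download_dir):
                return False
            success = self.run(execute_dir)
            errors = self.check_directory_for_consistency_errors(execute_dir)
            success = success and not errors
            self.upload_artifacts(execute_dir, execution_success=success)
            return success


class WriteGreeting(ExternalTaskSketch):
    """Hypothetical task: writes a file, then checks that it exists."""

    def run(self, execute_directory: Path) -> bool:
        (execute_directory / "greeting.txt").write_text("hello\n")
        return True

    def check_directory_for_consistency_errors(
        self, build_directory: Path
    ) -> list[str]:
        if (build_directory / "greeting.txt").exists():
            return []
        return ["greeting.txt was not created"]


result = WriteGreeting().execute_sketch()
```

Only run() is mandatory in the sketch; the other hooks fall back to defaults that mirror the ones listed above.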

TASK_TYPE: TaskTypes = 'Worker'

The worker type must be suitable for the task type. TaskTypes.WORKER requires an external worker; TaskTypes.SERVER requires a Celery worker; TaskTypes.SIGNING requires a signing worker.

__init__(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None) None[source]

Initialize the task.

check_directory_for_consistency_errors(build_directory: Path) list[str][source]

Return list of errors after doing the main work of the task.

cleanup() None[source]

Clean up after running the task.

configure_for_execution(download_directory: Path) bool[source]

Configure task: set variables needed for the self._cmdline().

Called after the files are downloaded via fetch_input().

configure_server_access(debusine: Debusine) None[source]

Set the object to access the server.

dynamic_task_data_type

Class used as the in-memory representation of dynamic task data.

alias of TypeVar('DTD', bound=BaseDynamicTaskData)

fetch_artifact(artifact_id: int, destination: Path) ArtifactResponse[source]

Download artifact_id to destination.

Add artifact_id to self._source_artifacts_ids.

fetch_input(destination: Path) bool[source]

Download artifacts needed by the task, update self._source_artifacts_ids.

Task might use self.data.input to download the relevant artifacts.

The method self.fetch_artifact(artifact_id, destination) might be used to download the relevant artifacts and update self._source_artifacts_ids.

name: ClassVar[str] = 'baseexternaltask'
prepare_to_run(download_directory: Path, execute_directory: Path) None[source]

Prepare the execution environment to do the main work of the task.

abstract run(execute_directory: Path) bool[source]

Do the main work of the task.

task_data_type

Class used as the in-memory representation of task data.

alias of TypeVar('TD', bound=BaseTaskData)

upload_artifacts(execute_directory: Path, *, execution_success: bool) None[source]

Upload the artifacts for the task.

class debusine.tasks.BaseTask(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None)[source]

Bases: Generic[TD, DTD]

Base class for tasks.

A BaseTask object serves two purposes: encapsulating the logic of what needs to be done to execute the task (cf. configure() and execute(), which are run on a worker), and supporting the scheduler by determining whether a task is suitable for a given worker. The latter is a two-step process: metadata is collected from each worker (with the analyze_worker() method, which is run on a worker) and then, based on this metadata, the scheduler checks whether a task is suitable (with can_run_on(), which is executed on the scheduler).

Most concrete task implementations should inherit from RunCommandTask instead.

TASK_TYPE: TaskTypes

The worker type must be suitable for the task type. TaskTypes.WORKER requires an external worker; TaskTypes.SERVER requires a Celery worker; TaskTypes.SIGNING requires a signing worker.

TASK_VERSION: int | None = None

Must be overridden by child classes to document the current version of the task’s code. A task will only be scheduled on a worker if its task version is the same as the one running on the scheduler.

__init__(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None) None[source]

Initialize the task.

abort() None[source]

Mark the task as aborted: it does not need to be executed. Once aborted, a task cannot be un-aborted.

property aborted: bool

Return whether the task is aborted.

Tasks cannot transition from aborted -> not-aborted.
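The one-way nature of the abort flag can be illustrated with a small stand-in class. AbortableSketch and its _aborted attribute are hypothetical names used only for this example; note that aborted is read as a property, not called as a method.

```python
class AbortableSketch:
    """Stand-in showing a one-way abort flag (not the debusine class)."""

    def __init__(self) -> None:
        self._aborted = False

    def abort(self) -> None:
        # There is deliberately no method that resets the flag.
        self._aborted = True

    @property
    def aborted(self) -> bool:
        return self._aborted


task = AbortableSketch()
before = task.aborted
task.abort()
task.abort()  # calling it again changes nothing
after = task.aborted
```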

classmethod analyze_worker() dict[str, Any][source]

Return dynamic metadata about the current worker.

This method is called on the worker to collect information about the worker. The information is stored as a set of key-value pairs in a dictionary.

That information is then reused on the scheduler to be fed to can_run_on() and determine if a task is suitable to be executed on the worker.

Derived objects can extend the behaviour by overriding the method, calling metadata = super().analyze_worker(), and then adding supplementary data in the dictionary.

To avoid conflicts on the names of the keys used by different tasks you should use key names obtained with self.prefix_with_task_name(...).

Returns:

a dictionary describing the worker.

Return type:

dict.
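Extending analyze_worker() as described above might look like the sketch below. TaskSketch stands in for BaseTask, HelloTool is a hypothetical task, and the "available" key is made up for the example; only the pattern (call super(), then add prefixed keys) comes from the description.

```python
import shutil
from typing import Any


class TaskSketch:
    """Stand-in for BaseTask (hypothetical)."""

    name = "tasksketch"

    @classmethod
    def prefix_with_task_name(cls, text: str) -> str:
        # Task name, a colon, then the text.
        return f"{cls.name}:{text}"

    @classmethod
    def analyze_worker(cls) -> dict[str, Any]:
        return {}


class HelloTool(TaskSketch):
    """Hypothetical task that needs a 'hello' binary on the worker."""

    name = "hellotool"

    @classmethod
    def analyze_worker(cls) -> dict[str, Any]:
        metadata = super().analyze_worker()
        # "available" is a made-up key; the prefix avoids clashes between tasks.
        key = cls.prefix_with_task_name("available")
        metadata[key] = shutil.which("hello") is not None
        return metadata


metadata = HelloTool.analyze_worker()
```

The resulting dictionary has a single key, "hellotool:available", whose value depends on the machine running the code.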

classmethod analyze_worker_all_tasks() dict[str, Any][source]

Return dictionary with metadata for each task in BaseTask._sub_tasks.

Subclasses of BaseTask get registered in BaseTask._sub_tasks. Return a dictionary with the metadata of each of the subtasks.

This method is executed in the worker when submitting the dynamic metadata.

append_to_log_file(filename: str, lines: list[str]) None[source]

Open log file and write contents into it.

Parameters:
  • filename – use self.open_debug_log_file(filename)

  • lines – write contents to the logfile

can_run_on(worker_metadata: dict[str, Any]) bool[source]

Check if the specified worker can run the task.

This method shall take its decision solely based on the supplied worker_metadata and on the configured task data (self.data).

The default implementation always returns True unless TASK_TYPE doesn’t match the worker type or there’s a mismatch between the TASK_VERSION on the scheduler side and on the worker side.

Derived objects can implement further checks by overriding the method in the following way:

if not super().can_run_on(worker_metadata):
    return False

if ...:
    return False

return True

Parameters:

worker_metadata (dict) – The metadata collected from the worker by running analyze_worker() on all the tasks on the worker under consideration.

Returns:

the boolean result of the check.

Return type:

bool.
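A fuller sketch of the default checks plus an override. Everything here is a stand-in: the metadata keys ("system:worker_type", "<task name>:version", "needstool:available") and the mapping from task type to worker type are assumptions for illustration, not confirmed debusine names.

```python
from typing import Any

# Assumed mapping from task type to the worker type it needs.
REQUIRED_WORKER_TYPE = {
    "Worker": "external",
    "Server": "celery",
    "Signing": "signing",
}


class TaskSketch:
    """Stand-in for BaseTask; the metadata key names are assumptions."""

    name = "tasksketch"
    TASK_TYPE = "Worker"
    TASK_VERSION = 1

    def can_run_on(self, worker_metadata: dict[str, Any]) -> bool:
        worker_type = worker_metadata.get("system:worker_type")  # assumed key
        if worker_type != REQUIRED_WORKER_TYPE[self.TASK_TYPE]:
            return False
        version = worker_metadata.get(f"{self.name}:version")  # assumed key
        return version == self.TASK_VERSION


class NeedsTool(TaskSketch):
    """Hypothetical task with one extra requirement."""

    name = "needstool"

    def can_run_on(self, worker_metadata: dict[str, Any]) -> bool:
        if not super().can_run_on(worker_metadata):
            return False
        # Hypothetical key that analyze_worker() would have reported.
        return bool(worker_metadata.get("needstool:available", False))


good = {
    "system:worker_type": "external",
    "needstool:version": 1,
    "needstool:available": True,
}
stale = {**good, "needstool:version": 2}  # worker runs a different task version
```

Here NeedsTool().can_run_on(good) is accepted, while the stale metadata is rejected by the base-class version check before the extra requirement is even consulted.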

static class_from_name(task_type: TaskTypes, task_name: str) type[debusine.tasks._task.BaseTask[Any, Any]][source]

Return the class for task_name (case-insensitive).

Parameters:

task_type – type of task to look up

__init_subclass__() registers BaseTask subclasses into BaseTask._sub_tasks.
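The registration mechanism can be sketched as below. RegisteredSketch is a simplified stand-in: it ignores task_type, and deriving the name from the lower-cased class name is an assumption based on the name values listed in this reference (for example 'debdiff' for DebDiff).

```python
from typing import ClassVar


class RegisteredSketch:
    """Stand-in showing __init_subclass__ registration (not debusine code)."""

    _sub_tasks: ClassVar[dict[str, type]] = {}
    name: ClassVar[str]

    def __init_subclass__(cls, **kwargs: object) -> None:
        super().__init_subclass__(**kwargs)
        # Assumption: the registered name is the lower-cased class name.
        cls.name = cls.__name__.lower()
        if cls.name in RegisteredSketch._sub_tasks:
            raise AttributeError(f"task name {cls.name!r} is already registered")
        RegisteredSketch._sub_tasks[cls.name] = cls

    @staticmethod
    def class_from_name(task_name: str) -> type:
        # Case-insensitive lookup.
        return RegisteredSketch._sub_tasks[task_name.lower()]


class DebDiffSketch(RegisteredSketch):
    pass


found = RegisteredSketch.class_from_name("DebDiffSketch")
```

Because registration happens when the class body is executed, a task is only known once its module has been imported, and a second class with the same name is rejected, which matches the uniqueness requirement on class names.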

compute_dynamic_data(task_database: TaskDatabaseInterface) Optional[DTD][source]

Compute dynamic data for this task.

This may involve resolving artifact lookups.

data: TD
dynamic_data: Optional[DTD]
dynamic_task_data_type: type[DTD]

Class used as the in-memory representation of dynamic task data.

execute() bool[source]

Call the _execute() method, upload debug artifacts.

See _execute() for more information.

Returns:

result of the _execute() method.

execute_logging_exceptions() bool[source]

Execute self.execute() logging any raised exceptions.

get_event_reactions(event_name: Literal['on_creation', 'on_unblock', 'on_success', 'on_failure']) list[Union[debusine.tasks.models.ActionSendNotification, debusine.tasks.models.ActionUpdateCollectionWithArtifacts, debusine.tasks.models.ActionUpdateCollectionWithData, debusine.tasks.models.ActionRetryWithDelays]][source]

Return event reactions for this task.

This allows tasks to provide actions that are processed by the server at various points in the lifecycle of the work request.

abstract get_label() str[source]

Return a short human-readable label for the task.

Returns:

None if no label could be computed from task data

get_source_artifacts_ids() list[int][source]

Return the list of source artifact IDs used by this task.

This refers to the artifacts actually used by the task. If dynamic_data is empty, this returns the empty list.

This is used by views to show what artifacts were used by a task. _source_artifacts_ids cannot be used for this purpose because it is only set during task execution.

host_architecture() str | None[source]

Return host_architecture.

Tasks where host_architecture is not determined by self.data.host_architecture should re-implement this method.

static is_valid_task_name(task_type: TaskTypes, task_name: str) bool[source]

Return True if task_name is registered (its class is imported).

static is_worker_task(task_name: str) bool[source]

Check if task_name is a task that can run on external workers.

logger

A logging.Logger instance that can be used in child classes when you override methods to implement the task.

name: ClassVar[str]
open_debug_log_file(filename: str, *, mode: OpenTextModeWriting = 'a') TextIO[source]
open_debug_log_file(filename: str, *, mode: OpenBinaryModeWriting) BinaryIO

Open a temporary file and return it.

The files are always created in the same temporary directory; calling this method twice with the same file name opens the same file.

The caller must call .close() when finished writing.
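The behaviour described above (one temporary directory, same name opens the same file) can be sketched with a stand-in class. DebugLogSketch, its _debug_dir attribute and the newline handling in append_to_log_file() are assumptions for this example, not the debusine implementation.

```python
import shutil
import tempfile
from pathlib import Path
from typing import TextIO


class DebugLogSketch:
    """Stand-in for the debug-log helpers (hypothetical implementation)."""

    def __init__(self) -> None:
        self._debug_dir = None

    def open_debug_log_file(self, filename: str, *, mode: str = "a") -> TextIO:
        if self._debug_dir is None:
            # Created lazily, then reused for every later call.
            self._debug_dir = Path(tempfile.mkdtemp(prefix="debug-logs-"))
        return open(self._debug_dir / filename, mode)

    def append_to_log_file(self, filename: str, lines: list[str]) -> None:
        # The with-block closes the file, as the caller is required to do.
        with self.open_debug_log_file(filename) as log:
            log.writelines(line + "\n" for line in lines)


task = DebugLogSketch()
task.append_to_log_file("notes.log", ["first"])
task.append_to_log_file("notes.log", ["second"])
contents = (task._debug_dir / "notes.log").read_text()
shutil.rmtree(task._debug_dir)  # tidy up the example's temporary directory
```

Both calls end up in the same file because the default mode is append and the directory is shared.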

classmethod prefix_with_task_name(text: str) str[source]

Returns:

the text prefixed with the task name and a colon.

task_data_type: type[TD]

Class used as the in-memory representation of task data.

static task_names(task_type: TaskTypes) list[str][source]

Return list of sub-task names.

static worker_task_names() list[str][source]

Return list of sub-task names not of type TaskTypes.SERVER.

class debusine.tasks.BaseTaskWithExecutor(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None)[source]

Bases: BaseExternalTask[TDE, DTDE], Generic[TDE, DTDE]

Base for tasks with executor capabilities.

Concrete subclasses must implement fetch_input(), configure_for_execution(), run(), check_directory_for_consistency_errors(), and upload_artifacts(), as documented by BaseExternalTask.

DEFAULT_BACKEND = 'unshare'
property backend: str

Return the backend name to use.

cleanup() None[source]

Clean up after running the task.

Some tasks use the executor in upload_artifacts, so we clean up the executor here rather than in run().

dynamic_task_data_type

Class used as the in-memory representation of dynamic task data.

alias of TypeVar('DTDE', bound=BaseDynamicTaskDataWithExecutor)

get_environment(task_database: TaskDatabaseInterface, lookup: int | str, default_category: debusine.artifacts.models.CollectionCategory | None = None, image_category: debusine.tasks.executors.base.ExecutorImageCategory | None = None, set_backend: bool = True) int[source]

Get an environment for an executor-capable task.

This automatically fills in some additional constraints from the task data if needed.

Parameters:
  • task_database – the TaskDatabaseInterface used to perform the lookup

  • lookup – the base lookup provided by the task data

  • default_category – the default category to use for the first segment of the lookup

  • image_category – try to use an environment with this image category; defaults to the image category needed by the executor for self.backend

  • set_backend – if True (default), try to use an environment matching self.backend

Returns:

the ID of a suitable environment artifact

name: ClassVar[str] = 'basetaskwithexecutor'
prepare_to_run(download_directory: Path, execute_directory: Path) None[source]

Copy the download and execution directories into the executor.

run_executor_command(cmd: list[str], log_filename: str, run_as_root: bool = False, check: bool = True) None[source]

Run cmd within the executor, logging the output to log_filename.

task_data_type

Class used as the in-memory representation of task data.

alias of TypeVar('TDE', bound=BaseTaskDataWithExecutor)

class debusine.tasks.Blhc(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None)[source]

Bases: RunCommandTask[BlhcData, BlhcDynamicData], BaseTaskWithExecutor[BlhcData, BlhcDynamicData]

Task to use blhc (build-log hardening check) in debusine.

CAPTURE_OUTPUT_FILENAME: str | None = 'blhc.txt'
TASK_VERSION: int | None = 1

Must be overridden by child classes to document the current version of the task’s code. A task will only be scheduled on a worker if its task version is the same as the one running on the scheduler.

__init__(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None) None[source]

Initialize object.

compute_dynamic_data(task_database: TaskDatabaseInterface) BlhcDynamicData[source]

Resolve artifact lookups for this task.

configure_for_execution(download_directory: Path) bool[source]

Find .build files in the input artifacts.

Set self._blhc_target to the relevant file.

Parameters:

download_directory – where to search the files

Returns:

True if valid files were found

dynamic_task_data_type

alias of BlhcDynamicData

fetch_input(destination: Path) bool[source]

Download the required artifacts.

get_label() str[source]

Return the task label.

name: ClassVar[str] = 'blhc'
task_data_type

alias of BlhcData

task_succeeded(returncode: int | None, execute_directory: Path) bool[source]

Evaluate task output and return success.

We don’t actually check the output, but use the return code of blhc.

Returns:

True for success, False for failure.

upload_artifacts(exec_directory: Path, *, execution_success: bool) None[source]

Upload the BlhcArtifact with the files and relationships.

class debusine.tasks.DebDiff(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None)[source]

Bases: RunCommandTask[DebDiffData, DebDiffDynamicData], BaseTaskWithExecutor[DebDiffData, DebDiffDynamicData]

Task to use debdiff in debusine.

CAPTURE_OUTPUT_FILENAME: str | None = 'debdiff.txt'
TASK_VERSION: int | None = 1

Must be overridden by child classes to document the current version of the task’s code. A task will only be scheduled on a worker if its task version is the same as the one running on the scheduler.

__init__(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None) None[source]

Initialize object.

compute_dynamic_data(task_database: TaskDatabaseInterface) DebDiffDynamicData[source]

Resolve artifact lookups for this task.

configure_for_execution(download_directory: Path) bool[source]

Set self._(original|new)_targets to the relevant files.

Parameters:

download_directory – where to search the files

Returns:

True if valid files were found

dynamic_task_data_type

alias of DebDiffDynamicData

fetch_input(destination: Path) bool[source]

Download the required artifacts.

get_label() str[source]

Return the task label.

name: ClassVar[str] = 'debdiff'
task_data_type

alias of DebDiffData

task_succeeded(returncode: int | None, execute_directory: Path) bool[source]

Evaluate task output and return success.

We don’t actually check the output, but use the return code of debdiff.

Returns:

True for success, False for failure.

upload_artifacts(exec_directory: Path, *, execution_success: bool) None[source]

Upload the DebDiffArtifact with the files and relationships.

class debusine.tasks.ExtraRepositoryMixin(*args: Any, **kwargs: Any)[source]

Bases: BaseExternalTask[TDER, DTD], Generic[TDER, DTD]

Methods for configuring external APT repositories.

__init__(*args: Any, **kwargs: Any) None[source]

Initialize class variables.

data: TDER
dynamic_task_data_type

Class used as the in-memory representation of dynamic task data.

alias of TypeVar('DTD', bound=BaseDynamicTaskData)

extra_repository_keys: list[pathlib.Path]
extra_repository_sources: list[pathlib.Path]
iter_oneline_sources() Generator[str][source]

Generate apt one-line sources.list entries.

Use this for releases for which supports_deb822_sources() returns False.
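For reference, an apt one-line entry has the shape "deb [options] URL SUITE COMPONENTS". The helper below is a hypothetical sketch of formatting such an entry; its name, its parameters, the example URL and the key path are all illustrative and not taken from debusine.

```python
from typing import Optional


def oneline_source(
    url: str,
    suite: str,
    components: list[str],
    keyring: Optional[str] = None,
) -> str:
    """Format one apt one-line-style sources.list entry."""
    # The signed-by option is only emitted when a key file is given.
    options = f" [signed-by={keyring}]" if keyring else ""
    return f"deb{options} {url} {suite} {' '.join(components)}"


signed = oneline_source(
    "http://deb.debian.org/debian",
    "bookworm",
    ["main", "contrib"],
    "/etc/apt/keyrings/extra.asc",
)
unsigned = oneline_source("http://deb.debian.org/debian", "bookworm", ["main"])
```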

name: ClassVar[str] = 'extrarepositorymixin'
supports_deb822_sources(codename: str) bool[source]

Determine if codename supports deb822 sources.

supports_inline_signed_by(codename: str) bool[source]

Determine if codename supports inline signatures.

task_data_type

Class used as the in-memory representation of task data.

alias of TypeVar('TDER', bound=BaseTaskDataWithExtraRepositories)

write_extra_repository_config(codename: str, destination: Path) None[source]

Write extra_repositories config to files in destination.

extra_repository_keys will be populated with keys to install into /etc/apt/keyrings, if needed.

extra_repository_sources will be populated with deb822 sources files, if supported.

class debusine.tasks.ExtractForSigning(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None)[source]

Bases: RunCommandTask[ExtractForSigningData, ExtractForSigningDynamicData], BaseTaskWithExecutor[ExtractForSigningData, ExtractForSigningDynamicData]

Task to extract signing input from other artifacts.

TASK_VERSION: int | None = 1

Must be overridden by child classes to document the current version of the task’s code. A task will only be scheduled on a worker if its task version is the same as the one running on the scheduler.

__init__(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None) None[source]

Initialize the task.

can_run_on(worker_metadata: dict[str, Any]) bool[source]

Check if the specified worker can run the task.

cleanup() None[source]

Clean up after running the task.

compute_dynamic_data(task_database: TaskDatabaseInterface) ExtractForSigningDynamicData[source]

Resolve artifact lookups for this task.

configure_for_execution(download_directory: Path) bool[source]

Configure task: create and start an executor instance.

dynamic_task_data_type

alias of ExtractForSigningDynamicData

fetch_input(destination: Path) bool[source]

Download the required artifacts.

get_label() str[source]

Return the task label.

name: ClassVar[str] = 'extractforsigning'
run(execute_directory: Path) bool[source]

Do the main extraction work.

task_data_type

alias of ExtractForSigningData

upload_artifacts(execute_directory: Path, *, execution_success: bool) None[source]

Upload artifacts for the task.

class debusine.tasks.Lintian(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None)[source]

Bases: RunCommandTask[LintianData, LintianDynamicData], BaseTaskWithExecutor[LintianData, LintianDynamicData]

Task to use lintian in debusine.

CAPTURE_OUTPUT_FILENAME: str | None = 'lintian.txt'
TASK_VERSION: int | None = 1

Must be overridden by child classes to document the current version of the task’s code. A task will only be scheduled on a worker if its task version is the same as the one running on the scheduler.

__init__(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None) None[source]

Initialize object.

compute_dynamic_data(task_database: TaskDatabaseInterface) LintianDynamicData[source]

Resolve artifact lookups for this task.

configure_for_execution(download_directory: Path) bool[source]

Find .dsc, .deb and .udeb files in download_directory.

Set self._lintian_targets to the relevant files.

Parameters:

download_directory – where to search the files

Returns:

True if valid files were found

dynamic_task_data_type

alias of LintianDynamicData

execution_consistency_errors(build_directory: Path) list[str][source]

Return list of errors.

fetch_input(destination: Path) bool[source]

Download the required artifacts.

classmethod generate_severity_count_zero() dict[str, int][source]

Return dictionary. Keys: severities. Value: 0.

get_label() str[source]

Return the task label.

name: ClassVar[str] = 'lintian'
task_data_type

alias of LintianData

task_succeeded(returncode: int | None, execute_directory: Path) bool[source]

Evaluate task output and return success.

For a successful run of lintian:

  • lintian must have generated the output file

  • there must be no tags of severity self.data["fail_on"] or higher

Returns:

True for success, False for failure.
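The "fail_on or higher" rule can be sketched as a threshold over an ordered severity list. The ordering below, the "none" value and the function signature are assumptions for illustration; debusine's actual severity list and data model may differ.

```python
# Assumed ordering, most severe first; the real list may differ.
SEVERITIES = ["error", "warning", "info", "pedantic", "experimental", "overridden"]


def generate_severity_count_zero() -> dict[str, int]:
    """Return a dictionary mapping every severity to 0."""
    return {severity: 0 for severity in SEVERITIES}


def succeeded(output_exists: bool, counts: dict[str, int], fail_on: str) -> bool:
    """Apply both conditions: output file present, no blocking tags."""
    if not output_exists:
        return False
    if fail_on == "none":  # assumed value meaning "never fail on tags"
        return True
    # Every severity at or above fail_on must have a zero count.
    blocking = SEVERITIES[: SEVERITIES.index(fail_on) + 1]
    return all(counts[severity] == 0 for severity in blocking)


counts = generate_severity_count_zero()
counts["warning"] = 2  # made-up result: two warning tags were found
```

With these counts, a threshold of "error" still succeeds, while "warning" or anything less severe fails.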

upload_artifacts(exec_directory: Path, *, execution_success: bool) None[source]

Upload the LintianArtifact with the files and relationships.

class debusine.tasks.MakeSourcePackageUpload(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None)[source]

Bases: RunCommandTask[MakeSourcePackageUploadData, MakeSourcePackageUploadDynamicData], BaseTaskWithExecutor[MakeSourcePackageUploadData, MakeSourcePackageUploadDynamicData]

Makes a debian:upload artifact from a debian:source-package artifact.

TASK_VERSION: int | None = 1

Must be overridden by child classes to document the current version of the task’s code. A task will only be scheduled on a worker if its task version is the same as the one running on the scheduler.

__init__(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None) None[source]

Initialize (constructor).

compute_dynamic_data(task_database: TaskDatabaseInterface) MakeSourcePackageUploadDynamicData[source]

Resolve artifact lookups for this task.

configure_for_execution(download_directory: Path) bool[source]

Find the .dsc file for dpkg-source and dpkg-genchanges.

Set self._dsc_file to the relevant file. Set self._changes_path to the target file. Set self._shell_script to a copy of an integration Bash script.

Parameters:

download_directory – where to find the .dsc file (downloaded via fetch_input)

Returns:

True if valid files were found

dynamic_task_data_type

alias of MakeSourcePackageUploadDynamicData

fetch_input(destination: Path) bool[source]

Populate work directory with user-specified source artifact.

get_label() str[source]

Return the task label.

name: ClassVar[str] = 'makesourcepackageupload'
task_data_type

alias of MakeSourcePackageUploadData

upload_artifacts(execute_directory: Path, *, execution_success: bool) None[source]

Create DebianUpload artifact and relationships.

class debusine.tasks.MergeUploads(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None)[source]

Bases: RunCommandTask[MergeUploadsData, MergeUploadsDynamicData], BaseTaskWithExecutor[MergeUploadsData, MergeUploadsDynamicData]

Combines multiple debian:upload artifacts into a single one.

This is in preparation for uploading them together.

CAPTURE_OUTPUT_FILENAME: str | None = 'multi.changes'
TASK_VERSION: int | None = 1

Must be overridden by child classes to document the current version of the task’s code. A task will only be scheduled on a worker if its task version is the same as the one running on the scheduler.

__init__(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None) None[source]

Initialize (constructor).

compute_dynamic_data(task_database: TaskDatabaseInterface) MergeUploadsDynamicData[source]

Resolve artifact lookups for this task.

configure_for_execution(download_directory: Path) bool[source]

Find the .changes files for mergechanges.

Set self._changes_files to the relevant files.

Parameters:

download_directory – where to find the .changes files (downloaded via fetch_input)

Returns:

True if valid files were found

dynamic_task_data_type

alias of MergeUploadsDynamicData

fetch_input(destination: Path) bool[source]

Populate work directory with user-specified binary artifact(s).

get_label() str[source]

Return the task label.

name: ClassVar[str] = 'mergeuploads'
task_data_type

alias of MergeUploadsData

upload_artifacts(execute_directory: Path, *, execution_success: bool) None[source]

Create DebianUpload artifact and relationships.

class debusine.tasks.MmDebstrap(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None)[source]

Bases: SystemBootstrap[MmDebstrapData]

Implement MmDebstrap: extends the ontology SystemBootstrap.

__init__(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None) None[source]

Initialize MmDebstrap.

classmethod analyze_worker() dict[str, Any][source]

Report metadata for this task on this worker.

can_run_on(worker_metadata: dict[str, Any]) bool[source]

Check if the specified worker can run the task.

dynamic_task_data_type

alias of BaseDynamicTaskData

fetch_input(destination: Path) bool[source]

Do nothing: no artifacts need to be downloaded.

host_architecture() str[source]

Return architecture.

name: ClassVar[str] = 'mmdebstrap'
task_data_type

alias of MmDebstrapData

upload_artifacts(execute_dir: Path, *, execution_success: bool) None[source]

Upload generated artifacts.

class debusine.tasks.Noop(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None)[source]

Bases: BaseExternalTask[NoopData, BaseDynamicTaskData]

Task whose execute() returns a boolean, depending on the result field.

Used for integration testing.

TASK_VERSION: int | None = 1

Must be overridden by child classes to document the current version of the task’s code. A task will only be scheduled on a worker if its task version is the same as the one running on the scheduler.

dynamic_task_data_type

alias of BaseDynamicTaskData

get_label() str[source]

Return the task label.

name: ClassVar[str] = 'noop'
run(execute_directory: Path) bool[source]

Return self.data.result (was sent by the client).

task_data_type

alias of NoopData

class debusine.tasks.Piuparts(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None)[source]

Bases: ExtraRepositoryMixin[PiupartsData, PiupartsDynamicData], RunCommandTask[PiupartsData, PiupartsDynamicData], BaseTaskWithExecutor[PiupartsData, PiupartsDynamicData]

Test Debian binary packages using piuparts.

TASK_VERSION: int | None = 1

Must be overridden by child classes to document the current version of the task’s code. A task will only be scheduled on a worker if its task version is the same as the one running on the scheduler.

__init__(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None) None[source]

Initialize (constructor).

compute_dynamic_data(task_database: TaskDatabaseInterface) PiupartsDynamicData[source]

Resolve artifact lookups for this task.

configure_for_execution(download_directory: Path) bool[source]

Find the .deb files for piuparts.

Prepare executor, install “piuparts” in it and prepare debian:system-image (e.g. delete /dev/* files).

Set self._deb_files to the relevant files.

Parameters:

download_directory – where to find the *.deb files (downloaded via fetch_input) and where to download the chroot of debian:system-image (for piuparts --basetgz).

Returns:

True if valid files were found

dynamic_task_data_type

alias of PiupartsDynamicData

fetch_input(destination: Path) bool[source]

Populate work directory with user-specified binary artifact(s).

get_label() str[source]

Return the task label.

name: ClassVar[str] = 'piuparts'
task_data_type

alias of PiupartsData

upload_artifacts(execute_directory: Path, *, execution_success: bool) None[source]

cmd-output.log is enough for now, upload nothing.

class debusine.tasks.RunCommandTask(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None)[source]

Bases: BaseExternalTask[TD, DTD], Generic[TD, DTD]

A BaseTask that can execute commands and upload artifacts.

Concrete subclasses must implement:

  • _cmdline(self) -> list[str]

  • task_succeeded(self, returncode: Optional[int], execute_directory: Path) -> bool (defaults to True)

They must also implement configure_for_execution(), fetch_input(), check_directory_for_consistency_errors(), and upload_artifacts(), as documented by BaseTaskWithExecutor. (They do not need to implement run(), but may do so if they need to run multiple commands rather than just one.)

Use self.append_to_log_file() / self.open_debug_log_file() to provide information for the user (it will be available to the user as an artifact).

Command execution uses process groups to make sure that the command and any commands it spawns are finished, and cancels the execution of the command if BaseTask.aborted is True.

Optionally: _cmdline_as_root() and _cmd_env() may be implemented, to customize behaviour.

See the main entry point BaseTask._execute() for details of the flow.
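The division of labour between _cmdline(), run() and task_succeeded() can be sketched without debusine installed. SketchCommandTask below is a hypothetical stand-in written for this illustration, not the real RunCommandTask: it omits the executor, logging, input fetching and artifact upload, and only shows how a subclass supplies the command line and judges the result.

```python
import subprocess
import sys
from pathlib import Path
from typing import Optional


class SketchCommandTask:
    """Hypothetical stand-in for RunCommandTask; NOT the debusine class."""

    def _cmdline(self) -> list[str]:
        raise NotImplementedError

    def task_succeeded(
        self, returncode: Optional[int], execute_directory: Path
    ) -> bool:
        # Documented default: report success unless a subclass overrides this.
        return True

    def run(self, execute_directory: Path) -> bool:
        # Run the single command and let task_succeeded() judge the result.
        result = subprocess.run(self._cmdline(), cwd=execute_directory)
        return self.task_succeeded(result.returncode, execute_directory)


class ExitCodeTask(SketchCommandTask):
    """Example subclass: succeed only when the command exits with 0."""

    def __init__(self, exit_code: int) -> None:
        self.exit_code = exit_code

    def _cmdline(self) -> list[str]:
        script = f"import sys; sys.exit({self.exit_code})"
        return [sys.executable, "-c", script]

    def task_succeeded(
        self, returncode: Optional[int], execute_directory: Path
    ) -> bool:
        return returncode == 0
```

With this stand-in, ExitCodeTask(0).run(some_directory) reports success and ExitCodeTask(3).run(some_directory) reports failure.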

CAPTURE_OUTPUT_FILENAME: str | None = None
CMD_LOG_FILENAME = 'cmd-output.log'
CMD_LOG_SEPARATOR = '--------------------'
dynamic_task_data_type

Class used as the in-memory representation of dynamic task data.

alias of TypeVar('DTD', bound=BaseDynamicTaskData)

name: ClassVar[str] = 'runcommandtask'
run(execute_directory: Path) bool[source]

Run a single command via the executor.

Note

If the class attribute CAPTURE_OUTPUT_FILENAME is set, the stdout of the command is written to a file with that name. Otherwise, the stdout of the command is saved in self.CMD_LOG_FILENAME.

run_cmd(cmd: list[str], working_directory: Path, *, env: dict[str, str] | None = None, run_as_root: bool = False, capture_stdout_filename: str | None = None) int | None[source]

Run cmd in working_directory and create the self.CMD_OUTPUT_FILE log file.

If BaseTask.aborted is True, the process is terminated.

Parameters:
  • cmd – command to execute with its arguments.

  • working_directory – working directory where the command is executed.

  • run_as_root – if True, run the command as root. Otherwise, the command runs as the worker’s user.

  • capture_stdout_filename – for some commands the result is what the command writes to stdout (e.g. lintian) rather than a set of files generated by the command (e.g. sbuild). If capture_stdout_filename is not None, save the stdout into this file so that the caller can use it.

Returns:

returncode of the process or None if aborted
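The combination of process groups, abort handling and optional stdout capture described here can be sketched with the standard library alone. This is a minimal POSIX-only illustration, not debusine's implementation: the function name run_in_process_group, the aborted callable, the poll_interval parameter and the choice of SIGKILL are all assumptions made for the example.

```python
import os
import signal
import subprocess
import time
from pathlib import Path
from typing import Callable, Optional


def run_in_process_group(
    cmd: list[str],
    working_directory: Path,
    aborted: Callable[[], bool],
    capture_stdout_filename: Optional[str] = None,
    poll_interval: float = 0.05,
) -> Optional[int]:
    """Run cmd in its own process group; return None if aborted."""
    stdout_file = None
    if capture_stdout_filename is not None:
        stdout_file = (working_directory / capture_stdout_filename).open("wb")
    try:
        process = subprocess.Popen(
            cmd,
            cwd=working_directory,
            stdout=stdout_file,
            start_new_session=True,  # new session, hence a new process group
        )
        while process.poll() is None:
            if aborted():
                try:
                    # Signal the whole group so spawned children stop too.
                    os.killpg(process.pid, signal.SIGKILL)
                except ProcessLookupError:
                    pass  # the group already exited
                process.wait()
                return None
            time.sleep(poll_interval)
        return process.returncode
    finally:
        if stdout_file is not None:
            stdout_file.close()
```

Because the child is started in a new session, its process group ID equals its PID, so a single os.killpg() call reaches any processes it spawned in that group.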

task_data_type

Class used as the in-memory representation of task data.

alias of TypeVar(‘TD’, bound=BaseTaskData)

task_succeeded(returncode: int | None, execute_directory: Path) bool[source]

Evaluate whether the task succeeded or failed.

By default, return True (success). Subclasses can override it.

Parameters:
  • returncode – return code of the command, or None if aborted

  • execute_directory – directory with the output of the task

Returns:

True (if success) or False (if failure).

class debusine.tasks.Sbuild(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None)[source]

Bases: SbuildValidatorMixin, ExtraRepositoryMixin[SbuildData, SbuildDynamicData], RunCommandTask[SbuildData, SbuildDynamicData], BaseTaskWithExecutor[SbuildData, SbuildDynamicData]

Task implementing a Debian package build with sbuild.

TASK_VERSION: int | None = 1

Must be overridden by child classes to document the current version of the task’s code. A task will only be scheduled on a worker if its task version is the same as the one running on the scheduler.

__init__(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None) None[source]

Initialize the sbuild task.

classmethod analyze_worker() dict[str, Any][source]

Report metadata for this task on this worker.

can_run_on(worker_metadata: dict[str, Any]) bool[source]

Check whether the specified worker can run the requested task.

compute_dynamic_data(task_database: TaskDatabaseInterface) SbuildDynamicData[source]

Resolve artifact lookups for this task.

configure_for_execution(download_directory: Path) bool[source]

Configure Task: set variables needed for the build() step.

Return True if configuration worked, False if there was a problem.

dynamic_task_data_type

alias of SbuildDynamicData

fetch_input(destination: Path) bool[source]

Download the source artifact.

get_label() str[source]

Return the task label.

get_source_artifacts_ids() list[int][source]

Return the list of source artifact IDs used by this task.

This refers to the artifacts actually used by the task. If dynamic_data is empty, this returns the empty list.

name: ClassVar[str] = 'sbuild'
task_data_type

alias of SbuildData

task_succeeded(returncode: int | None, execute_directory: Path) bool[source]

Check whether the task succeeded.

sbuild returns 0 for “Status: skipped” (i.e. host architecture not supported by source package); we consider that a failure.
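The rule above (a zero exit code is not enough when the build was skipped) might look like the following sketch. It assumes the status appears as a line reading "Status: skipped" in the build log text; the function name build_succeeded and the way the log is passed in are hypothetical, and where the real task reads the status from is not specified here.

```python
from typing import Optional


def build_succeeded(returncode: Optional[int], build_log: str) -> bool:
    """Treat a zero exit code with 'Status: skipped' in the log as a failure."""
    if returncode != 0:
        # Covers non-zero exit codes and None (aborted).
        return False
    for line in build_log.splitlines():
        if line.strip() == "Status: skipped":
            return False
    return True
```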

upload_artifacts(directory: Path, *, execution_success: bool) None[source]

Upload the artifacts from directory.

Parameters:
  • directory – directory containing the files that will be uploaded.

  • execution_success – if False, skip uploading .changes and .deb/.udeb files
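The effect of execution_success on file selection can be illustrated as below. This is only a sketch under assumptions: the helper files_to_upload is hypothetical, it keeps every other file in the directory (the real task's choice of what else to upload is not stated here), and it performs no upload.

```python
from pathlib import Path

# Suffixes the parameter description says are skipped on failure.
BUILD_RESULT_SUFFIXES = {".changes", ".deb", ".udeb"}


def files_to_upload(directory: Path, *, execution_success: bool) -> list[Path]:
    """List files in directory, dropping build results if the build failed."""
    selected = []
    for path in sorted(directory.iterdir()):
        if not path.is_file():
            continue
        if not execution_success and path.suffix in BUILD_RESULT_SUFFIXES:
            continue
        selected.append(path)
    return selected
```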

class debusine.tasks.SimpleSystemImageBuild(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None)[source]

Bases: SystemBootstrap[SystemImageBuildData]

Implement SimpleSystemImageBuild using debefivm-create.

__init__(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None) None[source]

Initialize SimpleSystemImageBuild.

classmethod analyze_worker() dict[str, Any][source]

Report metadata for this task on this worker.

can_run_on(worker_metadata: dict[str, Any]) bool[source]

Check if the specified worker can run the task.

dynamic_task_data_type

alias of BaseDynamicTaskData

fetch_input(destination: Path) bool[source]

Do nothing: no artifacts need to be downloaded.

get_label() str[source]

Return the task label.

host_architecture() str[source]

Return architecture.

name: ClassVar[str] = 'simplesystemimagebuild'
task_data_type

alias of SystemImageBuildData

upload_artifacts(execute_dir: Path, *, execution_success: bool) None[source]

Upload generated artifacts.

exception debusine.tasks.TaskConfigError(message: str | None, original_exception: Exception | None = None)[source]

Bases: Exception

Exception raised when there is an issue with a task configuration.

__init__(message: str | None, original_exception: Exception | None = None)[source]

Initialize the TaskConfigError.

Parameters:
  • message – human-readable message describing the error.

  • original_exception – the exception that triggered this error, if applicable. This is used to provide additional information.
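A typical use of the two constructor arguments is to wrap a lower-level error while keeping it available for diagnostics. The sketch below uses a hypothetical stand-in class, SketchTaskConfigError, that mirrors the documented signature; storing original_exception as an attribute is an assumption of this example, and parse_task_data is an illustrative helper, not part of debusine.

```python
import json
from typing import Optional


class SketchTaskConfigError(Exception):
    """Hypothetical stand-in mirroring the documented constructor signature."""

    def __init__(
        self,
        message: Optional[str],
        original_exception: Optional[Exception] = None,
    ) -> None:
        super().__init__(message)
        # Assumption: keep the triggering exception for later inspection.
        self.original_exception = original_exception


def parse_task_data(raw: str) -> dict:
    """Parse JSON task data, wrapping parse errors in a configuration error."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise SketchTaskConfigError("task data is not valid JSON", exc) from exc
    if not isinstance(data, dict):
        raise SketchTaskConfigError("task data must be a JSON object")
    return data
```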

debusine.tasks.get_environment(task_database: TaskDatabaseInterface, lookup: int | str, architecture: str | None, backend: str | None, default_category: debusine.artifacts.models.CollectionCategory | None = None, image_category: debusine.tasks.executors.base.ExecutorImageCategory | None = None) int[source]

Get an environment.

This automatically fills in some additional constraints if needed.

Parameters:
  • task_database – the TaskDatabaseInterface used to perform the lookup

  • lookup – the base lookup provided by the task data

  • architecture – the task’s host architecture, if available

  • backend – the task’s backend, or None if the environment lookup does not need to be constrained to a particular backend

  • default_category – the default category to use for the first segment of the lookup

  • image_category – try to use an environment with this image category; defaults to the image category needed by the executor for backend

Returns:

the ID of a suitable environment artifact

Task to build Debian packages with sbuild.

This task implements the PackageBuild generic task for its task_data: https://freexian-team.pages.debian.net/debusine/reference/tasks/ontology-generic-tasks.html#task-packagebuild

class debusine.tasks.sbuild.Sbuild(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None)[source]

Bases: SbuildValidatorMixin, ExtraRepositoryMixin[SbuildData, SbuildDynamicData], RunCommandTask[SbuildData, SbuildDynamicData], BaseTaskWithExecutor[SbuildData, SbuildDynamicData]

Task implementing a Debian package build with sbuild.

TASK_VERSION: int | None = 1

Must be overridden by child classes to document the current version of the task’s code. A task will only be scheduled on a worker if its task version is the same as the one running on the scheduler.

__init__(task_data: dict[str, Any], dynamic_task_data: dict[str, Any] | None = None) None[source]

Initialize the sbuild task.

classmethod analyze_worker() dict[str, Any][source]

Report metadata for this task on this worker.

can_run_on(worker_metadata: dict[str, Any]) bool[source]

Check whether the specified worker can run the requested task.

chroots: list[str] | None
compute_dynamic_data(task_database: TaskDatabaseInterface) SbuildDynamicData[source]

Resolve artifact lookups for this task.

configure_for_execution(download_directory: Path) bool[source]

Configure Task: set variables needed for the build() step.

Return True if configuration worked, False if there was a problem.

data: TDER
debusine: Debusine | None
dynamic_data: DTD | None
dynamic_task_data_type

alias of SbuildDynamicData

executor: ExecutorInterface | None
executor_instance: InstanceInterface | None
extra_repository_keys: list[Path]
extra_repository_sources: list[Path]
fetch_input(destination: Path) bool[source]

Download the source artifact.

get_label() str[source]

Return the task label.

get_source_artifacts_ids() list[int][source]

Return the list of source artifact IDs used by this task.

This refers to the artifacts actually used by the task. If dynamic_data is empty, this returns the empty list.

name: ClassVar[str] = 'sbuild'
task_data_type

alias of SbuildData

task_succeeded(returncode: int | None, execute_directory: Path) bool[source]

Check whether the task succeeded.

sbuild returns 0 for “Status: skipped” (i.e. host architecture not supported by source package); we consider that a failure.

upload_artifacts(directory: Path, *, execution_success: bool) None[source]

Upload the artifacts from directory.

Parameters:
  • directory – directory containing the files that will be uploaded.

  • execution_success – if False, skip uploading .changes and .deb/.udeb files

work_request_id: int | None
worker_host_architecture: str | None
workspace_name: str | None