Tasks (internal API)
Collection of tasks.
The debusine.tasks module hierarchy hosts a collection of BaseTask
that are used by workers to fulfill
debusine.db.models.WorkRequest
s sent by the debusine scheduler.
Creating a new task requires adding a new file containing a class inheriting
from the BaseTask
or RunCommandTask
base class. The name
of the class must be unique among all child classes.
A child class must, at the very least, override the BaseTask.execute()
method.
- class debusine.tasks.BaseExternalTask(task_data: dict[str, Any])[source]
Bases:
BaseTask
[TD
],Generic
[TD
]A
BaseTask
that runs on an external worker.- TASK_TYPE: TaskTypes = 'Worker'
- task_data_type
Class used as the in-memory representation of task data.
alias of TypeVar(‘TD’, bound=
BaseTaskData
)
- class debusine.tasks.BaseTask(task_data: dict[str, Any])[source]
Bases:
Generic
[TD
]Base class for tasks.
A BaseTask object serves two purpose: encapsulating the logic of what needs to be done to execute the task (cf
configure()
andexecute()
that are run on a worker), and supporting the scheduler by determining if a task is suitable for a given worker. That is done in a two-step process, collating metadata from each worker (with theanalyze_worker()
method that is run on a worker) and then, based on this metadata, see if a task is suitable (withcan_run_on()
that is executed on the scheduler).Most concrete task implementations should inherit from
RunCommandTask
instead.- TASK_TYPE: TaskTypes
- TASK_VERSION: int | None = None
Must be overridden by child classes to document the current version of the task’s code. A task will only be scheduled on a worker if its task version is the same as the one running on the scheduler.
- property aborted: bool
Return if the task is aborted.
Tasks cannot transition from aborted -> not-aborted.
- classmethod analyze_worker() dict[str, Any] [source]
Return dynamic metadata about the current worker.
This method is called on the worker to collect information about the worker. The information is stored as a set of key-value pairs in a dictionary.
That information is then reused on the scheduler to be fed to
can_run_on()
and determine if a task is suitable to be executed on the worker.Derived objects can extend the behaviour by overriding the method, calling
metadata = super().analyze_worker()
, and then adding supplementary data in the dictionary.To avoid conflicts on the names of the keys used by different tasks you should use key names obtained with
self.prefix_with_task_name(...)
.- Returns:
a dictionary describing the worker.
- Return type:
dict.
- classmethod analyze_worker_all_tasks()[source]
Return dictionary with metadata for each task in BaseTask._sub_tasks.
Subclasses of BaseTask get registered in BaseTask._sub_tasks. Return a dictionary with the metadata of each of the subtasks.
This method is executed in the worker when submitting the dynamic metadata.
- append_to_log_file(filename: str, lines: list[str]) None [source]
Open log file and write contents into it.
- Parameters:
filename – use self.open_debug_log_file(filename)
lines – write contents to the logfile
- can_run_on(worker_metadata: dict[str, Any]) bool [source]
Check if the specified worker can run the task.
This method shall take its decision solely based on the supplied
worker_metadata
and on the configured task data (self.data
).The default implementation always returns True unless
TASK_TYPE
doesn’t match the worker type or there’s a mismatch between theTASK_VERSION
on the scheduler side and on the worker side.Derived objects can implement further checks by overriding the method in the following way:
if not super().can_run_on(worker_metadata): return False if ...: return False return True
- Parameters:
worker_metadata (dict) – The metadata collected from the worker by running
analyze_worker()
on all the tasks on the worker under consideration.- Returns:
the boolean result of the check.
- Return type:
bool.
- static class_from_name(task_type: TaskTypes, task_name: str) type[debusine.tasks._task.BaseTask[Any]] [source]
Return class for :param task_name (case-insensitive).
- Parameters:
task_type – type of task to look up
__init_subclass__() registers BaseTask subclasses’ into BaseTask._sub_tasks.
- data: TD
- execute() bool [source]
Call the _execute() method, upload debug artifacts.
See _execute() for more information.
- Returns:
result of the _execute() method.
- static is_valid_task_name(task_type: TaskTypes, task_name: str) bool [source]
Return True if task_name is registered (its class is imported).
- static is_worker_task(task_name: str) bool [source]
Check if task_name is a task that can run on external workers.
- logger
A
logging.Logger
instance that can be used in child classes when you override methods to implement the task.
- open_debug_log_file(filename: str, *, mode: OpenTextModeWriting = 'a') TextIO [source]
- open_debug_log_file(filename: str, *, mode: OpenBinaryModeWriting) BinaryIO
Open a temporary file and return it.
The files are always for the same temporary directory, calling it twice with the same file name will open the same file.
The caller must call .close() when finished writing.
- class debusine.tasks.BaseTaskWithExecutor(task_data: dict[str, Any])[source]
Bases:
BaseExternalTask
[TDE
],Generic
[TDE
]Base for tasks with executor capabilities.
- DEFAULT_BACKEND = 'unshare'
- task_data_type
Class used as the in-memory representation of task data.
alias of TypeVar(‘TDE’, bound=
BaseTaskDataWithExecutor
)
- class debusine.tasks.RunCommandTask(task_data: dict[str, Any])[source]
Bases:
BaseExternalTask
[TD
],Generic
[TD
]A
BaseTask
that can execute commands and upload artifacts.Concrete subclasses must implement:
configure_for_execution(self, download_directory: Path) -> bool
_cmdline(self) -> list[str]
upload_artifacts(self, directory: Path, \*, execution_success: bool)
. The member variable self._source_artifacts_ids is set byfetch_input()
and can be used to create the relations between uploaded artifacts and downloaded artifacts.fetch_input(self, destination) -> bool
. Download the needed artifacts into destination. Suggestion: can usefetch_artifact(artifact, dir)
to download them.check_directory_for_consistency_errors(self, build_directory: Path) -> list[str]
(defaults return an empty list: no-errors)task_succeeded(self, returncode: Optional[int], execute_directory: Path) -> bool
(defaults to True)
Use
self.append_to_log_file()
/self.open_debug_log_file()
to provide information for the user (it will be available to the user as an artifact).Command execution uses process groups to make sure that the command and possible spawned commands are finished, and cancels the execution of the command if
BaseTask.aborted()
is True.See the main entry point
_execute()
for details of the RunCommandTask flow.- CMD_LOG_FILENAME = 'cmd-output.log'
- CMD_LOG_SEPARATOR = '--------------------'
- check_directory_for_consistency_errors(build_directory: Path) list[str] [source]
Return list of errors after executing the command.
- configure_for_execution(download_directory: Path) bool [source]
Configure task: set variables needed for the self._cmdline().
Called after the files are downloaded via fetch_input().
- fetch_artifact(artifact: int | str, destination: Path, default_category: str | None = None) ArtifactResponse [source]
Look up artifact and download it to destination.
The artifact is looked up using Single lookup.
Add the artifact ID to self._source_artifacts_ids.
- fetch_input(destination: Path) bool [source]
Download artifacts needed by the task, update self.source_artifacts_ids.
Task might use self.data.input to download the relevant artifacts.
The method self.fetch_artifact(artifact, destination) might be used to download the relevant artifacts and update self.source_artifacts_ids.
- fetch_multiple_artifacts(artifacts: CollectionItemLookupMultiple, destination: Path, default_category: str | None = None) list[debusine.client.models.ArtifactResponse] [source]
Look up multiple artifacts and download them all to destination.
The artifacts are looked up using Multiple lookup.
Add the artifact IDs to self._source_artifacts_ids.
- lookup_multiple_artifacts(lookup: CollectionItemLookupMultiple, default_category: str | None = None) list[int] [source]
Look up multiple artifacts.
The artifacts are looked up using Multiple lookup.
- lookup_single_artifact(lookup: int | str, default_category: str | None = None) int [source]
Look up a single artifact using Single lookup.
- run_cmd(cmd: list[str], working_directory: Path, *, run_as_root: bool = False, capture_stdout_filename: str | None = None) int | None [source]
Run cmd in working_directory. Create self.CMD_OUTPUT_FILE log file.
If BaseTask.aborted == True terminates the process.
- Parameters:
cmd – command to execute with its arguments.
working_directory – working directory where the command is executed.
run_as_root – if True, run the command as root. Otherwise, the command runs as the worker’s user
capture_stdout_filename – for some commands the output of the command is the output of stdout (e.g. lintian) and not a set of files generated by the command (e.g. sbuild). If capture_stdout is not None, save the stdout into this file. The caller can then use it.
- Returns:
returncode of the process or None if aborted
- task_data_type
Class used as the in-memory representation of task data.
alias of TypeVar(‘TD’, bound=
BaseTaskData
)
- task_succeeded(returncode: int | None, execute_directory: Path) bool [source]
Sub-tasks can evaluate if the task was a success or failure.
By default, return True (success). Sub-classes can re-implement it.
- Parameters:
returncode – return code of the command, or None if aborted
execute_directory – directory with the output of the task
- Returns:
True (if success) or False (if failure).
- exception debusine.tasks.TaskConfigError(message: str, original_exception: Exception | None = None)[source]
Bases:
Exception
Exception raised when there is an issue with a task configuration.
- __init__(message: str, original_exception: Exception | None = None)[source]
Initialize the TaskConfigError.
- Parameters:
message – human-readable message describing the error.
original_exception – the exception that triggered this error, if applicable. This is used to provide additional information.
Task to build Debian packages with sbuild.
This task implements the PackageBuild generic task for its task_data: https://freexian-team.pages.debian.net/debusine/reference/tasks.html#task-packagebuild
- class debusine.tasks.sbuild.Sbuild(task_data: dict[str, Any])[source]
Bases:
SbuildValidatorMixin
,RunCommandTask
[SbuildData
],BaseTaskWithExecutor
[SbuildData
]Task implementing a Debian package build with sbuild.
- TASK_VERSION: int | None = 1
Must be overridden by child classes to document the current version of the task’s code. A task will only be scheduled on a worker if its task version is the same as the one running on the scheduler.
- can_run_on(worker_metadata: dict[str, Any]) bool [source]
Check the specified worker can run the requested task.
- configure_for_execution(download_directory: Path) bool [source]
Configure Task: set variables needed for the build() step.
Return True if configuration worked, False, if there was a problem.
- execute() bool [source]
Verify task can be executed and super().execute().
- Raises:
TaskConfigError.
- task_data_type
alias of
SbuildData