# Copyright 2021-2024 The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.
r"""
Collection of tasks.
The debusine.tasks module hierarchy hosts a collection of :class:`BaseTask`
that are used by workers to fulfill
:class:`debusine.db.models.WorkRequest`\ s sent by the debusine scheduler.
Creating a new task requires adding a new file containing a class inheriting
from the :class:`BaseTask` or :class:`RunCommandTask` base class. The name
of the class must be unique among all child classes.
A child class must, at the very least, override the
:py:meth:`BaseTask._execute` method.
"""
import abc
import logging
import os
import shlex
import signal
import subprocess
import tempfile
import traceback
from collections import defaultdict
from collections.abc import Iterator
from contextlib import contextmanager
from pathlib import Path
from typing import (
Any,
AnyStr,
BinaryIO,
ClassVar,
Generic,
IO,
TYPE_CHECKING,
TextIO,
TypeVar,
Union,
overload,
)
if TYPE_CHECKING:
from _typeshed import OpenBinaryModeWriting, OpenTextModeWriting
from debusine.artifacts import WorkRequestDebugLogs
from debusine.client.debusine import Debusine
from debusine.client.models import ArtifactResponse, CollectionItemType
from debusine.tasks.executors import (
ExecutorInterface,
InstanceInterface,
executor,
)
from debusine.tasks.models import (
BackendType,
BaseTaskData,
BaseTaskDataWithExecutor,
CollectionItemLookupMultiple,
CollectionItemLookupSingle,
TaskTypes,
WorkerType,
)
from debusine.utils import extract_generic_type_argument
[docs]class TaskConfigError(Exception):
"""Exception raised when there is an issue with a task configuration."""
[docs] def __init__(
self, message: str, original_exception: Exception | None = None
):
"""
Initialize the TaskConfigError.
:param message: human-readable message describing the error.
:param original_exception: the exception that triggered this error,
if applicable. This is used to provide additional information.
"""
super().__init__(message)
self.original_exception = original_exception
def __str__(self):
"""
Return a string representation.
If an original exception is present, its representation is appended
to the message for additional context.
"""
if self.original_exception:
return (
f"{self.args[0]} (Original exception: "
f"{self.original_exception})"
)
return self.args[0]
#: Type variable for the task data model of a task; any type bound to it
#: must be a :class:`BaseTaskData` (or a subclass of it).
TD = TypeVar("TD", bound=BaseTaskData)
class BaseTask(Generic[TD]):
    """
    Base class for tasks.

    A BaseTask object plays two roles.

    First, it encapsulates what has to be done to run the task: the task
    data is validated by :py:meth:`_configure` and the work is carried out
    by :py:meth:`execute`, both on a worker.

    Second, it helps the scheduler decide whether a task suits a given
    worker. This is a two-step process: each worker reports metadata (via
    :py:meth:`analyze_worker`, run on the worker) and the scheduler then
    checks that metadata with :py:meth:`can_run_on`.

    Most concrete task implementations should inherit from
    :class:`RunCommandTask` instead.
    """

    #: Class used as the in-memory representation of task data.
    task_data_type: type[TD]
    #: Validated task data, an instance of :py:attr:`task_data_type`.
    data: TD

    #: Must be overridden by child classes to document the current version of
    #: the task's code. A task will only be scheduled on a worker if its task
    #: version is the same as the one running on the scheduler.
    TASK_VERSION: int | None = None

    #: If TaskTypes.SERVER, this task must be executed on an internal worker
    #: with database access; if TaskTypes.WORKER, it must be executed on an
    #: external worker.
    TASK_TYPE: TaskTypes

    #: Task name, assigned to each subclass by ``__init_subclass__()``.
    name: ClassVar[str]

    #: Registry of task classes: task type -> task name -> class.
    _sub_tasks: dict[TaskTypes, dict[str, type["BaseTask['Any']"]]] = (
        defaultdict(dict)
    )
def __init_subclass__(cls, **kwargs: Any) -> None:
    """
    Register the subclass into BaseTask._sub_tasks.

    The registry is what BaseTask.class_from_name() consults to map a task
    name back to its class.

    :raises AssertionError: if a different class is already registered
      under the same name, or if the name is already used by a task of the
      other kind (Server vs Worker).
    """
    super().__init_subclass__(**kwargs)

    # Task name: the class' TASK_NAME attribute when it has one, otherwise
    # the class name converted to lowercase.
    cls.name = getattr(cls, "TASK_NAME", cls.__name__.lower())

    # Task data type: found by introspecting the type argument used to
    # specialize this generic class.
    cls.task_data_type = extract_generic_type_argument(
        cls, BaseTask, BaseTaskData
    )

    if abc.ABC in cls.__bases__:
        # Ideally cls.__abstractmethods__ would be checked (classes with
        # abstract methods should not be listed as tasks), but when
        # __init_subclass__ runs the @abc.abstractmethod methods have not
        # been collected into cls.__abstractmethods__ yet. So for now a
        # direct descendant of abc.ABC is simply not registered: it is
        # probably an ontology and not a task.
        return

    task_type = cls.TASK_TYPE
    registry = cls._sub_tasks[task_type]

    # Registering the very same class twice is fine; a *different* class
    # ending up with the same name (e.g. only the capitalisation of the
    # class name differs) is not.
    if registry.get(cls.name, cls) != cls:
        raise AssertionError(f'Two Tasks with the same name: {cls.name!r}')

    # SERVER and WORKER tasks must not share a name.
    if task_type == TaskTypes.SERVER:
        if cls.name in cls._sub_tasks[TaskTypes.WORKER]:
            raise AssertionError(
                f'{cls.name!r} already registered as a Worker task'
            )
    elif task_type == TaskTypes.WORKER:
        if cls.name in cls._sub_tasks[TaskTypes.SERVER]:
            raise AssertionError(
                f'{cls.name!r} already registered as a Server task'
            )

    registry[cls.name] = cls
def __init__(self, task_data: dict[str, Any]) -> None:
    """
    Initialize the task.

    :param task_data: data describing the task; it is validated and stored
      by ``self._configure()``.
    """
    # Validate task_data and store the result in self.data.
    self._configure(task_data)

    #: A :class:`logging.Logger` instance that child classes can use when
    #: overriding methods to implement the task.
    self.logger = logging.getLogger("debusine.tasks")

    # Set once the task is aborted: it then does not need to be executed
    # and can be stopped if it is already running.
    self._aborted = False

    self.work_request_id: int | None = None

    # Workspace used when uploading artifacts. While it is None the
    # artifacts are created in the default workspace. The worker should
    # set self.workspace_name when it instantiates the task
    # (see issue #186).
    self.workspace_name: str | None = None

    # IDs of the artifacts downloaded by fetch_input(). Used by
    # `BaseTask._upload_work_request_debug_logs()` and possibly by the
    # required method `upload_artifacts()`.
    self._source_artifacts_ids: list[int] = []

    # Directory holding the debug log files; created lazily by
    # open_debug_log_file().
    self._debug_log_files_directory: (
        tempfile.TemporaryDirectory[str] | None
    ) = None
def append_to_log_file(self, filename: str, lines: list[str]) -> None:
    """
    Append ``lines`` to a debug log file, one entry per line.

    :param filename: name handed to ``self.open_debug_log_file()``
    :param lines: entries to write; a newline is added after each one
    """
    with self.open_debug_log_file(filename) as log_file:
        terminated = [entry + "\n" for entry in lines]
        log_file.write("".join(terminated))
@overload
def open_debug_log_file(
    self, filename: str, *, mode: "OpenTextModeWriting" = "a"
) -> TextIO: ...


@overload
def open_debug_log_file(
    self, filename: str, *, mode: "OpenBinaryModeWriting"
) -> BinaryIO: ...


def open_debug_log_file(
    self,
    filename: str,
    *,
    mode: Union["OpenTextModeWriting", "OpenBinaryModeWriting"] = "a",
) -> IO[Any]:
    """
    Open a debug log file and return it.

    All the files live in one temporary directory, created on first use and
    kept in ``self._debug_log_files_directory``; asking twice for the same
    file name opens the same file.

    The caller must call .close() when finished writing.

    :param filename: name of the file inside the debug log directory
    :param mode: mode passed to :py:meth:`pathlib.Path.open` (append, text,
      by default)
    """
    log_directory = self._debug_log_files_directory
    if log_directory is None:
        log_directory = tempfile.TemporaryDirectory(
            prefix="debusine-task-debug-log-files-"
        )
        self._debug_log_files_directory = log_directory

    return Path(log_directory.name, filename).open(mode)
@classmethod
def prefix_with_task_name(cls, text: str) -> str:
    """
    Return ``text`` prefixed with the task name and a colon.

    For tasks that are not worker tasks the lowercase task type and a
    colon are put in front of the task name as well.
    """
    named = f"{cls.name}:{text}"
    if cls.TASK_TYPE is TaskTypes.WORKER:
        # Worker tasks get no task-type prefix, for compatibility
        return named
    return f"{cls.TASK_TYPE.lower()}:{named}"
@classmethod
def analyze_worker(cls) -> dict[str, Any]:
    """
    Return dynamic metadata about the current worker.

    This method is called on the worker to collect information about the
    worker, stored as key-value pairs in a dictionary. The scheduler later
    feeds that dictionary to :py:meth:`can_run_on` to determine whether a
    task is suitable to be executed on the worker.

    The base implementation only reports :py:attr:`TASK_VERSION`, under
    the key ``cls.prefix_with_task_name("version")``.

    Derived classes can extend the behaviour by overriding the method,
    calling ``metadata = super().analyze_worker()`` and then adding
    supplementary data to the dictionary. To avoid key name conflicts
    between tasks, use key names obtained with
    ``cls.prefix_with_task_name(...)``.

    :return: a dictionary describing the worker.
    :rtype: dict.
    """
    return {cls.prefix_with_task_name("version"): cls.TASK_VERSION}
@classmethod
def analyze_worker_all_tasks(cls):
    """
    Return the merged worker metadata of every registered task.

    Subclasses of BaseTask get registered in BaseTask._sub_tasks. The
    result of ``analyze_worker()`` of each of them is merged into a single
    dictionary.

    This method is executed in the worker when submitting the dynamic
    metadata.
    """
    registered_tasks = (
        task_class
        for tasks_by_name in cls._sub_tasks.values()
        for task_class in tasks_by_name.values()
    )

    collected = {}
    for task_class in registered_tasks:
        collected |= task_class.analyze_worker()
    return collected
def can_run_on(self, worker_metadata: dict[str, Any]) -> bool:
    """
    Check if the specified worker can run the task.

    The decision shall be taken solely from the supplied
    ``worker_metadata`` and from the configured task data (``self.data``).

    The default implementation returns False when:

    * :py:attr:`TASK_TYPE` doesn't match the worker type
      (``system:worker_type``);
    * the :py:attr:`TASK_VERSION` reported by the worker differs from the
      one on the scheduler side;
    * the task data has a ``host_architecture`` that is not listed in the
      worker's ``system:architectures``.

    Otherwise it returns True.

    Derived objects can implement further checks by overriding the method
    in the following way::

        if not super().can_run_on(worker_metadata):
            return False

        if ...:
            return False

        return True

    :param dict worker_metadata: The metadata collected from the worker by
      running :py:meth:`analyze_worker` on all the tasks on the worker
      under consideration.
    :return: the boolean result of the check.
    :rtype: bool.
    """
    supported_pairings = {
        (TaskTypes.WORKER, WorkerType.EXTERNAL),
        (TaskTypes.SERVER, WorkerType.CELERY),
    }
    pairing = (self.TASK_TYPE, worker_metadata.get("system:worker_type"))
    if pairing not in supported_pairings:
        return False

    worker_version = worker_metadata.get(
        self.prefix_with_task_name("version")
    )
    if worker_version != self.TASK_VERSION:
        return False

    # Some tasks might not have "host_architecture"
    task_architecture = getattr(self.data, "host_architecture", None)
    if task_architecture is None:
        return True

    return task_architecture in worker_metadata.get(
        "system:architectures", []
    )
def _configure(self, task_data: dict[str, Any]) -> None:
    """
    Configure the task with the supplied ``task_data``.

    The supplied data is validated by instantiating
    ``self.task_data_type`` (the data model for the type of task being
    configured) with it. On success the resulting object is stored in the
    ``data`` attribute.

    :param dict task_data: The supplied data describing the task.
    :raises TaskConfigError: if building the data model raises a
      ValueError, i.e. the dict does not validate. The ValueError is
      available both as ``original_exception`` and as ``__cause__``.
    """
    try:
        self.data = self.task_data_type(**task_data)
    except ValueError as exc:
        # Chain explicitly so that tracebacks show the validation error as
        # the direct cause of the TaskConfigError.
        raise TaskConfigError(str(exc), original_exception=exc) from exc
def execute_logging_exceptions(self) -> bool:
    """
    Execute self.execute() logging any raised exceptions.

    :return: the result of ``self.execute()``.
    :raises Exception: whatever ``self.execute()`` raised, after it has
      been logged (with its traceback) through ``self.logger``.
    """
    try:
        return self.execute()
    except Exception:
        self.logger.exception("Exception in Task %s", self.name)
        # Bare raise: propagate the very same exception without adding
        # this frame to its traceback a second time.
        raise
def execute(self) -> bool:
    """
    Run the task, then upload its debug logs.

    The actual work is delegated to ``self._execute()``; once it has
    returned, ``self._upload_work_request_debug_logs()`` is called.

    :return: result of the _execute() method.
    """
    outcome = self._execute()
    self._upload_work_request_debug_logs()
    return outcome
def _execute(self) -> bool:
"""
Execute the requested task.
The task must first have been configured. It is allowed to take
as much time as required. This method will only be run on a worker. It
is thus allowed to access resources local to the worker.
It is recommended to fail early by raising a :py:exc:TaskConfigError if
the parameters of the task let you anticipate that it has no chance of
completing successfully.
:return: True to indicate success, False for a failure.
:rtype: bool.
:raises TaskConfigError: if the parameters of the work request are
incompatible with the worker.
"""
raise NotImplementedError()
def abort(self):
    """
    Flag the task as aborted: it does not need to be executed.

    There is no way back: once aborted, the task stays aborted.
    """
    self._aborted = True
@property
def aborted(self) -> bool:
    """
    Tell whether the task has been aborted.

    A task never goes back from aborted to not-aborted.
    """
    return self._aborted
@staticmethod
def class_from_name(
    task_type: TaskTypes, task_name: str
) -> type["BaseTask['Any']"]:
    """
    Return the task class registered under ``task_name``.

    The lookup is case-insensitive: ``task_name`` is lowercased first.
    __init_subclass__() is what registers BaseTask subclasses into
    BaseTask._sub_tasks.

    :param task_type: type of task to look up
    :param task_name: name of the task to look up
    :raises ValueError: if nothing is registered for ``task_type``, or if
      ``task_name`` is not registered for that type.
    """
    registry = BaseTask._sub_tasks.get(task_type)
    if registry is None:
        raise ValueError(f"{task_type!r} is not a registered task type")

    lowered_name = task_name.lower()
    task_class = registry.get(lowered_name)
    if task_class is None:
        raise ValueError(
            f"{lowered_name!r} is not a registered"
            f" {task_type} task_name"
        )

    return task_class
@staticmethod
def is_valid_task_name(task_type: TaskTypes, task_name: str) -> bool:
    """
    Return True if task_name is registered (its class is imported).

    The comparison is done on the lowercased ``task_name``. False is
    returned as well when nothing is registered for ``task_type``.
    """
    registry = BaseTask._sub_tasks.get(task_type)
    return registry is not None and task_name.lower() in registry
@staticmethod
def task_names(task_type: TaskTypes) -> list[str]:
    """Return the sorted names of the tasks registered for task_type."""
    names = list(BaseTask._sub_tasks[task_type])
    names.sort()
    return names
@staticmethod
def is_worker_task(task_name: str) -> bool:
    """
    Check if task_name is a task that can run on external workers.

    That is: whether the lowercased ``task_name`` is registered as a
    TaskTypes.WORKER task.
    """
    lowered_name = task_name.lower()
    return lowered_name in BaseTask._sub_tasks[TaskTypes.WORKER]
@staticmethod
def worker_task_names() -> list[str]:
    """Return the sorted names of the TaskTypes.WORKER sub-tasks."""
    worker_registry = BaseTask._sub_tasks[TaskTypes.WORKER]
    return sorted(worker_registry)
def _upload_work_request_debug_logs(self):
"""
Create a WorkRequestDebugLogs artifact and upload the logs.
The logs might exist in self._debug_log_files_directory and were
added via self.open_debug_log_file() or self.create_debug_log_file().
For each self._source_artifacts_ids: create a relation from
WorkRequestDebugLogs to source_artifact_id.
"""
raise NotImplementedError()
class BaseExternalTask(BaseTask[TD], Generic[TD]):
    """A :class:`BaseTask` that runs on an external worker."""

    TASK_TYPE = TaskTypes.WORKER

    def __init__(self, task_data: dict[str, Any]) -> None:
        """
        Initialize the task.

        The client (``debusine``) and the executor attributes start as
        None; they are expected to be filled in before they are needed.
        """
        super().__init__(task_data)
        self.debusine: Debusine | None = None
        self.executor: ExecutorInterface | None = None
        self.executor_instance: InstanceInterface | None = None

    def _upload_work_request_debug_logs(self) -> None:
        """
        Create a WorkRequestDebugLogs artifact and upload the logs.

        Nothing is done when no debug log file was ever opened. Otherwise
        every file of self._debug_log_files_directory (added via
        self.open_debug_log_file()) is uploaded in a single artifact and
        the directory is then removed.

        For each self._source_artifacts_ids: create a "relates-to" relation
        from WorkRequestDebugLogs to source_artifact_id.

        :raises AssertionError: if there are logs to upload but
          ``self.debusine`` is not set.
        """
        if self._debug_log_files_directory is None:
            return

        # Same guard (and message) as the lookup/fetch helpers: fail with
        # an explicit error instead of an AttributeError on None.
        if not self.debusine:
            raise AssertionError("self.debusine not set")

        work_request_debug_logs_artifact = WorkRequestDebugLogs.create(
            files=Path(self._debug_log_files_directory.name).glob("*")
        )

        remote_artifact = self.debusine.upload_artifact(
            work_request_debug_logs_artifact,
            workspace=self.workspace_name,
            work_request=self.work_request_id,
        )

        for source_artifact_id in self._source_artifacts_ids:
            self.debusine.relation_create(
                remote_artifact.id,
                source_artifact_id,
                "relates-to",
            )

        self._debug_log_files_directory.cleanup()
        self._debug_log_files_directory = None
#: Type variable for the task data model of a task that uses an executor;
#: any type bound to it must be a :class:`BaseTaskDataWithExecutor` (or a
#: subclass of it).
TDE = TypeVar("TDE", bound=BaseTaskDataWithExecutor)
class BaseTaskWithExecutor(BaseExternalTask[TDE], Generic[TDE]):
    """Base for tasks with executor capabilities."""

    #: Backend used when the task data asks for the "auto" backend.
    DEFAULT_BACKEND = BackendType.UNSHARE

    def _prepare_executor(self):
        """
        Prepare the executor.

        * Build an executor from self.debusine, self.backend,
          self.work_request_id and self.data.environment, and store it in
          self.executor
        * Download the image
        """
        new_executor = executor(
            self.debusine,
            self.backend,
            self.work_request_id,
            self.data.environment,
        )
        self.executor = new_executor
        new_executor.download_image()

    def _prepare_executor_instance(self):
        """
        Create and start an executor instance.

        self._prepare_executor() is called first if there is no
        self.executor yet. The instance created by the executor is stored
        in self.executor_instance and started.
        """
        if self.executor is None:
            self._prepare_executor()

        instance = self.executor.create()
        self.executor_instance = instance
        instance.start()

    @property
    def backend(self) -> str:
        """
        Return the backend name to use.

        That is ``self.data.backend``, except that "auto" is resolved to
        DEFAULT_BACKEND.
        """
        configured = self.data.backend
        return self.DEFAULT_BACKEND if configured == "auto" else configured
class RunCommandTask(BaseExternalTask[TD], Generic[TD]):
    r"""
    A :class:`BaseTask` that can execute commands and upload artifacts.

    Concrete subclasses must implement:

    * ``fetch_input(self, destination) -> bool``. Download the needed
      artifacts into destination. Suggestion: can use
      ``fetch_artifact(artifact, dir)`` to download them.
    * ``configure_for_execution(self, download_directory: Path) -> bool``
    * ``_cmdline(self) -> list[str]``
    * ``upload_artifacts(self, directory: Path, \*, execution_success: bool)``.
      The member variable self._source_artifacts_ids is filled in by
      ``fetch_artifact()`` / ``fetch_multiple_artifacts()`` and can be used
      to create the relations between uploaded artifacts and downloaded
      artifacts.

    Concrete subclasses may override:

    * ``check_directory_for_consistency_errors(self, build_directory: Path)
      -> list[str]`` (the default returns an empty list: no errors)
    * ``task_succeeded(self, returncode: Optional[int], execute_directory:
      Path) -> bool`` (the default succeeds when returncode is 0)

    Use ``self.append_to_log_file()`` / ``self.open_debug_log_file()`` to
    provide information for the user (it will be available to the user as
    an artifact).

    Command execution uses process groups to make sure that the command
    and the commands it possibly spawned are finished, and cancels the
    execution of the command if ``BaseTask.aborted`` is True.

    See the main entry point ``_execute()`` for details of the
    RunCommandTask flow.
    """

    # When CAPTURE_OUTPUT_FILENAME is not None, self._execute() saves the
    # stdout of the command into a file with this name, created in the cwd
    # of the command. The file is then available to
    # self.upload_artifacts().
    CAPTURE_OUTPUT_FILENAME: str | None = None

    # Line written to the command log to delimit its sections.
    CMD_LOG_SEPARATOR = "--------------------"
    # Name of the debug log file that run_cmd() writes to.
    CMD_LOG_FILENAME = "cmd-output.log"
def _execute(self) -> bool:
    """
    Fetch the required input, execute the command and upload artifacts.

    Flow is:

    - Call ``self.fetch_input():`` download the artifacts
    - Call ``self.configure_for_execution()``: to set any member variables
      that might be used in ``self._cmdline()``
    - Run the command, capturing the stdout (see CAPTURE_OUTPUT_FILENAME)
      in the newly created execute_directory
    - Call ``check_directory_for_consistency_errors()``. If any errors are
      returned save them into consistency.log.
    - Call ``self.task_succeeded()``.
    - Call ``self.upload_artifacts(exec_dir, execution_success=succeeded)``.
    - Stop the executor instance if it is started. This is done whatever
      the outcome of the previous steps, so that a failing task does not
      leave a started instance behind.
    - Return ``execution_succeeded`` (the Worker will set the result as
      "Success" or "Failure")

    Exceptions raised while configuring, executing or post-processing are
    written to ``<stage>.log`` and turn the result into False.

    .. note::

      If the member variable CAPTURE_OUTPUT_FILENAME is set:
      create a file with its name with the stdout of the command.
      Otherwise, the stdout of the command is saved in
      self.CMD_LOG_FILENAME).
    """
    with (
        self._temporary_directory() as execute_directory,
        self._temporary_directory() as download_directory,
    ):
        try:
            succeeded = self._run_stages(
                download_directory, execute_directory
            )
        finally:
            # Some tasks use the executor in upload_artifacts, so it can
            # only be stopped once every stage is over.
            stopped = self._stop_executor_instance()

        return succeeded if stopped else False


def _run_stages(
    self, download_directory: Path, execute_directory: Path
) -> bool:
    """Run every stage of _execute() except stopping the executor."""
    if not self.fetch_input(download_directory):
        return False

    try:
        if not self.configure_for_execution(download_directory):
            return False
    except Exception as exc:
        self._log_exception(exc, stage="configure_for_execution")
        return False

    try:
        self._push_directories_to_executor(
            download_directory, execute_directory
        )

        cmd = self._cmdline()
        self.logger.info("Executing: %s", " ".join(cmd))

        returncode = self.run_cmd(
            cmd,
            execute_directory,
            run_as_root=self._cmdline_as_root(),
            capture_stdout_filename=self.CAPTURE_OUTPUT_FILENAME,
        )

        self.logger.info("%s exited with code %s", cmd[0], returncode)
    except Exception as exc:
        self._log_exception(exc, stage="execution")
        return False

    try:
        if errors := self.check_directory_for_consistency_errors(
            execute_directory
        ):
            self.append_to_log_file("consistency.log", sorted(errors))
            return False

        execution_succeeded = self.task_succeeded(
            returncode, execute_directory
        )

        self.upload_artifacts(
            execute_directory, execution_success=execution_succeeded
        )
    except Exception as exc:
        self._log_exception(exc, stage="post_execution")
        return False

    return execution_succeeded


def _stop_executor_instance(self) -> bool:
    """Stop the executor instance if started; return False on error."""
    try:
        if self.executor_instance and self.executor_instance.is_started():
            self.executor_instance.stop()
    except Exception as exc:
        self._log_exception(exc, stage="post_execution")
        return False

    return True
def _push_directories_to_executor(
    self, download_directory: Path, execute_directory: Path
):
    """
    Copy the download and execution directories into the executor.

    Nothing is done when the task has no executor instance. Otherwise the
    non-root user is created in the instance, download_directory is pushed
    to /tmp and execute_directory is created, both owned by that user.
    """
    instance = self.executor_instance
    if not instance:
        return

    instance.create_user()
    owner = instance.non_root_user

    instance.directory_push(
        download_directory, Path("/tmp"), user=owner, group=owner
    )
    instance.mkdir(execute_directory, user=owner, group=owner)
[docs] def task_succeeded(
self, # noqa: U100
returncode: int | None, # noqa: U100
execute_directory: Path, # noqa: U100
) -> bool:
"""
Sub-tasks can evaluate if the task was a success or failure.
By default, return True (success). Sub-classes can re-implement it.
:param returncode: return code of the command, or None if aborted
:param execute_directory: directory with the output of the task
:return: True (if success) or False (if failure).
"""
return returncode == 0
def check_directory_for_consistency_errors(
    self, build_directory: Path  # noqa: U100
) -> list[str]:
    """
    Return list of errors after executing the command.

    The default implementation reports no error at all (empty list).
    """
    return list()
def _cmdline(self) -> list[str]:
"""Return the command to execute, as a list of program arguments."""
raise NotImplementedError()
@staticmethod
def _cmdline_as_root() -> bool:
"""Run _cmdline() as root."""
return False
def upload_artifacts(
    self, execute_directory: Path, *, execution_success: bool
):
    """
    Upload the artifacts for the task.

    To be implemented by subclasses; the base implementation only raises
    NotImplementedError.

    :param execute_directory: directory where the command was executed
    :param execution_success: result of ``task_succeeded()``
    """
    raise NotImplementedError
def lookup_single_artifact(
    self,
    lookup: CollectionItemLookupSingle,
    default_category: str | None = None,
) -> int:
    """
    Look up a single artifact using :ref:`lookup-single`.

    :param lookup: the lookup to resolve
    :param default_category: passed through to the client's
      ``lookup_single()``
    :return: the ``artifact`` attribute of the lookup result
    :raises AssertionError: if ``self.debusine`` or
      ``self.work_request_id`` is not set.
    """
    if not self.debusine:
        raise AssertionError("self.debusine not set")
    # Explicit raise rather than an assert statement: asserts are stripped
    # when Python runs with -O.
    if self.work_request_id is None:
        raise AssertionError("self.work_request_id not set")

    return self.debusine.lookup_single(
        lookup=lookup,
        work_request=self.work_request_id,
        expect_type=CollectionItemType.ARTIFACT,
        default_category=default_category,
    ).artifact
def fetch_artifact(
    self,
    artifact: CollectionItemLookupSingle,
    destination: Path,
    default_category: str | None = None,
) -> ArtifactResponse:
    """
    Look up artifact and download it to destination.

    The artifact is looked up using :ref:`lookup-single` and downloaded
    with ``tarball=False``. Once downloaded, its ID is added to
    self._source_artifacts_ids.

    :return: the response of the download.
    :raises AssertionError: if ``self.debusine`` is not set.
    """
    client = self.debusine
    if not client:
        raise AssertionError("self.debusine not set")

    artifact_id = self.lookup_single_artifact(
        artifact, default_category=default_category
    )
    response = client.download_artifact(
        artifact_id, destination, tarball=False
    )
    self._source_artifacts_ids.append(artifact_id)
    return response
def lookup_multiple_artifacts(
    self,
    lookup: CollectionItemLookupMultiple,
    default_category: str | None = None,
) -> list[int]:
    """
    Look up multiple artifacts.

    The artifacts are looked up using :ref:`lookup-multiple`. When
    ``lookup.__root__`` is empty the client is not called at all and an
    empty list is returned.

    :param lookup: the lookup to resolve
    :param default_category: passed through to the client's
      ``lookup_multiple()``
    :return: the ``artifact`` attribute of each lookup result
    :raises AssertionError: if ``self.debusine`` or
      ``self.work_request_id`` is not set.
    """
    if not self.debusine:
        raise AssertionError("self.debusine not set")
    # Explicit raise rather than an assert statement: asserts are stripped
    # when Python runs with -O.
    if self.work_request_id is None:
        raise AssertionError("self.work_request_id not set")

    if not lookup.__root__:
        return []

    return [
        result.artifact
        for result in self.debusine.lookup_multiple(
            lookup=lookup,
            work_request=self.work_request_id,
            expect_type=CollectionItemType.ARTIFACT,
            default_category=default_category,
        )
    ]
def fetch_multiple_artifacts(
    self,
    artifacts: CollectionItemLookupMultiple,
    destination: Path,
    default_category: str | None = None,
) -> list[ArtifactResponse]:
    """
    Look up multiple artifacts and download them all to destination.

    The artifacts are looked up using :ref:`lookup-multiple` and each one
    is downloaded with ``tarball=False``. The ID of each artifact is added
    to self._source_artifacts_ids right after its download.

    :return: the responses of the downloads, in lookup order.
    :raises AssertionError: if ``self.debusine`` is not set.
    """
    client = self.debusine
    if not client:
        raise AssertionError("self.debusine not set")

    found_ids = self.lookup_multiple_artifacts(
        artifacts, default_category=default_category
    )

    responses: list[ArtifactResponse] = []
    for found_id in found_ids:
        response = client.download_artifact(
            found_id, destination, tarball=False
        )
        responses.append(response)
        self._source_artifacts_ids.append(found_id)

    return responses
@staticmethod
@contextmanager
def _temporary_directory() -> Iterator[Path]:
with tempfile.TemporaryDirectory(
prefix="debusine-fetch-exec-upload-"
) as directory:
yield Path(directory)
@staticmethod
def _write_utf8(file: BinaryIO, text: str) -> None:
file.write(text.encode("utf-8", errors="replace") + b"\n")
def _write_popen_result(self, file: BinaryIO, p: subprocess.Popen[AnyStr]):
self._write_utf8(file, f"\naborted: {self.aborted}")
self._write_utf8(file, f"returncode: {p.returncode}")
[docs] def run_cmd(
self,
cmd: list[str],
working_directory: Path,
*,
run_as_root: bool = False,
capture_stdout_filename: str | None = None,
) -> int | None:
"""
Run cmd in working_directory. Create self.CMD_OUTPUT_FILE log file.
If BaseTask.aborted == True terminates the process.
:param cmd: command to execute with its arguments.
:param working_directory: working directory where the command
is executed.
:param run_as_root: if True, run the command as root. Otherwise,
the command runs as the worker's user
:param capture_stdout_filename: for some commands the output of the
command is the output of stdout (e.g. lintian) and not a set of files
generated by the command (e.g. sbuild). If capture_stdout is not None,
save the stdout into this file. The caller can then use it.
:return: returncode of the process or None if aborted
"""
with self.open_debug_log_file(
self.CMD_LOG_FILENAME, mode="ab"
) as cmd_log:
self._write_utf8(cmd_log, f"cmd: {shlex.join(cmd)}")
out_file: BinaryIO
if capture_stdout_filename:
self._write_utf8(
cmd_log,
"output (contains stderr only, stdout was captured):",
)
capture_stdout = working_directory / capture_stdout_filename
out_file = capture_stdout.open(mode="wb")
else:
self._write_utf8(
cmd_log, "output (contains stdout and stderr):"
)
out_file = cmd_log
try:
return self._run_cmd(
cmd,
working_directory,
run_as_root=run_as_root,
cmd_log=cmd_log,
out_file=out_file,
)
finally:
file_names = "\n".join(
str(file.relative_to(working_directory))
for file in sorted(working_directory.rglob("*"))
)
self._write_utf8(cmd_log, "\nFiles in working directory:")
self._write_utf8(cmd_log, file_names)
self._write_utf8(cmd_log, self.CMD_LOG_SEPARATOR)
if capture_stdout_filename:
self._write_utf8(cmd_log, capture_stdout.read_text())
self._write_utf8(cmd_log, self.CMD_LOG_SEPARATOR)
out_file.close()
def _run_cmd(
    self,
    cmd: list[str],
    working_directory: Path,
    *,
    run_as_root: bool,
    cmd_log: BinaryIO,
    out_file: BinaryIO,
) -> int | None:
    """
    Execute cmd.

    With an executor instance the command is run through it and its return
    code is returned directly. Without one the command is started locally
    in a new session and polled once per second until it finishes or the
    task is aborted.

    :param cmd: command to execute with its arguments
    :param working_directory: passed as ``cwd`` when running the command
    :param run_as_root: if using an executor: run command as root or user.
      If not using an executor and run_as_root is True: raise ValueError().
    :param cmd_log: save the command log (parameters, stderr, return code,
      stdout)
    :param out_file: save the command stdout (might be the same as
      cmd_log or a different file)
    :return: returncode of the process or None if aborted
    :raises ValueError: if run_as_root is True and there is no executor
      instance.
    """
    # Need to flush or subprocess.Popen() overwrites part of it
    cmd_log.flush()
    out_file.flush()

    # The command writes straight to the file descriptors of the two files.
    run_kwargs: dict[str, Any] = {
        "cwd": working_directory,
        "stderr": cmd_log.fileno(),
        "stdout": out_file.fileno(),
    }

    if self.executor_instance:
        # NOTE(review): no abort handling on this path; presumably left to
        # the executor instance - confirm.
        return self.executor_instance.run(
            cmd, run_as_root=run_as_root, **run_kwargs
        ).returncode

    if run_as_root:
        raise ValueError("run_as_root requires an executor")

    # start_new_session=True: the command gets its own session, so the
    # process group looked up below is the one the SIGKILLs are sent to.
    p = subprocess.Popen(cmd, start_new_session=True, **run_kwargs)

    process_group = os.getpgid(p.pid)

    # Wait for the command in slices of one second, to notice an abort
    # requested in the meantime.
    while True:
        if self.aborted:
            break

        try:
            self._wait_popen(p, timeout=1)
            break
        except subprocess.TimeoutExpired:
            pass

    if self.aborted:
        self.logger.debug("Task (cmd: %s PID %s) aborted", cmd, p.pid)

        try:
            if not self._send_signal_pid(p.pid, signal.SIGTERM):
                # _send_signal_pid failed probably because cmd finished
                # after aborting and before sending the signal
                #
                # p.poll() to read the returncode and avoid leaving cmd
                # as zombie
                p.poll()

                # Kill possible processes launched by cmd
                self._send_signal_group(process_group, signal.SIGKILL)
                self.logger.debug("Could not send SIGTERM to %s", p.pid)

                self._write_popen_result(cmd_log, p)
                return None

            # _wait_popen with a timeout=5 to leave 5 seconds of grace
            # for the cmd to finish after sending SIGTERM
            self._wait_popen(p, timeout=5)
        except subprocess.TimeoutExpired:
            # SIGTERM was sent and 5 seconds later cmd
            # was still running. A SIGKILL to the process group will
            # be sent
            self.logger.debug(
                "Task PID %s not finished after SIGTERM", p.pid
            )
            pass

        # debusine sends a SIGKILL if:
        # - SIGTERM was sent to cmd AND cmd was running 5 seconds later:
        #   SIGTERM was not enough so SIGKILL to the group is needed
        # - SIGTERM was sent to cmd AND cmd finished: SIGKILL to the
        #   group to make sure that there are no processes spawned
        #   by cmd still running
        # (note that processes launched by cmd in a new group
        # could be left running)
        self._send_signal_group(process_group, signal.SIGKILL)
        self.logger.debug("Sent SIGKILL to process group %s", process_group)

        # p.poll() to set p.returncode and avoid leaving cmd
        # as a zombie process.
        # But cmd might still be left as a zombie process: if cmd was in
        # an uninterruptible kernel call p.returncode will be None even
        # after p.poll() and it will be left as a zombie process
        # (until debusine worker dies and the zombie is adopted by
        # init and waited on by init). If this happens there might be a
        # ResourceWarning from Popen.__del__:
        # "subprocess %s is still running"
        #
        # A solution would be to wait (p.wait()) until the
        # process has finished dying. This is implemented in the unit test
        # to avoid the warning but not implemented here to not delay
        # the possible shut down of debusine worker
        p.poll()
        self.logger.debug("Returncode for PID %s: %s", p.pid, p.returncode)

        # NOTE(review): unlike the two other exits of the local path, this
        # one does not call _write_popen_result() - confirm it is intended.
        return None
    else:
        # The cmd has finished. The cmd might have spawned
        # other processes. debusine will kill any alive processes.
        #
        # If they existed they should have been finished by cmd:
        # run_cmd() should not leave processes behind.
        #
        # Since the parent died they are adopted by init and on
        # killing them they are not zombie.
        # (cmd might have spawned new processes in a different process
        # group: if this is the case they will be left running)
        self._send_signal_group(process_group, signal.SIGKILL)

    self._write_popen_result(cmd_log, p)

    return p.returncode
def _wait_popen(self, popen: subprocess.Popen[AnyStr], timeout: float):
return popen.wait(timeout)
@staticmethod
def _send_signal_pid(pid, signal) -> bool:
try:
os.kill(pid, signal)
except ProcessLookupError:
return False
return True
@staticmethod
def _send_signal_group(process_group, signal):
"""Send signal to the process group."""
try:
os.killpg(process_group, signal)
except ProcessLookupError:
pass
def _log_exception(self, exc: Exception, stage: str):
"""Log an unexpected exception into the task log for stage."""
exc_type = type(exc).__name__
exc_traceback = traceback.format_exc()
log_message = [
f"Exception type: {exc_type}",
f"Message: {exc}",
"",
exc_traceback,
]
self.append_to_log_file(f"{stage}.log", log_message)