# Source code for debusine.tasks.sbuild

# Copyright 2021 The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.

"""
Task to build Debian packages with sbuild.

This task implements the PackageBuild generic task for its task_data:
https://freexian-team.pages.debian.net/debusine/reference/tasks.html#task-packagebuild
"""

import email.utils
import subprocess
from pathlib import Path
from typing import Any, cast

try:
    from pydantic.v1 import ValidationError
except ImportError:
    from pydantic import ValidationError  # type: ignore

import debian.deb822 as deb822

import yaml

import debusine.utils
from debusine.artifacts import (
    BinaryPackage,
    BinaryPackages,
    PackageBuildLog,
    Upload,
)
from debusine.artifacts.models import (
    ArtifactCategory,
    CollectionCategory,
    DoseDistCheck,
)
from debusine.client.models import RemoteArtifact
from debusine.tasks import (
    BaseTaskWithExecutor,
    RunCommandTask,
    TaskConfigError,
)
from debusine.tasks.models import SbuildData, SbuildDynamicData
from debusine.tasks.sbuild_validator_mixin import SbuildValidatorMixin
from debusine.tasks.server import TaskDatabaseInterface
from debusine.utils import read_dsc


class Sbuild(
    SbuildValidatorMixin,
    RunCommandTask[SbuildData, SbuildDynamicData],
    BaseTaskWithExecutor[SbuildData, SbuildDynamicData],
):
    """
    Task implementing a Debian package build with sbuild.

    The source package and any extra binary packages are downloaded by
    fetch_input(), the sbuild command line is assembled by _cmdline(), and
    the build log, the .changes upload and the binary packages are uploaded
    by upload_artifacts().
    """

    TASK_VERSION = 1

    def __init__(
        self,
        task_data: dict[str, Any],
        dynamic_task_data: dict[str, Any] | None = None,
    ) -> None:
        """Initialize the sbuild task."""
        super().__init__(task_data, dynamic_task_data)

        # Chroot names known to schroot; None until _update_chroots_list()
        # has queried them.
        self.chroots: list[str] | None = None
        self.builder = "sbuild"

        # Path of the .dsc file. Set by self.fetch_input()
        self._dsc_file: Path | None = None
        # Paths of the extra binary packages. Filled in by self.fetch_input()
        self._extra_packages: list[Path] = []

    def compute_dynamic_data(
        self, task_database: TaskDatabaseInterface
    ) -> SbuildDynamicData:
        """Resolve artifact lookups for this task."""
        environment = self.get_environment_lookup()

        # The environment is optional: only look it up when one is configured.
        environment_id = None
        if environment is not None:
            environment_id = task_database.lookup_single_artifact(
                environment,
                default_category=CollectionCategory.ENVIRONMENTS,
            )

        return SbuildDynamicData(
            environment_id=environment_id,
            input_source_artifact_id=task_database.lookup_single_artifact(
                self.data.input.source_artifact
            ),
            input_extra_binary_artifacts_ids=(
                task_database.lookup_multiple_artifacts(
                    self.data.input.extra_binary_artifacts
                )
            ),
        )

    def get_source_artifacts_ids(self) -> list[int]:
        """
        Return the list of source artifact IDs used by this task.

        This refers to the artifacts actually used by the task. If
        dynamic_data is empty, this returns the empty list.
        """
        if not self.dynamic_data:
            return []

        result: list[int] = []
        if val := self.dynamic_data.environment_id:
            result.append(val)
        result.append(self.dynamic_data.input_source_artifact_id)
        result.extend(self.dynamic_data.input_extra_binary_artifacts_ids)
        return result

    @property
    def chroot_name(self) -> str:
        """Build name of required chroot: ``<distribution>-<host_arch>``."""
        return f"{self.data.distribution}-{self.data.host_architecture}"

    @staticmethod
    def _call_dpkg_architecture() -> str:  # pragma: no cover
        """Return the stripped output of ``dpkg --print-architecture``."""
        return (
            subprocess.check_output(["dpkg", "--print-architecture"])
            .decode("utf-8")
            .strip()
        )

    @classmethod
    def analyze_worker(cls) -> dict[str, Any]:
        """Report metadata for this task on this worker."""
        metadata = super().analyze_worker()

        available_key = cls.prefix_with_task_name("available")
        metadata[available_key] = debusine.utils.is_command_available("sbuild")

        # The chroot list can only be obtained when schroot is installed.
        if debusine.utils.is_command_available("schroot"):
            chroots_key = cls.prefix_with_task_name("chroots")
            metadata[chroots_key] = cls._list_chroots()

        host_arch_key = cls.prefix_with_task_name("host_architecture")
        metadata[host_arch_key] = cls._call_dpkg_architecture()

        return metadata

    def can_run_on(self, worker_metadata: dict[str, Any]) -> bool:
        """Check the specified worker can run the requested task."""
        if not super().can_run_on(worker_metadata):
            return False

        available_key = self.prefix_with_task_name("available")
        if not worker_metadata.get(available_key, False):
            return False

        if self.backend == "schroot":
            # schroot builds need the exact chroot to exist on the worker.
            chroot_key = self.prefix_with_task_name("chroots")
            return self.chroot_name in worker_metadata.get(chroot_key, [])

        # Any other backend needs its executor on the worker...
        executor_available_key = f"executor:{self.backend}:available"
        if not worker_metadata.get(executor_available_key, False):
            return False

        # ...and, except for unshare, autopkgtest as well.
        if self.backend != "unshare" and not worker_metadata.get(
            "autopkgtest:available", False
        ):
            return False

        return True

    @staticmethod
    def _call_schroot_list() -> str:  # pragma: no cover
        """Return the stripped output of ``schroot --list``."""
        return (
            subprocess.check_output(["schroot", "--list"])
            .decode("utf-8")
            .strip()
        )

    @classmethod
    def _list_chroots(cls) -> list[str]:
        """
        Provide support for finding available chroots.

        Ensure that aliases are supported as the DSC may explicitly refer to
        <codename>-security (or -backports) etc.

        Only ``chroot:<name>-sbuild`` entries are considered; the prefix and
        the suffix are stripped from the reported name.

        :return: the list of detected chroots.
        """
        prefix = "chroot:"
        suffix = "-sbuild"
        return [
            line.removeprefix(prefix).removesuffix(suffix)
            for line in cls._call_schroot_list().split("\n")
            if line.startswith(prefix) and line.endswith(suffix)
        ]

    def _update_chroots_list(self) -> None:
        """
        Populate the self.chroots list, if it has not been populated yet.

        No return value, this is a find, not a get.
        """
        if self.chroots is None:
            self.chroots = self._list_chroots()

    def _verify_schroot(self) -> bool:
        """
        Verify a suitable schroot exists.

        :return: True if self.chroot_name is one of the known chroots,
          False (after logging an error) otherwise.
        """
        self._update_chroots_list()

        if not self.chroots:
            self.logger.error("No sbuild chroots found")
            return False

        if self.chroot_name in self.chroots:
            return True

        self.logger.error("No suitable chroot found for %s", self.chroot_name)
        return False

    def fetch_input(self, destination: Path) -> bool:
        """
        Download the source artifact and the extra binary artifacts.

        Sets self._dsc_file and extends self._extra_packages with the paths
        (inside destination) of the files about to be downloaded.

        :param destination: directory where the artifacts are downloaded.
        :return: False (after writing to fetch_input.log) if an input
          artifact has an unexpected category, True otherwise.
        :raises AssertionError: if self.debusine or self.dynamic_data are
          not set, or if the source package has no .dsc file.
        """
        if not self.debusine:
            raise AssertionError("self.debusine not set")
        # Raised explicitly (same exception type as a failed assert) so the
        # check survives "python -O", like the other preconditions here.
        if not self.dynamic_data:
            raise AssertionError("self.dynamic_data not set")

        source_artifact_id = self.dynamic_data.input_source_artifact_id
        artifact = self.debusine.artifact_get(source_artifact_id)
        if artifact.category != ArtifactCategory.SOURCE_PACKAGE:
            self.append_to_log_file(
                "fetch_input.log",
                [
                    f"input.source_artifact points to a "
                    f"{artifact.category}, not the expected "
                    f"{ArtifactCategory.SOURCE_PACKAGE}."
                ],
            )
            return False

        # The first .dsc file listed in the artifact is the one to build.
        for file in artifact.files:
            if file.endswith(".dsc"):
                self._dsc_file = destination / file
                break
        else:
            raise AssertionError("No .dsc file found in source package.")

        self.fetch_artifact(source_artifact_id, destination)

        accepted_categories = (
            ArtifactCategory.BINARY_PACKAGE,
            ArtifactCategory.BINARY_PACKAGES,
        )
        for artifact_id in self.dynamic_data.input_extra_binary_artifacts_ids:
            artifact = self.debusine.artifact_get(artifact_id)
            if artifact.category not in accepted_categories:
                self.append_to_log_file(
                    "fetch_input.log",
                    [
                        f"input.extra_binary_artifacts includes artifact "
                        f"{artifact_id}, a {artifact.category}, not the "
                        f"expected {ArtifactCategory.BINARY_PACKAGE} or "
                        f"{ArtifactCategory.BINARY_PACKAGES}."
                    ],
                )
                return False

            self._extra_packages.extend(
                destination / file for file in artifact.files
            )
            self.fetch_artifact(artifact_id, destination)

        return True

    def _cmdline(self) -> list[str]:
        """
        Build the sbuild command line.

        Use self.data, self._dsc_file and self._extra_packages (and
        self.executor for backends other than schroot).

        :raises AssertionError: if the backend is not schroot and
          self.executor is not set.
        """
        cmd = [
            self.builder,
            "--no-clean",
            "--purge-deps=never",
        ]

        # Every build component is explicitly enabled or disabled.
        for component, option in (
            ("any", "arch-any"),
            ("all", "arch-all"),
            ("source", "source"),
        ):
            if component in self.data.build_components:
                cmd.append(f"--{option}")
            else:
                cmd.append(f"--no-{option}")

        cmd.append("--arch=" + self.data.host_architecture)

        if self.backend == "schroot":
            # Using cast because SbuildTaskData validators enforce that
            # distribution is not None if backend is schroot
            cmd.append("--dist=" + cast(str, self.data.distribution))
        else:
            # set in configure_for_execution
            if self.executor is None:
                raise AssertionError("self.executor not set")

            distribution = self.executor.system_image.data["codename"]
            cmd.append(f"--dist={distribution}")

            if self.backend == "unshare":
                cmd += [
                    "--chroot-mode=unshare",
                    f"--chroot={self.executor.image_name()}",
                    # Remove any dangling resolv.conf symlink (from
                    # systemd-resolved installed in the environment, #1071736)
                    "--chroot-setup-commands=rm -f /etc/resolv.conf",
                ]
            else:
                virt_server = self.executor.autopkgtest_virt_server()
                cmd += [
                    "--chroot-mode=autopkgtest",
                    f"--autopkgtest-virt-server={virt_server}",
                ]
                # Every "%" in the virt server options is doubled.
                cmd += [
                    f"--autopkgtest-virt-server-opt={opt.replace('%', '%%')}"
                    for opt in self.executor.autopkgtest_virt_args()
                ]

        # binNMU
        binnmu = self.data.binnmu
        if binnmu is not None:
            cmd.append(f"--make-binNMU={binnmu.changelog}")
            cmd.append(f"--append-to-version={binnmu.suffix}")
            cmd.append("--binNMU=0")
            if binnmu.timestamp is not None:
                rfc5322date = email.utils.format_datetime(binnmu.timestamp)
                cmd.append(f"--binNMU-timestamp={rfc5322date}")
            if binnmu.maintainer is not None:
                cmd.append(f"--maintainer={binnmu.maintainer}")

        # BD-Uninstallable
        cmd.append("--bd-uninstallable-explainer=dose3")

        cmd.extend(
            f"--extra-package={package}" for package in self._extra_packages
        )

        cmd.append(str(self._dsc_file))

        return cmd

    def execute(self) -> bool:
        """
        Verify task can be executed and super().execute().

        :raises: TaskConfigError.
        """  # noqa: D402
        if self.backend == "schroot" and not self._verify_schroot():
            raise TaskConfigError(
                f"No suitable schroot for {self.chroot_name}"
            )

        return super().execute()

    @staticmethod
    def _extract_dose3_explanation(
        build_log_path: Path,
    ) -> DoseDistCheck | None:
        """
        Isolate and parse dose3 output in BD-Uninstallable scenario.

        :param build_log_path: path of the sbuild build log.
        :return: the parsed dose3 report, or None if the log has no such
          report or the report cannot be parsed or validated.
        """
        # The log is decoded leniently so that a stray undecodable byte
        # cannot abort the upload of the build log itself.
        with open(build_log_path, encoding="utf-8", errors="replace") as f:
            lines = iter(f)

            # find start of dose-debcheck output
            for line in lines:
                if line == "(I)Dose_applications: Solving...\n":
                    break

            # exit if no/invalid output
            first_line = next(lines, "")
            if not first_line.startswith("output-version"):
                return None
            collected = [first_line]

            # grab until next section
            for line in lines:
                if line.startswith("+----"):
                    break
                collected.append(line)

        output = "".join(collected)

        # convert & validate to Pydantic structure
        # Use yaml.CBaseLoader to work-around malformed yaml
        # https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=834059#22
        # https://gitlab.com/irill/dose3/-/issues/18
        try:
            parsed_dict = yaml.load(output, yaml.CBaseLoader)
            return DoseDistCheck.parse_obj(parsed_dict)
        except (yaml.YAMLError, ValidationError):
            # Unusable report: treated like a missing one.
            return None

    def _upload_package_build_log(
        self, build_directory: Path, source: str, version: str
    ) -> RemoteArtifact | None:
        """
        Upload the .build file found in build_directory, if any.

        :return: the uploaded artifact, or None if there is no .build file.
        """
        if not self.debusine:
            raise AssertionError("self.debusine not set")

        build_log_path = debusine.utils.find_file_suffixes(
            build_directory, [".build"]
        )
        if build_log_path is None:
            return None

        # BD-Uninstallable: look for the dose3 output
        explanation = self._extract_dose3_explanation(build_log_path)

        package_build_log = PackageBuildLog.create(
            source=source,
            version=version,
            file=build_log_path,
            bd_uninstallable=explanation,
        )

        return self.debusine.upload_artifact(
            package_build_log,
            workspace=self.workspace_name,
            work_request=self.work_request_id,
        )

    def _upload_binary_upload(
        self, build_directory: Path
    ) -> RemoteArtifact | None:
        """
        Upload the .changes file found in build_directory, if any.

        :return: the uploaded artifact, or None if there is no .changes file.
        """
        if not self.debusine:
            raise AssertionError("self.debusine not set")

        changes_path = debusine.utils.find_file_suffixes(
            build_directory, [".changes"]
        )
        if changes_path is None:
            return None

        artifact_binary_upload = Upload.create(
            changes_file=changes_path,
        )
        return self.debusine.upload_artifact(
            artifact_binary_upload,
            workspace=self.workspace_name,
            work_request=self.work_request_id,
        )

    def _create_binary_package_local_artifacts(
        self,
        build_directory: Path,
        dsc: deb822.Dsc,
        architecture: str,
        suffixes: list[str],
    ) -> list[BinaryPackage | BinaryPackages]:
        """
        Create the local artifacts for the packages matching suffixes.

        :return: one BinaryPackage per matching file, followed by a single
          BinaryPackages grouping all of them (created even when no file
          matched; callers skip artifacts without files).
        """
        deb_paths = debusine.utils.find_files_suffixes(
            build_directory, suffixes
        )

        artifacts: list[BinaryPackage | BinaryPackages] = [
            BinaryPackage.create(file=deb_path) for deb_path in deb_paths
        ]
        artifacts.append(
            BinaryPackages.create(
                srcpkg_name=dsc["source"],
                srcpkg_version=dsc["version"],
                version=dsc["version"],
                architecture=architecture,
                files=deb_paths,
            )
        )
        return artifacts

    def _upload_binary_packages(
        self, build_directory: Path, dsc: deb822.Dsc
    ) -> list[RemoteArtifact]:
        r"""
        Upload \*.deb and \*.udeb files.

        :return: the uploaded artifacts; local artifacts without files are
          not uploaded.
        """
        if not self.debusine:
            raise AssertionError("self.debusine not set")

        host_arch = self.data.host_architecture

        packages: list[BinaryPackage | BinaryPackages] = []
        # Architecture-independent packages are named *_all.deb and are
        # recorded with architecture "all", not with the host architecture.
        for component, architecture in (("any", host_arch), ("all", "all")):
            if component not in self.data.build_components:
                continue
            prefix = "_" + architecture
            packages.extend(
                self._create_binary_package_local_artifacts(
                    build_directory,
                    dsc,
                    architecture,
                    [prefix + ".deb", prefix + ".udeb"],
                )
            )

        return [
            self.debusine.upload_artifact(
                package,
                workspace=self.workspace_name,
                work_request=self.work_request_id,
            )
            for package in packages
            if package.files
        ]

    def _create_remote_binary_packages_relations(
        self,
        remote_build_log: RemoteArtifact | None,
        remote_binary_upload: RemoteArtifact | None,
        remote_binary_packages: list[RemoteArtifact],
    ) -> None:
        """
        Create the relations of each uploaded binary package artifact.

        Each package is "built-using" every source artifact; the build log
        (if any) "relates-to" it; the upload (if any) both "extends" and
        "relates-to" it.
        """
        if not self.debusine:
            raise AssertionError("self.debusine not set")

        for remote_binary_package in remote_binary_packages:
            for source_artifact_id in self._source_artifacts_ids:
                self.debusine.relation_create(
                    remote_binary_package.id,
                    source_artifact_id,
                    "built-using",
                )

            if remote_build_log is not None:
                self.debusine.relation_create(
                    remote_build_log.id, remote_binary_package.id, "relates-to"
                )

            if remote_binary_upload is not None:
                for relation_type in ("extends", "relates-to"):
                    self.debusine.relation_create(
                        remote_binary_upload.id,
                        remote_binary_package.id,
                        relation_type,
                    )

    def configure_for_execution(
        self, download_directory: Path  # noqa: U100
    ) -> bool:
        """
        Configure Task: set variables needed for the build() step.

        Return True if configuration worked, False, if there was a problem.
        """
        if self._dsc_file is None or not self._dsc_file.exists():
            self.append_to_log_file(
                "configure_for_execution.log",
                ["Input source package not found."],
            )
            return False

        # Only the schroot backend runs without an executor.
        if self.backend != "schroot":
            self._prepare_executor()

        return True

    def upload_artifacts(
        self, directory: Path, *, execution_success: bool
    ) -> None:
        """
        Upload the artifacts from directory.

        :param directory: directory containing the files that
          will be uploaded.
        :param execution_success: if False skip uploading .changes and
          *.deb/*.udeb
        """
        if not self.debusine:
            raise AssertionError("self.debusine not set")

        dsc = read_dsc(self._dsc_file)
        if dsc is None:
            # Nothing is uploaded when the .dsc cannot be read.
            return

        # Upload the .build file (PackageBuildLog)
        remote_build_log = self._upload_package_build_log(
            directory, dsc["source"], dsc["version"]
        )

        if remote_build_log is not None:
            for source_artifact_id in self._source_artifacts_ids:
                self.debusine.relation_create(
                    remote_build_log.id,
                    source_artifact_id,
                    "relates-to",
                )

        if not execution_success:
            return

        # Upload the *.deb/*.udeb files (BinaryPackages)
        remote_binary_packages = self._upload_binary_packages(directory, dsc)

        # Upload the .changes and the rest of the files
        remote_binary_changes = self._upload_binary_upload(directory)

        # Create the relations
        self._create_remote_binary_packages_relations(
            remote_build_log,
            remote_binary_changes,
            remote_binary_packages,
        )

    def get_label(self) -> str:
        """Return the task label."""
        # TODO: copy the source package information in dynamic task data and
        # use them here if available
        return "build a source package"