# Source code for debusine.artifacts.local_artifact

# Copyright 2023 The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.

"""
Local artifact's representation.

See docs/design/ontology.html for the semantics of the Artifacts.
"""
import abc
import json
import re
from collections.abc import Iterable, Sequence
from datetime import datetime
from json import JSONDecodeError
from pathlib import Path
from typing import Any, ClassVar, Generic, Self, TypeVar

from debian import deb822, debfile

try:
    import pydantic.v1 as pydantic
except ImportError:
    import pydantic  # type: ignore

import debusine.artifacts.models as data_models
from debusine import utils
from debusine.artifacts.utils import files_in_meta_file_match_files
from debusine.client.models import (
    ArtifactCreateRequest,
    FileRequest,
    FilesRequestType,
    model_to_json_serializable_dict,
)
from debusine.utils import extract_generic_type_argument

AD = TypeVar("AD", bound=data_models.ArtifactData)


class LocalArtifact(pydantic.BaseModel, Generic[AD], abc.ABC):
    """
    Represent an artifact locally.

    Abstract base for all artifact types. Each concrete subclass declares
    a ``_category`` and is automatically registered (see
    ``__init_subclass__``) so categories can be enumerated and mapped back
    to classes.
    """

    class Config:
        """Set up stricter pydantic Config."""

        validate_assignment = True
        extra = pydantic.Extra.forbid

    #: Artifact type
    category: str

    # Keys are paths in the artifact. Values the paths in the local system
    files: dict[str, Path] = pydantic.Field(default_factory=dict)

    #: Artifact data
    data: AD

    # TODO: it would be great to not have to redefine data in subclasses, but
    # it needs pydantic's Generics support, which might require work to work
    # with extract_generic_type_argument

    #: Default value for category
    _category: ClassVar[str]

    #: Class used as the in-memory representation of artifact data.
    _data_type: type[AD] = pydantic.PrivateAttr()
    # data_type is marked as PrivateAttr to make mypy happy. Setting
    # underscore_attrs_are_private to True does not seem to be enough

    # Registry mapping category string -> LocalArtifact subclass, filled in
    # by __init_subclass__ as subclasses are defined.
    _local_artifacts_category_to_class: dict[
        str, type["LocalArtifact['Any']"]
    ] = {}

    def __init__(self, **kwargs):
        """Set category to _category by default."""
        kwargs.setdefault("category", self._category)
        # Allow callers to pass data as a plain dict; convert it to the
        # subclass's data model before pydantic validation runs.
        if "data" in kwargs:
            if isinstance(d := kwargs["data"], dict):
                kwargs["data"] = self.create_data(d)
        super().__init__(**kwargs)

    def __init_subclass__(cls, **kwargs):
        """
        Register subclass into LocalArtifact._local_artifacts_category_to_class.

        Allow to list possible valid options (in the client or server).
        """
        super().__init_subclass__(**kwargs)

        # The task data type, computed by introspecting the type argument
        # used to specialize this generic class.
        cls._data_type = extract_generic_type_argument(
            cls, LocalArtifact, data_models.ArtifactData
        )

        LocalArtifact._local_artifacts_category_to_class[cls._category] = cls

    @classmethod
    def create_data(cls, data_dict: dict[str, Any]) -> AD:
        """Instantiate a data model from a dict."""
        return cls._data_type(**data_dict)

    @staticmethod
    def artifact_categories() -> list[str]:
        """Return list of artifact categories."""
        return list(LocalArtifact._local_artifacts_category_to_class.keys())

    @pydantic.validator("category")
    def _validate_category(cls, category: str) -> str:
        """Validate that the category is known."""
        if category not in cls._local_artifacts_category_to_class:
            raise ValueError(
                f"Invalid category: '{category}'. Expected one of "
                f"{', '.join(sorted(cls._local_artifacts_category_to_class))}."
            )
        return category

    @staticmethod
    def class_from_category(category: str) -> type["LocalArtifact['Any']"]:
        """
        Return class sub_local_artifact.

        :raises ValueError: if category is not a registered category.
        """
        category = LocalArtifact._validate_category(category)
        return LocalArtifact._local_artifacts_category_to_class[category]

    def add_local_file(
        self,
        file: Path,
        *,
        artifact_base_dir: Path | None = None,
        override_name: str | None = None,
    ):
        """
        Add a local file in the artifact.

        :param file: file in the local file system that is added to
          the artifact
        :param artifact_base_dir: base directory of the artifact.
          Must be an absolute path.
          If it's None: file is added in the root of the artifact.
          If it's not None: file is added with the relative path of the
          file with the artifact_base_dir. E.g.
          file=/tmp/artifact/dir1/file1
          artifact_base_dir=/tmp/artifact
          Path of this file in the artifact: dir1/file1
        :param override_name: if not None: use it instead of file.name
        :raises ValueError: artifact_base_dir is not absolute or is not a
          directory; file does not exist; the path in the artifact already
          had a file.
        """
        if artifact_base_dir is not None:
            if not artifact_base_dir.is_absolute():
                raise ValueError(f'"{artifact_base_dir}" must be absolute')
            if not artifact_base_dir.is_dir():
                raise ValueError(
                    f'"{artifact_base_dir}" does not exist or '
                    f'is not a directory'
                )
            # Relative files are interpreted relative to the base dir.
            if not file.is_absolute():
                file = artifact_base_dir.joinpath(file)
            path_in_artifact = file.relative_to(artifact_base_dir).as_posix()
        else:
            path_in_artifact = file.name

        if override_name is not None:
            path_in_artifact = override_name

        if not file.exists():
            raise ValueError(f'"{file}" does not exist')

        if not file.is_file():
            raise ValueError(f'"{file}" is not a file')

        file_absolute = file.absolute()

        # Refuse silent overwrites of an already-registered artifact path.
        if path_in_artifact in self.files:
            raise ValueError(
                f"File with the same path ({path_in_artifact}) "
                f"is already in the artifact "
                f'("{self.files[path_in_artifact]}" and "{file_absolute}")'
            )

        self.files[path_in_artifact] = file_absolute

    def validate_model(self):
        """
        Raise ValueError with an error if the model is not valid.

        Re-runs pydantic validation on the current __dict__; needed because
        fields (e.g. files) may have been mutated after construction.
        """
        *_, error = pydantic.validate_model(self.__class__, self.__dict__)
        if error is not None:
            raise ValueError(f"Model validation failed: {error}")

    def serialize_for_create_artifact(
        self,
        *,
        workspace: str | None,
        work_request: int | None = None,
        expire_at: datetime | None = None,
    ) -> dict[str, Any]:
        """Return dictionary to be used by the API to create an artifact."""
        files: FilesRequestType = FilesRequestType({})
        for artifact_path, local_path in self.files.items():
            files[artifact_path] = FileRequest.create_from(local_path)

        self.validate_model()

        serialized = model_to_json_serializable_dict(
            ArtifactCreateRequest(
                workspace=workspace,
                category=self.category,
                files=files,
                data=(
                    self.data.dict()
                    if isinstance(self.data, data_models.ArtifactData)
                    else self.data
                ),
                work_request=work_request,
                expire_at=expire_at,
            )
        )

        # If the workspace was not specified: do not send it to the API.
        # The server will assign it.
        if serialized["workspace"] is None:
            del serialized["workspace"]

        return serialized

    @classmethod
    def _validate_files_length(
        cls, files: dict[str, Path], number_of_files: int
    ) -> dict[str, Path]:  # noqa: U100
        """Raise ValueError if number of files is not number_of_files."""
        if (actual_number_files := len(files)) != number_of_files:
            raise ValueError(
                f"Expected number of files: {number_of_files} "
                f"Actual: {actual_number_files}"
            )
        return files

    @classmethod
    def _validate_files_end_in(
        cls, files: dict[str, Path], suffixes: Sequence[str]
    ) -> dict[str, Path]:  # noqa: U100
        """Raise ValueError if any file does not end in one of suffixes."""
        for file_name in files.keys():
            if not file_name.endswith(tuple(suffixes)):
                raise ValueError(
                    f'Valid file suffixes: {suffixes}. '
                    f'Invalid filename: "{file_name}"'
                )
        return files

    @classmethod
    def _validate_exactly_one_file_ends_in(
        cls, files: dict[str, Path], suffix: str
    ) -> dict[str, Path]:  # noqa: U100
        """Raise ValueError if files doesn't have exactly 1 file with suffix."""
        changes_files = sum(1 for file in files if file.endswith(suffix))
        if changes_files != 1:
            raise ValueError(
                f"Expecting 1 {suffix} file in {sorted(files.keys())}"
            )
        return files
class WorkRequestDebugLogs(LocalArtifact[data_models.EmptyArtifactData]):
    """
    Artifact holding log files that help debug a task's execution.

    Collects plain log files so debusine users can investigate problems
    in their WorkRequests.
    """

    _category = "debusine:work-request-debug-logs"
    data: data_models.EmptyArtifactData = pydantic.Field(
        default_factory=data_models.EmptyArtifactData
    )

    @classmethod
    def create(cls, *, files: Iterable[Path]) -> Self:
        """Build a WorkRequestDebugLogs containing every path in files."""
        artifact = cls(category=cls._category)
        for log_file in files:
            artifact.add_local_file(log_file)
        return artifact
class PackageBuildLog(LocalArtifact[data_models.DebianPackageBuildLog]):
    """Artifact wrapping a single package build log file."""

    _category = "debian:package-build-log"

    @classmethod
    def create(
        cls,
        *,
        file: Path,
        source: str,
        version: str,
    ) -> Self:
        """Create a PackageBuildLog for the given source/version log file."""
        build_log_data = data_models.DebianPackageBuildLog(
            source=source, version=version, filename=file.name
        )
        artifact = cls(category=cls._category, data=build_log_data)
        artifact.add_local_file(file)
        return artifact

    @pydantic.validator("files")
    def validate_files_length_is_one(
        cls, files: dict[str, Path]  # noqa: U100
    ) -> dict[str, Path]:
        """Ensure that exactly one file is attached to the artifact."""
        return super()._validate_files_length(files, 1)

    @pydantic.validator("files")
    def file_must_end_in_build(
        cls, files: dict[str, Path]  # noqa: U100
    ) -> dict[str, Path]:
        """Ensure that the attached file has a .build suffix."""
        return super()._validate_files_end_in(files, [".build"])
def deb822dict_to_dict(element):
    """
    Recursively convert any Deb822Dict inside element into a plain dict.

    deb822.Changes() returns a Deb822Dict whose values may themselves be
    Deb822Dict instances; the json module cannot encode those. Converting
    the whole structure to built-in dicts (and lists) up front keeps the
    rest of the code simple.
    """
    if isinstance(element, (dict, deb822.Deb822Dict)):
        return {
            key: deb822dict_to_dict(value) for key, value in element.items()
        }
    if isinstance(element, list):
        return [deb822dict_to_dict(item) for item in element]
    return element
class Upload(LocalArtifact[data_models.DebianUpload]):
    """Upload: encapsulate a .changes and files listed in it."""

    _category = "debian:upload"

    @classmethod
    def create(
        cls,
        *,
        changes_file: Path,
        exclude_files: frozenset[Path] | set[Path] = frozenset(),
    ) -> Self:
        """
        Return a Upload. Add the changes_files and files listed in it.

        :param changes_file: a .changes file. Parsed by deb822.Changes.
        :param exclude_files: do not add them in files even if listed in the
          Files section in the changes_file.
        :raises ValueError: (from add_local_file) if a referenced file does
          not exist or duplicates a path already in the artifact.
        """
        with changes_file.open() as changes_obj:
            data = data_models.DebianUpload(
                type="dpkg",
                changes_fields=deb822dict_to_dict(deb822.Changes(changes_obj)),
            )

        artifact = cls(category=cls._category, data=data)
        artifact.add_local_file(changes_file)

        # Add any files referenced by .changes (excluding the exclude_files)
        # Referenced names are resolved relative to the .changes location.
        base_directory = changes_file.parent
        for file in data.changes_fields.get("Files", []):
            file = base_directory / file["name"]
            if file not in exclude_files:
                artifact.add_local_file(file)

        return artifact

    @pydantic.validator("files")
    def files_contain_changes(cls, files: dict[str, Path]):
        """Raise ValueError when files does not have exactly 1 .changes file."""
        return cls._validate_exactly_one_file_ends_in(files, ".changes")

    @pydantic.validator("files")
    def files_contains_files_in_changes(
        cls, files: dict[str, Path]  # noqa: U100
    ):
        """
        Validate that set(files) == set(files_in_changes_file).

        Exception: The .changes file must be in files but not in the .changes
        file.
        """
        return files_in_meta_file_match_files(
            ".changes",
            deb822.Changes,
            files,
        )
class SourcePackage(LocalArtifact[data_models.DebianSourcePackage]):
    """Artifact with the source code used to build BinaryPackages."""

    _category = "debian:source-package"

    @classmethod
    def create(cls, *, name: str, version: str, files: list[Path]) -> Self:
        """Create a SourcePackage from the given files and metadata."""
        # Parse the .dsc if one is present; the validators below guarantee
        # that a valid artifact contains exactly one.
        dsc_fields: dict[str, Any] = {}
        for path in files:
            if path.suffix == ".dsc":
                dsc_fields = deb822dict_to_dict(utils.read_dsc(path))

        source_data = data_models.DebianSourcePackage(
            name=name,
            version=version,
            type="dpkg",
            dsc_fields=dsc_fields,
        )

        artifact = cls(category=cls._category, data=source_data)
        for path in files:
            artifact.add_local_file(path)
        return artifact

    @pydantic.validator("files")
    def files_contain_one_dsc(cls, files: dict[str, Path]):
        """Ensure that exactly one .dsc file is attached."""
        return cls._validate_exactly_one_file_ends_in(files, ".dsc")

    @pydantic.validator("files")
    def files_contains_files_in_dsc(cls, files: dict[str, Path]):  # noqa: U100
        """
        Ensure set(files) matches the files referenced by the .dsc.

        The .dsc itself must be attached but is not listed inside itself.
        """
        return files_in_meta_file_match_files(".dsc", deb822.Dsc, files)
class BinaryPackage(LocalArtifact[data_models.DebianBinaryPackage]):
    r"""BinaryPackage: encapsulates a single \*.deb / \*.udeb."""

    _category = "debian:binary-package"

    @classmethod
    def create(cls, *, file: Path) -> Self:
        """
        Return a BinaryPackage setting file and data.

        :param file: the .deb/.udeb to encapsulate; its control metadata is
          read to fill in the artifact data.
        """
        pkg = debfile.DebFile(file)
        try:
            control = pkg.control.debcontrol()
            # Bug fix: str.lstrip("./") strips *all* leading "." and "/"
            # characters, so "./.config" became "config". removeprefix
            # drops exactly the "./" prefix.
            control_files = sorted(
                name.removeprefix("./")
                for name in pkg.control
                if name.startswith("./")
            )
        finally:
            pkg.close()

        srcpkg_name = control.get("Source", control["Package"])
        # "Source: name (version)" declares a source version that differs
        # from the binary version; otherwise reuse the binary Version.
        if (m := re.match(r"^(.*) \((.*)\)$", srcpkg_name)) is not None:
            srcpkg_name, srcpkg_version = m.groups()
        else:
            srcpkg_version = control["Version"]

        data = data_models.DebianBinaryPackage(
            srcpkg_name=srcpkg_name,
            srcpkg_version=srcpkg_version,
            deb_fields=deb822dict_to_dict(control),
            deb_control_files=control_files,
        )

        artifact = cls(category=cls._category, data=data)
        artifact.add_local_file(file)
        return artifact

    @pydantic.validator("files")
    def files_must_end_in_deb_or_udeb(
        cls, files: dict[str, Path]  # noqa: U100
    ) -> dict[str, Path]:
        """Raise ValueError if a file does not end in .deb or .udeb."""
        return super()._validate_files_end_in(files, [".deb", ".udeb"])

    @pydantic.validator("files")
    def files_exactly_one(
        cls, files: dict[str, Path]  # noqa: U100
    ) -> dict[str, Path]:
        """Raise ValueError if len(files) != 1."""
        if len(files) != 1:
            raise ValueError("Must have exactly one file")
        return files
class BinaryPackages(LocalArtifact[data_models.DebianBinaryPackages]):
    r"""Artifact grouping several \*.deb / \*.udeb packages."""

    _category = "debian:binary-packages"

    @classmethod
    def create(
        cls,
        *,
        srcpkg_name: str,
        srcpkg_version: str,
        version: str,
        architecture: str,
        files: list[Path],
    ) -> Self:
        """Create a BinaryPackages artifact from the given files and data."""
        # Package names are derived from "<name>_<rest>" file names.
        # Reading each package's own metadata would be more robust, but
        # isn't currently vital.
        package_names = [path.name.split("_", 1)[0] for path in files]
        group_data = data_models.DebianBinaryPackages(
            srcpkg_name=srcpkg_name,
            srcpkg_version=srcpkg_version,
            version=version,
            architecture=architecture,
            packages=package_names,
        )

        artifact = cls(category=cls._category, data=group_data)
        for package_file in files:
            artifact.add_local_file(package_file)
        return artifact

    @pydantic.validator("files")
    def files_must_end_in_deb_or_udeb(
        cls, files: dict[str, Path]  # noqa: U100
    ) -> dict[str, Path]:
        """Ensure every attached file ends in .deb or .udeb."""
        return super()._validate_files_end_in(files, [".deb", ".udeb"])

    @pydantic.validator("files")
    def files_more_than_zero(
        cls, files: dict[str, Path]  # noqa: U100
    ) -> dict[str, Path]:
        """Ensure at least one file is attached."""
        if len(files) == 0:
            raise ValueError("Must have at least one file")
        return files
class LintianArtifact(LocalArtifact[data_models.DebianLintian]):
    """LintianArtifact: encapsulate result of the Lintian run."""

    _category = "debian:lintian"

    @classmethod
    def create(
        cls,
        analysis: Path,
        lintian_output: Path,
        summary: data_models.DebianLintianSummary,
    ) -> Self:
        """
        Return a LintianArtifact with the files set.

        :param analysis: stored in the artifact as "analysis.json".
        :param lintian_output: stored in the artifact as "lintian.txt".
        :param summary: summary of the Lintian run, kept in the data.
        """
        data = data_models.DebianLintian(summary=summary)

        artifact = cls(category=cls._category, data=data)

        artifact.add_local_file(analysis, override_name="analysis.json")
        artifact.add_local_file(lintian_output, override_name="lintian.txt")

        return artifact

    @pydantic.validator("files")
    def _validate_required_files(
        cls, files: dict[str, Path]  # noqa: U100
    ) -> dict[str, Path]:
        """Artifact contain "analysis.json, "lintian.txt"."""
        # The .create() method already enforces having the correct files.
        # But the artifact can be created using debusine.client or a web
        # form, which do not use the .create() of LintianArtifact but the
        # LocalArtifact's: that's why the required files must be validated
        # here as well.
        required_files = {"analysis.json", "lintian.txt"}

        if files.keys() != required_files:
            raise ValueError(f"Files required: {sorted(required_files)}")

        return files

    @staticmethod
    def _file_is_json_or_raise_value_error(
        file_name: str, files: dict[str, Path]
    ) -> None:
        """Raise ValueError() if file_name in files is not valid JSON."""
        with files[file_name].open() as file:
            try:
                json.load(file)
            except JSONDecodeError as exc:
                # Fix: chain the original exception ("raise ... from exc")
                # so tracebacks keep the JSON decoding context.
                raise ValueError(
                    f"{file_name} is not valid JSON: {exc}"
                ) from exc

    @pydantic.validator("files")
    def _validate_file_analysis_is_json(
        cls, files: dict[str, Path]  # noqa: U100
    ) -> dict[str, Path]:
        """Validate that "analysis.json" is valid JSON."""
        cls._file_is_json_or_raise_value_error("analysis.json", files)
        return files
class AutopkgtestArtifact(LocalArtifact[data_models.DebianAutopkgtest]):
    """Artifact with the results of an Autopkgtest run."""

    _category = "debian:autopkgtest"

    @classmethod
    def create(
        cls, artifact_directory: Path, data: data_models.DebianAutopkgtest
    ) -> Self:
        """Create an AutopkgtestArtifact from artifact_directory and data."""
        artifact = cls(category=cls._category, data=data)

        binaries_dir = artifact_directory / "binaries"
        for path in artifact_directory.rglob("*"):
            # Add regular files only, skipping everything under binaries/
            if path.is_file() and not path.is_relative_to(binaries_dir):
                artifact.add_local_file(
                    path, artifact_base_dir=artifact_directory
                )

        return artifact
class DebianSystemTarballArtifact(
    LocalArtifact[data_models.DebianSystemTarball]
):
    """
    Contain system.tar.xz file with a Debian.

    Can be used by a chroot, container, etc.
    """

    _category = "debian:system-tarball"

    @classmethod
    def create(cls, tarball: Path, data: dict[str, Any]) -> Self:
        """Return a DebianSystemTarballArtifact wrapping tarball."""
        # Copy so the caller's dict is left untouched.
        tarball_data = {**data, "filename": tarball.name}
        artifact = cls(
            category=cls._category,
            data=data_models.DebianSystemTarball(**tarball_data),
        )
        artifact.add_local_file(tarball)
        return artifact

    @pydantic.validator("files")
    def _validate_file_name_ends_in_tar_xz(
        cls, files: dict[str, Path]  # noqa: U100
    ) -> dict[str, Path]:
        """Ensure the artifact contains exactly one .tar.xz file."""
        if len(files) != 1:
            raise ValueError(
                "DebianSystemTarballArtifact does not contain exactly one file"
            )
        name = next(iter(files.keys()))
        if not name.endswith(".tar.xz"):
            raise ValueError(f"Invalid file name: '{name}'. Expected .tar.xz")
        return files
class BlhcArtifact(LocalArtifact[data_models.EmptyArtifactData]):
    """Artifact with the output of a blhc (build log hardening check) run."""

    _category = "debian:blhc"
    data: data_models.EmptyArtifactData = pydantic.Field(
        default_factory=data_models.EmptyArtifactData
    )

    @classmethod
    def create(cls, blhc_output: Path) -> Self:
        """Create a BlhcArtifact storing blhc_output as "blhc.txt"."""
        artifact = cls(category=cls._category)
        artifact.add_local_file(blhc_output, override_name="blhc.txt")
        return artifact
class DebianSystemImageArtifact(LocalArtifact[data_models.DebianSystemImage]):
    """
    Contains a image.tar.xz file with a bootable Debian system.

    Can be used by a VM.
    """

    _category = "debian:system-image"

    @classmethod
    def create(cls, image: Path, data: dict[str, Any]) -> Self:
        """
        Return a DebianSystemImageArtifact with the image file.

        :param image: the image file; its name is recorded in the data.
        :param data: extra fields for DebianSystemImage; copied, so the
          caller's dict is not modified.
        """
        data = data.copy()
        data["filename"] = image.name
        artifact = cls(
            category=cls._category, data=data_models.DebianSystemImage(**data)
        )
        artifact.add_local_file(image)
        return artifact

    @pydantic.validator("files")
    def _validate_files(
        cls, files: dict[str, Path]  # noqa: U100
    ) -> dict[str, Path]:
        """Check if the artifact contains only one file and it's ending."""
        if len(files) != 1:
            raise ValueError(
                "DebianSystemImageArtifact does not contain exactly one file"
            )
        name = next(iter(files.keys()))
        # endswith accepts a tuple of suffixes: one call instead of two.
        if not name.endswith((".tar.xz", ".qcow2")):
            # Fix: the message previously said "qcow2" without the dot.
            raise ValueError(
                f"Invalid file name: '{name}'. Expected .tar.xz or .qcow2"
            )
        return files