# Source code for debusine.db.models (extracted from documentation page)

# Copyright 2019, 2021-2022 The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.

"""Data models for the db application."""

import copy
import hashlib
import logging
import os
import secrets
from datetime import datetime
from functools import partial
from pathlib import Path
from typing import Optional

from django.conf import settings
from django.contrib.auth.models import AbstractUser
from django.core.exceptions import ValidationError
from django.core.validators import (
    MaxLengthValidator,
    MinLengthValidator,
)
from django.db import IntegrityError, models, transaction
from django.db.models import (
    CheckConstraint,
    Count,
    JSONField,
    Q,
    QuerySet,
    UniqueConstraint,
)
from django.utils import timezone
from django.utils.text import slugify
from django.utils.translation import gettext_lazy as _

import jsonschema

from debusine.server import notifications
from debusine.utils import calculate_hash

# Module-level logger; used by WorkRequest state-transition methods below.
logger = logging.getLogger(__name__)

class TokenManager(models.Manager):
    """Manager for Token model."""

    def get_tokens(
        self, username: Optional[str] = None, key: Optional[str] = None
    ) -> QuerySet["Token"]:
        """
        Return tokens, optionally narrowed by owner username and/or key.

        Pass None for either argument to skip that filter.
        """
        queryset = self.get_queryset()

        if username:
            queryset = queryset.filter(user__username=username)

        if key:
            queryset = queryset.filter(key=key)

        return queryset

    def get_token_or_none(self, token_key: str) -> Optional["Token"]:
        """Return the token whose key is token_key, or None if not found."""
        try:
            # select_related("worker") avoids a second query when the
            # caller inspects token.worker.
            return self.select_related('worker').get(key=token_key)
        except Token.DoesNotExist:
            return None
class Token(models.Model):
    """
    Database model of a token.

    A token contains a key and other related data. It's used as a shared
    key between debusine server and clients (workers).

    This token model is very similar to rest_framework.authtoken.models.Token.
    The bigger difference is that debusine's token's owner is a CharField,
    the rest_framework owner is a OneToOne foreign key to a user.
    """

    # 64 hexadecimal characters (32 random bytes, see _generate_key()).
    key = models.CharField(
        max_length=64,
        unique=True,
        verbose_name='Hexadecimal key, length is 64 chars',
        validators=[MaxLengthValidator(64), MinLengthValidator(64)],
    )
    user = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        null=True,
        blank=True,
        on_delete=models.PROTECT,
    )
    comment = models.CharField(
        max_length=100,
        default='',
        verbose_name='Reason that this token was created',
        blank=True,
    )
    created_at = models.DateTimeField(auto_now_add=True)
    # Tokens start disabled and must be explicitly enabled (see enable()).
    enabled = models.BooleanField(default=False)

    def save(self, *args, **kwargs) -> None:
        """Save the token. If key is empty it generates a key."""
        if not self.key:
            self.key = self._generate_key()
        super().save(*args, **kwargs)

    def enable(self) -> None:
        """Enable the token and save it."""
        self.enabled = True
        self.save()

    def disable(self) -> None:
        """Disable the token and save it."""
        self.enabled = False
        self.save()
        # Let the worker (if any) know that its token no longer works.
        notifications.notify_worker_token_disabled(self)

    def __str__(self) -> str:
        """Return the key of the Token."""
        return self.key

    @classmethod
    def _generate_key(cls) -> str:
        """Create and return a key."""
        # 32 random bytes -> 64 hex chars, matching the field validators.
        return secrets.token_hex(32)

    objects = TokenManager()
class WorkerManager(models.Manager):
    """Manager for Worker model."""

    def connected(self) -> QuerySet["Worker"]:
        """Return connected workers, ordered by connection time."""
        return Worker.objects.filter(connected_at__isnull=False).order_by(
            'connected_at'
        )

    def waiting_for_work_request(self) -> QuerySet["Worker"]:
        """
        Return workers that can be assigned a new work request.

        A worker is "waiting" when it is connected, its token is enabled,
        and it has no pending or running work request assigned to it.
        """
        active_requests = Count(
            'assigned_work_requests',
            filter=Q(
                assigned_work_requests__status__in=[
                    WorkRequest.Statuses.RUNNING,
                    WorkRequest.Statuses.PENDING,
                ]
            ),
        )

        return (
            Worker.objects.filter(connected_at__isnull=False)
            .order_by('connected_at')
            .annotate(count_running=active_requests)
            .filter(count_running=0)
            .filter(token__enabled=True)
        )

    @staticmethod
    def _generate_unique_name(name: str, counter: int) -> str:
        """Return name slugified, appending "-counter" when counter != 1."""
        slug = slugify(name.replace('.', '-'))
        return slug if counter == 1 else f'{slug}-{counter}'

    @classmethod
    def create_with_fqdn(cls, fqdn: str, token: str) -> "Worker":
        """Return a new Worker with its name based on fqdn, with token."""
        attempt = 1
        while True:
            candidate = cls._generate_unique_name(fqdn, attempt)
            try:
                # The atomic block ensures a failed INSERT (duplicate name)
                # does not poison the surrounding transaction.
                with transaction.atomic():
                    return Worker.objects.create(name=candidate, token=token)
            except IntegrityError:
                attempt += 1

    def get_worker_by_token_key_or_none(
        self, token_key: str
    ) -> Optional["Worker"]:
        """Return a Worker identified by its associated secret token."""
        try:
            return Worker.objects.get(token__key=token_key)
        except Worker.DoesNotExist:
            return None

    def get_worker_or_none(self, worker_name: str) -> Optional["Worker"]:
        """Return the worker named worker_name, or None."""
        try:
            return self.get(name=worker_name)
        except Worker.DoesNotExist:
            return None
class Worker(models.Model):
    """Database model of a worker."""

    name = models.SlugField(
        unique=True,
        help_text='Human readable name of the worker based on the FQDN',
    )
    registered_at = models.DateTimeField()
    connected_at = models.DateTimeField(blank=True, null=True)

    # This is the token used by the Worker to authenticate.
    # Users have their own tokens - this is specific to a single worker.
    token = models.OneToOneField(
        Token, on_delete=models.PROTECT, related_name="worker"
    )
    static_metadata = JSONField(default=dict, blank=True)
    dynamic_metadata = JSONField(default=dict, blank=True)
    dynamic_metadata_updated_at = models.DateTimeField(blank=True, null=True)

    def mark_disconnected(self) -> None:
        """Update and save relevant Worker fields after disconnecting."""
        self.connected_at = None
        self.save()

    def mark_connected(self) -> None:
        """Update and save relevant Worker fields after connecting."""
        self.connected_at = timezone.now()
        self.save()

    def connected(self) -> bool:
        """Return True if the Worker is connected."""
        return self.connected_at is not None

    def metadata(self) -> dict:
        """
        Return all metadata with static_metadata and dynamic_metadata merged.

        If the same key is in static_metadata and dynamic_metadata:
        static_metadata takes priority.
        """
        # Deep copies so callers cannot mutate the stored metadata dicts.
        return {
            **copy.deepcopy(self.dynamic_metadata),
            **copy.deepcopy(self.static_metadata),
        }

    def set_dynamic_metadata(self, metadata: dict) -> None:
        """Save metadata and update dynamic_metadata_updated_at."""
        self.dynamic_metadata = metadata
        self.dynamic_metadata_updated_at = timezone.now()
        self.save()

    def __str__(self) -> str:
        """Return the id and name of the Worker."""
        return f"Id: {self.id} Name: {self.name}"

    objects = WorkerManager()
class WorkRequestManager(models.Manager):
    """Manager for WorkRequest model."""

    def pending(
        self, exclude_assigned: bool = False, worker: Optional[Worker] = None
    ) -> QuerySet["WorkRequest"]:
        """
        Return a QuerySet of tasks in WorkRequest.Statuses.PENDING status.

        The QuerySet is ordered by created_at. With exclude_assigned=True,
        pending requests already assigned to a worker are filtered out;
        with worker, only that worker's requests are included. The two
        options are mutually exclusive.

        PENDING is the default status of a task on creation.
        """
        if exclude_assigned and worker is not None:
            raise ValueError("Cannot exclude_assigned and filter by worker")

        queryset = WorkRequest.objects.filter(
            status=WorkRequest.Statuses.PENDING
        )

        if exclude_assigned:
            queryset = queryset.exclude(worker__isnull=False)

        if worker is not None:
            queryset = queryset.filter(worker=worker)

        return queryset.order_by('created_at')

    def running(
        self, worker: Optional[Worker] = None
    ) -> QuerySet["WorkRequest"]:
        """Return a QuerySet of tasks in running status."""
        queryset = WorkRequest.objects.filter(
            status=WorkRequest.Statuses.RUNNING
        )
        if worker is not None:
            queryset = queryset.filter(worker=worker)
        return queryset

    def running_or_pending_exists(self, worker: Worker) -> bool:
        """Return True if worker has any running or pending work request."""
        combined = self.running(worker=worker) | self.pending(worker=worker)
        return combined.exists()

    def completed(self) -> QuerySet["WorkRequest"]:
        """Return a QuerySet of tasks in completed status."""
        return WorkRequest.objects.filter(
            status=WorkRequest.Statuses.COMPLETED
        )

    def aborted(self) -> QuerySet["WorkRequest"]:
        """Return a QuerySet of tasks in aborted status."""
        return WorkRequest.objects.filter(status=WorkRequest.Statuses.ABORTED)
class WorkRequest(models.Model):
    """
    Database model of a request to execute a task.

    Time-consuming operations offloaded to Workers and using
    Artifacts (and associated Files) as input and output.

    Submission API needs to check if the request is valid using ontological
    rules - e.g. whether the specified distribution for a build task exists.

    Avoid exposing the status of tasks to the admin interface to avoid
    runaway changes whilst the scheduler process is running.

    The WorkRequest uses the non-Django tasks module to do the checks on
    whether a task can run on a particular worker.

    WorkRequest State Machine
    =========================

    New WorkRequest database entries default to
    ``WorkRequest.Statuses.PENDING``.

    Once the WorkRequest is assigned to a worker and starts running,
    the status is changed to ``WorkRequest.Statuses.RUNNING``.

    If the WorkRequest is aborted, the status is
    ``WorkRequest.Statuses.ABORTED``.

    If the task finishes on the Worker the WorkRequest status will be
    ``WorkRequest.Statuses.COMPLETED`` and a WorkRequest.Result is then set,
    ``WorkRequest.Results.PASSED`` or ``WorkRequest.Results.FAILED``.

    .. graphviz::

       digraph {
          Statuses_PENDING -> Statuses_RUNNING -> Statuses_COMPLETED;
          Statuses_PENDING -> Statuses_COMPLETED;
          Statuses_PENDING -> Statuses_ABORTED;
          Statuses_PENDING -> Statuses_RUNNING -> Statuses_ABORTED;
       }

    ``WorkRequest.started_at`` is set when the WorkRequest moves from
    ``WorkRequest.Statuses.PENDING`` to ``WorkRequest.Statuses.RUNNING``.
    ``WorkRequest.completed_at`` is set when the Task moves from
    ``WorkRequest.Statuses.RUNNING`` to ``WorkRequest.Statuses.COMPLETED``.
    """

    objects = WorkRequestManager()

    class Statuses(models.TextChoices):
        PENDING = "pending", _("Pending")
        RUNNING = "running", _("Running")
        COMPLETED = "completed", _("Completed")
        ABORTED = "aborted", _("Aborted")

    class Results(models.TextChoices):
        NONE = "", ""
        SUCCESS = "success", _("Success")
        FAILURE = "failure", _("Failure")
        ERROR = "error", _("Error")

    workspace = models.ForeignKey("Workspace", on_delete=models.PROTECT)
    created_at = models.DateTimeField(auto_now_add=True)
    started_at = models.DateTimeField(blank=True, null=True)
    completed_at = models.DateTimeField(blank=True, null=True)
    created_by = models.ForeignKey("User", on_delete=models.PROTECT)
    status = models.CharField(
        max_length=9,
        choices=Statuses.choices,
        default=Statuses.PENDING,
        editable=False,
    )
    result = models.CharField(
        max_length=7,
        choices=Results.choices,
        default=Results.NONE,
        editable=False,
    )
    # Any one work request can only be on one worker
    # even if the worker can handle multiple work requests.
    worker = models.ForeignKey(
        Worker,
        null=True,
        on_delete=models.CASCADE,
        related_name="assigned_work_requests",
    )
    task_name = models.CharField(
        max_length=100, verbose_name='Name of the task to execute'
    )
    task_data = JSONField(default=dict, blank=True)

    def __str__(self) -> str:
        """Return the id of the WorkRequest."""
        return str(self.id)

    def mark_running(self) -> bool:
        """Worker has begun executing the task."""
        if self.worker is None:
            logger.debug(
                "Cannot mark WorkRequest %s as running: it does not have "
                "an assigned worker",
                self.id,
            )
            return False

        if self.status == self.Statuses.RUNNING:
            # It was already running - nothing to do
            return True

        if self.status != self.Statuses.PENDING:
            logger.debug(
                "Cannot mark as running - current status %s", self.status
            )
            return False

        work_request_running_for_worker = WorkRequest.objects.running(
            worker=self.worker
        ).first()

        # There is a possible race condition here. This check (and other
        # checks in this class) currently help to avoid development mistakes
        # not database full integrity
        if work_request_running_for_worker is not None:
            logger.debug(
                "Cannot mark WorkRequest %s as running - the assigned worker "
                "%s is running another WorkRequest %s",
                self.id,
                self.worker,
                work_request_running_for_worker,
            )
            return False

        self.started_at = timezone.now()
        self.status = self.Statuses.RUNNING
        self.save()

        logger.debug("Marked WorkRequest %s as running", self.id)

        return True

    def mark_completed(self, result: "WorkRequest.Results") -> bool:
        """Worker has finished executing the task."""
        if self.status not in (self.Statuses.PENDING, self.Statuses.RUNNING):
            logger.debug(
                "Cannot mark WorkRequest %s as completed: current status is %s",
                self.id,
                self.status,
            )
            return False

        self.result = result
        self.completed_at = timezone.now()
        self.status = self.Statuses.COMPLETED
        self.save()

        logger.debug("Marked WorkRequest %s as completed", self.id)
        notifications.notify_work_request_completed(self)
        return True

    def mark_aborted(self) -> bool:
        """
        Worker has aborted the task after request from UI.

        Task will typically be in CREATED or RUNNING status.
        """
        self.completed_at = timezone.now()
        self.status = self.Statuses.ABORTED
        self.save()

        logger.debug(
            "Marked WorkRequest %s as aborted (from status %s)",
            self.id,
            self.status,
        )
        return True

    def assign_worker(self, worker: Optional[Worker]) -> None:
        """Assign worker and save it."""
        self.worker = worker
        self.save()
        notifications.notify_work_request_assigned(self)

    @property
    def duration(self) -> Optional[int]:
        """Return duration in seconds between started_at and completed_at."""
        if self.started_at and self.completed_at:
            return (self.completed_at - self.started_at).total_seconds()
        else:
            return None
class File(models.Model):
    """
    Database model of a file.

    Model different attributes of the file.

    From outside the class use the property ``hash`` and do not use
    ``sha256`` field. This allows, if ever needed, to change the hash
    algorithm only modifying this class without changing the users of
    this class.
    """

    current_hash_algorithm = "sha256"

    sha256 = models.BinaryField(
        # digest_size is the digest length in bytes (32 for sha256); the
        # original expression was garbled in extraction — TODO confirm.
        max_length=hashlib.new(current_hash_algorithm).digest_size,
        help_text=f"{current_hash_algorithm} of the file",
    )
    size = models.PositiveBigIntegerField(help_text="Size in bytes of the file")

    class Meta:
        constraints = [
            UniqueConstraint(
                fields=["sha256", "size"], name="Hash-size must be unique"
            ),
            CheckConstraint(
                name="sha256 cannot be empty", check=~Q(sha256=b"")
            ),
        ]

    def __str__(self) -> str:
        """Return basic information of File."""
        return (
            f"id: {self.id} "
            f"{self.current_hash_algorithm}: {self.hash_digest.hex()} "
            f"size: {self.size}"
        )

    @classmethod
    def from_local_path(cls, local_path: Path) -> "File":
        """Return a File (created if needed) for the file at local_path."""
        file, created = File.objects.get_or_create(
            **{
                File.current_hash_algorithm: cls.calculate_hash(local_path),
                "size": os.stat(local_path).st_size,
            }
        )
        return file

    @property
    def hash_digest(self) -> bytes:
        """
        Return the default hash digest of the File.

        Use this property instead of the field to allow the algorithm
        to be changed in the future.
        """
        return getattr(self, self.current_hash_algorithm)

    @hash_digest.setter
    def hash_digest(self, value: bytes) -> None:
        setattr(self, self.current_hash_algorithm, value)

    @classmethod
    def calculate_hash(cls, file_path: Path) -> bytes:
        """Return hash for the file."""
        return calculate_hash(file_path, cls.current_hash_algorithm)

    @classmethod
    def get_or_create(cls, *, hash_digest: bytes, size: int):
        """Call File.objects.get_or_create with the correct parameters."""
        return cls.objects.get_or_create(
            **{cls.current_hash_algorithm: hash_digest, "size": size}
        )
class _FileStoreBackendChoices(models.TextChoices):
    """Enumerate all the Backend choices."""

    LOCAL = "Local", _("Local")


# Name of the FileStore used when no other store is specified.
DEFAULT_FILE_STORE_NAME = "Default"
def default_file_store() -> "FileStore":
    """Look up and return the default file store by its well-known name."""
    return FileStore.objects.get(name=DEFAULT_FILE_STORE_NAME)
class FileStore(models.Model):
    """
    Database model of a FileStore.

    FileStore has files attached to it.
    """

    # BackendChoices not defined in FileStore to make it accessible
    # during the migrations. Historical models are used
    # (see Django's documentation on migrations and historical models).
    BackendChoices = _FileStoreBackendChoices

    name = models.CharField(max_length=255, unique=True)
    backend = models.CharField(max_length=5, choices=BackendChoices.choices)
    configuration = models.JSONField(default=dict, blank=True)
    files = models.ManyToManyField(File, through="FileInStore")

    @staticmethod
    def default() -> "FileStore":
        """Return the default FileStore."""
        return default_file_store()

    def __str__(self) -> str:
        """Return basic information of FileStore."""
        return f"Id: {self.id} Name: {self.name} Backend: {self.backend}"

    def get_backend_object(self) -> "LocalFileStore":  # noqa: F821
        """Instantiate the correct FileStore and return it."""
        if self.backend == _FileStoreBackendChoices.LOCAL:
            # Imported here to avoid a circular import at module load time.
            from debusine.server.file_store.local import LocalFileStore

            return LocalFileStore(self)

        raise NotImplementedError(
            f"Cannot create FileStore for backend type: {self.backend}"
        )
class FileInStore(models.Model):
    """
    Database model used as "through" from FileStore.

    Keeps the relationship between stores, files and attached data.
    """

    store = models.ForeignKey(FileStore, on_delete=models.PROTECT)
    file = models.ForeignKey(File, on_delete=models.PROTECT)
    data = models.JSONField(default=dict, blank=True)

    class Meta:
        constraints = [
            UniqueConstraint(fields=["store", "file"], name="Unique_StoreFile")
        ]

    def __str__(self) -> str:
        """Return basic information of FileInStore."""
        return (
            f"Id: {self.id} "
            f"Store: {self.store.name} "
            f"File: {self.file.hash_digest.hex()}"
        )
class WorkspaceManager(models.Manager):
    """Manager for Workspace model."""

    @classmethod
    def create_with_name(cls, name: str) -> "Workspace":
        """Return a new Workspace with name and the default FileStore."""
        store = FileStore.default()
        return Workspace.objects.create(name=name, default_file_store=store)
# NOTE(review): DEFAULT_WORKSPACE_NAME was referenced but not defined in the
# extracted source; "System" is debusine's default workspace name — TODO
# confirm against the canonical sources.
DEFAULT_WORKSPACE_NAME = "System"


def default_workspace() -> "Workspace":
    """Return the default Workspace."""
    return Workspace.objects.get(name=DEFAULT_WORKSPACE_NAME)
class Workspace(models.Model):
    """Workspace model."""

    objects = WorkspaceManager()

    name = models.CharField(max_length=255, unique=True)
    default_file_store = models.ForeignKey(
        FileStore, on_delete=models.PROTECT, related_name="default_workspaces"
    )
    other_file_stores = models.ManyToManyField(
        FileStore, related_name="other_workspaces"
    )
    public = models.BooleanField(default=False)

    def is_file_in_workspace(self, fileobj: File) -> bool:
        """Return True if fileobj is in any store available for Workspace."""
        # The default store is checked first, then any additional stores.
        file_stores = [self.default_file_store, *self.other_file_stores.all()]

        for file_store in file_stores:
            if file_store.fileinstore_set.filter(file=fileobj).exists():
                return True

        return False

    def __str__(self) -> str:
        """Return basic information of Workspace."""
        return f"Id: {self.id} Name: {self.name}"
class ArtifactManager(models.Manager):
    """Manager for the Artifact model."""

    def not_expired(self, at: datetime) -> QuerySet["Artifact"]:
        """
        Return queryset with artifacts that have not expired.

        :param at: datetime to check if the artifacts are not expired.
        :return: artifacts whose expire_at is None (never expire) or is
          later than the given datetime.
        """
        never_expires = Q(expire_at__isnull=True)
        expires_later = Q(expire_at__gt=at)
        return self.get_queryset().filter(expires_later | never_expires)

    def expired(self, at: datetime) -> QuerySet["Artifact"]:
        """
        Return queryset with artifacts that have expired.

        :param at: datetime to check if the artifacts are expired.
        :return: artifacts whose expire_at is before the given datetime.
        """
        return self.get_queryset().filter(expire_at__lt=at)
class Artifact(models.Model):
    """Artifact model."""

    category = models.CharField(max_length=255)
    workspace = models.ForeignKey(Workspace, on_delete=models.PROTECT)
    files = models.ManyToManyField(File, through="FileInArtifact")
    data = models.JSONField(default=dict, blank=True)
    created_at = models.DateTimeField(auto_now_add=True)
    # NULL expire_at means the artifact never expires.
    expire_at = models.DateTimeField(blank=True, null=True)
    created_by = models.ForeignKey(
        "User", blank=True, null=True, on_delete=models.PROTECT
    )
    created_by_work_request = models.ForeignKey(
        WorkRequest, blank=True, null=True, on_delete=models.PROTECT
    )

    objects = ArtifactManager()

    def expired(self, at: datetime) -> bool:
        """
        Return True if this artifact has expired at a given datetime.

        :param at: datetime to check if the artifact is expired.
        :return bool: True if the artifact's expire_at is on or earlier than
          the parameter at.
        """
        return self.expire_at is not None and self.expire_at <= at

    def __str__(self) -> str:
        """Return basic information of Artifact."""
        return (
            f"Id: {self.id} "
            f"Category: {self.category} "
            f"Workspace: {self.workspace.name}"
        )
class FileInArtifact(models.Model):
    """File in artifact."""

    artifact = models.ForeignKey(Artifact, on_delete=models.PROTECT)
    path = models.CharField(max_length=500)
    # file is NULL while the upload has not completed yet (see FileUpload).
    # TODO confirm: extraction dropped the original interpolations below.
    file = models.ForeignKey(
        File, null=True, blank=True, on_delete=models.PROTECT
    )

    class Meta:
        constraints = [
            UniqueConstraint(
                fields=["artifact", "path"], name="artifact_path_unique"
            ),
        ]

    def __str__(self) -> str:
        """Return basic information of FileInArtifact."""
        return (
            f"Id: {self.id} Artifact: {self.artifact.id} "
            f"Path: {self.path} File: {self.file_id}"
        )
class FileUpload(models.Model):
    """File that is being/has been uploaded."""

    file_in_artifact = models.OneToOneField(
        FileInArtifact, on_delete=models.PROTECT
    )
    path = models.CharField(
        max_length=500,
        help_text="Path in the uploads directory",
        unique=True,
    )
    last_activity_at = models.DateTimeField(auto_now_add=True)

    @classmethod
    def current_size(cls, artifact: Artifact, path_in_artifact: str) -> int:
        """
        Return current file size.

        The current file size might be smaller than the expected size of the
        file if the file has not finished being uploaded.

        Raise ValueError if path_in_artifact does not exist in Artifact or
        if there's no FileUpload object for the specific File.
        """
        try:
            file_in_artifact = FileInArtifact.objects.get(
                artifact=artifact, path=path_in_artifact
            )
        except FileInArtifact.DoesNotExist:
            raise ValueError(
                f'No FileInArtifact for Artifact {artifact.id} '
                f'and path "{path_in_artifact}"'
            )

        try:
            file_upload = FileUpload.objects.get(
                file_in_artifact=file_in_artifact
            )
        except FileUpload.DoesNotExist:
            raise ValueError(
                f"No FileUpload for FileInArtifact {file_in_artifact.id}"
            )

        try:
            size = file_upload.absolute_file_path().stat().st_size
        except FileNotFoundError:
            # Nothing has been written to disk yet.
            size = 0

        return size

    def delete(self, *args, **kwargs):
        """Schedule deletion of the file in the store."""
        file_path = self.absolute_file_path()

        result = super().delete(*args, **kwargs)

        # If this method is called from a transaction: transaction.on_commit()
        # will call file_path.unlink when the most outer transaction is
        # committed.
        #
        # In the case that the code is running without a transaction:
        # the file_path.unlink will happen now.
        #
        # It's important that file_path.unlink is called only if the
        # DB is updated with the deletion. Otherwise, the file could be
        # deleted from the store but still referenced from the DB.
        transaction.on_commit(partial(file_path.unlink, missing_ok=True))

        return result

    def absolute_file_path(self) -> Path:
        """
        Return the absolute file path of the file.

        The files are stored in settings.DEBUSINE_UPLOAD_DIRECTORY.
        """
        return Path(settings.DEBUSINE_UPLOAD_DIRECTORY) / self.path

    def __str__(self) -> str:
        """Return basic information."""
        return f"{self.id}"
class ArtifactRelation(models.Model):
    """Model relations between artifacts."""

    class Relations(models.TextChoices):
        EXTENDS = "extends", _("Extends")
        RELATES_TO = "relates-to", _("Relates to")
        BUILT_USING = "built-using", _("Built using")

    artifact = models.ForeignKey(
        Artifact, on_delete=models.PROTECT, related_name="relations"
    )
    target = models.ForeignKey(
        Artifact, on_delete=models.PROTECT, related_name="targeted_by"
    )
    type = models.CharField(max_length=11, choices=Relations.choices)

    def __str__(self) -> str:
        """Return str for the object."""
        return f"{self.artifact.id} {self.type} {self.target.id}"

    class Meta:
        constraints = [
            UniqueConstraint(
                fields=["artifact", "target", "type"],
                name="ArtifactRelations_must_be_unique",
            )
        ]
class User(AbstractUser):
    """Debusine user."""

    # Tightens AbstractUser's email field: one account per email address.
    email = models.EmailField(unique=True)
class NotificationChannel(models.Model):
    """Model to store notification configuration."""

    class Methods(models.TextChoices):
        EMAIL = "email", _("Email")

    # JSON schema that `data` must satisfy, per notification method.
    data_jsonschemas = {
        Methods.EMAIL: {
            "type": "object",
            "properties": {
                "from": {"type": "string", "format": "email"},
                "to": {
                    "type": "array",
                    "items": {"type": "string", "format": "email"},
                },
                "cc": {
                    "type": "array",
                    "items": {"type": "string", "format": "email"},
                },
                "subject": {"type": "string"},
            },
            "additionalProperties": False,
            "required": ["from", "to"],
        }
    }

    name = models.CharField(
        max_length=20,
        unique=True,
    )
    method = models.CharField(max_length=10, choices=Methods.choices)
    data = models.JSONField(default=dict, blank=True)

    def clean(self):
        """
        Ensure that data is valid for the specific method.

        :raise ValidationError: for invalid data.
        """
        try:
            jsonschema.validate(self.data, self.data_jsonschemas[self.method])
        except jsonschema.exceptions.ValidationError as exc:
            raise ValidationError(
                f"NotificationChannel data is not valid: {exc}"
            ) from exc

        return super().clean()

    def save(self, *args, **kwargs):
        """Run validators and save the instance."""
        self.full_clean()
        return super().save(*args, **kwargs)

    def __str__(self):
        """Return name."""
        return self.name