# Copyright 2019, 2021-2022 The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.
"""Data models for the db application."""
import copy
import hashlib
import logging
import os
import secrets
from datetime import datetime, timedelta
from functools import cached_property, partial
from pathlib import Path
from typing import Any, Optional, TYPE_CHECKING
from django.conf import settings
from django.contrib.auth.models import AbstractUser
from django.contrib.postgres.constraints import ExclusionConstraint
from django.contrib.postgres.fields import RangeOperators
from django.core.exceptions import FieldError, ValidationError
from django.core.validators import (
MaxLengthValidator,
MinLengthValidator,
)
from django.db import IntegrityError, models, transaction
from django.db.models import (
CheckConstraint,
Count,
F,
JSONField,
Q,
QuerySet,
UniqueConstraint,
)
from django.db.models.functions import Coalesce
from django.utils import timezone
from django.utils.text import slugify
import jsonpath_rw
from debusine.artifacts import LocalArtifact
from debusine.db.constraints import JsonDataUniqueConstraint
from debusine.server import notifications
from debusine.server.file_backend.models import (
ExternalDebianSuiteBackendConfiguration,
LocalFileBackendConfiguration,
)
from debusine.server.workflows.models import TaskWorkflowData
from debusine.tasks import BaseTask, TaskConfigError
from debusine.tasks.models import (
ActionTypes,
EventReaction,
EventReactions,
NotificationDataEmail,
TaskTypes,
)
from debusine.utils import calculate_hash
if TYPE_CHECKING:
from django_stubs_ext.db.models import TypedModelMeta
from debusine.server.collections import CollectionManagerInterface
from debusine.server.file_backend.local import LocalFileBackend
else:
TypedModelMeta = object
logger = logging.getLogger(__name__)
class TokenManager(models.Manager["Token"]):
"""Manager for Token model."""
    def get_tokens(
self, username: str | None = None, key: str | None = None
) -> QuerySet["Token"]:
"""
Return all the tokens filtered by a specific owner and/or token.
To avoid filtering by owner or token set them to None
"""
tokens = self.get_queryset()
if username:
tokens = tokens.filter(user__username=username)
if key:
token_hash = hashlib.sha256(key.encode()).hexdigest()
tokens = tokens.filter(hash=token_hash)
return tokens
    def get_token_or_none(self, token_key: str) -> Optional["Token"]:
"""Return the token with token_key or None."""
assert isinstance(token_key, str)
token_hash = hashlib.sha256(token_key.encode()).hexdigest()
try:
return self.select_related('worker').get(hash=token_hash)
except Token.DoesNotExist:
return None
class Token(models.Model):
"""
Database model of a token.
    A token contains a key and other related data. It's used as a shared
    secret between the debusine server and clients (workers).
    This token model is very similar to rest_framework.authtoken.models.Token.
    The main difference is that debusine's token owner is a nullable foreign
    key to a user, while the rest_framework owner is a OneToOne foreign key.
Database-wise we don't store the token itself, but a hash of the token.
:py:func:`TokenManager.get_token_or_none` can be used to check a provided
token key against the database.
"""
hash = models.CharField(
max_length=64,
unique=True,
verbose_name='Hexadecimal hash, length is 64 chars',
validators=[MaxLengthValidator(64), MinLengthValidator(64)],
)
user = models.ForeignKey(
settings.AUTH_USER_MODEL,
null=True,
blank=True,
on_delete=models.PROTECT,
)
comment = models.CharField(
max_length=100,
default='',
verbose_name='Reason that this token was created',
blank=True,
)
created_at = models.DateTimeField(auto_now_add=True)
enabled = models.BooleanField(default=False)
key: str
    def save(self, *args, **kwargs) -> None:
        """Save the token. If it's a new token, generate a key."""
if not self.hash:
self.key = self._generate_key()
self.hash = self._generate_hash(self.key)
super().save(*args, **kwargs)
    def enable(self) -> None:
"""Enable the token and save it."""
self.enabled = True
self.save()
    def disable(self) -> None:
"""Disable the token and save it."""
self.enabled = False
self.save()
notifications.notify_worker_token_disabled(self)
def __str__(self) -> str:
"""Return the hash of the Token."""
return self.hash
@classmethod
def _generate_key(cls) -> str:
"""Create and return a key."""
return secrets.token_hex(32)
@classmethod
def _generate_hash(cls, secret: str) -> str:
"""Hash the given secret."""
return hashlib.sha256(secret.encode()).hexdigest()
objects = TokenManager()
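# Illustrative sketch (comments only, not executed; ``some_user`` is an
# assumed existing User): only the SHA-256 hash of a token key is stored,
# so the plaintext key is available only on the instance that created it.
#
#     token = Token(user=some_user)
#     token.save()    # generates token.key and stores its hash
#     token.enable()
#     Token.objects.get_token_or_none(token.key)  # -> token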
class WorkerManager(models.Manager["Worker"]):
"""Manager for Worker model."""
    def connected(self) -> QuerySet["Worker"]:
"""Return connected workers."""
return Worker.objects.filter(connected_at__isnull=False).order_by(
'connected_at'
)
    def waiting_for_work_request(self) -> QuerySet["Worker"]:
"""
Return workers that can be assigned a new work request.
The workers with fewer associated pending or running work requests
than their concurrency level could take more work right now and are
thus waiting for a work request.
        The worker's token must be enabled.
"""
running_work_request_count = Count(
'assigned_work_requests',
filter=Q(
assigned_work_requests__status__in=[
WorkRequest.Statuses.RUNNING,
WorkRequest.Statuses.PENDING,
]
),
)
workers = (
Worker.objects.filter(connected_at__isnull=False)
.order_by('connected_at')
.annotate(count_running=running_work_request_count)
.filter(count_running__lt=F("concurrency"))
.filter(Q(internal=True) | Q(token__enabled=True))
)
return workers
@staticmethod
def _generate_unique_name(name: str, counter: int) -> str:
"""Return name slugified adding "-counter" if counter != 1."""
new_name = slugify(name.replace('.', '-'))
if counter != 1:
new_name += f'-{counter}'
return new_name
    @classmethod
def create_with_fqdn(cls, fqdn: str, token: Token) -> "Worker":
"""Return a new Worker with its name based on fqdn, with token."""
counter = 1
while True:
name = cls._generate_unique_name(fqdn, counter)
try:
with transaction.atomic():
return Worker.objects.create(
name=name, token=token, registered_at=timezone.now()
)
except IntegrityError:
counter += 1
    @classmethod
    def get_or_create_celery(cls) -> "Worker":
        """Return the Celery task queue Worker, creating it if needed."""
try:
return Worker.objects.get(name="celery", internal=True)
except Worker.DoesNotExist:
return Worker.objects.create(
name="celery", internal=True, registered_at=timezone.now()
)
    def get_worker_by_token_key_or_none(
self, token_key: str
) -> Optional["Worker"]:
"""Return a Worker identified by its associated secret token."""
try:
token_hash = hashlib.sha256(token_key.encode()).hexdigest()
return Worker.objects.get(token__hash=token_hash)
except Worker.DoesNotExist:
return None
    def get_worker_or_none(self, worker_name: str) -> Optional["Worker"]:
"""Return the worker with worker_name or None."""
try:
return self.get(name=worker_name)
except Worker.DoesNotExist:
return None
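# Illustrative sketch (comments only, not executed; ``token_a`` and
# ``token_b`` are assumed Token instances): create_with_fqdn() slugifies
# the FQDN and, on an IntegrityError from the unique name constraint,
# retries with a "-2", "-3", ... suffix.
#
#     Worker.objects.create_with_fqdn("host.example.org", token_a).name
#     # -> "host-example-org"
#     Worker.objects.create_with_fqdn("host.example.org", token_b).name
#     # -> "host-example-org-2"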
class Worker(models.Model):
"""Database model of a worker."""
name = models.SlugField(
unique=True,
        help_text='Human-readable name of the worker, based on the FQDN',
)
registered_at = models.DateTimeField()
connected_at = models.DateTimeField(blank=True, null=True)
# This is the token used by the Worker to authenticate
# Users have their own tokens - this is specific to a single worker.
token = models.OneToOneField(
Token, null=True, on_delete=models.PROTECT, related_name="worker"
)
static_metadata = JSONField(default=dict, blank=True)
dynamic_metadata = JSONField(default=dict, blank=True)
dynamic_metadata_updated_at = models.DateTimeField(blank=True, null=True)
internal = models.BooleanField(default=False)
concurrency = models.PositiveIntegerField(
default=1,
help_text="Number of tasks this worker can run simultaneously",
)
class Meta(TypedModelMeta):
constraints = [
# Non-internal workers must have a token.
CheckConstraint(
name="%(app_label)s_%(class)s_internal_or_token",
check=Q(internal=True) | Q(token__isnull=False),
)
]
    def mark_disconnected(self) -> None:
"""Update and save relevant Worker fields after disconnecting."""
self.connected_at = None
self.save()
    def mark_connected(self) -> None:
"""Update and save relevant Worker fields after connecting."""
self.connected_at = timezone.now()
self.save()
    def connected(self) -> bool:
"""Return True if the Worker is connected."""
return self.connected_at is not None
    def is_busy(self) -> bool:
"""
Return True if the Worker is busy with work requests.
A Worker is busy if it has as many running or pending work requests
as its concurrency level.
"""
return (
WorkRequest.objects.running(worker=self)
| WorkRequest.objects.pending(worker=self)
).count() >= self.concurrency
def __str__(self) -> str:
"""Return the id and name of the Worker."""
return f"Id: {self.id} Name: {self.name}"
objects = WorkerManager()
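# Illustrative sketch (comments only, not executed; ``worker`` is an
# assumed Worker): a worker is busy once its pending plus running work
# requests reach its concurrency level, and
# WorkerManager.waiting_for_work_request() excludes such workers.
#
#     worker.concurrency   # 1 by default
#     worker.is_busy()     # True once one work request is pending/running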
class WorkflowTemplate(models.Model):
"""
Database model for Workflow templates.
Workflow templates contain the information needed to instantiate a
workflow, with a Workflow orchestrator and mandatory parameters.
"""
class Meta(TypedModelMeta):
constraints = [
UniqueConstraint(
fields=["name", "workspace"],
name="%(app_label)s_%(class)s_unique_name_workspace",
),
]
name = models.CharField(max_length=255)
workspace = models.ForeignKey("Workspace", on_delete=models.PROTECT)
task_name = models.CharField(
max_length=100, verbose_name='Name of the Workflow orchestrator class'
)
task_data = JSONField(default=dict, blank=True)
priority = models.IntegerField(
default=0,
help_text="Base priority for work requests created from this template",
)
    def clean(self):
        """
        Ensure that task_name and task_data are valid.
:raise ValidationError: for invalid data.
"""
# Import here to prevent circular imports
from debusine.server.workflows import Workflow
if not isinstance(self.task_data, dict):
raise ValidationError(
{"task_data": "task data must be a dictionary"}
)
# Instantiate the orchestrator and use it to validate task_data
workflow_cls = Workflow.from_name(self.task_name)
try:
workflow_cls.validate_template_data(self.task_data)
        except Exception as exc:
            raise ValidationError({"task_data": str(exc)}) from exc
class _WorkRequestStatuses(models.TextChoices):
"""Choices for WorkRequest.status."""
PENDING = "pending", "Pending"
RUNNING = "running", "Running"
COMPLETED = "completed", "Completed"
ABORTED = "aborted", "Aborted"
BLOCKED = "blocked", "Blocked"
class WorkRequestManager(models.Manager["WorkRequest"]):
"""Manager for WorkRequest model."""
    def create_workflow_callback(
self,
*,
parent: "WorkRequest",
step: str,
display_name: str | None = None,
status: _WorkRequestStatuses | None = None,
):
"""
Create a workflow callback WorkRequest.
A parent is always required, as a callback only makes sense as part of
a workflow.
:param step: string set by the workflow to identify this callback
"""
return self.create(
workspace=parent.workspace,
parent=parent,
created_by=parent.created_by,
status=status or WorkRequest.Statuses.BLOCKED,
task_type=TaskTypes.INTERNAL,
task_name="workflow",
task_data={},
priority_base=parent.priority_effective,
workflow_data_json=TaskWorkflowData(
step=step, display_name=display_name or step
).dict(exclude_unset=True),
)
    def create_workflow(
self,
*,
template: WorkflowTemplate,
data: dict[str, Any],
created_by: "User",
parent: Optional["WorkRequest"] = None,
status: _WorkRequestStatuses | None = None,
) -> "WorkRequest":
"""Create a workflow from a template and user-provided data."""
# Import here to prevent circular imports
from debusine.server.workflows import Workflow
        # Look up the orchestrator
workflow_cls = Workflow.from_name(template.task_name)
# Merge user provided data into template data
task_data = workflow_cls.build_workflow_data(template.task_data, data)
# Build the WorkRequest
work_request = self.create(
workspace=template.workspace,
parent=parent,
created_by=created_by,
status=status or WorkRequest.Statuses.BLOCKED,
task_type=TaskTypes.WORKFLOW,
task_name=template.task_name,
task_data=task_data,
priority_base=template.priority,
)
try:
# Instantiate the orchestrator
orchestrator = workflow_cls(work_request)
# Thorough input validation
orchestrator.validate_input()
# Populate the workflow with initial work requests
orchestrator.populate()
except Exception:
# TODO: Can we have a better except than something which catches
# also things like SyntaxError?
# TODO: How can we store the error so that the user can see it?
logger.exception("Cannot start a workflow")
work_request.status = WorkRequest.Statuses.RUNNING
work_request.mark_completed(WorkRequest.Results.ERROR)
return work_request
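    # Illustrative sketch (comments only, not executed; template, data and
    # user are assumed objects): user data is merged over template data,
    # then the orchestrator populates child work requests; if the
    # orchestrator raises, the workflow completes with Results.ERROR.
    #
    #     wr = WorkRequest.objects.create_workflow(
    #         template=template, data=data, created_by=user
    #     )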
    def create_synchronization_point(
self,
*,
parent: "WorkRequest",
status: _WorkRequestStatuses | None = None,
):
"""
Create a synchronization point WorkRequest.
A parent is always required, as a synchronization point only makes
sense as part of a workflow.
"""
return self.create(
workspace=parent.workspace,
parent=parent,
created_by=parent.created_by,
status=status or WorkRequest.Statuses.BLOCKED,
task_type=TaskTypes.INTERNAL,
task_name="synchronization_point",
task_data={},
priority_base=parent.priority_effective,
)
    def pending(
self, exclude_assigned: bool = False, worker: Worker | None = None
) -> QuerySet["WorkRequest"]:
"""
Return a QuerySet of tasks in WorkRequest.Statuses.PENDING status.
QuerySet is ordered by descending effective priority, then by
created_at.
        If exclude_assigned=True, filter out pending work requests that are
        already assigned to a worker; if worker is given, return only work
        requests for that worker.
PENDING is the default status of a task on creation.
"""
if exclude_assigned and worker is not None:
raise ValueError("Cannot exclude_assigned and filter by worker")
qs = WorkRequest.objects.filter(status=WorkRequest.Statuses.PENDING)
if exclude_assigned:
qs = qs.exclude(worker__isnull=False)
if worker is not None:
qs = qs.filter(worker=worker)
qs = qs.order_by(
(F("priority_base") + F("priority_adjustment")).desc(), "created_at"
)
return qs
    def running(self, worker: Worker | None = None) -> QuerySet["WorkRequest"]:
"""Return a QuerySet of tasks in running status."""
qs = WorkRequest.objects.filter(status=WorkRequest.Statuses.RUNNING)
if worker is not None:
qs = qs.filter(worker=worker)
return qs
    def completed(self) -> QuerySet["WorkRequest"]:
"""Return a QuerySet of tasks in completed status."""
return WorkRequest.objects.filter(status=WorkRequest.Statuses.COMPLETED)
    def aborted(self) -> QuerySet["WorkRequest"]:
"""Return a QuerySet of tasks in aborted status."""
return WorkRequest.objects.filter(status=WorkRequest.Statuses.ABORTED)
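# Illustrative sketch (comments only, not executed): the scheduler picks
# pending work requests by descending effective priority (priority_base +
# priority_adjustment), breaking ties by created_at; this matches the
# partial index declared in WorkRequest.Meta.
#
#     WorkRequest.objects.pending(exclude_assigned=True).first()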
class WorkRequest(models.Model):
"""
Database model of a request to execute a task.
    Time-consuming operations are offloaded to Workers, using
    Artifacts (and associated Files) as input and output.
    The submission API needs to check whether the request is valid using
    ontological rules - e.g. whether the specified distribution for
    a build task exists.
    The status of tasks is not exposed in the admin interface, to prevent
    runaway changes whilst the scheduler process is running.
    The WorkRequest uses the non-Django tasks module to check
    whether a task can run on a particular worker.
WorkRequest State Machine
=========================
New WorkRequest database entries default to
``WorkRequest.Statuses.PENDING``.
    Once the WorkRequest is assigned to a worker and starts running,
    the status is changed to ``WorkRequest.Statuses.RUNNING``.
    If the WorkRequest is aborted, its status is set to
    ``WorkRequest.Statuses.ABORTED``.
    If the task finishes on the Worker, the WorkRequest status will be
    ``WorkRequest.Statuses.COMPLETED`` and a WorkRequest.Result is then set:
    ``WorkRequest.Results.SUCCESS``, ``WorkRequest.Results.FAILURE`` or
    ``WorkRequest.Results.ERROR``.
.. graphviz::
digraph {
Statuses_PENDING -> Statuses_RUNNING -> Statuses_COMPLETED;
Statuses_PENDING -> Statuses_COMPLETED;
Statuses_PENDING -> Statuses_ABORTED;
Statuses_PENDING -> Statuses_RUNNING -> Statuses_ABORTED;
}
``WorkRequest.started_at`` is set when the WorkRequest moves from
``WorkRequest.Statuses.PENDING`` to ``WorkRequest.Statuses.RUNNING``.
    ``WorkRequest.completed_at`` is set when the WorkRequest moves from
``WorkRequest.Statuses.RUNNING`` to ``WorkRequest.Statuses.COMPLETED``.
"""
objects = WorkRequestManager()
Statuses = _WorkRequestStatuses
    class Results(models.TextChoices):
NONE = "", ""
SUCCESS = "success", "Success"
FAILURE = "failure", "Failure"
ERROR = "error", "Error"
    class UnblockStrategy(models.TextChoices):
DEPS = "deps", "Dependencies have completed"
MANUAL = "manual", "Manually unblocked"
workspace = models.ForeignKey("Workspace", on_delete=models.PROTECT)
created_at = models.DateTimeField(auto_now_add=True)
started_at = models.DateTimeField(blank=True, null=True)
completed_at = models.DateTimeField(blank=True, null=True)
created_by = models.ForeignKey("User", on_delete=models.PROTECT)
status = models.CharField(
max_length=9,
choices=Statuses.choices,
default=Statuses.PENDING,
editable=False,
)
result = models.CharField(
max_length=7,
choices=Results.choices,
default=Results.NONE,
editable=False,
)
    # Any one work request can be assigned to only one worker,
    # even if the worker can handle multiple work requests.
worker = models.ForeignKey(
Worker,
null=True,
blank=True,
on_delete=models.CASCADE,
related_name="assigned_work_requests",
)
task_type = models.CharField(
max_length=8,
choices=TaskTypes.choices,
default=TaskTypes.WORKER,
editable=False,
verbose_name="Type of task to execute",
)
task_name = models.CharField(
max_length=100, verbose_name='Name of the task to execute'
)
task_data = JSONField(default=dict, blank=True)
priority_base = models.IntegerField(
default=0, help_text="Base priority of this work request"
)
priority_adjustment = models.IntegerField(
default=0,
help_text=(
"Administrator adjustment to the priority of this work request"
),
)
# Workflows
unblock_strategy = models.CharField(
max_length=6,
choices=UnblockStrategy.choices,
default=UnblockStrategy.DEPS,
editable=False,
)
dependencies = models.ManyToManyField(
"self", symmetrical=False, related_name="reverse_dependencies"
)
parent = models.ForeignKey(
"self",
# WorkRequests are only deleted through expiration, use CASCADE
on_delete=models.CASCADE,
blank=True,
null=True,
)
workflow_data_json = JSONField(
default=dict, blank=True, db_column="workflow_data"
)
event_reactions_json = JSONField(
default=dict, blank=True, db_column="event_reactions"
)
class Meta(TypedModelMeta):
indexes = [
# Handles the main scheduler query.
models.Index(
(F("priority_base") + F("priority_adjustment")).desc(),
"created_at",
name="%(app_label)s_%(class)s_pending_idx",
condition=Q(status=_WorkRequestStatuses.PENDING),
),
# Handles queries from workers.
models.Index(
"worker",
name="%(app_label)s_%(class)s_worker_idx",
condition=Q(
status__in=(
_WorkRequestStatuses.PENDING,
_WorkRequestStatuses.RUNNING,
)
),
),
]
permissions = [
(
"manage_workrequest_priorities",
"Can set positive priority adjustments on work requests",
)
]
def __str__(self) -> str:
"""Return the id of the WorkRequest."""
return str(self.id)
@property
def workflow_data(self) -> TaskWorkflowData | None:
"""Access workflow_data_json as a python structure."""
if self.workflow_data_json:
return TaskWorkflowData(**self.workflow_data_json)
else:
return None
@workflow_data.setter
def workflow_data(self, value: TaskWorkflowData | None) -> None:
"""Set workflow_data_json from a python structure."""
if value is None:
self.workflow_data_json = {}
else:
self.workflow_data_json = value.dict(
exclude_unset=True,
)
@property
def event_reactions(self) -> EventReactions:
"""Access event_reactions_json as a pydantic structure."""
return EventReactions(**self.event_reactions_json)
@event_reactions.setter
def event_reactions(self, value: EventReactions) -> None:
"""Set event_reactions_json from a pydantic structure."""
self.event_reactions_json = value.dict()
    def clean(self):
"""
Ensure that task data is valid for this task name.
:raise ValidationError: for invalid data.
"""
if not isinstance(self.task_data, dict):
raise ValidationError(
{"task_data": "task data must be a dictionary"}
)
match self.task_type:
case TaskTypes.WORKER | TaskTypes.SERVER:
try:
task_cls = BaseTask.class_from_name(
self.task_type, self.task_name
)
except (KeyError, ValueError) as e:
raise ValidationError(
{
"task_name": f"{self.task_name}:"
f" invalid {self.task_type} task name"
}
) from e
try:
task_cls(task_data=self.task_data)
except TaskConfigError as e:
raise ValidationError(
{
"task_data": f"invalid {self.task_type}"
f" task data: {e}"
}
) from e
case TaskTypes.WORKFLOW:
# Import here to prevent circular imports
from debusine.server.workflows import Workflow
try:
workflow_cls = Workflow.from_name(self.task_name)
except (KeyError, ValueError) as e:
raise ValidationError(
{
"task_name": f"{self.task_name}:"
f" invalid workflow name"
}
) from e
try:
workflow_cls(self)
except TaskConfigError as e:
raise ValidationError(
{"task_data": f"invalid workflow data: {e}"}
) from e
# TODO: do we want to run expensive tests here
# (Workflow.validate_input), like looking up the types of
# references artifacts to validate them?
case TaskTypes.INTERNAL:
if self.task_name not in ("synchronization_point", "workflow"):
raise ValidationError(
{
"task_name": f"{self.task_name}:"
" invalid task name for internal task"
}
)
# Without this pass, python coverage is currently unable to
# detect that code does flow through here
pass
case _:
raise NotImplementedError(
f"task type {self.task_type} not yet supported"
)
    def mark_running(self) -> bool:
"""Worker has begun executing the task."""
if self.worker is None:
logger.debug(
"Cannot mark WorkRequest %s as running: it does not have "
"an assigned worker",
self.pk,
)
return False
if self.status == self.Statuses.RUNNING:
# It was already running - nothing to do
return True
if self.status != self.Statuses.PENDING:
logger.debug(
"Cannot mark as running - current status %s", self.status
)
return False
work_requests_running_for_worker = WorkRequest.objects.running(
worker=self.worker
)
        # There is a possible race condition here. This check (and other
        # checks in this class) currently helps to avoid development
        # mistakes, not to enforce full database integrity.
if work_requests_running_for_worker.count() >= self.worker.concurrency:
logger.debug(
"Cannot mark WorkRequest %s as running - the assigned worker "
"%s is running too many other WorkRequests: %s",
self.pk,
self.worker,
list(work_requests_running_for_worker.order_by("id")),
)
return False
self.started_at = timezone.now()
self.status = self.Statuses.RUNNING
self.save()
logger.debug("Marked WorkRequest %s as running", self.pk)
return True
    def mark_completed(self, result: "WorkRequest.Results") -> bool:
"""Worker has finished executing the task."""
if self.status not in (self.Statuses.PENDING, self.Statuses.RUNNING):
logger.debug(
"Cannot mark WorkRequest %s as completed: current status is %s",
self.pk,
self.status,
)
return False
self.result = result
self.completed_at = timezone.now()
self.status = self.Statuses.COMPLETED
self.save()
logger.debug("Marked WorkRequest %s as completed", self.pk)
# mark dependencies ready before sending notification
self.unblock_reverse_dependencies()
self.process_event_reactions()
return True
    def process_event_reactions(self):
"""Process list of actions to perform on completion."""
actions = self.get_triggered_actions()
notifications.notify_work_request_completed(
self, actions[ActionTypes.SEND_NOTIFICATION]
)
self.process_update_collection_with_artifacts(
actions[ActionTypes.UPDATE_COLLECTION_WITH_ARTIFACTS]
)
    def get_triggered_actions(self) -> dict[str, list[EventReaction]]:
"""Filter events to trigger, grouped by type."""
actions: dict[str, list[EventReaction]] = {
action: [] for action in ActionTypes
}
if self.result in (WorkRequest.Results.SUCCESS,):
for action in self.event_reactions.on_success:
action_type = action.action
actions[action_type].append(action)
elif self.result in (
WorkRequest.Results.FAILURE,
WorkRequest.Results.ERROR,
):
for action in self.event_reactions.on_failure:
action_type = action.action
actions[action_type].append(action)
else: # Results.NONE
pass
return actions
    def process_update_collection_with_artifacts(self, actions):
"""Update collection following event_reactions."""
# local import to avoid circular dependency
from debusine.server.collections import (
CollectionManagerInterface,
ItemAdditionError,
ItemRemovalError,
)
for update in actions:
try:
collection = Collection.objects.get(
workspace=self.workspace, id=update.collection_id
)
except Collection.DoesNotExist:
logger.exception(
"Invalid update-collection-with-artifacts"
" collection_id %s in WorkRequest %s",
update.collection_id,
self.pk,
)
continue
manager = CollectionManagerInterface.get_manager_for(collection)
try:
artifacts_to_add = self.artifact_set.filter(
**update.artifact_filters
)
except FieldError:
logger.exception(
"Invalid update-collection-with-artifacts"
" artifact_filters in WorkRequest %s",
self.pk,
)
continue
for artifact in artifacts_to_add:
item_name = None
if update.name_template is not None:
try:
item_name = CollectionItem.expand_name(
update.name_template,
update.variables,
artifact,
)
except (KeyError, ValueError):
logger.exception(
"Invalid update-collection-with-artifacts"
" variables in WorkRequest %s",
self.pk,
)
continue
try:
manager.replace_or_add_artifact(
artifact,
user=self.created_by,
name=item_name,
)
except (ItemAdditionError, ItemRemovalError, LookupError):
logger.exception(
"Cannot replace or add artifact %s to collection %s"
" from WorkRequest %s",
artifact,
collection,
self.pk,
)
    def mark_pending(self) -> bool:
        """Mark the work request as pending (ready for execution)."""
        if self.status != self.Statuses.BLOCKED:
logger.debug(
"Cannot mark WorkRequest %s as pending: current status is %s",
self.pk,
self.status,
)
return False
self.status = self.Statuses.PENDING
self.save()
logger.debug("Marked WorkRequest %s as pending", self.pk)
return True
    def unblock_reverse_dependencies(self):
"""Unblock reverse dependencies."""
# Shortcuts to keep line length sane
r = WorkRequest.Results
s = WorkRequest.Statuses
if self.result == r.SUCCESS:
for rdep in self.reverse_dependencies.filter(status=s.BLOCKED):
if rdep.unblock_strategy == WorkRequest.UnblockStrategy.DEPS:
other_deps = rdep.dependencies.filter(~Q(id=self.id))
if other_deps.filter(~Q(status=s.COMPLETED)).count() == 0:
rdep.mark_pending()
else: # failures
# TODO: check future self.workflow.allow_failure
for rdep in self.reverse_dependencies.filter(status=s.BLOCKED):
rdep.mark_aborted()
    def mark_aborted(self) -> bool:
        """
        Worker has aborted the task after request from UI.
        Task will typically be in PENDING or RUNNING status.
        """
        previous_status = self.status
        self.completed_at = timezone.now()
        self.status = self.Statuses.ABORTED
        self.save()
        logger.debug(
            "Marked WorkRequest %s as aborted (from status %s)",
            self.pk,
            previous_status,
        )
return True
    def assign_worker(self, worker: Worker | None) -> None:
"""Assign worker and save it."""
self.worker = worker
self.save()
notifications.notify_work_request_assigned(self)
@property
def duration(self) -> float | None:
"""Return duration in seconds between started_at and completed_at."""
if self.started_at and self.completed_at:
return (self.completed_at - self.started_at).total_seconds()
else:
return None
@property
def priority_effective(self) -> int:
"""The effective priority of this work request."""
return self.priority_base + self.priority_adjustment
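# Illustrative lifecycle sketch (comments only, not executed; workspace,
# user, worker and the "noop" task name are assumptions), following the
# state machine documented on WorkRequest:
#
#     wr = WorkRequest.objects.create(
#         workspace=workspace, created_by=user, task_name="noop"
#     )
#     wr.assign_worker(worker)                        # notifies the worker
#     wr.mark_running()                               # PENDING -> RUNNING
#     wr.mark_completed(WorkRequest.Results.SUCCESS)  # RUNNING -> COMPLETED
#     # mark_completed() also unblocks reverse dependencies and processes
#     # event reactions (notifications, collection updates).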
class File(models.Model):
"""
Database model of a file.
Model different attributes of the file.
    From outside the class use the ``hash_digest`` property and do not use
    the ``sha256`` field. This allows the hash algorithm to be changed, if
    ever needed, by modifying only this class and not its users.
"""
current_hash_algorithm = "sha256"
    sha256 = models.BinaryField(
        # hashlib's digest_size is already in bytes (32 for SHA-256)
        max_length=hashlib.new(current_hash_algorithm).digest_size,
help_text=f"{current_hash_algorithm} of the file",
)
size = models.PositiveBigIntegerField(help_text="Size in bytes of the file")
class Meta(TypedModelMeta):
constraints = [
UniqueConstraint(
fields=["sha256", "size"],
name="%(app_label)s_%(class)s_unique_sha256_size",
),
CheckConstraint(
name="%(app_label)s_%(class)s_sha256_not_empty",
check=~Q(sha256=b""),
),
]
def __str__(self) -> str:
"""Return basic information of File."""
return (
f"id: {self.id} "
f"{self.current_hash_algorithm}: {self.hash_digest.hex()} "
f"size: {self.size}"
)
    @classmethod
    def from_local_path(cls, local_path: Path) -> "File":
        """Return a File (created or retrieved) for the file at local_path."""
kwargs: dict[str, Any] = {
File.current_hash_algorithm: cls.calculate_hash(local_path)
}
file, created = File.objects.get_or_create(
**kwargs, size=os.stat(local_path).st_size
)
return file
@property
def hash_digest(self) -> bytes:
"""
Return the default hash digest of the File.
Use this property instead of the field to allow the algorithm
to be changed in the future.
"""
return bytes(getattr(self, self.current_hash_algorithm))
@hash_digest.setter
def hash_digest(self, value: bytes) -> None:
setattr(self, self.current_hash_algorithm, value)
    @classmethod
def calculate_hash(cls, file_path: Path) -> bytes:
"""Return hash for the file."""
return calculate_hash(file_path, cls.current_hash_algorithm)
    @classmethod
def get_or_create(cls, *, hash_digest: bytes, size: int):
"""Call File.objects.get_or_create with the correct parameters."""
kwargs: dict[str, Any] = {cls.current_hash_algorithm: hash_digest}
return cls.objects.get_or_create(**kwargs, size=size)
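# Illustrative sketch (comments only, not executed; the path and size are
# assumptions): files are deduplicated on (sha256, size), and hash_digest
# hides the concrete algorithm from callers.
#
#     digest = File.calculate_hash(Path("/tmp/example"))
#     fileobj, created = File.get_or_create(hash_digest=digest, size=1024)
#     fileobj.hash_digest == digest  # True, whatever the algorithm is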
class _FileStoreBackendChoices(models.TextChoices):
"""Enumerate all the Backend choices."""
LOCAL = "Local", "Local"
EXTERNAL_DEBIAN_SUITE = "ExternalDebianSuite", "ExternalDebianSuite"
DEFAULT_FILE_STORE_NAME = "Default"
def default_file_store() -> "FileStore":
"""Return the default file store."""
return FileStore.objects.get(name=DEFAULT_FILE_STORE_NAME)
class FileStore(models.Model):
"""
Database model of a FileStore.
FileStore has files attached to it.
"""
    # BackendChoices is defined outside FileStore so that it remains
    # accessible during migrations. Historical models are used
# (https://docs.djangoproject.com/en/3.2/topics/migrations/#historical-models)
BackendChoices = _FileStoreBackendChoices
configuration_validators = {
BackendChoices.LOCAL: LocalFileBackendConfiguration,
BackendChoices.EXTERNAL_DEBIAN_SUITE: (
ExternalDebianSuiteBackendConfiguration
),
}
name = models.CharField(max_length=255, unique=True)
backend = models.CharField(max_length=255, choices=BackendChoices.choices)
configuration = models.JSONField(default=dict, blank=True)
files = models.ManyToManyField(File, through="db.FileInStore")
    @staticmethod
def default() -> "FileStore":
"""Return the default FileStore."""
return default_file_store()
def __str__(self) -> str:
"""Return basic information of FileStore."""
return f"Id: {self.id} Name: {self.name} Backend: {self.backend}"
    def clean(self):
"""
Ensure that data is valid for this backend type.
:raise ValidationError: for invalid data.
"""
if not isinstance(self.configuration, dict):
raise ValidationError(
{"configuration": "configuration must be a dictionary"}
)
try:
self.configuration_validators[self.backend](**self.configuration)
        except (TypeError, ValueError) as e:
            raise ValidationError(
                {"configuration": f"invalid file store configuration: {e}"}
            ) from e
    def get_backend_object(self) -> "LocalFileBackend": # noqa: F821
"""Instantiate the correct FileBackend and return it."""
if self.backend == _FileStoreBackendChoices.LOCAL:
from debusine.server.file_backend.local import LocalFileBackend
return LocalFileBackend(self)
raise NotImplementedError(
f"Cannot create FileStore for backend type: {self.backend}"
)
class FileInStore(models.Model):
    """
    Database model used as the "through" table between FileStore and File.
Keeps the relationship between stores, files and attached data.
"""
store = models.ForeignKey(FileStore, on_delete=models.PROTECT)
file = models.ForeignKey(File, on_delete=models.PROTECT)
data = models.JSONField(default=dict, blank=True)
class Meta(TypedModelMeta):
constraints = [
UniqueConstraint(
fields=["store", "file"],
name="%(app_label)s_%(class)s_unique_store_file",
)
]
def __str__(self) -> str:
"""Return basic information of FileInStore."""
return (
f"Id: {self.id} "
f"Store: {self.store.name} "
f"File: {self.file.hash_digest.hex()}"
)
class WorkspaceManager(models.Manager["Workspace"]):
"""Manager for Workspace model."""
    @classmethod
def create_with_name(cls, name: str) -> "Workspace":
"""Return a new Workspace with name and the default FileStore."""
return Workspace.objects.create(
name=name, default_file_store=FileStore.default()
)
DEFAULT_WORKSPACE_NAME = "System"
def default_workspace() -> "Workspace":
"""Return the default Workspace."""
return Workspace.objects.get(name=DEFAULT_WORKSPACE_NAME)
class Workspace(models.Model):
"""Workspace model."""
objects = WorkspaceManager()
name = models.CharField(max_length=255, unique=True)
default_file_store = models.ForeignKey(
FileStore, on_delete=models.PROTECT, related_name="default_workspaces"
)
other_file_stores = models.ManyToManyField(
FileStore, related_name="other_workspaces"
)
public = models.BooleanField(default=False)
default_expiration_delay = models.DurationField(
default=timedelta(0),
help_text="minimal time that a new artifact is kept in the"
" workspace before being expired",
)
    def is_file_in_workspace(self, fileobj: File) -> bool:
"""Return True if fileobj is in any store available for Workspace."""
file_stores = [self.default_file_store, *self.other_file_stores.all()]
for file_store in file_stores:
if file_store.fileinstore_set.filter(file=fileobj).exists():
return True
return False
def __str__(self) -> str:
"""Return basic information of Workspace."""
return f"Id: {self.id} Name: {self.name}"
class ArtifactManager(models.Manager["Artifact"]):
"""Manager for the Artifact model."""
    @classmethod
def create_from_local_artifact(
cls,
local_artifact: LocalArtifact[Any],
workspace: Workspace,
*,
created_by_work_request: WorkRequest | None = None,
) -> "Artifact":
"""Return a new Artifact based on a :class:`LocalArtifact`."""
file_store = workspace.default_file_store.get_backend_object()
artifact = Artifact.objects.create(
category=local_artifact.category,
workspace=workspace,
data=local_artifact.data.dict(),
created_by_work_request=created_by_work_request,
)
for artifact_path, local_path in local_artifact.files.items():
file = File.from_local_path(local_path)
file_store.add_file(local_path, fileobj=file)
FileInArtifact.objects.create(
artifact=artifact, path=artifact_path, file=file
)
return artifact
    def not_expired(self, at: datetime) -> QuerySet["Artifact"]:
"""
Return queryset with artifacts that have not expired.
:param at: datetime to check if the artifacts are not expired.
        :return: artifacts whose expire_at is None (they do not expire) or
          whose expire_at is after the given datetime.
"""
return (
self.get_queryset()
.annotate(
effective_expiration_delay=Coalesce(
"expiration_delay",
"workspace__default_expiration_delay",
)
)
.filter(
Q(effective_expiration_delay=timedelta(0))
| Q(
# https://github.com/typeddjango/django-stubs/issues/1548
created_at__gt=(
at - F("effective_expiration_delay") # type: ignore[operator] # noqa: E501
)
)
)
)
    def expired(self, at: datetime) -> QuerySet["Artifact"]:
"""
Return queryset with artifacts that have expired.
:param at: datetime to check if the artifacts are expired.
        :return: artifacts whose expire_at is on or before the given datetime.
"""
return (
self.get_queryset()
.annotate(
effective_expiration_delay=Coalesce(
"expiration_delay",
"workspace__default_expiration_delay",
)
)
.exclude(effective_expiration_delay=timedelta(0))
.filter(
# https://github.com/typeddjango/django-stubs/issues/1548
created_at__lte=(
at - F("effective_expiration_delay") # type: ignore[operator] # noqa: E501
)
)
)
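# Illustrative sketch (comments only, not executed): the effective delay is
# Coalesce(expiration_delay, workspace.default_expiration_delay), and a zero
# delay means "never expires"; not_expired() and expired() therefore
# partition the artifacts at any given instant.
#
#     now = timezone.now()
#     Artifact.objects.expired(at=now)      # candidates for cleanup
#     Artifact.objects.not_expired(at=now)  # everything else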
class Artifact(models.Model):
"""Artifact model."""
category = models.CharField(max_length=255)
workspace = models.ForeignKey(Workspace, on_delete=models.PROTECT)
files = models.ManyToManyField(File, through="db.FileInArtifact")
data = models.JSONField(default=dict, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
expiration_delay = models.DurationField(blank=True, null=True)
created_by = models.ForeignKey(
"User", blank=True, null=True, on_delete=models.PROTECT
)
created_by_work_request = models.ForeignKey(
WorkRequest, blank=True, null=True, on_delete=models.PROTECT
)
objects = ArtifactManager()
    def clean(self):
"""
Ensure that data is valid for this artifact category.
:raise ValidationError: for invalid data.
"""
if not isinstance(self.data, dict):
raise ValidationError({"data": "data must be a dictionary"})
try:
artifact_cls = LocalArtifact.class_from_category(self.category)
except ValueError as e:
raise ValidationError(
{"category": f"{self.category}: invalid artifact category"}
) from e
try:
artifact_cls.create_data(self.data)
except ValueError as e:
raise ValidationError(
{"data": f"invalid artifact data: {e}"}
) from e
    def effective_expiration_delay(self) -> timedelta:
"""Return expiration_delay, inherited if None."""
expiration_delay = self.expiration_delay
if self.expiration_delay is None: # inherit
expiration_delay = self.workspace.default_expiration_delay
return expiration_delay
@property
def expire_at(self) -> datetime | None:
"""Return computed expiration date."""
delay = self.effective_expiration_delay()
if delay == timedelta(0):
return None
return self.created_at + delay
    def expired(self, at: datetime) -> bool:
"""
Return True if this artifact has expired at a given datetime.
:param at: datetime to check if the artifact is expired.
:return bool: True if the artifact's expire_at is on or earlier than
the parameter at.
"""
expire_at = self.expire_at
if expire_at is None:
return False
return expire_at <= at
def __str__(self) -> str:
"""Return basic information of Artifact."""
return (
f"Id: {self.id} "
f"Category: {self.category} "
f"Workspace: {self.workspace.id}"
)
class FileInArtifact(models.Model):
"""File in artifact."""
artifact = models.ForeignKey(Artifact, on_delete=models.PROTECT)
path = models.CharField(max_length=500)
file = models.ForeignKey(File, on_delete=models.PROTECT)
class Meta(TypedModelMeta):
constraints = [
UniqueConstraint(
fields=["artifact", "path"],
name="%(app_label)s_%(class)s_unique_artifact_path",
),
]
def __str__(self) -> str:
"""Return basic information of FileInArtifact."""
return (
f"Id: {self.id} Artifact: {self.artifact.id} "
f"Path: {self.path} File: {self.file.id}"
)
class FileUpload(models.Model):
"""File that is being/has been uploaded."""
file_in_artifact = models.OneToOneField(
FileInArtifact, on_delete=models.PROTECT
)
path = models.CharField(
max_length=500,
help_text="Path in the uploads directory",
unique=True,
)
last_activity_at = models.DateTimeField(auto_now_add=True)
    @classmethod
def current_size(cls, artifact: Artifact, path_in_artifact: str) -> int:
"""
Return current file size.
The current file size might be smaller than the expected size of the
file if the file has not finished being uploaded.
Raise ValueError if path_in_artifact does not exist in Artifact or
if there's no FileUpload object for the specific File.
"""
try:
file_in_artifact = FileInArtifact.objects.get(
artifact=artifact, path=path_in_artifact
)
except FileInArtifact.DoesNotExist:
raise ValueError(
f'No FileInArtifact for Artifact {artifact.id} '
f'and path "{path_in_artifact}"'
)
try:
file_upload = FileUpload.objects.get(
file_in_artifact=file_in_artifact
)
except FileUpload.DoesNotExist:
raise ValueError(
f"No FileUpload for FileInArtifact {file_in_artifact.id}"
)
try:
size = file_upload.absolute_file_path().stat().st_size
except FileNotFoundError:
size = 0
return size
    def delete(self, *args, **kwargs):
"""Schedule deletion of the file in the store."""
file_path = self.absolute_file_path()
result = super().delete(*args, **kwargs)
        # If this method is called from a transaction: transaction.on_commit()
        # will call file_path.unlink when the outermost transaction is
        # committed.
#
# In the case that the code is running without a transaction:
# the file_path.unlink will happen now.
#
# It's important that file_path.unlink is called only if the
# DB is updated with the deletion. Otherwise, the file could be
# deleted from the store but still referenced from the DB.
transaction.on_commit(partial(file_path.unlink, missing_ok=True))
return result
    def absolute_file_path(self) -> Path:
"""
Return the absolute file path of the file.
The files are stored in settings.DEBUSINE_UPLOAD_DIRECTORY.
"""
return Path(settings.DEBUSINE_UPLOAD_DIRECTORY) / self.path
def __str__(self) -> str:
"""Return basic information."""
return f"{self.id}"
class ArtifactRelation(models.Model):
"""Model relations between artifacts."""
    class Relations(models.TextChoices):
EXTENDS = "extends", "Extends"
RELATES_TO = "relates-to", "Relates to"
BUILT_USING = "built-using", "Built using"
artifact = models.ForeignKey(
Artifact, on_delete=models.PROTECT, related_name="relations"
)
target = models.ForeignKey(
Artifact, on_delete=models.PROTECT, related_name="targeted_by"
)
type = models.CharField(max_length=11, choices=Relations.choices)
def __str__(self) -> str:
"""Return str for the object."""
return f"{self.artifact.id} {self.type} {self.target.id}"
class Meta(TypedModelMeta):
constraints = [
UniqueConstraint(
fields=["artifact", "target", "type"],
name="%(app_label)s_%(class)s_unique_artifact_target_type",
)
]
class User(AbstractUser):
"""Debusine user."""
email = models.EmailField(unique=True)
class Identity(models.Model):
"""
Identity for a user in a remote user database.
An Identity is bound if it's associated with a Django user, or unbound if
no Django user is known for it.
"""
class Meta(TypedModelMeta):
constraints = [
UniqueConstraint(
fields=["issuer", "subject"],
name="%(app_label)s_%(class)s_unique_issuer_subject",
),
]
user = models.ForeignKey(
settings.AUTH_USER_MODEL,
related_name="identities",
null=True,
on_delete=models.SET_NULL,
)
issuer = models.CharField(
max_length=512,
help_text="identifier of auhoritative system for this identity",
)
subject = models.CharField(
max_length=512,
help_text="identifier of the user in the issuer system",
)
last_used = models.DateTimeField(
        auto_now=True, help_text="last time this identity was used"
)
claims = models.JSONField(default=dict)
def __str__(self):
"""Return str for the object."""
return f"{self.issuer}:{self.subject}"
class NotificationChannel(models.Model):
"""Model to store notification configuration."""
    class Methods(models.TextChoices):
EMAIL = "email", "Email"
data_validators = {Methods.EMAIL: NotificationDataEmail}
name = models.CharField(
max_length=20,
unique=True,
)
method = models.CharField(max_length=10, choices=Methods.choices)
data = models.JSONField(default=dict, blank=True)
    def clean(self):
"""
Ensure that data is valid for the specific method.
:raise ValidationError: for invalid data.
"""
try:
self.data_validators[self.method](**self.data)
        except (TypeError, ValueError) as exc:
            raise ValidationError(
                f"NotificationChannel data is not valid: {exc}"
            ) from exc
return super().clean()
    def save(self, *args, **kwargs):
"""Run validators and save the instance."""
self.full_clean()
return super().save(*args, **kwargs)
def __str__(self):
"""Return name."""
return self.name
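# Illustrative sketch (comments only, not executed; the data schema shown
# is an assumption): save() calls full_clean(), so a channel whose data is
# rejected by its method's validator never reaches the database.
#
#     channel = NotificationChannel(
#         name="lintian-mails", method=NotificationChannel.Methods.EMAIL,
#         data={"to": ["qa@example.org"]},
#     )
#     channel.save()  # raises ValidationError if data is invalid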
class Collection(models.Model):
"""Model representing a collection."""
name = models.CharField(max_length=255)
category = models.CharField(max_length=255)
full_history_retention_period = models.DurationField(null=True, blank=True)
metadata_only_retention_period = models.DurationField(null=True, blank=True)
workspace = models.ForeignKey(Workspace, on_delete=models.PROTECT)
child_artifacts = models.ManyToManyField(
Artifact,
through="db.CollectionItem",
through_fields=("parent_collection", "artifact"),
related_name="parent_collections",
)
child_collections = models.ManyToManyField(
"self",
through="db.CollectionItem",
through_fields=("parent_collection", "collection"),
related_name="parent_collections",
symmetrical=False,
)
data = models.JSONField(default=dict, blank=True)
class Meta(TypedModelMeta):
constraints = [
UniqueConstraint(
fields=["name", "category"],
name="%(app_label)s_%(class)s_unique_name_category",
),
CheckConstraint(
check=~Q(name=""), name="%(app_label)s_%(class)s_name_not_empty"
),
CheckConstraint(
check=~Q(category=""),
name="%(app_label)s_%(class)s_category_not_empty",
),
]
@cached_property
def manager(self) -> "CollectionManagerInterface":
"""Get collection manager for this collection category."""
# Local import to avoid circular dependency
from debusine.server.collections import CollectionManagerInterface
return CollectionManagerInterface.get_manager_for(self)
def __str__(self) -> str:
"""Return id, name, category."""
return f"Id: {self.id} Name: {self.name} Category: {self.category}"
class CollectionItemManager(models.Manager["CollectionItem"]):
"""Manager for CollectionItem model."""
    @staticmethod
def create_from_artifact(
artifact: Artifact,
*,
parent_collection: Collection,
name: str,
data: dict[str, Any],
created_by_user: User,
) -> "CollectionItem":
"""Create a CollectionItem from the artifact."""
return CollectionItem.objects.create(
parent_collection=parent_collection,
name=name,
artifact=artifact,
child_type=CollectionItem.Types.ARTIFACT,
category=artifact.category,
data=data,
created_by_user=created_by_user,
)
    @staticmethod
def create_from_collection(
collection: Collection,
*,
parent_collection: Collection,
name: str,
data: dict[str, Any],
created_by_user: User,
) -> "CollectionItem":
"""Create a CollectionItem from the collection."""
return CollectionItem.objects.create(
parent_collection=parent_collection,
name=name,
collection=collection,
child_type=CollectionItem.Types.COLLECTION,
category=collection.category,
data=data,
created_by_user=created_by_user,
)
class CollectionItemActiveManager(CollectionItemManager):
"""Manager for active collection items."""
    def get_queryset(self) -> QuerySet["CollectionItem"]:
"""Return only active collection items."""
return super().get_queryset().filter(removed_at__isnull=True)
class _CollectionItemTypes(models.TextChoices):
"""Choices for the CollectionItem.type."""
BARE = "b", "Bare"
ARTIFACT = "a", "Artifact"
COLLECTION = "c", "Collection"
class CollectionItem(models.Model):
"""CollectionItem model."""
objects = CollectionItemManager()
active_objects = CollectionItemActiveManager()
name = models.CharField(max_length=255)
Types = _CollectionItemTypes
child_type = models.CharField(max_length=1, choices=Types.choices)
# category duplicates the category of the artifact or collection of this
# item, so when the underlying artifact or collection is deleted the
# category is retained
category = models.CharField(max_length=255)
parent_collection = models.ForeignKey(
Collection,
on_delete=models.PROTECT,
related_name="child_items",
)
collection = models.ForeignKey(
Collection,
on_delete=models.PROTECT,
related_name="collection_items",
null=True,
)
artifact = models.ForeignKey(
Artifact,
on_delete=models.PROTECT,
related_name="collection_items",
null=True,
)
data = models.JSONField(default=dict, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
created_by_user = models.ForeignKey(
settings.AUTH_USER_MODEL,
on_delete=models.PROTECT,
related_name="user_created_%(class)s",
)
# created_by_workflow = models.ForeignKey(Workflow,
# on_delete=models.PROTECT, null=True)
removed_at = models.DateTimeField(blank=True, null=True)
removed_by_user = models.ForeignKey(
settings.AUTH_USER_MODEL,
on_delete=models.PROTECT,
null=True,
related_name="user_removed_%(class)s",
)
# removed_by_workflow = models.ForeignKey(Workflow,
# on_delete=models.PROTECT, null=True)
    @staticmethod
def expand_name(
name_template: str, variables: dict[str, str], artifact: Artifact
) -> str:
"""
Format item name following item_template.
Expand JSONPath variables against an Artifact data.
"""
jsonpaths = {}
for name, path in variables.items():
try:
jsonpaths[name] = jsonpath_rw.parse(path)
except Exception as e:
                raise ValueError(e) from e
expanded_variables = {}
for name, jsonpath in jsonpaths.items():
matches = jsonpath.find(artifact.data)
if len(matches) == 1:
expanded_variables[name] = matches[0].value
elif len(matches) > 1:
raise ValueError(
"Too many values expanding", variables[name], artifact.data
)
else:
raise KeyError(variables[name], artifact.data)
return name_template.format(**expanded_variables)
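    # Illustrative sketch (comments only, not executed; the artifact data
    # shown is an assumption): name templates combine str.format()
    # placeholders with JSONPath lookups into artifact.data.  Assuming
    # artifact.data == {"srcpkg_name": "hello"}:
    #
    #     CollectionItem.expand_name(
    #         "src:{name}", {"name": "srcpkg_name"}, artifact
    #     )
    #     # -> "src:hello"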
def __str__(self) -> str:
"""Return id, name, collection_id, child_type."""
item_info = (
f" Artifact id: {self.artifact.id}"
if self.artifact
else (
f" Collection id: {self.collection.id}"
if self.collection
else ""
)
)
return (
f"Id: {self.id} Name: {self.name} "
f"Parent collection id: {self.parent_collection_id} "
f"Child type: {self.child_type}"
f"{item_info}"
)
class Meta(TypedModelMeta):
constraints = [
JsonDataUniqueConstraint(
fields=[
"data->>'codename'",
"data->>'architecture'",
"data->>'variant'",
"parent_collection",
],
condition=Q(
category="debian:environment", removed_at__isnull=True
),
nulls_distinct=False,
name="%(app_label)s_%(class)s_unique_debian_environment",
),
UniqueConstraint(
fields=["name", "parent_collection"],
condition=Q(removed_at__isnull=True),
name="%(app_label)s_%(class)s_unique_active_name",
),
            # Prevent the direct addition of a collection to itself.
            # Adding loops of collections is still possible; the Manager
            # should prevent them.
CheckConstraint(
check=~Q(collection=F("parent_collection")),
name="%(app_label)s_%(class)s_distinct_parent_collection",
),
CheckConstraint(
name="%(app_label)s_%(class)s_childtype_removedat_consistent",
check=(
Q(
child_type=_CollectionItemTypes.BARE,
collection__isnull=True,
artifact__isnull=True,
)
| (
Q(
child_type=_CollectionItemTypes.ARTIFACT,
collection__isnull=True,
)
& (
Q(artifact__isnull=False)
| Q(removed_at__isnull=False)
)
)
| (
Q(
child_type=_CollectionItemTypes.COLLECTION,
artifact__isnull=True,
)
& (
Q(collection__isnull=False)
| Q(removed_at__isnull=False)
)
)
),
),
]
class CollectionItemMatchConstraint(models.Model):
"""
Enforce matching-value constraints on collection items.
All instances of this model with the same :py:attr:`collection`,
:py:attr:`constraint_name`, and :py:attr:`key` must have the same
:py:attr:`value`.
"""
objects = models.Manager["CollectionItemMatchConstraint"]()
collection = models.ForeignKey(
Collection,
on_delete=models.CASCADE,
related_name="item_match_constraints",
)
# This is deliberately a bare ID, not a foreign key: some constraints
# take into account even items that no longer exist but were in a
# collection in the past.
collection_item_id = models.BigIntegerField()
constraint_name = models.CharField(max_length=255)
key = models.TextField()
value = models.TextField()
class Meta(TypedModelMeta):
constraints = [
ExclusionConstraint(
name="%(app_label)s_%(class)s_match_value",
expressions=(
(F("collection"), RangeOperators.EQUAL),
(F("constraint_name"), RangeOperators.EQUAL),
(F("key"), RangeOperators.EQUAL),
(F("value"), RangeOperators.NOT_EQUAL),
),
)
]
indexes = [
models.Index(
name="%(app_label)s_cimc_collection_item_idx",
fields=["collection_item_id"],
)
]
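# Illustrative sketch (comments only, not executed; ``collection`` is an
# assumed Collection): the exclusion constraint above rejects two rows that
# agree on (collection, constraint_name, key) but differ in value, which
# enforces "all items must share the same value" at the database level.
#
#     CollectionItemMatchConstraint.objects.create(
#         collection=collection, collection_item_id=1,
#         constraint_name="same-suite", key="suite", value="bookworm",
#     )
#     # A second row with the same key but value="trixie" raises
#     # IntegrityError.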