Source code for debusine.db.models

# Copyright 2019, 2021-2022 The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.

"""Data models for the db application."""

import copy
import hashlib
import logging
import os
import secrets
from datetime import datetime, timedelta
from functools import cached_property, partial
from pathlib import Path
from typing import Any, Optional, TYPE_CHECKING

from django.conf import settings
from django.contrib.auth.models import AbstractUser
from django.contrib.postgres.constraints import ExclusionConstraint
from django.contrib.postgres.fields import RangeOperators
from django.core.exceptions import FieldError, ValidationError
from django.core.validators import (
    MaxLengthValidator,
    MinLengthValidator,
)
from django.db import IntegrityError, models, transaction
from django.db.models import (
    CheckConstraint,
    Count,
    F,
    JSONField,
    Q,
    QuerySet,
    UniqueConstraint,
)
from django.db.models.functions import Coalesce
from django.utils import timezone
from django.utils.text import slugify

import jsonpath_rw

from debusine.artifacts import LocalArtifact
from debusine.db.constraints import JsonDataUniqueConstraint
from debusine.server import notifications
from debusine.server.file_backend.models import (
    ExternalDebianSuiteBackendConfiguration,
    LocalFileBackendConfiguration,
)
from debusine.server.workflows.models import TaskWorkflowData
from debusine.tasks import BaseTask, TaskConfigError
from debusine.tasks.models import (
    ActionTypes,
    EventReaction,
    EventReactions,
    NotificationDataEmail,
    TaskTypes,
)
from debusine.utils import calculate_hash

if TYPE_CHECKING:
    from django_stubs_ext.db.models import TypedModelMeta

    from debusine.server.collections import CollectionManagerInterface
    from debusine.server.file_backend.local import LocalFileBackend
else:
    TypedModelMeta = object

logger = logging.getLogger(__name__)


[docs]class TokenManager(models.Manager["Token"]): """Manager for Token model."""
[docs] def get_tokens( self, username: str | None = None, key: str | None = None ) -> QuerySet["Token"]: """ Return all the tokens filtered by a specific owner and/or token. To avoid filtering by owner or token set them to None """ tokens = self.get_queryset() if username: tokens = tokens.filter(user__username=username) if key: token_hash = hashlib.sha256(key.encode()).hexdigest() tokens = tokens.filter(hash=token_hash) return tokens
[docs] def get_token_or_none(self, token_key: str) -> Optional["Token"]: """Return the token with token_key or None.""" assert isinstance(token_key, str) token_hash = hashlib.sha256(token_key.encode()).hexdigest() try: return self.select_related('worker').get(hash=token_hash) except Token.DoesNotExist: return None
[docs]class Token(models.Model): """ Database model of a token. A token contains a key and other related data. It's used as a shared key between debusine server and clients (workers). This token model is very similar to rest_framework.authtoken.models.Token. The bigger difference is that debusine's token's owner is a CharField, the rest_framework owner is a OneToOne foreign key to a user. Database-wise we don't store the token itself, but a hash of the token. :py:func:`TokenManager.get_token_or_none` can be used to check a provided token key against the database. """ hash = models.CharField( max_length=64, unique=True, verbose_name='Hexadecimal hash, length is 64 chars', validators=[MaxLengthValidator(64), MinLengthValidator(64)], ) user = models.ForeignKey( settings.AUTH_USER_MODEL, null=True, blank=True, on_delete=models.PROTECT, ) comment = models.CharField( max_length=100, default='', verbose_name='Reason that this token was created', blank=True, ) created_at = models.DateTimeField(auto_now_add=True) enabled = models.BooleanField(default=False) key: str
[docs] def save(self, *args, **kwargs) -> None: """Save the token. If it's a new token it generates a key.""" if not self.hash: self.key = self._generate_key() self.hash = self._generate_hash(self.key) super().save(*args, **kwargs)
[docs] def enable(self) -> None: """Enable the token and save it.""" self.enabled = True self.save()
[docs] def disable(self) -> None: """Disable the token and save it.""" self.enabled = False self.save() notifications.notify_worker_token_disabled(self)
def __str__(self) -> str: """Return the hash of the Token.""" return self.hash @classmethod def _generate_key(cls) -> str: """Create and return a key.""" return secrets.token_hex(32) @classmethod def _generate_hash(cls, secret: str) -> str: """Hash the given secret.""" return hashlib.sha256(secret.encode()).hexdigest() objects = TokenManager()
[docs]class WorkerManager(models.Manager["Worker"]): """Manager for Worker model."""
[docs] def connected(self) -> QuerySet["Worker"]: """Return connected workers.""" return Worker.objects.filter(connected_at__isnull=False).order_by( 'connected_at' )
[docs] def waiting_for_work_request(self) -> QuerySet["Worker"]: """ Return workers that can be assigned a new work request. The workers with fewer associated pending or running work requests than their concurrency level could take more work right now and are thus waiting for a work request. Worker's token must be enabled. """ running_work_request_count = Count( 'assigned_work_requests', filter=Q( assigned_work_requests__status__in=[ WorkRequest.Statuses.RUNNING, WorkRequest.Statuses.PENDING, ] ), ) workers = ( Worker.objects.filter(connected_at__isnull=False) .order_by('connected_at') .annotate(count_running=running_work_request_count) .filter(count_running__lt=F("concurrency")) .filter(Q(internal=True) | Q(token__enabled=True)) ) return workers
@staticmethod def _generate_unique_name(name: str, counter: int) -> str: """Return name slugified adding "-counter" if counter != 1.""" new_name = slugify(name.replace('.', '-')) if counter != 1: new_name += f'-{counter}' return new_name
[docs] @classmethod def create_with_fqdn(cls, fqdn: str, token: Token) -> "Worker": """Return a new Worker with its name based on fqdn, with token.""" counter = 1 while True: name = cls._generate_unique_name(fqdn, counter) try: with transaction.atomic(): return Worker.objects.create( name=name, token=token, registered_at=timezone.now() ) except IntegrityError: counter += 1
[docs] @classmethod def get_or_create_celery(cls) -> "Worker": """Return a new Worker representing the Celery task queue.""" try: return Worker.objects.get(name="celery", internal=True) except Worker.DoesNotExist: return Worker.objects.create( name="celery", internal=True, registered_at=timezone.now() )
[docs] def get_worker_by_token_key_or_none( self, token_key: str ) -> Optional["Worker"]: """Return a Worker identified by its associated secret token.""" try: token_hash = hashlib.sha256(token_key.encode()).hexdigest() return Worker.objects.get(token__hash=token_hash) except Worker.DoesNotExist: return None
[docs] def get_worker_or_none(self, worker_name: str) -> Optional["Worker"]: """Return the worker with worker_name or None.""" try: return self.get(name=worker_name) except Worker.DoesNotExist: return None
[docs]class Worker(models.Model): """Database model of a worker.""" name = models.SlugField( unique=True, help_text='Human readable name of the worker based on the FQDN', ) registered_at = models.DateTimeField() connected_at = models.DateTimeField(blank=True, null=True) # This is the token used by the Worker to authenticate # Users have their own tokens - this is specific to a single worker. token = models.OneToOneField( Token, null=True, on_delete=models.PROTECT, related_name="worker" ) static_metadata = JSONField(default=dict, blank=True) dynamic_metadata = JSONField(default=dict, blank=True) dynamic_metadata_updated_at = models.DateTimeField(blank=True, null=True) internal = models.BooleanField(default=False) concurrency = models.PositiveIntegerField( default=1, help_text="Number of tasks this worker can run simultaneously", ) class Meta(TypedModelMeta): constraints = [ # Non-internal workers must have a token. CheckConstraint( name="%(app_label)s_%(class)s_internal_or_token", check=Q(internal=True) | Q(token__isnull=False), ) ]
[docs] def mark_disconnected(self) -> None: """Update and save relevant Worker fields after disconnecting.""" self.connected_at = None self.save()
[docs] def mark_connected(self) -> None: """Update and save relevant Worker fields after connecting.""" self.connected_at = timezone.now() self.save()
[docs] def connected(self) -> bool: """Return True if the Worker is connected.""" return self.connected_at is not None
[docs] def is_busy(self) -> bool: """ Return True if the Worker is busy with work requests. A Worker is busy if it has as many running or pending work requests as its concurrency level. """ return ( WorkRequest.objects.running(worker=self) | WorkRequest.objects.pending(worker=self) ).count() >= self.concurrency
[docs] def metadata(self) -> dict[str, Any]: """ Return all metadata with static_metadata and dynamic_metadata merged. If the same key is in static_metadata and dynamic_metadata: static_metadata takes priority. """ return { **copy.deepcopy(self.dynamic_metadata), **copy.deepcopy(self.static_metadata), }
[docs] def set_dynamic_metadata(self, metadata: dict[str, Any]) -> None: """Save metadata and update dynamic_metadata_updated_at.""" self.dynamic_metadata = metadata self.dynamic_metadata_updated_at = timezone.now() self.save()
def __str__(self) -> str: """Return the id and name of the Worker.""" return f"Id: {self.id} Name: {self.name}" objects = WorkerManager()
[docs]class WorkflowTemplate(models.Model): """ Database model for Workflow templates. Workflow templates contain the information needed to instantiate a workflow, with a Workflow orchestrator and mandatory parameters. """ class Meta(TypedModelMeta): constraints = [ UniqueConstraint( fields=["name", "workspace"], name="%(app_label)s_%(class)s_unique_name_workspace", ), ] name = models.CharField(max_length=255) workspace = models.ForeignKey("Workspace", on_delete=models.PROTECT) task_name = models.CharField( max_length=100, verbose_name='Name of the Workflow orchestrator class' ) task_data = JSONField(default=dict, blank=True) priority = models.IntegerField( default=0, help_text="Base priority for work requests created from this template", )
[docs] def clean(self): """ Ensure that task_name and task data are valid. :raise ValidationError: for invalid data. """ # Import here to prevent circular imports from debusine.server.workflows import Workflow if not isinstance(self.task_data, dict): raise ValidationError( {"task_data": "task data must be a dictionary"} ) # Instantiate the orchestrator and use it to validate task_data workflow_cls = Workflow.from_name(self.task_name) try: workflow_cls.validate_template_data(self.task_data) except Exception as exc: raise ValidationError({"task_data": str(exc)})
class _WorkRequestStatuses(models.TextChoices): """Choices for WorkRequest.status.""" PENDING = "pending", "Pending" RUNNING = "running", "Running" COMPLETED = "completed", "Completed" ABORTED = "aborted", "Aborted" BLOCKED = "blocked", "Blocked"
[docs]class WorkRequestManager(models.Manager["WorkRequest"]): """Manager for WorkRequest model."""
[docs] def create_workflow_callback( self, *, parent: "WorkRequest", step: str, display_name: str | None = None, status: _WorkRequestStatuses | None = None, ): """ Create a workflow callback WorkRequest. A parent is always required, as a callback only makes sense as part of a workflow. :param step: string set by the workflow to identify this callback """ return self.create( workspace=parent.workspace, parent=parent, created_by=parent.created_by, status=status or WorkRequest.Statuses.BLOCKED, task_type=TaskTypes.INTERNAL, task_name="workflow", task_data={}, priority_base=parent.priority_effective, workflow_data_json=TaskWorkflowData( step=step, display_name=display_name or step ).dict(exclude_unset=True), )
[docs] def create_workflow( self, *, template: WorkflowTemplate, data: dict[str, Any], created_by: "User", parent: Optional["WorkRequest"] = None, status: _WorkRequestStatuses | None = None, ) -> "WorkRequest": """Create a workflow from a template and user-provided data.""" # Import here to prevent circular imports from debusine.server.workflows import Workflow # Lookup the orchestrator workflow_cls = Workflow.from_name(template.task_name) # Merge user provided data into template data task_data = workflow_cls.build_workflow_data(template.task_data, data) # Build the WorkRequest work_request = self.create( workspace=template.workspace, parent=parent, created_by=created_by, status=status or WorkRequest.Statuses.BLOCKED, task_type=TaskTypes.WORKFLOW, task_name=template.task_name, task_data=task_data, priority_base=template.priority, ) try: # Instantiate the orchestrator orchestrator = workflow_cls(work_request) # Thorough input validation orchestrator.validate_input() # Populate the workflow with initial work requests orchestrator.populate() except Exception: # TODO: Can we have a better except than something which catches # also things like SyntaxError? # TODO: How can we store the error so that the user can see it? logger.exception("Cannot start a workflow") work_request.status = WorkRequest.Statuses.RUNNING work_request.mark_completed(WorkRequest.Results.ERROR) return work_request
[docs] def create_synchronization_point( self, *, parent: "WorkRequest", status: _WorkRequestStatuses | None = None, ): """ Create a synchronization point WorkRequest. A parent is always required, as a synchronization point only makes sense as part of a workflow. """ return self.create( workspace=parent.workspace, parent=parent, created_by=parent.created_by, status=status or WorkRequest.Statuses.BLOCKED, task_type=TaskTypes.INTERNAL, task_name="synchronization_point", task_data={}, priority_base=parent.priority_effective, )
[docs] def pending( self, exclude_assigned: bool = False, worker: Worker | None = None ) -> QuerySet["WorkRequest"]: """ Return a QuerySet of tasks in WorkRequest.Statuses.PENDING status. QuerySet is ordered by descending effective priority, then by created_at. Filter out the assigned pending ones if exclude_assigned=True, and include only the WorkRequest for worker. PENDING is the default status of a task on creation. """ if exclude_assigned and worker is not None: raise ValueError("Cannot exclude_assigned and filter by worker") qs = WorkRequest.objects.filter(status=WorkRequest.Statuses.PENDING) if exclude_assigned: qs = qs.exclude(worker__isnull=False) if worker is not None: qs = qs.filter(worker=worker) qs = qs.order_by( (F("priority_base") + F("priority_adjustment")).desc(), "created_at" ) return qs
[docs] def running(self, worker: Worker | None = None) -> QuerySet["WorkRequest"]: """Return a QuerySet of tasks in running status.""" qs = WorkRequest.objects.filter(status=WorkRequest.Statuses.RUNNING) if worker is not None: qs = qs.filter(worker=worker) return qs
[docs] def completed(self) -> QuerySet["WorkRequest"]: """Return a QuerySet of tasks in completed status.""" return WorkRequest.objects.filter(status=WorkRequest.Statuses.COMPLETED)
[docs] def aborted(self) -> QuerySet["WorkRequest"]: """Return a QuerySet of tasks in aborted status.""" return WorkRequest.objects.filter(status=WorkRequest.Statuses.ABORTED)
[docs]class WorkRequest(models.Model): """ Database model of a request to execute a task. Time-consuming operations offloaded to Workers and using Artifacts (and associated Files) as input and output. Submission API needs to check if the request is valid using ontological rules - e.g. whether the specified distribution for a build task exists. Avoid exposing the status of tasks to the admin interface to avoid runaway changes whilst the scheduler process is running. The WorkRequest uses the non-Django tasks module to do the checks on whether a task can run on a particular worker. WorkRequest State Machine ========================= New WorkRequest database entries default to ``WorkRequest.Statuses.PENDING``. Once the WorkRequest is assigned to a worker and is running starts running the status is changed to ``WorkRequest.Statuses.RUNNING``. If the WorkRequest is aborted, the Scheduled.Task status is ``WorkRequest.Statuses.ABORTED``. If the task finish on the Worker the WorkRequest status will be ``WorkRequest.Statuses.COMPLETED`` and a WorkRequest.Result is then set, ``WorkRequest.Results.PASSED`` or ``WorkRequest.Results.FAILED``. .. graphviz:: digraph { Statuses_PENDING -> Statuses_RUNNING -> Statuses_COMPLETED; Statuses_PENDING -> Statuses_COMPLETED; Statuses_PENDING -> Statuses_ABORTED; Statuses_PENDING -> Statuses_RUNNING -> Statuses_ABORTED; } ``WorkRequest.started_at`` is set when the WorkRequest moves from ``WorkRequest.Statuses.PENDING`` to ``WorkRequest.Statuses.RUNNING``. ``WorkRequest.completed_at`` is set when the Task moves from ``WorkRequest.Statuses.RUNNING`` to ``WorkRequest.Statuses.COMPLETED``. """ objects = WorkRequestManager() Statuses = _WorkRequestStatuses
[docs] class Results(models.TextChoices): NONE = "", "" SUCCESS = "success", "Success" FAILURE = "failure", "Failure" ERROR = "error", "Error"
[docs] class UnblockStrategy(models.TextChoices): DEPS = "deps", "Dependencies have completed" MANUAL = "manual", "Manually unblocked"
workspace = models.ForeignKey("Workspace", on_delete=models.PROTECT) created_at = models.DateTimeField(auto_now_add=True) started_at = models.DateTimeField(blank=True, null=True) completed_at = models.DateTimeField(blank=True, null=True) created_by = models.ForeignKey("User", on_delete=models.PROTECT) status = models.CharField( max_length=9, choices=Statuses.choices, default=Statuses.PENDING, editable=False, ) result = models.CharField( max_length=7, choices=Results.choices, default=Results.NONE, editable=False, ) # any one work request can only be on one worker # even if the worker can handle multiple work request. worker = models.ForeignKey( Worker, null=True, blank=True, on_delete=models.CASCADE, related_name="assigned_work_requests", ) task_type = models.CharField( max_length=8, choices=TaskTypes.choices, default=TaskTypes.WORKER, editable=False, verbose_name="Type of task to execute", ) task_name = models.CharField( max_length=100, verbose_name='Name of the task to execute' ) task_data = JSONField(default=dict, blank=True) priority_base = models.IntegerField( default=0, help_text="Base priority of this work request" ) priority_adjustment = models.IntegerField( default=0, help_text=( "Administrator adjustment to the priority of this work request" ), ) # Workflows unblock_strategy = models.CharField( max_length=6, choices=UnblockStrategy.choices, default=UnblockStrategy.DEPS, editable=False, ) dependencies = models.ManyToManyField( "self", symmetrical=False, related_name="reverse_dependencies" ) parent = models.ForeignKey( "self", # WorkRequests are only deleted through expiration, use CASCADE on_delete=models.CASCADE, blank=True, null=True, ) workflow_data_json = JSONField( default=dict, blank=True, db_column="workflow_data" ) event_reactions_json = JSONField( default=dict, blank=True, db_column="event_reactions" ) class Meta(TypedModelMeta): indexes = [ # Handles the main scheduler query. models.Index( (F("priority_base") + F("priority_adjustment")).desc(), "created_at", name="%(app_label)s_%(class)s_pending_idx", condition=Q(status=_WorkRequestStatuses.PENDING), ), # Handles queries from workers. models.Index( "worker", name="%(app_label)s_%(class)s_worker_idx", condition=Q( status__in=( _WorkRequestStatuses.PENDING, _WorkRequestStatuses.RUNNING, ) ), ), ] permissions = [ ( "manage_workrequest_priorities", "Can set positive priority adjustments on work requests", ) ] def __str__(self) -> str: """Return the id of the WorkRequest.""" return str(self.id) @property def workflow_data(self) -> TaskWorkflowData | None: """Access workflow_data_json as a python structure.""" if self.workflow_data_json: return TaskWorkflowData(**self.workflow_data_json) else: return None @workflow_data.setter def workflow_data(self, value: TaskWorkflowData | None) -> None: """Set workflow_data_json from a python structure.""" if value is None: self.workflow_data_json = {} else: self.workflow_data_json = value.dict( exclude_unset=True, ) @property def event_reactions(self) -> EventReactions: """Access event_reactions_json as a pydantic structure.""" return EventReactions(**self.event_reactions_json) @event_reactions.setter def event_reactions(self, value: EventReactions) -> None: """Set event_reactions_json from a pydantic structure.""" self.event_reactions_json = value.dict()
[docs] def clean(self): """ Ensure that task data is valid for this task name. :raise ValidationError: for invalid data. """ if not isinstance(self.task_data, dict): raise ValidationError( {"task_data": "task data must be a dictionary"} ) match self.task_type: case TaskTypes.WORKER | TaskTypes.SERVER: try: task_cls = BaseTask.class_from_name( self.task_type, self.task_name ) except (KeyError, ValueError) as e: raise ValidationError( { "task_name": f"{self.task_name}:" f" invalid {self.task_type} task name" } ) from e try: task_cls(task_data=self.task_data) except TaskConfigError as e: raise ValidationError( { "task_data": f"invalid {self.task_type}" f" task data: {e}" } ) from e case TaskTypes.WORKFLOW: # Import here to prevent circular imports from debusine.server.workflows import Workflow try: workflow_cls = Workflow.from_name(self.task_name) except (KeyError, ValueError) as e: raise ValidationError( { "task_name": f"{self.task_name}:" f" invalid workflow name" } ) from e try: workflow_cls(self) except TaskConfigError as e: raise ValidationError( {"task_data": f"invalid workflow data: {e}"} ) from e # TODO: do we want to run expensive tests here # (Workflow.validate_input), like looking up the types of # references artifacts to validate them? case TaskTypes.INTERNAL: if self.task_name not in ("synchronization_point", "workflow"): raise ValidationError( { "task_name": f"{self.task_name}:" " invalid task name for internal task" } ) # Without this pass, python coverage is currently unable to # detect that code does flow through here pass case _: raise NotImplementedError( f"task type {self.task_type} not yet supported" )
[docs] def mark_running(self) -> bool: """Worker has begun executing the task.""" if self.worker is None: logger.debug( "Cannot mark WorkRequest %s as running: it does not have " "an assigned worker", self.pk, ) return False if self.status == self.Statuses.RUNNING: # It was already running - nothing to do return True if self.status != self.Statuses.PENDING: logger.debug( "Cannot mark as running - current status %s", self.status ) return False work_requests_running_for_worker = WorkRequest.objects.running( worker=self.worker ) # There is a possible race condition here. This check (and other # checks in this class) currently help to avoid development mistakes # not database full integrity if work_requests_running_for_worker.count() >= self.worker.concurrency: logger.debug( "Cannot mark WorkRequest %s as running - the assigned worker " "%s is running too many other WorkRequests: %s", self.pk, self.worker, list(work_requests_running_for_worker.order_by("id")), ) return False self.started_at = timezone.now() self.status = self.Statuses.RUNNING self.save() logger.debug("Marked WorkRequest %s as running", self.pk) return True
[docs] def mark_completed(self, result: "WorkRequest.Results") -> bool: """Worker has finished executing the task.""" if self.status not in (self.Statuses.PENDING, self.Statuses.RUNNING): logger.debug( "Cannot mark WorkRequest %s as completed: current status is %s", self.pk, self.status, ) return False self.result = result self.completed_at = timezone.now() self.status = self.Statuses.COMPLETED self.save() logger.debug("Marked WorkRequest %s as completed", self.pk) # mark dependencies ready before sending notification self.unblock_reverse_dependencies() self.process_event_reactions() return True
[docs] def process_event_reactions(self): """Process list of actions to perform on completion.""" actions = self.get_triggered_actions() notifications.notify_work_request_completed( self, actions[ActionTypes.SEND_NOTIFICATION] ) self.process_update_collection_with_artifacts( actions[ActionTypes.UPDATE_COLLECTION_WITH_ARTIFACTS] )
[docs] def get_triggered_actions(self) -> dict[str, list[EventReaction]]: """Filter events to trigger, grouped by type.""" actions: dict[str, list[EventReaction]] = { action: [] for action in ActionTypes } if self.result in (WorkRequest.Results.SUCCESS,): for action in self.event_reactions.on_success: action_type = action.action actions[action_type].append(action) elif self.result in ( WorkRequest.Results.FAILURE, WorkRequest.Results.ERROR, ): for action in self.event_reactions.on_failure: action_type = action.action actions[action_type].append(action) else: # Results.NONE pass return actions
[docs] def process_update_collection_with_artifacts(self, actions): """Update collection following event_reactions.""" # local import to avoid circular dependency from debusine.server.collections import ( CollectionManagerInterface, ItemAdditionError, ItemRemovalError, ) for update in actions: try: collection = Collection.objects.get( workspace=self.workspace, id=update.collection_id ) except Collection.DoesNotExist: logger.exception( "Invalid update-collection-with-artifacts" " collection_id %s in WorkRequest %s", update.collection_id, self.pk, ) continue manager = CollectionManagerInterface.get_manager_for(collection) try: artifacts_to_add = self.artifact_set.filter( **update.artifact_filters ) except FieldError: logger.exception( "Invalid update-collection-with-artifacts" " artifact_filters in WorkRequest %s", self.pk, ) continue for artifact in artifacts_to_add: item_name = None if update.name_template is not None: try: item_name = CollectionItem.expand_name( update.name_template, update.variables, artifact, ) except (KeyError, ValueError): logger.exception( "Invalid update-collection-with-artifacts" " variables in WorkRequest %s", self.pk, ) continue try: manager.replace_or_add_artifact( artifact, user=self.created_by, name=item_name, ) except (ItemAdditionError, ItemRemovalError, LookupError): logger.exception( "Cannot replace or add artifact %s to collection %s" " from WorkRequest %s", artifact, collection, self.pk, )
[docs] def mark_pending(self) -> bool: """Worker is ready for execution.""" if self.status not in (self.Statuses.BLOCKED): logger.debug( "Cannot mark WorkRequest %s as pending: current status is %s", self.pk, self.status, ) return False self.status = self.Statuses.PENDING self.save() logger.debug("Marked WorkRequest %s as pending", self.pk) return True
[docs] def unblock_reverse_dependencies(self): """Unblock reverse dependencies.""" # Shortcuts to keep line length sane r = WorkRequest.Results s = WorkRequest.Statuses if self.result == r.SUCCESS: for rdep in self.reverse_dependencies.filter(status=s.BLOCKED): if rdep.unblock_strategy == WorkRequest.UnblockStrategy.DEPS: other_deps = rdep.dependencies.filter(~Q(id=self.id)) if other_deps.filter(~Q(status=s.COMPLETED)).count() == 0: rdep.mark_pending() else: # failures # TODO: check future self.workflow.allow_failure for rdep in self.reverse_dependencies.filter(status=s.BLOCKED): rdep.mark_aborted()
[docs] def mark_aborted(self) -> bool: """ Worker has aborted the task after request from UI. Task will typically be in CREATED or RUNNING status. """ self.completed_at = timezone.now() self.status = self.Statuses.ABORTED self.save() logger.debug( "Marked WorkRequest %s as aborted (from status %s)", self.pk, self.status, ) return True
[docs] def assign_worker(self, worker: Worker | None) -> None: """Assign worker and save it.""" self.worker = worker self.save() notifications.notify_work_request_assigned(self)
@property def duration(self) -> float | None: """Return duration in seconds between started_at and completed_at.""" if self.started_at and self.completed_at: return (self.completed_at - self.started_at).total_seconds() else: return None @property def priority_effective(self) -> int: """The effective priority of this work request.""" return self.priority_base + self.priority_adjustment
[docs]class File(models.Model): """ Database model of a file. Model different attributes of the file. From outside the class use the property ``hash`` and do not use ``sha256`` field. This allows, if ever needed, to change the hash algorithm only modifying this class without changing the users of this class. """ current_hash_algorithm = "sha256" sha256 = models.BinaryField( max_length=int(hashlib.new(current_hash_algorithm).digest_size / 8), help_text=f"{current_hash_algorithm} of the file", ) size = models.PositiveBigIntegerField(help_text="Size in bytes of the file") class Meta(TypedModelMeta): constraints = [ UniqueConstraint( fields=["sha256", "size"], name="%(app_label)s_%(class)s_unique_sha256_size", ), CheckConstraint( name="%(app_label)s_%(class)s_sha256_not_empty", check=~Q(sha256=b""), ), ] def __str__(self) -> str: """Return basic information of File.""" return ( f"id: {self.id} " f"{self.current_hash_algorithm}: {self.hash_digest.hex()} " f"size: {self.size}" )
[docs] @classmethod def from_local_path(cls, local_path: Path) -> "File": """Return a File with the fields.""" kwargs: dict[str, Any] = { File.current_hash_algorithm: cls.calculate_hash(local_path) } file, created = File.objects.get_or_create( **kwargs, size=os.stat(local_path).st_size ) return file
@property def hash_digest(self) -> bytes: """ Return the default hash digest of the File. Use this property instead of the field to allow the algorithm to be changed in the future. """ return bytes(getattr(self, self.current_hash_algorithm)) @hash_digest.setter def hash_digest(self, value: bytes) -> None: setattr(self, self.current_hash_algorithm, value)
[docs] @classmethod def calculate_hash(cls, file_path: Path) -> bytes: """Return hash for the file.""" return calculate_hash(file_path, cls.current_hash_algorithm)
[docs] @classmethod def get_or_create(cls, *, hash_digest: bytes, size: int): """Call File.objects.get_or_create with the correct parameters.""" kwargs: dict[str, Any] = {cls.current_hash_algorithm: hash_digest} return cls.objects.get_or_create(**kwargs, size=size)
class _FileStoreBackendChoices(models.TextChoices): """Enumerate all the Backend choices.""" LOCAL = "Local", "Local" EXTERNAL_DEBIAN_SUITE = "ExternalDebianSuite", "ExternalDebianSuite" DEFAULT_FILE_STORE_NAME = "Default"
[docs]def default_file_store() -> "FileStore": """Return the default file store.""" return FileStore.objects.get(name=DEFAULT_FILE_STORE_NAME)
[docs]class FileStore(models.Model): """ Database model of a FileStore. FileStore has files attached to it. """ # BackendChoices not defined in FileStore to make it accessible # during the migrations. Historical models are used # (https://docs.djangoproject.com/en/3.2/topics/migrations/#historical-models) BackendChoices = _FileStoreBackendChoices configuration_validators = { BackendChoices.LOCAL: LocalFileBackendConfiguration, BackendChoices.EXTERNAL_DEBIAN_SUITE: ( ExternalDebianSuiteBackendConfiguration ), } name = models.CharField(max_length=255, unique=True) backend = models.CharField(max_length=255, choices=BackendChoices.choices) configuration = models.JSONField(default=dict, blank=True) files = models.ManyToManyField(File, through="db.FileInStore")
[docs] @staticmethod def default() -> "FileStore": """Return the default FileStore.""" return default_file_store()
def __str__(self) -> str: """Return basic information of FileStore.""" return f"Id: {self.id} Name: {self.name} Backend: {self.backend}"
[docs] def clean(self): """ Ensure that data is valid for this backend type. :raise ValidationError: for invalid data. """ if not isinstance(self.configuration, dict): raise ValidationError( {"configuration": "configuration must be a dictionary"} ) try: self.configuration_validators[self.backend](**self.configuration) except (TypeError, ValueError) as e: raise ValidationError( {"configuration": f"invalid file store configuration: {e}"} )
[docs] def get_backend_object(self) -> "LocalFileBackend": # noqa: F821 """Instantiate the correct FileBackend and return it.""" if self.backend == _FileStoreBackendChoices.LOCAL: from debusine.server.file_backend.local import LocalFileBackend return LocalFileBackend(self) raise NotImplementedError( f"Cannot create FileStore for backend type: {self.backend}" )
[docs]class FileInStore(models.Model): """ Database model used as "through" from FileStore. Keeps the relationship between stores, files and attached data. """ store = models.ForeignKey(FileStore, on_delete=models.PROTECT) file = models.ForeignKey(File, on_delete=models.PROTECT) data = models.JSONField(default=dict, blank=True) class Meta(TypedModelMeta): constraints = [ UniqueConstraint( fields=["store", "file"], name="%(app_label)s_%(class)s_unique_store_file", ) ] def __str__(self) -> str: """Return basic information of FileInStore.""" return ( f"Id: {self.id} " f"Store: {self.store.name} " f"File: {self.file.hash_digest.hex()}" )
[docs]class WorkspaceManager(models.Manager["Workspace"]): """Manager for Workspace model."""
[docs] @classmethod def create_with_name(cls, name: str) -> "Workspace": """Return a new Workspace with name and the default FileStore.""" return Workspace.objects.create( name=name, default_file_store=FileStore.default() )
DEFAULT_WORKSPACE_NAME = "System"
[docs]def default_workspace() -> "Workspace": """Return the default Workspace.""" return Workspace.objects.get(name=DEFAULT_WORKSPACE_NAME)
[docs]class Workspace(models.Model): """Workspace model.""" objects = WorkspaceManager() name = models.CharField(max_length=255, unique=True) default_file_store = models.ForeignKey( FileStore, on_delete=models.PROTECT, related_name="default_workspaces" ) other_file_stores = models.ManyToManyField( FileStore, related_name="other_workspaces" ) public = models.BooleanField(default=False) default_expiration_delay = models.DurationField( default=timedelta(0), help_text="minimal time that a new artifact is kept in the" " workspace before being expired", )
[docs] def is_file_in_workspace(self, fileobj: File) -> bool: """Return True if fileobj is in any store available for Workspace.""" file_stores = [self.default_file_store, *self.other_file_stores.all()] for file_store in file_stores: if file_store.fileinstore_set.filter(file=fileobj).exists(): return True return False
def __str__(self) -> str: """Return basic information of Workspace.""" return f"Id: {self.id} Name: {self.name}"
[docs]class ArtifactManager(models.Manager["Artifact"]): """Manager for the Artifact model."""
[docs] @classmethod def create_from_local_artifact( cls, local_artifact: LocalArtifact[Any], workspace: Workspace, *, created_by_work_request: WorkRequest | None = None, ) -> "Artifact": """Return a new Artifact based on a :class:`LocalArtifact`.""" file_store = workspace.default_file_store.get_backend_object() artifact = Artifact.objects.create( category=local_artifact.category, workspace=workspace, data=local_artifact.data.dict(), created_by_work_request=created_by_work_request, ) for artifact_path, local_path in local_artifact.files.items(): file = File.from_local_path(local_path) file_store.add_file(local_path, fileobj=file) FileInArtifact.objects.create( artifact=artifact, path=artifact_path, file=file ) return artifact
[docs] def not_expired(self, at: datetime) -> QuerySet["Artifact"]: """ Return queryset with artifacts that have not expired. :param at: datetime to check if the artifacts are not expired. :return: artifacts that expire_at is None (do not expire) or expire_at is after the given datetime. """ return ( self.get_queryset() .annotate( effective_expiration_delay=Coalesce( "expiration_delay", "workspace__default_expiration_delay", ) ) .filter( Q(effective_expiration_delay=timedelta(0)) | Q( # https://github.com/typeddjango/django-stubs/issues/1548 created_at__gt=( at - F("effective_expiration_delay") # type: ignore[operator] # noqa: E501 ) ) ) )
[docs] def expired(self, at: datetime) -> QuerySet["Artifact"]: """ Return queryset with artifacts that have expired. :param at: datetime to check if the artifacts are expired. :return: artifacts that expire_at is before the given datetime. """ return ( self.get_queryset() .annotate( effective_expiration_delay=Coalesce( "expiration_delay", "workspace__default_expiration_delay", ) ) .exclude(effective_expiration_delay=timedelta(0)) .filter( # https://github.com/typeddjango/django-stubs/issues/1548 created_at__lte=( at - F("effective_expiration_delay") # type: ignore[operator] # noqa: E501 ) ) )
[docs]class Artifact(models.Model): """Artifact model.""" category = models.CharField(max_length=255) workspace = models.ForeignKey(Workspace, on_delete=models.PROTECT) files = models.ManyToManyField(File, through="db.FileInArtifact") data = models.JSONField(default=dict, blank=True) created_at = models.DateTimeField(auto_now_add=True) expiration_delay = models.DurationField(blank=True, null=True) created_by = models.ForeignKey( "User", blank=True, null=True, on_delete=models.PROTECT ) created_by_work_request = models.ForeignKey( WorkRequest, blank=True, null=True, on_delete=models.PROTECT ) objects = ArtifactManager()
[docs] def clean(self): """ Ensure that data is valid for this artifact category. :raise ValidationError: for invalid data. """ if not isinstance(self.data, dict): raise ValidationError({"data": "data must be a dictionary"}) try: artifact_cls = LocalArtifact.class_from_category(self.category) except ValueError as e: raise ValidationError( {"category": f"{self.category}: invalid artifact category"} ) from e try: artifact_cls.create_data(self.data) except ValueError as e: raise ValidationError( {"data": f"invalid artifact data: {e}"} ) from e
[docs] def effective_expiration_delay(self): """Return expiration_delay, inherited if None.""" expiration_delay = self.expiration_delay if self.expiration_delay is None: # inherit expiration_delay = self.workspace.default_expiration_delay return expiration_delay
@property def expire_at(self) -> datetime | None: """Return computed expiration date.""" delay = self.effective_expiration_delay() if delay == timedelta(0): return None return self.created_at + delay
[docs] def expired(self, at: datetime) -> bool: """ Return True if this artifact has expired at a given datetime. :param at: datetime to check if the artifact is expired. :return bool: True if the artifact's expire_at is on or earlier than the parameter at. """ expire_at = self.expire_at if expire_at is None: return False return expire_at <= at
def __str__(self) -> str: """Return basic information of Artifact.""" return ( f"Id: {self.id} " f"Category: {self.category} " f"Workspace: {self.workspace.id}" )
[docs]class FileInArtifact(models.Model): """File in artifact.""" artifact = models.ForeignKey(Artifact, on_delete=models.PROTECT) path = models.CharField(max_length=500) file = models.ForeignKey(File, on_delete=models.PROTECT) class Meta(TypedModelMeta): constraints = [ UniqueConstraint( fields=["artifact", "path"], name="%(app_label)s_%(class)s_unique_artifact_path", ), ] def __str__(self) -> str: """Return basic information of FileInArtifact.""" return ( f"Id: {self.id} Artifact: {self.artifact.id} " f"Path: {self.path} File: {self.file.id}" )
[docs]class FileUpload(models.Model): """File that is being/has been uploaded.""" file_in_artifact = models.OneToOneField( FileInArtifact, on_delete=models.PROTECT ) path = models.CharField( max_length=500, help_text="Path in the uploads directory", unique=True, ) last_activity_at = models.DateTimeField(auto_now_add=True)
[docs] @classmethod def current_size(cls, artifact: Artifact, path_in_artifact: str) -> int: """ Return current file size. The current file size might be smaller than the expected size of the file if the file has not finished being uploaded. Raise ValueError if path_in_artifact does not exist in Artifact or if there's no FileUpload object for the specific File. """ try: file_in_artifact = FileInArtifact.objects.get( artifact=artifact, path=path_in_artifact ) except FileInArtifact.DoesNotExist: raise ValueError( f'No FileInArtifact for Artifact {artifact.id} ' f'and path "{path_in_artifact}"' ) try: file_upload = FileUpload.objects.get( file_in_artifact=file_in_artifact ) except FileUpload.DoesNotExist: raise ValueError( f"No FileUpload for FileInArtifact {file_in_artifact.id}" ) try: size = file_upload.absolute_file_path().stat().st_size except FileNotFoundError: size = 0 return size
[docs] def delete(self, *args, **kwargs): """Schedule deletion of the file in the store.""" file_path = self.absolute_file_path() result = super().delete(*args, **kwargs) # If this method is called from a transaction: transaction.on_commit() # will call file_path.unlink when the most outer transaction is # committed. # # In the case that the code is running without a transaction: # the file_path.unlink will happen now. # # It's important that file_path.unlink is called only if the # DB is updated with the deletion. Otherwise, the file could be # deleted from the store but still referenced from the DB. transaction.on_commit(partial(file_path.unlink, missing_ok=True)) return result
[docs] def absolute_file_path(self) -> Path: """ Return the absolute file path of the file. The files are stored in settings.DEBUSINE_UPLOAD_DIRECTORY. """ return Path(settings.DEBUSINE_UPLOAD_DIRECTORY) / self.path
def __str__(self) -> str: """Return basic information.""" return f"{self.id}"
[docs]class ArtifactRelation(models.Model): """Model relations between artifacts."""
[docs] class Relations(models.TextChoices): EXTENDS = "extends", "Extends" RELATES_TO = "relates-to", "Relates to" BUILT_USING = "built-using", "Built using"
artifact = models.ForeignKey( Artifact, on_delete=models.PROTECT, related_name="relations" ) target = models.ForeignKey( Artifact, on_delete=models.PROTECT, related_name="targeted_by" ) type = models.CharField(max_length=11, choices=Relations.choices) def __str__(self) -> str: """Return str for the object.""" return f"{self.artifact.id} {self.type} {self.target.id}" class Meta(TypedModelMeta): constraints = [ UniqueConstraint( fields=["artifact", "target", "type"], name="%(app_label)s_%(class)s_unique_artifact_target_type", ) ]
[docs]class User(AbstractUser): """Debusine user.""" email = models.EmailField(unique=True)
[docs]class Identity(models.Model): """ Identity for a user in a remote user database. An Identity is bound if it's associated with a Django user, or unbound if no Django user is known for it. """ class Meta(TypedModelMeta): constraints = [ UniqueConstraint( fields=["issuer", "subject"], name="%(app_label)s_%(class)s_unique_issuer_subject", ), ] user = models.ForeignKey( settings.AUTH_USER_MODEL, related_name="identities", null=True, on_delete=models.SET_NULL, ) issuer = models.CharField( max_length=512, help_text="identifier of auhoritative system for this identity", ) subject = models.CharField( max_length=512, help_text="identifier of the user in the issuer system", ) last_used = models.DateTimeField( auto_now=True, help_text="last time this identity has been used" ) claims = models.JSONField(default=dict) def __str__(self): """Return str for the object.""" return f"{self.issuer}:{self.subject}"
[docs]class NotificationChannel(models.Model): """Model to store notification configuration."""
[docs] class Methods(models.TextChoices): EMAIL = "email", "Email"
data_validators = {Methods.EMAIL: NotificationDataEmail} name = models.CharField( max_length=20, unique=True, ) method = models.CharField(max_length=10, choices=Methods.choices) data = models.JSONField(default=dict, blank=True)
[docs] def clean(self): """ Ensure that data is valid for the specific method. :raise ValidationError: for invalid data. """ try: self.data_validators[self.method](**self.data) except (TypeError, ValueError) as exc: raise ValidationError( f"NotificationChannel data is not valid: {exc}" ) return super().clean()
[docs] def save(self, *args, **kwargs): """Run validators and save the instance.""" self.full_clean() return super().save(*args, **kwargs)
def __str__(self): """Return name.""" return self.name
[docs]class Collection(models.Model): """Model representing a collection.""" name = models.CharField(max_length=255) category = models.CharField(max_length=255) full_history_retention_period = models.DurationField(null=True, blank=True) metadata_only_retention_period = models.DurationField(null=True, blank=True) workspace = models.ForeignKey(Workspace, on_delete=models.PROTECT) child_artifacts = models.ManyToManyField( Artifact, through="db.CollectionItem", through_fields=("parent_collection", "artifact"), related_name="parent_collections", ) child_collections = models.ManyToManyField( "self", through="db.CollectionItem", through_fields=("parent_collection", "collection"), related_name="parent_collections", symmetrical=False, ) data = models.JSONField(default=dict, blank=True) class Meta(TypedModelMeta): constraints = [ UniqueConstraint( fields=["name", "category"], name="%(app_label)s_%(class)s_unique_name_category", ), CheckConstraint( check=~Q(name=""), name="%(app_label)s_%(class)s_name_not_empty" ), CheckConstraint( check=~Q(category=""), name="%(app_label)s_%(class)s_category_not_empty", ), ] @cached_property def manager(self) -> "CollectionManagerInterface": """Get collection manager for this collection category.""" # Local import to avoid circular dependency from debusine.server.collections import CollectionManagerInterface return CollectionManagerInterface.get_manager_for(self) def __str__(self) -> str: """Return id, name, category.""" return f"Id: {self.id} Name: {self.name} Category: {self.category}"
[docs]class CollectionItemManager(models.Manager["CollectionItem"]): """Manager for CollectionItem model."""
[docs] @staticmethod def create_from_artifact( artifact: Artifact, *, parent_collection: Collection, name: str, data: dict[str, Any], created_by_user: User, ) -> "CollectionItem": """Create a CollectionItem from the artifact.""" return CollectionItem.objects.create( parent_collection=parent_collection, name=name, artifact=artifact, child_type=CollectionItem.Types.ARTIFACT, category=artifact.category, data=data, created_by_user=created_by_user, )
[docs] @staticmethod def create_from_collection( collection: Collection, *, parent_collection: Collection, name: str, data: dict[str, Any], created_by_user: User, ) -> "CollectionItem": """Create a CollectionItem from the collection.""" return CollectionItem.objects.create( parent_collection=parent_collection, name=name, collection=collection, child_type=CollectionItem.Types.COLLECTION, category=collection.category, data=data, created_by_user=created_by_user, )
[docs]class CollectionItemActiveManager(CollectionItemManager): """Manager for active collection items."""
[docs] def get_queryset(self) -> QuerySet["CollectionItem"]: """Return only active collection items.""" return super().get_queryset().filter(removed_at__isnull=True)
class _CollectionItemTypes(models.TextChoices): """Choices for the CollectionItem.type.""" BARE = "b", "Bare" ARTIFACT = "a", "Artifact" COLLECTION = "c", "Collection"
[docs]class CollectionItem(models.Model): """CollectionItem model.""" objects = CollectionItemManager() active_objects = CollectionItemActiveManager() name = models.CharField(max_length=255) Types = _CollectionItemTypes child_type = models.CharField(max_length=1, choices=Types.choices) # category duplicates the category of the artifact or collection of this # item, so when the underlying artifact or collection is deleted the # category is retained category = models.CharField(max_length=255) parent_collection = models.ForeignKey( Collection, on_delete=models.PROTECT, related_name="child_items", ) collection = models.ForeignKey( Collection, on_delete=models.PROTECT, related_name="collection_items", null=True, ) artifact = models.ForeignKey( Artifact, on_delete=models.PROTECT, related_name="collection_items", null=True, ) data = models.JSONField(default=dict, blank=True) created_at = models.DateTimeField(auto_now_add=True) created_by_user = models.ForeignKey( settings.AUTH_USER_MODEL, on_delete=models.PROTECT, related_name="user_created_%(class)s", ) # created_by_workflow = models.ForeignKey(Workflow, # on_delete=models.PROTECT, null=True) removed_at = models.DateTimeField(blank=True, null=True) removed_by_user = models.ForeignKey( settings.AUTH_USER_MODEL, on_delete=models.PROTECT, null=True, related_name="user_removed_%(class)s", ) # removed_by_workflow = models.ForeignKey(Workflow, # on_delete=models.PROTECT, null=True)
[docs] @staticmethod def expand_name( name_template: str, variables: dict[str, str], artifact: Artifact ) -> str: """ Format item name following item_template. Expand JSONPath variables against an Artifact data. """ jsonpaths = {} for name, path in variables.items(): try: jsonpaths[name] = jsonpath_rw.parse(path) except Exception as e: raise ValueError(e) expanded_variables = {} for name, jsonpath in jsonpaths.items(): matches = jsonpath.find(artifact.data) if len(matches) == 1: expanded_variables[name] = matches[0].value elif len(matches) > 1: raise ValueError( "Too many values expanding", variables[name], artifact.data ) else: raise KeyError(variables[name], artifact.data) return name_template.format(**expanded_variables)
def __str__(self) -> str: """Return id, name, collection_id, child_type.""" item_info = ( f" Artifact id: {self.artifact.id}" if self.artifact else ( f" Collection id: {self.collection.id}" if self.collection else "" ) ) return ( f"Id: {self.id} Name: {self.name} " f"Parent collection id: {self.parent_collection_id} " f"Child type: {self.child_type}" f"{item_info}" ) class Meta(TypedModelMeta): constraints = [ JsonDataUniqueConstraint( fields=[ "data->>'codename'", "data->>'architecture'", "data->>'variant'", "parent_collection", ], condition=Q( category="debian:environment", removed_at__isnull=True ), nulls_distinct=False, name="%(app_label)s_%(class)s_unique_debian_environment", ), UniqueConstraint( fields=["name", "parent_collection"], condition=Q(removed_at__isnull=True), name="%(app_label)s_%(class)s_unique_active_name", ), # Prevent direct way to add a collection to itself. # It is still possible to add loops of collections. The Manager # should avoid it CheckConstraint( check=~Q(collection=F("parent_collection")), name="%(app_label)s_%(class)s_distinct_parent_collection", ), CheckConstraint( name="%(app_label)s_%(class)s_childtype_removedat_consistent", check=( Q( child_type=_CollectionItemTypes.BARE, collection__isnull=True, artifact__isnull=True, ) | ( Q( child_type=_CollectionItemTypes.ARTIFACT, collection__isnull=True, ) & ( Q(artifact__isnull=False) | Q(removed_at__isnull=False) ) ) | ( Q( child_type=_CollectionItemTypes.COLLECTION, artifact__isnull=True, ) & ( Q(collection__isnull=False) | Q(removed_at__isnull=False) ) ) ), ), ]
[docs]class CollectionItemMatchConstraint(models.Model): """ Enforce matching-value constraints on collection items. All instances of this model with the same :py:attr:`collection`, :py:attr:`constraint_name`, and :py:attr:`key` must have the same :py:attr:`value`. """ objects = models.Manager["CollectionItemMatchConstraint"]() collection = models.ForeignKey( Collection, on_delete=models.CASCADE, related_name="item_match_constraints", ) # This is deliberately a bare ID, not a foreign key: some constraints # take into account even items that no longer exist but were in a # collection in the past. collection_item_id = models.BigIntegerField() constraint_name = models.CharField(max_length=255) key = models.TextField() value = models.TextField() class Meta(TypedModelMeta): constraints = [ ExclusionConstraint( name="%(app_label)s_%(class)s_match_value", expressions=( (F("collection"), RangeOperators.EQUAL), (F("constraint_name"), RangeOperators.EQUAL), (F("key"), RangeOperators.EQUAL), (F("value"), RangeOperators.NOT_EQUAL), ), ) ] indexes = [ models.Index( name="%(app_label)s_cimc_collection_item_idx", fields=["collection_item_id"], ) ]