Source code for debusine.db.models.auth

# Copyright 2019, 2021-2024 The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.

"""Data models for db users and authentication."""

import hashlib
import re
import secrets
from typing import Any, Optional, TYPE_CHECKING, cast

from django.conf import settings
from django.contrib.auth.models import AbstractUser
from django.core.exceptions import ValidationError
from django.core.validators import MaxLengthValidator, MinLengthValidator
from django.db import models
from django.db.models import CheckConstraint, Q, QuerySet, UniqueConstraint

from debusine.db.models.permissions import (
    Roles,
    get_resource_scope,
    resolve_role,
)
from debusine.db.models.scopes import Scope
from debusine.server import notifications

if TYPE_CHECKING:
    from django_stubs_ext.db.models import TypedModelMeta

    # https://github.com/typeddjango/django-stubs/issues/2112
    AbstractUserMeta = TypedModelMeta
else:
    TypedModelMeta = object
    AbstractUserMeta = AbstractUser.Meta


class TokenManager(models.Manager["Token"]):
    """Manager offering owner-based and key-based lookups for Token."""

    def get_tokens(
        self, username: str | None = None, key: str | None = None
    ) -> QuerySet["Token"]:
        """
        Return the tokens matching the given owner and/or token key.

        A filter is applied only when its argument is truthy, so passing
        None (or leaving the default) skips that filter. The key is never
        compared directly: its SHA-256 hex digest is matched against the
        stored hash.
        """
        matching = self.get_queryset()

        if username:
            matching = matching.filter(user__username=username)

        if key:
            key_digest = hashlib.sha256(key.encode()).hexdigest()
            matching = matching.filter(hash=key_digest)

        return matching

    def get_token_or_none(self, token_key: str) -> Optional["Token"]:
        """
        Return the token whose hash matches token_key, or None.

        The lookup is done on the SHA-256 hex digest of token_key, and the
        related worker is fetched in the same query.
        """
        assert isinstance(token_key, str)

        key_digest = hashlib.sha256(token_key.encode()).hexdigest()
        candidates = self.select_related('worker')

        try:
            found = candidates.get(hash=key_digest)
        except Token.DoesNotExist:
            return None
        return found


class Token(models.Model):
    """
    Database model of a token.

    A token contains a key and other related data. It's used as a shared
    key between debusine server and clients (workers).

    This token model is very similar to
    rest_framework.authtoken.models.Token. The bigger difference is that
    debusine's token's owner is an optional foreign key (a token may have
    no user), while the rest_framework owner is a OneToOne foreign key to
    a user.

    Database-wise we don't store the token itself, but a hash of the token.
    :py:func:`TokenManager.get_token_or_none` can be used to check a provided
    token key against the database.
    """

    hash = models.CharField(
        max_length=64,
        unique=True,
        verbose_name='Hexadecimal hash, length is 64 chars',
        validators=[MaxLengthValidator(64), MinLengthValidator(64)],
    )
    user = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        null=True,
        blank=True,
        on_delete=models.PROTECT,
    )
    comment = models.CharField(
        max_length=100,
        default='',
        verbose_name='Reason that this token was created',
        blank=True,
    )
    created_at = models.DateTimeField(auto_now_add=True)
    enabled = models.BooleanField(default=False)
    last_seen_at = models.DateTimeField(
        blank=True,
        null=True,
        help_text="Last time that the token was used",
    )

    # Plain-text key. Not a database field: it is only assigned by save()
    # when a key is generated for a token that has no hash yet.
    key: str

    def save(self, *args: Any, **kwargs: Any) -> None:
        """Save the token, generating a key and its hash if none is set."""
        if not self.hash:
            fresh_key = self._generate_key()
            self.key = fresh_key
            self.hash = self._generate_hash(fresh_key)

        super().save(*args, **kwargs)

    def enable(self) -> None:
        """Mark the token as enabled and persist it."""
        self.enabled = True
        self.save()

    def disable(self) -> None:
        """Mark the token as disabled, persist it and send a notification."""
        self.enabled = False
        self.save()

        # Notify only once the disabled state has been saved.
        notifications.notify_worker_token_disabled(self)

    def __str__(self) -> str:
        """Return the hash of the Token."""
        return self.hash

    @classmethod
    def _generate_key(cls) -> str:
        """Create and return a new random key."""
        # 32 random bytes rendered as 64 hexadecimal characters.
        key_bytes = 32
        return secrets.token_hex(key_bytes)

    @classmethod
    def _generate_hash(cls, secret: str) -> str:
        """Return the SHA-256 hex digest of the given secret."""
        digest = hashlib.sha256(secret.encode())
        return digest.hexdigest()

    objects = TokenManager()
# Username of the account that system_user() looks up.
SYSTEM_USER_NAME = "_system"
def system_user() -> "User":
    """
    Return the `_system` user.

    The user is fetched by username on every call; the lookup fails with
    ``User.DoesNotExist`` if no such user is stored.
    """
    user_manager = User.objects
    return user_manager.get(username=SYSTEM_USER_NAME)
class User(AbstractUser):
    """
    Debusine user.

    Extends Django's AbstractUser with an ``is_system`` flag, plus database
    constraints that tie that flag to the email address.
    """

    # Marks system accounts; regular users keep the default of False.
    is_system = models.BooleanField(default=False)

    class Meta(AbstractUserMeta):
        # The %(app_label)s and %(class)s placeholders in the constraint
        # names are filled in by Django.
        constraints = [
            # System users must not have an email address.
            # The check holds for any non-system row, and for system rows
            # only when the email is the empty string.
            CheckConstraint(
                check=Q(is_system=False) | Q(email=""),
                name="%(app_label)s_%(class)s_non_system_email",
            ),
            # Email addresses of non-system users must be unique.
            # The condition makes this a partial constraint: rows with
            # is_system=True are not covered by it.
            UniqueConstraint(
                fields=["email"],
                condition=Q(is_system=False),
                name="%(app_label)s_%(class)s_unique_email",
            ),
        ]
class Identity(models.Model):
    """
    Identity for a user in a remote user database.

    An Identity is bound if it's associated with a Django user, or unbound if
    no Django user is known for it.
    """

    class Meta(TypedModelMeta):
        constraints = [
            # A given subject may appear only once per issuer.
            UniqueConstraint(
                fields=["issuer", "subject"],
                name="%(app_label)s_%(class)s_unique_issuer_subject",
            ),
        ]

    # Null while the identity is unbound. SET_NULL means deleting the user
    # unbinds the identity instead of deleting it.
    user = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        related_name="identities",
        null=True,
        on_delete=models.SET_NULL,
    )
    # NOTE(review): "auhoritative" in the help text looks like a typo for
    # "authoritative". help_text is part of the field definition, so
    # correcting it probably needs a migration - confirm before changing.
    issuer = models.CharField(
        max_length=512,
        help_text="identifier of auhoritative system for this identity",
    )
    subject = models.CharField(
        max_length=512,
        help_text="identifier of the user in the issuer system",
    )
    # auto_now: refreshed each time the identity is saved.
    last_used = models.DateTimeField(
        auto_now=True, help_text="last time this identity has been used"
    )
    # JSON data stored with the identity; defaults to an empty dict.
    claims = models.JSONField(default=dict)

    def __str__(self) -> str:
        """Return the identity as ``issuer:subject``."""
        return f"{self.issuer}:{self.subject}"
#: Regexp matching the structure of group names
group_name_regex = re.compile(r"^[A-Za-z][A-Za-z0-9+._-]*$")


def is_valid_group_name(value: str) -> bool:
    """
    Check if value is a valid group name.

    A valid name starts with an ASCII letter, followed by any number of
    ASCII letters, digits, ``+``, ``.``, ``_`` or ``-``.
    """
    # fullmatch() rather than match(): "$" also matches just before a
    # trailing newline, so match() would accept a name such as "devs\n".
    return group_name_regex.fullmatch(value) is not None


def validate_group_name(value: str) -> None:
    """
    Validate group names.

    :raises ValidationError: if value is not a valid group name
    """
    if not is_valid_group_name(value):
        raise ValidationError(
            "%(value)r is not a valid group name", params={"value": value}
        )


class GroupManager(models.Manager["Group"]):
    """Manager for Group model."""

    def from_scoped_name(self, scope_group: str) -> "Group":
        """
        Lookup a group from a scopename/groupname string.

        The string is split at its first "/": the part before it is the
        scope name, everything after it is the group name.

        :raises ValueError: if scope_group contains no "/" separator
        """
        scope_name, separator, group_name = scope_group.partition("/")
        if not separator:
            # Same exception type as a failed tuple unpacking, but with a
            # message that names the offending value.
            raise ValueError(
                f"{scope_group!r} is not in scopename/groupname format"
            )
        return Group.objects.get(scope__name=scope_name, name=group_name)
class Group(models.Model):
    """
    Connect users to roles.

    Group names are scoped: groups with the same name in different scopes are
    entirely distinct and unrelated.
    """

    name = models.CharField(max_length=255, validators=[validate_group_name])
    scope = models.ForeignKey(Scope, null=False, on_delete=models.PROTECT)
    users = models.ManyToManyField(
        User,
        blank=True,
        related_name="debusine_groups",
    )

    objects = GroupManager()

    class Meta(TypedModelMeta):
        constraints = [
            # A group name may be used only once within a scope.
            UniqueConstraint(
                fields=["name", "scope"],
                name="%(app_label)s_%(class)s_unique_name_scope",
            ),
        ]

    def __str__(self) -> str:
        """Return the group name qualified by its scope: ``scope/name``."""
        scope_name = self.scope.name
        return f"{scope_name}/{self.name}"

    def assign_role(
        self, resource: models.Model, role: Roles | str
    ) -> models.Model:
        """
        Assign a role on a resource to this group.

        The assignment is looked up with ``get_or_create`` on the roles
        model reported by the resource's manager, so an existing
        assignment is returned rather than duplicated.

        :raises ValueError: if the resource's scope is not this group's
          scope
        """
        resource_cls = resource.__class__

        # Guard: a group can only hold roles on resources of its own scope.
        if get_resource_scope(resource) != self.scope:
            raise ValueError(
                f"{resource_cls.__name__} {str(resource)!r}"
                f" is not in scope {self.scope}"
            )

        # Normalise the role using the resource's own Roles definition.
        resolved_role = resolve_role(getattr(resource, "Roles"), role)

        resource_manager = getattr(resource_cls, "objects")
        roles_model: type[models.Model] = resource_manager.get_roles_model()

        assignment, _created = getattr(roles_model, "objects").get_or_create(
            resource=resource, group=self, role=resolved_role
        )
        return cast(models.Model, assignment)