Source code for debusine.db.models.auth

# Copyright © The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.

"""Data models for db users and authentication."""

import hashlib
import re
import secrets
from typing import Any, Generic, Optional, TYPE_CHECKING, TypeVar, cast

from django.conf import settings
from django.contrib.auth.models import AbstractUser
from django.contrib.auth.models import UserManager as DjangoUserManager
from django.core.exceptions import ValidationError
from django.core.validators import MaxLengthValidator, MinLengthValidator
from django.db import models
from django.db.models import CheckConstraint, Q, QuerySet, UniqueConstraint
from django.urls import reverse

from debusine.db.models.permissions import (
    PermissionUser,
    Roles,
    get_resource_scope,
    permission_check,
    permission_filter,
    resolve_role,
)
from debusine.db.models.scopes import Scope
from debusine.server import notifications

if TYPE_CHECKING:
    from django_stubs_ext.db.models import TypedModelMeta

    # https://github.com/typeddjango/django-stubs/issues/2112
    AbstractUserMeta = TypedModelMeta
else:
    TypedModelMeta = object
    AbstractUserMeta = AbstractUser.Meta

A = TypeVar("A")


class TokenManager(models.Manager["Token"]):
    """Manager for Token model."""

    def get_tokens(
        self, username: str | None = None, key: str | None = None
    ) -> QuerySet["Token"]:
        """
        Return all the tokens filtered by a specific owner and/or token.

        To avoid filtering by owner or token set them to None
        """
        tokens = self.get_queryset()

        if username:
            tokens = tokens.filter(user__username=username)

        if key:
            token_hash = hashlib.sha256(key.encode()).hexdigest()
            tokens = tokens.filter(hash=token_hash)

        return tokens

    def get_token_or_none(self, token_key: str) -> Optional["Token"]:
        """Return the token with token_key or None."""
        assert isinstance(token_key, str)

        token_hash = hashlib.sha256(token_key.encode()).hexdigest()

        try:
            return self.select_related('worker').get(hash=token_hash)
        except Token.DoesNotExist:
            return None


[docs]class Token(models.Model): """ Database model of a token. A token contains a key and other related data. It's used as a shared key between debusine server and clients (workers). This token model is very similar to rest_framework.authtoken.models.Token. The bigger difference is that debusine's token's owner is a CharField, the rest_framework owner is a OneToOne foreign key to a user. Database-wise we don't store the token itself, but a hash of the token. :py:func:`TokenManager.get_token_or_none` can be used to check a provided token key against the database. """ hash = models.CharField( max_length=64, unique=True, verbose_name='Hexadecimal hash, length is 64 chars', validators=[MaxLengthValidator(64), MinLengthValidator(64)], ) user = models.ForeignKey( settings.AUTH_USER_MODEL, null=True, blank=True, on_delete=models.PROTECT, ) comment = models.CharField( max_length=100, default='', verbose_name='Reason that this token was created', blank=True, ) created_at = models.DateTimeField(auto_now_add=True) enabled = models.BooleanField(default=False) last_seen_at = models.DateTimeField( blank=True, null=True, help_text="Last time that the token was used", ) key: str
[docs] def save(self, *args: Any, **kwargs: Any) -> None: """Save the token. If it's a new token it generates a key.""" if not self.hash: self.key = self._generate_key() self.hash = self._generate_hash(self.key) super().save(*args, **kwargs)
[docs] def enable(self) -> None: """Enable the token and save it.""" self.enabled = True self.save()
[docs] def disable(self) -> None: """Disable the token and save it.""" self.enabled = False self.save() notifications.notify_worker_token_disabled(self)
def __str__(self) -> str: """Return the hash of the Token.""" return self.hash @classmethod def _generate_key(cls) -> str: """Create and return a key.""" return secrets.token_hex(32) @classmethod def _generate_hash(cls, secret: str) -> str: """Hash the given secret.""" return hashlib.sha256(secret.encode()).hexdigest() objects = TokenManager()
SYSTEM_USER_NAME = "_system"
[docs]def system_user() -> "User": """Return the `_system` user.""" return User.objects.get(username=SYSTEM_USER_NAME)
class UserQuerySet(QuerySet["User", A], Generic[A]): """Custom QuerySet for User.""" @permission_filter def can_display( self, user: PermissionUser # noqa: U100 ) -> "UserQuerySet[A]": """Keep only Users that can be displayed.""" assert user is not None # Enforced by decorator if not user.is_authenticated: return self.none() return self @permission_filter def can_manage(self, user: PermissionUser) -> "UserQuerySet[A]": """Keep only Users that the given user can manage.""" assert user is not None # Enforced by decorator if not user.is_authenticated: return self.none() return self.filter(pk=user.pk) class UserManager(DjangoUserManager["User"]): """Manager for User model.""" # We cannot use from_queryset or we hit this problem: # https://stackoverflow.com/questions/68367703/adding-custom-queryset-to-usermodel-causes-makemigrations-exception # noqa: E501 # Therefore we need to proxy all permission filters here def can_display( self, user: PermissionUser # noqa: U100 ) -> "UserQuerySet[A]": """Keep only Users that can be displayed.""" return self.get_queryset().can_display(user) def can_manage(self, user: PermissionUser) -> "UserQuerySet[A]": """Keep only Users that the given user can manage.""" return self.get_queryset().can_manage(user) def get_queryset(self) -> UserQuerySet[Any]: """Use the custom QuerySet.""" return UserQuerySet(self.model, using=self._db)
[docs]class User(AbstractUser): """Debusine user.""" is_system = models.BooleanField(default=False) # mypy seems to struggle changing the manager in a subclass objects = UserManager() # type: ignore[misc] class Meta(AbstractUserMeta): constraints = [ # System users must not have an email address. CheckConstraint( check=Q(is_system=False) | Q(email=""), name="%(app_label)s_%(class)s_non_system_email", ), # Email addresses of non-system users must be unique. UniqueConstraint( fields=["email"], condition=Q(is_system=False), name="%(app_label)s_%(class)s_unique_email", ), ]
[docs] @permission_check("{user} cannot display user {resource}") def can_display(self, user: PermissionUser) -> bool: # noqa: U100 """Check if the user can be displayed.""" assert user is not None # enforced by decorator # Disallow unauthenticated people to enumerate users in the system. # # In general, we do not assume that user information is automatically # public. # # This could help make life a bit harder, for example, for drive-by # spammers trying to enumerate users and their personal information. return user.is_authenticated
[docs] @permission_check("{user} cannot manage user {resource}") def can_manage(self, user: PermissionUser) -> bool: """Check if the user can manage this user.""" assert user is not None # enforced by decorator # Token is not taken into account here if not user.is_authenticated: return False return self.pk == user.pk
[docs] def get_absolute_url(self) -> str: """Return an absolute URL to this user.""" return reverse("user:detail", kwargs={"username": self.username})
[docs]class Identity(models.Model): """ Identity for a user in a remote user database. An Identity is bound if it's associated with a Django user, or unbound if no Django user is known for it. """ class Meta(TypedModelMeta): constraints = [ UniqueConstraint( fields=["issuer", "subject"], name="%(app_label)s_%(class)s_unique_issuer_subject", ), ] user = models.ForeignKey( settings.AUTH_USER_MODEL, related_name="identities", null=True, on_delete=models.SET_NULL, ) issuer = models.CharField( max_length=512, help_text="identifier of auhoritative system for this identity", ) subject = models.CharField( max_length=512, help_text="identifier of the user in the issuer system", ) last_used = models.DateTimeField( auto_now=True, help_text="last time this identity has been used" ) claims = models.JSONField(default=dict) def __str__(self) -> str: """Return str for the object.""" return f"{self.issuer}:{self.subject}"
#: Regexp matching the structure of group names group_name_regex = re.compile(r"^[A-Za-z][A-Za-z0-9+._-]*$") def is_valid_group_name(value: str) -> bool: """Check if value is a valid group name.""" return bool(group_name_regex.match(value)) def validate_group_name(value: str) -> None: """Validate group names.""" if not is_valid_group_name(value): raise ValidationError( "%(value)r is not a valid group name", params={"value": value} ) class GroupManager(models.Manager["Group"]): """Manager for Group model.""" def from_scoped_name(self, scope_group: str) -> "Group": """Lookup a group from a scopename/groupname string.""" scope_name, group_name = scope_group.split("/", 1) return Group.objects.get(scope__name=scope_name, name=group_name)
[docs]class Group(models.Model): """ Connect users to roles. Group names are scoped: groups with the same name in different scopes are entirely distinct and unrelated. """ name = models.CharField(max_length=255, validators=[validate_group_name]) scope = models.ForeignKey(Scope, null=False, on_delete=models.PROTECT) users = models.ManyToManyField( User, blank=True, related_name="debusine_groups", ) objects = GroupManager() class Meta(TypedModelMeta): constraints = [ UniqueConstraint( fields=["name", "scope"], name="%(app_label)s_%(class)s_unique_name_scope", ), ] def __str__(self) -> str: """Return the scoped group name.""" return f"{self.scope.name}/{self.name}"
[docs] def assign_role( self, resource: models.Model, role: Roles | str ) -> models.Model: """Assign a role on a resource.""" if get_resource_scope(resource) != self.scope: raise ValueError( f"{resource.__class__.__name__} {str(resource)!r}" f" is not in scope {self.scope}" ) role = resolve_role(getattr(resource, "Roles"), role) roles_model_cls: models.Model = getattr( resource.__class__, "objects" ).get_roles_model() assignment, _ = getattr(roles_model_cls, "objects").get_or_create( resource=resource, group=self, role=role ) return cast(models.Model, assignment)