# Source code for debusine.db.models.auth

# Copyright 2019, 2021-2024 The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.

"""Data models for db users and authentication."""

import hashlib
import secrets
from typing import Optional, TYPE_CHECKING

from django.conf import settings
from django.contrib.auth.models import AbstractUser
from django.core.validators import (
    MaxLengthValidator,
    MinLengthValidator,
)
from django.db import models
from django.db.models import (
    CheckConstraint,
    Q,
    QuerySet,
    UniqueConstraint,
)

from debusine.server import notifications

if TYPE_CHECKING:
    from django_stubs_ext.db.models import TypedModelMeta
else:
    TypedModelMeta = object


class TokenManager(models.Manager["Token"]):
    """Manager for the Token model: lookups by owner and by secret key."""

    def get_tokens(
        self, username: str | None = None, key: str | None = None
    ) -> QuerySet["Token"]:
        """
        Return the tokens matching the given owner and/or key.

        Each argument is optional: a falsy value (None, or an empty
        string) means "do not filter on this criterion".

        :param username: username of the user owning the tokens.
        :param key: plain-text token key; it is hashed with SHA-256 and
          compared against the stored ``hash`` field.
        :return: queryset of the matching tokens (all tokens when no
          criterion is given).
        """
        matching = self.get_queryset()

        if username:
            matching = matching.filter(user__username=username)

        if not key:
            return matching

        # Only the hash of a key is stored, so hash the key to search it.
        digest = hashlib.sha256(key.encode()).hexdigest()
        return matching.filter(hash=digest)

    def get_token_or_none(self, token_key: str) -> Optional["Token"]:
        """
        Return the token whose key is token_key, or None if there is none.

        The lookup is done on the SHA-256 hash of ``token_key``, and the
        related ``worker`` is selected in the same query.
        """
        assert isinstance(token_key, str)

        digest = hashlib.sha256(token_key.encode()).hexdigest()
        candidates = self.select_related('worker')

        try:
            found = candidates.get(hash=digest)
        except Token.DoesNotExist:
            return None

        return found


class Token(models.Model):
    """
    Database model of a token.

    A token contains a key and other related data. It's used as a shared
    key between debusine server and clients (workers).

    This model is very similar to rest_framework.authtoken.models.Token.
    Here the owner (``user``) is an optional foreign key to the user
    model, so a token may have no user at all.

    Database-wise the key itself is not stored, only its SHA-256 hash.
    :py:func:`TokenManager.get_token_or_none` can be used to check a
    provided token key against the database.
    """

    hash = models.CharField(
        max_length=64,
        unique=True,
        verbose_name='Hexadecimal hash, length is 64 chars',
        validators=[MaxLengthValidator(64), MinLengthValidator(64)],
    )
    user = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        null=True,
        blank=True,
        on_delete=models.PROTECT,
    )
    comment = models.CharField(
        max_length=100,
        default='',
        verbose_name='Reason that this token was created',
        blank=True,
    )
    created_at = models.DateTimeField(auto_now_add=True)
    enabled = models.BooleanField(default=False)
    last_seen_at = models.DateTimeField(
        blank=True,
        null=True,
        help_text="Last time that the token was used",
    )

    # Plain-text key. Not a database field: save() sets it on the instance
    # only when it generates the key of a new token.
    key: str

    def save(self, *args, **kwargs) -> None:
        """
        Save the token, generating its key first if it has no hash yet.

        For a token without a hash a new random key is generated, kept in
        ``self.key`` and its hash stored in ``self.hash``. All arguments
        are forwarded unchanged to ``models.Model.save``.
        """
        if not self.hash:
            fresh_key = self._generate_key()
            self.key = fresh_key
            self.hash = self._generate_hash(fresh_key)

        super().save(*args, **kwargs)

    def _set_enabled(self, value: bool) -> None:
        """Set the ``enabled`` flag to value and save the token."""
        self.enabled = value
        self.save()

    def enable(self) -> None:
        """Mark the token as enabled and save it."""
        self._set_enabled(True)

    def disable(self) -> None:
        """
        Mark the token as disabled and save it.

        Once saved, ``notifications.notify_worker_token_disabled`` is
        called with this token.
        """
        self._set_enabled(False)
        notifications.notify_worker_token_disabled(self)

    def __str__(self) -> str:
        """Return the hash of the Token."""
        return self.hash

    @classmethod
    def _generate_key(cls) -> str:
        """Create and return a new random key (64 hexadecimal chars)."""
        key_size_bytes = 32
        return secrets.token_hex(key_size_bytes)

    @classmethod
    def _generate_hash(cls, secret: str) -> str:
        """Return the hexadecimal SHA-256 digest of the given secret."""
        hasher = hashlib.sha256()
        hasher.update(secret.encode())
        return hasher.hexdigest()

    objects = TokenManager()
# Username of the system user; system_user() looks the user up by this name.
SYSTEM_USER_NAME = "_system"
def system_user() -> "User":
    """
    Return the `_system` user.

    The user is looked up by username (``SYSTEM_USER_NAME``) on every
    call.

    :raises User.DoesNotExist: if no user with that username exists.
    """
    user_manager = User.objects
    return user_manager.get(username=SYSTEM_USER_NAME)
class User(AbstractUser):
    """
    Debusine user.

    Extends Django's ``AbstractUser`` with an ``is_system`` flag, and adds
    database constraints relating that flag to the ``email`` field.
    """

    # Flags system accounts; regular users keep the default (False).
    is_system = models.BooleanField(default=False)

    class Meta(AbstractUser.Meta):
        # "%(app_label)s" and "%(class)s" in the constraint names are
        # placeholders that Django fills in with the app label and the
        # lower-cased model class name.
        constraints = [
            # System users must not have an email address: a row is valid
            # if it is not a system user, or if its email is empty.
            CheckConstraint(
                check=Q(is_system=False) | Q(email=""),
                name="%(app_label)s_%(class)s_non_system_email",
            ),
            # Email addresses of non-system users must be unique. The
            # condition restricts the uniqueness check to rows with
            # is_system=False, so system users are exempt from it.
            UniqueConstraint(
                fields=["email"],
                condition=Q(is_system=False),
                name="%(app_label)s_%(class)s_unique_email",
            ),
        ]
class Identity(models.Model):
    """
    Identity for a user in a remote user database.

    An Identity is bound if it's associated with a Django user, or unbound
    if no Django user is known for it (``user`` is NULL).

    An identity is identified by the pair (``issuer``, ``subject``), which
    is unique across the table.
    """

    class Meta(TypedModelMeta):
        constraints = [
            # At most one Identity per (issuer, subject) pair.
            UniqueConstraint(
                fields=["issuer", "subject"],
                name="%(app_label)s_%(class)s_unique_issuer_subject",
            ),
        ]

    # Django user this identity is bound to, or NULL when unbound.
    # SET_NULL: deleting the user keeps the identity and unbinds it.
    # The user's identities are reachable as ``user.identities``.
    user = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        related_name="identities",
        null=True,
        on_delete=models.SET_NULL,
    )
    # NOTE(review): "auhoritative" in the help_text below is a typo for
    # "authoritative". It is left untouched here because help_text is part
    # of the field definition, so fixing it would likely need a migration.
    issuer = models.CharField(
        max_length=512,
        help_text="identifier of auhoritative system for this identity",
    )
    subject = models.CharField(
        max_length=512,
        help_text="identifier of the user in the issuer system",
    )
    # auto_now: refreshed automatically each time the object is saved.
    last_used = models.DateTimeField(
        auto_now=True, help_text="last time this identity has been used"
    )
    # JSON data attached to the identity; defaults to an empty dict.
    # presumably the claims provided by the issuer -- confirm with callers.
    claims = models.JSONField(default=dict)

    def __str__(self) -> str:
        """Return the identity formatted as ``issuer:subject``."""
        return f"{self.issuer}:{self.subject}"