# Copyright 2019, 2021-2024 The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.
"""Data models for db users and authentication."""
import hashlib
import re
import secrets
from typing import Any, Optional, TYPE_CHECKING, cast
from django.conf import settings
from django.contrib.auth.models import AbstractUser
from django.core.exceptions import ValidationError
from django.core.validators import MaxLengthValidator, MinLengthValidator
from django.db import models
from django.db.models import CheckConstraint, Q, QuerySet, UniqueConstraint
from debusine.db.models.permissions import (
Roles,
get_resource_scope,
resolve_role,
)
from debusine.db.models.scopes import Scope
from debusine.server import notifications
# TypedModelMeta comes from django_stubs_ext and is imported only while type
# checking; at runtime both names are bound to plain runtime equivalents.
if TYPE_CHECKING:
    from django_stubs_ext.db.models import TypedModelMeta

    # https://github.com/typeddjango/django-stubs/issues/2112
    AbstractUserMeta = TypedModelMeta
else:
    # Runtime fallbacks: Meta classes deriving from TypedModelMeta derive from
    # object, and User.Meta derives from the real AbstractUser.Meta.
    TypedModelMeta = object
    AbstractUserMeta = AbstractUser.Meta
class TokenManager(models.Manager["Token"]):
    """Manager for Token model, offering lookups by plain-text key."""

    def get_tokens(
        self, username: str | None = None, key: str | None = None
    ) -> QuerySet["Token"]:
        """
        Return the tokens, optionally narrowed by owner and/or token key.

        A falsy ``username`` or ``key`` (``None`` by default) disables the
        corresponding filter. The key is never compared directly: it is
        hashed with SHA-256 and matched against the stored ``hash``.
        """
        queryset = self.get_queryset()

        if username:
            queryset = queryset.filter(user__username=username)

        if not key:
            return queryset

        digest = hashlib.sha256(key.encode()).hexdigest()
        return queryset.filter(hash=digest)

    def get_token_or_none(self, token_key: str) -> Optional["Token"]:
        """
        Return the token whose hash matches token_key, or None.

        The related ``worker`` is fetched in the same query.
        """
        assert isinstance(token_key, str)

        digest = hashlib.sha256(token_key.encode()).hexdigest()
        try:
            found = self.select_related('worker').get(hash=digest)
        except Token.DoesNotExist:
            return None
        return found
class Token(models.Model):
    """
    Database model of a token.

    A token contains a key and other related data. It's used as a shared
    key between debusine server and clients (workers).

    This token model is very similar to rest_framework.authtoken.models.Token.
    The bigger difference is in ownership: here ``user`` is an optional
    ForeignKey (a token may exist without a user), while the rest_framework
    owner is a OneToOne foreign key to a user.

    Database-wise we don't store the token itself, but a hash of the token.
    :py:func:`TokenManager.get_token_or_none` can be used to check a provided
    token key against the database.
    """

    # SHA-256 hex digest of the key: always exactly 64 characters.
    hash = models.CharField(
        max_length=64,
        unique=True,
        verbose_name='Hexadecimal hash, length is 64 chars',
        validators=[MaxLengthValidator(64), MinLengthValidator(64)],
    )
    # Optional owner; PROTECT prevents deleting a user that still has tokens.
    user = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        null=True,
        blank=True,
        on_delete=models.PROTECT,
    )
    comment = models.CharField(
        max_length=100,
        default='',
        verbose_name='Reason that this token was created',
        blank=True,
    )
    created_at = models.DateTimeField(auto_now_add=True)
    # Tokens start out disabled and must be enabled explicitly.
    enabled = models.BooleanField(default=False)
    last_seen_at = models.DateTimeField(
        blank=True,
        null=True,
        help_text="Last time that the token was used",
    )

    # Plain-text key. Not a database field: it is only set on the instance
    # by save() at the moment a new hash is generated.
    key: str

    def save(self, *args: Any, **kwargs: Any) -> None:
        """
        Save the token, generating a key first if it has no hash yet.

        The generated key is kept on ``self.key``; only its hash is stored.
        """
        needs_key = not self.hash
        if needs_key:
            fresh_key = self._generate_key()
            self.key = fresh_key
            self.hash = self._generate_hash(fresh_key)

        super().save(*args, **kwargs)

    def enable(self) -> None:
        """Mark the token as enabled and save it."""
        self.enabled = True
        self.save()

    def disable(self) -> None:
        """
        Mark the token as disabled and save it.

        After saving, notifications.notify_worker_token_disabled is called
        with this token.
        """
        self.enabled = False
        self.save()

        notifications.notify_worker_token_disabled(self)

    def __str__(self) -> str:
        """Return the hash of the Token."""
        return self.hash

    @classmethod
    def _generate_key(cls) -> str:
        """Create and return a random key of 64 hexadecimal characters."""
        return secrets.token_hex(32)

    @classmethod
    def _generate_hash(cls, secret: str) -> str:
        """Return the SHA-256 hexadecimal digest of the given secret."""
        hasher = hashlib.sha256()
        hasher.update(secret.encode())
        return hasher.hexdigest()

    objects = TokenManager()
#: Username of the system user looked up by system_user()
SYSTEM_USER_NAME = "_system"
def system_user() -> "User":
    """
    Return the `_system` user.

    The user is looked up by username (SYSTEM_USER_NAME) on every call.
    """
    users = User.objects
    return users.get(username=SYSTEM_USER_NAME)
class User(AbstractUser):
    """
    Debusine user.

    Extends Django's AbstractUser with an ``is_system`` flag and with
    database constraints on the email address that depend on that flag.
    """

    # True for system users, which must have an empty email (see Meta).
    is_system = models.BooleanField(default=False)

    class Meta(AbstractUserMeta):
        constraints = [
            # A row is valid if it is not a system user, or its email is
            # empty: i.e. system users must not have an email address.
            CheckConstraint(
                check=Q(is_system=False) | Q(email=""),
                name="%(app_label)s_%(class)s_non_system_email",
            ),
            # Uniqueness of email is enforced only among non-system users.
            UniqueConstraint(
                fields=["email"],
                condition=Q(is_system=False),
                name="%(app_label)s_%(class)s_unique_email",
            ),
        ]
class Identity(models.Model):
    """
    Identity for a user in a remote user database.

    An Identity is bound if it's associated with a Django user, or unbound if
    no Django user is known for it.
    """

    class Meta(TypedModelMeta):
        constraints = [
            # An (issuer, subject) pair identifies at most one Identity.
            UniqueConstraint(
                fields=["issuer", "subject"],
                name="%(app_label)s_%(class)s_unique_issuer_subject",
            ),
        ]

    # Bound user, if any. Deleting the user unbinds the identity (SET_NULL)
    # rather than deleting it.
    user = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        related_name="identities",
        null=True,
        on_delete=models.SET_NULL,
    )
    # NOTE(review): "auhoritative" below is a typo for "authoritative"; it is
    # left as-is because help_text is part of the field definition, so
    # changing it would presumably need a migration -- confirm before fixing.
    issuer = models.CharField(
        max_length=512,
        help_text="identifier of auhoritative system for this identity",
    )
    subject = models.CharField(
        max_length=512,
        help_text="identifier of the user in the issuer system",
    )
    # auto_now: refreshed each time the identity is saved.
    last_used = models.DateTimeField(
        auto_now=True, help_text="last time this identity has been used"
    )
    claims = models.JSONField(default=dict)

    def __str__(self) -> str:
        """Return the identity formatted as ``issuer:subject``."""
        issuer, subject = self.issuer, self.subject
        return f"{issuer}:{subject}"
#: Regexp matching the structure of group names: an ASCII letter followed by
#: any number of ASCII letters, digits, or ``+``, ``.``, ``_``, ``-``
group_name_regex = re.compile(r"^[A-Za-z][A-Za-z0-9+._-]*$")


def is_valid_group_name(value: str) -> bool:
    """
    Check if value is a valid group name.

    The whole string must match group_name_regex. ``fullmatch`` is used
    instead of ``match`` because ``$`` also matches just before a trailing
    newline, which would let a name such as ``"abc\\n"`` through.
    """
    return group_name_regex.fullmatch(value) is not None
def validate_group_name(value: str) -> None:
    """
    Validate group names.

    Raises ValidationError if is_valid_group_name() rejects the value;
    returns None otherwise.
    """
    if is_valid_group_name(value):
        return

    raise ValidationError(
        "%(value)r is not a valid group name", params={"value": value}
    )
class GroupManager(models.Manager["Group"]):
    """Manager for Group model."""

    def from_scoped_name(self, scope_group: str) -> "Group":
        """
        Lookup a group from a scopename/groupname string.

        The string is split on the first ``/`` only: everything before it is
        the scope name, everything after it is the group name.

        :raises ValueError: if scope_group contains no ``/`` separator
        :raises Group.DoesNotExist: if no group matches the scope and name
        """
        scope_name, separator, group_name = scope_group.partition("/")
        if not separator:
            # Fail with an explicit message instead of an opaque tuple
            # unpacking error; the exception type stays ValueError.
            raise ValueError(
                f"{scope_group!r} is not in scopename/groupname format"
            )
        return Group.objects.get(scope__name=scope_name, name=group_name)
class Group(models.Model):
    """
    Connect users to roles.

    Group names are scoped: groups with the same name in different scopes are
    entirely distinct and unrelated.
    """

    name = models.CharField(max_length=255, validators=[validate_group_name])
    # PROTECT: a scope that still has groups cannot be deleted.
    scope = models.ForeignKey(Scope, null=False, on_delete=models.PROTECT)
    # Members of the group; reachable from a user as ``debusine_groups``.
    users = models.ManyToManyField(
        User,
        blank=True,
        related_name="debusine_groups",
    )

    objects = GroupManager()

    class Meta(TypedModelMeta):
        constraints = [
            # Names only need to be unique within their scope.
            UniqueConstraint(
                fields=["name", "scope"],
                name="%(app_label)s_%(class)s_unique_name_scope",
            ),
        ]

    def __str__(self) -> str:
        """Return the group name in ``scopename/groupname`` form."""
        scope_name = self.scope.name
        return f"{scope_name}/{self.name}"

    def assign_role(
        self, resource: models.Model, role: Roles | str
    ) -> models.Model:
        """
        Assign a role on a resource to this group.

        The resource must be in the same scope as the group. The role is
        resolved against the resource's ``Roles``, and the assignment is
        fetched or created (get_or_create) in the roles model provided by
        the resource's manager.

        :raises ValueError: if the resource is not in this group's scope
        :return: the role assignment instance, new or pre-existing
        """
        if get_resource_scope(resource) != self.scope:
            raise ValueError(
                f"{resource.__class__.__name__} {str(resource)!r}"
                f" is not in scope {self.scope}"
            )

        resolved_role = resolve_role(getattr(resource, "Roles"), role)

        # getattr with constant names is kept from the original: these
        # attributes are not declared on the generic models.Model type.
        resource_manager = getattr(resource.__class__, "objects")
        assignments = getattr(resource_manager.get_roles_model(), "objects")

        found_or_created = assignments.get_or_create(
            resource=resource, group=self, role=resolved_role
        )
        return cast(models.Model, found_or_created[0])