# Copyright © The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.
"""Data models for db users and authentication."""
import abc
import hashlib
import re
import secrets
from datetime import datetime, timedelta
from enum import StrEnum
from typing import (
Annotated,
Any,
Generic,
Literal,
Optional,
Self,
TYPE_CHECKING,
TypeAlias,
TypeVar,
Union,
assert_never,
cast,
)
from django.conf import settings
from django.contrib.auth.models import AbstractUser
from django.contrib.auth.models import UserManager as DjangoUserManager
from django.core.exceptions import ValidationError
from django.core.validators import MaxLengthValidator, MinLengthValidator
from django.db import models
from django.db.models import CheckConstraint, Q, QuerySet, UniqueConstraint
from django.urls import reverse
from django.utils import timezone
try:
import pydantic.v1 as pydantic
except ImportError:
import pydantic as pydantic # type: ignore
from debusine.db.context import ContextConsistencyError, context
from debusine.db.models import permissions
from debusine.db.models.permissions import (
PartialCheckResult,
PermissionUser,
get_resource_scope,
permission_check,
permission_filter,
resolve_role,
)
from debusine.db.models.scopes import Scope
from debusine.server import notifications
from debusine.utils.typing_utils import copy_signature_from
if TYPE_CHECKING:
    # This branch is only seen by static type checkers.
    from django_stubs_ext.db.models import TypedModelMeta

    # https://github.com/typeddjango/django-stubs/issues/2112
    AbstractUserMeta = TypedModelMeta
else:
    # Runtime fallbacks: a plain base class for model Meta classes, and the
    # real AbstractUser.Meta as the base of User.Meta.
    TypedModelMeta = object
    AbstractUserMeta = AbstractUser.Meta

#: Type variable used as second parameter of the custom QuerySet classes
A = TypeVar("A")
class TokenManager(models.Manager["Token"]):
    """Manager for Token model."""

    def create_worker_activation(self) -> "Token":
        """
        Create a token suitable for use in worker activation.

        :return: a new, enabled token expiring 15 minutes from now.
        """
        # These are currently hardcoded to expire in 15 minutes, which
        # should be enough for most providers to be able to launch an
        # instance.
        return self.create(
            expire_at=timezone.now() + timedelta(minutes=15), enabled=True
        )

    def get_tokens(
        self, username: str | None = None, key: str | None = None
    ) -> QuerySet["Token"]:
        """
        Return all the tokens filtered by a specific owner and/or token.

        To avoid filtering by owner or token set them to None (an empty
        string also disables the corresponding filter).

        :param username: username of the user owning the tokens.
        :param key: plain-text token key; it is matched via its hash.
        :return: queryset with the matching tokens.
        """
        tokens = self.get_queryset()
        if username:
            tokens = tokens.filter(user__username=username)
        if key:
            # Only hashes are stored: hash the key the same way Token does
            tokens = tokens.filter(hash=Token._generate_hash(key))
        return tokens

    def get_token_or_none(self, token_key: str) -> Optional["Token"]:
        """
        Return the token with token_key or None.

        Tokens whose ``expire_at`` is in the past are never returned.

        :param token_key: plain-text token key to look up.
        :return: the matching, non-expired token, or None if there is none.
        """
        assert isinstance(token_key, str)
        # Only hashes are stored: hash the key the same way Token does
        token_hash = Token._generate_hash(token_key)
        try:
            return (
                self.select_related("worker", "activating_worker")
                .exclude(expire_at__lt=timezone.now())
                .get(hash=token_hash)
            )
        except Token.DoesNotExist:
            return None

    def expired(self, at: datetime) -> QuerySet["Token"]:
        """
        Return queryset with tokens that have expired.

        :param at: datetime to check if the tokens are expired.
        :return: tokens whose ``expire_at`` is before the given datetime.
        """
        return self.get_queryset().filter(expire_at__lt=at)
class Token(models.Model):
    """
    Database model of a token.

    A token contains a key and other related data. It's used as a shared
    key between debusine server and clients (workers).

    This token model is very similar to rest_framework.authtoken.models.Token.
    The bigger difference is that debusine's token's owner is an optional
    ForeignKey to a user (a token may have no user at all), while the
    rest_framework owner is a OneToOne foreign key to a user.

    Database-wise we don't store the token itself, but a hash of the token.
    :py:func:`TokenManager.get_token_or_none` can be used to check a provided
    token key against the database.
    """

    hash = models.CharField(
        max_length=64,
        unique=True,
        verbose_name='Hexadecimal hash, length is 64 chars',
        validators=[MaxLengthValidator(64), MinLengthValidator(64)],
    )
    user = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        null=True,
        blank=True,
        on_delete=models.PROTECT,
    )
    comment = models.CharField(
        max_length=100,
        default='',
        verbose_name='Reason that this token was created',
        blank=True,
    )
    created_at = models.DateTimeField(auto_now_add=True)
    expire_at = models.DateTimeField(null=True, blank=True)
    enabled = models.BooleanField(default=False)
    last_seen_at = models.DateTimeField(
        blank=True,
        null=True,
        help_text="Last time that the token was used",
    )

    #: Plain-text key. It is not a database field: save() sets it on the
    #: instance only when it generates the key for a new token.
    key: str

    def save(self, *args: Any, **kwargs: Any) -> None:
        """Save the token. If it's a new token it generates a key."""
        if not self.hash:
            # No hash yet: generate the key and store only its hash
            self.key = self._generate_key()
            self.hash = self._generate_hash(self.key)

        super().save(*args, **kwargs)

    def enable(self) -> None:
        """Enable the token and save it."""
        self.enabled = True
        self.save()

    def disable(self) -> None:
        """Disable the token, save it and send the token-disabled notification."""
        self.enabled = False
        self.save()

        notifications.notify_worker_token_disabled(self)

    def __str__(self) -> str:
        """Return the hash of the Token."""
        return self.hash

    @classmethod
    def _generate_key(cls) -> str:
        """Create and return a key: 32 random bytes as 64 hex digits."""
        return secrets.token_hex(32)

    @classmethod
    def _generate_hash(cls, secret: str) -> str:
        """Hash the given secret: hex SHA-256 digest of its encoded form."""
        return hashlib.sha256(secret.encode()).hexdigest()

    objects = TokenManager()
#: Username of the system user
SYSTEM_USER_NAME = "_system"


def system_user() -> "User":
    """
    Return the `_system` user.

    :raises User.DoesNotExist: if no user with that username exists.
    """
    return User.objects.get(username=SYSTEM_USER_NAME)
class UserQuerySet(QuerySet["User", A], Generic[A]):
    """QuerySet for User providing the permission filters."""

    @permission_filter
    def can_display(
        self, user: PermissionUser  # noqa: U100
    ) -> "UserQuerySet[A]":
        """Keep every User for authenticated users, none otherwise."""
        assert user is not None  # Enforced by decorator
        return self if user.is_authenticated else self.none()

    @permission_filter
    def can_manage(self, user: PermissionUser) -> "UserQuerySet[A]":
        """Keep only the given user itself, or nothing if anonymous."""
        assert user is not None  # Enforced by decorator
        if user.is_authenticated:
            # Users can only manage themselves
            return self.filter(pk=user.pk)
        return self.none()
class UserManager(DjangoUserManager["User"]):
    """Manager for User model."""

    # We cannot use from_queryset or we hit this problem:
    # https://stackoverflow.com/questions/68367703/adding-custom-queryset-to-usermodel-causes-makemigrations-exception  # noqa: E501
    # Therefore we need to proxy all permission filters here

    def can_display(
        self, user: PermissionUser  # noqa: U100
    ) -> "UserQuerySet[Any]":
        """Keep only Users that can be displayed."""
        return self.get_queryset().can_display(user)

    def can_manage(self, user: PermissionUser) -> "UserQuerySet[Any]":
        """Keep only Users that the given user can manage."""
        return self.get_queryset().can_manage(user)

    def get_queryset(self) -> UserQuerySet[Any]:
        """Use the custom QuerySet."""
        return UserQuerySet(self.model, using=self._db)
class User(AbstractUser):
    """Debusine user."""

    is_system = models.BooleanField(default=False)

    # mypy seems to struggle changing the manager in a subclass
    objects = UserManager()  # type: ignore[misc]

    class Meta(AbstractUserMeta):
        constraints = [
            # System users must not have an email address.
            CheckConstraint(
                check=Q(is_system=False) | Q(email=""),
                name="%(app_label)s_%(class)s_non_system_email",
            ),
            # Email addresses of non-system users must be unique.
            UniqueConstraint(
                fields=["email"],
                condition=Q(is_system=False),
                name="%(app_label)s_%(class)s_unique_email",
            ),
        ]

    @permission_check("{user} cannot display user {resource}")
    def can_display(self, user: PermissionUser) -> bool:  # noqa: U100
        """Check if the user can be displayed."""
        assert user is not None  # enforced by decorator
        # Disallow unauthenticated people to enumerate users in the system.
        #
        # In general, we do not assume that user information is automatically
        # public.
        #
        # This could help make life a bit harder, for example, for drive-by
        # spammers trying to enumerate users and their personal information.
        return user.is_authenticated

    @permission_check("{user} cannot manage user {resource}")
    def can_manage(self, user: PermissionUser) -> bool:
        """Check if the user can manage this user (only users themselves can)."""
        assert user is not None  # enforced by decorator
        # Token is not taken into account here
        if not user.is_authenticated:
            return False
        return self.pk == user.pk

    def get_absolute_url(self) -> str:
        """Return an absolute URL to this user."""
        return reverse("user:detail", kwargs={"username": self.username})
class Identity(models.Model):
    """
    Identity for a user in a remote user database.

    An Identity is bound if it's associated with a Django user, or unbound if
    no Django user is known for it.
    """

    class Meta(TypedModelMeta):
        constraints = [
            # A given subject can appear only once per issuer
            UniqueConstraint(
                fields=["issuer", "subject"],
                name="%(app_label)s_%(class)s_unique_issuer_subject",
            ),
        ]

    # Deleting the user unbinds the identity instead of deleting it
    user = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        related_name="identities",
        null=True,
        on_delete=models.SET_NULL,
    )
    # NOTE(review): "auhoritative" is a typo; left as is because help_text is
    # a runtime string and changing it presumably needs a migration.
    issuer = models.CharField(
        max_length=512,
        help_text="identifier of auhoritative system for this identity",
    )
    subject = models.CharField(
        max_length=512,
        help_text="identifier of the user in the issuer system",
    )
    last_used = models.DateTimeField(
        auto_now=True, help_text="last time this identity has been used"
    )
    claims = models.JSONField(default=dict)

    def __str__(self) -> str:
        """Return str for the object, as ``issuer:subject``."""
        return f"{self.issuer}:{self.subject}"
#: Regexp matching the structure of group names
group_name_regex = re.compile(r"^[A-Za-z][A-Za-z0-9+._-]*$")


def is_valid_group_name(value: str) -> bool:
    """
    Check if value is a valid group name.

    A valid name starts with an ASCII letter, followed by any number of ASCII
    letters, digits, ``+``, ``.``, ``_`` or ``-``.

    :param value: candidate group name.
    :return: True if the whole of ``value`` has that structure.
    """
    # Use fullmatch() rather than match(): ``$`` also matches right before a
    # trailing newline, so match() would accept a name ending in a newline.
    return group_name_regex.fullmatch(value) is not None
def validate_group_name(value: str) -> None:
    """
    Validate group names.

    :param value: candidate group name.
    :raises ValidationError: if ``value`` is not a valid group name.
    """
    if is_valid_group_name(value):
        return
    raise ValidationError(
        "%(value)r is not a valid group name", params={"value": value}
    )
class GroupQuerySet(QuerySet["Group", A], Generic[A]):
    """QuerySet for Group providing the permission filters."""

    @permission_filter
    def can_display(self, user: PermissionUser) -> "GroupQuerySet[A]":
        """Keep all groups for logged in users, none for anyone else."""
        assert user is not None  # Enforced by decorator
        # Neither workers nor anonymous users can display group details
        if context.worker_token or not user.is_authenticated:
            return self.none()
        # Everybody else can display all groups
        return self.all()

    @permission_filter
    def can_display_audit_log(self, user: PermissionUser) -> "GroupQuerySet[A]":
        """Keep only groups whose audit log can be displayed."""
        # Same rule as for displaying the group itself
        return self.can_display(user)

    @permission_filter
    def can_manage(self, user: PermissionUser) -> "GroupQuerySet[A]":
        """Keep only groups that scope owners or group admins can manage."""
        assert user is not None  # Enforced by decorator
        # Neither workers nor anonymous users can manage groups
        if context.worker_token or not user.is_authenticated:
            return self.none()
        # The user belongs to a group with the owner role on the scope
        is_scope_owner = Q(
            scope__roles__group__users=user,
            scope__roles__role=Scope.Roles.OWNER,
        )
        # The user is an admin of the group itself
        is_group_admin = Q(
            membership__user=user, membership__role=Group.Roles.ADMIN
        )
        # The joins can yield the same group more than once
        return self.filter(is_scope_owner | is_group_admin).distinct()
class GroupManager(models.Manager["Group"]):
    """Manager for Group model."""

    def get_queryset(self) -> GroupQuerySet[Any]:
        """Return a GroupQuerySet instead of a plain QuerySet."""
        return GroupQuerySet(self.model, using=self._db)

    def from_scoped_name(self, scope_group: str) -> "Group":
        """
        Lookup a group from a scopename/groupname string.

        The string is split at its first ``/`` only.
        """
        scope_name, group_name = scope_group.split("/", 1)
        lookup = {"scope__name": scope_name, "name": group_name}
        return Group.objects.get(**lookup)

    def create_ephemeral(self, scope: Scope, name: str, owner: User) -> "Group":
        """Create an ephemeral group having the given user as its admin."""
        ephemeral_group = self.create(scope=scope, name=name, ephemeral=True)
        ephemeral_group.add_user(owner, Group.Roles.ADMIN)
        return ephemeral_group

    def unused_ephemeral(self) -> QuerySet["Group"]:
        """List ephemeral groups with no roles assigned."""
        # Introspect reverse foreign keys to detect which resources have a
        # *Roles table
        role_relations = [
            field.name
            for field in Group._meta.get_fields()
            if field.auto_created
            and not field.concrete
            and field.name.endswith("_roles")
        ]
        # A group is unused only if none of those relations points to it:
        # a resource that is currently assigning roles keeps it in use
        unused = Q()
        for relation in role_relations:
            unused &= Q(**{f"{relation}__isnull": True})
        return Group.objects.filter(ephemeral=True).filter(unused)
class GroupRoles(permissions.Roles):
    """Available roles for a member of a Group."""

    ADMIN = "admin", "Admin"
    MEMBER = "member", "Member"
[docs]class Group(models.Model):
"""
Connect users to roles.
Group names are scoped: groups with the same name in different scopes are
entirely distinct and unrelated.
"""
Roles: TypeAlias = GroupRoles
name = models.CharField(max_length=255, validators=[validate_group_name])
scope = models.ForeignKey(Scope, null=False, on_delete=models.PROTECT)
users = models.ManyToManyField(
User,
blank=True,
through="db.GroupMembership",
related_name="debusine_groups",
)
ephemeral = models.BooleanField(
default=False, help_text="remove the group if it has no roles assigned"
)
objects = GroupManager.from_queryset(GroupQuerySet)()
class Meta(TypedModelMeta):
constraints = [
UniqueConstraint(
fields=["name", "scope"],
name="%(app_label)s_%(class)s_unique_name_scope",
),
]
def __str__(self) -> str:
"""Return the scoped group name."""
return f"{self.scope.name}/{self.name}"
[docs] @copy_signature_from(models.Model.save)
def save(self, *args: Any, **kwargs: Any) -> None:
"""Update audit log on group create/update."""
old: Optional["Group"] = None
if self.pk:
old = Group.objects.get(pk=self.pk)
super().save(*args, **kwargs)
if old is None:
self.add_audit_log(GroupAuditLogCreated.create(self))
else:
self.add_audit_log(GroupAuditLogUpdated.create_diff(old, self))
[docs] def get_absolute_url(self) -> str:
"""Return an absolute URL to this user."""
# Prevent circular import
from debusine.server.scopes import urlconf_scope
with urlconf_scope(self.scope.name):
return reverse(
"groups:detail",
kwargs={"group": self.name},
)
[docs] @permission_check("{user} cannot display {resource}")
def can_display(self, user: PermissionUser) -> bool:
"""Check if the group can be displayed."""
assert user is not None # enforced by decorator
# Workers cannot display group details
if context.worker_token:
return False
# Anonymous users cannot display group details
if not user.is_authenticated:
return False
# Otherwise any logged in user can display group details
return True
[docs] @permission_check("{user} cannot display the audit log of {resource}")
def can_display_audit_log(self, user: PermissionUser) -> bool:
"""Check if the group audit log can be displayed."""
return self.can_display(user)
[docs] @permission_check("{user} cannot manage {resource}")
def can_manage(self, user: PermissionUser) -> bool:
"""Check if the group can be managed."""
assert user is not None # enforced by decorator
# Workers cannot manage groups
if context.worker_token:
return False
# Anonymous users cannot manage groups
if not user.is_authenticated:
return False
# Shortcut to avoid hitting the database for common cases
match self.scope.context_has_role(user, Scope.Roles.OWNER):
case PartialCheckResult.ALLOW:
return True
case PartialCheckResult.DENY | PartialCheckResult.PASS:
pass
case _ as unreachable:
assert_never(unreachable)
return Group.objects.can_manage(user).filter(pk=self.pk).exists()
[docs] def assign_role(
self, resource: models.Model, role: permissions.Roles | str
) -> models.Model:
"""Assign a role on a resource."""
if get_resource_scope(resource) != self.scope:
raise ValueError(
f"{resource.__class__.__name__} {str(resource)!r}"
f" is not in scope {self.scope}"
)
role = resolve_role(getattr(resource, "Roles"), role)
roles_model_cls: models.Model = getattr(
resource.__class__, "objects"
).get_roles_model()
assignment, _ = getattr(roles_model_cls, "objects").get_or_create(
resource=resource, group=self, role=role
)
return cast(models.Model, assignment)
[docs] def get_user_role(self, user: User) -> GroupRoles:
"""Get the role of an existing group member."""
try:
membership = GroupMembership.objects.get(group=self, user=user)
except GroupMembership.DoesNotExist:
raise ValueError(f"User {user} is not a member of group {self}")
return cast(GroupRoles, membership.role)
[docs] def set_user_role(self, user: User, role: Roles) -> "GroupMembership":
"""Set the role of an existing group member."""
try:
membership = GroupMembership.objects.get(group=self, user=user)
except GroupMembership.DoesNotExist:
raise ValueError(f"User {user} is not a member of group {self}")
membership.role = role
membership.save()
self.add_audit_log(
GroupAuditLogMemberRoleChanged(user=user.username, role=role)
)
return membership
[docs] def add_user(
self, user: User, role: Roles = Roles.MEMBER
) -> "GroupMembership":
"""Add a user to the group."""
membership, created = GroupMembership.objects.get_or_create(
group=self, user=user, defaults={"role": role}
)
if not created:
raise ValueError(f"{user} is already a member of group {self}")
self.add_audit_log(
GroupAuditLogMemberAdded(user=user.username, role=role)
)
return membership
[docs] def remove_user(self, user: User) -> None:
"""Remove an existing member from the group."""
try:
membership = GroupMembership.objects.get(group=self, user=user)
except GroupMembership.DoesNotExist:
raise ValueError(f"User {user} is not a member of group {self}")
membership.delete()
self.add_audit_log(GroupAuditLogMemberRemoved(user=user.username))
[docs] def add_audit_log(
self, changes: "GroupAuditLogEntry"
) -> Optional["GroupAuditLog"]:
"""Add an audit log entry."""
if context.user is None or not context.user.is_authenticated:
if context.permission_checks_disabled:
return None
raise ContextConsistencyError("add_audit_log requires a valid user")
changes.actor = context.user.username
return GroupAuditLog.objects.create_entry(
group=self, actor=context.user, changes=changes
)
class GroupMembership(models.Model):
    """Membership of a user in a group, with the role the user has in it."""

    Roles: TypeAlias = GroupRoles

    group = models.ForeignKey(
        Group, on_delete=models.CASCADE, related_name="membership"
    )
    user = models.ForeignKey(
        User, on_delete=models.CASCADE, related_name="group_memberships"
    )
    role = models.CharField(
        max_length=16, choices=Roles.choices, default=Roles.MEMBER
    )

    def __str__(self) -> str:
        """Return a string representation, as ``group:user``."""
        return f"{self.group}:{self.user}"

    class Meta(TypedModelMeta):
        constraints = [
            # A user can be a member of a given group only once
            UniqueConstraint(
                fields=["group", "user"],
                name="%(app_label)s_%(class)s_unique_group_user",
            ),
        ]
class GroupAuditLogEntryTypes(StrEnum):
    """Possible values for GroupAuditLogEntry types."""

    # These values are used as the ``type`` discriminator of the audit log
    # payload models.
    CREATED = "create"
    UPDATED = "update"
    ADD_USER = "add_user"
    REMOVE_USER = "del_user"
    SET_USER_ROLE = "set_role"
class GroupAuditLogEntryBase(pydantic.BaseModel, abc.ABC):
    """Base class for all audit log payloads."""

    class Config:
        """Set up stricter pydantic Config."""

        # Also validate values assigned after the model has been created
        validate_assignment = True
        # Refuse fields that are not declared in the model
        extra = pydantic.Extra.forbid

    #: Username of the user performing the action
    actor: str = ""

    @abc.abstractmethod
    def description(self) -> str:
        """Format a description for this change."""
class GroupAuditLogCreated(GroupAuditLogEntryBase):
    """Audit log payload recording that a group has been created."""

    type: Literal[GroupAuditLogEntryTypes.CREATED] = (
        GroupAuditLogEntryTypes.CREATED
    )
    name: str
    scope: str
    ephemeral: bool

    @classmethod
    def create(cls, group: Group) -> Self:
        """Build the entry from the current field values of a group."""
        scope = str(group.scope)
        return cls(name=group.name, scope=scope, ephemeral=group.ephemeral)

    def description(self) -> str:
        """Describe the creation, mentioning if the group is ephemeral."""
        # Placeholder shown in place of an empty name or scope
        unknown = "<unknown>"
        group_desc = (
            f"group {self.name or unknown} in scope {self.scope or unknown}"
        )
        return (
            f"{self.actor} created ephemeral {group_desc}"
            if self.ephemeral
            else f"{self.actor} created {group_desc}"
        )
class GroupAuditLogUpdated(GroupAuditLogEntryBase):
    """
    Audit log payload recording that a group has been updated.

    Each field is None if unchanged, or an ``(old, new)`` tuple of values.
    """

    type: Literal[GroupAuditLogEntryTypes.UPDATED] = (
        GroupAuditLogEntryTypes.UPDATED
    )
    name: tuple[str, str] | None = None
    scope: tuple[str, str] | None = None
    ephemeral: tuple[bool, bool] | None = None

    @classmethod
    def create_diff(cls, old: Group, new: Group) -> Self:
        """
        Compute the changes between two Group objects.

        :returns: an entry where each field that differs between ``old`` and
          ``new`` holds an ``(old, new)`` tuple, and the others are None
        """
        name: tuple[str, str] | None = (
            (old.name, new.name) if old.name != new.name else None
        )
        scope: tuple[str, str] | None = (
            (str(old.scope), str(new.scope))
            if old.scope != new.scope
            else None
        )
        ephemeral: tuple[bool, bool] | None = (
            (old.ephemeral, new.ephemeral)
            if old.ephemeral != new.ephemeral
            else None
        )
        return cls(name=name, scope=scope, ephemeral=ephemeral)

    def description(self) -> str:
        """List the changed fields, or report that nothing changed."""
        changes: list[str] = [
            f"{field}: {change[0]!r}→{change[1]!r}"
            for field in ("name", "scope", "ephemeral")
            if (change := getattr(self, field)) is not None
        ]
        if not changes:
            return f"{self.actor} updated group, no changes detected"
        return f"{self.actor} updated {'; '.join(changes)}"
class GroupAuditLogMembershipBase(GroupAuditLogEntryBase):
    """Base for membership changes descriptions."""

    #: Username of the member affected
    user: str
class GroupAuditLogMemberAdded(GroupAuditLogMembershipBase):
    """Member added to the group."""

    type: Literal[GroupAuditLogEntryTypes.ADD_USER] = (
        GroupAuditLogEntryTypes.ADD_USER
    )
    #: Role given to the new member
    role: GroupRoles

    def description(self) -> str:
        """Format a description for this change."""
        return f"{self.actor} added {self.user} as {self.role}"
class GroupAuditLogMemberRoleChanged(GroupAuditLogMembershipBase):
    """Member role updated."""

    type: Literal[GroupAuditLogEntryTypes.SET_USER_ROLE] = (
        GroupAuditLogEntryTypes.SET_USER_ROLE
    )
    #: New role of the member
    role: GroupRoles

    def description(self) -> str:
        """Format a description for this change."""
        return f"{self.actor} set {self.user}'s role as {self.role}"
class GroupAuditLogMemberRemoved(GroupAuditLogMembershipBase):
    """Member removed from the group."""

    type: Literal[GroupAuditLogEntryTypes.REMOVE_USER] = (
        GroupAuditLogEntryTypes.REMOVE_USER
    )

    def description(self) -> str:
        """Format a description for this change."""
        return f"{self.actor} removed {self.user}"
#: Any of the audit log payload models; the ``type`` field of the data selects
#: which one is used.
GroupAuditLogEntry: TypeAlias = Annotated[
    Union[
        GroupAuditLogCreated,
        GroupAuditLogUpdated,
        GroupAuditLogMemberAdded,
        GroupAuditLogMemberRoleChanged,
        GroupAuditLogMemberRemoved,
    ],
    pydantic.Field(discriminator="type"),
]
class GroupAuditLogManager(models.Manager["GroupAuditLog"]):
    """Manager for :py:class:`GroupAuditLog`."""

    def create_entry(
        self,
        group: Group,
        actor: User,
        changes: GroupAuditLogEntry,
    ) -> "GroupAuditLog":
        """Create a GroupAuditLog entry storing ``changes`` as a dict."""
        # The changes column is a JSONField: convert the pydantic model
        payload = changes.dict()
        return self.create(group=group, actor=actor, changes=payload)
class GroupAuditLog(models.Model):
    """Audit log entry for group changes."""

    created_at = models.DateTimeField(auto_now_add=True)
    # Entries are deleted together with their group
    group = models.ForeignKey(
        Group, on_delete=models.CASCADE, related_name="audit_log"
    )
    # Entries are kept, without actor, when the acting user is deleted
    actor = models.ForeignKey(User, on_delete=models.SET_NULL, null=True)
    # Payload stored by GroupAuditLogManager.create_entry from ``.dict()``
    changes = models.JSONField()

    objects = GroupAuditLogManager()

    @property
    def changes_model(self) -> GroupAuditLogEntry:
        """Return the pydantic model for changes."""
        # mypy complains that GroupAuditLogEntry is a "<typing special form>",
        # but the code works. I'm adding a mypy override for now, and hopefully
        # this can get cleaned in pydantic2.
        # See https://salsa.debian.org/freexian-team/debusine/-/merge_requests/1701#note_597300  # noqa: E501
        return pydantic.parse_obj_as(
            GroupAuditLogEntry, self.changes  # type: ignore[arg-type]
        )
class ClientEnroll(models.Model):
    """Information from a client wanting to enroll."""

    created_at = models.DateTimeField(auto_now_add=True)
    # Unique value of up to 64 characters identifying the request
    nonce = models.CharField(max_length=64, unique=True)
    # JSON data for the request, an empty dict by default
    payload = models.JSONField(default=dict)