2025-08-31 23:05:53 +02:00

526 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""User management views for Django + DRF (dev-ready, prod-notes in comments)."""
from django.apps import apps
from datetime import timedelta
from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.auth.password_validation import validate_password
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError as DjangoValidationError
from django.core.mail import send_mail
from django.utils import timezone
from django.db import transaction
from django.db.models import OuterRef, Subquery, DateTimeField, IntegerField
from rest_framework_simplejwt.token_blacklist.models import OutstandingToken, BlacklistedToken
from rest_framework import generics, status, viewsets
from rest_framework.exceptions import PermissionDenied, ValidationError as DRFValidationError
from rest_framework.permissions import AllowAny, IsAdminUser, IsAuthenticated, IsAuthenticatedOrReadOnly
from rest_framework.response import Response
from rest_framework.views import APIView
from django.shortcuts import get_object_or_404
from .permissions import IsSelfOrAdmin
from rest_framework.parsers import MultiPartParser, FormParser
from rest_framework_simplejwt.views import TokenObtainPairView, TokenRefreshView # ← użyjemy z throttlingiem
from rest_framework.throttling import ScopedRateThrottle
from .models import PasswordResetToken, Favorite
from .serializers import (
UserRegistrationSerializer,
UserSerializer,
ResetPasswordSerializer,
ResetPasswordConfirmSerializer,
AvatarSerializer,
CustomTokenObtainPairSerializer,
EmailChangeConfirmSerializer,
CustomTokenRefreshSerializer,
FavoriteCreateSerializer,
FavoriteDeleteSerializer,
TYPE_REGISTRY,
)
from django.db.models import F
User = get_user_model()
# ✅ DEV + PROD: Rejestracja użytkownika
class RegisterView(generics.CreateAPIView):
permission_classes = (AllowAny,)
serializer_class = UserRegistrationSerializer
def create(self, request, *args, **kwargs):
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
self.perform_create(serializer)
return Response(
{"message": "User registered successfully. Please login to continue."},
status=status.HTTP_201_CREATED,
)
# ✅ DEV + PROD: Widok listy użytkowników (tylko admin)
class UserViewSet(viewsets.ModelViewSet):
queryset = User.objects.all()
serializer_class = UserSerializer
permission_classes = [IsAdminUser]
lookup_field = 'username'
def destroy(self, request, *args, **kwargs):
instance = self.get_object()
if instance.role == 'admin' and getattr(request.user, 'role', '') != 'admin':
return Response({'detail': 'Only admin can delete another admin.'}, status=403)
return super().destroy(request, *args, **kwargs)
# ✅ DEV + PROD: Profil użytkownika po username
from rest_framework import generics, status
from rest_framework.permissions import AllowAny, IsAuthenticated
from rest_framework.exceptions import ValidationError as DRFValidationError
from django.core.exceptions import ValidationError as DjangoValidationError
from django.core.mail import send_mail
from django.conf import settings
from django.utils import timezone
from datetime import timedelta
from .models import User, EmailChangeToken
from .serializers import UserSerializer
class UserProfileView(generics.RetrieveUpdateAPIView):
"""
GET /profile/<username>/ podgląd dla wszystkich
PUT/PATCH tylko właściciel (bez wyjątków dla adminów)
Zmiana e-maila: wymaga potwierdzenia linkiem.
"""
queryset = User.objects.all()
serializer_class = UserSerializer
lookup_field = 'username'
def get_permissions(self):
if self.request.method == "GET":
return [AllowAny()]
return [IsAuthenticated()]
def _start_email_change_flow(self, request, obj, new_email):
# Waliduj format i unikalność e-maila
try:
validate_email(new_email)
except DjangoValidationError:
raise DRFValidationError({"email": "Invalid email address."})
if User.objects.filter(email__iexact=new_email).exclude(pk=obj.pk).exists():
raise DRFValidationError({"email": "This email is already in use."})
# Weryfikacja hasła
current_password = request.data.get("current_password")
if not current_password or not request.user.check_password(current_password):
raise DRFValidationError({"current_password": "Current password is required and must be correct."})
# Anuluj stare żądania
EmailChangeToken.objects.filter(user=obj, used=False).update(used=True)
# Utwórz token ważny 24h
tok = EmailChangeToken.objects.create(
user=obj,
new_email=new_email,
expires_at=timezone.now() + timedelta(hours=24),
)
confirm_url = f"{settings.FRONTEND_URL}/confirm-email-change/{tok.token}"
# Powiadomienia mailowe
try:
send_mail(
"Confirm your new email",
f"To confirm your new email, open: {confirm_url}",
settings.DEFAULT_FROM_EMAIL,
[new_email],
fail_silently=False,
)
if obj.email:
send_mail(
"Email change requested",
"We received a request to change the email on your account. "
"If this wasn't you, please secure your account immediately.",
settings.DEFAULT_FROM_EMAIL,
[obj.email],
fail_silently=True,
)
except Exception:
pass
return Response(
{"detail": "Email change requested. Check your new inbox to confirm."},
status=status.HTTP_202_ACCEPTED,
)
def update(self, request, *args, **kwargs):
return self.partial_update(request, *args, **kwargs)
def partial_update(self, request, *args, **kwargs):
obj = self.get_object()
# Blokada edycji przez innych użytkowników
if request.user != obj:
return Response({"detail": "You do not have permission to edit this profile."},
status=status.HTTP_403_FORBIDDEN)
data = request.data.copy()
# Flow zmiany e-maila
new_email = data.get("email")
if new_email and new_email != obj.email:
return self._start_email_change_flow(request, obj, new_email)
# Usuń email z aktualizacji, jeśli nie jest zmieniany
data.pop("email", None)
serializer = self.get_serializer(obj, data=data, partial=True)
serializer.is_valid(raise_exception=True)
self.perform_update(serializer)
return Response(serializer.data, status=200)
# ✅ DEV + PROD: Zmiana hasła
class ChangePasswordView(APIView):
permission_classes = [IsAuthenticated]
def post(self, request):
user = request.user
old_password = request.data.get('old_password')
new_password = request.data.get('new_password')
# 1) Walidacja wejścia
if not old_password or not new_password:
return Response({"detail": "Both old and new password are required."}, status=status.HTTP_400_BAD_REQUEST)
# 2) Sprawdzenie starego hasła
if not user.check_password(old_password):
return Response({"detail": "Current password is incorrect."}, status=status.HTTP_400_BAD_REQUEST)
# 3) Polityka haseł
try:
validate_password(new_password, user)
except DjangoValidationError as e:
return Response({"detail": list(e.messages)}, status=status.HTTP_400_BAD_REQUEST)
# 4) Zapis nowego hasła + bump token_version (atomowo, bez race condition)
with transaction.atomic():
user.set_password(new_password)
user.token_version = F('token_version') + 1
user.save(update_fields=["password", "token_version"])
user.refresh_from_db(fields=["token_version"])
# 5) (Opcjonalnie) ZBLACKLISTUJ wszystkie refresh jednym strzałem
try:
token_ids = list(
OutstandingToken.objects.filter(user=user).values_list("id", flat=True)
)
if token_ids:
BlacklistedToken.objects.bulk_create(
[BlacklistedToken(token_id=tid) for tid in token_ids],
ignore_conflicts=True,
)
except Exception:
pass # brak appki blacklist → nie wysypuj 500
return Response(
{"detail": "Password successfully updated. All sessions have been logged out."},
status=status.HTTP_200_OK
)
# ✅ DEV + PROD: Reset hasła (wysyłka maila z tokenem)
class ResetPasswordView(generics.GenericAPIView):
permission_classes = [AllowAny]
serializer_class = ResetPasswordSerializer
throttle_scope = "reset_password" # ← działa po dodaniu w settings.py
def post(self, request):
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
email = serializer.validated_data['email']
try:
user = User.objects.get(email=email)
# unieważnij stare tokeny
PasswordResetToken.objects.filter(user=user, used=False).update(used=True)
PasswordResetToken.objects.filter(user=user, expires_at__lte=timezone.now()).delete()
token = PasswordResetToken.objects.create(
user=user,
expires_at=timezone.now() + timedelta(hours=24)
)
reset_url = f"{settings.FRONTEND_URL}/reset-password/{token.token}"
send_mail(
'Password Reset Request',
f'Click the following link to reset your password: {reset_url}',
settings.DEFAULT_FROM_EMAIL,
[email],
fail_silently=False,
)
except User.DoesNotExist:
pass # nie ujawniamy czy email istnieje
return Response({
'detail': 'If an account exists with this email, you will receive password reset instructions.'
}, status=200)
# ✅ DEV + PROD: Potwierdzenie resetu hasła
class ResetPasswordConfirmView(generics.GenericAPIView):
permission_classes = [AllowAny]
serializer_class = ResetPasswordConfirmSerializer
def post(self, request):
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
token = serializer.validated_data['token']
new_password = serializer.validated_data['new_password']
with transaction.atomic():
try:
reset_token = PasswordResetToken.objects.select_for_update().get(token=token)
except PasswordResetToken.DoesNotExist:
raise DRFValidationError({"token": "Invalid token."})
if not reset_token.is_valid():
raise DRFValidationError({"token": "Invalid or expired token."})
user = reset_token.user
try:
validate_password(new_password, user)
except DjangoValidationError as e:
raise DRFValidationError({"new_password": list(e.messages)})
user.set_password(new_password)
user.save()
reset_token.used = True
reset_token.save()
return Response({'detail': 'Password has been reset successfully.'}, status=200)
class AvatarView(generics.RetrieveUpdateAPIView):
"""
GET -> publiczny podgląd URL avatara (z throttlingiem 'avatar_get')
PATCH -> upload tylko dla właściciela (z throttlingiem 'avatar_upload')
"""
serializer_class = AvatarSerializer
parser_classes = [MultiPartParser, FormParser]
throttle_classes = [ScopedRateThrottle] # <-- throttling włączony
def get_permissions(self):
# GET dla każdego, PATCH tylko zalogowany
return [AllowAny()] if self.request.method == 'GET' else [IsAuthenticated()]
def get_throttles(self):
# Ustaw osobny scope zależnie od metody
self.throttle_scope = 'avatar_get' if self.request.method == 'GET' else 'avatar_upload'
return super().get_throttles()
def get_object(self):
if self.request.method == 'GET':
# publiczny odczyt po username
return get_object_or_404(User, username=self.kwargs['username'])
# zapis wyłącznie dla właściciela
user = self.request.user
if self.kwargs.get('username') != user.username:
raise PermissionDenied('Brak uprawnień do edycji.')
return user
def get_serializer_context(self):
ctx = super().get_serializer_context()
ctx['request'] = self.request # absolutny URL w serializerze
return ctx
# ✅ DEV + PROD: Logowanie z throttlingiem
class ThrottledTokenObtainPairView(TokenObtainPairView):
serializer_class = CustomTokenObtainPairSerializer
throttle_scope = "login" # ← działa po dodaniu w settings.py
class EmailChangeConfirmView(generics.GenericAPIView):
"""
Potwierdza zmianę e-maila na podstawie tokenu z maila.
"""
permission_classes = [AllowAny]
serializer_class = EmailChangeConfirmSerializer
def post(self, request):
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
token = serializer.validated_data["token"]
with transaction.atomic():
try:
ect = EmailChangeToken.objects.select_for_update().get(token=token)
except EmailChangeToken.DoesNotExist:
raise DRFValidationError({"token": "Invalid token."})
if not ect.is_valid():
raise DRFValidationError({"token": "Invalid or expired token."})
user = ect.user
old_email = user.email
user.email = ect.new_email
user.save(update_fields=["email"])
ect.used = True
ect.save(update_fields=["used"])
# Powiadomienie na nowy e-mail o udanej zmianie (opcjonalne)
try:
send_mail(
"Your email was changed",
"Your account email has been successfully updated.",
settings.DEFAULT_FROM_EMAIL,
[user.email],
fail_silently=True,
)
except Exception:
pass
return Response({"detail": "Email successfully updated."}, status=200)
class CustomTokenRefreshView(TokenRefreshView):
serializer_class = CustomTokenRefreshSerializer
class FavoriteView(APIView):
"""
Zarządzanie ulubionymi (dodaj/usuń) dla zalogowanego użytkownika, per-typ z TYPE_REGISTRY.
POST /api/favorites/
Body: { "type": "formula", "ref": "1A2-34-01" } # formula → ref = code
{ "type": "calculator", "ref": "42" } # calculator → ref = id/pk
201: { "status": "added", "type": "...", "ref": "..." }
DELETE /api/favorites/?type=formula&ref=1A2-34-01
/api/favorites/?type=calculator&ref=42
204: (no content)
"""
permission_classes = [IsAuthenticated]
def post(self, request):
ser = FavoriteCreateSerializer(data=request.data, context={'request': request})
ser.is_valid(raise_exception=True)
fav = ser.save()
# 'resolved_ref' serializera code lub pk w zależności od typu
return Response(
{"status": "added", "type": ser.validated_data["type"], "ref": ser.validated_data["resolved_ref"]},
status=status.HTTP_201_CREATED
)
def delete(self, request):
ser = FavoriteDeleteSerializer(data=request.query_params, context={'request': request})
ser.is_valid(raise_exception=True)
ser.delete()
return Response(status=status.HTTP_204_NO_CONTENT)
class UserFavoriteObjectView(APIView):
"""
GET /api/users/{username}/favorites/ # wszystkie typy
GET /api/users/{username}/favorites/?type=formula # tylko wybrany typ
Zwracamy elementy:
{
"id": <ID rekordu Favorite>,
"type": "<typ z registry>",
"ref": "<code lub pk wg lookup_field>",
"title": "<jeśli model ma>",
"slug": "<jeśli model ma>",
"favorited_at": "<ISO>",
# dla formula:
"description": "...",
"latex": "..."
}
"""
permission_classes = [IsAuthenticated, IsSelfOrAdmin]
def get(self, request, username):
target = get_object_or_404(User, username=username)
self.check_object_permissions(request, target)
only_type = request.query_params.get("type")
if only_type and only_type not in TYPE_REGISTRY:
return Response({"detail": "Unknown type."}, status=status.HTTP_400_BAD_REQUEST)
types = [only_type] if only_type else list(TYPE_REGISTRY.keys())
results = []
for t in types:
cfg = TYPE_REGISTRY[t]
Model = apps.get_model(cfg["app"], cfg["model"])
ct = ContentType.objects.get_for_model(Model)
# podzapytania: data i ID ulubionego
fav_dt_sq = (Favorite.objects
.filter(user=target, content_type=ct, object_id=OuterRef('pk'))
.values('created_at')[:1])
fav_id_sq = (Favorite.objects
.filter(user=target, content_type=ct, object_id=OuterRef('pk'))
.values('id')[:1])
qs = (Model.objects
.filter(pk__in=Favorite.objects.filter(user=target, content_type=ct).values('object_id'))
.annotate(favorited_at=Subquery(fav_dt_sq, output_field=DateTimeField()))
.annotate(favorite_id=Subquery(fav_id_sq, output_field=IntegerField()))
.order_by('-favorited_at', 'pk'))
lookup_field = cfg.get("lookup_field", "pk")
for obj in qs:
ref_value = getattr(obj, lookup_field) if lookup_field != 'pk' else obj.pk
item = {
"id": getattr(obj, "favorite_id", None), # ID w tabeli Favorite
"type": t,
"ref": ref_value, # dla formula = code
"title": getattr(obj, "title", None),
"slug": getattr(obj, "slug", None),
"favorited_at": getattr(obj, "favorited_at", None),
}
if t == "formula":
item["description"] = getattr(obj, "description", None)
# LaTeX z pola 'latex' lub awaryjnie z JSON-a w 'meta'
latex = getattr(obj, "latex", None)
if not latex:
try:
meta = json.loads(getattr(obj, "meta", "") or "{}")
latex = meta.get("latex")
except Exception:
latex = None
item["latex"] = latex
results.append(item)
results.sort(key=lambda x: (x["favorited_at"] is not None, x["favorited_at"]), reverse=True)
return Response(results, status=status.HTTP_200_OK)
class UserFavoriteDetailView(APIView):
"""
DELETE /api/users/{username}/favorites/{favorite_id}/
"""
permission_classes = [IsAuthenticated, IsSelfOrAdmin]
def delete(self, request, username, favorite_id):
target = get_object_or_404(User, username=username)
self.check_object_permissions(request, target)
fav = get_object_or_404(Favorite, id=favorite_id, user=target)
fav.delete()
return Response(status=status.HTTP_204_NO_CONTENT)