# -*- coding: utf-8 -*- """User management views for Django + DRF (dev-ready, prod-notes in comments).""" from django.apps import apps from datetime import timedelta from django.conf import settings from django.contrib.auth import get_user_model from django.contrib.auth.password_validation import validate_password from django.contrib.contenttypes.models import ContentType from django.core.exceptions import ValidationError as DjangoValidationError from django.core.mail import send_mail from django.utils import timezone from django.db import transaction from django.db.models import OuterRef, Subquery, DateTimeField, IntegerField from rest_framework_simplejwt.token_blacklist.models import OutstandingToken, BlacklistedToken from rest_framework import generics, status, viewsets from rest_framework.exceptions import PermissionDenied, ValidationError as DRFValidationError from rest_framework.permissions import AllowAny, IsAdminUser, IsAuthenticated, IsAuthenticatedOrReadOnly from rest_framework.response import Response from rest_framework.views import APIView from django.shortcuts import get_object_or_404 from .permissions import IsSelfOrAdmin from rest_framework.parsers import MultiPartParser, FormParser from rest_framework_simplejwt.views import TokenObtainPairView, TokenRefreshView # ← użyjemy z throttlingiem from rest_framework.throttling import ScopedRateThrottle from .models import PasswordResetToken, Favorite from .serializers import ( UserRegistrationSerializer, UserSerializer, ResetPasswordSerializer, ResetPasswordConfirmSerializer, AvatarSerializer, CustomTokenObtainPairSerializer, EmailChangeConfirmSerializer, CustomTokenRefreshSerializer, FavoriteCreateSerializer, FavoriteDeleteSerializer, TYPE_REGISTRY, ) from django.db.models import F User = get_user_model() # ✅ DEV + PROD: Rejestracja użytkownika class RegisterView(generics.CreateAPIView): permission_classes = (AllowAny,) serializer_class = UserRegistrationSerializer def create(self, request, *args, **kwargs): serializer = self.get_serializer(data=request.data) serializer.is_valid(raise_exception=True) self.perform_create(serializer) return Response( {"message": "User registered successfully. Please login to continue."}, status=status.HTTP_201_CREATED, ) # ✅ DEV + PROD: Widok listy użytkowników (tylko admin) class UserViewSet(viewsets.ModelViewSet): queryset = User.objects.all() serializer_class = UserSerializer permission_classes = [IsAdminUser] lookup_field = 'username' def destroy(self, request, *args, **kwargs): instance = self.get_object() if instance.role == 'admin' and getattr(request.user, 'role', '') != 'admin': return Response({'detail': 'Only admin can delete another admin.'}, status=403) return super().destroy(request, *args, **kwargs) # ✅ DEV + PROD: Profil użytkownika po username from rest_framework import generics, status from rest_framework.permissions import AllowAny, IsAuthenticated from rest_framework.exceptions import ValidationError as DRFValidationError from django.core.exceptions import ValidationError as DjangoValidationError from django.core.mail import send_mail from django.conf import settings from django.utils import timezone from datetime import timedelta from .models import User, EmailChangeToken from .serializers import UserSerializer class UserProfileView(generics.RetrieveUpdateAPIView): """ GET /profile// – podgląd dla wszystkich PUT/PATCH – tylko właściciel (bez wyjątków dla adminów) Zmiana e-maila: wymaga potwierdzenia linkiem. """ queryset = User.objects.all() serializer_class = UserSerializer lookup_field = 'username' def get_permissions(self): if self.request.method == "GET": return [AllowAny()] return [IsAuthenticated()] def _start_email_change_flow(self, request, obj, new_email): # Waliduj format i unikalność e-maila try: validate_email(new_email) except DjangoValidationError: raise DRFValidationError({"email": "Invalid email address."}) if User.objects.filter(email__iexact=new_email).exclude(pk=obj.pk).exists(): raise DRFValidationError({"email": "This email is already in use."}) # Weryfikacja hasła current_password = request.data.get("current_password") if not current_password or not request.user.check_password(current_password): raise DRFValidationError({"current_password": "Current password is required and must be correct."}) # Anuluj stare żądania EmailChangeToken.objects.filter(user=obj, used=False).update(used=True) # Utwórz token ważny 24h tok = EmailChangeToken.objects.create( user=obj, new_email=new_email, expires_at=timezone.now() + timedelta(hours=24), ) confirm_url = f"{settings.FRONTEND_URL}/confirm-email-change/{tok.token}" # Powiadomienia mailowe try: send_mail( "Confirm your new email", f"To confirm your new email, open: {confirm_url}", settings.DEFAULT_FROM_EMAIL, [new_email], fail_silently=False, ) if obj.email: send_mail( "Email change requested", "We received a request to change the email on your account. " "If this wasn't you, please secure your account immediately.", settings.DEFAULT_FROM_EMAIL, [obj.email], fail_silently=True, ) except Exception: pass return Response( {"detail": "Email change requested. Check your new inbox to confirm."}, status=status.HTTP_202_ACCEPTED, ) def update(self, request, *args, **kwargs): return self.partial_update(request, *args, **kwargs) def partial_update(self, request, *args, **kwargs): obj = self.get_object() # Blokada edycji przez innych użytkowników if request.user != obj: return Response({"detail": "You do not have permission to edit this profile."}, status=status.HTTP_403_FORBIDDEN) data = request.data.copy() # Flow zmiany e-maila new_email = data.get("email") if new_email and new_email != obj.email: return self._start_email_change_flow(request, obj, new_email) # Usuń email z aktualizacji, jeśli nie jest zmieniany data.pop("email", None) serializer = self.get_serializer(obj, data=data, partial=True) serializer.is_valid(raise_exception=True) self.perform_update(serializer) return Response(serializer.data, status=200) # ✅ DEV + PROD: Zmiana hasła class ChangePasswordView(APIView): permission_classes = [IsAuthenticated] def post(self, request): user = request.user old_password = request.data.get('old_password') new_password = request.data.get('new_password') # 1) Walidacja wejścia if not old_password or not new_password: return Response({"detail": "Both old and new password are required."}, status=status.HTTP_400_BAD_REQUEST) # 2) Sprawdzenie starego hasła if not user.check_password(old_password): return Response({"detail": "Current password is incorrect."}, status=status.HTTP_400_BAD_REQUEST) # 3) Polityka haseł try: validate_password(new_password, user) except DjangoValidationError as e: return Response({"detail": list(e.messages)}, status=status.HTTP_400_BAD_REQUEST) # 4) Zapis nowego hasła + bump token_version (atomowo, bez race condition) with transaction.atomic(): user.set_password(new_password) user.token_version = F('token_version') + 1 user.save(update_fields=["password", "token_version"]) user.refresh_from_db(fields=["token_version"]) # 5) (Opcjonalnie) ZBLACKLISTUJ wszystkie refresh jednym strzałem try: token_ids = list( OutstandingToken.objects.filter(user=user).values_list("id", flat=True) ) if token_ids: BlacklistedToken.objects.bulk_create( [BlacklistedToken(token_id=tid) for tid in token_ids], ignore_conflicts=True, ) except Exception: pass # brak appki blacklist → nie wysypuj 500 return Response( {"detail": "Password successfully updated. All sessions have been logged out."}, status=status.HTTP_200_OK ) # ✅ DEV + PROD: Reset hasła (wysyłka maila z tokenem) class ResetPasswordView(generics.GenericAPIView): permission_classes = [AllowAny] serializer_class = ResetPasswordSerializer throttle_scope = "reset_password" # ← działa po dodaniu w settings.py def post(self, request): serializer = self.get_serializer(data=request.data) serializer.is_valid(raise_exception=True) email = serializer.validated_data['email'] try: user = User.objects.get(email=email) # unieważnij stare tokeny PasswordResetToken.objects.filter(user=user, used=False).update(used=True) PasswordResetToken.objects.filter(user=user, expires_at__lte=timezone.now()).delete() token = PasswordResetToken.objects.create( user=user, expires_at=timezone.now() + timedelta(hours=24) ) reset_url = f"{settings.FRONTEND_URL}/reset-password/{token.token}" send_mail( 'Password Reset Request', f'Click the following link to reset your password: {reset_url}', settings.DEFAULT_FROM_EMAIL, [email], fail_silently=False, ) except User.DoesNotExist: pass # nie ujawniamy czy email istnieje return Response({ 'detail': 'If an account exists with this email, you will receive password reset instructions.' }, status=200) # ✅ DEV + PROD: Potwierdzenie resetu hasła class ResetPasswordConfirmView(generics.GenericAPIView): permission_classes = [AllowAny] serializer_class = ResetPasswordConfirmSerializer def post(self, request): serializer = self.get_serializer(data=request.data) serializer.is_valid(raise_exception=True) token = serializer.validated_data['token'] new_password = serializer.validated_data['new_password'] with transaction.atomic(): try: reset_token = PasswordResetToken.objects.select_for_update().get(token=token) except PasswordResetToken.DoesNotExist: raise DRFValidationError({"token": "Invalid token."}) if not reset_token.is_valid(): raise DRFValidationError({"token": "Invalid or expired token."}) user = reset_token.user try: validate_password(new_password, user) except DjangoValidationError as e: raise DRFValidationError({"new_password": list(e.messages)}) user.set_password(new_password) user.save() reset_token.used = True reset_token.save() return Response({'detail': 'Password has been reset successfully.'}, status=200) class AvatarView(generics.RetrieveUpdateAPIView): """ GET -> publiczny podgląd URL avatara (z throttlingiem 'avatar_get') PATCH -> upload tylko dla właściciela (z throttlingiem 'avatar_upload') """ serializer_class = AvatarSerializer parser_classes = [MultiPartParser, FormParser] throttle_classes = [ScopedRateThrottle] # <-- throttling włączony def get_permissions(self): # GET dla każdego, PATCH tylko zalogowany return [AllowAny()] if self.request.method == 'GET' else [IsAuthenticated()] def get_throttles(self): # Ustaw osobny scope zależnie od metody self.throttle_scope = 'avatar_get' if self.request.method == 'GET' else 'avatar_upload' return super().get_throttles() def get_object(self): if self.request.method == 'GET': # publiczny odczyt po username return get_object_or_404(User, username=self.kwargs['username']) # zapis wyłącznie dla właściciela user = self.request.user if self.kwargs.get('username') != user.username: raise PermissionDenied('Brak uprawnień do edycji.') return user def get_serializer_context(self): ctx = super().get_serializer_context() ctx['request'] = self.request # absolutny URL w serializerze return ctx # ✅ DEV + PROD: Logowanie z throttlingiem class ThrottledTokenObtainPairView(TokenObtainPairView): serializer_class = CustomTokenObtainPairSerializer throttle_scope = "login" # ← działa po dodaniu w settings.py class EmailChangeConfirmView(generics.GenericAPIView): """ Potwierdza zmianę e-maila na podstawie tokenu z maila. """ permission_classes = [AllowAny] serializer_class = EmailChangeConfirmSerializer def post(self, request): serializer = self.get_serializer(data=request.data) serializer.is_valid(raise_exception=True) token = serializer.validated_data["token"] with transaction.atomic(): try: ect = EmailChangeToken.objects.select_for_update().get(token=token) except EmailChangeToken.DoesNotExist: raise DRFValidationError({"token": "Invalid token."}) if not ect.is_valid(): raise DRFValidationError({"token": "Invalid or expired token."}) user = ect.user old_email = user.email user.email = ect.new_email user.save(update_fields=["email"]) ect.used = True ect.save(update_fields=["used"]) # Powiadomienie na nowy e-mail o udanej zmianie (opcjonalne) try: send_mail( "Your email was changed", "Your account email has been successfully updated.", settings.DEFAULT_FROM_EMAIL, [user.email], fail_silently=True, ) except Exception: pass return Response({"detail": "Email successfully updated."}, status=200) class CustomTokenRefreshView(TokenRefreshView): serializer_class = CustomTokenRefreshSerializer class FavoriteView(APIView): """ Zarządzanie ulubionymi (dodaj/usuń) dla zalogowanego użytkownika, per-typ z TYPE_REGISTRY. POST /api/favorites/ Body: { "type": "formula", "ref": "1A2-34-01" } # formula → ref = code { "type": "calculator", "ref": "42" } # calculator → ref = id/pk 201: { "status": "added", "type": "...", "ref": "..." } DELETE /api/favorites/?type=formula&ref=1A2-34-01 /api/favorites/?type=calculator&ref=42 204: (no content) """ permission_classes = [IsAuthenticated] def post(self, request): ser = FavoriteCreateSerializer(data=request.data, context={'request': request}) ser.is_valid(raise_exception=True) fav = ser.save() # 'resolved_ref' serializera – code lub pk w zależności od typu return Response( {"status": "added", "type": ser.validated_data["type"], "ref": ser.validated_data["resolved_ref"]}, status=status.HTTP_201_CREATED ) def delete(self, request): ser = FavoriteDeleteSerializer(data=request.query_params, context={'request': request}) ser.is_valid(raise_exception=True) ser.delete() return Response(status=status.HTTP_204_NO_CONTENT) class UserFavoriteObjectView(APIView): """ GET /api/users/{username}/favorites/ # wszystkie typy GET /api/users/{username}/favorites/?type=formula # tylko wybrany typ Zwracamy elementy: { "id": , "type": "", "ref": "", "title": "", "slug": "", "favorited_at": "", # dla formula: "description": "...", "latex": "..." } """ permission_classes = [IsAuthenticated, IsSelfOrAdmin] def get(self, request, username): target = get_object_or_404(User, username=username) self.check_object_permissions(request, target) only_type = request.query_params.get("type") if only_type and only_type not in TYPE_REGISTRY: return Response({"detail": "Unknown type."}, status=status.HTTP_400_BAD_REQUEST) types = [only_type] if only_type else list(TYPE_REGISTRY.keys()) results = [] for t in types: cfg = TYPE_REGISTRY[t] Model = apps.get_model(cfg["app"], cfg["model"]) ct = ContentType.objects.get_for_model(Model) # podzapytania: data i ID ulubionego fav_dt_sq = (Favorite.objects .filter(user=target, content_type=ct, object_id=OuterRef('pk')) .values('created_at')[:1]) fav_id_sq = (Favorite.objects .filter(user=target, content_type=ct, object_id=OuterRef('pk')) .values('id')[:1]) qs = (Model.objects .filter(pk__in=Favorite.objects.filter(user=target, content_type=ct).values('object_id')) .annotate(favorited_at=Subquery(fav_dt_sq, output_field=DateTimeField())) .annotate(favorite_id=Subquery(fav_id_sq, output_field=IntegerField())) .order_by('-favorited_at', 'pk')) lookup_field = cfg.get("lookup_field", "pk") for obj in qs: ref_value = getattr(obj, lookup_field) if lookup_field != 'pk' else obj.pk item = { "id": getattr(obj, "favorite_id", None), # ID w tabeli Favorite "type": t, "ref": ref_value, # dla formula = code "title": getattr(obj, "title", None), "slug": getattr(obj, "slug", None), "favorited_at": getattr(obj, "favorited_at", None), } if t == "formula": item["description"] = getattr(obj, "description", None) # LaTeX z pola 'latex' lub awaryjnie z JSON-a w 'meta' latex = getattr(obj, "latex", None) if not latex: try: meta = json.loads(getattr(obj, "meta", "") or "{}") latex = meta.get("latex") except Exception: latex = None item["latex"] = latex results.append(item) results.sort(key=lambda x: (x["favorited_at"] is not None, x["favorited_at"]), reverse=True) return Response(results, status=status.HTTP_200_OK) class UserFavoriteDetailView(APIView): """ DELETE /api/users/{username}/favorites/{favorite_id}/ """ permission_classes = [IsAuthenticated, IsSelfOrAdmin] def delete(self, request, username, favorite_id): target = get_object_or_404(User, username=username) self.check_object_permissions(request, target) fav = get_object_or_404(Favorite, id=favorite_id, user=target) fav.delete() return Response(status=status.HTTP_204_NO_CONTENT)