"""Serializers for user handling in a Django REST Framework application."""
from django.apps import apps
from django.contrib.auth import get_user_model
from django.contrib.auth.password_validation import validate_password
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from django.core.validators import validate_email
from rest_framework import serializers
from rest_framework_simplejwt.exceptions import InvalidToken, TokenError
from rest_framework_simplejwt.serializers import TokenObtainPairSerializer, TokenRefreshSerializer
from rest_framework_simplejwt.tokens import RefreshToken

from users.models import Favorite

from .models import PasswordResetToken
from .validators import validate_password_with_translation

# Maps the public "type" name of a favourite to the model it refers to and
# the field used to look the object up.
TYPE_REGISTRY = {
    "formula": {"app": "formulas", "model": "Formula", "lookup_field": "code"},  # looked up by code
    "calculator": {"app": "tools", "model": "Calculator", "lookup_field": "pk"},  # looked up by id/pk
    # add further types as needed
}

User = get_user_model()

# Role given to self-registered accounts; also the only role an
# unprivileged client may request explicitly.
DEFAULT_ROLE = 'user'


# Serializer for user registration.
class UserRegistrationSerializer(serializers.ModelSerializer):
    """Create a new user account from registration data.

    ``password`` and ``password2`` are write-only and must match; the
    password is additionally checked by ``validate_password_with_translation``.
    """

    password = serializers.CharField(
        write_only=True,
        required=True,
        validators=[validate_password_with_translation],
    )
    password2 = serializers.CharField(write_only=True, required=True)

    class Meta:
        model = User
        fields = ('username', 'password', 'password2', 'email', 'first_name', 'last_name', 'role')
        extra_kwargs = {
            'first_name': {'required': True},
            'last_name': {'required': True},
            'email': {'required': True},
            'role': {'default': DEFAULT_ROLE},
        }

    def validate_role(self, value):
        """Allow a non-default role only when a privileged user asks for it.

        Without this check any client could register with ``role=admin``.
        The requester is taken from ``self.context['request']``; when no
        request is available the caller is treated as unprivileged.

        Raises:
            serializers.ValidationError: a non-default role was requested by
                an anonymous or unprivileged caller.
        """
        if value == DEFAULT_ROLE:
            return value

        request = self.context.get('request')
        requester = getattr(request, 'user', None)
        is_privileged = requester is not None and (
            bool(getattr(requester, 'is_staff', False))
            or getattr(requester, 'role', None) == 'admin'
        )
        if not is_privileged:
            raise serializers.ValidationError("You do not have permission to assign this role.")
        return value

    def validate(self, attrs):
        """Reject the payload when the two password fields differ."""
        if attrs['password'] != attrs['password2']:
            raise serializers.ValidationError({"password": "Password fields didn't match."})
        return attrs

    def create(self, validated_data):
        """Create the user through ``create_user`` from the validated data."""
        # ``password2`` only exists for confirmation and is not a model field.
        validated_data.pop('password2')
        return User.objects.create_user(**validated_data)


# Serializer for displaying and editing users.
class UserSerializer(serializers.ModelSerializer):
    """Serializer for displaying and editing users."""

    class Meta:
        model = User
        fields = (
            'id', 'username', 'email', 'first_name', 'last_name', 'role', 'bio',
            'date_joined', 'last_login', 'is_active',
        )
        read_only_fields = (
            'role', 'username', 'is_staff', 'is_active', 'is_superuser',
            'date_joined', 'last_login', 'groups', 'user_permissions', 'id',
        )


# Serializer for requesting a password reset.
class ResetPasswordSerializer(serializers.Serializer):
    """Accept the e-mail address for a password-reset request."""

    email = serializers.EmailField(required=True)

    def validate_email(self, value):
        """Return the address unchanged.

        No account lookup is done here on purpose: the response must not
        reveal whether the e-mail exists. (A lookup whose result is ignored
        would also fail with ``MultipleObjectsReturned`` if several accounts
        shared one address.)
        """
        return value


# Serializer for confirming a password reset.
class ResetPasswordConfirmSerializer(serializers.Serializer):
    """Validate a reset token together with the new password pair."""

    token = serializers.CharField(required=True)
    new_password = serializers.CharField(write_only=True, required=True, validators=[validate_password])
    confirm_password = serializers.CharField(write_only=True, required=True)

    def validate(self, attrs):
        """Check that the passwords match and the token exists and is valid.

        Raises:
            serializers.ValidationError: keyed by ``new_password`` when the
                passwords differ, or by ``token`` when the token is unknown
                or ``is_valid()`` returns a falsy value.
        """
        if attrs['new_password'] != attrs['confirm_password']:
            raise serializers.ValidationError({"new_password": "Password fields didn't match."})

        try:
            reset_token = PasswordResetToken.objects.get(token=attrs['token'])
        except PasswordResetToken.DoesNotExist:
            raise serializers.ValidationError({"token": "Invalid token."})

        if not reset_token.is_valid():
            raise serializers.ValidationError({"token": "Invalid or expired token."})
        return attrs


class AvatarSerializer(serializers.ModelSerializer):
    """Upload and expose the user's avatar image."""

    # Accepted upload MIME types and the maximum file size in bytes (2 MB).
    ALLOWED_CONTENT_TYPES = ('image/png', 'image/jpeg', 'image/webp')
    MAX_AVATAR_BYTES = 2 * 1024 * 1024

    class Meta:
        model = User
        fields = ['avatar_image']  # written through the ImageField (writable)

    def validate_avatar_image(self, file):
        """Validate the content type and size of the uploaded file.

        ``None`` is passed through untouched. The content type is read from
        the upload's ``content_type`` attribute; a missing attribute counts
        as a disallowed type.
        """
        if file is None:
            return file

        content_type = getattr(file, 'content_type', None)
        if content_type not in self.ALLOWED_CONTENT_TYPES:
            raise serializers.ValidationError('Dozwolone: PNG, JPG, WEBP.')
        if file.size > self.MAX_AVATAR_BYTES:
            raise serializers.ValidationError('Maks 2 MB.')
        return file

    def to_representation(self, instance):
        """Return an ABSOLUTE avatar URL in the response (not just /media/...)."""
        data = super().to_representation(instance)
        request = self.context.get('request')
        url = data.get('avatar_image')
        if url and request:
            # If the storage returned a relative path, build an absolute
            # address that also works behind a proxy / HTTPS.
            # NOTE(review): X-Forwarded-* headers are client-controllable
            # unless a trusted proxy overwrites them - confirm deployment.
            if not url.startswith('http'):
                proto = request.META.get('HTTP_X_FORWARDED_PROTO') or (
                    'https' if request.is_secure() else 'http'
                )
                host = request.META.get('HTTP_X_FORWARDED_HOST') or request.get_host()
                url = f'{proto}://{host}{url}'
            data['avatar_image'] = url
        return data


def _set_user_claims(token, user):
    """Write the custom user claims onto ``token`` and return it.

    Shared by the obtain and refresh serializers so both always emit the
    same set of claims.
    """
    role = getattr(user, "role", None)
    is_staff = bool(getattr(user, "is_staff", False))

    token["username"] = user.get_username()
    token["role"] = role
    token["is_staff"] = is_staff
    token["adm"] = bool(is_staff or role == "admin")
    token["token_version"] = getattr(user, "token_version", 0)
    return token


class CustomTokenObtainPairSerializer(TokenObtainPairSerializer):
    """
    Adds to the token:
    - role (e.g. 'admin'/'user'/'moderator')
    - is_staff (bool)
    - adm (bool: True for admins)
    - username
    - token_version (used to invalidate tokens)
    """

    @classmethod
    def get_token(cls, user: User):
        """Build the token for ``user`` and attach the custom claims."""
        return _set_user_claims(super().get_token(user), user)


class CustomTokenRefreshSerializer(TokenRefreshSerializer):
    """
    Refresh: rejects a stale refresh token (different token_version) and
    gives the new access token up-to-date claims.
    """

    def validate(self, attrs):
        """Validate the refresh token and return fresh token data.

        Raises:
            InvalidToken: the token cannot be parsed, carries no ``user_id``,
                refers to a user that no longer exists, or has a
                ``token_version`` different from the user's current one.
        """
        try:
            refresh = RefreshToken(attrs['refresh'])
        except TokenError as exc:
            raise InvalidToken(exc.args[0]) from exc

        user_id = refresh.get('user_id', None)
        if user_id is None:
            raise InvalidToken("Malformed refresh token")

        # A deleted account must yield an auth error, not an unhandled
        # DoesNotExist (HTTP 500).
        try:
            user = User.objects.get(pk=user_id)
        except User.DoesNotExist as exc:
            raise InvalidToken("User not found") from exc

        # HARD CHECK: a refresh token with a different version is rejected.
        if int(refresh.get("token_version", 0)) != int(getattr(user, "token_version", 0)):
            raise InvalidToken("Refresh token invalidated by version bump")

        # Standard behaviour (returns 'access' and, if
        # ROTATE_REFRESH_TOKENS=True, a new 'refresh').
        data = super().validate(attrs)

        # Derive the access token from the instance verified above rather
        # than re-parsing attrs['refresh']: after rotation with blacklisting
        # the old token is already blacklisted and would fail verification.
        new_access = _set_user_claims(refresh.access_token, user)
        data['access'] = str(new_access)
        return data


class EmailChangeConfirmSerializer(serializers.Serializer):
    """Carry the confirmation token for an e-mail change."""

    token = serializers.CharField(required=True)


class FavoriteCreateSerializer(serializers.Serializer):
    """Resolve a ``type`` + ``ref`` pair to an object and mark it as favourite."""

    type = serializers.ChoiceField(choices=list(TYPE_REGISTRY))
    ref = serializers.CharField(max_length=128)  # also works for pk; it is cast to int

    def validate(self, attrs):
        """Look up the referenced object and attach its generic-relation keys.

        Adds ``content_type``, ``object_id`` and ``resolved_ref`` to ``attrs``.

        Raises:
            serializers.ValidationError: keyed by ``ref`` when a pk/id
                reference is not an integer or no matching object exists.
        """
        cfg = TYPE_REGISTRY[attrs['type']]
        model_cls = apps.get_model(cfg['app'], cfg['model'])
        lookup_field = cfg['lookup_field']  # 'code' or 'pk'
        value = attrs['ref']

        if lookup_field in ('pk', 'id'):
            # pk/id references are treated as integers
            try:
                value = int(value)
            except (TypeError, ValueError):
                raise serializers.ValidationError({'ref': 'Wartość musi być liczbą (id/pk).'})

        obj = model_cls.objects.filter(**{lookup_field: value}).first()
        if obj is None:
            what = 'code' if lookup_field == 'code' else 'id'
            raise serializers.ValidationError(
                {'ref': f'Obiekt typu {attrs["type"]} o podanym {what} nie istnieje.'}
            )

        attrs['content_type'] = ContentType.objects.get_for_model(model_cls)
        attrs['object_id'] = obj.pk
        attrs['resolved_ref'] = getattr(obj, 'code', obj.pk)  # handy for the response
        return attrs

    def create(self, validated_data):
        """Return the requesting user's favourite, creating it if missing."""
        user = self.context['request'].user
        fav, _ = Favorite.objects.get_or_create(
            user=user,
            content_type=validated_data['content_type'],
            object_id=validated_data['object_id'],
        )
        return fav


class FavoriteDeleteSerializer(FavoriteCreateSerializer):
    """Same contract as creation (type + ref), but for the delete operation."""

    def delete(self):
        """Delete the requesting user's favourite for the validated object."""
        user = self.context['request'].user
        Favorite.objects.filter(
            user=user,
            content_type=self.validated_data['content_type'],
            object_id=self.validated_data['object_id'],
        ).delete()