izaac-2/backend/users/serializers.py
2025-08-31 23:05:53 +02:00

235 lines
9.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Serializers for user handling in the Django REST Framework application.
from rest_framework import serializers
from rest_framework_simplejwt.serializers import TokenObtainPairSerializer, TokenRefreshSerializer
from rest_framework_simplejwt.tokens import RefreshToken
from rest_framework_simplejwt.exceptions import InvalidToken, TokenError
from django.contrib.auth import get_user_model
from django.contrib.auth.password_validation import validate_password
from django.contrib.contenttypes.models import ContentType
from django.core.validators import validate_email
from django.core.exceptions import ValidationError
from .validators import validate_password_with_translation
from django.apps import apps
from users.models import Favorite
from .models import PasswordResetToken
# Maps the public "type" name used by the favourites API to the model that
# backs it and to the field used to look an object up.
TYPE_REGISTRY = {
    # Formulas are looked up by their ``code`` field.
    "formula": {
        "app": "formulas",
        "model": "Formula",
        "lookup_field": "code",
    },
    # Calculators are looked up by primary key.
    "calculator": {
        "app": "tools",
        "model": "Calculator",
        "lookup_field": "pk",
    },
    # Add further types here as needed.
}
# Active user model, resolved once via Django's get_user_model() and shared by
# the serializers in this module.
User = get_user_model()
# Serializer for registering new users.
class UserRegistrationSerializer(serializers.ModelSerializer):
    """Validate a sign-up payload and create the user.

    ``password`` and ``password2`` are write-only and must be equal;
    ``password2`` is removed before the user is created through
    ``User.objects.create_user``.

    ``role`` is still accepted as input and defaults to ``'user'``, but any
    other value is honoured only when the request found in the serializer
    context belongs to an authenticated user that is staff or has the
    ``'admin'`` role.  Without that check every anonymous client could
    register itself with an elevated role.
    """

    password = serializers.CharField(
        write_only=True,
        required=True,
        validators=[validate_password_with_translation],
    )
    password2 = serializers.CharField(write_only=True, required=True)

    class Meta:
        model = User
        fields = ('username', 'password', 'password2', 'email', 'first_name', 'last_name', 'role')
        extra_kwargs = {
            'first_name': {'required': True},
            'last_name': {'required': True},
            'email': {'required': True},
            'role': {'default': 'user'}
        }

    def validate_role(self, value):
        """Allow a non-default role only for privileged callers.

        Raises:
            serializers.ValidationError: if ``value`` is not ``'user'`` and the
                caller is not an authenticated staff/admin user (this includes
                the case where no request is present in the context).
        """
        if value == 'user':
            return value

        request = self.context.get('request')
        actor = getattr(request, 'user', None)
        is_privileged = (
            actor is not None
            and bool(getattr(actor, 'is_authenticated', False))
            and (
                bool(getattr(actor, 'is_staff', False))
                or getattr(actor, 'role', None) == 'admin'
            )
        )
        if not is_privileged:
            raise serializers.ValidationError('You do not have permission to assign this role.')
        return value

    def validate(self, attrs):
        """Reject the payload when the two password fields differ."""
        if attrs['password'] != attrs['password2']:
            raise serializers.ValidationError({"password": "Password fields didn't match."})
        return attrs

    def create(self, validated_data):
        """Create the user; ``password2`` is only a confirmation and is dropped."""
        validated_data.pop('password2')
        return User.objects.create_user(**validated_data)
# Serializer for displaying and editing users.
class UserSerializer(serializers.ModelSerializer):
    """Expose a user's profile.

    Of the listed fields only ``email``, ``first_name``, ``last_name`` and
    ``bio`` are writable; everything else is declared read-only.
    """

    class Meta:
        model = User
        fields = (
            'id',
            'username',
            'email',
            'first_name',
            'last_name',
            'role',
            'bio',
            'date_joined',
            'last_login',
            'is_active',
        )
        read_only_fields = (
            'role',
            'username',
            'is_staff',
            'is_active',
            'is_superuser',
            'date_joined',
            'last_login',
            'groups',
            'user_permissions',
            'id',
        )
# Serializer for requesting a password reset.
class ResetPasswordSerializer(serializers.Serializer):
    """Accept the e-mail address for which a password reset is requested.

    The serializer only checks that the value is a well-formed e-mail address.
    It deliberately does not report whether an account with that address
    exists, so the endpoint cannot be used to enumerate registered e-mails.
    """

    email = serializers.EmailField(required=True)

    def validate_email(self, value):
        """Return ``value`` unchanged.

        No database lookup is performed here: its result was never used, it
        cost a query per request, and ``User.objects.get`` would raise an
        unhandled ``MultipleObjectsReturned`` if several accounts shared the
        address.
        """
        return value
# Serializer for confirming a password reset.
class ResetPasswordConfirmSerializer(serializers.Serializer):
    """Validate a reset token together with the new password and its confirmation."""

    token = serializers.CharField(required=True)
    new_password = serializers.CharField(
        write_only=True,
        required=True,
        validators=[validate_password],
    )
    confirm_password = serializers.CharField(write_only=True, required=True)

    def validate(self, attrs):
        """Check that both passwords match and that the token exists and is valid.

        Raises:
            serializers.ValidationError: keyed by ``new_password`` when the
                passwords differ, or by ``token`` when the token is unknown or
                ``is_valid()`` reports it as no longer usable.
        """
        passwords_match = attrs['new_password'] == attrs['confirm_password']
        if not passwords_match:
            raise serializers.ValidationError({"new_password": "Password fields didn't match."})

        try:
            token_is_usable = PasswordResetToken.objects.get(token=attrs['token']).is_valid()
        except PasswordResetToken.DoesNotExist:
            raise serializers.ValidationError({"token": "Invalid token."})

        if not token_is_usable:
            raise serializers.ValidationError({"token": "Invalid or expired token."})
        return attrs
class AvatarSerializer(serializers.ModelSerializer):
    """Upload and expose the user's avatar image."""

    class Meta:
        model = User
        fields = ['avatar_image']  # written through the model's image field (writable)

    def validate_avatar_image(self, file):
        """Check the uploaded file's content type and size.

        ``None`` is passed through untouched. Otherwise the upload must declare
        a PNG, JPEG or WEBP content type and be no larger than 2 MiB.
        """
        if file is None:
            return file

        declared_type = getattr(file, 'content_type', None)
        if declared_type not in ('image/png', 'image/jpeg', 'image/webp'):
            raise serializers.ValidationError('Dozwolone: PNG, JPG, WEBP.')

        size_limit = 2 * 1024 * 1024
        if file.size > size_limit:
            raise serializers.ValidationError('Maks 2 MB.')
        return file

    def to_representation(self, instance):
        """Return the avatar as an ABSOLUTE URL rather than just ``/media/...``."""
        data = super().to_representation(instance)
        request = self.context.get('request')
        url = data.get('avatar_image')

        # Nothing to rewrite without both a URL and a request to derive the origin from.
        if not (url and request):
            return data

        if not url.startswith('http'):
            # The storage returned a relative path: build an absolute address
            # that also works behind a proxy / HTTPS terminator.
            # NOTE(review): the X-Forwarded-* headers are taken as-is; confirm
            # the deployment's proxy always overwrites them.
            forwarded_proto = request.META.get('HTTP_X_FORWARDED_PROTO')
            if forwarded_proto:
                scheme = forwarded_proto
            else:
                scheme = 'https' if request.is_secure() else 'http'
            host = request.META.get('HTTP_X_FORWARDED_HOST') or request.get_host()
            url = f'{scheme}://{host}{url}'

        data['avatar_image'] = url
        return data
class CustomTokenObtainPairSerializer(TokenObtainPairSerializer):
    """
    Adds the following claims to the token:
    - role (e.g. 'admin'/'user'/'moderator')
    - is_staff (bool)
    - adm (bool: True for staff users and for users whose role is 'admin')
    - username
    - token_version (used to invalidate tokens)
    """

    @classmethod
    def get_token(cls, user: User):
        """Build the token for ``user`` and attach the custom claims."""
        token = super().get_token(user)

        role = getattr(user, "role", None)
        is_staff = bool(getattr(user, "is_staff", False))

        extra_claims = {
            "role": role,
            "is_staff": is_staff,
            "adm": bool(is_staff or role == "admin"),
            "username": user.get_username(),
            # Users without the attribute are treated as version 0.
            "token_version": getattr(user, "token_version", 0),
        }
        for claim, value in extra_claims.items():
            token[claim] = value
        return token
class CustomTokenRefreshSerializer(TokenRefreshSerializer):
    """
    Refresh: rejects a stale refresh token (different token_version) and gives
    the new access token claims that match the user's current state.
    """

    def validate(self, attrs):
        """Validate the refresh token and return the refreshed token data.

        Returns:
            The dict produced by the parent serializer, with ``access``
            replaced by an access token carrying up-to-date ``username``,
            ``role``, ``is_staff``, ``adm`` and ``token_version`` claims.

        Raises:
            InvalidToken: if the token cannot be parsed, carries no usable
                ``user_id`` / ``token_version``, refers to a user that no
                longer exists, or was issued for a different ``token_version``.
        """
        try:
            refresh = RefreshToken(attrs['refresh'])
        except TokenError as e:
            raise InvalidToken(e.args[0]) from e

        user_id = refresh.get('user_id', None)
        if user_id is None:
            raise InvalidToken("Malformed refresh token")

        # A token may outlive its user; report that as an invalid token
        # instead of letting DoesNotExist surface as a server error.
        try:
            user = User.objects.get(pk=user_id)
        except User.DoesNotExist as e:
            raise InvalidToken("User for this refresh token no longer exists") from e

        try:
            claimed_version = int(refresh.get("token_version", 0))
        except (TypeError, ValueError) as e:
            raise InvalidToken("Malformed refresh token") from e

        # HARD CHECK: a refresh token issued for another version is rejected.
        if claimed_version != int(getattr(user, "token_version", 0)):
            raise InvalidToken("Refresh token invalidated by version bump")

        # Standard behaviour (returns 'access' and, with
        # ROTATE_REFRESH_TOKENS=True, a new 'refresh').
        data = super().validate(attrs)

        # Derive the access token from the refresh token parsed above. Parsing
        # attrs['refresh'] a second time would fail when the parent has just
        # blacklisted it as part of rotation.
        new_access = refresh.access_token

        # Inject the user's current claims into the new access token.
        role = getattr(user, "role", None)
        is_staff = bool(getattr(user, "is_staff", False))
        new_access['username'] = user.get_username()
        new_access['role'] = role
        new_access['is_staff'] = is_staff
        new_access['adm'] = bool(is_staff or role == "admin")
        new_access['token_version'] = getattr(user, "token_version", 0)

        data['access'] = str(new_access)
        return data
class EmailChangeConfirmSerializer(serializers.Serializer):
    """Payload for confirming an e-mail change: a single required ``token`` string."""

    token = serializers.CharField(required=True)
class FavoriteCreateSerializer(serializers.Serializer):
    """Resolve a ``type`` + ``ref`` pair to a concrete object and favourite it.

    ``type`` must be one of the ``TYPE_REGISTRY`` keys; ``ref`` is the value of
    that type's lookup field.
    """

    type = serializers.ChoiceField(choices=list(TYPE_REGISTRY))
    ref = serializers.CharField(max_length=128)  # also works for pk: cast to int in validate()

    def validate(self, attrs):
        """Look the referenced object up and record how to point at it.

        On success ``content_type``, ``object_id`` and ``resolved_ref`` are
        added to ``attrs``.

        Raises:
            serializers.ValidationError: keyed by ``ref`` when a pk/id
                reference is not an integer or no matching object exists.
        """
        kind = attrs['type']
        entry = TYPE_REGISTRY[kind]
        target_model = apps.get_model(entry['app'], entry['model'])
        lookup_field = entry['lookup_field']  # 'code' or 'pk'

        lookup_value = attrs['ref']
        if lookup_field in ('pk', 'id'):
            # pk/id references are treated as integers.
            try:
                lookup_value = int(lookup_value)
            except (TypeError, ValueError):
                raise serializers.ValidationError({'ref': 'Wartość musi być liczbą (id/pk).'})

        target = target_model.objects.filter(**{lookup_field: lookup_value}).first()
        if not target:
            label = 'code' if lookup_field == 'code' else 'id'
            raise serializers.ValidationError({'ref': f'Obiekt typu {kind} o podanym {label} nie istnieje.'})

        attrs.update(
            content_type=ContentType.objects.get_for_model(target_model),
            object_id=target.pk,
            # Handy for the response: the object's code when it has one, else its pk.
            resolved_ref=getattr(target, 'code', target.pk),
        )
        return attrs

    def create(self, validated_data):
        """Return the requesting user's favourite for the object, creating it if missing."""
        favorite, _created = Favorite.objects.get_or_create(
            user=self.context['request'].user,
            content_type=validated_data['content_type'],
            object_id=validated_data['object_id'],
        )
        return favorite
class FavoriteDeleteSerializer(FavoriteCreateSerializer):
    """Same contract as the create serializer (``type`` + ``ref``), but for deletion."""

    def delete(self):
        """Delete the requesting user's favourite rows for the validated object."""
        owner = self.context['request'].user
        resolved = self.validated_data
        matching = Favorite.objects.filter(
            user=owner,
            content_type=resolved['content_type'],
            object_id=resolved['object_id'],
        )
        matching.delete()