235 lines
9.3 KiB
Python
235 lines
9.3 KiB
Python
#Serializery do obsługi użytkowników w aplikacji Django REST Framework.
|
||
from rest_framework import serializers
|
||
from rest_framework_simplejwt.serializers import TokenObtainPairSerializer, TokenRefreshSerializer
|
||
from rest_framework_simplejwt.tokens import RefreshToken
|
||
from rest_framework_simplejwt.exceptions import InvalidToken, TokenError
|
||
|
||
|
||
from django.contrib.auth import get_user_model
|
||
from django.contrib.auth.password_validation import validate_password
|
||
from django.contrib.contenttypes.models import ContentType
|
||
|
||
from django.core.validators import validate_email
|
||
from django.core.exceptions import ValidationError
|
||
from .validators import validate_password_with_translation
|
||
from django.apps import apps
|
||
|
||
from users.models import Favorite
|
||
from .models import PasswordResetToken
|
||
|
||
|
||
TYPE_REGISTRY = {
|
||
"formula": {"app": "formulas", "model": "Formula", "lookup_field": "code"}, # po code
|
||
"calculator": {"app": "tools", "model": "Calculator", "lookup_field": "pk"}, # po id/pk
|
||
# dodawaj kolejne typy wg potrzeb
|
||
}
|
||
|
||
|
||
User = get_user_model()
|
||
|
||
# Serializer do rejestracji użytkowników.
|
||
class UserRegistrationSerializer(serializers.ModelSerializer):
|
||
password = serializers.CharField(write_only=True, required=True, validators=[validate_password_with_translation])
|
||
password2 = serializers.CharField(write_only=True, required=True)
|
||
|
||
class Meta:
|
||
model = User
|
||
fields = ('username', 'password', 'password2', 'email', 'first_name', 'last_name', 'role')
|
||
extra_kwargs = {
|
||
'first_name': {'required': True},
|
||
'last_name': {'required': True},
|
||
'email': {'required': True},
|
||
'role': {'default': 'user'}
|
||
}
|
||
|
||
def validate(self, attrs):
|
||
if attrs['password'] != attrs['password2']:
|
||
raise serializers.ValidationError({"password": "Password fields didn't match."})
|
||
return attrs
|
||
|
||
def create(self, validated_data):
|
||
validated_data.pop('password2')
|
||
user = User.objects.create_user(**validated_data)
|
||
return user
|
||
|
||
# Serializer do wyświetlania i edycji użytkowników.
|
||
class UserSerializer(serializers.ModelSerializer):
|
||
class Meta:
|
||
model = User
|
||
fields = ('id', 'username', 'email', 'first_name', 'last_name', 'role', 'bio', 'date_joined', 'last_login', 'is_active')
|
||
read_only_fields = ('role', 'username', 'is_staff', 'is_active', 'is_superuser', 'date_joined', 'last_login', 'groups', 'user_permissions', 'id')
|
||
|
||
#Serializer do zmiany hasła.
|
||
class ResetPasswordSerializer(serializers.Serializer):
|
||
email = serializers.EmailField(required=True)
|
||
|
||
def validate_email(self, value):
|
||
try:
|
||
User.objects.get(email=value)
|
||
except User.DoesNotExist:
|
||
pass # Don't reveal that the email doesn't exist
|
||
return value
|
||
|
||
# Serializer do potwierdzenia resetowania hasła.
|
||
class ResetPasswordConfirmSerializer(serializers.Serializer):
|
||
token = serializers.CharField(required=True)
|
||
new_password = serializers.CharField(write_only=True, required=True, validators=[validate_password])
|
||
confirm_password = serializers.CharField(write_only=True, required=True)
|
||
|
||
def validate(self, attrs):
|
||
if attrs['new_password'] != attrs['confirm_password']:
|
||
raise serializers.ValidationError({"new_password": "Password fields didn't match."})
|
||
|
||
try:
|
||
reset_token = PasswordResetToken.objects.get(token=attrs['token'])
|
||
if not reset_token.is_valid():
|
||
raise serializers.ValidationError({"token": "Invalid or expired token."})
|
||
except PasswordResetToken.DoesNotExist:
|
||
raise serializers.ValidationError({"token": "Invalid token."})
|
||
|
||
return attrs
|
||
|
||
class AvatarSerializer(serializers.ModelSerializer):
|
||
class Meta:
|
||
model = User
|
||
fields = ['avatar_image'] # zapis przez ImageField (writeable)
|
||
|
||
# Walidacja typu i rozmiaru
|
||
def validate_avatar_image(self, file):
|
||
if file is None:
|
||
return file
|
||
ct = getattr(file, 'content_type', None)
|
||
if ct not in ('image/png', 'image/jpeg', 'image/webp'):
|
||
raise serializers.ValidationError('Dozwolone: PNG, JPG, WEBP.')
|
||
if file.size > 2 * 1024 * 1024:
|
||
raise serializers.ValidationError('Maks 2 MB.')
|
||
return file
|
||
|
||
# Zwracaj ABSOLUTNY URL w odpowiedzi (a nie tylko /media/...)
|
||
def to_representation(self, instance):
|
||
data = super().to_representation(instance)
|
||
request = self.context.get('request')
|
||
url = data.get('avatar_image')
|
||
if url and request:
|
||
# Jeśli storage zwrócił ścieżkę względną, zbuduj absolutny adres,
|
||
# działający także za proxy/HTTPS.
|
||
if not url.startswith('http'):
|
||
proto = request.META.get('HTTP_X_FORWARDED_PROTO') or ('https' if request.is_secure() else 'http')
|
||
host = request.META.get('HTTP_X_FORWARDED_HOST') or request.get_host()
|
||
url = f'{proto}://{host}{url}'
|
||
data['avatar_image'] = url
|
||
return data
|
||
|
||
class CustomTokenObtainPairSerializer(TokenObtainPairSerializer):
|
||
"""
|
||
Dodaje do tokena:
|
||
- role (np. 'admin'/'user'/'moderator')
|
||
- is_staff (bool)
|
||
- adm (bool: True dla adminów)
|
||
- token_version (do unieważniania tokenów)
|
||
"""
|
||
@classmethod
|
||
def get_token(cls, user: User):
|
||
token = super().get_token(user)
|
||
|
||
role = getattr(user, "role", None)
|
||
is_staff = bool(getattr(user, "is_staff", False))
|
||
|
||
token["role"] = role
|
||
token["is_staff"] = is_staff
|
||
token["adm"] = bool(is_staff or role == "admin")
|
||
token["username"] = user.get_username()
|
||
token["token_version"] = getattr(user, "token_version", 0)
|
||
|
||
return token
|
||
|
||
|
||
class CustomTokenRefreshSerializer(TokenRefreshSerializer):
|
||
"""
|
||
Refresh: odrzuca stary refresh (inna token_version) i nadaje nowemu access spójne claimy.
|
||
"""
|
||
def validate(self, attrs):
|
||
try:
|
||
refresh = RefreshToken(attrs['refresh'])
|
||
except TokenError as e:
|
||
raise InvalidToken(e.args[0])
|
||
|
||
user_id = refresh.get('user_id', None)
|
||
if user_id is None:
|
||
raise InvalidToken("Malformed refresh token")
|
||
|
||
user = User.objects.get(pk=user_id)
|
||
|
||
# TWARDY CHECK: refresh z inną wersją → odrzuć
|
||
if int(refresh.get("token_version", 0)) != int(getattr(user, "token_version", 0)):
|
||
raise InvalidToken("Refresh token invalidated by version bump")
|
||
|
||
# Standardowe działanie (zwróci 'access' i – jeśli ROTATE_REFRESH_TOKENS=True – nowy 'refresh')
|
||
data = super().validate(attrs)
|
||
|
||
# Do nowego accessa wstrzykuj aktualne claimy
|
||
new_access = RefreshToken(attrs['refresh']).access_token
|
||
role = getattr(user, "role", None)
|
||
is_staff = bool(getattr(user, "is_staff", False))
|
||
|
||
new_access['username'] = user.get_username()
|
||
new_access['role'] = role
|
||
new_access['is_staff'] = is_staff
|
||
new_access['adm'] = bool(is_staff or role == "admin")
|
||
new_access['token_version'] = getattr(user, "token_version", 0)
|
||
|
||
data['access'] = str(new_access)
|
||
return data
|
||
|
||
class EmailChangeConfirmSerializer(serializers.Serializer):
|
||
token = serializers.CharField(required=True)
|
||
|
||
|
||
|
||
class FavoriteCreateSerializer(serializers.Serializer):
|
||
type = serializers.ChoiceField(choices=list(TYPE_REGISTRY.keys()))
|
||
ref = serializers.CharField(max_length=128) # dla pk też zadziała, zrzutujemy na int
|
||
|
||
def validate(self, attrs):
|
||
cfg = TYPE_REGISTRY[attrs['type']]
|
||
Model = apps.get_model(cfg['app'], cfg['model'])
|
||
|
||
lookup_field = cfg['lookup_field'] # 'code' albo 'pk'
|
||
value = attrs['ref']
|
||
|
||
if lookup_field in ('pk', 'id'):
|
||
# pk/id traktujemy jako int
|
||
try:
|
||
value = int(value)
|
||
except (TypeError, ValueError):
|
||
raise serializers.ValidationError({'ref': 'Wartość musi być liczbą (id/pk).'})
|
||
|
||
obj = Model.objects.filter(**{lookup_field: value}).first()
|
||
if not obj:
|
||
what = 'code' if lookup_field == 'code' else 'id'
|
||
raise serializers.ValidationError({'ref': f'Obiekt typu {attrs["type"]} o podanym {what} nie istnieje.'})
|
||
|
||
attrs['content_type'] = ContentType.objects.get_for_model(Model)
|
||
attrs['object_id'] = obj.pk
|
||
attrs['resolved_ref'] = getattr(obj, 'code', obj.pk) # przyda się do odpowiedzi
|
||
return attrs
|
||
|
||
def create(self, validated_data):
|
||
user = self.context['request'].user
|
||
fav, _ = Favorite.objects.get_or_create(
|
||
user=user,
|
||
content_type=validated_data['content_type'],
|
||
object_id=validated_data['object_id'],
|
||
)
|
||
return fav
|
||
|
||
|
||
class FavoriteDeleteSerializer(FavoriteCreateSerializer):
|
||
# ten sam kontrakt (type + ref), tylko operacja delete
|
||
def delete(self):
|
||
user = self.context['request'].user
|
||
Favorite.objects.filter(
|
||
user=user,
|
||
content_type=self.validated_data['content_type'],
|
||
object_id=self.validated_data['object_id'],
|
||
).delete() |