2025-08-31 23:05:53 +02:00

288 lines
9.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#Obsługa widoków dla aplikacji content.
import os
from django.conf import settings
from django.contrib.auth import get_user_model
from django.db.models import Q
from django.shortcuts import get_object_or_404
from django.utils import timezone
from rest_framework import viewsets, status, filters, serializers
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.decorators import action
from rest_framework.permissions import (
IsAuthenticated,
IsAuthenticatedOrReadOnly,
AllowAny,
)
from rest_framework.parsers import JSONParser, MultiPartParser, FormParser
from django_filters.rest_framework import DjangoFilterBackend
from .models import (
Post,
Comment,
Page,
Category,
UploadedImage,
Tag,
)
from .serializers import (
PostSerializer,
CommentSerializer,
PageSerializer,
CategorySerializer,
UploadedImageSerializer,
TagSerializer,
)
from users.permissions import IsAuthorOrReadOnly, IsEditorOrAdmin, IsAdminOrReadOnly
# Obsługa logiczna widoku dla postów
class PostViewSet(viewsets.ModelViewSet):
queryset = Post.objects.all()
serializer_class = PostSerializer
permission_classes = [IsAdminOrReadOnly]
filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
filterset_fields = ['status', 'author', 'category']
search_fields = ['title', 'content']
ordering_fields = ['created_at', 'updated_at', 'title']
ordering = ['-created_at']
lookup_field = 'slug'
def perform_create(self, serializer):
serializer.save(author=self.request.user)
def get_queryset(self):
queryset = Post.objects.all()
if not self.request.user.is_authenticated:
queryset = queryset.filter(status='published')
elif not self.request.user.role in ['admin', 'moderator']:
queryset = queryset.filter(
status='published'
) | Post.objects.filter(
author=self.request.user
)
return queryset
def perform_update(self, serializer):
print(serializer.validated_data)
return super().perform_update(serializer)
# Obsługa komentarzy do postów i stron
"""
API endpoint for managing comments on posts and pages.
list:
Return a list of all comments. Results can be filtered by:
- post_id
- page_id
- author
- approved status
create:
Create a new comment. Requires authentication.
retrieve:
Return the details of a specific comment by ID.
update:
Update all fields of a specific comment. Requires authentication and appropriate permissions.
partial_update:
Update one or more fields of a specific comment. Requires authentication and appropriate permissions.
destroy:
Delete a specific comment. Requires authentication and appropriate permissions.
"""
class CommentViewSet(viewsets.ModelViewSet):
queryset = Comment.objects.all()
serializer_class = CommentSerializer
permission_classes = [IsAuthorOrReadOnly]
filter_backends = [DjangoFilterBackend, filters.OrderingFilter]
filterset_fields = ['post', 'page', 'author', 'approved']
ordering_fields = ['created_at']
ordering = ['-created_at']
def perform_create(self, serializer):
serializer.save(author=self.request.user)
def get_queryset(self):
queryset = Comment.objects.all()
if not self.request.user.is_authenticated:
queryset = queryset.filter(approved=True)
elif not self.request.user.role in ['admin', 'moderator']:
queryset = queryset.filter(
approved=True
) | Comment.objects.filter(
author=self.request.user
)
return queryset
#obsługa widoków dla stron
class PageViewSet(viewsets.ModelViewSet):
queryset = Page.objects.all()
serializer_class = PageSerializer
permission_classes = [IsEditorOrAdmin]
filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
filterset_fields = ['status', 'author', 'category']
search_fields = ['title', 'content']
ordering_fields = ['created_at', 'updated_at', 'title']
ordering = ['-created_at']
def perform_create(self, serializer):
serializer.save(author=self.request.user)
def get_queryset(self):
queryset = Page.objects.all()
if not self.request.user.is_authenticated:
queryset = queryset.filter(status='published')
elif not self.request.user.role in ['admin', 'moderator']:
queryset = queryset.filter(
status='published'
) | Page.objects.filter(
author=self.request.user
)
return queryset
# Obsługa kategorii
class CategoryViewSet(viewsets.ModelViewSet):
queryset = Category.objects.all()
serializer_class = CategorySerializer
permission_classes = [IsAuthenticatedOrReadOnly]
filter_backends = [filters.SearchFilter]
search_fields = ['name', 'slug']
def get_queryset(self):
return Category.objects.all()
# Obsługa przesyłania obrazów
class UploadedImageViewSet(viewsets.ModelViewSet):
serializer_class = UploadedImageSerializer
parser_classes = (JSONParser, MultiPartParser, FormParser)
permission_classes = [IsAuthenticated]
def get_queryset(self):
return UploadedImage.objects.filter(user=self.request.user)
def get_serializer_context(self):
context = super().get_serializer_context()
context['request'] = self.request
return context
def perform_create(self, serializer):
user = self.request.user
# Admin? zero limitów
if not user.is_staff:
# Limit: 5MB dziennie
DAILY_UPLOAD_LIMIT = 5 * 1024 * 1024
current_size = UploadedImage.get_user_daily_upload_size(user)
file_size = self.request.FILES['image'].size
if current_size + file_size > DAILY_UPLOAD_LIMIT:
raise serializers.ValidationError({
'image': 'Daily upload limit (5MB) exceeded. Try again tomorrow.'
})
# Limit 1 plik / minutę
one_minute_ago = timezone.now() - timezone.timedelta(minutes=1)
if UploadedImage.objects.filter(user=user, uploaded_at__gte=one_minute_ago).exists():
raise serializers.ValidationError({
'image': 'Upload limit: 1 image per minute.'
})
serializer.save(user=user, file_size=self.request.FILES['image'].size)
def perform_update(self, serializer):
instance = serializer.instance
if instance.user != self.request.user and not self.request.user.is_staff:
raise serializers.ValidationError("Brak uprawnień do edycji tego pliku.")
serializer.save()
def destroy(self, request, *args, **kwargs):
obj = self.get_object()
if obj.user != request.user and not request.user.is_staff:
return Response({'detail': 'Brak uprawnień do usunięcia.'}, status=403)
# Usuń fizyczny plik z dysku
image_path = obj.image.path
if os.path.isfile(image_path):
os.remove(image_path)
obj.delete()
return Response({'detail': 'Plik usunięty.'}, status=200)
@action(detail=False, methods=['get'])
def quota(self, request):
DAILY_UPLOAD_LIMIT = 5 * 1024 * 1024
current_size = UploadedImage.get_user_daily_upload_size(request.user)
remaining = max(0, DAILY_UPLOAD_LIMIT - current_size)
return Response({
'daily_limit': DAILY_UPLOAD_LIMIT,
'used': current_size,
'remaining': remaining,
'daily_limit_display': f"{DAILY_UPLOAD_LIMIT / (1024 * 1024):.1f}MB",
'used_display': f"{current_size / (1024 * 1024):.1f}MB",
'remaining_display': f"{remaining / (1024 * 1024):.1f}MB"
})
# Obsługa tagów
class TagViewSet(viewsets.ModelViewSet):
queryset = Tag.objects.all()
serializer_class = TagSerializer
permission_classes = [IsAuthenticatedOrReadOnly]
filter_backends = [filters.SearchFilter]
search_fields = ['name', 'slug']
class SearchView(APIView):
permission_classes = [AllowAny]
def get(self, request):
query = request.GET.get('q', '').strip()
print("🔎 query:", query)
if not query:
return Response([], status=status.HTTP_200_OK)
results = []
# Szukaj w postach
post_matches = Post.objects.filter(
Q(title__icontains=query) | Q(content__icontains=query)
)[:5]
for post in post_matches:
results.append({
'id': post.id,
'title': post.title,
'slug': post.slug,
'type': 'post'
})
# Szukaj w tagach
tag_matches = Tag.objects.filter(
Q(name__icontains=query)
)[:5]
for tag in tag_matches:
results.append({
'id': tag.id,
'title': tag.name,
'slug': tag.slug,
'type': 'tag'
})
# Szukaj w użytkownikach
User = get_user_model()
user_matches = User.objects.filter(
Q(username__icontains=query) | Q(email__icontains=query)
)[:5]
for user in user_matches:
results.append({
'id': user.id,
'title': user.username,
'slug': str(user.id), # lub `username` jeśli masz slug
'type': 'user'
})
return Response(results)