""" Narzędzia do bezpiecznego zapisu, odczytu i usuwania plików z dysku. Wszystkie pliki trafiają do katalogu settings.files_base_dir//. """ from __future__ import annotations import os import uuid import mimetypes from io import BytesIO from pathlib import Path from fastapi import HTTPException, UploadFile from .config import settings try: from PIL import Image, ImageOps except Exception: # pragma: no cover - handled at runtime if Pillow is missing Image = None ImageOps = None # Dozwolone rozszerzenia plików (muszą pasować do allowed_mime_types) _SAFE_EXTENSIONS: frozenset[str] = frozenset( { ".jpg", ".jpeg", ".png", ".gif", ".webp", ".pdf", ".txt", ".zip", ".mp4", ".mp3", ".wav", ".doc", ".docx", ".xls", ".xlsx", } ) def _sanitize_subfolder(subfolder: str) -> str: """Usuwa '..' i nadmiarowe slashe, zwraca bezpieczną ścieżkę względną.""" parts = [p for p in subfolder.replace("\\", "/").split("/") if p and p != ".."] return "/".join(parts) def _generate_stored_name(original_name: str, forced_ext: str | None = None) -> str: """Zwraca UUID + bezpieczne rozszerzenie.""" ext = (forced_ext or Path(original_name).suffix).lower() if ext and not ext.startswith("."): ext = "." + ext if ext not in _SAFE_EXTENSIONS: ext = ".bin" return f"{uuid.uuid4().hex}{ext}" def _convert_image_to_webp(raw_data: bytes, max_side: int, quality: int) -> tuple[bytes, int]: """ Konwertuje obraz wejściowy do WebP, usuwa metadata i opcjonalnie zmniejsza. Zwraca: (bytes_webp, final_bytes). """ if Image is None or ImageOps is None: raise HTTPException(status_code=500, detail="Brak biblioteki Pillow do konwersji obrazów") try: with Image.open(BytesIO(raw_data)) as img: # Szanuj orientację EXIF i usuń metadata przez ponowny zapis. img = ImageOps.exif_transpose(img) if max_side > 0: img.thumbnail((max_side, max_side), Image.Resampling.LANCZOS) save_image = img if img.mode not in ("RGB", "RGBA"): # WEBP wspiera alfa, więc zostawiamy RGBA jeśli jest przezroczystość. save_image = img.convert("RGBA" if "A" in img.getbands() else "RGB") output = BytesIO() save_image.save( output, format="WEBP", quality=max(1, min(100, quality)), method=6, optimize=True, ) data = output.getvalue() return data, len(data) except HTTPException: raise except Exception as exc: raise HTTPException(status_code=415, detail=f"Nieprawidłowy obraz: {exc}") from exc def _resolve_safe_path(subfolder: str, filename: str) -> Path: """ Zwraca bezwzględną ścieżkę do pliku. Rzuca HTTPException 403 jeśli ścieżka wychodzi poza files_base_dir. """ safe_sub = _sanitize_subfolder(subfolder) safe_name = os.path.basename(filename) # tylko nazwa pliku – bez katalogów candidate = (settings.files_base_dir / safe_sub / safe_name).resolve() base = settings.files_base_dir.resolve() if not str(candidate).startswith(str(base) + os.sep) and candidate != base: raise HTTPException(status_code=403, detail="Niedozwolona ścieżka do pliku") return candidate async def save_file( upload: UploadFile, subfolder: str, *, convert_image_to_webp: bool = False, image_max_side: int = 512, image_quality: int = 82, ) -> dict: """ Zapisuje przesłany plik na dysk w katalogu files_base_dir//. Zwraca słownik z metadanymi zapisanego pliku. """ content_type = (upload.content_type or "application/octet-stream").split(";")[0].strip() if content_type not in settings.allowed_mime_types: raise HTTPException( status_code=415, detail=f"Nieobsługiwany typ pliku: {content_type}", ) safe_sub = _sanitize_subfolder(subfolder) target_dir = settings.files_base_dir / safe_sub target_dir.mkdir(parents=True, exist_ok=True) stored_name = _generate_stored_name( upload.filename or "file", forced_ext=".webp" if convert_image_to_webp and content_type.startswith("image/") else None, ) file_path = target_dir / stored_name max_bytes = settings.max_file_size_mb * 1024 * 1024 size = 0 data = bytearray() try: while True: chunk = await upload.read(65_536) if not chunk: break size += len(chunk) if size > max_bytes: file_path.unlink(missing_ok=True) raise HTTPException( status_code=413, detail=f"Plik przekracza limit {settings.max_file_size_mb} MB", ) data.extend(chunk) if convert_image_to_webp and content_type.startswith("image/"): converted_bytes, converted_size = _convert_image_to_webp( bytes(data), max_side=image_max_side, quality=image_quality, ) with open(file_path, "wb") as f: f.write(converted_bytes) size = converted_size content_type = "image/webp" else: with open(file_path, "wb") as f: f.write(data) except HTTPException: raise except OSError as exc: raise HTTPException(status_code=500, detail=f"Błąd zapisu pliku: {exc}") from exc return { "stored_name": stored_name, "original_name": upload.filename or "plik", "mime": content_type, "size": size, "path": f"{safe_sub}/{stored_name}", } def delete_stored_file(subfolder: str, filename: str) -> bool: """Usuwa plik z dysku. Zwraca True jeśli istniał.""" try: file_path = _resolve_safe_path(subfolder, filename) except HTTPException: return False if file_path.exists(): file_path.unlink() return True return False def get_stored_file_path(subfolder: str, filename: str) -> Path: """ Zwraca ścieżkę do pliku lub rzuca HTTPException 404/403. """ file_path = _resolve_safe_path(subfolder, filename) if not file_path.exists(): raise HTTPException(status_code=404, detail="Plik nie istnieje") return file_path