200 lines
6.5 KiB
Python
200 lines
6.5 KiB
Python
"""
|
||
Narzędzia do bezpiecznego zapisu, odczytu i usuwania plików z dysku.
|
||
Wszystkie pliki trafiają do katalogu settings.files_base_dir/<subfolder>/.
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import os
|
||
import uuid
|
||
import mimetypes
|
||
from io import BytesIO
|
||
from pathlib import Path
|
||
|
||
from fastapi import HTTPException, UploadFile
|
||
|
||
from .config import settings
|
||
|
||
try:
|
||
from PIL import Image, ImageOps
|
||
except Exception: # pragma: no cover - handled at runtime if Pillow is missing
|
||
Image = None
|
||
ImageOps = None
|
||
|
||
# Whitelist of extensions that may be kept on a stored file name
# (must stay in sync with allowed_mime_types).
_SAFE_EXTENSIONS: frozenset[str] = frozenset(
    (
        # images
        ".jpg",
        ".jpeg",
        ".png",
        ".gif",
        ".webp",
        # documents and archives
        ".pdf",
        ".txt",
        ".zip",
        # audio / video
        ".mp4",
        ".mp3",
        ".wav",
        # office formats
        ".doc",
        ".docx",
        ".xls",
        ".xlsx",
    )
)
|
||
|
||
|
||
def _sanitize_subfolder(subfolder: str) -> str:
    """Normalise a caller-supplied subfolder into a safe relative path.

    Backslashes are treated as path separators. Empty segments (produced by
    leading, trailing or repeated separators) as well as "." and ".."
    segments are discarded; the remaining segments are joined with "/".

    Args:
        subfolder: Raw subfolder string, possibly containing traversal
            segments or mixed separators.

    Returns:
        The cleaned relative path, or "" when no usable segment remains.
    """
    segments = subfolder.replace("\\", "/").split("/")
    # "." is dropped together with "" and ".." so that one directory has a
    # single canonical spelling ("a/b", never "./a/./b").
    return "/".join(seg for seg in segments if seg not in ("", ".", ".."))
|
||
|
||
|
||
def _generate_stored_name(original_name: str, forced_ext: str | None = None) -> str:
    """Build a random on-disk file name: a UUID4 hex string plus a safe extension.

    Args:
        original_name: Name supplied by the uploader; only its suffix is used.
        forced_ext: When given (and non-empty), used instead of the suffix of
            ``original_name``. A missing leading dot is added.

    Returns:
        ``<uuid4 hex><ext>`` where ``ext`` is the lower-cased extension if it
        is listed in ``_SAFE_EXTENSIONS`` and ``.bin`` otherwise.
    """
    raw_ext = forced_ext if forced_ext else Path(original_name).suffix
    suffix = raw_ext.lower()

    if suffix and suffix[0] != ".":
        suffix = f".{suffix}"

    # Anything outside the whitelist (including "no extension") becomes ".bin".
    safe_suffix = suffix if suffix in _SAFE_EXTENSIONS else ".bin"
    return uuid.uuid4().hex + safe_suffix
|
||
|
||
|
||
def _convert_image_to_webp(raw_data: bytes, max_side: int, quality: int) -> tuple[bytes, int]:
    """Re-encode an image as WebP, optionally shrinking it.

    Steps: apply the EXIF orientation, convert to RGB/RGBA, downscale so that
    neither side exceeds ``max_side`` (aspect ratio is kept by
    ``Image.thumbnail``), then save as WebP into memory. The image is
    re-saved without passing any metadata on, which is what the module relies
    on to strip it.

    Args:
        raw_data: Encoded source image bytes.
        max_side: Maximum width/height in pixels; values <= 0 disable resizing.
        quality: WebP quality, clamped to the range 1..100.

    Returns:
        ``(webp_bytes, len(webp_bytes))``.

    Raises:
        HTTPException: 500 when Pillow is not installed; 415 when the data
            cannot be decoded or encoded as an image.
    """
    if Image is None or ImageOps is None:
        raise HTTPException(status_code=500, detail="Brak biblioteki Pillow do konwersji obrazów")

    try:
        with Image.open(BytesIO(raw_data)) as source:
            # Honour the EXIF orientation; this returns a new image object.
            img = ImageOps.exif_transpose(source)

            if img.mode not in ("RGB", "RGBA"):
                # WebP supports alpha, so keep it when present. Palette and
                # greyscale images may carry transparency only in
                # info["transparency"] (no "A" band), which a band check
                # alone would miss.
                has_alpha = "A" in img.getbands() or "transparency" in img.info
                img = img.convert("RGBA" if has_alpha else "RGB")

            # Resize after the mode conversion: Pillow always falls back to
            # NEAREST for "1"/"P" images, so resizing first would ignore the
            # LANCZOS filter for palette images.
            if max_side > 0:
                img.thumbnail((max_side, max_side), Image.Resampling.LANCZOS)

            output = BytesIO()
            img.save(
                output,
                format="WEBP",
                quality=max(1, min(100, quality)),
                method=6,
                optimize=True,
            )
            data = output.getvalue()
            return data, len(data)
    except HTTPException:
        raise
    except Exception as exc:
        # Any decode/encode failure is reported as an unsupported media type.
        raise HTTPException(status_code=415, detail=f"Nieprawidłowy obraz: {exc}") from exc
|
||
|
||
|
||
def _resolve_safe_path(subfolder: str, filename: str) -> Path:
    """Return the absolute, resolved path of a file below ``files_base_dir``.

    The subfolder is passed through ``_sanitize_subfolder`` and only the last
    component of ``filename`` is used, so neither argument can introduce
    extra directory levels on its own. The resolved result must still lie
    strictly inside the resolved base directory.

    Args:
        subfolder: Relative subfolder below ``settings.files_base_dir``.
        filename: File name; any directory part is discarded.

    Returns:
        The resolved path ``files_base_dir/<subfolder>/<filename>``.

    Raises:
        HTTPException: 403 when the file name is empty, "." or "..", or when
            the resolved path is not located inside ``files_base_dir``.
    """
    safe_sub = _sanitize_subfolder(subfolder)
    safe_name = os.path.basename(filename)  # file name only, no directories

    # These names denote a directory (the subfolder itself or its parent),
    # never a file, so they must not be handed to callers that read/delete.
    if safe_name in ("", ".", ".."):
        raise HTTPException(status_code=403, detail="Niedozwolona ścieżka do pliku")

    base = settings.files_base_dir.resolve()
    candidate = (settings.files_base_dir / safe_sub / safe_name).resolve()

    # Component-wise containment check: unlike a string prefix comparison it
    # also works when the base is the filesystem root, and it rejects the
    # base directory itself.
    if base not in candidate.parents:
        raise HTTPException(status_code=403, detail="Niedozwolona ścieżka do pliku")

    return candidate
|
||
|
||
|
||
async def save_file(
    upload: UploadFile,
    subfolder: str,
    *,
    convert_image_to_webp: bool = False,
    image_max_side: int = 512,
    image_quality: int = 82,
) -> dict:
    """Store an uploaded file under ``files_base_dir/<subfolder>/``.

    The upload is read into memory in 64 KiB chunks (bounded by
    ``settings.max_file_size_mb``), optionally re-encoded as WebP, and then
    written to disk under a random name.

    Args:
        upload: The incoming upload.
        subfolder: Target subfolder; sanitised before use.
        convert_image_to_webp: Re-encode ``image/*`` uploads as WebP.
        image_max_side: Maximum side length used for the WebP conversion.
        image_quality: WebP quality used for the conversion.

    Returns:
        A dict with the keys ``stored_name``, ``original_name``, ``mime``,
        ``size`` (bytes written) and ``path`` (``<subfolder>/<stored_name>``,
        relative to the base directory).

    Raises:
        HTTPException: 415 for a disallowed content type or an undecodable
            image, 413 when the size limit is exceeded, 403 when the target
            path would leave the base directory, 500 on I/O errors.
    """
    content_type = (upload.content_type or "application/octet-stream").split(";")[0].strip()

    if content_type not in settings.allowed_mime_types:
        raise HTTPException(
            status_code=415,
            detail=f"Nieobsługiwany typ pliku: {content_type}",
        )

    as_webp = convert_image_to_webp and content_type.startswith("image/")

    safe_sub = _sanitize_subfolder(subfolder)
    stored_name = _generate_stored_name(
        upload.filename or "file",
        forced_ext=".webp" if as_webp else None,
    )
    # Go through the shared resolver so uploads get the same containment
    # check as reads and deletes.
    file_path = _resolve_safe_path(subfolder, stored_name)

    max_bytes = settings.max_file_size_mb * 1024 * 1024
    size = 0
    data = bytearray()

    try:
        while True:
            chunk = await upload.read(65_536)
            if not chunk:
                break
            size += len(chunk)
            if size > max_bytes:
                # Nothing has been written to disk yet, so there is nothing
                # to clean up here.
                raise HTTPException(
                    status_code=413,
                    detail=f"Plik przekracza limit {settings.max_file_size_mb} MB",
                )
            data.extend(chunk)

        if as_webp:
            payload, size = _convert_image_to_webp(
                bytes(data),
                max_side=image_max_side,
                quality=image_quality,
            )
            content_type = "image/webp"
        else:
            payload = data

        # Create the directory only once the upload has passed validation,
        # so rejected uploads do not leave empty directories behind.
        file_path.parent.mkdir(parents=True, exist_ok=True)
        with open(file_path, "wb") as f:
            f.write(payload)
    except HTTPException:
        raise
    except OSError as exc:
        # Best effort: do not leave a truncated file behind.
        try:
            file_path.unlink(missing_ok=True)
        except OSError:
            pass
        raise HTTPException(status_code=500, detail=f"Błąd zapisu pliku: {exc}") from exc

    return {
        "stored_name": stored_name,
        "original_name": upload.filename or "plik",
        "mime": content_type,
        "size": size,
        # Avoid a leading "/" when the subfolder sanitises to "".
        "path": f"{safe_sub}/{stored_name}" if safe_sub else stored_name,
    }
|
||
|
||
|
||
def delete_stored_file(subfolder: str, filename: str) -> bool:
    """Delete a stored file from disk.

    Args:
        subfolder: Subfolder below the base directory.
        filename: Name of the file to remove.

    Returns:
        True when a regular file was removed. False when the path is
        rejected by ``_resolve_safe_path``, does not point to a regular file,
        or the file vanished before it could be removed.
    """
    try:
        file_path = _resolve_safe_path(subfolder, filename)
    except HTTPException:
        return False

    # Directories must never be passed to unlink(); treat them as "no file".
    if not file_path.is_file():
        return False

    try:
        file_path.unlink()
    except FileNotFoundError:
        # Removed concurrently between the check and the unlink.
        return False
    return True
|
||
|
||
|
||
def get_stored_file_path(subfolder: str, filename: str) -> Path:
    """Return the path of an existing stored file.

    Args:
        subfolder: Subfolder below the base directory.
        filename: Name of the stored file.

    Returns:
        The resolved path of the file.

    Raises:
        HTTPException: 403 (from ``_resolve_safe_path``) when the path is not
            allowed; 404 when no regular file exists at the resolved path.
    """
    file_path = _resolve_safe_path(subfolder, filename)
    # is_file() rather than exists(): a directory must not be handed back to
    # callers that expect something they can open and stream.
    if not file_path.is_file():
        raise HTTPException(status_code=404, detail="Plik nie istnieje")
    return file_path
|