togethere.cloud/api/storage.py

200 lines
6.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Narzędzia do bezpiecznego zapisu, odczytu i usuwania plików z dysku.
Wszystkie pliki trafiają do katalogu settings.files_base_dir/<subfolder>/.
"""
from __future__ import annotations
import os
import uuid
import mimetypes
from io import BytesIO
from pathlib import Path
from fastapi import HTTPException, UploadFile
from .config import settings
try:
from PIL import Image, ImageOps
except Exception: # pragma: no cover - handled at runtime if Pillow is missing
Image = None
ImageOps = None
# Allowed file extensions (must stay in sync with allowed_mime_types).
# Anything outside this set is stored with a neutral fallback extension.
_SAFE_EXTENSIONS: frozenset[str] = frozenset(
    (
        # images
        ".jpg",
        ".jpeg",
        ".png",
        ".gif",
        ".webp",
        # documents and archives
        ".pdf",
        ".txt",
        ".zip",
        # audio / video
        ".mp4",
        ".mp3",
        ".wav",
        # office formats
        ".doc",
        ".docx",
        ".xls",
        ".xlsx",
    )
)
def _sanitize_subfolder(subfolder: str) -> str:
    """Return *subfolder* as a safe, normalized relative path.

    Backslashes are treated as separators, and empty segments (leading,
    trailing or repeated slashes) as well as ``.`` and ``..`` segments are
    dropped. Dropping ``.`` keeps the result canonical, so ``./a`` and ``a``
    map to the same string.

    Args:
        subfolder: Caller-supplied folder path, possibly untrusted.

    Returns:
        The remaining segments joined with ``/``; an empty string when no
        segment survives.
    """
    segments = subfolder.replace("\\", "/").split("/")
    return "/".join(seg for seg in segments if seg not in ("", ".", ".."))
def _generate_stored_name(original_name: str, forced_ext: str | None = None) -> str:
    """Build a random on-disk name: a UUID4 hex string plus a safe extension.

    The extension is ``forced_ext`` when given, otherwise the suffix of
    ``original_name``. It is lower-cased, given a leading dot if missing,
    and replaced by ``.bin`` when it is not in ``_SAFE_EXTENSIONS``.
    """
    suffix = forced_ext if forced_ext else Path(original_name).suffix
    suffix = suffix.lower()
    if suffix and suffix[0] != ".":
        suffix = f".{suffix}"
    safe_suffix = suffix if suffix in _SAFE_EXTENSIONS else ".bin"
    return uuid.uuid4().hex + safe_suffix
def _convert_image_to_webp(raw_data: bytes, max_side: int, quality: int) -> tuple[bytes, int]:
    """Re-encode an input image as WebP, optionally downscaling it.

    The image is rotated according to its EXIF orientation and then saved
    again; no EXIF/ICC data is passed to the encoder.

    Args:
        raw_data: Encoded bytes of the source image.
        max_side: Maximum width/height in pixels; values <= 0 disable resizing.
        quality: WebP quality, clamped to the range 1..100.

    Returns:
        A ``(webp_bytes, size_in_bytes)`` tuple.

    Raises:
        HTTPException: 500 when Pillow is not installed, 415 when the data
            cannot be decoded or converted.
    """
    if Image is None or ImageOps is None:
        raise HTTPException(status_code=500, detail="Brak biblioteki Pillow do konwersji obrazów")
    try:
        with Image.open(BytesIO(raw_data)) as img:
            # Honour the EXIF orientation before the pixels are re-encoded.
            img = ImageOps.exif_transpose(img)
            if max_side > 0:
                # thumbnail() resizes in place and keeps the aspect ratio.
                img.thumbnail((max_side, max_side), Image.Resampling.LANCZOS)
            save_image = img
            if img.mode not in ("RGB", "RGBA"):
                # WebP supports alpha, so keep it whenever the source has any.
                # Palette/grayscale images can carry transparency in
                # info["transparency"] without having an "A" band; checking
                # only the bands would flatten them to opaque RGB.
                has_alpha = "A" in img.getbands() or "transparency" in img.info
                save_image = img.convert("RGBA" if has_alpha else "RGB")
            output = BytesIO()
            save_image.save(
                output,
                format="WEBP",
                quality=max(1, min(100, quality)),
                method=6,
                optimize=True,
            )
            data = output.getvalue()
            return data, len(data)
    except HTTPException:
        raise
    except Exception as exc:
        # Any decoding/encoding failure is reported as an unsupported image.
        raise HTTPException(status_code=415, detail=f"Nieprawidłowy obraz: {exc}") from exc
def _resolve_safe_path(subfolder: str, filename: str) -> Path:
    """Resolve ``subfolder``/``filename`` to an absolute path inside the base dir.

    The subfolder is sanitized and only the final component of ``filename``
    is used. The resolved result must be the base directory itself or lie
    below it.

    Raises:
        HTTPException: 403 when the resolved path leaves ``files_base_dir``.
    """
    root = settings.files_base_dir.resolve()
    # Keep only the last path component so the name cannot carry directories.
    bare_name = os.path.basename(filename)
    target = (settings.files_base_dir / _sanitize_subfolder(subfolder) / bare_name).resolve()

    is_root = target == root
    # The trailing separator stops "/base-other" from matching "/base".
    is_below_root = str(target).startswith(str(root) + os.sep)
    if is_root or is_below_root:
        return target
    raise HTTPException(status_code=403, detail="Niedozwolona ścieżka do pliku")
async def save_file(
    upload: UploadFile,
    subfolder: str,
    *,
    convert_image_to_webp: bool = False,
    image_max_side: int = 512,
    image_quality: int = 82,
) -> dict:
    """Store an uploaded file under ``settings.files_base_dir/<subfolder>/``.

    The upload is read into memory in 64 KiB chunks (bounded by
    ``settings.max_file_size_mb``), optionally converted to WebP, and only
    then written to disk. A rejected upload therefore leaves neither a
    directory nor a file behind.

    Args:
        upload: The incoming file.
        subfolder: Target folder relative to the base dir; it is sanitized.
        convert_image_to_webp: Re-encode ``image/*`` uploads as WebP.
        image_max_side: Maximum width/height used for the WebP conversion.
        image_quality: WebP quality used for the conversion.

    Returns:
        A dict with keys ``stored_name``, ``original_name``, ``mime``,
        ``size`` (bytes written) and ``path`` (``<subfolder>/<stored_name>``).

    Raises:
        HTTPException: 415 for a disallowed MIME type or an undecodable
            image, 413 when the size limit is exceeded, 500 on I/O errors.
    """
    # Ignore MIME parameters such as "; charset=utf-8".
    content_type = (upload.content_type or "application/octet-stream").split(";")[0].strip()
    if content_type not in settings.allowed_mime_types:
        raise HTTPException(
            status_code=415,
            detail=f"Nieobsługiwany typ pliku: {content_type}",
        )

    as_webp = convert_image_to_webp and content_type.startswith("image/")
    safe_sub = _sanitize_subfolder(subfolder)
    target_dir = settings.files_base_dir / safe_sub
    stored_name = _generate_stored_name(
        upload.filename or "file",
        forced_ext=".webp" if as_webp else None,
    )
    file_path = target_dir / stored_name

    max_bytes = settings.max_file_size_mb * 1024 * 1024
    size = 0
    data = bytearray()
    try:
        while True:
            chunk = await upload.read(65_536)
            if not chunk:
                break
            size += len(chunk)
            if size > max_bytes:
                # Nothing has been written yet, so there is nothing to clean up.
                raise HTTPException(
                    status_code=413,
                    detail=f"Plik przekracza limit {settings.max_file_size_mb} MB",
                )
            data.extend(chunk)

        if as_webp:
            payload, size = _convert_image_to_webp(
                bytes(data),
                max_side=image_max_side,
                quality=image_quality,
            )
            content_type = "image/webp"
        else:
            payload = data

        # Create the directory only once the upload is known to be acceptable,
        # and inside the try so a failure maps to the same 500 as a write error.
        target_dir.mkdir(parents=True, exist_ok=True)
        with open(file_path, "wb") as f:
            f.write(payload)
    except HTTPException:
        raise
    except OSError as exc:
        # Best-effort removal of a partially written file; the original
        # error is the one worth reporting.
        try:
            file_path.unlink()
        except OSError:
            pass
        raise HTTPException(status_code=500, detail=f"Błąd zapisu pliku: {exc}") from exc

    # NOTE(review): with an empty sanitized subfolder "path" starts with "/".
    return {
        "stored_name": stored_name,
        "original_name": upload.filename or "plik",
        "mime": content_type,
        "size": size,
        "path": f"{safe_sub}/{stored_name}",
    }
def delete_stored_file(subfolder: str, filename: str) -> bool:
    """Delete a stored file from disk.

    Args:
        subfolder: Folder relative to the base dir.
        filename: Name of the stored file.

    Returns:
        True when a file was removed; False when the path is not allowed,
        does not point at a regular file, or the file was already gone.
    """
    try:
        file_path = _resolve_safe_path(subfolder, filename)
    except HTTPException:
        return False
    # A resolved path may be a directory (e.g. an empty filename or "..");
    # unlink() would raise on it, so only regular files are handled.
    if not file_path.is_file():
        return False
    try:
        file_path.unlink()
    except FileNotFoundError:
        # Another request removed the file between the check and the unlink.
        return False
    return True
def get_stored_file_path(subfolder: str, filename: str) -> Path:
    """Return the absolute path of a stored file.

    Args:
        subfolder: Folder relative to the base dir.
        filename: Name of the stored file.

    Returns:
        The resolved path of an existing regular file.

    Raises:
        HTTPException: 403 when the path leaves the base dir, 404 when no
            regular file exists at the resolved path.
    """
    file_path = _resolve_safe_path(subfolder, filename)
    # is_file() rather than exists(): the resolved path can be a directory
    # (e.g. an empty filename or ".."), which is not a servable file.
    if not file_path.is_file():
        raise HTTPException(status_code=404, detail="Plik nie istnieje")
    return file_path