Files
2025-10-14 20:05:29 +08:00

383 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import APIRouter, Depends, HTTPException, status, UploadFile, File, Form, Query, Request
from fastapi.responses import FileResponse
from sqlalchemy.orm import Session
from typing import Optional, List
from datetime import datetime
from app.core.database import get_db
from app.services.file_service import FileService
from app.schemas.file import (
FileUploadRequest, FileUpdateRequest, FileSearchRequest,
FileResponse, FileListResponse, FileInfo, StorageInfo, ApiResponse,
UploadResponse, DeleteResponse, FileListRequest, FileIdRequest, StorageInfoRequest
)
from app.models.user import User
from app.dependencies.auth import get_current_user_from_headers
from app.exceptions.file import (
FileTooLargeException, StorageQuotaExceededException,
FileAlreadyExistsException, FileNotFoundException,
InvalidFileTypeException, FileUploadException, FileDeleteException
)
# Import-time side effect: write one timestamped line (naive local time from
# datetime.now()) to stdout announcing that this module has been imported.
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] INFO: [MODULE] Files module loaded successfully")
# Router on which every endpoint below registers itself through the
# @router.post / @router.get decorators.
router = APIRouter()
@router.post("/upload", response_model=ApiResponse, status_code=status.HTTP_201_CREATED)
async def upload_file(
    file: UploadFile = File(...),
    description: Optional[str] = Form(None),
    tags: Optional[str] = Form(None),
    is_public: bool = Form(False),
    current_user: User = Depends(get_current_user_from_headers),
    db: Session = Depends(get_db)
):
    """Upload a file for the authenticated user and return its stored metadata.

    Args:
        file: The multipart file part (required).
        description: Optional form field forwarded in the upload request.
        tags: Optional form field forwarded in the upload request.
        is_public: Form flag forwarded in the upload request (default False).
        current_user: User resolved by ``get_current_user_from_headers``.
        db: Database session provided by ``get_db``.

    Returns:
        ApiResponse with ``data["file"]`` holding the dumped ``FileResponse``
        schema built from the record returned by ``FileService.upload_file``.

    Raises:
        FileTooLargeException, StorageQuotaExceededException,
        FileAlreadyExistsException, InvalidFileTypeException: re-raised as-is.
        FileUploadException: wraps any other exception, carrying ``str(e)``.
    """
    # NOTE(review): the service call below is synchronous while this handler is
    # ``async def`` - presumably it blocks the event loop during upload; confirm.
    # Attributes copied one-to-one from the stored record into the response schema.
    response_fields = (
        "id", "user_id", "filename", "original_filename", "file_size",
        "mime_type", "file_hash", "is_public", "download_count",
        "description", "tags", "created_at", "updated_at", "last_accessed_at",
    )
    try:
        service = FileService(db)
        # Bundle the optional form metadata into the request schema the service expects.
        metadata = FileUploadRequest(description=description, tags=tags, is_public=is_public)
        stored = service.upload_file(file, current_user, metadata)
        # Here ``FileResponse`` is the schema imported from app.schemas.file.
        payload = FileResponse(**{name: getattr(stored, name) for name in response_fields})
        return ApiResponse(
            success=True,
            message="文件上传成功",
            data={"file": payload.model_dump()},
        )
    except (FileTooLargeException, StorageQuotaExceededException,
            FileAlreadyExistsException, InvalidFileTypeException):
        # Known domain errors pass through untouched.
        raise
    except Exception as exc:
        raise FileUploadException(str(exc))
@router.get("/list", response_model=ApiResponse)
def get_user_files(
    page: int = Query(1, ge=1),
    size: int = Query(10, ge=1, le=100),
    current_user: User = Depends(get_current_user_from_headers),
    db: Session = Depends(get_db)
):
    """Return one page of the authenticated user's files.

    Args:
        page: Page number query parameter; must be >= 1 (default 1).
        size: Page size query parameter; between 1 and 100 (default 10).
        current_user: User resolved by ``get_current_user_from_headers``.
        db: Database session provided by ``get_db``.

    Returns:
        ApiResponse whose ``data`` contains ``"files"`` (a list of dumped
        ``FileResponse`` schemas) and ``"pagination"`` (``total``, ``page``,
        ``size``, ``pages`` copied from the service result).

    Raises:
        HTTPException: 500 with code ``GET_FILES_FAILED`` on any exception.
    """
    # Attributes copied one-to-one from each record into the response schema.
    response_fields = (
        "id", "user_id", "filename", "original_filename", "file_size",
        "mime_type", "file_hash", "is_public", "download_count",
        "description", "tags", "created_at", "updated_at", "last_accessed_at",
    )
    try:
        listing = FileService(db).get_user_files(current_user.id, page, size)
        serialized = [
            FileResponse(**{name: getattr(record, name) for name in response_fields}).model_dump()
            for record in listing.files
        ]
        paging = {
            "total": listing.total,
            "page": listing.page,
            "size": listing.size,
            "pages": listing.pages,
        }
        return ApiResponse(
            success=True,
            message="获取文件列表成功",
            data={"files": serialized, "pagination": paging},
        )
    except Exception:
        # Every failure is reported to the client as the same generic 500 payload.
        failure = {
            "code": "GET_FILES_FAILED",
            "message": "获取文件列表失败"
        }
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=failure,
        )
@router.post("/info", response_model=ApiResponse)
def get_file_info(
    request: FileIdRequest,
    db: Session = Depends(get_db)
):
    """Return detailed information about a single file.

    Args:
        request: Body carrying ``file_id`` and ``user_id``.
        db: Database session provided by ``get_db``.

    Returns:
        ApiResponse whose ``data`` is the dumped result of
        ``FileService.get_file_info``.

    Raises:
        FileNotFoundException: re-raised unchanged from the service.
        HTTPException: 500 with code ``GET_FILE_INFO_FAILED`` on any other error.
    """
    # NOTE(review): ``user_id`` is taken from the request body here, whereas the
    # upload/list endpoints resolve the user via get_current_user_from_headers -
    # confirm that authorization is enforced elsewhere.
    try:
        service = FileService(db)
        details = service.get_file_info(request.file_id, request.user_id)
        return ApiResponse(
            success=True,
            message="获取文件信息成功",
            data=details.model_dump(),
        )
    except FileNotFoundException:
        raise
    except Exception:
        failure = {
            "code": "GET_FILE_INFO_FAILED",
            "message": "获取文件信息失败"
        }
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=failure,
        )
@router.post("/update", response_model=ApiResponse)
def update_file(
    file_id_request: FileIdRequest,
    update_request: FileUpdateRequest,
    db: Session = Depends(get_db)
):
    """Update a file's metadata and return the refreshed record.

    Args:
        file_id_request: Body model carrying ``file_id`` and ``user_id``.
        update_request: Body model with the changes, passed to the service as-is.
        db: Database session provided by ``get_db``.

    Returns:
        ApiResponse with ``data["file"]`` holding the dumped ``FileResponse``
        schema built from the record returned by ``FileService.update_file``.

    Raises:
        FileNotFoundException: re-raised unchanged from the service.
        HTTPException: 500 with code ``UPDATE_FILE_FAILED`` on any other error.
    """
    # NOTE(review): ``user_id`` comes from the request body rather than an
    # authenticated dependency - confirm authorization is enforced elsewhere.
    # Attributes copied one-to-one from the updated record into the response schema.
    response_fields = (
        "id", "user_id", "filename", "original_filename", "file_size",
        "mime_type", "file_hash", "is_public", "download_count",
        "description", "tags", "created_at", "updated_at", "last_accessed_at",
    )
    try:
        service = FileService(db)
        updated = service.update_file(
            file_id_request.file_id,
            file_id_request.user_id,
            update_request,
        )
        payload = FileResponse(**{name: getattr(updated, name) for name in response_fields})
        return ApiResponse(
            success=True,
            message="文件信息更新成功",
            data={"file": payload.model_dump()},
        )
    except FileNotFoundException:
        raise
    except Exception:
        failure = {
            "code": "UPDATE_FILE_FAILED",
            "message": "更新文件信息失败"
        }
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=failure,
        )
@router.post("/delete", response_model=ApiResponse)
def delete_file(
    request: FileIdRequest,
    db: Session = Depends(get_db)
):
    """Delete a file.

    Args:
        request: Body carrying ``file_id`` and ``user_id``.
        db: Database session provided by ``get_db``.

    Returns:
        ApiResponse with an empty ``data`` dict when the service reports success.

    Raises:
        FileNotFoundException: re-raised unchanged from the service.
        FileDeleteException: wraps any other exception raised by the service
            call, carrying ``str(e)`` and chained to the original cause.
        HTTPException: 500 with code ``DELETE_FILE_FAILED`` when the service
            returns a falsy result.
    """
    # NOTE(review): ``user_id`` comes from the request body rather than an
    # authenticated dependency - confirm authorization is enforced elsewhere.
    # Only the service call is guarded. Previously the "not deleted"
    # HTTPException was raised inside the try and immediately re-wrapped by the
    # generic handler, so its structured detail never reached the client.
    try:
        deleted = FileService(db).delete_file(request.file_id, request.user_id)
    except FileNotFoundException:
        raise
    except Exception as exc:
        raise FileDeleteException(str(exc)) from exc

    if not deleted:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail={
                "code": "DELETE_FILE_FAILED",
                "message": "文件删除失败"
            }
        )
    return ApiResponse(
        success=True,
        message="文件删除成功",
        data={}
    )
@router.post("/download")
def download_file(
    request: FileIdRequest,
    db: Session = Depends(get_db)
):
    """Send a stored file to the client as an attachment.

    Files whose MIME type starts with ``text/`` are read into memory and
    returned in a plain ``Response``; everything else is served from disk via
    FastAPI's file response. Progress is traced to stdout with timestamps.

    Args:
        request: Body carrying ``file_id`` and ``user_id``.
        db: Database session provided by ``get_db``.

    Returns:
        A ``fastapi.responses.Response`` (text files) or
        ``fastapi.responses.FileResponse`` (all other files).

    Raises:
        FileNotFoundException: no record was returned for the given ids.
        HTTPException: 404 with code ``FILE_NOT_FOUND_ON_DISK`` when the record
            exists but its path is not a file on disk; 500 with code
            ``DOWNLOAD_FILE_FAILED`` for any other failure.
    """
    import os
    import traceback
    import urllib.parse
    # Imported locally under an alias: at module level the name ``FileResponse``
    # is rebound by the later ``app.schemas.file`` import, so using it here
    # would build the pydantic schema instead of an HTTP file response.
    from fastapi.responses import FileResponse as DiskFileResponse, Response

    # NOTE(review): ``user_id`` comes from the request body rather than an
    # authenticated dependency - confirm authorization is enforced elsewhere.

    def log(level, message):
        # One timestamped trace line in the same format the module uses elsewhere.
        print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {level}: {message}")

    log("INFO", f"[IN] Processing download request: file_id={request.file_id}, user_id={request.user_id}")
    try:
        file_service = FileService(db)
        log("DEBUG", "File service created")
        db_file = file_service.get_file_by_id(request.file_id, request.user_id)
        log("DEBUG", f"File found in database: {db_file is not None}")
        if not db_file:
            log("ERROR", "File not found in database")
            raise FileNotFoundException()

        # Always work with an absolute path.
        absolute_path = os.path.abspath(db_file.file_path)
        log("INFO", f"[PATH] File path: {absolute_path}")
        # The record may exist while the file itself is missing from disk.
        if not os.path.isfile(absolute_path):
            log("ERROR", f"File not found on disk: {absolute_path}")
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail={
                    "code": "FILE_NOT_FOUND_ON_DISK",
                    "message": "upload文件夹与数据库不匹配"
                }
            )
        log("INFO", "[OK] File exists on disk")

        # Count the download only once we know there is a file to send.
        log("DEBUG", "Incrementing download count")
        file_service.increment_download_count(request.file_id)

        if db_file.mime_type and db_file.mime_type.startswith('text/'):
            log("INFO", "[TEXT] Processing text file")
            # Read raw bytes so files that are not valid UTF-8 still download
            # and the payload is delivered exactly as stored.
            with open(absolute_path, 'rb') as f:
                content = f.read()
            log("INFO", f"[READ] Read {len(content)} bytes from file")
            # Percent-encode the name (UTF-8) so non-ASCII filenames survive
            # in the Content-Disposition header.
            encoded_filename = urllib.parse.quote(db_file.original_filename)
            log("INFO", f"[SEND] Sending file: {db_file.original_filename}")
            return Response(
                content=content,
                media_type=db_file.mime_type,
                headers={
                    "Content-Disposition": f"attachment; filename*=UTF-8''{encoded_filename}"
                }
            )

        log("INFO", "[BINARY] Processing binary file with FileResponse")
        return DiskFileResponse(
            path=absolute_path,
            filename=db_file.original_filename,
            media_type=db_file.mime_type
        )
    except FileNotFoundException as e:
        log("ERROR", f"FileNotFoundException: {e}")
        raise
    except HTTPException:
        # Deliberate HTTP errors (e.g. the 404 above) must not be downgraded
        # to the generic 500 below.
        raise
    except Exception as e:
        log("ERROR", f"Download error: {e}")
        log("ERROR", traceback.format_exc())
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail={
                "code": "DOWNLOAD_FILE_FAILED",
                "message": "文件下载失败"
            }
        ) from e
@router.post("/test-download")
def test_download_file():
    """Smoke-test endpoint: log the call and return a fixed acknowledgement.

    Despite the route name, no file is read or returned; the response is a
    constant JSON object.
    """
    print("[TEST_DOWNLOAD] Test endpoint called", flush=True)
    acknowledgement = {"message": "Test endpoint is working"}
    return acknowledgement
@router.post("/simple-download")
def simple_download_file():
    """Debug endpoint that returns one fixed text file as an attachment.

    Reads ``uploads/verified_5kb_download_test.txt`` (relative to the working
    directory) as UTF-8 and returns it as ``text/plain``.

    Returns:
        A ``Response`` with the file content, or a dict ``{"error": ...}``
        when the file is missing or any exception occurs (the exception text
        is returned and the traceback is printed).
    """
    import os
    import traceback
    from fastapi.responses import Response

    try:
        print("[SIMPLE_DOWNLOAD] Simple download endpoint called", flush=True)
        # Fixed fixture file this endpoint serves.
        file_path = "uploads/verified_5kb_download_test.txt"
        if os.path.exists(file_path):
            with open(file_path, 'r', encoding='utf-8') as handle:
                content = handle.read()
            print(f"[SIMPLE_DOWNLOAD] Read {len(content)} characters", flush=True)
            attachment_headers = {"Content-Disposition": "attachment; filename=verified_5kb.txt"}
            return Response(
                content=content,
                media_type="text/plain",
                headers=attachment_headers,
            )
        print(f"[SIMPLE_DOWNLOAD] File not found: {file_path}", flush=True)
        return {"error": "File not found"}
    except Exception as exc:
        print(f"[SIMPLE_DOWNLOAD] Exception: {exc}", flush=True)
        traceback.print_exc()
        return {"error": str(exc)}
@router.post("/storage/info", response_model=ApiResponse)
def get_storage_info(
    request: StorageInfoRequest,
    db: Session = Depends(get_db)
):
    """Return storage information for a user.

    Args:
        request: Body carrying the ``user_id`` to look up.
        db: Database session provided by ``get_db``.

    Returns:
        ApiResponse whose ``data`` is the dumped result of
        ``FileService.get_storage_info``.

    Raises:
        HTTPException: 500 with code ``GET_STORAGE_INFO_FAILED`` on any exception.
    """
    # NOTE(review): ``user_id`` comes from the request body rather than an
    # authenticated dependency - confirm authorization is enforced elsewhere.
    try:
        service = FileService(db)
        usage = service.get_storage_info(request.user_id)
        return ApiResponse(
            success=True,
            message="获取存储信息成功",
            data=usage.model_dump(),
        )
    except Exception:
        failure = {
            "code": "GET_STORAGE_INFO_FAILED",
            "message": "获取存储信息失败"
        }
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=failure,
        )