Files
full-stack-doc/backend/test_file_upload.py
2025-10-14 20:05:29 +08:00

214 lines
7.7 KiB
Python

#!/usr/bin/env python3
"""
测试文件上传功能的脚本
"""
import requests
import json
from pathlib import Path
# API基础URL
BASE_URL = "http://localhost:8000/api/v1"
def test_file_upload():
"""测试文件上传功能"""
# 1. 先注册或登录用户获取token
print("1. 测试用户登录...")
login_data = {
"username": "testuser",
"password": "testpass123"
}
try:
# 尝试登录
response = requests.post(f"{BASE_URL}/auth/login", json=login_data)
if response.status_code == 200:
result = response.json()
if result.get("success"):
token = result["data"]["tokens"]["access_token"]
print("[OK] 登录成功")
else:
print("[ERROR] 登录失败,尝试注册...")
# 如果登录失败,先注册
register_data = {
"username": "testuser",
"email": "test@example.com",
"password": "testpass123"
}
register_response = requests.post(f"{BASE_URL}/auth/register", json=register_data)
if register_response.status_code == 201:
register_result = register_response.json()
if register_result.get("success"):
token = register_result["data"]["tokens"]["access_token"]
print("✓ 注册成功")
else:
print(f"✗ 注册失败: {register_result}")
return
else:
print(f"✗ 注册请求失败: {register_response.status_code}")
return
# 注册后再次登录
login_response = requests.post(f"{BASE_URL}/auth/login", json=login_data)
if login_response.status_code == 200:
login_result = login_response.json()
if login_result.get("success"):
token = login_result["data"]["tokens"]["access_token"]
print("✓ 登录成功")
else:
print(f"登录请求失败: {response.status_code}")
return
except Exception as e:
print(f"认证过程出错: {e}")
return
# 设置认证头
headers = {
"Authorization": f"Bearer {token}"
}
# 2. 创建测试文件
print("\n2. 创建测试文件...")
test_file_path = "test_file.txt"
test_content = "这是一个测试文件内容\nHello, World!\n测试文件上传功能"
try:
with open(test_file_path, "w", encoding="utf-8") as f:
f.write(test_content)
print(f"✓ 创建测试文件: {test_file_path}")
except Exception as e:
print(f"✗ 创建测试文件失败: {e}")
return
# 3. 测试文件上传
print("\n3. 测试文件上传...")
try:
with open(test_file_path, "rb") as f:
files = {
"file": (test_file_path, f, "text/plain")
}
data = {
"description": "测试文件描述",
"tags": "测试,文件,上传",
"is_public": "false"
}
upload_response = requests.post(
f"{BASE_URL}/files/upload",
headers=headers,
files=files,
data=data
)
if upload_response.status_code == 201:
upload_result = upload_response.json()
if upload_result.get("success"):
file_info = upload_result["data"]["file"]
print(f"✓ 文件上传成功!")
print(f" 文件ID: {file_info['id']}")
print(f" 文件名: {file_info['original_filename']}")
print(f" 文件大小: {file_info['file_size']} 字节")
# 保存文件ID用于后续测试
file_id = file_info["id"]
else:
print(f"✗ 文件上传失败: {upload_result}")
return
else:
print(f"✗ 文件上传请求失败: {upload_response.status_code}")
print(f"响应内容: {upload_response.text}")
return
except Exception as e:
print(f"✗ 文件上传过程出错: {e}")
return
# 4. 测试获取文件列表
print("\n4. 测试获取文件列表...")
try:
list_response = requests.get(
f"{BASE_URL}/files/list",
headers=headers
)
if list_response.status_code == 200:
list_result = list_response.json()
if list_result.get("success"):
files_list = list_result["data"]["files"]
pagination = list_result["data"]["pagination"]
print(f"✓ 获取文件列表成功!")
print(f" 文件数量: {len(files_list)}")
print(f" 总数: {pagination['total']}")
for file_item in files_list:
print(f" - {file_item['original_filename']} ({file_item['file_size']} 字节)")
else:
print(f"✗ 获取文件列表失败: {list_result}")
else:
print(f"✗ 获取文件列表请求失败: {list_response.status_code}")
except Exception as e:
print(f"✗ 获取文件列表过程出错: {e}")
# 5. 测试获取存储信息
print("\n5. 测试获取存储信息...")
try:
storage_response = requests.get(
f"{BASE_URL}/files/storage/info",
headers=headers
)
if storage_response.status_code == 200:
storage_result = storage_response.json()
if storage_result.get("success"):
storage_info = storage_result["data"]
print(f"✓ 获取存储信息成功!")
print(f" 总配额: {storage_info['total_quota']} 字节")
print(f" 已使用: {storage_info['used_space']} 字节")
print(f" 可用空间: {storage_info['available_space']} 字节")
print(f" 使用百分比: {storage_info['usage_percentage']:.2f}%")
print(f" 文件数量: {storage_info['file_count']}")
else:
print(f"✗ 获取存储信息失败: {storage_result}")
else:
print(f"✗ 获取存储信息请求失败: {storage_response.status_code}")
except Exception as e:
print(f"✗ 获取存储信息过程出错: {e}")
# 6. 测试文件下载
print("\n6. 测试文件下载...")
try:
download_response = requests.get(
f"{BASE_URL}/files/{file_id}/download",
headers=headers
)
if download_response.status_code == 200:
print(f"✓ 文件下载成功!")
print(f" 下载内容长度: {len(download_response.content)} 字节")
# 保存下载的文件
downloaded_file_path = "downloaded_test_file.txt"
with open(downloaded_file_path, "wb") as f:
f.write(download_response.content)
print(f" 已保存为: {downloaded_file_path}")
else:
print(f"✗ 文件下载请求失败: {download_response.status_code}")
except Exception as e:
print(f"✗ 文件下载过程出错: {e}")
# 7. 清理测试文件
print("\n7. 清理测试文件...")
try:
Path(test_file_path).unlink(missing_ok=True)
Path("downloaded_test_file.txt").unlink(missing_ok=True)
print("✓ 清理完成")
except Exception as e:
print(f"✗ 清理过程出错: {e}")
print("\n🎉 文件存储功能测试完成!")
if __name__ == "__main__":
test_file_upload()