#!/usr/bin/env python3 """ 测试文件上传功能的脚本 """ import requests import json from pathlib import Path # API基础URL BASE_URL = "http://localhost:8000/api/v1" def test_file_upload(): """测试文件上传功能""" # 1. 先注册或登录用户获取token print("1. 测试用户登录...") login_data = { "username": "testuser", "password": "testpass123" } try: # 尝试登录 response = requests.post(f"{BASE_URL}/auth/login", json=login_data) if response.status_code == 200: result = response.json() if result.get("success"): token = result["data"]["tokens"]["access_token"] print("[OK] 登录成功") else: print("[ERROR] 登录失败,尝试注册...") # 如果登录失败,先注册 register_data = { "username": "testuser", "email": "test@example.com", "password": "testpass123" } register_response = requests.post(f"{BASE_URL}/auth/register", json=register_data) if register_response.status_code == 201: register_result = register_response.json() if register_result.get("success"): token = register_result["data"]["tokens"]["access_token"] print("✓ 注册成功") else: print(f"✗ 注册失败: {register_result}") return else: print(f"✗ 注册请求失败: {register_response.status_code}") return # 注册后再次登录 login_response = requests.post(f"{BASE_URL}/auth/login", json=login_data) if login_response.status_code == 200: login_result = login_response.json() if login_result.get("success"): token = login_result["data"]["tokens"]["access_token"] print("✓ 登录成功") else: print(f"登录请求失败: {response.status_code}") return except Exception as e: print(f"认证过程出错: {e}") return # 设置认证头 headers = { "Authorization": f"Bearer {token}" } # 2. 创建测试文件 print("\n2. 创建测试文件...") test_file_path = "test_file.txt" test_content = "这是一个测试文件内容\nHello, World!\n测试文件上传功能" try: with open(test_file_path, "w", encoding="utf-8") as f: f.write(test_content) print(f"✓ 创建测试文件: {test_file_path}") except Exception as e: print(f"✗ 创建测试文件失败: {e}") return # 3. 测试文件上传 print("\n3. 测试文件上传...") try: with open(test_file_path, "rb") as f: files = { "file": (test_file_path, f, "text/plain") } data = { "description": "测试文件描述", "tags": "测试,文件,上传", "is_public": "false" } upload_response = requests.post( f"{BASE_URL}/files/upload", headers=headers, files=files, data=data ) if upload_response.status_code == 201: upload_result = upload_response.json() if upload_result.get("success"): file_info = upload_result["data"]["file"] print(f"✓ 文件上传成功!") print(f" 文件ID: {file_info['id']}") print(f" 文件名: {file_info['original_filename']}") print(f" 文件大小: {file_info['file_size']} 字节") # 保存文件ID用于后续测试 file_id = file_info["id"] else: print(f"✗ 文件上传失败: {upload_result}") return else: print(f"✗ 文件上传请求失败: {upload_response.status_code}") print(f"响应内容: {upload_response.text}") return except Exception as e: print(f"✗ 文件上传过程出错: {e}") return # 4. 测试获取文件列表 print("\n4. 测试获取文件列表...") try: list_response = requests.get( f"{BASE_URL}/files/list", headers=headers ) if list_response.status_code == 200: list_result = list_response.json() if list_result.get("success"): files_list = list_result["data"]["files"] pagination = list_result["data"]["pagination"] print(f"✓ 获取文件列表成功!") print(f" 文件数量: {len(files_list)}") print(f" 总数: {pagination['total']}") for file_item in files_list: print(f" - {file_item['original_filename']} ({file_item['file_size']} 字节)") else: print(f"✗ 获取文件列表失败: {list_result}") else: print(f"✗ 获取文件列表请求失败: {list_response.status_code}") except Exception as e: print(f"✗ 获取文件列表过程出错: {e}") # 5. 测试获取存储信息 print("\n5. 测试获取存储信息...") try: storage_response = requests.get( f"{BASE_URL}/files/storage/info", headers=headers ) if storage_response.status_code == 200: storage_result = storage_response.json() if storage_result.get("success"): storage_info = storage_result["data"] print(f"✓ 获取存储信息成功!") print(f" 总配额: {storage_info['total_quota']} 字节") print(f" 已使用: {storage_info['used_space']} 字节") print(f" 可用空间: {storage_info['available_space']} 字节") print(f" 使用百分比: {storage_info['usage_percentage']:.2f}%") print(f" 文件数量: {storage_info['file_count']}") else: print(f"✗ 获取存储信息失败: {storage_result}") else: print(f"✗ 获取存储信息请求失败: {storage_response.status_code}") except Exception as e: print(f"✗ 获取存储信息过程出错: {e}") # 6. 测试文件下载 print("\n6. 测试文件下载...") try: download_response = requests.get( f"{BASE_URL}/files/{file_id}/download", headers=headers ) if download_response.status_code == 200: print(f"✓ 文件下载成功!") print(f" 下载内容长度: {len(download_response.content)} 字节") # 保存下载的文件 downloaded_file_path = "downloaded_test_file.txt" with open(downloaded_file_path, "wb") as f: f.write(download_response.content) print(f" 已保存为: {downloaded_file_path}") else: print(f"✗ 文件下载请求失败: {download_response.status_code}") except Exception as e: print(f"✗ 文件下载过程出错: {e}") # 7. 清理测试文件 print("\n7. 清理测试文件...") try: Path(test_file_path).unlink(missing_ok=True) Path("downloaded_test_file.txt").unlink(missing_ok=True) print("✓ 清理完成") except Exception as e: print(f"✗ 清理过程出错: {e}") print("\n🎉 文件存储功能测试完成!") if __name__ == "__main__": test_file_upload()