from pydantic import BaseModel, EmailStr, Field, validator from typing import Optional from datetime import datetime import re class UserRegister(BaseModel): """用户注册请求模型""" username: str = Field(..., min_length=3, max_length=50, description="用户名") email: EmailStr = Field(..., description="邮箱地址") password: str = Field(..., min_length=6, max_length=128, description="密码") confirm_password: str = Field(..., min_length=6, max_length=128, description="确认密码") @validator('username') def validate_username(cls, v): """验证用户名格式""" if not re.match(r'^[a-zA-Z0-9_]+$', v): raise ValueError('用户名只能包含字母、数字和下划线') if v.startswith('_') or v.endswith('_'): raise ValueError('用户名不能以下划线开头或结尾') return v @validator('confirm_password') def passwords_match(cls, v, values): """验证密码确认""" if 'password' in values and v != values['password']: raise ValueError('两次输入的密码不一致') return v @validator('password') def validate_password_length(cls, v): """验证密码长度""" if len(v) <= 5: raise ValueError('密码长度必须大于5个字符') return v class UserLogin(BaseModel): """用户登录请求模型""" username: str = Field(..., description="用户名或邮箱") password: str = Field(..., min_length=1, description="密码") class TokenResponse(BaseModel): """令牌响应模型""" access_token: str = Field(..., description="访问令牌") refresh_token: str = Field(..., description="刷新令牌") token_type: str = Field(default="bearer", description="令牌类型") expires_in: int = Field(..., description="访问令牌过期时间(秒)") class UserResponse(BaseModel): """用户信息响应模型""" id: int username: str email: str avatar_url: Optional[str] = None storage_quota: int storage_used: int is_active: bool is_verified: bool last_login_at: Optional[datetime] = None created_at: datetime class Config: from_attributes = True class LoginResponse(BaseModel): """登录响应模型""" user: UserResponse = Field(..., description="用户信息") tokens: TokenResponse = Field(..., description="令牌信息") class TokenRefresh(BaseModel): """令牌刷新请求模型""" refresh_token: str = Field(..., description="刷新令牌") class PasswordChange(BaseModel): """修改密码请求模型""" current_password: str = Field(..., description="当前密码") new_password: str = Field(..., min_length=8, max_length=128, description="新密码") confirm_password: str = Field(..., min_length=8, max_length=128, description="确认新密码") @validator('confirm_password') def passwords_match(cls, v, values): """验证密码确认""" if 'new_password' in values and v != values['new_password']: raise ValueError('密码确认不匹配') return v @validator('new_password') def validate_password_strength(cls, v): """验证密码强度""" errors = [] if len(v) < 8: errors.append("密码长度至少8位") if not any(c.islower() for c in v): errors.append("密码必须包含至少一个小写字母") if not any(c.isupper() for c in v): errors.append("密码必须包含至少一个大写字母") if not any(c.isdigit() for c in v): errors.append("密码必须包含至少一个数字") special_chars = "!@#$%^&*()_+-=[]{}|;:,.<>?" if not any(c in special_chars for c in v): errors.append("密码必须包含至少一个特殊字符") if errors: raise ValueError('; '.join(errors)) return v class PasswordReset(BaseModel): """密码重置请求模型""" email: EmailStr = Field(..., description="邮箱地址") class PasswordResetConfirm(BaseModel): """密码重置确认模型""" token: str = Field(..., description="重置令牌") new_password: str = Field(..., min_length=8, max_length=128, description="新密码") confirm_password: str = Field(..., min_length=8, max_length=128, description="确认新密码") @validator('confirm_password') def passwords_match(cls, v, values): """验证密码确认""" if 'new_password' in values and v != values['new_password']: raise ValueError('密码确认不匹配') return v @validator('new_password') def validate_password_strength(cls, v): """验证密码强度""" errors = [] if len(v) < 8: errors.append("密码长度至少8位") if not any(c.islower() for c in v): errors.append("密码必须包含至少一个小写字母") if not any(c.isupper() for c in v): errors.append("密码必须包含至少一个大写字母") if not any(c.isdigit() for c in v): errors.append("密码必须包含至少一个数字") special_chars = "!@#$%^&*()_+-=[]{}|;:,.<>?" if not any(c in special_chars for c in v): errors.append("密码必须包含至少一个特殊字符") if errors: raise ValueError('; '.join(errors)) return v class ApiResponse(BaseModel): """标准API响应模型""" success: bool = Field(..., description="操作是否成功") message: str = Field(..., description="响应消息") data: Optional[dict] = Field(None, description="响应数据") error: Optional[dict] = Field(None, description="错误信息") class Config: json_encoders = { datetime: lambda v: v.isoformat() if v else None }