Files
crop-disease/app.py
2026-04-13 17:50:03 +08:00

273 lines
9.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
基于 OpenRouter Qwen3 VL 8B Instruct 模型的病虫害识别 FastAPI 服务
"""
import os
import base64
import binascii
from io import BytesIO
from typing import Optional
from dotenv import load_dotenv
import httpx
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from PIL import Image
# 加载环境变量
load_dotenv()
app = FastAPI(
title="病虫害识别 API",
description="基于 OpenRouter Qwen3 VL 8B Instruct 模型的病虫害识别服务",
version="1.0.0"
)
# OpenRouter API 配置
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
OPENROUTER_API_URL = os.getenv("OPENROUTER_API_URL")
MODEL_NAME = os.getenv("OPENROUTER_MODEL") # 可通过环境变量配置
if not OPENROUTER_API_KEY:
raise ValueError("请设置 OPENROUTER_API_KEY 环境变量")
class Base64ImageRequest(BaseModel):
"""Base64 图片识别请求模型"""
image_base64: str
question: Optional[str] = None
def encode_image_to_base64(image: Image.Image) -> str:
"""将 PIL Image 转换为 base64 编码字符串"""
buffered = BytesIO()
# 转换为 RGB 模式(如果不是的话)
if image.mode != "RGB":
image = image.convert("RGB")
image.save(buffered, format="JPEG")
# 确保使用 UTF-8 编码解码 base64
img_str = base64.b64encode(buffered.getvalue()).decode('utf-8')
return img_str
async def call_openrouter_api(image_base64: str, user_message: str) -> str:
"""调用 OpenRouter API"""
headers = {
"Authorization": f"Bearer {OPENROUTER_API_KEY}",
"Content-Type": "application/json",
# "HTTP-Referer": "https://github.com/your-repo", # 可选:用于跟踪
# "X-Title": "Pest Disease Identification Service", # 可选:应用名称(使用英文避免编码问题)
}
payload = {
"model": MODEL_NAME,
"messages": [
{
"role": "system",
"content": "你是一位专业的农业病虫害识别专家。请仔细分析用户提供的植物图片识别可能存在的病虫害问题并提供详细的诊断信息包括1. 植物类型,病虫害类型和名称 2. 严重程度 3. 可能的病因 4. 防治建议。请用中文回答。"
},
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{image_base64}"
}
},
{
"type": "text",
"text": user_message
}
]
}
],
"max_tokens": 2000,
"temperature": 0.7,
"chat_template_kwargs": {
"enable_thinking": False
}
}
async with httpx.AsyncClient(timeout=60.0) as client:
try:
response = await client.post(OPENROUTER_API_URL, json=payload, headers=headers)
response.raise_for_status()
result = response.json()
print(result)
if "choices" in result and len(result["choices"]) > 0:
return result["choices"][0]["message"]["content"]
else:
raise HTTPException(status_code=500, detail="API 响应格式错误")
except httpx.HTTPStatusError as e:
error_text = e.response.text
try:
error_text.encode('utf-8')
except (UnicodeEncodeError, AttributeError):
error_text = repr(e.response.text)
raise HTTPException(status_code=e.response.status_code, detail=f"OpenRouter API 错误: {error_text}")
except httpx.RequestError as e:
error_msg = str(e)
try:
error_msg.encode('utf-8')
except UnicodeEncodeError:
error_msg = repr(e)
raise HTTPException(status_code=500, detail=f"请求错误: {error_msg}")
@app.get("/")
async def root():
"""根路径,返回 API 信息"""
return {
"message": "病虫害识别 API",
"version": "1.0.0",
"model": MODEL_NAME,
"endpoints": {
"识别病虫害": "/api/v1/identify",
"健康检查": "/health"
}
}
@app.get("/health")
async def health_check():
"""健康检查端点"""
return {"status": "healthy", "model": MODEL_NAME}
@app.post("/api/v1/identify")
async def identify_pest_disease(
file: UploadFile = File(..., description="植物图片文件"),
question: Optional[str] = None
):
"""
病虫害识别接口
- **file**: 上传的植物图片(支持 jpg, jpeg, png 格式)
- **question**: 可选的自定义问题,如果不提供则使用默认提示
"""
# 验证文件类型
if not file.content_type or not file.content_type.startswith("image/"):
raise HTTPException(status_code=400, detail="请上传图片文件jpg, jpeg, png")
try:
# 读取并验证图片
image_data = await file.read()
image = Image.open(BytesIO(image_data))
# 验证图片大小(可选:限制最大尺寸)
max_size = (4096, 4096)
if image.size[0] > max_size[0] or image.size[1] > max_size[1]:
image.thumbnail(max_size, Image.Resampling.LANCZOS)
# 转换为 base64
image_base64 = encode_image_to_base64(image)
# 构建用户消息
user_message = question or "请识别这张图片中的植物是否存在病虫害问题,如果存在,请详细说明病虫害类型、症状、严重程度和防治建议。"
# 调用 OpenRouter API
result = await call_openrouter_api(image_base64, user_message)
return JSONResponse(content={
"success": True,
"result": result,
"image_info": {
"format": image.format,
"size": image.size,
"mode": image.mode
}
})
except UnicodeEncodeError as e:
raise HTTPException(status_code=500, detail=f"处理图片时出错: 编码错误 - {repr(e)}")
except Exception as e:
error_msg = str(e)
# 确保错误信息可以正确编码
try:
error_msg.encode('utf-8')
except UnicodeEncodeError:
error_msg = repr(e)
raise HTTPException(status_code=500, detail=f"处理图片时出错: {error_msg}")
@app.post("/api/v1/identify/base64")
async def identify_pest_disease_base64(request: Base64ImageRequest):
"""
使用 base64 编码图片的病虫害识别接口
- **image_base64**: base64 编码的图片数据(不包含 data:image 前缀)
- **question**: 可选的自定义问题
"""
try:
# 清理 base64 字符串(移除可能的 data:image 前缀和空白字符)
base64_str = request.image_base64.strip()
if "," in base64_str:
# 如果包含 data:image 前缀,提取 base64 部分
base64_str = base64_str.split(",")[-1]
# 确保 base64 字符串只包含有效的 base64 字符
# 移除所有空白字符(包括换行符)
base64_str = ''.join(base64_str.split())
# 解码 base64 图片
try:
# 确保字符串是有效的 base64 格式
if not base64_str:
raise ValueError("Base64 字符串为空")
image_data = base64.b64decode(base64_str, validate=True)
except binascii.Error:
raise ValueError("Base64 解码失败: 无效的 base64 格式")
except Exception as decode_error:
error_msg = str(decode_error)
try:
error_msg.encode('utf-8')
except UnicodeEncodeError:
error_msg = repr(decode_error)
raise ValueError(f"Base64 解码失败: {error_msg}")
image = Image.open(BytesIO(image_data))
# 验证图片大小
max_size = (4096, 4096)
if image.size[0] > max_size[0] or image.size[1] > max_size[1]:
image.thumbnail(max_size, Image.Resampling.LANCZOS)
# 重新编码
image_base64_encoded = encode_image_to_base64(image)
# 构建用户消息
user_message = request.question or "请识别这张图片中的植物是否存在病虫害问题,如果存在,请详细说明病虫害类型、症状、严重程度和防治建议。"
# 调用 OpenRouter API
result = await call_openrouter_api(image_base64_encoded, user_message)
return JSONResponse(content={
"success": True,
"result": result,
"image_info": {
"format": image.format,
"size": image.size,
"mode": image.mode
}
})
except UnicodeEncodeError as e:
raise HTTPException(status_code=500, detail=f"处理图片时出错: 编码错误 - {repr(e)}")
except Exception as e:
error_msg = str(e)
# 确保错误信息可以正确编码
try:
error_msg.encode('utf-8')
except UnicodeEncodeError:
error_msg = repr(e)
raise HTTPException(status_code=500, detail=f"处理图片时出错: {error_msg}")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)