Files
maimai-pcse/simulator.py
zhenghu e4543ce7bd feat: 初始化麦麦智农作物生长模拟平台
- 基于 PCSE/WOFOST 构建作物生长模拟平台
  - 新增 Streamlit 可视化应用(app.py)与模拟引擎(simulator.py)
  - 支持潜在生产(PP)与水分限制生产(WLP)两种模拟模式
  - 支持冬小麦、玉米、春大麦、马铃薯、冬油菜、向日葵 6 种作物
  - 提供 LAI 动态、生物量积累、土壤水分、产量对比等可视化图表
  - 新增 pyproject.toml、justfile、Dockerfile 等工程配置
  - 完善 README.md 项目文档与 .gitignore 忽略规则
2026-04-14 14:46:35 +08:00

224 lines
7.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
麦麦智农 - PCSE 模拟引擎封装
基于 WOFOST 7.2 潜在生产PP与水分限制生产WLP模型
"""
import sqlite3
from collections import namedtuple
from typing import Literal
import pandas as pd
import pcse
from pcse.models import Wofost72_PP, Wofost72_WLP_CWB
from pcse.settings import settings
from pcse.base import ParameterProvider
from pcse.tests.db_input import (
GridWeatherDataProvider,
fetch_soildata,
fetch_sitedata,
fetch_cropdata,
AgroManagementDataProvider,
)
def namedtuple_factory(cursor, row):
fields = [column[0] for column in cursor.description]
cls = namedtuple("Row", fields)
return cls._make(row)
# Demo 数据库配置
DB_PATH = settings.PCSE_USER_HOME + "/pcse.db"
# 作物映射crop_no -> 中文名/元数据)
CROP_META = {
1: {"name": "冬小麦", "emoji": "🌾", "color": "#c69c5d", "en": "WINTER WHEAT"},
2: {"name": "玉米", "emoji": "🌽", "color": "#e8a93f", "en": "GRAIN MAIZE"},
3: {"name": "春大麦", "emoji": "🌿", "color": "#9cb85f", "en": "SPRING BARLEY"},
7: {"name": "马铃薯", "emoji": "🥔", "color": "#b8a88a", "en": "POTATO"},
10: {"name": "冬油菜", "emoji": "🌻", "color": "#d97836", "en": "WINTER RAPESEED"},
11: {"name": "向日葵", "emoji": "🌻", "color": "#f4c430", "en": "SUNFLOWER"},
}
MODE_LABELS = {
"pp": "潜在生产",
"wlp": "水分限制生产",
}
def get_db_conn():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = namedtuple_factory
return conn
def list_available_years(grid_no: int = 31031) -> list[int]:
"""返回 Demo 数据库中指定 grid 有气象数据的年份列表。"""
conn = get_db_conn()
c = conn.cursor()
rows = c.execute(
"SELECT DISTINCT strftime('%Y', day) as year FROM grid_weather WHERE grid_no=? ORDER BY year",
(grid_no,),
).fetchall()
conn.close()
return [int(r.year) for r in rows]
def list_available_crops(grid_no: int = 31031, year: int = 2000) -> list[dict]:
"""返回指定 grid 与年份有作物日历的作物列表。"""
conn = get_db_conn()
c = conn.cursor()
rows = c.execute(
"SELECT crop_no FROM crop_calendar WHERE grid_no=? AND year=? ORDER BY crop_no",
(grid_no, year),
).fetchall()
conn.close()
result = []
for r in rows:
meta = CROP_META.get(r.crop_no)
if meta:
result.append({"crop_no": r.crop_no, **meta})
return result
def run_wofost(
grid_no: int = 31031,
crop_no: int = 1,
year: int = 2000,
mode: Literal["pp", "wlp"] = "wlp",
) -> dict:
"""
运行 WOFOST 模拟,返回包含输出时间序列与关键指标的字典。
"""
conn = get_db_conn()
try:
agromanagement = AgroManagementDataProvider(conn, grid_no, crop_no, year)
sited = fetch_sitedata(conn, grid_no, year)
cropd = fetch_cropdata(conn, grid_no, year, crop_no)
soild = fetch_soildata(conn, grid_no)
parvalues = ParameterProvider(sitedata=sited, soildata=soild, cropdata=cropd)
wdp = GridWeatherDataProvider(conn, grid_no=grid_no)
if mode == "pp":
wofsim = Wofost72_PP(parvalues, wdp, agromanagement)
else:
wofsim = Wofost72_WLP_CWB(parvalues, wdp, agromanagement)
wofsim.run_till_terminate()
output = wofsim.get_output()
finally:
conn.close()
df = pd.DataFrame(output)
if not df.empty and "day" in df.columns:
df = df.set_index("day")
# 关键指标
if not df.empty:
last = df.iloc[-1]
max_lai = df["LAI"].max() if "LAI" in df.columns else None
tagp = last.get("TAGP")
twso = last.get("TWSO")
sm_end = last.get("SM")
dvs_end = last.get("DVS")
duration = len(df)
else:
max_lai = tagp = twso = sm_end = dvs_end = duration = None
# 生育期里程碑(基于 DVS
milestones = {}
if not df.empty and "DVS" in df.columns:
dvs_series = df["DVS"]
# 出苗/开始生长 DVS ~0实际在 calendar 开始日)
# 开花 DVS ~1.0
# 成熟 DVS ~2.0
flowering = dvs_series[dvs_series >= 1.0]
if not flowering.empty:
milestones["flowering"] = flowering.index[0]
maturity = dvs_series[dvs_series >= 2.0]
if not maturity.empty:
milestones["maturity"] = maturity.index[0]
# 开始日
milestones["start"] = df.index[0]
milestones["end"] = df.index[-1]
meta = CROP_META.get(crop_no, {"name": "未知作物", "emoji": "🌱", "color": "#888"})
return {
"meta": {
"grid_no": grid_no,
"crop_no": crop_no,
"year": year,
"mode": mode,
**meta,
"mode_label": MODE_LABELS.get(mode, mode),
},
"df": df,
"summary": {
"tagp": tagp, # 总地上生物量 kg/ha
"twso": twso, # 经济产量 kg/ha
"max_lai": max_lai,
"sm_end": sm_end,
"dvs_end": dvs_end,
"duration": duration,
"milestones": milestones,
},
}
def run_multi_crop(
grid_no: int = 31031,
year: int = 2000,
mode: Literal["pp", "wlp"] = "wlp",
) -> list[dict]:
"""
对指定年份所有可用作物运行模拟,返回各作物汇总结果列表(按 TWSO 排序)。
"""
crops = list_available_crops(grid_no, year)
results = []
for c in crops:
try:
res = run_wofost(grid_no=grid_no, crop_no=c["crop_no"], year=year, mode=mode)
results.append({
"crop_no": c["crop_no"],
"name": c["name"],
"emoji": c["emoji"],
"color": c["color"],
"tagp": res["summary"]["tagp"],
"twso": res["summary"]["twso"],
"max_lai": res["summary"]["max_lai"],
"duration": res["summary"]["duration"],
})
except Exception:
# 某些作物可能因数据缺失导致模拟失败,静默跳过
continue
results.sort(key=lambda x: (x["twso"] if x["twso"] is not None else -1), reverse=True)
return results
def run_multi_year(
grid_no: int = 31031,
crop_no: int = 1,
mode: Literal["pp", "wlp"] = "wlp",
) -> list[dict]:
"""
对指定作物在所有可用年份运行模拟,返回各年份汇总结果列表。
"""
years = list_available_years(grid_no)
results = []
meta = CROP_META.get(crop_no, {"name": "未知作物", "emoji": "🌱", "color": "#888"})
for year in years:
try:
res = run_wofost(grid_no=grid_no, crop_no=crop_no, year=year, mode=mode)
results.append({
"year": year,
"tagp": res["summary"]["tagp"],
"twso": res["summary"]["twso"],
"max_lai": res["summary"]["max_lai"],
"duration": res["summary"]["duration"],
})
except Exception:
continue
return results