Files
maimai-pcse/simulator.py
zhenghu bd3d73d140 feat: 将模拟平台从西班牙 Demo 数据迁移至中国多省份场景
提交正文:
  - simulator.py: 新增 SyntheticWeatherDataProvider,基于河南、黑龙江、湖北、
    新疆、四川五省气候模板生成合成日气象数据
  - simulator.py: 引入国内农事日历 CROP_CALENDAR_CN 与省份作物配置 PROVINCE_CROPS
  - simulator.py: 移除对 GridWeatherDataProvider / AgroManagementDataProvider /
    fetch_sitedata 的西班牙 Demo 数据依赖
  - app.py: 侧边栏支持省份选择,年份范围扩展为 2019-2023
  - app.py: 全面移除自定义 CSS,改用 Streamlit 原生组件(st.metric / st.info /
    st.success / st.divider 等)简化界面
  - app.py: 图表回归 Plotly 原生 add_vline,移除 hex_to_rgba / add_milestone_line
    辅助函数
2026-04-14 15:49:37 +08:00

434 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
麦麦智农 - PCSE 模拟引擎封装(国内场景版)
基于 WOFOST 7.2 潜在生产PP与水分限制生产WLP模型
"""
import datetime
import math
import sqlite3
from collections import namedtuple
from typing import Literal
import numpy as np
import pandas as pd
from pcse.models import Wofost72_PP, Wofost72_WLP_CWB
from pcse.settings import settings
from pcse.base import ParameterProvider, WeatherDataContainer, WeatherDataProvider
from pcse.tests.db_input import fetch_soildata, fetch_cropdata
from pcse.util import reference_ET
# ─── 常量配置 ────────────────────────────────────────────────────────────────
DB_PATH = settings.PCSE_USER_HOME + "/pcse.db"
CROP_META = {
1: {"name": "冬小麦", "emoji": "🌾", "color": "#c69c5d", "en": "WINTER WHEAT"},
2: {"name": "玉米", "emoji": "🌽", "color": "#e8a93f", "en": "GRAIN MAIZE"},
3: {"name": "春大麦", "emoji": "🌿", "color": "#9cb85f", "en": "SPRING BARLEY"},
7: {"name": "马铃薯", "emoji": "🥔", "color": "#b8a88a", "en": "POTATO"},
10: {"name": "冬油菜", "emoji": "🌻", "color": "#d97836", "en": "WINTER RAPESEED"},
11: {"name": "向日葵", "emoji": "🌻", "color": "#f4c430", "en": "SUNFLOWER"},
}
MODE_LABELS = {
"pp": "潜在生产",
"wlp": "水分限制生产",
}
# 省份支持作物(黑龙江太冷,去掉越冬作物)
PROVINCE_CROPS = {
"河南": [1, 2, 3, 7, 10, 11],
"黑龙江": [2, 3, 7, 11],
"湖北": [1, 2, 3, 7, 10, 11],
"新疆": [1, 2, 3, 7, 10, 11],
"四川": [1, 2, 3, 7, 10, 11],
}
# 国内农事日历(播种月日 -> 收获月日)
CROP_CALENDAR_CN = {
"冬小麦": {
"河南": {"start": (10, 15), "end": (5, 25)},
"湖北": {"start": (10, 25), "end": (5, 15)},
"新疆": {"start": (9, 20), "end": (6, 25)},
"四川": {"start": (10, 20), "end": (5, 10)},
},
"玉米": {
"河南": {"start": (6, 5), "end": (9, 25)},
"黑龙江": {"start": (5, 1), "end": (9, 20)},
"湖北": {"start": (3, 25), "end": (7, 20)},
"新疆": {"start": (4, 15), "end": (9, 10)},
"四川": {"start": (3, 20), "end": (7, 15)},
},
"春大麦": {
"河南": {"start": (3, 1), "end": (5, 25)},
"黑龙江": {"start": (4, 10), "end": (7, 15)},
"湖北": {"start": (2, 15), "end": (5, 20)},
"新疆": {"start": (3, 25), "end": (7, 10)},
"四川": {"start": (2, 20), "end": (5, 15)},
},
"马铃薯": {
"河南": {"start": (2, 25), "end": (5, 30)},
"黑龙江": {"start": (4, 20), "end": (8, 15)},
"湖北": {"start": (1, 15), "end": (5, 10)},
"新疆": {"start": (4, 1), "end": (7, 25)},
"四川": {"start": (1, 20), "end": (5, 5)},
},
"冬油菜": {
"河南": {"start": (9, 15), "end": (5, 15)},
"湖北": {"start": (9, 20), "end": (5, 5)},
"新疆": {"start": (8, 25), "end": (6, 15)},
"四川": {"start": (9, 10), "end": (4, 25)},
},
"向日葵": {
"河南": {"start": (5, 25), "end": (8, 30)},
"黑龙江": {"start": (5, 15), "end": (9, 10)},
"湖北": {"start": (4, 5), "end": (8, 5)},
"新疆": {"start": (4, 20), "end": (8, 20)},
"四川": {"start": (3, 25), "end": (7, 25)},
},
}
# 省份气候模板(每月 [TMAX, TMIN, RAIN(mm/day), IRRAD(kJ/m2/day)]
PROVINCES = {
"河南": {
"lat": 34.76, "lon": 113.65, "elev": 110,
"rh": 0.65, "wind_base": 2.0,
"climate": [
[5, -4, 0.5, 9000], [9, -1, 0.6, 12000], [15, 4, 1.0, 16000],
[22, 10, 1.5, 19000], [28, 16, 2.0, 22000], [33, 21, 2.5, 24000],
[32, 23, 4.0, 21000], [31, 22, 3.5, 19000], [27, 17, 2.0, 16000],
[22, 11, 1.2, 13000], [14, 3, 0.8, 9000], [7, -2, 0.4, 8000],
],
},
"黑龙江": {
"lat": 45.80, "lon": 126.53, "elev": 150,
"rh": 0.60, "wind_base": 3.0,
"climate": [
[-13, -24, 0.2, 6000], [-8, -20, 0.2, 9000], [2, -9, 0.4, 14000],
[13, 1, 1.0, 18000], [21, 8, 1.8, 22000], [26, 15, 3.0, 24000],
[28, 18, 4.5, 22000], [26, 16, 4.0, 19000], [20, 9, 1.5, 15000],
[11, 1, 0.8, 11000], [-1, -10, 0.4, 7000], [-11, -21, 0.3, 5000],
],
},
"湖北": {
"lat": 30.59, "lon": 114.31, "elev": 30,
"rh": 0.75, "wind_base": 1.8,
"climate": [
[8, 0, 1.5, 8000], [11, 3, 1.8, 10000], [16, 7, 2.5, 13000],
[22, 13, 3.0, 16000], [27, 18, 3.5, 19000], [30, 22, 4.5, 20000],
[33, 25, 5.5, 22000], [33, 25, 4.0, 21000], [29, 21, 3.0, 17000],
[24, 15, 2.0, 14000], [17, 8, 1.5, 10000], [11, 2, 1.0, 8000],
],
},
"新疆": {
"lat": 44.30, "lon": 87.60, "elev": 450,
"rh": 0.45, "wind_base": 2.5,
"climate": [
[-8, -16, 0.1, 7000], [-4, -12, 0.2, 11000], [6, -2, 0.3, 16000],
[18, 7, 0.5, 20000], [25, 13, 0.8, 24000], [30, 18, 1.0, 26000],
[32, 20, 1.2, 27000], [30, 18, 0.8, 25000], [24, 12, 0.5, 20000],
[15, 4, 0.3, 15000], [3, -4, 0.2, 9000], [-5, -13, 0.1, 6000],
],
},
"四川": {
"lat": 30.67, "lon": 104.07, "elev": 500,
"rh": 0.80, "wind_base": 1.2,
"climate": [
[10, 3, 0.6, 6000], [13, 5, 0.8, 8000], [17, 9, 1.2, 11000],
[23, 14, 2.0, 13000], [27, 18, 3.0, 15000], [29, 21, 4.0, 16000],
[31, 23, 5.0, 17000], [31, 23, 4.5, 16000], [27, 19, 3.0, 12000],
[22, 15, 1.5, 9000], [16, 10, 1.0, 7000], [11, 5, 0.5, 6000],
],
},
}
SITEDATA = {
"IFUNRN": 0.0,
"SSMAX": 0.0,
"NOTINF": 0.0,
"SSI": 0.0,
"WAV": 100.0,
"SMLIM": 0.0,
}
# ─── 数据库辅助 ───────────────────────────────────────────────────────────────
def namedtuple_factory(cursor, row):
fields = [column[0] for column in cursor.description]
cls = namedtuple("Row", fields)
return cls._make(row)
def get_db_conn():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = namedtuple_factory
return conn
# ─── 气象数据生成器 ───────────────────────────────────────────────────────────
class SyntheticWeatherDataProvider(WeatherDataProvider):
"""基于省份气候模板和年份生成合成日气象数据。"""
angstA = 0.29
angstB = 0.49
def __init__(self, province: str, start_date: datetime.date, end_date: datetime.date, seed: int):
WeatherDataProvider.__init__(self)
self.province = PROVINCES[province]
self.latitude = self.province["lat"]
self.longitude = self.province["lon"]
self.elevation = self.province["elev"]
self.description = f"Synthetic weather for {province}"
rng = np.random.default_rng(seed)
days = (end_date - start_date).days + 1
days_in_month = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
if (start_date.year % 4 == 0 and start_date.year % 100 != 0) or (start_date.year % 400 == 0):
days_in_month[1] = 29
for i in range(days):
day = start_date + datetime.timedelta(days=i)
month = day.month - 1
next_month = (month + 1) % 12
d = day.day - 1
frac = (d + 0.5) / days_in_month[month]
w2 = 0.5 - 0.5 * math.cos(math.pi * frac)
w1 = 1 - w2
c = self.province["climate"][month]
c_next = self.province["climate"][next_month]
tmax_base = c[0] * w1 + c_next[0] * w2
tmin_base = c[1] * w1 + c_next[1] * w2
rain_mean = c[2] * w1 + c_next[2] * w2
irrad_base = (c[3] * w1 + c_next[3] * w2) * 1000.0 # kJ -> J
tmax = tmax_base + rng.normal(0, 1.5)
tmin = tmin_base + rng.normal(0, 1.5)
tmin = min(tmin, tmax - 1.0)
irrad = max(0, irrad_base * (1 + rng.normal(0, 0.1)))
p_rain = min(0.8, max(0.05, rain_mean / 5.0))
if rng.random() < p_rain:
scale = rain_mean / (2 * p_rain)
rain = rng.gamma(2, scale) if scale > 0 else 0.0
else:
rain = 0.0
rain = max(0.0, rain)
tmean = (tmax + tmin) / 2.0
es = 0.6108 * math.exp(17.27 * tmean / (tmean + 237.3))
rh = self.province["rh"] + rng.normal(0, 0.05)
rh = max(0.2, min(0.95, rh))
vap = es * rh * 10.0
wind = max(0.5, self.province["wind_base"] + rng.normal(0, 0.8))
e0, es0, et0 = reference_ET(
DAY=day, LAT=self.latitude, ELEV=self.elevation,
TMIN=float(tmin), TMAX=float(tmax), IRRAD=float(irrad),
VAP=float(vap), WIND=float(wind),
ANGSTA=self.angstA, ANGSTB=self.angstB,
)
t = {
"DAY": day,
"LAT": self.latitude,
"LON": self.longitude,
"ELEV": self.elevation,
"TMAX": float(tmax),
"TMIN": float(tmin),
"VAP": float(vap),
"WIND": float(wind),
"RAIN": float(rain) / 10.0,
"IRRAD": float(irrad),
"E0": float(e0) / 10.0,
"ES0": float(es0) / 10.0,
"ET0": float(et0) / 10.0,
"SNOWDEPTH": 0.0,
}
wdc = WeatherDataContainer(**t)
self._store_WeatherDataContainer(wdc, day)
# ─── 农事管理构造 ───────────────────────────────────────────────────────────
def build_agromanagement(crop_no: int, province: str, year: int):
meta = CROP_META[crop_no]
crop_name = meta["en"]
cal = CROP_CALENDAR_CN[meta["name"]][province]
sm, sd = cal["start"]
em, ed = cal["end"]
start_date = datetime.date(year, sm, sd)
end_date = datetime.date(year, em, ed)
if end_date < start_date:
end_date = datetime.date(year + 1, em, ed)
campaign_start = datetime.date(year, 1, 1)
else:
campaign_start = datetime.date(year, 1, 1)
return [{
campaign_start: {
"CropCalendar": {
"crop_name": crop_name,
"variety_name": f"{crop_name}_{province}_{year}",
"crop_start_date": start_date,
"crop_start_type": "emergence",
"crop_end_date": end_date,
"crop_end_type": "earliest",
"max_duration": 365,
},
"TimedEvents": None,
"StateEvents": None,
}
}]
# ─── 公开接口 ───────────────────────────────────────────────────────────────
def list_available_provinces() -> list[str]:
return list(PROVINCES.keys())
def list_available_crops(province: str) -> list[dict]:
return [{"crop_no": c, **CROP_META[c]} for c in PROVINCE_CROPS.get(province, [])]
def run_wofost(
province: str,
crop_no: int,
year: int,
mode: Literal["pp", "wlp"] = "wlp",
) -> dict:
agromanagement = build_agromanagement(crop_no, province, year)
# 确定气象起止日期(覆盖整个生育期 + 前后缓冲)
campaign = agromanagement[0]
campaign_date = list(campaign.keys())[0]
cc = campaign[campaign_date]["CropCalendar"]
start = campaign_date - datetime.timedelta(days=30)
end = cc["crop_end_date"] + datetime.timedelta(days=30)
seed = year * 1000 + hash(province) % 100000
wdp = SyntheticWeatherDataProvider(province, start, end, seed)
conn = get_db_conn()
try:
# 作物/土壤参数复用 PCSE Demo DB生理参数通用
cropd = fetch_cropdata(conn, 31031, 2000, crop_no)
soild = fetch_soildata(conn, 31031)
parvalues = ParameterProvider(sitedata=SITEDATA, soildata=soild, cropdata=cropd)
finally:
conn.close()
if mode == "pp":
wofsim = Wofost72_PP(parvalues, wdp, agromanagement)
else:
wofsim = Wofost72_WLP_CWB(parvalues, wdp, agromanagement)
wofsim.run_till_terminate()
output = wofsim.get_output()
df = pd.DataFrame(output)
if not df.empty and "day" in df.columns:
df = df.set_index("day")
if not df.empty:
last = df.iloc[-1]
max_lai = df["LAI"].max() if "LAI" in df.columns else None
tagp = last.get("TAGP")
twso = last.get("TWSO")
sm_end = last.get("SM")
dvs_end = last.get("DVS")
duration = len(df)
else:
max_lai = tagp = twso = sm_end = dvs_end = duration = None
milestones = {}
crop_start = cc["crop_start_date"]
crop_end = cc["crop_end_date"]
if not df.empty and "DVS" in df.columns:
dvs_series = df["DVS"]
flowering = dvs_series[dvs_series >= 1.0]
if not flowering.empty:
milestones["flowering"] = flowering.index[0]
maturity = dvs_series[dvs_series >= 2.0]
if not maturity.empty:
milestones["maturity"] = maturity.index[0]
milestones["start"] = crop_start
milestones["end"] = df.index[-1] if df.index[-1] <= crop_end else crop_end
actual_duration = (milestones.get("end", crop_end) - crop_start).days + 1 if milestones else None
meta = CROP_META.get(crop_no, {"name": "未知作物", "emoji": "🌱", "color": "#888"})
return {
"meta": {
"province": province,
"crop_no": crop_no,
"year": year,
"mode": mode,
**meta,
"mode_label": MODE_LABELS.get(mode, mode),
},
"df": df,
"summary": {
"tagp": tagp,
"twso": twso,
"max_lai": max_lai,
"sm_end": sm_end,
"dvs_end": dvs_end,
"duration": actual_duration,
"milestones": milestones,
},
}
def run_multi_crop(
province: str,
year: int,
mode: Literal["pp", "wlp"] = "wlp",
) -> list[dict]:
crops = list_available_crops(province)
results = []
for c in crops:
try:
res = run_wofost(province=province, crop_no=c["crop_no"], year=year, mode=mode)
results.append({
"crop_no": c["crop_no"],
"name": c["name"],
"emoji": c["emoji"],
"color": c["color"],
"tagp": res["summary"]["tagp"],
"twso": res["summary"]["twso"],
"max_lai": res["summary"]["max_lai"],
"duration": res["summary"]["duration"],
})
except Exception:
continue
results.sort(key=lambda x: (x["twso"] if x["twso"] is not None else -1), reverse=True)
return results
def run_multi_year(
province: str,
crop_no: int,
mode: Literal["pp", "wlp"] = "wlp",
years: list[int] | None = None,
) -> list[dict]:
if years is None:
years = list(range(2019, 2024))
results = []
for year in years:
try:
res = run_wofost(province=province, crop_no=crop_no, year=year, mode=mode)
results.append({
"year": year,
"tagp": res["summary"]["tagp"],
"twso": res["summary"]["twso"],
"max_lai": res["summary"]["max_lai"],
"duration": res["summary"]["duration"],
})
except Exception:
continue
return results