""" 麦麦智农 - PCSE 模拟引擎封装(国内场景版) 基于 WOFOST 7.2 潜在生产(PP)与水分限制生产(WLP)模型 """ import datetime import math import sqlite3 from collections import namedtuple from typing import Literal import numpy as np import pandas as pd from pcse.models import Wofost72_PP, Wofost72_WLP_CWB from pcse.settings import settings from pcse.base import ParameterProvider, WeatherDataContainer, WeatherDataProvider from pcse.tests.db_input import fetch_soildata, fetch_cropdata from pcse.util import reference_ET # ─── 常量配置 ──────────────────────────────────────────────────────────────── DB_PATH = settings.PCSE_USER_HOME + "/pcse.db" CROP_META = { 1: {"name": "冬小麦", "emoji": "🌾", "color": "#c69c5d", "en": "WINTER WHEAT"}, 2: {"name": "玉米", "emoji": "🌽", "color": "#e8a93f", "en": "GRAIN MAIZE"}, 3: {"name": "春大麦", "emoji": "🌿", "color": "#9cb85f", "en": "SPRING BARLEY"}, 7: {"name": "马铃薯", "emoji": "🥔", "color": "#b8a88a", "en": "POTATO"}, 10: {"name": "冬油菜", "emoji": "🌻", "color": "#d97836", "en": "WINTER RAPESEED"}, 11: {"name": "向日葵", "emoji": "🌻", "color": "#f4c430", "en": "SUNFLOWER"}, } MODE_LABELS = { "pp": "潜在生产", "wlp": "水分限制生产", } # 省份支持作物(黑龙江太冷,去掉越冬作物) PROVINCE_CROPS = { "河南": [1, 2, 3, 7, 10, 11], "黑龙江": [2, 3, 7, 11], "湖北": [1, 2, 3, 7, 10, 11], "新疆": [1, 2, 3, 7, 10, 11], "四川": [1, 2, 3, 7, 10, 11], } # 国内农事日历(播种月日 -> 收获月日) CROP_CALENDAR_CN = { "冬小麦": { "河南": {"start": (10, 15), "end": (5, 25)}, "湖北": {"start": (10, 25), "end": (5, 15)}, "新疆": {"start": (9, 20), "end": (6, 25)}, "四川": {"start": (10, 20), "end": (5, 10)}, }, "玉米": { "河南": {"start": (6, 5), "end": (9, 25)}, "黑龙江": {"start": (5, 1), "end": (9, 20)}, "湖北": {"start": (3, 25), "end": (7, 20)}, "新疆": {"start": (4, 15), "end": (9, 10)}, "四川": {"start": (3, 20), "end": (7, 15)}, }, "春大麦": { "河南": {"start": (3, 1), "end": (5, 25)}, "黑龙江": {"start": (4, 10), "end": (7, 15)}, "湖北": {"start": (2, 15), "end": (5, 20)}, "新疆": {"start": (3, 25), "end": (7, 10)}, "四川": {"start": (2, 20), "end": (5, 15)}, }, "马铃薯": { "河南": {"start": (2, 25), "end": (5, 30)}, "黑龙江": {"start": (4, 20), "end": (8, 15)}, "湖北": {"start": (1, 15), "end": (5, 10)}, "新疆": {"start": (4, 1), "end": (7, 25)}, "四川": {"start": (1, 20), "end": (5, 5)}, }, "冬油菜": { "河南": {"start": (9, 15), "end": (5, 15)}, "湖北": {"start": (9, 20), "end": (5, 5)}, "新疆": {"start": (8, 25), "end": (6, 15)}, "四川": {"start": (9, 10), "end": (4, 25)}, }, "向日葵": { "河南": {"start": (5, 25), "end": (8, 30)}, "黑龙江": {"start": (5, 15), "end": (9, 10)}, "湖北": {"start": (4, 5), "end": (8, 5)}, "新疆": {"start": (4, 20), "end": (8, 20)}, "四川": {"start": (3, 25), "end": (7, 25)}, }, } # 省份气候模板(每月 [TMAX, TMIN, RAIN(mm/day), IRRAD(kJ/m2/day)]) PROVINCES = { "河南": { "lat": 34.76, "lon": 113.65, "elev": 110, "rh": 0.65, "wind_base": 2.0, "climate": [ [5, -4, 0.5, 9000], [9, -1, 0.6, 12000], [15, 4, 1.0, 16000], [22, 10, 1.5, 19000], [28, 16, 2.0, 22000], [33, 21, 2.5, 24000], [32, 23, 4.0, 21000], [31, 22, 3.5, 19000], [27, 17, 2.0, 16000], [22, 11, 1.2, 13000], [14, 3, 0.8, 9000], [7, -2, 0.4, 8000], ], }, "黑龙江": { "lat": 45.80, "lon": 126.53, "elev": 150, "rh": 0.60, "wind_base": 3.0, "climate": [ [-13, -24, 0.2, 6000], [-8, -20, 0.2, 9000], [2, -9, 0.4, 14000], [13, 1, 1.0, 18000], [21, 8, 1.8, 22000], [26, 15, 3.0, 24000], [28, 18, 4.5, 22000], [26, 16, 4.0, 19000], [20, 9, 1.5, 15000], [11, 1, 0.8, 11000], [-1, -10, 0.4, 7000], [-11, -21, 0.3, 5000], ], }, "湖北": { "lat": 30.59, "lon": 114.31, "elev": 30, "rh": 0.75, "wind_base": 1.8, "climate": [ [8, 0, 1.5, 8000], [11, 3, 1.8, 10000], [16, 7, 2.5, 13000], [22, 13, 3.0, 16000], [27, 18, 3.5, 19000], [30, 22, 4.5, 20000], [33, 25, 5.5, 22000], [33, 25, 4.0, 21000], [29, 21, 3.0, 17000], [24, 15, 2.0, 14000], [17, 8, 1.5, 10000], [11, 2, 1.0, 8000], ], }, "新疆": { "lat": 44.30, "lon": 87.60, "elev": 450, "rh": 0.45, "wind_base": 2.5, "climate": [ [-8, -16, 0.1, 7000], [-4, -12, 0.2, 11000], [6, -2, 0.3, 16000], [18, 7, 0.5, 20000], [25, 13, 0.8, 24000], [30, 18, 1.0, 26000], [32, 20, 1.2, 27000], [30, 18, 0.8, 25000], [24, 12, 0.5, 20000], [15, 4, 0.3, 15000], [3, -4, 0.2, 9000], [-5, -13, 0.1, 6000], ], }, "四川": { "lat": 30.67, "lon": 104.07, "elev": 500, "rh": 0.80, "wind_base": 1.2, "climate": [ [10, 3, 0.6, 6000], [13, 5, 0.8, 8000], [17, 9, 1.2, 11000], [23, 14, 2.0, 13000], [27, 18, 3.0, 15000], [29, 21, 4.0, 16000], [31, 23, 5.0, 17000], [31, 23, 4.5, 16000], [27, 19, 3.0, 12000], [22, 15, 1.5, 9000], [16, 10, 1.0, 7000], [11, 5, 0.5, 6000], ], }, } SITEDATA = { "IFUNRN": 0.0, "SSMAX": 0.0, "NOTINF": 0.0, "SSI": 0.0, "WAV": 100.0, "SMLIM": 0.0, } # ─── 数据库辅助 ─────────────────────────────────────────────────────────────── def namedtuple_factory(cursor, row): fields = [column[0] for column in cursor.description] cls = namedtuple("Row", fields) return cls._make(row) def get_db_conn(): conn = sqlite3.connect(DB_PATH) conn.row_factory = namedtuple_factory return conn # ─── 气象数据生成器 ─────────────────────────────────────────────────────────── class SyntheticWeatherDataProvider(WeatherDataProvider): """基于省份气候模板和年份生成合成日气象数据。""" angstA = 0.29 angstB = 0.49 def __init__(self, province: str, start_date: datetime.date, end_date: datetime.date, seed: int): WeatherDataProvider.__init__(self) self.province = PROVINCES[province] self.latitude = self.province["lat"] self.longitude = self.province["lon"] self.elevation = self.province["elev"] self.description = f"Synthetic weather for {province}" rng = np.random.default_rng(seed) days = (end_date - start_date).days + 1 days_in_month = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31] if (start_date.year % 4 == 0 and start_date.year % 100 != 0) or (start_date.year % 400 == 0): days_in_month[1] = 29 for i in range(days): day = start_date + datetime.timedelta(days=i) month = day.month - 1 next_month = (month + 1) % 12 d = day.day - 1 frac = (d + 0.5) / days_in_month[month] w2 = 0.5 - 0.5 * math.cos(math.pi * frac) w1 = 1 - w2 c = self.province["climate"][month] c_next = self.province["climate"][next_month] tmax_base = c[0] * w1 + c_next[0] * w2 tmin_base = c[1] * w1 + c_next[1] * w2 rain_mean = c[2] * w1 + c_next[2] * w2 irrad_base = (c[3] * w1 + c_next[3] * w2) * 1000.0 # kJ -> J tmax = tmax_base + rng.normal(0, 1.5) tmin = tmin_base + rng.normal(0, 1.5) tmin = min(tmin, tmax - 1.0) irrad = max(0, irrad_base * (1 + rng.normal(0, 0.1))) p_rain = min(0.8, max(0.05, rain_mean / 5.0)) if rng.random() < p_rain: scale = rain_mean / (2 * p_rain) rain = rng.gamma(2, scale) if scale > 0 else 0.0 else: rain = 0.0 rain = max(0.0, rain) tmean = (tmax + tmin) / 2.0 es = 0.6108 * math.exp(17.27 * tmean / (tmean + 237.3)) rh = self.province["rh"] + rng.normal(0, 0.05) rh = max(0.2, min(0.95, rh)) vap = es * rh * 10.0 wind = max(0.5, self.province["wind_base"] + rng.normal(0, 0.8)) e0, es0, et0 = reference_ET( DAY=day, LAT=self.latitude, ELEV=self.elevation, TMIN=float(tmin), TMAX=float(tmax), IRRAD=float(irrad), VAP=float(vap), WIND=float(wind), ANGSTA=self.angstA, ANGSTB=self.angstB, ) t = { "DAY": day, "LAT": self.latitude, "LON": self.longitude, "ELEV": self.elevation, "TMAX": float(tmax), "TMIN": float(tmin), "VAP": float(vap), "WIND": float(wind), "RAIN": float(rain) / 10.0, "IRRAD": float(irrad), "E0": float(e0) / 10.0, "ES0": float(es0) / 10.0, "ET0": float(et0) / 10.0, "SNOWDEPTH": 0.0, } wdc = WeatherDataContainer(**t) self._store_WeatherDataContainer(wdc, day) # ─── 农事管理构造 ─────────────────────────────────────────────────────────── def build_agromanagement(crop_no: int, province: str, year: int): meta = CROP_META[crop_no] crop_name = meta["en"] cal = CROP_CALENDAR_CN[meta["name"]][province] sm, sd = cal["start"] em, ed = cal["end"] start_date = datetime.date(year, sm, sd) end_date = datetime.date(year, em, ed) if end_date < start_date: end_date = datetime.date(year + 1, em, ed) campaign_start = datetime.date(year, 1, 1) else: campaign_start = datetime.date(year, 1, 1) return [{ campaign_start: { "CropCalendar": { "crop_name": crop_name, "variety_name": f"{crop_name}_{province}_{year}", "crop_start_date": start_date, "crop_start_type": "emergence", "crop_end_date": end_date, "crop_end_type": "earliest", "max_duration": 365, }, "TimedEvents": None, "StateEvents": None, } }] # ─── 公开接口 ─────────────────────────────────────────────────────────────── def list_available_provinces() -> list[str]: return list(PROVINCES.keys()) def list_available_crops(province: str) -> list[dict]: return [{"crop_no": c, **CROP_META[c]} for c in PROVINCE_CROPS.get(province, [])] def run_wofost( province: str, crop_no: int, year: int, mode: Literal["pp", "wlp"] = "wlp", ) -> dict: agromanagement = build_agromanagement(crop_no, province, year) # 确定气象起止日期(覆盖整个生育期 + 前后缓冲) campaign = agromanagement[0] campaign_date = list(campaign.keys())[0] cc = campaign[campaign_date]["CropCalendar"] start = campaign_date - datetime.timedelta(days=30) end = cc["crop_end_date"] + datetime.timedelta(days=30) seed = year * 1000 + hash(province) % 100000 wdp = SyntheticWeatherDataProvider(province, start, end, seed) conn = get_db_conn() try: # 作物/土壤参数复用 PCSE Demo DB(生理参数通用) cropd = fetch_cropdata(conn, 31031, 2000, crop_no) soild = fetch_soildata(conn, 31031) parvalues = ParameterProvider(sitedata=SITEDATA, soildata=soild, cropdata=cropd) finally: conn.close() if mode == "pp": wofsim = Wofost72_PP(parvalues, wdp, agromanagement) else: wofsim = Wofost72_WLP_CWB(parvalues, wdp, agromanagement) wofsim.run_till_terminate() output = wofsim.get_output() df = pd.DataFrame(output) if not df.empty and "day" in df.columns: df = df.set_index("day") if not df.empty: last = df.iloc[-1] max_lai = df["LAI"].max() if "LAI" in df.columns else None tagp = last.get("TAGP") twso = last.get("TWSO") sm_end = last.get("SM") dvs_end = last.get("DVS") duration = len(df) else: max_lai = tagp = twso = sm_end = dvs_end = duration = None milestones = {} crop_start = cc["crop_start_date"] crop_end = cc["crop_end_date"] if not df.empty and "DVS" in df.columns: dvs_series = df["DVS"] flowering = dvs_series[dvs_series >= 1.0] if not flowering.empty: milestones["flowering"] = flowering.index[0] maturity = dvs_series[dvs_series >= 2.0] if not maturity.empty: milestones["maturity"] = maturity.index[0] milestones["start"] = crop_start milestones["end"] = df.index[-1] if df.index[-1] <= crop_end else crop_end actual_duration = (milestones.get("end", crop_end) - crop_start).days + 1 if milestones else None meta = CROP_META.get(crop_no, {"name": "未知作物", "emoji": "🌱", "color": "#888"}) return { "meta": { "province": province, "crop_no": crop_no, "year": year, "mode": mode, **meta, "mode_label": MODE_LABELS.get(mode, mode), }, "df": df, "summary": { "tagp": tagp, "twso": twso, "max_lai": max_lai, "sm_end": sm_end, "dvs_end": dvs_end, "duration": actual_duration, "milestones": milestones, }, } def run_multi_crop( province: str, year: int, mode: Literal["pp", "wlp"] = "wlp", ) -> list[dict]: crops = list_available_crops(province) results = [] for c in crops: try: res = run_wofost(province=province, crop_no=c["crop_no"], year=year, mode=mode) results.append({ "crop_no": c["crop_no"], "name": c["name"], "emoji": c["emoji"], "color": c["color"], "tagp": res["summary"]["tagp"], "twso": res["summary"]["twso"], "max_lai": res["summary"]["max_lai"], "duration": res["summary"]["duration"], }) except Exception: continue results.sort(key=lambda x: (x["twso"] if x["twso"] is not None else -1), reverse=True) return results def run_multi_year( province: str, crop_no: int, mode: Literal["pp", "wlp"] = "wlp", years: list[int] | None = None, ) -> list[dict]: if years is None: years = list(range(2019, 2024)) results = [] for year in years: try: res = run_wofost(province=province, crop_no=crop_no, year=year, mode=mode) results.append({ "year": year, "tagp": res["summary"]["tagp"], "twso": res["summary"]["twso"], "max_lai": res["summary"]["max_lai"], "duration": res["summary"]["duration"], }) except Exception: continue return results