Files
soil_water_q12/main.py

1597 lines
58 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

### Y/M/D 2025/02/28_Beta_hqw
### This function is to calculate Si module of soil water balance. # 根系吸水量。
import datetime
import json
import math
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple
import numpy as np
import pandas as pd
import uvicorn
from fastapi import FastAPI, File, HTTPException, UploadFile
from pydantic import BaseModel
class DateUtils:
@staticmethod
def cal_Doy(date: str) -> int:
"""计算日序"""
try:
month, day, year = map(int, date.split('/'))
date_obj = datetime.date(year, month, day)
return int(date_obj.strftime('%j'))
except (ValueError, IndexError):
raise ValueError(f"日期格式错误: {date}")
@staticmethod
def cal_DayLength(date: str, latitude: float) -> float:
"""计算日长"""
day_of_year = DateUtils.cal_Doy(date)
RAD = math.pi / 180.0
DEC = -math.asin(math.sin(23.45 * RAD) * math.cos(2.0 * math.pi * (day_of_year + 10.0) / 365.0))
dSINLD = math.sin(RAD * latitude) * math.sin(DEC)
dCOSLD = math.cos(RAD * latitude) * math.cos(DEC)
dCOSLD = dCOSLD if dCOSLD != 0.0 else 0.001
SSCC = dSINLD / dCOSLD
return 12.0 * (1.0 + 2.0 * math.asin(SSCC) / math.pi)
class TemperatureUtils:
@staticmethod
def get_hourly_temps(min_temperature: float, max_temperature: float) -> list:
"""获取逐时温度"""
if min_temperature > max_temperature:
raise ValueError("温度参数错误")
return [(max_temperature + min_temperature)/2 +
(max_temperature - min_temperature)*math.cos(0.2618*(i-14))/2
for i in range(1, 25)]
@staticmethod
def get_day_temps(min_temperature: float, max_temperature: float) -> float:
"""获取白天平均温度"""
if min_temperature > max_temperature:
raise ValueError("温度参数错误")
else:
T5 = []
for i in range(1, 9):
# print(i)
if i == 4 or i == 5 or i == 6:
continue
else:
Tfac = 0.9310 + 0.1140 * i - 0.0703 * i ** 2 + 0.0053 * i ** 3
# print(format(min_temperature + Tfac * (max_temperature - min_temperature), '.4f'))
T5.append(format(min_temperature + Tfac * (max_temperature - min_temperature), '.4f'))
T5 = [float(x) for x in T5]
average_Tday = np.mean(T5)
return average_Tday
@staticmethod
def dailyGDD(Tmax: float, Tmin: float, Tb: float, Tm: float) -> float:
"""计算日有效积温"""
Tmean = (Tmax + Tmin) / 2
Tmean = max(min(Tmean, Tm), Tb)
return Tmean - Tb
class VernalizationCalculator:
@staticmethod
def cal_DVE(PVT: float, VD: float, min_temp: float, max_temp: float) -> float:
"""春化效应计算"""
T24 = TemperatureUtils.get_hourly_temps(min_temp, max_temp)
Tbv = -PVT / 12
Tol = 5 - PVT / 12
Tou = 10 - PVT / 12
Tmv = 20 - PVT / 6
vef = 1 / (2 - 0.0167 * PVT)
sum_RVE = sum_DDVE = 0
for Ti in T24:
# 春化效应计算
if Ti <= Tbv or Ti >= Tmv:
RVEi = 0
elif Tbv < Ti < Tol:
RVEi = math.sin((Ti-Tbv)/(Tol-Tbv)*math.pi/2)**0.5
elif Tol <= Ti <= Tou:
RVEi = 1
else:
RVEi = math.sin((Ti-Tou)/(Tmv-Tou)*math.pi/2)**vef
sum_RVE += RVEi
# 反春化效应计算
sum_DDVE += 0.5*(Ti-27) if Ti > 27 else 0
DVE = sum_RVE / 24
DDVE = sum_DDVE / 24
if VD < 0.3*PVT:
return DVE - DDVE
elif 0.3*PVT <= VD <= PVT:
return DVE
return 0
class PhotoperiodCalculator:
@staticmethod
def cal_DPE(date: str, latitude: float, P: float, PS: float) -> float:
"""光周期效应计算"""
DL = DateUtils.cal_DayLength(date, latitude)
return max(0, 2/(1 + math.exp(PS*(P - DL))))
class StressCalculator:
def __init__(self, plant_params: dict):
self.plant_params = plant_params
def _init_parameters(self):
"""参数初始化"""
# 物候阶段阈值
self.PDT_EM = self.plant_params['1.1']
self.PDT_BT = self.plant_params['1.5']
self.PDT_SF = self.plant_params['2.2']
self.PDT_EF = self.plant_params['2.4']
self.PDT_MT = self.plant_params['3.0']
@staticmethod
def temperature_stress(Tmin: float, Tmax: float) -> float:
"""温度胁迫"""
Tmn = 0
Tpl = 20
Tpu = 25
Tmx = 35
Tmean = (Tmax + Tmin) / 2
if Tmn <= Tmean <= Tpl:
TE = math.sin((Tmean - Tmn) / (Tpl - Tmn) * math.pi / 2)
elif Tpl < Tmean <= Tpu:
TE = 1
elif Tpu < Tmean <= Tmx:
TE = math.sin((Tmx - Tmean) / (Tmx - Tpu) * math.pi / 2)
else:
TE = 0
return TE
def age_stress(self, current_pdt: float):
"""生理年龄胁迫"""
# 计算生理年龄胁迫因子
if current_pdt <= self.PDT_SF:
FAL = 1.0
else:
FAL = 1 - 0.013 * (current_pdt - self.PDT_SF)
if current_pdt <= self.PDT_EF:
FAP = 1.0
else:
FAP = 1 - 1 / 48 * (current_pdt - self.PDT_EF)
return FAL, FAP
@staticmethod
def co2_stress(cx: float) -> float:
"""二氧化碳浓度胁迫
:param Cx: 实际CO2浓度ppm
:return:
FCO2, CO2影响因子
"""
FCO2 = 1 + 0.8 * math.log(cx / 340)
return FCO2
@staticmethod
def NNI_stress(TOPWT: float, NA: float) -> float:
"""N浓度胁迫
:param TOPWT: 地上部干物质量kg/ha
:param NA: 地上部组织中的实际氮浓度, %
:return:
NNI, 氮素营养指数
"""
DM = TOPWT / 1000 # 地上部干重单位换算成 t/ha
if DM >= 0.88:
NC = 4.48 * DM ** (-0.25)
else:
NC = 4.63
NNI = NA / NC
return NNI
@staticmethod
def WDF_stress(ETcadj: float, ETo: float) -> float:
"""
:param ETcadj: 实际蒸散量mm
:param ETo: 潜在蒸散量mm
:return:
WDF水分亏缺因子"""
WDF = ETcadj / ETo
return WDF
class FlowerAreaIndex:
def __init__(self, DW_onset_flower: float, FlowerArea: float = 1.99):
"""
花面积指数(FAI)计算模型
:param DW_onset_flower : float - 初花期地上部干物重 (kg hm⁻²)
:param FlowerArea : float, optional - 单花面积 (cm²)默认1.99
"""
# 品种参数
self.FlowerArea = FlowerArea
# 计算开花速率
self.Flowerrate = 41.5 * DW_onset_flower
# 状态变量初始化
self.FlowerNum = 0.0 # 当前花朵总数
self.flower_queue = [] # 花朵开放队列 (剩余积温, 花朵数量)
def _calc_senescence(self, Tmean: float) -> float:
"""计算当日凋谢花朵数"""
senesced = 0.0
remaining_flowers = []
for batch in self.flower_queue:
batch["remaining"] -= Tmean
if batch["remaining"] <= 0:
senesced += batch["amount"]
else:
remaining_flowers.append(batch)
self.flower_queue = remaining_flowers
return senesced
def daily_step(self, Tmean: float) -> float:
"""
执行单日计算
:param Tmean : float - 当日平均温度(℃)
:param date : str, optional - 日期字符串,月/日/年
"""
# 当日新开花朵数
new_flowers = self.Flowerrate * Tmean
# 将新花朵加入追踪队列
self.flower_queue.append({"remaining": 100.0, "amount": new_flowers})
# 计算当日凋谢花朵数
senesced = self._calc_senescence(Tmean)
# 更新花朵总数
self.FlowerNum += new_flowers - senesced
# 计算花面积指数
FAI = self.FlowerNum * self.FlowerArea
return FAI
@dataclass
class GaussianCoefficients:
"""高斯积分系数容器"""
nodes: Tuple[float, ...]
weights: Tuple[float, ...]
class PhotosynthesisSimulator:
# 高斯积分系数
TIME_GAUSS = GaussianCoefficients(
nodes=(0.112702, 0.500000, 0.887298),
weights=(0.277778, 0.444444, 0.277778)
)
LAYER_GAUSS = GaussianCoefficients(
nodes=(0.0469101, 0.2307534, 0.5000000, 0.7691465, 0.9530899),
weights=(0.1184635, 0.2393144, 0.2844444, 0.2393144, 0.1184635)
)
def __init__(self, latitude: float, SH: float, PDT_BT: float, PDT_EF: float):
self.latitude = latitude
self.SH = SH
self.PDT_BT = PDT_BT
self.PDT_EF = PDT_EF
def _cal_PARCAN(self, current_date: datetime) -> float:
"""计算冠层光合有效辐射PARCAN"""
RAD = math.pi / 180.0
Gsc = 1367
# 日地距离系数计算
dr = 1 + 0.033 * math.cos(2 * math.pi * DateUtils.cal_Doy(current_date) / 365)
# 太阳赤纬计算
delta = 0.409 * math.sin(2 * math.pi * DateUtils.cal_Doy(current_date) / 365 - 1.39)
# 太阳时角计算
ws = math.acos(-math.tan(self.latitude * RAD) * math.tan(delta))
# 天文辐射计算
Q = (Gsc / math.pi * dr *
(ws * math.sin(self.latitude * RAD) * math.sin(delta) +
math.sin(ws) * math.cos(self.latitude * RAD) * math.cos(delta)))
# 日长计算
DL = DateUtils.cal_DayLength(current_date, self.latitude)
# PAR计算
PARCAN = (0.23 + 0.48 * self.SH / DL) * Q
return PARCAN
def _cal_DTGA(self, current_date: datetime, GAI: float, PAR: float,
kx: float, px: float, PLMX: float) -> float:
"""
计算每日光合同化物 (DTGA)
:param current_date: 当前日期
:param GAI: 绿色面积指数 (叶/角果面积指数)
:param PAR: 光合有效辐射
:param kx: 消光系数
:param px: 反射率
:param PLMX: 最大光合速率
"""
RAD = math.pi / 180.0
# 计算太阳赤纬
delta = 0.409 * math.sin(2 * math.pi * DateUtils.cal_Doy(current_date) / 365 - 1.39)
# 太阳位置参数
SSIN = math.sin(self.latitude * RAD) * math.sin(delta)
CCOS = math.cos(self.latitude * RAD) * math.cos(delta)
# 日长计算
DL = DateUtils.cal_DayLength(current_date, self.latitude)
total_assimilation = 0.0
# 时间维度高斯积分(每日时角)
for t_node, t_weight in zip(self.TIME_GAUSS.nodes, self.TIME_GAUSS.weights):
# 将积分点转换为实际时间
hour = 12 + 0.5 * DL * t_node
solar_angle = 2 * math.pi * (hour + 12) / 24
# 计算瞬时太阳高度角正弦
sin_beta = SSIN + CCOS * math.cos(solar_angle)
if sin_beta < 0:
continue # 夜间时段
# 冠层维度高斯积分(垂直分布)
layer_integral = 0.0
for l_node, l_weight in zip(self.LAYER_GAUSS.nodes, self.LAYER_GAUSS.weights):
cumulative_gai = GAI * l_node
# 光强衰减计算(比尔-朗伯定律)
light_intensity = (1 - px) * PAR * 3 * sin_beta * math.exp(-kx * cumulative_gai) # PAR偏低加上系数
# 光合响应模型(非直角双曲线)
photosynthesis = PLMX * (1 - math.exp(-0.45 * light_intensity / PLMX))
layer_integral += photosynthesis * l_weight
# 积分结果累加
total_assimilation += layer_integral * t_weight
DTGA = total_assimilation * DL
return DTGA
def Respiratory(self, FDTGA: float, Tmean: float, Tday: float) -> tuple:
"""呼吸作用计算"""
RmTo = 0.15
Q10 = 2
To = 25
RM = RmTo * FDTGA * Q10 ** ((Tmean - To) / 10)
RG = 0.39 * FDTGA
RpTo = 0.33
RP = FDTGA * RpTo * Q10 ** ((Tday - To) / 10)
return (RM, RG, RP)
def _cal_distribution(self, current_pdt: float, biomass: float) -> tuple:
"""干物质分配计算"""
if current_pdt < self.PDT_BT:
PIR = (0.18 - 0.0386) * math.sin(current_pdt / self.PDT_BT * math.pi / 2) + 0.0386
PIFP = 0
PIF = 0
PIL = 0.8 if current_pdt == 0 else -0.0600 * math.log(current_pdt / self.PDT_BT) + 0.22
else:
PIR = -0.0011 * (current_pdt - self.PDT_BT) + 0.18
PIFP = 0.54 * 0.1202 * (math.exp(0.0439 * (current_pdt - self.PDT_BT)) - 1)
PIL = -0.1454 * math.log(current_pdt / self.PDT_BT) + 0.22
PIF = 0.05 * math.exp(-(current_pdt - self.PDT_EF) ** 2 / (9.486 ** 2)) if current_pdt <= self.PDT_EF else 0
PIS = 1 - PIR
PIP = PIFP - PIF
PIST = PIS - PIL - PIFP
ROOTWT = biomass * PIR
TOPWT = biomass * PIS
WLV = TOPWT * PIL
WST = TOPWT * PIST
WAPV = TOPWT * PIP
WFL = TOPWT * PIF
return (ROOTWT, TOPWT, WLV, WAPV, WFL, WST)
class BiomassModel:
def __init__(self, start: str, end: str, weather: pd.DataFrame, cultivar_params: dict,
management: dict, plant_params: dict, soil_params: dict):
self.start = start
self.end = end
self.weather = weather
self.cultivar_params = cultivar_params
self.management = management
self.plant_params = plant_params
self.soil_params = soil_params
self._parse_dates()
self._init_parameters()
def _parse_dates(self):
"""日期解析"""
def parse_date(date_str):
return datetime.date(*map(int, (date_str.split('/')[-1],
date_str.split('/')[0],
date_str.split('/')[1])))
self.sow_date = parse_date(self.management['sowdate'])
self.start_date = parse_date(self.start)
self.end_date = parse_date(self.end)
self.latitude = self.management['latitude']
self.seed_weight = self.management['seed_weight']
def _init_parameters(self):
"""参数初始化"""
self.P = self.cultivar_params['P']
self.PS = self.cultivar_params['PS']
self.TS = self.cultivar_params['TS']
self.PVT = self.cultivar_params['PVT']
self.FDF = self.cultivar_params['FDF']
self.PLMXp = self.cultivar_params['PLMXp']
self.PLMXl = self.cultivar_params['PLMXl']
self.SLA = self.cultivar_params['SLA']
self.SPA = self.cultivar_params['SPA']
self.pp = self.plant_params['pp']
self.pl = self.plant_params['pl']
self.kp = self.plant_params['kp']
self.kl = self.plant_params['kl']
self.RTGR = self.plant_params['RTGR']
self.RFAC1 = self.plant_params['RFAC1']
# 物候阶段阈值
self.GDD_emergence = self.plant_params.get('GDD_emergence', 120) # 默认120℃·d
self.emergence_calculated = False
self.PDT_EM = self.plant_params['1.1']
self.PDT_BT = self.plant_params['1.5']
self.PDT_SF = self.plant_params['2.2']
self.PDT_EF = self.plant_params['2.4']
self.PDT_MT = self.plant_params['3.0']
# 温度参数
self.Tb = self.cultivar_params['Tb']
self.To = self.cultivar_params['To']
self.Tm = self.cultivar_params['Tm']
# 土壤参数
self.SNO3_0 = self.soil_params['SNO3_0']
self.SNH4_0 = self.soil_params['SNH4_0']
self.BD = self.soil_params['BD']
self.DLAYR = self.soil_params['DLAYR']
self.NLAYR = self.soil_params['NLAYR']
self.DS = self.soil_params['DS']
self.DUL = self.soil_params['DUL']
self.SAT = self.soil_params['SAT']
self.LL = self.soil_params['LL']
self.PH = self.soil_params['PH']
self.SOILC = self.soil_params['SOILC']
self.FOM = self.soil_params['FOM']
self.FON = self.soil_params['FON']
self.TOTN = self.soil_params['TOTN']
self.SW0 = self.soil_params['SW0']
self.CLAY = self.soil_params['CLAY']
self.CN = self.soil_params['CN']
def _get_temperature_parameters(self, current_pdt: float) -> tuple:
"""获取当前阶段的温度参数"""
if current_pdt <= self.PDT_BT:
return self.Tb[1], self.To[1], self.Tm[1]
elif self.PDT_BT < current_pdt <= self.PDT_SF:
return self.Tb[2], self.To[2], self.Tm[2]
elif self.PDT_SF < current_pdt <= self.PDT_EF:
return self.Tb[3], self.To[3], self.Tm[3]
elif self.PDT_EF < current_pdt <= self.PDT_MT:
return self.Tb[4], self.To[4], self.Tm[4]
return self.Tb[-1], self.To[-1], self.Tm[-1]
def _calculate_daily_effects(self, current_date: str, current_pdt: float, VD: float):
"""计算每日效应值"""
# 光周期效应
DPE = PhotoperiodCalculator.cal_DPE(current_date, self.latitude, self.P, self.PS)
# 温度效应
Tb, To, Tm = self._get_temperature_parameters(current_pdt)
Tmin = self.weather.loc[current_date, 'Tmin']
Tmax = self.weather.loc[current_date, 'Tmax']
DTE = self._calculate_DTE(Tb, To, Tm, Tmin, Tmax)
# 春化效应
DVE = VernalizationCalculator.cal_DVE(self.PVT, VD, Tmin, Tmax)
return DPE, DTE, DVE
def _calculate_DTE(self, Tb: float, To: float, Tm: float, Tmin: float, Tmax: float) -> float:
"""温度效应计算"""
T24 = TemperatureUtils.get_hourly_temps(Tmin, Tmax)
total_TE = 0
for Ti in T24:
if Ti < Tb or Ti > Tm:
TE = 0
elif Tb <= Ti <= To:
TE = (math.sin((Ti-Tb)/(To-Tb)*math.pi/2)) ** self.TS
else:
TE = (math.sin((Tm-Ti)/(Tm-To)*math.pi/2)) ** ((Tm-To)/(To-Tb)*self.TS)
total_TE += TE
return total_TE / 24
def ifEmergence(self) -> str:
"""判断出苗期开始时间"""
GDD = 0
current_date = self.sow_date
while GDD < self.GDD_emergence:
date_str = current_date.strftime("%m/%d/%Y")
try:
Tmax = self.weather.loc[date_str, 'Tmax']
Tmin = self.weather.loc[date_str, 'Tmin']
except KeyError:
current_date += datetime.timedelta(days=1)
continue
# 判断是否出苗
GDD_daily = TemperatureUtils.dailyGDD(Tmax, Tmin,
Tb=self.cultivar_params['Tb'][0],Tm=self.cultivar_params['Tm'][0])
GDD += GDD_daily
# 记录出苗日期
if GDD >= self.GDD_emergence:
self.emergence_date = current_date
self.emergence_days = (current_date - self.sow_date).days
break
current_date += datetime.timedelta(days=1)
# 防止无限循环
if (current_date - self.sow_date).days > 365:
self.emergence_date = None
self.emergence_days = 9999
break
def _cal_pdt(self, current_date: datetime.date) -> float:
"""执行模拟"""
# 阶段1计算出苗期
self.ifEmergence()
# 阶段2出苗后发育阶段
VD = 0 # 累积春化天数
PDT = self.PDT_EM # 从出苗开始发育
# 模拟循环
delta = (current_date - self.emergence_date).days
for day in (self.emergence_date + datetime.timedelta(n) for n in range(delta + 1)):
date_str = day.strftime("%m/%d/%Y")
# 计算每日效应
DPE, DTE, DVE = self._calculate_daily_effects(date_str, PDT, VD)
VD += DVE
# 计算物候发育进度
if PDT <= self.PDT_BT:
DPDT = DTE * (DPE * min(VD / self.PVT, 1) if self.PVT else DPE)
else:
DPDT = DTE * self.FDF if PDT > self.PDT_SF else DTE * DPE
DPDT = max(0, DPDT)
PDT += DPDT
if PDT > self.PDT_MT:
break
return PDT
def daily_DRTDEP(self, current_date: str, current_pdt: float, wdf):
"""计算每日增加的根系深度"""
Tb, To, Tm = self._get_temperature_parameters(current_pdt)
Tmin = self.weather.loc[current_date, 'Tmin']
Tmax = self.weather.loc[current_date, 'Tmax']
GDD = TemperatureUtils.dailyGDD(Tmax, Tmin, Tb, Tm)
DRTDEP = self.RTGR * GDD * wdf # DRTDEP(cm)
return DRTDEP
def RLV(self, RTDEP: float, rootwt: float):
"""
计算单位体积土壤中根系的总长度,单位为 cm/cm3
:param RTDEP: 根深,单位为 cm
:param rootwt: 地下部生物量(干重),单位为 kg 干物质/ha/d
:return:
RLV: Root Length Volume, 指单位体积土壤中根系的总长度,单位为 cm/cm3
"""
CUMDEP = 0
RLV_list = [] # cm/cm3
for L in range(self.NLAYR):
DEP = min(RTDEP - CUMDEP, self.DLAYR[L])
RLINIT = rootwt / 10000 / 10000 * 1000 * self.RFAC1 * DEP / (RTDEP)
CUMDEP += DEP
RLV = round(RLINIT / self.DLAYR[L], 3)
RLV_list.append(RLV)
if CUMDEP > RTDEP:
break
return RLV_list
def biomass(self, day: datetime.date, Biomass0: float, TE: int = 0, FA: int = 0, FCO2: int = 0, NNI: int = 0,
WDF: int = 0) -> tuple:
"""
执行模拟
:param TE: 温度胁迫, 0 - 无胁迫默认1 - 有胁迫
:param FA: 生长年龄胁迫, 0 - 无胁迫默认1 - 有胁迫
:param FCO2: CO2胁迫 0 - 无胁迫默认1 - 有胁迫
:param NNI: 氮素胁迫, 0 - 无胁迫默认1 - 有胁迫
:param WDF: 水分胁迫, 0 - 无胁迫默认1 - 有胁迫
"""
# 计算出苗期,从出苗后开始发育
self.ifEmergence()
date_str = day.strftime("%m/%d/%Y")
try:
Tmax = self.weather.loc[date_str, 'Tmax']
Tmin = self.weather.loc[date_str, 'Tmin']
Tmean = (Tmax + Tmin) / 2
SH = self.weather.loc[date_str, 'SH']
except KeyError:
print(f"气象数据中缺少 {date_str} 的记录,跳过该天。")
PDT = self._cal_pdt(day)
# 开始计算
photo = PhotosynthesisSimulator(self.latitude, SH, self.PDT_BT, self.PDT_EF)
# 地上部、地下部、叶片、角果、花、茎生物量
ROOTWT, TOPWT, WLV, WAPV, WFL, WST = photo._cal_distribution(PDT, Biomass0)
# 胁迫判断
stess = StressCalculator(self.plant_params)
if FA == 0:
FAL, FAP = 1.0, 1.0
else:
FAL, FAP = stess.age_stress(PDT)
if TE == 0:
tem_stress = 1.0
else:
tem_stress = StressCalculator.temperature_stress(Tmin, Tmax)
if FCO2 == 0:
co2 = 1.0
else:
co2 = StressCalculator.co2_stress(cx=340)
if WDF == 0:
wdf = 1.0
# else:
# wdf = StressCalculator.WDF_stress(ETcadj, ETo)
if NNI == 0:
nni = 1.0
# else:
# nni = StressCalculator.NNI_stress(TOPWT, NA)
PLMXL = self.PLMXl * co2 * FAL * tem_stress * min(wdf, nni)
PLMXP = self.PLMXp * co2 * FAP * tem_stress * min(wdf, nni)
# 花面积指数
startflowering = False
if not startflowering and PDT >= self.PDT_SF:
OnesetFlower = WFL
startflowering = True
if self.PDT_SF <= PDT <= self.PDT_EF:
flowermodel = FlowerAreaIndex(DW_onset_flower=OnesetFlower)
FAI = flowermodel.daily_step(Tmean)
else:
FAI = 0
# 叶面积指数
if PDT <= self.PDT_BT:
SLAv = self.SLA
else:
SLAv = self.SLA * (-0.0009 * (PDT - self.PDT_BT) ** 2 + 0.0683 * (PDT - self.PDT_BT) + 1.177)
LAI = SLAv * WLV
# 角果面积指数
if PDT >= self.PDT_SF:
SPAv = self.SPA * (0.00017 * (PDT - self.PDT_SF) ** 2 - 0.05554 * (PDT - self.PDT_SF) + 2.91887)
PAI = SPAv * WAPV
else:
PAI = 0
# 计算光合同化物
if PDT < self.PDT_SF:
Tp = photo._cal_PARCAN(date_str) # 无角果遮挡
else:
Tp = (1 - self.pp) * photo._cal_PARCAN(date_str) * math.exp(-self.kp * PAI) # 有角果遮挡
Tl = (1 - self.pl) * Tp * math.exp(-self.kl * LAI) # radiation passing through the leaves
Rl = 0.05 * Tp # leaf reflection
Rs = 0.2 * Tl # soil reflection
if PDT >= self.PDT_SF:
PARP = (1 - self.pp) * photo._cal_PARCAN(date_str) + Rl
else:
PARP = 0
PARL = (1 - self.pl) * Tp + Rs
DTGAp = photo._cal_DTGA(date_str, PAI, PARP, self.kp, self.pp, PLMXP)
DTGAl = photo._cal_DTGA(date_str, LAI, PARL, self.kl, self.pl, PLMXL)
# print('DTGAp, DTGAl: ', DTGAp, DTGAl)
FDTGA = (DTGAl + DTGAp) * min(wdf, nni)
# print('FDTGA: ', FDTGA)
# RmTo = 0.03*WLV+0.015*WST+0.015*ROOTWT+0.01*WAPV
# print('RmTo: ', RmTo)
Tday = TemperatureUtils.get_day_temps(Tmin, Tmax)
RM, RG, RP = photo.Respiratory(FDTGA, Tmean, Tday)
# print('RM, RG, RP: ', RM, RG, RP)
PND = FDTGA - RM - RG
# print('PND: ', PND)
TDRW = 30 / 44 * 0.95 * PND / (1 - 0.05)
TBiomass = TDRW + Biomass0
return TBiomass, LAI, TOPWT, ROOTWT
class SoilWaterModel:
def __init__(self, start: str,
end: str,
weather: pd.DataFrame,
cultivar_params: dict,
plant_params: dict,
soil_params: dict,
management: dict,
n_params: dict):
self.start = start
self.end = end
self.weather = weather
self.cultivar_params = cultivar_params
self.plant_params = plant_params
self.soil_params = soil_params
self.management = management
self.n_params = n_params
self._parse_dates()
self._init_parameters()
def _parse_dates(self):
"""日期解析"""
def parse_date(date_str):
return datetime.date(*map(int, (date_str.split('/')[-1],
date_str.split('/')[0],
date_str.split('/')[1])))
self.sow_date = parse_date(self.management['sowdate'])
self.start_date = parse_date(self.start)
self.end_date = parse_date(self.end)
def _init_parameters(self):
"""参数初始化"""
# 管理措施参数
self.latitude = self.management['latitude']
self.irrigation = self.management['irrigation']
self.elevation = self.management['elevation']
self.seed_weight = self.management['seed_weight']
# 作物参数
self.GDD_emergence = self.plant_params.get('GDD_emergence', 120) # 默认120℃·d
self.emergence_calculated = False
self.PDT_EM = self.plant_params['1.1']
self.PDT_BT = self.plant_params['1.5']
self.PDT_SF = self.plant_params['2.2']
self.PDT_EF = self.plant_params['2.4']
self.PDT_MT = self.plant_params['3.0']
self.RTGR = self.plant_params['RTGR']
self.Kcbini = self.plant_params['Kcbini']
self.Kcbmid = self.plant_params['Kcbmid']
self.Kcbend = self.plant_params['Kcbend']
self.h = self.plant_params['h']
self.kl = self.plant_params['kl']
# 品种参数
self.Po = self.cultivar_params['P']
self.PS = self.cultivar_params['PS']
self.TS = self.cultivar_params['TS']
self.PVT = self.cultivar_params['PVT']
self.FDF = self.cultivar_params['FDF']
self.Tb = self.cultivar_params['Tb']
self.To = self.cultivar_params['To']
self.Tm = self.cultivar_params['Tm']
# 土壤参数
self.SNO3_0 = self.soil_params['SNO3_0']
self.SNH4_0 = self.soil_params['SNH4_0']
self.BD = self.soil_params['BD']
self.DLAYR = self.soil_params['DLAYR']
self.NLAYR = self.soil_params['NLAYR']
self.DS = self.soil_params['DS']
self.DUL = self.soil_params['DUL']
self.SAT = self.soil_params['SAT']
self.LL = self.soil_params['LL']
self.PH = self.soil_params['PH']
self.SOILC = self.soil_params['SOILC']
self.FOM = self.soil_params['FOM']
self.FON = self.soil_params['FON']
self.TOTN = self.soil_params['TOTN']
self.SW0 = self.soil_params['SW0']
self.CLAY = self.soil_params['CLAY']
self.CN = self.soil_params['CN']
# 氮相关参数
self.Nd = self.n_params['Nd']
def interception(self, P: float, LAI: float, Fw: float, SW0: list):
"""Interception, 作物叶片截留, 单位为cm3/cm3"""
eta = math.exp(- self.kl * LAI)
f = 0.2 # f means 0.2kg water/kg fresh weight,f为单位鲜重生物量的截留能力0.2kg水/kg 鲜重)
intcap = (1-eta) * f * Fw/10000/10000*1000 # Fw means fresh weight of the crop,its unit is kg/ha. turns to g/cm2 ???? Fw = 地上部鲜重? ,intercap(cm)
t = 1 # 模拟步长
Interception = min(P, intcap/t)
if Interception <= 0: ###补充代码,避免出现截留量<0的情况
# print('interception < 0, please check')
Interception = 0
return Interception, SW0
def runoff(self, P: float, I: float, Interception: float, SW0: list, Flood: bool=False):
"""
计算Runoff, 地表径流量cm
:param Flood: 是否淹水布尔类型默认False
"""
SW1 = [0 for i in range(self.NLAYR)]
if Flood == True:
FldH = 4 # flooding depth, cm 水田淹水深度
Pinf = P + I - Interception
FldH = FldH + Pinf
BundH = 30 # height of ridge, cm 田埂高
if FldH > BundH:
Runoff = FldH - BundH #### SW会改变吗
depth = BundH
if FldH <= BundH:
Runoff = 0
depth = FldH
# SW = []
for L in range(self.NLAYR):
SW1[L] = self.DUL[L]
if Flood == False:
# 土壤储水量 Soil storage
# SMX = 254.0 * (100.0 / CN - 1.0) # (mm)
SMX = 254.0 * (100.0 / self.CN - 1.0) / 10 #(cm)
# 初始吸收比率 Initial abstraction ratio
# 地表径流与土壤地表两层含水量的平均值有关。Runoff is related to the average soil water content of the top two layers of soil
SWABI = 0.15 * ((self.SAT[0] - SW0[0]) / (self.SAT[0] - self.LL[0] * 0.5) + (
self.SAT[1] - SW0[1]) / (self.SAT[1] - self.LL[1] * 0.5))
SWABI = max(0.0, SWABI)
# 暂不考虑土壤表层的覆盖物
IABS = SWABI
# WATAVL, Water available for infiltration or runoff (rainfall plus irrigation) (cm/d)
# 可用于入渗或径流的水量(= 降雨量+灌溉量单位为cm/d与降雨和灌溉的单位一致
WATAVL = P + I
# PB, Determines threshold amount of rainfall that will occur before runoff starts (mm/d)
# 确定在径流开始之前,将发生的降雨或灌溉阈值(毫米/天)
PB = WATAVL - IABS * SMX
if WATAVL > 0.001:
if PB > 0:
Runoff = PB ** 2 / (WATAVL + (1.0 - IABS) * SMX)
else:
Runoff = 0.0
else:
Runoff = 0.0
for L in range(self.NLAYR):
# print(L)
if L <= 1:
SW1[L] = self.DUL[L] # 土壤前两层为饱和含水量
else:
SW1[L] = self.SW0[L]
return Runoff, SW1
def pinf(self, P: float, I: float, Interception: float, SW1: list):
"""降水或灌溉的入渗cm"""
SWSAT =[]
# SWFC = DUL # SWFC means field water capacity,the same as DUL.
Ksw = []
Pinf = [0 for i in range(self.NLAYR)]
SW2 = [0 for i in range(self.NLAYR)]
# Interception = Soil_Interception.Interception(WEATHER, t, PDT, HI, TBiomass, SLA)
sum_Pinf = [0 for i in range(self.NLAYR)]
sum_Pinf[0] = P + I - Interception
# print('sum_Pinf = ',sum_Pinf)
for L in range(self.NLAYR):
SWSAT.append(1 - self.BD[L] / 2.65) #SWSAT == SAT
# SWSAT = SAT
Ksw.append(75 * ((SWSAT[L] - self.DUL[L]) / self.DUL[L]) ** 2) #Its unit is cm/h, KSAT means Soil Saturated Hydraulic Conductivity. the same as Ksw.饱和导水率
SW2[L] = (sum_Pinf[L]/self.DLAYR[L] + SW1[L])
if SW2[L] < self.DUL[L]:
Pinf[L] = sum_Pinf[L]
sum_Pinf[L] = 0
continue
if SW2[L] == self.DUL[L]:
if L == 0:
Pinf[L] = sum_Pinf[L]
if L > 0:
sum_Pinf[L] = sum_Pinf[L - 1]
continue
if SW2[L] > self.DUL[L]:
Pinf[L] = (self.DUL[L]-SW1[L])*self.DLAYR[L]
SW2[L] = self.DUL[L]
sum_Pinf[L] = sum_Pinf[L] - Pinf[L] #sum_Pinf 表示到下一层的入渗量
if SW2[L] <= self.LL[L]: ###补充代码农田的含水量始终高于萎蔫含水量避免出现含水量为0的情况
SW2[L] == self.LL[L]
print('SW2 < 0, please check')
return SW2
def capillary_rise(self, SW2: list, water_table_presence: bool=1):
"""
计算地下水的毛细管上升量
:param water_table_presence: 是否存在地下水01
:return: 地下水位对毛细管的补给量
"""
Soil_fshape_cr = 16.0 # 毛细管上升形状系数,用于控制毛细管上升水量的比例
FluxOut = [0.04, 0.03, 0.02, 0.01] # 从每层流出量
z_gw = 25
Init_water_Adj = [0, 0, 0, 0]
zMid_1 = np.cumsum(self.DLAYR)
zMid_2 = [item / 2 for item in self.DLAYR]
zMid = np.subtract(zMid_1, zMid_2)
# 计算每一层的饱和导水率 (mm/day) 饱和导水率
Ksat = []
for ii in range(len(self.SAT)):
lmbda = 1 / ((np.log(1500) - np.log(33)) / (np.log(self.DUL[ii]) - np.log(self.LL[ii])))
K1 = (1930 * (self.SAT[ii] - self.DUL[ii]) ** (3 - lmbda)) * 24
Ksat.append(K1)
# print(Ksat)
# 模块输入系数总体上bCR、aCR,需要的系数
bCR = np.zeros(self.NLAYR)
aCR = np.zeros(self.NLAYR)
for jj in range(0, self.NLAYR):
if (
(self.LL[jj] >= 0.04)
and (self.LL[jj] <= 0.15)
and (self.DUL[jj] >= 0.09)
and (self.DUL[jj] <= 0.28)
and (self.SAT[jj] >= 0.32)
and (self.SAT[jj] <= 0.51)
):
# Sandy soil class
if (Ksat[jj] >= 200) and (Ksat[jj] <= 2000):
aCR[jj] = -0.3112 - (Ksat[jj] * (1e-5))
bCR[jj] = -1.4936 + (0.2416 * np.log(Ksat[jj]))
elif Ksat[jj] < 200:
aCR[jj] = -0.3112 - (200 * (1e-5))
bCR[jj] = -1.4936 + (0.2416 * np.log(200))
elif Ksat[jj] > 2000:
aCR[jj] = -0.3112 - (2000 * (1e-5))
bCR[jj] = -1.4936 + (0.2416 * np.log(2000))
elif (
(self.LL[jj] >= 0.06)
and (self.LL[jj] <= 0.20)
and (self.DUL[jj] >= 0.23)
and (self.DUL[jj] <= 0.42)
and (self.SAT[jj] >= 0.42)
and (self.SAT[jj] <= 0.55)
):
# Loamy soil class
if (Ksat[jj] >= 100) and (Ksat[jj] <= 750):
aCR[jj] = -0.4986 + (9 * (1e-5) * Ksat[jj])
bCR[jj] = -2.132 + (0.4778 * np.log(Ksat[jj]))
elif Ksat[jj] < 100:
aCR[jj] = -0.4986 + (9 * (1e-5) * 100)
bCR[jj] = -2.132 + (0.4778 * np.log(100))
elif Ksat[jj] > 750:
aCR[jj] = -0.4986 + (9 * (1e-5) * 750)
bCR[jj] = -2.132 + (0.4778 * np.log(750))
elif (
(self.LL[jj] >= 0.16)
and (self.LL[jj] <= 0.34)
and (self.DUL[jj] >= 0.25)
and (self.DUL[jj] <= 0.45)
and (self.SAT[jj] >= 0.40)
and (self.SAT[jj] <= 0.53)
):
# Sandy clayey soil class
if (Ksat[jj] >= 5) and (Ksat[jj] <= 150):
aCR[jj] = -0.5677 - (4 * (1e-5) * Ksat[jj])
bCR[jj] = -3.7189 + (0.5922 * np.log(Ksat[jj]))
elif Ksat[jj] < 5:
aCR[jj] = -0.5677 - (4 * (1e-5) * 5)
bCR[jj] = -3.7189 + (0.5922 * np.log(5))
elif Ksat[jj] > 150:
aCR[jj] = -0.5677 - (4 * (1e-5) * 150)
bCR[jj] = -3.7189 + (0.5922 * np.log(150))
elif (
(self.LL[jj] >= 0.20)
and (self.LL[jj] <= 0.42)
and (self.DUL[jj] >= 0.40)
and (self.DUL[jj] <= 0.58)
and (self.SAT[jj] >= 0.49)
and (self.SAT[jj] <= 0.58)
):
# Silty clayey soil class
if (Ksat[jj] >= 1) and (Ksat[jj] <= 150):
# print(aCR)
aCR[jj] = -0.6366 + (8 * (1e-4) * Ksat[jj])
bCR[jj] = -1.9165 + (0.7063 * np.log(Ksat[jj]))
elif Ksat[jj] < 1:
aCR[jj] = -0.6366 + (8 * (1e-4) * 1)
bCR[jj] = -1.9165 + (0.7063 * np.log(1))
elif Ksat[jj] > 150:
aCR[jj] = -0.6366 + (8 * (1e-4) * 150)
bCR[jj] = -1.9165 + (0.7063 * np.log(150))
for i in range(len(aCR)):
aCR[i] = max(aCR[i], 0)
bCR[i] = max(bCR[i], 0)
# print(aCR, bCR)
## Get groundwater table elevation on current day ##
z_gw = z_gw
## Calculate capillary rise ##
if water_table_presence == 0: # No water table present
# Capillary rise is zero
CrTot = 0
elif water_table_presence == 1: # Water table present
# Get maximum capillary rise for bottom compartment
zBot = zMid_1[-1]
zBotMid = zMid[-1]
if (Ksat[-1] > 0) and (z_gw > 0) and ((z_gw - zBotMid) < 4):
if zBotMid >= z_gw:
MaxCR = 99
else:
MaxCR = np.exp((np.log(z_gw - zBotMid) - bCR[-1]) / aCR[-1])
if MaxCR > 99:
MaxCR = 99
else:
MaxCR = 0
compi = self.NLAYR - 1 # 从土层最底部开始
WCr = 0 # 毛细管上升计数器
Init_water = SW2
while (round(MaxCR * 1000) > 0) and (compi > -1) and (round(FluxOut[compi] * 1000) == 0):
# 向上运动,达到土壤表面,或遇到当天已经发生向下排水渗透的隔室,找到当前土层,计算驱动力
if (Init_water[compi] >= self.LL[compi]) and (Soil_fshape_cr > 0):
Df = 1 - (
(
(Init_water[compi] - self.LL[compi])
/ (Init_water_Adj[compi] - self.LL[compi])
)
** Soil_fshape_cr
)
if Df > 1:
Df = 1
elif Df < 0:
Df = 0
else:
Df = 1
# 计算相对水力传导系数
thThr = (self.LL[compi] + self.DUL[compi]) / 2
if Init_water[compi] < thThr:
if (Init_water[compi] <= self.LL[compi]) or (thThr <= self.LL[compi]):
Krel = 0
else:
Krel = (Init_water[compi] - self.LL[compi]) / (thThr - self.LL[compi])
else:
Krel = 1
# 检查是否有空间储存毛细管上升的水
dth = round(Init_water_Adj[compi] - Init_water[compi],4)
# Store water if room is available 如果空间有效的话,存储水量
if (dth > 0) and ((zBot - self.DLAYR[compi] / 2) < z_gw):
dthMax = Krel * Df * MaxCR / (10 * self.DLAYR[compi])
if dth >= dthMax:
Init_water[compi] = Init_water[compi] + dthMax
CRcomp = dthMax * 10 * self.DLAYR[compi]
MaxCR = 0
else:
Init_water[compi] = Init_water_Adj[compi]
CRcomp = dth * 10 * self.DLAYR[compi]
MaxCR = (Krel * MaxCR) - CRcomp
WCr = WCr + CRcomp
# 更新各层底部高度
zBot = zBot - self.DLAYR[compi]
# 更新隔间和层计数器
compi = compi - 1
# 更新毛细管最大上升限制
if compi > -1:
zBotMid = zBot - (self.DLAYR[compi] / 2)
if (Ksat[compi] > 0) and (z_gw > 0) and ((z_gw - zBotMid) < 4):
if zBotMid >= z_gw:
LimCR = 99
else:
LimCR = np.exp((np.log(z_gw - zBotMid) - bCR[compi]) / aCR[compi])
if LimCR > 99:
LimCR = 99
else:
LimCR = 0
if MaxCR > LimCR:
MaxCR = LimCR
# Store total depth of capillary rise 毛细管上升总深度
CrTot = WCr
return CrTot, SW2
def e0(self, T: float):
"""
:param T: 气温,℃
:return:
e: 气温为T时的水汽压kPa
"""
e = 0.6108*(math.exp((17.27 * T / (T + 237.3))))
return e
def cal_Rn(self, doy: int, SH: float, Tmax: float, Tmin: float, RH: float) -> float:
"""作物表面上的净辐射MJ/(m2·day)"""
a = 0.23 # 冠层反射率绿色植被覆盖的地表的反射率约为0.20-0.25。以绿草为参考作物时a=0.23。
Lat = self.latitude * math.pi / 180.0
Z = self.elevation
delta = 0.409 * math.sin(2 * math.pi * doy / 365 - 1.39) # 太阳赤纬
ws = math.acos(-math.tan(Lat*math.pi / 180.0)*math.tan(delta)) # 太阳时角
dr = 1 + 0.033 * math.cos(2 * math.pi * doy / 365) # 日地距离系数
Ra = ((24 * 60 * 0.082) / math.pi * dr *
(ws * math.sin(Lat*math.pi / 180.0) * math.sin(delta) + math.sin(ws) * math.cos(Lat*math.pi / 180.0) * math.cos(delta))) # 天文辐射
Rso = (0.75 + 2 * 10**(-5) * Z) * Ra # 晴天辐射
N = 24 / math.pi * ws
Rs = (0.25 + 0.5 * SH / N) * Ra # 太阳短波辐射
Rns = (1 - a) * Rs # 净短波辐射
es = (self.e0(Tmax) + self.e0(Tmin)) / 2
ea = es * RH / 100
Rnl = (4.903 * 10**(-9) * (((Tmax + 273.16)**4 - (Tmin + 273.16)**4) / 4) *
(0.34 - 0.14 * ea**0.5) * (1.35 * Rs / Rso - 0.35)) # 净长波辐射
Rn = Rns - Rnl
return Rn
def PM_ET0(self, Tmax, Tmin, RH, P, u2, Rn):
"""作物潜在蒸散量mm"""
Tmean = (Tmax + Tmin) / 2
# 土壤热通量
G = 0
# 饱和水汽压
es = (self.e0(Tmax) + self.e0(Tmin)) / 2
# 实际水汽压
ea = es * RH / 100
# 饱和水汽压曲线斜率
delta = 4098 * self.e0(Tmean) / (Tmean + 237.3) ** 2
# 湿度计常数
gama = 0.665 * 0.001 * P
ET0 = (0.408*delta*(Rn-G) + gama*900*u2*(es-ea)/(Tmean+273)) / (delta + gama*(1 + 0.34*u2))
return ET0
def cal_Kcb(self, current_pdt: float, RHmin: float, u2: float) -> float:
"""Kcb作物蒸腾系数"""
PDT_1 = self.PDT_EM
PDT_2 = self.PDT_BT
PDT_3 = self.PDT_EF
PDT_4 = self.PDT_MT
if RHmin != 45 or u2 != 2:
if self.Kcbmid >= 0.45:
kcbmid_new = self.Kcbmid + (0.04*(u2-2)-0.004*(RHmin-45))*(self.h/3)**0.3
else:
kcbmid_new = self.Kcbmid
if self.Kcbend >= 0.45:
kcbend_new = self.Kcbend + (0.04*(u2-2)-0.004*(RHmin-45))*(self.h/3)**0.3
else:
kcbend_new = self.Kcbend
else:
kcbmid_new = self.Kcbmid
kcbend_new = self.Kcbend
if current_pdt <= PDT_1:
Kcb = self.Kcbini
elif PDT_1 < current_pdt <= PDT_2:
Kcb = self.Kcbini + (kcbmid_new - self.Kcbini)/(PDT_2 - PDT_1) * (current_pdt - PDT_1)
elif PDT_2 < current_pdt <= PDT_3:
Kcb = kcbmid_new
elif PDT_3 < current_pdt < PDT_4:
Kcb = self.Kcbmid + (kcbend_new - kcbmid_new)/(PDT_4 - PDT_3) * (current_pdt - PDT_3)
else:
Kcb = kcbend_new
return Kcb
def ETc(self, current_date: str, current_pdt: float, rootdepth: float, Runoff: float, I: float, CrTot: float, De0: float, SW0: list):
"""ETcadj: 胁迫条件下的作物蒸腾量mm"""
Rain = self.weather.loc[current_date, 'RAIN']
Tmin = self.weather.loc[current_date, 'Tmin']
Tmax = self.weather.loc[current_date, 'Tmax']
SH = self.weather.loc[current_date, 'SH']
u2 = self.weather.loc[current_date, 'u2']
RH = self.weather.loc[current_date, 'RHave']
RHmin = self.weather.loc[current_date, 'RHmin']
P = self.weather.loc[current_date, 'Atm']
# 计算ETo
doy = DateUtils.cal_Doy(current_date)
Rn = self.cal_Rn(doy, SH, Tmax, Tmin, RH)
ETo = self.PM_ET0(Tmax, Tmin, RH, P, u2, Rn) # 参考作物蒸散量mm/d
kcb = self.cal_Kcb(current_pdt, RHmin, u2) # 作物蒸腾系数
# print(kcb)
kcmin = 0.15 # 无地表覆盖的干燥土壤最小Kc值约为 0.150.20
kcmax = max((1.2 + (0.04 * (u2 - 2) - 0.004 * (RHmin - 45)) * (self.h / 3) ** 0.3), (kcb + 0.5))
if kcb >= kcmin:
fc = ((kcb - kcmin) / (kcmax - kcmin)) ** (1 + 0.5 * self.h) # 地表遮盖比例
else:
fc = 0
fw = 1.0 # 地表湿润比例降雨、喷灌、漫灌、畦灌条件下为1.0
few = min((1 - fc), fw)
# 计算蒸发减少系数kr
Ze = 0.10 # 由于蒸发而变干的表土层厚度可取0.10~0.15m
TEW = 1000 * (self.DUL[0] - 0.5 * self.LL[0]) * Ze # 总蒸发水量
REW = 8 # 易蒸发水量
if De0 <= REW:
kr = 1
else:
kr = (TEW - De0) / (TEW - REW)
# 计算地表蒸发系数ke和地表蒸发量E
ke = min(kr*(kcmax-kcb), few*kcb)
E = kr * ke * ETo # 地表蒸发量mm
Tew = 0 # 通常可忽略
# 当有强降水或灌溉时De可认为是0
if Rain >= 3 or I > 0:
De = 0
DPe = (Rain - Runoff) + I / fw - De0
SW = SW0
SW[0] = self.DUL[0]
else:
DPe = 0
De = De0 - (Rain - Runoff) - I/fw + E/few + Tew + DPe
if De < 0:
De = 0
elif De > TEW:
De = TEW
SW = SW0
SW[0] = SW0[0] - De / (Ze * 1000)
ETcadj = 0
if rootdepth > 0:
# ======================== 计算 ET =========================
Zr = rootdepth / 10 # 每日有效根深m
# Zr = 0.5
Depth = 0
if Zr > 0.4:
i = 3
Zr = 0.4
else:
i = 0
while i < self.NLAYR:
Depth += self.DLAYR[i] / 100
if Zr > Depth:
i += 1
else:
break
# print(i)
TAW = 1000 * (self.DUL[i] - self.LL[i]) * Zr # 总有效水量mm
pct = 0.5 # 不遭受水分胁迫时,作物从根系层中吸收的有效水量与总有效水量 TAW 之比通常取0.5
Dr0 = 1000 * (self.DUL[i] - SW0[i]) * Zr
if Dr0 > TAW:
Dr0 = TAW
if Dr0 <= pct*TAW:
ks = 1.0
else:
ks = (TAW - Dr0) / ((1 - pct) * TAW)
ETc = (kcb + ke) * ETo
# ETcadj = (ks*kcb + ke) * ETo
ETcadj = (kcb + ke) * ETo
# 当有强降水或灌溉时De可认为是0
if Rain >= 3 or I > 0:
DP = (Rain - Runoff) + I - ETcadj - Dr0
else:
DP = 0
Dr = Dr0 - (Rain - Runoff) - I - CrTot + ETcadj + DP
if Dr > TAW:
Dr = TAW
SW[i] = SW0[i] - Dr / (Zr * 1000)
return E, ETcadj, De, SW
def simulate(self):
"""根系吸水量,单位为 cm3/cm3"""
biomassmodel = BiomassModel(self.start, self.end, self.weather, self.cultivar_params, self.management,
self.plant_params, self.soil_params)
biomassmodel.ifEmergence()
results = {}
delta = (self.end_date - self.start_date).days
SW = self.SW0
De0 = 0
Biomass0 = self.seed_weight / 2
rootdepth = 0
for day in (self.start_date + datetime.timedelta(n) for n in range(delta + 1)):
date_str = day.strftime("%m/%d/%Y")
Biomass0, LAI, Fw, ROOTWT = BiomassModel(self.start, date_str, self.weather, self.cultivar_params, self.management,
self.plant_params, self.soil_params).biomass(day, Biomass0)
irri_date = self.irrigation['date']
if date_str == irri_date:
I = self.irrigation['amount']
else:
I = 0.0
try:
Rain = self.weather.loc[date_str, 'RAIN']
Tmin = self.weather.loc[date_str, 'Tmin']
Tmax = self.weather.loc[date_str, 'Tmax']
SH = self.weather.loc[date_str, 'SH']
u2 = self.weather.loc[date_str, 'u2']
RH = self.weather.loc[date_str, 'RHave']
RHmin = self.weather.loc[date_str, 'RHmin']
# P = self.weather.loc[date_str, 'Atm']
except KeyError:
print(f"气象数据中缺少 {date_str} 的记录,跳过该天。")
continue
if day < biomassmodel.emergence_date:
print("作物还未出苗。")
PDT = 0
Interception = 0
Runoff, SW1 = self.runoff(Rain, I, Interception, SW)
SW2 = self.pinf(Rain, I, Interception, SW1)
CrTot, SW2 = self.capillary_rise(SW2)
E, ETcadj, De, SW3 = self.ETc(date_str, PDT, rootdepth, Runoff, I, CrTot, De0, SW2)
Si = [0 for i in range(self.NLAYR)]
SW4 = [0 for i in range(self.NLAYR)]
for L in range(self.NLAYR):
SW4[L] = min(max(self.LL[L], SW3[L] - Si[L]), self.SAT[L]) ### 土壤水分含量最低为LL
else:
Interception, SW0 = self.interception(Rain, LAI, Fw, SW)
PDT = biomassmodel._cal_pdt(day)
daily_rootdepth = biomassmodel.daily_DRTDEP(date_str, PDT, wdf=1)
rootdepth += daily_rootdepth
# 达到最大有效根深后根系深度不再增加
if rootdepth == 150:
rootdepth = 150
RLV = biomassmodel.RLV(rootdepth, ROOTWT)
Runoff, SW1 = self.runoff(Rain, I, Interception, SW0)
SW2 = self.pinf(Rain, I, Interception, SW1)
CrTot, SW2 = self.capillary_rise(SW2)
E, ETcadj, De, SW3 = self.ETc(date_str, PDT, rootdepth, Runoff, I, CrTot, De0, SW2)
Tp = ETcadj - E # Tp means potential transpiration of plants
Si = []
SW4 = [0 for i in range(self.NLAYR)]
sum_RLV = sum(RLV)
for L in range(self.NLAYR):
ccc = RLV[L] / sum_RLV
Smi = Tp * ccc
WFi = 0.4 # WFi means the effct on Soil water transpiration, and its range is 0-1.
# WFi = 1
# print(min(SW3[L] - self.LL[L], WFi * Smi))
Si.append(max(min(SW3[L] - self.LL[L], WFi * Smi), 0)) ### 作物仅能吸收LL以上的土壤水分
# sum_Ta = sum_Ta + Si
SW4[L] = min(max(self.LL[L], SW3[L] - Si[L]), self.SAT[L]) ### 土壤水分含量最低为LL
results[date_str] = Si
SW = SW4
De0 = De
return results
# Pydantic models for API
class SimulationRequest(BaseModel):
start_date: str
end_date: str
cultivar_params: Dict[str, Any]
plant_params: Dict[str, Any]
soil_params: Dict[str, Any]
management: Dict[str, Any]
n_params: Dict[str, Any]
weather_data: List[Dict[str, Any]] # Weather data as list of dicts
class HealthCheckResponse(BaseModel):
status: str
message: str
# FastAPI app
app = FastAPI(
title="Soil Water Balance Model API",
description="API for calculating soil water balance and root water uptake",
version="1.0.0"
)
@app.get("/health", response_model=HealthCheckResponse)
async def health_check():
"""Health check endpoint"""
return HealthCheckResponse(status="healthy", message="Soil water balance model API is running")
@app.post("/simulate", response_model=Dict[str, Any])
async def simulate_soil_water_balance(request: SimulationRequest):
"""
Run soil water balance simulation
Parameters:
- start_date: Start date in MM/DD/YYYY format
- end_date: End date in MM/DD/YYYY format
- cultivar_params: Cultivar parameters
- plant_params: Plant parameters
- soil_params: Soil parameters
- management: Management parameters
- n_params: Nitrogen parameters
- weather_data: Weather data as list of dictionaries
Returns:
- Dictionary containing simulation results with dates as keys and root water uptake (Si) as values
"""
try:
# Convert weather data list to pandas DataFrame
weather_df = pd.DataFrame(request.weather_data)
if 'date' in weather_df.columns:
weather_df.set_index('date', inplace=True)
weather_df.index = pd.to_datetime(weather_df.index)
# Initialize the model
model = SoilWaterModel(
start=request.start_date,
end=request.end_date,
weather=weather_df,
cultivar_params=request.cultivar_params,
plant_params=request.plant_params,
soil_params=request.soil_params,
management=request.management,
n_params=request.n_params
)
# Run simulation
results = model.simulate()
return {
"status": "success",
"results": results,
"start_date": request.start_date,
"end_date": request.end_date
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Simulation failed: {str(e)}")
@app.post("/simulate_from_files")
async def simulate_from_files(
start_date: str,
end_date: str,
weather_file: UploadFile = File(...),
management_file: UploadFile = File(...),
soil_params_file: UploadFile = File(...),
n_params_file: UploadFile = File(...),
plant_params_file: UploadFile = File(...),
cultivar_params_file: UploadFile = File(...)
):
"""
Run simulation using uploaded parameter files
Parameters:
- start_date: Start date in MM/DD/YYYY format
- end_date: End date in MM/DD/YYYY format
- weather_file: CSV file with weather data
- management_file: JSON file with management parameters
- soil_params_file: JSON file with soil parameters
- n_params_file: JSON file with nitrogen parameters
- plant_params_file: JSON file with plant parameters
- cultivar_params_file: JSON file with cultivar parameters
Returns:
- Dictionary containing simulation results
"""
try:
# Read uploaded files
weather_data = pd.read_csv(weather_file.file, index_col='date', parse_dates=True)
management = json.load(management_file.file)
soil_params = json.load(soil_params_file.file)
n_params = json.load(n_params_file.file)
plant_params = json.load(plant_params_file.file)
cultivar_params = json.load(cultivar_params_file.file)
# Initialize the model
model = SoilWaterModel(
start=start_date,
end=end_date,
weather=weather_data,
cultivar_params=cultivar_params,
plant_params=plant_params,
soil_params=soil_params,
management=management,
n_params=n_params
)
# Run simulation
results = model.simulate()
return {
"status": "success",
"results": results,
"start_date": start_date,
"end_date": end_date
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Simulation failed: {str(e)}")
if __name__ == '__main__':
# Run with uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)