常见问题 - 循环中读取数据库、嵌套循环引起的性能问题
金蝶云社区-JohnnyDing
JohnnyDing
6人赞赏了该文章 3,353次浏览 未经作者许可,禁止转载编辑于2016年03月31日 10:13:51

背景说明

K/3 Cloud的代码开发规范,严格禁止在循环中到数据库读取数据,这会引发严重的性能问题:
需在循环外,一次性取回需要的数据。

但对于提前取回的数据,如果没有预先处理,常常需要嵌套一个循环到集合中取数,这也是非常严重的性能问题。

本帖将通过一个案例,编写三套实现方法,演示循环取数,典型的错误方案与推荐方案。

案例说明

需求:
生成销售出库单时,自动检查库存,从有存货的仓库出库。

实现方案:
编写单据转换插件,物料、数量携带完毕后,到数据库取有存货的仓库,填写到仓库字段中;
如果某一个仓库的存货不够,则拆行。
此方案需要逐个(循环)读取物料的库存,随后基于库存实现拣货,有非常大的性能隐患。

特别说明:
此案例,主要用来演示如何合理规避循环取数,与嵌套循环,解决性能问题;
很多业务细节,被略过。
实际实现代码,要比本案例复杂的多,千万不能直接照搬此案例代码,解决实际问题

典型代码解析

这个案例的完整插件代码比较长,我们先略过与本帖主题无关的外围代码,以及拣货逻辑,聚焦在可能发生性能问题的循环代码中。
如下,直接介绍循环的三种实现方式:

错误方案一:循环中读取数据

说明:
在循环中,读取每行物料的即时库存:会引发严重的性能问题,加重数据库压力

错误方案二:嵌套循环取数

说明:
这里嵌套了一个循环搜索符合本物料的库存,循环总次数可能非常惊人
如单据体有1,000行,实际循环次数将超过1,000,000次
如单据体有2,000行,实际循环次数将超过4,000,000次
总循环次数,成指数上升。而且,单据体行数越多,性能压力越明显,甚至有可能1个小时都动不了。

推荐方案:

说明:
本函数最终循环的次数(1000行单据体):
1,000行即时库存 + 1,000行单据体 = 2,000次
循环次数是成倍数增长,跟方案二的指数增长相比,循环次数完全不在一个数量级。
性能表现也是可控的。

完整的插件代码


附:完整的插件代码,包含了错误方案、推荐方案三个版本的实现方法
拣货实现算法,未实际验证,且实现非常简单,仅供参考,请勿照搬为实际应用

//**************************************************
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ComponentModel;

using Kingdee.BOS;
using Kingdee.BOS.Util;
using Kingdee.BOS.Core;
using Kingdee.BOS.Core.Metadata;
using Kingdee.BOS.Core.Metadata.ConvertElement;
using Kingdee.BOS.Core.Metadata.ConvertElement.PlugIn;
using Kingdee.BOS.Core.Metadata.ConvertElement.PlugIn.Args;
using Kingdee.BOS.Core.Metadata.EntityElement;
using Kingdee.BOS.Core.Metadata.FieldElement;
using Kingdee.BOS.Orm;
using Kingdee.BOS.Orm.DataEntity;
using Kingdee.BOS.App;
using Kingdee.BOS.App.Data;
using Kingdee.BOS.Contracts;

namespace JDSample.ServicePlugIn.BillConvert
{
public class S160330StockPickingConvPlug : AbstractConvertPlugIn
{
///


/// 单据体元数据
///

private Entity _entity = null;
///
/// 物料字段元数据
///

private BaseDataField _fldMaterial = null;

///
/// 仓库字段元数据
///

private BaseDataField _fldStock = null;

///
/// 基本单位数量字段元数据
///

private BaseQtyField _fldBaseQty = null;

///
/// 字段携带完毕,且关联关系已经顺利构建
///

///
public override void OnAfterCreateLink(CreateLinkEventArgs e)
{
// 获取后面要用到的元素的元数据
this._entity = e.TargetBusinessInfo.GetEntity("FEntity");
this._fldMaterial = e.TargetBusinessInfo.GetField("FMaterialID") as BaseDataField;
this._fldStock = e.TargetBusinessInfo.GetField("FStockID") as BaseDataField;
this._fldBaseQty = e.TargetBusinessInfo.GetField("FBaseUnitQty") as BaseQtyField;

// 读取已经生成的销售出库单:随后要对销售出库单,按单循环进行拣货与拆单据体行
ExtendedDataEntity[] billObjExts = e.TargetExtendedDataEntities.FindByEntityKey("FBillHead");
List billObjs = new List();
foreach (var billObjExt in billObjExts)
{
// 开始拣货
//this.FunVersion1(billObjExt.DataEntity); // 第一种实现代码:性能非常糟糕
//this.FunVersion2(billObjExt.DataEntity); // 第二种实现代码:性能同样糟糕
this.FunVersion3(billObjExt.DataEntity); // 第三种实现代码:推荐版本

billObjs.Add(billObjExt.DataEntity);
}

// 重新展开单据中包含的单据体行
// 特别说明:如果去掉此语句,新拆分的行,不会执行表单服务策略
e.TargetExtendedDataEntities.Parse(billObjs, e.TargetBusinessInfo);
}

///
/// 实现函数一:
/// 在循环中读取数据库,
/// 错误代码典型样本
///

/// 生成的下游单据集合
private void FunVersion1(DynamicObject billObj)
{
DynamicObjectCollection rows = this._entity.DynamicProperty.GetValue(billObj)
as DynamicObjectCollection;

// 对单据体进行循环:逐行实现拣货
// 需从后往前循环,新拆分的行,避开循环
int rowCount = rows.Count;
for (int i = rowCount - 1; i >= 0; i--)
{
DynamicObject currRow = rows[ i ]; // 当前行

// 取当前行物料:当前行没有填写物料,或者没有填写基本单位数量,不需要拣货,略过
DynamicObject materialObj = this._fldMaterial.DynamicProperty.GetValue(currRow) as DynamicObject;
if (materialObj == null) continue;

// 物料的MasterId:即时库存,使用的是物料的MasterId
long materialMasterId = Convert.ToInt64(materialObj[FormConst.MASTER_ID]);

// 性能问题点:在循环中执行SQL,读取数据:
// 读取有库存的仓库,按数量排序(未考虑货主、保管者、批号等其他库存维度,实际解决方案不能照搬)
string sql = "SELECT FSTOCKID, FQTY from T_STK_INVENTORY WHERE FMATERIALID = @FMATERIALID ORDER BY FQTY DESC";
SqlParam param = new SqlParam("@FMATERIALID", KDDbType.Int64, materialMasterId);
var stockQtyList = DBUtils.ExecuteDynamicObject(this.Context, sql, null, null, System.Data.CommandType.Text, param);

if (stockQtyList.Count == 0)
{// 没有找到有库存的仓库,不需要拣货,略过此行
continue;
}
else
{// 找到多个仓库有库存,需要灵活拣货
var newRows = this.DoStockPicking(rows, currRow, stockQtyList.ToList());
}
}
}

///
/// 实现函数二:
/// 把循环读取数据库,放在循环外一次性读取;
/// 但在循环中嵌套了循环,性能同样糟糕;
///

/// 生成的下游单据集合
private void FunVersion2(DynamicObject billObj)
{
DynamicObjectCollection rows = this._entity.DynamicProperty.GetValue(billObj)
as DynamicObjectCollection;

// 优化后算法:
// 先获取所有的物料,然后统一到即时库存中读取这些物料的库存量,
// 一次性取数,避免在循环中读取数据库
HashSet materialMasterIds = new HashSet();
// 循环取得所有的物料
foreach (var currRow in rows)
{
// 取当前行物料:当前行没有填写物料,或者没有填写基本单位数量,不需要拣货,略过
DynamicObject materialObj = this._fldMaterial.DynamicProperty.GetValue(currRow) as DynamicObject;
if (materialObj == null) continue;

// 物料的MasterId:即时库存,使用的是物料的MasterId
long materialMasterId = Convert.ToInt64(materialObj[FormConst.MASTER_ID]);
if (materialMasterIds.Contains(materialMasterId) == false)
{
materialMasterIds.Add(materialMasterId);
}
}

// 一次性读取所有物料的库存
string sql = @"
SELECT T1.FMATERIALID, T1.FSTOCKID, T1.FQTY
FROM T_STK_INVENTORY T1
INNER JOIN (table(fn_StrSplit(@FMATERIALID,',',1))) T2
ON (T1.FMATERIALID = T2.FID)
ORDER BY T1.FMATERIALID, T1.FQTY DESC";

SqlParam param = new SqlParam("@FMATERIALID", KDDbType.udt_inttable, materialMasterIds.ToArray());
var allStockQtyList = DBUtils.ExecuteDynamicObject(this.Context, sql, null, null, System.Data.CommandType.Text, param);

// 对单据体进行循环:逐行实现拣货
// 需从后往前循环,新拆分的行,避开循环
int rowCount = rows.Count;
for (int i = rowCount - 1; i >= 0; i--)
{
DynamicObject currRow = rows[ i ]; // 当前行

// 取当前行物料:当前行没有填写物料,或者没有填写基本单位数量,不需要拣货,略过
DynamicObject materialObj = this._fldMaterial.DynamicProperty.GetValue(currRow) as DynamicObject;
if (materialObj == null) continue;

// 物料的MasterId:即时库存,使用的是物料的MasterId
long materialMasterId = Convert.ToInt64(materialObj[FormConst.MASTER_ID]);

// 搜索此物料的库存数据 : 物料 = 当前行的物料
// 性能点:在循环中,从一个集合中搜索与当前行匹配的数据,使循环次数成指数上升
var stockQtyList = (from p in allStockQtyList
where ( Convert.ToInt64(p["FMaterialId"]) == materialMasterId)
select p).ToList();

if (stockQtyList.Count == 0)
{// 没有找到有库存的仓库,不需要拣货,略过此行
continue;
}
else
{// 找到多个仓库有库存,需要灵活拣货
var newRows = this.DoStockPicking(rows, currRow, stockQtyList.ToList());
}
}
}

///
/// 实现函数三:
/// 充分考虑了性能问题:
/// 1. 避免在循环中读取数据;
/// 2. 预先建立数据字典,避免在循环中嵌套循环
/// 推荐的实现方案
///

/// 生成的下游单据集合
private void FunVersion3(DynamicObject billObj)
{
DynamicObjectCollection rows = this._entity.DynamicProperty.GetValue(billObj)
as DynamicObjectCollection;

// 优化后算法:
// 先获取所有的物料,然后统一到即时库存中读取这些物料的库存量,
// 一次性取数,避免在循环中读取数据库
HashSet materialMasterIds = new HashSet();
// 循环取得所有的物料
foreach (var currRow in rows)
{
// 取当前行物料:当前行没有填写物料,或者没有填写基本单位数量,不需要拣货,略过
DynamicObject materialObj = this._fldMaterial.DynamicProperty.GetValue(currRow) as DynamicObject;
if (materialObj == null) continue;

// 物料的MasterId:即时库存,使用的是物料的MasterId
long materialMasterId = Convert.ToInt64(materialObj[FormConst.MASTER_ID]);
if (materialMasterIds.Contains(materialMasterId) == false)
{
materialMasterIds.Add(materialMasterId);
}
}

// 一次性读取所有物料的库存
string sql = @"
SELECT T1.FMATERIALID, T1.FSTOCKID, T1.FQTY
FROM T_STK_INVENTORY T1
INNER JOIN (table(fn_StrSplit(@FMATERIALID,',',1))) T2
ON (T1.FMATERIALID = T2.FID)
ORDER BY T1.FMATERIALID, T1.FQTY DESC";

SqlParam param = new SqlParam("@FMATERIALID", KDDbType.udt_inttable, materialMasterIds.ToArray());
var allStockQtyList = DBUtils.ExecuteDynamicObject(this.Context, sql, null, null, System.Data.CommandType.Text, param);

// 优化后算法:
// 对已经取得的物料库存数据,按物料进行分组,以便后面在循环中,快速取到本物料的库存
Dictionary> dctStockQty = new Dictionary>();
foreach (var stockQty in allStockQtyList)
{
long materialMasterId = Convert.ToInt64(stockQty["FMaterialId"]);
if (dctStockQty.ContainsKey(materialMasterId) == false)
{
dctStockQty.Add(materialMasterId, new List());
}
dctStockQty[materialMasterId].Add(stockQty);
}

// 对单据体进行循环:逐行实现拣货
// 需从后往前循环,新拆分的行,避开循环
int rowCount = rows.Count;
for (int i = rowCount - 1; i >= 0; i--)
{
DynamicObject currRow = rows[ i ]; // 当前行

// 取当前行物料:当前行没有填写物料,或者没有填写基本单位数量,不需要拣货,略过
DynamicObject materialObj = this._fldMaterial.DynamicProperty.GetValue(currRow) as DynamicObject;
if (materialObj == null) continue;

// 物料的MasterId:即时库存,使用的是物料的MasterId
long materialMasterId = Convert.ToInt64(materialObj[FormConst.MASTER_ID]);

// 搜索此物料的库存数据 : 物料 = 当前行的物料
// 性能点:在循环中,从一个集合中搜索与当前行匹配的数据,使循环次数成指数上升
List stockQtyList = null;
if (dctStockQty.TryGetValue(materialMasterId, out stockQtyList) == false
|| stockQtyList.Count == 0)
{// 没有找到有库存的仓库,不需要拣货,略过此行
continue;
}
else
{// 找到多个仓库有库存,需要灵活拣货
var newRows = this.DoStockPicking(rows, currRow, stockQtyList.ToList());
}
}
}

///
/// 拣货的实现逻辑:只是简单的实现拣货逻辑,仅供参考
///

/// 单据体行集合
/// 当前行
/// 物料即时库存数据
/// 新拆分出来的行
private List DoStockPicking(
DynamicObjectCollection rows,
DynamicObject currRow,
List stockQtyList)
{
List newRows = new List();

// 取本次需出库的基本单位数量
decimal baseQty = Convert.ToDecimal(this._fldBaseQty.DynamicProperty.GetValue(currRow));
if (baseQty == 0)
{
return newRows; // 无数量,不需要拣货,略过
}

// 读取单据数据的服务对象:提前准备好,后续需要不断使用
IViewService viewService = ServiceHelper.GetService();

// 首先判断第一个仓库库存是否足够:如够,直接使用此仓库出库
decimal stockQty = Convert.ToDecimal(stockQtyList[0]["FQty"]);
if (stockQtyList.Count == 1 || stockQty > baseQty)
{
long stockId = Convert.ToInt64(stockQtyList[0]["FStockId"]);

// 给仓库字段赋值
DynamicObject[] stockObjs = viewService.LoadFromCache(
this.Context, new object[] { stockId }, this._fldStock.RefFormDynamicObjectType);
this._fldStock.RefIDDynamicProperty.SetValue(currRow, stockId);
this._fldStock.DynamicProperty.SetValue(currRow, stockObjs[0]);
}
else
{// 需要使用多个仓库出库

// 首先把第一个仓库,填写到当前行,其他仓库,拆分新行出库
long stockId = Convert.ToInt64(stockQtyList[0]["FStockId"]);

// 给仓库字段赋值:取仓库数据,是使用了缓存的,性能比较好
DynamicObject[] stockObjs = viewService.LoadFromCache(
this.Context, new object[] { stockId }, this._fldStock.RefFormDynamicObjectType);
this._fldStock.RefIDDynamicProperty.SetValue(currRow, stockId);
this._fldStock.DynamicProperty.SetValue(currRow, stockObjs[0]);

// 调整此仓库出库数量
this._fldBaseQty.DynamicProperty.SetValue(currRow, stockQty);
baseQty = baseQty - stockQty;

// 剩余数量从其他仓库出库
int stockIndex = 1; // 当前仓库指针
while (baseQty > 0 && stockIndex < stockObjs.Length)
{
// 拆分出新行出库
DynamicObject newRow = (DynamicObject)currRow.Clone(false, true);

// 把新行,插入到单据中,排在当前行之后
rows.Insert(rows.IndexOf(currRow) + stockIndex, newRow);

// 填写仓库、数量
long otherStockId = Convert.ToInt64(stockQtyList[stockIndex]["FStockId"]);
decimal otherStockQty = Convert.ToDecimal(stockQtyList[stockIndex]["FQty"]);

if (stockIndex == stockObjs.Length - 1)
{
// 最后一个仓库,全部剩余数量,从此仓库出库
otherStockQty = baseQty;
}
if (otherStockQty > 0)
{
// 给仓库字段赋值
DynamicObject[] otherStockObjs = viewService.LoadFromCache(
this.Context, new object[] { otherStockId }, this._fldStock.RefFormDynamicObjectType);
this._fldStock.RefIDDynamicProperty.SetValue(newRow, otherStockId);
this._fldStock.DynamicProperty.SetValue(newRow, otherStockObjs[0]);

// 调整此仓库出库数量
this._fldBaseQty.DynamicProperty.SetValue(newRow, otherStockQty);
baseQty = baseQty - otherStockQty;
}

stockIndex++; // 仓库指针向后偏移:使用下一个仓库出库
}

}
return newRows;
}
}
}