码迷,mamicode.com
首页 > 数据库 > 详细

循环中读取数据库、嵌套循环引起的性能问题

时间:2016-04-29 16:44:23      阅读:303      评论:0      收藏:0      [点我收藏+]

标签:

背景说明

K/3 Cloud的代码开发规范,严格禁止在循环中到数据库读取数据,这会引发严重的性能问题:
需在循环外,一次性取回需要的数据。

但对于提前取回的数据,如果没有预先处理,常常需要嵌套一个循环到集合中取数,这也是非常严重的性能问题。

本帖将通过一个案例,编写三套实现方法,演示循环取数,典型的错误方案与推荐方案。




案例说明

需求:
生成销售出库单时,自动检查库存,从有存货的仓库出库。


实现方案:
编写单据转换插件,物料、数量携带完毕后,到数据库取有存货的仓库,填写到仓库字段中;
如果某一个仓库的存货不够,则拆行。
此方案需要逐个(循环)读取物料的库存,随后基于库存实现拣货,有非常大的性能隐患。

特别说明:
此案例,主要用来演示如何合理规避循环取数,与嵌套循环,解决性能问题;
很多业务细节,被略过。
实际实现代码,要比本案例复杂的多,千万不能直接照搬此案例代码,解决实际问题




典型代码解析

这个案例的完整插件代码比较长,我们先略过与本帖主题无关的外围代码,以及拣货逻辑,聚焦在可能发生性能问题的循环代码中。
如下,直接介绍循环的三种实现方式:

错误方案一:循环中读取数据

技术分享 

说明:
在循环中,读取每行物料的即时库存:会引发严重的性能问题,加重数据库压力


错误方案二:嵌套循环取数

技术分享 

技术分享 

说明:
这里嵌套了一个循环搜索符合本物料的库存,循环总次数可能非常惊人
如单据体有1,000行,实际循环次数将超过1,000,000次
如单据体有2,000行,实际循环次数将超过4,000,000次
总循环次数,成指数上升。而且,单据体行数越多,性能压力越明显,甚至有可能1个小时都动不了。

推荐方案:

技术分享 

技术分享 

说明:
本函数最终循环的次数(1000行单据体):
1,000行即时库存 + 1,000行单据体 = 2,000次
循环次数是成倍数增长,跟方案二的指数增长相比,循环次数完全不在一个数量级。
性能表现也是可控的。




完整的插件代码


附:完整的插件代码,包含了错误方案、推荐方案三个版本的实现方法

拣货实现算法,未实际验证,且实现非常简单,仅供参考,请勿照搬为实际应用


using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ComponentModel;
using Kingdee.BOS;
using Kingdee.BOS.Util;
using Kingdee.BOS.Core;
using Kingdee.BOS.Core.Metadata;
using Kingdee.BOS.Core.Metadata.ConvertElement;
using Kingdee.BOS.Core.Metadata.ConvertElement.PlugIn;
using Kingdee.BOS.Core.Metadata.ConvertElement.PlugIn.Args;
using Kingdee.BOS.Core.Metadata.EntityElement;
using Kingdee.BOS.Core.Metadata.FieldElement;
using Kingdee.BOS.Orm;
using Kingdee.BOS.Orm.DataEntity;
using Kingdee.BOS.App;
using Kingdee.BOS.App.Data;
using Kingdee.BOS.Contracts;
namespace JDSample.ServicePlugIn.BillConvert
{
    public class S160330StockPickingConvPlug : AbstractConvertPlugIn
    {
        /// <summary>
        /// 单据体元数据
        /// </summary>
        private Entity _entity = null;
        /// <summary>
        /// 物料字段元数据
        /// </summary>
        private BaseDataField _fldMaterial = null;
        
        /// <summary>
        /// 仓库字段元数据
        /// </summary>
        private BaseDataField _fldStock = null;
        /// <summary>
        /// 基本单位数量字段元数据
        /// </summary>
        private BaseQtyField _fldBaseQty = null;
        /// <summary>
        /// 字段携带完毕,且关联关系已经顺利构建
        /// </summary>
        /// <param name="e"></param>
        public override void OnAfterCreateLink(CreateLinkEventArgs e)
        {
            // 获取后面要用到的元素的元数据
            this._entity = e.TargetBusinessInfo.GetEntity("FEntity");
            this._fldMaterial = e.TargetBusinessInfo.GetField("FMaterialID") as BaseDataField;
            this._fldStock = e.TargetBusinessInfo.GetField("FStockID") as BaseDataField;
            this._fldBaseQty = e.TargetBusinessInfo.GetField("FBaseUnitQty") as BaseQtyField;
            // 读取已经生成的销售出库单:随后要对销售出库单,按单循环进行拣货与拆单据体行
            ExtendedDataEntity[] billObjExts = e.TargetExtendedDataEntities.FindByEntityKey("FBillHead");
            List<DynamicObject> billObjs = new List<DynamicObject>();
            foreach (var billObjExt in billObjExts)
            {
                // 开始拣货
                //this.FunVersion1(billObjExt.DataEntity);            // 第一种实现代码:性能非常糟糕
                //this.FunVersion2(billObjExt.DataEntity);            // 第二种实现代码:性能同样糟糕
                this.FunVersion3(billObjExt.DataEntity);            // 第三种实现代码:推荐版本
                billObjs.Add(billObjExt.DataEntity);
            }
            // 重新展开单据中包含的单据体行
            // 特别说明:如果去掉此语句,新拆分的行,不会执行表单服务策略
            e.TargetExtendedDataEntities.Parse(billObjs, e.TargetBusinessInfo);
        }
        /// <summary>
        /// 实现函数一:
        /// 在循环中读取数据库,
        /// 错误代码典型样本
        /// </summary>
        /// <param name="billObj">生成的下游单据集合</param>
        private void FunVersion1(DynamicObject billObj)
        {
            DynamicObjectCollection rows = this._entity.DynamicProperty.GetValue(billObj)
                as DynamicObjectCollection;
            // 对单据体进行循环:逐行实现拣货
            // 需从后往前循环,新拆分的行,避开循环
            int rowCount = rows.Count;
            for (int i = rowCount - 1; i >= 0; i--)
            {
                DynamicObject currRow = rows[ i ];      // 当前行
                // 取当前行物料:当前行没有填写物料,或者没有填写基本单位数量,不需要拣货,略过
                DynamicObject materialObj = this._fldMaterial.DynamicProperty.GetValue(currRow) as DynamicObject;
                if (materialObj == null) continue;
                // 物料的MasterId:即时库存,使用的是物料的MasterId
                long materialMasterId = Convert.ToInt64(materialObj[FormConst.MASTER_ID]);
                // 性能问题点:在循环中执行SQL,读取数据:
                // 读取有库存的仓库,按数量排序(未考虑货主、保管者、批号等其他库存维度,实际解决方案不能照搬)
                string sql = "SELECT FSTOCKID, FQTY from T_STK_INVENTORY WHERE FMATERIALID = @FMATERIALID ORDER BY FQTY DESC";
                SqlParam param = new SqlParam("@FMATERIALID", KDDbType.Int64, materialMasterId);
                var stockQtyList = DBUtils.ExecuteDynamicObject(this.Context, sql, null, null, System.Data.CommandType.Text, param);
                if (stockQtyList.Count == 0)
                {// 没有找到有库存的仓库,不需要拣货,略过此行
                    continue;   
                }
                else 
                {// 找到多个仓库有库存,需要灵活拣货
                    var newRows = this.DoStockPicking(rows, currRow, stockQtyList.ToList());
                }
            }
        }
        /// <summary>
        /// 实现函数二:
        /// 把循环读取数据库,放在循环外一次性读取;
        /// 但在循环中嵌套了循环,性能同样糟糕;
        /// </summary>
        /// <param name="billObj">生成的下游单据集合</param>
        private void FunVersion2(DynamicObject billObj)
        {
            DynamicObjectCollection rows = this._entity.DynamicProperty.GetValue(billObj)
                as DynamicObjectCollection;
            // 优化后算法:
            // 先获取所有的物料,然后统一到即时库存中读取这些物料的库存量,
            // 一次性取数,避免在循环中读取数据库
            HashSet<long> materialMasterIds = new HashSet<long>();
            // 循环取得所有的物料
            foreach (var currRow in rows)
            {
                // 取当前行物料:当前行没有填写物料,或者没有填写基本单位数量,不需要拣货,略过
                DynamicObject materialObj = this._fldMaterial.DynamicProperty.GetValue(currRow) as DynamicObject;
                if (materialObj == null) continue;
                // 物料的MasterId:即时库存,使用的是物料的MasterId
                long materialMasterId = Convert.ToInt64(materialObj[FormConst.MASTER_ID]);
                if (materialMasterIds.Contains(materialMasterId) == false)
                {
                    materialMasterIds.Add(materialMasterId);
                }
            }
            // 一次性读取所有物料的库存
            string sql = @"
SELECT T1.FMATERIALID, T1.FSTOCKID, T1.FQTY 
  FROM T_STK_INVENTORY T1 
INNER JOIN (table(fn_StrSplit(@FMATERIALID,',',1))) T2 
    ON (T1.FMATERIALID = T2.FID) 
ORDER BY T1.FMATERIALID, T1.FQTY DESC";
            SqlParam param = new SqlParam("@FMATERIALID", KDDbType.udt_inttable, materialMasterIds.ToArray());
            var allStockQtyList = DBUtils.ExecuteDynamicObject(this.Context, sql, null, null, System.Data.CommandType.Text, param);
            // 对单据体进行循环:逐行实现拣货
            // 需从后往前循环,新拆分的行,避开循环
            int rowCount = rows.Count;
            for (int i = rowCount - 1; i >= 0; i--)
            {
                DynamicObject currRow = rows[ i ];      // 当前行
                // 取当前行物料:当前行没有填写物料,或者没有填写基本单位数量,不需要拣货,略过
                DynamicObject materialObj = this._fldMaterial.DynamicProperty.GetValue(currRow) as DynamicObject;
                if (materialObj == null) continue;
                // 物料的MasterId:即时库存,使用的是物料的MasterId
                long materialMasterId = Convert.ToInt64(materialObj[FormConst.MASTER_ID]);
                // 搜索此物料的库存数据 : 物料 = 当前行的物料
                // 性能点:在循环中,从一个集合中搜索与当前行匹配的数据,使循环次数成指数上升
                var stockQtyList = (from p in allStockQtyList 
                                    where ( Convert.ToInt64(p["FMaterialId"]) == materialMasterId)
                                    select p).ToList();
                if (stockQtyList.Count == 0)
                {// 没有找到有库存的仓库,不需要拣货,略过此行
                    continue;
                }
                else
                {// 找到多个仓库有库存,需要灵活拣货
                    var newRows = this.DoStockPicking(rows, currRow, stockQtyList.ToList());
                }
            }
        }
        /// <summary>
        /// 实现函数三:
        /// 充分考虑了性能问题:
        /// 1. 避免在循环中读取数据;
        /// 2. 预先建立数据字典,避免在循环中嵌套循环
        /// 推荐的实现方案
        /// </summary>
        /// <param name="billObj">生成的下游单据集合</param>
        private void FunVersion3(DynamicObject billObj)
        {
            DynamicObjectCollection rows = this._entity.DynamicProperty.GetValue(billObj)
                as DynamicObjectCollection;
            // 优化后算法:
            // 先获取所有的物料,然后统一到即时库存中读取这些物料的库存量,
            // 一次性取数,避免在循环中读取数据库
            HashSet<long> materialMasterIds = new HashSet<long>();
            // 循环取得所有的物料
            foreach (var currRow in rows)
            {
                // 取当前行物料:当前行没有填写物料,或者没有填写基本单位数量,不需要拣货,略过
                DynamicObject materialObj = this._fldMaterial.DynamicProperty.GetValue(currRow) as DynamicObject;
                if (materialObj == null) continue;
                // 物料的MasterId:即时库存,使用的是物料的MasterId
                long materialMasterId = Convert.ToInt64(materialObj[FormConst.MASTER_ID]);
                if (materialMasterIds.Contains(materialMasterId) == false)
                {
                    materialMasterIds.Add(materialMasterId);
                }
            }
            // 一次性读取所有物料的库存
            string sql = @"
SELECT T1.FMATERIALID, T1.FSTOCKID, T1.FQTY 
  FROM T_STK_INVENTORY T1 
INNER JOIN (table(fn_StrSplit(@FMATERIALID,',',1))) T2 
    ON (T1.FMATERIALID = T2.FID) 
ORDER BY T1.FMATERIALID, T1.FQTY DESC";
            SqlParam param = new SqlParam("@FMATERIALID", KDDbType.udt_inttable, materialMasterIds.ToArray());
            var allStockQtyList = DBUtils.ExecuteDynamicObject(this.Context, sql, null, null, System.Data.CommandType.Text, param);
            // 优化后算法:
            // 对已经取得的物料库存数据,按物料进行分组,以便后面在循环中,快速取到本物料的库存
            Dictionary<long, List<DynamicObject>> dctStockQty = new Dictionary<long, List<DynamicObject>>();
            foreach (var stockQty in allStockQtyList)
            {
                long materialMasterId = Convert.ToInt64(stockQty["FMaterialId"]);
                if (dctStockQty.ContainsKey(materialMasterId) == false)
                {
                    dctStockQty.Add(materialMasterId, new List<DynamicObject>());
                }
                dctStockQty[materialMasterId].Add(stockQty);
            }
            // 对单据体进行循环:逐行实现拣货
            // 需从后往前循环,新拆分的行,避开循环
            int rowCount = rows.Count;
            for (int i = rowCount - 1; i >= 0; i--)
            {
                DynamicObject currRow = rows[ i ];      // 当前行
                // 取当前行物料:当前行没有填写物料,或者没有填写基本单位数量,不需要拣货,略过
                DynamicObject materialObj = this._fldMaterial.DynamicProperty.GetValue(currRow) as DynamicObject;
                if (materialObj == null) continue;
                // 物料的MasterId:即时库存,使用的是物料的MasterId
                long materialMasterId = Convert.ToInt64(materialObj[FormConst.MASTER_ID]);
                // 搜索此物料的库存数据 : 物料 = 当前行的物料
                // 性能点:在循环中,从一个集合中搜索与当前行匹配的数据,使循环次数成指数上升
                List<DynamicObject> stockQtyList = null;
                if (dctStockQty.TryGetValue(materialMasterId, out stockQtyList) == false
                    || stockQtyList.Count == 0)
                {// 没有找到有库存的仓库,不需要拣货,略过此行
                    continue;
                }
                else
                {// 找到多个仓库有库存,需要灵活拣货
                    var newRows = this.DoStockPicking(rows, currRow, stockQtyList.ToList());
                }
            }
        }
        /// <summary>
        /// 拣货的实现逻辑:只是简单的实现拣货逻辑,仅供参考
        /// </summary>
        /// <param name="rows">单据体行集合</param>
        /// <param name="currRow">当前行</param>
        /// <param name="stockQtyList">物料即时库存数据</param>
        /// <returns>新拆分出来的行</returns>
        private List<DynamicObject> DoStockPicking(
            DynamicObjectCollection rows,
            DynamicObject currRow,
            List<DynamicObject> stockQtyList)
        {
            List<DynamicObject> newRows = new List<DynamicObject>();
            // 取本次需出库的基本单位数量
            decimal baseQty = Convert.ToDecimal(this._fldBaseQty.DynamicProperty.GetValue(currRow));
            if (baseQty == 0)
            {
                return newRows;     // 无数量,不需要拣货,略过
            }
            // 读取单据数据的服务对象:提前准备好,后续需要不断使用
            IViewService viewService = ServiceHelper.GetService<IViewService>();
            // 首先判断第一个仓库库存是否足够:如够,直接使用此仓库出库
            decimal stockQty = Convert.ToDecimal(stockQtyList[0]["FQty"]);
            if (stockQtyList.Count == 1 || stockQty > baseQty)
            {
                long stockId = Convert.ToInt64(stockQtyList[0]["FStockId"]);
                // 给仓库字段赋值
                DynamicObject[] stockObjs = viewService.LoadFromCache(
                    this.Context, new object[] { stockId }, this._fldStock.RefFormDynamicObjectType);
                this._fldStock.RefIDDynamicProperty.SetValue(currRow, stockId);
                this._fldStock.DynamicProperty.SetValue(currRow, stockObjs[0]);
            }
            else
            {// 需要使用多个仓库出库
                // 首先把第一个仓库,填写到当前行,其他仓库,拆分新行出库
                long stockId = Convert.ToInt64(stockQtyList[0]["FStockId"]);
                // 给仓库字段赋值:取仓库数据,是使用了缓存的,性能比较好
                DynamicObject[] stockObjs = viewService.LoadFromCache(
                    this.Context, new object[] { stockId }, this._fldStock.RefFormDynamicObjectType);
                this._fldStock.RefIDDynamicProperty.SetValue(currRow, stockId);
                this._fldStock.DynamicProperty.SetValue(currRow, stockObjs[0]);
                // 调整此仓库出库数量
                this._fldBaseQty.DynamicProperty.SetValue(currRow, stockQty);
                baseQty = baseQty - stockQty;
                // 剩余数量从其他仓库出库
                int stockIndex = 1;     // 当前仓库指针
                while (baseQty > 0 && stockIndex < stockObjs.Length)
                {
                    // 拆分出新行出库
                    DynamicObject newRow = (DynamicObject)currRow.Clone(false, true);
                    // 把新行,插入到单据中,排在当前行之后
                    rows.Insert(rows.IndexOf(currRow) + stockIndex, newRow);
                    // 填写仓库、数量
                    long otherStockId = Convert.ToInt64(stockQtyList[stockIndex]["FStockId"]);
                    decimal otherStockQty = Convert.ToDecimal(stockQtyList[stockIndex]["FQty"]);
                    if (stockIndex == stockObjs.Length - 1)
                    {
                        // 最后一个仓库,全部剩余数量,从此仓库出库
                        otherStockQty = baseQty;
                    }
                    if (otherStockQty > 0)
                    {
                        // 给仓库字段赋值
                        DynamicObject[] otherStockObjs = viewService.LoadFromCache(
                            this.Context, new object[] { otherStockId }, this._fldStock.RefFormDynamicObjectType);
                        this._fldStock.RefIDDynamicProperty.SetValue(newRow, otherStockId);
                        this._fldStock.DynamicProperty.SetValue(newRow, otherStockObjs[0]);
                        // 调整此仓库出库数量
                        this._fldBaseQty.DynamicProperty.SetValue(newRow, otherStockQty);
                        baseQty = baseQty - otherStockQty;
                    }
                    stockIndex++;  // 仓库指针向后偏移:使用下一个仓库出库
                }
            }
            return newRows;
        }
    }
}


循环中读取数据库、嵌套循环引起的性能问题

标签:

原文地址:http://blog.csdn.net/fyq891014/article/details/51260235

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!