随着企业管理系统管理和维护的数据量越来越大, 大多数企业往往会陷入早期配置的存储系统在使用一段时间后达到瓶颈的困境.最初, 大家都是通过给系统替换内存更大、CPU更多的设备来解决这类问题, 但是这种替换整个系统的解决方案耗费的代价十分高昂.后来, 伴随着网络通信技术和分布式存储理论的成熟、发展, 分布式数据库逐渐登上了时代的舞台.相比传统集中式数据库, 分布式数据库具有非常友好的扩展性.因此, 为了安全、可靠、低成本地存储和管理这些海量的数据, 我们逐步选择将这些应用转移到分布式数据库中去.
在商业应用和政府工作中, 经常需要根据某些纬度做信息统计和分析.比如, 公司需要统计各个产品的销售量、营业额、利润; 企业需要统计各地区的营业额、利润情况; 基金公司需要了解各个基金的收益情况; 政府部门需要统计各个地区的男女比例、各个年龄段的劳动力情况、每年的新生儿出生率情况.这些需求都导致我们不可避免的需要调用分组和聚合等操作来实现.分组和聚合已经成为数据分析应用中出现频率最多的操作之一.因此, 如何提高分组和聚合的工作效率已经成为我们做统计分析的重中之重.本文研究的重点便是如何在分布式数据库中提高分组和聚合的运算效率.
论文的内容组织如下:第1节简要介绍了分布式数据库的基本结构以及目前分组和聚合的研究现状.第2节详细介绍了本文提出的两种通过Hash算法进行分组聚合的方案, 以及一种根据统计信息动态决策Hash桶数目和这两种Hash分组聚合方案的策略.第3节主要介绍了如何利用分布式数据库特点实现Hash分组聚合节点级的并行方案.第4节从数据分布、是否并行、并行节点个数等方面对Hash分组聚合效率进行了全面的实验验证.第5节对本文进行了总结.
1 相关工作 1.1 背景介绍随着网络通信技术分布式存储理论的成熟和发展, 数据库的发展逐渐进入了分布式的阶段.相比传统集中式数据库, 分布式数据库可以通过动态增加存储节点来实现存储容量的横向扩展.并且, 分布式数据库为了保证数据的安全和可靠存储, 往往会采用备份的策略.数据多副本存储不仅能够提供可靠、安全的存储, 还可以提高系统查询的并发量.由于分布式数据支持横向扩展和多副本安全存储的特性, 越来越多的企业开始采用分布式数据库作为自己的数据存储分析平台.分布式数据库一般分为物理数据存储节点和数据查询计算节点.以OceanBase[1]分布式数据库为例, 数据存储在ChunkServer服务器节点(以下简称CS).为了提高数据存储的可靠性, 一般设置多副本, 分别存储在不同CS节点上.查询一般交由MergeServer(以下简称为MS)处理.其他两类节点, UpdateServer(以下简称UPS)和RootServer(以下简称RS)则分别负责系统的增量数据处理和集群中所有服务器节点的管理.
针对GroupBy功能的实现, 大部分关系型数据库采用的是一种排序分组的方式.数据经过排序之后, 相同数据值的元组会排列在一起, 不同数据值只在各自边界时出现.但是, 单纯对于分组来说, 利用排序进行数据分组是事倍功半的操作, 将会导致数据分组以及分组之后的聚合操作效率低下.因此, 针对这一缺点, 我们提出通过Hash算法的方式去实现分组: Hash算法可以将属性数据映射到一个整数类型区间; 然后通过取模运算将整型值分类到不同桶中, 这样相同数据值就会映射到同一桶内, 即实现了基本的分组功能.因此, 使用Hash算法去设计GroupBy算子, 将极大地提高数据库做分组和聚合的效率.
1.2 相关研究许多研究人员为了提高数据库的查询处理能力,从查询涉及的众多方面进行了深入的研究. Aggregation、GroupBy作为查询语句中出现频率很多, 且耗费系统资源和占用查询时间较多的功能也吸引了很多学者的关注.众多学者从各个角度对Aggregation、GroupBy提出了优化方案.
Ho[2]提出了SUM和MAX这两种聚合操作的快速算法. Roussopoulos和Gupta等[3-4]认为通过维护恰当的物化视图可以加速Aggregation、GroupBy算子的运行.但是, 建立一个物化视图对于系统来说, 需要占用大量内存资源.并且对于需要频繁写入、修改的系统来说, 维护视图与数据库的数据一致性会拖慢写入和更新的效率. Govindarajan等[5]提出一个CRB-Tree索引结构, Feng Y等[6]提出Ag+-Tree索引结构, Sellis等[7]提出了BR+-Tree, 他们认为建立和维护一个高效的索引结构可以提高分组聚合查询的效率.建立索引相比物化视图, 虽然可以减少内存消耗, 但是与物化视图具有相同的缺陷:当系统是一个写密集型的系统时, 维护索引将拖慢写入和更新的效率.因此针对写密集应用, 通过建立索引和物化视图等方法加速分组聚集具有一定的局限性.
Tao等[8]分析传统分组聚合问题的特点, 将一个集合运算中出现同一个对象定义为RASS问题, 提供有效解决该问题的方案. Condie等[9]在Hadoop MapReduce[10]框架基础上修改并提出了HOP, 使得Aggregation函数不仅可以由多任务协同处理, 而且还可以看到已经完成任务返回的数据结果, 极大地提高Aggregation的查询响应速度.以上做法, 都是离开分组和聚合的具体实现来考虑问题.无论是从索引和视图等角度优化, 还是通过并行优化, 都是在分组和聚合操作的上层进行考虑问题.建立视图, 搭建MapReduce框架进行并行计算等优化方法的底层还是需要调用分组算子.在这里, 我们从分组和聚合算子的具体实现角度提出新的优化方案, 并且本文实现的hash分组聚合方案完全兼容索引和并行等优化手段.
2 Hash GroupBy的工作原理Hash GroupBy使用Hash算法进行分组. Hash函数首先计算数据Hash值, 然后对计算后的Hash值进行分区.比如: GroupBy c1, Hash函数首先计算c1列数据的Hash key, 然后将Hash key进行分区.由于调用的Hash函数相同, 因此相同数据计算出的Hash key值相同, 分区也相同.但是也可能存在不同数据计算出来的Hash key值相同情况, 我们在设计时需要特别考虑这类情况.此外, 针对Hash冲突情况, 这里我们采用开链法处理冲突.
2.1 方案一桶内排序分组原理如果数据集不同值个数很多, 即GroupBy得到的结果集很多.此种情况下, 我们建议使用桶内排序.先计算所有数据Hash key并插入桶中, 随后对每个桶内数据进行快速排序.由于可能出现不同row值计算出来的key相同从而导致分到的桶相同的情况.我们需要在保存Hash key的同时保存该列的值.具体伪代码见算法1.
![]() |
(1) 首先计算row的Hash key, 构建 < key, row*>键值对, 并插入Hash table中.
(2) 重复执行1, 直到所有row的Hash key都插入到Hash table中.在此过程中, 使用一个数组vec保存所有不重复的Hash key.
(3) 从vec中取出一个Hash key, 得到该Hash key对应桶中所有的row*, 放到一个临时数组中排序.将临时数组中排序后的row*都加入一个全局数组中后, 释放临时数组.重复执行直到所有Hash key对应的桶数据都加入到全局数组中.
(4) 此时, 全局数组中相同数据皆相邻存储, 但全局并不有序.然后, 根据聚合函数相邻相同数据执行聚合操作.
虽然与MergeSort GroupBy算子相似, 都是通过排序进行分组, 但是这里只是执行桶内排序.与MergeSort GroupBy相比, 排序的量级从整个表数据减小为一个桶内数据.因此, 该方案效率与桶内元组个数相关, 我们可以通过动态设置Hash桶数目调整同一桶内元组数目, 详细见2.3.但该方案桶内会存储重复元素, 且重复元素导致的同一桶内元素数目膨胀无法通过动态设置Hash桶解决.因此, 该方案实际效率受数据分布影响.如果数据相同值很多, 则同一桶内元素个数会很多, 会增加排序时间消耗, 因此该方案不适合处理相同值较多的情况.这种做法适合于数据分布均匀的情况.
2.2 方案二频数分组聚合原理如果数据集相同值个数较多, 即GroupBy得到的结果集不多.此种情况下, 我们提出方案二, 对每一条数据row, 查询Hash table, 如果Hash table内已经有相同元素则记录频数.我们在桶内只保存同一分组内的一条记录.这样可以极大地减少Hash表的内存大小.一个桶内的数据元素个数与聚集后的结果集个数相同.具体伪代码见算法2.
![]() |
(1) 对每一条row, 计算row的Hash key值, 然后确定该row对应的bucket_pos.
(2) 如果该bucket_pos对应的桶为空, 则插入该元组, 并调用聚合聚集表达式计算该元组.
(3) 如果桶中非空, 则逐一搜索桶中元组并比较.如果有相同元组, 不直接往链表中插入row值, 将该row对应频数加1.
(4) 如果没有相同元组, 则插入该元组, 并调用聚合函数计算该元组.
(5) 重复1-4, 直到所有元组都处理完.输出值时须判断频数, 若为1, 则直接输出; 否则, 调用聚合函数计算后输出.
该方案不需要进行任何排序, 但在处理每条row数据时都需要对Hash table中的所有数据进行一次遍历比较.因此, 如果每一个Hash key值涉及到多个数据, 则这种逐一比较耗费时间较高.但是该方案桶内不会有重复元组.所以, 如果我们合理控制Hash table在初始化时Hash桶的数目, 便可以极大减小Hash冲突率从而降低这种逐一比较的代价.因此, 我们在这里提出根据统计信息动态设置桶数, 由此控制每个桶内元素个数, 减少同一桶内元素个数, 详细见第2.3节.
考虑到计算节点上有限的内存, 当数据总量超过内存上限时如何初始化Hash桶便成为了一个难题.针对这一限制条件, 本文提出一种方案:首先对数据集调用Hash函数, 对数据进行分区操作, 分区操作可以将数据的量级减小.然后对分区后的分区表构建Hash表.如果分区之后的数据依然很大, 那么就进行递归分区, 直到数据适合内存大小.这样, 我们对整个数据集的操作就变为对分区之后的分区表操作.比如group c1, 如果c1列数据非常多, 则根据c1将数据集D划分为D1, D2,
由于调用Hash GroupBy时, 桶数目和数据分布会影响最终的Hash分组聚合效率.因此, 我们提出根据统计信息动态设定Hash桶数目以及动态决策最终采用的Hash GroupBy方案.为此, 首先需要在系统中收集统计信息, 分别收集表、列的统计信息.表的统计信息内主要包含表内数据的行数count.列的统计信息内包含各列不重复值个数distinct_col, 该列中高频率出现的10个频数值top[10].
2.3.1 Hash桶数目的动态决策实际分析应用时, 我们可能只需要根据一个属性做分析, 也可能根据多个属性信息进行分析.因此, 我们根据分组列为一个属性或多个属性分类讨论.当分组条件为单列时, 根据如下公式决定bucket_num.
$ \begin{align} {\rm avg}_{\rm num}=\frac{{\rm count}-\Sigma_{i=1}^{10} {\rm top}[i]}{{\rm distinct}\_{\rm col}-10}, \end{align} $ | (1) |
$ \begin{align} {\rm{bucket\_num}}=\left\{\!\!\!{\begin{array}{l} \frac{{\rm distinct}\_{\rm col}}{N}, \mbox{如果}{\rm avg}_{\rm num}\leqslant N, \\ {\rm distinct}\_{\rm col}, \mbox{若}{\rm avg}_{\rm num}> N. \end{array}}\right. \end{align} $ | (2) |
针对单列情况, 我们可以精细化地根据列中不同值个数进行Hash桶数目的选择, 使得Hash桶的数目大致等于该列不同值个数.这就能保证Hash table中每个桶内仅存储对应值.但是, 为了避免多数桶内只有一个元素的情况发生, 我们根据开发经验预估出Hash table中一个数据值的重复数目(avg_num).为了避免个别高频值干扰, 我们先用桶内总行数count减去统计信息中记录的10个高频值
当分组条件为多列时, 由于列与列之间相互独立, 两列之间不同值个数也并无关系.因此无法根据某列不同值个数决定桶数.比如GroupBy c1、c2和c3, 我们无法根据distinct_col
$ \begin{align} {\rm{bucket\_num}} = \left\{\!\!\!{\begin{array}{l}\frac{\rm count}{P}, \mbox{如果}{\rm avg}_{\rm num}> P, \\ {\rm count}, \mbox{若}{\rm avg}_{\rm num}\leqslant P. \end{array}}\right. \end{align} $ | (3) |
由于我们提出的两种方案针对不同数据分布各有特点, 因此, 我们提出根据统计信息动态决策具体的Hash GroupBy方案.与2.3.1节动态决定Hash桶数目相同, 根据单列和多列进行分别讨论.
首先, 我们分析单列情况, 与2.3.1中单列情况下Hash桶数目动态决策的方法相同.根据公式(1), 大致可以得到每个值平均数目avg_num.在实际实现过程中, 根据系统设计经验设置一个经验参数
当分组条件为多列时, Hash桶数目动态决策策略与2.3.1中提及的策略相同.通常情况下, 数据表中列与列之间相互独立.因此, 两列分别求出的avg_num并无关联.通过公式(1)的计算, 我们可以分别得到每一列的avg_num值, 从而选择最小的avg_num进行粗略估算(选择min(avg_num
分布式数据库集群一般部署多个数据存储节点, 每个数据节点负责管理和维护自己节点的数据并提供简单的读取和计算功能.当分布式数据库存储一张大表时, 系统会将大表数据根据主键水平切分成为多个分片, 一个分片存储一部分数据.为了保证系统的高可靠性, 分布式数据库还会设置多副本存储, 即将同一份数据备份存储在多个数据存储节点上.
考虑到既然各个存储节点可以完成简单的数据存取和计算功能, 且各个节点相互独立、互不影响.那么我们设计Hash GroupBy算子时完全可以将算子下压到各个数据存储节点上从而实现节点间的并行执行.在算子执行过程中, 各物理存储节点各自读取、计算自己节点的分片数据后, 分别调用Hash GroupBy对自己节点的分片数据进行分组聚合.因此, 通过算子的分发执行便可以实现Hash GroupBy算子在不同节点间的并行执行.
以OceanBase分布式数据库为例, 假设集群有三个物理存储节点CS1、CS2、CS3, 副本配置为3.由于数据表R1很大, 数据库会将R1切分成三个分片R11、R12、R13.同时复制R11、R12、R13进行副本备份, 并根据负载均衡算法将各个分片及副本分散存储在各CS节点上.我们实现将Hash GroupBy算子下压到CS上的功能, 即会在CS1、CS2、CS3上分别生成一个Hash GroupBy算子.
假设有一个分组聚合SQL: select count(col1) from R1 GroupBy col1.理想情况下, 查询服务节点MS会将查询数据请求均匀分发给三个节点.例如, 请求CS1有关R11分片数据, 请求CS2节点R12分片数据, 请求CS3节点R13分片数据. Hash GroupBy算子下压到CS意味着, 每个CS数据存储节点在读取磁盘数据以及其他计算功能之后, 还可以执行Hash GroupBy算子, 即CS1上会调用Hash GroupBy对R11部分数据进行分组聚合, 在CS2、CS3上同样会调用Hash GroupBy对R12、R13部分数据进行分组聚合.最终各节点将计算后的结果返回给MS. MS上同样会有一个Hash GroupBy算子用于对各CS最终返回的结果集进行分组聚合.具体算子执行原理如图 1.
![]() |
图 1 Hash GroupBy下压执行计划 Fig.1 Hash GroupBy push down execution plan |
首先测试随机数据分布情况下, Hash GroupBy相比MergeSort GroupBy是否有效率提升.接着, 测试不同数据分布和单节点非并行情况下, 两种不同Hash GroupBy方案相比MergeSort GroupBy的效率提升.最后, 测试特定数据分布、统计信息动态决策方案和三节点并行情况下, Hash GroupBy方案相比MergeSort GroupBy的效率提升.
4.1 实验环境本文选择OceanBase 0.4系统作为实验测试系统, 实验采用五台服务器部署数据库功能节点.其中控制节点RS和事务节点UPS公用一台服务器, 查询节点MS使用一台服务器节点, 其余三台服务器部署数据存储节点CS.
本文实验测试所使用的硬件环境如表 1所示, 其中磁盘为SSD(Solid State Drive).
![]() |
表 1 集群服务器配置 Tab.1 The cluster server configuration |
本文测试所用到的数据表模式如表 2所示.
![]() |
表 2 测试表的Schema Tab.2 The schema of the test table |
由于不同数据类型占用系统内存大小、耗费网络传输时间不同, 特别是数据比较时间不同, 因此生成的数据包含多种数据类型.为了尽可能控制变量, 本文测试了每种数据类型采用不同分组方法的效率差异, 我们的测试语句皆以单独列进行分组测试, 比如, 采用select count(c1) GroupBy c1测试int32.此外, 由于Hash函数存在冲突及Hash函数映射值空间有限等情况, Hash GroupBy实现效率还与桶内数据个数有关.因此, 我们设计了桶内不同数据个数分组聚合情况.实验涉及的主要表、数据分布情况以及数据量如表 3所示.
![]() |
表 3 测试表的大小和分布 Tab.3 Test data table information |
以G1为例, 数据分布指的是Hash桶内的元素个数, 1代表Hash桶内只有一个相同数据元素, 其他类比.桶内元素个数, 我们可以通过改变Hash桶数调整, 但是相同数据元素必定排列在同一个桶内, 因此, 我们必须考虑此种情况.除此之外, 我们还创建了6张表, 数据量分别为1万、10万、50万、100万、500万、1000万, 其数据类型与G1等相同, 但数据分布为随机生成, 毫无规律.
4.3 随机数据分布下方案一桶内排序的Hash GroupBy效率提升实验目的:测试方案一桶内排序Hash GroupBy相比MergeSort GroupBy是否有效率提升.数据设置: Hash桶数为80万左右, 数据表大小分别是1万、10万、50万、100万、500万、1000万.数据全都是随机生成, 毫无规律.实验结果如图 2所示.
![]() |
图 2 随机数据分布下方案一Hash GroupBy效率提升图 Fig.2 The efficiency graph of the Hash GroupBy under random data distribution with option 1 |
从实验数据上看, 对于方案一来说, Hash GroupBy相比MergeSort GroupBy而言, 是有效率提升的.其中, int32与double效率提升基本相同, 复杂数据类型比如varchar(50)、decimal(15, 5), 效率提升尤为明显.
4.4 数据分布对于不同Hash分组聚合方案的效率影响由于不同数据分布对于分组效率影响较大, 因此我们针对方案一桶内排序和方案二频数聚合分组设计实验, 对不同数据分布情况进行了测试.实验结果如图 3和图 4.
![]() |
图 3 方案一效率提升 Fig.3 The efficiency graph under option 1 |
![]() |
图 4 方案二效率提升 Fig.4 The efficiency graph under option 2 |
从实验结果分析来看, 对于方案一桶内排序来说, 随着桶内相同数据元素不断增多, 桶内排序效率提升越来越小, 甚至当同一桶内数据元素为50时, 对于int64、double、varchar(50)数据类型, 效率已经不如MergeSort GroupBy.对于方案二频数分组聚合恰好相反, 同一桶内元素个数很小时, 效率提升非常细微.但随着同一桶内数据元素越来越多, Hash分组聚合相比MergeSort GroupBy效率提升越来越大.
4.5 Hash分组聚合节点级并行效率提升由于分布式数据库多副本备份特点, 每个数据节点都可以提供查询和简单计算功能.因此, 我们针对本文提出的Hash分组和聚合方案分别设计了1和3不同数据节点数进行实验, 具体实验结果如图 5.
![]() |
图 5 三节点不同数据分布效率提升图 Fig.5 The efficiency graph of three nodes with various data distributions |
从实验结果可以发现, 当我们存储节点设置为3时, 相比1个存储节点是有明显效率提升的, 针对各个数据类型, 效率提升都在20%之上.并且我们在监控服务器CPU利用率上, 发现三个CS物理存储节点将数据读取和计算压力大致平均分散在了三个服务器节点上.因此, 我们的节点并行方案在大负载、高并发下相比单节点会有更高的效率提升.
5 总结本文提出了两种在分布式系统中采用Hash方法进行分组聚合的方案, 并且提出了一种利用统计信息对两种方案进行动态决策的策略.最后, 针对分布式数据库多节点特性, 又提出一种Hash GroupBy算子节点级的并行方案.在分布式数据库OceanBase上具体实现了这些方案.实验结果表明, Hash GroupBy相比MergeSort GroupBy具有极大的效率提升; 并且针对不同数据分布情况, 两种Hash GroupBy的实现方案各有优点.因此, 根据我们提出的利用统计信息进行动态选择方案的策略, 可以使我们的Hash分组和聚合具有更高的效率提升.但是该方案还有进一步需要优化的空间.目前各数据存储节点对本节点内数据分组和聚合后, 数据查询节点尚且需要汇总各数据节点返回数据再做一次总的分组和聚合, 这将是以后优化的重点.
[1] | 杨传辉. 大规模分布式存储系统[M]. 北京: 机械工业出版社, 2013. |
[2] | HO C T, AGRAWAL R, MEGIDDO N, et al. Range queries in OLAP data cubes[C]//ACM SIGMOD International Conference on Management of Data. ACM, 2008: 73-88. |
[3] | ROUSSOPOULOS N. Materialized views and data warehouses[C]//ACM SIGMOD International Conference on Management of Data. ACM, 1998: 21-26. |
[4] | GUPTA H, MUMICK I S. Selection of views to materialize in a data warehouse[J]. IEEE Transactions on Knowledge & Data Engineering, 1997, 17(1): 24-43. |
[5] | GOVINDARAJAN S, AGARWAL P K, ARGE L. CRB-Tree:An efficient indexing scheme for range-aggregate queries[J]. Lecture Notes in Computer Science, 2003, 2572: 143-157. DOI:10.1007/3-540-36285-1 |
[6] | TAO Y, PAPADIAS D. Range aggregate processing in spatial databases[J]. IEEE Transactions on Knowledge & Data Engineering, 2004, 16(12): 1555-1570. |
[7] | SELLIS T. The R+-Tree: A dynamic index for multi-dimensional object[C]//Proceeding of the 13th VLDB Conf. VLDB, 1987: 507-518. |
[8] | TAO Y, SHENG C, CHUNG C W, et al. Range aggregation with set selection[J]. IEEE Transactions on Knowledge & Data Engineering, 2014, 26(5): 1240-1252. |
[9] | CONDIE T, CONWAY N, ALVARO P, et al. Online aggregation and continuous query support in MapReduce[C]//ACM SIGMOD International Conference on Management of Data. ACM, 2010: 1115-1118. |
[10] | DEAN J, GHEMAWAT S. MapReduce:simplified data processing on large clusters[J]. Communications of The ACM, 2008, 51(1): 107-113. DOI:10.1145/1327452 |