2. 江苏省肉类生产与加工质量安全控制协同创新中心, 南京 210095
2. Jiangsu Collaborative Innovation Center of Meat Production and Processing, Quality and Safety Control, Nanjing 210095, China
近年来乳制品安全事件的连续披露对我国乳制品行业造成强有力冲击, 饲料成分异常、违规使用药品、加工过程不规范、运输与存储温度异常等均会产生乳制品质量问题.政府为此大力推广乳制品溯源机, 制促使更多企业加入到溯源队伍中.但这一举措造成溯源系统的数据种类多样化、数据处理复杂、数据规模庞大等问题, 使得仅通过升级硬件资源无法有效改善海量数据的存储与处理性能.因此, 寻找一套海量数据的存储和处理方案对乳制品溯源系统性能的提升有重要意义.
为了解决系统对大规模数据的存储和处理问题, 国内外专家做了大量研究.为提升数据处理的并行性, Abouzied等[1]将基于Web的远程蛋白质序列分析应用部署在HadoopDB[2]上. Ismail等[3]将Hadoop和Hive技术应用到DBpedia[4]中为新阿拉伯语启动新的SPARQL[5]端点, 改进后查询速度提升了约76%. LinkedIn公司将Hadoop技术和布隆过滤器(Bloom Fliter)应用到社交数据分析领域中, 使得数据处理性能提升了约10倍.王德文等[6]利用Hive创建电力信息数据仓库, 实现了数据分布式冗余存储和并行处理, 当待处理数据行数大于2
本文针对大规模乳制品溯源数据存储和处理的问题, 结合乳制品企业业务流程和数据特点, 构建了一个基于Hadoop/Hive的乳制品溯源平台, 并从增加数据存储量和提升数据存储及处理性能等方面对系统进行了优化.
1 乳制品溯源系统业务流程国际标准化组织认为利用可记录的标记物跟踪产品的不同阶段的历史、使用和位置等方面的信息能力即为溯源[11].食品法典委员会对食品溯源定义为对食品的生产、加工和流通阶段进行可追溯[11].因此, 清晰的供应链业务流程信息和合理的溯源单元、溯源信息的划分在整个食品溯源系统中起着至关重要的作用.
1.1 乳制品供应链业务流程本研究以江苏省溯源示范企业——南京卫岗乳业作为调研单位, 通过调研, 总结出了乳制品供应链业务流程(见图 1).按照生产流程将企业分为牧场、奶站、加工厂、仓储、物流配送、零售商等6类.记录这些企业的生产流通数据并制定企业间统一数据接口, 以形成整个乳制品溯源供应链.消费者通过溯源码来获取产品信息, 发现问题可及时投诉, 让企业与监管部门在第一时间给出处理方案.
不同乳制品企业的管理模式、质量安全、生产规模大小都有所不同, 需要根据实际需求构建合理的乳制品溯源系统[12].通过对上述供应链业务流程分析后发现, 乳制品生产和流通环节较复杂, 其过程中会产生和积累大量生产流通数据, 要实现所有信息的溯源将会产生高额的通信成本并消耗大量查询时间.为减少溯源成本需明确关键溯源信息.具体乳制品溯源单元和溯源信息的划分情况见图 2所示.
牧场溯源信息主要为奶牛饲养、检疫和奶质检测等.奶站溯源信息主要为原奶转发质检数据、环境参数等.加工厂溯源信息主要为原料验收、环境卫生以及产品质检等.仓储中心溯源信息为存储环境和存储周期.产品运输溯源信息主要为环境参数.零售商主要溯源信息为产品履历表.
2 基于Hadoop/Hive的乳制品溯源框架设计Hadoop是一个发展较完备的大数据分布式并行处理架构, 它提供了很多成熟的组件, 有益于进行功能扩展. Hive的设计理念和使用方式与MySQL较相似, 选用Hive可以通过类SQL语句快速实现MapReduce统计, 不必开发专门的MapReduce应用.为了缓解传统乳制品溯源系统对大规模数据存储和处理的压力, 本文搭建了基于Hadoop/Hive的乳制品溯源框架.
2.1 溯源框架总视图基于Hadoop/Hive乳制品溯源框架总视图包含乳制品企业平台、Hadoop/Hive海量数据存储与处理平台和监管部门应用服务器等3个模块.消费者通过访问监管部门提供的溯源系统, 就可实现乳制品溯源.溯源框架总视图见图 3所示.
据调研, 大型乳制品企业的子机构遍布全国各地, 但这些企业已按地理位置和机构的不同建立了分布式数据库, 实现了企业内部数据溯源.这些企业仅需向监管部门的溯源平台提供数据接口, 定期上传数据, 就可协助完成统一的溯源.没有构建溯源系统的中小型企业, 则依托监管部门提供的溯源平台, 定期上传数据实现溯源.
Hadoop/Hive数据存储与处理平台, 使用HDFS[13]分布式文件系统实现底层存储, 将各企业上传的生产、流通、质检、物流等数据分割成数据块后, 冗余存储在各从节点上以缓解海量数据的存储压力; 同时将待处理的作业分割后交由各从节点并行处理, 处理结果存入监管部门的关系型数据库中.
监管部门通过访问本地服务器获取企业用户上传的数据、Hadoop/Hive平台数据处理结果.消费者通过访问监管部门提供的溯源系统实现溯源.
以上3模块间的数据流通由Sqoop API和Hive JDBC实现, 数据传送方式见图 4所示. Sqoop API将已存储在关系型数据库中的数据传送至集群, Hive JDBC将未存入关系型数据库的文本数据批量传送至集群.同时, Hadoop/Hive集群处理的结果通过Sqoop API发送给监管部门的服务器, 监管部门和消费者通过MySQL JDBC从溯源系统中获取数据.
从实际应用角度出发, 构建Hadoop/Hive乳制品溯源系统应用框架, 框架自底向上由数据获取平台、数据存储中心、数据处理中心和用户溯源平台等4部分组成.详细内部构建见图 5所示.
(1) 数据源层, 数据来源于企业的生产、质检、仓储、零售等过程, 这些数据按种类和特点的不同分别放在存储层相应的数据库中.
(2) 存储层, 为数据源层数据提供持久化存储, 主要包括HDFS分布式文件存储、Hive数据存储、传统关系型数据库存储以及大型乳制品企业内部的分布式数据库存储.
(3) 分析层, 对数据源层数据进行访问、处理及访问速率优化, 为应用层提供结构化数据并行处理以及查询优化等功能.
(4) 应用层, 直接面向用户, 由公众平台、企业管理平台和监管平台构成, 为消费者和监管部门提供了对企业产品质量督促和监管的渠道.
本文主要实现了存储层的数据交互, 在分析层对存储层的数据进行统计、查询等操作, 并对系统整体性能进行测试.
3 基于Hadoop/Hive的乳制品溯源框架性能测试 3.1 实验环境配置为了验证上述框架对海量生产数据和溯源数据的存储与处理性能, 搭建了基于Hadoop/Hive的大数据实验平台, 平台中的各模块软硬件配置情况见表 1所示.
Hadoop/Hive大数据存储与处理模块由1台管理机Master和3台从机Slave构成, 管理机对从机进行作业调度和任务分配. HDFS的NameNode和DataNode协作完成数据分布式存储, 分割后的数据块(Block)按一定策略冗余存储在各子节点中[14]. MapReduce中的JobTracker和TaskTracker负责任务分配和数据并行处理[15]. Hive通过编译器将HiveQL语句转换为MapReduce程序以提供结构化数据的分布式存储和并行处理[16].
MySQL Cluster分布式数据库用于模拟拥有内部乳制品溯源系统的大型乳制品企业的分布式数据库.将乳制品溯源系统部署在Web服务器上, 通过Hive JDBC和MySQL JDBC对Hadoop/Hive模块和MySQL Cluster分布式数据库进行数据操作.其中, Hadoop/Hive和MySQL Cluster两个模块内部通过Sqoop API和Hive JDBC进行数据交互.基于Hadoop/Hive乳制品溯源系统部署见图 6.
本文实验数据来源于某农业信息服务公司提供的近3年奶牛生产数据, 主要涉及产奶批次、DHI[17]数据、销售数据、奶品转发日志、投料信息、添加剂数据、仓储数据和配送数据等多张关系表, 平均记录条数达2 000多万条.
3.4 实验方法及结果分析 3.4.1 数据导入性能为了比较采用传统关系型数据库和Hadoop/Hive技术前后系统数据导入性能的变化, 分别在MySQL数据库和Hadoop/Hive平台中创建相应的实验关系表, 每种记录条数分别在MySQL和Hadoop/Hive环境导入, 记录条数按5万条递增, 数据格式为TXT, 每次导入耗时对比见表 2所示, 由表 2中数据可知系统平均导入速度提升了87.43%.
经分析发现, MySQL需先对SQL语句进行解析、预处理和执行优化, 再逐行对导入数据的合法性进行检查, 检查通过后方可将数据导入.因此随着导入记录条数的大幅增加, MySQL数据导入时间会随之急剧递增.而Hadoop/Hive获取HiveQL语句后, 对其解析和编译并生成可执行计划发送至JobTracker, JobTracker使用HDFS进行写入操作时不需要对导入数据的合法性进行一一验证, 而是直接将数据文件导入至相应节点.这样随着数据量的增加, Hadoop/Hive无需产生额外的数据验证时间, 数据导入时间也不会明显增加.同时, 它还利用DataNode内部管道流机制降低数据传送耗时.因此, 引入Hadoop/Hive技术提升了系统导入大规模数据的能力, 缩短了企业上传数据的时间.
3.4.2 数据查询性能为了比较采用传统关系型数据库和Hadoop/Hive技术前后系统数据查询性能的变化, 按实际生产需求对实验关系表中的数据进行了查询操作, 如查询出某牧场近3个月牛奶乳脂率低于3%的牛只的耳标、所在牛舍号、品种、健康状况等信息; 查询产奶量高于给定数值的牛只耳标、牧场号、牛舍号、原奶批次等信息; 查询某批次原奶槽罐清洁度、温湿度运输车车牌号等信息.每种查询分别在MySQL和Hadoop/Hive下查询2次, 执行时各个关系表的记录数为50万条~3000万条, 间隔为50万条.记录不同记录条数下查询操作总耗时, 求其平均消耗时间, 数据查询耗时对比见图 7所示.
由图 7可知, 记录条数低于1 150万条时, MySQL的耗时低于Hadoop/Hive, 主要因为Hadoop/Hive执行HiveQL语句的初始化时间较长, 需将解析后的HiveQL语句转换为能在Hadoop/Hive平台上执行的MapReduce程序, 再由Hadoop对MapReduce任务进行分配并转交由各个子结点并行执行, 而MySQL数据库初始化时间比较短, 几乎可以忽略不计; 当记录条数逐渐超过1 150万条时, MySQL的耗时逐渐高于Hadoop/Hive, 且差距在不断加大.造成这一现象的原因是, Hive编译器将HiveQL语句编译为MapReduce程序的耗时与记录条数无关, 所以初始化的时间基本保持不变; 且Hadoop/Hive较MySQL有数据分布式存储和数据并行处理的优势, 所以记录条数大于一定数值时, Hadoop/Hive平台数据访问消耗时间开始低于MySQL, 逐渐体现出大数据处理的优势.
通过以上结果分析可知, 引入Hadoop/Hive技术提升了系统大规模数据的查询能力.
3.4.3 数据交互性能为了比较生产数据从文本文件直接传送到关系型数据库MySQL和使用Sqoop API将数据从MySQL Cluster分布式数据库中的某一节点传送至Hadoop/Hive模块中性能差异, 分别在MySQL、MySQL Cluster和Hadoop/Hive模块中创建相应的关系表, 按不同记录数进行数据交互.
第一组实验, 使用Sqoop API将MySQL Cluster中不同记录数的数据迁移到Hadoop/Hive平台中; 第二组实验将不同记录数的生产数据的文本文件导入到MySQL数据库中.两组实验中不同记录条数的数据分别迁移2次, 记录其消耗时间, 求不同记录条数的平均耗时.
将文本数据导入MySQL与利用Sqoop API将MySQL Cluster某站点数据迁移到Hadoop/Hive平台的执行平均耗时对比见表 3所示.由表 3中数据可知, 引入Sqoop数据迁移工具后, 系统整体数据交互速度平均提升了58.16%.
产生这一现象的原因如下: Sqoop进行数据迁移时, 其MadpReduce任务分割后由不同节点并行执行, 并且执行过程中无需对导入数据的合法性进行验证; 而MySQL数据导入时需对所有数据的合法性进行检验.随着数据规模的持续扩大, MySQL的消耗时间迅速增加, 而Sqoop数据消耗时间增加缓慢.
与此同时, 为了验证利用Sqoop API将Hadoop/Hive海量数据分析结果迁移至监管部门服务器的性能, 按5万条记录数递增, 对45万条记录进行了实验. Hive-MySQLCluster数据迁移平均耗时见表 4所示.
分析表 4中数据可知, Hive-MySQL Cluster数据迁移最初耗时远大于Txt-MySQL; 随着数据量的增加, Hive-MySQL Cluster数据迁移耗时增长缓慢, 而Txt-MySQL的耗时急剧增长; 当数据量达到40万条时, Hive-MySQL Cluster耗时开始低于Txt-MySQL, 时间增长速度非常缓慢, 基本满足溯源系统的时间需求.
综上可知, 与传统的将文本文件上传至MySQL中以完成各站点数据采集的方式相比, 使用Hadoop/Hive技术和Sqoop数据迁移技术大大提升了系统进行海量数据交互的能力.
4 总结与展望通过以上对基于Hadoop/Hive的大数据实验平台的性能验证, 可知Hadoop/Hive技术提升了乳制品溯源系统的整体性能, 具体如下.
(1) 减轻了企业每日向监管部门传送数据的压力.由企业直接向关系型数据库传送生产数据转变为企业向Hadoop/Hive数据处理平台传送数据, 解决了原系统中企业远程传送海量数据至监管部门关系型数据库中导致的数据传送和解析缓慢问题;
(2) 提升了系统数据存储的规模.海量数据分割后冗余存储于各从机中, 扩大了数据存储规模;
(3) 提高了系统数据处理能力.由传统数据处理方式变为分布式并行处理, 提升了大规模数据的处理速度. Hadoop/Hive定期处理数据并将结果存储于监管部门的服务器中使得用户不再需要等待服务器在线处理数据.
然而, 该乳制品溯源系统在对实时数据的处理方面还有待完善, 后续工作将尝试使用Strom、Spark等支持内存计算的技术提升整体系统的实时数据处理性能, 使之能够更好地为乳制品溯源服务, 并将其推广到其他溯源应用中.
[1] | ABOUZIED A, BAJDA-PAWLIKOWSKI K, HUANG J, et al. HadoopDB in action: Building real world applications[C]//ACM SIGMOD International Conference on Management of Data. ACM, 2010: 1111-1114. |
[2] | ABOUZEID A, BAJDA-PAWLIKOWSKI K, ABADI D, et al. HadoopDB:An architectural hybrid of MapReduce and DBMS technologies for analytical workloads[J]. Proceedings of the VLDB Endowment, 2009, 2(1): 922-933. DOI:10.14778/1687627 |
[3] | ISMAIL A S, AL-FEEL H, MOKHTAR H M O. Introducing a new arabic endpoint for DBpedia internationalization project[C]//International Database Engineering & Applications Symposium. ACM, 2016: 284-289. |
[4] | TORRES D, SKAF-MOLLI H, MOLLI P. et al. BlueFinder: Recommending wikipedia links using DBpedia properties[C]//Proceedings of the 5th Annual ACM Web Science Conference (WebSci'13). New York: ACM, 2013: 413-422. DOI: https://doi.org/10.1145/2464464.2464515. |
[5] | 叶育鑫, 欧阳丹彤. 混合语义约简和选择估值优化SPARQL[J]. 电子学报, 2010, 38(5): 1205-1210. |
[6] | 王德文, 肖凯, 肖磊. 基于Hive的电力设备状态信息数据仓库[J]. 电力系统保护与控制, 2013(9): 125-130. DOI:10.7667/j.issn.1674-3415.2013.09.021 |
[7] | 曲朝阳, 朱莉, 张士林. 基于Hadoop的广域测量系统数据处理[J]. 电力系统自动化, 2013, 37(4): 92-97. DOI:10.7500/AEPS201111169 |
[8] | 刘越, 李锦涛, 虎嵩林. 基于代价估计的Hive多维索引分割策略选择算法[J]. 计算机研究与发展, 2016, 53(4): 798-810. DOI:10.7544/issn1000-1239.2016.20151163 |
[9] | 董新华, 李瑞轩, 周湾湾, 等. Hadoop系统性能优化与功能增强综述[J]. 计算机研究与发展, 2013, 50(s2): 1-15. |
[10] | THUSOO A, SARMA J S, JAIN N, et al. Hive:A warehousing solution over a map-reduce framework[J]. Proceedings of the VLDB Endowment, 2009, 2(2): 1626-1629. DOI:10.14778/1687553 |
[11] | OLSEN P, BORIT M. How to define traceability[J]. Trends in Food Science & Technology, 2013, 29(2): 142-150. |
[12] | 钱建平, 刘学馨, 杨信廷, 等. 可追溯系统的追溯粒度评价指标体系构建[J]. 农业工程学报, 2014, 30(1): 98-104. |
[13] | SHVACHKO K, KUANG H, RADIA S, et al. The Hadoop distributed file system[C]//Proceedings of the IEEE 26th Symposium on Mass Storage Systems and Technologies. Washington: IEEE Computer Society, 2010: 1-10. DOI: 10.1109/MSST.2010.5496972. |
[14] | 张良均. Hadoop大数据分析与挖掘实战[M]. 北京: 机械工业出版社, 2016. |
[15] | 荀亚玲, 张继福, 秦啸. MapReduce集群环境下的数据放置策略[J]. 软件学报, 2015(8): 2056-2073. |
[16] | 叶晓江, 刘鹏. 实战Hadoop2.0从云计算到大数据[M]. 北京: 电子工业出版社, 2016. |
[17] | 张佳兰, 昝林森, 刘永峰, 等. 我国DHI测定现状及存在的问题[J]. 中国牛业科学, 2007, 33(5): 56-59. |