大数据采集技术有哪些平台数据采集

今天我就来讲讲大数据平台建设中的数据收集和整合。当我们第一次谈到BI或MDM系统时,我们也涉及到了数据集成和交换,但一般来说完全可以通过ETL工具或技术来解决。但在大数据平台建设中,数据采集的实时性要求发生了变化,数据采集整合的类型也变得多样化,这是整个大数据平台采集整合发生变化的重要原因。

首先,这里有一个观点,即:

并不是指望用单一的工具或技术就能完成大数据的采集和整合,而是要根据数据采集的实时性要求、采集的数据类型、数据量等采用不同的方法和技术。

因此,今天我们将主要讲解不同的大数据收集和集成场景。

结构化数据的数据实时同步复制

结构化数据库的实时同步复制最早出现在类似于远程双活动的多中心基础设施建设中。目前,它常用于数据库读写分离集群的许多场景中。简单来说就是通过同步数据库复制将读写分开,从而实现读取集群本身的横向弹性扩展能力。

对于实时数据库同步和复制,两个商业产品是Oracle GoldenGate和Quest SharePlex。网上有很多详细的介绍。它们的核心特性是支持异构数据库之间的实时数据同步和责任,并且对源数据库本身的干扰较小。

这两种商业产品基本上都是分析各种数据库的日志文件,然后进行复制。

这篇文章有可能是自我研究的吗?

对于Mysql来说,因为Binlog日志的方法,像淘宝这样的Otter完全可以实现数据库的实体同步复制。如果只是简单的在Oracle和Oracle数据库之间,我们也可以使用Oracle DataGuard或者Oracle Stream流复制技术进行复制,然后基于Oracle LogMiner进行重做日志分析后同步两个数据库。

因此,关键问题是异构数据库之间的同步复制。对于数据库复制,目前Oracle常用的解决方案主要有:

Oracle日志文件,如LogMiner,OGG,shareplex Oracle CDC变更数据捕获)Oracle触发机制,如DataBus,SymmetricDSoracle物化视图,如淘宝的龚宇开源,可以看出在这些解决方案中有开源的对称DS解决方案,但基于触发机制,它是相对侵入性的。淘宝的龚宇可以实现Oracle-mysql的全拷贝或增量拷贝,但基于增量物化视图,会影响源数据库数据表的CUD操作。

事实上,最好的解决方案仍然是基于日志日志的实时同步复制,其核心思想包括三个步骤。

当源库设置为日志或归档模式时,源库可以首先记录日志信息。

可以读取实时日志信息,并且可以适当地分析或映射日志信息,包括与目标库的适配。

直接在目标数据库中运行对应解析后的日志SQL语句,实现同步更新。因为Mysql本身提供了可读性很高的Binlog日志,所以我们可以看到Mysql-Mysql和Mysql-Oracle的实时同步日志问题都可以很好的解决。甲骨文-甲骨文也可以解决这个问题。比较难的是Oracle-Mysql或者其他异构数据库。在这里,我们需要分析Oracle的重做日志目前,Oracle提供logminer工具)。如果我们自己编写一个分析包,我们需要对Oracle重做日志结构有一个完整的了解。

结合Oracle流复制技术,我们可以考虑Oracle首先将变更信息写入自己的AQ,然后我们订阅来自AQ的消息,然后直接处理或者写入自己的消息队列或者流处理软件,然后在流处理软件中完成相关的映射转换,再写入目标异构数据库。

Sqoop和Flume数据采集和集成

从Hadoop提供的标准技术架构和开源工具集来看,数据收集和集成的重点是两个工具,一个是Sqoop,一个是Flume。

Sqoop主要用于在HadoopHive)和传统数据库mysql、postgresql.).它可以从关系数据库如MySQL、Oracle、Postgres等)导入数据。)导入到Hadoop的HDFS,还将来自HDFS的数据导入到关系数据库中。

水槽是Cloudera提供的水槽。

个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并达到各种数据接受方(可定制)的能力。

对于两者的区别简单说明如下:

Sqoop只支持结构化数据和HDFS之间的数据集成,Flume支持文件和日志Sqoop基于Mapreduce的批处理机制,Flume基于事件和流处理机制Sqoop偏定时处理,Flume偏实时或准实时处理当面对的是批量和大数据的时候,Sqoop性能好于Flume

在采用Sqoop方式进行数据采集和集成的时候,需要考虑的就是增量数据采集。增量数据导入分两种,一是基于递增列的增量数据导入(Append方式)。二是基于时间列的增量数据导入(LastModified方式)。当前这两种方式Sqoop已经支持。

而对于Flume,最早仅用于日志文件的实时采集和处理,而当前的Agent已经能够支持对结构化数据库的适配,也就是说结构化数据库的数据也可以类似流处理的方式采集到Hdfs库。

开源DataX数据采集和集成

DataX 是阿里开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。

DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。对于DataX整个架构设计最好的地方就是将Reader和Writer分离,你可以灵活地定义各种读写插件进行扩展。

Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。

我们自己当前研发和使用的DIP大数据集成平台,也是在DataX底层引擎的基础上扩展了数据源配置,数据对象定义和管理,数据调度和任务管理,日志监控等功能。形成一个完善的大数据采集和集成工具平台,如下:

对于DataX可以看到实际和Sqoop大部分功能都相同,但是两者本身架构实现机制还是有差异。Sqoop本身是基于Hadoop的MapReduce机制进行分布式作业,而对于DataX则是自己对Job进行切分,然后并行执行。

对于DataX和Sqoop实际在单节点测试情况来看,两者在性能上的差距并不明显。但是数据源是Oracle,Msyql数据库的时候,DataX的性能略好;而当数据源是Hdfs库的时候,Sqoop性能略好。但是开源的DataX不支撑分布式集群,这个本身也对于大数据量下的架构扩展有影响。单节点的峰值传输能力在15M/S左右。

当前gitbub有对datax定制的管理平台开源,可以参考:

https://github.com/WeiYe-Jing/datax-web

自实现数据采集平台

而对于常规的数据库包括大数据存储之间的采集和集成,在充分考虑性能的情况下,核心思路为:

1. 将源数据库数据进行导出,使用Sql或DB原生的导出命令直接导出为txt文件,字段以分隔符进行分隔。
1.1 可以部署多个代理端,对数据库数据启用多个线程进行导出
1.2 支持基于key值或时间戳的增量数据导出
2. 对导出的数据进行压缩后进行传输(特别是在源和目标库不在同一个数据中心时)
3. 在目标库端基于数据库原生的load命令对数据进行bulk批量导入。

在整个实现里面有两个核心,一个就是可以启用多个代理端和多线程机制并行导出数据库,其次就是导出数据压缩传输,然后在通过load data原生命令进行数据库的bulk批量装载提升性能。

如果基于以上思路我们可以看到数据采集的重点还是在性能上面,不会去实现ETL工具本身复杂的数据映射和转化,数据聚合等操作。核心只是做异构数据库和Hdfs文件存储之间的数据搬移。而我们完全自己研发的DataPipe产品基本参考上述思路实现,其测试性能对于结构化数据库之间采集和集成是Sqoop或DataX的2-3倍左右,而对于hdfs之间的集成则在5-10倍左右的性能提升。

该思路在远程数据传输和集成中,有明显的性能优势。比如内蒙数据中心的批量数据要传输到贵州大数据中心。一个10G的数据在源端导出后并压缩后只有100M左右的大小,整个处理机制则是将压缩数据传输到贵州后再进行解压和入库。

但是整个方案涉及到需要在源端配置Agent代理,因此本身对源端具有一定的侵入性,导致整体应用效果并不太好。

对于这种采集存在的约束就是不要去处理数据变更的问题,仅仅是做数据的全量同步或者是数据库表数据的简单Append处理,否则性能本身会下降很多。如果有大量数据更新需要同步,最好的方式还是首先Truncate掉目标数据库表,然后再进行全量同步。简单验证对于Mysql数据库间100万数据,180M左右数据量的全量同步整体同步时间在14秒左右即全部完成。

虽然这个采集工具现在没有大范围使用,但是却对整体大数据采集和集成实施,功能扩展方面积累了相应的技术经验。

流处理模式

在前面谈Flume日志采集,当时对于日志采集和分析还有比较主流的ELK方案,其中对于日志采集部分重点通过Logstash来实现。

Logstash是一款开源的数据收集引擎,具备实时管道处理能力。简单来说,logstash作为数据源与数据存储分析工具之间的桥梁,结合ElasticSearch以及Kibana,能够极大方便数据的处理与分析。

从上图可以看到Logstash核心就是一个数据源和数据存储间的连接通道,而在ELK方案里面ElasticSearch就是支持全文检索的分布式存储。如果采集的数据量和并发量很大,还可以在ElasticSearch前增加Kafka消息中间件来实现前端输入的削峰处理。

实际上可以看到ELK方案本身和大数据平台的采集和集成关系并不密切,可以看做是针对日志采集分析的一个补充。

如果上面的方式更多的是流式采集和存储的话,还有一个就是流式计算。简单来说就是采集过来的数据不是简单的导入到目标存储中,而是对采集到的数据进行实时的处理和加工,将处理完成后的中间结果存储到目标库中。

比如当前谈得比较多的SparkStream流式计算框架。

举个简单例子,当前我们的ESB总线每天运行3000万次,产生3000万条的实例日志记录,但是我们并不希望将所有数据写入到目标库,而是希望按分钟为单位写入一个统计数据到目标库。传统方式我们是定时进行处理,而采用流式计算框架后可以做到实时或准实时处理。

前面谈采集,可以看到在源和目标之间增加了一个采集集成工具。

即: 源端 — 采集集成工具 — 目标端

而流式计算框架后整个过程增加了计算环节如下:

即: 源端 — 采集集成工具 — 计算 – 目标端

Spark Streaming 是一套优秀的实时计算框架。根据其官方文档介绍,Spark Streaming 有高扩展性、高吞吐量和容错能力强的特点。Spark Streaming 支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等等。

所以乐观的帅哥的场景不是简单的将原生数据无变化的采集到大数据平台的贴源层,而是需要进行加工处理仅仅写入中间态数据的话,就需要在传统方案的基础上增加类似SparkStream处理环境,或者进行二次采集集成处理。

Published by

风君子

独自遨游何稽首 揭天掀地慰生平

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注