利用Flume将MySQL表数据准实时抽取到HDFS(转)

一、为什么要用到Flume

在以前搭建HAWQ数据仓库实验环境时,我使用Sqoop抽取从MySQL数据库增量抽取数据到HDFS,然后用HAWQ的外部表进行访问。这种方式只需要很少量的配置即可完成数据抽取任务,但缺点同样明显,那就是实时性。Sqoop使用MapReduce读写数据,而MapReduce是为了批处理场景设计的,目标是大吞吐量,并不太关心低延时问题。就像实验中所做的,每天定时增量抽取数据一次。
Flume是一个海量日志采集、聚合和传输的系统,支持在日志系统中定制各类数据发送方,用于收集数据。同时,Flume提供对数据进行简单处理,并写到各种数据接受方的能力。Flume以流方式处理数据,可作为代理持续运行。当新的数据可用时,Flume能够立即获取数据并输出至目标,这样就可以在很大程度上解决实时性问题。
Flume是最初只是一个日志收集器,但随着flume-ng-sql-source插件的出现,使得Flume从关系数据库采集数据成为可能。下面简单介绍Flume,并详细说明如何配置Flume将MySQL表数据准实时抽取到HDFS。

二、Flume简介

1. Flume的概念

Flume是分布式的日志收集系统,它将各个服务器中的数据收集起来并送到指定的地方去,比如说送到HDFS,简单来说flume就是收集日志的,其架构如图1所示。

图1

 

2. Event的概念

在这里有必要先介绍一下Flume中event的相关概念:Flume的核心是把数据从数据源(source)收集过来,在将收集到的数据送到指定的目的地(sink)。为了保证输送的过程一定成功,在送到目的地(sink)之前,会先缓存数据(channel),待数据真正到达目的地(sink)后,Flume再删除自己缓存的数据。
在整个数据的传输的过程中,流动的是event,即事务保证是在event级别进行的。那么什么是event呢?Event将传输的数据进行封装,是Flume传输数据的基本单位,如果是文本文件,通常是一行记录。Event也是事务的基本单位。Event从source,流向channel,再到sink,本身为一个字节数组,并可携带headers(头信息)信息。Event代表着一个数据的最小完整单元,从外部数据源来,向外部的目的地去。

3. Flume架构介绍

Flume之所以这么神奇,是源于它自身的一个设计,这个设计就是agent。Agent本身是一个Java进程,运行在日志收集节点——所谓日志收集节点就是服务器节点。 Agent里面包含3个核心的组件:source、channel和sink,类似生产者、仓库、消费者的架构。

  • Source:source组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定义。
  • Channel:source组件把数据收集来以后,临时存放在channel中,即channel组件在agent中是专门用来存放临时数据的——对采集到的数据进行简单的缓存,可以存放在memory、jdbc、file等等。
  • Sink:sink组件是用于把数据发送到目的地的组件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、Hbase、solr、自定义。

 

4. Flume的运行机制

Flume的核心就是一个agent,这个agent对外有两个进行交互的地方,一个是接受数据输入的source,一个是数据输出的sink,sink负责将数据发送到外部指定的目的地。source接收到数据之后,将数据发送给channel,chanel作为一个数据缓冲区会临时存放这些数据,随后sink会将channel中的数据发送到指定的地方,例如HDFS等。注意:只有在sink将channel中的数据成功发送出去之后,channel才会将临时数据进行删除,这种机制保证了数据传输的可靠性与安全性。

三、安装Hadoop和Flume

我的实验在HDP 2.5.0上进行,HDP安装中包含Flume,只要配置Flume服务即可。HDP的安装步骤参见“HAWQ技术解析(二) —— 安装部署

四、配置与测试

1. 建立MySQL数据库表

建立测试表并添加数据。

2. 建立相关目录与文件

(1)创建本地状态文件

(2)建立HDFS目标目录

 

3. 准备JAR包

http://book2s.com/java/jar/f/flume-ng-sql-source/download-flume-ng-sql-source-1.3.7.html下载flume-ng-sql-source-1.3.7.jar文件,并复制到Flume库目录。

将MySQL JDBC驱动JAR包也复制到Flume库目录。

4. 建立HAWQ外部表

5. 配置Flume

属性 描述
agent.channels.ch1.type Agent的channel类型
agent.sources.sql-source.channels Source对应的channel名称
agent.channels Channel名称
agent.sinks Sink名称
agent.sources Source名称
agent.sources.sql-source.type Source类型
agent.sources.sql-source.connection.url 数据库URL
agent.sources.sql-source.user 数据库用户名
agent.sources.sql-source.password 数据库密码
agent.sources.sql-source.table 数据库表名
agent.sources.sql-source.columns.to.select 查询的列
agent.sources.sql-source.incremental.column.name 增量列名
agent.sources.sql-source.incremental.value 增量初始值
agent.sources.sql-source.run.query.delay 发起查询的时间间隔,单位是毫秒
agent.sources.sql-source.status.file.path 状态文件路径
agent.sources.sql-source.status.file.name 状态文件名称
agent.sinks.HDFS.channel Sink对应的channel名称
agent.sinks.HDFS.type Sink类型
agent.sinks.HDFS.hdfs.path Sink路径
agent.sinks.HDFS.hdfs.fileType 流数据的文件类型
agent.sinks.HDFS.hdfs.writeFormat 数据写入格式
agent.sinks.HDFS.hdfs.rollSize 目标文件轮转大小,单位是字节
agent.sinks.HDFS.hdfs.rollInterval hdfs sink间隔多长将临时文件滚动成最终目标文件,单位是秒;如果设置成0,则表示不根据时间来滚动文件
agent.sinks.HDFS.hdfs.rollCount 当events数据达到该数量时候,将临时文件滚动成目标文件;如果设置成0,则表示不根据events数据来滚动文件

表1

6. 运行Flume代理

图2

图3

图4

图5

7. 测试准实时增量抽取

五、方案优缺点

  • 在源库上执行了查询,具有入侵性。
  • 通过轮询的方式实现增量,只能做到准实时,而且轮询间隔越短,对源库的影响越大。
  • 只能识别新增数据,检测不到删除与更新。
  • 要求源库必须有用于表示增量的字段。
  • 即便有诸多局限,但用Flume抽取关系库数据的方案还是有一定的价值,特别是在要求快速部署、简化编程,又能满足需求的应用场景,对传统的Sqoop方式也不失为一种有效的补充。

 

:转载于http://blog.csdn.net/wzy0623/article/details/73650053

  • 支付宝打赏
若你觉得我的文章对你有帮助,欢迎点击上方按钮对我打赏
Administrator

Administrator

知人不必言尽,留三分余地与人,留些口德与己。 责人不必苛尽,留三分余地与人,留些肚量与己。 才能不必傲尽,留三分余地与人,留些内涵与己。 锋芒不必露尽,留三分余地与人,留些深敛与己。 有功不必邀尽,留三分余地与人,留些谦让与己。

发表评论

电子邮件地址不会被公开。 必填项已用*标注