Flume入门学习笔记

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的软件

Flume的核心是把数据从数据源(source)收集过来,再将收集到的数据送到指定的目的地(sink)。为了保证输送的过程一定成功,在送到目的地(sink)之前,会先缓存数据(channel),待数据真正到达目的地(sink)后,flume在删除自己缓存的数据。

Flume支持定制各类数据发送方,用于收集各类型数据;同时,Flume支持定制各种数据接受方,用于最终存储数据。一般的采集需求,通过对flume的简单配置即可实现。针对特殊场景也具备良好的自定义扩展能力。因此,flume可以适用于大部分的日常数据采集场景。

当前Flume有两个版本。Flume 0.9X版本的统称Flume OG(original generation),Flume1.X版本的统称Flume NG(next generation)。由于Flume NG经过核心组件、核心配置以及代码架构重构,与Flume OG有很大不同,使用时请注意区分。改动的另一原因是将Flume纳入 apache 旗下,Cloudera Flume 改名为 Apache Flume。

运行机制

Flume分布式系统中最核心的角色是agent,agent本身是一个Java进程,一般运行在日志收集节点。flume采集系统就是由一个个agent所连接起来形成的。

每一个agent相当于一个数据传递员 ,内部有三个组件:

  • Source:采集组件,用于跟数据源对接,以获取数据
  • Sink:下沉组件,用于往下一级agent传递数据或者往最终存储系统传递数据
  • Channel:传输通道组件,用于从source将数据传递到sink

在整个数据的传输过程中,流动的是event,他是Flume内部数据传输的最基本单元,event将传输的数据进行封装。如果是文本文件,通常是一行记录,event也是事务的基本单位。Event从source,流向channel,再到sink,本身为一个字节数组,并可携带headers信息,event代表着一个数据的最小完整单元,从外部数据源来,向外部的目的地去。

一个完整的event包括:event headers、event body、event信息,其中event信息就是flume收集到的日志记录。

Flume采集系统结构图

简单结构

单个agent采集数据

复杂结构

多个agent之间串联。在串联的架构中没有主从之分,地位都是一样的。

安装部署

在安装flume之前需要安装hadoop环境。flume的安装比较简单,下载好安装包,解压就好了。以下为安装步骤:

  1. 下载解压,修改配置文件

    官网地址:https://flume.apache.org/

    下载地址:http://flume.apache.org/download.html

    网盘地址:链接:https://pan.baidu.com/s/1QoCMAK-P_gyx_fWHNTnCKw 提取码:n60i

    1
    2
    3
    4
    $ tar -zxvf /export/software/apache-flume-1.8.0-bin.tar.gz -C /安装目录
    $ cd /apache-flume-1.8.0/conf
    $ cp flume-env.templete.sh flume-env.sh
    $ vim flume-env.sh
    1
    export JAVA_HOME=jdk的安装目录
  2. 开发配置文件

    根据数据采集的需求配置采集方案,描述在配置文件中(文件名可任意自定义)

    这里举例:配置网络收集的配置文件

    在flume的conf目录下新建一个配置文件(采集方案)

    1
    $ vim /export/servers/flume/conf/netcat-logger.conf
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    # 定义这个agent中各组件的名字
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # 描述和配置source组件:r1
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444

    # 描述和配置sink组件:k1
    a1.sinks.k1.type = logger

    # 描述和配置channel组件,此处使用是内存缓存的方式
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # 描述和配置source channel sink之间的连接关系
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
  3. 启动agent去采集数据

    1
    2
    3
    4
    ## -c/--conf conf 指定flume自身的配置文件所在目录 
    ## -f/--conf-file conf/netcat-logger.conf 指定我们所描述的采集方案
    ## -n/--name a1 指定代理的名称
    $ bin/flume-ng agent --conf conf --conf-file conf/netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console
  4. 安装telent测试

    首先安装telnet客户端,用于模拟数据的发送

    1
    2
    3
    $ yum -y install telnet
    # 安装成功后,启动telnet
    $ telnet node03 44444

采集示例

采集目录到HDFS

需求:某服务器的某特定目录下,会不断产生新的文件,每当有新文件出现,就需要把文件采集到HDFS中去

根据需求,首先定义以下3大要素:

  • 数据源组件,即source ——监控文件目录 : spooldir

    • spooldir特性:
      • 监视一个目录,只要目录中出现新文件,就会采集文件中的内容
      • 采集完成的文件,会被agent自动添加一个后缀:COMPLETED
      • 所监视的目录中不允许重复出现相同文件名的文件
  • 下沉组件,即sink——HDFS文件系统 : hdfs sink

  • 通道组件,即channel——可用file channel 也可以用内存channel

配置文件编写

配置文件编写:

1
2
3
$ cd  /apache-flume-1.8.0/conf
$ mkdir -p /export/servers/dirfile/
$ vim spooldir.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# Name the components on this agent
a1.sources=r1
a1.sinks=k1
a1.channels=c1

# Describe/configure the source
##注意:不能往监控目中重复丢同名文件
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/root/logs
a1.sources.r1.fileHeader=true

# Describe the sink
a1.sinks.k1.type=hdfs
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.path=/flume/events/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix=events-

# 是否开启时间上的舍弃 控制文件夹以多少时间间隔滚动
# 以下表示:每10分钟生成一个文件夹
a1.sinks.k1.hdfs.round=true
a1.sinks.k1.hdfs.roundValue=10
a1.sinks.k1.hdfs.roundUnit=minute

# roll控制写入hdfs文件,以何种方式进行滚动
a1.sinks.k1.hdfs.rollInterval=3 ## 以时间间隔
a1.sinks.k1.hdfs.rollSize=20 ## 以文件大小
a1.sinks.k1.hdfs.rollCount=5 ## 以event个数
# 如果三个都配置 谁先满足谁触发滚动;如果不想以某个属性滚动 设置为0即可

a1.sinks.k1.hdfs.batchSize=1
a1.sinks.k1.hdfs.useLocalTimeStamp=true
#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType=DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

# Bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

启动flume

1
$ bin/flume-ng agent -c ./conf -f ./conf/spooldir.conf -n a1 -Dflume.root.logger=INFO,console

上传文件到指定目录

将不同的文件上传到下面目录里面去。

1
$ cd /export/servers/dirfile/

注意:

  • 注意其监控的文件夹下面不能有同名文件的产生
  • 如果有 报错且罢工 后续就不再进行数据的监视采集了
  • 通常给文件追加时间戳命名的方式保证文件不会重名

采集文件到HDFS

需求:业务系统使用log4j生成的日志,日志内容不断增加,需要把追加到日志文件中的数据实时采集到hdfs

根据需求,首先定义以下3大要素:

  • 采集源,即source——监控文件内容更新 : exec ‘tail -F file’

  • 下沉目标,即sink——HDFS文件系统 : hdfs sink

  • Source和sink之间的传递通道——channel,可用 file channel 也可以用 memory channel

flume配置文件开发

1
2
$ cd /export/servers/apache-flume-1.8.0/conf
$ vim tail-file.conf

配置文件内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/logs/test.log
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/tailout/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动flume

1
$ bin/flume-ng agent -c conf -f conf/tail-file.conf -n a1 -Dflume.root.logger=INFO,console

测试

编写一个shell脚本定时追加文件内容

1
2
3
$ mkdir -p /shells/
$ cd /shells/
$ vim tail-file.sh
1
2
3
4
5
6
#!/bin/bash
while true
do
date >> /root/logs/test.log;
sleep 0.5;
done

创建文件夹

1
$ mkdir -p /root/logs/taillogs

启动脚本

1
$ sh /shells/tail-file.sh

Flume的load-balance、failover

  • 负载均衡是用于解决一台机器(一个进程)无法解决所有请求而产生的一种算法;

  • 同一个请求只能交给一个进行处理,避免数据重复;

  • 如果分配请求就涉及到了负载均衡的算法:轮询(round_robin)随机(random)权重;

Load balancing Sink Processor能够实现load balance功能,如下图Agent1是一个路由节点,负责将Channel暂存的Event均衡到对应的多个Sink组件上,而每个Sink组件分别连接到一个独立的Agent上,示例配置,如下所示:

注意:flume串联启动,通常从远离数据源的那一级开始启动。

1
2
3
4
5
6
7
8
9
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = load_balance
# 如果开启,则将失败的sink放入黑名单
a1.sinkgroups.g1.processor.backoff = true
# 另外还支持random
a1.sinkgroups.g1.processor.selector = round_robin
# 在黑名单放置的超时时间,超时结束时,若仍然无法接收,则超时时间呈指数增长
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000

Failover Sink Processor

  • 容错又称之为故障转移 容忍错误的发生。
  • 通常用于解决单点故障 给容易出故障的地方设置备份
  • 备份越多 容错能力越强 但是资源的浪费越严重

Failover Sink Processor维护一个优先级Sink组件列表,只要有一个Sink组件可用,Event就被传递到下一个组件。故障转移机制的作用是将失败的Sink降级到一个池,在这些池中它们被分配一个冷却时间,随着故障的连续,在重试之前冷却时间增加。一旦Sink成功发送一个事件,它将恢复到活动池。 Sink具有与之相关的优先级,数量越大,优先级越高。

例如,具有优先级为100的sink在优先级为80的Sink之前被激活。如果在发送事件时汇聚失败,则接下来将尝试下一个具有最高优先级的Sink发送事件。如果没有指定优先级,则根据在配置中指定Sink的顺序来确定优先级。示例配置如下:

1
2
3
4
5
6
7
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5 #优先级值, 绝对值越大表示优先级越高
a1.sinkgroups.g1.processor.priority.k2 = 7
a1.sinkgroups.g1.processor.priority.k3 = 6
a1.sinkgroups.g1.processor.maxpenalty = 20000 #失败的Sink的最大回退期(millis)
-------------本文结束感谢您的阅读-------------

本文标题:Flume入门学习笔记

文章作者:Mr.wj

发布时间:2019年12月29日 - 14:27

最后更新:2019年12月29日 - 14:41

原始链接:https://www.wjqixige.cn/2019/12/29/Flume入门学习笔记/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

Mr.wj wechat
欢迎您扫一扫上面的微信公众号,订阅我的博客!