内容简介:Flume OG:Flume original generation,即Flume0.9x版本Flume NG:Flume next generation,即Flume1.x版本
1. Flume简介
- Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统
- 支持在日志系统中定制各类数据发送方,用于收集数据
- Flume提供对数据进行简单处理,并写到各种数据接收方
2. Flume OG 与Flume NG
Flume OG:Flume original generation,即Flume0.9x版本
Flume NG:Flume next generation,即Flume1.x版本
3. Flume体系结构
flume的事件(agent)
Source: 用来定义采集系统的源头
Channel: 把Source采集到的日志进行传输,处理
Sink:定义数据的目的地
4. Flume的安装
-
准备安装文件
- apache-flume-1.6.0-bin.tar.gz
-
解压
- tar -zxvf apache-flume-1.6.0-bin.tar.gz -C /opt/
-
重命名
- mv apache-flume-1.6.0-bin flume
-
添加环境变量
#配置Flume的环境变量 export FLUME_HOME=/opt/flume export PATH=$PATH:$FLUME_HOME/bin
-
配置文件
[root@uplooking01 /opt/flume/conf] mv flume-env.sh.template flume-env.sh
[root@uplooking01 /opt/flume/conf/flume-env.sh] export JAVA_HOME=/opt/jdk
5. Flume采集网络端口数据
5.1 定义flume的事件配置文件
flume-nc.properties
# flume-nc.conf: 用于监听网络数据的flume agent实例的配置文件 ############################################ # 对各个组件的描述说明 # 其中a1为agent的名字 # r1是a1的source的代号名字 # c1是a1的channel的代号名字 # k1是a1的sink的代号名字 ############################################ a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 用于描述source的,类型是netcat网络 a1.sources.r1.type = netcat # source监听的网络ip地址和端口号 a1.sources.r1.bind = uplooking01 a1.sources.r1.port = 44444 # 用于描述sink,类型是日志格式 a1.sinks.k1.type = logger # 用于描述channel,在内存中做数据的临时的存储 a1.channels.c1.type = memory # 该内存中最大的存储容量,1000个events事件 a1.channels.c1.capacity = 1000 # 能够同时对100个events事件监管事务 a1.channels.c1.transactionCapacity = 100 # 将a1中的各个组件建立关联关系,将source和sink都指向了同一个channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
-
启动agent(flume事件)
flume-ng agent --name a1 --conf /opt/flume/conf --conf-file /opt/flume/conf/flume-nc.properties -Dflume.root.logger=INFO,console
-
启动agent(flume事件) ,简单化
flume-ng agent --name a1 --conf-file /opt/flume/conf/flume-nc.properties
6. 安装nc(瑞士军刀)
rpm -ivh nc-1.84-22.el6.x86_64.rpm
监听端口: nc -lk 4444
连接主机的端口: nc host port
7. 查看文件的命令
cat xxx: 查看文件的全部内容
head -100 xxx:查看文件内容开始的100行(默认不加-100是查看10行)
tail -100 xxx:查看文件内容末尾的100行(默认不加-100是查看10行)
tail -f xxx:查看文件内容末尾10行,并且监听文件的变化
8. Flume基本的组件
8.1 Source
- 从外界采集各种类型的数据,将数据传递给Channel
-
常见采集的数据类型
- NetCat Source
- Spooling Directory Source }
- Exec Source
- Kafka Source (后面说)
- HTTP Source
8.2 Channel
- 一个数据的存储池,中间通道
- 接受source传出的数据,向sink指定的目的地传输。Channel中的数据直到进入到下一个channel中或者进入终端才会被删除。当sink写入失败后,可以自动重写,不会造成数据丢失,因此很可靠
-
channel的类型 :
- Memory Channel (常用) 使用内存作为数据的存储。速度快
- File Channel 使用文件来作为数据的存储。安全可靠
- Spillable Memory Channel 用内存和文件作为数据的存储,即:先存在内存中,如果内存中数据达到阀值则flush到文件中
- JDBC Channel 使用jdbc数据源来作为数据的存储。
- Kafka Channel 使用kafka服务来作为数据的存储
8.3 Sink
- 数据的最终的目的地
-
接受channel写入的数据以指定的形式表现出来(或存储或展示)。
sink的表现形式很多比如:打印到控制台、hdfs上、avro服务中、文件中等
-
常见采集的数据类型
- HDFS Sink
- Hive Sink
- Logger Sink
- Avro Sink
- HBaseSink
- File Roll Sink
8.4 Event
event是Flume NG传输的数据的基本单位,也是事务的基本单位
在文本文件,通常是一行记录就是一个event。
网络消息传输系统中,一条消息就是一个event。
event里有header、body
Event里面的header类型:Map<String, String>
我们可以在source中自定义header的key:value,在某些channel和sink中使用header
9. Flume监听命令的执行结果
配置agent
############################################ # 对各个组件的描述说明 # 其中a1为agent的名字 # r1是a1的source的代号名字 # c1是a1的channel的代号名字 # k1是a1的sink的代号名字 ############################################ a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 用于描述source的,类型是 linux 命令 a1.sources.r1.type = exec a1.sources.r1.command = tail -F /opt/flume/conf/logs/test01.txt # 用于描述sink,类型是日志格式 a1.sinks.k1.type = logger # 用于描述channel,在内存中做数据的临时的存储 a1.channels.c1.type = memory # 该内存中最大的存储容量,1000个events事件 a1.channels.c1.capacity = 1000 # 能够同时对100个events事件监管事务 a1.channels.c1.transactionCapacity = 100 # 将a1中的各个组件建立关联关系,将source和sink都指向了同一个channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
运行agent,开始采集数据
flume-ng agent --name a1 --conf-file flume-exec.properties
10. 采集目录中新增文件的数据
配置agent
############################################ # 对各个组件的描述说明 # 其中a1为agent的名字 # r1是a1的source的代号名字 # c1是a1的channel的代号名字 # k1是a1的sink的代号名字 ############################################ a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 用于描述source的,监听的是一个目录 a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /opt/flume/conf/logs/aa a1.sources.r1.fileSuffix = .OK a1.sources.r1.deletePolicy = never a1.sources.r1.fileHeader = true # 用于描述sink,类型是日志格式 a1.sinks.k1.type = logger # 用于描述channel,在内存中做数据的临时的存储 a1.channels.c1.type = memory # 该内存中最大的存储容量,1000个events事件 a1.channels.c1.capacity = 1000 # 能够同时对100个events事件监管事务 a1.channels.c1.transactionCapacity = 100 # 将a1中的各个组件建立关联关系,将source和sink都指向了同一个channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
运行agent,开始采集数据
flume-ng agent --name a1 --conf-file flume-dir.properties
11. 后台运行flume
nohup flume-ng agent --name a1 --conf-file flume-dir.properties >dev/null 2>&1 &
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。