内容简介:文章作者:原文链接:Flume日志采集框架 安装和部署 Flume运行机制 采集静态文件到hdfs 采集动态日志文件到hdfs 两个agent级联
文章作者: foochane
原文链接: https://foochane.cn/article/2019062701.html
Flume日志采集框架 安装和部署 Flume运行机制 采集静态文件到hdfs 采集动态日志文件到hdfs 两个agent级联
Flume日志采集框架
在一个完整的离线大数据处理系统中,除了hdfs+mapreduce+hive组成分析系统的核心之外,还需要数据采集、结果数据导出、任务调度等不可或缺的辅助系统,而这些辅助 工具 在hadoop生态体系中都有便捷的开源框架,如图所示:
1 Flume介绍
Flume
是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。 Flume
可以采集文件, socket
数据包、文件、文件夹、 kafka
等各种形式源数据,又可以将采集到的数据(下沉 sink
)输出到 HDFS
、 hbase
、 hive
、 kafka
等众多外部存储系统中。
对于一般的采集需求,通过对flume的简单配置即可实现。
Flume
针对特殊场景也具备良好的自定义扩展能力,因此, flume
可以适用于大部分的日常数据采集场景。
2 Flume运行机制
Flume
分布式系统中最核心的角色是 agent
, flume
采集系统就是由一个个 agent
所连接起来形成,每一个 agent
相当于一个数据传递员,内部有三个组件:
-
Source
:采集组件,用于跟数据源对接,以获取数据 -
Sink
:下沉组件,用于往下一级agent
传递数据或者往最终存储系统传递数据 -
Channel
:传输通道组件,用于从source
将数据传递到sink
单个agent采集数据:
多级agent之间串联:
3 Flume的安装部署
1 下载安装包 apache-flume-1.9.0-bin.tar.gz
解压
2 在 conf
文件夹下的 flume-env.sh
添加 JAVA_HOME
export JAVA_HOME=/usr/local/bigdata/java/jdk1.8.0_211
3 根据采集的需求,添加采集方案配置文件,文件名可以任意取
具体可以看后面的示例
4 启动 flume
测试环境下:
$ bin/flume/-ng agent -c conf/ -f ./dir-hdfs.conf -n agent1 -Dflume.root.logger=INFO,console
命令说明:
-
-c
:指定flume自带的配置文件目录,不用自己修改 -
-f
:指定自己的配置文件,这里问当前文件夹下的dir-hdfs.conf
-
-n
:指定自己配置文件中使用那个agent
,对应的配置文件中定义的名字。 -
-Dflume.root.logger
:把日志打印在控制台,类型为INFO
,这个只用于测试,后面将打印到日志文件中
生产中,启动flume,应该把flume启动在后台:
nohup bin/flume-ng agent -c ./conf -f ./dir-hdfs.conf -n agent1 1>/dev/null 2>&1 &
4 采集静态文件到hdfs
4.1 采集需求
某服务器的某特定目录下,会不断产生新的文件,每当有新文件出现,就需要把文件采集到HDFS中去
4.2 添加配置文件
在安装目录下添加文件 dir-hdfs.conf
,然后添加配置信息。
先获取 agent
,命名为 agent1
,后面的配置都跟在 agent1
后面,也可以改为其他值,如 agt1
,同一个配置文件中可以有多个配置配置方案,启动 agent
的时候获取对应的名字就可以。
根据需求,首先定义以下3大要素
数据源组件
即 source
——监控文件目录 : spooldir
spooldir
有如下特性:
COMPLETED
下沉组件
即 sink——HDFS
文件系统 : hdfs sink
通道组件
即 channel
——可用 file channel
也可以用内存 channel
#定义三大组件的名称 agent1.sources = source1 agent1.sinks = sink1 agent1.channels = channel1 # 配置source组件 agent1.sources.source1.type = spooldir agent1.sources.source1.spoolDir = /root/log/ agent1.sources.source1.fileSuffix=.FINISHED #文件每行的长度,注意这里如果事情文件每行超过这个长度会自动切断,会导致数据丢失 agent1.sources.source1.deserializer.maxLineLength=5120 # 配置sink组件 agent1.sinks.sink1.type = hdfs agent1.sinks.sink1.hdfs.path =hdfs://Master:9000/access_log/%y-%m-%d/%H-%M agent1.sinks.sink1.hdfs.filePrefix = app_log agent1.sinks.sink1.hdfs.fileSuffix = .log agent1.sinks.sink1.hdfs.batchSize= 100 agent1.sinks.sink1.hdfs.fileType = DataStream agent1.sinks.sink1.hdfs.writeFormat =Text # roll:滚动切换:控制写文件的切换规则 ## 按文件体积(字节)来切 agent1.sinks.sink1.hdfs.rollSize = 512000 ## 按event条数切 agent1.sinks.sink1.hdfs.rollCount = 1000000 ## 按时间间隔切换文件 agent1.sinks.sink1.hdfs.rollInterval = 60 ## 控制生成目录的规则 agent1.sinks.sink1.hdfs.round = true agent1.sinks.sink1.hdfs.roundValue = 10 agent1.sinks.sink1.hdfs.roundUnit = minute agent1.sinks.sink1.hdfs.useLocalTimeStamp = true # channel组件配置 agent1.channels.channel1.type = memory ## event条数 agent1.channels.channel1.capacity = 500000 ##flume事务控制所需要的缓存容量600条event agent1.channels.channel1.transactionCapacity = 600 # 绑定source、channel和sink之间的连接 agent1.sources.source1.channels = channel1 agent1.sinks.sink1.channel = channel1
Channel
参数解释:
-
capacity
:默认该通道中最大的可以存储的event
数量 -
trasactionCapacity
:每次最大可以从source
中拿到或者送到sink
中的event
数量 -
keep-alive
:event
添加到通道中或者移出的允许时间
4.3启动flume
$ bin/flume/-ng agent -c conf/ -f dir-hdfs.conf -n agent1 -Dflume.root.logger=INFO,console
5 采集动态日志文件到hdfs
5.1 采集需求
比如业务系统使用log4j生成的日志,日志内容不断增加,需要把追加到日志文件中的数据实时采集到hdfs
5.2 配置文件
配置文件名称: tail-hdfs.conf
根据需求,首先定义以下3大要素:
- 采集源,即
source
——监控文件内容更新 :exec
tail -F file
- 下沉目标,即
sink——HDFS
文件系统 : hdfs sink -
Source
和sink
之间的传递通道——channel
,可用file channel
也可以用 内存channel
配置文件内容:
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = exec a1.sources.r1.command = tail -F /root/app_weichat_login.log # Describe the sink agent1.sinks.sink1.type = hdfs agent1.sinks.sink1.hdfs.path =hdfs://Master:9000/app_weichat_login_log/%y-%m-%d/%H-%M agent1.sinks.sink1.hdfs.filePrefix = weichat_log agent1.sinks.sink1.hdfs.fileSuffix = .dat agent1.sinks.sink1.hdfs.batchSize= 100 agent1.sinks.sink1.hdfs.fileType = DataStream agent1.sinks.sink1.hdfs.writeFormat =Text agent1.sinks.sink1.hdfs.rollSize = 100 agent1.sinks.sink1.hdfs.rollCount = 1000000 agent1.sinks.sink1.hdfs.rollInterval = 60 agent1.sinks.sink1.hdfs.round = true agent1.sinks.sink1.hdfs.roundValue = 1 agent1.sinks.sink1.hdfs.roundUnit = minute agent1.sinks.sink1.hdfs.useLocalTimeStamp = true # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
5.3 启动flume
启动命令:
bin/flume-ng agent -c conf -f conf/tail-hdfs.conf -n a1
6 两个agent级联
从tail命令获取数据发送到avro端口
另一个节点可配置一个avro源来中继数据,发送外部存储
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = exec a1.sources.r1.command = tail -F /root/log/access.log # Describe the sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = hdp-05 a1.sinks.k1.port = 4141 a1.sinks.k1.batch-size = 2 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
从avro端口接收数据,下沉到 hdfs
采集配置文件, avro-hdfs.conf
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source ##source中的avro组件是一个接收者服务 a1.sources.r1.type = avro a1.sources.r1.bind = hdp-05 a1.sources.r1.port = 4141 # Describe the sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /flume/taildata/%y-%m-%d/ a1.sinks.k1.hdfs.filePrefix = tail- a1.sinks.k1.hdfs.round = true a1.sinks.k1.hdfs.roundValue = 24 a1.sinks.k1.hdfs.roundUnit = hour a1.sinks.k1.hdfs.rollInterval = 0 a1.sinks.k1.hdfs.rollSize = 0 a1.sinks.k1.hdfs.rollCount = 50 a1.sinks.k1.hdfs.batchSize = 10 a1.sinks.k1.hdfs.useLocalTimeStamp = true #生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本 a1.sinks.k1.hdfs.fileType = DataStream # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- Golang搭建并行版爬虫信息采集框架
- 老成 FMS 框架 5.3 发布:一键像素级仿站,搜索引擎式采集
- 一篇文章教会你理解Scrapy网络爬虫框架的工作原理和数据采集过程
- 图片采集的方向问题
- 日志采集落地方案
- Kubernetes 下日志采集、存储
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
大数据时代的IT架构设计
IT架构设计研究组 / 电子工业出版社 / 2014-4 / 49.00元
《大数据时代的IT架构设计》以大数据时代为背景,邀请著名企业中的一线架构师,结合工作中的实际案例展开与架构相关的讨论。《大数据时代的IT架构设计》作者来自互联网、教育、传统行业等领域,分享的案例极其实用,代表了该领域较先进的架构。无论你就职于哪一行业都可以从本书中找到相关的架构经验,对您在今后的架构设计工作中都能起到很好的帮助作用。 《大数据时代的IT架构设计》适合具备一定架构基础和架构经验......一起来看看 《大数据时代的IT架构设计》 这本书的介绍吧!