Flume实战案例

栏目: 后端 · 发布时间: 6年前

内容简介:a1.sources = r1 r2 r3a1.sinks = k1a1.channels = c1

1. 日志的采集和汇总

1.1. 案例场景

A、B两台日志服务机器实时生产日志主要类型为access.log、nginx.log、web.log

现在要求:

把A、B 机器中的access.log、nginx.log、web.log 采集汇总到C机器上然后统一收集到hdfs中。

但是在hdfs中要求的目录为:

/source/logs/access/20160101/

/source/logs/web/20160101/**

1.2. 场景分析

Flume实战案例

1.3. 数据流程处理分析

Flume实战案例

1.4. 功能实现

①在服务器A和服务器B上

创建配置文件 exec_source_avro_sink.conf

Name the components on this agent

a1.sources = r1 r2 r3

a1.sinks = k1

a1.channels = c1

Describe/configure the source

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /root/data/access.log

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = static

static拦截器的功能就是往采集到的数据的header中插入自## 己定义的key-value对

a1.sources.r1.interceptors.i1.key = type

a1.sources.r1.interceptors.i1.value = access

a1.sources.r2.type = exec

a1.sources.r2.command = tail -F /root/data/nginx.log

a1.sources.r2.interceptors = i2

a1.sources.r2.interceptors.i2.type = static

a1.sources.r2.interceptors.i2.key = type

a1.sources.r2.interceptors.i2.value = nginx

a1.sources.r3.type = exec

a1.sources.r3.command = tail -F /root/data/web.log

a1.sources.r3.interceptors = i3

a1.sources.r3.interceptors.i3.type = static

a1.sources.r3.interceptors.i3.key = type

a1.sources.r3.interceptors.i3.value = web

Describe the sink

a1.sinks.k1.type = avro

a1.sinks.k1.hostname = 192.168.200.101

a1.sinks.k1.port = 41414

Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 20000

a1.channels.c1.transactionCapacity = 10000

Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sources.r2.channels = c1

a1.sources.r3.channels = c1

a1.sinks.k1.channel = c1

②在服务器C上创建配置文件 avro_source_hdfs_sink.conf 文件内容为

#定义agent名, source、channel、sink的名称

a1.sources = r1

a1.sinks = k1

a1.channels = c1

#定义source

a1.sources.r1.type = avro

a1.sources.r1.bind = mini2

a1.sources.r1.port =41414

#添加时间拦截器

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type =

org.apache.flume.interceptor.TimestampInterceptor$Builder

#定义channels

a1.channels.c1.type = memory

a1.channels.c1.capacity = 20000

a1.channels.c1.transactionCapacity = 10000

#定义sink

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path=hdfs://192.168.200.101:9000/source/logs/%{type}/%Y%m%d

a1.sinks.k1.hdfs.filePrefix =events

a1.sinks.k1.hdfs.fileType = DataStream

a1.sinks.k1.hdfs.writeFormat = Text

#时间类型

a1.sinks.k1.hdfs.useLocalTimeStamp = true

#生成的文件不按条数生成

a1.sinks.k1.hdfs.rollCount = 0

#生成的文件按时间生成

a1.sinks.k1.hdfs.rollInterval = 30

#生成的文件按大小生成

a1.sinks.k1.hdfs.rollSize = 10485760

#批量写入hdfs的个数

a1.sinks.k1.hdfs.batchSize = 10000

flume操作hdfs的线程数(包括新建,写入等)

a1.sinks.k1.hdfs.threadsPoolSize=10

#操作hdfs超时时间

a1.sinks.k1.hdfs.callTimeout=30000

#组装source、channel、sink

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

③配置完成之后,在服务器A和B上的/root/data有数据文件access.log、nginx.log、web.log。先启动服务器C上的flume,启动命令

在flume安装目录下执行 :

bin/flume-ng agent -c conf -f conf/avro_source_hdfs_sink.conf -name a1 -Dflume.root.logger=DEBUG,console

然后在启动服务器上的A和B,启动命令

在flume安装目录下执行 :

bin/flume-ng agent -c conf -f conf/exec_source_avro_sink.conf -name a1 -Dflume.root.logger=DEBUG,console

2. Flume自定义拦截器(了解)

2.1. 案例背景介绍

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。Flume有各种自带的拦截器,比如:TimestampInterceptor、HostInterceptor、RegexExtractorInterceptor等,通过使用不同的拦截器,实现不同的功能。但是以上的这些拦截器,不能改变原有日志数据的内容或者对日志信息添加一定的处理逻辑,当一条日志信息有几十个甚至上百个字段的时候,在传统的Flume处理下,收集到的日志还是会有对应这么多的字段,也不能对你想要的字段进行对应的处理。

2.2. 自定义拦截器

根据实际业务的需求,为了更好的满足数据在应用层的处理,通过自定义Flume拦截器,过滤掉不需要的字段,并对指定字段加密处理,将源数据进行预处理。减少了数据的传输量,降低了存储的开销。

2.3. 功能实现

本技术方案核心包括二部分:

l 编写 java 代码,自定义拦截器

内容包括:

  1. 定义一个类CustomParameterInterceptor实现Interceptor接口。
  2. 在CustomParameterInterceptor类中定义变量,这些变量是需要到 Flume的配置文件中进行配置使用的。每一行字段间的分隔符(fields_separator)、通过分隔符分隔后,所需要列字段的下标(indexs)、多个下标使用的分隔符(indexs_separator)、多个下标使用的分隔符(indexs_separator)。
  3. 添加CustomParameterInterceptor的有参构造方法。并对相应的变量进行处理。将配置文件中传过来的unicode编码进行转换为字符串。
  4. 写具体的要处理的逻辑intercept()方法,一个是单个处理的,一个是批量处理。
  5. 接口中定义了一个内部接口Builder,在configure方法中,进行一些参数配置。并给出,在flume的conf中没配置一些参数时,给出其默认值。通过其builder方法,返回一个CustomParameterInterceptor对象。
  6. 定义一个静态类,类中封装MD5加密方法
    Flume实战案例
  7. 通过以上步骤,自定义拦截器的代码开发已完成,然后打包成jar, 放到Flume的根目录下的lib中
    l 修改Flume的配置信息
    新增配置文件spool-interceptor-hdfs.conf,内容为:
    a1.channels = c1
    a1.sources = r1
    a1.sinks = s1
    #channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity=100000
    a1.channels.c1.transactionCapacity=50000
    #source
    a1.sources.r1.channels = c1
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /root/data/
    a1.sources.r1.batchSize= 50
    a1.sources.r1.inputCharset = UTF-8
    a1.sources.r1.interceptors =i1 i2
    a1.sources.r1.interceptors.i1.type =cn.itcast.interceptor.CustomParameterInterceptor$Builder
    a1.sources.r1.interceptors.i1.fields_separator=\u0009
    a1.sources.r1.interceptors.i1.indexs =0,1,3,5,6
    a1.sources.r1.interceptors.i1.indexs_separator =\u002c
    a1.sources.r1.interceptors.i1.encrypted_field_index =0
    a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
    #sink
    a1.sinks.s1.channel = c1
    a1.sinks.s1.type = hdfs
    a1.sinks.s1.hdfs.path =hdfs://192.168.200.101:9000/flume/%Y%m%d
    a1.sinks.s1.hdfs.filePrefix = event
    a1.sinks.s1.hdfs.fileSuffix = .log
    a1.sinks.s1.hdfs.rollSize = 10485760
    a1.sinks.s1.hdfs.rollInterval =20
    a1.sinks.s1.hdfs.rollCount = 0
    a1.sinks.s1.hdfs.batchSize = 1500
    a1.sinks.s1.hdfs.round = true
    a1.sinks.s1.hdfs.roundUnit = minute
    a1.sinks.s1.hdfs.threadsPoolSize = 25
    a1.sinks.s1.hdfs.useLocalTimeStamp = true
    a1.sinks.s1.hdfs.minBlockReplicas = 1
    a1.sinks.s1.hdfs.fileType =DataStream
    a1.sinks.s1.hdfs.writeFormat = Text
    a1.sinks.s1.hdfs.callTimeout = 60000
    a1.sinks.s1.hdfs.idleTimeout =60
    启动:
    bin/flume-ng agent -c conf -f conf/spool-interceptor-hdfs.conf -name a1 -Dflume.root.logger=DEBUG,console

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Java常用算法手册

Java常用算法手册

2012-5 / 59.00元

《Java常用算法手册》分三篇,共13章,分别介绍了算法基础、算法应用和算法面试题。首先介绍了算法概述,然后重点分析了数据结构和基本算法思想;接着,详细讲解了算法在排序、查找、数学计算、数论、历史趣题、游戏、密码学等领域中的应用;最后,列举了算法的一些常见面试题。书中知识点覆盖全面,结构安排紧凑,讲解详细,实例丰富。全书对每一个知识点都给出了相应的算法及应用实例,虽然这些例子都是以Java语言来编......一起来看看 《Java常用算法手册》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试