EFK接入kafka消息队列

栏目: 后端 · 发布时间: 5年前

内容简介:在笔者最开始维护的日志服务中,日质量较小,没有接入kafka。随着业务规模扩增,日质量不断增长,接入到日志服务的产品线不断增多,遇到流量高峰,写入到es的性能就会降低,cpu打满,随时都有集群宕机的风险。因此,接入消息队列,进行削峰填谷就迫在眉睫。本文主要介绍在EFK的基础上如何接入kafka,并做到向前兼容。主要参考文章:【由于是要线上搭建集群,为避免单点故障,就需要部署至少3个节点(取决于多数选举机制)。

1 前言

在笔者最开始维护的日志服务中,日质量较小,没有接入kafka。随着业务规模扩增,日质量不断增长,接入到日志服务的产品线不断增多,遇到流量高峰,写入到es的性能就会降低,cpu打满,随时都有集群宕机的风险。因此,接入消息队列,进行削峰填谷就迫在眉睫。本文主要介绍在EFK的基础上如何接入kafka,并做到向前兼容。

2 主要内容

  1. 如何搭建kafka集群
  2. 原有EFK升级

3 搭建kafka集群

3.1 搭建zookeeper集群

主要参考文章:【 zookeeper安装指南

由于是要线上搭建集群,为避免单点故障,就需要部署至少3个节点(取决于多数选举机制)。

3.1.1 下载

进入要下载的版本的目录, 选择.tar.gz文件下载

3.1.2 安装

使用tar解压要安装的目录即可,以3.4.5版本为例

这里以解压到/home/work/common,实际安装根据自己的想安装的目录修改(注意如果修改,那后边的命令和配置文件中的路径都要相应修改)

tar -zxf zookeeper-3.4.5.tar.gz -C /home/work/common

3.1.3 配置

在主目录下创建data和logs两个目录用于存储数据和日志:

cd /home/work/zookeeper-3.4.5
mkdir data mkdir logs

在conf目录下新建zoo.cfg文件,写入如下配置:

tickTime=2000 
dataDir=/home/work/common/zookeeper1/data dataLogDir=/home/work/common/zookeeper1/logs 
clientPort=2181 
initLimit=5 
syncLimit=2 
server.1=192.168.220.128:2888:3888 
server.2=192.168.222.128:2888:3888 
server.3=192.168.223.128:2888:3888

在zookeeper1的data/myid配置如下:

echo '1' > data/myid

zookeeper2的data/myid配置如下:

echo '2' > data/myid

zookeeper2的data/myid配置如下:

echo '3' > data/myid

3.1.4 启停

进入bin目录,启动、停止、重启分和查看当前节点状态(包括集群中是何角色)别执行:

./zkServer.sh start 
./zkServer.sh stop 
./zkServer.sh restart
./zkServer.sh status

zookeeper集群搭建完成之后,根据实际情况开始部署kafka。以部署2个broker为例。

3.2 搭建kafka broker集群

3.2.1 安装

下载并解压包:

curl -L -O http://mirrors.cnnic.cn/apache/kafka/0.9.0.0/kafka_2.10-0.9.0.0.tgz
tar zxvf kafka_2.10-0.9.0.0.tgz

3.2.2 配置

进入kafka安装工程根目录编辑config/server.properties

#不同的broker对应的id不能重复
broker.id=1
delete.topic.enable=true
inter.broker.protocol.version=0.10.0.1
log.message.format.version=0.10.0.1
listeners=PLAINTEXT://:9092,SSL://:9093
auto.create.topics.enable=false
ssl.key.password=test
ssl.keystore.location=/home/work/certificate/server-keystore.jks
ssl.keystore.password=test
ssl.truststore.location=/home/work/certificate/server-truststore.jks
ssl.truststore.password=test
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/home/work/data/kafka/log
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=72
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.220.128:2181,192.168.222.128:2181,192.168.223.128:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

3.2.3 启动kafka

进入kafka的主目录

nohup sh bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 &

3.2.4 连通性测试

首先创建一个topic:topic_1

sh bin/kafka-topics.sh --create --topic topic_1 --partitions 2 --replication-factor 2  --zookeeper 192.168.220.128:2181

可以先检查一下是否创建成功:

sh bin/kafka-topics.sh --list --zookeeper  192.168.220.128:2181

起两个终端,一个作为producer,一个作为consumer

生产消息:

bin/kafka-console-producer.sh --topic topic_1 --broker-list 192.168.220.128:9092,192.168.223.128:9092

消费消息:

sh bin/kafka-console-consumer.sh --bootstrap-server 192.168.220.128:9092,192.168.223.128:9092 --topic topic_1

好了,上面的调通了,万里长征第一步就走完了。

4 EFK接入kafka向前兼容

4.1 准备证书

在之前的EFK中是通过证书进行安全加固的,所以要先为接入kafka准备一下相关的证书。要确保给kafka生成的证书和给efk生成的证书是同一个根证书。关于证书的生成,笔者会写文章专门介绍。主要包括:

  • 服务端证书
  • client证书

那么作为kafka的输入(filebeat)和输出(logstash),都需要kafka的client证书,kafka的broker需要的是服务端证书。

需要注意的是,filebeat配置的是pem证书,kafka和logstash的kafka-input插件用的是jks证书~~~因此,证书生成 工具 最好需要能够同时生成这两种证书。

4.2 filebeat升级

4.2.1 input日志收集文件

在fields中添加log_topic字段,指定写入的topic

fields:
    module: sonofelice
    type: debug
    log_topic: topic_1
    language: java

4.2.2 filebeat.yml文件

output.kafka:
  hosts: ["192.168.220.128:9093","192.168.223.128:9093"]
  topic: '%{[fields.log_topic]}'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
  ssl.certificate_authorities: ["/home/work/filebeat/keys/root-ca.pem"]
  ssl.certificate: "/home/work/filebeat/keys/kafka.crt.pem"
  ssl.key: "/home/work/filebeat/keys/kafka.key.pem"

4.3 logstash升级

input {
    kafka {
        bootstrap_servers => "10.100.27.199:9093,10.64.56.75:9093"
        group_id => "consumer-group-01"
        topics => ["topic_1"]
        consumer_threads => 5
        decorate_events => false
        auto_offset_reset => "earliest"
        security_protocol => "SSL"
        ssl_keystore_password => "test"
        ssl_keystore_location => "/home/work/certificate/kafka-keystore.jks"
        ssl_keystore_password => "test"
        ssl_truststore_password => "test"
        ssl_truststore_location => "/home/work/cvca/certificate/truststore.jks"
        codec => json {
            charset => "UTF-8"
        }
    }
}

那为了向前兼容之前的filebeat日志收集,我们在input中同时保留beats配置,最终配置如下:

input {
    kafka {
        bootstrap_servers => "10.100.27.199:9093,10.64.56.75:9093"
        group_id => "consumer-group-01"
        topics => ["topic_1"]
        consumer_threads => 5
        decorate_events => false
        auto_offset_reset => "earliest"
        security_protocol => "SSL"
        ssl_keystore_password => "test"
        ssl_keystore_location => "/home/work/certificate/kafka-keystore.jks"
        ssl_keystore_password => "test"
        ssl_truststore_password => "test"
        ssl_truststore_location => "/home/work/cvca/certificate/truststore.jks"
        codec => json {
            charset => "UTF-8"
        }
    }
    beats {
        port => 5044
        client_inactivity_timeout => 600
        ssl => true
        ssl_certificate_authorities => ["/home/work/certificate/chain-ca.pem"]
        ssl_certificate => "/home/work/certificate/server.crt.pem"
        ssl_key => "/home/work/certificate/server.key.pem"
        ssl_verify_mode => "force_peer"
    }
}

需要特别注意的是,对于kafka的input来说,codec并不是默认为json的,导致之前用beats能成功解析到es的字段都无法解析成功,所以务必加上codec的配置。

至此,改造升级的点应该没有太大的坑了,也能够向前兼容,接入端自行切换即可。


以上所述就是小编给大家介绍的《EFK接入kafka消息队列》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

有趣的二进制

有趣的二进制

[ 日] 爱甲健二 / 周自恒 / 人民邮电出版社 / 2015-10 / 39.00元

《有趣的二进制:软件安全与逆向分析》通过逆向工程,揭开人们熟知的软件背后的机器语言的秘密,并教给读者读懂这些二进制代码的方法。理解了这些方法,技术人员就能有效地Debug,防止软件受到恶意攻击和反编译。本书涵盖的技术包括:汇编与反汇编、调试与反调试、缓冲区溢出攻击与底层安全、钩子与注入、Metasploit 等安全工具。 《有趣的二进制:软件安全与逆向分析》适合对计算机原理、底层或计算机安全......一起来看看 《有趣的二进制》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器