Kafka 最新版kafka_2.12-2.2.1配置

栏目: 后端 · 发布时间: 5年前

内容简介:当前基于kafaka最新版 kafka_2.12-2.2.1.tgz 进行配置 。官网地址:kafka的一些基础知识 参考:

当前基于kafaka最新版 kafka_2.12-2.2.1.tgz 进行配置 。

官网地址: http://kafka.apache.org/intro

kafka的一些基础知识 参考: http://www.hechunbo.com/index.php/archives/140.html

最新版 kafka_2.12-2.2.1.tgz 进行配置 。单机生产者消费者图解配配置,多机模拟配置。以及文件读写配置,经验掌握,集成zookeeper不用再安装

  1. 配置 java 环境安装jdk

参考 http://www.hechunbo.com/index.php/archives/132.html

  1. 解压kafaka

    [root@localhost hcb]# tar -zxvf kafka_2.12-2.2.1.tgz -C /usr/local
  1. 启动zookeeper .因为最新版 已经包含有zookeeper 所以不用另外安装了

    [root@localhost kafka_2.12-2.2.1]# bin/zookeeper-server-start.sh config/zookeeper.properties 
    [2019-06-22 17:47:49,667] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
  2. 重新开一个连接 。输入jps 发现多了一个进程

    [root@localhost ~]# jps
    3136 Jps
    2842 QuorumPeerMain
  3. 启动kafka

    [root@localhost kafka_2.12-2.2.1]# ./bin/kafka-server-start.sh config/server.properties 
    [2019-06-22 17:51:18,786] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
    [2019-06-22 17:51:20,624] INFO starting (kafka.server.KafkaServer)
  4. 再开一个连接 输入jps查看当前运行的进程
    发现多了一个kafka
[root@localhost ~]# jps
3504 Jps
2842 QuorumPeerMain
3147 Kafka
  1. 创建一个topic

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
    [root@localhost kafka_2.12-2.2.1]#
  2. 查看topic消息

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --list --bootstrap-server localhost:9092
    test
  3. 发送消息 到test

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    hi,>welcome to to kafka    
    >hi ,how are you
  4. 消费者取消息

    [root@localhost kafka_2.12-2.2.1]#  bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    hi,welcome to to kafka
    hi ,how are you
生产者发送消息以后,消费者有通知 ,

![1561197972428](https://raw.githubusercontent.com/hsapphire/picbed/master/1561197972428.png)
  1. 进行多台机子测试

    因为我们是单台机子,所以把配置文件复制两份,更改端口和id配置进行第二台,第三台的模拟

    1. [root@localhost ~]# cd /usr/local/kafka_2.12-2.2.1/
      [root@localhost kafka_2.12-2.2.1]# cp config/server.properties config/server-1.properties
      [root@localhost kafka_2.12-2.2.1]# cp config/server.properties config/server-2.properties
修改第二台机子的配置
vi config/server-1.properties
log.dirs=/tmp/kafka-logs-1
listeners=PLAINTEXT://:9093
broker.id=1
![1561198817482](https://raw.githubusercontent.com/hsapphire/picbed/master/1561198817482.png)

修改第三台机子
vi config/server-2.properties
log.dirs=/tmp/kafka-logs-2
listeners=PLAINTEXT://:9094
broker.id=2
![1561198876385](https://raw.githubusercontent.com/hsapphire/picbed/master/1561198876385.png)
  1. 启动新模拟的两台服务器

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-server-start.sh config/server-1.properties 
    [2019-06-22 18:23:56,237] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
新开连接 继续启动第三台,顺便查看下当前的进程 。发现有两个kafka存在了
[root@localhost ~]# jps
4370 ConsoleProducer
2842 QuorumPeerMain
5642 Jps
3147 Kafka
4955 ConsoleConsumer
5278 Kafka
[root@localhost ~]# cd /usr/local/kafka_2.12-2.2.1/
^C[root@localhost kafka_2.12-2.2.1]# bin/kafka-server-start.sh config/server-2.properties 
[2019-06-22 18:27:31,947] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
新开一个连接 ,查看下当前进程 ,三个kafka正常启动了
[root@localhost ~]# jps
4370 ConsoleProducer
6307 Jps
2842 QuorumPeerMain
3147 Kafka
4955 ConsoleConsumer
5948 Kafka
5278 Kafka
  1. 创建一个带有备份的topic

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replication-topic
  2. 查看哪个borke【kafka服务器】在工作

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replication-topic
    Topic:my-replication-topic    PartitionCount:1    ReplicationFactor:3    Configs:segment.bytes=1073741824
        Topic: my-replication-topic    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
leader:哪个broker在读写

replicas:当前可以正常工作的kafka集群。当leader挂掉时会自动替补

isr:同步消息的列表集合
  1. 查看我们之前创建的topic消息

    当时我们只有一个kafka服务器。可以看只leader是0,替被和备份的都是0,

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
    Topic:test    PartitionCount:1    ReplicationFactor:1    Configs:segment.bytes=1073741824
        Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
  2. 在新的topic中发布新的消息

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replication-topic
    >message one
    >message two
  3. 消费者去获取消息

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-console-consumer.sh  --bootstrap-server  localhost:9092 --from-beginning --topic my-replication-topic
    message one
    message two
  4. 检查当前的leader

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replication-topic
    Topic:my-replication-topic    PartitionCount:1    ReplicationFactor:3    Configs:segment.bytes=1073741824
        Topic: my-replication-topic    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
  5. 模拟leader1挂掉以后的状态

    把leader1关掉

    检查leader1的进程

    ps aux 显示用户当前的所有进程 。并根据grep后面的内容进行搜索

    用kill杀死相关进程

    [root@localhost kafka_2.12-2.2.1]# ps aux | grep server-1.properties
    root       5278  3.5 20.5 3232460 205560 pts/5  Sl+  18:23   1:06 /usr/local/jdk1.8.0_211/bin/java -Xmx1G
    [root@localhost kafka_2.12-2.2.1]# kill -9 5278
  6. 再次检查当前topic的消息

    发现leader已经从1变成了2.

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replication-topic
    Topic:my-replication-topic    PartitionCount:1    ReplicationFactor:3    Configs:segment.bytes=1073741824
        Topic: my-replication-topic    Partition: 0    Leader: 2    Replicas: 1,2,0    Isr: 2,0
  7. 使用kafka connect 导入导出数据

    souce connector 从text.txt读取文件 ,把内容发送到connect-test., sink connector 从conect-test读写消息

    [root@localhost kafka_2.12-2.2.1]# bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties  config/connect-file-sink.properties 
    [2019-06-22 19:05:55,493] INFO Kafka Connect standalone worker initializing ... (org.apache.kafka.connect.cli.ConnectStandalone:67)
进行jps分发现多了一个ConnectStandalone的进程
[root@localhost ~]# jps
4370 ConsoleProducer
9478 Jps
9160 ConnectStandalone
2842 QuorumPeerMain
3147 Kafka
4955 ConsoleConsumer
5948 Kafka
显示文件内容 

 more 命令类似 cat ,不过会以一页一页的形式显示,更方便使用者逐页阅读,
[root@localhost kafka_2.12-2.2.1]# more test.sink.txt 
foo
bar
使用消费者控制 台显示
[root@localhost kafka_2.12-2.2.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
继续测试 

生产者进行消息追加
[root@localhost kafka_2.12-2.2.1]# echo -e "foo\nbarddddaaa\aaaaa\dddd\1\2\2\3" > test.txt
[root@localhost kafka_2.12-2.2.1]# echo -e "foo\nbarddddaaa\aaaaa\dddd\1\2\2\3\new append" > test.txt
消费者进行实时显示
[root@localhost kafka_2.12-2.2.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
{"schema":{"type":"string","optional":false},"payload":"dddd"}
{"schema":{"type":"string","optional":false},"payload":"aaaaaaad"}
{"schema":{"type":"string","optional":false},"payload":"dd"}
^[[A^[[A^[[B{"schema":{"type":"string","optional":false},"payload":"1\\2\\2\\3"}
{"schema":{"type":"string","optional":false},"payload":"ew append"}

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

iOS编程实战

iOS编程实战

[美] Rob Napier、[美] Mugunth Kumar / 美团移动 / 人民邮电出版社 / 2014-9 / 79.00元

本书深入介绍iOS 7新特性和新功能,涵盖iOS 7大部分新增特性,包括新的后台操作、Core Bluetooth、UIKit动力学以及TextKit。另外还介绍了如何处理新的扁平化UI,并新增了一章你可能不知道的“小技巧”。如果读者熟练掌握C和C++,读完本书即可创建性能优异的iPhone、iPad和iPod touch应用。 本书主要内容包括:  iOS 7新特性和新功能概览; ......一起来看看 《iOS编程实战》 这本书的介绍吧!

随机密码生成器
随机密码生成器

多种字符组合密码

MD5 加密
MD5 加密

MD5 加密工具

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具