5 分钟带你体验一把 Kafka

栏目: IT技术 · 发布时间: 4年前

内容简介:毕竟是要搭建环境和简单实用,所以文中有大量的代码和配置文件。前置条件:你的电脑已经安装 Docker。主要内容:

前言

毕竟是要搭建环境和简单实用,所以文中有大量的代码和配置文件。

前置条件:你的电脑已经安装 Docker。

主要内容:

  1. 使用 Docker 安装
  2. 使用命令行测试消息的生产和消费消息队列功能使用
  3. ZooKeeper 和 Kafka 可视化管理工具
  4. Java 程序中简单使用 Kafka

使用 Docker 安装搭建 Kafka 环境

单机版

下面使用的单机版的 Kafka 来作为演示,推荐先搭建单机版的 Kafka 来学习。

以下使用 Docker 搭建Kafka基本环境来自开源项目:

https://github.com/simplesteph ... mpose

当然,你也可以按照官方提供的来:

https://github.com/wurstmeiste ... e.yml

新建一个名为 zk-single-kafka-single.yml 的文件,文件内容如下:

version:'2.1'



services:

zoo1:

image:zookeeper:3.4.9

hostname:zoo1

ports:

  -"2181:2181"

environment:

  ZOO_MY_ID:1

  ZOO_PORT:2181

  ZOO_SERVERS:server.1=zoo1:2888:3888

volumes:

  -./zk-single-kafka-single/zoo1/data:/data

  -./zk-single-kafka-single/zoo1/datalog:/datalog



kafka1:

image:confluentinc/cp-kafka:5.3.1

hostname:kafka1

ports:

  -"9092:9092"

environment:

  KAFKA_ADVERTISED_LISTENERS:LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092

  KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT

  KAFKA_INTER_BROKER_LISTENER_NAME:LISTENER_DOCKER_INTERNAL

  KAFKA_ZOOKEEPER_CONNECT:"zoo1:2181"

  KAFKA_BROKER_ID:1

  KAFKA_LOG4J_LOGGERS:"kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"

  KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR:1



volumes:

  -./zk-single-kafka-single/kafka1/data:/var/lib/kafka/data

depends_on:

  -zoo1

运行以下命令即可完成环境搭建(会自动下载并运行一个 ZooKeeper 和 Kafka)。

docker-compose -f zk-single-kafka-single.yml up

如果需要停止 Kafka 相关容器的话,运行以下命令即可:

docker-compose -f zk-single-kafka-single.yml down

集群版

以下使用 Docker 搭建 Kafka 基本环境来自开源项目:

https://github.com/simplesteph ... mpose

新建一个名为 zk-single-kafka-multiple.yml 的文件,文件内容如下:

version: '2.1'



services:

zoo1:

image: zookeeper:3.4.9

hostname: zoo1

ports:

  - "2181:2181"

environment:

    ZOO_MY_ID: 1

    ZOO_PORT: 2181

    ZOO_SERVERS: server.1=zoo1:2888:3888

volumes:

  - ./zk-single-kafka-multiple/zoo1/data:/data

  - ./zk-single-kafka-multiple/zoo1/datalog:/datalog



kafka1:

image: confluentinc/cp-kafka:5.4.0

hostname: kafka1

ports:

  - "9092:9092"

environment:

  KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092

  KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT

  KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL

  KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"

  KAFKA_BROKER_ID: 1

  KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"

volumes:

  - ./zk-single-kafka-multiple/kafka1/data:/var/lib/kafka/data

depends_on:

  - zoo1



kafka2:

image: confluentinc/cp-kafka:5.4.0

hostname: kafka2

ports:

  - "9093:9093"

environment:

  KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093

  KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT

  KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL

  KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"

  KAFKA_BROKER_ID: 2

  KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"

volumes:

  - ./zk-single-kafka-multiple/kafka2/data:/var/lib/kafka/data

depends_on:

  - zoo1





kafka3:

image: confluentinc/cp-kafka:5.4.0

hostname: kafka3

ports:

  - "9094:9094"

environment:

  KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19094,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094

  KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT

  KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL

  KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"

  KAFKA_BROKER_ID: 3

  KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"

volumes:

  - ./zk-single-kafka-multiple/kafka3/data:/var/lib/kafka/data

depends_on:

  - zoo1

运行以下命令即可完成 1 个节点 ZooKeeper + 3 个节点的 Kafka 的环境搭建。

docker-compose -f zk-single-kafka-multiple.yml up

如果需要停止 Kafka 相关容器的话,运行以下命令即可:

docker-compose -f zk-single-kafka-multiple.yml down

使用命令行测试消息的生产和消费

一般情况下我们很少会用到 Kafka 的命令行操作。

1、进入 Kafka container 内部执行 Kafka 官方自带了一些命令:

docker exec -ti docker_kafka1_1 bash

2、列出所有 Topic:

root@kafka1:/# kafka-topics --describe --zookeeper zoo1:2181

3、创建一个 Topic:

root@kafka1:/# kafka-topics --create --topic test --partitions 3 --zookeeper zoo1:2181 --replication-factor 1

Created topic test.

我们创建了一个名为 test 的 Topic,Partition 数为 3,Replica 数为 1。

4、消费者订阅主题:

root@kafka1:/# kafka-console-consumer --bootstrap-server localhost:9092 --topic test

send hello from console -producer

我们订阅了 名为 test 的 Topic。

5、生产者向 Topic 发送消息:

root@kafka1:/# kafka-console-producer --broker-list localhost:9092 --topic test

>send hello from console -producer

>

我们使用 kafka-console-producer 命令向名为 test 的 Topic 发送了一条消息,消息内容为:“send hello from console -producer”。

这个时候,你会发现消费者成功接收到了消息:

root@kafka1:/# kafka-console-consumer --bootstrap-server localhost:9092 --topic test

send hello from console -producer

IDEA 相关插件推荐

Zoolytic-Zookeeper tool

这是一款 IDEA 提供的 ZooKeeper 可视化 工具 插件,非常好用!我们可以通过它:

  1. 可视化 ZkNodes 节点信息
  2. ZkNodes 节点管理-添加/删除
  3. 编辑 zkNodes 数据
  4. ......

实际使用效果如下:

5 分钟带你体验一把 Kafka

使用方法:

  1. 打开工具:View->Tool windows->Zoolytic;
  2. 点击 “+” 号后在弹出框数据:“127.0.0.1:2181” 连接 ZooKeeper;
  3. 连接之后点击新创建的连接然后点击“+”号旁边的刷新按钮即可!

Kafkalytic

IDEA 提供的 Kafka 可视化管理插件。这个插件为我们提供了下面这写功能:

  1. 多个集群支持
  2. 主题管理:创建/删除/更改分区
  3. 使用正则表达式搜索主题
  4. 发布字符串/字节序列化的消息
  5. 使用不同的策略消费消息

实际使用效果如下:

5 分钟带你体验一把 Kafka

使用方法:

  1. 打开工具:View->Tool windows->kafkalytic;
  2. 点击 “+” 号后在弹出框数据:“127.0.0.1:9092” 连接;

Java 程序中简单使用 Kafka

代码地址: https://github.com/Snailclimb/ ... -demo

Step 1:新建一个 Maven 项目。

Step 2:pom.xml 中添加相关依赖。

<dependency>

        <groupId>org.apache.kafka</groupId>

        <artifactId>kafka-clients</artifactId>

        <version>2.2.0</version>

    </dependency>

Step 3:初始化消费者和生产者。

KafkaConstants 常量类中定义了 Kafka 一些常用配置常量。

publicclass KafkaConstants {

publicstaticfinal String BROKER_LIST = "localhost:9092";

publicstaticfinal String CLIENT_ID = "client1";

publicstatic String GROUP_ID_CONFIG="consumerGroup1";

private KafkaConstants() {



}

} 

ProducerCreator 中有一个 createProducer() 方法方法用于返回一个 KafkaProducer对象。

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.Producer;

import org.apache.kafka.clients.producer.ProducerConfig;

import org.apache.kafka.common.serialization.StringSerializer;



import java.util.Properties;



/**

* @author shuang.kou

*/

publicclass ProducerCreator {





public static Producer<String, String> createProducer() {

    Properties properties = new Properties();

    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BROKER_LIST);

    properties.put(ProducerConfig.CLIENT_ID_CONFIG, KafkaConstants.CLIENT_ID);

    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    returnnew KafkaProducer<>(properties);

}

} 

ConsumerCreator 中有一个 createConsumer() 方法方法用于返回一个 KafkaConsumer 对象。

import org.apache.kafka.clients.consumer.Consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.apache.kafka.common.serialization.StringDeserializer;



import java.util.Properties;



publicclass ConsumerCreator {



public static Consumer<String, String> createConsumer() {

    Properties properties = new Properties();

    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BROKER_LIST);

    properties.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConstants.GROUP_ID_CONFIG);

    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    returnnew KafkaConsumer<>(properties);

}

} 

Step 4:发送和消费消息。

生产者发送消息:

privatestaticfinal String TOPIC = "test-topic";

Producer<String, String> producer = ProducerCreator.createProducer();

ProducerRecord<String, String> record =

new ProducerRecord<>(TOPIC, "hello, Kafka!");

try {

//send message

RecordMetadata metadata = producer.send(record).get();

System.out.println("Record sent to partition " + metadata.partition()

                + " with offset " + metadata.offset());

} catch (ExecutionException | InterruptedException e) {

System.out.println("Error in sending record");

e.printStackTrace();

}

producer.close();

消费者消费消息:

Consumer<String, String> consumer = ConsumerCreator.createConsumer();

// 循环消费消息

while (true) {

//subscribe topic and consume message

consumer.subscribe(Collections.singletonList(TOPIC));



ConsumerRecords<String, String> consumerRecords =

consumer.poll(Duration.ofMillis(1000));

for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {

System.out.println("Consumer consume message:" + consumerRecord.value());

}

} 

Step 5:测试。

运行程序控制台打印出:

Record sent to partition 0 with offset 20

Consumer consume message:hello, Kafka!

原文链接: https://mp.weixin.qq.com/s/HJ1OPiXF8jbC-qdn1U4z7g


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Game Engine Architecture, Second Edition

Game Engine Architecture, Second Edition

Jason Gregory / A K Peters/CRC Press / 2014-8-15 / USD 69.95

A 2010 CHOICE outstanding academic title, this updated book covers the theory and practice of game engine software development. It explains practical concepts and techniques used by real game studios,......一起来看看 《Game Engine Architecture, Second Edition》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具