内容简介:官方参考文章:这里下载了zookeeper和kafaka两个安装包,下载地址:
官方参考文章:
1、下载和解压安装包
这里下载了zookeeper和kafaka两个安装包,下载地址:
zookeeper: www.apache.org/dyn/closer.…
kafka: kafka.apache.org/downloads
2、启动Zookeeper服务
这里的kafka默认是由内置的zookeeper的,如果使用内置的zookeeper的话,启动的方式如下:
zookeeper的配置文件是在:/kafka_2.12-0.11.0.0/config 目录下
启动Zookeeper:
>bin/zookeeper-server-start.sh config/zookeeper.properties 复制代码
当看到如下信息的时候,就表示成功了!
3、启动Kafka
kafka的配置文件是在/kafka_2.12-0.11.0.0/config 目录下,默认情况下不需要修改。
>bin/kafka-server-start.sh config/server.properties 复制代码
4、创建一个Topic
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic 复制代码
--replication-factor 复制因子为1; --partitions 分区为1;
查看已创建的Topic:
5、发送测试消息
kafka支持从Console发送信息,消费者从Console接受信息。
>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic 复制代码
--broker-list 表示代理服务器的列表,这里只有一个;
创建一个消费者:
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --from-beginning 复制代码
--from-beginning 表示从消息开始处读取;
然后在生产者的Console输入数据,消费者的Console就可以看到信息:
二、伪集群环境搭建
官方提供了一种方式在一台机器上启动多个Broker机器构成multi-broker cluster,这是一种伪集群的方式,下边就配置一下。
1、修改配置文件
思路是配置多个config/server.properties文件,修改其中的 broker.id=1
和端口号,日志文件位置。
> cp config/server.properties config/server-1.properties > cp config/server.properties config/server-2.properties 复制代码
编辑配置文件,修改如下对应的位置:
config/server-1.properties: broker.id=1 listeners=PLAINTEXT://:9093 log.dir=/tmp/kafka-logs-1 config/server-2.properties: broker.id=2 listeners=PLAINTEXT://:9094 log.dir=/tmp/kafka-logs-2 复制代码
2、分别启动另外两个Kafka
>bin/kafka-server-start.sh config/server-1.properties & >>bin/kafka-server-start.sh config/server-2.properties & 复制代码
&表示在后台运行
3、查看运行结果:
QuorumPeerMain表示Zookeeper进行; 另外有3个Kafka进程;
4、创建Topic
新建一个复制因子为3的Topic
>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic 复制代码
查看Topic的描述信息:
>bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic 复制代码
5、发送消息
启动生产者,这里有3个Kafka实例,但是--broker-list 仍是启动的Zookeeper服务。
>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic 复制代码
启动消费者:
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic 复制代码
和单机的情况是一样的。
三、分布式集群环境搭建
搭建的分布式集群和伪集群的方式大致相同,这里假设使用3台服务器模拟实验,部署3个Zookeeper实例和3个Kafka实例,当然也可以直接部署一个Zookeeper实例,这里只是演示分布式Zookeeper和kafka的搭建。
工具使用的是SecureCRT。
1、分布式Zookeeper的搭建
(1)将Zookeeper安装包分别上传到3台服务器,我的是放在: /home/xuliugen/server
目录下。
(2)配置第一台Zookeeper
复制 zookeeper-3.4.10/conf/zoo_sample.cfg
为 zookeeper-3.4.10/conf/zoo.cfg
,修改zoo.cfg文件如下,只更改data的目录:
因为,修改了dataDir目录的位置,那么就需要创建一个 /zookeeper-3.4.6/data
目录。
(3)按同样的方式修改第二台Zookeeper和第三台Zookeeper服务器配置。
(4)然后,在每一台Zookeeper的配置文件中的最下边添加Zookeeper的集群配置:
(5)最后创建每一个Zookeeper的 myid
文件,在 /data/myid
文件
xuliugen@xuliugen-pc:~/server/zookeeper-3.4.6/data$ echo 1 > myid 复制代码
则,另外两台分别为:
xuliugen@xuliugen-pc:~/server/zookeeper-3.4.6/data$ echo 2 > myid 复制代码
xuliugen@xuliugen-pc:~/server/zookeeper-3.4.6/data$ echo 3 > myid 复制代码
注意:
1、myid和IP地址的对应
server.1= server.2= server.3= 复制代码
这里的1、2、3是和我们刚才配置的myid的数值是相对应的,即1的IP地址为192.168.1.120,那么server.1=192.168.1.120:2888:3888
2、防火墙端口的配置
另外,2888:3888端口要设置防火墙权限
2、启动Zookeeper服务器
依此使用命令 ./bin/zkServer.sh start
启动Zookeeper服务。
使用 jps
查看是否已经启动
查看zookeeper日志的话,是在 /zookeeper-3.4.6/bin
目录下的 zookeeper.out
文件:
使用 tailf zookeeper.out
可以进行查看。
3、分布式Kafka的搭建
(1)将Kafka安装包分别上传到3台服务器,我的是放在: /home/xuliugen/server
目录下。
(2)配置第一台Kafka
Kafka的配置文件是在 /conf/server.properties
,修改日志的目录:
配置主机IP或者hostname:
然后修改kafka中使用的Zookeeper集群地址:
多个Zookeeper之间以英文逗号分开。
注意:
这里需要注意的是,如果按照上述的方式配置:
listeners=PLAINTEXT://192.168.1.120:9092 复制代码
这样配置的话,是在内网环境下允许的,如果使用外网进行访问的话,可以配置为如下:
具体请参考: blog.csdn.net/fengcai19/a…
(3)按同样的方式配置第二台kafka和第三台kafka服务器。
要注意的是不同的kafka的 broker.id
一定要不一样,我这里分别配置的是0、1、2。
4、分别启动Kafka服务
>bin/kafka-server-start.sh config/server.properties 复制代码
四、代码测试
1、项目结构
2、pom文件内容
<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.0</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.12</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.12</version> </dependency> </dependencies> 复制代码
3、日志配置log4j.properties
log4j.rootLogger=DEBUG,rolling,errlog,stdout #stdout log log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss} [%-5p] %c{1}.%M:%L-[%X{traceId}]-%m%n #common log log4j.appender.rolling=org.apache.log4j.DailyRollingFileAppender log4j.appender.rolling.File=${catalina.base}/logs/kafka-demo.log log4j.appender.rolling.DatePattern='.'yyyy-MM-dd-HH log4j.appender.rolling.layout=org.apache.log4j.PatternLayout log4j.appender.rolling.layout.ConversionPattern=%d{HH:mm:ss.SSS} [%-5p] %-20c{1} [%t]%x [%X{traceId}]-%m%n #error log log4j.appender.errlog=org.apache.log4j.DailyRollingFileAppender log4j.appender.errlog.Threshold=ERROR log4j.appender.errlog.File=${catalina.base}/logs/error.log log4j.appender.errlog.DatePattern='.'yyyy-MM-dd-HH log4j.appender.errlog.layout=org.apache.log4j.PatternLayout log4j.appender.errlog.layout.ConversionPattern=%d{MM-dd HH:mm:ss.SSS} [%-5p] %-20c{1} [%.11t] [%X{traceId}]%x-%m%n 复制代码
3、生产者
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class ProducerDemo { // Topic private static final String topic = "kafkaTopic"; public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.1.120:9092,192.168.1.135:9093,192.168.1.227:9094"); props.put("acks", "0"); props.put("group.id", "1111"); props.put("retries", "0"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //生产者实例 KafkaProducer producer = new KafkaProducer(props); int i = 1; // 发送业务消息 // 读取文件 读取内存数据库 读socket端口 while (true) { Thread.sleep(1000); producer.send(new ProducerRecord<String, String>(topic, "key:" + i, "value:" + i)); System.out.println("key:" + i + " " + "value:" + i); i++; } } } 复制代码
4、消费者
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Arrays; import java.util.Properties; public class ConsumerDemo { private static final Logger logger = LoggerFactory.getLogger(ConsumerDemo.class); private static final String topic = "kafkaTopic"; public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.1.120:9092,192.168.1.135:9093,192.168.1.227:9094"); props.put("group.id", "1111"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "earliest"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList(topic)); while (true) { ConsumerRecords<String, String> records = consumer.poll(1000); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } } } 复制代码
5、测试结果
生产者:
消费者:
代码下载地址: download.csdn.net/download/u0…
也可以到官网下载Kafka的源代码包,包里边有example代码可以参考
搜索或扫描下述二维码关注微信公众号: Java后端技术(ID: JavaITWork),和20万人一起学Java!
Java后端技术专注 Java 相关技术:SSM、Spring全家桶、微服务、 MySQL 、MyCat、集群、分布式、中间件、 Linux 、网络、多线程,偶尔讲点运维Jenkins、Nexus、 Docker 、ELK,偶尔分享些技术干货,致力于Java全栈开发!
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- Hadoop分布式集群的搭建
- 在家搭建大数据分布式计算环境
- Seata 搭建与分布式事务入门
- FastDFS+Nginx搭建分布式图片存储
- 使用Docker Swarm搭建分布式爬虫集群
- Docker来搭建分布式文件系统FastDFS
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
如何不在网上虚度人生
[美] 肯尼思·戈德史密斯 / 刘畅 / 北京联合出版公司 / 2017-9 / 39.80元
我们平时上网多大程度上是浪费时间,多大程度是在学习、关心社会、激发创造力?我们真能彻底断网,逃离社交网络吗? 手机把都市人变成一群电子僵尸,是福是祸? 浏览记录就是我们将来的回忆录吗?文件归档属于一种现代民间艺术? 不自拍、P图、发朋友圈,我还是我吗? 美国知名概念艺术家戈德史密斯认为:上网绝不是浪费时间,而是一种创造性的活动。在本书中他以跨学科角度、散文式语言进行论证,涉及大众传播学、计算......一起来看看 《如何不在网上虚度人生》 这本书的介绍吧!