内容简介:kafka消息堆积能力比较强,可以堆积上亿的消息,特别适合日志处理这种实时性要求不太高的场景,同时支持集群部署,相比redis堆积能力和可靠性更高完整项目代码已上传github:可以通过下面的步骤快速上手这个kafka
kafka消息堆积能力比较强,可以堆积上亿的消息,特别适合日志处理这种实时性要求不太高的场景,同时支持集群部署,相比 redis 堆积能力和可靠性更高
完整项目代码已上传github:
可以通过下面的步骤快速上手这个kafka
获取一个可用的kafka实例
可以使用 docker 一键启动一个kafka集群,参考: github.com/simplesteph…
git clone https://github.com/simplesteph/kafka-stack-docker-compose.git cd kafka-stack-docker-compose docker-compose -f full-stack.yml up -d 复制代码
操作效果如下
使用命令 docker-compose -f full-stack.yml ps
获取可以kafka监听的端口
记下kafka监听的地址9092,这个后面会用到
8000端口是这个kafka的topic的ui界面,这个界面可以查看当前的topic列表,效果如下
这里也看到topic里保存的数据
准备案例项目
可以在https://start.spring.io/创建测试项目
需要加上下面这三个包
- spring-boot-starter-web
- spring-kafka
- lombok
在 appliation.properties
中配置kafka的地址和使用的group-id,这个group-id名称可以自行定义,比如:myconsumergroup
spring.kafka.bootstrap-servers=127.0.0.1:9092 spring.kafka.consumer.group-id=myconsumergroup 复制代码
用kafka客户端发送消息
使用一个spring boot的service封装kafka发送消息的代码,核心代码如下
package mykafka.service; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; @Service public class Producer { private final KafkaTemplate<String, String> kafkaTemplate; private String topic = "自行定义的topic"; Producer(KafkaTemplate<String, String> kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } public void send(String message) { this.kafkaTemplate.send(topic, message); System.out.println("Sent sample message [" + message + "] to " + topic); } } 复制代码
然后编写一个接口调用这个发送kafka消息的service,核心代码如下:
@RestController @RequestMapping("/") @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class MyController { private final Producer producer; @RequestMapping("/test1") public String test1() { producer.send(String.format("my message currentTimeMillis: %d", System.currentTimeMillis())); return "test1"; } } 复制代码
注意:上面代码里使用的kafka的topic可以自行定义,比如mytopic
然后在浏览器中访问这个接口 ip:8080/test1
可以在这个kafka的topic的ui看到发送到kafka的消息
可以看到这个消息已经发送到kafka了
消费消息
消费消息只需要在方法上加上KafkaListener,并指定topic和groupId即可
核心代码如下
@KafkaListener(topics = "mytopic", groupId = "myconsumergroup") public void processMessage(String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics, @Header(KafkaHeaders.OFFSET) List<Long> offsets) { log.info( "received message, topic: {}, partition: {}, offset: {}, message: {}", topics.get(0), partitions.get(0), offsets.get(0), message ); } 复制代码
操作效果如下:
可以看到已经成功收到了kafka里的消息
其它客户端
php发送和消费客户端参考: github.com/arnaud-lb/p…
go客户端参考: github.com/confluentin…
一些注意的点
发送消息和消费消息需要确保topic一致
日志可以先发送到kafka做缓冲,然后通过kafka的客户端把消息取出来放到elk等日志存储系统中分析和可视化
参考链接
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 一次 kafka 消息堆积问题排查
- Cassandra压缩任务堆积如何处理?
- 消息队列面试连环问:如何保证消息不丢失?处理重复消息?消息有序性?消息堆积处理?
- 快速上手virtualenv
- MongoDB 简单上手
- 快速上手 Kafka
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Effective JavaScript
David Herman / Addison-Wesley Professional / 2012-12-6 / USD 39.99
"It's uncommon to have a programming language wonk who can speak in such comfortable and friendly language as David does. His walk through the syntax and semantics of JavaScript is both charming and h......一起来看看 《Effective JavaScript》 这本书的介绍吧!