内容简介:kafka消息堆积能力比较强,可以堆积上亿的消息,特别适合日志处理这种实时性要求不太高的场景,同时支持集群部署,相比redis堆积能力和可靠性更高完整项目代码已上传github:可以通过下面的步骤快速上手这个kafka
kafka消息堆积能力比较强,可以堆积上亿的消息,特别适合日志处理这种实时性要求不太高的场景,同时支持集群部署,相比 redis 堆积能力和可靠性更高
完整项目代码已上传github:
可以通过下面的步骤快速上手这个kafka
获取一个可用的kafka实例
可以使用 docker 一键启动一个kafka集群,参考: github.com/simplesteph…
git clone https://github.com/simplesteph/kafka-stack-docker-compose.git cd kafka-stack-docker-compose docker-compose -f full-stack.yml up -d 复制代码
操作效果如下
使用命令 docker-compose -f full-stack.yml ps
获取可以kafka监听的端口
记下kafka监听的地址9092,这个后面会用到
8000端口是这个kafka的topic的ui界面,这个界面可以查看当前的topic列表,效果如下
这里也看到topic里保存的数据
准备案例项目
可以在https://start.spring.io/创建测试项目
需要加上下面这三个包
- spring-boot-starter-web
- spring-kafka
- lombok
在 appliation.properties
中配置kafka的地址和使用的group-id,这个group-id名称可以自行定义,比如:myconsumergroup
spring.kafka.bootstrap-servers=127.0.0.1:9092 spring.kafka.consumer.group-id=myconsumergroup 复制代码
用kafka客户端发送消息
使用一个spring boot的service封装kafka发送消息的代码,核心代码如下
package mykafka.service; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; @Service public class Producer { private final KafkaTemplate<String, String> kafkaTemplate; private String topic = "自行定义的topic"; Producer(KafkaTemplate<String, String> kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } public void send(String message) { this.kafkaTemplate.send(topic, message); System.out.println("Sent sample message [" + message + "] to " + topic); } } 复制代码
然后编写一个接口调用这个发送kafka消息的service,核心代码如下:
@RestController @RequestMapping("/") @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class MyController { private final Producer producer; @RequestMapping("/test1") public String test1() { producer.send(String.format("my message currentTimeMillis: %d", System.currentTimeMillis())); return "test1"; } } 复制代码
注意:上面代码里使用的kafka的topic可以自行定义,比如mytopic
然后在浏览器中访问这个接口 ip:8080/test1
可以在这个kafka的topic的ui看到发送到kafka的消息
可以看到这个消息已经发送到kafka了
消费消息
消费消息只需要在方法上加上KafkaListener,并指定topic和groupId即可
核心代码如下
@KafkaListener(topics = "mytopic", groupId = "myconsumergroup") public void processMessage(String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics, @Header(KafkaHeaders.OFFSET) List<Long> offsets) { log.info( "received message, topic: {}, partition: {}, offset: {}, message: {}", topics.get(0), partitions.get(0), offsets.get(0), message ); } 复制代码
操作效果如下:
可以看到已经成功收到了kafka里的消息
其它客户端
php发送和消费客户端参考: github.com/arnaud-lb/p…
go客户端参考: github.com/confluentin…
一些注意的点
发送消息和消费消息需要确保topic一致
日志可以先发送到kafka做缓冲,然后通过kafka的客户端把消息取出来放到elk等日志存储系统中分析和可视化
参考链接
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 一次 kafka 消息堆积问题排查
- Cassandra压缩任务堆积如何处理?
- 消息队列面试连环问:如何保证消息不丢失?处理重复消息?消息有序性?消息堆积处理?
- 快速上手virtualenv
- MongoDB 简单上手
- 快速上手 Kafka
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
互联网创业核心技术:构建可伸缩的web应用
【美】Artur Ejsmont / 李智慧、何坤 / 电子工业出版社 / 2016-12 / 89
可伸缩架构技术是所有互联网技术中最重要,也是最引人入胜的技术。《互联网创业核心技术:构建可伸缩的web应用》针对互联网创业需求快速迭代,业务快速发展,短时间内用户、数据、访问量激增的特点,提纲挈领地描述了伸缩性架构的基本原理与设计原则,详细阐述了Web应用前端层、服务层、数据层的可伸缩架构,并花大量篇幅讲述了缓存技术和异步处理技术的可伸缩设计及其在Web系统中的具体应用。 《互联网创业核心技......一起来看看 《互联网创业核心技术:构建可伸缩的web应用》 这本书的介绍吧!