在spring boot中三分钟上手日志堆积系统kafka

栏目: Java · 发布时间: 5年前

内容简介:kafka消息堆积能力比较强,可以堆积上亿的消息,特别适合日志处理这种实时性要求不太高的场景,同时支持集群部署,相比redis堆积能力和可靠性更高完整项目代码已上传github:可以通过下面的步骤快速上手这个kafka

kafka消息堆积能力比较强,可以堆积上亿的消息,特别适合日志处理这种实时性要求不太高的场景,同时支持集群部署,相比 redis 堆积能力和可靠性更高

完整项目代码已上传github:

可以通过下面的步骤快速上手这个kafka

获取一个可用的kafka实例

可以使用 docker 一键启动一个kafka集群,参考: github.com/simplesteph…

git clone https://github.com/simplesteph/kafka-stack-docker-compose.git
cd kafka-stack-docker-compose
docker-compose -f full-stack.yml up -d
复制代码

操作效果如下

在spring boot中三分钟上手日志堆积系统kafka

使用命令 docker-compose -f full-stack.yml ps 获取可以kafka监听的端口

在spring boot中三分钟上手日志堆积系统kafka

记下kafka监听的地址9092,这个后面会用到

8000端口是这个kafka的topic的ui界面,这个界面可以查看当前的topic列表,效果如下

在spring boot中三分钟上手日志堆积系统kafka

这里也看到topic里保存的数据

准备案例项目

可以在https://start.spring.io/创建测试项目

在spring boot中三分钟上手日志堆积系统kafka

需要加上下面这三个包

  1. spring-boot-starter-web
  2. spring-kafka
  3. lombok

appliation.properties 中配置kafka的地址和使用的group-id,这个group-id名称可以自行定义,比如:myconsumergroup

spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.group-id=myconsumergroup
复制代码

用kafka客户端发送消息

使用一个spring boot的service封装kafka发送消息的代码,核心代码如下

package mykafka.service;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class Producer {

    private final KafkaTemplate<String, String> kafkaTemplate;


    private String topic = "自行定义的topic";

    Producer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(String message) {
        this.kafkaTemplate.send(topic, message);
        System.out.println("Sent sample message [" + message + "] to " + topic);
    }

}
复制代码

然后编写一个接口调用这个发送kafka消息的service,核心代码如下:

@RestController
@RequestMapping("/")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class MyController {

    private final Producer producer;

    @RequestMapping("/test1")
    public String test1() {
        producer.send(String.format("my message currentTimeMillis: %d", System.currentTimeMillis()));
        return "test1";
    }
}
复制代码

注意:上面代码里使用的kafka的topic可以自行定义,比如mytopic

然后在浏览器中访问这个接口 ip:8080/test1

在spring boot中三分钟上手日志堆积系统kafka

可以在这个kafka的topic的ui看到发送到kafka的消息

在spring boot中三分钟上手日志堆积系统kafka

可以看到这个消息已经发送到kafka了

消费消息

消费消息只需要在方法上加上KafkaListener,并指定topic和groupId即可

核心代码如下

@KafkaListener(topics = "mytopic", groupId = "myconsumergroup")
public void processMessage(String message,
                           @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                           @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
                           @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    log.info(
            "received message, topic: {}, partition: {}, offset: {}, message: {}",
            topics.get(0),
            partitions.get(0),
            offsets.get(0),
            message
    );
}
复制代码

操作效果如下:

在spring boot中三分钟上手日志堆积系统kafka

可以看到已经成功收到了kafka里的消息

其它客户端

php发送和消费客户端参考: github.com/arnaud-lb/p…

go客户端参考: github.com/confluentin…

一些注意的点

发送消息和消费消息需要确保topic一致

日志可以先发送到kafka做缓冲,然后通过kafka的客户端把消息取出来放到elk等日志存储系统中分析和可视化

参考链接

  1. www.baeldung.com/spring-kafk…
  2. www.baeldung.com/spring-inje…
  3. docs.confluent.io/current/cli…

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Effective JavaScript

Effective JavaScript

David Herman / Addison-Wesley Professional / 2012-12-6 / USD 39.99

"It's uncommon to have a programming language wonk who can speak in such comfortable and friendly language as David does. His walk through the syntax and semantics of JavaScript is both charming and h......一起来看看 《Effective JavaScript》 这本书的介绍吧!

URL 编码/解码
URL 编码/解码

URL 编码/解码

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

html转js在线工具
html转js在线工具

html转js在线工具