使用Spring Boot的Kafka消息机制实现请求响应模型

栏目: Java · 发布时间: 7年前

内容简介:Spring Apache Kafka (spring-kafka)提供了基于卡夫卡的消息传递解决方案的高级抽象。传统的请求响应模型中,响应容易被堵塞,造成两个系统耦合,调用者需要等待到响应返回才能继续做自己的工作,这在分布式系统中,流量比较大情况下几乎不现实,使用消息模型只能每次请求一个消息,响应再来一个消息,用两个消息组合成请求响应,虽然编程没有传统请求响应方便,但是系统松耦合,相互协调好。spring-kafka使用起来了很简单:引入Maven包:

Spring Apache Kafka (spring-kafka)提供了基于卡夫卡的消息传递解决方案的高级抽象。传统的请求响应模型中,响应容易被堵塞,造成两个系统耦合,调用者需要等待到响应返回才能继续做自己的工作,这在分布式系统中,流量比较大情况下几乎不现实,使用消息模型只能每次请求一个消息,响应再来一个消息,用两个消息组合成请求响应,虽然编程没有传统请求响应方便,但是系统松耦合,相互协调好。

spring-kafka使用起来了很简单:

引入Maven包:

   

    <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

消息生产者代码:

@Autowired
KafkaTemplate<String, String> kafkaTemplate;

 <b>public</b> <b>void</b> send(@RequestParam String productId) {
        kafkaTemplate.send(<font>"cdProduct"</font><font>, productId);
  }
</font>

生产者的配置application.property:

spring.kafka.consumer.group-id=ecomm
spring.kafka.bootstrap-servers=localhost:9092

消息接受者代码,这里接受到productId以后,查询到Product对象,再给发送者发回去,模拟请求-响应模型:

@KafkaListener(topics = <font>"cdProduct"</font><font>)
    <b>public</b> <b>void</b> onAction(ConsumerRecord<?, ?> consumerRecord) {
        System.out.printf(</font><font>"接受到="</font><font> + consumerRecord);
        String productId = (String) consumerRecord.value();
        System.out.printf(</font><font>"接受到productId="</font><font> + productId);
        Product product = productRepo.findById(productId).orElse(<b>new</b> Product());
        kafkaTemplate.send(</font><font>"cdProductReply"</font><font>, product);
    }
</font>

消费者需要配置JSON序列化将Product变成JSON,这里只要配置在application.property中即可,无需做代码生成自己ProducerFactory工厂:

spring.kafka.consumer.group-id=ecomm
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

生产者接受到消费者查询到的Product放入自己的缓存。

@KafkaListener(topics = <font>"cdProductReply"</font><font>)
    <b>public</b> <b>void</b> onAction(Product product) {
        System.out.printf(</font><font>"接受到新的Product"</font><font> + product.getName());
        cache.put(product.getId(), product);

    }
</font>

为了实现Product直接序列化接受,需要在自己的入口类Application中加入:

@Bean
<b>public</b> StringJsonMessageConverter jsonConverter() {
        <b>return</b> <b>new</b> StringJsonMessageConverter();
}

无需配置监听器连接工厂ConcurrentKafkaListenerContainerFactory即可有用。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Web Analytics 2.0

Web Analytics 2.0

Avinash Kaushik / Sybex / 2009-10-26 / USD 39.99

The bestselling book Web Analytics: An Hour A Day was the first book in the analytics space to move beyond clickstream analysis. Web Analytics 2.0 will significantly evolve the approaches from the fir......一起来看看 《Web Analytics 2.0》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具