使用Spring Boot的Kafka消息机制实现请求响应模型

栏目: Java · 发布时间: 7年前

内容简介:Spring Apache Kafka (spring-kafka)提供了基于卡夫卡的消息传递解决方案的高级抽象。传统的请求响应模型中,响应容易被堵塞,造成两个系统耦合,调用者需要等待到响应返回才能继续做自己的工作,这在分布式系统中,流量比较大情况下几乎不现实,使用消息模型只能每次请求一个消息,响应再来一个消息,用两个消息组合成请求响应,虽然编程没有传统请求响应方便,但是系统松耦合,相互协调好。spring-kafka使用起来了很简单:引入Maven包:

Spring Apache Kafka (spring-kafka)提供了基于卡夫卡的消息传递解决方案的高级抽象。传统的请求响应模型中,响应容易被堵塞,造成两个系统耦合,调用者需要等待到响应返回才能继续做自己的工作,这在分布式系统中,流量比较大情况下几乎不现实,使用消息模型只能每次请求一个消息,响应再来一个消息,用两个消息组合成请求响应,虽然编程没有传统请求响应方便,但是系统松耦合,相互协调好。

spring-kafka使用起来了很简单:

引入Maven包:

   

    <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

消息生产者代码:

@Autowired
KafkaTemplate<String, String> kafkaTemplate;

 <b>public</b> <b>void</b> send(@RequestParam String productId) {
        kafkaTemplate.send(<font>"cdProduct"</font><font>, productId);
  }
</font>

生产者的配置application.property:

spring.kafka.consumer.group-id=ecomm
spring.kafka.bootstrap-servers=localhost:9092

消息接受者代码,这里接受到productId以后,查询到Product对象,再给发送者发回去,模拟请求-响应模型:

@KafkaListener(topics = <font>"cdProduct"</font><font>)
    <b>public</b> <b>void</b> onAction(ConsumerRecord<?, ?> consumerRecord) {
        System.out.printf(</font><font>"接受到="</font><font> + consumerRecord);
        String productId = (String) consumerRecord.value();
        System.out.printf(</font><font>"接受到productId="</font><font> + productId);
        Product product = productRepo.findById(productId).orElse(<b>new</b> Product());
        kafkaTemplate.send(</font><font>"cdProductReply"</font><font>, product);
    }
</font>

消费者需要配置JSON序列化将Product变成JSON,这里只要配置在application.property中即可,无需做代码生成自己ProducerFactory工厂:

spring.kafka.consumer.group-id=ecomm
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

生产者接受到消费者查询到的Product放入自己的缓存。

@KafkaListener(topics = <font>"cdProductReply"</font><font>)
    <b>public</b> <b>void</b> onAction(Product product) {
        System.out.printf(</font><font>"接受到新的Product"</font><font> + product.getName());
        cache.put(product.getId(), product);

    }
</font>

为了实现Product直接序列化接受,需要在自己的入口类Application中加入:

@Bean
<b>public</b> StringJsonMessageConverter jsonConverter() {
        <b>return</b> <b>new</b> StringJsonMessageConverter();
}

无需配置监听器连接工厂ConcurrentKafkaListenerContainerFactory即可有用。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

代码之美

代码之美

Grey Wilson / 聂雪军 / 机械工业出版社 / 2008年09月 / 99.00元

《代码之美》介绍了人类在一个奋斗领域中的创造性和灵活性:计算机系统的开发领域。在每章中的漂亮代码都是来自独特解决方案的发现,而这种发现是来源于作者超越既定边界的远见卓识,并且识别出被多数人忽视的需求以及找出令人叹为观止的问题解决方案。 《代码之美》33章,有38位作者,每位作者贡献一章。每位作者都将自己心目中对于“美丽的代码”的认识浓缩在一章当中,张力十足。38位大牛,每个人对代码之美都有自......一起来看看 《代码之美》 这本书的介绍吧!

随机密码生成器
随机密码生成器

多种字符组合密码

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具