如何在Spring Boot应用程序中使用Apache Kafka?

栏目: Java · 发布时间: 6年前

内容简介:第1步:生成我们的项目:第2步:发布/读取Kafka主题中的消息:第3步:通过application.yml配置文件配置Kafka:

第1步:生成我们的项目: Spring Initializr 来生成我们的项目。我们的项目将提供Spring MVC / Web支持和Apache Kafka支持。

第2步:发布/读取Kafka主题中的消息:

<b>public</b> <b>class</b> User {

    <b>private</b> String name;
    <b>private</b> <b>int</b> age;

    <b>public</b> User(String name, <b>int</b> age) {
        <b>this</b>.name = name;
        <b>this</b>.age = age;
    }
}

第3步:通过application.yml配置文件配置Kafka:

我们需要创建配置文件。我们需要以某种方式配置我们的Kafka生产者和消费者,以便能够发布和读取与主题相关的消息。相比建立一个使用@Configuration标注的 Java 类,我们可以直接使用配置文件application.properties或application.yml。Spring Boot让我们避免像过去一样编写的所有样板代码,同时为我们提供了更加智能的配置应用程序的方法,如下所示:

server: port: 9000
spring:
   kafka:
     consumer:
        bootstrap: localhost:9092
        group-id: group_id
        auto-offset-reset: earliest
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
        bootstrap: localhost:9092
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer

第4步:创建一个生产者,创建生产者会将我们的消息写入该主题。

<b>public</b> <b>class</b> Producer {

    <b>private</b> <b>static</b> <b>final</b> Logger logger = LoggerFactory.getLogger(Producer.<b>class</b>);
    <b>private</b> <b>static</b> <b>final</b> String TOPIC = <font>"users"</font><font>;

    @Autowired
    <b>private</b> KafkaTemplate<String, String> kafkaTemplate;

    <b>public</b> <b>void</b> sendMessage(String message) {
        logger.info(String.format(</font><font>"#### -> Producing message -> %s"</font><font>, message));
        <b>this</b>.kafkaTemplate.send(TOPIC, message);
    }
}
</font>

自动连接autowire到 KafkaTemplate ,使用它将消息发布到主题 - 这就是消息的生产者!

第5步:创建一个消费者,消费者是负责根据您自己的业务逻辑的需求阅读处理消息的消息的服务。要进行设置,请输入以下内容:

@Service
<b>public</b> <b>class</b> Consumer {

    <b>private</b> <b>final</b> Logger logger = LoggerFactory.getLogger(Producer.<b>class</b>);

    @KafkaListener(topics = <font>"users"</font><font>, groupId = </font><font>"group_id"</font><font>)
    <b>public</b> <b>void</b> consume(String message) throws IOException {
        logger.info(String.format(</font><font>"#### -> Consumed message -> %s"</font><font>, message));
    }
}
</font>

在这里,我们告诉我们的方法void consume(String message)订阅用户的主题,并将每条消息发送到应用程序日志。在您的实际应用程序中,您可以按照业务需要的方式处理消息。

第6步:创建REST控制器,们已经拥有了能够消费Kafka消息所需的全部内容。

为了充分展示我们创建的所有内容的工作原理,我们需要创建一个具有单一端点的控制器。消息将发布到此端点,然后由我们的生产者处理。然后,我们的消费者将通过登录到控制台来捕获并处理它。

@RestController
@RequestMapping(value = <font>"/kafka"</font><font>)
<b>public</b> <b>class</b> KafkaController {

    <b>private</b> <b>final</b> Producer producer;

    @Autowired
    KafkaController(Producer producer) {
        <b>this</b>.producer = producer;
    }

    @PostMapping(value = </font><font>"/publish"</font><font>)
    <b>public</b> <b>void</b> sendMessageToKafkaTopic(@RequestParam(</font><font>"message"</font><font>) String message) {
        <b>this</b>.producer.sendMessage(message);
    }
}
</font>

让我们使用cURL将消息发送给Kafka:

curl -X POST -F 'message=test' http://localhost:9000/kafka/publish

基本上就是这样!在不到10个步骤中,您了解了将Apache Kafka添加到Spring Boot项目是多么容易。如果您遵循本指南,您现在知道如何将Kafka集成到Spring Boot项目中,并且您已准备好使用这个超级工具!


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

周鸿祎自述

周鸿祎自述

周鸿祎 / 中信出版社 / 2014-8 / 45.00元

在很多方面,周鸿祎都是互联网领域的颠覆者。他重新定义了“微创新”,提出从细微之处着手,通过聚焦战略,以持续的创新,最终改变市场格局、为客户创造全新价值。他第一个提出了互联网免费安全的理念,也由此让奇虎360拥有了超过4亿的用户。 在《周鸿祎自述:我的互联网方法论》中,周鸿祎首次讲述了自己的互联网观、产品观和管理思想,厘清了互联网产品的本质特征和互联网时代的新趋势,列举了颠覆式创新在现实中的实......一起来看看 《周鸿祎自述》 这本书的介绍吧!

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

URL 编码/解码
URL 编码/解码

URL 编码/解码

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具