如何在Spring Boot应用程序中使用Apache Kafka?

栏目: Java · 发布时间: 7年前

内容简介:第1步:生成我们的项目:第2步:发布/读取Kafka主题中的消息:第3步:通过application.yml配置文件配置Kafka:

第1步:生成我们的项目: Spring Initializr 来生成我们的项目。我们的项目将提供Spring MVC / Web支持和Apache Kafka支持。

第2步:发布/读取Kafka主题中的消息:

<b>public</b> <b>class</b> User {

    <b>private</b> String name;
    <b>private</b> <b>int</b> age;

    <b>public</b> User(String name, <b>int</b> age) {
        <b>this</b>.name = name;
        <b>this</b>.age = age;
    }
}

第3步:通过application.yml配置文件配置Kafka:

我们需要创建配置文件。我们需要以某种方式配置我们的Kafka生产者和消费者,以便能够发布和读取与主题相关的消息。相比建立一个使用@Configuration标注的 Java 类,我们可以直接使用配置文件application.properties或application.yml。Spring Boot让我们避免像过去一样编写的所有样板代码,同时为我们提供了更加智能的配置应用程序的方法,如下所示:

server: port: 9000
spring:
   kafka:
     consumer:
        bootstrap: localhost:9092
        group-id: group_id
        auto-offset-reset: earliest
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
        bootstrap: localhost:9092
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer

第4步:创建一个生产者,创建生产者会将我们的消息写入该主题。

<b>public</b> <b>class</b> Producer {

    <b>private</b> <b>static</b> <b>final</b> Logger logger = LoggerFactory.getLogger(Producer.<b>class</b>);
    <b>private</b> <b>static</b> <b>final</b> String TOPIC = <font>"users"</font><font>;

    @Autowired
    <b>private</b> KafkaTemplate<String, String> kafkaTemplate;

    <b>public</b> <b>void</b> sendMessage(String message) {
        logger.info(String.format(</font><font>"#### -> Producing message -> %s"</font><font>, message));
        <b>this</b>.kafkaTemplate.send(TOPIC, message);
    }
}
</font>

自动连接autowire到 KafkaTemplate ,使用它将消息发布到主题 - 这就是消息的生产者!

第5步:创建一个消费者,消费者是负责根据您自己的业务逻辑的需求阅读处理消息的消息的服务。要进行设置,请输入以下内容:

@Service
<b>public</b> <b>class</b> Consumer {

    <b>private</b> <b>final</b> Logger logger = LoggerFactory.getLogger(Producer.<b>class</b>);

    @KafkaListener(topics = <font>"users"</font><font>, groupId = </font><font>"group_id"</font><font>)
    <b>public</b> <b>void</b> consume(String message) throws IOException {
        logger.info(String.format(</font><font>"#### -> Consumed message -> %s"</font><font>, message));
    }
}
</font>

在这里,我们告诉我们的方法void consume(String message)订阅用户的主题,并将每条消息发送到应用程序日志。在您的实际应用程序中,您可以按照业务需要的方式处理消息。

第6步:创建REST控制器,们已经拥有了能够消费Kafka消息所需的全部内容。

为了充分展示我们创建的所有内容的工作原理,我们需要创建一个具有单一端点的控制器。消息将发布到此端点,然后由我们的生产者处理。然后,我们的消费者将通过登录到控制台来捕获并处理它。

@RestController
@RequestMapping(value = <font>"/kafka"</font><font>)
<b>public</b> <b>class</b> KafkaController {

    <b>private</b> <b>final</b> Producer producer;

    @Autowired
    KafkaController(Producer producer) {
        <b>this</b>.producer = producer;
    }

    @PostMapping(value = </font><font>"/publish"</font><font>)
    <b>public</b> <b>void</b> sendMessageToKafkaTopic(@RequestParam(</font><font>"message"</font><font>) String message) {
        <b>this</b>.producer.sendMessage(message);
    }
}
</font>

让我们使用cURL将消息发送给Kafka:

curl -X POST -F 'message=test' http://localhost:9000/kafka/publish

基本上就是这样!在不到10个步骤中,您了解了将Apache Kafka添加到Spring Boot项目是多么容易。如果您遵循本指南,您现在知道如何将Kafka集成到Spring Boot项目中,并且您已准备好使用这个超级工具!


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Everything Store

The Everything Store

Brad Stone / Little, Brown and Company / 2013-10-22 / USD 28.00

The definitive story of Amazon.com, one of the most successful companies in the world, and of its driven, brilliant founder, Jeff Bezos. Amazon.com started off delivering books through the mail. Bu......一起来看看 《The Everything Store》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

SHA 加密
SHA 加密

SHA 加密工具