Apache Kafka和Spring Integration的使用

栏目: Java · 发布时间: 7年前

内容简介:Apache Kafka当然是最常用的JMS代理,它有Apache Hadoop等分布式系统用于数据入口。与其他JMS代理相比,Apache Kafka的关键特性(从我的观点来看):让我们看看如何将Apache Kafka与Spring Integration结合使用。我们将构建将消息生成到Kafka主题的简单演示。

Apache Kafka当然是最常用的JMS代理,它有Apache Hadoop等分布式系统用于数据入口。与其他JMS代理相比,Apache Kafka的关键特性(从我的观点来看):

  • Apache Kafka是无状态的,当您使用Kafka主题的消息时,它不会被删除。Kafka对已发布的消息有明确的保留政策。所以你的所有终端都必须是幂等的。
  • Apache Kafka是严格的发布 - 订阅 JMS代理。使用Kafka,您只能向主题发送消息。没有队列概念。
  • 在Apache Kafka中,消费者分为消费者群体。已发布的消息将分发到这些使用者组中,其中每个组中只有一个使用者获取该消息。
  • 在Apache Kafka中没有队列,但如果您只有一个消费者者组,那么您可以获得点对点消息传递的效果。

让我们看看如何将Apache Kafka与Spring Integration结合使用。我们将构建将消息生成到Kafka主题的简单演示。

卡夫卡安装和启动

您需要启动Apache ZooKeeper:

$KAFKA_HOME/bin/zookeeper-server-start.sh  $KAFKA_HOME/config/zookeeper.properties

启动Kafka:

$KAFKA_HOME/bin/kafka-server-start.sh  $KAFKA_HOME/config/server.properties

创建一个主题topic:

$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_topic

列出Kafka创建的主题:

$KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper localhost:2181

基于Spring Integration构建Apache Kafka生成器

我们将构建简单的Spring Integration应用程序发布消息到Apache Kafka作为程序参数输入。

首先,让我们创建Spring Integration基础架构:

<<b>int</b>:channel id=<font>"inputToKafka"</font><font>>
        <<b>int</b>:queue/>
    </<b>int</b>:channel>

    <<b>int</b>-kafka:outbound-channel-adapter
            id=</font><font>"kafkaOutboundChannelAdapter"</font><font>
            kafka-producer-context-ref=</font><font>"kafkaProducerContext"</font><font>
            channel=</font><font>"inputToKafka"</font><font>>
        <<b>int</b>:poller fixed-delay=</font><font>"1000"</font><font> time-unit=</font><font>"MILLISECONDS"</font><font> 
        receive-timeout=</font><font>"0"</font><font> task-executor=</font><font>"taskExecutor"</font><font>/>
    </<b>int</b>-kafka:outbound-channel-adapter>

    <task:executor id=</font><font>"taskExecutor"</font><font> pool-size=</font><font>"5"</font><font> 
     keep-alive=</font><font>"120"</font><font> queue-capacity=</font><font>"500"</font><font>/>
</font>

在这个xml配置,我们创建了队列通道 “inputToKafka”,我们将推送消息到这个通道。ID为“ kafkaOutboundChannelAdapter ”的Bean 是一个出站通道适配器,具有已定义的异步轮询,执行从“inputToKafka”通道读取消息并将其推送到Apache Kafka。

现在Apache Kafka生产者配置:

<bean id=<font>"kafkaStringSerializer"</font><font> 
          <b>class</b>=</font><font>"org.apache.kafka.common.serialization.StringSerializer"</font><font> />

    <<b>int</b>-kafka:producer-context id=</font><font>"kafkaProducerContext"</font><font>>
        <<b>int</b>-kafka:producer-configurations>
            <<b>int</b>-kafka:producer-configuration 
                  broker-list=</font><font>"localhost:9092"</font><font>
                  topic=</font><font>"test_topic"</font><font>
                  key-<b>class</b>-type=</font><font>"java.lang.String"</font><font>
                  value-<b>class</b>-type=</font><font>"java.lang.String"</font><font>
                  key-serializer=</font><font>"kafkaStringSerializer"</font><font>
                  value-serializer=</font><font>"kafkaStringSerializer"</font><font>
                  />
        </<b>int</b>-kafka:producer-configurations>
    </<b>int</b>-kafka:producer-context>
</font>

我们来看看生产者参数:

#Apache Kafka broker cluster. Cluster where we're going to 
be publishing messages. Let's go with <b>default</b> configuration. 

broker-list=<font>"localhost:9092"</font><font>
</font>
#Name of topic <b>for</b> publishing messages.
topic=<font>"test_topic"</font><font>
#Type of the optional key associated with the message
key-<b>class</b>-type=</font><font>"java.lang.String"</font><font>
#Type of value sent in the message.
value-<b>class</b>-type=</font><font>"java.lang.String"</font><font>
#Reference to the key serializer, all keys and values has to be serialized before sending into Apache Kafka.
key-serializer=</font><font>"kafkaStringSerializer"</font><font>
#Same and key serializer.
value-serializer=</font><font>"kafkaStringSerializer"</font><font>
</font>

启动消息流

要启动消息流,也就是消息生产者,我们将简单地创建SpringBoot CommandLineRunner,将命令行参数传递到提到的队列通道中:

@Component
@DependsOn(value=<font>"kafkaOutboundChannelAdapter"</font><font>)
<b>public</b> <b>class</b> MessageRunner implements CommandLineRunner {

    @Resource(name = </font><font>"inputToKafka"</font><font>)
    <b>private</b> MessageChannel messageChannel;

    @Override
    <b>public</b> <b>void</b> run(String... args) throws Exception {
        <b>for</b> (String arg1 : args) {
            messageChannel.send(
                    <b>new</b> GenericMessage<String>(arg1)
            );
        }
    }
}
</font>

如何测试应用程序

在maven应用程序的根目录中,运行:

mvn clean install

成功编译后,运行:

java -jar target / demo-0.0.1-SNAPSHOT.jar Test1 Test2 Test3

这将创建作为参数传递的String消息,并将它们推送到Apache Kafka代理中的test_topic中。

现在,如果您运行Apache Kafka消费者:

$KAFKA_HOME/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test_topic --from-beginning

会得到:

$ kafka-console-consumer.sh --zookeeper localhost:2181 --topic test_topic --from-beginning
Test1
Test2
Test3

太棒了,我们在Apache Kafka中创建了基于Spring Integration的简单消息生成器!


以上所述就是小编给大家介绍的《Apache Kafka和Spring Integration的使用》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

绝对价值

绝对价值

[美] 伊塔马尔·西蒙森 艾曼纽·罗森 / 钱峰 / 中国友谊出版公司 / 2014-7 / 45.00元

绝对价值指的是经用户体验的产品质量,即使用某件产品或者享受某项服务的切实感受。 过去,消费就像是押宝。一件商品好不好,一家餐馆的环境如何,没有亲身体验过消费者无从得知,只能根据营销人员提供的有限信息去猜测。品牌、原产地、价位、广告,这些重要的质量线索左右着消费者的选择。 然而,互联网和新兴科技以一种前所未有的速度改变了商业环境。当消费者可以在购买前查看到交易记录和消费者评价,通过便捷的......一起来看看 《绝对价值》 这本书的介绍吧!

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

MD5 加密
MD5 加密

MD5 加密工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具