通过Spring Boot Webflux实现Reactor Kafka

栏目: 后端 · 发布时间: 5年前

内容简介:在Apache Kafka简介中,我们研究了分布式流媒体平台Apache Kafka。这一次,我们将关注Reactor Kafka,这个库可以创建从Project Reactor到Kafka Topics的Reactive Streams,反之亦然。我们将使用两个小型示例应用程序,Paymentprocessor Gateway和PaymentValidator。这些应用程序的代码可以在Paymentprocessor网关提供了一个小网页,可以生成一个随机的信用卡号码(显然是伪造的),以及支付金额。当用户单

在Apache Kafka简介中,我们研究了分布式流媒体平台Apache Kafka。这一次,我们将关注Reactor Kafka,这个库可以创建从Project Reactor到Kafka Topics的Reactive Streams,反之亦然。

我们将使用两个小型示例应用程序,Paymentprocessor Gateway和PaymentValidator。这些应用程序的代码可以在 这里 找到。

Paymentprocessor网关提供了一个小网页,可以生成一个随机的信用卡号码(显然是伪造的),以及支付金额。当用户单击提交按钮时,表单将提交给网关的API。API具有针对Kafka群集上的未确认事务主题的反应流,这个未确认事务的主题的另外一边消费者是PaymentValidator,监听要验证的传入消息。然后,这些消息通过响应管道,验证方法将其打印到命令行。

通过Reactive Streams向Kafka发送消息

我们的应用程序构建在Spring 5和Spring Boot 2之上,使我们能够快速设置和使用Project Reactor。

Gateway应用程序的目标是设置从Web控制器到Kafka集群的Reactive流。这意味着我们需要特定的依赖关系来弹簧webflux和reactor-kafka。

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
<version>1.1.0.RELEASE</version>
</dependency>

Spring Webflux RestController提供支付API,为paymentGateway类的doPayment方法创建一个Reactive流。

/ ** 
     *调用返回的Mono将被发送到Spring Webflux,后者依赖于multi-reactor 事件循环和NIO 
     *以非阻塞方式处理请求,从而实现更多的并发请求。结果将
     通过一个名为Server Sent Events 发送。
     ** /

@PostMapping(value = "/payment")
    public Mono<Void> doPayment(@RequestBody CreatePaymentCommand payment) {
    / ** 
         当调用doPayment方法时,我们发送付款信息,获得Mono <Void>作为响应。
         当我们的付款成功发送事件到Kafka主题
         ** / 
        return paymentGateway.doPayment(payment);
    }

paymentGateway需要一个kafkaProducer,它使我们能够将消息作为管道的一部分放在Kafka主题中。它可以使用KafkaSender.create方法轻松创建,传递许多生产者选项。

 public PaymentGatewayImpl() {
        final Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        final SenderOptions<Integer, String> producerOptions = SenderOptions.create(producerProps);

        kafkaProducer = KafkaSender.create(producerOptions);
    }

创建之后,kafkaProducer可以用来轻松地将我们的消息发送到选择的Kafka主题,成为控制器中启动的管道的一部分。因为消息是以非阻塞方式发送到Kafka集群的,所以我们可以使用项目Reactor的事件循环接收并将来自Web API的大量并发消息路由到Kafka。

 @Override
    public Mono<Void> doPayment(final CreatePaymentCommand createPayment) {
        final PaymentEvent payment = new PaymentEvent(createPayment.getId(), createPayment.getCreditCardNumber(), createPayment.getAmount(), gatewayName);

        String payload = toBinary(payment);

        SenderRecord<Integer, String, Integer> message = SenderRecord.create(new ProducerRecord<>("unconfirmed-transactions", payload), 1);
        return kafkaProducer.send(Mono.just(message)).next();
    }

    private String toBinary(Object object) {
        try {
            return objectMapper.writeValueAsString(object);
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException(e);
        }
    }

从Kafka主题创建反应流

当没有消费者监听时,向主题发送消息没有多大意义,因此我们的第二个应用程序将使用一个反应管道来监听未确认的事务主题。为此,使用KafkaReceiver.create方法创建kafkaReceiver对象,类似于我们之前创建kafkaProducer的方法。

通过使用kafkaReceiver.receive方法,我们可以获得receiverRecords的Flux。进入我们读取的主题中每条消息都放入receiverRecord中。流入应用程序后,它们会进一步通过反应管道。然后,这些消息传递processEvent方法,该方法调用paymentValidator,该方法将一些信息输出到控制台。最后,在receiverOffset上调用acknowledge方法,向Kafka集群发送一条消息已被处理的确认。

   public PaymentValidatorListenerImpl(PaymentValidator paymentValidator) {
        this.paymentValidator = paymentValidator;

        final Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "payment-validator-1");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-validator");
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        ReceiverOptions<Object, Object> consumerOptions = ReceiverOptions.create(consumerProps)
                .subscription(Collections.singleton("unconfirmed-transactions"))
                .addAssignListener(partitions -> log.debug("onPartitionsAssigned {}", partitions))
                .addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));

        kafkaReceiver = KafkaReceiver.create(consumerOptions);

        /**
         * We create a receiver for new unconfirmed transactions
         */<font>
        ((Flux<ReceiverRecord>) kafkaReceiver.receive())
                .doOnNext(r -> {
                    </font><font><i>/**
                     * Each unconfirmed payment we receive, we convert to a PaymentEvent and process it
                     */</i></font><font>
                    <b>final</b> PaymentEvent paymentEvent = fromBinary((String) r.value(), PaymentEvent.<b>class</b>);
                    processEvent(paymentEvent);
                    r.receiverOffset().acknowledge();
                })
                .subscribe();
    }

    <b>private</b> <b>void</b> processEvent(PaymentEvent paymentEvent) {
        paymentValidator.calculateResult(paymentEvent);
    }

    <b>private</b> <T> T fromBinary(String object, Class<T> resultType) {
        <b>try</b> {
            <b>return</b> objectMapper.readValue(object, resultType);
        } <b>catch</b> (IOException e) {
            <b>throw</b> <b>new</b> IllegalArgumentException(e);
        }
    }
</font>

可以在 此处 找到此示例的代码


以上所述就是小编给大家介绍的《通过Spring Boot Webflux实现Reactor Kafka》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

计算理论导引

计算理论导引

[美]Michael Sipser / 唐常杰、陈鹏、向勇、刘齐宏 / 机械工业出版社 / 2006-7 / 36.00元

本书是计算理论领域的经典著作,被国外多所大学选用为教材。本书以注重思路、深入引导为特色,系统地介绍计算理论的三大主要内容:自动机与语言、可计算性理论和计算复杂性理论。同时,对可计算性和计算复杂性理论中的某些高级内容作了重点讲解。全书通过启发性的问题、精彩的结果和待解决问题来引导读者挑战此领域中的高层次问题。新版的一大亮点是增加了更多习题、教辅资料和部分习题解答,更加有利于教学。 全书叙述由浅......一起来看看 《计算理论导引》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

随机密码生成器
随机密码生成器

多种字符组合密码

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码