This is the second article in the Reliable Kafka messaging series. If you haven’t seen the previous article, make sure you read it before continuing. It’s important that you are familiar with the concepts before trying to apply them.
Quick recap: we want to achieve fault-tolerant and reliable Kafka-based messaging between components. I covered two main scenarios in the previous article:
- The service crashes while a message is being processed
- The message cannot be processed due to the unavailability of an external system, e.g. a database
These two use-cases were discussed in the context of the at-most-once delivery guarantee that comes with Kafka’s auto-commit feature. One goal was to achieve redelivery of messages when the consumer crashes in the middle of processing. The other was to have a way to retry the message processing in case a recoverable error happens, e.g. the database is unavailable for a short amount of time.
In this article, I’d like to show an example solution for those two problems using Spring Boot. In the example implementation, I treat Kafka as a managed service. If your system needs to be prepared for handling cases when Kafka itself is down, you need to add other measures as well to get the at-least-once delivery guarantee.
Initial setup
Let’s jump right into it. I’m going to start off with a standard Spring Boot project that I generated at start.spring.io, with the Kafka dependency added. I’m naming the project normal-topic-consumer just for further reference in the article.
First off, we need to set up the infrastructure, a.k.a. Kafka and ZooKeeper. I’m using the following docker-compose.yml on Docker for Windows:
```yaml
version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    hostname: zookeeper
  kafka:
    image: wurstmeister/kafka
    command: [start-kafka.sh]
    ports:
      - "9092:9092"
    hostname: kafka
    environment:
      KAFKA_CREATE_TOPICS: "normal-topic:1:1"
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - "zookeeper"
```
We’ll use one topic for now, normal-topic, with a replication factor of 1 and a single partition.
After executing docker-compose up, we have the infrastructure ready to go.
Now let’s create a consumer in the Spring Boot application. It’s going to be very simple, trust me. We’ll need one new class; I’m calling it NormalTopicConsumer. The only thing it will do for now is log the messages it reads. One more thing here: I’m using Lombok because I’m too lazy to write out the loggers.
```java
@Component
@Slf4j
public class NormalTopicConsumer {

    @KafkaListener(id = "normal-topic-consumer", groupId = "normal-topic-group",
            topics = "normal-topic")
    public void consume(ConsumerRecord<?, ?> consumerRecord) {
        String json = consumerRecord.value().toString();
        log.info("Consuming normal message {}", json);
    }
}
```
application.properties:
```properties
spring.kafka.consumer.bootstrap-servers=localhost:9092
```
If we start up the application as well, with the Kafka infra behind it, we can see the messages being consumed. Of course, first we should send some messages to the topic. To do that, I’m going with the easiest solution: getting a terminal into the Kafka container and using kafka-console-producer.sh to send messages.
As the first step, we need to find out what the container id is; you can use docker ps for that. Then exec into the container:
```bash
$ docker exec -it a0a7 bash
```
Then get into producer mode and send an example JSON message:
```bash
$ sh /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic normal-topic
>{"data":"test"}
```
On the application side, we should see the following log:
```
[-consumer-0-C-1] c.a.b.n.NormalTopicConsumer : Consuming normal message {"data":"test"}
```
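By the way, the test message doesn’t have to come from the console. If you prefer sending it from code, here’s a minimal sketch of mine (not part of the original setup) that pushes one message on startup, assuming Spring Boot’s auto-configured KafkaTemplate and the default localhost:9092 producer bootstrap:

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// Hypothetical helper component, equivalent to typing the JSON
// into kafka-console-producer.sh; sends one test message on startup.
@Component
public class TestMessageSender implements CommandLineRunner {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Override
    public void run(String... args) {
        kafkaTemplate.send("normal-topic", "{\"data\":\"test\"}");
    }
}
```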
Awesome, let’s continue.
Implementing manual commit
The implementation is fairly simple for manual commit mode. There is a little configuration change needed to disable auto-commit and enable the manual mode, and then a little tweaking in the code.
application.properties:
```properties
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
```
The different types of ack modes are available here. There are two of them which are relevant: MANUAL and MANUAL_IMMEDIATE. The difference is described on the mentioned page; in short, with MANUAL the acknowledgment is queued and committed together with the rest of the poll’s records, while MANUAL_IMMEDIATE commits the offset as soon as acknowledge is called. For the sake of the article, I’m going with MANUAL_IMMEDIATE.
The code change is very simple. The only thing we need to do is extend the parameter list of the consumer method with an Acknowledgment parameter; Spring will automatically populate it. The object provides an acknowledge method that manually commits the read offset. The MANUAL_IMMEDIATE ack mode sets up the consumer in a way that as soon as the acknowledge method is called, it immediately tells the broker that the consumer has successfully processed the message.
```java
@Component
@Slf4j
public class NormalTopicConsumer {

    @KafkaListener(id = "normal-topic-consumer", groupId = "normal-topic-group",
            topics = "normal-topic")
    public void consume(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
        String json = consumerRecord.value().toString();
        log.info("Consuming normal message {}", json);
        ack.acknowledge();
    }
}
```
Start the application in debug mode and put a breakpoint before the acknowledge call, then send a message via the console producer. When the execution is suspended at the breakpoint and you kill the application, the offset is not yet committed. Starting the application up again will result in the same message being redelivered and reprocessed. That’s the behavior we wanted to achieve.
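If you’d rather reproduce the same scenario without a debugger, a throwaway variant of the listener can simulate the crash in code. This is my sketch, not part of the article’s implementation; halting the JVM before the acknowledgment guarantees the offset never gets committed:

```java
@KafkaListener(id = "normal-topic-consumer", groupId = "normal-topic-group",
        topics = "normal-topic")
public void consume(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
    log.info("Consuming normal message {}", consumerRecord.value());
    // Simulate a hard crash mid-processing; halt() skips shutdown hooks,
    // so the offset below is never committed.
    Runtime.getRuntime().halt(1);
    ack.acknowledge(); // never reached; the message will be redelivered on restart
}
```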
Implementing the DLQ
As I already covered in the previous article, there are several ways to pass along data to the DLQ: it can be a custom message format, it can be shared via Kafka headers, etc. I’m going with Kafka headers here because I think it’s a much cleaner implementation when you don’t pollute your actual message payload with this level of detail.
In the docker-compose.yml, I’ll add the new dlq-topic, again with a replication factor of 1 and a single partition (the format is name:partitions:replicas):
```yaml
KAFKA_CREATE_TOPICS: "normal-topic:1:1,dlq-topic:1:1"
```
The purpose of the DLQ for now is to have a place to send failed messages. The NormalTopicConsumer class needs to be changed a little bit:
```java
@Component
@Slf4j
public class NormalTopicConsumer {

    public static final String ORIGINAL_TOPIC_HEADER_KEY = "originalTopic";
    public static final String DLQ_TOPIC = "dlq-topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @KafkaListener(id = "normal-topic-consumer", groupId = "normal-topic-group",
            topics = "normal-topic")
    public void consume(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
        String json = consumerRecord.value().toString();
        try {
            log.info("Consuming normal message {}", json);
            // Simulating an error case
            // throw new RuntimeException();
        } catch (Exception e) {
            log.info("Message consumption failed for message {}", json);
            String originalTopic = consumerRecord.topic();
            ProducerRecord<String, String> record = new ProducerRecord<>(DLQ_TOPIC, json);
            record.headers().add(ORIGINAL_TOPIC_HEADER_KEY, originalTopic.getBytes(UTF_8));
            kafkaTemplate.send(record);
        } finally {
            ack.acknowledge();
        }
    }
}
```
There are a few things I’ve done here. One is that I’ve put the processing logic into a try block so that any error falling out of the logic will be caught and we can take the necessary steps to recover. Obviously, for now it’s only a log statement and that’s not going to fail anytime soon, but that’s why I’ve put in the line that throws a new RuntimeException. Just uncomment it and you’ll be in the same situation as when the processing logic fails.
Another thing is the catch block. I’m creating a ProducerRecord that holds the payload and stores which topic it is supposed to be sent to (dlq-topic). Last but not least, I’m adding the currently processed topic’s name under the originalTopic key as a header on the message. Since the Kafka client API only accepts bytes as header values, the String value must be converted first. I know it’s not pretty, but it’s nothing we can’t live with.
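If the byte juggling bothers you, it can be hidden behind a small utility. The following helper class is purely illustrative (my addition, not part of the article’s code); it just wraps the Kafka Headers API the snippets above already use:

```java
import static java.nio.charset.StandardCharsets.UTF_8;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

// Hypothetical utility that hides the String <-> byte[] conversion for Kafka headers.
public final class StringHeaders {

    private StringHeaders() {
    }

    // Adds a String-valued header, converted to the bytes the client API expects.
    public static void put(Headers headers, String key, String value) {
        headers.add(key, value.getBytes(UTF_8));
    }

    // Reads back the last String-valued header for a key, or null if it's absent.
    public static String lastValue(Headers headers, String key) {
        Header header = headers.lastHeader(key);
        return header == null ? null : new String(header.value(), UTF_8);
    }
}
```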
And I’ve added a finally block as well, to always commit the offset, even if processing failed. If the processing completed successfully, we commit because of that. If the processing failed, we still need to commit after we’ve sent the message to the DLQ; otherwise the consumer will receive the same message from the broker and will keep failing. That’s not what we want.
Now the other side, the DLQ. In normal circumstances it would be enough to just send the failed messages to the DLQ, but since I want to use it as a way of retrying, we need to have some logic there. I’m going to use another service for that. Same deal: a generated project, just like the normal-topic-consumer service. I’m calling it dlq-topic-consumer. The configuration is the same as for the normal-topic-consumer.
application.properties:
```properties
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
```
One Spring bean for the consumption logic, DlqTopicConsumer:
```java
@Component
@Slf4j
public class DlqTopicConsumer {

    public static final String ORIGINAL_TOPIC_HEADER_KEY = "originalTopic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @KafkaListener(id = "dlq-topic-consumer", groupId = "dlq-topic-group", topics = "dlq-topic")
    public void consume(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
        String json = consumerRecord.value().toString();
        try {
            Header originalTopicHeader = consumerRecord.headers().lastHeader(ORIGINAL_TOPIC_HEADER_KEY);
            if (originalTopicHeader != null) {
                String originalTopic = new String(originalTopicHeader.value(), UTF_8);
                log.info("Consuming DLQ message {} from originalTopic {}", json, originalTopic);
            } else {
                log.error("Unable to read DLQ message because it's missing the originalTopic header");
            }
        } catch (Exception e) {
            log.error("Unable to process DLQ message {}", json);
        } finally {
            ack.acknowledge();
        }
    }
}
```
It does a single thing: in case the message has the originalTopic header available, it logs the message along with the original topic. For now this is sufficient, but we’re going to build a more sophisticated solution when we reach the retry logic.
Now start up the two services with the RuntimeException in place within the normal-topic-consumer, and send a message to the normal-topic. The normal-topic-consumer will pick it up, the processing will fail, and it will send the same message to the dlq-topic and commit the offset. When the message is sent to the DLQ topic, the dlq-topic-consumer service kicks in: it picks up the message and logs it. Awesome.
Implementing the retry logic
Hope you are still with me. The last thing we want to add is the retry mechanism.
The way it’s going to work is this: the DLQ consumer will set a retryCount header on the message when it resends it to the original topic. If the retryCount header is not yet present on the incoming DLQ message, the count starts from zero. If it’s already there, the consumer reads it out, increments it, and sets the new value on the outgoing message. On the other side, the normal topic consumer will copy the retryCount header onto the DLQ message in case it’s available.
When the DLQ consumer is about to resend the message, it checks the retry count against a threshold. I’ll use 5 as the threshold here, but you can use whatever value suits you.
The code looks like the following. NormalTopicConsumer:
```java
@Component
@Slf4j
public class NormalTopicConsumer {

    public static final String RETRY_COUNT_HEADER_KEY = "retryCount";
    public static final String ORIGINAL_TOPIC_HEADER_KEY = "originalTopic";
    public static final String DLQ_TOPIC = "dlq-topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @KafkaListener(id = "normal-topic-consumer", groupId = "normal-topic-group",
            topics = "normal-topic")
    public void consume(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
        String json = consumerRecord.value().toString();
        try {
            log.info("Consuming normal message {}", json);
            throw new RuntimeException();
        } catch (Exception e) {
            log.info("Message consumption failed for message {}", json);
            String originalTopic = consumerRecord.topic();
            ProducerRecord<String, String> record = new ProducerRecord<>(DLQ_TOPIC, json);
            record.headers().add(ORIGINAL_TOPIC_HEADER_KEY, originalTopic.getBytes(UTF_8));
            Header retryCount = consumerRecord.headers().lastHeader(RETRY_COUNT_HEADER_KEY);
            if (retryCount != null) {
                record.headers().add(retryCount);
            }
            kafkaTemplate.send(record);
        } finally {
            ack.acknowledge();
        }
    }
}
```
DlqTopicConsumer:
```java
@Component
@Slf4j
public class DlqTopicConsumer {

    public static final String RETRY_COUNT_HEADER_KEY = "retryCount";
    public static final String ORIGINAL_TOPIC_HEADER_KEY = "originalTopic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @KafkaListener(id = "dlq-topic-consumer", groupId = "dlq-topic-group", topics = "dlq-topic")
    public void consume(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
        String json = consumerRecord.value().toString();
        try {
            log.info("Consuming DLQ message {}", json);
            Header originalTopicHeader = consumerRecord.headers().lastHeader(ORIGINAL_TOPIC_HEADER_KEY);
            if (originalTopicHeader != null) {
                String originalTopic = new String(originalTopicHeader.value(), UTF_8);
                Header retryCountHeader = consumerRecord.headers().lastHeader(RETRY_COUNT_HEADER_KEY);
                int retryCount = 0;
                if (retryCountHeader != null) {
                    retryCount = Integer.parseInt(new String(retryCountHeader.value(), UTF_8));
                }
                if (retryCount < 5) {
                    retryCount += 1;
                    log.info("Resending attempt {}", retryCount);
                    ProducerRecord<String, String> record = new ProducerRecord<>(originalTopic, json);
                    byte[] retryCountHeaderInByte = Integer.valueOf(retryCount).toString().getBytes(UTF_8);
                    record.headers().add(RETRY_COUNT_HEADER_KEY, retryCountHeaderInByte);
                    kafkaTemplate.send(record);
                } else {
                    log.error("Retry limit exceeded for message {}", json);
                }
            } else {
                log.error("Unable to resend DLQ message because it's missing the originalTopic header");
            }
        } catch (Exception e) {
            log.error("Unable to process DLQ message {}", json);
        } finally {
            ack.acknowledge();
        }
    }
}
```
One thing is left to make the retry logic a little smarter: introducing some delay into the resending. That can be achieved by using an AsyncTaskExecutor: we’ll do the message sending on another thread, sleeping at the beginning for some time. I’m using a 5 second delay here. The final code looks like the following:
```java
@Component
@Slf4j
public class DlqTopicConsumer {

    public static final String RETRY_COUNT_HEADER_KEY = "retryCount";
    public static final String ORIGINAL_TOPIC_HEADER_KEY = "originalTopic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private AsyncTaskExecutor asyncTaskExecutor;

    @KafkaListener(id = "dlq-topic-consumer", groupId = "dlq-topic-group", topics = "dlq-topic")
    public void consume(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
        String json = consumerRecord.value().toString();
        try {
            log.info("Consuming DLQ message {}", json);
            Header originalTopicHeader = consumerRecord.headers().lastHeader(ORIGINAL_TOPIC_HEADER_KEY);
            if (originalTopicHeader != null) {
                String originalTopic = new String(originalTopicHeader.value(), UTF_8);
                Header retryCountHeader = consumerRecord.headers().lastHeader(RETRY_COUNT_HEADER_KEY);
                int retryCount = 0;
                if (retryCountHeader != null) {
                    retryCount = Integer.parseInt(new String(retryCountHeader.value(), UTF_8));
                }
                if (retryCount < 5) {
                    retryCount += 1;
                    log.info("Resending attempt {}", retryCount);
                    ProducerRecord<String, String> record = new ProducerRecord<>(originalTopic, json);
                    byte[] retryCountHeaderInByte = Integer.valueOf(retryCount).toString().getBytes(UTF_8);
                    record.headers().add(RETRY_COUNT_HEADER_KEY, retryCountHeaderInByte);
                    asyncTaskExecutor.execute(() -> {
                        try {
                            log.info("Waiting for 5 seconds until resend");
                            Thread.sleep(5000);
                            kafkaTemplate.send(record);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } else {
                    log.error("Retry limit exceeded for message {}", json);
                }
            } else {
                log.error("Unable to resend DLQ message because it's missing the originalTopic header");
            }
        } catch (Exception e) {
            log.error("Unable to process DLQ message {}", json);
        } finally {
            ack.acknowledge();
        }
    }
}
```
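A quick note on the AsyncTaskExecutor: depending on your Spring Boot version, a ThreadPoolTaskExecutor (which implements AsyncTaskExecutor) may already be auto-configured and injectable as-is. If it isn’t in your setup, a minimal bean definition could look like the sketch below; the class name and pool sizes are just illustrative assumptions:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class TaskExecutorConfiguration {

    // ThreadPoolTaskExecutor implements AsyncTaskExecutor, so this bean
    // satisfies the @Autowired AsyncTaskExecutor field in DlqTopicConsumer.
    @Bean
    public AsyncTaskExecutor asyncTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);           // illustrative sizing
        executor.setMaxPoolSize(4);
        executor.setThreadNamePrefix("dlq-retry-");
        executor.initialize();
        return executor;
    }
}
```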
If you do the same test as before and send a message to the normal-topic, the message will end up on the DLQ topic, and the DLQ consumer will try resending it 5 times, waiting 5 seconds before each resend, before giving up with the retry-limit error.
Conclusion
In this article, I wanted to show you how fault-tolerant and reliable messaging can be achieved using Spring Boot. This is just covering the basics, but you can definitely start off from this to guarantee message ordering during resends, handle multiple partitions, and so on.
I hope you liked it, and if you did, give it a thumbs up and share it. If you are interested in more, make sure you follow me on Twitter. And as usual, the code is available on GitHub.