内容简介:导弹拦截,精准防御。拦截器:在不修改应用程序业务逻辑的情况下,一组基于事件的可插拔的逻辑处理链;
导弹拦截,精准防御。
背景
拦截器:在不修改应用程序业务逻辑的情况下,一组基于事件的可插拔的逻辑处理链;
类比springMVC的拦截器:
这些都是通过配置拦截器,插入到应用程序中,实现可插拔的修改业务逻辑;
kafka在0.10.0.0版本中开始引入拦截器。分为生产者拦截器和消费者拦截器,类似责任链的方式编排多个拦截器为一个大拦截器。
配置方法:配置参数
Properties props = new Properties(); List<String> interceptors = new ArrayList<>(); interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 拦截器1 interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 拦截器2 props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); ……
注意: 配置拦截器需要制定拦截器的全限定名,并且保证生产者或者消费者客户端能够正确加载到配置的拦截器;
通过拦截器实现,强制让所有的生产者,消费者配置该拦截器,实现消息审计的功能; |
生产者拦截器
拦截器需要实现org.apache.kafka.clients.producer.ProducerInterceptor
消费者拦截器
org.apache.kafka.clients.consumer.ConsumerInterceptor
实操
实现端到端的性能监控:
处理过程:
生产者代码:
public class AvgLatencyProducerInterceptor implements ProducerInterceptor<String, String> { private Jedis jedis; // 省略Jedis初始化 @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { jedis.incr("totalSentMessage"); return record; } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { } @Override public void close() { } @Override public void configure(Map<java.lang.String, ?> configs) { }
消费者代码:
public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String, String> { private Jedis jedis; //省略Jedis初始化 @Override public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) { long lantency = 0L; for (ConsumerRecord<String, String> record : records) { lantency += (System.currentTimeMillis() - record.timestamp()); } jedis.incrBy("totalLatency", lantency); long totalLatency = Long.parseLong(jedis.get("totalLatency")); long totalSentMsgs = Long.parseLong(jedis.get("totalSentMessage")); jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs)); return records; } @Override public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) { } @Override public void close() { } @Override public void configure(Map<String, ?> configs)
配置到拦截器到对应的生产者和消费者对象,即简单的实现了平均消息延时的端到端性能统计。
小结
类比AOP是Spring提供的核心功能,即面向切面编程,可以把跟业务逻辑无关的安全,审计,性能相关功能放到切面增强中实现。
对Kafka进行一些可插拔的功能增强可以通过拦截器实现。
本篇介绍了kafka的拦截器的使用方法,以及通过实例展示了具体的用法,希望对团队使用的kafka做一些增强功能的时候可以利用这个点去扩展。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 150行JavaScript代码实现增强现实
- Python+OpenCV实现增强现实!OpenCV真的牛逼!学会太受用了!
- swagger-bootstrap-ui 1.8.5 发布,Swagger增强UI实现
- swagger-bootstrap-ui 1.8.5 发布,Swagger增强UI实现
- Swagger-bootstrap-ui 1.8.6 发布,Swagger增强UI 实现
- Swagger-bootstrap-ui 1.8.6 发布,Swagger增强UI 实现
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Agile Web Development with Rails 4
Sam Ruby、Dave Thomas、David Heinemeier Hansson / Pragmatic Bookshelf / 2013-10-11 / USD 43.95
Ruby on Rails helps you produce high-quality, beautiful-looking web applications quickly. You concentrate on creating the application, and Rails takes care of the details. Tens of thousands of deve......一起来看看 《Agile Web Development with Rails 4》 这本书的介绍吧!