内容简介:导弹拦截,精准防御。拦截器:在不修改应用程序业务逻辑的情况下,一组基于事件的可插拔的逻辑处理链;
导弹拦截,精准防御。
背景
拦截器:在不修改应用程序业务逻辑的情况下,一组基于事件的可插拔的逻辑处理链;
类比springMVC的拦截器:
这些都是通过配置拦截器,插入到应用程序中,实现可插拔的修改业务逻辑;
kafka在0.10.0.0版本中开始引入拦截器。分为生产者拦截器和消费者拦截器,类似责任链的方式编排多个拦截器为一个大拦截器。
配置方法:配置参数
Properties props = new Properties();
List<String> interceptors = new ArrayList<>();
interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 拦截器1
interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 拦截器2
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
……
注意: 配置拦截器需要制定拦截器的全限定名,并且保证生产者或者消费者客户端能够正确加载到配置的拦截器;
通过拦截器实现,强制让所有的生产者,消费者配置该拦截器,实现消息审计的功能; |
生产者拦截器
拦截器需要实现org.apache.kafka.clients.producer.ProducerInterceptor
消费者拦截器
org.apache.kafka.clients.consumer.ConsumerInterceptor
实操
实现端到端的性能监控:
处理过程:
生产者代码:
public class AvgLatencyProducerInterceptor implements ProducerInterceptor<String, String> {
private Jedis jedis; // 省略Jedis初始化
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
jedis.incr("totalSentMessage");
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
@Override
public void configure(Map<java.lang.String, ?> configs) {
}
消费者代码:
public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String, String> {
private Jedis jedis; //省略Jedis初始化
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
long lantency = 0L;
for (ConsumerRecord<String, String> record : records) {
lantency += (System.currentTimeMillis() - record.timestamp());
}
jedis.incrBy("totalLatency", lantency);
long totalLatency = Long.parseLong(jedis.get("totalLatency"));
long totalSentMsgs = Long.parseLong(jedis.get("totalSentMessage"));
jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs));
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs)
配置到拦截器到对应的生产者和消费者对象,即简单的实现了平均消息延时的端到端性能统计。
小结
类比AOP是Spring提供的核心功能,即面向切面编程,可以把跟业务逻辑无关的安全,审计,性能相关功能放到切面增强中实现。
对Kafka进行一些可插拔的功能增强可以通过拦截器实现。
本篇介绍了kafka的拦截器的使用方法,以及通过实例展示了具体的用法,希望对团队使用的kafka做一些增强功能的时候可以利用这个点去扩展。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 150行JavaScript代码实现增强现实
- Python+OpenCV实现增强现实!OpenCV真的牛逼!学会太受用了!
- swagger-bootstrap-ui 1.8.5 发布,Swagger增强UI实现
- swagger-bootstrap-ui 1.8.5 发布,Swagger增强UI实现
- Swagger-bootstrap-ui 1.8.6 发布,Swagger增强UI 实现
- Swagger-bootstrap-ui 1.8.6 发布,Swagger增强UI 实现
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Fluent Python
Luciano Ramalho / O'Reilly Media / 2015-8-20 / USD 39.99
Learn how to write idiomatic, effective Python code by leveraging its best features. Python's simplicity quickly lets you become productive with it, but this often means you aren’t using everything th......一起来看看 《Fluent Python》 这本书的介绍吧!