内容简介:以下是 Producer 事务使用示例:
作者 | 来自网络
整理 | 纯粹技术分享
这篇文章主要讲述 Kafka 事务性相关原理,从 Kafka EOS 语义、幂等性、事务性等几个方面阐述 。
Kafka EOS 语义
EOS(Exactly Once Semantics,精确一次处理语义)是从 Kafka 0.11.0.0 版本开始支持的,之前版本中只支持 At Least Once 和 At Most Once 语义,并不支持 Exactly Once 语义 。
因为在很多要求严格的场景下,如使用 Kafka 处理交易数据,Exactly Once 语义是必须的。我们可以通过让下游系统具有幂等性来配合 Kafka 的 At Least Once 语义来间接实现 Exactly Once 语义。但是也存在一些问题:
-
该方案要求下游系统支持幂等操作,限制了 Kafka 的适用场景
-
实现门槛相对较高,需要用户对 Kafka 的工作机制非常了解
-
对于 Kafka Stream 而言,Kafka 本身即是自己的下游系统,但 Kafka 在 0.11.0.0 版本之前不具有幂等发送能力
因此,Kafka 本身对Exactly Once语义的支持就非常必要。
Kafka 幂等性
在说 Kafka 的事务之前,先要说一下 Kafka 中幂等( Idempotent )的实现。幂等和事务是 Kafka 0.11.0.0 版本引入的两个特性,以此来实现 EOS 语义。
Kafka 幂等性是 Producer 端的特性,为了实现生产端幂等性,Kafka 引入了 Producer ID(即PID)和 Sequence Number。
-
PID:每个新的 Producer 在初始化的时候会被分配一个唯一的 PID,这个PID 对用户完全是透明的。
-
Sequence Numbler:对于每个 PID,该 Producer 发送到每个 Partition 的 数据 都有对应的序列号,这些序列号是从0开始单调递增的。
Broker 端在缓存中保存了这 Sequence Numbler ,对于接收的每条消息,如果其序号比 Broker 缓存中序号大 于1则接受它,否则将其丢弃。这样就可以实现了消息重复提交了。幂等涉及的参数是 enable.idempotence,默认为 false,开启需要设置为 ture。
但是,这种只能保证单个 Producer 对于 单会话单 Partition 的 Exactly Once 语义。不能保证同一个 Producer 一个 topic 不同的 Partition 幂等。
Kafka 事务性
Kafka 事务支持
正是因为 Kafka Idempotent 不提供跨多个 Partition 和跨会话场景下的保证,因此,我们是需要一种更强的事务保证,能够原子处理多个 Partition 的写入操作,数据要么全部写入成功,要么全部失败,这就是 Kafka Transactions,即Kafka 事务。
Kafka 事务 API
producer提供了initTransactions,beginTransaction,sendOffsetsToTransaction,commitTransaction,abortTransaction 五个事务方法。
/**
* 初始化事务。需要注意的有:
* 1、前提
* 需要保证transation.id属性被配置。
* 2、这个方法执行逻辑是:
* (1)Ensures any transactions initiated by previous instances of the producer with the same
* transactional.id are completed. If the previous instance had failed with a transaction in
* progress, it will be aborted. If the last transaction had begun completion,
* but not yet finished, this method awaits its completion.
* (2)Gets the internal producer id and epoch, used in all future transactional
* messages issued by the producer.
*
*/
public void initTransactions();
/**
* 开启事务
*/
public void beginTransaction() throws ProducerFencedException ;
/**
* 为消费者提供的在事务内提交偏移量的操作
*/
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException ;
/**
* 提交事务
*/
public void commitTransaction() throws ProducerFencedException;
/**
* 放弃事务,类似回滚事务的操作
*/
public void abortTransaction() throws ProducerFencedException ;
相关属性配置
使用 Kafka 的事务 API 时的一些注意事项:
-
需要消费者的自动模式设置为 false,并且不能子再手动的进行执行consumer#commitSync或者consumer#commitAsyc。
-
生产者配置 transactional.id 属性。
-
生产者不需要再配置 enable.idempotence,因为如果配置了transaction.id,则此时 enable.idempotence 会被设置为true。
-
消费者需要配置 isolation.level 属性,有两个可选值:"read_committed " , " read_uncommitted ",默认" read_uncommitted "。
Kafka 事务示例
以下是 Producer 事务使用示例:
Properties props = new Properties(); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("client.id", "ProducerTranscationnalExample"); props.put("bootstrap.servers", "localhost:9092"); props.put("transactional.id", "test-transactional");
props . put ( "acks" , "all" );
KafkaProducer producer = new KafkaProducer ( props );
producer . initTransactions ();
try {
String msg = "matt test" ; producer . beginTransaction (); producer . send ( new ProducerRecord ( topic , "0" , msg . toString ())); producer . send ( new ProducerRecord ( topic , "1" , msg . toString ())); producer . send ( new ProducerRecord ( topic , "2" , msg . toString ()));
producer . commitTransaction ();
} catch ( ProducerFencedException e1 ) {
e1 . printStackTrace ();
producer . close ();
} catch ( KafkaException e2 ) {
e2 . printStackTrace ();
producer . abortTransaction ();
}
producer . close();
Kafka 幂等与事务的关系
事务属性实现前提是幂等性,即在配置事务属性 transaction id 时,必须还得配置幂等性;但是幂等性是可以独立使用的,不需要依赖事务属性。
-
幂等性引入了 Porducer ID(还有 Sequence Numbler )。
-
事务属性引入了 Transaction Id 属性。
参数组合情况:
-
enable.idempotence = true,transactional.id不设置:只支持幂等性。
-
enable.idempotence = true,transactional.id设置:支持事务属性和幂等性
-
enable.idempotence = false,transactional.id不设置:没有事务属性和幂等性的kafka
-
enable.idempotence = false,transactional.id设置:无法获取到PID,此时会报错
参考链接:
Kafka EOS 之事务性实现:
https://www.codercto.com/a/36351.html
Kafka生产者事务和幂等:
http://www.heartthinkdo.com/?p=2040#5
一个进阶的大数据技术交流学习公众号,死磕大数据与分布式系统,分享NoSQL数据库、存储计算引擎、消息中间件等。长按二维码关注:
好文推荐:
喜欢就戳一下 "在看" ↘↘ ↘
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- Java程序员TCP 协议到底怎么学?学了到底有什么用?
- Java事务解析(事务的基本操作+隔离的等级+事务的四大特性+事务的概念)
- 到底什么是DeFi?
- 到底什么是“云原生”?
- 动态规划到底有多难?
- 微软将开源进行到底
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
机器学习实践指南
麦好 / 机械工业出版社 / 2014-4-1 / 69.00
《机器学习实践指南:案例应用解析》是机器学习及数据分析领域不可多得的一本著作,也是为数不多的既有大量实践应用案例又包含算法理论剖析的著作,作者针对机器学习算法既抽象复杂又涉及多门数学学科的特点,力求理论联系实际,始终以算法应用为主线,由浅入深以全新的角度诠释机器学习。 全书分为准备篇、基础篇、统计分析实战篇和机器学习实战篇。准备篇介绍了机器学习的发展及应用前景以及常用科学计算平台,主要包括统......一起来看看 《机器学习实践指南》 这本书的介绍吧!