内容简介:最近看到Kafka官方wiki上有一篇关于有且仅有一次语义与事务消息的文档(见生产者新增了5个新的方法(initTransactions, beginTransaction, sendOffsets, commitTransaction, abortTransaction),并且发送接口也增加了一个新的异常。见下面:如果broker检测出数据丢失,生产者接口会抛出OutOfOrderSequenceException异常。换句话说,就是broker发现序列号比预期序列号高。异常会在Future中返回,并且
最近看到Kafka官方wiki上有一篇关于有且仅有一次语义与事务消息的文档(见 这里 ),里面说的非常详细。对于有且仅有一次语义与事务消息是什么东西,大家可以看我的上一篇博客,或者看Kafka的这篇wiki,这里不做展开。这篇文章主要整理关于该语义和事务消息的API接口、数据流和配置。
生产者接口
生产者API的改动
生产者新增了5个新的方法(initTransactions, beginTransaction, sendOffsets, commitTransaction, abortTransaction),并且发送接口也增加了一个新的异常。见下面:
public interface Producer<K,V> extends Closeable { /* * Needs to be called before any of the other transaction methods. Assumes that * the transactional.id is specified in the producer configuration. * * This method does the following: * 1. Ensures any transactions initiated by previous instances of the producer * are completed. If the previous instance had failed with a transaction in * progress, it will be aborted. If the last transaction had begun completion, * but not yet finished, this method awaits its completion. * 2. Gets the internal producer id and epoch, used in all future transactional * messages issued by the producer. * * @throws IllegalStateException if the TransactionalId for the producer is not set * in the configuration. */ void initTransactions() throws IllegalStateException; /* * Should be called before the start of each new transaction. * * @throws ProducerFencedException if another producer is with the same * transactional.id is active. */ void beginTransaction() throws ProducerFencedException; /* * Sends a list of consumed offsets to the consumer group coordinator, and also marks * those offsets as part of the current transaction. These offsets will be considered * consumed only if the transaction is committed successfully. * * This method should be used when you need to batch consumed and produced messages * together, typically in a consume-transform-produce pattern. * * @throws ProducerFencedException if another producer is with the same * transactional.id is active. */ void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException; /* * Commits the ongoing transaction. * * @throws ProducerFencedException if another producer is with the same * transactional.id is active. */ void commitTransaction() throws ProducerFencedException; /* * Aborts the ongoing transaction. * * @throws ProducerFencedException if another producer is with the same * transactional.id is active. */ void abortTransaction() throws ProducerFencedException; /* * Send the given record asynchronously and return a future which will eventually contain the response information. * * @param record The record to send * @return A future which will eventually contain the response information * */ public Future<RecordMetadata> send(ProducerRecord<K, V> record); /* * Send a record and invoke the given callback when the record has been acknowledged by the server **/ public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback); }
OutOfOrderSequence异常
如果broker检测出数据丢失,生产者接口会抛出OutOfOrderSequenceException异常。换句话说,就是broker发现序列号比预期序列号高。异常会在Future中返回,并且如果存在callback的话会把异常传给callback。这是一个严重异常,生产者后续调用send, beginTransaction, commitTransaction等方法都会抛出一个IlegalStateException。
应用示例
以下是一个使用上述API的简单应用:
public class KafkaTransactionsExample { public static void main(String args[]) { KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig); // Note that the ‘transactional.id’ configuration _must_ be specified in the // producer config in order to use transactions. KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig); // We need to initialize transactions once per producer instance. To use transactions, // it is assumed that the application id is specified in the config with the key // transactional.id. // // This method will recover or abort transactions initiated by previous instances of a // producer with the same app id. Any other transactional messages will report an error // if initialization was not performed. // // The response indicates success or failure. Some failures are irrecoverable and will // require a new producer instance. See the documentation for TransactionMetadata for a // list of error codes. producer.initTransactions(); while(true) { ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT); if (!records.isEmpty()) { // Start a new transaction. This will begin the process of batching the consumed // records as well // as an records produced as a result of processing the input records. // // We need to check the response to make sure that this producer is able to initiate // a new transaction. producer.beginTransaction(); // Process the input records and send them to the output topic(s). List<ProducerRecord<String, String>> outputRecords = processRecords(records); for (ProducerRecord<String, String> outputRecord : outputRecords) { producer.send(outputRecord); } // To ensure that the consumed and produced messages are batched, we need to commit // the offsets through // the producer and not the consumer. // // If this returns an error, we should abort the transaction. sendOffsetsResult = producer.sendOffsetsToTransaction(getUncommittedOffsets()); // Now that we have consumed, processed, and produced a batch of messages, let's // commit the results. // If this does not report success, then the transaction will be rolled back. producer.endTransaction(); } } } }
新的配置
broker配置
- transactional.id.timeout.ms:事务协调者超过多长时间没有收到生产者TransactionalId的事务状态更新就认为其过期。默认值为604800000(7天),这个值使得每星期执行一次的生产者任务可以持续维护其ID。
- max.transaction.timeout.ms:事务超时时间。如果client请求事务时间超过这个值,那么broker会在InitPidRequest中返回一个InvalidTransactionTimeout异常。这防止client出现超时时间太长,这会使得消费者消费事务相关的主题时变慢。默认值为900000(15分钟),这是一个保守的上限值。
- transaction.state.log.replication.facto:事务状态主题的副本个数,默认值为3。
- transaction.state.log.num.partitions:事务状态主题的分区个数,默认值为50。
- transaction.state.log.min.isr:事务状态主题每个分区拥有多少个insync的副本才被视为上线。默认为2。
- transaction.state.log.segment.bytes:事务状态主题的日志段大小,默认为104857600字节。
生产者配置
- enable.idempotence:是否使用幂等写(默认为false)。如果为false,生产者发送消息请求时不会携带PID字段,保持为与之前的语义一样。如果希望使用事务,那么这个值必须置位true。如果为true,那么会额外要求acks=all,retries > 1,和 max.inflight.requests.per.connection=1。因为如果这些条件不满足,那么无法保证幂等性。如果应用没有显示指明这些属性,那么在启用幂等性时生产者会设置acks=all,retries=Integer.MAX_VALUE,和 max.inflight.requests.per.connection=1。
- transaction.timeout.ms:生产者超过多久没有更新事务状态,事务协调者会将其进行中的事务回滚。这个值会随着InitPidRequest一起发送给事务协调者。如果这个值大于broker设置的max.transaction.timeout.ms,那么请求会抛出InvalidTransactionTimeout异常。默认值为60000,防止下游消费阻塞等待超过1分钟。
- transactional.id:事务投递所使用的TransactionalId值。这个可以保证多个生产者会话的可靠性语义,因为这可以保证在使用相同TransactionalId的情况下,老的事务必须完成才能开启新的事务。需要注意的是,如果启用这个值,必须先设置enable.idempotence为true。此值默认为空,意味着没有使用事务。
消费者配置
- isolation.level:以下是可以取的值(默认为read_uncommitted):1)read_uncommitted:按位移顺序按序消费消息,无论其为提交还是未提交。2)read_committed:按位移顺序按序消费消息,但只消费非事务消息和已提交的事务消息;为了保持位移顺序,read_committed会使得消费者需要在获取到同一事务中的所有消息前需要缓存消息。
语义保证
生产者幂等性保证
为了实现生产者幂等性语义,我们引入了生产者ID(也称为PID)和消息序列号的概念。每一个新的生产者在初始化的时候都会赋予一个PID。PID的设置是对使用者透明的,不会在客户端中暴露出来。
对于一个指定的PID,序列号从0开始并且单调递增,每个主题分区都有一个序列号序列。生产者发送消息到broker后会增加序列号。broker则在内存中维护每个PID发到主题分区的序列号,一旦发现当前收到的序列号没有比上一次收到的序列号刚好大1,那么就会拒绝当前的生产者请求。如果消息携带的序列号比预期低而导致重复异常,生产者会忽略掉这个异常;如果消息携带的序列号比预期高而导致乱序异常,这就意味着有一些消息可能丢失了,这个异常是非常严重的。
通过这样的方法,就保证了即便生产者在出现失败的情况下进行重试,每个消息也只会在日志中仅出现一次。由于每个新的生产者实例都会分配一个新的唯一PID,因此只能保证单个生产者会话中实现幂等性。
这些幂等的生产者语义对于像指标跟踪和审计等应用可能非常有用。
事务保证
事务保证的核心就是,使得应用能够原子性的生产消息到多个分区,写入到这些分区的消息要么都成功要么都失败。
进一步地,由于消费者也是通过写入到位移主题来进行记录的,因此事务的能力可以用来使得应用将消费动作和生产动作原子化,也就是说消息被消费了当且仅当整个“消费-转换-生产”的链条都执行完毕。
另外,有状态的应用也可以实现跨越多个会话的连续性。也就是说,Kafka可以保证跨越应用边界的生产幂等性和事务性。为了达到这个目标,应用需要提供一个唯一ID,而且这个唯一ID能够跨越应用边界保持稳定不变。在下面的阐述中,会使用TransactionalId表示这个ID。TransactionalId和PID是一一对应的,区别在于TransactionalId是用户提供的,至于为什么TransactionalId能够保证跨越生产者会话的幂等性的原因下面来分析。
当提供了这样的一个TransactionalId,Kafka保证:
- 对于一个TransactionalId,只会有一个活跃的生产者。当具有相同TransactionalId的生产者上线时,会把老的生产者强制下线。
- 事务恢复跨越应用会话。如果一个应用实例死亡,下一个实例启动时会保证之前进行中的事务会被结束(提交或回滚),这样就保证了新的实例处于一个干净的状态。
需要注意的是,这里提到的事务保证是从生产者的角度来看的。对于消费者,这个保证会稍微弱一点。具体来讲,我们不能保证一个已提交事务的所有消息可以一起被消费。原因如下:
- 对于compact类型的主题,一个事务中的消息可能被更新的版本所代替。
- 事务可能跨越日志段。因此当老的日志段被删除了,可能会损失一个事务的开始部分。
- 消费者可以定位到事务中的任意位置开始消费,因此可能会丢失该事务的开始部分消息。
- 消费者可能消费不到事务中涉及到的分区。因此不能读取到该事务的所有消息。
核心概念
为了实现事务,也就是保证一组消息可以原子性生产和消费,Kafka引入了如下概念;
- 引入了事务协调者(Transaction Coordinator)的概念。与消费者的组协调者类似,每个生产者会有对应的事务协调者,赋予PID和管理事务的逻辑都由事务协调者来完成。
- 引入了事务日志(Transaction Log)的内部主题。与消费者位移主题类似,事务日志是每个事务的持久化多副本存储。事务协调者使用事务日志来保存当前活跃事务的最新状态快照。
- 引入了控制消息(Control Message)的概念。这些消息是客户端产生的并写入到主题的特殊消息,但对于使用者来说不可见。它们是用来让broker告知消费者之前拉取的消息是否被原子性提交。控制消息之前在 这里 被提到过。
- 引入了TransactionalId的概念,TransactionalId可以让使用者唯一标识一个生产者。一个生产者被设置了相同的TransactionalId的话,那么该生产者的不同实例会恢复或回滚之前实例的未完成事务。
- 引入了生产者epoch的概念。生产者epoch可以保证对于一个指定的TransactionalId只会有一个合法的生产者实例,从而保证事务性即便出现故障的情况下。
除了引入了上述概念之外,Kafka还有新的请求类型、已有请求类型的版本升级和新的消息格式,以支持事务。这些细节在本篇文章中不过多涉及。
数据流
在上面这幅图中,尖角框代表不同的机器,圆角框代表Kafka的主题分区(TopicPartition),对角线圆角框代表运行在broker中的逻辑实体。
每个箭头代表一个rpc或者主题的写入。这些操作的先后顺序见旁边的数字,下面按顺序来介绍这些操作。
1. 查询事务协调者(FindCoordinatorRequest请求)
事务协调者是设置PID和管理事务的核心,因此生产者第一件事就是向broker发起FindCoordinatorRequest请求(之前命名为GroupCoordinatorRequest,此版本将其重命名)获取其协调者。
2. 获取生产者ID(InitPidRequest请求)
在查询到事务协调者之后,生产者下一步就是获取其生产者ID,这一步是通过向事务协调者发送InitPidRequest来实现。
2.1 如果指定了TransactionalId的话
如果在配置中指定了transactional.id,transactional.id会在InitPidRequest请求中传递过来,transactional.id与生产者ID的映射会在步骤2a中记录到事务日志。这样未来的生产者如果发送了相同的transactional.id则返回这个相同的PID,从而可以恢复或回滚之前未完成的事务。
在返回PID之外,InitPidRequest还会完成如下任务:
- 增加生产者的epoch值,这样之前的生产者僵尸实例会被断开,不能继续操作事务。
- 恢复(提交或回滚)之前该PID对应的生产者实例的未完成事务。
InitPidRequest请求是同步的,一旦返回,生产者可以发送数据和开启新的事务。
2.2 如果TransactionalId未指定
如果TransactionalId未指定,会赋予一个新的PID,该生产者可以在其当前会话期间实现幂等性和事务性语义。
3. 开启事务(beginTransaction方法)
新的KafkaProducer有一个beginTransaction()方法,调用该方法会开启一个新的事务。生产者在本地状态中记录事务已经开始,只有发送第一个记录时协调者才会知道事务开始状态。
4. 消费-转换-生产的循环
在这个阶段中,生产者开始事务中的消费-转换-生产循环,这个阶段比较长而且可能由多个请求组成。
4.1 AddPartitionsToTxnRequest
在一个事务中,如果需要写入一个新的主题分区,那么生产者会发送此请求到事务协调者。协调者在步骤4.1a中会记录该分区添加到事务中。这个信息是必要的,因为这样才能写入提交或回滚标记到事务中的每个分区(见5.2步骤)。如果这是事务写入的第一个分区,那么协调者还会开始事务定时器。
4.2 ProduceRequest
生产者通过一个或多个ProduceRequests请求(在生产者send方法内部发出)写入消息到主题中。这些请求包含PID,epoch和序列号,见图中的4.2a。
4.3 AddOffsetCommitsToTxnRequest
生产者有一个新的sendOffsetsToTransaction方法,该方法可以将消息消费和生产结合起来。方法参数包含一个Map<TopicPartitions, OffsetAndMetadata>和一个groupId。
sendOffsetsToTransaction内部发送一个带有groupId的AddOffsetCommitsToTxnRequests请求到事务协调者,事务协调者从内部的__consumer-offsets主题中根据此消费者组获取到相应的主题分区。事务协调者在步骤4.3a中把这个主题分区记录到事务日志中。
4.4 TxnOffsetCommitRequest
生产者发送TxnOffsetCommitRequest请求到消费协调者来在主题__consumer-offsets中持久化位移(见4.4a)。消费协调者通过请求中的PID和生产者epoch来验证生产者是否允许发起该请求。
已消费的位移在提交事务之后才对外可见,此过程在下面来讨论。
5. 提交或回滚事务
消息数据写入之后,使用者需要调用KafkaProducer中的commitTransaction或abortTransaction方法,这两个方法分别为事务的提交和回滚处理方法。
5.1 EndTxnRequest
当生产者结束事务的时候,需要调用KafkaProducer.endTransaction或者KafkaProducer.abortTransaction方法。前者使得步骤4中的数据对下游的消费者可见,后者则从日志中抹除已生产的数据:这些数据不会对用户可见,也就是说下游消费者会读取并丢弃这些回滚消息。
无论调用哪个方法,生产者都是会发起EndTxnRequest请求到事务协调者,然后通过参数来指明事务提交或回滚。接收到此请求后,协调者会:
- 写入PREPARE_COMMIT或PREPARE_ABORT消息到事务日志(见5.1a)
- 通过WriteTxnMarkerRequest请求写入命令消息(COMMIT或ABORT)到用户日志中(见下面5.2)
- 写入COMMITTED或ABORTED消息到事务日志中
5.2 WriteTxnMarkerRequest请求
这个请求是事务协调者发给事务中每个分区的leader的。接收到此请求后,每个broker会写入COMMIT(PID)或ABORT(PID) 控制消息到日志中(步骤5.2a)。
这个消息向消费者指明该PID的消息传递给用户还是丢弃。因此,消费者接收到带有PID的消息后会缓存起来,直到读取到COMMIT或者ABORT消息,然后决定消息是通知用户还是丢弃。
另外,如果事务中涉及到__consumer-offsets主题,那么commit或者abort的标记同样写入到日志中,消费协调者会被告知这些位移是否标记为已消费(事务提交则为已消费,事务回滚则忽略这些位移)。见步骤4.2a。
5.3 写入最后的提交或回滚消息
在commit或abort标记写入到数据日志后,事务协调者写入最终的COMMITTED或ABORTED消息到事务日志,标记该事务已经完成(见图中的步骤5.3)。在这个时候,事务日志中关于这个事务的大部分消息都可以被删除;只需要保留该事务的PID和时间戳,这样可以最终删除关于该生产者的TransactionalId->PID映射,详情可参考PID过期的相关资料。
以上所述就是小编给大家介绍的《Kafka的有且仅有一次语义与事务消息》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 消息队列的消费语义和投递语义
- 剑桥构建视觉“语义大脑”:兼顾视觉信息和语义表示
- 新瓶装旧酒:语义网络,语义网,链接数据和知识图谱
- 超强语义分割算法!基于语义流的快速而准确的场景解析
- 语义分割领域开山之作:Google提出用神经网络搜索实现语义分割
- 语义网络
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。