内容简介:Kafka是一款很棒的消息系统,可以看看我之前写的要使用kafka首先要实例化一个核心实现是这个方法:
Kafka是一款很棒的消息系统,可以看看我之前写的 后端好书阅读与推荐 来了解一下它的整体设计。今天我们就来深入了解一下它的实现细节(我fork了一份 代码 ),首先关注 Producer 这一方。
要使用kafka首先要实例化一个 KafkaProducer
,需要有 brokerIP、序列化器
等 必要Properties
以及 acks(0、1、n)、compression、retries、batch.size
等 非必要Properties
,通过这个简单的接口可以控制 Producer
大部分行为,实例化后就可以调用 send
方法发送消息了。
核心实现是这个方法:
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { // intercept the record, which can be potentially modified; this method does not throw exceptions ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);//① return doSend(interceptedRecord, callback);//② }
通过不同的模式可以实现 发送即忘 (忽略返回结果)、 同步发送 (获取返回的future对象,回调函数置为null)、 异步发送 (设置回调函数)三种消息模式。
我们来看看消息类 ProducerRecord
有哪些属性:
private final String topic;//主题 private final Integer partition;//分区 private final Headers headers;//头 private final K key;//键 private final V value;//值 private final Long timestamp;//时间戳
它有多个构造函数,可以适应不同的消息类型:比如有无分区、有无key等。
①中 ProducerInterceptors
(有0 ~ 无穷多个,形成一个拦截链)对 ProducerRecord
进行拦截处理(比如打上时间戳,进行审计与统计等操作)
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) { ProducerRecord<K, V> interceptRecord = record; for (ProducerInterceptor<K, V> interceptor : this.interceptors) { try { interceptRecord = interceptor.onSend(interceptRecord); } catch (Exception e) { // 不抛出异常,继续执行下一个拦截器 if (record != null) log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e); else log.warn("Error executing interceptor onSend callback", e); } } return interceptRecord; }
如果用户有定义就进行处理并返回处理后的 ProducerRecord
,否则直接返回本身。
然后②中 doSend
真正发送消息,并且是异步的(源码太长只保留关键):
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) { TopicPartition tp = null; try { // 序列化 key 和 value byte[] serializedKey; try { serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key()); } catch (ClassCastException cce) { } byte[] serializedValue; try { serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value()); } catch (ClassCastException cce) { } // 计算分区获得主题与分区 int partition = partition(record, serializedKey, serializedValue, cluster); tp = new TopicPartition(record.topic(), partition); // 回调与事务处理省略。 Header[] headers = record.headers().toArray(); // 消息追加到RecordAccumulator中 RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs); // 该批次满了或者创建了新的批次就要唤醒IO线程发送该批次了,也就是sender的wakeup方法 if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); } return result.future; } catch (Exception e) { // 拦截异常并抛出 this.interceptors.onSendError(record, tp, e); throw e; } }
下面是计算分区的方法:
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { Integer partition = record.partition(); // 消息有分区就直接使用,否则就使用分区器计算 return partition != null ? partition : partitioner.partition( record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); }
默认的分区器 DefaultPartitioner
实现方式是如果partition存在就直接使用,否则根据key计算partition,如果key也不存在就使用round robin算法分配partition。
/** * The default partitioning strategy: * <ul> * <li>If a partition is specified in the record, use it * <li>If no partition is specified but a key is present choose a partition based on a hash of the key * <li>If no partition or key is present choose a partition in a round-robin fashion */ public class DefaultPartitioner implements Partitioner { private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>(); public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) {//key为空 int nextValue = nextValue(topic); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);//可用的分区 if (availablePartitions.size() > 0) {//有分区,取模就行 int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else {// 无分区, return Utils.toPositive(nextValue) % numPartitions; } } else {// key 不为空,计算key的hash并取模获得分区 return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } } private int nextValue(String topic) { AtomicInteger counter = topicCounterMap.get(topic); if (null == counter) { counter = new AtomicInteger(ThreadLocalRandom.current().nextInt()); AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter); if (currentCounter != null) { counter = currentCounter; } } return counter.getAndIncrement();//返回并加一,在取模的配合下就是round robin } }
以上就是发送消息的逻辑处理,接下来我们再看看消息发送的物理处理。
Sender
(是一个 Runnable
,被包含在一个IO线程 ioThread
中,该线程不断从 RecordAccumulator
队列中的读取消息并通过 Selector
将数据发送给 Broker
)的 wakeup
方法,实际上是 KafkaClient
接口的 wakeup
方法,由 NetworkClient
类实现,采用了NIO,也就是 java.nio.channels.Selector.wakeup()
方法实现。
Sender
的 run
中主要逻辑是不停执行准备消息和等待消息:
long pollTimeout = sendProducerData(now);//③ client.poll(pollTimeout, now);//④
③完成消息设置并保存到信道中,然后监听感兴趣的key,由 KafkaChannel
实现。
public void setSend(Send send) { if (this.send != null) throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id); this.send = send; this.transportLayer.addInterestOps(SelectionKey.OP_WRITE); } // transportLayer的一种实现中的相关方法 public void addInterestOps(int ops) { key.interestOps(key.interestOps() | ops); }
④主要是 Selector
的 poll
,其select被wakeup唤醒:
public void poll(long timeout) throws IOException { /* check ready keys */ long startSelect = time.nanoseconds(); int numReadyKeys = select(timeout);//wakeup使其停止阻塞 long endSelect = time.nanoseconds(); this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds()); if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) { Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys(); // Poll from channels that have buffered data (but nothing more from the underlying socket) if (dataInBuffers) { keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice Set<SelectionKey> toPoll = keysWithBufferedRead; keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed pollSelectionKeys(toPoll, false, endSelect); } // Poll from channels where the underlying socket has more data pollSelectionKeys(readyKeys, false, endSelect); // Clear all selected keys so that they are included in the ready count for the next select readyKeys.clear(); pollSelectionKeys(immediatelyConnectedKeys, true, endSelect); immediatelyConnectedKeys.clear(); } else { madeReadProgressLastPoll = true; //no work is also "progress" } long endIo = time.nanoseconds(); this.sensors.ioTime.record(endIo - endSelect, time.milliseconds()); }
其中 pollSelectionKeys
方法会调用如下方法完成消息发送:
public Send write() throws IOException { Send result = null; if (send != null && send(send)) { result = send; send = null; } return result; } private boolean send(Send send) throws IOException { send.writeTo(transportLayer); if (send.completed()) transportLayer.removeInterestOps(SelectionKey.OP_WRITE); return send.completed(); }
Send
是一次数据发包,一般由 ByteBufferSend
或者 MultiRecordsSend
实现,其 writeTo
调用 transportLayer
的 write
方法,一般由 PlaintextTransportLayer
或者 SslTransportLayer
实现,区分是否使用 ssl
:
public long writeTo(GatheringByteChannel channel) throws IOException { long written = channel.write(buffers); if (written < 0) throw new EOFException("Wrote negative bytes to channel. This shouldn't happen."); remaining -= written; pending = TransportLayers.hasPendingWrites(channel); return written; } public int write(ByteBuffer src) throws IOException { return socketChannel.write(src); }
到此就把 Producer 的 业务相关逻辑处理 和 非业务相关的网络 2方面的主要流程梳理清楚了。其他额外的功能是通过一些配置保证的。
比如顺序保证就是 max.in.flight.requests.per.connection
, InFlightRequests
的 doSend
会进行判断(由 NetworkClient
的 canSendRequest
调用),只要该参数设为1即可保证当前包未确认就不能发送下一个包从而实现有序性
public boolean canSendMore(String node) { Deque<NetworkClient.InFlightRequest> queue = requests.get(node); return queue == null || queue.isEmpty() || (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection); }
再比如可靠性,通过设置 acks
, Sender
中 sendProduceRequest
的 clientRequest
加入了回调函数:
RequestCompletionHandler callback = new RequestCompletionHandler() { public void onComplete(ClientResponse response) { handleProduceResponse(response, recordsByPartition, time.milliseconds());//调用completeBatch } }; /** * 完成或者重试投递,这里如果acks不对就会重试 * * @param batch The record batch * @param response The produce response * @param correlationId The correlation id for the request * @param now The current POSIX timestamp in milliseconds */ private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId, long now, long throttleUntilTimeMs) { } public class ProduceResponse extends AbstractResponse { /** * Possible error code: * INVALID_REQUIRED_ACKS (21) */ }
kafka源码一层一层包装很多,错综复杂,如有错误请大家不吝赐教。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 以太坊源码分析(36)ethdb源码分析
- [源码分析] kubelet源码分析(一)之 NewKubeletCommand
- libmodbus源码分析(3)从机(服务端)功能源码分析
- [源码分析] nfs-client-provisioner源码分析
- [源码分析] kubelet源码分析(三)之 Pod的创建
- Spring事务源码分析专题(一)JdbcTemplate使用及源码分析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。