内容简介:为了减少客户端每次拉取都要拉取全部的分区,增加了增量拉取分区的概念。拉取会话(Fetch Session),类似于web中的session是有状态的,客户端的fetch也可以认为是有状态的。这里的状态指的是知道“要拉取哪些分区”,如果第一次拉取了分区1,如果后续分区1没有数据,就不需要拉取分区1了。
简介
为了减少客户端每次拉取都要拉取全部的分区,增加了增量拉取分区的概念。
拉取会话(Fetch Session),类似于web中的session是有状态的,客户端的fetch也可以认为是有状态的。
这里的状态指的是知道“要拉取哪些分区”,如果第一次拉取了分区1,如果后续分区1没有数据,就不需要拉取分区1了。
FetchSession的数据结构如下:
case class FetchSession(val id: Int, // session编号是随机32位数字,防止未授权的客户端伪造数据 val privileged: Boolean, val partitionMap: FetchSession.CACHE_MAP, val creationMs: Long, var lastUsedMs: Long, var epoch: Int) // 自增
为了支持增量拉取,FetchSession需要维护每个分区的以下信息:
- topic,partition Index(来自于TopicParttition)
- maxBytes,fetchOffset,fetcherLogStartOffset(来自于最近一次的拉取请求)
- highWatermark,localLogStartOffset(来自Leader的本地日志)
因为Follower或者Consumer发送拉取请求都是到Leader,所以FetchSession也是记录在Leader节点上的
FetchRequest Metadata(客户端的拉取请求元数据)
sessionId | epoch | 含义 |
---|---|---|
0 | -1 | 全量拉取(没有使用或者创建session时) |
0 | 0 | 全量拉取(如果是新的会话,epoch从1开始) |
$ID | 0 | 关闭标识为$ID的增量拉取会话,并创建一个新的全量拉取 |
$ID | $EPOCH | 创建增量拉取 |
对于客户端而言,什么时候一个分区会被包含到增量的拉取请求中:
- 客户端通知Broker,分区的maxBytes,fetchOffset,logStartOffset改变了
- 分区在之前的增量拉取会话中不存在,客户端想要增加这个分区(拉取新的分区)
- 分区在增量拉取会话中,客户端要移除
Fetch Response Metadata(服务端返回给客户端的sessionId)
sessionId | 含义 |
---|---|
0 | 之前没有创建过拉取回话 |
$ID | 下一个请求会是增量的拉取请求,并且sessionId是$ID |
服务端增加分区包含到增量的拉取响应中:
- Broker通知客户端分区的hw或者brokerLogStartOffset变化了
- 分区有新的数据
源码解析
Fetcher.java#sendFetches(): prepareFetchRequests创建FetchSessionHandler.FetchRequestData。
构建拉取请求通过FetchSessionHandler.Builder,builder.add(partition, PartitionData)会添加next:
即要拉取的分区。构建时调用Builder.build(),针对Full拉取:
// FetchSessionHandler.Builder.build() if (nextMetadata.isFull()) { // epoch=0或者-1 sessionPartitions = next; // next为之前调动add添加的分区 next = null; // 本地full拉取,下次next=null Map<TopicPartition, PartitionData> toSend = Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions)); return new FetchRequestData(toSend, Collections.emptyList(), toSend, nextMetadata); }
收到响应结果后,通过sessionHandler,调用FetchSessionHandler.handleResponse()。
假设第一次是Full拉取,响应结果没有出错时,nextMetadata.isFull()仍然为true。
假设服务端创建了一个新的session(随机的唯一ID),客户端的Fetch SessionId会设置为服务端返回的sessionId,
并且epoch会增加1。这样下次客户端的拉取就不再是Full,而是Increment了(toSend, toForget分别表示要拉取的和不需要拉取的)。
同样假设服务端正常处理(这次不会生成新的session),客户端也正常处理响应,则sessionId不会增加,但是epoch会增加1
public boolean handleResponse(FetchResponse<?> response) { if (response.error() != Errors.NONE) { log.info("Node {} was unable to process the fetch request with {}: {}.", node, nextMetadata, response.error()); if (response.error() == Errors.FETCH_SESSION_ID_NOT_FOUND) { nextMetadata = FetchMetadata.INITIAL; } else { nextMetadata = nextMetadata.nextCloseExisting(); } return false; } else if (nextMetadata.isFull()) { String problem = verifyFullFetchResponsePartitions(response); if (problem != null) { log.info("Node {} sent an invalid full fetch response with {}", node, problem); nextMetadata = FetchMetadata.INITIAL; return false; } else if (response.sessionId() == INVALID_SESSION_ID) { log.debug("Node {} sent a full fetch response{}", node, responseDataToLogString(response)); nextMetadata = FetchMetadata.INITIAL; return true; } else { // The server created a new incremental fetch session. 客户端正常处理全量拉取的响应 log.debug("Node {} sent a full fetch response that created a new incremental " + "fetch session {}{}", node, response.sessionId(), responseDataToLogString(response)); nextMetadata = FetchMetadata.newIncremental(response.sessionId()); return true; } } else { String problem = verifyIncrementalFetchResponsePartitions(response); if (problem != null) { log.info("Node {} sent an invalid incremental fetch response with {}", node, problem); nextMetadata = nextMetadata.nextCloseExisting(); return false; } else if (response.sessionId() == INVALID_SESSION_ID) { // The incremental fetch session was closed by the server. log.debug("Node {} sent an incremental fetch response closing session {}{}", node, nextMetadata.sessionId(), responseDataToLogString(response)); nextMetadata = FetchMetadata.INITIAL; return true; } else { // The incremental fetch session was continued by the server. 客户端正常处理增量拉取的响应结果 log.debug("Node {} sent an incremental fetch response for session {}{}", node, response.sessionId(), responseDataToLogString(response)); nextMetadata = nextMetadata.nextIncremental(); return true; } } }
服务端处理拉取请求时,会创建不同类型的FetchContext:
- SessionErrorContext:拉取会话错误(比如epoch不相等)
- SessionlessFetchContext:不需要拉取会话(旧版本)
- IncrementalFetchContext:增量拉取
- FullFetchContext:全量拉取
// KafkaApis.handleFetchRequest val fetchContext = fetchManager.newContext( fetchRequest.metadata, fetchRequest.fetchData, fetchRequest.toForget, fetchRequest.isFromFollower) // 针对不同的拉取上下文,分别更新并生成响应数据 unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)
服务端的FetchManager创建Context时,如果FetchMetadata.isFull,再判断epoch=-1时,类型为SessionlessFetchContext,
否则(epoch=0)时,类型为FullFetchContext。如果!isFull(),必须保证session.epoch = FetchMetadata.epoch,否则类型为SessionErrorContext。
当!isFull且epoch相等时,先增加session.epoch(服务端的epoch,即为客户端下次拉取的epoch),然后返回类型为IncrementalFetchContext。
FullFetchContext更新响应数据,对于全量拉取,一般是新会话,所以需要更新缓存
override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse[Records] = { def createNewSession: FetchSession.CACHE_MAP = { val cachedPartitions = new FetchSession.CACHE_MAP(updates.size) updates.entrySet.asScala.foreach(entry => { val part = entry.getKey val respData = entry.getValue val reqData = fetchData.get(part) cachedPartitions.mustAdd(new CachedPartition(part, reqData, respData)) }) cachedPartitions } val responseSessionId = cache.maybeCreateSession(time.milliseconds(), isFromFollower, updates.size, () => createNewSession) debug(s"Full fetch context with session id $responseSessionId returning " + s"${partitionsToLogString(updates.keySet)}") new FetchResponse(Errors.NONE, updates, 0, responseSessionId) } def maybeCreateSession(now: Long, privileged: Boolean, size: Int, createPartitions: () => FetchSession.CACHE_MAP): Int = synchronized { // If there is room, create a new session entry. if ((sessions.size < maxEntries) || tryEvict(privileged, EvictableKey(privileged, size, 0), now)) { val partitionMap = createPartitions() // 这里创建一个新的session时,同时也会增加epoch,从0到1 val session = new FetchSession(newSessionId(), privileged, partitionMap, now, now, JFetchMetadata.nextEpoch(INITIAL_EPOCH)) debug(s"Created fetch session ${session.toString}") sessions.put(session.id, session) touch(session, now) session.id } else { debug(s"No fetch session created for privileged=$privileged, size=$size.") INVALID_SESSION_ID } }
总结下客户端和服务端的Full拉取过程:
1.客户端创建的拉取请求FetchMetadata.isFull(),初始时epoch=0
2.服务端创建的FetchContext类型为FullFetchContext
3.服务端创建新的Session(xxx),以及初始化epoch=1(0+1=1),并缓存
4.客户端收到服务端的FetchResponse,设置FetchMetadata.sessionId为response中的sessionId(xxx),并增加epoch=1(从步骤1的0+1=1)
5.客户端继续拉取,isFull=false,sessionId=xxx, epoch=1
6.服务端创建的FetchContext类型为IncrementalFetchContext(满足session.epoch=reqMetadata.epoch=1, isFull=false)
7.服务端增加epoch,设置session.epoch=2,为下次的拉取(对比epoch)做准备
8.对reqMetadata.epoch加1(=2)然后对比session.epoch(2),如果不等,返回错误码INVALID_FETCH_SESSION_EPOCH,相等返回NONE
9.客户端收到服务端的FetchResponse,设置epoch增加1(sessionId没有变化时,不需要更新sessionId,实际上设置的是nextMetadata对象)
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- Kafka消费者的偏移量和高级/简单消费者
- 十一贝:航延险智能判定,公平消费环境惠及消费者
- Kafka分区与消费者的关系
- Kafka分区与消费者的关系
- Java精讲:生产者-消费者
- 7、服务发现&服务消费者Ribbon
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Web Design Index by Content 3 (Web Design Index)
Pepin Press (EDT) / Pepin Press / 2007-11 / USD 29.99
Would you like an overview of the state of the art in web design in a specific field? WEB DESIGN INDEX BY CONTENT provides exactly that: every year, 500 new designs are selected and grouped in more th......一起来看看 《Web Design Index by Content 3 (Web Design Index)》 这本书的介绍吧!
RGB转16进制工具
RGB HEX 互转工具
RGB CMYK 转换工具
RGB CMYK 互转工具