Kafka消费者增量拉取

栏目: 后端 · 发布时间: 5年前

内容简介:为了减少客户端每次拉取都要拉取全部的分区,增加了增量拉取分区的概念。拉取会话(Fetch Session),类似于web中的session是有状态的,客户端的fetch也可以认为是有状态的。这里的状态指的是知道“要拉取哪些分区”,如果第一次拉取了分区1,如果后续分区1没有数据,就不需要拉取分区1了。

简介

为了减少客户端每次拉取都要拉取全部的分区,增加了增量拉取分区的概念。

拉取会话(Fetch Session),类似于web中的session是有状态的,客户端的fetch也可以认为是有状态的。

这里的状态指的是知道“要拉取哪些分区”,如果第一次拉取了分区1,如果后续分区1没有数据,就不需要拉取分区1了。

FetchSession的数据结构如下:

case class FetchSession(val id: Int, // session编号是随机32位数字,防止未授权的客户端伪造数据
                        val privileged: Boolean,
                        val partitionMap: FetchSession.CACHE_MAP,
                        val creationMs: Long,
                        var lastUsedMs: Long,
                        var epoch: Int) // 自增

为了支持增量拉取,FetchSession需要维护每个分区的以下信息:

  • topic,partition Index(来自于TopicParttition)
  • maxBytes,fetchOffset,fetcherLogStartOffset(来自于最近一次的拉取请求)
  • highWatermark,localLogStartOffset(来自Leader的本地日志)

因为Follower或者Consumer发送拉取请求都是到Leader,所以FetchSession也是记录在Leader节点上的

FetchRequest Metadata(客户端的拉取请求元数据)

sessionId epoch 含义
0 -1 全量拉取(没有使用或者创建session时)
0 0 全量拉取(如果是新的会话,epoch从1开始)
$ID 0 关闭标识为$ID的增量拉取会话,并创建一个新的全量拉取
$ID $EPOCH 创建增量拉取

对于客户端而言,什么时候一个分区会被包含到增量的拉取请求中:

  • 客户端通知Broker,分区的maxBytes,fetchOffset,logStartOffset改变了
  • 分区在之前的增量拉取会话中不存在,客户端想要增加这个分区(拉取新的分区)
  • 分区在增量拉取会话中,客户端要移除

Fetch Response Metadata(服务端返回给客户端的sessionId)

sessionId 含义
0 之前没有创建过拉取回话
$ID 下一个请求会是增量的拉取请求,并且sessionId是$ID

服务端增加分区包含到增量的拉取响应中:

  • Broker通知客户端分区的hw或者brokerLogStartOffset变化了
  • 分区有新的数据

源码解析

Fetcher.java#sendFetches(): prepareFetchRequests创建FetchSessionHandler.FetchRequestData。

构建拉取请求通过FetchSessionHandler.Builder,builder.add(partition, PartitionData)会添加next:

即要拉取的分区。构建时调用Builder.build(),针对Full拉取:

// FetchSessionHandler.Builder.build()
if (nextMetadata.isFull()) { // epoch=0或者-1
    sessionPartitions = next; // next为之前调动add添加的分区
    next = null; // 本地full拉取,下次next=null
    Map<TopicPartition, PartitionData> toSend = Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions));
    return new FetchRequestData(toSend, Collections.emptyList(), toSend, nextMetadata);
}

收到响应结果后,通过sessionHandler,调用FetchSessionHandler.handleResponse()。

假设第一次是Full拉取,响应结果没有出错时,nextMetadata.isFull()仍然为true。

假设服务端创建了一个新的session(随机的唯一ID),客户端的Fetch SessionId会设置为服务端返回的sessionId,

并且epoch会增加1。这样下次客户端的拉取就不再是Full,而是Increment了(toSend, toForget分别表示要拉取的和不需要拉取的)。

同样假设服务端正常处理(这次不会生成新的session),客户端也正常处理响应,则sessionId不会增加,但是epoch会增加1

public boolean handleResponse(FetchResponse<?> response) {
    if (response.error() != Errors.NONE) {
        log.info("Node {} was unable to process the fetch request with {}: {}.",
            node, nextMetadata, response.error());
        if (response.error() == Errors.FETCH_SESSION_ID_NOT_FOUND) {
            nextMetadata = FetchMetadata.INITIAL;
        } else {
            nextMetadata = nextMetadata.nextCloseExisting();
        }
        return false;
    } else if (nextMetadata.isFull()) {
        String problem = verifyFullFetchResponsePartitions(response);
        if (problem != null) {
            log.info("Node {} sent an invalid full fetch response with {}", node, problem);
            nextMetadata = FetchMetadata.INITIAL;
            return false;
        } else if (response.sessionId() == INVALID_SESSION_ID) {
            log.debug("Node {} sent a full fetch response{}",
                node, responseDataToLogString(response));
            nextMetadata = FetchMetadata.INITIAL;
            return true;
        } else {
            // The server created a new incremental fetch session. 客户端正常处理全量拉取的响应
            log.debug("Node {} sent a full fetch response that created a new incremental " +
                "fetch session {}{}", node, response.sessionId(), responseDataToLogString(response));
            nextMetadata = FetchMetadata.newIncremental(response.sessionId());
            return true;
        }
    } else {
        String problem = verifyIncrementalFetchResponsePartitions(response);
        if (problem != null) {
            log.info("Node {} sent an invalid incremental fetch response with {}", node, problem);
            nextMetadata = nextMetadata.nextCloseExisting();
            return false;
        } else if (response.sessionId() == INVALID_SESSION_ID) {
            // The incremental fetch session was closed by the server.
            log.debug("Node {} sent an incremental fetch response closing session {}{}",
                node, nextMetadata.sessionId(), responseDataToLogString(response));
            nextMetadata = FetchMetadata.INITIAL;
            return true;
        } else {
            // The incremental fetch session was continued by the server. 客户端正常处理增量拉取的响应结果
            log.debug("Node {} sent an incremental fetch response for session {}{}",
                node, response.sessionId(), responseDataToLogString(response));
            nextMetadata = nextMetadata.nextIncremental();
            return true;
        }
    }
}

服务端处理拉取请求时,会创建不同类型的FetchContext:

  • SessionErrorContext:拉取会话错误(比如epoch不相等)
  • SessionlessFetchContext:不需要拉取会话(旧版本)
  • IncrementalFetchContext:增量拉取
  • FullFetchContext:全量拉取
// KafkaApis.handleFetchRequest
    val fetchContext = fetchManager.newContext(
      fetchRequest.metadata,
      fetchRequest.fetchData,
      fetchRequest.toForget,
      fetchRequest.isFromFollower)

    // 针对不同的拉取上下文,分别更新并生成响应数据
    unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)

服务端的FetchManager创建Context时,如果FetchMetadata.isFull,再判断epoch=-1时,类型为SessionlessFetchContext,

否则(epoch=0)时,类型为FullFetchContext。如果!isFull(),必须保证session.epoch = FetchMetadata.epoch,否则类型为SessionErrorContext。

当!isFull且epoch相等时,先增加session.epoch(服务端的epoch,即为客户端下次拉取的epoch),然后返回类型为IncrementalFetchContext。

FullFetchContext更新响应数据,对于全量拉取,一般是新会话,所以需要更新缓存

override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse[Records] = {
  def createNewSession: FetchSession.CACHE_MAP = {
    val cachedPartitions = new FetchSession.CACHE_MAP(updates.size)
    updates.entrySet.asScala.foreach(entry => {
      val part = entry.getKey
      val respData = entry.getValue
      val reqData = fetchData.get(part)
      cachedPartitions.mustAdd(new CachedPartition(part, reqData, respData))
    })
    cachedPartitions
  }
  val responseSessionId = cache.maybeCreateSession(time.milliseconds(), isFromFollower,
      updates.size, () => createNewSession)
  debug(s"Full fetch context with session id $responseSessionId returning " +
    s"${partitionsToLogString(updates.keySet)}")
  new FetchResponse(Errors.NONE, updates, 0, responseSessionId)
}

def maybeCreateSession(now: Long,
                       privileged: Boolean,
                       size: Int,
                       createPartitions: () => FetchSession.CACHE_MAP): Int =
synchronized {
  // If there is room, create a new session entry.
  if ((sessions.size < maxEntries) ||
      tryEvict(privileged, EvictableKey(privileged, size, 0), now)) {
    val partitionMap = createPartitions()
    // 这里创建一个新的session时,同时也会增加epoch,从0到1
    val session = new FetchSession(newSessionId(), privileged, partitionMap,
        now, now, JFetchMetadata.nextEpoch(INITIAL_EPOCH))
    debug(s"Created fetch session ${session.toString}")
    sessions.put(session.id, session)
    touch(session, now)
    session.id
  } else {
    debug(s"No fetch session created for privileged=$privileged, size=$size.")
    INVALID_SESSION_ID
  }
}

总结下客户端和服务端的Full拉取过程:

1.客户端创建的拉取请求FetchMetadata.isFull(),初始时epoch=0

2.服务端创建的FetchContext类型为FullFetchContext

3.服务端创建新的Session(xxx),以及初始化epoch=1(0+1=1),并缓存

4.客户端收到服务端的FetchResponse,设置FetchMetadata.sessionId为response中的sessionId(xxx),并增加epoch=1(从步骤1的0+1=1)

5.客户端继续拉取,isFull=false,sessionId=xxx, epoch=1

6.服务端创建的FetchContext类型为IncrementalFetchContext(满足session.epoch=reqMetadata.epoch=1, isFull=false)

7.服务端增加epoch,设置session.epoch=2,为下次的拉取(对比epoch)做准备

8.对reqMetadata.epoch加1(=2)然后对比session.epoch(2),如果不等,返回错误码INVALID_FETCH_SESSION_EPOCH,相等返回NONE

9.客户端收到服务端的FetchResponse,设置epoch增加1(sessionId没有变化时,不需要更新sessionId,实际上设置的是nextMetadata对象)


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

大型分布式网站架构设计与实践

大型分布式网站架构设计与实践

陈康贤 / 电子工业出版社 / 2014-9-1 / 79.00元

《大型分布式网站架构设计与实践》主要介绍了大型分布式网站架构所涉及的一些技术细节,包括SOA架构的实现、互联网安全架构、构建分布式网站所依赖的基础设施、系统稳定性保障和海量数据分析等内容;深入地讲述了大型分布式网站架构设计的核心原理,并通过一些架构设计的典型案例,帮助读者了解大型分布式网站设计的一些常见场景及遇到的问题。 作者结合自己在阿里巴巴及淘宝网的实际工作经历展开论述。《大型分布式网站......一起来看看 《大型分布式网站架构设计与实践》 这本书的介绍吧!

SHA 加密
SHA 加密

SHA 加密工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具