Kafka消费者增量拉取

栏目: 后端 · 发布时间: 5年前

内容简介:为了减少客户端每次拉取都要拉取全部的分区,增加了增量拉取分区的概念。拉取会话(Fetch Session),类似于web中的session是有状态的,客户端的fetch也可以认为是有状态的。这里的状态指的是知道“要拉取哪些分区”,如果第一次拉取了分区1,如果后续分区1没有数据,就不需要拉取分区1了。

简介

为了减少客户端每次拉取都要拉取全部的分区,增加了增量拉取分区的概念。

拉取会话(Fetch Session),类似于web中的session是有状态的,客户端的fetch也可以认为是有状态的。

这里的状态指的是知道“要拉取哪些分区”,如果第一次拉取了分区1,如果后续分区1没有数据,就不需要拉取分区1了。

FetchSession的数据结构如下:

case class FetchSession(val id: Int, // session编号是随机32位数字,防止未授权的客户端伪造数据
                        val privileged: Boolean,
                        val partitionMap: FetchSession.CACHE_MAP,
                        val creationMs: Long,
                        var lastUsedMs: Long,
                        var epoch: Int) // 自增

为了支持增量拉取,FetchSession需要维护每个分区的以下信息:

  • topic,partition Index(来自于TopicParttition)
  • maxBytes,fetchOffset,fetcherLogStartOffset(来自于最近一次的拉取请求)
  • highWatermark,localLogStartOffset(来自Leader的本地日志)

因为Follower或者Consumer发送拉取请求都是到Leader,所以FetchSession也是记录在Leader节点上的

FetchRequest Metadata(客户端的拉取请求元数据)

sessionId epoch 含义
0 -1 全量拉取(没有使用或者创建session时)
0 0 全量拉取(如果是新的会话,epoch从1开始)
$ID 0 关闭标识为$ID的增量拉取会话,并创建一个新的全量拉取
$ID $EPOCH 创建增量拉取

对于客户端而言,什么时候一个分区会被包含到增量的拉取请求中:

  • 客户端通知Broker,分区的maxBytes,fetchOffset,logStartOffset改变了
  • 分区在之前的增量拉取会话中不存在,客户端想要增加这个分区(拉取新的分区)
  • 分区在增量拉取会话中,客户端要移除

Fetch Response Metadata(服务端返回给客户端的sessionId)

sessionId 含义
0 之前没有创建过拉取回话
$ID 下一个请求会是增量的拉取请求,并且sessionId是$ID

服务端增加分区包含到增量的拉取响应中:

  • Broker通知客户端分区的hw或者brokerLogStartOffset变化了
  • 分区有新的数据

源码解析

Fetcher.java#sendFetches(): prepareFetchRequests创建FetchSessionHandler.FetchRequestData。

构建拉取请求通过FetchSessionHandler.Builder,builder.add(partition, PartitionData)会添加next:

即要拉取的分区。构建时调用Builder.build(),针对Full拉取:

// FetchSessionHandler.Builder.build()
if (nextMetadata.isFull()) { // epoch=0或者-1
    sessionPartitions = next; // next为之前调动add添加的分区
    next = null; // 本地full拉取,下次next=null
    Map<TopicPartition, PartitionData> toSend = Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions));
    return new FetchRequestData(toSend, Collections.emptyList(), toSend, nextMetadata);
}

收到响应结果后,通过sessionHandler,调用FetchSessionHandler.handleResponse()。

假设第一次是Full拉取,响应结果没有出错时,nextMetadata.isFull()仍然为true。

假设服务端创建了一个新的session(随机的唯一ID),客户端的Fetch SessionId会设置为服务端返回的sessionId,

并且epoch会增加1。这样下次客户端的拉取就不再是Full,而是Increment了(toSend, toForget分别表示要拉取的和不需要拉取的)。

同样假设服务端正常处理(这次不会生成新的session),客户端也正常处理响应,则sessionId不会增加,但是epoch会增加1

public boolean handleResponse(FetchResponse<?> response) {
    if (response.error() != Errors.NONE) {
        log.info("Node {} was unable to process the fetch request with {}: {}.",
            node, nextMetadata, response.error());
        if (response.error() == Errors.FETCH_SESSION_ID_NOT_FOUND) {
            nextMetadata = FetchMetadata.INITIAL;
        } else {
            nextMetadata = nextMetadata.nextCloseExisting();
        }
        return false;
    } else if (nextMetadata.isFull()) {
        String problem = verifyFullFetchResponsePartitions(response);
        if (problem != null) {
            log.info("Node {} sent an invalid full fetch response with {}", node, problem);
            nextMetadata = FetchMetadata.INITIAL;
            return false;
        } else if (response.sessionId() == INVALID_SESSION_ID) {
            log.debug("Node {} sent a full fetch response{}",
                node, responseDataToLogString(response));
            nextMetadata = FetchMetadata.INITIAL;
            return true;
        } else {
            // The server created a new incremental fetch session. 客户端正常处理全量拉取的响应
            log.debug("Node {} sent a full fetch response that created a new incremental " +
                "fetch session {}{}", node, response.sessionId(), responseDataToLogString(response));
            nextMetadata = FetchMetadata.newIncremental(response.sessionId());
            return true;
        }
    } else {
        String problem = verifyIncrementalFetchResponsePartitions(response);
        if (problem != null) {
            log.info("Node {} sent an invalid incremental fetch response with {}", node, problem);
            nextMetadata = nextMetadata.nextCloseExisting();
            return false;
        } else if (response.sessionId() == INVALID_SESSION_ID) {
            // The incremental fetch session was closed by the server.
            log.debug("Node {} sent an incremental fetch response closing session {}{}",
                node, nextMetadata.sessionId(), responseDataToLogString(response));
            nextMetadata = FetchMetadata.INITIAL;
            return true;
        } else {
            // The incremental fetch session was continued by the server. 客户端正常处理增量拉取的响应结果
            log.debug("Node {} sent an incremental fetch response for session {}{}",
                node, response.sessionId(), responseDataToLogString(response));
            nextMetadata = nextMetadata.nextIncremental();
            return true;
        }
    }
}

服务端处理拉取请求时,会创建不同类型的FetchContext:

  • SessionErrorContext:拉取会话错误(比如epoch不相等)
  • SessionlessFetchContext:不需要拉取会话(旧版本)
  • IncrementalFetchContext:增量拉取
  • FullFetchContext:全量拉取
// KafkaApis.handleFetchRequest
    val fetchContext = fetchManager.newContext(
      fetchRequest.metadata,
      fetchRequest.fetchData,
      fetchRequest.toForget,
      fetchRequest.isFromFollower)

    // 针对不同的拉取上下文,分别更新并生成响应数据
    unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)

服务端的FetchManager创建Context时,如果FetchMetadata.isFull,再判断epoch=-1时,类型为SessionlessFetchContext,

否则(epoch=0)时,类型为FullFetchContext。如果!isFull(),必须保证session.epoch = FetchMetadata.epoch,否则类型为SessionErrorContext。

当!isFull且epoch相等时,先增加session.epoch(服务端的epoch,即为客户端下次拉取的epoch),然后返回类型为IncrementalFetchContext。

FullFetchContext更新响应数据,对于全量拉取,一般是新会话,所以需要更新缓存

override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse[Records] = {
  def createNewSession: FetchSession.CACHE_MAP = {
    val cachedPartitions = new FetchSession.CACHE_MAP(updates.size)
    updates.entrySet.asScala.foreach(entry => {
      val part = entry.getKey
      val respData = entry.getValue
      val reqData = fetchData.get(part)
      cachedPartitions.mustAdd(new CachedPartition(part, reqData, respData))
    })
    cachedPartitions
  }
  val responseSessionId = cache.maybeCreateSession(time.milliseconds(), isFromFollower,
      updates.size, () => createNewSession)
  debug(s"Full fetch context with session id $responseSessionId returning " +
    s"${partitionsToLogString(updates.keySet)}")
  new FetchResponse(Errors.NONE, updates, 0, responseSessionId)
}

def maybeCreateSession(now: Long,
                       privileged: Boolean,
                       size: Int,
                       createPartitions: () => FetchSession.CACHE_MAP): Int =
synchronized {
  // If there is room, create a new session entry.
  if ((sessions.size < maxEntries) ||
      tryEvict(privileged, EvictableKey(privileged, size, 0), now)) {
    val partitionMap = createPartitions()
    // 这里创建一个新的session时,同时也会增加epoch,从0到1
    val session = new FetchSession(newSessionId(), privileged, partitionMap,
        now, now, JFetchMetadata.nextEpoch(INITIAL_EPOCH))
    debug(s"Created fetch session ${session.toString}")
    sessions.put(session.id, session)
    touch(session, now)
    session.id
  } else {
    debug(s"No fetch session created for privileged=$privileged, size=$size.")
    INVALID_SESSION_ID
  }
}

总结下客户端和服务端的Full拉取过程:

1.客户端创建的拉取请求FetchMetadata.isFull(),初始时epoch=0

2.服务端创建的FetchContext类型为FullFetchContext

3.服务端创建新的Session(xxx),以及初始化epoch=1(0+1=1),并缓存

4.客户端收到服务端的FetchResponse,设置FetchMetadata.sessionId为response中的sessionId(xxx),并增加epoch=1(从步骤1的0+1=1)

5.客户端继续拉取,isFull=false,sessionId=xxx, epoch=1

6.服务端创建的FetchContext类型为IncrementalFetchContext(满足session.epoch=reqMetadata.epoch=1, isFull=false)

7.服务端增加epoch,设置session.epoch=2,为下次的拉取(对比epoch)做准备

8.对reqMetadata.epoch加1(=2)然后对比session.epoch(2),如果不等,返回错误码INVALID_FETCH_SESSION_EPOCH,相等返回NONE

9.客户端收到服务端的FetchResponse,设置epoch增加1(sessionId没有变化时,不需要更新sessionId,实际上设置的是nextMetadata对象)


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Data Structures and Algorithm Analysis in Java

Data Structures and Algorithm Analysis in Java

Mark A. Weiss / Pearson / 2011-11-18 / GBP 129.99

Data Structures and Algorithm Analysis in Java is an “advanced algorithms” book that fits between traditional CS2 and Algorithms Analysis courses. In the old ACM Curriculum Guidelines, this course wa......一起来看看 《Data Structures and Algorithm Analysis in Java》 这本书的介绍吧!

随机密码生成器
随机密码生成器

多种字符组合密码

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器