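Checkpointing is the core mechanism behind Flink's state fault tolerance. Depending on how checkpoints are taken, a job gets one of two delivery semantics: Exactly-Once or At-Least-Once. The difference comes down to checkpoint barrier alignment: with alignment the job gets Exactly-Once semantics, and without alignment it gets At-Least-Once semantics.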
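To make the difference concrete, here is a minimal sketch in plain Java. It is not Flink code: the class AlignmentSemanticsSketch, the Arrival type and the method names are all made up for illustration. It models a two-input operator whose state is a simple record count and compares the count captured in the snapshot with and without alignment.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, NOT Flink code: every name in this class is hypothetical.
public class AlignmentSemanticsSketch {

    /** One arrival at the operator: a record or a barrier, tagged with its input channel. */
    public static final class Arrival {
        final int channel;
        final boolean isBarrier;

        Arrival(int channel, boolean isBarrier) {
            this.channel = channel;
            this.isBarrier = isBarrier;
        }
    }

    /** Channel 0 delivers its barrier early and then one more record; channel 1 lags behind. */
    public static List<Arrival> sampleArrivals() {
        List<Arrival> in = new ArrayList<>();
        in.add(new Arrival(0, false)); // pre-barrier record on channel 0
        in.add(new Arrival(0, true));  // barrier on channel 0
        in.add(new Arrival(0, false)); // post-barrier record on channel 0
        in.add(new Arrival(1, false)); // pre-barrier record on channel 1
        in.add(new Arrival(1, true));  // barrier on channel 1
        return in;
    }

    /** Returns the record count that ends up in the snapshot once every channel delivered its barrier. */
    public static int countInSnapshot(List<Arrival> arrivals, int numChannels, boolean align) {
        boolean[] barrierSeen = new boolean[numChannels];
        int barriers = 0;
        int count = 0; // the operator state
        for (Arrival a : arrivals) {
            if (a.isBarrier) {
                barrierSeen[a.channel] = true;
                if (++barriers == numChannels) {
                    return count; // all barriers arrived: snapshot the state
                }
            } else if (align && barrierSeen[a.channel]) {
                // aligned mode: hold the post-barrier record back, it is not part of this snapshot
            } else {
                count++; // record is processed and changes the state
            }
        }
        throw new IllegalArgumentException("not every channel delivered a barrier");
    }

    public static void main(String[] args) {
        System.out.println(countInSnapshot(sampleArrivals(), 2, true));  // prints 2
        System.out.println(countInSnapshot(sampleArrivals(), 2, false)); // prints 3
    }
}
```

Without alignment the snapshot already contains the post-barrier record from channel 0. After a failure that record is replayed from upstream and counted a second time, which is why this mode only guarantees At-Least-Once.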
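As the official documentation explains, alignment applies to operators that receive more than one upstream input stream, for example the operators downstream of a keyBy or a join. The rest of this post looks at how alignment is implemented, from the source code.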
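Checkpoint barriers are handled in StreamInputProcessor/StreamTwoInputProcessor. These classes read data from the upstream (remote) tasks and hand it to the StreamOperator. The actual reading is done by a CheckpointBarrierHandler, which is also responsible for alignment, in its getNextNonBlocked method. The interface has two implementations, BarrierBuffer and BarrierTracker: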
// Called from StreamInputProcessor/StreamTwoInputProcessor to create the CheckpointBarrierHandler
public static CheckpointBarrierHandler createCheckpointBarrierHandler(
        StreamTask<?, ?> checkpointedTask,
        CheckpointingMode checkpointMode,
        IOManager ioManager,
        InputGate inputGate,
        Configuration taskManagerConfig) throws IOException {
    CheckpointBarrierHandler barrierHandler;
    if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
        // upper bound on the bytes buffered during alignment; -1 means unlimited
        long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
        if (!(maxAlign == -1 || maxAlign > 0)) {
            throw new IllegalConfigurationException(
                TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
                + " must be positive or -1 (infinite)");
        }
        if (taskManagerConfig.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL)) {
            barrierHandler = new BarrierBuffer(inputGate, new CachedBufferBlocker(inputGate.getPageSize()), maxAlign);
        } else {
            barrierHandler = new BarrierBuffer(inputGate, new BufferSpiller(ioManager, inputGate.getPageSize()), maxAlign);
        }
    } else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
        barrierHandler = new BarrierTracker(inputGate);
    } else {
        throw new IllegalArgumentException("Unrecognized Checkpointing Mode: " + checkpointMode);
    }
    if (checkpointedTask != null) {
        barrierHandler.registerCheckpointEventHandler(checkpointedTask);
    }
    return barrierHandler;
}
So BarrierBuffer implements the aligned path and BarrierTracker implements the non-aligned path.
With alignment - BarrierBuffer
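BarrierBuffer holds a few member variables that matter for alignment: bufferBlocker, of type BufferBlocker, and blockedChannels, a boolean array. The BufferBlocker buffers the data that arrives during alignment (CachedBufferBlocker, for instance, keeps it in an ArrayDeque), and blockedChannels records whether each channel is currently blocked for alignment.
The alignment logic is in getNextNonBlocked: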
@Override
public BufferOrEvent getNextNonBlocked() throws Exception {
    while (true) {
        //.....
        BufferOrEvent bufferOrEvent = next.get();
        if (isBlocked(bufferOrEvent.getChannelIndex())) {
            // the channel this element came from is blocked for alignment,
            // so the element is added to the buffer, i.e. the BufferBlocker
            bufferBlocker.add(bufferOrEvent);
            checkSizeLimit();
        }
        else if (bufferOrEvent.isBuffer()) {
            // a data buffer from a non-blocked channel is returned directly
            return bufferOrEvent;
        }
        else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
            if (!endOfStream) {
                // handle an element of type CheckpointBarrier
                processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
            }
        }
        //.......
    }
}
The processBarrier method:
private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
    // barrierId is the checkpointId this barrier belongs to
    final long barrierId = receivedBarrier.getId();
    // with a single input channel the checkpoint is triggered right away
    if (totalNumberOfInputChannels == 1) {
        if (barrierId > currentCheckpointId) {
            // new checkpoint
            currentCheckpointId = barrierId;
            notifyCheckpoint(receivedBarrier);
        }
        return;
    }
    // multiple input channels: numBarriersReceived is the number of channels
    // that have already delivered the barrier of the current checkpointId;
    // numBarriersReceived > 0 means an alignment is in progress
    if (numBarriersReceived > 0) {
        // this is only true if some alignment is already in progress and was not canceled
        if (barrierId == currentCheckpointId) {
            // regular case
            onBarrier(channelIndex);
        }
        else if (barrierId > currentCheckpointId) {
            // the incoming barrierId (checkpointId) is newer than the checkpoint
            // currently being aligned: abort the current checkpoint (it may, for
            // example, have timed out), reset blockedChannels and set
            // numBarriersReceived back to 0, then start aligning the new
            // checkpoint (barrierId)
            LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
                    "Skipping current checkpoint.",
                inputGate.getOwningTaskName(),
                barrierId,
                currentCheckpointId);
            notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));
            releaseBlocksAndResetBarriers();
            beginNewAlignment(barrierId, channelIndex);
        }
        else {
            // ignore trailing barrier from an earlier checkpoint (obsolete now)
            return;
        }
    }
    else if (barrierId > currentCheckpointId) {
        // numBarriersReceived == 0: start a new checkpoint and
        // mark this channel as blocked (blockedChannels[channelIndex] = true)
        beginNewAlignment(barrierId, channelIndex);
    }
    else {
        // either the current checkpoint was canceled (numBarriers == 0) or
        // this barrier is from an old subsumed checkpoint
        return;
    }
    // check if we have all barriers - since canceled checkpoints always have zero barriers
    // this can only happen on a non canceled checkpoint
    if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
        // actually trigger checkpoint
        if (LOG.isDebugEnabled()) {
            LOG.debug("{}: Received all barriers, triggering checkpoint {} at {}.",
                inputGate.getOwningTaskName(),
                receivedBarrier.getId(),
                receivedBarrier.getTimestamp());
        }
        // alignment is complete: put the buffered data (held in the BufferBlocker)
        // back into the queue to be consumed, then trigger the checkpoint
        releaseBlocksAndResetBarriers();
        notifyCheckpoint(receivedBarrier);
    }
}
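Overall flow with alignment: when an operator receives multiple upstream inputs and a checkpoint barrier arrives on one of them, that input channel is temporarily marked as blocked, and everything read from it afterwards is held in the buffer. Once the checkpoint barriers of all remaining channels have arrived, the buffered data is released for processing and the checkpoint starts.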
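The flow above can be sketched as a small toy class. This is an illustration only, assuming a single checkpoint in flight and no aborts: ToyBarrierAligner and its methods are hypothetical names, and records are plain strings instead of BufferOrEvent objects. Only blockedChannels and numBarriersReceived are borrowed from the real code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of barrier alignment, NOT the real BarrierBuffer.
public class ToyBarrierAligner {
    private final boolean[] blockedChannels;                         // true while a channel waits for alignment
    private final ArrayDeque<String> buffered = new ArrayDeque<>();  // records held back during alignment
    private final List<String> output = new ArrayList<>();           // what the operator sees, in order
    private int numBarriersReceived = 0;

    public ToyBarrierAligner(int numChannels) {
        this.blockedChannels = new boolean[numChannels];
    }

    /** A record arrives: buffer it if its channel is blocked, otherwise pass it on. */
    public void onRecord(int channel, String record) {
        if (blockedChannels[channel]) {
            buffered.add(record);
        } else {
            output.add(record);
        }
    }

    /** A barrier arrives: block the channel, and finish the alignment once all channels are blocked. */
    public void onBarrier(int channel, long checkpointId) {
        blockedChannels[channel] = true;
        numBarriersReceived++;
        if (numBarriersReceived == blockedChannels.length) {
            // all barriers are in: take the checkpoint, unblock every channel,
            // then let the held-back records flow to the operator
            output.add("checkpoint-" + checkpointId);
            Arrays.fill(blockedChannels, false);
            numBarriersReceived = 0;
            output.addAll(buffered);
            buffered.clear();
        }
    }

    public List<String> output() {
        return output;
    }
}
```

Feeding it a1 and a barrier on channel 0, then a2 on channel 0, then b1 and a barrier on channel 1 yields a1, b1, checkpoint-1, a2: the record a2 arrived before b1 but is only seen after the checkpoint.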
Without alignment - BarrierTracker
The non-aligned path is simpler. No data is buffered, and the checkpoint is triggered once the checkpoint barrier has arrived from every channel.
public BufferOrEvent getNextNonBlocked() throws Exception {
    while (true) {
        Optional<BufferOrEvent> next = inputGate.getNextBufferOrEvent();
        if (!next.isPresent()) {
            // buffer or input exhausted
            return null;
        }
        BufferOrEvent bufferOrEvent = next.get();
        if (bufferOrEvent.isBuffer()) {
            return bufferOrEvent;
        }
        else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
            processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
        }
        else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
            processCheckpointAbortBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
        }
        else {
            // some other event
            return bufferOrEvent;
        }
    }
}
The processBarrier method:
private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
    final long barrierId = receivedBarrier.getId();
    // with a single input channel the checkpoint is triggered right away
    if (totalNumberOfInputChannels == 1) {
        notifyCheckpoint(barrierId, receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());
        return;
    }
    // general path for multiple input channels
    if (LOG.isDebugEnabled()) {
        LOG.debug("Received barrier for checkpoint {} from channel {}", barrierId, channelIndex);
    }
    // find the checkpoint barrier in the queue of pending barriers
    CheckpointBarrierCount cbc = null;
    int pos = 0;
    // look for the pending entry that belongs to the same checkpoint
    for (CheckpointBarrierCount next : pendingCheckpoints) {
        if (next.checkpointId == barrierId) {
            cbc = next;
            break;
        }
        pos++;
    }
    if (cbc != null) {
        // add one to the count to that barrier and check for completion
        int numBarriersNew = cbc.incrementBarrierCount();
        if (numBarriersNew == totalNumberOfInputChannels) {
            // all seven Dragon Balls collected: the checkpoint can be triggered;
            // drop this entry and every older pending checkpoint in front of it
            for (int i = 0; i <= pos; i++) {
                pendingCheckpoints.pollFirst();
            }
            // notify the listener
            if (!cbc.isAborted()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Received all barriers for checkpoint {}", barrierId);
                }
                notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());
            }
        }
    }
    else {
        // first barrier of a new checkpoint
        if (barrierId > latestPendingCheckpointID) {
            latestPendingCheckpointID = barrierId;
            pendingCheckpoints.addLast(new CheckpointBarrierCount(barrierId));
            // make sure we do not track too many checkpoints
            if (pendingCheckpoints.size() > MAX_CHECKPOINTS_TO_TRACK) {
                pendingCheckpoints.pollFirst();
            }
        }
    }
}
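Overall flow without alignment: when an operator receives multiple upstream inputs, no data is buffered for any checkpoint; records are passed straight on for processing. Checkpoint progress is tracked in a queue of CheckpointBarrierCount entries, each of which pairs one checkpoint with the number of checkpoint barriers received for it so far. When that number equals the number of channels, the checkpoint is triggered.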
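The counting logic can be tried out in isolation with a toy version like the one below. It is a sketch, not the real BarrierTracker: the class ToyBarrierTracker, the nested BarrierCount type and the boolean return value are assumptions made for illustration, and abort handling and the MAX_CHECKPOINTS_TO_TRACK cap are left out.

```java
import java.util.ArrayDeque;

// Toy model of barrier counting without alignment, NOT the real BarrierTracker.
public class ToyBarrierTracker {

    /** Pairs a checkpoint id with the number of barriers seen for it so far. */
    private static final class BarrierCount {
        final long checkpointId;
        int count = 1; // the barrier that creates the entry is already counted

        BarrierCount(long checkpointId) {
            this.checkpointId = checkpointId;
        }
    }

    private final int numChannels;
    private final ArrayDeque<BarrierCount> pending = new ArrayDeque<>();
    private long latestPendingCheckpointId = -1;

    public ToyBarrierTracker(int numChannels) {
        this.numChannels = numChannels;
    }

    /** Returns true if this barrier completes its checkpoint, i.e. the checkpoint would be triggered. */
    public boolean onBarrier(long checkpointId) {
        if (numChannels == 1) {
            return true; // single input: trigger right away
        }
        BarrierCount match = null;
        int pos = 0;
        for (BarrierCount c : pending) {
            if (c.checkpointId == checkpointId) {
                match = c;
                break;
            }
            pos++;
        }
        if (match != null) {
            if (++match.count == numChannels) {
                // complete: drop this entry and every older pending checkpoint before it
                for (int i = 0; i <= pos; i++) {
                    pending.pollFirst();
                }
                return true;
            }
        } else if (checkpointId > latestPendingCheckpointId) {
            // first barrier of a newer checkpoint: start tracking it
            latestPendingCheckpointId = checkpointId;
            pending.addLast(new BarrierCount(checkpointId));
        }
        return false;
    }

    public int pendingSize() {
        return pending.size();
    }
}
```

With two channels, the calls onBarrier(1), onBarrier(2), onBarrier(2) return false, false, true. Completing checkpoint 2 also discards the still-pending checkpoint 1, so a late onBarrier(1) afterwards is ignored and returns false.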