内容简介:本文主要研究一下flink的slot.idle.timeout配置flink-release-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.javaflink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
序
本文主要研究一下flink的slot.idle.timeout配置
JobManagerOptions
flink-release-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@PublicEvolving public class JobManagerOptions { //...... /** * The timeout in milliseconds for a idle slot in Slot Pool. */ public static final ConfigOption<Long> SLOT_IDLE_TIMEOUT = key("slot.idle.timeout") // default matches heartbeat.timeout so that sticky allocation is not lost on timeouts for local recovery .defaultValue(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT.defaultValue()) .withDescription("The timeout in milliseconds for a idle slot in Slot Pool."); //...... }
- slot.idle.timeout默认为HeartbeatManagerOptions.HEARTBEAT_TIMEOUT.defaultValue(),即50000L毫秒
SlotPool
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedSlotActions { /** The interval (in milliseconds) in which the SlotPool writes its slot distribution on debug level. */ private static final int STATUS_LOG_INTERVAL_MS = 60_000; private final JobID jobId; private final SchedulingStrategy schedulingStrategy; private final ProviderAndOwner providerAndOwner; /** All registered TaskManagers, slots will be accepted and used only if the resource is registered. */ private final HashSet<ResourceID> registeredTaskManagers; /** The book-keeping of all allocated slots. */ private final AllocatedSlots allocatedSlots; /** The book-keeping of all available slots. */ private final AvailableSlots availableSlots; /** All pending requests waiting for slots. */ private final DualKeyMap<SlotRequestId, AllocationID, PendingRequest> pendingRequests; /** The requests that are waiting for the resource manager to be connected. */ private final HashMap<SlotRequestId, PendingRequest> waitingForResourceManager; /** Timeout for external request calls (e.g. to the ResourceManager or the TaskExecutor). */ private final Time rpcTimeout; /** Timeout for releasing idle slots. */ private final Time idleSlotTimeout; private final Clock clock; /** Managers for the different slot sharing groups. */ protected final Map<SlotSharingGroupId, SlotSharingManager> slotSharingManagers; /** the fencing token of the job manager. */ private JobMasterId jobMasterId; /** The gateway to communicate with resource manager. */ private ResourceManagerGateway resourceManagerGateway; private String jobManagerAddress; //...... /** * Start the slot pool to accept RPC calls. * * @param jobMasterId The necessary leader id for running the job. * @param newJobManagerAddress for the slot requests which are sent to the resource manager */ public void start(JobMasterId jobMasterId, String newJobManagerAddress) throws Exception { this.jobMasterId = checkNotNull(jobMasterId); this.jobManagerAddress = checkNotNull(newJobManagerAddress); // TODO - start should not throw an exception try { super.start(); } catch (Exception e) { throw new RuntimeException("This should never happen", e); } scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout); if (log.isDebugEnabled()) { scheduleRunAsync(this::scheduledLogStatus, STATUS_LOG_INTERVAL_MS, TimeUnit.MILLISECONDS); } } /** * Check the available slots, release the slot that is idle for a long time. */ private void checkIdleSlot() { // The timestamp in SlotAndTimestamp is relative final long currentRelativeTimeMillis = clock.relativeTimeMillis(); final List<AllocatedSlot> expiredSlots = new ArrayList<>(availableSlots.size()); for (SlotAndTimestamp slotAndTimestamp : availableSlots.availableSlots.values()) { if (currentRelativeTimeMillis - slotAndTimestamp.timestamp > idleSlotTimeout.toMilliseconds()) { expiredSlots.add(slotAndTimestamp.slot); } } final FlinkException cause = new FlinkException("Releasing idle slot."); for (AllocatedSlot expiredSlot : expiredSlots) { final AllocationID allocationID = expiredSlot.getAllocationId(); if (availableSlots.tryRemove(allocationID) != null) { log.info("Releasing idle slot [{}].", allocationID); final CompletableFuture<Acknowledge> freeSlotFuture = expiredSlot.getTaskManagerGateway().freeSlot( allocationID, cause, rpcTimeout); freeSlotFuture.whenCompleteAsync( (Acknowledge ignored, Throwable throwable) -> { if (throwable != null) { if (registeredTaskManagers.contains(expiredSlot.getTaskManagerId())) { log.debug("Releasing slot [{}] of registered TaskExecutor {} failed. " + "Trying to fulfill a different slot request.", allocationID, expiredSlot.getTaskManagerId(), throwable); tryFulfillSlotRequestOrMakeAvailable(expiredSlot); } else { log.debug("Releasing slot [{}] failed and owning TaskExecutor {} is no " + "longer registered. Discarding slot.", allocationID, expiredSlot.getTaskManagerId()); } } }, getMainThreadExecutor()); } } scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout); } //...... }
- SlotPool在start方法里头,调用scheduleRunAsync方法,延时idleSlotTimeout调度执行checkIdleSlot;checkIdleSlot方法会挨个检查availableSlots的SlotAndTimestamp,判断当前时间与slotAndTimestamp.timestamp的时间差是否超过idleSlotTimeout,超过的话,则放入expiredSlots,之后对expiredSlots挨个进行availableSlots.tryRemove,然后调用TaskManagerGateway.freeSlot进行释放,之后再次调用scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout)进行下一次的延时调度检测
RpcEndpoint
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
public abstract class RpcEndpoint implements RpcGateway { //...... /** * Execute the runnable in the main thread of the underlying RPC endpoint, with * a delay of the given number of milliseconds. * * @param runnable Runnable to be executed * @param delay The delay after which the runnable will be executed */ protected void scheduleRunAsync(Runnable runnable, Time delay) { scheduleRunAsync(runnable, delay.getSize(), delay.getUnit()); } /** * Execute the runnable in the main thread of the underlying RPC endpoint, with * a delay of the given number of milliseconds. * * @param runnable Runnable to be executed * @param delay The delay after which the runnable will be executed */ protected void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) { rpcServer.scheduleRunAsync(runnable, unit.toMillis(delay)); } //...... }
- RpcEndpoint提供了scheduleRunAsync,其最后调用的是rpcServer.scheduleRunAsync
小结
- slot.idle.timeout默认为HeartbeatManagerOptions.HEARTBEAT_TIMEOUT.defaultValue(),即50000L毫秒
- SlotPool在start方法里头,调用scheduleRunAsync方法,延时idleSlotTimeout调度执行checkIdleSlot;checkIdleSlot方法会挨个检查availableSlots的SlotAndTimestamp,判断当前时间与slotAndTimestamp.timestamp的时间差是否超过idleSlotTimeout,超过的话,则放入expiredSlots,之后对expiredSlots挨个进行availableSlots.tryRemove,然后调用TaskManagerGateway.freeSlot进行释放,之后再次调用scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout)进行下一次的延时调度检测
- RpcEndpoint提供了scheduleRunAsync,其最后调用的是rpcServer.scheduleRunAsync
doc
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 聊聊flink的logback配置
- 聊聊 Nacos 配置隔离和分类的使用
- 聊聊flink的log.file配置 原 荐
- 聊聊flink的slot.request.timeout配置
- 聊聊动态规划(2) -- 特征
- 聊聊动态规划(1) -- 概念
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
结构化计算机组成
Andrew S.Tanenbaum / 刘卫东 / 机械工业出版社 / 2001-10-1 / 46.00
AndrewcS.Tanenbaum获得过美国麻省理工学院的理学学士学位和加利福尼亚大学伯克利分校的哲学博士学位,目前是荷兰阿姆斯特丹Vrije大学计算机科学系的教授,并领导着一个计算机系统的研究小组.同时,他还是一所计算与图像处理学院的院长,这是由几所大学合作成立的研究生院.尽管社会工作很多,但他并没有中断学术研究. 多年来,他在编译技术.操作系统.网络及局域分布式系统方面进行了大量的一起来看看 《结构化计算机组成》 这本书的介绍吧!