Preface
This article looks at Flink's slot.request.timeout configuration.
JobManagerOptions
flink-release-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@PublicEvolving
public class JobManagerOptions {
    //......

    /**
     * The timeout in milliseconds for requesting a slot from Slot Pool.
     */
    public static final ConfigOption<Long> SLOT_REQUEST_TIMEOUT =
        key("slot.request.timeout")
        .defaultValue(5L * 60L * 1000L)
        .withDescription("The timeout in milliseconds for requesting a slot from Slot Pool.");

    //......
}
- slot.request.timeout defaults to 5 minutes (5 * 60 * 1000 milliseconds)
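As a small illustration of the default value above, the following self-contained sketch (plain Java, no Flink dependency) converts the default to a readable duration and renders a `key: value` line of the kind used in flink-conf.yaml. The class name `SlotRequestTimeoutDefault` and the helper `confLine` are hypothetical names introduced only for this example.

```java
import java.time.Duration;

// Hypothetical helper for illustration; not part of Flink.
final class SlotRequestTimeoutDefault {
    // Key and default copied from the JobManagerOptions excerpt above.
    static final String KEY = "slot.request.timeout";
    static final long DEFAULT_MS = 5L * 60L * 1000L;

    /** Renders a flink-conf.yaml style "key: value" line; the value is in milliseconds. */
    static String confLine(Duration timeout) {
        return KEY + ": " + timeout.toMillis();
    }

    public static void main(String[] args) {
        // The default expressed as an ISO-8601 duration.
        System.out.println(Duration.ofMillis(DEFAULT_MS)); // PT5M
        // A line that would raise the timeout to 10 minutes.
        System.out.println(confLine(Duration.ofMinutes(10))); // slot.request.timeout: 600000
    }
}
```

The option is declared as a `ConfigOption<Long>`, so the configured value is a plain number of milliseconds rather than a string with a unit.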
SlotManagerConfiguration
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
public class SlotManagerConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(SlotManagerConfiguration.class);

    private final Time taskManagerRequestTimeout;
    private final Time slotRequestTimeout;
    private final Time taskManagerTimeout;

    public SlotManagerConfiguration(
            Time taskManagerRequestTimeout,
            Time slotRequestTimeout,
            Time taskManagerTimeout) {
        this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout);
        this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout);
        this.taskManagerTimeout = Preconditions.checkNotNull(taskManagerTimeout);
    }

    public Time getTaskManagerRequestTimeout() {
        return taskManagerRequestTimeout;
    }

    public Time getSlotRequestTimeout() {
        return slotRequestTimeout;
    }

    public Time getTaskManagerTimeout() {
        return taskManagerTimeout;
    }

    public static SlotManagerConfiguration fromConfiguration(Configuration configuration) throws ConfigurationException {
        final String strTimeout = configuration.getString(AkkaOptions.ASK_TIMEOUT);
        final Time rpcTimeout;

        try {
            rpcTimeout = Time.milliseconds(Duration.apply(strTimeout).toMillis());
        } catch (NumberFormatException e) {
            throw new ConfigurationException("Could not parse the resource manager's timeout " +
                "value " + AkkaOptions.ASK_TIMEOUT + '.', e);
        }

        final Time slotRequestTimeout = getSlotRequestTimeout(configuration);
        final Time taskManagerTimeout = Time.milliseconds(
            configuration.getLong(ResourceManagerOptions.TASK_MANAGER_TIMEOUT));

        return new SlotManagerConfiguration(rpcTimeout, slotRequestTimeout, taskManagerTimeout);
    }

    private static Time getSlotRequestTimeout(final Configuration configuration) {
        final long slotRequestTimeoutMs;
        if (configuration.contains(ResourceManagerOptions.SLOT_REQUEST_TIMEOUT)) {
            LOGGER.warn("Config key {} is deprecated; use {} instead.",
                ResourceManagerOptions.SLOT_REQUEST_TIMEOUT,
                JobManagerOptions.SLOT_REQUEST_TIMEOUT);
            slotRequestTimeoutMs = configuration.getLong(ResourceManagerOptions.SLOT_REQUEST_TIMEOUT);
        } else {
            slotRequestTimeoutMs = configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT);
        }
        return Time.milliseconds(slotRequestTimeoutMs);
    }
}
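- SlotManagerConfiguration's getSlotRequestTimeout method reads JobManagerOptions.SLOT_REQUEST_TIMEOUT from the configuration; if the deprecated ResourceManagerOptions.SLOT_REQUEST_TIMEOUT is present, that value is used instead and a deprecation warning is logged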
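The lookup order in getSlotRequestTimeout can be sketched without Flink on the classpath. The example below is a simplified model, not Flink's implementation: a `Map<String, Long>` stands in for Flink's `Configuration`, the class and method names are hypothetical, and the deprecated key string `slotmanager.request-timeout` is an assumption, since the article does not show the definition of ResourceManagerOptions.SLOT_REQUEST_TIMEOUT.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for SlotManagerConfiguration.getSlotRequestTimeout; names are hypothetical.
final class SlotRequestTimeoutLookup {
    static final String CURRENT_KEY = "slot.request.timeout";
    // Assumption: key string of the deprecated option; not shown in the article.
    static final String DEPRECATED_KEY = "slotmanager.request-timeout";
    static final long DEFAULT_MS = 5L * 60L * 1000L;

    /** Prefers the deprecated key when it is set, otherwise the current key or its default. */
    static long resolveTimeoutMs(Map<String, Long> conf) {
        if (conf.containsKey(DEPRECATED_KEY)) {
            System.err.println("Config key " + DEPRECATED_KEY
                + " is deprecated; use " + CURRENT_KEY + " instead.");
            return conf.get(DEPRECATED_KEY);
        }
        return conf.getOrDefault(CURRENT_KEY, DEFAULT_MS);
    }

    public static void main(String[] args) {
        Map<String, Long> conf = new HashMap<>();
        System.out.println(resolveTimeoutMs(conf)); // 300000 (default)

        conf.put(CURRENT_KEY, 60_000L);
        System.out.println(resolveTimeoutMs(conf)); // 60000

        conf.put(DEPRECATED_KEY, 10_000L);
        System.out.println(resolveTimeoutMs(conf)); // 10000 (deprecated key wins)
    }
}
```

One consequence visible in the sketch: when both keys are set, the deprecated one takes precedence, so a stale entry left in an old configuration file can silently override slot.request.timeout.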
SlotManager
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
public class SlotManager implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(SlotManager.class);

    /** Scheduled executor for timeouts. */
    private final ScheduledExecutor scheduledExecutor;

    /** Timeout for slot requests to the task manager. */
    private final Time taskManagerRequestTimeout;

    /** Timeout after which an allocation is discarded. */
    private final Time slotRequestTimeout;

    /** Timeout after which an unused TaskManager is released. */
    private final Time taskManagerTimeout;

    /** Map for all registered slots. */
    private final HashMap<SlotID, TaskManagerSlot> slots;

    /** Index of all currently free slots. */
    private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots;

    /** All currently registered task managers. */
    private final HashMap<InstanceID, TaskManagerRegistration> taskManagerRegistrations;

    /** Map of fulfilled and active allocations for request deduplication purposes. */
    private final HashMap<AllocationID, SlotID> fulfilledSlotRequests;

    /** Map of pending/unfulfilled slot allocation requests. */
    private final HashMap<AllocationID, PendingSlotRequest> pendingSlotRequests;

    private final HashMap<TaskManagerSlotId, PendingTaskManagerSlot> pendingSlots;

    /** ResourceManager's id. */
    private ResourceManagerId resourceManagerId;

    /** Executor for future callbacks which have to be "synchronized". */
    private Executor mainThreadExecutor;

    /** Callbacks for resource (de-)allocations. */
    private ResourceActions resourceActions;

    private ScheduledFuture<?> taskManagerTimeoutCheck;

    private ScheduledFuture<?> slotRequestTimeoutCheck;

    /** True iff the component has been started. */
    private boolean started;

    public SlotManager(
            ScheduledExecutor scheduledExecutor,
            Time taskManagerRequestTimeout,
            Time slotRequestTimeout,
            Time taskManagerTimeout) {
        this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
        this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout);
        this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout);
        this.taskManagerTimeout = Preconditions.checkNotNull(taskManagerTimeout);

        slots = new HashMap<>(16);
        freeSlots = new LinkedHashMap<>(16);
        taskManagerRegistrations = new HashMap<>(4);
        fulfilledSlotRequests = new HashMap<>(16);
        pendingSlotRequests = new HashMap<>(16);
        pendingSlots = new HashMap<>(16);

        resourceManagerId = null;
        resourceActions = null;
        mainThreadExecutor = null;
        taskManagerTimeoutCheck = null;
        slotRequestTimeoutCheck = null;

        started = false;
    }

    public void start(ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, ResourceActions newResourceActions) {
        LOG.info("Starting the SlotManager.");

        this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
        mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
        resourceActions = Preconditions.checkNotNull(newResourceActions);

        started = true;

        taskManagerTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
            () -> mainThreadExecutor.execute(
                () -> checkTaskManagerTimeouts()),
            0L,
            taskManagerTimeout.toMilliseconds(),
            TimeUnit.MILLISECONDS);

        slotRequestTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
            () -> mainThreadExecutor.execute(
                () -> checkSlotRequestTimeouts()),
            0L,
            slotRequestTimeout.toMilliseconds(),
            TimeUnit.MILLISECONDS);
    }

    /**
     * Suspends the component. This clears the internal state of the slot manager.
     */
    public void suspend() {
        LOG.info("Suspending the SlotManager.");

        // stop the timeout checks for the TaskManagers and the SlotRequests
        if (taskManagerTimeoutCheck != null) {
            taskManagerTimeoutCheck.cancel(false);
            taskManagerTimeoutCheck = null;
        }

        if (slotRequestTimeoutCheck != null) {
            slotRequestTimeoutCheck.cancel(false);
            slotRequestTimeoutCheck = null;
        }

        for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests.values()) {
            cancelPendingSlotRequest(pendingSlotRequest);
        }

        pendingSlotRequests.clear();

        ArrayList<InstanceID> registeredTaskManagers = new ArrayList<>(taskManagerRegistrations.keySet());

        for (InstanceID registeredTaskManager : registeredTaskManagers) {
            unregisterTaskManager(registeredTaskManager);
        }

        resourceManagerId = null;
        resourceActions = null;
        started = false;
    }

    public boolean registerSlotRequest(SlotRequest slotRequest) throws SlotManagerException {
        checkInit();

        if (checkDuplicateRequest(slotRequest.getAllocationId())) {
            LOG.debug("Ignoring a duplicate slot request with allocation id {}.", slotRequest.getAllocationId());

            return false;
        } else {
            PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);

            pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest);

            try {
                internalRequestSlot(pendingSlotRequest);
            } catch (ResourceManagerException e) {
                // requesting the slot failed --> remove pending slot request
                pendingSlotRequests.remove(slotRequest.getAllocationId());

                throw new SlotManagerException("Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e);
            }

            return true;
        }
    }

    private void checkSlotRequestTimeouts() {
        if (!pendingSlotRequests.isEmpty()) {
            long currentTime = System.currentTimeMillis();

            Iterator<Map.Entry<AllocationID, PendingSlotRequest>> slotRequestIterator = pendingSlotRequests.entrySet().iterator();

            while (slotRequestIterator.hasNext()) {
                PendingSlotRequest slotRequest = slotRequestIterator.next().getValue();

                if (currentTime - slotRequest.getCreationTimestamp() >= slotRequestTimeout.toMilliseconds()) {
                    slotRequestIterator.remove();

                    if (slotRequest.isAssigned()) {
                        cancelPendingSlotRequest(slotRequest);
                    }

                    resourceActions.notifyAllocationFailure(
                        slotRequest.getJobId(),
                        slotRequest.getAllocationId(),
                        new TimeoutException("The allocation could not be fulfilled in time."));
                }
            }
        }
    }

    //......
}
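- The SlotManager constructor takes a slotRequestTimeout parameter, and the class maintains a pendingSlotRequests map. The start method schedules slotRequestTimeoutCheck, which runs checkSlotRequestTimeouts with a fixed delay of slotRequestTimeout (initial delay 0). The suspend method stops both timeout checks, cancels every pending slot request, and then clears the pendingSlotRequests map
- registerSlotRequest first calls checkDuplicateRequest; for a duplicate it returns false. Otherwise it wraps the slotRequest in a PendingSlotRequest, stores it in pendingSlotRequests, and calls internalRequestSlot to allocate a slot. If that throws a ResourceManagerException, the request is removed from pendingSlotRequests and a SlotManagerException is thrown
- checkSlotRequestTimeouts iterates over pendingSlotRequests and compares the current time with slotRequest.getCreationTimestamp(). If the difference is greater than or equal to slotRequestTimeout, it removes the request from pendingSlotRequests, cancels it if it was already assigned, and calls resourceActions.notifyAllocationFailure with a TimeoutException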
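The register-then-sweep behavior described in the bullets above can be modeled in a few lines of plain Java. This is a minimal sketch under stated assumptions, not Flink's code: allocation ids are plain strings, the caller passes the clock value in explicitly, duplicate detection only consults the pending map, and the returned list stands in for the resourceActions.notifyAllocationFailure callbacks. All class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of SlotManager's pending-request bookkeeping; names are hypothetical.
final class PendingRequestSweep {
    /** Stand-in for PendingSlotRequest: creation time plus an "assigned" flag. */
    static final class Pending {
        final long creationTimestamp;
        final boolean assigned;

        Pending(long creationTimestamp, boolean assigned) {
            this.creationTimestamp = creationTimestamp;
            this.assigned = assigned;
        }
    }

    private final Map<String, Pending> pending = new LinkedHashMap<>();
    private final long timeoutMs;
    private int cancelled = 0;

    PendingRequestSweep(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Returns false for a duplicate allocation id, mirroring registerSlotRequest. */
    boolean register(String allocationId, long now, boolean assigned) {
        if (pending.containsKey(allocationId)) {
            return false;
        }
        pending.put(allocationId, new Pending(now, assigned));
        return true;
    }

    /** Removes every request whose age is at least timeoutMs and returns the failed ids. */
    List<String> sweep(long now) {
        List<String> failed = new ArrayList<>();
        Iterator<Map.Entry<String, Pending>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Pending> entry = it.next();
            String id = entry.getKey();
            Pending request = entry.getValue();
            if (now - request.creationTimestamp >= timeoutMs) {
                it.remove(); // safe removal while iterating
                if (request.assigned) {
                    cancelled++; // stands in for cancelPendingSlotRequest
                }
                failed.add(id); // stands in for notifyAllocationFailure
            }
        }
        return failed;
    }

    int cancelledCount() {
        return cancelled;
    }

    public static void main(String[] args) {
        PendingRequestSweep manager = new PendingRequestSweep(1000L);
        manager.register("a", 0L, true);
        manager.register("b", 600L, false);
        System.out.println(manager.register("a", 700L, false)); // false (duplicate)
        System.out.println(manager.sweep(1000L)); // [a]
        System.out.println(manager.sweep(1600L)); // [b]
    }
}
```

Because the check itself is scheduled with a delay equal to slotRequestTimeout, a request created shortly after one sweep is still younger than the timeout at the next sweep and would only be failed by the one after that. If the real scheduling behaves like this model, the observed time to failure can therefore approach twice the configured value.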
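Summary
- SlotManagerConfiguration's getSlotRequestTimeout method reads JobManagerOptions.SLOT_REQUEST_TIMEOUT from the configuration (falling back from the deprecated ResourceManagerOptions.SLOT_REQUEST_TIMEOUT when that key is set); slot.request.timeout defaults to 5 minutes
- The SlotManager constructor takes a slotRequestTimeout parameter, and the class maintains a pendingSlotRequests map. The start method schedules slotRequestTimeoutCheck, which runs checkSlotRequestTimeouts with a fixed delay of slotRequestTimeout. The suspend method cancels the pending slot requests and then clears the pendingSlotRequests map
- registerSlotRequest first calls checkDuplicateRequest; if the request is not a duplicate, it is stored in pendingSlotRequests and internalRequestSlot is called to allocate a slot, and on failure the request is removed from pendingSlotRequests and a SlotManagerException is thrown. checkSlotRequestTimeouts iterates over pendingSlotRequests and, for every request whose age (current time minus slotRequest.getCreationTimestamp()) is greater than or equal to slotRequestTimeout, removes it from pendingSlotRequests, cancels it if it was assigned, and calls resourceActions.notifyAllocationFailure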