A Look at Flink's AbstractNonHaServices

Category: Programming Tools · Published: 5 years ago


This article takes a look at Flink's AbstractNonHaServices.

HighAvailabilityServices

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java

public interface HighAvailabilityServices extends AutoCloseable {

    // ------------------------------------------------------------------------
    //  Constants
    // ------------------------------------------------------------------------

    /**
     * This UUID should be used when no proper leader election happens, but a simple
     * pre-configured leader is used. That is for example the case in non-highly-available
     * standalone setups.
     */
    UUID DEFAULT_LEADER_ID = new UUID(0, 0);

    /**
     * This JobID should be used to identify the old JobManager when using the
     * {@link HighAvailabilityServices}. With the new mode every JobMaster will have a
     * distinct JobID assigned.
     */
    JobID DEFAULT_JOB_ID = new JobID(0L, 0L);

    // ------------------------------------------------------------------------
    //  Services
    // ------------------------------------------------------------------------

    /**
     * Gets the leader retriever for the cluster's resource manager.
     */
    LeaderRetrievalService getResourceManagerLeaderRetriever();

    /**
     * Gets the leader retriever for the dispatcher. This leader retrieval service
     * is not always accessible.
     */
    LeaderRetrievalService getDispatcherLeaderRetriever();

    /**
     * Gets the leader retriever for the job JobMaster which is responsible for the given job
     *
     * @param jobID The identifier of the job.
     * @return Leader retrieval service to retrieve the job manager for the given job
     * @deprecated This method should only be used by the legacy code where the JobManager acts as the master.
     */
    @Deprecated
    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);

    /**
     * Gets the leader retriever for the job JobMaster which is responsible for the given job
     *
     * @param jobID The identifier of the job.
     * @param defaultJobManagerAddress JobManager address which will be returned by
     *                              a static leader retrieval service.
     * @return Leader retrieval service to retrieve the job manager for the given job
     */
    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress);

    LeaderRetrievalService getWebMonitorLeaderRetriever();

    /**
     * Gets the leader election service for the cluster's resource manager.
     *
     * @return Leader election service for the resource manager leader election
     */
    LeaderElectionService getResourceManagerLeaderElectionService();

    /**
     * Gets the leader election service for the cluster's dispatcher.
     *
     * @return Leader election service for the dispatcher leader election
     */
    LeaderElectionService getDispatcherLeaderElectionService();

    /**
     * Gets the leader election service for the given job.
     *
     * @param jobID The identifier of the job running the election.
     * @return Leader election service for the job manager leader election
     */
    LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);

    LeaderElectionService getWebMonitorLeaderElectionService();

    /**
     * Gets the checkpoint recovery factory for the job manager
     *
     * @return Checkpoint recovery factory
     */
    CheckpointRecoveryFactory getCheckpointRecoveryFactory();

    /**
     * Gets the submitted job graph store for the job manager
     *
     * @return Submitted job graph store
     * @throws Exception if the submitted job graph store could not be created
     */
    SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception;

    /**
     * Gets the registry that holds information about whether jobs are currently running.
     *
     * @return Running job registry to retrieve running jobs
     */
    RunningJobsRegistry getRunningJobsRegistry() throws Exception;

    /**
     * Creates the BLOB store in which BLOBs are stored in a highly-available fashion.
     *
     * @return Blob store
     * @throws IOException if the blob store could not be created
     */
    BlobStore createBlobStore() throws IOException;

    // ------------------------------------------------------------------------
    //  Shutdown and Cleanup
    // ------------------------------------------------------------------------

    /**
     * Closes the high availability services, releasing all resources.
     * 
     * <p>This method <b>does not delete or clean up</b> any data stored in external stores
     * (file systems, ZooKeeper, etc). Another instance of the high availability
     * services will be able to recover the job.
     * 
     * <p>If an exception occurs during closing services, this method will attempt to
     * continue closing other services and report exceptions only after all services
     * have been attempted to be closed.
     *
     * @throws Exception Thrown, if an exception occurred while closing these services.
     */
    @Override
    void close() throws Exception;

    /**
     * Closes the high availability services (releasing all resources) and deletes
     * all data stored by these services in external stores.
     * 
     * <p>After this method was called, the any job or session that was managed by
     * these high availability services will be unrecoverable.
     * 
     * <p>If an exception occurs during cleanup, this method will attempt to
     * continue the cleanup and report exceptions only after all cleanup steps have
     * been attempted.
     * 
     * @throws Exception Thrown, if an exception occurred while closing these services
     *                   or cleaning up data stored by them.
     */
    void closeAndCleanupAllData() throws Exception;
}
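  • HighAvailabilityServices declares the accessors for every service a highly available setup needs: the leader retrieval and leader election services, the checkpoint recovery factory, the submitted job graph store, the running jobs registry and the BLOB store, together with close and closeAndCleanupAllData. It has two direct implementations: ZooKeeperHaServices and AbstractNonHaServices.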
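To make the DEFAULT_LEADER_ID constant from the listing above concrete, here is a minimal sketch that uses only java.util.UUID. The class name DefaultLeaderIdDemo is hypothetical and not part of Flink; only the `new UUID(0, 0)` construction comes from the source.

```java
import java.util.UUID;

public class DefaultLeaderIdDemo {
    // Same construction as HighAvailabilityServices.DEFAULT_LEADER_ID in the listing above.
    public static final UUID DEFAULT_LEADER_ID = new UUID(0, 0);

    public static void main(String[] args) {
        // Both 64-bit halves are zero, so the textual form is all zeros.
        System.out.println(DEFAULT_LEADER_ID); // prints 00000000-0000-0000-0000-000000000000

        // UUID equality is value-based, so every component that builds the same
        // constant agrees on the pre-configured leader without any election.
        System.out.println(DEFAULT_LEADER_ID.equals(new UUID(0L, 0L))); // prints true
    }
}
```

Because the value is fixed, the non-HA implementations can hand it out as the leader session ID without coordinating with anything.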

AbstractNonHaServices

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java

public abstract class AbstractNonHaServices implements HighAvailabilityServices {
    protected final Object lock = new Object();

    private final RunningJobsRegistry runningJobsRegistry;

    private final VoidBlobStore voidBlobStore;

    private boolean shutdown;

    public AbstractNonHaServices() {
        this.runningJobsRegistry = new StandaloneRunningJobsRegistry();
        this.voidBlobStore = new VoidBlobStore();

        shutdown = false;
    }

    // ----------------------------------------------------------------------
    // HighAvailabilityServices method implementations
    // ----------------------------------------------------------------------

    @Override
    public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneCheckpointRecoveryFactory();
        }
    }

    @Override
    public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneSubmittedJobGraphStore();
        }
    }

    @Override
    public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
        synchronized (lock) {
            checkNotShutdown();

            return runningJobsRegistry;
        }
    }

    @Override
    public BlobStore createBlobStore() throws IOException {
        synchronized (lock) {
            checkNotShutdown();

            return voidBlobStore;
        }
    }

    @Override
    public void close() throws Exception {
        synchronized (lock) {
            if (!shutdown) {
                shutdown = true;
            }
        }
    }

    @Override
    public void closeAndCleanupAllData() throws Exception {
        // this stores no data, so this method is the same as 'close()'
        close();
    }

    // ----------------------------------------------------------------------
    // Helper methods
    // ----------------------------------------------------------------------

    @GuardedBy("lock")
    protected void checkNotShutdown() {
        checkState(!shutdown, "high availability services are shut down");
    }

    protected boolean isShutDown() {
        return shutdown;
    }
}
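  • AbstractNonHaServices implements the getCheckpointRecoveryFactory, getSubmittedJobGraphStore, getRunningJobsRegistry, createBlobStore, close and closeAndCleanupAllData methods of HighAvailabilityServices. Each getter takes the lock and calls checkNotShutdown before returning; since nothing is stored externally, closeAndCleanupAllData simply delegates to close. The class has two subclasses, EmbeddedHaServices and StandaloneHaServices.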
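The lock-plus-shutdown-flag pattern used by AbstractNonHaServices can be tried out in isolation. The following is a simplified sketch in plain Java, not the Flink class: ShutdownGuardSketch and getRegistry are hypothetical names, and a bare Object stands in for the shared RunningJobsRegistry.

```java
public class ShutdownGuardSketch implements AutoCloseable {
    private final Object lock = new Object();
    // Stand-in for the single RunningJobsRegistry instance created in the constructor.
    private final Object sharedRegistry = new Object();
    private boolean shutdown;

    /** Returns the shared instance, but only while the services are still open. */
    public Object getRegistry() {
        synchronized (lock) {
            checkNotShutdown();
            return sharedRegistry;
        }
    }

    /** Marks the services as shut down; calling it again has no further effect. */
    @Override
    public void close() {
        synchronized (lock) {
            shutdown = true;
        }
    }

    // Must be called while holding the lock, mirroring @GuardedBy("lock") in the listing.
    private void checkNotShutdown() {
        if (shutdown) {
            throw new IllegalStateException("high availability services are shut down");
        }
    }

    public static void main(String[] args) {
        ShutdownGuardSketch services = new ShutdownGuardSketch();
        System.out.println(services.getRegistry() == services.getRegistry()); // prints true
        services.close();
        services.close(); // second close is a no-op
        try {
            services.getRegistry();
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The sketch throws IllegalStateException directly; in the listing the same effect comes from checkState.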

EmbeddedHaServices

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java

public class EmbeddedHaServices extends AbstractNonHaServices {

    private final Executor executor;

    private final EmbeddedLeaderService resourceManagerLeaderService;

    private final EmbeddedLeaderService dispatcherLeaderService;

    private final HashMap<JobID, EmbeddedLeaderService> jobManagerLeaderServices;

    private final EmbeddedLeaderService webMonitorLeaderService;

    public EmbeddedHaServices(Executor executor) {
        this.executor = Preconditions.checkNotNull(executor);
        this.resourceManagerLeaderService = new EmbeddedLeaderService(executor);
        this.dispatcherLeaderService = new EmbeddedLeaderService(executor);
        this.jobManagerLeaderServices = new HashMap<>();
        this.webMonitorLeaderService = new EmbeddedLeaderService(executor);
    }

    // ------------------------------------------------------------------------
    //  services
    // ------------------------------------------------------------------------

    @Override
    public LeaderRetrievalService getResourceManagerLeaderRetriever() {
        return resourceManagerLeaderService.createLeaderRetrievalService();
    }

    @Override
    public LeaderRetrievalService getDispatcherLeaderRetriever() {
        return dispatcherLeaderService.createLeaderRetrievalService();
    }

    @Override
    public LeaderElectionService getResourceManagerLeaderElectionService() {
        return resourceManagerLeaderService.createLeaderElectionService();
    }

    @Override
    public LeaderElectionService getDispatcherLeaderElectionService() {
        return dispatcherLeaderService.createLeaderElectionService();
    }

    @Override
    public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
        checkNotNull(jobID);

        synchronized (lock) {
            checkNotShutdown();
            EmbeddedLeaderService service = getOrCreateJobManagerService(jobID);
            return service.createLeaderRetrievalService();
        }
    }

    @Override
    public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) {
        return getJobManagerLeaderRetriever(jobID);
    }

    @Override
    public LeaderRetrievalService getWebMonitorLeaderRetriever() {
        return webMonitorLeaderService.createLeaderRetrievalService();
    }

    @Override
    public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
        checkNotNull(jobID);

        synchronized (lock) {
            checkNotShutdown();
            EmbeddedLeaderService service = getOrCreateJobManagerService(jobID);
            return service.createLeaderElectionService();
        }
    }

    @Override
    public LeaderElectionService getWebMonitorLeaderElectionService() {
        return webMonitorLeaderService.createLeaderElectionService();
    }

    // ------------------------------------------------------------------------
    // internal
    // ------------------------------------------------------------------------

    @GuardedBy("lock")
    private EmbeddedLeaderService getOrCreateJobManagerService(JobID jobID) {
        EmbeddedLeaderService service = jobManagerLeaderServices.get(jobID);
        if (service == null) {
            service = new EmbeddedLeaderService(executor);
            jobManagerLeaderServices.put(jobID, service);
        }
        return service;
    }

    // ------------------------------------------------------------------------
    //  shutdown
    // ------------------------------------------------------------------------

    @Override
    public void close() throws Exception {
        synchronized (lock) {
            if (!isShutDown()) {
                // stop all job manager leader services
                for (EmbeddedLeaderService service : jobManagerLeaderServices.values()) {
                    service.shutdown();
                }
                jobManagerLeaderServices.clear();

                resourceManagerLeaderService.shutdown();

                webMonitorLeaderService.shutdown();
            }

            super.close();
        }
    }
}
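  • EmbeddedHaServices extends AbstractNonHaServices. It implements the HighAvailabilityServices interface for the non-high-availability case in which the ResourceManager, JobManagers and TaskManagers all run in the same process. It keeps one EmbeddedLeaderService each for the resource manager, the dispatcher and the web monitor, and creates one per job on demand. FlinkMiniCluster uses EmbeddedHaServices.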
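The per-job lookup in getOrCreateJobManagerService is a plain get-or-create under a lock. Below is a minimal sketch of that idea, assuming a String job ID and a hypothetical LeaderService stand-in for EmbeddedLeaderService. It uses Map.computeIfAbsent where the listing uses an explicit null check, which behaves the same way here.

```java
import java.util.HashMap;
import java.util.Map;

public class PerJobServiceSketch {

    /** Hypothetical stand-in for EmbeddedLeaderService; it only tracks whether it was stopped. */
    public static final class LeaderService {
        private boolean stopped;

        void shutdown() { stopped = true; }

        public boolean isStopped() { return stopped; }
    }

    private final Object lock = new Object();
    private final Map<String, LeaderService> perJob = new HashMap<>();

    /** Returns the service registered for the job, creating it on first use. */
    public LeaderService getOrCreate(String jobId) {
        synchronized (lock) {
            return perJob.computeIfAbsent(jobId, id -> new LeaderService());
        }
    }

    /** Stops every per-job service, empties the map and reports how many were stopped. */
    public int closeAll() {
        synchronized (lock) {
            int count = perJob.size();
            for (LeaderService service : perJob.values()) {
                service.shutdown();
            }
            perJob.clear();
            return count;
        }
    }

    public static void main(String[] args) {
        PerJobServiceSketch services = new PerJobServiceSketch();
        LeaderService forElection = services.getOrCreate("job-1");
        LeaderService forRetrieval = services.getOrCreate("job-1");
        System.out.println(forElection == forRetrieval);                  // prints true
        System.out.println(forElection == services.getOrCreate("job-2")); // prints false
        System.out.println(services.closeAll());                          // prints 2
        System.out.println(forElection.isStopped());                      // prints true
    }
}
```

Returning the same instance for the same job is what lets the election side and the retrieval side of one job meet inside a single process.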

StandaloneHaServices

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java

public class StandaloneHaServices extends AbstractNonHaServices {

    /** The constant name of the ResourceManager RPC endpoint */
    private static final String RESOURCE_MANAGER_RPC_ENDPOINT_NAME = "resource_manager";

    /** The fix address of the ResourceManager */
    private final String resourceManagerAddress;

    /** The fix address of the Dispatcher */
    private final String dispatcherAddress;

    /** The fix address of the JobManager */
    private final String jobManagerAddress;

    private final String webMonitorAddress;

    /**
     * Creates a new services class for the fix pre-defined leaders.
     *
     * @param resourceManagerAddress    The fix address of the ResourceManager
     * @param webMonitorAddress
     */
    public StandaloneHaServices(
            String resourceManagerAddress,
            String dispatcherAddress,
            String jobManagerAddress,
            String webMonitorAddress) {
        this.resourceManagerAddress = checkNotNull(resourceManagerAddress, "resourceManagerAddress");
        this.dispatcherAddress = checkNotNull(dispatcherAddress, "dispatcherAddress");
        this.jobManagerAddress = checkNotNull(jobManagerAddress, "jobManagerAddress");
        this.webMonitorAddress = checkNotNull(webMonitorAddress, webMonitorAddress);
    }

    // ------------------------------------------------------------------------
    //  Services
    // ------------------------------------------------------------------------

    @Override
    public LeaderRetrievalService getResourceManagerLeaderRetriever() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderRetrievalService(resourceManagerAddress, DEFAULT_LEADER_ID);
        }

    }

    @Override
    public LeaderRetrievalService getDispatcherLeaderRetriever() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderRetrievalService(dispatcherAddress, DEFAULT_LEADER_ID);
        }
    }

    @Override
    public LeaderElectionService getResourceManagerLeaderElectionService() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderElectionService();
        }
    }

    @Override
    public LeaderElectionService getDispatcherLeaderElectionService() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderElectionService();
        }
    }

    @Override
    public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderRetrievalService(jobManagerAddress, DEFAULT_LEADER_ID);
        }
    }

    @Override
    public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderRetrievalService(defaultJobManagerAddress, DEFAULT_LEADER_ID);
        }
    }

    @Override
    public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderElectionService();
        }
    }

    @Override
    public LeaderRetrievalService getWebMonitorLeaderRetriever() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderRetrievalService(webMonitorAddress, DEFAULT_LEADER_ID);
        }
    }

    @Override
    public LeaderElectionService getWebMonitorLeaderElectionService() {
        synchronized (lock) {
            checkNotShutdown();

            return new StandaloneLeaderElectionService();
        }
    }

}
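  • StandaloneHaServices extends AbstractNonHaServices. It implements the HighAvailabilityServices interface for the non-high-availability case with fixed, pre-configured addresses: every retriever it returns is a StandaloneLeaderRetrievalService bound to one of those addresses and DEFAULT_LEADER_ID, and every election service is a StandaloneLeaderElectionService. ClusterEntrypoint uses StandaloneHaServices when highAvailabilityMode is NONE.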
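The idea of a "static" leader retriever, one that always reports the same pre-configured address together with DEFAULT_LEADER_ID, can be sketched without Flink on the classpath. Everything below is a simplified assumption: StaticLeaderRetrievalSketch, its start method taking a BiConsumer, and the address "host-a:6123" are all hypothetical and are not the real StandaloneLeaderRetrievalService API.

```java
import java.util.Objects;
import java.util.UUID;
import java.util.function.BiConsumer;

public class StaticLeaderRetrievalSketch {
    // Same value as HighAvailabilityServices.DEFAULT_LEADER_ID.
    public static final UUID DEFAULT_LEADER_ID = new UUID(0, 0);

    private final String leaderAddress;

    public StaticLeaderRetrievalSketch(String leaderAddress) {
        this.leaderAddress = Objects.requireNonNull(leaderAddress, "leaderAddress");
    }

    /** Notifies the listener exactly once with the fixed address and the fixed leader ID. */
    public void start(BiConsumer<String, UUID> listener) {
        listener.accept(leaderAddress, DEFAULT_LEADER_ID);
    }

    public static void main(String[] args) {
        // "host-a:6123" is a made-up address used only for illustration.
        new StaticLeaderRetrievalSketch("host-a:6123")
                .start((address, leaderId) -> System.out.println(address + " @ " + leaderId));
        // prints host-a:6123 @ 00000000-0000-0000-0000-000000000000
    }
}
```

Nothing is discovered at runtime in this setup: the listener learns the leader from configuration alone, which is why no external coordination service is needed.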

Summary

  • HighAvailabilityServices declares the accessors for the services a highly available setup needs; it has two direct implementations, ZooKeeperHaServices and AbstractNonHaServices.
  • AbstractNonHaServices implements the getCheckpointRecoveryFactory, getSubmittedJobGraphStore, getRunningJobsRegistry, createBlobStore, close and closeAndCleanupAllData methods of HighAvailabilityServices; it has two subclasses, EmbeddedHaServices and StandaloneHaServices.
  • EmbeddedHaServices extends AbstractNonHaServices and covers the non-high-availability case in which the ResourceManager, JobManagers and TaskManagers run in the same process; FlinkMiniCluster uses it. StandaloneHaServices also extends AbstractNonHaServices and covers the non-high-availability case with fixed addresses; ClusterEntrypoint uses it when highAvailabilityMode is NONE.
