Preface
This article looks at how High Availability (HA) works for the Flink JobManager, based on the Flink 1.7.1 sources.
Configuration
flink-conf.yaml
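high-availability: zookeeper
high-availability.zookeeper.quorum: zookeeper:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_one # important: customize per cluster
high-availability.storageDir: file:///share
- high-availability selects the HA mode: NONE (the default) disables HA, zookeeper enables ZooKeeper-based HA, and any other value is treated as the fully qualified name of a HighAvailabilityServicesFactory class; high-availability.zookeeper.quorum lists the ZooKeeper peers; high-availability.zookeeper.path.root is the root znode under which Flink stores its data; high-availability.cluster-id names this cluster's node, which sits under the root node (every cluster sharing the same ZooKeeper needs its own id); high-availability.storageDir is the file system path where JobManager metadata is persisted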
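To make the options concrete, here is a minimal Java sketch (a simplified stand-in, not Flink's own config loader) that reads the flat `key: value` lines above into a map and reports which options a ZooKeeper setup is still missing. Treating `high-availability.zookeeper.quorum` and `high-availability.storageDir` as the required pair is an assumption based on both being declared with `noDefaultValue()` in HighAvailabilityOptions; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch: parse flink-conf.yaml style lines and check ZooKeeper HA options (hypothetical helper). */
public class HaConfigCheckSketch {

    /** Parses "key: value" lines; text after '#' is dropped as a comment (naive). */
    public static Map<String, String> parse(List<String> lines) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String raw : lines) {
            int hash = raw.indexOf('#');
            String line = (hash >= 0 ? raw.substring(0, hash) : raw).trim();
            int colon = line.indexOf(':');
            if (line.isEmpty() || colon < 0) {
                continue; // blank line, pure comment, or not a key/value pair
            }
            // split at the FIRST colon so values such as "zookeeper:2181" stay intact
            conf.put(line.substring(0, colon).trim(), line.substring(colon + 1).trim());
        }
        return conf;
    }

    /** Lists the assumed-required options that are absent when the mode is zookeeper. */
    public static List<String> missingZooKeeperKeys(Map<String, String> conf) {
        List<String> missing = new ArrayList<>();
        if (!"zookeeper".equalsIgnoreCase(conf.get("high-availability"))) {
            return missing; // not in ZooKeeper mode: nothing to check
        }
        for (String key : List.of("high-availability.zookeeper.quorum", "high-availability.storageDir")) {
            String value = conf.get(key);
            if (value == null || value.isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Map<String, String> conf = parse(List.of(
                "high-availability: zookeeper",
                "high-availability.zookeeper.quorum: zookeeper:2181",
                "high-availability.zookeeper.path.root: /flink",
                "high-availability.cluster-id: /cluster_one # important: customize per cluster",
                "high-availability.storageDir: file:///share"));
        System.out.println(conf.get("high-availability.cluster-id")); // /cluster_one
        System.out.println(missingZooKeeperKeys(conf));               // []
    }
}
```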
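masters file
localhost:8081
localhost:8082
- The masters file lists the JobManagers, one host:port entry per line, where the port is the one that JobManager's web UI binds to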
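A hypothetical parser for the masters format, assuming every non-blank entry is `host:port` with the port being the web UI port. Flink's start scripts do their own parsing, so this only illustrates the format and the port range check; `MastersFileSketch` and `Master` are made-up names.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: parse a masters file of "host:port" entries (not Flink's own parser). */
public class MastersFileSketch {

    /** One JobManager entry: host plus the web UI port it binds to. */
    public static final class Master {
        public final String host;
        public final int webUiPort;

        Master(String host, int webUiPort) {
            this.host = host;
            this.webUiPort = webUiPort;
        }

        @Override
        public String toString() {
            return host + ":" + webUiPort;
        }
    }

    /** Splits on any whitespace, so one-per-line and space-separated input both work. */
    public static List<Master> parse(String content) {
        List<Master> masters = new ArrayList<>();
        for (String entry : content.trim().split("\\s+")) {
            if (entry.isEmpty()) {
                continue; // empty file
            }
            int colon = entry.lastIndexOf(':');
            if (colon <= 0) {
                throw new IllegalArgumentException("Expected host:port but got '" + entry + "'");
            }
            int port = Integer.parseInt(entry.substring(colon + 1));
            if (port <= 0 || port >= 65536) {
                throw new IllegalArgumentException("Port out of range in '" + entry + "'");
            }
            masters.add(new Master(entry.substring(0, colon), port));
        }
        return masters;
    }

    public static void main(String[] args) {
        System.out.println(parse("localhost:8081\nlocalhost:8082\n")); // [localhost:8081, localhost:8082]
    }
}
```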
HighAvailabilityMode
flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java
public enum HighAvailabilityMode {
NONE(false),
ZOOKEEPER(true),
FACTORY_CLASS(true);
private final boolean haActive;
HighAvailabilityMode(boolean haActive) {
this.haActive = haActive;
}
/**
* Return the configured {@link HighAvailabilityMode}.
*
* @param config The config to parse
* @return Configured recovery mode or {@link HighAvailabilityMode#NONE} if not
* configured.
*/
public static HighAvailabilityMode fromConfig(Configuration config) {
String haMode = config.getValue(HighAvailabilityOptions.HA_MODE);
if (haMode == null) {
return HighAvailabilityMode.NONE;
} else if (haMode.equalsIgnoreCase(ConfigConstants.DEFAULT_RECOVERY_MODE)) {
// Map old default to new default
return HighAvailabilityMode.NONE;
} else {
try {
return HighAvailabilityMode.valueOf(haMode.toUpperCase());
} catch (IllegalArgumentException e) {
return FACTORY_CLASS;
}
}
}
/**
* Returns true if the defined recovery mode supports high availability.
*
* @param configuration Configuration which contains the recovery mode
* @return true if high availability is supported by the recovery mode, otherwise false
*/
public static boolean isHighAvailabilityModeActivated(Configuration configuration) {
HighAvailabilityMode mode = fromConfig(configuration);
return mode.haActive;
}
}
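- HighAvailabilityMode has three values: NONE, ZOOKEEPER, and FACTORY_CLASS. Each carries a haActive flag that says whether the mode provides high availability (false only for NONE). fromConfig maps an unset value or the legacy default (ConfigConstants.DEFAULT_RECOVERY_MODE) to NONE, and falls back to FACTORY_CLASS for any value that is not an enum name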
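The parsing rules in fromConfig can be exercised without a Flink dependency. The sketch below re-implements them over a plain `Map<String, String>`; `HaModeSketch` is a hypothetical name, and the legacy value `"standalone"` is an assumption about what `ConfigConstants.DEFAULT_RECOVERY_MODE` holds.

```java
import java.util.Locale;
import java.util.Map;

/** Re-implementation of HighAvailabilityMode.fromConfig over a plain map (sketch). */
public class HaModeSketch {

    public enum Mode {
        NONE(false),
        ZOOKEEPER(true),
        FACTORY_CLASS(true);

        public final boolean haActive;

        Mode(boolean haActive) {
            this.haActive = haActive;
        }
    }

    /** Assumed value of ConfigConstants.DEFAULT_RECOVERY_MODE (the old default). */
    static final String LEGACY_DEFAULT_MODE = "standalone";

    public static Mode fromConfig(Map<String, String> conf) {
        String haMode = conf.get("high-availability");
        if (haMode == null || haMode.equalsIgnoreCase(LEGACY_DEFAULT_MODE)) {
            return Mode.NONE; // unset, or the old default mapped to the new one
        }
        try {
            return Mode.valueOf(haMode.toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            return Mode.FACTORY_CLASS; // not an enum name: treat it as a factory class name
        }
    }

    public static boolean isHighAvailabilityModeActivated(Map<String, String> conf) {
        return fromConfig(conf).haActive;
    }

    public static void main(String[] args) {
        System.out.println(fromConfig(Map.of()));                                   // NONE
        System.out.println(fromConfig(Map.of("high-availability", "zookeeper")));   // ZOOKEEPER
        System.out.println(fromConfig(Map.of("high-availability", "com.example.MyHaFactory"))); // FACTORY_CLASS
    }
}
```

Note the last branch: a typo such as `zookeper` is not rejected by fromConfig; it becomes FACTORY_CLASS, and the mistake only surfaces later when createCustomHAServices cannot instantiate a class of that name (`com.example.MyHaFactory` above is likewise a made-up class name).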
HighAvailabilityOptions
flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/HighAvailabilityOptions.java
@PublicEvolving
@ConfigGroups(groups = {
@ConfigGroup(name = "HighAvailabilityZookeeper", keyPrefix = "high-availability.zookeeper")
})
public class HighAvailabilityOptions {
// ------------------------------------------------------------------------
// Required High Availability Options
// ------------------------------------------------------------------------
/**
* Defines high-availability mode used for the cluster execution.
* A value of "NONE" signals no highly available setup.
* To enable high-availability, set this mode to "ZOOKEEPER".
* Can also be set to FQN of HighAvailability factory class.
*/
@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_HIGH_AVAILABILITY)
public static final ConfigOption<String> HA_MODE =
key("high-availability")
.defaultValue("NONE")
.withDeprecatedKeys("recovery.mode")
.withDescription("Defines high-availability mode used for the cluster execution." +
" To enable high-availability, set this mode to \"ZOOKEEPER\" or specify FQN of factory class.");
/**
* The ID of the Flink cluster, used to separate multiple Flink clusters
* Needs to be set for standalone clusters, is automatically inferred in YARN and Mesos.
*/
public static final ConfigOption<String> HA_CLUSTER_ID =
key("high-availability.cluster-id")
.defaultValue("/default")
.withDeprecatedKeys("high-availability.zookeeper.path.namespace", "recovery.zookeeper.path.namespace")
.withDescription("The ID of the Flink cluster, used to separate multiple Flink clusters from each other." +
" Needs to be set for standalone clusters but is automatically inferred in YARN and Mesos.");
/**
* File system path (URI) where Flink persists metadata in high-availability setups.
*/
@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_HIGH_AVAILABILITY)
public static final ConfigOption<String> HA_STORAGE_PATH =
key("high-availability.storageDir")
.noDefaultValue()
.withDeprecatedKeys("high-availability.zookeeper.storageDir", "recovery.zookeeper.storageDir")
.withDescription("File system path (URI) where Flink persists metadata in high-availability setups.");
// ------------------------------------------------------------------------
// Recovery Options
// ------------------------------------------------------------------------
/**
* Optional port (range) used by the job manager in high-availability mode.
*/
public static final ConfigOption<String> HA_JOB_MANAGER_PORT_RANGE =
key("high-availability.jobmanager.port")
.defaultValue("0")
.withDeprecatedKeys("recovery.jobmanager.port")
.withDescription("Optional port (range) used by the job manager in high-availability mode.");
/**
* The time before a JobManager after a fail over recovers the current jobs.
*/
public static final ConfigOption<String> HA_JOB_DELAY =
key("high-availability.job.delay")
.noDefaultValue()
.withDeprecatedKeys("recovery.job.delay")
.withDescription("The time before a JobManager after a fail over recovers the current jobs.");
// ------------------------------------------------------------------------
// ZooKeeper Options
// ------------------------------------------------------------------------
/**
* The ZooKeeper quorum to use, when running Flink in a high-availability mode with ZooKeeper.
*/
public static final ConfigOption<String> HA_ZOOKEEPER_QUORUM =
key("high-availability.zookeeper.quorum")
.noDefaultValue()
.withDeprecatedKeys("recovery.zookeeper.quorum")
.withDescription("The ZooKeeper quorum to use, when running Flink in a high-availability mode with ZooKeeper.");
/**
* The root path under which Flink stores its entries in ZooKeeper.
*/
public static final ConfigOption<String> HA_ZOOKEEPER_ROOT =
key("high-availability.zookeeper.path.root")
.defaultValue("/flink")
.withDeprecatedKeys("recovery.zookeeper.path.root")
.withDescription("The root path under which Flink stores its entries in ZooKeeper.");
public static final ConfigOption<String> HA_ZOOKEEPER_LATCH_PATH =
key("high-availability.zookeeper.path.latch")
.defaultValue("/leaderlatch")
.withDeprecatedKeys("recovery.zookeeper.path.latch")
.withDescription("Defines the znode of the leader latch which is used to elect the leader.");
/** ZooKeeper root path (ZNode) for job graphs. */
public static final ConfigOption<String> HA_ZOOKEEPER_JOBGRAPHS_PATH =
key("high-availability.zookeeper.path.jobgraphs")
.defaultValue("/jobgraphs")
.withDeprecatedKeys("recovery.zookeeper.path.jobgraphs")
.withDescription("ZooKeeper root path (ZNode) for job graphs");
public static final ConfigOption<String> HA_ZOOKEEPER_LEADER_PATH =
key("high-availability.zookeeper.path.leader")
.defaultValue("/leader")
.withDeprecatedKeys("recovery.zookeeper.path.leader")
.withDescription("Defines the znode of the leader which contains the URL to the leader and the current" +
" leader session ID.");
/** ZooKeeper root path (ZNode) for completed checkpoints. */
public static final ConfigOption<String> HA_ZOOKEEPER_CHECKPOINTS_PATH =
key("high-availability.zookeeper.path.checkpoints")
.defaultValue("/checkpoints")
.withDeprecatedKeys("recovery.zookeeper.path.checkpoints")
.withDescription("ZooKeeper root path (ZNode) for completed checkpoints.");
/** ZooKeeper root path (ZNode) for checkpoint counters. */
public static final ConfigOption<String> HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH =
key("high-availability.zookeeper.path.checkpoint-counter")
.defaultValue("/checkpoint-counter")
.withDeprecatedKeys("recovery.zookeeper.path.checkpoint-counter")
.withDescription("ZooKeeper root path (ZNode) for checkpoint counters.");
/** ZooKeeper root path (ZNode) for Mesos workers. */
@PublicEvolving
public static final ConfigOption<String> HA_ZOOKEEPER_MESOS_WORKERS_PATH =
key("high-availability.zookeeper.path.mesos-workers")
.defaultValue("/mesos-workers")
.withDeprecatedKeys("recovery.zookeeper.path.mesos-workers")
.withDescription(Description.builder()
.text("The ZooKeeper root path for persisting the Mesos worker information.")
.build());
// ------------------------------------------------------------------------
// ZooKeeper Client Settings
// ------------------------------------------------------------------------
public static final ConfigOption<Integer> ZOOKEEPER_SESSION_TIMEOUT =
key("high-availability.zookeeper.client.session-timeout")
.defaultValue(60000)
.withDeprecatedKeys("recovery.zookeeper.client.session-timeout")
.withDescription("Defines the session timeout for the ZooKeeper session in ms.");
public static final ConfigOption<Integer> ZOOKEEPER_CONNECTION_TIMEOUT =
key("high-availability.zookeeper.client.connection-timeout")
.defaultValue(15000)
.withDeprecatedKeys("recovery.zookeeper.client.connection-timeout")
.withDescription("Defines the connection timeout for ZooKeeper in ms.");
public static final ConfigOption<Integer> ZOOKEEPER_RETRY_WAIT =
key("high-availability.zookeeper.client.retry-wait")
.defaultValue(5000)
.withDeprecatedKeys("recovery.zookeeper.client.retry-wait")
.withDescription("Defines the pause between consecutive retries in ms.");
public static final ConfigOption<Integer> ZOOKEEPER_MAX_RETRY_ATTEMPTS =
key("high-availability.zookeeper.client.max-retry-attempts")
.defaultValue(3)
.withDeprecatedKeys("recovery.zookeeper.client.max-retry-attempts")
.withDescription("Defines the number of connection retries before the client gives up.");
public static final ConfigOption<String> ZOOKEEPER_RUNNING_JOB_REGISTRY_PATH =
key("high-availability.zookeeper.path.running-registry")
.defaultValue("/running_job_registry/");
public static final ConfigOption<String> ZOOKEEPER_CLIENT_ACL =
key("high-availability.zookeeper.client.acl")
.defaultValue("open")
.withDescription("Defines the ACL (open|creator) to be configured on ZK node. The configuration value can be" +
" set to “creator” if the ZooKeeper server configuration has the “authProvider” property mapped to use" +
" SASLAuthenticationProvider and the cluster is configured to run in secure mode (Kerberos).");
// ------------------------------------------------------------------------
/** Not intended to be instantiated. */
private HighAvailabilityOptions() {}
}
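- HighAvailabilityOptions defines the options under the high-availability prefix: the required ones (mode, cluster-id, storageDir), the recovery options, and the ZooKeeper options grouped under high-availability.zookeeper (quorum, znode paths, client settings). Most options also register their old recovery.* keys via withDeprecatedKeys, so older configurations keep working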
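Most of these options carry `withDeprecatedKeys`. The sketch below models how such a lookup resolves, assuming Flink's `Configuration` checks the current key first, then the deprecated keys in declaration order, then the default; `StringOption` is a hypothetical stand-in for `ConfigOption<String>`, not the real class.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** Simplified model of option lookup with deprecated keys (not Flink's Configuration). */
public class DeprecatedKeySketch {

    /** Hypothetical stand-in for ConfigOption<String>. */
    public static final class StringOption {
        final String key;
        final String defaultValue; // null plays the role of noDefaultValue()
        final List<String> deprecatedKeys;

        StringOption(String key, String defaultValue, String... deprecatedKeys) {
            this.key = key;
            this.defaultValue = defaultValue;
            this.deprecatedKeys = Arrays.asList(deprecatedKeys);
        }
    }

    /** Mirrors HA_CLUSTER_ID from HighAvailabilityOptions. */
    public static final StringOption HA_CLUSTER_ID = new StringOption(
            "high-availability.cluster-id",
            "/default",
            "high-availability.zookeeper.path.namespace",
            "recovery.zookeeper.path.namespace");

    /** Current key first, then deprecated keys in declaration order, then the default. */
    public static String get(Map<String, String> conf, StringOption option) {
        if (conf.containsKey(option.key)) {
            return conf.get(option.key);
        }
        for (String oldKey : option.deprecatedKeys) {
            if (conf.containsKey(oldKey)) {
                return conf.get(oldKey);
            }
        }
        return option.defaultValue;
    }

    public static void main(String[] args) {
        // An old-style config that still uses the recovery.* key
        Map<String, String> legacy = Map.of("recovery.zookeeper.path.namespace", "/cluster_one");
        System.out.println(get(legacy, HA_CLUSTER_ID));   // /cluster_one
        System.out.println(get(Map.of(), HA_CLUSTER_ID)); // /default
    }
}
```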
HighAvailabilityServicesUtils
flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
public class HighAvailabilityServicesUtils {
public static HighAvailabilityServices createAvailableOrEmbeddedServices(
Configuration config,
Executor executor) throws Exception {
HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(config);
switch (highAvailabilityMode) {
case NONE:
return new EmbeddedHaServices(executor);
case ZOOKEEPER:
BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
return new ZooKeeperHaServices(
ZooKeeperUtils.startCuratorFramework(config),
executor,
config,
blobStoreService);
case FACTORY_CLASS:
return createCustomHAServices(config, executor);
default:
throw new Exception("High availability mode " + highAvailabilityMode + " is not supported.");
}
}
public static HighAvailabilityServices createHighAvailabilityServices(
Configuration configuration,
Executor executor,
AddressResolution addressResolution) throws Exception {
HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(configuration);
switch (highAvailabilityMode) {
case NONE:
final Tuple2<String, Integer> hostnamePort = getJobManagerAddress(configuration);
final String jobManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
hostnamePort.f0,
hostnamePort.f1,
JobMaster.JOB_MANAGER_NAME,
addressResolution,
configuration);
final String resourceManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
hostnamePort.f0,
hostnamePort.f1,
ResourceManager.RESOURCE_MANAGER_NAME,
addressResolution,
configuration);
final String dispatcherRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
hostnamePort.f0,
hostnamePort.f1,
Dispatcher.DISPATCHER_NAME,
addressResolution,
configuration);
final String address = checkNotNull(configuration.getString(RestOptions.ADDRESS),
"%s must be set",
RestOptions.ADDRESS.key());
final int port = configuration.getInteger(RestOptions.PORT);
final boolean enableSSL = SSLUtils.isRestSSLEnabled(configuration);
final String protocol = enableSSL ? "https://" : "http://";
return new StandaloneHaServices(
resourceManagerRpcUrl,
dispatcherRpcUrl,
jobManagerRpcUrl,
String.format("%s%s:%s", protocol, address, port));
case ZOOKEEPER:
BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);
return new ZooKeeperHaServices(
ZooKeeperUtils.startCuratorFramework(configuration),
executor,
configuration,
blobStoreService);
case FACTORY_CLASS:
return createCustomHAServices(configuration, executor);
default:
throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
}
}
/**
* Returns the JobManager's hostname and port extracted from the given
* {@link Configuration}.
*
* @param configuration Configuration to extract the JobManager's address from
* @return The JobManager's hostname and port
* @throws ConfigurationException if the JobManager's address cannot be extracted from the configuration
*/
public static Tuple2<String, Integer> getJobManagerAddress(Configuration configuration) throws ConfigurationException {
final String hostname = configuration.getString(JobManagerOptions.ADDRESS);
final int port = configuration.getInteger(JobManagerOptions.PORT);
if (hostname == null) {
throw new ConfigurationException("Config parameter '" + JobManagerOptions.ADDRESS +
"' is missing (hostname/address of JobManager to connect to).");
}
if (port <= 0 || port >= 65536) {
throw new ConfigurationException("Invalid value for '" + JobManagerOptions.PORT +
"' (port of the JobManager actor system) : " + port +
". it must be greater than 0 and less than 65536.");
}
return Tuple2.of(hostname, port);
}
private static HighAvailabilityServices createCustomHAServices(Configuration config, Executor executor) throws FlinkException {
final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
final String haServicesClassName = config.getString(HighAvailabilityOptions.HA_MODE);
final HighAvailabilityServicesFactory highAvailabilityServicesFactory;
try {
highAvailabilityServicesFactory = InstantiationUtil.instantiate(
haServicesClassName,
HighAvailabilityServicesFactory.class,
classLoader);
} catch (Exception e) {
throw new FlinkException(
String.format(
"Could not instantiate the HighAvailabilityServicesFactory '%s'. Please make sure that this class is on your class path.",
haServicesClassName),
e);
}
try {
return highAvailabilityServicesFactory.createHAServices(config, executor);
} catch (Exception e) {
throw new FlinkException(
String.format(
"Could not create the ha services from the instantiated HighAvailabilityServicesFactory %s.",
haServicesClassName),
e);
}
}
/**
* Enum specifying whether address resolution should be tried or not when creating the
* {@link HighAvailabilityServices}.
*/
public enum AddressResolution {
TRY_ADDRESS_RESOLUTION,
NO_ADDRESS_RESOLUTION
}
}
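- HighAvailabilityServicesUtils provides the static methods that create HighAvailabilityServices: createAvailableOrEmbeddedServices, createHighAvailabilityServices, and the private helper createCustomHAServices
- createAvailableOrEmbeddedServices is mainly used by FlinkMiniCluster (for NONE it returns EmbeddedHaServices). createHighAvailabilityServices is mainly used by ClusterEntrypoint: for NONE it creates StandaloneHaServices with fixed RPC and REST addresses, for ZOOKEEPER it creates ZooKeeperHaServices, and for FACTORY_CLASS it delegates to createCustomHAServices, which instantiates the configured HighAvailabilityServicesFactory class through the context class loader
- HighAvailabilityServicesUtils also offers the static getJobManagerAddress method, which reads the JobManager hostname and port from the configuration and rejects a missing hostname or a port outside 1 to 65535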
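A dependency-free model of the dispatch in createHighAvailabilityServices, including the reflective path behind createCustomHAServices. Everything here is hypothetical: `HaServices` and `HaServicesFactory` stand in for HighAvailabilityServices and HighAvailabilityServicesFactory, each "service" is just a description string, the legacy "standalone" mapping and the RPC/REST URL construction are omitted, and the NONE branch assumes the JobManager address comes from `jobmanager.rpc.address` / `jobmanager.rpc.port` (default 6123), which is assumed to be what JobManagerOptions.ADDRESS and PORT map to.

```java
import java.util.Locale;
import java.util.Map;

/** Simplified model of createHighAvailabilityServices (hypothetical types throughout). */
public class HaServicesDispatchSketch {

    public interface HaServices {
        String describe();
    }

    public interface HaServicesFactory {
        HaServices create(Map<String, String> conf);
    }

    /** Example user factory; reflection needs a public no-arg constructor. */
    public static class DemoFactory implements HaServicesFactory {
        @Override
        public HaServices create(Map<String, String> conf) {
            return () -> "DemoHaServices";
        }
    }

    public static HaServices create(Map<String, String> conf) {
        String mode = conf.getOrDefault("high-availability", "NONE");
        switch (mode.toUpperCase(Locale.ROOT)) {
            case "NONE": {
                // like getJobManagerAddress: a fixed, pre-configured JobManager address
                String host = conf.get("jobmanager.rpc.address");
                int port = Integer.parseInt(conf.getOrDefault("jobmanager.rpc.port", "6123"));
                if (host == null) {
                    throw new IllegalArgumentException("jobmanager.rpc.address is missing");
                }
                if (port <= 0 || port >= 65536) {
                    throw new IllegalArgumentException("Invalid JobManager port: " + port);
                }
                return () -> "StandaloneHaServices(" + host + ":" + port + ")";
            }
            case "ZOOKEEPER":
                return () -> "ZooKeeperHaServices";
            default:
                return createCustom(mode, conf); // original spelling: it is a class name
        }
    }

    static HaServices createCustom(String className, Map<String, String> conf) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = HaServicesDispatchSketch.class.getClassLoader();
        }
        try {
            Class<?> clazz = Class.forName(className, true, loader);
            HaServicesFactory factory = clazz.asSubclass(HaServicesFactory.class)
                    .getDeclaredConstructor()
                    .newInstance();
            return factory.create(conf);
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalStateException(
                    "Could not instantiate the HA services factory '" + className + "'", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(create(Map.of("jobmanager.rpc.address", "localhost")).describe());
        System.out.println(create(Map.of("high-availability", DemoFactory.class.getName())).describe());
    }
}
```

Routing every unrecognised value to reflection keeps the mode enum closed while still allowing pluggable implementations; the trade-off is that configuration mistakes are reported late, as a failed class instantiation.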
HighAvailabilityServices
flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
/**
* The HighAvailabilityServices give access to all services needed for a highly-available
* setup. In particular, the services provide access to highly available storage and
* registries, as well as distributed counters and leader election.
*
* <ul>
* <li>ResourceManager leader election and leader retrieval</li>
* <li>JobManager leader election and leader retrieval</li>
* <li>Persistence for checkpoint metadata</li>
* <li>Registering the latest completed checkpoint(s)</li>
* <li>Persistence for the BLOB store</li>
* <li>Registry that marks a job's status</li>
* <li>Naming of RPC endpoints</li>
* </ul>
*/
public interface HighAvailabilityServices extends AutoCloseable {
// ------------------------------------------------------------------------
// Constants
// ------------------------------------------------------------------------
/**
* This UUID should be used when no proper leader election happens, but a simple
* pre-configured leader is used. That is for example the case in non-highly-available
* standalone setups.
*/
UUID DEFAULT_LEADER_ID = new UUID(0, 0);
/**
* This JobID should be used to identify the old JobManager when using the
* {@link HighAvailabilityServices}. With the new mode every JobMaster will have a
* distinct JobID assigned.
*/
JobID DEFAULT_JOB_ID = new JobID(0L, 0L);
// ------------------------------------------------------------------------
// Services
// ------------------------------------------------------------------------
/**
* Gets the leader retriever for the cluster's resource manager.
*/
LeaderRetrievalService getResourceManagerLeaderRetriever();
/**
* Gets the leader retriever for the dispatcher. This leader retrieval service
* is not always accessible.
*/
LeaderRetrievalService getDispatcherLeaderRetriever();
/**
* Gets the leader retriever for the job JobMaster which is responsible for the given job
*
* @param jobID The identifier of the job.
* @return Leader retrieval service to retrieve the job manager for the given job
* @deprecated This method should only be used by the legacy code where the JobManager acts as the master.
*/
@Deprecated
LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);
/**
* Gets the leader retriever for the job JobMaster which is responsible for the given job
*
* @param jobID The identifier of the job.
* @param defaultJobManagerAddress JobManager address which will be returned by
* a static leader retrieval service.
* @return Leader retrieval service to retrieve the job manager for the given job
*/
LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress);
LeaderRetrievalService getWebMonitorLeaderRetriever();
/**
* Gets the leader election service for the cluster's resource manager.
*
* @return Leader election service for the resource manager leader election
*/
LeaderElectionService getResourceManagerLeaderElectionService();
/**
* Gets the leader election service for the cluster's dispatcher.
*
* @return Leader election service for the dispatcher leader election
*/
LeaderElectionService getDispatcherLeaderElectionService();
/**
* Gets the leader election service for the given job.
*
* @param jobID The identifier of the job running the election.
* @return Leader election service for the job manager leader election
*/
LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);
LeaderElectionService getWebMonitorLeaderElectionService();
/**
* Gets the checkpoint recovery factory for the job manager
*
* @return Checkpoint recovery factory
*/
CheckpointRecoveryFactory getCheckpointRecoveryFactory();
/**
* Gets the submitted job graph store for the job manager
*
* @return Submitted job graph store
* @throws Exception if the submitted job graph store could not be created
*/
SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception;
/**
* Gets the registry that holds information about whether jobs are currently running.
*
* @return Running job registry to retrieve running jobs
*/
RunningJobsRegistry getRunningJobsRegistry() throws Exception;
/**
* Creates the BLOB store in which BLOBs are stored in a highly-available fashion.
*
* @return Blob store
* @throws IOException if the blob store could not be created
*/
BlobStore createBlobStore() throws IOException;
// ------------------------------------------------------------------------
// Shutdown and Cleanup
// ------------------------------------------------------------------------
/**
* Closes the high availability services, releasing all resources.
*
* <p>This method <b>does not delete or clean up</b> any data stored in external stores
* (file systems, ZooKeeper, etc). Another instance of the high availability
* services will be able to recover the job.
*
* <p>If an exception occurs during closing services, this method will attempt to
* continue closing other services and report exceptions only after all services
* have been attempted to be closed.
*
* @throws Exception Thrown, if an exception occurred while closing these services.
*/
@Override
void close() throws Exception;
/**
* Closes the high availability services (releasing all resources) and deletes
* all data stored by these services in external stores.
*
* <p>After this method was called, any job or session that was managed by
* these high availability services will be unrecoverable.
*
* <p>If an exception occurs during cleanup, this method will attempt to
* continue the cleanup and report exceptions only after all cleanup steps have
* been attempted.
*
* @throws Exception Thrown, if an exception occurred while closing these services
* or cleaning up data stored by them.
*/
void closeAndCleanupAllData() throws Exception;
}
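- HighAvailabilityServices defines accessors for every service an HA setup needs: leader retrieval and leader election for the ResourceManager, Dispatcher, JobManager, and web monitor, plus the checkpoint recovery factory, the submitted job graph store, the running jobs registry, and the BLOB store. It also defines two shutdown methods: close() releases resources but keeps the stored data so that another instance can recover the jobs, while closeAndCleanupAllData() also deletes that data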
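The close() contract (keep closing the remaining services, report failures only at the end) is a common Java pattern. A minimal sketch using suppressed exceptions; ZooKeeperHaServices itself closes just two parts (the BLOB store service and the Curator client) and uses Flink's ExceptionUtils instead, so `closeAll` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of the "close everything, then report" contract described for close(). */
public class CloseAllSketch {

    /**
     * Closes every resource even if earlier ones fail. The first failure is rethrown
     * once all resources were attempted; later failures are attached as suppressed.
     */
    public static void closeAll(List<? extends AutoCloseable> resources) throws Exception {
        Exception first = null;
        for (AutoCloseable resource : resources) {
            try {
                resource.close();
            } catch (Exception e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }

    public static void main(String[] args) {
        List<String> closed = new ArrayList<>();
        AutoCloseable blobStore = () -> {
            closed.add("blobStore");
            throw new IllegalStateException("blob store close failed");
        };
        AutoCloseable curatorClient = () -> closed.add("curatorClient");
        try {
            closeAll(List.of(blobStore, curatorClient));
        } catch (Exception e) {
            // prints: [blobStore, curatorClient] / blob store close failed
            System.out.println(closed + " / " + e.getMessage());
        }
    }
}
```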
ZooKeeperHaServices
flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
/**
* An implementation of the {@link HighAvailabilityServices} using Apache ZooKeeper.
* The services store data in ZooKeeper's nodes as illustrated by the following tree structure:
*
* <pre>
* /flink
* +/cluster_id_1/resource_manager_lock
* | |
* | +/job-id-1/job_manager_lock
* | | /checkpoints/latest
* | | /latest-1
* | | /latest-2
* | |
* | +/job-id-2/job_manager_lock
* |
* +/cluster_id_2/resource_manager_lock
* |
* +/job-id-1/job_manager_lock
* |/checkpoints/latest
* | /latest-1
* |/persisted_job_graph
* </pre>
*
* <p>The root path "/flink" is configurable via the option {@link HighAvailabilityOptions#HA_ZOOKEEPER_ROOT}.
* This makes sure Flink stores its data under specific subtrees in ZooKeeper, for example to
* accommodate specific permission.
*
* <p>The "cluster_id" part identifies the data stored for a specific Flink "cluster".
* This "cluster" can be either a standalone or containerized Flink cluster, or it can be job
* on a framework like YARN or Mesos (in a "per-job-cluster" mode).
*
* <p>In case of a "per-job-cluster" on YARN or Mesos, the cluster-id is generated and configured
* automatically by the client or dispatcher that submits the Job to YARN or Mesos.
*
* <p>In the case of a standalone cluster, that cluster-id needs to be configured via
* {@link HighAvailabilityOptions#HA_CLUSTER_ID}. All nodes with the same cluster id will join the same
* cluster and participate in the execution of the same set of jobs.
*/
public class ZooKeeperHaServices implements HighAvailabilityServices {
private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperHaServices.class);
private static final String RESOURCE_MANAGER_LEADER_PATH = "/resource_manager_lock";
private static final String DISPATCHER_LEADER_PATH = "/dispatcher_lock";
private static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock";
private static final String REST_SERVER_LEADER_PATH = "/rest_server_lock";
// ------------------------------------------------------------------------
/** The ZooKeeper client to use */
private final CuratorFramework client;
/** The executor to run ZooKeeper callbacks on */
private final Executor executor;
/** The runtime configuration */
private final Configuration configuration;
/** The zookeeper based running jobs registry */
private final RunningJobsRegistry runningJobsRegistry;
/** Store for arbitrary blobs */
private final BlobStoreService blobStoreService;
public ZooKeeperHaServices(
CuratorFramework client,
Executor executor,
Configuration configuration,
BlobStoreService blobStoreService) {
this.client = checkNotNull(client);
this.executor = checkNotNull(executor);
this.configuration = checkNotNull(configuration);
this.runningJobsRegistry = new ZooKeeperRunningJobsRegistry(client, configuration);
this.blobStoreService = checkNotNull(blobStoreService);
}
// ------------------------------------------------------------------------
// Services
// ------------------------------------------------------------------------
@Override
public LeaderRetrievalService getResourceManagerLeaderRetriever() {
return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
}
@Override
public LeaderRetrievalService getDispatcherLeaderRetriever() {
return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, DISPATCHER_LEADER_PATH);
}
@Override
public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathForJobManager(jobID));
}
@Override
public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) {
return getJobManagerLeaderRetriever(jobID);
}
@Override
public LeaderRetrievalService getWebMonitorLeaderRetriever() {
return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, REST_SERVER_LEADER_PATH);
}
@Override
public LeaderElectionService getResourceManagerLeaderElectionService() {
return ZooKeeperUtils.createLeaderElectionService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
}
@Override
public LeaderElectionService getDispatcherLeaderElectionService() {
return ZooKeeperUtils.createLeaderElectionService(client, configuration, DISPATCHER_LEADER_PATH);
}
@Override
public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
return ZooKeeperUtils.createLeaderElectionService(client, configuration, getPathForJobManager(jobID));
}
@Override
public LeaderElectionService getWebMonitorLeaderElectionService() {
return ZooKeeperUtils.createLeaderElectionService(client, configuration, REST_SERVER_LEADER_PATH);
}
@Override
public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
return new ZooKeeperCheckpointRecoveryFactory(client, configuration, executor);
}
@Override
public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
return ZooKeeperUtils.createSubmittedJobGraphs(client, configuration);
}
@Override
public RunningJobsRegistry getRunningJobsRegistry() {
return runningJobsRegistry;
}
@Override
public BlobStore createBlobStore() throws IOException {
return blobStoreService;
}
// ------------------------------------------------------------------------
// Shutdown
// ------------------------------------------------------------------------
@Override
public void close() throws Exception {
Throwable exception = null;
try {
blobStoreService.close();
} catch (Throwable t) {
exception = t;
}
internalClose();
if (exception != null) {
ExceptionUtils.rethrowException(exception, "Could not properly close the ZooKeeperHaServices.");
}
}
@Override
public void closeAndCleanupAllData() throws Exception {
LOG.info("Close and clean up all data for ZooKeeperHaServices.");
Throwable exception = null;
try {
blobStoreService.closeAndCleanupAllData();
} catch (Throwable t) {
exception = t;
}
internalClose();
if (exception != null) {
ExceptionUtils.rethrowException(exception, "Could not properly close and clean up all data of ZooKeeperHaServices.");
}
}
/**
* Closes components which don't distinguish between close and closeAndCleanupAllData
*/
private void internalClose() {
client.close();
}
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
private static String getPathForJobManager(final JobID jobID) {
return "/" + jobID + JOB_MANAGER_LEADER_PATH;
}
}
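- ZooKeeperHaServices implements HighAvailabilityServices and builds each required service through ZooKeeperUtils factory methods such as ZooKeeperUtils.createLeaderRetrievalService, ZooKeeperUtils.createLeaderElectionService, and ZooKeeperUtils.createSubmittedJobGraphs. Each component gets its own lock path (/resource_manager_lock, /dispatcher_lock, /rest_server_lock), and the JobManager of each job uses /<jobID>/job_manager_lock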
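A sketch of how the path pieces visible in this listing compose with the configured root and cluster-id. Two assumptions: the Curator client is namespaced with path.root joined to cluster-id by a single slash, and `namespace` is a hypothetical helper. ZooKeeperUtils may also prepend further configured prefixes (for example the path.leader or path.latch options), so the printed strings show how these pieces fit together, not a guaranteed final znode.

```java
/** Sketch of how the path pieces in ZooKeeperHaServices compose (hypothetical helper). */
public class ZkPathSketch {

    static final String RESOURCE_MANAGER_LEADER_PATH = "/resource_manager_lock";
    static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock";

    /** Joins path.root and cluster-id with exactly one '/' between them (assumed normalisation). */
    public static String namespace(String root, String clusterId) {
        String r = root.endsWith("/") ? root.substring(0, root.length() - 1) : root;
        String c = clusterId.startsWith("/") ? clusterId : "/" + clusterId;
        if (c.endsWith("/")) {
            c = c.substring(0, c.length() - 1);
        }
        return r + c;
    }

    /** Same shape as ZooKeeperHaServices#getPathForJobManager: "/" + jobID + "/job_manager_lock". */
    public static String pathForJobManager(String jobId) {
        return "/" + jobId + JOB_MANAGER_LEADER_PATH;
    }

    public static void main(String[] args) {
        String ns = namespace("/flink", "/cluster_one");
        System.out.println(ns + RESOURCE_MANAGER_LEADER_PATH); // /flink/cluster_one/resource_manager_lock
        System.out.println(ns + pathForJobManager("job-id-1")); // /flink/cluster_one/job-id-1/job_manager_lock
    }
}
```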
JobClient.submitJob
flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/client/JobClient.java
public class JobClient {
private static final Logger LOG = LoggerFactory.getLogger(JobClient.class);
//......
/**
* Submits a job to a Flink cluster (non-blocking) and returns a JobListeningContext which can be
* passed to {@code awaitJobResult} to get the result of the submission.
* @return JobListeningContext which may be used to retrieve the JobExecutionResult via
* {@code awaitJobResult(JobListeningContext context)}.
*/
public static JobListeningContext submitJob(
ActorSystem actorSystem,
Configuration config,
HighAvailabilityServices highAvailabilityServices,
JobGraph jobGraph,
FiniteDuration timeout,
boolean sysoutLogUpdates,
ClassLoader classLoader) {
checkNotNull(actorSystem, "The actorSystem must not be null.");
checkNotNull(highAvailabilityServices, "The high availability services must not be null.");
checkNotNull(jobGraph, "The jobGraph must not be null.");
checkNotNull(timeout, "The timeout must not be null.");
// for this job, we create a proxy JobClientActor that deals with all communication with
// the JobManager. It forwards the job submission, checks the success/failure responses, logs
// update messages, watches for disconnect between client and JobManager, ...
Props jobClientActorProps = JobSubmissionClientActor.createActorProps(
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
timeout,
sysoutLogUpdates,
config);
ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
Future<Object> submissionFuture = Patterns.ask(
jobClientActor,
new JobClientMessages.SubmitJobAndWait(jobGraph),
new Timeout(AkkaUtils.INF_TIMEOUT()));
return new JobListeningContext(
jobGraph.getJobID(),
submissionFuture,
jobClientActor,
timeout,
classLoader,
highAvailabilityServices);
}
//......
}
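- JobClient.submitJob, for example, calls highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID) to find the current JobManager leader, to which the job is then submitted. This single-argument overload is deprecated and only used by the legacy code path in which the JobManager acts as the master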
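The retriever returned by getJobManagerLeaderRetriever is callback-based. Below is a simplified model of how a client might wait for the first leader address. `Listener` and `RetrievalService` are hypothetical stand-ins shaped after Flink's LeaderRetrievalListener (notifyLeaderAddress / handleError) and LeaderRetrievalService (start / stop), not the real interfaces, and `fixed` mimics the non-HA case in which a pre-configured leader is reported with a default leader id (`new UUID(0, 0)`, like DEFAULT_LEADER_ID).

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Simplified model of callback-based leader retrieval (hypothetical types). */
public class LeaderRetrievalSketch {

    public interface Listener {
        void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId);

        void handleError(Exception exception);
    }

    public interface RetrievalService {
        void start(Listener listener);

        void stop();
    }

    /** Non-HA case: a fixed leader, reported as soon as the service starts. */
    public static RetrievalService fixed(String address) {
        return new RetrievalService() {
            @Override
            public void start(Listener listener) {
                listener.notifyLeaderAddress(address, new UUID(0, 0));
            }

            @Override
            public void stop() {
                // nothing to release
            }
        };
    }

    /** Waits for the first non-null leader address, then stops the service. */
    public static String awaitLeader(RetrievalService service, long timeoutMs) {
        CompletableFuture<String> leader = new CompletableFuture<>();
        service.start(new Listener() {
            @Override
            public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId) {
                if (leaderAddress != null) { // this sketch treats null as "no leader yet"
                    leader.complete(leaderAddress);
                }
            }

            @Override
            public void handleError(Exception exception) {
                leader.completeExceptionally(exception);
            }
        });
        try {
            // join() throws an unchecked CompletionException on error or timeout
            return leader.orTimeout(timeoutMs, TimeUnit.MILLISECONDS).join();
        } finally {
            service.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitLeader(fixed("localhost:6123"), 1_000)); // localhost:6123
    }
}
```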
Summary
- HighAvailabilityMode has three values, NONE, ZOOKEEPER, and FACTORY_CLASS, each with a haActive flag indicating whether it provides high availability; HighAvailabilityOptions defines the high-availability.* options, including the ZooKeeper settings under high-availability.zookeeper
- HighAvailabilityServicesUtils provides the static methods that create HighAvailabilityServices: createAvailableOrEmbeddedServices, createHighAvailabilityServices, and createCustomHAServices. createAvailableOrEmbeddedServices is mainly used by FlinkMiniCluster; createHighAvailabilityServices is mainly used by ClusterEntrypoint and creates StandaloneHaServices for NONE, ZooKeeperHaServices for ZOOKEEPER, and delegates to createCustomHAServices for FACTORY_CLASS
- HighAvailabilityServices defines getters for every service an HA setup needs; ZooKeeperHaServices implements it by building each service with ZooKeeperUtils factory methods such as ZooKeeperUtils.createLeaderRetrievalService, ZooKeeperUtils.createLeaderElectionService, and ZooKeeperUtils.createSubmittedJobGraphs. JobClient.submitJob, for example, uses highAvailabilityServices.getJobManagerLeaderRetriever to locate the JobManager leader before submitting the job