【zookeeper源码】启动流程详解

栏目: 编程工具 · 发布时间: 5年前

内容简介:zookeeper启动类的位置在第一步就是要加载配置文件,我们来看这里主要就是把

zookeeper启动类的位置在 org.apache.zookeeper.server.ZooKeeperServerMain ,没错,找到它,并运行Main方法,即可启动zookeeper服务器。

请注意,在笔者的环境中只启动了一个zookeeper服务器,所以它并不是一个集群环境。

一、加载配置

第一步就是要加载配置文件,我们来看 initializeAndRun 方法。

protected void initializeAndRun(String[] args)throws ConfigException, IOException{
	ServerConfig config = new ServerConfig();
	if (args.length == 1) {
		config.parse(args[0]);
	} else {
		config.parse(conf);
	}
	runFromConfig(config);
}
复制代码

这里主要就是把 zoo.cfg 中的配置加载到ServerConfig对象中,过程比较简单,不再赘述。我们先看几个简单的配置项含义。

配置 含义
clientPort 对外服务端口,一般2181
dataDir 存储快照文件的目录,默认情况下,事务日志文件也会放在这
tickTime ZK中的一个时间单元。ZK中所有时间都是以这个时间单元为基础,进行整数倍配置
minSessionTimeout maxSessionTimeout Session超时时间,默认是2 tickTime ~ 20 tickTime 之间
preAllocSize 预先开辟磁盘空间,用于后续写入事务日志,默认64M
snapCount 每进行snapCount次事务日志输出后,触发一次快照,默认是100,000
maxClientCnxns 最大并发客户端数,默认是60

二、启动服务

我们接着往下看,来到 runFromConfig 方法。

public void runFromConfig(ServerConfig config) throws IOException {
	LOG.info("Starting server");
	FileTxnSnapLog txnLog = null;
	try {
		final ZooKeeperServer zkServer = new ZooKeeperServer();
		final CountDownLatch shutdownLatch = new CountDownLatch(1);
		
		//注册服务器关闭事件
		zkServer.registerServerShutdownHandler(
				new ZooKeeperServerShutdownHandler(shutdownLatch));
	
		//操作事务日志和快照日志文件类
		txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(
				config.dataDir));
		txnLog.setServerStats(zkServer.serverStats());
		
		//设置配置属性
		zkServer.setTxnLogFactory(txnLog);
		zkServer.setTickTime(config.tickTime);
		zkServer.setMinSessionTimeout(config.minSessionTimeout);
		zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
		
		//实例化ServerCnxnFactory抽象类
		cnxnFactory = ServerCnxnFactory.createFactory();
		cnxnFactory.configure(config.getClientPortAddress(),
				config.getMaxClientCnxns());
		cnxnFactory.startup(zkServer);
		shutdownLatch.await();
		shutdown();
		cnxnFactory.join();
		if (zkServer.canShutdown()) {
			zkServer.shutdown(true);
		}
	} catch (InterruptedException e) {
		LOG.warn("Server interrupted", e);
	} finally {
		if (txnLog != null) {
			txnLog.close();
		}
	}
}
复制代码

以上代码就是zookeeper服务器从启动到关闭的流程。我们拆分来看。

1、服务关闭事件

我们看到给zkServer注册了服务器关闭的处理类。

final ZooKeeperServer zkServer = new ZooKeeperServer();
final CountDownLatch shutdownLatch = new CountDownLatch(1);
zkServer.registerServerShutdownHandler(
		new ZooKeeperServerShutdownHandler(shutdownLatch));
复制代码

首先,我们应该知道zookeeper服务器是有状态的。

protected enum State {
	INITIAL, RUNNING, SHUTDOWN, ERROR;
}
复制代码

那么,在状态发生变化的时候,就会调用到 setState 方法。

public class ZooKeeperServer{
	//当zookeeper服务器状态发生变化时候调用此方法
	protected void setState(State state) {
		this.state = state;
		if (zkShutdownHandler != null) {
			zkShutdownHandler.handle(state);
		} else {
			LOG.debug("ZKShutdownHandler is not registered, so ZooKeeper server "
					+ "won't take any action on ERROR or SHUTDOWN server state changes");
		}
	}
}
复制代码

然后在这里就会调用到注册的处理器。在处理器中,如果发现状态不对,shutdownLatch.await方法就会被唤醒。

class ZooKeeperServerShutdownHandler {
	void handle(State state) {
        if (state == State.ERROR || state == State.SHUTDOWN) {
            shutdownLatch.countDown();
        }
    }
}
复制代码

当它被唤醒,事情就变得简单了。关闭、清理各种资源。

2、日志文件

事务日志文件和快照文件的操作,分别对应着两个实现类,在这里就是为了创建文件路径和创建类实例。

public FileTxnSnapLog(File dataDir, File snapDir) throws IOException {
	LOG.debug("Opening datadir:{} snapDir:{}", dataDir, snapDir);

	this.dataDir = new File(dataDir, version + VERSION);
	this.snapDir = new File(snapDir, version + VERSION);
	if (!this.dataDir.exists()) {
		if (!this.dataDir.mkdirs()) {
			throw new IOException("Unable to create data directory "
					+ this.dataDir);
		}
	}
	if (!this.dataDir.canWrite()) {
		throw new IOException("Cannot write to data directory " + this.dataDir);
	}
	if (!this.snapDir.exists()) {
		if (!this.snapDir.mkdirs()) {
			throw new IOException("Unable to create snap directory "
					+ this.snapDir);
		}
	}
	if (!this.snapDir.canWrite()) {
		throw new IOException("Cannot write to snap directory " + this.snapDir);
	}
	if(!this.dataDir.getPath().equals(this.snapDir.getPath())){
		checkLogDir();
		checkSnapDir();
	}

	txnLog = new FileTxnLog(this.dataDir);
	snapLog = new FileSnap(this.snapDir);
}
复制代码

上面的好理解,如果文件不存在就去创建,并检查是否拥有写入权限。

中间还有个判断很有意思,如果两个文件路径不相同,还要调用 checkLogDir、checkSnapDir 去检查。检查什么呢?就是不能放在一起。

事务日志文件目录下,不能包含快照文件。 快照文件目录下,也不能包含事务日志文件。

最后,就是初始化两个实现类,把创建后的文件对象告诉它们。

3、启动服务

服务器的启动对应着两个实现:NIO服务器和Netty服务器。所以一开始要调用 createFactory 来选择实例化一个实现类。

static public ServerCnxnFactory createFactory() throws IOException {
	String serverCnxnFactoryName =
		System.getProperty("zookeeper.serverCnxnFactory");
	if (serverCnxnFactoryName == null) {
		serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
	}
	try {
		ServerCnxnFactory serverCnxnFactory = Class.forName(serverCnxnFactoryName)
				.getDeclaredConstructor().newInstance();
		return serverCnxnFactory;
	} catch (Exception e) {
		IOException ioe = new IOException("Couldn't instantiate "
				+ serverCnxnFactoryName);
		ioe.initCause(e);
		throw ioe;
	}
}
复制代码

先获取 zookeeper.serverCnxnFactory 属性值,如果它为空,默认创建的就是 NIOServerCnxnFactory 实例。

所以,如果我们希望用Netty启动,就可以这样设置: System.setProperty("zookeeper.serverCnxnFactory", NettyServerCnxnFactory.class.getName());

最后通过反射获取它们的构造器并实例化。然后调用它们的方法来绑定端口,启动服务。两者差异不大,在这里咱们以Netty为例看一下。

  • 构造函数
NettyServerCnxnFactory() {
	bootstrap = new ServerBootstrap(
			new NioServerSocketChannelFactory(
					Executors.newCachedThreadPool(),
					Executors.newCachedThreadPool()));
	bootstrap.setOption("reuseAddress", true);
	bootstrap.setOption("child.tcpNoDelay", true);
	bootstrap.setOption("child.soLinger", -1);
	bootstrap.getPipeline().addLast("servercnxnfactory", channelHandler);
}
复制代码

在构造函数中,初始化ServerBootstrap对象,设置TCP参数。我们重点关注的是,它的事件处理器channelHandler。

  • 事件处理器

这里的channelHandler是一个内部类,继承自SimpleChannelHandler。它被标注为@Sharable,还是一个共享的处理器。

@Sharable
class CnxnChannelHandler extends SimpleChannelHandler {
	
	//客户端连接被关闭
	public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)throws Exception{
		//移除相应的Channel
		allChannels.remove(ctx.getChannel());
	}
	//客户端连接
	public void channelConnected(ChannelHandlerContext ctx,ChannelStateEvent e) throws Exception{
		allChannels.add(ctx.getChannel());
		NettyServerCnxn cnxn = new NettyServerCnxn(ctx.getChannel(),
				zkServer, NettyServerCnxnFactory.this);
		ctx.setAttachment(cnxn);
		addCnxn(cnxn);
	}
	//连接断开
	public void channelDisconnected(ChannelHandlerContext ctx,ChannelStateEvent e) throws Exception{
		NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
		if (cnxn != null) {
			cnxn.close();
		}
	}
	//发生异常
	public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)throws Exception{
		NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
		if (cnxn != null) {
			if (LOG.isDebugEnabled()) {
				LOG.debug("Closing " + cnxn);
			}
			cnxn.close();
		}
	}
	//有消息可读
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)throws Exception{
		try {
			//找到对应的NettyServerCnxn,调用方法处理请求信息
			NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment();
			synchronized(cnxn) {
				processMessage(e, cnxn);
			}
		} catch(Exception ex) {
			LOG.error("Unexpected exception in receive", ex);
			throw ex;
		}
	}
	//处理消息
	private void processMessage(MessageEvent e, NettyServerCnxn cnxn) {
		....省略
	}
}
复制代码

这里面就是处理各种IO事件。比如客户端连接、断开连接、可读消息...

我们看 messageReceived 方法。当有消息请求时,调用到此方法。它会找到当前Channel对应的NettyServerCnxn对象,调用其 receiveMessage 方法,来完成具体请求的处理。

  • 绑定端口

初始化完成之后,通过 bootstrap.bind 来绑定端口,正式开始对外提供服务。

public class NettyServerCnxnFactory extends ServerCnxnFactory {
	public void start() {
		LOG.info("binding to port " + localAddress);
		parentChannel = bootstrap.bind(localAddress);
	}
}
复制代码

上面我们调用start方法启动了Netty服务,但整个zookeeper的启动过程还没有完成。

public void startup(ZooKeeperServer zks) throws IOException,InterruptedException {
	start();
	setZooKeeperServer(zks);
	zks.startdata();
	zks.startup();
}
复制代码

三、加载数据

接着我们看 zks.startdata(); 它要从zookeeper数据库加载数据。

有的同学不禁有疑问, 什么,zk竟然还有数据库? 不着急,我们慢慢看。

public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
	//加载数据
    public void startdata()throws IOException, InterruptedException {	
        //刚启动的时候,zkDb为空,先去初始化。
        if (zkDb == null) {
            zkDb = new ZKDatabase(this.txnLogFactory);
        }  
		//加载数据
        if (!zkDb.isInitialized()) {
            loadData();
        }
    }
}
复制代码

上面的代码中,在刚启动的时候zkDb为空,所以会进入第一个条件判断,调用构造方法,初始化zkDb。之后,调用loadData方法加载数据。

1、ZKDatabase

事实上,zookeeper并没有数据库,有的只是 ZKDatabase 这个类,或者叫它内存数据库。 我们先看看它有哪些属性。

public class ZKDatabase {    
	//数据树
    protected DataTree dataTree;
	//Session超时会话
    protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
	//事务、快照Log
    protected FileTxnSnapLog snapLog;
	//最小、最大事务ID
    protected long minCommittedLog, maxCommittedLog;
    public static final int commitLogCount = 500;
    protected static int commitLogBuffer = 700;
	//事务日志列表,记录着提案信息
    protected LinkedList<Proposal> committedLog = new LinkedList<Proposal>();
    protected ReentrantReadWriteLock logLock = new ReentrantReadWriteLock();
	//初始化标记
    volatile private boolean initialized = false;
}
复制代码

这里面包括会话,数据树和提交日志。所有的数据都保存在DataTree中,它就是数据树,它保存所有的节点数据。

public class DataTree {
	//哈希表提供对数据节点的快速查找
    private final ConcurrentHashMap<String, DataNode> nodes =
        new ConcurrentHashMap<String, DataNode>();

	//Watcher相关
    private final WatchManager dataWatches = new WatchManager();
    private final WatchManager childWatches = new WatchManager();
	
	//zookeeper默认创建的节点
    private static final String rootZookeeper = "/";
    private static final String procZookeeper = "/zookeeper";
    private static final String procChildZookeeper = procZookeeper.substring(1);
    private static final String quotaZookeeper = "/zookeeper/quota";
    private static final String quotaChildZookeeper = quotaZookeeper
            .substring(procZookeeper.length() + 1);
}
复制代码

在我们从zookeeper上查询节点数据的时候,就是通过DataTree中的方法去获取。再具体就是通过节点名称去nodes哈希表去查询。比如:

public byte[] getData(String path, Stat stat, Watcher watcher){
	DataNode n = nodes.get(path);
	if (n == null) {
		throw new KeeperException.NoNodeException();
	}
	synchronized (n) {
		n.copyStat(stat);
		if (watcher != null) {
			dataWatches.addWatch(path, watcher);
		}
		return n.data;
	}
}
复制代码

那我们也许已经想到了,DataNode才会保存数据的真正载体。

public class DataNode implements Record {    
	//父级节点
    DataNode parent;
	//节点数据内容
    byte data[];
    //权限信息
    Long acl;
    //节点统计信息
    public StatPersisted stat;
	//子节点集合
    private Set<String> children = null;
	//空Set对象
    private static final Set<String> EMPTY_SET = Collections.emptySet();
}
复制代码

在zookeeper中的一个节点就对应一个DataNode对象。它包含一个父级节点和子节点集合、权限信息、节点数据内容、统计信息,都在此类中表示。

【zookeeper源码】启动流程详解

2、实例化对象

我们接着回过头来,继续看代码。如果zkDb为空,就要去实例化它。

public ZKDatabase(FileTxnSnapLog snapLog) {
	dataTree = new DataTree();
	sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
	this.snapLog = snapLog;
}
复制代码

这里就是实例化DataTree对象,初始化超时会话的Map,赋值snapLog 对象。

那么在DataTree的构造函数中,初始化zookeeper默认的节点,就是往nodes哈希表中添加DataNode对象。

public DataTree() {
	nodes.put("", root);
	nodes.put(rootZookeeper, root);
	root.addChild(procChildZookeeper);
	nodes.put(procZookeeper, procDataNode);
	procDataNode.addChild(quotaChildZookeeper);
	nodes.put(quotaZookeeper, quotaDataNode);
}
复制代码

3、加载数据

如果zkDb还没有被初始化,那就加载数据库,并设置为已初始化状态,然后清理一下过期Session。

public class ZooKeeperServer{

	public void loadData() throws IOException, InterruptedException {
		if(zkDb.isInitialized()){
			setZxid(zkDb.getDataTreeLastProcessedZxid());
		}
		else {
			setZxid(zkDb.loadDataBase());
		}
		//清理过期session
		LinkedList<Long> deadSessions = new LinkedList<Long>();
		for (Long session : zkDb.getSessions()) {
			if (zkDb.getSessionWithTimeOuts().get(session) == null) {
				deadSessions.add(session);
			}
		}
		zkDb.setDataTreeInit(true);
		for (long session : deadSessions) {
			killSession(session, zkDb.getDataTreeLastProcessedZxid());
		}
	}
}
复制代码

我们看 zkDb.loadDataBase() 方法。它将从磁盘文件中加载数据库。

public class ZKDatabase {

	//从磁盘文件中加载数据库,并返回最大事务ID
	public long loadDataBase() throws IOException {
        long zxid = snapLog.restore(dataTree, s
				essionsWithTimeouts, commitProposalPlaybackListener);
        initialized = true;
        return zxid;
    }
}
复制代码

既然是磁盘文件,那么肯定就是快照文件和事务日志文件。 snapLog.restore 将证实这一点。

public class FileTxnSnapLog {
	public long restore(DataTree dt, Map<Long, Integer> sessions, 
			PlayBackListener listener) throws IOException {	
		//从快照文件中加载数据
        snapLog.deserialize(dt, sessions);
		//从事务日志文件中加载数据
        long fastForwardFromEdits = fastForwardFromEdits(dt, sessions, listener);
        return fastForwardFromEdits;
    }
}
复制代码

加载数据的过程看起来比较复杂,但核心就一点:从文件流中读取数据,转换成DataTree对象,放入zkDb中。在这里,咱们先不看解析文件的过程,就看看文件里存放的到底是些啥?

快照文件

我们找到 org.apache.zookeeper.server.SnapshotFormatter ,它可以帮我们输出快照文件内容。在main方法中,设置一下快照文件的路径,然后运行它。

public class SnapshotFormatter {
	public static void main(String[] args) throws Exception {		
		//设置快照文件路径
		args = new String[1];
		args[0] = "E:\\zookeeper-data\\version-2\\snapshot.6";
		if (args.length != 1) {
			System.err.println("USAGE: SnapshotFormatter snapshot_file");
			System.exit(2);
		}
		new SnapshotFormatter().run(args[0]);
	}
}
复制代码

运行这个main方法,在控制台输出的就是快照文件内容。

ZNode Details (count=8):
----
/
  cZxid = 0x00000000000000
  ctime = Thu Jan 01 08:00:00 CST 1970
  mZxid = 0x00000000000000
  mtime = Thu Jan 01 08:00:00 CST 1970
  pZxid = 0x00000000000002
  cversion = 1
  dataVersion = 0
  aclVersion = 0
  ephemeralOwner = 0x00000000000000
  dataLength = 0
----
/zookeeper
  cZxid = 0x00000000000000
  ctime = Thu Jan 01 08:00:00 CST 1970
  mZxid = 0x00000000000000
  mtime = Thu Jan 01 08:00:00 CST 1970
  pZxid = 0x00000000000000
  cversion = 0
  dataVersion = 0
  aclVersion = 0
  ephemeralOwner = 0x00000000000000
  dataLength = 0
----
/zookeeper/quota
  cZxid = 0x00000000000000
  ctime = Thu Jan 01 08:00:00 CST 1970
  mZxid = 0x00000000000000
  mtime = Thu Jan 01 08:00:00 CST 1970
  pZxid = 0x00000000000000
  cversion = 0
  dataVersion = 0
  aclVersion = 0
  ephemeralOwner = 0x00000000000000
  dataLength = 0
----
/test
  cZxid = 0x00000000000002
  ctime = Sat Feb 23 19:57:43 CST 2019
  mZxid = 0x00000000000002
  mtime = Sat Feb 23 19:57:43 CST 2019
  pZxid = 0x00000000000005
  cversion = 3
  dataVersion = 0
  aclVersion = 0
  ephemeralOwner = 0x00000000000000
  dataLength = 4
----
/test/t1
  cZxid = 0x00000000000003
  ctime = Sat Feb 23 19:57:53 CST 2019
  mZxid = 0x00000000000003
  mtime = Sat Feb 23 19:57:53 CST 2019
  pZxid = 0x00000000000003
  cversion = 0
  dataVersion = 0
  aclVersion = 0
  ephemeralOwner = 0x00000000000000
  dataLength = 4
----
/test/t2
  cZxid = 0x00000000000004
  ctime = Sat Feb 23 19:57:56 CST 2019
  mZxid = 0x00000000000004
  mtime = Sat Feb 23 19:57:56 CST 2019
  pZxid = 0x00000000000004
  cversion = 0
  dataVersion = 0
  aclVersion = 0
  ephemeralOwner = 0x00000000000000
  dataLength = 4
----
/test/t3
  cZxid = 0x00000000000005
  ctime = Sat Feb 23 19:57:58 CST 2019
  mZxid = 0x00000000000005
  mtime = Sat Feb 23 19:57:58 CST 2019
  pZxid = 0x00000000000005
  cversion = 0
  dataVersion = 0
  aclVersion = 0
  ephemeralOwner = 0x00000000000000
  dataLength = 4
----
Session Details (sid, timeout, ephemeralCount):
0x10013d3939a0000, 99999, 0
0x10013d1adcb0000, 99999, 0
复制代码

我们可以看到,格式化后的快照文件内容,除了开头的count信息和结尾的Session信息,中间每一行就是一个DataNode对象。从节点名称可以推算出自己的父级节点和子节点,其它的就是此节点的统计信息对象StatPersisted。

事务日志文件

我们找到 org.apache.zookeeper.server.LogFormatter 这个类,在main方法中设置事务日志的文件路径,然后运行它。在zookeeper中的每一个事务操作,都会被记录下来。

19-2-23 下午07时57分32秒 session 0x10013d1adcb0000 cxid 0x0 zxid 0x1 createSession 99999

19-2-23 下午07时57分43秒 session 0x10013d1adcb0000 cxid 0x2 zxid 0x2 create '/test,#31323334,v{s{31,s{'world,'anyone}}},F,1

19-2-23 下午07时57分53秒 session 0x10013d1adcb0000 cxid 0x3 zxid 0x3 create '/test/t1,#31323334,v{s{31,s{'world,'anyone}}},F,1

19-2-23 下午07时57分56秒 session 0x10013d1adcb0000 cxid 0x4 zxid 0x4 create '/test/t2,#31323334,v{s{31,s{'world,'anyone}}},F,2

19-2-23 下午07时57分58秒 session 0x10013d1adcb0000 cxid 0x5 zxid 0x5 create '/test/t3,#31323334,v{s{31,s{'world,'anyone}}},F,3

19-2-23 下午07时58分51秒 session 0x10013d3939a0000 cxid 0x0 zxid 0x6 createSession 99999

19-2-23 下午07时59分07秒 session 0x10013d3939a0000 cxid 0x4 zxid 0x7 create '/test/t4,#31323334,v{s{31,s{'world,'anyone}}},F,4
复制代码

可以看到,每一个事务对应一行记录。包含操作时间、sessionId、事务ID、操作类型、节点名称和权限信息等。 需要注意的是,只有变更操作才会被记录到事务日志。所以,在这里我们看不到任何读取操作请求。

四、会话管理器

会话是Zookeeper中一个重要的抽象。保证请求有序、临时znode节点、监听点都和会话密切相关。Zookeeper服务器的一个重要任务就是跟踪并维护这些会话。

在zookeeper中,服务器要负责清理掉过期会话,而客户端要保持自己的活跃状态,只能依靠心跳信息或者一个新的读写请求。

而对于过期会话的管理,则依靠“分桶策略”来完成。具体情况是这样的:

  • 1、zookeeper会为每个会话设置一个过期时间,我们称它为nextExpirationTime
  • 2、将这个过期时间和相对应的Session集合放入Map中
  • 3、开启线程,不断轮训这个Map,取出当前过期点nextExpirationTime的Session集合,然后关闭它们
  • 4、未活跃的Session被关闭;正在活跃的Session会重新计算自己的过期时间,修改自己的过期时间nextExpirationTime,保证不会被线程扫描到

简而言之,还在活跃的Session依靠不断重置自己的nextExpirationTime时间,就不会被线程扫描到,继而被关闭。

接下来我们看调用到的 zks.startup(); 方法,具体是怎么做的。

public class ZooKeeperServer

	public synchronized void startup() {
		if (sessionTracker == null) {
			createSessionTracker();
		}
		startSessionTracker();
		setupRequestProcessors();
		registerJMX();
		setState(State.RUNNING);
		notifyAll();
	}
}
复制代码

我们只关注 createSessionTracker、startSessionTracker 两个方法,它们和会话相关。

1、创建会话跟踪器

创建会话跟踪器,这里是一个 SessionTrackerImpl 对象实例。

protected void createSessionTracker() {
	sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
			tickTime, 1, getZooKeeperServerListener());
}
复制代码

在构造方法里,做了一些参数初始化的工作。

public SessionTrackerImpl(SessionExpirer expirer,
		ConcurrentHashMap<Long, Integer> sessionsWithTimeout, int tickTime,
		long sid, ZooKeeperServerListener listener){
		
	super("SessionTracker", listener);
	this.expirer = expirer;
	this.expirationInterval = tickTime;
	this.sessionsWithTimeout = sessionsWithTimeout;
	nextExpirationTime = roundToInterval(Time.currentElapsedTime());
	this.nextSessionId = initializeNextSession(sid);
	for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) {
		addSession(e.getKey(), e.getValue());
	}
}
复制代码

我们重点关注下一个过期时间 nextExpirationTime 是怎样计算出来的。我们来看 roundToInterval 方法。

private long roundToInterval(long time) {
	return (time / expirationInterval + 1) * expirationInterval;
}
复制代码

其中,time是基于当前时间的一个时间戳;expirationInterval是我们配置文件中的tickTime。如果我们假定 time=10,expirationInterval=2 ,那么上面计算后的下一个过期时间为 (10/2+1)*2=12

这也就是说,当前的Session会被分配到Id为12的分桶中。我们继续往下看这一过程。 在 addSession 方法中,先查询是否有会话Id的SessionImpl,没有则新建并存入。

synchronized public void addSession(long id, int sessionTimeout) {
	
	sessionsWithTimeout.put(id, sessionTimeout);
	//查询对应SessionId的Impl类
	if (sessionsById.get(id) == null) {
		SessionImpl s = new SessionImpl(id, sessionTimeout, 0);
		sessionsById.put(id, s);
	} else {
		if (LOG.isTraceEnabled()) {
			ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
					"SessionTrackerImpl --- Existing session 0x"
					+ Long.toHexString(id) + " " + sessionTimeout);
		}
	}
	touchSession(id, sessionTimeout);
}
复制代码

最后调用 touchSession 来激活会话。需要注意的是,zookeeper中的每个请求都会调用到此方法。它来计算活跃Session的下一个过期时间,并迁移到不同桶中。

我们一直在说“分桶”,或许难以理解“桶”到底是个什么东西。在代码中,它其实是个HashSet对象。

public class SessionTrackerImpl{
		
	//过期时间和对应Session集合的映射
	HashMap<Long, SessionSet> sessionSets = new HashMap<Long, SessionSet>();	
	//Session集合
	static class SessionSet {
        HashSet<SessionImpl> sessions = new HashSet<SessionImpl>();
    }
	
	synchronized public boolean touchSession(long sessionId, int timeout) {
	
		SessionImpl s = sessionsById.get(sessionId);
		//如果session被删除或者已经被标记为关闭状态
		if (s == null || s.isClosing()) {
			return false;
		}
		//计算下一个过期时间
		long expireTime = roundToInterval(Time.currentElapsedTime() + timeout);
		if (s.tickTime >= expireTime) {
			return true;
		}
		
		//获取Session当前的过期时间
		SessionSet set = sessionSets.get(s.tickTime);
		if (set != null) {
			//从集合中删除
			set.sessions.remove(s);
		}
		
		//设置新的过期时间并加入到Session集合中
		s.tickTime = expireTime;
		set = sessionSets.get(s.tickTime);
		if (set == null) {
			set = new SessionSet();
			sessionSets.put(expireTime, set);
		}
		set.sessions.add(s);
		return true;
	}
}
复制代码

我们回头看上面那个公式,如果第一次Session请求计算后的过期时间为12。 那么,对应Session的映射如下: 12=org.apache.zookeeper.server.SessionTrackerImpl$SessionSet@25143a5e 第二次请求,计算后的过期时间为15。就会变成: 15=org.apache.zookeeper.server.SessionTrackerImpl$SessionSet@3045314d

同时,过期时间为12的记录被删除。这样,通过过期时间的变更,不断迁移这个Session的位置。我们就会想到,如果由于网络原因或者客户端假死,请求长时间未能到达服务器,那么对应Session的过期时间就不会发生变化。 **时代在变化,你不变,就会被抛弃。**这句话,同样适用于zookeeper中的会话。

我们接着看 startSessionTracker();

protected void startSessionTracker() {
	((SessionTrackerImpl)sessionTracker).start();
}
复制代码

SessionTrackerImpl 继承自 ZooKeeperCriticalThread ,所以它本身也是线程类。调用start方法后开启线程,我们看run方法。

synchronized public void run() {
	try {
		while (running) {
			currentTime = Time.currentElapsedTime();
			if (nextExpirationTime > currentTime) {
				this.wait(nextExpirationTime - currentTime);
				continue;
			}	
			SessionSet set;
			//获取过期时间对应的Session集合
			set = sessionSets.remove(nextExpirationTime);
			//循环Session,关闭它们
			if (set != null) {
				for (SessionImpl s : set.sessions) {
					setSessionClosing(s.sessionId);
					expirer.expire(s);
				}
			}
			nextExpirationTime += expirationInterval;
		}
	} catch (InterruptedException e) {
		handleException(this.getName(), e);
	}
	LOG.info("SessionTrackerImpl exited loop!");
}
复制代码

这个方法通过死循环的方式,不断获取过期时间对应的Session集合。简直就是 发现一起,查处一起 。 这也就解释了为什么活跃Session必须要不断更改自己的过期时间,因为这里有人在监督。

最后就是注册了JMX,并设置服务器的运行状态。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

从算法到程序

从算法到程序

徐子珊 / 2013-3 / 59.00元

《从算法到程序:从应用问题编程实践全面体验算法理论》第1章讨论算法设计、分析的基本概念,第2章讨论算法设计中最常用的几个数据结构,包括链表、栈、队列、二叉搜索数、散列表等。第3章讨论了算法设计的两个基本策略:渐增策略与分支策略。这3章的内容,为读者阅读本书以后的内容奠定了基础。第4章讨论了几个代数计算的基本问题及其算法,包括矩阵运算、解线性方程组、多项式运算等。第5章讨论了几个关于计算几何的基本问......一起来看看 《从算法到程序》 这本书的介绍吧!

URL 编码/解码
URL 编码/解码

URL 编码/解码

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具