【zookeeper源码】启动流程详解

栏目: 编程工具 · 发布时间: 5年前

内容简介:zookeeper启动类的位置在第一步就是要加载配置文件,我们来看这里主要就是把

zookeeper启动类的位置在 org.apache.zookeeper.server.ZooKeeperServerMain ,没错,找到它,并运行Main方法,即可启动zookeeper服务器。

请注意,在笔者的环境中只启动了一个zookeeper服务器,所以它并不是一个集群环境。

一、加载配置

第一步就是要加载配置文件,我们来看 initializeAndRun 方法。

protected void initializeAndRun(String[] args)throws ConfigException, IOException{
	ServerConfig config = new ServerConfig();
	if (args.length == 1) {
		config.parse(args[0]);
	} else {
		config.parse(conf);
	}
	runFromConfig(config);
}
复制代码

这里主要就是把 zoo.cfg 中的配置加载到ServerConfig对象中,过程比较简单,不再赘述。我们先看几个简单的配置项含义。

配置 含义
clientPort 对外服务端口,一般2181
dataDir 存储快照文件的目录,默认情况下,事务日志文件也会放在这
tickTime ZK中的一个时间单元。ZK中所有时间都是以这个时间单元为基础,进行整数倍配置
minSessionTimeout maxSessionTimeout Session超时时间,默认是2 tickTime ~ 20 tickTime 之间
preAllocSize 预先开辟磁盘空间,用于后续写入事务日志,默认64M
snapCount 每进行snapCount次事务日志输出后,触发一次快照,默认是100,000
maxClientCnxns 最大并发客户端数,默认是60

二、启动服务

我们接着往下看,来到 runFromConfig 方法。

public void runFromConfig(ServerConfig config) throws IOException {
	LOG.info("Starting server");
	FileTxnSnapLog txnLog = null;
	try {
		final ZooKeeperServer zkServer = new ZooKeeperServer();
		final CountDownLatch shutdownLatch = new CountDownLatch(1);
		
		//注册服务器关闭事件
		zkServer.registerServerShutdownHandler(
				new ZooKeeperServerShutdownHandler(shutdownLatch));
	
		//操作事务日志和快照日志文件类
		txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(
				config.dataDir));
		txnLog.setServerStats(zkServer.serverStats());
		
		//设置配置属性
		zkServer.setTxnLogFactory(txnLog);
		zkServer.setTickTime(config.tickTime);
		zkServer.setMinSessionTimeout(config.minSessionTimeout);
		zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
		
		//实例化ServerCnxnFactory抽象类
		cnxnFactory = ServerCnxnFactory.createFactory();
		cnxnFactory.configure(config.getClientPortAddress(),
				config.getMaxClientCnxns());
		cnxnFactory.startup(zkServer);
		shutdownLatch.await();
		shutdown();
		cnxnFactory.join();
		if (zkServer.canShutdown()) {
			zkServer.shutdown(true);
		}
	} catch (InterruptedException e) {
		LOG.warn("Server interrupted", e);
	} finally {
		if (txnLog != null) {
			txnLog.close();
		}
	}
}
复制代码

以上代码就是zookeeper服务器从启动到关闭的流程。我们拆分来看。

1、服务关闭事件

我们看到给zkServer注册了服务器关闭的处理类。

final ZooKeeperServer zkServer = new ZooKeeperServer();
final CountDownLatch shutdownLatch = new CountDownLatch(1);
zkServer.registerServerShutdownHandler(
		new ZooKeeperServerShutdownHandler(shutdownLatch));
复制代码

首先,我们应该知道zookeeper服务器是有状态的。

protected enum State {
	INITIAL, RUNNING, SHUTDOWN, ERROR;
}
复制代码

那么,在状态发生变化的时候,就会调用到 setState 方法。

public class ZooKeeperServer{
	//当zookeeper服务器状态发生变化时候调用此方法
	protected void setState(State state) {
		this.state = state;
		if (zkShutdownHandler != null) {
			zkShutdownHandler.handle(state);
		} else {
			LOG.debug("ZKShutdownHandler is not registered, so ZooKeeper server "
					+ "won't take any action on ERROR or SHUTDOWN server state changes");
		}
	}
}
复制代码

然后在这里就会调用到注册的处理器。在处理器中,如果发现状态不对,shutdownLatch.await方法就会被唤醒。

class ZooKeeperServerShutdownHandler {
	void handle(State state) {
        if (state == State.ERROR || state == State.SHUTDOWN) {
            shutdownLatch.countDown();
        }
    }
}
复制代码

当它被唤醒,事情就变得简单了。关闭、清理各种资源。

2、日志文件

事务日志文件和快照文件的操作,分别对应着两个实现类,在这里就是为了创建文件路径和创建类实例。

public FileTxnSnapLog(File dataDir, File snapDir) throws IOException {
	LOG.debug("Opening datadir:{} snapDir:{}", dataDir, snapDir);

	this.dataDir = new File(dataDir, version + VERSION);
	this.snapDir = new File(snapDir, version + VERSION);
	if (!this.dataDir.exists()) {
		if (!this.dataDir.mkdirs()) {
			throw new IOException("Unable to create data directory "
					+ this.dataDir);
		}
	}
	if (!this.dataDir.canWrite()) {
		throw new IOException("Cannot write to data directory " + this.dataDir);
	}
	if (!this.snapDir.exists()) {
		if (!this.snapDir.mkdirs()) {
			throw new IOException("Unable to create snap directory "
					+ this.snapDir);
		}
	}
	if (!this.snapDir.canWrite()) {
		throw new IOException("Cannot write to snap directory " + this.snapDir);
	}
	if(!this.dataDir.getPath().equals(this.snapDir.getPath())){
		checkLogDir();
		checkSnapDir();
	}

	txnLog = new FileTxnLog(this.dataDir);
	snapLog = new FileSnap(this.snapDir);
}
复制代码

上面的好理解,如果文件不存在就去创建,并检查是否拥有写入权限。

中间还有个判断很有意思,如果两个文件路径不相同,还要调用 checkLogDir、checkSnapDir 去检查。检查什么呢?就是不能放在一起。

事务日志文件目录下,不能包含快照文件。 快照文件目录下,也不能包含事务日志文件。

最后,就是初始化两个实现类,把创建后的文件对象告诉它们。

3、启动服务

服务器的启动对应着两个实现:NIO服务器和Netty服务器。所以一开始要调用 createFactory 来选择实例化一个实现类。

static public ServerCnxnFactory createFactory() throws IOException {
	String serverCnxnFactoryName =
		System.getProperty("zookeeper.serverCnxnFactory");
	if (serverCnxnFactoryName == null) {
		serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
	}
	try {
		ServerCnxnFactory serverCnxnFactory = Class.forName(serverCnxnFactoryName)
				.getDeclaredConstructor().newInstance();
		return serverCnxnFactory;
	} catch (Exception e) {
		IOException ioe = new IOException("Couldn't instantiate "
				+ serverCnxnFactoryName);
		ioe.initCause(e);
		throw ioe;
	}
}
复制代码

先获取 zookeeper.serverCnxnFactory 属性值,如果它为空,默认创建的就是 NIOServerCnxnFactory 实例。

所以,如果我们希望用Netty启动,就可以这样设置: System.setProperty("zookeeper.serverCnxnFactory", NettyServerCnxnFactory.class.getName());

最后通过反射获取它们的构造器并实例化。然后调用它们的方法来绑定端口,启动服务。两者差异不大,在这里咱们以Netty为例看一下。

  • 构造函数
NettyServerCnxnFactory() {
	bootstrap = new ServerBootstrap(
			new NioServerSocketChannelFactory(
					Executors.newCachedThreadPool(),
					Executors.newCachedThreadPool()));
	bootstrap.setOption("reuseAddress", true);
	bootstrap.setOption("child.tcpNoDelay", true);
	bootstrap.setOption("child.soLinger", -1);
	bootstrap.getPipeline().addLast("servercnxnfactory", channelHandler);
}
复制代码

在构造函数中,初始化ServerBootstrap对象,设置TCP参数。我们重点关注的是,它的事件处理器channelHandler。

  • 事件处理器

这里的channelHandler是一个内部类,继承自SimpleChannelHandler。它被标注为@Sharable,还是一个共享的处理器。

@Sharable
class CnxnChannelHandler extends SimpleChannelHandler {
	
	//客户端连接被关闭
	public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)throws Exception{
		//移除相应的Channel
		allChannels.remove(ctx.getChannel());
	}
	//客户端连接
	public void channelConnected(ChannelHandlerContext ctx,ChannelStateEvent e) throws Exception{
		allChannels.add(ctx.getChannel());
		NettyServerCnxn cnxn = new NettyServerCnxn(ctx.getChannel(),
				zkServer, NettyServerCnxnFactory.this);
		ctx.setAttachment(cnxn);
		addCnxn(cnxn);
	}
	//连接断开
	public void channelDisconnected(ChannelHandlerContext ctx,ChannelStateEvent e) throws Exception{
		NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
		if (cnxn != null) {
			cnxn.close();
		}
	}
	//发生异常
	public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)throws Exception{
		NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
		if (cnxn != null) {
			if (LOG.isDebugEnabled()) {
				LOG.debug("Closing " + cnxn);
			}
			cnxn.close();
		}
	}
	//有消息可读
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)throws Exception{
		try {
			//找到对应的NettyServerCnxn,调用方法处理请求信息
			NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment();
			synchronized(cnxn) {
				processMessage(e, cnxn);
			}
		} catch(Exception ex) {
			LOG.error("Unexpected exception in receive", ex);
			throw ex;
		}
	}
	//处理消息
	private void processMessage(MessageEvent e, NettyServerCnxn cnxn) {
		....省略
	}
}
复制代码

这里面就是处理各种IO事件。比如客户端连接、断开连接、可读消息...

我们看 messageReceived 方法。当有消息请求时,调用到此方法。它会找到当前Channel对应的NettyServerCnxn对象,调用其 receiveMessage 方法,来完成具体请求的处理。

  • 绑定端口

初始化完成之后,通过 bootstrap.bind 来绑定端口,正式开始对外提供服务。

public class NettyServerCnxnFactory extends ServerCnxnFactory {
	public void start() {
		LOG.info("binding to port " + localAddress);
		parentChannel = bootstrap.bind(localAddress);
	}
}
复制代码

上面我们调用start方法启动了Netty服务,但整个zookeeper的启动过程还没有完成。

public void startup(ZooKeeperServer zks) throws IOException,InterruptedException {
	start();
	setZooKeeperServer(zks);
	zks.startdata();
	zks.startup();
}
复制代码

三、加载数据

接着我们看 zks.startdata(); 它要从zookeeper数据库加载数据。

有的同学不禁有疑问, 什么,zk竟然还有数据库? 不着急,我们慢慢看。

public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
	//加载数据
    public void startdata()throws IOException, InterruptedException {	
        //刚启动的时候,zkDb为空,先去初始化。
        if (zkDb == null) {
            zkDb = new ZKDatabase(this.txnLogFactory);
        }  
		//加载数据
        if (!zkDb.isInitialized()) {
            loadData();
        }
    }
}
复制代码

上面的代码中,在刚启动的时候zkDb为空,所以会进入第一个条件判断,调用构造方法,初始化zkDb。之后,调用loadData方法加载数据。

1、ZKDatabase

事实上,zookeeper并没有数据库,有的只是 ZKDatabase 这个类,或者叫它内存数据库。 我们先看看它有哪些属性。

public class ZKDatabase {    
	//数据树
    protected DataTree dataTree;
	//Session超时会话
    protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
	//事务、快照Log
    protected FileTxnSnapLog snapLog;
	//最小、最大事务ID
    protected long minCommittedLog, maxCommittedLog;
    public static final int commitLogCount = 500;
    protected static int commitLogBuffer = 700;
	//事务日志列表,记录着提案信息
    protected LinkedList<Proposal> committedLog = new LinkedList<Proposal>();
    protected ReentrantReadWriteLock logLock = new ReentrantReadWriteLock();
	//初始化标记
    volatile private boolean initialized = false;
}
复制代码

这里面包括会话,数据树和提交日志。所有的数据都保存在DataTree中,它就是数据树,它保存所有的节点数据。

public class DataTree {
	//哈希表提供对数据节点的快速查找
    private final ConcurrentHashMap<String, DataNode> nodes =
        new ConcurrentHashMap<String, DataNode>();

	//Watcher相关
    private final WatchManager dataWatches = new WatchManager();
    private final WatchManager childWatches = new WatchManager();
	
	//zookeeper默认创建的节点
    private static final String rootZookeeper = "/";
    private static final String procZookeeper = "/zookeeper";
    private static final String procChildZookeeper = procZookeeper.substring(1);
    private static final String quotaZookeeper = "/zookeeper/quota";
    private static final String quotaChildZookeeper = quotaZookeeper
            .substring(procZookeeper.length() + 1);
}
复制代码

在我们从zookeeper上查询节点数据的时候,就是通过DataTree中的方法去获取。再具体就是通过节点名称去nodes哈希表去查询。比如:

public byte[] getData(String path, Stat stat, Watcher watcher){
	DataNode n = nodes.get(path);
	if (n == null) {
		throw new KeeperException.NoNodeException();
	}
	synchronized (n) {
		n.copyStat(stat);
		if (watcher != null) {
			dataWatches.addWatch(path, watcher);
		}
		return n.data;
	}
}
复制代码

那我们也许已经想到了,DataNode才会保存数据的真正载体。

public class DataNode implements Record {    
	//父级节点
    DataNode parent;
	//节点数据内容
    byte data[];
    //权限信息
    Long acl;
    //节点统计信息
    public StatPersisted stat;
	//子节点集合
    private Set<String> children = null;
	//空Set对象
    private static final Set<String> EMPTY_SET = Collections.emptySet();
}
复制代码

在zookeeper中的一个节点就对应一个DataNode对象。它包含一个父级节点和子节点集合、权限信息、节点数据内容、统计信息,都在此类中表示。

【zookeeper源码】启动流程详解

2、实例化对象

我们接着回过头来,继续看代码。如果zkDb为空,就要去实例化它。

public ZKDatabase(FileTxnSnapLog snapLog) {
	dataTree = new DataTree();
	sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
	this.snapLog = snapLog;
}
复制代码

这里就是实例化DataTree对象,初始化超时会话的Map,赋值snapLog 对象。

那么在DataTree的构造函数中,初始化zookeeper默认的节点,就是往nodes哈希表中添加DataNode对象。

public DataTree() {
	nodes.put("", root);
	nodes.put(rootZookeeper, root);
	root.addChild(procChildZookeeper);
	nodes.put(procZookeeper, procDataNode);
	procDataNode.addChild(quotaChildZookeeper);
	nodes.put(quotaZookeeper, quotaDataNode);
}
复制代码

3、加载数据

如果zkDb还没有被初始化,那就加载数据库,并设置为已初始化状态,然后清理一下过期Session。

public class ZooKeeperServer{

	public void loadData() throws IOException, InterruptedException {
		if(zkDb.isInitialized()){
			setZxid(zkDb.getDataTreeLastProcessedZxid());
		}
		else {
			setZxid(zkDb.loadDataBase());
		}
		//清理过期session
		LinkedList<Long> deadSessions = new LinkedList<Long>();
		for (Long session : zkDb.getSessions()) {
			if (zkDb.getSessionWithTimeOuts().get(session) == null) {
				deadSessions.add(session);
			}
		}
		zkDb.setDataTreeInit(true);
		for (long session : deadSessions) {
			killSession(session, zkDb.getDataTreeLastProcessedZxid());
		}
	}
}
复制代码

我们看 zkDb.loadDataBase() 方法。它将从磁盘文件中加载数据库。

public class ZKDatabase {

	//从磁盘文件中加载数据库,并返回最大事务ID
	public long loadDataBase() throws IOException {
        long zxid = snapLog.restore(dataTree, s
				essionsWithTimeouts, commitProposalPlaybackListener);
        initialized = true;
        return zxid;
    }
}
复制代码

既然是磁盘文件,那么肯定就是快照文件和事务日志文件。 snapLog.restore 将证实这一点。

public class FileTxnSnapLog {
	public long restore(DataTree dt, Map<Long, Integer> sessions, 
			PlayBackListener listener) throws IOException {	
		//从快照文件中加载数据
        snapLog.deserialize(dt, sessions);
		//从事务日志文件中加载数据
        long fastForwardFromEdits = fastForwardFromEdits(dt, sessions, listener);
        return fastForwardFromEdits;
    }
}
复制代码

加载数据的过程看起来比较复杂,但核心就一点:从文件流中读取数据,转换成DataTree对象,放入zkDb中。在这里,咱们先不看解析文件的过程,就看看文件里存放的到底是些啥?

快照文件

我们找到 org.apache.zookeeper.server.SnapshotFormatter ,它可以帮我们输出快照文件内容。在main方法中,设置一下快照文件的路径,然后运行它。

public class SnapshotFormatter {
	public static void main(String[] args) throws Exception {		
		//设置快照文件路径
		args = new String[1];
		args[0] = "E:\\zookeeper-data\\version-2\\snapshot.6";
		if (args.length != 1) {
			System.err.println("USAGE: SnapshotFormatter snapshot_file");
			System.exit(2);
		}
		new SnapshotFormatter().run(args[0]);
	}
}
复制代码

运行这个main方法,在控制台输出的就是快照文件内容。

ZNode Details (count=8):
----
/
  cZxid = 0x00000000000000
  ctime = Thu Jan 01 08:00:00 CST 1970
  mZxid = 0x00000000000000
  mtime = Thu Jan 01 08:00:00 CST 1970
  pZxid = 0x00000000000002
  cversion = 1
  dataVersion = 0
  aclVersion = 0
  ephemeralOwner = 0x00000000000000
  dataLength = 0
----
/zookeeper
  cZxid = 0x00000000000000
  ctime = Thu Jan 01 08:00:00 CST 1970
  mZxid = 0x00000000000000
  mtime = Thu Jan 01 08:00:00 CST 1970
  pZxid = 0x00000000000000
  cversion = 0
  dataVersion = 0
  aclVersion = 0
  ephemeralOwner = 0x00000000000000
  dataLength = 0
----
/zookeeper/quota
  cZxid = 0x00000000000000
  ctime = Thu Jan 01 08:00:00 CST 1970
  mZxid = 0x00000000000000
  mtime = Thu Jan 01 08:00:00 CST 1970
  pZxid = 0x00000000000000
  cversion = 0
  dataVersion = 0
  aclVersion = 0
  ephemeralOwner = 0x00000000000000
  dataLength = 0
----
/test
  cZxid = 0x00000000000002
  ctime = Sat Feb 23 19:57:43 CST 2019
  mZxid = 0x00000000000002
  mtime = Sat Feb 23 19:57:43 CST 2019
  pZxid = 0x00000000000005
  cversion = 3
  dataVersion = 0
  aclVersion = 0
  ephemeralOwner = 0x00000000000000
  dataLength = 4
----
/test/t1
  cZxid = 0x00000000000003
  ctime = Sat Feb 23 19:57:53 CST 2019
  mZxid = 0x00000000000003
  mtime = Sat Feb 23 19:57:53 CST 2019
  pZxid = 0x00000000000003
  cversion = 0
  dataVersion = 0
  aclVersion = 0
  ephemeralOwner = 0x00000000000000
  dataLength = 4
----
/test/t2
  cZxid = 0x00000000000004
  ctime = Sat Feb 23 19:57:56 CST 2019
  mZxid = 0x00000000000004
  mtime = Sat Feb 23 19:57:56 CST 2019
  pZxid = 0x00000000000004
  cversion = 0
  dataVersion = 0
  aclVersion = 0
  ephemeralOwner = 0x00000000000000
  dataLength = 4
----
/test/t3
  cZxid = 0x00000000000005
  ctime = Sat Feb 23 19:57:58 CST 2019
  mZxid = 0x00000000000005
  mtime = Sat Feb 23 19:57:58 CST 2019
  pZxid = 0x00000000000005
  cversion = 0
  dataVersion = 0
  aclVersion = 0
  ephemeralOwner = 0x00000000000000
  dataLength = 4
----
Session Details (sid, timeout, ephemeralCount):
0x10013d3939a0000, 99999, 0
0x10013d1adcb0000, 99999, 0
复制代码

我们可以看到,格式化后的快照文件内容,除了开头的count信息和结尾的Session信息,中间每一行就是一个DataNode对象。从节点名称可以推算出自己的父级节点和子节点,其它的就是此节点的统计信息对象StatPersisted。

事务日志文件

我们找到 org.apache.zookeeper.server.LogFormatter 这个类,在main方法中设置事务日志的文件路径,然后运行它。在zookeeper中的每一个事务操作,都会被记录下来。

19-2-23 下午07时57分32秒 session 0x10013d1adcb0000 cxid 0x0 zxid 0x1 createSession 99999

19-2-23 下午07时57分43秒 session 0x10013d1adcb0000 cxid 0x2 zxid 0x2 create '/test,#31323334,v{s{31,s{'world,'anyone}}},F,1

19-2-23 下午07时57分53秒 session 0x10013d1adcb0000 cxid 0x3 zxid 0x3 create '/test/t1,#31323334,v{s{31,s{'world,'anyone}}},F,1

19-2-23 下午07时57分56秒 session 0x10013d1adcb0000 cxid 0x4 zxid 0x4 create '/test/t2,#31323334,v{s{31,s{'world,'anyone}}},F,2

19-2-23 下午07时57分58秒 session 0x10013d1adcb0000 cxid 0x5 zxid 0x5 create '/test/t3,#31323334,v{s{31,s{'world,'anyone}}},F,3

19-2-23 下午07时58分51秒 session 0x10013d3939a0000 cxid 0x0 zxid 0x6 createSession 99999

19-2-23 下午07时59分07秒 session 0x10013d3939a0000 cxid 0x4 zxid 0x7 create '/test/t4,#31323334,v{s{31,s{'world,'anyone}}},F,4
复制代码

可以看到,每一个事务对应一行记录。包含操作时间、sessionId、事务ID、操作类型、节点名称和权限信息等。 需要注意的是,只有变更操作才会被记录到事务日志。所以,在这里我们看不到任何读取操作请求。

四、会话管理器

会话是Zookeeper中一个重要的抽象。保证请求有序、临时znode节点、监听点都和会话密切相关。Zookeeper服务器的一个重要任务就是跟踪并维护这些会话。

在zookeeper中,服务器要负责清理掉过期会话,而客户端要保持自己的活跃状态,只能依靠心跳信息或者一个新的读写请求。

而对于过期会话的管理,则依靠“分桶策略”来完成。具体情况是这样的:

  • 1、zookeeper会为每个会话设置一个过期时间,我们称它为nextExpirationTime
  • 2、将这个过期时间和相对应的Session集合放入Map中
  • 3、开启线程,不断轮训这个Map,取出当前过期点nextExpirationTime的Session集合,然后关闭它们
  • 4、未活跃的Session被关闭;正在活跃的Session会重新计算自己的过期时间,修改自己的过期时间nextExpirationTime,保证不会被线程扫描到

简而言之,还在活跃的Session依靠不断重置自己的nextExpirationTime时间,就不会被线程扫描到,继而被关闭。

接下来我们看调用到的 zks.startup(); 方法,具体是怎么做的。

public class ZooKeeperServer

	public synchronized void startup() {
		if (sessionTracker == null) {
			createSessionTracker();
		}
		startSessionTracker();
		setupRequestProcessors();
		registerJMX();
		setState(State.RUNNING);
		notifyAll();
	}
}
复制代码

我们只关注 createSessionTracker、startSessionTracker 两个方法,它们和会话相关。

1、创建会话跟踪器

创建会话跟踪器,这里是一个 SessionTrackerImpl 对象实例。

protected void createSessionTracker() {
	sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
			tickTime, 1, getZooKeeperServerListener());
}
复制代码

在构造方法里,做了一些参数初始化的工作。

public SessionTrackerImpl(SessionExpirer expirer,
		ConcurrentHashMap<Long, Integer> sessionsWithTimeout, int tickTime,
		long sid, ZooKeeperServerListener listener){
		
	super("SessionTracker", listener);
	this.expirer = expirer;
	this.expirationInterval = tickTime;
	this.sessionsWithTimeout = sessionsWithTimeout;
	nextExpirationTime = roundToInterval(Time.currentElapsedTime());
	this.nextSessionId = initializeNextSession(sid);
	for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) {
		addSession(e.getKey(), e.getValue());
	}
}
复制代码

我们重点关注下一个过期时间 nextExpirationTime 是怎样计算出来的。我们来看 roundToInterval 方法。

private long roundToInterval(long time) {
	return (time / expirationInterval + 1) * expirationInterval;
}
复制代码

其中,time是基于当前时间的一个时间戳;expirationInterval是我们配置文件中的tickTime。如果我们假定 time=10,expirationInterval=2 ,那么上面计算后的下一个过期时间为 (10/2+1)*2=12

这也就是说,当前的Session会被分配到Id为12的分桶中。我们继续往下看这一过程。 在 addSession 方法中,先查询是否有会话Id的SessionImpl,没有则新建并存入。

synchronized public void addSession(long id, int sessionTimeout) {
	
	sessionsWithTimeout.put(id, sessionTimeout);
	//查询对应SessionId的Impl类
	if (sessionsById.get(id) == null) {
		SessionImpl s = new SessionImpl(id, sessionTimeout, 0);
		sessionsById.put(id, s);
	} else {
		if (LOG.isTraceEnabled()) {
			ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
					"SessionTrackerImpl --- Existing session 0x"
					+ Long.toHexString(id) + " " + sessionTimeout);
		}
	}
	touchSession(id, sessionTimeout);
}
复制代码

最后调用 touchSession 来激活会话。需要注意的是,zookeeper中的每个请求都会调用到此方法。它来计算活跃Session的下一个过期时间,并迁移到不同桶中。

我们一直在说“分桶”,或许难以理解“桶”到底是个什么东西。在代码中,它其实是个HashSet对象。

public class SessionTrackerImpl{
		
	//过期时间和对应Session集合的映射
	HashMap<Long, SessionSet> sessionSets = new HashMap<Long, SessionSet>();	
	//Session集合
	static class SessionSet {
        HashSet<SessionImpl> sessions = new HashSet<SessionImpl>();
    }
	
	synchronized public boolean touchSession(long sessionId, int timeout) {
	
		SessionImpl s = sessionsById.get(sessionId);
		//如果session被删除或者已经被标记为关闭状态
		if (s == null || s.isClosing()) {
			return false;
		}
		//计算下一个过期时间
		long expireTime = roundToInterval(Time.currentElapsedTime() + timeout);
		if (s.tickTime >= expireTime) {
			return true;
		}
		
		//获取Session当前的过期时间
		SessionSet set = sessionSets.get(s.tickTime);
		if (set != null) {
			//从集合中删除
			set.sessions.remove(s);
		}
		
		//设置新的过期时间并加入到Session集合中
		s.tickTime = expireTime;
		set = sessionSets.get(s.tickTime);
		if (set == null) {
			set = new SessionSet();
			sessionSets.put(expireTime, set);
		}
		set.sessions.add(s);
		return true;
	}
}
复制代码

我们回头看上面那个公式,如果第一次Session请求计算后的过期时间为12。 那么,对应Session的映射如下: 12=org.apache.zookeeper.server.SessionTrackerImpl$SessionSet@25143a5e 第二次请求,计算后的过期时间为15。就会变成: 15=org.apache.zookeeper.server.SessionTrackerImpl$SessionSet@3045314d

同时,过期时间为12的记录被删除。这样,通过过期时间的变更,不断迁移这个Session的位置。我们就会想到,如果由于网络原因或者客户端假死,请求长时间未能到达服务器,那么对应Session的过期时间就不会发生变化。 **时代在变化,你不变,就会被抛弃。**这句话,同样适用于zookeeper中的会话。

我们接着看 startSessionTracker();

protected void startSessionTracker() {
	((SessionTrackerImpl)sessionTracker).start();
}
复制代码

SessionTrackerImpl 继承自 ZooKeeperCriticalThread ,所以它本身也是线程类。调用start方法后开启线程,我们看run方法。

synchronized public void run() {
	try {
		while (running) {
			currentTime = Time.currentElapsedTime();
			if (nextExpirationTime > currentTime) {
				this.wait(nextExpirationTime - currentTime);
				continue;
			}	
			SessionSet set;
			//获取过期时间对应的Session集合
			set = sessionSets.remove(nextExpirationTime);
			//循环Session,关闭它们
			if (set != null) {
				for (SessionImpl s : set.sessions) {
					setSessionClosing(s.sessionId);
					expirer.expire(s);
				}
			}
			nextExpirationTime += expirationInterval;
		}
	} catch (InterruptedException e) {
		handleException(this.getName(), e);
	}
	LOG.info("SessionTrackerImpl exited loop!");
}
复制代码

这个方法通过死循环的方式,不断获取过期时间对应的Session集合。简直就是 发现一起,查处一起 。 这也就解释了为什么活跃Session必须要不断更改自己的过期时间,因为这里有人在监督。

最后就是注册了JMX,并设置服务器的运行状态。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Letting Go of the Words

Letting Go of the Words

Janice (Ginny) Redish / Morgan Kaufmann / 2007-06-11 / USD 49.95

"Redish has done her homework and created a thorough overview of the issues in writing for the Web. Ironically, I must recommend that you read her every word so that you can find out why your customer......一起来看看 《Letting Go of the Words》 这本书的介绍吧!

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具