内容简介:紧跟上文的:代码如下:解锁就是通过zkClient的delete删除当前节点
紧跟上文的: 分布式锁实现(一):Redis ,这篇我们用Zookeeper来设计和实现分布式锁,并且研究下开源客户端工具Curator的分布式锁源码
设计实现
一、基本算法
1.在某父节点下创建临时有序节点 2.判断创建的节点是否是当前父节点下所有子节点中序号最小的 3.是序号最小的成功获取锁,否则监听比自己小的那个节点,进行watch,当该节点被删除的时候通知当前节点,重新获取锁 4.解锁的时候删除当前节点 复制代码
二、关键点
临时有序节点
实现Zookeeper分布式锁关键就在于其[临时有序节点]的特性,在Zookeeper中有四种节点 1.PERSISTENT 持久,若不手动删除就永久存在 2.PERSISTENT_SEQUENTIAL 持久有序节点,zookeeper会为节点编号(保证有序) 3.EPHEMERAL 临时,一个客户端会话断开后会自动删除 4.EPHEMERAL_SEQUENTIAL 临时有序节点,zookeeper会为节点编号(保证有序) 复制代码
监听
Zookeeper提供事件监听机制,通过对节点、节点数据、子节点都提供了监听,我们通过这种监听watcher机制实现锁的等待 复制代码
三、代码实现
我们基于ZkClient这个客户端来实现,当然也可以用原生Zookeeper API,大致是一样的 坐标如下: <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.2</version> </dependency> 复制代码
代码如下:
public class MyDistributedLock { private ZkClient zkClient; private String name; private String currentLockPath; private CountDownLatch countDownLatch; private static final String PARENT_LOCK_PATH = "/distribute_lock"; public MyDistributedLock(ZkClient zkClient, String name) { this.zkClient = zkClient; this.name = name; } //加锁 public void lock() { //判断父节点是否存在,不存在就创建 if (!zkClient.exists(PARENT_LOCK_PATH)) { try { //多个线程只会成功建立一次 zkClient.createPersistent(PARENT_LOCK_PATH); } catch (Exception ignored) { } } //创建当前目录下的临时有序节点 currentLockPath = zkClient.createEphemeralSequential(PARENT_LOCK_PATH + "/", System.currentTimeMillis()); //校验是否最小节点 checkMinNode(currentLockPath); } //解锁 public void unlock() { System.out.println("delete : " + currentLockPath); zkClient.delete(currentLockPath); } private boolean checkMinNode(String lockPath) { //获取当前目录下所有子节点 List<String> children = zkClient.getChildren(PARENT_LOCK_PATH); Collections.sort(children); int index = children.indexOf(lockPath.substring(PARENT_LOCK_PATH.length() + 1)); if (index == 0) { System.out.println(name + ":success"); if (countDownLatch != null) { countDownLatch.countDown(); } return true; } else { String waitPath = PARENT_LOCK_PATH + "/" + children.get(index - 1); //等待前一个节点释放的监听 waitForLock(waitPath); return false; } } private void waitForLock(String prev) { System.out.println(name + " current path :" + currentLockPath + ":fail add listener" + " wait path :" + prev); countDownLatch = new CountDownLatch(1); zkClient.subscribeDataChanges(prev, new IZkDataListener() { @Override public void handleDataChange(String s, Object o) throws Exception { } @Override public void handleDataDeleted(String s) throws Exception { System.out.println("prev node is done"); checkMinNode(currentLockPath); } }); if (!zkClient.exists(prev)) { return; } try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } countDownLatch = null; } } 复制代码
加锁
-
zkClient.exists先判断父节点是否存在,不存在就创建,zookeeper可以保证只会创建成功一次
-
在当前目录下zkClient.createEphemeralSequential创建临时有序节点,再判断当前目录下此节点是否为序号最小的,如果是,成功获取锁,否则的话拿比自己小的节点,并做监听
-
waitForLock等待比自己小的节点,subscribeDataChanges监听一个节点的变化,handleDataDeleted里面再次做checkMinNode的判断
-
监听完毕后,再判断一次此节点是否存在,因为在监听的过程中有可能之前小的那个节点重新释放了锁,如果之前节点不存在的话,无需在这里等待,这里的等待是通过countDownLatch实现的
解锁
解锁就是通过zkClient的delete删除当前节点
测试用例
通过启动多个线程来测试lock、unlock的过程,查看是否有序
public class MyDistributedLockTest { public static void main(String[] args) { ZkClient zk = new ZkClient("127.0.0.1:2181", 5 * 10000); for (int i = 0; i < 20; i++) { String name = "thread" + i; Thread thread = new Thread(() -> { MyDistributedLock myDistributedLock = new MyDistributedLock(zk, name); myDistributedLock.lock(); // try { // Thread.sleep(1 * 1000); // } catch (InterruptedException e) { // e.printStackTrace(); // } myDistributedLock.unlock(); }); thread.start(); } } } 复制代码
执行结果如下,多线程情况下lock/unlock和监听一切正常:
thread1 current path :/distribute_lock2/0000000007:fail add listener wait path :/distribute_lock2/0000000006 thread6 current path :/distribute_lock2/0000000006:fail add listener wait path :/distribute_lock2/0000000005 thread3:success delete : /distribute_lock2/0000000000 thread2 current path :/distribute_lock2/0000000005:fail add listener wait path :/distribute_lock2/0000000004 thread7 current path :/distribute_lock2/0000000004:fail add listener wait path :/distribute_lock2/0000000003 thread9 current path :/distribute_lock2/0000000009:fail add listener wait path :/distribute_lock2/0000000008 thread5 current path :/distribute_lock2/0000000008:fail add listener wait path :/distribute_lock2/0000000007 thread0 current path :/distribute_lock2/0000000001:fail add listener wait path :/distribute_lock2/0000000000 thread8 current path :/distribute_lock2/0000000002:fail add listener wait path :/distribute_lock2/0000000001 thread4 current path :/distribute_lock2/0000000003:fail add listener wait path :/distribute_lock2/0000000002 delete : /distribute_lock2/0000000001 prev node is done thread8:success delete : /distribute_lock2/0000000002 prev node is done thread4:success delete : /distribute_lock2/0000000003 prev node is done thread7:success delete : /distribute_lock2/0000000004 prev node is done thread2:success delete : /distribute_lock2/0000000005 prev node is done thread6:success delete : /distribute_lock2/0000000006 prev node is done thread1:success delete : /distribute_lock2/0000000007 prev node is done thread5:success delete : /distribute_lock2/0000000008 prev node is done thread9:success delete : /distribute_lock2/0000000009 复制代码
Curator源码分析
一、基本使用
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy); client.start(); InterProcessMutex lock2 = new InterProcessMutex(client, "/test"); try { lock.acquire(); //业务 } catch (Exception e) { e.printStackTrace(); } finally { lock.release(); } 复制代码
-
CuratorFrameworkFactory.newClient获取zookeeper的客户端,retryPolicy指定重试策略,开启客户端
-
Curator本身提供了多种锁的实现,这里我们以InterProcessMutex可重入锁为例, lock.acquire()方法获取锁,lock.release()来释放锁,acquire方法也提供了重载的等待时间参数
二、源码分析
加锁
acquire内部就直接internalLock方法,传了-1的等待时间
public void acquire() throws Exception { if(!this.internalLock(-1L, (TimeUnit)null)) { throw new IOException("Lost connection while trying to acquire lock: " + this.basePath); } } 复制代码
internalLock方法首先判断是否是重入锁,通过ConcurrentMap维护线程和一个原子计数器,非重入锁的话,再通过attemptLock去获取锁
private boolean internalLock(long time, TimeUnit unit) throws Exception { /* Note on concurrency: a given lockData instance can be only acted on by a single thread so locking isn't necessary */ Thread currentThread = Thread.currentThread(); LockData lockData = threadData.get(currentThread); if ( lockData != null ) { // re-entering lockData.lockCount.incrementAndGet(); return true; } String lockPath = internals.attemptLock(time, unit, getLockNodeBytes()); if ( lockPath != null ) { LockData newLockData = new LockData(currentThread, lockPath); threadData.put(currentThread, newLockData); return true; } return false; } 复制代码
attemptLock在这里进行循环等待,createsTheLock方法去创建节点,internalLockLoop去判断当前节点是否是最小节点
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception { final long startMillis = System.currentTimeMillis(); final Long millisToWait = (unit != null) ? unit.toMillis(time) : null; final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes; int retryCount = 0; String ourPath = null; boolean hasTheLock = false; boolean isDone = false; while ( !isDone ) { isDone = true; try { ourPath = driver.createsTheLock(client, path, localLockNodeBytes); hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath); } catch ( KeeperException.NoNodeException e ) { // gets thrown by StandardLockInternalsDriver when it can't find the lock node // this can happen when the session expires, etc. So, if the retry allows, just try it all again if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ) { isDone = false; } else { throw e; } } } if ( hasTheLock ) { return ourPath; } return null; } 复制代码
createsTheLock就是调用curator封装的api去创建临时有序节点
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception { String ourPath; if ( lockNodeBytes != null ) { ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes); } else { ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path); } return ourPath; } 复制代码
internalLockLoop锁判断,内部就是driver.getsTheLock去判断是否是当前目录下最小节点,如果是的话,返回获取锁成功,否则的话对previousSequencePath进行监听,监听动作完成后再对等待时间进行重新判断
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception { boolean haveTheLock = false; boolean doDelete = false; try { if ( revocable.get() != null ) { client.getData().usingWatcher(revocableWatcher).forPath(ourPath); } while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) { List<String> children = getSortedChildren(); String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases); if ( predicateResults.getsTheLock() ) { haveTheLock = true; } else { String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch(); synchronized(this) { try { // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak client.getData().usingWatcher(watcher).forPath(previousSequencePath); if ( millisToWait != null ) { millisToWait -= (System.currentTimeMillis() - startMillis); startMillis = System.currentTimeMillis(); if ( millisToWait <= 0 ) { doDelete = true; // timed out - delete our node break; } wait(millisToWait); } else { wait(); } } catch ( KeeperException.NoNodeException e ) { // it has been deleted (i.e. lock released). Try to acquire again } } } } } catch ( Exception e ) { ThreadUtils.checkInterrupted(e); doDelete = true; throw e; } finally { if ( doDelete ) { deleteOurPath(ourPath); } } return haveTheLock; } 复制代码
解锁
release代码相对来说比较简单,就是先判断map里面是否存在当前线程的锁计数,不存在抛出异常,存在的话,进行原子减一操作,releaseLock内部就是删除节点操作,小于0的时候,从map里面移除
public void release() throws Exception { /* Note on concurrency: a given lockData instance can be only acted on by a single thread so locking isn't necessary */ Thread currentThread = Thread.currentThread(); LockData lockData = threadData.get(currentThread); if ( lockData == null ) { throw new IllegalMonitorStateException("You do not own the lock: " + basePath); } int newLockCount = lockData.lockCount.decrementAndGet(); if ( newLockCount > 0 ) { return; } if ( newLockCount < 0 ) { throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath); } try { internals.releaseLock(lockData.lockPath); } finally { threadData.remove(currentThread); } } 复制代码
后记
分布式锁的实现目前主流比较常用的实现就是 Redis 和Zookeeper了,相比较自己的实现,Redission和Curator的设计实现更为优秀,也更值得我们借鉴和学习 千里之行,积于跬步;万里之船,成于罗盘,共勉。 复制代码
以上所述就是小编给大家介绍的《分布式锁实现(二):Zookeeper》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 【分布式锁】07-Zookeeper实现分布式锁:Semaphore、读写锁实现原理
- 原 荐 分布式锁与实现(二)基于ZooKeeper实现
- 分布式实现原理
- 实现分布式锁
- SOFAJRaft 实现原理:SOFAJRaft-RheaKV 分布式锁实现剖析
- RedLock 实现分布式锁
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Effective JavaScript
赫尔曼 (David Herman) / 黄博文、喻杨 / 机械工业出版社 / 2014-1-1 / CNY 49.00
Effective 系列丛书经典著作,亚马逊五星级畅销书,Ecma 的JavaScript 标准化委员会著名专家撰写,JavaScript 语言之父、Mozilla CTO —— Brendan Eich 作序鼎力推荐!作者凭借多年标准化委员会工作和实践经验,深刻辨析JavaScript 的内部运作机制、特性、陷阱和编程最佳实践,将它们高度浓缩为极具实践指导意义的 68 条精华建议。 本书共......一起来看看 《Effective JavaScript》 这本书的介绍吧!