紧接上文《分布式锁实现(一):Redis》,这篇我们用Zookeeper来设计和实现分布式锁,并研究一下开源客户端工具Curator的分布式锁源码
1.在某父节点下创建临时有序节点 2.判断创建的节点是否是当前父节点下所有子节点中序号最小的 3.是序号最小的成功获取锁,否则监听比自己小的那个节点,进行watch,当该节点被删除的时候通知当前节点,重新获取锁 4.解锁的时候删除当前节点
实现Zookeeper分布式锁关键就在于其[临时有序节点]的特性,在Zookeeper中有四种节点 1.PERSISTENT 持久,若不手动删除就永久存在 2.PERSISTENT_SEQUENTIAL 持久有序节点,zookeeper会为节点编号(保证有序) 3.EPHEMERAL 临时,一个客户端会话断开后会自动删除 4.EPHEMERAL_SEQUENTIAL 临时有序节点,zookeeper会为节点编号(保证有序)
Zookeeper提供了事件监听机制,对节点、节点数据、子节点都可以进行监听,我们通过这种watcher监听机制来实现锁的等待
我们基于ZkClient这个客户端来实现,当然也可以用原生Zookeeper API,大致是一样的 坐标如下: <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.2</version> </dependency>
public class MyDistributedLock { private ZkClient zkClient; private String name; private String currentLockPath; private CountDownLatch countDownLatch; private static final String PARENT_LOCK_PATH = "/distribute_lock"; public MyDistributedLock(ZkClient zkClient, String name) { this.zkClient = zkClient; this.name = name; } //加锁 public void lock() { //判断父节点是否存在,不存在就创建 if (!zkClient.exists(PARENT_LOCK_PATH)) { try { //多个线程只会成功建立一次 zkClient.createPersistent(PARENT_LOCK_PATH); } catch (Exception ignored) { } } //创建当前目录下的临时有序节点 currentLockPath = zkClient.createEphemeralSequential(PARENT_LOCK_PATH + "/", System.currentTimeMillis()); //校验是否最小节点 checkMinNode(currentLockPath); } //解锁 public void unlock() { System.out.println("delete : " + currentLockPath); zkClient.delete(currentLockPath); } private boolean checkMinNode(String lockPath) { //获取当前目录下所有子节点 List<String> children = zkClient.getChildren(PARENT_LOCK_PATH); Collections.sort(children); int index = children.indexOf(lockPath.substring(PARENT_LOCK_PATH.length() + 1)); if (index == 0) { System.out.println(name + ":success"); if (countDownLatch != null) { countDownLatch.countDown(); } return true; } else { String waitPath = PARENT_LOCK_PATH + "/" + children.get(index - 1); //等待前一个节点释放的监听 waitForLock(waitPath); return false; } } private void waitForLock(String prev) { System.out.println(name + " current path :" + currentLockPath + ":fail add listener" + " wait path :" + prev); countDownLatch = new CountDownLatch(1); zkClient.subscribeDataChanges(prev, new IZkDataListener() { @Override public void handleDataChange(String s, Object o) throws Exception { } @Override public void handleDataDeleted(String s) throws Exception { System.out.println("prev node is done"); checkMinNode(currentLockPath); } }); if (!zkClient.exists(prev)) { return; } try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } countDownLatch = null; } }
public class MyDistributedLockTest { public static void main(String[] args) { ZkClient zk = new ZkClient("", 5 * 10000); for (int i = 0; i < 20; i++) { String name = "thread" + i; Thread thread = new Thread(() -> { MyDistributedLock myDistributedLock = new MyDistributedLock(zk, name); myDistributedLock.lock(); // try { // Thread.sleep(1 * 1000); // } catch (InterruptedException e) { // e.printStackTrace(); // } myDistributedLock.unlock(); }); thread.start(); } } }
thread1 current path :/distribute_lock2/0000000007:fail add listener wait path :/distribute_lock2/0000000006 thread6 current path :/distribute_lock2/0000000006:fail add listener wait path :/distribute_lock2/0000000005 thread3:success delete : /distribute_lock2/0000000000 thread2 current path :/distribute_lock2/0000000005:fail add listener wait path :/distribute_lock2/0000000004 thread7 current path :/distribute_lock2/0000000004:fail add listener wait path :/distribute_lock2/0000000003 thread9 current path :/distribute_lock2/0000000009:fail add listener wait path :/distribute_lock2/0000000008 thread5 current path :/distribute_lock2/0000000008:fail add listener wait path :/distribute_lock2/0000000007 thread0 current path :/distribute_lock2/0000000001:fail add listener wait path :/distribute_lock2/0000000000 thread8 current path :/distribute_lock2/0000000002:fail add listener wait path :/distribute_lock2/0000000001 thread4 current path :/distribute_lock2/0000000003:fail add listener wait path :/distribute_lock2/0000000002 delete : /distribute_lock2/0000000001 prev node is done thread8:success delete : /distribute_lock2/0000000002 prev node is done thread4:success delete : /distribute_lock2/0000000003 prev node is done thread7:success delete : /distribute_lock2/0000000004 prev node is done thread2:success delete : /distribute_lock2/0000000005 prev node is done thread6:success delete : /distribute_lock2/0000000006 prev node is done thread1:success delete : /distribute_lock2/0000000007 prev node is done thread5:success delete : /distribute_lock2/0000000008 prev node is done thread9:success delete : /distribute_lock2/0000000009
// Retry policy: exponential back-off with a base sleep of 1000 ms and at most 3 retries.
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
// The connection string is left empty in the article; put the real "host:port" here.
CuratorFramework client = CuratorFrameworkFactory.newClient("", retryPolicy);
client.start();
// The variable must be the one used below (the original declared "lock2" but used "lock").
InterProcessMutex lock = new InterProcessMutex(client, "/test");
try {
    lock.acquire();
    try {
        // business logic
    } finally {
        // Release only after a successful acquire(); release() throws
        // IllegalMonitorStateException when the calling thread does not own the lock.
        lock.release();
    }
} catch (Exception e) {
    e.printStackTrace();
}
Curator本身提供了多种锁的实现,这里我们以InterProcessMutex可重入锁为例, lock.acquire()方法获取锁,lock.release()来释放锁,acquire方法也提供了重载的等待时间参数
/**
 * Acquires the mutex, delegating to {@code internalLock} with a time of {@code -1} and
 * no time unit.
 *
 * @throws IOException if {@code internalLock} reports that the lock was not obtained
 * @throws Exception   any error propagated from {@code internalLock}
 */
public void acquire() throws Exception {
    final boolean acquired = this.internalLock(-1L, (TimeUnit) null);
    if (acquired) {
        return;
    }
    throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);
}
private boolean internalLock(long time, TimeUnit unit) throws Exception { /* Note on concurrency: a given lockData instance can be only acted on by a single thread so locking isn't necessary */ Thread currentThread = Thread.currentThread(); LockData lockData = threadData.get(currentThread); if ( lockData != null ) { // re-entering lockData.lockCount.incrementAndGet(); return true; } String lockPath = internals.attemptLock(time, unit, getLockNodeBytes()); if ( lockPath != null ) { LockData newLockData = new LockData(currentThread, lockPath); threadData.put(currentThread, newLockData); return true; } return false; }
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception { final long startMillis = System.currentTimeMillis(); final Long millisToWait = (unit != null) ? unit.toMillis(time) : null; final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes; int retryCount = 0; String ourPath = null; boolean hasTheLock = false; boolean isDone = false; while ( !isDone ) { isDone = true; try { ourPath = driver.createsTheLock(client, path, localLockNodeBytes); hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath); } catch ( KeeperException.NoNodeException e ) { // gets thrown by StandardLockInternalsDriver when it can't find the lock node // this can happen when the session expires, etc. So, if the retry allows, just try it all again if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ) { isDone = false; } else { throw e; } } } if ( hasTheLock ) { return ourPath; } return null; }
/**
 * Creates the node representing this lock attempt as a protected
 * {@code EPHEMERAL_SEQUENTIAL} node, creating parent containers when needed.
 *
 * @param client        Curator client used to create the node
 * @param path          path handed to {@code forPath}
 * @param lockNodeBytes payload to store in the node, or {@code null} to create it
 *                      without an explicit payload
 * @return the path returned by {@code forPath} for the created node
 */
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {
    if (lockNodeBytes == null) {
        return client.create()
                .creatingParentContainersIfNeeded()
                .withProtection()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                .forPath(path);
    }
    return client.create()
            .creatingParentContainersIfNeeded()
            .withProtection()
            .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
            .forPath(path, lockNodeBytes);
}
/**
 * Loops until this participant's node is granted the lock, the wait budget is used up,
 * or the client is no longer in the STARTED state.
 *
 * <p>Each iteration fetches the sorted children, asks the driver whether our node gets
 * the lock and, if not, sets a watcher on the node the driver says to watch and waits on
 * this object's monitor.
 *
 * @param startMillis  reference time used to compute how much of the budget was consumed
 * @param millisToWait remaining wait budget in milliseconds, or {@code null} to wait
 *                     without a limit
 * @param ourPath      path of the node created for this lock attempt
 * @return {@code true} if the lock was obtained, {@code false} otherwise
 * @throws Exception any error raised while talking to ZooKeeper; our node is deleted
 *                   before the exception is rethrown
 */
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
    boolean haveTheLock = false;
    boolean doDelete = false;
    try
    {
        // When a revocation listener is registered, also watch our own node.
        if ( revocable.get() != null )
        {
            client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
        }

        while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
        {
            List<String> children = getSortedChildren();
            String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash

            PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
            if ( predicateResults.getsTheLock() )
            {
                haveTheLock = true;
            }
            else
            {
                String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                // wait() below requires holding this object's monitor.
                // NOTE(review): presumably the watcher notifies this monitor when the
                // watched node changes - the watcher is not visible in this snippet.
                synchronized(this)
                {
                    try
                    {
                        // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                        client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                        if ( millisToWait != null )
                        {
                            // Charge the time elapsed since the last checkpoint to the budget.
                            millisToWait -= (System.currentTimeMillis() - startMillis);
                            startMillis = System.currentTimeMillis();
                            if ( millisToWait <= 0 )
                            {
                                doDelete = true;    // timed out - delete our node
                                break;
                            }

                            wait(millisToWait);
                        }
                        else
                        {
                            wait();
                        }
                    }
                    catch ( KeeperException.NoNodeException e )
                    {
                        // it has been deleted (i.e. lock released). Try to acquire again
                    }
                }
            }
        }
    }
    catch ( Exception e )
    {
        ThreadUtils.checkInterrupted(e);
        // Any failure means we give up on this attempt, so our node must be removed.
        doDelete = true;
        throw e;
    }
    finally
    {
        if ( doDelete )
        {
            deleteOurPath(ourPath);
        }
    }
    return haveTheLock;
}
/**
 * Releases one hold of the mutex for the calling thread. The lock node is only released
 * once the hold count drops to zero; the thread's entry in {@code threadData} is removed
 * afterwards even if releasing the node fails.
 *
 * @throws IllegalMonitorStateException if the calling thread has no entry in
 *                                      {@code threadData} or the hold count went negative
 * @throws Exception                    any error propagated from {@code internals.releaseLock}
 */
public void release() throws Exception {
    // Concurrency note: a given LockData instance is only ever touched by a single
    // thread, so no additional locking is needed here.
    final Thread owner = Thread.currentThread();
    final LockData held = threadData.get(owner);
    if (held == null) {
        throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
    }

    final int remaining = held.lockCount.decrementAndGet();
    if (remaining < 0) {
        throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
    }
    if (remaining > 0) {
        // Still held re-entrantly by this thread.
        return;
    }

    try {
        internals.releaseLock(held.lockPath);
    } finally {
        threadData.remove(owner);
    }
}
分布式锁目前主流且常用的实现就是 Redis 和 Zookeeper 了。相比自己的实现,Redisson和Curator的设计实现更为优秀,也更值得我们借鉴和学习。 千里之行,积于跬步;万里之船,成于罗盘,共勉。
