跟着小白学zookeeper: 分布式锁的实现

栏目: 服务器 · 发布时间: 6年前

内容简介:最近小白在做一个系统功能时,发现有个方法是需要做同步的,but,生产环境中项目的部署是多个tomcat做集群的,而简单的使用synchronized加锁只是针对同一个JVM进程中的多线程实现同步,对于跨进程的同步无法达到统一加锁的目的。于是,小白便想到了分布式锁。前段时间刚好看到一幅有意思的漫画,其中就提到Zookeeper被设计的初衷,就是利用临时顺序节点,可以轻松实现分布式锁,便研究了下利用zk实现分布式锁。本文只研究了zk的基本特性以及使用java实现一个简单的分布式锁,如有错误,欢迎拍砖,另外稍微

最近小白在做一个系统功能时,发现有个方法是需要做同步的,but,生产环境中项目的部署是多个tomcat做集群的,而简单的使用synchronized加锁只是针对同一个JVM进程中的多线程实现同步,对于跨进程的同步无法达到统一加锁的目的。于是,小白便想到了分布式锁。前段时间刚好看到一幅有意思的漫画,其中就提到Zookeeper被设计的初衷,就是利用临时顺序节点,可以轻松实现分布式锁,便研究了下利用zk实现分布式锁。本文只研究了zk的基本特性以及使用 java 实现一个简单的分布式锁,如有错误,欢迎拍砖,另外稍微白话,不喜勿喷。

假设背景

假设小白的系统生产环境上部署了2台tomcat(t1 和 t2),而同一时间用户A、B的请求刚好分别由t1和t2进行响应处理,用户A、B的请求都需要调用方法m作相关处理(对共享数据的处理),为了保证数据的准确性,小白希望一个时间点只有一个线程可以执行方法m,也就是说t1中有线程执行m时,t1、t2的其他线程都不能执行m,直至那个线程对m调用结束。

思考方案

单机环境下如何实现同步的?可以使用synchronized或是ReentrantLock实现,究其原理也是存在一个锁标志变量,线程每次要执行同步代码时先去查看该标志是否已经被其他线程占有,若是则阻塞等待其他线程释放锁,若不是则设置标志后执行(此处只是简单描述,具体原理博大精深)。

为何跨进程就不行了呢?因为同一个进程内,锁是所有这个进程内所有线程都可以访问的,但是其他进程中的线程时访问不了的。OK,那只要提供一个所有进程内线程都可见的锁标志,问题就解决咯。so,zookeeper就可以充当第三方进程,对需要管理的进程开放访问权限,所有需要跨进程同步的代码在被执行前,都需要先来我大zk这里查看是否可以执行。

一、动手前多问几个问题

为什么zookeeper可以实现分布式锁?

多个进程内同一时间都有线程在执行方法m,锁就一把,你获得了锁得以执行,我就得被阻塞,那你执行完了谁来唤醒我呢?你并不知道我被阻塞了,你也就不能通知我“嗨,小白,我用完了,你用吧”。你能做的只有用的时候设置锁标志,用完了再取消你设置的标志。我就必须在阻塞的时候隔一段时间主动去看看,但这样总归是有点麻烦的,最好有人来通知我可以执行了。zookeeper对于自身节点的监听者提供事件通知功能,是不是有点雪中送炭的感觉呢。

节点是什么?节点是zookeeper中数据存储的基础结构,zk中万物皆节点,就好比java中万物皆对象是一样的。zk的数据模型就是基于好多个节点的树结构,但zk规定每个节点的引用规则是 路径引用 。每个节点中包含子节点引用、存储数据、访问权限以及节点元数据等四部分。

zk中节点有类型区分吗?有。zk中提供了四种类型的节点,各种类型节点及其区别如下:

  • 持久节点(PERSISTENT):节点创建后,就一直存在,直到有删除操作来主动清除这个节点
  • 持久顺序节点(PERSISTENT_SEQUENTIAL):保留持久节点的特性,额外的特性是,每个节点会为其第一层子节点维护一个顺序,记录每个子节点创建的先后顺序,ZK会自动为给定节点名加上一个数字后缀(自增的),作为新的节点名。
  • 临时节点(EPHEMERAL):和持久节点不同的是,临时节点的生命周期和客户端会话绑定,当然也可以主动删除。
  • 临时顺序节点(EPHEMERAL_SEQUENTIAL):保留临时节点的特性,额外的特性如持久顺序节点的额外特性。

如何操作节点?节点的增删改查分别是creat\delete\setData\getData,exists判断节点是否存在,getChildren获取所有子节点的引用。

上面提到了节点的监听者,我们可以在对zk的节点进行查询操作时,设置当前线程是否监听所查询的节点。getData、getChildren、exists都属于对节点的查询操作,这些方法都有一个boolean类型的watch参数,用来设置是否监听该节点。一旦某个线程监听了某个节点,那么这个节点发生的creat(在该节点下新建子节点)、setData、delete(删除节点本身或是删除其某个子节点)都会触发zk去通知监听该节点的线程。但需要注意的是,线程对节点设置的监听是一次性的,也就是说zk通知监听线程后需要改线程再次设置监听节点,否则该节点再次的修改zk不会再次通知。

zookeeper具备了实现分布式锁的基础条件:多进程共享、可以存储锁信息、有主动通知的机制。

怎么使用zookeeper实现分布式锁呢?

分布式锁也是锁,没什么牛的,它也需要一个名字来告诉别人自己管理的是哪块同步资源,也同样需要一个标识告诉别人自己现在是空闲还是被使用。zk中,需要创建一个专门的放锁的节点,然后各种锁节点都作为该节点的子节点方便管理,节点名称用来表明自己管理的同步资源。那么锁标识呢?

方案一:使用节点中的存储数据区域,zk中节点存储数据的大小不能超过1M,但是只是存放一个标识是足够的。线程获得锁时,先检查该标识是否是无锁标识,若是可修改为占用标识,使用完再恢复为无锁标识。

方案二:使用子节点,每当有线程来请求锁的时候,便在锁的节点下创建一个子节点,子节点类型必须维护一个顺序,对子节点的自增序号进行排序,默认总是最小的子节点对应的线程获得锁,释放锁时删除对应子节点便可。

死锁风险

两种方案其实都是可行的,但是使用锁的时候一定要去规避死锁。方案一看上去是没问题的,用的时候设置标识,用完清除标识,但是要是持有锁的线程发生了意外,释放锁的代码无法执行,锁就无法释放,其他线程就会一直等待锁,相关同步代码便无法执行。方案二也存在这个问题,但方案二可以利用zk的临时顺序节点来解决这个问题,只要线程发生了异常导致程序中断,就会丢失与zk的连接,zk检测到该链接断开,就会自动删除该链接创建的临时节点,这样就可以达到即使占用锁的线程程序发生意外,也能保证锁正常释放的目的。

那要是zk挂了怎么办?sad,zk要是挂了就没辙了,因为线程都无法链接到zk,更何谈获取锁执行同步代码呢。不过,一般部署的时候,为了保证zk的高可用,都会使用多个zk部署为集群,集群内部一主多从,主zk一旦挂掉,会立刻通过选举机制有新的主zk补上。zk集群挂了怎么办?不好意思,除非所有zk同时挂掉,zk集群才会挂,概率超级小。

二、开始动手搞一搞

要什么东西

  1. 需要一个锁对象,每次创建这个锁对象的时候需要连接zk(也可将连接操作放在加锁的时候);
  2. 锁对象需要提供一个加锁的方法;
  3. 锁对象需要提供一个释放锁的方法;
  4. 锁对象需要监听zk节点,提供接收zk通知的回调方法。

实现分析

  1. 构造器中,创建zk连接,创建锁的根节点,相关API如下:
    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)

    创建zk连接。该构造器要求传入三个参数分别是:ip:端口(String)、会话超时时间、本次连接的监听器。

    public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode) 创建节点。参数:节点路径、节点数据、权限策略、节点类型

  2. 加锁 时,首先需要在锁的根节点下 创建一个临时顺序节点 (该节点名称规则统一,由zk拼接自增序号),然后获取根节点下所有子节点,将节点根据自增序号进行排序,判断最小的节点是否为本次加锁创建的节点,若是,加锁成功,若否,阻塞当前线程,等待锁释放(阻塞线程可以使用)。相关API如下:

    public List<String> getChildren(String path, boolean watch)
    获取某节点的所有子节点。参数:节点路径、是否监控该节点

  3. 释放锁 时, 删除线程创建的子节点 ,同时关闭zk连接。相关API如下:
    public void delete(String path, int version)

    删除指定节点。参数:节点路径、数据版本号

    public synchronized void close()

    断开zk链接

  4. 监听节点 。首先需要明确监听哪个节点,我们可以监听锁的根节点,这样每当有线程释放锁删除对应子节点时,zk就会通知监听线程,有锁被释放了,这个时候只需要获取根节点的所有子节点,根据自增序号判断自己对应的节点是否为最小,便可知道自己能否获取锁。但是上述做法很明显有一点不太好,只要有子节点被移除,zk就会重新通知所有等待锁的线程。获得不到锁的线程接收到通知后发现自己还需等待,又得重新设置监听再次等待。由于我们要采用临时 有序 节点,该类型节点的特性就是有序,那么就可以只监听上一个节点,也就是等待被移除的节点,这样可以保证接到通知时,就是对应子节点时最小,可以获得锁的时候。在实现分布式锁的时候,线程加锁时如果不能立马获得锁,便会被通过特定方式阻塞,那么既然 接到通知 时便是 可以获得锁 的时候,那么对应的操作就应该是 恢复线程 的执行, 取消阻塞

    zk提供了Watcher接口,锁对象需要监听zk中上一个节点,便需要实现该接口。Watcher接口内部包含封装了事件类型和连接类型的Event接口,还提供了唯一一个需要实现的方法。

    void process(WatchedEvent var1)

    该方法便是用来接收zk通知的回调方法。参数为监听节点发生的事件。当监听器监听的节点发生变化时,zk会通知监听者,同时该方法被执行,参数便是zk通知的信息。

开写代码

虽然是一个简单的分布式锁的实现,代码也有点略长。建议跟小白一样从零开始了解分布式锁实现的朋友,先从上面的大步骤分析简单思考下每个方法内部的具体实现再看代码,印象更为深刻,理解也更容易。如有不同思路,欢迎留言讨论。代码中判断加锁的方法中,使用分隔符字符串是为了区分各个资源的锁。项目中有临界资源A和B,那么管理A的锁释放与否,跟线程要持有管理B的锁是没有关系的。当然,也可以每一类锁单独建立独立的根节点。

public class ZooKeeperLock implements Watcher {

    private ZooKeeper zk = null;
    private String rootLockNode;            // 锁的根节点
    private String lockName;                // 竞争资源,用来生成子节点名称
    private String currentLock;             // 当前锁
    private String waitLock;                // 等待的锁(前一个锁)
    private CountDownLatch countDownLatch;  // 计数器(用来在加锁失败时阻塞加锁线程)
    private int sessionTimeout = 30000;     // 超时时间
    
    // 1. 构造器中创建ZK链接,创建锁的根节点
    public ZooKeeperLock(String zkAddress, String rootLockNode, String lockName) {
        this.rootLockNode = rootLockNode;
        this.lockName = lockName;
        try {
            // 创建连接,zkAddress格式为:IP:PORT
            zk = new ZooKeeper(zkAddress,this.sessionTimeout,this);
            // 检测锁的根节点是否存在,不存在则创建
            Stat stat = zk.exists(rootLockNode,false);
            if (null == stat) {
                zk.create(rootLockNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }
    
    // 2. 加锁方法,先尝试加锁,不能加锁则等待上一个锁的释放
    public boolean lock() {
        if (this.tryLock()) {
            System.out.println("线程【" + Thread.currentThread().getName() + "】加锁(" + this.currentLock + ")成功!");
            return true;
        } else {
            return waitOtherLock(this.waitLock, this.sessionTimeout);
        }
    }
    
    public boolean tryLock() {
        // 分隔符
        String split = "_lock_";
        if (this.lockName.contains("_lock_")) {
            throw new RuntimeException("lockName can't contains '_lock_' ");
        }
        try {
            // 创建锁节点(临时有序节点)
            this.currentLock = zk.create(this.rootLockNode + "/" + this.lockName + split, new byte[0],
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println("线程【" + Thread.currentThread().getName() 
                        + "】创建锁节点(" + this.currentLock + ")成功,开始竞争...");
            // 取所有子节点
            List<String> nodes = zk.getChildren(this.rootLockNode, false);
            // 取所有竞争lockName的锁
            List<String> lockNodes = new ArrayList<String>();
            for (String nodeName : nodes) {
                if (nodeName.split(split)[0].equals(this.lockName)) {
                    lockNodes.add(nodeName);
                }
            }
            Collections.sort(lockNodes);
            // 取最小节点与当前锁节点比对加锁
            String currentLockPath = this.rootLockNode + "/" + lockNodes.get(0);
            if (this.currentLock.equals(currentLockPath)) {
                return true;
            }
            // 加锁失败,设置前一节点为等待锁节点
            String currentLockNode = this.currentLock.substring(this.currentLock.lastIndexOf("/") + 1);
            int preNodeIndex = Collections.binarySearch(lockNodes, currentLockNode) - 1;
            this.waitLock = lockNodes.get(preNodeIndex);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    private boolean waitOtherLock(String waitLock, int sessionTimeout) {
        boolean islock = false;
        try {
            // 监听等待锁节点
            String waitLockNode = this.rootLockNode + "/" + waitLock;
            Stat stat = zk.exists(waitLockNode,true);
            if (null != stat) {
                System.out.println("线程【" + Thread.currentThread().getName() 
                            + "】锁(" + this.currentLock + ")加锁失败,等待锁(" + waitLockNode + ")释放...");
                // 设置计数器,使用计数器阻塞线程
                this.countDownLatch = new CountDownLatch(1);
                islock = this.countDownLatch.await(sessionTimeout,TimeUnit.MILLISECONDS);
                this.countDownLatch = null;
                if (islock) {
                    System.out.println("线程【" + Thread.currentThread().getName() + "】锁(" 
                                + this.currentLock + ")加锁成功,锁(" + waitLockNode + ")已经释放");
                } else {
                    System.out.println("线程【" + Thread.currentThread().getName() + "】锁(" 
                                + this.currentLock + ")加锁失败...");
                }
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return islock;
    }
    
    // 3. 释放锁
    public void unlock() throws InterruptedException {
        try {
            Stat stat = zk.exists(this.currentLock,false);
            if (null != stat) {
                System.out.println("线程【" + Thread.currentThread().getName() + "】释放锁 " + this.currentLock);
                zk.delete(this.currentLock, -1);
                this.currentLock = null;
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        } finally {
            zk.close();
        }
    }
    
    // 4. 监听器回调
    @Override
    public void process(WatchedEvent watchedEvent) {
        if (null != this.countDownLatch && watchedEvent.getType() == Event.EventType.NodeDeleted) {
            // 计数器减一,恢复线程操作
            this.countDownLatch.countDown();
        }
    }
}
复制代码

测试类如下:

public class Test {
    public static void doSomething() {
        System.out.println("线程【" + Thread.currentThread().getName() + "】正在运行...");
    }

    public static void main(String[] args) {
        Runnable runnable = new Runnable() {
            public void run() {
                ZooKeeperLock lock = null;
                lock = new ZooKeeperLock("10.150.27.51:2181","/locks", "test1");
                if (lock.lock()) {
                    doSomething();
                    try {
                        Thread.sleep(1000);
                        lock.unlock();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };

        for (int i = 0; i < 5; i++) {
            Thread t = new Thread(runnable);
            t.start();
        }
    }
}
复制代码

这里启动了5个线程来进行验证,输出结果如下。需要注意的是,子节点的创建顺序一定是从小到大的,但是下面输出结果中显示创建顺序的随机是由于创建节点和输出语句不是原子操作导致的。重点是锁的获取和释放,从输出结果中可以看出,每个线程只有在上一个节点被删除后才能执行。ok,一个基于zk的简单的分布式锁就实现了。

线程【Thread-3】创建锁节点(/locks/test1_lock_0000000238)成功,开始竞争...
线程【Thread-2】创建锁节点(/locks/test1_lock_0000000237)成功,开始竞争...
线程【Thread-1】创建锁节点(/locks/test1_lock_0000000236)成功,开始竞争...
线程【Thread-0】创建锁节点(/locks/test1_lock_0000000240)成功,开始竞争...
线程【Thread-4】创建锁节点(/locks/test1_lock_0000000239)成功,开始竞争...
线程【Thread-1】加锁(/locks/test1_lock_0000000236)成功!
线程【Thread-1】正在运行...
线程【Thread-3】锁(/locks/test1_lock_0000000238)加锁失败,等待锁(/locks/test1_lock_0000000237)释放...
线程【Thread-2】锁(/locks/test1_lock_0000000237)加锁失败,等待锁(/locks/test1_lock_0000000236)释放...
线程【Thread-0】锁(/locks/test1_lock_0000000240)加锁失败,等待锁(/locks/test1_lock_0000000239)释放...
线程【Thread-4】锁(/locks/test1_lock_0000000239)加锁失败,等待锁(/locks/test1_lock_0000000238)释放...
线程【Thread-1】释放锁 /locks/test1_lock_0000000236
线程【Thread-2】锁(/locks/test1_lock_0000000237)加锁成功,锁(/locks/test1_lock_0000000236)已经释放
线程【Thread-2】正在运行...
线程【Thread-2】释放锁 /locks/test1_lock_0000000237
线程【Thread-3】锁(/locks/test1_lock_0000000238)加锁成功,锁(/locks/test1_lock_0000000237)已经释放
线程【Thread-3】正在运行...
线程【Thread-3】释放锁 /locks/test1_lock_0000000238
线程【Thread-4】锁(/locks/test1_lock_0000000239)加锁成功,锁(/locks/test1_lock_0000000238)已经释放
线程【Thread-4】正在运行...
线程【Thread-4】释放锁 /locks/test1_lock_0000000239
线程【Thread-0】锁(/locks/test1_lock_0000000240)加锁成功,锁(/locks/test1_lock_0000000239)已经释放
线程【Thread-0】正在运行...
线程【Thread-0】释放锁 /locks/test1_lock_0000000240
复制代码

三、别人造好的轮子

话说zookeeper红火了这么久,就没有几个牛逼的人物去开源一些好用的工具,还需要自己这么费劲去写分布式锁的实现?是的,有的,上面小白也只是为了加深自己对zk实现分布式锁的理解去尝试做一个简单实现。有个叫Jordan Zimmerman的牛人提供了Curator来更好地操作zookeeper。

curator的分布式锁

curator提供了四种分布式锁,分别是:

跟着小白学zookeeper: 分布式锁的实现
  • InterProcessMutex:分布式可重入排它锁
  • InterProcessSemaphoreMutex:分布式排它锁
  • InterProcessReadWriteLock:分布式读写锁
  • InterProcessMultiLock:将多个锁作为单个实体管理的容器

pom依赖:

<dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-framework</artifactId>
      <version>4.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-recipes</artifactId>
      <version>4.0.0</version>
    </dependency>
复制代码

这里使用InterProcessMutex,即分布式可重入排他锁,用法如下:

// 设置重试策略,创建zk客户端
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
CuratorFramework client = CuratorFrameworkFactory.newClient("10.150.27.51:2181",retryPolicy);
// 启动客户端
client.start();
// 创建分布式可重入排他锁,监听客户端为client,锁的根节点为/locks
InterProcessMutex mutex = new InterProcessMutex(client,"/locks");
try {
    // 加锁
    if (mutex.acquire(3,TimeUnit.SECONDS)) {
        // TODO-同步操作
        //释放锁
        mutex.release();
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    client.close();
}
复制代码

InterProcessMutex源码解读

InterProcessMutex 改造器较多,这里就不展示改造器源码了,建议感兴趣的朋友自己看看。 InterProcessMutex 内部有个 ConcurrentMap 类型的 threadData 属性,该属性会以线程对象为键,线程对应的 LcokData 对象为值,记录每个锁的相关信息。在new一个 InterProcessMutex 实例时,其构造器主要是为 threadData 进行 Map 初始化,校验锁的根节点的合法性并使用 basePath 属性记录,此外还会实例化一个 LockInternals 对象由属性 internals 引用, LockInternalsInterProcessMutex 加锁的核心。

加锁

// InterProcessMutex.class
    public void acquire() throws Exception {
        if (!this.internalLock(-1L, (TimeUnit)null)) {
            throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);
        }
    }
    
    public boolean acquire(long time, TimeUnit unit) throws Exception {
        return this.internalLock(time, unit);
    }
    
    private boolean internalLock(long time, TimeUnit unit) throws Exception {
        Thread currentThread = Thread.currentThread();
        InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
        if (lockData != null) {
            // 锁的可重入性
            lockData.lockCount.incrementAndGet();
            return true;
        } else {
            // 加锁并返回锁节点
            String lockPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes());
            if (lockPath != null) {
                InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath);
                this.threadData.put(currentThread, newLockData);
                return true;
            } else {
                return false;
            }
        }
    }
复制代码

加锁提供了两个接口,分别为不设置超时和设置超时。不设置超时的话,线程等待锁时会一直阻塞,直到获取到锁。不管哪个加锁接口,都调用了 internalLock() 方法。这个方法里的代码体现了锁的可重入性。 InterProcessMutex 会直接从 threadData 中根据当前线程获取其 LockData ,若LockData不为空,则意味着当前线程拥有此,在锁的次数上加一就直接返回true。若为空,则通过 internals 属性的 attemptLock() 方法去竞争锁,该方法返回一个锁对应节点的路径。若该路径不为空,代表当前线程获得到了锁,然后为当前线程创建对应的 LcokData 并记录进 threadData 中。

竞争锁

// LockInternals.class
    String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
        long startMillis = System.currentTimeMillis();
        Long millisToWait = unit != null ? unit.toMillis(time) : null;
        byte[] localLockNodeBytes = this.revocable.get() != null ? new byte[0] : lockNodeBytes;
        int retryCount = 0;
        String ourPath = null;
        boolean hasTheLock = false;
        boolean isDone = false;

        while(!isDone) {
            isDone = true;
            try {
                // 创建锁节点
                ourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes);
                // 竞争锁
                hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath);
            } catch (NoNodeException var14) {
                if (!this.client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++,  
                        System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
                    throw var14;
                }

                isDone = false;
            }
        }

        return hasTheLock ? ourPath : null;
    }
复制代码

一看这个方法,一大堆的变量定义,全部先忽略掉。最终的返回值由 hasTheLock 决定,为true时返回 ourPathourPath 初始化为null,后经 this.driver.createsTheLock(this.client, this.path, localLockNodeBytes) 赋值,这个方法点击去可看到默认的锁驱动类的创建锁节点方法,可知这里只是创建了锁节点。再看 hasTheLock ,为 internalLockLoop() 方法的返回值,只有该方法返回true时, attemptLock() 才会返回锁节点路径,才会加锁成功。那OK,锁的竞争实现是由 internalLockLoop 进行。上面循环中的异常捕捉中是根据客户端的重试策略进行重试。

// LockInternals.class
    private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
        boolean haveTheLock = false;
        boolean doDelete = false;

        try {
            if (this.revocable.get() != null) {
                ((BackgroundPathable)this.client.getData().usingWatcher(this.revocableWatcher)).forPath(ourPath);
            }
            while(this.client.getState() == CuratorFrameworkState.STARTED && !haveTheLock) {
                // 获取所有子节点
                List<String> children = this.getSortedChildren();
                // 获取当前锁节点
                String sequenceNodeName = ourPath.substring(this.basePath.length() + 1);
                // 使用锁驱动加锁
                PredicateResults predicateResults = this.driver.getsTheLock(this.client, children, 
                            sequenceNodeName, this.maxLeases);
                if (predicateResults.getsTheLock()) {
                    haveTheLock = true;
                } else {
                    // 阻塞等待上一个锁释放
                    String previousSequencePath = this.basePath + "/" + predicateResults.getPathToWatch();
                    synchronized(this) {
                        try {
                            ((BackgroundPathable)this.client.getData().usingWatcher(this.watcher)).forPath(previousSequencePath);
                            if (millisToWait == null) {
                                // 未设置超时一直阻塞
                                this.wait();
                            } else {
                                millisToWait = millisToWait - (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                // 根据时间设置阻塞时间
                                if (millisToWait > 0L) {
                                    this.wait(millisToWait);
                                } else {
                                    // 已经超时,设置删除节点标识
                                    doDelete = true;
                                    break;
                                }
                            }
                        } catch (NoNodeException var19) {
                            ;
                        }
                    }
                }
            }
        } catch (Exception var21) {
            ThreadUtils.checkInterrupted(var21);
            doDelete = true;
            throw var21;
        } finally {
            if (doDelete) {
                // 删除已超时的锁节点
                this.deleteOurPath(ourPath);
            }
        }
        return haveTheLock;
    }
复制代码

好吧,又是一大堆代码。还是先挑着看,返回值是 haveTheLock ,布尔型,看名字就知道这个变量代表竞争锁的成功与否。该变量的赋值发生在循环内,ok,看循环。先是获取所有子节点以及当前节点名称,再由驱动类进行锁竞争,竞争结果封装在 PredicateResults 类中,该类中包含一个布尔型的结果标识 getsTheLock 和一个监听节点路径 pathToWatch 。最后根据所竞争结果决定是否阻塞线程等待监听锁节点的释放。需要注意的是,这里阻塞使用的是对象的wait()机制,同时根据是否设置超时时间,是否已经超时决定线程阻塞时间或是删除超时节点。but,锁竞争的具体实现还是不在这里,这里只是有详细的锁等待实现。 Curator 默认的锁驱动类是 StandardLockInternalsDriver

// StandardLockInternalsDriver.class
    public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, 
            int maxLeases) throws Exception {
        int ourIndex = children.indexOf(sequenceNodeName);
        validateOurIndex(sequenceNodeName, ourIndex);
        boolean getsTheLock = ourIndex < maxLeases;
        String pathToWatch = getsTheLock ? null : (String)children.get(ourIndex - maxLeases);
        return new PredicateResults(pathToWatch, getsTheLock);
    }
复制代码

首先获取所有子节点中当前节点所在的位置索引,然后校验该索引,内部实现为判断是否小于0,成立则抛出一个NoNodeException。那肯定不是0啦。最终能否获得锁取决于该位置索引是否为0,也就是当前节点是否最小(maxLeases在InterProcessMutex构造器中初始化LockInternals设定的是1)。

总结

本文基于ZK实现分布式锁的思路、实现以及Curator的分布式可重入排他锁的原理剖析,算是小白研究ZK实现分布式锁的所有收获了。个人觉的关键点还是在于以下几点:

  • 利用临时节点避免客户端程序异常导致的死锁;
  • 利用有序节点设定锁的获取规则;
  • 利用进程内的线程同步机制实现跨进程的分布式锁等待。

嗯,应该就这些了,要是小白有哪里遗漏的,后续再补。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Ruby on Rails敏捷开发最佳实践

Ruby on Rails敏捷开发最佳实践

李刚 / 电子工业出版社 / 2008-4 / 79.80元

《Ruby on Rails敏捷开发最佳实践》适用于正在使用Ruby On Rails进行应用开发的开发人员、渴望了解Ruby On Rails框架的开发人员,尤其适合有初步的Java EE开发经验,想从Java EE平台过渡到Ruby On Rails开发平台的开发者。 Ruby On Rails框架一经推出,立即引起B/S结构应用开发领域革命性的变化:开发者无需理会架构,只需要按Rail......一起来看看 《Ruby on Rails敏捷开发最佳实践》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具