ZooKeeper 分布式锁实践(上):排它锁

栏目: 编程工具 · 发布时间: 5年前

内容简介:作者 | Sunny杏仁后端工程师,专注高并发和分布式编程,Golang爱好者。

作者 | Sunny

ZooKeeper 分布式锁实践(上):排它锁

杏仁后端工程师,专注高并发和分布式编程,Golang爱好者。

前面我们使用 Redis 实现了一个简单的分布式排它锁,它的主要问题在于无法及时得知锁状态的变化,虽然能够通过 Redis 的订阅发布模式来实现通知的功能,但实现起来比较复杂、实现成本较高。而 ZooKeeper 天生就是为分布式系统的协调工作而设计的,能够很轻松地实现日常的分布式管理工具,也就是 ZooKeeper 几大菜谱:

  • 数据发布/订阅

  • 负载均衡

  • 命名服务

  • 分布式协调/通知

  • 集群管理

  • Master选举

  • 分布式锁

  • 分布式队列

接下来,就让我们具体来看看,如何通过 ZooKeeper 的部件来实现一个分布式锁。

数据模型

对于一个锁来说,我们至少需要知道它的 ID、以及它的所有者( 只有持有者才能解锁 ),所以简单来说,锁的数据模型就是:

case class ZkLock

(

lockName : String ,
 lockOwner :

String

)

那么,如何用 ZooKeeper 来存储锁对象呢?

其实非常简单,用一个 ZNode 来表示一个锁,ZNode 的路径就是锁的 ID、ZNode 的数据即是锁的所有者:

/zk-lock (data = "zk-lock-owner")

原语

锁的原语一般有两个:

  1. 加锁

  2. 解锁

加锁

加锁的一般算法步骤是:

  1. 尝试加锁

  2. 如果锁没有被占用,则加锁成功

  3. 如果锁被占用,则等待锁被释放

  4. 锁被释放后,收到锁释放通知,重复步骤 1

翻译成ZooKeeper的算法步骤就是:

  1. 尝试创建表示锁的临时节点

  2. 如果创建节点成功,则加锁成功

  3. 如果创建节点失败,则创建一个锁节点的监视器,等待锁节点的删除通知

  4. 锁节点被持有者删除后,收到锁节点的删除通知,重复步骤 1

这里有两个注意点:

  • 为什么创建临时节点?当 ZooKeeper 客户端断开与服务端的连接时,它所创建的临时节点会被删除

  • 节点删除监视器

加锁实现

  1. 数据模型:ZkLock 对象:

    case class ZkLock
    ( lockName : String , // 锁ID = ZNode节点的路径 lockOwner : String

    // 锁的所有者 = ZNode节点的数据内容

    ) { // 1. connect // 2. try lock // 3. lock

    // 4. unlock

    }

  2. 为了示例完整性,在构造锁对象时会创建一个 ZooKeeper 客户端连接,用来和 ZooKeeper 服务端通信。

    这里通过一个闭锁来同步 ZooKeeper 客户端的连接状态:构造函数会一直阻塞,直到ZooKeeper客户端连接上服务端,然后打开闭锁:

    case class ZkLock(...) {
     //////////////////// 1. connect //////////////////// private val connectSignal = new CountDownLatch ( 1 ) private val zk = new ZooKeeper ( ZkConnection . defaultHost , 60 * 1000 , new Watcher { override def process ( e : WatchedEvent ) : Unit = { if ( KeeperState . SyncConnected . equals ( e . getState ) && EventType . None . equals ( e . getType )) { connectSignal . countDown // 连接成功,打开闭锁 } } } ) connectSignal .

    await

    }

  3. 与服务端成功建立连接之后,就可以实现下一步 尝试加锁 的操作了。

  4. 尝试加锁

    case class ZkLock(...) {
    //////////////////// 2. try lock //////////////////// def tryLock : Boolean = { Try { if ( zk . exists ( lockName , false ) != null ) { val existOwner = new String ( zk . getData ( lockName , null , null ), "UTF-8" ) lockOwner . equals ( existOwner ) // 可重入性 } else { zk . create ( lockName , // 节点路径 = 锁ID lockOwner . getBytes , // 节点数据 = 锁的所有者 Ids . OPEN_ACL_UNSAFE , // 公开访问权限 CreateMode . EPHEMERAL // 临时节点 ) true } } match { case Success ( isLocked ) => isLocked case _ => false } }

    }

  • 排他性:锁节点只能被一个 ZooKeeper 成功创建

  • 可重入性:如果锁节点已被创建、且加锁者与锁节点的持有者一样,也返回加锁成功

  • 非阻塞方法:不管是否成功加锁,都立即返回加锁的结果

  • 同步加锁

    case class ZkLock(...) {
    //////////////////// 3. lock //////////////////// def lock : Unit = { if (! tryLock ) { val releaseSignal = new CountDownLatch ( 1 ) zk . exists ( lockName , new Watcher { override def process ( e : WatchedEvent ) : Unit = { if ( lockName . equals ( e . getPath ) && EventType . NodeDeleted . equals ( e . getType )) { releaseSignal . countDown } } }) releaseSignal . await lock } else { println ( s" ${ this } Locked ${ nowHourStr } \n" ) } }

    }

    • 如果返回:加锁成功,则完成加锁操作

    • 如果返回:加锁失败,则:

    • 创建闭锁:同步等待锁释放通知

    • 创建锁节点路径的监视器:收到锁节点被删除的事件时,打开闭锁

    • 阻塞等待的闭锁被打开后,重新开始同步加锁( 尾递归调用  

    • 同步加锁算法:尝试加锁:调用 tryLock   方法

    解锁

    解锁的算法步骤是:

    1. 锁节点是否存在

    2. 如果不存在,完成解锁

    3. 如果锁节点存在,则判断锁节点的数据( 锁的持有者 )是否和解锁者相同

    4. 如果一样,则删除锁节点,完成解锁

    解锁实现

    case class ZkLock(...) {
     //////////////////// 4. unlock //////////////////// def unlock : Unit = { if ( zk . exists ( lockName , false ) != null ) { val existOwner = new String ( zk . getData ( lockName , null , null ), "UTF-8" ) if ( lockOwner . equals ( existOwner )) { zk . delete ( lockName , - 1 ) println ( s" ${ this } Unlocked ${ nowHourStr } " ) } }

    }

    }

    测试

    object ZkLockDemo extends App {
      val lockName = "/zk-lock"
      val locker1 = ZkLock(lockName, "locker1")
      val locker2 = ZkLock(lockName, "locker2")
    
      async(() => locker1.lock)
      async(() => locker2.lock, waitTime = 1000L)
    
      async(() => locker1.unlock, waitTime = 2000L)
      async(() => locker2.unlock, waitTime = 3000L)
    
      Thread.sleep(Int.MaxValue)}

    注: async  方法用于异步地执行传入的函数来演示同步加锁、解锁过程, async   实现代码如下:

    def async(action: () => Unit,

    waitTime : Long = 0L ,

    delayTime : Long = 0L ) : Unit = { new Thread ( new Runnable { override def run () : Unit = { Thread . sleep ( waitTime ) action () Thread . sleep ( delayTime ) } }). start

    ()

    }

    执行结果:

    ZkLock(/zk-lock,locker1) Locked 18:22:37.002
    ZkLock(/zk-lock,locker1) Unlocked 18:22:38.897
    ZkLock(/zk-lock,locker2) Locked 18:22:38.900
    ZkLock(/zk-lock,locker2) Unlocked 18:22:39.901

    梳理一下整个 Demo 的过程:

    1. locker1 先成功加锁

    2. locker2 在 1 秒后尝试加锁,但是锁已被占用,所以进入阻塞等待阶段

    3. locker1 在 2 秒后解锁

    4. locker2 收到锁释放通知,再次尝试加锁成功

    5. locker2 在 3 秒后解锁

    缺陷

    以上,一个简单的分布式排他锁就宣告完成,实现代码十分简单。

    但是,它也有明显的缺陷:

    1. 排它锁的粒度大,没有区分读、写操作,如果读多写少,则十分影响性能

    2. 羊群效应:锁释放后会通知所有等待中的 ZooKeeper 客户端,然后同时发起加锁请求,瞬时压力很大

    那有没有什么好的办法来解决这两个问题呢?欲知答案,敬请期待下篇:ZooKeeper 分布式锁实践(下篇)读写锁

    全文完

    以下文章您可能也会感兴趣:

    我们正在招聘 Java 工程师,欢迎有兴趣的同学投递简历到 rd-hr@xingren.com 。

    ZooKeeper 分布式锁实践(上):排它锁

    杏仁技术站

    长按左侧二维码关注我们,这里有一群热血青年期待着与您相会。


    以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

    查看所有标签

    猜你喜欢:

    本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

    B2B品牌管理

    B2B品牌管理

    (美)菲利普·科特勒、(德)弗沃德 / 楼尊 / 2008-1 / 35.00元

    《B2B品牌管理》是第一本专门系统地阐述B2B品牌化的专业书籍,由营销大师菲利普•科特勒与弗沃德教授合作而成。他们以非凡的智慧和深厚的经验告诫B2B企业如何运用目标明确、重点突出的品牌化战略取得市场竞争优势地位,从而更加接近顾客,也更接近成功。在众多关于品牌的书籍中,《B2B品牌管理》的独特价值在于其根据实际环境探讨B2B品牌和品牌化问题,重点介绍和分析前沿的思想和最佳实践;通过与B2C企业的品牌......一起来看看 《B2B品牌管理》 这本书的介绍吧!

    CSS 压缩/解压工具
    CSS 压缩/解压工具

    在线压缩/解压 CSS 代码

    在线进制转换器
    在线进制转换器

    各进制数互转换器

    RGB CMYK 转换工具
    RGB CMYK 转换工具

    RGB CMYK 互转工具