利用ZooKeeper简单实现分布式锁

栏目: 编程工具 · 发布时间: 5年前

内容简介:在程序开发过程中不得不考虑的就是并发问题。在Java中对于同一个jvm而言,jdk已经提供了lock和同步等。但是在分布式情况下,往往存在多个进程对一些资源产生竞争关系,而这些进程往往在不同的机器上,这个时候jdk中提供的已经不能满足。分布式锁顾名思义就是可以满足分布式情况下的并发锁。 下面我们讲解怎么利用ZooKeeper实现分布式锁。ZooKeeper是Apache软件基金会的一个软件项目,他为大型分布式计算提供开源的分布式配置服务、同步服务和命名注册。在 ZooKeeper 中,节点类型可以分为持久节

在程序开发过程中不得不考虑的就是并发问题。在 Java 中对于同一个jvm而言,jdk已经提供了lock和同步等。但是在分布式情况下,往往存在多个进程对一些资源产生竞争关系,而这些进程往往在不同的机器上,这个时候jdk中提供的已经不能满足。分布式锁顾名思义就是可以满足分布式情况下的并发锁。 下面我们讲解怎么利用ZooKeeper实现分布式锁。

2.实现思路:

2.1 ZooKeeper简单介绍:

ZooKeeper是Apache软件基金会的一个软件项目,他为大型分布式计算提供开源的分布式配置服务、同步服务和命名注册。在 ZooKeeper 中,节点类型可以分为持久节点(PERSISTENT )、临时节点(EPHEMERAL),以及时序节点(SEQUENTIAL ),具体在节点创建过程中,一般是组合使用,可以生成 4 种节点类型:持久节点(PERSISTENT),持久顺序节点(PERSISTENT_SEQUENTIAL),临时节点(EPHEMERAL),临时顺序节点(EPHEMERAL_SEQUENTIAL)。

ZooKeeper 节点是有生命周期的,这取决于节点的类型。在 ZooKeeper 中,节点类型可以分为持久节点(PERSISTENT )、临时节点(EPHEMERAL),以及时序节点(SEQUENTIAL ),具体在节点创建过程中,一般是组合使用,可以生成以下 4 种节点类型。

  • 持久节点(PERSISTENT)

所谓持久节点,是指在节点创建后,就一直存在,直到有删除操作来主动清除这个节点——不会因为创建该节点的客户端会话失效而消失。

  • 持久顺序节点(PERSISTENT_SEQUENTIAL)

这类节点的基本特性和上面的节点类型是一致的。额外的特性是,在ZK中,每个父节点会为他的第一级子节点维护一份时序,会记录每个子节点创建的先后顺序。基于这个特性,在创建子节点的时候,可以设置这个属性,那么在创建节点过程中,ZK会自动为给定节点名加上一个数字后缀,作为新的节点名。这个数字后缀的范围是整型的最大值。

  • 临时节点(EPHEMERAL)

和持久节点不同的是,临时节点的生命周期和客户端会话绑定。也就是说,如果客户端会话失效,那么这个节点就会自动被清除掉。注意,这里提到的是会话失效,而非连接断开。另外,在临时节点下面不能创建子节点。

  • 临时顺序节点(EPHEMERAL_SEQUENTIAL)

可以用来实现分布式锁

  1. 客户端调用create()方法创建名为“_locknode_/guid-lock-”的节点,需要注意的是,这里节点的创建类型需要设置为EPHEMERAL_SEQUENTIAL。
  2. 客户端调用getChildren(“_locknode_”)方法来获取所有已经创建的子节点,注意,这里不注册任何Watcher。
  3. 客户端获取到所有子节点path之后,如果发现自己在步骤1中创建的节点序号最小,那么就认为这个客户端获得了锁。

如果在步骤3中发现自己并非所有子节点中最小的,说明自己还没有获取到锁。此时客户端需要找到比自己小的那个节点,然后对其调用exist()方法,同时注册事件监听。

之后当这个被关注的节点被移除了,客户端会收到相应的通知。这个时候客户端需要再次调用getChildren(“_locknode_”)方法来获取所有已经创建的子节点,确保自己确实是最小的节点了,然后进入步骤3。

2.2 利用zk实现:

当很多进程需要访问共享资源时,我们可以通过zk来实现分布式锁。主要步骤是:

  1. 建立一个节点,假如名为:lock 。节点类型为持久节点(PERSISTENT)
  2. 每当进程需要访问共享资源时,会调用分布式锁的lock()或tryLock()方法获得锁,这个时候会在第一步创建的lock节点下建立相应的顺序子节点,节点类型为临时顺序节点(EPHEMERAL_SEQUENTIAL),通过组成特定的名字name+ lock +顺序号。
  3. 在建立子节点后,对lock下面的所有以name开头的子节点进行排序,判断刚刚建立的子节点顺序号是否是最小的节点,假如是最小节点,则获得该锁对资源进行访问。
  4. 假如不是该节点,就获得该节点的上一顺序节点,并给该节点是否存在注册监听事件。同时在这里阻塞。等待监听事件的发生,获得锁控制权。
  5. 当调用完共享资源后,调用unlock()方法,关闭zk,进而可以引发监听事件,释放该锁。
  6. 实现的分布式锁是严格的按照顺序访问的并发锁。

3.代码实现:

下面将讲解使用java实现分布式锁:

  1. 建立类DistributedLock,实现java.util.concurrent.locks.Lock;和org.apache.zookeeper.Watcher接口
  2. 实现lock下面的方法:主要包括lock,tryLock,unlock等
  3. 实现watcher接口下的process方法。
  4. 在构造器中对zk进行初始化。
  5. 详细见代码注释

import java.io.IOException;

import java.util.ArrayList;

import java.util.Collections;

import java.util.List;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.Lock;

import org.apache.zookeeper.CreateMode;

import org.apache.zookeeper.KeeperException;

import org.apache.zookeeper.WatchedEvent;

import org.apache.zookeeper.Watcher;

import org.apache.zookeeper.ZooDefs;

import org.apache.zookeeper.ZooKeeper;

import org.apache.zookeeper.Watcher.Event.KeeperState;

import org.apache.zookeeper.data.Stat;

public class DistributedLock implements Lock, Watcher{

private ZooKeeper zk;

private String root = "/locks";//根

private String lockName;//竞争资源的标志

private String waitNode;//等待前一个锁

private String myZnode;//当前锁

private CountDownLatch latch;//计数器

private CountDownLatch connectedSignal=new CountDownLatch(1);

private int sessionTimeout = 30000;

/**

* 创建分布式锁,使用前请确认config配置的zookeeper服务可用

* @param config 192.168.1.127:2181

* @param lockName 竞争资源标志,lockName中不能包含单词_lock_

*/

public DistributedLock(String config, String lockName){

this.lockName = lockName;

// 创建一个与服务器的连接

try {

zk = new ZooKeeper(config, sessionTimeout, this);

connectedSignal.await();

Stat stat = zk.exists(root, false);//此去不执行 Watcher

if(stat == null){

// 创建根节点

zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);

}

} catch (IOException e) {

throw new LockException(e);

} catch (KeeperException e) {

throw new LockException(e);

} catch (InterruptedException e) {

throw new LockException(e);

}

}

/**

* zookeeper节点的监视器

*/

public void process(WatchedEvent event) {

//建立连接用

if(event.getState()==KeeperState.SyncConnected){

connectedSignal.countDown();

return;

}

//其他线程放弃锁的标志

if(this.latch != null) { 

this.latch.countDown(); 

}

}

public void lock() { 

try {

if(this.tryLock()){

System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true");

return;

}

else{

waitForLock(waitNode, sessionTimeout);//等待锁

}

} catch (KeeperException e) {

throw new LockException(e);

} catch (InterruptedException e) {

throw new LockException(e);

}

}

public boolean tryLock() {

try {

String splitStr = "_lock_";

if(lockName.contains(splitStr))

throw new LockException("lockName can not contains \\u000B ");

//创建临时子节点

myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);

System.out.println(myZnode + " is created ");

//取出所有子节点

List subNodes = zk.getChildren(root, false);

//取出所有lockName的锁

List lockObjNodes = new ArrayList();

for (String node : subNodes) {

String _node = node.split(splitStr)[0];

if(_node.equals(lockName)){

lockObjNodes.add(node);

}

}

Collections.sort(lockObjNodes);

if(myZnode.equals(root+"/"+lockObjNodes.get(0))){

//如果是最小的节点,则表示取得锁

System.out.println(myZnode + "==" + lockObjNodes.get(0));

return true;

}

//如果不是最小的节点,找到比自己小1的节点

String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);

waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);//找到前一个子节点

} catch (KeeperException e) {

throw new LockException(e);

} catch (InterruptedException e) {

throw new LockException(e);

}

return false;

}

public boolean tryLock(long time, TimeUnit unit) {

try {

if(this.tryLock()){

return true;

}

return waitForLock(waitNode,time);

} catch (Exception e) {

e.printStackTrace();

}

return false;

}

private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException {

Stat stat = zk.exists(root + "/" + lower,true);//同时注册监听。

//判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听

if(stat != null){

System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower);

this.latch = new CountDownLatch(1);

this.latch.await(waitTime, TimeUnit.MILLISECONDS);//等待,这里应该一直等待其他线程释放锁

this.latch = null;

}

return true;

}

public void unlock() {

try {

System.out.println("unlock " + myZnode);

zk.delete(myZnode,-1);

myZnode = null;

zk.close();

} catch (InterruptedException e) {

e.printStackTrace();

} catch (KeeperException e) {

e.printStackTrace();

}

}

public void lockInterruptibly() throws InterruptedException {

this.lock();

}

public Condition newCondition() {

return null;

}

public class LockException extends RuntimeException {

private static final long serialVersionUID = 1L;

public LockException(String e){

super(e);

}

public LockException(Exception e){

super(e);

}

}

Linux公社的RSS地址https://www.linuxidc.com/rssFeed.aspx

本文永久更新链接地址: https://www.linuxidc.com/Linux/2019-06/159108.htm


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Distributed Systems

Distributed Systems

Sukumar Ghosh / Chapman and Hall/CRC / 2014-7-14 / USD 119.95

Distributed Systems: An Algorithmic Approach, Second Edition provides a balanced and straightforward treatment of the underlying theory and practical applications of distributed computing. As in the p......一起来看看 《Distributed Systems》 这本书的介绍吧!

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具