ZooKeeper 实现分布式锁

栏目: 编程工具 · 发布时间: 5年前

内容简介:ZooKeeper 是一个典型的分布式数据一致性解决方案,分布式应用程序可以基于 ZooKeeper 实现诸如数据发布/订阅、负载均衡、分布式协调/通知、集群管理、Master 选举、分布式锁等功能。在介绍 ZooKeeper 分布式锁前需要先了解一下 ZooKeeper 中节点(Znode),ZooKeeper 的数据存储数据模型是一棵树(Znode Tree),由斜杠(/)的进行分割的路径,就是一个 Znode(如 /locks/my_lock)。每个 Znode 上都会保存自己的数据内容,同时还会保存

ZooKeeper 是一个典型的分布式数据一致性解决方案,分布式应用程序可以基于 ZooKeeper 实现诸如数据发布/订阅、负载均衡、分布式协调/通知、集群管理、Master 选举、分布式锁等功能。

节点

在介绍 ZooKeeper 分布式锁前需要先了解一下 ZooKeeper 中节点(Znode),ZooKeeper 的数据存储数据模型是一棵树(Znode Tree),由斜杠(/)的进行分割的路径,就是一个 Znode(如 /locks/my_lock)。每个 Znode 上都会保存自己的数据内容,同时还会保存一系列属性信息。

Znode 又分为以下四种类型:

类型 描述
持久节点 节点创建后,会一直存在,不会因客户端会话失效而删除
持久顺序节点 基本特性与持久节点一致,创建节点的过程中,ZooKeeper 会在其名字后自动追加一个单调增长的数字后缀,作为新的节点名
临时节点 客户端会话失效或连接关闭后,该节点会被自动删除
临时顺序节点 基本特性与临时节点一致,创建节点的过程中,ZooKeeper 会在其名字后自动追加一个单调增长的数字后缀,作为新的节点名

锁原理

ZooKeeper 分布式锁是基于 临时顺序节点 来实现的,锁可理解为 ZooKeeper 上的一个节点,当需要获取锁时,就在这个锁节点下创建一个临时顺序节点。当存在多个客户端同时来获取锁,就按顺序依次创建多个临时顺序节点,但只有排列序号是第一的那个节点能获取锁成功,其他节点则按顺序分别监听前一个节点的变化,当被监听者释放锁时,监听者就可以马上获得锁。

而且用临时顺序节点的另外一个用意是如果某个客户端创建临时顺序节点后,自己意外宕机了也没关系,ZooKeeper 感知到某个客户端宕机后会自动删除对应的临时顺序节点,相当于自动释放锁。

ZooKeeper 实现分布式锁

如上图:ClientA 和 ClientB 同时想获取锁,所以都在 locks 节点下创建了一个临时节点 1 和 2,而 1 是当前 locks 节点下排列序号第一的节点,所以 ClientA 获取锁成功,而 ClientB 处于等待状态,这时 ZooKeeper 中的 2 节点会监听 1 节点,当 1节点锁释放(节点被删除)时,2 就变成了 locks 节点下排列序号第一的节点,这样 ClientB 就获取锁成功了。

代码测试

请确保 ZooKeeper 服务已启动,ZooKeeper 的搭建可参考Kafka 集群 中的 ZooKeeper 集群部分

以下是基于 C# 的测试,Java 可使用 Curator 框架,实现原理和上面描述是一致的,有兴趣可以看看源码,应该也不难理解。

  1. 创建 .NET Core 控制台程序
  2. Nuget 安装 ZooKeeperNetEx.Recipes
  3. 创建 ZooKeeper Client

    private const int CONNECTION_TIMEOUT = 50000;
    private const string CONNECTION_STRING = "127.0.0.1:2181";
    private ZooKeeper CreateClient()
    {
    	var zooKeeper = new ZooKeeper(CONNECTION_STRING, CONNECTION_TIMEOUT, NullWatcher.Instance);
    	Stopwatch sw = new Stopwatch();
    	sw.Start();
    	while (sw.ElapsedMilliseconds < CONNECTION_TIMEOUT)
    	{
    		var state = zooKeeper.getState();
    		if (state == ZooKeeper.States.CONNECTED || state == ZooKeeper.States.CONNECTING)
    		{
    			break;
    		}
    	}
    	sw.Stop();
    	return zooKeeper;
    }
    
    class NullWatcher : Watcher
       {
           public static readonly NullWatcher Instance = new NullWatcher();
           private NullWatcher() { }
           public override Task process(WatchedEvent @event)
           {
               return Task.CompletedTask;
           }
       }
    
  4. 添加 Lock 方法

    /// <summary>
    /// 加锁
    /// </summary>
    /// <param name="key">加锁的节点名</param>
    /// <param name="lockAcquiredAction">加锁成功后需要执行的逻辑</param>
    /// <param name="lockReleasedAction">锁释放后需要执行的逻辑,可为空</param>
    /// <returns></returns>
    public async Task Lock(string key, Action lockAcquiredAction, Action lockReleasedAction = null)
    {
    	// 获取 ZooKeeper Client
    	ZooKeeper keeper = CreateClient();
    	// 指定锁节点
    	WriteLock writeLock = new WriteLock(keeper, $"/{key}", null);
    
    	var lockCallback = new LockCallback(() =>
    	{
    		lockAcquiredAction.Invoke();
    		writeLock.unlock();
    	}, lockReleasedAction);
    	// 绑定锁获取和释放的监听对象
    	writeLock.setLockListener(lockCallback);
    	// 获取锁(获取失败时会监听上一个临时节点)
    	await writeLock.Lock();
    }
    
    class LockCallback : LockListener
    {
    	private readonly Action _lockAcquiredAction;
    	private readonly Action _lockReleasedAction;
    
    	public LockCallback(Action lockAcquiredAction, Action lockReleasedAction)
    	{
    		_lockAcquiredAction = lockAcquiredAction;
    		_lockReleasedAction = lockReleasedAction;
    	}
    
    	/// <summary>
    	/// 获取锁成功回调
    	/// </summary>
    	/// <returns></returns>
    	public Task lockAcquired()
    	{
    		_lockAcquiredAction?.Invoke();
    		return Task.FromResult(0);
    	}
    
    	/// <summary>
    	/// 释放锁成功回调
    	/// </summary>
    	/// <returns></returns>
    	public Task lockReleased()
    	{
    		_lockReleasedAction?.Invoke();
    		return Task.FromResult(0);
    	}
    }
    
  5. 多线程模拟测试

    static async Task RunAsync()
    {
    	Parallel.For(1, 10, async (i) =>
    	{
    		await new ZooKeeprDistributedLock().Lock("locks", () =>
    		{
    			Console.WriteLine($"第{i}个请求,获取锁成功:{DateTime.Now},线程Id:{Thread.CurrentThread.ManagedThreadId}");
    			Thread.Sleep(1000); // 业务逻辑...
    		}, () =>
    		{
    			Console.WriteLine($"第{i}个请求,释放锁成功:{DateTime.Now},线程Id:{Thread.CurrentThread.ManagedThreadId}");
    			Console.WriteLine("-------------------------------");
    		});
    	});
    	await Task.CompletedTask;
    }
    

    ZooKeeper 实现分布式锁

虽然模拟的是多线程并行执行,但最终都会依赖锁的获取和释放而串行执行实际业务逻辑。

参考链接


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

产品增长力

产品增长力

李阳 / 机械工业出版社 / 2018-4-1 / 59

本书由京东资深数据产品经理撰写,重新定义了数据与产品、业务的关系,从数据分析方法、数据价值挖掘、数据结果倒逼业务优化3个层次,以及设计、运营和优化3个维度,为产品增长提供了科学的依据和方法论,得到了PMCaff创始人阿德、GrowingIO创始人&CEO张溪梦、增长官研究院创始人范冰、腾讯高级产品经理刘涵宇等专家的高度评价。 全书内容以理论为主线,以实操为目标,萃取技术实操与管理思维中的精华......一起来看看 《产品增长力》 这本书的介绍吧!

随机密码生成器
随机密码生成器

多种字符组合密码

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具