内容简介:大致来说,zookeeper的使用场景如下,我就举几个简单的,大家能说几个就好了:分布式协调分布式锁
大致来说,zookeeper的使用场景如下,我就举几个简单的,大家能说几个就好了:
分布式协调
分布式锁
元数据/配置信息管理
HA高可用
1、分布式协调
这个其实是 zookeeper 很经典的一个用法,简单来说,就好比,你A系统发送个请求到 mq,然后 B 系统消息消费之后处理了。那 A 系统如何知道 B 系统的处理结果?用zookeeper 就可以实现分布式系统之间的协调工作。A 系统发送请求之后可以在 zookeeper 上对某个节点的值注册个监听器,一旦 B 系统处理完了就修改 zookeeper 那个节点的值,A系统立马就可以收到通知,完美解决。
2、分布式锁
举个例子。对某一个数据连续发出两个修改操作,两台机器同时收到了请求,但是只能一台机器先执行完另外一个机器再执行。那么此时就可以使用 zookeeper 分布式锁,一个机器接收到了请求之后先获取zookeeper上的一把分布式锁,就是可以去创建一个znode,接着执行操作;然后另外一个机器也尝试去创建那个znode,结果发现自己创建不了,因为被别人创建了,那只能等着,等第一个机器执行完了自己再执行
3、元数据/配置信息管理
zookeeper 可以用作很多系统的配置信息的管理,比如kafka、storm等等很多分布式系统都会选用 zookeeper来做一些元数据、配置信息的管理,包括dubbo注册中心不也支持zookeeper 么?
4、HA高可用
这个应该是很常见的,比如 hadoop、hdfs、yarn 等很多大数据系统,都选择基于 zookeeper 来开发 HA高可用机制,就是一个重要进程一般会做主备两个,主进程挂了立马通过 zookeeper 感知到切换到备用进程。
二、Zookeeper实现分布式锁
1、查看目标Node是否已经创建,已经创建,那么等待锁。
2、如果未创建,创建一个瞬时Node,表示已经占有锁。
3、如果创建失败,那么证明锁已经被其他线程占有了,那么同样等待锁。
4、当释放锁,或者当前Session超时的时候,节点被删除,唤醒之前等待锁的线程去争抢锁。
代码如下:
package com.codertom.params.engine; import com.google.common.base.Strings; import org.apache.zookeeper.*; import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.Lock; /** * Zookeepr实现分布式锁 */ public class LockTest { private String zkQurom = "localhost:2181"; private String lockNameSpace = "/mylock"; private String nodeString = lockNameSpace + "/test1"; private Lock mainLock; private ZooKeeper zk; public LockTest(){ try { zk = new ZooKeeper(zkQurom, 6000, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println("Receive event "+watchedEvent); if(Event.KeeperState.SyncConnected == watchedEvent.getState()) System.out.println("connection is established..."); } }); } catch (IOException e) { e.printStackTrace(); } } private void ensureRootPath() throws InterruptedException { try { if (zk.exists(lockNameSpace,true)==null){ zk.create(lockNameSpace,"".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } } catch (KeeperException e) { e.printStackTrace(); } } private void watchNode(String nodeString, final Thread thread) throws InterruptedException { try { zk.exists(nodeString, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println( "==" + watchedEvent.toString()); if(watchedEvent.getType() == Event.EventType.NodeDeleted){ System.out.println("Threre is a Thread released Lock=============="); thread.interrupt(); } try { zk.exists(nodeString,new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println( "==" + watchedEvent.toString()); if(watchedEvent.getType() == Event.EventType.NodeDeleted){ System.out.println("Threre is a Thread released Lock=============="); thread.interrupt(); } try { zk.exists(nodeString,true); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }); } catch (KeeperException e) { e.printStackTrace(); } } /** * 获取锁 * @return * @throws InterruptedException */ public boolean lock() throws InterruptedException { String path = null; ensureRootPath(); watchNode(nodeString,Thread.currentThread()); while (true) { try { path = zk.create(nodeString, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } catch (KeeperException e) { System.out.println(Thread.currentThread().getName() + " getting Lock but can not get"); try { Thread.sleep(5000); }catch (InterruptedException ex){ System.out.println("thread is notify"); } } if (!Strings.nullToEmpty(path).trim().isEmpty()) { System.out.println(Thread.currentThread().getName() + " get Lock..."); return true; } } } /** * 释放锁 */ public void unlock(){ try { zk.delete(nodeString,-1); System.out.println("Thread.currentThread().getName() + release Lock..."); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } public static void main(String args[]) throws InterruptedException { ExecutorService service = Executors.newFixedThreadPool(10); for (int i = 0;i<4;i++){ service.execute(()-> { LockTest test = new LockTest(); try { test.lock(); Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } test.unlock(); }); } service.shutdown(); } } 复制代码
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 一文读懂监督学习、无监督学习、半监督学习、强化学习这四种深度学习方式
- 学习:人工智能-机器学习-深度学习概念的区别
- 统计学习,机器学习与深度学习概念的关联与区别
- 混合学习环境下基于学习行为数据的学习预警系统设计与实现
- 学习如何学习
- 深度学习的学习历程
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Linux程序设计
马修 / 陈健 / 人民邮电出版社 / 2007-7 / 89.00元
《Linux 程序设计(第3版)》讲述在Linux系统及其他UNIX风格的操作系统上进行的程序开发,主要内容包括标准Linux C语言函数库和由不同的Linux或UNIX标准指定的各种工具的使用方法,大多数标准Linux开发工具的使用方法,通过DBM和MySQL数据库系统对Linux中的数据进行存储,为X视窗系统建立图形化用户界面等。《Linux 程序设计(第3版)》通过先介绍程序设计理论,再以适......一起来看看 《Linux程序设计》 这本书的介绍吧!