内容简介:Apache Curator是用于Apache ZooKeeper的一个Java客户端库;它包括一个高级API框架和实用程序,使使用Apache ZooKeeper更加容易和可靠。Curator之于ZooKeeper就像Cuava之于Java。本文件主要介绍使用Curator操作Zookeeper,文中所使用到的软件版本:Java 1.8.0_191、Zookeeper 3.6.0、Junit 4.13。
Apache Curator是用于Apache ZooKeeper的一个 Java 客户端库;它包括一个高级API框架和实用程序,使使用Apache ZooKeeper更加容易和可靠。Curator之于ZooKeeper就像Cuava之于Java。
本文件主要介绍使用Curator操作Zookeeper,文中所使用到的软件版本:Java 1.8.0_191、Zookeeper 3.6.0、Junit 4.13。
1、引入依赖
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.6.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13</version> </dependency>
2、基本操作
package com.inspur.demo.general.zookeeper; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Id; import org.apache.zookeeper.data.Stat; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; /** * 使用Curator操作Zookeeper */ public class CuratorCase { //Zookeeper地址,集群多个地址间用逗号分隔,如:10.49.196.10:2181,10.49.196.11:2181,10.49.196.12:2181 private static String connectString = "10.49.196.10:2181"; private static int sessionTimeout = 20 * 1000; private static int connectionTimeout = 10 * 1000; private CuratorFramework cf; @Before public void before() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3); cf = CuratorFrameworkFactory.builder() .connectString(connectString) .sessionTimeoutMs(sessionTimeout) .connectionTimeoutMs(connectionTimeout) .retryPolicy(retryPolicy) .build(); cf.start(); } @After public void after() throws Exception { cf.close(); } /** * 创建节点 */ @Test public void create() throws Exception { /* * 同步创建节点 * 1.除非指明创建节点的类型,默认是持久节点 * 2.临时节点没有子节点;所以递归创建出来的节点,只有最后的数据节点才是指定类型的节点,其父节点是持久节点 */ //创建一个内容为空的节点 cf.create().forPath("/curator/node1"); //创建一个内容为aaa的节点 cf.create().forPath("/curator/node2", "aaa".getBytes()); //创建一个临时节点 cf.create().withMode(CreateMode.EPHEMERAL).forPath("/curator/node3"); //递归创建,最后的节点类型为临时节点 cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/curator/node4/a/b"); //创建一个节点,ACL为digest:jack:tgi9UCnyPo5FJjVylKr05nAlWeg=:cdrwa cf.create().withACL(Collections.singletonList(new ACL(ZooDefs.Perms.ALL, new Id("digest", "jack:tgi9UCnyPo5FJjVylKr05nAlWeg=")))).forPath("/curator/node5"); /* * 异步创建节点 * 可以指定线程池,不指定则使用Zookeeper的EventThread线程对事件进行串行处理 */ CountDownLatch counter = new CountDownLatch(2); cf.create().inBackground(new BackgroundCallback(){ @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { System.out.println(event); counter.countDown(); } }, Executors.newFixedThreadPool(1)).forPath("/curator/node6"); cf.create().inBackground(new BackgroundCallback(){ @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { System.out.println(event); counter.countDown(); } }).forPath("/curator/node7"); counter.await(); } /** * 获取节点内容 * @throws Exception */ @Test public void getData() throws Exception { Stat stat = new Stat(); byte[] bytes = cf.getData() .storingStatIn(stat)//状态,可选 .forPath("/curator/node2"); System.out.println("状态信息:" + stat); System.out.println("内容:" + new String(bytes)); //异步获取数据 CountDownLatch counter = new CountDownLatch(1); cf.getData().inBackground(new BackgroundCallback(){ @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { System.out.println("event:" + event); System.out.println("内容:"+ new String(event.getData())); counter.countDown(); } }).forPath("/curator/node2"); counter.await(); } /** * 设置节点的值 * @throws Exception */ @Test public void setData() throws Exception { cf.setData() .withVersion(0) //指定版本,可选 .forPath("/curator/node2", "测试修改".getBytes()); } /** * 删除节点 * @throws Exception */ @Test public void delete() throws Exception { cf.delete() .guaranteed() //如果删除失败,只要会话有效就会不断的重试,直到删除成功为止 .deletingChildrenIfNeeded()//删除子节点,可选 .withVersion(0) //指定版本,可选 .forPath("/curator/node4"); } /** * 获取子节点 * @throws Exception */ @Test public void getChildren() throws Exception { List<String> list = cf.getChildren().forPath("/curator"); System.out.println("子节点:" + list); } }
3、监控数据变化
package com.inspur.demo.general.zookeeper; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.*; import org.apache.curator.retry.ExponentialBackoffRetry; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.concurrent.*; public class CuratorWatchCase { //Zookeeper地址,集群多个地址间用逗号分隔,如:10.49.196.10:2181,10.49.196.11:2181,10.49.196.12:2181 private static String connectString = "10.49.196.10:2181"; private static int sessionTimeout = 20 * 1000; private static int connectionTimeout = 10 * 1000; private CuratorFramework cf; @Before public void before() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3); cf = CuratorFrameworkFactory.builder() .connectString(connectString) .sessionTimeoutMs(sessionTimeout) .connectionTimeoutMs(connectionTimeout) .retryPolicy(retryPolicy) .build(); cf.start(); } @After public void after() throws Exception { cf.close(); } /** * 监控节点变化 * @throws Exception */ @Test public void watchNode() throws Exception { CountDownLatch counter = new CountDownLatch(1); NodeCache cache = new NodeCache(cf, "/curator/node2", false); cache.start(); cache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("路径为:" + cache.getCurrentData().getPath()); System.out.println("数据为:" + new String(cache.getCurrentData().getData())); System.out.println("状态为:" + cache.getCurrentData().getStat()); //某种情况下退出监控 //if (...) { // counter.countDown(); //} } }); counter.await(); } /** * 监控子节点变化 * @throws Exception */ @Test public void watchChildren() throws Exception { //使用自定义的线程池 ExecutorService threadPool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(32), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); CountDownLatch counter = new CountDownLatch(1); PathChildrenCache cache = new PathChildrenCache(cf, "/curator/node2", true); cache.start(); cache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { switch (event.getType()) { case CHILD_ADDED: System.out.println("CHILD_ADDED"); break; case CHILD_UPDATED: System.out.println("CHILD_UPDATED"); break; case CHILD_REMOVED: System.out.println("CHILD_REMOVED"); break; default: System.out.println(event.getType()); } System.out.println("子节点信息:" + event.getData()); //某种情况下退出监控 //if (...) { // counter.countDown(); //} } }, threadPool); counter.await(); threadPool.shutdownNow(); } }
可以看到不管是基本的增删改查还是监控数据变化,Curator都比原生的API好用很多。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Django 1.0 Template Development
Scott Newman / Packt / 2008 / 24.99
Django is a high-level Python web application framework designed to support the rapid development of dynamic websites, web applications, and web services. Getting the most out of its template system a......一起来看看 《Django 1.0 Template Development》 这本书的介绍吧!