内容简介:Apache Curator是用于Apache ZooKeeper的一个Java客户端库;它包括一个高级API框架和实用程序,使使用Apache ZooKeeper更加容易和可靠。Curator之于ZooKeeper就像Cuava之于Java。本文件主要介绍使用Curator操作Zookeeper,文中所使用到的软件版本:Java 1.8.0_191、Zookeeper 3.6.0、Junit 4.13。
Apache Curator是用于Apache ZooKeeper的一个 Java 客户端库;它包括一个高级API框架和实用程序,使使用Apache ZooKeeper更加容易和可靠。Curator之于ZooKeeper就像Cuava之于Java。
本文件主要介绍使用Curator操作Zookeeper,文中所使用到的软件版本:Java 1.8.0_191、Zookeeper 3.6.0、Junit 4.13。
1、引入依赖
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
</dependency>
2、基本操作
package com.inspur.demo.general.zookeeper;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
/**
* 使用Curator操作Zookeeper
*/
public class CuratorCase {
//Zookeeper地址,集群多个地址间用逗号分隔,如:10.49.196.10:2181,10.49.196.11:2181,10.49.196.12:2181
private static String connectString = "10.49.196.10:2181";
private static int sessionTimeout = 20 * 1000;
private static int connectionTimeout = 10 * 1000;
private CuratorFramework cf;
@Before
public void before() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
cf = CuratorFrameworkFactory.builder()
.connectString(connectString)
.sessionTimeoutMs(sessionTimeout)
.connectionTimeoutMs(connectionTimeout)
.retryPolicy(retryPolicy)
.build();
cf.start();
}
@After
public void after() throws Exception {
cf.close();
}
/**
* 创建节点
*/
@Test
public void create() throws Exception {
/*
* 同步创建节点
* 1.除非指明创建节点的类型,默认是持久节点
* 2.临时节点没有子节点;所以递归创建出来的节点,只有最后的数据节点才是指定类型的节点,其父节点是持久节点
*/
//创建一个内容为空的节点
cf.create().forPath("/curator/node1");
//创建一个内容为aaa的节点
cf.create().forPath("/curator/node2", "aaa".getBytes());
//创建一个临时节点
cf.create().withMode(CreateMode.EPHEMERAL).forPath("/curator/node3");
//递归创建,最后的节点类型为临时节点
cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/curator/node4/a/b");
//创建一个节点,ACL为digest:jack:tgi9UCnyPo5FJjVylKr05nAlWeg=:cdrwa
cf.create().withACL(Collections.singletonList(new ACL(ZooDefs.Perms.ALL, new Id("digest", "jack:tgi9UCnyPo5FJjVylKr05nAlWeg=")))).forPath("/curator/node5");
/*
* 异步创建节点
* 可以指定线程池,不指定则使用Zookeeper的EventThread线程对事件进行串行处理
*/
CountDownLatch counter = new CountDownLatch(2);
cf.create().inBackground(new BackgroundCallback(){
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println(event);
counter.countDown();
}
}, Executors.newFixedThreadPool(1)).forPath("/curator/node6");
cf.create().inBackground(new BackgroundCallback(){
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println(event);
counter.countDown();
}
}).forPath("/curator/node7");
counter.await();
}
/**
* 获取节点内容
* @throws Exception
*/
@Test
public void getData() throws Exception {
Stat stat = new Stat();
byte[] bytes = cf.getData()
.storingStatIn(stat)//状态,可选
.forPath("/curator/node2");
System.out.println("状态信息:" + stat);
System.out.println("内容:" + new String(bytes));
//异步获取数据
CountDownLatch counter = new CountDownLatch(1);
cf.getData().inBackground(new BackgroundCallback(){
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("event:" + event);
System.out.println("内容:"+ new String(event.getData()));
counter.countDown();
}
}).forPath("/curator/node2");
counter.await();
}
/**
* 设置节点的值
* @throws Exception
*/
@Test
public void setData() throws Exception {
cf.setData()
.withVersion(0) //指定版本,可选
.forPath("/curator/node2", "测试修改".getBytes());
}
/**
* 删除节点
* @throws Exception
*/
@Test
public void delete() throws Exception {
cf.delete()
.guaranteed() //如果删除失败,只要会话有效就会不断的重试,直到删除成功为止
.deletingChildrenIfNeeded()//删除子节点,可选
.withVersion(0) //指定版本,可选
.forPath("/curator/node4");
}
/**
* 获取子节点
* @throws Exception
*/
@Test
public void getChildren() throws Exception {
List<String> list = cf.getChildren().forPath("/curator");
System.out.println("子节点:" + list);
}
}
3、监控数据变化
package com.inspur.demo.general.zookeeper;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.*;
public class CuratorWatchCase {
//Zookeeper地址,集群多个地址间用逗号分隔,如:10.49.196.10:2181,10.49.196.11:2181,10.49.196.12:2181
private static String connectString = "10.49.196.10:2181";
private static int sessionTimeout = 20 * 1000;
private static int connectionTimeout = 10 * 1000;
private CuratorFramework cf;
@Before
public void before() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
cf = CuratorFrameworkFactory.builder()
.connectString(connectString)
.sessionTimeoutMs(sessionTimeout)
.connectionTimeoutMs(connectionTimeout)
.retryPolicy(retryPolicy)
.build();
cf.start();
}
@After
public void after() throws Exception {
cf.close();
}
/**
* 监控节点变化
* @throws Exception
*/
@Test
public void watchNode() throws Exception {
CountDownLatch counter = new CountDownLatch(1);
NodeCache cache = new NodeCache(cf, "/curator/node2", false);
cache.start();
cache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("路径为:" + cache.getCurrentData().getPath());
System.out.println("数据为:" + new String(cache.getCurrentData().getData()));
System.out.println("状态为:" + cache.getCurrentData().getStat());
//某种情况下退出监控
//if (...) {
// counter.countDown();
//}
}
});
counter.await();
}
/**
* 监控子节点变化
* @throws Exception
*/
@Test
public void watchChildren() throws Exception {
//使用自定义的线程池
ExecutorService threadPool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(32), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
CountDownLatch counter = new CountDownLatch(1);
PathChildrenCache cache = new PathChildrenCache(cf, "/curator/node2", true);
cache.start();
cache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
System.out.println("CHILD_ADDED");
break;
case CHILD_UPDATED:
System.out.println("CHILD_UPDATED");
break;
case CHILD_REMOVED:
System.out.println("CHILD_REMOVED");
break;
default:
System.out.println(event.getType());
}
System.out.println("子节点信息:" + event.getData());
//某种情况下退出监控
//if (...) {
// counter.countDown();
//}
}
}, threadPool);
counter.await();
threadPool.shutdownNow();
}
}
可以看到不管是基本的增删改查还是监控数据变化,Curator都比原生的API好用很多。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。