[关闭]
@boothsun 2018-03-14T15:49:55.000000Z 字数 15458 阅读 2990

ZK客户端操作

ZK


shell操作

Java客户端

原始API

pom文件:

  1. <dependency>
  2. <groupId>org.apache.zookeeper</groupId>
  3. <artifactId>zookeeper</artifactId>
  4. <version>3.4.10</version>
  5. </dependency>

Java代码:

  1. public class ZkClientUtil {
  2. private static final Logger logger = LoggerFactory.getLogger(ZkDemo.class);
  3. private static ZooKeeper zk;
  4. // /zfpt 必须提前创建好
  5. private static String zkPath = "master:2181,slave1:2181,slave2:2181/zfpt" ;
  6. static CountDownLatch connectedSemaphore = new CountDownLatch( 1 );
  7. static {
  8. try {
  9. zk = new ZooKeeper(zkPath, 1000 , new Watcher() {
  10. // 监控所有被触发的事件
  11. public void process(WatchedEvent event) {
  12. logger.info("已经触发了 {} 事件! ", event.getType());
  13. connectedSemaphore.countDown();
  14. }
  15. });
  16. }catch (Exception e) {
  17. System.err.println("系统异常");
  18. }
  19. }
  20. public static ZooKeeper getZKConnection() {
  21. try {
  22. if (zk == null) {
  23. connectedSemaphore.await();
  24. }
  25. return zk ;
  26. }catch (Exception e) {
  27. System.err.printf("ZK初始化失败");
  28. }
  29. return null ;
  30. }
  31. }
  32. /**
  33. * 相应操作
  34. */
  35. public class ZkDemoTest {
  36. /**
  37. * 同步创建 zk节点
  38. * @throws Exception
  39. */
  40. @Test
  41. public void create() throws Exception {
  42. String response = getZKConnection().create("/aa3","test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  43. System.out.println(response) ;
  44. }
  45. /**
  46. * 异步回调创建 zk节点
  47. * @throws Exception
  48. */
  49. @Test
  50. public void createASync() throws Exception {
  51. CountDownLatch countDownLatch = new CountDownLatch(1) ;
  52. //StringCallback 异步回调 ctx:用于传递给回调方法的一个参数。通常是放一个上下文(Context)信息
  53. getZKConnection().create("/aa2", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, (rc, path, ctx, name) -> {
  54. System.out.println("rc:" + rc + "&path:" + path + "&ctx:" + ctx + "&name:" + name );
  55. countDownLatch.countDown();
  56. },"1212121");
  57. countDownLatch.await();
  58. }
  59. /**
  60. * 同步删除
  61. * @throws Exception
  62. */
  63. @Test
  64. public void delete() throws Exception {
  65. // version 表示此次删除针对于的版本号。 传-1 表示不忽略版本号
  66. getZKConnection().delete("/aa1",-1);
  67. }
  68. /**
  69. * 异步删除
  70. * @throws Exception
  71. */
  72. @Test
  73. public void deleteASync() throws Exception {
  74. CountDownLatch countDownLatch = new CountDownLatch(1) ;
  75. getZKConnection().delete("/aa1",-1, (rc, path, ctx) -> {
  76. System.out.println("rc:" + rc +"&path:" + path + "&ctx:" + ctx);
  77. countDownLatch.countDown();
  78. },"删除操作");
  79. countDownLatch.await();
  80. }
  81. /**
  82. * 同步获取数据,包括子节点列表的获取和当前节点数据的获取
  83. * @throws Exception
  84. */
  85. @Test
  86. public void getChildren() throws Exception {
  87. Stat stat = new Stat() ;
  88. // path:指定数据节点的节点路径, 即API调用的目的是获取该节点的子节点列表
  89. // Watcher : 注册的Watcher。一旦在本次获取子节点之后,子节点列表发生变更的话,就会向该Watcher发送通知。Watcher仅会被触发一次。
  90. // state: 获取指定数据节点(也就是path参数对应的节点)的状态信息(无节点名和数据内容),传入旧的state将会被来自服务端响应的新state对象替换。
  91. List<String> list = ZkClientUtil.getZKConnection().getChildren("/", event -> {
  92. System.out.println("我是监听事件,监听子节点变化");
  93. } ,stat);
  94. System.out.println(list);
  95. System.out.println(stat);
  96. }
  97. /**
  98. * 异步获取子节点
  99. * @throws Exception
  100. */
  101. @Test
  102. public void getChildrenASync() throws Exception {
  103. CountDownLatch countDownLatch = new CountDownLatch(1) ;
  104. ZkClientUtil.getZKConnection().getChildren("/",event -> {
  105. System.out.println("我是监听事件,监听子节点变化");
  106. } , (rc, path, ctx, children) -> {
  107. //异步回调
  108. System.out.println("children:" + children);
  109. countDownLatch.countDown();
  110. },"context");
  111. countDownLatch.await();
  112. }
  113. /**
  114. * 同步获取数据
  115. * @throws Exception
  116. */
  117. @Test
  118. public void getDataTest() throws Exception {
  119. Stat stat = new Stat() ;
  120. byte[] bytes = ZkClientUtil.getZKConnection().getData("/aa1",event -> {
  121. System.out.println("我是监听事件,监听数据状态发生变化");
  122. },stat);
  123. System.out.println(new String(bytes));
  124. }
  125. @Test
  126. public void getDataASync() throws Exception {
  127. CountDownLatch countDownLatch = new CountDownLatch(1) ;
  128. ZkClientUtil.getZKConnection().getData("/aa1",event -> {
  129. System.out.println("我是监听事件,监听数据状态发生变化");
  130. }, (rc, path, ctx, data, stat) -> {
  131. System.out.println("获取到的内容是:"+new String(data));
  132. countDownLatch.countDown();
  133. },"121");
  134. countDownLatch.await();
  135. }
  136. /**
  137. * 同步更新数据
  138. */
  139. @Test
  140. public void setData() throws Exception{
  141. byte[] oldValue = ZkClientUtil.getZKConnection().getData("/aa1",false,null);
  142. System.out.println("更新前值是:" + new String(oldValue));
  143. Stat stat = ZkClientUtil.getZKConnection().setData("/aa1","helloWorld".getBytes(),-1);
  144. byte[] newValue = ZkClientUtil.getZKConnection().getData("/aa1",false,null);
  145. System.out.println("更新后值是:" + new String(newValue));
  146. }
  147. /**
  148. * 异步更新数据
  149. * @throws Exception
  150. */
  151. @Test
  152. public void setDataASync() throws Exception {
  153. CountDownLatch countDownLatch = new CountDownLatch(1) ;
  154. ZkClientUtil.getZKConnection().setData("/aa1","helloChina".getBytes(),-1, (rc, path, ctx, name) -> {
  155. System.out.println("更新成功");
  156. countDownLatch.countDown();
  157. },"1111");
  158. countDownLatch.await();
  159. byte[] newValue = ZkClientUtil.getZKConnection().getData("/aa1",false,null);
  160. System.out.println("更新前值是:" + new String(newValue));
  161. }
  162. }

使用ZkClient客户端

pom.xml

  1. <dependency>
  2. <groupId>org.apache.zookeeper</groupId>
  3. <artifactId>zookeeper</artifactId>
  4. <version>3.4.10</version>
  5. </dependency>
  6. <!-- https://mvnrepository.com/artifact/com.101tec/zkclient -->
  7. <dependency>
  8. <groupId>com.101tec</groupId>
  9. <artifactId>zkclient</artifactId>
  10. <version>0.10</version>
  11. </dependency>

创建节点:

  1. public class ZKClientTest {
  2. private static final String zkPath = "master:2181,slave1:2181,slave2:2181/zfpt";
  3. private static ZkClient zkClient = null;
  4. @Before
  5. public void init() {
  6. zkClient = new ZkClient(zkPath,10000,10000);
  7. }
  8. @Test
  9. public void create() {
  10. // 创建节点
  11. String result = zkClient.create("/aa3","test", CreateMode.EPHEMERAL);
  12. System.out.println(result);
  13. // 递归创建
  14. zkClient.createPersistent("/trade/open",true);
  15. // 注意不要写成这种,API的问题,这种无法递归创建
  16. // zkClient.createPersistent("/trade/open",true);
  17. }
  18. }

相比原始API,ZkClient创建Znode的特性:

删除接口:


  1. @Test
  2. public void delete() {
  3. // 递归删除
  4. Boolean results = zkClient.deleteRecursive("/trade");
  5. System.out.println("删除结果:" + results);
  6. }

特性:

读取子节点:

  1. /**
  2. * 获取子节点
  3. */
  4. @Test
  5. public void getChildren() {
  6. List<String> childrenList = zkClient.getChildren("/trade");
  7. System.out.println(childrenList);
  8. }

获取节点数据:

  1. @Test
  2. public void readData() {
  3. String data = zkClient.readData("/trade");
  4. System.out.println(data);
  5. }

更新数据:

  1. @Test
  2. public void setData() {
  3. String oldValue = zkClient.readData("/trade");
  4. System.out.println("获取前:" + oldValue);
  5. zkClient.writeData("/trade","I am trade");
  6. String newValue = zkClient.readData("/trade");
  7. System.out.println("更新后:" + newValue);
  8. }

监听器:

  1. //监听子节点变化
  2. zkClient.subscribeChildChanges("/trade",(parentPath,currenChilds)->{
  3. System.out.println("子节点发生变化");
  4. });
  5. zkClient.subscribeDataChanges("/trade",new IZkDataListener() {
  6. @Override
  7. public void handleDataChange(String dataPath, Object data) throws Exception {
  8. System.out.println("dataPath:" + dataPath +"发生变化,最新数据是:" + data);
  9. }
  10. @Override
  11. public void handleDataDeleted(String dataPath) throws Exception {
  12. System.out.printf("dataPath被删除");
  13. }
  14. });

原生Watcher只支持一次注册,但是ZkClient的listener已经支持重复注册。

Curator

Curator在ZooKeeper原生API的基础上进行了包装,提供了一套易用性和可读性更强的Fluent风格的客户端API框架。

除此之外,Curator中还提供了ZooKeeper各种应用场景(Recipe 如共享锁服务、Master选举机制和分布式计数器等)的抽象封装。

pom.xml

  1. <dependency>
  2. <groupId>org.apache.curator</groupId>
  3. <artifactId>curator-framework</artifactId>
  4. <version>RELEASE</version>
  5. </dependency>
  6. <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
  7. <dependency>
  8. <groupId>org.apache.curator</groupId>
  9. <artifactId>curator-recipes</artifactId>
  10. <version>RELEASE</version>
  11. </dependency>

创建节点:

  1. @Before
  2. public void before() {
  3. // 非Fluent风格
  4. // CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(zkPath, new RetryOneTime(100));
  5. // System.out.println(curatorFramework.getState());
  6. // curatorFramework.start();
  7. // System.out.println(curatorFramework.getState());
  8. // Fluent风格
  9. curatorFramework = CuratorFrameworkFactory.builder()
  10. .connectString("master:2181,slave1:2181,slave2:2181")
  11. .retryPolicy(new RetryOneTime(1000)) //重试策略
  12. .namespace("zfpt") // 命名空间
  13. .build();
  14. curatorFramework.start();
  15. }
  16. @Test
  17. public void create() throws Exception {
  18. // 创建一个持久化节点,初始化内容为空
  19. curatorFramework.create().forPath("/dus");
  20. // 创建一个持久化节点,初始化内容不为空
  21. curatorFramework.create().forPath("/dus1","test".getBytes());
  22. // 创建一个临时节点 初始化内容为空
  23. curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath("/dus2");
  24. // 创建一个临时节点,并递归创建不存在的父节点
  25. // ZooKeeper中规定所有非叶子节点必须为持久节点。因此下面创建出来只有dus2会是临时节点。
  26. curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/dj/dus2");
  27. }

删除节点:

  1. //删除一个节点
  2. client.delete().forPath(path);
  3. // 删除一个节点,并递归删除其所有子节点
  4. client.delete().deletingChildrenIfNeeded().forPath(path)
  5. // 删除一个节点,强制指定版本进行删除
  6. client.delete().withVersion(version).forPath(path);
  7. //删除一个节点,强制保证删除成功
  8. client.delete().delete().guaranteed().forPath(path);

guaranteed() 保证删除失败后,Curator会在后台持续进行删除操作。

读取数据:

  1. // 读取一个节点的数据内容
  2. client.getData().forPath(path);
  3. // 读取一个节点的数据内容,同时获取到该节点的stat
  4. client.getData().storingStatIn(stat).forPath(path);

更新数据:

  1. // 更新一个节点的数据内容
  2. client.setData().forPath(path);
  3. // 更新一个节点的数据内容,强制指定版本进行更新
  4. client.setData().withVersion(version).forPath(path);

异步接口:

也就是说如果没有传入自定义线程池,就由EventThread这个线程串行处理所有的事件通知,如果传入了,则由自定义线程池去处理。

  1. @Test
  2. public void BackgroundCallbackTest() throws Exception{
  3. CountDownLatch countDownLatch = new CountDownLatch(2);
  4. curatorFramework.getData().inBackground((client,event)->{
  5. System.out.println(Thread.currentThread().getName());
  6. System.out.println(event);
  7. System.out.println(client);
  8. }).forPath("/trade");
  9. Executor executor = Executors.newFixedThreadPool(2,new ThreadFactoryBuilder().setNameFormat("curator-%d").build() );
  10. curatorFramework.getData().inBackground((client,event)->{
  11. System.out.println(Thread.currentThread().getName());
  12. System.out.println(event);
  13. System.out.println(client);
  14. },executor).forPath("/trade");
  15. countDownLatch.await();
  16. }

事件监听:

Curator引入了Cache来实现对ZooKeeper服务端事件的监听,Cache是Curator中对事件监听的包装,其对事件的监听其实可以近似看作是一个本地缓存视图和远程ZooKeeper视图的对比过程。同时Curator能够自动为开发人员处理反复注册监听,从而大大简化了原生API开发的繁琐过程。Cache分为两类监听类型:节点监听和子节点监听。

NodeCache:
NodeCache即可以用于监听指定ZooKeeper数据节点内容的变化,也能监听指定节点是否存在,如果原本节点不存在,那么Cache就会在节点被创建后出发NodeCacheListener。但是如果该数据节点被删除,那么Curator就无法再出发NodeCacheListener了。

  1. @Test
  2. public void NodeCacheTest() throws Exception{
  3. // client : Curator 客户端实例 。 path: 监听节点的节点路径 。 dataIsCompressed:是否进行数据压缩
  4. NodeCache nodeCache = new NodeCache(curatorFramework,"/trade",false);
  5. // buildInitial:如果设置为true 则NodeCache在第一次启动的时候就会立刻从ZK上读取对应节点的数据内容 保存到Cache中。
  6. nodeCache.start(false);
  7. nodeCache.getListenable().addListener(()->{
  8. System.out.println("Node data update , new data:" + new String(nodeCache.getCurrentData().getData()));
  9. });
  10. //******************** 监听一个不存在的节点 当节点被创建后,也会触发监听器 **********************//
  11. // client : Curator 客户端实例 。 path: 监听节点的节点路径 。 dataIsCompressed:是否进行数据压缩
  12. NodeCache nodeCache2 = new NodeCache(curatorFramework,"/trade1",false);
  13. // buildInitial:如果设置为true 则NodeCache在第一次启动的时候就会立刻从ZK上读取对应节点的数据内容 保存到Cache中。
  14. nodeCache2.start(false);
  15. nodeCache2.getListenable().addListener(()->{
  16. System.out.println("Node data update , new data:" + new String(nodeCache.getCurrentData().getData()));
  17. });
  18. Thread.sleep(Integer.MAX_VALUE);
  19. }

PathChildrenCache:
用于监听指定ZooKeeper数据节点的子节点变化情况。当指定节点的子节点发生变化时,就会回调该方法。PathChildrenCacheEvent类中定义了所有的事件类型,主要包括新增子节点(CHILD_ADDED)、子节点数据变更(CHILD_UPDATED)和子节点删除(CHILD_REMOVED)三类。但是该数据节点的变化不会被此监听器监听到。无法监听孙子节点的变更。

  1. @Test
  2. public void PathChildrenCacheTest() throws Exception {
  3. PathChildrenCache nodeCache = new PathChildrenCache(curatorFramework,"/trade",true);
  4. // buildInitial:如果设置为true 则NodeCache在第一次启动的时候就会立刻从ZK上读取对应节点的数据内容 保存到Cache中。
  5. nodeCache.start();
  6. nodeCache.getListenable().addListener((client , event)->{
  7. switch (event.getType()) {
  8. case CHILD_ADDED :
  9. System.out.println("新增子节点,数据内容是" + new String(event.getData().getData())); break;
  10. case CHILD_UPDATED:
  11. System.out.println("子节点被更新,数据内容是" + new String(event.getData().getData())); break;
  12. case CHILD_REMOVED:
  13. System.out.println("删除子节点,数据内容是" + new String(event.getData().getData())); break;
  14. default: break;
  15. }
  16. });
  17. curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/trade/PathChildrenCache","new".getBytes());
  18. Thread.sleep(100L);
  19. curatorFramework.setData().forPath("/trade/PathChildrenCache","update".getBytes());
  20. Thread.sleep(100L);
  21. curatorFramework.delete().withVersion(-1).forPath("/trade/PathChildrenCache");
  22. }

Master选举:

在分布式系统中,经常会碰到这样的场景:对于一个复杂的任务,仅需要从集群中选举出一台进行处理即可。诸如此类的分布式问题,我们统称为“Master选举”。借助于ZooKeeper,我们可以比较方便地实现Master选举的功能,其大体思路非常简单:

选择一个根节点,例如/master_select,多台机器同时向该节点创建一个子节点/master_select/lock,利用ZooKeeper的特性,最终只有一台机器能够创建成功,成功的那台机器就作为Master。

Curator也是基于这个思路,但是它将节点创建、事件监听和自动选举过程进行了封装,开发人员只需要调用简单的API即可实现Master选举。

  1. @Test
  2. public void leaderSelector() throws Exception {
  3. AtomicInteger masterCount = new AtomicInteger(0);
  4. ExecutorService executor = Executors.newFixedThreadPool(4,new ThreadFactoryBuilder().setNameFormat("master_selector-%d").build() );
  5. for( int i = 0 ; i < 4; i++) {
  6. executor.execute(()-> {
  7. LeaderSelector leaderSelector = new LeaderSelector(curatorFramework, "/master_selector", new LeaderSelectorListenerAdapter() {
  8. @Override
  9. public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
  10. masterCount.incrementAndGet();
  11. System.out.println(Thread.currentThread().getName() + "成为Master, 当前Master数量:" + masterCount);
  12. Thread.sleep(1000L);
  13. System.out.println(Thread.currentThread().getName() + "宕机,失去Master角色,剩下master数量:" + masterCount.decrementAndGet());
  14. }
  15. });
  16. leaderSelector.autoRequeue();
  17. leaderSelector.start();
  18. });
  19. }
  20. Thread.sleep(Integer.MAX_VALUE);
  21. }

分布式锁:

为了保证数据的一致性,临界资源加锁,保持有序访问。

  1. /**
  2. * 观察 Lock【n】 抢到锁 和 Lock【n】 释放锁 是不是成对出现。 如果不是,则说明有重复加锁的
  3. * @throws Exception
  4. */
  5. @Test
  6. public void InterProcessMutex() throws Exception {
  7. InterProcessMutex lock = new InterProcessMutex(curatorFramework,"/trade/mylock") ;
  8. for (int i = 0 ; i < 100 ; i++) {
  9. Thread currentThread = new Thread(() -> {
  10. try {
  11. // 加锁
  12. lock.acquire();
  13. System.out.println(Thread.currentThread().getName() + " 抢到锁");
  14. }catch (Exception e) {
  15. } finally {
  16. try {
  17. System.out.println(Thread.currentThread().getName() + " 释放锁");
  18. // 释放锁
  19. lock.release();
  20. } catch (Exception e) {
  21. e.printStackTrace();
  22. }
  23. }
  24. });
  25. currentThread.setName("Lock【" + i + "】");
  26. currentThread.start();
  27. }
  28. Thread.sleep(Integer.MAX_VALUE);
  29. }

分布式计数器:

  1. @Test
  2. public void DistributedAtomicInteger() throws Exception {
  3. DistributedAtomicInteger distributedAtomicInteger = new DistributedAtomicInteger(curatorFramework,"/trade/PathChildrenCache", new RetryNTimes(1000,3)) ;
  4. System.out.println(distributedAtomicInteger.increment().postValue());
  5. }

分布式Barrier:

与CyClicBarrir同样的语义。

  1. /**
  2. * 没有定义成员数量。直接通过removeBarrier();释放屏障
  3. * @throws Exception
  4. */
  5. @Test
  6. public void barrier() throws Exception {
  7. ExecutorService executor = Executors.newFixedThreadPool(4,new ThreadFactoryBuilder().setNameFormat("barrier-%d").build() );
  8. for( int i = 0 ; i < 4; i++) {
  9. executor.execute(()-> {
  10. CuratorFramework client = CuratorFrameworkFactory.builder()
  11. .connectString("master:2181,slave1:2181,slave2:2181")
  12. .retryPolicy(new RetryOneTime(1000)) //重试策略
  13. .namespace("zfpt") // 命名空间
  14. .build();
  15. client.start();
  16. distributedBarrier = new DistributedBarrier(curatorFramework,"/trade/PathChildrenCache") ;
  17. System.out.println(Thread.currentThread().getName() + "到达Barrier前");
  18. try {
  19. distributedBarrier.setBarrier();
  20. distributedBarrier.waitOnBarrier();
  21. System.out.println(Thread.currentThread().getName() + "越过屏障");
  22. } catch (Exception e) {
  23. e.printStackTrace();
  24. }
  25. });
  26. }
  27. Thread.sleep(3000L);
  28. distributedBarrier.removeBarrier();
  29. }
  30. /**
  31. * 定义成员数量,到齐了就 越过屏障
  32. * @throws Exception
  33. */
  34. @Test
  35. public void barrier2() throws Exception {
  36. ExecutorService executor = Executors.newFixedThreadPool(4,new ThreadFactoryBuilder().setNameFormat("barrier-%d").build() );
  37. for( int i = 0 ; i < 4; i++) {
  38. executor.execute(()-> {
  39. CuratorFramework client = CuratorFrameworkFactory.builder()
  40. .connectString("master:2181,slave1:2181,slave2:2181")
  41. .retryPolicy(new RetryOneTime(1000)) //重试策略
  42. .namespace("zfpt") // 命名空间
  43. .build();
  44. client.start();
  45. DistributedDoubleBarrier distributedDoubleBarrier = new DistributedDoubleBarrier(client,"/trade/PathChildrenCache",4) ;
  46. try {
  47. Thread.sleep(1000L);
  48. System.out.println(Thread.currentThread().getName() + "到达Barrier前");
  49. distributedDoubleBarrier.enter();
  50. System.out.println(Thread.currentThread().getName() + "越过屏障");
  51. Thread.sleep(1000L);
  52. distributedDoubleBarrier.leave();
  53. System.out.println(Thread.currentThread().getName() + "已经离开");
  54. } catch (Exception e) {
  55. e.printStackTrace();
  56. }
  57. });
  58. }
  59. Thread.sleep(Integer.MAX_VALUE);
  60. }

ZK三种客户端对比

原生API问题

  1. Zookeeper的Watcher是一次性的,每次触发之后都需要重新进行注册。
  2. Session超时之后没有实现重试机制。
  3. 无法实现递归操作(递归创建 递归删除)
  4. 创建节点时如果节点存在抛出异常,需要自行检查节点是否存在。
  5. 只提供了简单的byte[]数组的接口,没有提供更丰富的序列化方式(比如对象序列化)。

ZkClient

Zookeeper原生API接口的基础上进行了包装,内部实现了Session超时重连,Watcher反复注册等功能。

  1. 解决了Session超时重连
  2. Watcher反复注册等功能。
  3. 更丰富的序列化方式,ZKClient允许用户自行注册序列化器,然后客户端在进行读写操作过程中,就会自动进行系列化和反序列化操作,默认情况下,ZKClient使用Java自带的序列化方式对对象进行序列化。

Curator 客户端

Curator也解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、Watcher反复注册和NodeExistsException异常。

Curator还在Zookeeper原生API的基础上进行了包装,提供了一套易用性和可读性更强的Fluent风格的客户端API框架。

除此之外,Curator中还提供了Zookeeper各种应用场景(共享锁、Master选择、分布式计数器等)抽象封装。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注