原生API Maven依赖 示例内容 代码 打印结果 参考资料 原生API Maven依赖 <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.3.1</version> </dependency> 示例内容 创建节点 获取节点内容 修改节点内容 删除节点 添加子节点 检测节点是否存在 异步创建节点 异步修改节点内容 异步修改节点内容 带权限创建节点 带权限获取节点内容 带权限删除节点 权限补充:删除操作的权限跟其他操作相比比较特殊,当客户端对一个数据节点添加了权限信息之后,对于删除操作而言,其作用范围是其子节点。也就是说,当我们对一个数据节点添加权限信息之后,依然可以自由地删除这个节点,但是对于这个节点的子节点,就必须使用相应的权限信息才能够删除掉它。 代码 package com.freud.zk.defaultapi; import java.text.MessageFormat; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; /** * * Zookeeper 默认API * * @author Freud * */ public class DefaultZookeeper { public static final CountDownLatch CONNECT_SIGNAL = new CountDownLatch(1); private static final String CONNECT_STRING = "localhost:2181"; private static final Integer WAITING = 1; private static final Integer SECOND = 1000; public static void main(String[] args) throws Exception { Watcher watcher = new DefaultWatcher(); // 创建连接 - 异步操作,所以需要手动做等待处理 ZooKeeper zk = new ZooKeeper(CONNECT_STRING, 2 * SECOND, watcher); CONNECT_SIGNAL.await(); System.out.println("Zk server connected..."); // 同步操作 System.out.println("\r\n--Sync operations--"); new DefaultZookeeper().syncOperations(zk, watcher); // 异步操作 System.out.println("\r\n--Async operations--"); new DefaultZookeeper().asyncOperations(zk); // 带权限相关操作 System.out.println("\r\n--Authorize operations--"); new DefaultZookeeper().authOperations(watcher); // 关闭连接 zk.close(); System.out.println("\r\nZkServer closed..."); } private void syncOperations(ZooKeeper zk, Watcher watcher) throws Exception { Thread.sleep(WAITING * SECOND); Stat stat = zk.exists("/hifreud", true); // 创建节点 if (stat == null) { // CreateMode // PERSISTENT : 持久 // PERSISTENT_SEQUENTIAL : 持久顺序 // EPHEMERAL : 临时 // EPHEMERAL_SEQUENTIAL : 临时顺序 zk.create("/hifreud", "Hi Freud".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } Thread.sleep(WAITING * SECOND); stat = zk.exists("/hifreud", true); // 获得和修改节点内容 if (stat != null) { System.out.println("Path 'hifreud' data before change : " + new String(zk.getData("/hifreud", true, stat))); zk.setData("/hifreud", "Hi Freud Again".getBytes(), stat.getVersion()); System.out.println("Path 'hifreud' data after change : " + new String(zk.getData("/hifreud", true, stat))); } Thread.sleep(WAITING * SECOND); stat = zk.exists("/hifreud", true); // 创建子节点 if (stat != null) { if (zk.exists("/hifreud/hi", watcher) == null) { zk.create("/hifreud/hi", "Hi Freud - hi".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } Thread.sleep(WAITING * SECOND); stat = zk.exists("/hifreud/hi", true); // 删除子节点 if (stat != null) { zk.delete("/hifreud/hi", stat.getVersion()); } Thread.sleep(WAITING * SECOND); stat = zk.exists("/hifreud", true); // 删除节点 if (stat != null) { zk.delete("/hifreud", stat.getVersion()); } } private void asyncOperations(ZooKeeper zk) throws Exception { Thread.sleep(WAITING * SECOND); Stat stat = zk.exists("/asynchifreud", true); // 异步创建节点 if (stat == null) { zk.create("/asynchifreud", "hifreud".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new AsyncCreateCallBack(), "I am create context."); } Thread.sleep(WAITING * SECOND); stat = zk.exists("/asynchifreud", true); // 异步获取和修改节点内容 if (stat != null) { zk.getData("/asynchifreud", true, new AsyncGetCallBack(), "I am before change context."); zk.setData("/asynchifreud", "hifreud again".getBytes(), stat.getVersion(), new AsyncChangeCallBack(), "i am change context"); zk.getData("/asynchifreud", true, new AsyncGetCallBack(), "I am after change context."); } Thread.sleep(WAITING * SECOND); stat = zk.exists("/asynchifreud", true); // 删除节点 if (stat != null) { zk.delete("/asynchifreud", stat.getVersion()); } } private void authOperations(Watcher watcher) throws Exception { String authPath1 = "/authhifreud"; String authPath2 = "/authhifreud/hifreud"; String SCHEMA_DIGEST = "digest"; String USERNAME_PASSWORD = "username:password"; ZooKeeper zk1 = new ZooKeeper(CONNECT_STRING, 2 * SECOND, watcher); CONNECT_SIGNAL.await(); Thread.sleep(WAITING * SECOND); // 添加权限信息,其中digest的schema下需要指定用户名和密码 zk1.addAuthInfo(SCHEMA_DIGEST, USERNAME_PASSWORD.getBytes()); Stat stat = zk1.exists(authPath1, true); if (stat == null) { // 带权限创建节点 zk1.create(authPath1, "Hi freud".getBytes(), Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT); } Thread.sleep(WAITING * SECOND); stat = zk1.exists(authPath2, true); if (stat == null) { // 带权限创建子节点 zk1.create(authPath2, "Hi freud 2".getBytes(), Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT); } ZooKeeper zk2 = new ZooKeeper(CONNECT_STRING, 2 * SECOND, watcher); CONNECT_SIGNAL.await(); Thread.sleep(WAITING * SECOND); try { stat = zk2.exists(authPath1, false); if (stat != null) { // 无权限获取节点内容->失败 System.out.println(new String(zk2.getData(authPath1, false, null))); } } catch (Exception e) { System.out.println("Get data failed cause have no authorize information."); } Thread.sleep(WAITING * SECOND); zk2.addAuthInfo(SCHEMA_DIGEST, USERNAME_PASSWORD.getBytes()); stat = zk2.exists(authPath1, false); if (stat != null) { // 带权限获取节点内容->成功 System.out.println("Get data from path [" + authPath1 + "] after add the authorize infomation:" + new String(zk2.getData(authPath1, false, stat))); } ZooKeeper zk3 = new ZooKeeper(CONNECT_STRING, 2 * SECOND, watcher); CONNECT_SIGNAL.await(); Thread.sleep(WAITING * SECOND); try { stat = zk3.exists(authPath2, false); if (stat != null) { // 无权限删除子节点->失败 zk3.delete(authPath2, stat.getVersion()); } } catch (Exception e) { System.out.println("Delete the child node failed cause have no authorize infomation."); } Thread.sleep(WAITING * SECOND); zk3.addAuthInfo(SCHEMA_DIGEST, USERNAME_PASSWORD.getBytes()); stat = zk3.exists(authPath2, false); if (stat != null) { // 带权限删除子节点->成功 zk3.delete(authPath2, stat.getVersion()); System.out.println("Delete the child node[" + authPath2 + "] success after add the authorize infomation."); } ZooKeeper zk4 = new ZooKeeper(CONNECT_STRING, 2 * SECOND, watcher); CONNECT_SIGNAL.await(); Thread.sleep(WAITING * SECOND); stat = zk4.exists(authPath1, false); if (stat != null) { // 删除根节点不需要权限 zk3.delete(authPath1, stat.getVersion()); System.out.println("Delete the root node[" + authPath1 + "] do not use the authorize infomation."); } } } class AsyncCreateCallBack implements AsyncCallback.StringCallback { @Override public void processResult(int rc, String path, Object ctx, String name) { System.out.println("Async create path [" + rc + ", " + path + ", " + ctx + ", " + name + "]"); } } class AsyncGetCallBack implements AsyncCallback.DataCallback { @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { System.out.println("Async get path [" + rc + ", " + path + ", " + ctx + ", " + new String(data) + "]"); System.out.println("\t[" + stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion() + "]"); } } class AsyncChangeCallBack implements AsyncCallback.StatCallback { @Override public void processResult(int rc, String path, Object ctx, Stat stat) { if (rc == 0) { System.out.println("Async change path [" + rc + ", " + path + ", " + ctx + "] SUCCESS."); } } } class DefaultWatcher implements Watcher { @SuppressWarnings("deprecation") @Override public void process(WatchedEvent event) { switch (event.getType()) { case NodeCreated: System.out.println(MessageFormat.format("Create note on path [{0}]", event.getPath())); break; case NodeDeleted: System.out.println(MessageFormat.format("Delete node path [{0}]", event.getPath())); break; case NodeDataChanged: System.out.println(MessageFormat.format("Data changed on path [{0}]", event.getPath())); break; case NodeChildrenChanged: System.out.println(MessageFormat.format("Children changed on path [{0}]", event.getPath())); break; case None: default: System.out.println(MessageFormat.format("Unkown operations on path [{0}]", event.getPath())); switch (event.getState()) { case Disconnected: System.out.println("\tConnection state : disconnected"); break; case Expired: System.out.println("\tConnection state : expired"); break; case NoSyncConnected: System.out.println("\tConnection state : no sync connected"); break; case SyncConnected: System.out.println("\tConnection state : sync connected"); DefaultZookeeper.CONNECT_SIGNAL.countDown(); break; case Unknown: default: System.out.println("\tConnection state : unkown"); break; } break; } } } 打印结果 Unkown operations on path [null] Connection state : sync connected Zk server connected... --Sync operations-- Create note on path [/hifreud] Path 'hifreud' data before change : Hi Freud Data changed on path [/hifreud] Path 'hifreud' data after change : Hi Freud Again Create note on path [/hifreud/hi] Delete node path [/hifreud/hi] Delete node path [/hifreud] --Async operations-- Create note on path [/asynchifreud] Async create path [0, /asynchifreud, I am create context., /asynchifreud] Async get path [0, /asynchifreud, I am before change context., hifreud] [10784, 10784, 0] Data changed on path [/asynchifreud] Async change path [0, /asynchifreud, i am change context] SUCCESS. Async get path [0, /asynchifreud, I am after change context., hifreud again] [10784, 10785, 1] Delete node path [/asynchifreud] --Authorize operations-- Unkown operations on path [null] Connection state : sync connected Create note on path [/authhifreud] Create note on path [/authhifreud/hifreud] Unkown operations on path [null] Connection state : sync connected Get data failed cause have no authorize information. Get data from path [/authhifreud] after add the authorize infomation:Hi freud Unkown operations on path [null] Connection state : sync connected Delete the child node failed cause have no authorize infomation. Delete the child node[/authhifreud/hifreud] success after add the authorize infomation. Unkown operations on path [null] Connection state : sync connected Delete the root node[/authhifreud] do not use the authorize infomation. ZkServer closed... 参考资料 《从PAXOS到ZOOKEEPER分布式一致性原理与实践》 - 倪超