Zookeeper之(九) - zookeeper java API - curator - 01 - 基础API

Curator是Netflix公司开源的一套Zookeeper客户端,作者是Jordan Zimmerman。和Zkclient一样,Curator解决了很多Zookeeper客户端非常底层的细节开发工作,包括自动重连,反复注册Watcher和NodeExistsException异常等,目前已经成为了Apache的顶级项目,是全世界范围内使用最广泛的Zookeeper客户端之一。



Curator Maven依赖






  • 创建连接,创建连接中RetryPolicy重试策略默认有5种实现。
ExponentialBackoffRetry Retry policy that retries a set number of times with increasing sleep time between retries
BoundedExponentialBackoffRetry Retry policy that retries a set number of times with an increasing (up to a maximum bound) sleep time between retries
RetryNTimes Retry policy that retries a max number of times
RetryOneTime A retry policy that retries only once
RetryUntilElapsed A retry policy that retries until a given amount of time elapses
  • 创建节点
  • 创建子节点,支持递归创建
  • 修改节点数据
  • 获取节点数据
  • 强制删除节点,guaranteed()表示如果当前客户端会话有效,则Curator会在后台持续进行删除操作,直至节点删除成功为止。
  • 递归删除节点


package com.freud.zk.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

 * Zookeeper - Curator
 * @author Freud
public class CuratorNormalZookeeper {

	private static final int SECOND = 1000;

	public static void main(String[] args) throws Exception {
		// 节点
		String root = "/hifreud";
		String path = root + "/sayhi";
		String path2 = root + "/sayhello";
		String data = "hi freud";
		String dataAgain = "hi freud again!";

		// 创建连接
		RetryPolicy rp = new ExponentialBackoffRetry(1 * SECOND, 3);
		// Fluent风格创建
		CuratorFramework cfFluent = CuratorFrameworkFactory.builder().connectString("localhost:2181")
				.sessionTimeoutMs(5 * SECOND).connectionTimeoutMs(3 * SECOND).retryPolicy(rp).build();
		System.out.println("Server connected...");

		// 添加节点操作监听事件
		cfFluent.getCuratorListenable().addListener(new CuratorListener() {
			public void eventReceived(CuratorFramework curatorFramework, CuratorEvent event) throws Exception {
				System.out.println("Curator framework operations : " + event.getType().toString());
		// 添加连接信息监听事件
		cfFluent.getConnectionStateListenable().addListener(new ConnectionStateListener() {
			public void stateChanged(CuratorFramework arg0, ConnectionState arg1) {
				System.out.println("Connection state changed to : " +;
		System.out.println("Listener added success...");

		Thread.sleep(1 * SECOND);
		if (cfFluent.checkExists().forPath(path) == null) {
			// 创建节点
			cfFluent.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, data.getBytes());
			System.out.println("Created node [" + path + "] with data [" + data + "]");

		Thread.sleep(1 * SECOND);
		if (cfFluent.checkExists().forPath(path2) == null) {
			// 创建节点
			cfFluent.create().withMode(CreateMode.PERSISTENT).forPath(path2, data.getBytes());
			System.out.println("Created node [" + path2 + "] with data [" + data + "]");

		Thread.sleep(1 * SECOND);
		if (cfFluent.checkExists().forPath(path) != null) {
			// 获取节点内容
			Stat stat = new Stat();
			System.out.println("Read from node [" + path + "] data : "
					+ new String(cfFluent.getData().storingStatIn(stat).forPath(path)));
			System.out.println("\tversion : " + stat.getVersion());
			System.out.println("\tczxid : " + stat.getCzxid());
			System.out.println("\taversion : " + stat.getAversion());
			System.out.println("\tmzxid : " + stat.getMzxid());

		Thread.sleep(1 * SECOND);
		if (cfFluent.checkExists().forPath(path) != null) {
			// 设置节点内容
			cfFluent.setData().forPath(path, dataAgain.getBytes());
			System.out.println("Set data to node [" + path + "] data : " + dataAgain);

		Thread.sleep(1 * SECOND);
		if (cfFluent.checkExists().forPath(path) != null) {
			// 获取节点内容
			Stat stat = new Stat();
			System.out.println("Read from node after change [" + path + "] data : "
					+ new String(cfFluent.getData().storingStatIn(stat).forPath(path)));
			System.out.println("\tversion : " + stat.getVersion());
			System.out.println("\tczxid : " + stat.getCzxid());
			System.out.println("\taversion : " + stat.getAversion());
			System.out.println("\tmzxid : " + stat.getMzxid());

		Thread.sleep(1 * SECOND);
		if (cfFluent.checkExists().forPath(path2) != null) {
			// 强制删除节点
			System.out.println("Delete node [" + path2 + "].");

		Thread.sleep(1 * SECOND);
		if (cfFluent.checkExists().forPath(root) != null) {
			// 递归删除节点
			System.out.println("Delete node [" + root + "] use recursion.");

		Thread.sleep(2 * SECOND);
		System.out.println("Server closed...");


Server connected...
Listener added success...
Connection state changed to : CONNECTED
Curator framework operations : WATCHED
Created node [/hifreud/sayhi] with data [hi freud]
Created node [/hifreud/sayhello] with data [hi freud]
Read from node [/hifreud/sayhi] data : hi freud
	version : 0
	czxid : 11396
	aversion : 0
	mzxid : 11396
Set data to node [/hifreud/sayhi] data : hi freud again!
Read from node after change [/hifreud/sayhi] data : hi freud again!
	version : 1
	czxid : 11396
	aversion : 0
	mzxid : 11398
Delete node [/hifreud/sayhello].
Delete node [/hifreud] use recursion.
Server closed...



create() Begins a create operation. Call additional methods (mode or background) and finalize the operation by calling forPath()
delete() Begins a delete operation. Call additional methods (version or background) and finalize the operation by calling forPath()
checkExists() Begins an operation to check that a ZNode exists. Call additional methods (watch or background) and finalize the operation by calling forPath()
getData() Begins an operation to get a ZNode’s data. Call additional methods (watch, background or get stat) and finalize the operation by calling forPath()
setData() Begins an operation to set a ZNode’s data. Call additional methods (version or background) and finalize the operation by calling forPath()
getChildren() Begins an operation to get a ZNode’s list of children ZNodes. Call additional methods (watch, background or get stat) and finalize the operation by calling forPath()
transactionOp() Used to allocate operations to be used with transaction().
transaction() Atomically submit a set of operations as a transaction.
getACL() Begins an operation to return a ZNode’s ACL settings. Call additional methods and finalize the operation by calling forPath()
setACL() Begins an operation to set a ZNode’s ACL settings. Call additional methods and finalize the operation by calling forPath()
getConfig() Begins an operation to return the last committed configuration. Call additional methods and finalize the operation by calling forEnsemble()
reconfig() Begins an operation to change the configuration. Call additional methods and finalize the operation by calling forEnsemble()



Event Type Event Methods
CREATE getResultCode() and getPath()
DELETE getResultCode() and getPath()
EXISTS getResultCode(), getPath() and getStat()
GET_DATA getResultCode(), getPath(), getStat() and getData()
SET_DATA getResultCode(), getPath() and getStat()
CHILDREN getResultCode(), getPath(), getStat(), getChildren()
SYNC getResultCode(), getStat()
GET_ACL getResultCode(), getACLList()
SET_ACL getResultCode()
TRANSACTION getResultCode(), getOpResults()
WATCHED getWatchedEvent()
GET_CONFIG getResultCode(), getData()
RECONFIG getResultCode(), getData()


《从PAXOS到ZOOKEEPER分布式一致性原理与实践》 - 倪超

Curator官网 :