Freud's Blog

Stay hungry, stay foolish. 少年辛苦终身事,莫向光阴惰寸功。

Zookeeper之(十三) - zookeeper java API - curator - 05 - 分布式计数器

Posted on By Freud Kang

Shared Counter(共享计数器)

Manages a shared integer. All clients watching the same path will have the up-to-date value of the shared integer (considering ZK’s normal consistency guarantees).

共享计数器,适用于Master操作,并将计数结果同步到其他所有的从服务器的情景,Zk Watcher的一个基础应用

示例代码

package com.freud.zk.curator;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.framework.recipes.shared.SharedCountListener;
import org.apache.curator.framework.recipes.shared.SharedCountReader;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * 
 * Zookeeper - Curator - Counter - SharedCounter
 * 
 * 共享计数器,适用于Master操作,并将计数结果同步到其他所有的从服务器的情景
 * 
 * @author Freud
 *
 */
public class CuratorCounterSharedCounterZookeeper {

	private static final int SECOND = 1000;

	public static void main(String[] args) throws Exception {
		ExecutorService service = Executors.newFixedThreadPool(3);
		for (int i = 0; i < 3; i++) {
			final int index = i;
			service.submit(new Runnable() {
				public void run() {
					try {
						new CuratorCounterSharedCounterZookeeper().schedule(index);
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			});
		}

		Thread.sleep(10 * SECOND);
		service.shutdownNow();
	}

	private void schedule(final int index) throws Exception {

		CuratorFramework client = this.getStartedClient(index);
		String path = "/curator_counter/shared_counter";

		final SharedCount count = new SharedCount(client, path, 100);
		count.addListener(new SharedCountListener() {

			public void stateChanged(CuratorFramework client, ConnectionState state) {
				System.out.println("Thread [" + index + "][Callback]State changed!");
			}

			public void countHasChanged(SharedCountReader reader, int value) throws Exception {
				System.out.println("Thread [" + index + "][Callback]Count changed to [" + value + "]!");
			}
		});

		new Thread(new Runnable() {

			public void run() {
				try {
					Thread.sleep((index + 1) * 1000);
					count.setCount(index);
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}).start();

		count.start();
	}

	private CuratorFramework getStartedClient(final int index) {
		RetryPolicy rp = new ExponentialBackoffRetry(1 * SECOND, 3);
		// Fluent风格创建
		CuratorFramework cfFluent = CuratorFrameworkFactory.builder().connectString("localhost:2181")
				.sessionTimeoutMs(5 * SECOND).connectionTimeoutMs(3 * SECOND).retryPolicy(rp).build();
		cfFluent.start();
		System.out.println("Thread [" + index + "] Server connected...");
		return cfFluent;
	}
}

打印结果

Thread [2] Server connected...
Thread [1] Server connected...
Thread [0] Server connected...
Thread [0][Callback]State changed!
Thread [2][Callback]State changed!
Thread [1][Callback]State changed!
Thread [2][Callback]Count changed to [0]!
Thread [1][Callback]Count changed to [0]!
Thread [0][Callback]Count changed to [0]!
Thread [2][Callback]Count changed to [1]!
Thread [1][Callback]Count changed to [1]!
Thread [0][Callback]Count changed to [1]!
Thread [0][Callback]Count changed to [2]!
Thread [2][Callback]Count changed to [2]!
Thread [1][Callback]Count changed to [2]!

Distributed Atomic Long(分布式计数器)

A counter that attempts atomic increments. It first tries using optimistic locking. If that fails, an optional InterProcessMutex is taken. For both optimistic and mutex, a retry policy is used to retry the increment.

示例代码

package com.freud.zk.curator;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * 
 * Zookeeper - Curator - Counter - DistributedAtomicLong
 * 
 * 分布式计数器
 * 
 * @author Freud
 *
 */
public class CuratorCounterDistributedAtomicLongZookeeper {

	private static final int SECOND = 1000;

	private final static CountDownLatch down = new CountDownLatch(1);

	public static void main(String[] args) throws Exception {
		ExecutorService service = Executors.newFixedThreadPool(3);
		for (int i = 0; i < 3; i++) {
			final int index = i;
			service.submit(new Runnable() {
				public void run() {
					try {
						new CuratorCounterDistributedAtomicLongZookeeper().schedule(index);
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			});
		}
		down.countDown();
		Thread.sleep(10 * SECOND);
		service.shutdownNow();
	}

	private void schedule(final int index) throws Exception {
		down.await();
		CuratorFramework client = this.getStartedClient(index);
		String path = "/curator_counter/distribute_atomic_long";
		DistributedAtomicLong count = new DistributedAtomicLong(client, path, new ExponentialBackoffRetry(1000, 3));
		Thread.sleep((index + 1) * SECOND);
		AtomicValue<Long> al = count.get();
		System.out.println("Thread [" + index + "] get new Long value [" + al.postValue() + "] result status ["
				+ al.succeeded() + "]");
		count.increment();
	}

	private CuratorFramework getStartedClient(final int index) {
		RetryPolicy rp = new ExponentialBackoffRetry(1 * SECOND, 3);
		// Fluent风格创建
		CuratorFramework cfFluent = CuratorFrameworkFactory.builder().connectString("localhost:2181")
				.sessionTimeoutMs(5 * SECOND).connectionTimeoutMs(3 * SECOND).retryPolicy(rp).build();
		cfFluent.start();
		System.out.println("Thread [" + index + "] Server connected...");
		return cfFluent;
	}
}

打印结果

Thread [0] Server connected...
Thread [1] Server connected...
Thread [2] Server connected...
Thread [0] get new Long value [0] result status [true]
Thread [1] get new Long value [1] result status [true]
Thread [2] get new Long value [2] result status [true]

参考资料

《从PAXOS到ZOOKEEPER分布式一致性原理与实践》 - 倪超

Curator官网 : http://curator.apache.org/