Freud's Blog

Stay hungry, stay foolish. 少年辛苦终身事,莫向光阴惰寸功。

Zookeeper之(十五) - zookeeper java API - curator - 07 - 队列

Posted on By Freud Kang

Curator官方强烈不建议把Zookeeper当作MQ使用。IMPORTANT - We recommend that you do NOT use ZooKeeper for Queues. Please see Tech Note 4 for details.

Distributed Queue(分布式队列)

An implementation of the Distributed Queue ZK recipe. Items put into the queue are guaranteed to be ordered (by means of ZK’s PERSISTENTSEQUENTIAL node). If a single consumer takes items out of the queue, they will be ordered FIFO. If ordering is important, use a LeaderSelector to nominate a single consumer.

示例代码

package com.freud.zk.curator;

import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.queue.DistributedQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.recipes.queue.QueueSerializer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * 
 * Zookeeper - Curator - Queue - Distributed Queue
 * 
 * @author Freud
 *
 */
public class CuratorQueueDistributedQueueZookeeper {

	private static final int SECOND = 1000;
	private static final String path = "/curator_queue/distributed_queue";
	private static final CountDownLatch down = new CountDownLatch(1);

	public static void main(String[] args) throws Exception {

		ExecutorService service = Executors.newFixedThreadPool(10);
		for (int i = 0; i < 5; i++) {
			final int index = i;
			service.submit(new Callable<Void>() {
				public Void call() throws Exception {
					new CuratorQueueDistributedQueueZookeeper().schedule(index);
					return null;
				}
			});
		}
		down.countDown();

		Thread.sleep(10 * SECOND);
		service.shutdown();
	}

	private void schedule(final int index) throws Exception {
		down.await();
		CuratorFramework client = this.getStartedClient(index);
		// 创建队列
		DistributedQueue<String> queue = QueueBuilder.builder(client, new StringQueueConsumer(index),
				new StringQueueSerializer(), path).buildQueue();
		queue.start();
		if (index == 4) {
			Thread.sleep(3 * SECOND);
			for (int i = 0; i < 20; i++) {
				// 生产消息
				queue.put("message " + i);
			}
		}
	}

	private CuratorFramework getStartedClient(int index) {
		RetryPolicy rp = new ExponentialBackoffRetry(1 * SECOND, 3);
		// Fluent风格创建
		CuratorFramework cfFluent = CuratorFrameworkFactory.builder().connectString("localhost:2181")
				.sessionTimeoutMs(5 * SECOND).connectionTimeoutMs(3 * SECOND).retryPolicy(rp).build();
		cfFluent.start();
		System.out.println("Thread [" + index + "] Server connected...");
		return cfFluent;
	}

	/**
	 * 消息消费者
	 * 
	 * @author Freud
	 *
	 */
	public class StringQueueConsumer implements QueueConsumer<String> {

		private int index;

		public StringQueueConsumer(int index) {
			super();
			this.index = index;
		}

		public void stateChanged(CuratorFramework cf, ConnectionState state) {
		}

		public void consumeMessage(String message) throws Exception {
			System.out.println("Thread [" + index + "] get the queue value : " + message);
		}
	}

	/**
	 * 消息序列化和反序列化逻辑
	 * 
	 * @author Freud
	 *
	 */
	public class StringQueueSerializer implements QueueSerializer<String> {

		public byte[] serialize(String item) {
			return item.getBytes();
		}

		public String deserialize(byte[] bytes) {
			return new String(bytes);
		}
	}
}

打印结果

Thread [0] Server connected...
Thread [3] Server connected...
Thread [2] Server connected...
Thread [1] Server connected...
Thread [4] Server connected...
Thread [0] get the queue value : message 0
Thread [1] get the queue value : message 1
Thread [1] get the queue value : message 2
Thread [1] get the queue value : message 3
Thread [1] get the queue value : message 4
Thread [4] get the queue value : message 5
Thread [4] get the queue value : message 6
Thread [4] get the queue value : message 7
Thread [3] get the queue value : message 8
Thread [3] get the queue value : message 9
Thread [3] get the queue value : message 10
Thread [3] get the queue value : message 11
Thread [4] get the queue value : message 12
Thread [2] get the queue value : message 13
Thread [2] get the queue value : message 14
Thread [2] get the queue value : message 15
Thread [4] get the queue value : message 16
Thread [2] get the queue value : message 17
Thread [2] get the queue value : message 18
Thread [2] get the queue value : message 19

Distributed Id Queue(分布式ID队列)

A version of DistributedQueue that allows IDs to be associated with queue items. Items can then be removed from the queue if needed.

示例代码

package com.freud.zk.curator;

import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.queue.DistributedIdQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.recipes.queue.QueueSerializer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * 
 * Zookeeper - Curator - Queue - Distributed Id Queue
 * 
 * @author Freud
 *
 */
public class CuratorQueueDistributedIdQueueZookeeper {

	private static final int SECOND = 1000;
	private static final String path = "/curator_queue/distributed_id_queue";
	private static final CountDownLatch down = new CountDownLatch(1);

	public static void main(String[] args) throws Exception {

		ExecutorService service = Executors.newFixedThreadPool(10);
		for (int i = 0; i < 5; i++) {
			final int index = i;
			service.submit(new Callable<Void>() {
				public Void call() throws Exception {
					new CuratorQueueDistributedIdQueueZookeeper().schedule(index);
					return null;
				}
			});
		}
		down.countDown();

		Thread.sleep(10 * SECOND);
		service.shutdown();
	}

	private void schedule(final int index) throws Exception {
		down.await();
		CuratorFramework client = this.getStartedClient(index);
		// 创建队列
		DistributedIdQueue<String> queue = QueueBuilder.builder(client, new StringQueueConsumer(index),
				new StringQueueSerializer(), path).buildIdQueue();
		queue.start();
		if (index == 4) {
			Thread.sleep(3 * SECOND);
			for (int i = 0; i < 20; i++) {
				// 生产消息
				queue.put("message " + i, i + "");
			}
			// ID Queue的特性是可以通过ID删除消息。
			queue.remove("10");
		}
	}

	private CuratorFramework getStartedClient(int index) {
		RetryPolicy rp = new ExponentialBackoffRetry(1 * SECOND, 3);
		// Fluent风格创建
		CuratorFramework cfFluent = CuratorFrameworkFactory.builder().connectString("localhost:2181")
				.sessionTimeoutMs(5 * SECOND).connectionTimeoutMs(3 * SECOND).retryPolicy(rp).build();
		cfFluent.start();
		System.out.println("Thread [" + index + "] Server connected...");
		return cfFluent;
	}

	/**
	 * 消息消费者
	 * 
	 * @author Freud
	 *
	 */
	public class StringQueueConsumer implements QueueConsumer<String> {

		private int index;

		public StringQueueConsumer(int index) {
			super();
			this.index = index;
		}

		public void stateChanged(CuratorFramework cf, ConnectionState state) {
		}

		public void consumeMessage(String message) throws Exception {
			System.out.println("Thread [" + index + "] get the queue value : " + message);
		}
	}

	/**
	 * 消息序列化和反序列化逻辑
	 * 
	 * @author Freud
	 *
	 */
	public class StringQueueSerializer implements QueueSerializer<String> {

		public byte[] serialize(String item) {
			return item.getBytes();
		}

		public String deserialize(byte[] bytes) {
			return new String(bytes);
		}
	}
}

打印结果

Thread [2] Server connected...
Thread [3] Server connected...
Thread [1] Server connected...
Thread [4] Server connected...
Thread [0] Server connected...
Thread [1] get the queue value : message 0
Thread [3] get the queue value : message 1
Thread [3] get the queue value : message 2
Thread [3] get the queue value : message 3
Thread [3] get the queue value : message 4
Thread [3] get the queue value : message 5
Thread [3] get the queue value : message 6
Thread [3] get the queue value : message 7
Thread [3] get the queue value : message 8
Thread [3] get the queue value : message 9
Thread [4] get the queue value : message 11
Thread [4] get the queue value : message 12
Thread [0] get the queue value : message 13
Thread [0] get the queue value : message 14
Thread [0] get the queue value : message 15
Thread [1] get the queue value : message 16
Thread [1] get the queue value : message 17
Thread [1] get the queue value : message 18
Thread [1] get the queue value : message 19

Distributed Priority Queue(分布式优先级队列)

An implementation of the Distributed Priority Queue ZK recipe.

示例代码

package com.freud.zk.curator;

import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.queue.DistributedPriorityQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.recipes.queue.QueueSerializer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * 
 * Zookeeper - Curator - Queue - Distributed Priority Queue
 * 
 * @author Freud
 *
 */
public class CuratorQueueDistributedPriorityQueueZookeeper {

	private static final int SECOND = 1000;
	private static final String path = "/curator_queue/distributed_priority_queue";
	private static final CountDownLatch down = new CountDownLatch(1);

	public static void main(String[] args) throws Exception {

		ExecutorService service = Executors.newFixedThreadPool(10);
		for (int i = 0; i < 5; i++) {
			final int index = i;
			service.submit(new Callable<Void>() {
				public Void call() throws Exception {
					new CuratorQueueDistributedPriorityQueueZookeeper().schedule(index);
					return null;
				}
			});
		}
		down.countDown();

		Thread.sleep(10 * SECOND);
		service.shutdown();
	}

	private void schedule(final int index) throws Exception {
		down.await();
		CuratorFramework client = this.getStartedClient(index);
		// 创建队列
		DistributedPriorityQueue<String> queue = QueueBuilder.builder(client, new StringQueueConsumer(index),
				new StringQueueSerializer(), path).buildPriorityQueue(3);
		queue.start();
		if (index == 4) {
			Thread.sleep(3 * SECOND);
			for (int i = 0; i < 20; i++) {
				// 生产消息
				queue.put("message " + i, 20 - i);
			}
		}
	}

	private CuratorFramework getStartedClient(int index) {
		RetryPolicy rp = new ExponentialBackoffRetry(1 * SECOND, 3);
		// Fluent风格创建
		CuratorFramework cfFluent = CuratorFrameworkFactory.builder().connectString("localhost:2181")
				.sessionTimeoutMs(5 * SECOND).connectionTimeoutMs(3 * SECOND).retryPolicy(rp).build();
		cfFluent.start();
		System.out.println("Thread [" + index + "] Server connected...");
		return cfFluent;
	}

	/**
	 * 消息消费者
	 * 
	 * @author Freud
	 *
	 */
	public class StringQueueConsumer implements QueueConsumer<String> {

		private int index;

		public StringQueueConsumer(int index) {
			super();
			this.index = index;
		}

		public void stateChanged(CuratorFramework cf, ConnectionState state) {
		}

		public void consumeMessage(String message) throws Exception {
			System.out.println("Thread [" + index + "] get the queue value : " + message);
		}
	}

	/**
	 * 消息序列化和反序列化逻辑
	 * 
	 * @author Freud
	 *
	 */
	public class StringQueueSerializer implements QueueSerializer<String> {

		public byte[] serialize(String item) {
			return item.getBytes();
		}

		public String deserialize(byte[] bytes) {
			return new String(bytes);
		}
	}
}

打印结果

Thread [0] Server connected...
Thread [2] Server connected...
Thread [4] Server connected...
Thread [3] Server connected...
Thread [1] Server connected...
Thread [3] get the queue value : message 19
Thread [0] get the queue value : message 18
Thread [0] get the queue value : message 17
Thread [0] get the queue value : message 16
Thread [0] get the queue value : message 15
Thread [0] get the queue value : message 14
Thread [0] get the queue value : message 13
Thread [3] get the queue value : message 12
Thread [3] get the queue value : message 11
Thread [3] get the queue value : message 10
Thread [3] get the queue value : message 9
Thread [0] get the queue value : message 8
Thread [1] get the queue value : message 7
Thread [1] get the queue value : message 6
Thread [0] get the queue value : message 5
Thread [0] get the queue value : message 4
Thread [0] get the queue value : message 3
Thread [0] get the queue value : message 2
Thread [0] get the queue value : message 1
Thread [0] get the queue value : message 0

Distributed Delay Queue(分布式延迟队列)

An implementation of a Distributed Delay Queue.

示例代码

package com.freud.zk.curator;

import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.queue.DistributedDelayQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.recipes.queue.QueueSerializer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * 
 * Zookeeper - Curator - Queue - Distributed Delay Queue
 * 
 * @author Freud
 *
 */
public class CuratorQueueDistributedDelayQueueZookeeper {

	private static final int SECOND = 1000;
	private static final String path = "/curator_queue/distributed_delay_queue";
	private static final CountDownLatch down = new CountDownLatch(1);

	public static void main(String[] args) throws Exception {

		CuratorFramework client = new CuratorQueueDistributedDelayQueueZookeeper().getStartedClient(-1);
		if (client.checkExists().forPath(path) != null) {
			client.delete().deletingChildrenIfNeeded().forPath(path);
		}
		client.close();
		ExecutorService service = Executors.newFixedThreadPool(10);
		for (int i = 0; i < 5; i++) {
			final int index = i;
			service.submit(new Callable<Void>() {
				public Void call() throws Exception {
					new CuratorQueueDistributedDelayQueueZookeeper().schedule(index);
					return null;
				}
			});
		}
		down.countDown();

		Thread.sleep(20 * SECOND);
		service.shutdown();
	}

	private void schedule(final int index) throws Exception {
		down.await();
		CuratorFramework client = this.getStartedClient(index);
		// 创建队列
		DistributedDelayQueue<String> queue = QueueBuilder.builder(client, new StringQueueConsumer(index),
				new StringQueueSerializer(), path).buildDelayQueue();
		queue.start();
		if (index == 4) {
			Thread.sleep(3 * SECOND);
			for (int i = 0; i < 20; i++) {
				// 生产消息 ,其中DelayUtilEpoch的单位为毫秒,代表触发时间的毫秒数,集群环境下需要注意做时间同步
				queue.put("message " + i, System.currentTimeMillis() + ((i + 1) * SECOND / 2));
			}
		}
	}

	private CuratorFramework getStartedClient(int index) {
		RetryPolicy rp = new ExponentialBackoffRetry(1 * SECOND, 3);
		// Fluent风格创建
		CuratorFramework cfFluent = CuratorFrameworkFactory.builder().connectString("localhost:2181")
				.sessionTimeoutMs(5 * SECOND).connectionTimeoutMs(3 * SECOND).retryPolicy(rp).build();
		cfFluent.start();
		System.out.println("Thread [" + index + "] Server connected...");
		return cfFluent;
	}

	/**
	 * 消息消费者
	 * 
	 * @author Freud
	 *
	 */
	public class StringQueueConsumer implements QueueConsumer<String> {

		private int index;

		public StringQueueConsumer(int index) {
			super();
			this.index = index;
		}

		public void stateChanged(CuratorFramework cf, ConnectionState state) {
		}

		public void consumeMessage(String message) throws Exception {
			System.out.println("Thread [" + index + "] get the queue value : " + message);
		}
	}

	/**
	 * 消息序列化和反序列化逻辑
	 * 
	 * @author Freud
	 *
	 */
	public class StringQueueSerializer implements QueueSerializer<String> {

		public byte[] serialize(String item) {
			return item.getBytes();
		}

		public String deserialize(byte[] bytes) {
			return new String(bytes);
		}
	}
}

打印结果

Thread [-1] Server connected...
Thread [4] Server connected...
Thread [3] Server connected...
Thread [0] Server connected...
Thread [2] Server connected...
Thread [1] Server connected...
Thread [1] get the queue value : message 0
Thread [1] get the queue value : message 1
Thread [2] get the queue value : message 2
Thread [2] get the queue value : message 3
Thread [0] get the queue value : message 4
Thread [1] get the queue value : message 5
Thread [4] get the queue value : message 6
Thread [1] get the queue value : message 7
Thread [2] get the queue value : message 8
Thread [0] get the queue value : message 9
Thread [2] get the queue value : message 10
Thread [1] get the queue value : message 11
Thread [1] get the queue value : message 12
Thread [1] get the queue value : message 13
Thread [0] get the queue value : message 14
Thread [0] get the queue value : message 15
Thread [4] get the queue value : message 16
Thread [0] get the queue value : message 17
Thread [2] get the queue value : message 18
Thread [1] get the queue value : message 19

可以观察到消息打印按照500毫秒的间隔依次有序打印出来。

Simple Distributed Queue(简单分布式队列)

A drop-in replacement for the DistributedQueue that comes with the ZK distribution.

示例代码

package com.freud.zk.curator;

import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.queue.SimpleDistributedQueue;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * 
 * Zookeeper - Curator - Queue - Simple Distributed Queue
 * 
 * @author Freud
 *
 */
public class CuratorQueueSimpleDistributedQueueZookeeper {

	private static final int SECOND = 1000;
	private static final String path = "/curator_queue/simple_distributed_queue";
	private static final CountDownLatch down = new CountDownLatch(1);

	public static void main(String[] args) throws Exception {

		CuratorFramework client = new CuratorQueueSimpleDistributedQueueZookeeper().getStartedClient(-1);
		if (client.checkExists().forPath(path) != null) {
			client.delete().deletingChildrenIfNeeded().forPath(path);
		}
		client.close();
		ExecutorService service = Executors.newFixedThreadPool(10);
		for (int i = 0; i < 5; i++) {
			final int index = i;
			service.submit(new Callable<Void>() {
				public Void call() throws Exception {
					new CuratorQueueSimpleDistributedQueueZookeeper().schedule(index);
					return null;
				}
			});
		}
		down.countDown();

		Thread.sleep(20 * SECOND);
		service.shutdown();
	}

	private void schedule(final int index) throws Exception {
		down.await();
		CuratorFramework client = this.getStartedClient(index);
		SimpleDistributedQueue queue = new SimpleDistributedQueue(client, path);
		if (index == 4) {
			Thread.sleep(3 * SECOND);
			for (int i = 0; i < 20; i++) {
				// 生产消息
				queue.offer(("message " + i).getBytes());
			}
		} else {
			while (true) {
				System.out.println("Thread [" + index + "] get queue value :" + new String(queue.take()));
			}
		}
	}

	private CuratorFramework getStartedClient(int index) {
		RetryPolicy rp = new ExponentialBackoffRetry(1 * SECOND, 3);
		// Fluent风格创建
		CuratorFramework cfFluent = CuratorFrameworkFactory.builder().connectString("localhost:2181")
				.sessionTimeoutMs(5 * SECOND).connectionTimeoutMs(3 * SECOND).retryPolicy(rp).build();
		cfFluent.start();
		System.out.println("Thread [" + index + "] Server connected...");
		return cfFluent;
	}
}

打印结果

Thread [-1] Server connected...
Thread [0] Server connected...
Thread [1] Server connected...
Thread [4] Server connected...
Thread [2] Server connected...
Thread [3] Server connected...
Thread [0] get queue value :message 0
Thread [0] get queue value :message 1
Thread [1] get queue value :message 2
Thread [0] get queue value :message 3
Thread [1] get queue value :message 4
Thread [0] get queue value :message 5
Thread [1] get queue value :message 6
Thread [0] get queue value :message 7
Thread [3] get queue value :message 8
Thread [1] get queue value :message 9
Thread [0] get queue value :message 10
Thread [0] get queue value :message 11
Thread [1] get queue value :message 12
Thread [2] get queue value :message 13
Thread [1] get queue value :message 14
Thread [3] get queue value :message 15
Thread [0] get queue value :message 16
Thread [1] get queue value :message 17
Thread [0] get queue value :message 18
Thread [3] get queue value :message 19

参考资料

《从PAXOS到ZOOKEEPER分布式一致性原理与实践》 - 倪超

Curator官网 : http://curator.apache.org/