volatile与zookeeper

为什么我会把volatile与zookeeper放在一起,原因是这两个有个共同点,就是可见性。

我写了两个测试代码,先给出volatile的测试代码

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * volatile修饰的变量的可见性测试
 * 
 * @author zhaowen
 *
 */
public class AtomicTest {
	// 全部线程可见变量
	public static volatile int ccount = 0;
	// 计数器
	public static int add = 0;

	// 线程安全示例
	class AddSafe implements Runnable {

		@Override
		public void run() {
			for (;;) {
				// 相当于可见性的锁
				if (ccount == 0) {
					try {
						// lock
						ccount = 1;
						++add;
						System.out.println(add);
						System.out.println(add);
						System.out.println(add);
						// exit the for loop
						return;
					} finally {
						// unlock
						ccount = 0;
					}
				}
			}
		}

	}

	// 线程不安全示例
	class AddUnsafe implements Runnable {

		@Override
		public void run() {
			++add;
			System.out.println(add);
			System.out.println(add);
			System.out.println(add);
		}

	}

	public void test() {
		ThreadPoolExecutor executor = new ThreadPoolExecutor(8, 16, 30L,
				TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(200),
				new ThreadPoolExecutor.CallerRunsPolicy());
		for (int i = 0; i < 100; i++) {
			executor.execute(new AddSafe());
			// executor.execute(new AddUnsafe());
		}
	}

	public static void main(String[] args) {
		AtomicTest t = new AtomicTest();
		t.test();
	}
}

这是zookepper的测试代码:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.CreateMode;

/**
 * zookeeper数据的可见性测试
 * 
 * @author zhaowen
 * 
 */
public class Subscribe {
	// 结点名
	private static final String PATH = "/clusters";
	
	class Sub implements Runnable {

		@Override
		public void run() {
			// 创建连接
			ZkClient zkClient = new ZkClient("127.0.0.1:2181");
			// 初始化
			if (!zkClient.exists(PATH)) {
				zkClient.create(PATH, "false", CreateMode.PERSISTENT);
			}
			// 增加监听
			zkClient.subscribeDataChanges(PATH, new IZkDataListener() {

				@Override
				public void handleDataChange(String datapath, Object data)
						throws Exception {
					// 处理结点数据变化事件
					System.out.println(System.currentTimeMillis() + " : [" + this.hashCode() + "] change : " + String.valueOf(data));
				}

				@Override
				public void handleDataDeleted(String datapath) throws Exception {
					// 处理结点删除事件
					System.out.println(System.currentTimeMillis() + " : [" + this.hashCode() + "]" + " node deleted");
				}

			});
		}
	}
	
	public void test() {
		ThreadPoolExecutor executor = new ThreadPoolExecutor(8, 16, 30L,
				TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(200),
				new ThreadPoolExecutor.CallerRunsPolicy());
		for (int i = 0; i < 10; i++) {
			executor.execute(new Sub());
		}
	}

	public static void main(String[] args) throws Exception {
		// 创建连接
		ZkClient zkClient = new ZkClient("127.0.0.1:2181");
		// 初始化
		if (!zkClient.exists(PATH)) {
			zkClient.create(PATH, "false", CreateMode.PERSISTENT);
		}
		// 启动监听
		Subscribe subtest = new Subscribe();
		subtest.test();
		// 延时2秒
		Thread.sleep(2000);
		// 测试结点数据的变化事件
		zkClient.writeData(PATH, "true");
		// 延时2秒
		Thread.sleep(2000);
		// 测试结点数据的删除事件
		zkClient.delete(PATH);
	}
}

volatile修饰的变量在多线程环境下的作用是全局的可见性,即当一个线程修改后,其他线程对此变量是立马可见的,这一点我发现和在分布式集群环境下zookeeper所存储的变量的特性相同,当集群中的某个结点修改zookeeper中的变量后,所有监听的结点都能立马感知,会收到具体的变更事件,这就是我为什么要把volatile和zookeeper放在一起的原因,希望能对理解volatile和zookeeper有些帮助。

volatile修饰的变量与原子性是没有关系的,我最初接触volatitle是在写嵌入式C程序时,对于摄像头采集的数据用volatile修饰,意即当硬件使数据变化时,代码也能立马感知处理。

从Demo代码可以拓展到分布式锁的应用,具体怎么用应该很明了了。

上一篇:高性能序列化工具Google Protobuf的使用


下一篇:RabbitMQ的高可用性方案