简单的线程同步问题:两个线程交替执行N次【Synchronized、Lock、ArrayBlockingQueue】

方法一:传统的线程方法
import org.apache.log4j.Logger; /**
* 两个线程执行的代码片段要实现同步互斥的效果,它们必须用同一个Lock对象。<br/>
* 锁是上在代表要操作的资源的类的内部方法中,而不是线程代码中。<br/>
*
* 样例:<br/>
* 1)、主线程执行10次循环,接着子线程执行100此循环;<br/>
* 2)、重复1)操作50次,结束。<br/>
*
* @author wangzhu
* @date 2015-3-21下午8:00:57
*
*/
public class ThreadCommunication {
public static void main(String[] args) {
final Bussiness bussiness = new Bussiness();
new Thread() {
@Override
public void run() {
for (int i = 0; i < 50; i++) {
bussiness.main(i);
}
};
}.start();
new Thread() {
@Override
public void run() {
for (int i = 0; i < 50; i++) {
bussiness.sub(i);
}
};
}.start(); } } class Bussiness {
private static final Logger logger = Logger.getLogger(Bussiness.class); /**
* true:主线程<br/>
* false:子线程<br/>
* 默认执行主线程<br/>
*/
private boolean mark = false; public synchronized void main(int c) {
while (mark) {
// 表示当前是主线程,则进入等待状态
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 当不是主线程,则开始执行
for (int i = 0; i < 10; i++) {
logger.info(c + "==main thread====" + i);
}
// 表示当前是主线程
mark = true;
// 唤醒所有的线程
this.notifyAll();
} public synchronized void sub(int c) {
while (!mark) {
// 表示当前是子线程,则进入等待状态
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 当不是子线程,则开始执行
for (int i = 0; i < 100; i++) {
logger.info(c + "==sub thread====" + i);
}
// 表示当前是子线程
mark = false;
// 唤醒所有的线程
this.notifyAll();
} }
2、JDK1.5之后(包括1.5)加入的并发包(Lock与Condition)
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import org.apache.log4j.Logger; /**
* 两个线程执行的代码片段要实现同步互斥的效果,它们必须用同一个Lock对象。<br/>
* 锁是上在代表要操作的资源的类的内部方法中,而不是线程代码中。<br/>
*
* 样例:<br/>
* 1)、主线程执行10次循环,接着子线程执行100此循环;<br/>
* 2)、重复1)操作50次,结束。<br/>
*
* @author wangzhu
* @date 2015-3-22下午9:50:11
*
*/
public class ConditionCommunication {
private static final Logger logger = Logger
.getLogger(ConditionCommunication.class); /**
* @param args
*/
public static void main(String[] args) {
final Bussiness bussiness = new Bussiness();
new Thread() {
@Override
public void run() {
for (int i = 0; i < 50; i++) {
bussiness.sub(i);
}
};
}.start();
new Thread() {
@Override
public void run() {
for (int i = 0; i < 50; i++) {
bussiness.main(i);
}
};
}.start();
} static class Bussiness {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
/**
* true:主线程<br/>
* false:子线程<br/>
*/
private boolean mark; public void main(int k) {
// 加锁
lock.lock();
try {
while (mark) {
// 当前是主线程,则等待
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 10; i++) {
logger.info(k + "==main thread====" + i);
}
// 标记当前执行的是主线程
mark = true;
// 发出信号
condition.signal();
} finally {
// 释放锁
lock.unlock();
}
} public void sub(int k) {
// 加锁
lock.lock();
try {
while (!mark) {
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100; i++) {
logger.info(k + "==sub thread====" + i);
}
// 标记当前执行的是子线程
mark = false;
// 发出信号
condition.signal();
} finally {
// 释放锁
lock.unlock();
}
}
}
}
JDK中的实例:
class BoundedBuffer {
final Lock lock = new ReentrantLock();//锁对象
final Condition notFull = lock.newCondition();//写线程条件
final Condition notEmpty = lock.newCondition();//读线程条件 final Object[] items = new Object[100];//缓存队列
int putptr/*写索引*/, takeptr/*读索引*/, count/*队列中存在的数据个数*/; public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)//如果队列满了
notFull.await();//阻塞写线程
items[putptr] = x;//赋值
if (++putptr == items.length){
putptr = 0;//如果写索引写到队列的最后一个位置了,那么置为0
}
++count;//个数++
notEmpty.signal();//唤醒读线程
} finally {
lock.unlock();
}
} public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)//如果队列为空
notEmpty.await();//阻塞读线程
Object x = items[takeptr];//取值
if (++takeptr == items.length){
takeptr = 0;//如果读索引读到队列的最后一个位置了,那么置为0
}
--count;//个数--
notFull.signal();//唤醒写线程
return x;
} finally {
lock.unlock();
}
}
}

参考:

http://blog.csdn.net/vking_wang/article/details/9952063

3、JDK1.5并发包中的阻塞队列(ArrayBlockingQueue)
import java.util.concurrent.ArrayBlockingQueue; import org.apache.log4j.Logger; /**
* ArrayBlockingQueue(阻塞队列)<br/>
*    put(anObject):<br/>
* 把anObject加到BlockingQueue里,如果BlockQueue没有空间,<br/>
* 则调用此方法的线程被阻断,直到BlockingQueue里面有空间再继续.<br/>
*
*   take():<br/>
* 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,<br/>
* 阻断进入等待状态直到,BlockingQueue有新的数据被加入;<br/>
*
* @author wangzhu
* @date 2015-3-23上午9:31:22
*
*/
public class ArrayBlockingQueueCommunication {
private static final Logger logger = Logger
.getLogger(ArrayBlockingQueueCommunication.class); /**
* @param args
*/
public static void main(String[] args) {
final Bussiness bussiness = new Bussiness();
new Thread() {
@Override
public void run() {
for (int i = 0; i < 50; i++) {
bussiness.sub(i);
}
};
}.start();
new Thread() {
@Override
public void run() {
for (int i = 0; i < 50; i++) {
bussiness.main(i);
}
};
}.start();
} static class Bussiness {
// 阻塞队列
ArrayBlockingQueue<Integer> mainQuery = new ArrayBlockingQueue<Integer>(
1);
ArrayBlockingQueue<Integer> subQuery = new ArrayBlockingQueue<Integer>(
1);
{
try {
subQuery.put(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
} public void main(int k) {
try {
mainQuery.put(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < 10; i++) {
logger.info(k + "==main thread====" + i);
}
try {
subQuery.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
} public void sub(int k) {
try {
subQuery.put(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < 100; i++) {
logger.info(k + "==sub thread====" + i);
}
try {
mainQuery.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

参考:

http://www.cnblogs.com/jackyuj/archive/2010/11/24/1886553.html

上一篇:linux分区-df


下一篇:Hadoop概念学习系列之常见的分布式文件系统(二十六)