使用阻塞队列（BlockingQueue）实现生产者-消费者模式

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.concurrent.TimeUnit.SECONDS;

/**
 * Shared state for a producer/consumer demo built on a {@link BlockingQueue}.
 *
 * <p>A single volatile flag guards both the producing and the consuming loop,
 * so clearing it (via {@link #stop()} or from inside {@link #consumer()})
 * ends both loops on their next iteration check.
 */
class Resource {

    /**
     * Loop guard shared by {@link #produce()} and {@link #consumer()}.
     * Volatile so that a write from another thread is visible to the loops.
     */
    private volatile boolean running = true;

    /** Counter whose successive values (as decimal strings) are the produced items. */
    AtomicInteger atomicInteger = new AtomicInteger();

    /** Queue through which the producer hands items to the consumer. */
    BlockingQueue<String> blockingQueue;

    /**
     * @param blockingQueue queue used to transfer items between the two loops
     */
    public Resource(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    /**
     * Produces one item roughly every second until the run flag is cleared.
     *
     * <p>Each item is offered with a 2-second timeout. The outcome of the
     * offer is reported truthfully: a timed-out offer is logged as a failure
     * instead of being announced as produced.
     *
     * @throws Exception if the thread is interrupted while offering or sleeping
     */
    public void produce() throws Exception {
        while (running) {
            String data = atomicInteger.incrementAndGet() + "";
            String threadName = Thread.currentThread().getName();
            // offer(...) returns false when the queue stayed full for the whole timeout.
            if (blockingQueue.offer(data, 2, SECONDS)) {
                System.out.println("线程" + threadName + "生产数据:" + data);
            } else {
                System.out.println("线程" + threadName + "生产数据失败:" + data);
            }
            Thread.sleep(1000);
        }
        System.out.println("线程叫停~~~");
    }

    /**
     * Consumes items until the run flag is cleared or no usable item arrives.
     *
     * <p>Each poll waits up to 2 seconds. A timed poll returns {@code null}
     * on timeout, so both {@code null} and the empty string are treated as
     * "nothing to consume": the run flag is cleared (which also ends the
     * producer loop) and the method returns.
     *
     * @throws Exception if the thread is interrupted while polling
     */
    public void consumer() throws Exception {
        while (running) {
            String data = blockingQueue.poll(2, SECONDS);
            if (data == null || data.isEmpty()) {
                running = false;
                return;
            }
            System.out.println("线程" + Thread.currentThread().getName() + "消费数据:" + data);
        }
    }

    /** Clears the run flag so both loops exit at their next condition check. */
    public void stop() {
        this.running = false;
    }

}

/**
 * Demo driver: starts one producer thread and one consumer thread that share
 * a {@code Resource} backed by a bounded queue of capacity 10, lets them run
 * for ten seconds, then asks the resource to stop.
 */
public class ProducerAndConsumerTest {
    public static void main(String[] args) {
        Resource resource = new Resource(new ArrayBlockingQueue<String>(10));

        new Thread(() -> {
            System.out.println("生产者线程启动。。。");
            try {
                resource.produce();
            } catch (InterruptedException e) {
                // Preserve the interrupt status instead of swallowing it.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "producer").start();

        new Thread(() -> {
            System.out.println("消费者线程启动。。。");
            try {
                resource.consumer();
            } catch (InterruptedException e) {
                // Preserve the interrupt status instead of swallowing it.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "consumer").start();

        try {
            // sleep is static; call it on Thread rather than through an instance.
            Thread.sleep(10_000);
        } catch (InterruptedException e) {
            // Restore the flag and fall through so the workers are still stopped.
            Thread.currentThread().interrupt();
        }
        resource.stop();
    }

}

上一篇:简单明了Java线程池


下一篇:阻塞队列BlockingQueue详解