Java之线程的生产者与消费者模型

设计思路:

通过Runnable实现线程类,通过Message实现生产者与消费者的联系。

Java之线程的生产者与消费者模型

 

初步测试:

// 消费者类
public class Consumer implements Runnable{
    private Message msg;

    public Consumer(Message msg) {
        this.msg = msg;
    }

    @Override
    public void run() {
        for (int x = 0; x < 100; x++) {
            try {
                Thread.sleep(100);  // 增加延迟
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(this.msg.getTitle() + "   ---   " + this.msg.getContent());
        }
    }
}

 

//生产者类
public class Producer implements Runnable{
    private Message msg;

    public Producer(Message msg) {
        this.msg = msg;
    }

    @Override
    public void run() {
        for (int x = 0; x < 100; x++) {
            if (x % 2 == 0){
                this.msg.setTitle("王建");
                try {
                    Thread.sleep(100);  // 增加延迟
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                this.msg.setContent("宇宙大帅哥");
            }else {
                this.msg.setTitle("小高");
                try {
                    Thread.sleep(100);  // 增加延迟
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                this.msg.setContent("常态保持");
            }
        }
    }
}

 

public class Message {  // 消息类
    private String title;
    private String content;

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}

 

public class Main {     // 客户端
    public static void main(String[] args) throws Exception{
        Message msg = new Message();
        new Thread(new Producer(msg)).start();  //启动生产者线程
        new Thread(new Consumer(msg)).start();  //启动消费者线程
    }
}

输出结果:

Java之线程的生产者与消费者模型

 

Java之线程的生产者与消费者模型

 

发现了一个问题:

  小高成了宇宙大帅哥,王建是常态保持,甚至有王建为空的结果。

  - 1、数据不同步问题;

  - 2、本应该生产一个取走一个,但是发现有了重复生产和重复取出的问题。

解决问题:

  首先解决的是同步问题,而解决同步问题最简单的就是使用synchronized关键字来定义同步代码块或同步方法,那么这个同步处理可以直接在Message类中进行。

public class Message {  // 消息类,设置信息内容
    private String title;
    private String content;

    public synchronized void set(String title,String content){
        this.title = title;
        try {
            Thread.sleep(100);  // 增加延迟
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.content = content;
    }
    public synchronized String get(){
        try {
            Thread.sleep(100);  // 增加延迟
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return this.title + "  ---  " + this.content;
    }
}

 

// 消费者类,取得数据
public class Consumer implements Runnable{
    private Message msg;

    public Consumer(Message msg) {
        this.msg = msg;
    }

    @Override
    public void run() {
        for (int x = 0; x < 10; x++) {
            System.out.println(this.msg.get());
        }
    }
}

 

//生产者类,生成(设置)数据
public class Producer implements Runnable{
    private Message msg;

    public Producer(Message msg) {
        this.msg = msg;
    }

    @Override
    public void run() {
        for (int x = 1; x < 10; x++) {
            if (x % 2 == 0){
                this.msg.set("王建","宇宙大帅哥");     // 设置title和content
            }else {
                this.msg.set("小高","常态保持");     // 设置title和content
            }
        }
    }
}

 

public class Main {     // 客户端
    public static void main(String[] args) throws Exception{
        Message msg = new Message();
        new Thread(new Producer(msg)).start();  //启动生产者线程
        new Thread(new Consumer(msg)).start();  //启动消费者线程
    }
}

输出结果:

Java之线程的生产者与消费者模型

解决了同步问题,但是并没有解决重复的问题。

解决重复问题(线程的等待和唤醒):

要想实现生产一个拿取一个,那么就必须进行等待与唤醒操作,主要依靠的是Object类中的方法进行处理;

  - 等待机制:

    |- 死等(无人唤醒):public final void wait() throws InterruptedException;

    |- 设置等待时间1:public final void wait(long timeout) throws InterruptedException;

    |- 设置等待时间2:public final void wait(long timeout int nanos) throws InterruptedException;

  - 唤醒第一个等待线程:public final viod notify();

  - 唤醒全部等待线程:public final viod notifyAll();

如果此时有若干个等待线程的话,那么notify()表示的是唤醒第一个等待的,而其它的线程继续等待;而notifyAll()表示会唤醒所有等待中的线程,那个线程的优先级高就有可能先执行。

对于当前的问题主要的解决应该通过Message类完成处理。

修改Message类:

  

public class Message {  // 消息类,设置信息内容
    private String title;
    private String content;
    private boolean flag = true;   // 表示生产或消费:
    // flag = true: 允许生产,不准消费
    // flag = false: 允消费,不准生产

    public synchronized void set(String title,String content){
        if (this.flag == false){ // 无法进行生产,等待被消费
            try {
                super.wait();   // 等待
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.title = title;
        try {
            Thread.sleep(10);  // 增加延迟
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.content = content;
        this.flag = false;  // 生产过了
        super.notify(); // 可能会有等待消费的线程,唤醒等待的线程
    }
    public synchronized String get(){
        if (this.flag == true){ // 还未生产,需要等待
            try {
                super.wait();   // 等待
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        try {
            Thread.sleep(100);  // 增加延迟
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            return this.title + "  ---  " + this.content;
        }finally {  // 不管如何都要执行
            this.flag = true;   // 继续生产
            super.notify(); // 唤醒等待的线程
        }
    }
}

 

//生产者类,生成(设置)数据
public class Producer implements Runnable{
    private Message msg;

    public Producer(Message msg) {
        this.msg = msg;
    }

    @Override
    public void run() {
        for (int x = 0; x < 100; x++) {
            if (x % 2 == 0){
                this.msg.set("王建","宇宙大帅哥");     // 设置title和content
            }else {
                this.msg.set("小高","常态保持");     // 设置title和content
            }
        }
    }
}

 

// 消费者类,取得数据
public class Consumer implements Runnable{
    private Message msg;

    public Consumer(Message msg) {
        this.msg = msg;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(10);  // 增加延迟
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        for (int x = 0; x < 100; x++) {
            System.out.println(this.msg.get());
        }
    }
}

 

public class Main {     // 客户端
    public static void main(String[] args) throws Exception{
        Message msg = new Message();
        new Thread(new Producer(msg)).start();  //启动生产者线程
        new Thread(new Consumer(msg)).start();  //启动消费者线程
    }
}

输出结果:

Java之线程的生产者与消费者模型

可以发现现在的输出就是一个生产一个消费的模式,没有了前面的两种问题。

 

上一篇:寻找写代码感觉(十三)之 编辑功能的开发


下一篇:如何从邮箱中批量下载距离当天最近一天的邮件附件.