Active Object[接收异步消息的对象]
一:Active Object的参与者
--->客户端线程(发起某种操作请求处理)
--->代理角色(工头)
--->实际执行者(工人)
--->主动对象接口(工人和工头)
--->生产端线程(加工产品的线程)
--->存放生产请求的队列(存放请求的队列)
--->请求实例化(将方法的启动和执行分离的实例化包含)
--->订单
--->产品
--->订单产品的共同接口
二:Active Object模式什么时候使用
--->大型模式,适合处理大并发量的业务场景
三:Active Object思考
--->
四进阶说明
--->
Active Object例子
工厂大量需要制造文字
一:订单和产品的接口
1 package com.yeepay.sxf.thread12; 2 /** 3 * 订单和产品的接口 4 * @author sxf 5 * 6 */ 7 public abstract class Result { 8 //获取结果的方法 9 public abstract Object getResultValue(); 10 11 }
二:订单类
1 package com.yeepay.sxf.thread12; 2 /** 3 * 订单类 4 * @author sxf 5 * 6 */ 7 public class FutureResult extends Result{ 8 //真正的产品 9 private Result result; 10 //产品是否生产好 11 private boolean ready=false; 12 13 //供生产线程使用,将生产好的产品放入订单 14 public synchronized void setResult(Result result){ 15 this.result=result; 16 this.ready=true; 17 notifyAll(); 18 } 19 20 //从订单里获取真正的产品结果 21 @Override 22 public synchronized Object getResultValue() { 23 while(!ready){ 24 try { 25 wait(); 26 } catch (InterruptedException e) { 27 // TODO Auto-generated catch block 28 e.printStackTrace(); 29 } 30 } 31 return result.getResultValue(); 32 } 33 34 35 }
三:产品类
1 package com.yeepay.sxf.thread12; 2 3 /** 4 * 产品类 5 * @author sxf 6 * 7 */ 8 public class RealResult extends Result { 9 private final Object resultValue; 10 11 //制造产品的方法 12 public RealResult(Object resultValue) { 13 // TODO Auto-generated constructor stub 14 this.resultValue=resultValue; 15 } 16 17 //获得产品 18 @Override 19 public Object getResultValue() { 20 21 return resultValue; 22 } 23 24 25 }
四:工头和工人的接口
1 package com.yeepay.sxf.thread12; 2 3 /** 4 * 工头和工人的接口 5 * @author sxf 6 * 7 */ 8 public interface ActiveObject { 9 //制造字符串 10 public abstract Result makeString(int count,char fillchar); 11 12 //显示字符串 13 public abstract void displayString(String string); 14 15 }
五:工头类
1 package com.yeepay.sxf.thread12; 2 3 /** 4 * 发起制作操作的代理(工头) 5 * @author sxf 6 * 7 */ 8 public class Poxy implements ActiveObject{ 9 //制造操作的线程 10 private final SchedulerTHread schedulerTHread; 11 //制造操作的工人 12 private final Servant servant; 13 //构造器 14 public Poxy(SchedulerTHread schedulerTHread,Servant servant){ 15 this.schedulerTHread=schedulerTHread; 16 this.servant=servant; 17 } 18 19 /** 20 * 发起制造操作(开启制造操作的单据,并将制造原材料和制造工人打包,放入制造线程的队列中) 21 */ 22 @Override 23 public Result makeString(int count, char fillchar) { 24 //制作一张订单 25 FutureResult future=new FutureResult(); 26 //将原材料和工人和订单放入队列 27 schedulerTHread.Invoke(new MakeStringRequset(servant, future, count, fillchar)); 28 //返回订单 29 return future; 30 } 31 32 /** 33 * 查看字符串 34 */ 35 @Override 36 public void displayString(String string) { 37 //这个制造不需要订单 38 schedulerTHread.Invoke(new DisplayStringRequest(servant, string)); 39 } 40 41 42 }
六:工人类
1 package com.yeepay.sxf.thread12; 2 /** 3 * 执行者(工人) 4 * @author sxf 5 * 6 */ 7 8 public class Servant implements ActiveObject { 9 10 /** 11 * 生产字符串 12 */ 13 @Override 14 public Result makeString(int count, char fillchar) { 15 char[] buffer=new char[count]; 16 for(int i=0;i<count;i++){ 17 buffer[i]=fillchar; 18 try { 19 Thread.sleep(100); 20 } catch (InterruptedException e) { 21 // TODO Auto-generated catch block 22 e.printStackTrace(); 23 } 24 } 25 return new RealResult(buffer.toString()); 26 } 27 28 /** 29 * 打印字符串 30 */ 31 @Override 32 public void displayString(String string) { 33 System.out.println("Servant.displayString()"+string); 34 try { 35 Thread.sleep(10); 36 } catch (InterruptedException e) { 37 // TODO Auto-generated catch block 38 e.printStackTrace(); 39 } 40 41 } 42 43 44 }
七:工作请求抽象的类
1 package com.yeepay.sxf.thread12; 2 /** 3 * 执行请求的实例(工人,订单,原材料) 4 * @author sxf 5 * 6 */ 7 public abstract class MethodRequest { 8 //工人 9 protected final Servant servant; 10 //订单 11 protected final FutureResult future; 12 13 protected MethodRequest(Servant servant,FutureResult future) { 14 this.servant=servant; 15 this.future=future; 16 } 17 //工人的具体操作 18 public abstract void execute(); 19 }
八:制造文字的请求
1 package com.yeepay.sxf.thread12; 2 /** 3 * 制造字符串请求实例 4 * 其做用:将制造的发起者和制造的执行者分离。 5 * ==》方法的启动和方法的执行进行分离 6 * ==》分离的本质:(1)启动者将执行请求转换成实例(原材料,执行者,订单) 7 * (2)执行者用原材料生产完产品,将产品存入订单,供发起者使用 8 * @author sxf 9 * 10 */ 11 public class MakeStringRequset extends MethodRequest{ 12 private final int count; 13 private final char fillchar; 14 //构造器(制造工人,订单,产品原材料) 15 public MakeStringRequset(Servant servant,FutureResult futureResult,int count,char fillchar) { 16 super(servant, futureResult); 17 this.count=count; 18 this.fillchar=fillchar; 19 } 20 21 @Override 22 public void execute() { 23 //制造工人制造出产品 24 RealResult realResult=(RealResult) servant.makeString(count, fillchar); 25 //再将产品放入订单 26 future.setResult(realResult); 27 } 28 29 30 }
九:打印文字的请求
1 package com.yeepay.sxf.thread12; 2 /** 3 * 打印字符串的请求实例 4 * @author sxf 5 * 6 */ 7 public class DisplayStringRequest extends MethodRequest{ 8 //打印原材料 9 private final String string; 10 public DisplayStringRequest(Servant servant,String string){ 11 super(servant, null); 12 this.string=string; 13 } 14 //执行打印的方法 15 @Override 16 public void execute() { 17 servant.displayString(string); 18 } 19 20 21 }
十:十:客户线程(1)(发起制造字符串请求的线程)
1 package com.yeepay.sxf.thread12; 2 3 /** 4 * 发起制造字符串请求的线程 5 * @author sxf 6 * 7 */ 8 public class MakerClientThread implements Runnable{ 9 //代理类(工头) 10 private final ActiveObject activeObject; 11 //生产原材料 12 private final char fillChar; 13 14 public MakerClientThread(ActiveObject activeObject,char fillChar) { 15 this.activeObject=activeObject; 16 this.fillChar=fillChar; 17 } 18 19 @Override 20 public void run() { 21 for(int i=0;true;i++){ 22 //(工头)发起制造,得到订单 23 Result result=activeObject.makeString(i, fillChar); 24 //忙别的事情 25 try { 26 Thread.sleep(1000); 27 } catch (InterruptedException e) { 28 e.printStackTrace(); 29 } 30 //从订单中获取产品结果 31 String value=(String) result.getResultValue(); 32 //打印产品 33 System.out.println(Thread.currentThread().getName()+"=>value:["+value+"]"); 34 } 35 36 } 37 38 39 40 }
十一:客户线程(2)发起打印字符串的请求线程
1 package com.yeepay.sxf.thread12; 2 /** 3 * 发起打印字符串请求的线程 4 * @author sxf 5 * 6 */ 7 public class DisplayClientThread implements Runnable{ 8 //工头 9 private final ActiveObject activeObject; 10 11 public DisplayClientThread(ActiveObject activeObject){ 12 this.activeObject=activeObject; 13 } 14 15 @Override 16 public void run() { 17 for(int i=0;true;i++){ 18 //打印原材料 19 String string=Thread.currentThread().getName()+" "+i; 20 //工头进行找人打印 21 activeObject.displayString(string); 22 try { 23 Thread.sleep(200); 24 } catch (InterruptedException e) { 25 // TODO Auto-generated catch block 26 e.printStackTrace(); 27 } 28 } 29 30 } 31 32 }
十二:生产线程
1 package com.yeepay.sxf.thread12; 2 3 4 /** 5 * 制造线程(制造原材料==>原材料+工人) 6 * @author sxf 7 * 8 */ 9 public class SchedulerTHread implements Runnable{ 10 //制造线程中的队列 11 private final ActivationQueue activatonQueue; 12 public SchedulerTHread(ActivationQueue activationQueue) { 13 this.activatonQueue=activationQueue; 14 } 15 16 //发起制造的线程,调用该方法,该方法是将制造请求(原材料+工人),放入队列 17 public void Invoke(MethodRequest request){ 18 activatonQueue.putRequest(request); 19 } 20 21 22 //制造线程体 23 @Override 24 public void run() { 25 while(true){ 26 //从队列中取出制造请求 27 MethodRequest request=activatonQueue.takeRequest(); 28 //制造请求被执行 29 request.execute(); 30 } 31 32 33 } 34 35 36 }
十三:客户线程和生产线程的队列
1 package com.yeepay.sxf.thread12; 2 3 /** 4 * 队列 5 * @author sxf 6 * 7 */ 8 public class ActivationQueue { 9 private static final int MAX_METHOD_REQUEST=100; 10 private final MethodRequest[] requestQueue; 11 private int tail;//下一个put Request的地方 12 private int head;//下一个take Request的地方 13 private int countRequest;//request的数量 14 15 public ActivationQueue() { 16 this.requestQueue=new MethodRequest[MAX_METHOD_REQUEST]; 17 this.head=0; 18 this.tail=0; 19 this.countRequest=0; 20 } 21 22 //放入请求 23 public synchronized void putRequest(MethodRequest request){ 24 while(countRequest>=requestQueue.length){ 25 try { 26 wait(); 27 } catch (InterruptedException e) { 28 // TODO Auto-generated catch block 29 e.printStackTrace(); 30 } 31 } 32 requestQueue[tail]=request; 33 //计算下一个放请求的位置 34 tail=(tail+1)%requestQueue.length; 35 //请求数加1 36 countRequest++; 37 //唤醒其他线程 38 notifyAll(); 39 } 40 41 //取出请求 42 public synchronized MethodRequest takeRequest(){ 43 while(countRequest<=0){ 44 try { 45 wait(); 46 } catch (InterruptedException e) { 47 // TODO Auto-generated catch block 48 e.printStackTrace(); 49 } 50 } 51 //取出 52 MethodRequest methodRequest=requestQueue[head]; 53 //计算下一个取出请求的位置 54 head=(head+1)%requestQueue.length; 55 //请求数减去1 56 countRequest--; 57 //唤醒其他线程 58 notifyAll(); 59 return methodRequest; 60 } 61 }
十四:产生工头的工厂类
1 package com.yeepay.sxf.thread12; 2 3 /** 4 * 工头对象的工厂类 5 * @author sxf 6 * 7 */ 8 public class ActiveObjectFactory { 9 10 //生产工头的对象 11 public static ActiveObject createActiveObject(){ 12 //执行制造操作(工人) 13 Servant servant=new Servant(); 14 //存放制造请求实例 15 ActivationQueue queue=new ActivationQueue(); 16 //制造操作的线程 17 SchedulerTHread schedulerTHread=new SchedulerTHread(queue); 18 //发起制造动作(工头) 19 Poxy poxy=new Poxy(schedulerTHread, servant); 20 //启动制造操作的线程 21 new Thread(schedulerTHread).start(); 22 return poxy; 23 } 24 }
十五:测试类
1 package com.yeepay.sxf.thread12; 2 /** 3 * 测试类 4 * @author sxf 5 * 6 */ 7 public class Test { 8 9 public static void main(String[] args) { 10 ActiveObject activeObject=ActiveObjectFactory.createActiveObject(); 11 new Thread(new MakerClientThread(activeObject, 's')).start();; 12 new Thread(new DisplayClientThread(activeObject)).start(); 13 14 } 15 }