本文共 4069 字,大约阅读时间需要 13 分钟。
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * A small, self-contained walk-through of a typical Active Object implementation.
 *
 * <p>For convenience every participant is declared as a nested class of this one:
 * a {@link Client} calls asynchronous methods on a {@link ServiceProxy}, the proxy
 * wraps each call in a method-request object and hands it to a {@link Scheduler},
 * and the scheduler's dispatcher thread executes the request against a
 * {@link Servant}, publishing the result through a {@link Future}.
 *
 * <p>The nested classes are deliberately left as inner (non-static) classes so that
 * existing {@code outer.new ServiceProxy()} style instantiation keeps compiling.
 */
public class ActiveObject {

    /** Runs the demo: issues both service calls through a client and prints the results. */
    public void startTest() {
        ServiceProxy proxy = new ServiceProxy();
        Client client = new Client(proxy);
        client.getNumberOfCustomers();
        client.getBalance();
    }

    /**
     * Simulated client. It calls the proxy's asynchronous methods, keeps the returned
     * {@link Future}, and then reads the result through that future.
     */
    public class Client {
        private final ServiceProxy proxy;
        private Future<Integer> customerCount = null;
        private Future<Double> balance = null;

        /**
         * @param proxy the proxy every request of this client is sent through
         */
        public Client(ServiceProxy proxy) {
            super();
            this.proxy = proxy;
        }

        /** Requests the customer count, waits for it, and prints it to standard output. */
        public void getNumberOfCustomers() {
            customerCount = proxy.getNumberOfCustomers();
            try {
                System.out.println("Number of customers = " + customerCount.get());
            } catch (InterruptedException e) {
                // Keep the interrupt visible to whoever called us.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

        /** Requests the balance, waits for it, and prints it to standard output. */
        public void getBalance() {
            balance = proxy.getBalance();
            try {
                System.out.println("Balance = " + balance.get());
            } catch (InterruptedException e) {
                // Keep the interrupt visible to whoever called us.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Business-facing interface for clients. Every method returns a {@link Future};
     * internally the call is turned into a method request and queued on a
     * {@link Scheduler}.
     */
    public class ServiceProxy {
        private final Scheduler scheduler = new Scheduler();

        /*
         * The servant is wrapped into each method request. A real system would need a
         * servant hierarchy, with different business areas using different servants.
         */
        private final Servant servant = new Servant();

        /**
         * Queues a request for the number of customers.
         *
         * @return a future that yields the customer count once the request has run
         */
        public Future<Integer> getNumberOfCustomers() {
            // Build the method request and queue it through the scheduler.
            return scheduler.enqueue(new MethodRequestGetCustomerNumber(servant));
        }

        /**
         * Queues a request for the balance.
         *
         * @return a future that yields the balance once the request has run
         */
        public Future<Double> getBalance() {
            return scheduler.enqueue(new MethodRequestGetBalance(servant));
        }
    }

    /** Base type for client calls that produce an {@link Integer}. */
    public abstract class MethodRequestInteger implements Callable<Integer> {
        /** Request type tag; initialised to 0 and not read anywhere in this file. */
        protected int requestType = 0;
    }

    /** Base type for client calls that produce a {@link Double}. */
    public abstract class MethodRequestDouble implements Callable<Double> {
        /** Request type tag; initialised to 0 and not read anywhere in this file. */
        protected int requestType = 0;
    }

    /** Method request that asks the servant for the number of customers. */
    public class MethodRequestGetCustomerNumber extends MethodRequestInteger {
        private final Servant servant;

        /**
         * @param servant the servant that will answer the request
         */
        public MethodRequestGetCustomerNumber(Servant servant) {
            super();
            this.servant = servant;
        }

        @Override
        public Integer call() throws Exception {
            return servant.getNumberOfCustomers();
        }
    }

    /** Method request that asks the servant for the balance. */
    public class MethodRequestGetBalance extends MethodRequestDouble {
        private final Servant servant;

        /**
         * @param servant the servant that will answer the request
         */
        public MethodRequestGetBalance(Servant servant) {
            super();
            this.servant = servant;
        }

        @Override
        public Double call() throws Exception {
            return servant.getBalance();
        }
    }

    /** Owns the request queue and runs the queued method requests on a dispatcher thread. */
    public class Scheduler {
        /** Bounded queue (capacity 100); {@code put} blocks producers while it is full. */
        private final LinkedBlockingQueue<FutureTask<?>> requestQueue =
                new LinkedBlockingQueue<>(100);
        private final Dispatcher dispatcher = new Dispatcher();

        /**
         * Starts the dispatcher (consumer) thread. A real system would likely run several
         * dispatchers from a thread pool; this example is simplified to a single one.
         */
        public Scheduler() {
            Thread worker = new Thread(dispatcher, "active-object-dispatcher");
            // Daemon, so that an idle dispatcher blocked on the queue cannot keep the
            // JVM alive after all application threads have finished.
            worker.setDaemon(true);
            worker.start();
        }

        /**
         * Queues a request for execution on the dispatcher thread. Clients reach this
         * through {@link ServiceProxy} and may call it concurrently.
         *
         * <p>If the calling thread is interrupted while waiting for queue space, the
         * request is not queued and the returned future will not complete; the thread's
         * interrupt status is restored so that a following {@code get()} on the same
         * thread fails fast with {@link InterruptedException} instead of blocking.
         *
         * @param request the call to execute
         * @param <T> the result type of the call
         * @return a future for the result of {@code request}
         */
        public <T> Future<T> enqueue(Callable<T> request) {
            final FutureTask<T> task = new FutureTask<>(request);
            try {
                requestQueue.put(task);
            } catch (InterruptedException e) {
                e.printStackTrace();
                // A blocking call clears the flag when it throws; set it again.
                Thread.currentThread().interrupt();
            }
            return task;
        }

        /** Consumer loop: takes queued tasks one at a time and runs them. */
        private class Dispatcher implements Runnable {
            /** Loop guard; nothing in this file clears it, so the loop ends on interrupt. */
            public volatile boolean isRunning = true;

            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted() && isRunning) {
                    try {
                        // take() blocks until a task is available and never returns null.
                        // Running the task invokes the wrapped method request against its
                        // servant (this is where the Command pattern comes in).
                        requestQueue.take().run();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        // Interrupted while blocked: the flag was cleared, so set it
                        // again and let the loop condition end the thread.
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }

    /** Simulates the component that finally performs the calls. */
    public class Servant {
        /** @return the (hard-coded) number of customers */
        public int getNumberOfCustomers() {
            return 11;
        }

        /** @return the (hard-coded) balance */
        public double getBalance() {
            return 0.177;
        }
    }
}
转载地址:http://pvhii.baihongyu.com/