mahout 贝叶斯分类后数据解析

作者:杨步涛

技术Bloghttp://blog.csdn.net/yangbutao

github:https://github.com/yangbutao/xlongc



长连接需要注意的事项

长连接最大的问题是对于连接池中连接的维护,连接池中的连接保持一定大小,当少于阀值时,则在获取连接时,自动创建新的连接,并添加到连接池中;连接池中的连接采用RR算法获取。

维护连接池需要应对IO类的异常(SocketTimeoutExceptionIOException)的情况,确保获取的连接是正常的连接;异常情况主要有:在建立连接时出现异常,需要采取重试策略,超过重试次数,清除连接池中的该连接,还有是在IO过程中出现异常,关闭并清除连接池中的该连接。

架构说明

架构图

mahout 贝叶斯分类后数据解析

 

客户端实现

客户端socket IO是基于NIO事件驱动的,底层封装了相关的java NIO 模型,比如SelectorSelectionKeySocketChannel等,通过接收OP_CONNECTOP_READOP_WRITE事件来实现IO操作。

同时客户端调用又是同步的,发出请求后,一直等到结果返回直到超时或者异常。

下面简单介绍一下调用的流程:

1、用户程序首先从连接池中获取连接对象Connection,参见①;初始时,会不断创建新的连接并把该连接加到池中,直到连接池中的连接达到最大数量,连接的分配采用RoundRobin机制。创建连接的过程是建立socket通道以及用于发送数据的输出流和用于接收响应的输入流。

2、把请求封装成call对象,塞到call队列中,每个Connection都有一个call队列,包含了当前正在请求还未返回响应的request,以便于进行超时和同步用,参见②。每个call对象包含对于该连接来说唯一的ID,这个ID需要服务端在返回响应数据时一起返回给客户端的,客户端在接收到响应后通过该ID获取队列中的call对象,由于发送和接收过程时异步的,因此需要通过call对象的同步操作(发送后call.wait;接收后call.notify)来实现客户端同步调用。

3、通过输出流,发送call请求,参见

4、客户端循环调用call对象的wait直到call对象中被设置了已返回响应数据标记,参见

5、每个Connection启动一个线程,不断检测输入流中是否有响应数据返回,若有数据返回,则解析响应数据得到callID,设置call的响应数据,并notify call,并把call对象从call队列缓存中删除,客户端调用得到响应数据并继续执行,参见⑤⑥

 

 

 

服务端实现

服务端实现和普通的没有什么区别,可以用阻塞的或者非阻塞方式的NIO,当然可以基于成熟的Netty或者MINA来实现;唯一需要注意的是,在返回响应数据的时,需要把CallID也一起返回来。

 

数据传输格式

Socket流中的二进制字节数组格式需要进行约定,以便于客户端和服务端解析得到相关部分的数据。

请求数据:

整个请求byte数组包含三个数据,数据长度(call_id和请求数据byte长度之和)call_id、请求数据。其中数据长度占用4个字节,call_id(int类型)占用4个字节,请求数据排在最后。

数据长度(4字节)|call_id(4字节)|请求数据

 

返回数据:

整个返回的byte数组包含三部分,call_id(4个字节)flag标记(表示后续数据是异常信息还是响应数据,占用1个字节,0x1表示异常,0x2表示响应数据)、数据或者异常数据的长度(4个字节)、异常或者响应数据

call_id(4字节)|flag(1字节)|长度(4字节)|异常或者数据

 

 

使用说明

客户端调用的伪代码是:

byte[] ret=ConnectionManager.INSTANCE.call(param,new InetSocketAddress("10.1.1.20", 9876), 20000);

/**

 * 客户端同步调用的接口

 * @param param 输入参数,已经序列化好的byte数组,该框架暂不提供序列化机制

 * @param addr 远程服务端地址

 * @param rpcTimeout rpc的超时时间

 * @return 返回数据也是byte数组,该框架暂不提供反序列化的机制

 * @throws InterruptedException

 * @throws IOException

 */

public byte[] call(byte[] param, InetSocketAddress addr, int rpcTimeout)

throws InterruptedException, IOException

 


客户端在使用时的一些相关配置参数,根据需要进行调整:

连接池的大小设置在ConnectionManager类中,poolSize默认是10

连接的相关参数配置在Connection类中,建立连接的超时时间socketTimeout 默认是20s,连接最大空闲时间maxIdleTime(这个主要是为了防止客户端对服务端的连接,长时间空闲而消耗服务端资源)默认是10s,建立连接重试次数maxRetries默认是0

 

后续事宜

后续把序列化机制集成进来,以支持多种序列化的方式,如thriftprotolcolbufferHessian 等

mahout 贝叶斯分类后数据解析

上一篇:UVA 10755 - Garbage Heap(三维子矩阵最大和)


下一篇:paip.java 注解的详细使用代码