今天遇到了一个广告网络比较现实的需求,如下:
最为一个广告服务端,可以从publisher的app接收到很多的加载广告的请求。。。这个时候可以将这些请求的数据发给一些中间的机构(exchange),然后由他们返回广告的数据。。。因为请求量较大,而且要保证延迟不能太高,所以这里与这些中间机构进行通信的时候就只能采用长连接的方式了,不能每次请求都生成一次连接来进行http请求。。。
其实以前一般用netty开发都是作为服务端来的,很少用作客户端。。。在网上找了一下代码,好像找不到类似立马可以用的,没办法了,只有自己写一个了。。。还好自己对netty还算是比较熟。。。大致思路如下:
(1)创建一个eventLoopGroup,用于维护nio的io事件
(2)创建一个niosocketchanel,然后将其注册到eventLoopGroup上面去,并未channel设置初始化的handler
(3)调用channel的connect方法发起与远端的连接请求
(4)当链接建立以后,刚刚提到的初始化handler将会用于响应,为channel添加http的decode与encode的handler。。
(5)最后就可以开始进行http通信了。。
当然这里就不要主动的去断开channel了,断开还是让对方的服务器去做吧,或者超时,或者什么的。。反正客户端不会主动断开。。。
其实只要上面的步骤想出来,接下来写代码用netty来实现一个基于长连接的http客户端还算是很简单 的。。直接上代码吧:
package fjs; import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.QueryStringEncoder; import io.netty.handler.codec.http.DefaultFullHttpRequest; public class Fjs { public static AtomicInteger number = new AtomicInteger(0); public static AtomicLong time = new AtomicLong(0); public static void doIt(Channel channel) { if (number.get() < 50) { number.incrementAndGet(); time.set(System.currentTimeMillis()); QueryStringEncoder encoder = new QueryStringEncoder("http://www.baidu.com/oapi/reqAd.jsp?pub=923875870&adspace=65826983&adcount=1&response=HTML&devip=22.56.22.66&user=900&format=IMG&position=top&height=&width=&device=Mozilla%2F5.0%20%28Linux%3B%20Android%204.2.1%3B%20en-us%3B%20Nexus%204%20Build%2FJOP40D%29%20AppleWebKit%2F535.19%20%28KHTML%2C%20like%20Gecko%29%20Chrome%2F18.0.1025.166%20Mobile%20Safari%2F535.19&beacon=TRUE&phpsnip=104"); URI uriGet = null; try { uriGet = new URI(encoder.toString()); } catch (URISyntaxException e) { System.out.println("我擦,,,,"); } FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uriGet.toASCIIString()); channel.pipeline().write(request); channel.flush(); } else { System.out.println("over"); } } public static void main(String args[]) throws InterruptedException { NioEventLoopGroup group = new NioEventLoopGroup(); NioSocketChannel channel = new NioSocketChannel(); //创建一个channel,待会用它来发起链接 channel.pipeline().addFirst(new InitHandler()); //为这个channel添加一个初始化的handler,用于响应待会channel建立成功 group.register(channel); //注册这个channel channel.connect(new InetSocketAddress("www.baidu.com", 80)); //调用connect方法 Thread.currentThread().sleep(Long.MAX_VALUE); } public static class InitHandler implements ChannelInboundHandler { public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub } public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub } public void channelRegistered(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub } public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub } // 当连接建立成功之后会调用这个方法初始化channel public void channelActive(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub ctx.channel().pipeline().remove(this); //嗯,当前这个handler对这个channel就算是没有用了,可以移除了。。。 ctx.channel().pipeline().addFirst(new HttpClientCodec()); //添加一个http协议的encoder与decoder ctx.channel().pipeline().addLast(new ReponseHandler()); //添加用于处理http返回信息的handler Fjs.doIt(ctx.channel()); } public void channelInactive(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub System.out.println("disconnect " + System.currentTimeMillis() / 1000); } public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // TODO Auto-generated method stub System.out.println("read " + System.currentTimeMillis() / 1000); } public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub } public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { // TODO Auto-generated method stub } public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub } public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // TODO Auto-generated method stub System.out.println("error " + System.currentTimeMillis() / 1000); } } }
这部分的内容基本上就囊括了上面提到的所有的步骤。。。而且写了一个发起http请求的静态方法。。。到这里基本上一个基于长连接的http客户端就算差不多了。。。最后再来一个响应httpresponse的handler吧:
package fjs; import java.nio.charset.Charset; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.http.DefaultHttpResponse; import io.netty.handler.codec.http.DefaultLastHttpContent; import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.LastHttpContent; public class ReponseHandler extends ChannelInboundHandlerAdapter{ ByteBuf buf = Unpooled.buffer(); public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof HttpResponse) { DefaultHttpResponse response = (DefaultHttpResponse)msg; } if (msg instanceof HttpContent) { DefaultLastHttpContent chunk = (DefaultLastHttpContent)msg; buf.writeBytes(chunk.content()); if (chunk instanceof LastHttpContent) { long now = System.currentTimeMillis(); long before = Fjs.time.get(); System.out.println(((double) now - (double)before) / 1000); String xml = buf.toString(Charset.forName("UTF-8")); buf.clear(); Fjs.doIt(ctx.channel()); } } } }
嗯,netty4版本以后,开发变得简洁的多了。。
当然上面的代码只不过是一个很粗糙的版本,就用作参考大概用netty如何开发长连接客户端吧。。。
在实际应用中应该可以维护一个长连接池。。另外可能还有很多细节性的东西要处理吧。。例如超时什么的。。。