netty 基于 http1.0 开发

 

 

package com.netty.core;

import java.security.cert.CertificateException;

import javax.net.ssl.SSLException;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.util.SelfSignedCertificate;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.InitializingBean;

import com.netty.util.NettyPropertiesUtil;

public class NettyServer implements InitializingBean {

	Logger logger = Logger.getLogger(NettyServer.class);

	public boolean isSSL = true;
	public int port = 8443;
	public int threadSize = 1;
	public int connnecttimeout = 10000;
	public int sotimeout = 10000;

	public void init() {
		isSSL = Integer
				.parseInt(NettyPropertiesUtil.getProperty("netty.isSSL")) == 1 ? true
				: false;
		port = Integer.parseInt(NettyPropertiesUtil.getProperty("netty.port"));
		threadSize = Integer.parseInt(NettyPropertiesUtil
				.getProperty("netty.nioeventloopgroup.size"));
		connnecttimeout = Integer.parseInt(NettyPropertiesUtil
				.getProperty("netty.serverbootstrap.connnecttimeout"));
		sotimeout = Integer.parseInt(NettyPropertiesUtil
				.getProperty("netty.serverbootstrap.sotimeout"));
	}

	@SuppressWarnings({ "deprecation" })
	public void start() throws CertificateException, SSLException,
			InterruptedException {
		logger.info("netty server  start");
		final SslContext sslCtx;
		if (isSSL) {
			SelfSignedCertificate ssc = new SelfSignedCertificate();
			sslCtx = SslContext.newServerContext(ssc.certificate(),
					ssc.privateKey());
		} else {
			sslCtx = null;
		}

		EventLoopGroup bossGroup = new NioEventLoopGroup(threadSize);
		EventLoopGroup workerGroup = new NioEventLoopGroup();

		try {
			ServerBootstrap b = new ServerBootstrap();
			b.option(ChannelOption.SO_BACKLOG, 1024);
			b.group(bossGroup, workerGroup)
					.channel(NioServerSocketChannel.class)
					.handler(new LoggingHandler(LogLevel.INFO))
					.childHandler(new NettyServerInitializer(sslCtx));
			b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connnecttimeout);
			b.option(ChannelOption.SO_TIMEOUT, sotimeout);

			Channel ch = b.bind(port).sync().channel();
			logger.info("netty server end");
			ch.closeFuture().sync();
			logger.info("netty end ");
		} finally {
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	}

	@Override
	public void afterPropertiesSet() throws Exception {
		try {
			logger.info(" netty conf init start");
			init();
			logger.info(" netty conf init end");
		} catch (Exception e) {
			logger.info(" netty conf init fail");
			e.printStackTrace();
		}
	}

}

 

package com.netty.core;
 
import org.apache.log4j.Logger;
 

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.HttpHeaders.Names; 

public class NettyServerHandler extends ChannelInboundHandlerAdapter {
	
	Logger logger = Logger.getLogger(NettyServerHandler.class);
	 
	
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		if (msg instanceof HttpRequest){
			HttpRequest req = (HttpRequest) msg;
			logger.info("RECV MSG: "+req.getUri());
			if(HttpHeaders.is100ContinueExpected(req)){
				ctx.write(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE));
			}
			String resp="ok";
			FullHttpResponse  response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(resp.getBytes()));
			response.headers().set(Names.CONTENT_TYPE, "text/html; charset=utf-8");
			response.headers().set(Names.CONTENT_LENGTH, response.content().readableBytes());
			ctx.write(response).addListener(ChannelFutureListener.CLOSE);
		}
	}
	
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		ctx.close();
	}
	

	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) {
		ctx.flush();
	}
	
}

 

package com.netty.core;


import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.ssl.SslContext;

public class NettyServerInitializer  extends ChannelInitializer<SocketChannel> {

private final SslContext sslCtx;
    
    public NettyServerInitializer() {
    	sslCtx=null;
    }
    
    public NettyServerInitializer(SslContext sslCtx) {
        this.sslCtx = sslCtx;
    }

    @Override
    public void initChannel(SocketChannel ch) {
        ChannelPipeline p = ch.pipeline();
        if (null!=sslCtx) {
            p.addLast(sslCtx.newHandler(ch.alloc()));
        }
        p.addLast(new HttpServerCodec());
        //完整参数封装
        p.addLast("aggregator", new HttpObjectAggregator(1048576));  
        p.addLast(new NettyServerHandler());
    }


	
}

 

package test.netty;

import java.net.URI;

import org.junit.Test; 

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpContent; 
import io.netty.handler.codec.http.HttpResponse;
 
public class NettyClientTest {

	public void connect(String host, int port) throws Exception {
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    // 客户端接收到的是httpResponse响应,所以要使用HttpResponseDecoder进行解码
                    ch.pipeline().addLast(new HttpResponseDecoder());
                    // 客户端发送的是httprequest,所以要使用HttpRequestEncoder进行编码
                    ch.pipeline().addLast(new HttpRequestEncoder());
                    ch.pipeline().addLast(new HttpClientInboundHandler());
                }
            });

            // Start the client.
            ChannelFuture f = b.connect(host, port).sync();

            URI uri = new URI("http://127.0.0.1:8443");
            String msg = "Are you ok?";
            DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
                    uri.toASCIIString(), Unpooled.wrappedBuffer(msg.getBytes("UTF-8")));

            // 构建http请求
            request.headers().set(HttpHeaders.Names.HOST, host);
            request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
            request.headers().set(HttpHeaders.Names.CONTENT_LENGTH, request.content().readableBytes());
            // 发送http请求
            f.channel().write(request);
            f.channel().flush();
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }

    }
 
	@Test
	public void Test(){
		try{ 
			NettyClientTest client = new NettyClientTest();
	        client.connect("127.0.0.1", 8443);
		}catch(Exception e){
			e.printStackTrace();
		}
	}
}

class HttpClientInboundHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof HttpResponse) 
        {
            HttpResponse response = (HttpResponse) msg;
            System.out.println("CONTENT_TYPE:" + response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
        }
        if(msg instanceof HttpContent)
        {
            HttpContent content = (HttpContent)msg;
            ByteBuf buf = content.content();
            System.out.println(buf.toString(io.netty.util.CharsetUtil.UTF_8));
            buf.release();
        }
    }
}

 

 

 

 

 

 

 

 

 

 

 

 

 

上一篇:fire库的使用


下一篇:英特尔当心 谷歌服务器芯片订单要被高通抢了