1.下载protobuf3
2.编写.proto文件
syntax="proto3";
package cn.edu.tju.nt;
option java_outer_classname = "SubscribeReqProto";
message SubscribeReq{
int32 subReqID = 1 ;
string userName = 2;
string productName = 3;
repeated string address = 4;
}
syntax="proto3";
package cn.edu.tju.nt;
option java_outer_classname = "SubscribeRespProto";
message SubscribeResp{
int32 subReqID = 1;
int32 respCode = 2;
string desc = 3;
}
3.进入.proto文件所在路径,执行命令
protoc SubscribeReq.proto --java_out=.
protoc SubscribeResp.proto --java_out=.
4.创建maven项目,并把步骤3中生成的文件夹拷贝到项目中
5.增加Netty和protobuf相关依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.edu.tju</groupId>
<artifactId>netty-protobuf</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.0.Final</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
</project>
6.创建服务器端通道处理器和客户端通道处理器:
package cn.edu.tju.handler;
import cn.edu.tju.nt.SubscribeReqProto;
import cn.edu.tju.nt.SubscribeRespProto;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Date;
public class ObjectServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
SubscribeReqProto.SubscribeReq req = (SubscribeReqProto.SubscribeReq) msg;
System.out.println("Server received : " + req.toString() );
ctx.writeAndFlush(resp(req.getSubReqID()));
}
private SubscribeRespProto.SubscribeResp resp(int subReqID) {
SubscribeRespProto.SubscribeResp.Builder builder = SubscribeRespProto.SubscribeResp
.newBuilder();
builder.setSubReqID(subReqID);
builder.setRespCode(0);
builder.setDesc("Java in action");
return builder.build();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
System.out.println(cause.getMessage());
ctx.close();
}
}
package cn.edu.tju.handler;
import java.util.ArrayList;
import java.util.List;
import cn.edu.tju.nt.SubscribeReqProto;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class ClientHandler extends ChannelInboundHandlerAdapter
{
public ClientHandler()
{
}
@Override
public void channelActive(ChannelHandlerContext ctx)
{
for(int i=0;i<1;i++) {
ctx.write(subReq(i));
}
ctx.flush();
}
private SubscribeReqProto.SubscribeReq subReq(int i)
{
SubscribeReqProto.SubscribeReq.Builder builder=SubscribeReqProto.SubscribeReq.newBuilder();
builder.setSubReqID(i);
builder.setUserName("amadeus");
builder.setProductName("javaStudy");
List<String> address = new ArrayList<String>();
address.add("beijing");
address.add("tianjin");
address.add("shanghai");
builder.addAllAddress(address);
return builder.build();
}
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg)
{
System.out.println("Client received: "+msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception
{
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,Throwable e)
{
System.out.println(e.getMessage());
}
}
7.创建服务器端启动类和客户端启动类:
package cn.edu.tju;
import cn.edu.tju.handler.ObjectServerHandler;
import cn.edu.tju.nt.SubscribeReqProto;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
public class NettyServer {
private static int port=9095;
public static void main(String[] args) {
EventLoopGroup bossGroup=new NioEventLoopGroup();
EventLoopGroup workerGroup=new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap=new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
channel.pipeline().addLast(new ProtobufVarint32FrameDecoder());
channel.pipeline().addLast(new ProtobufDecoder(SubscribeReqProto.SubscribeReq.getDefaultInstance()));
channel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
channel.pipeline().addLast(new ProtobufEncoder());
channel.pipeline().addLast(new ObjectServerHandler());
}
});
ChannelFuture channelFuture=serverBootstrap.bind(port).sync();
channelFuture.channel().closeFuture().sync();
}catch (Exception ex){
}
}
}
package cn.edu.tju;
import cn.edu.tju.handler.ClientHandler;
import cn.edu.tju.nt.SubscribeRespProto;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import java.net.InetAddress;
public class NettyClient {
private static int port=9095;
public static void main(String[] args) {
EventLoopGroup workerGroup=new NioEventLoopGroup();
try{
Bootstrap bootstrap=new Bootstrap();
bootstrap.group(workerGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ProtobufVarint32FrameDecoder());
socketChannel.pipeline().addLast(new ProtobufDecoder(SubscribeRespProto.SubscribeResp.getDefaultInstance()));
socketChannel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
socketChannel.pipeline().addLast(new ProtobufEncoder());
socketChannel.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost", port).sync();
channelFuture.channel().closeFuture().sync();
}catch (Exception ex){
}
finally {
workerGroup.shutdownGracefully();
}
}
}
8.启动服务器端和客户端,服务器端和客户端分别输出: