客户端发送消息并同步获取结果,其实是违背Netty的设计原则的,但是有时候不得不这么做的话,那么建议进行如下的设计:
比如我们的具体用法如下:
NettyRequest request = new NettyRequest();
request.setRequestId(UUID.randomUUID().toString());
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameterValues(args);
NettyMessage nettyMessage = new NettyMessage();
nettyMessage.setType(MessageType.SERVICE_REQ.value());
nettyMessage.setBody(request);
if (serviceDiscovery != null) {
serverAddress = serviceDiscovery.discover();
}
String[] array = serverAddress.split(":");
String host = array[0];
int port = Integer.parseInt(array[1]);
NettyClient client = new NettyClient(host, port);
NettyMessage nettyResponse = client.send(nettyMessage);
if (nettyResponse != null) {
return JSON.toJSONString(nettyResponse.getBody());
} else {
return null;
}
先来看看NettyClient的写法 和 send方法的写法:
public class NettyClient {
/**
* 日志记录
*/
private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
/**
* 客户端业务处理handler
*/
private ClientHandler clientHandler = new ClientHandler();
/**
* 事件池
*/
private EventLoopGroup group = new NioEventLoopGroup();
/**
* 启动器
*/
private Bootstrap bootstrap = new Bootstrap();
/**
* 客户端通道
*/
private Channel clientChannel;
/**
* 客户端连接
* @param host
* @param port
* @throws InterruptedException
*/
public NettyClient(String host, int port) throws InterruptedException {
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast("idleStateHandler", new IdleStateHandler(5, 5, 12));
channel.pipeline().addLast("nettyMessageDecoder", new NettyMessageDecoder(1024 * 1024, 4, 4));
channel.pipeline().addLast("nettyMessageEncoder", new NettyMessageEncoder());
channel.pipeline().addLast("heartBeatHandler", new HeartBeatRequestHandler());
channel.pipeline().addLast("clientHandler", clientHandler);
channel.pipeline().addLast("loginAuthHandler", new LoginAuthRequestHandler());
}
});
//发起同步连接操作
ChannelFuture channelFuture = bootstrap.connect(host, port);
//注册连接事件
channelFuture.addListener((ChannelFutureListener)future -> {
//如果连接成功
if (future.isSuccess()) {
logger.info("客户端[" + channelFuture.channel().localAddress().toString() + "]已连接...");
clientChannel = channelFuture.channel();
}
//如果连接失败,尝试重新连接
else{
logger.info("客户端[" + channelFuture.channel().localAddress().toString() + "]连接失败,重新连接中...");
future.channel().close();
bootstrap.connect(host, port);
}
});
//注册关闭事件
channelFuture.channel().closeFuture().addListener(cfl -> {
close();
logger.info("客户端[" + channelFuture.channel().localAddress().toString() + "]已断开...");
});
}
/**
* 客户端关闭
*/
private void close() {
//关闭客户端套接字
if(clientChannel!=null){
clientChannel.close();
}
//关闭客户端线程组
if (group != null) {
group.shutdownGracefully();
}
}
/**
* 客户端发送消息
* @param message
* @return
* @throws InterruptedException
* @throws ExecutionException
*/
public NettyMessage send(NettyMessage message) throws InterruptedException, ExecutionException {
ChannelPromise promise = clientHandler.sendMessage(message);
promise.await(3, TimeUnit.SECONDS);
return clientHandler.getResponse();
}
}
可以看出,我们使用了clientHandler来进行消息发送行为,通过promise阻塞来同步获取返回结果,接下来看看sendMessage的写法:
public class ClientHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(ClientHandler.class);
private ChannelHandlerContext ctx;
private ChannelPromise promise;
private NettyMessage response;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
this.ctx = ctx;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyMessage message = (NettyMessage) msg;
if (message != null && message.getType() == MessageType.SERVICE_RESP.value()) {
response = message;
promise.setSuccess();
} else {
ctx.fireChannelRead(msg);
}
}
public synchronized ChannelPromise sendMessage(Object message) {
while (ctx == null) {
try {
TimeUnit.MILLISECONDS.sleep(1);
//logger.error("等待ChannelHandlerContext实例化");
} catch (InterruptedException e) {
logger.error("等待ChannelHandlerContext实例化过程中出错",e);
}
}
promise = ctx.newPromise();
ctx.writeAndFlush(message);
return promise;
}
public NettyMessage getResponse(){
return response;
}
}
可以看到,在利用ChannelHanderContext进行发送消息前,我们先创建了一个promise并返回给send方法,那么send方法此时就会阻塞等待;当我们收到服务端消息后,promise.setSuccess就会解除send方法的等待行为,这样我们就能获取结果了。
此法针对真正需要同步等待获取结果的场景,如非必要,还是建议利用future来改造。
benchmark测试表明,此种同步获取结果的行为,表现挺稳定的,但是ops 在 150 左右, 真是性能太差了。高性能场合禁用此法。