Using Netty in the 安聊 Server

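About Netty

Netty is a Java framework for network programming. It is asynchronous and event-driven, ships with built-in support for many network protocols, and makes it quick to build maintainable, high-performance protocol servers and clients.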

 

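About 安聊

安聊 is an instant messaging system. The server keeps a TCP connection open to each client, accepts requests from clients over it, and can also push message notifications to clients in real time.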

 

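Why 安聊 chose Netty

The first reason is performance and stability. In our team's internal tests, a single server running on Netty could maintain 10,000 long-lived client connections, and stability was very high: one of our servers ran continuously for six months, and even then it only stopped because of a server version upgrade.

The second is that application code stays simple and maintainable. Netty hides the low-level networking details so the application can focus on business logic, and its pipeline design makes it very easy to add extra processing for data and events.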

 

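Some technical features of how 安聊 uses Netty:

1. With Spring, the port listening service is a Bean, and the Bean lifecycle hooks are used to set up and shut down the listener.

2. Each client's long-lived connection (its ChannelHandlerContext) is bound to the corresponding user ID, which makes message forwarding easy.

3. Custom encoders/decoders process the protocol packets.

4. A class extending SimpleChannelInboundHandler handles the protocol packets that clients send.

5. Handling a client packet involves database operations and disk reads and writes. Doing that directly on the network I/O thread would significantly reduce I/O throughput, so each piece of business processing is made an independent task (Task) instance and run in a thread pool. When a task finishes and needs to notify the network I/O thread, it does so with a user event.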
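The post does not show IMClientChannelManager, so the following is only a minimal sketch of the idea in point 2: a thread-safe map from (user ID, terminal) to a connection handle. The class name LoginChannelRegistry, its methods, and the "userId:terminal" key format are all hypothetical, and the type parameter C stands in for Netty's ChannelHandlerContext so the sketch compiles without Netty.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical registry mapping (userId, terminal) to a connection handle.
 * C stands in for Netty's ChannelHandlerContext (assumption for illustration).
 */
final class LoginChannelRegistry<C> {

    // Key is "userId:terminal": a user may be online from several terminal types.
    private final Map<String, C> byUser = new ConcurrentHashMap<>();

    private static String key(int userId, int terminal) {
        return userId + ":" + terminal;
    }

    /** Binds a connection after login; returns the connection it replaced, or null. */
    C bind(int userId, int terminal, C conn) {
        return byUser.put(key(userId, terminal), conn);
    }

    /** Removes the binding only if it still points at the given connection. */
    boolean unbind(int userId, int terminal, C conn) {
        return byUser.remove(key(userId, terminal), conn);
    }

    /** Returns the connection to forward a message to, or null if the user is offline. */
    C lookup(int userId, int terminal) {
        return byUser.get(key(userId, terminal));
    }
}
```

The conditional remove in unbind is a deliberate choice: if a user logs in again on a new connection before the old one's channelInactive fires, the late cleanup of the old connection will not evict the new binding.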

 

Some key code:

 

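Network service initialization

import java.util.concurrent.TimeUnit;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.EventExecutorGroup;

public class IMClientServerInitializer extends ChannelInitializer<SocketChannel> {

    private final EventExecutorGroup execGroup;
    private final int pduTimeout;

    public IMClientServerInitializer(EventExecutorGroup execGroup, int pduTimeout) {
        this.execGroup = execGroup;
        this.pduTimeout = pduTimeout;
    }

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // Fire a READER_IDLE event when nothing has been read for pduTimeout seconds
        pipeline.addLast(new IdleStateHandler(this.pduTimeout, 0, 0, TimeUnit.SECONDS));
        //pipeline.addLast(new LoggingHandler(LogLevel.DEBUG));
        // Custom codec for the protocol packets
        pipeline.addLast(new IMPacketEncoder());
        pipeline.addLast(new IMPacketDecoder());
        // Business handler; it submits its tasks to execGroup
        pipeline.addLast(new IMClientPacketHandler(execGroup));
    }
}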
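IMPacketEncoder and IMPacketDecoder are not shown in the post, so here is a rough sketch of what a custom codec for a length-prefixed protocol typically has to do. The wire format (4-byte payload length, 4-byte command ID, then the payload) and the names FrameCodec and Frame are assumptions for illustration, not the actual 安聊 protocol. The sketch uses java.nio.ByteBuffer so it runs without Netty; in Netty the same logic would usually live in subclasses of MessageToByteEncoder and ByteToMessageDecoder.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical length-prefixed framing: [int payloadLength][int commandId][payload]. */
final class FrameCodec {

    static final int HEADER_SIZE = 8;

    /** A decoded frame; a stand-in for the post's IMPacket. */
    static final class Frame {
        final int commandId;
        final byte[] payload;

        Frame(int commandId, byte[] payload) {
            this.commandId = commandId;
            this.payload = payload;
        }
    }

    /** Serializes one frame into a new byte array. */
    static byte[] encode(int commandId, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + payload.length);
        buf.putInt(payload.length);
        buf.putInt(commandId);
        buf.put(payload);
        return buf.array();
    }

    /**
     * Decodes every complete frame available in the buffer (which must be in read mode).
     * An incomplete trailing frame is left unread so the caller can retry once
     * more bytes have arrived.
     */
    static List<Frame> decode(ByteBuffer in) {
        List<Frame> frames = new ArrayList<>();
        while (in.remaining() >= HEADER_SIZE) {
            in.mark();
            int length = in.getInt();
            int commandId = in.getInt();
            if (length < 0) {
                throw new IllegalArgumentException("negative payload length: " + length);
            }
            if (in.remaining() < length) {
                in.reset(); // rewind to the start of this frame and wait for more data
                break;
            }
            byte[] payload = new byte[length];
            in.get(payload);
            frames.add(new Frame(commandId, payload));
        }
        return frames;
    }
}
```

The important property is that decode tolerates TCP's lack of message boundaries: a read may deliver half a frame or several frames at once, and both cases are handled. A production decoder would also cap the accepted payload length.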

 

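Network service class

import java.util.List;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;

public class IMClientListenService implements InitializingBean, DisposableBean {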

    private static final Logger logger = LoggerFactory.getLogger(IMClientListenService.class);

    @Value("${imcore.client.service.listen}")
    private String clientServiceListenAddress;

    @Value("${imcore.client.service.port}")
    private Integer clientServiceListenPort;

    @Value("${imcore.client.service.pdu.timeout}")
    private Integer clientServicePduTimeout;


    private EventLoopGroup bossEventLoopGroup;
    private EventLoopGroup childEventLoopGroup;
    private EventExecutorGroup eventExecutorGroup;
    private ServerBootstrap serverBootstrap;
    private Channel listenChannel;
    private final IMClientChannelManager imClientChannelManager = new IMClientChannelManager();

    public IMClientChannelManager getImClientChannelManager() {
        return imClientChannelManager;
    }


    @Override
    public void afterPropertiesSet() {
        eventExecutorGroup = new DefaultEventExecutorGroup(128);
        bossEventLoopGroup = new NioEventLoopGroup();
        childEventLoopGroup = new NioEventLoopGroup();
        serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossEventLoopGroup, childEventLoopGroup);
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.option(ChannelOption.SO_BACKLOG, 128);
        serverBootstrap.childHandler(new IMClientServerInitializer(eventExecutorGroup, clientServicePduTimeout));
    }

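    @Override
    public void destroy() {
        logger.info("IMClientListenService destroy called");
        // Release the listener, client connections and thread pools when the Bean is destroyed
        shutdown();
    }

    public void run() {
        try {
            listenChannel = serverBootstrap.bind(clientServiceListenPort).sync().channel();
            logger.info("Client Listen Service started at port: {}", clientServiceListenPort);
            // Block until the listening channel is closed
            listenChannel.closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the interruption
            Thread.currentThread().interrupt();
            logger.warn("Client Listen Service interrupted", e);
        }
    }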

    public void shutdown() {
        try {
            //
            // First, close the listening port
            //
            logger.info("close listen channel...");
            ChannelFuture closeFuture = listenChannel.close();
            closeFuture.sync();
            logger.info("close listen channel...done!");

            //
            // Then close all client connections
            //
            List<ChannelHandlerContext> channels = imClientChannelManager.getAllChannels();
            for (ChannelHandlerContext channel : channels) {
                channel.close().sync();
            }

            logger.info("Client Listen Service stopped");
        }
        catch (Exception ex) {
            logger.error("Client Listen Service close failed", ex);
        }
        finally {
            //
            // Finally, shut down all thread pools
            //
            childEventLoopGroup.shutdownGracefully();
            bossEventLoopGroup.shutdownGracefully();
            eventExecutorGroup.shutdownGracefully();
        }
    }
}

 

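Network packet Handler class

import java.net.InetSocketAddress;

import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.concurrent.EventExecutorGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IMClientPacketHandler extends SimpleChannelInboundHandler<IMPacket> {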

    private static final int NEED_LOGIN_FLAG = 1;

    private static final Logger logger = LoggerFactory.getLogger(IMClientPacketHandler.class);

    private final EventExecutorGroup eventExecutor;
    private final IMClientChannelManager imClientChannelManager;
    private final IMClientListenService imClientListenService;

    private int userId;
    private String loginName;
    ...


    public IMClientPacketHandler(EventExecutorGroup eventExecutor) {
        this.eventExecutor = eventExecutor;
        this.imClientListenService = Application.getInstance().getBean(IMClientListenService.class);
        this.imClientChannelManager = imClientListenService.getImClientChannelManager();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.error("ImPacketHandler exception caught, closing...");
        if (cause != null) {
            logger.error(cause.getMessage(), cause);
        } else {
            logger.error("exception object is null");
        }
        ctx.close();

        this.imClientChannelManager.removeChannel(ctx);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        InetSocketAddress socketAddress = (InetSocketAddress)(ctx.channel().remoteAddress());
        this.clientIP = socketAddress.getAddress().getHostAddress();

        this.imClientChannelManager.addChannel(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        this.imClientChannelManager.removeChannel(ctx);

        if (this.userId != 0 && this.terminal != 0) {
            this.imClientChannelManager.removeLoginChannel(ctx, this.userId, this.terminal);
        }

        super.channelInactive(ctx);
        this.userId = 0;
        this.loginName = null;
        ...
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (logger.isDebugEnabled()) {
            logger.debug("userEventTriggered:" + evt);
        }
        if (evt instanceof IdleStateEvent) {
            // Nothing has been read from the client within the timeout: treat the connection as dead
            IdleStateEvent e = (IdleStateEvent) evt;
            if (e.state() == IdleState.READER_IDLE) {
                // close the channel
                ctx.close();
                return;
            }
        } else if (evt instanceof IMUserLoginEvent) {
            logger.info("user login event thread id " + Thread.currentThread().getId());
            IMUserLoginEvent loginEvent = (IMUserLoginEvent)evt;
            if (loginEvent.isSucceed()) {
                this.userId = loginEvent.getUserInfo().getUserId();
                this.loginName = loginEvent.getUserInfo().getLoginName();
                ...
                if (logger.isDebugEnabled()) {
                    logger.debug("set login succeed in channel handler for user '" + this.loginName + "' with session '" + this.sessionKey + "'");
                }

                this.imClientChannelManager.addLoginChannel(ctx, this.userId, this.terminal);
            }
        }

        super.userEventTriggered(ctx, evt);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, IMPacket msg) throws Exception {
        if (this.loginName == null && this.userId == 0) {
            if (msg.getCommandId() == IMBaseDefine.LoginCmdID.CID_USER_LOGIN_REQ_VALUE) {
                handleUserLoginRequest(ctx, msg);
            }
            else {
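                // Any non-login command sent before login gets a response header with flag 1
                IMPacket packet = new IMPacket(NEED_LOGIN_FLAG, null);
                // Use writeAndFlush: without a flush the packet is never sent and the CLOSE listener never fires
                ctx.channel().writeAndFlush(packet).addListener(ChannelFutureListener.CLOSE);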
            }
        }
        else {
            if (msg.getCommandId() == IMBaseDefine.LoginCmdID.CID_USER_LOGIN_REQ_VALUE) {
                // A login command sent while already logged in gets an ERR_DUPLICATE_LOGIN_VALUE login error
                IMLogin.IMLoginRsp.Builder loginRespBuilder = IMLogin.IMLoginRsp.newBuilder()
                        .setErrorCode(IMBaseDefine.CommonErrors.ERR_DUPLICATE_LOGIN_VALUE)
                        .setErrorMsg("duplicate login");

                IMPacket packet = new IMPacket(loginRespBuilder.build().toByteArray());
                // Use writeAndFlush: without a flush the packet is never sent and the CLOSE listener never fires
                ctx.channel().writeAndFlush(packet).addListener(ChannelFutureListener.CLOSE);
            }
            else {
                switch (msg.getCommandId()) {
                    case IMBaseDefine.OtherCmdID.CID_OTHER_HEARTBEAT_VALUE:
                        handleHeartbeatRequest(ctx, msg);
                        break;
                    case IMBaseDefine.LoginCmdID.CID_USER_LOGOUT_REQ_VALUE:
                        handleUserLogoutRequest(ctx, msg);
                        break;
                    // Create a group chat
                    case IMBaseDefine.GroupCmdID.CID_GROUP_CREATE_REQ_VALUE:
                        handleGroupCreateRequest(ctx, msg);
                        break;
                    ...
                    default:
                        logger.warn("unsupported command: " + msg.getCommandId());
                        break;
                }
            }
        }
    }


    private void handleUserLoginRequest(ChannelHandlerContext ctx, IMPacket msg) {
        if (logger.isDebugEnabled()) {
            logger.debug("user login received thread id " + Thread.currentThread().getId());
        }

        TaskContext taskContext = new TaskContext(ctx, clientIP, userId, loginName, clientType, sessionKey, pushToken);
        try {
            UserLoginTask userLoginTask = new UserLoginTask(taskContext, msg);
            eventExecutor.submit(userLoginTask);
        } catch (CreateTaskException ex) {
            logger.error("create user login task failed", ex);
        }
    }

    private void handleHeartbeatRequest(ChannelHandlerContext ctx, IMPacket msg) {
        if (logger.isDebugEnabled()) {
            logger.debug("heartbeat received");
        }

        TaskContext taskContext = new TaskContext(ctx, clientIP, userId, loginName, clientType, sessionKey, pushToken);
        try {
            HeartbeatTask heartbeatTask = new HeartbeatTask(taskContext, msg);
            eventExecutor.submit(heartbeatTask);
        } catch (CreateTaskException ex) {
            logger.error("create heartbeat task failed", ex);
        }
    }


    private void handleUserLogoutRequest(ChannelHandlerContext ctx, IMPacket msg) {
        if (logger.isDebugEnabled()) {
            logger.debug("session logout received");
        }

        TaskContext taskContext = new TaskContext(ctx, clientIP, userId, loginName, clientType, sessionKey, pushToken);
        try {
            UserLogoutTask userLogoutTask = new UserLogoutTask(taskContext, msg);
            eventExecutor.submit(userLogoutTask);
        } catch (CreateTaskException ex) {
            logger.error("create logout task failed", ex);
        }
    }

    private void handleGroupCreateRequest(ChannelHandlerContext ctx, IMPacket msg) {
        if (logger.isDebugEnabled()) {
            logger.debug("group creation received");
        }

        TaskContext taskContext = new TaskContext(ctx, clientIP, userId, loginName, clientType, sessionKey, pushToken);
        try {
            GroupCreationTask groupCreationTask = new GroupCreationTask(taskContext, msg);
            eventExecutor.submit(groupCreationTask);
        } catch (CreateTaskException ex) {
            logger.error("create group creation task failed", ex);
        }
    }

}
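The four handleXxxRequest methods above differ only in which Task class they construct. One possible way to cut the repetition, sketched here as an alternative rather than as how 安聊 does it, is a table from command ID to a task factory. CommandDispatcher and its methods are hypothetical names; C and M stand in for ChannelHandlerContext and IMPacket, and java.util.concurrent.Executor stands in for the EventExecutorGroup.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;

/** Hypothetical dispatcher: looks up a task factory by command ID and runs the task on an executor. */
final class CommandDispatcher<C, M> {

    private final Map<Integer, BiFunction<C, M, Runnable>> factories = new ConcurrentHashMap<>();
    private final Executor executor;

    CommandDispatcher(Executor executor) {
        this.executor = executor;
    }

    /** Registers the factory that builds the task for one command ID. */
    void register(int commandId, BiFunction<C, M, Runnable> taskFactory) {
        factories.put(commandId, taskFactory);
    }

    /**
     * Builds the task for the command and hands it to the executor.
     * Returns false when no factory is registered, so the caller can log "unsupported command".
     */
    boolean dispatch(int commandId, C ctx, M msg) {
        BiFunction<C, M, Runnable> factory = factories.get(commandId);
        if (factory == null) {
            return false;
        }
        executor.execute(factory.apply(ctx, msg));
        return true;
    }
}
```

One caveat: the Task constructors in this post throw the checked CreateTaskException, which a BiFunction cannot propagate, so a real factory would have to catch it or wrap it in an unchecked exception.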

 

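User login Task class

import com.google.protobuf.InvalidProtocolBufferException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;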

/**
 * Task executed in thread pool for user login
 */
public class UserLoginTask extends TaskBase {

    private static final Logger logger = LoggerFactory.getLogger(UserLoginTask.class);

    private final IMPacket request;
    private final IMLogin.IMLoginReq reqBody;
    private final IMUserService userService;

    private int errorCode;
    private String errorMessage;

    public UserLoginTask(TaskContext taskContext, IMPacket request) throws CreateTaskException {
        super(taskContext);

        this.request = request;
        try {
            this.reqBody = IMLogin.IMLoginReq.parseFrom(request.getPayload());
        } catch (InvalidProtocolBufferException e) {
            throw new CreateTaskException("parse pb failed", e);
        }
        this.userService = Application.getInstance().getBean(IMUserService.class);
    }


    @Override
    protected void taskRun() {
        this.errorCode = 0;
        this.errorMessage = "ok";


        IMUserRecord user = userService.findUserByLoginName(reqBody.getUserName());
        if (user == null) {
            this.errorCode = IMBaseDefine.CommonErrors.ERR_USERNAME_OR_PASSWD_INVALID_VALUE;
            this.errorMessage = "bad username or password";
            handleErrorResponse();
            return;
        }


        if (!user.getPassword().equals(reqBody.getPassword())) {
            this.errorCode = IMBaseDefine.CommonErrors.ERR_USERNAME_OR_PASSWD_INVALID_VALUE;
            this.errorMessage = "bad username or password";

            handleErrorResponse();
            return;
        }

            
        handleSucceedResponse(user);
        return;
    }


    private void handleErrorResponse() {
        logger.error("user '" + reqBody.getUserName() + "' login failed with code " + this.errorCode + " '" + this.errorMessage + "'");

        IMLogin.IMLoginRsp.Builder loginResp = IMLogin.IMLoginRsp.newBuilder()
                .setErrorCode(this.errorCode)
                .setErrorMsg(this.errorMessage);
        IMPacket packetResp = new IMPacket(loginResp.build().toByteArray());

        getContext().getChannelContext().writeAndFlush(packetResp);
    }


    private void handleSucceedResponse(IMUserRecord user) {
        IMLogin.IMLoginRsp.Builder loginResp = IMLogin.IMLoginRsp.newBuilder()
                .setErrorCode(0)
                .setErrorMsg("succeed")
                .setUserInfo(IMUserProtobufUtils.toProtobuf(user));


        IMPacket packetResp = new IMPacket(loginResp.build().toByteArray());

        getContext().getChannelContext().writeAndFlush(packetResp);

        // Fire a user event to notify the I/O thread, so the pipeline's state can be changed asynchronously
        IMUserLoginEvent event = new IMUserLoginEvent();
        event.setSucceed(true);
        event.setUserInfo(user);
        getContext().getChannelContext().pipeline().fireUserEventTriggered(event);
    }
}
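The overall pattern in the handler and the task is: blocking work runs on a worker pool, and the result is handed back to the connection's single I/O thread, which is the only thread that mutates per-connection state such as userId. Here is a Netty-free sketch of that hand-off, with a single-threaded executor standing in for the channel's event loop and CompletableFuture standing in for the user event. OffloadSketch, ConnectionState, and the userIdFromDb parameter are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

/** Hypothetical sketch: do blocking work on a pool, apply the result on the I/O thread. */
final class OffloadSketch {

    /** Per-connection state that only the I/O thread is allowed to modify. */
    static final class ConnectionState {
        int userId;          // 0 means "not logged in"
        String appliedOn;    // name of the thread that applied the login result
    }

    /**
     * Runs the (simulated) blocking login lookup on the worker pool, then applies
     * the result to the connection state on the I/O executor. The returned future
     * completes once the state has been updated.
     */
    static CompletableFuture<Void> login(ExecutorService ioLoop,
                                         ExecutorService workers,
                                         ConnectionState state,
                                         int userIdFromDb) {
        return CompletableFuture
                // Stands in for the database lookup done by the login task
                .supplyAsync(() -> userIdFromDb, workers)
                // Stands in for firing the login user event back onto the I/O thread
                .thenAcceptAsync(userId -> {
                    state.userId = userId;
                    state.appliedOn = Thread.currentThread().getName();
                }, ioLoop);
    }
}
```

Because every state change is funneled through one thread, the handler's fields need no locks. As far as I know, Netty 4 gives the same guarantee for fireUserEventTriggered: when it is called from outside the event loop, the event is scheduled onto the handler's executor instead of running on the calling thread.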

 

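A pitfall we hit, noted here for reference:

Previously, when writing data to the client through the ChannelHandlerContext, the code looked like this:

getContext().getChannelContext().write(packetResp);

With this, the client intermittently failed to receive the response packet. It turns out that after writing the data, you also need to flush:

getContext().getChannelContext().writeAndFlush(packetResp);

After that change the problem went away.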
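The reason is that Netty's write() only queues the message in the channel's outbound buffer; nothing reaches the socket until flush() is called. The same behavior can be seen with plain java.io, shown here purely as an analogy (this is not Netty code, and the name FlushAnalogy is hypothetical): a BufferedOutputStream holds small writes in memory until it is flushed.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Analogy for write vs. writeAndFlush using a buffered stream. */
final class FlushAnalogy {

    /**
     * Writes the packet through a buffered stream and returns how many bytes
     * the underlying "socket" has received before and after the flush.
     */
    static int[] bytesVisibleBeforeAndAfterFlush(byte[] packet) {
        ByteArrayOutputStream socket = new ByteArrayOutputStream(); // stands in for the socket
        BufferedOutputStream out = new BufferedOutputStream(socket, 8192);
        try {
            out.write(packet);          // buffered only, like write()
            int before = socket.size();
            out.flush();                // pushed to the "socket", like flush()
            int after = socket.size();
            return new int[] { before, after };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For a packet smaller than the 8192-byte buffer, the first count is 0 and the second is the packet length.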

 

-------------------------------------------------

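I spent five years developing instant messaging systems in industry. If you are interested in this area, feel free to get in touch and discuss.

I also built the 安聊 system on my own. If you are interested, you can download the demo and try it: 安聊系统1.0发布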
