用netty 实现IM聊天(一)

封装的消息体

package com.example.netty.im.common.message;

import lombok.Data;

/**
 * @Class MsgBody
 * @Description 消息体
 * @Author
 * @Date 2021/7/14
 **/
@Data
public class MsgBody {
    //发送人名称
    private String sendUserName;
    private String msg;
}

NettyServerHandler

package com.example.netty.im.common.handler;

import com.alibaba.fastjson.JSONObject;
import com.example.netty.im.common.message.MsgBody;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.SimpleChannelInboundHandler;

import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Map;

/**
 * @Class NettyServerHandler
 * 问题1:服务端为何没有转发到目的客户端,而是只返回给发送消息的客户端,
 * 原因是服务端回复消息时,将消息转发给了自己,没有转发给其他客户端
 * @Author
 * @Date 2021/7/14
 **/
public class NettyServerHandler extends SimpleChannelInboundHandler {

   // 保存id和容器的关系
    private static Map<String, ChannelHandlerContext> map = new HashMap<>();

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
        Channel channel = channelHandlerContext.channel();
        ChannelId channelId = channel.id();
        map.put(channelId.toString(), channelHandlerContext);

        ByteBuf byteBuf = (ByteBuf) o;
        String rev = getMessage(byteBuf);
        MsgBody msgBody = JSONObject.parseObject(rev, MsgBody.class);
        String format = String.format("服务器接收到客户端消息,发送人:%s, 发送消息:%s .", msgBody.getSendUserName(), msgBody.getMsg());
        System.out.println(format);

        map.forEach((k, v) -> {
            try {
                // 出现问题1的原因:遇到不是自身的客户端过滤掉了,但实际上返回消息是需要自身的
                if (channelId.toString().equals(k)) {
                    return;
                }

                MsgBody sendMsgBody = new MsgBody();
                sendMsgBody.setSendUserName(msgBody.getSendUserName());
                sendMsgBody.setMsg(msgBody.getMsg());
                v.writeAndFlush(getSendByteBuf(JSONObject.toJSONString(sendMsgBody)));
                System.out.println("服务端回复消息: " + JSONObject.toJSONString(sendMsgBody));
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

    /**
     * 从ByteBuf 中获取信息,使用UTF-8编码返回
     *
     * @param buf
     * @return
     */
    private String getMessage(ByteBuf buf) {
        byte[] con = new byte[buf.readableBytes()];
        buf.readBytes(con);

        try {
            return new String(con, "UTF8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            return null;
        }
    }

    private ByteBuf getSendByteBuf(String message) {
        byte[] req = message.getBytes();
        ByteBuf pingMessage = Unpooled.buffer();
        pingMessage.writeBytes(req);

        return pingMessage;
    }
}

NettyClientHandler

package com.example.netty.im.common.handler;

//import cn.hutool.json.JSONObject;

import com.alibaba.fastjson.JSONObject;
import com.example.netty.im.common.message.MsgBody;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.io.UnsupportedEncodingException;

/**
 * @Class NettyClientHandler
 * @Description TODO
 * @Author
 * @Date 2021/7/16
 **/
public class NettyClientHandler extends SimpleChannelInboundHandler {

    private ByteBuf firstMessage;
    private ChannelHandlerContext ctx;
    private String userName;

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public void sendMsg(String str) {
        byte[] data = str.getBytes();
        firstMessage = Unpooled.buffer();
        firstMessage.writeBytes(data);
        ctx.writeAndFlush(firstMessage);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.ctx = ctx;
        MsgBody msgBody = new MsgBody();
        msgBody.setSendUserName(userName);
        msgBody.setMsg("进入聊天室");
        byte[] data = JSONObject.toJSONString(msgBody).getBytes();
        firstMessage = Unpooled.buffer();
        firstMessage.writeBytes(data);
        ctx.writeAndFlush(firstMessage);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
        ByteBuf buf = (ByteBuf) o;
        String rev = getMessage(buf);
        MsgBody msgBody = JSONObject.parseObject(rev, MsgBody.class);
        String format = String.format("客户端收到服务端的消息,发送人:%s , 发送消息:%s .", msgBody.getSendUserName(), msgBody.getMsg());
        System.out.println(format);
    }

    private String getMessage(ByteBuf buf) {
        byte[] conn = new byte[buf.readableBytes()];
        buf.readBytes(conn);
        try {
            return new String(conn, "UTF8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            return null;
        }
    }
}

NettyServer

package com.example.netty.im.server;

import com.example.netty.im.common.handler.NettyServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * @Class NettyServer
 * @Description netty 的服务端
 * @Author
 * @Date 2021/7/14
 **/
public class NettyServer {
    private int port;

    public NettyServer(int port) {
        this.port = port;
        bind();
    }

    private void bind() {
        EventLoopGroup parentEventLoopGroup = new NioEventLoopGroup();
        EventLoopGroup childEventLoopGroup = new NioEventLoopGroup();

        try {
            // 服务端引导类
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(parentEventLoopGroup, childEventLoopGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)     // 连接数
                    .option(ChannelOption.TCP_NODELAY, true)    // 不延迟,消息立即发送
                    .childOption(ChannelOption.SO_KEEPALIVE, true)   // 长连接
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline channelPipeline = socketChannel.pipeline();
                            NettyServerHandler serverHandler = new NettyServerHandler();
                            // 添加NettyServerHandler,用来处理
                            channelPipeline.addLast(serverHandler);
                        }
                    });

            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();

            if (channelFuture.isSuccess()) {
                System.out.println(" 启动Netty 服务成功,端口号:" + this.port);
            }
            // 关闭连接
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            System.out.println("启动Netty服务异常,异常信息: " + e.getMessage());
            e.printStackTrace();
        } finally {
            if (parentEventLoopGroup != null) {
                parentEventLoopGroup.shutdownGracefully();
            }
            if (childEventLoopGroup != null) {
                childEventLoopGroup.shutdownGracefully();
            }
        }
    }

    public static void main(String[] args) {
        final int port = 10086;
        new NettyServer(port);
    }
}

NettyClient

package com.example.netty.im.client;

import com.alibaba.fastjson.JSONObject;
import com.example.netty.im.common.handler.NettyClientHandler;
import com.example.netty.im.common.message.MsgBody;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.util.Scanner;

/**
 * @Class NettyClient
 * @Description TODO
 * @Author
 * @Date 2021/7/16
 **/
public class NettyClient {

    private NettyClientHandler nettyClientHandler;

    private int port;
    private String host;
    private String sendUserName;

    public NettyClient(int port, String host, String sendUserName) throws InterruptedException {
        this.port = port;
        this.host = host;
        this.sendUserName = sendUserName;
        start();
    }

    private void start() throws InterruptedException {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .group(eventLoopGroup)
                    .remoteAddress(host, port)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel)
                                throws Exception {
                            nettyClientHandler = new NettyClientHandler();
                            nettyClientHandler.setUserName(sendUserName);
                            socketChannel.pipeline().addLast(nettyClientHandler);
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            if (channelFuture.isSuccess()) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        Scanner sc = new Scanner(System.in);
                        while (sc.hasNext()) {

                            MsgBody msgBody = new MsgBody();
                            msgBody.setSendUserName(sendUserName);
                            msgBody.setMsg(sc.next());
                            nettyClientHandler.sendMsg(JSONObject.toJSONString(msgBody));
                        }
                    }
                }).start();
                System.err.println(sendUserName + "连接服务器成功");
            }
            channelFuture.channel().closeFuture().sync();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

三个模拟客户端

package com.example.netty.im.client;

/**
 * @Class MockClient01
 * @Description TODO
 * @Author
 * @Date 2021/7/16
 **/
public class MockClient01 {
    public static void main(String[] args) {
        try {
            new NettyClient(10086, "localhost", "Tom");
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
}

package com.example.netty.im.client;

/**
 * @Class MockClient02
 * @Description TODO
 * @Author
 * @Date 2021/7/16
 **/
public class MockClient02 {
    public static void main(String[] args) {
        try {
            new NettyClient(10086, "localhost", "Jerry");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

package com.example.netty.im.client;

/**
 * @Class MockClient03
 * @Description TODO
 * @Author
 * @Date 2021/7/16
 **/
public class MockClient03 {
    public static void main(String[] args) {
        try {
            new NettyClient(10086, "localhost", "Jack");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

启动服务端和3个模拟客户端
用netty 实现IM聊天(一)

参考链接:https://www.cnblogs.com/yeyongjian/p/12824955.html

上一篇:阿里技术分享:闲鱼IM基于Flutter的移动端跨端改造实践


下一篇:im即时通讯哪家好一点?