Spring Boot 下 Kafka和Websocket通信


{
"Spring Boot 版本":"2.5.2",
""
}

0.(pom.xml)配置文件添加依赖项



<!-- 我是没有引用这个{javaee-api},好像spring-boot已经包含了类似这样的基础类库... -->
<dependency>
	<groupId>javax</groupId>
	<artifactId>javaee-api</artifactId>
	<version>7.0</version>
	<scope>provided</scope>
</dependency>


<!-- 我的这个案例中,这个是必选依赖项(基于其它的插件也是可以实现,不在本文讨论范围内),springboot的高级组件会自动引用基础的组件,像spring-boot-starter-websocket就引入了spring-boot-starter-web和spring-boot-starter -->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-websocket</artifactId>
	<version>1.3.5.RELEASE</version>
</dependency>


依赖项这里我搞了一上午才搞定,各种问题,各种依赖关系冲突,
比如:明明在pom.xml中添加了对应的包{
spring-boot-starter-websocket},并且刷新了Maven,
结果在程序中就是访问不到对应的类{ServerEndpointExporter}

1.注入ServerEndpointExporter到容器中,为了后续使用注解{@ServerEndpoint}做准备

本案例中首先要注入ServerEndpointExporter,这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint.
注意,如果使用独立的servlet容器,而不是直接使用springboot的内置容器,就不要注入ServerEndpointExporter,因为它将由容器自己提供和管理

//文件 - "WebSocketConfigOne.java"
@Configuration
public class WebSocketConfigOne {
    /*
     * 这个bean会自动注册使用了@ServerEndpoint注解声明的对象
     * 没有的话会报404
     * @return
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

2.WebSocket具体实现类

前端需要建立连接的webSocket实现

  • 连接.建立
  • 连接.关闭
  • 服务端向客户端.发送消息
  • 客户端向服务端.发送消息

//文件 - "MyWebSocket.java"
@Component
//此处定义接口的uri
@ServerEndpoint("/wbSocket")
public class MyWebSocket {
    private Session session;
    public static CopyOnWriteArraySet<MyWebSocket> myWebSockets = new CopyOnWriteArraySet<MyWebSocket>(); //此处定义静态变量,以在其他方法中获取到所有连接

    /**
     * 建立连接。
     * 建立连接时入参为session
     */
    @OnOpen
    public void onOpen(Session session){
        this.session = session;
        myWebSockets.add(this); //将此对象存入集合中以在之后广播用,如果要实现一对一订阅,则类型对应为Map。由于这里广播就可以了随意用Set
        System.out.println("New session insert,sessionId is "+ session.getId());
    }
    /**
     * 关闭连接
     */
    @OnClose
    public void onClose(){
        myWebSockets.remove(this);//将socket对象从集合中移除,以便广播时不发送次连接。如果不移除会报错(需要测试)
        System.out.println("A session insert,sessionId is "+ session.getId());
    }
    /**
     * 接收前端传过来的数据。
     * 虽然在实现推送逻辑中并不需要接收前端数据,但是作为一个webSocket的教程或叫备忘,还是将接收数据的逻辑加上了。
     */
    @OnMessage
    public void onMessage(String message ,Session session){
        System.out.println(message + "from " + session.getId());
    }

    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }
}

3.Kafka消费者实现类

本案例中,Kafka消费的消息会直接通过和前端WebSocket的连接显示到前端


//文件 - "ConsumerKafka.java"
/*
Kafka
 */
public class ConsumerKafka extends Thread {

    private KafkaConsumer<String,String> consumer;
    //Kafka.主题
    private String topic = "topid_kafka";

    public ConsumerKafka(){
    }

    /*
    启动
     */
    @Override
    public void run(){
        //加载kafka消费者参数
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", "groupId_kafka");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "15000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //创建消费者对象
        consumer = new KafkaConsumer<String,String>(props);
        consumer.subscribe(Arrays.asList(this.topic));
        //死循环,持续消费kafka
        while (true){
            try {
                //消费数据,并设置超时时间
                ConsumerRecords<String, String> records = consumer.poll(100);
                //Consumer message
                for (ConsumerRecord<String, String> record : records) {
                    //注意,变量{myWebSockets}存在于类文件{MyWebSocket.cs}中
                    for (MyWebSocket myWebSocket :myWebSockets){
                        myWebSocket.sendMessage(record.value());
                    }
                }
            }catch (IOException e){
                System.out.println(e.getMessage());
                continue;
            }
        }
    }

    /*
    关闭
     */
    public void close() {
        try {
            consumer.close();
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}

4.main函数修改

将Kafka消费者实例化


//文件 - "Application.java"
@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        //将Kafka消费者实例化,运行起来....
        ConsumerKafka consumerKafka = new ConsumerKafka();
        consumerKafka.start();
        SpringApplication.run(Application.class, args);
    }
}

5.前端WebSocket代码


<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>WebSocket client</title>
    <script type="text/javascript">
        var socket;
        if (typeof (WebSocket) == "undefined"){
			setMessageInnerHTML("This explorer don't support WebSocket");
        }

        function connect() {
            //Connect WebSocket server
            socket =new WebSocket("ws://127.0.0.1:8080/wbSocket");
			
            //open
            socket.onopen = function () {
                setMessageInnerHTML("open");
            }
            //Get message
            socket.onmessage = function (msg) {
				var data = JSON.parse(msg.data)
				setMessageInnerHTML("服务端消息:" + data);
            }
            //close
            socket.onclose = function () {				
				setMessageInnerHTML("连接已关闭...");
            }
            //error
            socket.onerror = function (e) {
				setMessageInnerHTML("错误:"+JSON.stringify(e));
            }
        }

        function close() {
			setMessageInnerHTML("关闭连接...");
            socket.close();
        }

        function sendMsg() {
			var clientMsg = document.getElementById('id_clientMsg').value;
            socket.send("This is a client message :" + clientMsg);
        }
		
		//将消息显示在网页上
		function setMessageInnerHTML(innerHTML){
			document.getElementById('id_msgRecord').innerHTML += innerHTML + '<br/>';
		}
    </script>
</head>
<body>
	<!-- 连接 -->
    <button onclick="connect()">connect</button>
	<!-- 关闭连接 -->
    <button onclick="close()">close</button>
	<!-- 客户端消息发送 -->
    <button onclick="sendMsg()">sendMsg</button>
	
	<!-- 客户端消息 -->
	<input id="id_clientMsg" type="text" />
	
	<!-- 消息记录 -->
	<div id="id_msgRecord">
	</div>
</body>
</html>

总结

坑集合

  • [x] 坑1 -- pom.xml中各种依赖的关系,搞得有些依赖明明已经写入了pom.xml中,但是依然使用不了 ...
  • [X] 坑2 -- 刚开始WebSocket一度连接不上,后来添加上文件{WebSocketConfigOne.java}就可以了
  • [X] 坑3 -- Kafka消费者刚开始一直忘了实例化...

其它

扩展

  • WebSocket和Socket什么关系?
  • HTTP和WebSocket什么关系?
上一篇:nginx 反向代理 ws 同时兼容 http


下一篇:rabbitmq websocket stomp 错误payload=Access refused for user 'guest'