Spring Boot中使用WebSocket几种实现方式-详解

简介

所谓WebSocket, 类似于Socket,它的作用是可以让Web应用中的客户端和服务端建立全双工通信。在基于Spring的应用中使用WebSocket一般可以有以下三种方式:

  1. 使用Java提供的@ServerEndpoint注解实现
  2. 使用Spring提供的低层级WebSocket API实现
  3. 使用STOMP消息实现

下面,我将对这三种实现方式做一个简单介绍,此外有关WebSocket性质的更多介绍可以参考以下这篇文章:WebSocket探秘

注:本篇文章的完整源码可以参考:https://github.com/zifangsky/WebSocketDemo

使用Java提供的@ServerEndpoint注解实现

(1)使用@ServerEndpoint注解监听一个WebSocket请求路径:

这里监听了客户端的连接端口/reverse,并定义了如何处理客户端发来的消息
 

package cn.zifangsky.samplewebsocket.websocket;



import javax.websocket.OnMessage;

import javax.websocket.Session;

import javax.websocket.server.ServerEndpoint;

import java.io.IOException;



/**

* ReverseWebSocketEndpoint

*

* @author zifangsky

* @date 2018/9/30

* @since 1.0.0

*/

@ServerEndpoint("/reverse")

public class ReverseWebSocketEndpoint {



    @OnMessage

    public void handleMessage(Session session, String message) throws IOException {

        session.getBasicRemote().sendText("Reversed: " + new StringBuilder(message).reverse());

    }



}

(2)WebSocket相关配置:
 

package cn.zifangsky.samplewebsocket.config;



import cn.zifangsky.samplewebsocket.websocket.ReverseWebSocketEndpoint;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.web.socket.config.annotation.EnableWebSocket;

import org.springframework.web.socket.server.standard.ServerEndpointExporter;



/**

* WebSocket相关配置

*

* @author zifangsky

* @date 2018/9/30

* @since 1.0.0

*/

@Configuration

@EnableWebSocket

public class WebSocketConfig{



    @Bean

    public ReverseWebSocketEndpoint reverseWebSocketEndpoint() {

        return new ReverseWebSocketEndpoint();

    }



    @Bean

    public ServerEndpointExporter serverEndpointExporter() {

        return new ServerEndpointExporter();

    }



}

(3)示例页面:

使用Spring提供的低层级WebSocket API实现

Spring 4.0为WebSocket通信提供了支持,包括:

  • 发送和接收消息的低层级API;

  • 发送和接收消息的高级API;

  • 用来发送消息的模板;

  • 支持SockJS,用来解决浏览器端、服务器以及代理不支持WebSocket的问题。

使用Spring提供的低层级API实现WebSocket,主要需要以下几个步骤:

(1)添加一个WebSocketHandler:

        定义一个继承了AbstractWebSocketHandler类的消息处理类,然后自定义对”建立连接“、”接收/发送消息“、”异常情况“等情况进行处理

package cn.zifangsky.samplewebsocket.websocket;



import cn.zifangsky.samplewebsocket.service.EchoService;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.web.socket.CloseStatus;

import org.springframework.web.socket.TextMessage;

import org.springframework.web.socket.WebSocketSession;

import org.springframework.web.socket.handler.TextWebSocketHandler;



import javax.annotation.Resource;

import java.text.MessageFormat;



/**

* 通过继承 {@link org.springframework.web.socket.handler.AbstractWebSocketHandler} 的示例

*

* @author zifangsky

* @date 2018/10/9

* @since 1.0.0

*/

public class EchoWebSocketHandler extends TextWebSocketHandler{

    private final Logger logger = LoggerFactory.getLogger(getClass());



    @Resource(name = "echoServiceImpl")

    private EchoService echoService;



    @Override

    public void afterConnectionEstablished(WebSocketSession session) throws Exception {

        logger.debug("Opened new session in instance " + this);

    }



    @Override

    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {

        //组装返回的Echo信息

        String echoMessage = this.echoService.echo(message.getPayload());

        logger.debug(MessageFormat.format("Echo message \"{0}\"", echoMessage));



        session.sendMessage(new TextMessage(echoMessage));

    }



    @Override

    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {

        session.close(CloseStatus.SERVER_ERROR);

        logger.debug("Info: WebSocket connection closed.");

    }

}

(2)WebSocket相关配置:

package cn.zifangsky.samplewebsocket.config;

import cn.zifangsky.samplewebsocket.websocket.EchoWebSocketHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

/**
 * WebSocket相关配置
 *
 * @author zifangsky
 * @date 2018/9/30
 * @since 1.0.0
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer{

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(echoWebSocketHandler(), "/echoMessage");
        registry.addHandler(echoWebSocketHandler(), "/echoMessage_SockJS").withSockJS();
    }

    /**
     * 通过继承 {@link org.springframework.web.socket.handler.AbstractWebSocketHandler} 的示例
     */
    @Bean
    public WebSocketHandler echoWebSocketHandler(){
        return new EchoWebSocketHandler();
    }

}


(3)两个示例页面:略

        从上面代码可以看出,这里除了配置了基本的WebSocket(也就是/echoMessage这个连接地址),还使用SockJS配置了浏览器不支持WebSocket技术时的替代方案(也就是/echoMessage_SockJS这个连接地址)。

使用STOMP消息实现

        所谓STOMP(Simple Text Oriented Messaging Protocol),就是在WebSocket基础之上提供了一个基于帧的线路格式(frame-based wire format)层。它对发送简单文本消息定义了一套规范格式(STOMP消息基于Text,当然也支持传输二进制数据),目前很多服务端消息队列都已经支持STOMP,比如:RabbitMQ、 ActiveMQ等。

(1)WebSocket相关配置:

package cn.zifangsky.stompwebsocket.config;

import cn.zifangsky.stompwebsocket.interceptor.websocket.MyChannelInterceptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

/**
 * WebSocket相关配置
 *
 * @author zifangsky
 * @date 2018/9/30
 * @since 1.0.0
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer{
    @Autowired
    private MyChannelInterceptor myChannelInterceptor;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/stomp-websocket").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        //客户端需要把消息发送到/message/xxx地址
        registry.setApplicationDestinationPrefixes("/message");
        //服务端广播消息的路径前缀,客户端需要相应订阅/topic/yyy这个地址的消息
        registry.enableSimpleBroker("/topic");
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(myChannelInterceptor);
    }

}


从上面代码可以看出,这里设置了好几个地址,简单解释如下:

  • 首先注册了一个名为/stomp-websocket的端点,也就是STOMP客户端连接的地址。

  • 此外,定义了服务端处理WebSocket消息的前缀是/message,这个地址用于客户端向服务端发送消息(比如客户端向/message/hello这个地址发送消息,那么服务端通过@MessageMapping(“/hello”)这个注解来接收并处理消息)

  • 最后,定义了一个简单消息代理,也就是服务端广播消息的路径前缀(比如客户端监听/topic/greeting这个地址,那么服务端就可以通过@SendTo(“/topic/greeting”)这个注解向客户端发送STOMP消息)。

需要注意的是,上面代码中还添加了一个名为MyChannelInterceptor的拦截器,目的是为了在客户端断开连接后打印一下日志。相关代码如下:

package cn.zifangsky.stompwebsocket.interceptor.websocket;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.stereotype.Component;

import java.security.Principal;
import java.text.MessageFormat;

/**
 * 自定义{@link org.springframework.messaging.support.ChannelInterceptor},实现断开连接的处理
 *
 * @author zifangsky
 * @date 2018/10/10
 * @since 1.0.0
 */
@Component
public class MyChannelInterceptor implements ChannelInterceptor{
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        StompCommand command = accessor.getCommand();

        //用户已经断开连接
        if(StompCommand.DISCONNECT.equals(command)){
            String user = "";
            Principal principal = accessor.getUser();
            if(principal != null && StringUtils.isNoneBlank(principal.getName())){
                user = principal.getName();
            }else{
                user = accessor.getSessionId();
            }

            logger.debug(MessageFormat.format("用户{0}的WebSocket连接已经断开", user));
        }
    }

}


(2)使用@MessageMapping和@SendTo注解处理消息:

@MessageMapping注解用于监听指定路径的客户端消息,而@SendTo注解则用于将服务端的消息发送给监听了该路径的客户端。

package cn.zifangsky.stompwebsocket.controller;

import cn.zifangsky.stompwebsocket.model.websocket.Greeting;
import cn.zifangsky.stompwebsocket.model.websocket.HelloMessage;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Controller;

/**
 * Greeting
 * @author zifangsky
 * @date 2018/9/30
 * @since 1.0.0
 */
@Controller
public class GreetingController {

    @MessageMapping("/hello")
    @SendTo("/topic/greeting")
    public HelloMessage greeting(Greeting greeting) {
        return new HelloMessage("Hello," + greeting.getName() + "!");
    }
}


(3)示例页面:

上一篇:性能进阶:使用JMeter进行websocket测试


下一篇:Django3 channels websocket实时读取日志返回前端