springboot 集成websocket 实现集群消息推送

1.简介

由于遇到异步的接口调用,异步任务处理结果会写在rabbitmq中,部署方式为了实现高可用会使用开启多个微服务实例。无论哪个微服务消费了mq,都能把消息推送到所有的微服务的前端。

2.配置

2.1pom.xml

<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>com.github.pagehelper</groupId>
			<artifactId>pagehelper-spring-boot-starter</artifactId>
			<version>1.2.9</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-websocket</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-thymeleaf</artifactId>
		</dependency>

2.2application.yml

spring:
  rabbitmq:
    host: 10.1.1.6
    port: 5678
    username: lys
    password: lys
#    publisher-confirms: true
    publisher-returns: true
    #以下配置消费者方需要
    listener:
      simple:
        acknowledge-mode: manual
    template:
      mandatory: true
  redis:
    host: 10.6.6.6
    port: 32252
    timeout: 5001
    password: lys
    

3.mq消息

3.1TopicRabbitConfig

package com.lys.config.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Auther: liuysh
 * @Date: 2021/5/24 15:51
 * @Description:
 */
@Configuration
public class TopicRabbitConfig {
    /**
     * 给topic队列起名
     */
    public static final  String TOPIC_QUEUE = "myTopic";

    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanout_exchange",true,false);
    }

    @Bean
    public Queue myQueue(){
        return  new Queue(TopicRabbitConfig.TOPIC_QUEUE ,true,false,false);
    }

    @Bean
    public Binding bind(){
        return BindingBuilder.bind(myQueue()).to(fanoutExchange());
    }
}

3.2 MessageListenerConfig

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @Auther: liuysh
 * @Date: 2021/5/24 16:39
 * @Description:
 */
@Configuration

public class MessageListenerConfig {
    private final CachingConnectionFactory connectionFactory;

    @Autowired
    public MessageListenerConfig(
            CachingConnectionFactory connectionFactory
    ) {
        this.connectionFactory = connectionFactory;
    }

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setConcurrentConsumers(1);
        container.setMaxConcurrentConsumers(1);
        // RabbitMQ默认是自动确认,这里改为手动确认消息
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);

        return container;
    }
}

3.3 FanoutListenerFirst


import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * @Auther: liuysh
 * @Date: 2021/5/24 16:03
 * @Description:
 */
@Slf4j
@Component
public class FanoutListenerFirst implements ChannelAwareMessageListener {



    @Autowired
    StringRedisTemplate stringRedisTemplate;

    @Override
    @RabbitListener(queues = TopicRabbitConfig.TOPIC_CDB_RDS_CONSOLE_QUEUE)
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {

            String json=new String(message.getBody());


           stringRedisTemplate.convertAndSend("myRedisTopic",json);
            
           //为true表示确认之前的所有消息  false表示只来处理着当前的消息
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            log.error("topic模式 监听者 one  处理消息时显示异常,异常是:{},现拒绝消费当前消息且不再放回队列",e);
            //为true会重新放回队列
             channel.basicReject(deliveryTag, false);
            // 为了防止存在有异常的message堆积,故异常的也进行消费
            // channel.basicAck(deliveryTag, false);


        }
    }

    


}

4.redis 订阅消费通信

4.1RedisCacheConfig

import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

/**
 * @Auther: liuysh
 * @Date: 2021/8/2 15:29
 * @Description:
 */
@Configuration
@EnableCaching
public class RedisCacheConfig {
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter listenerAdapter) {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // 可以添加多个 messageListener,配置不同的交换机
        container.addMessageListener(listenerAdapter, new PatternTopic("myRedisTopic"));

        return container;
    }

    /**
     * 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法
     * @param receiver
     * @return
     */
    @Bean
    MessageListenerAdapter listenerAdapter(RedisReceiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }

    @Bean
    StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
        return new StringRedisTemplate(connectionFactory);
    }
}

4.2 RedisReceiver

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;

/**
 * @Auther: liuysh
 * @Date: 2021/8/2 15:29
 * @Description:
 */
@Component
@Slf4j
public class RedisReceiver {
    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;
    public void receiveMessage(String message) {
        log.info("redis 订阅者收到的消息:"+message);
        JSONObject jSONObject=JSON.parseObject(message);
        simpMessagingTemplate.convertAndSend(WsConstant.WS_TOPIC, message);
        
    }
}

5.WebSocketConfig

package com.sugon.cloud.config;

import com.sugon.cloud.common.bean.RdsWsConstant;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

/**
 * @Auther: liuysh
 * @Date: 2021/7/10 13:37
 * @Description:
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {


    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        //注册一个Stomp 协议的endpoint,并指定 SockJS协议
        registry.addEndpoint("/ws_endpoint")
                .setAllowedOrigins("*")
                .withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        //广播式应配置一个/topic 消息代理
        registry.enableSimpleBroker("/topic/instanceStatus");
    }
}

6.前端代码

<!DOCTYPE html>
<html lang="zh-CN" xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="UTF-8" />
    <title>Spring Boot+WebSocket+广播式</title>

</head>
<body onl oad="disconnect()">
<noscript><h2 style="color: #ff0000">貌似你的浏览器不支持websocket</h2></noscript>
<div>
    <div>
        <label>输入实例instanceId</label><input type="text" id="name" value="1" />
        <button id="connect" onclick="connect();">连接</button>
        <button id="disconnect" disabled="disabled" onclick="disconnect();">断开连接</button>
    </div>
    接收的数据:
    <div id="conversationDiv">

        <p id="response"></p>
    </div>
</div>
<script th:src="@{sockjs.min.js}"></script>
<script th:src="@{stomp.min.js}"></script>
<script th:src="@{jquery.js}"></script>
<script type="text/javascript">
    var stompClient = null;
    var instanceId=null;



    function setConnected(connected) {
        document.getElementById('connect').disabled = connected;
        document.getElementById('disconnect').disabled = !connected;
        document.getElementById('conversationDiv').style.visibility = connected ? 'visible' : 'hidden';
        $('#response').html();
    }
	
    function connect() {
        var instanceId= $('#name').val();
        var socket = new SockJS('/ws_endpoint'); //链接SockJS 的endpoint 名称为"/ws_endpoint"
        stompClient = Stomp.over(socket);//使用stomp子协议的WebSocket 客户端
        stompClient.connect({}, function(frame) {//链接Web Socket的服务端。
            setConnected(true);
            console.log('Connected: ' + frame);
            var topic=instanceId.length>0?'/topic/instanceStatus/'+instanceId:'/topic/instanceStatus';
            stompClient.subscribe(topic, function(respnose){
                //订阅/topic/getResponse 目标发送的消息。这个是在控制器的@SendTo中定义的。
                showResponse(Date()+"::"+instanceId+":::"+respnose.body);
            });
        });



    }


    function disconnect() {
        if (stompClient != null) {
            stompClient.disconnect();
        }
        setConnected(false);
        console.log("Disconnected");
    }



    function showResponse(message) {
        $('#response').append("<b>Received: " + message + "</b><br/>")
    }
</script>
</body>
</html>
上一篇:基于Nacos的服务消费者


下一篇:SpringSecurity的demo笔记