示例代码
1、添加pom.xml依赖
<!-- Spring WebSocket support. No <version> is declared here; presumably it is managed by a parent POM / BOM (TODO confirm). -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-websocket</artifactId>
</dependency>
2、创建websocket配置类
package com.success.socket;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
 * WebSocket configuration.
 *
 * <p>Exposes a {@link ServerEndpointExporter} bean so that classes annotated with
 * {@code @ServerEndpoint} are registered with the WebSocket runtime.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Creates the exporter that registers {@code @ServerEndpoint} beans.
     *
     * @return a new {@link ServerEndpointExporter}
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        ServerEndpointExporter endpointExporter = new ServerEndpointExporter();
        return endpointExporter;
    }
}
3、创建WebSocket工具类
package com.success.socket;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.example.dataservice.models.common.ResultPO;
import com.success.service.HomePageService;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springblade.util.SpringUtils;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * WebSocket endpoint mounted at {@code /ws/{userId}}.
 *
 * <p>All connection state is kept in static fields, so every instance of this class
 * (including one created with {@code new SocketServer()}) sees the same set of clients.
 * Clients are keyed by WebSocket session id; the {@code userId} path parameter is
 * accepted but not used for bookkeeping.
 */
@Component
@ServerEndpoint(value = "/ws/{userId}")
public class SocketServer {
    private static final Logger log = LoggerFactory.getLogger(SocketServer.class);

    // Number of currently open connections.
    private static final AtomicInteger onlineCount = new AtomicInteger(0);

    // Every connected client, keyed by session id.
    private static final ConcurrentHashMap<String, Session> clients = new ConcurrentHashMap<>();

    // Business service that produces the pushed data. Resolved lazily on first use
    // instead of in a static initializer: a lookup at class-load time fails the whole
    // class if the application context has not been published to SpringUtils yet.
    private static volatile HomePageService homePageService;

    /**
     * Called when a connection has been established.
     * Replies to the new client with {@code {"msg":"ok","event":"open"}}.
     *
     * @param session the newly opened session
     * @param userId  value of the {@code userId} path segment (currently unused)
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        onlineCount.incrementAndGet();
        clients.put(session.getId(), session);
        JSONObject json = new JSONObject();
        json.put("event", "open");
        json.put("msg", "ok");
        // Acknowledge only the connecting client; other clients have no use for this ack.
        this.sendTo(session, json.toJSONString());
        log.info("有新连接加入:{},当前在线人数为:{}", session.getId(), onlineCount.get());
    }

    /**
     * Called when a connection is closed; removes the session from the client registry.
     *
     * @param session the session that was closed
     */
    @OnClose
    public void onClose(Session session) {
        onlineCount.decrementAndGet();
        clients.remove(session.getId());
        log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), onlineCount.get());
    }

    /**
     * Called when a text message arrives from a client.
     *
     * <p>Expected payload: {@code {"event":"getCountGeneral"}}. The server may also push
     * data on its own (see {@link #run()}) without a client request.
     *
     * @param message raw text sent by the client
     * @param session the session the message arrived on
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message);
        if (StringUtils.isBlank(message)) {
            this.replyUnprocessable(session);
            // Stop here: there is nothing to parse, and continuing would dereference null below.
            return;
        }
        // Parse the incoming payload.
        JSONObject jsonObject = JSON.parseObject(message);
        if (jsonObject == null) {
            // The parser may yield null (e.g. for a literal null payload); treat it as unprocessable.
            this.replyUnprocessable(session);
            return;
        }
        // 24-hour real-time data; the front end sends {"event":"getCountGeneral"}.
        if ("getCountGeneral".equals(jsonObject.getString("event"))) {
            // Kept as a broadcast, matching the existing behaviour of this endpoint.
            this.sendMessage(getCountGeneral());
        }
    }

    /**
     * Called when an error occurs on a connection; logs it with the full stack trace.
     *
     * @param session the session on which the error occurred
     * @param error   the error that was raised
     */
    @OnError
    public void onError(Session session, Throwable error) {
        // A trailing Throwable argument makes the logger print the stack trace,
        // so nothing needs to be written to stderr separately.
        log.error("链接发生错误:{},原因:{}", session.getId(), error.getMessage(), error);
    }

    /**
     * Builds the 24-hour real-time data payload.
     *
     * @return a JSON string with the keys {@code interfaceCallTop} and {@code countGeneral}
     */
    public String getCountGeneral() {
        HomePageService service = resolveHomePageService();
        Map<String, Object> map = new HashMap<>();
        // NOTE(review): `params` is not declared anywhere in this class, so this line does not
        // compile as written. Supply the argument that getInterfaceCallTop actually expects.
        map.put("interfaceCallTop", service.getInterfaceCallTop(params));
        map.put("countGeneral", service.getCountGeneral1());
        return JSONObject.toJSONString(map);
    }

    /**
     * Broadcasts a message to every connected client.
     *
     * @param message message content
     */
    public void sendMessage(String message) {
        if (clients.isEmpty()) {
            log.info("********* 当前无客户端链接 *********");
        }
        for (Map.Entry<String, Session> sessionEntry : clients.entrySet()) {
            this.sendTo(sessionEntry.getValue(), message);
        }
    }

    /**
     * Entry point for the scheduled push: broadcasts the current real-time data to all clients.
     */
    public void run() {
        this.sendMessage(getCountGeneral());
    }

    /** Returns the business service, looking it up from the application context on first use. */
    private static HomePageService resolveHomePageService() {
        HomePageService service = homePageService;
        if (service == null) {
            // A racing second lookup is harmless: both threads obtain the bean from the same context.
            service = SpringUtils.getApplicationContext().getBean(HomePageService.class);
            homePageService = service;
        }
        return service;
    }

    /** Tells a single client that its message could not be handled. */
    private void replyUnprocessable(Session session) {
        JSONObject jsonRes = new JSONObject();
        jsonRes.put("msg", "event无法处理");
        this.sendTo(session, jsonRes.toJSONString());
    }

    /** Sends a text message asynchronously to one client, skipping sessions that are no longer open. */
    private void sendTo(Session toSession, String message) {
        if (!toSession.isOpen()) {
            // The session is already closing or closed; onClose removes it from the registry.
            return;
        }
        toSession.getAsyncRemote().sendText(message);
        log.info("服务端发送消息给客户端:{}", toSession.getId());
    }
}
4、定时任务
有时候需要后端定时推送数据给前端,需要增加以下代码:
package com.success.utils;
import com.success.socket.SocketServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * Pushes data to all WebSocket clients, both on a fixed schedule and on demand via
 * {@code GET /test}.
 */
@Component
@RestController
public class DataPushScheduledTask {
    private static final Logger log = LoggerFactory.getLogger(DataPushScheduledTask.class);

    /**
     * Runs one data push through {@link SocketServer#run()}.
     *
     * <p>The cron expression fires every 30 seconds. The same method is also mapped to
     * {@code GET /test}, so a push can be triggered manually.
     * NOTE(review): presumably scheduling is enabled elsewhere in the application — confirm.
     */
    @Scheduled(cron = "0/30 * * * * ? ")
    @GetMapping("/test")
    public void executeDataPush() {
        log.info("********* 定时任务执行 *********");
        SocketServer pusher = new SocketServer();
        pusher.run();
        log.info("********* 定时任务完成 *********");
    }
}