websocket定时推送数据

示例代码

1、添加pom.xml依赖

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-websocket</artifactId>
</dependency>

 2、创建websocket配置类

package com.success.socket;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
 * WebSocket configuration.
 *
 * <p>Contributes a single {@link ServerEndpointExporter} bean to the Spring
 * context. Per the Spring documentation, that exporter is what registers
 * {@code @ServerEndpoint}-annotated beans with the standard WebSocket runtime.
 */
@Configuration
public class WebSocketConfig {

	/**
	 * Creates the endpoint exporter bean.
	 *
	 * @return a new {@link ServerEndpointExporter}
	 */
	@Bean
	public ServerEndpointExporter serverEndpointExporter() {
		ServerEndpointExporter exporter = new ServerEndpointExporter();
		return exporter;
	}
}

3、创建WebSocket工具类

package com.success.socket;


import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.example.dataservice.models.common.ResultPO;
import com.success.service.HomePageService;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springblade.util.SpringUtils;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

@Component
@ServerEndpoint(value = "/ws/{userId}")
public class SocketServer {
	private final static Logger log = LoggerFactory.getLogger(SocketServer.class);
	//记录当前在线连接数
	private static final AtomicInteger onlineCount = new AtomicInteger(0);
	//存放所有在线的客户端	
	private static final ConcurrentHashMap<String, Session> clients = new ConcurrentHashMap<>();
    //注入homePageService ,实际的业务在这里实现
	private static HomePageService homePageService = SpringUtils.getApplicationContext().getBean(HomePageService.class);
	/**
	 * 连接建立成功调用的方法
     * 连接成功返回 {"msg":"ok","event":"open"}
	 */
	@OnOpen
	public void onOpen(Session session, @PathParam("userId") String userId) {
		onlineCount.incrementAndGet(); // 在线数加1
		clients.put(session.getId(), session);
		JSONObject json = new JSONObject();
		json.put("event", "open");
		json.put("msg", "ok");
		this.sendMessage(json.toJSONString());
		log.info("有新连接加入:{},当前在线人数为:{}", session.getId(), onlineCount.get());
	}
	/**
	 * 连接关闭调用的方法
	 */
	@OnClose
	public void onClose(Session session) {
		onlineCount.decrementAndGet(); // 在线数减1
		clients.remove(session.getId());
		log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), onlineCount.get());
	}
    /**
	 * 收到客户端消息后调用的方法
	 *
	 * @param message 客户端发送过来的消息 {"event":"getCountGeneral"}
	 *                当业务改动数据时,可以主动发消息(不需要客户端主动请求)
	 */
	@OnMessage
	public void onMessage(String message, Session session) {
		log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message);
		if (StringUtils.isBlank(message)) {
			JSONObject jsonRes = new JSONObject();
			jsonRes.put("msg", "event无法处理");
			this.sendMessage(jsonRes.toJSONString());
		}
		//解析发送的报文
		JSONObject jsonObject = JSON.parseObject(message);
		//24小时实时数据  前端发送{"event":"getCountGeneral"}
		if ("getCountGeneral".equals(jsonObject.getString("event"))) {
			this.sendMessage(getCountGeneral());
		}
	}

	@OnError
	public void onError(Session session, Throwable error) {
		log.error("链接发生错误:{},原因:{}", session.getId(), error.getMessage());
		error.printStackTrace();
	}

	/**
	 * 获取24小时实时数据
	 *
	 * @return
	 */
	public String getCountGeneral() {
		Map<String,Object> map = new HashMap<>();		
	    //业务1
		map.put("interfaceCallTop",homePageService.getInterfaceCallTop(params));
		//业务2
		map.put("countGeneral",homePageService.getCountGeneral1());
		return JSONObject.toJSONString(map);
	}
	/**
	 * 群发消息
	 *
	 * @param message 消息内容
	 */
	public void sendMessage(String message) {
		if (clients.isEmpty()) {
			log.info("*********  当前无客户端链接  *********");
		}
		for (Map.Entry<String, Session> sessionEntry : clients.entrySet()) {
			Session toSession = sessionEntry.getValue();
			toSession.getAsyncRemote().sendText(message);
			log.info("服务端发送消息给客户端:{}", toSession.getId());
		}
	}
	/**
	 * 启动群发任务,用于定时任务
	 */
	public void run() {
		//首页24小时实时数据 当日接口访问排行 当日单位调用排行 当日应用调用排行
		this.sendMessage(getCountGeneral());
	}
}

4、定时任务

有时候需要后端定时推送数据给前端,需要增加以下代码(注意:需在启动类或配置类上添加 @EnableScheduling,@Scheduled 才会生效):

package com.success.utils;

import com.success.socket.SocketServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Periodically pushes data to all WebSocket clients.
 *
 * <p>The same method is also mapped to {@code GET /test}, so a push can be
 * triggered by an HTTP request.
 */
@RestController
@Component
public class DataPushScheduledTask {
	private static final Logger log = LoggerFactory.getLogger(DataPushScheduledTask.class);

	/**
	 * Runs one push cycle.
	 *
	 * <p>Scheduled by the cron expression to fire every 30 seconds (at seconds
	 * 0 and 30 of each minute). Delegates the broadcast to
	 * {@link SocketServer#run()}.
	 */
	@GetMapping("/test")
	@Scheduled(cron = "0/30 * * * * ? ")
	public void executeDataPush() {
		log.info("*********   定时任务执行   *********");
		SocketServer pusher = new SocketServer();
		pusher.run();
		log.info("*********   定时任务完成   *********");
	}
}

上一篇:如何在Linux上安装配置Java开发环境


下一篇:uniapp如何适配ipad