netty-socketio(二)整合redis实现发布订阅

1、Redis 发布订阅

参考:https://www.runoob.com/redis/redis-pub-sub.html

Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。

Redis 客户端可以订阅任意数量的频道。

2、案例:netty-socketio和redis实现发布/订阅功能

  本Demo实现:netty-socketio实现订阅(参考:https://www.cnblogs.com/xy-ouyang/p/10675904.html),redis实现推送消息。本demo保存地址:https://github.com/wenbinouyang/oy_java

  demo使用 springboot 2.1.3.RELEASE,项目总体结构:

  netty-socketio(二)整合redis实现发布订阅

  pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.3.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.oy</groupId>
<artifactId>nettysocketio007</artifactId>
<version>0.0.1</version>
<name>nettysocketio008</name>
<description>nettysocketio008 for Spring Boot</description> <properties>
<java.version>1.8</java.version>
</properties> <dependencies>
<!-- spring boot start -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency> <!-- netty-socketio server -->
<dependency>
<groupId>com.corundumstudio.socketio</groupId>
<artifactId>netty-socketio</artifactId>
<version>1.7.11</version>
</dependency> <!-- springboot data redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<!-- exclusion lettuce,jedis -->
<exclusions>
<exclusion>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</exclusion>
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency> <!-- jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency> <!-- jedis pool dependency -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency> <dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
</dependencies> <build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build> </project>

  application.properties

logging.file=/home/wwwlogs/nettysocketio008/log.log

#redis
#spring.redis.host=127.0.0.1
spring.redis.host=47.244.48.230
spring.redis.port=
spring.redis.password=Redis0929 spring.redis.jedis.pool.maxActive=
spring.redis.jedis.pool.max-idle=
spring.redis.jedis.pool.min-idle=
spring.redis.timeout=

  Nettysocketio008Application类

package com.oy;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.annotation.Order;
import com.corundumstudio.socketio.Configuration;
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.annotation.SpringAnnotationScanner; @SpringBootApplication
@Order(1)
public class Nettysocketio008Application implements CommandLineRunner {
private SocketIOServer server; public static void main(String[] args) {
SpringApplication.run(Nettysocketio008Application.class, args);
} @Bean
public SocketIOServer socketIOServer() {
Configuration config = new Configuration();
config.setHostname("localhost");
config.setPort(4001);
this.server = new SocketIOServer(config);
return server;
} @Bean
public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
return new SpringAnnotationScanner(socketServer);
} @Override
public void run(String... args) throws Exception {
server.start();
UtilFunctions.log.info("socket.io run success!"); // 向"channel_1" push数据
// Service.send(args);
}
}

  RedisConfig类

package com.oy;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map; import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @Configuration
public class RedisConfig extends CachingConfigurerSupport {
public static final Logger log = LoggerFactory.getLogger(RedisConfig.class); @Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisSerializer<String> redisSerializer = new StringRedisSerializer(); // RedisTemplate
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(factory);
template.setKeySerializer(redisSerializer);
template.setValueSerializer(redisSerializer);
template.setHashKeySerializer(redisSerializer);
template.setHashValueSerializer(redisSerializer); template.afterPropertiesSet();
return template;
} @Bean(destroyMethod = "destroy")
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory,
MessageListener redisMessageListener) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory); ThreadPoolTaskScheduler taskExecutor = new ThreadPoolTaskScheduler();
taskExecutor.setPoolSize(10);
taskExecutor.initialize();
container.setTaskExecutor(taskExecutor); Map<MessageListener, Collection<? extends Topic>> listeners = new HashMap<>();
List<Topic> list = new ArrayList<>();
list.add(new ChannelTopic("cfd_md"));
listeners.put(redisMessageListener, list);
container.setMessageListeners(listeners); return container;
} }

  MessageEventHandler类

package com.oy;
import java.util.Set;
import java.util.UUID; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import com.corundumstudio.socketio.AckRequest;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.annotation.OnConnect;
import com.corundumstudio.socketio.annotation.OnDisconnect;
import com.corundumstudio.socketio.annotation.OnEvent; @Component
public class MessageEventHandler {
public static SocketIOServer socketIoServer; @Autowired
public MessageEventHandler(SocketIOServer server) {
MessageEventHandler.socketIoServer = server;
} @OnConnect
public void onConnect(SocketIOClient client) {
UUID socketSessionId = client.getSessionId();
String ip = client.getRemoteAddress().toString();
UtilFunctions.log.info("client connect, socketSessionId:{}, ip:{}", socketSessionId, ip);
} @OnEvent("sub")
public void sub(SocketIOClient client, AckRequest request, String channel) {
UUID socketSessionId = client.getSessionId();
String ip = client.getRemoteAddress().toString();
client.joinRoom(channel);
UtilFunctions.log.info("client sub, channel:{}, socketSessionId:{}, ip:{}", channel, socketSessionId, ip); Set<String> rooms = client.getAllRooms();
for (String room : rooms) {
UtilFunctions.log.info("after client connect, room:{}", room);
} // 客户端一订阅,就马上push一次
if ("channel_1".equals(channel)) {
sendAllEvent(Service.getMsg());
} else if ("redis_channel".equals(channel)) {
sendAllEvent(RedisSub.getMsg());
}
} // @OnEvent("unsub")
// public void unsub(SocketIOClient client, AckRequest request, String channel) {
// UUID socketSessionId = client.getSessionId();
// String ip = client.getRemoteAddress().toString();
// client.leaveRoom(channel);
// UtilFunctions.log.info("client unsub, channel:{}, socketSessionId:{}, ip:{}", channel, socketSessionId, ip);
// } @OnDisconnect
public void onDisconnect(SocketIOClient client) {
UUID socketSessionId = client.getSessionId();
String ip = client.getRemoteAddress().toString();
UtilFunctions.log.info("client disconnect, socketSessionId:{}, ip:{}", socketSessionId, ip); Set<String> rooms = client.getAllRooms();
for (String room : rooms) {
UtilFunctions.log.info("after client disconnect, room:{}", room);
}
} // broadcast to channel "channel_1"
public static void sendAllEvent(String data) {
socketIoServer.getRoomOperations("channel_1").sendEvent("channel_1", data);
} // broadcast to channel "redis_channel"
public static void sendAllEvent2(String data) {
socketIoServer.getRoomOperations("redis_channel").sendEvent("redis_channel", data);
}
}

  RedisSub类

package com.oy;

import java.util.Date;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; @Service
public class RedisSub implements MessageListener {
private static String msg;
private String redisStr; @Autowired
private RedisTemplate<String, Object> redisTemplate; public RedisTemplate<String, Object> getRedisTemplate() {
return redisTemplate;
} @Override
public void onMessage(Message message, byte[] pattern) {
String msg = (String) redisTemplate.getValueSerializer().deserialize(message.getBody());
String channel = (String) redisTemplate.getValueSerializer().deserialize(message.getChannel()); if (null != channel && !"".equals(channel) && null != msg && !"".equals(msg)) { // 相同数据不push
if(redisStr == null) {
UtilFunctions.log.info("===== redisStr == null =====");
redisStr = msg;
} else if (redisStr.equals(msg)) {
UtilFunctions.log.info("===== redisStr is same =====");
return ;
} else {
UtilFunctions.log.info("===== redisStr:{} =====", redisStr);
redisStr = msg;
} JSONObject data = new JSONObject();
JSONObject json = JSON.parseObject(msg);
data.put(json.getString("contract"), json); JSONObject jsonObj = new JSONObject();
jsonObj.put("channel", channel);
jsonObj.put("timestamp", new Date().getTime());
jsonObj.put("data", data); MessageEventHandler.sendAllEvent2(jsonObj.toJSONString());
RedisSub.msg = jsonObj.toJSONString();
UtilFunctions.log.info("message from channel:{}, msg:{} ", channel, jsonObj.toJSONString());
} } public static String getMsg() {
return msg;
} public static void setMsg(String msg) {
RedisSub.msg = msg;
}
}

  

  客户端html页面及测试参考:https://www.cnblogs.com/xy-ouyang/p/10675904.html

上一篇:【redis】spring boot利用redis的Keyspace Notifications实现消息通知


下一篇:Springboot+Redis(发布订阅模式)跨多服务器实战