您现在的位置是:网站首页> 编程资料编程资料

websocket+redis动态订阅和动态取消订阅的实现示例_Redis_

2023-05-27 526人已围观

简介 websocket+redis动态订阅和动态取消订阅的实现示例_Redis_

原理

websocket的订阅就是在前后端建立ws连接之后,前端通过发送一定格式的消息,后端解析出来去订阅或者取消订阅redis频道。

订阅频道消息格式:

{ "cmd":"subscribe", "topic":[ "topic_name" ] } 

模糊订阅格式

{ "cmd":"psubscribe", "topic":[ "topic_name" ] } 

取消订阅格式

{ "cmd":"unsubscribe", "topic":[ "topic_name" ] } 

两个核心类,一个是redis的订阅监听类,一个是websocket的发布订阅类。

redis订阅监听类

package com.curtain.core; import com.curtain.config.GetBeanUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPubSub; import java.util.Arrays; /**  * @Author Curtain  * @Date 2021/6/7 14:27  * @Description  */ @Component @Slf4j public class RedisPubSub extends JedisPubSub {     private JedisPool jedisPool = GetBeanUtil.getBean(JedisPool.class);     private Jedis jedis;     //订阅     public void subscribe(String... channels) {         jedis = jedisPool.getResource();         try {             jedis.subscribe(this, channels);         } catch (Exception e) {             log.error(e.getMessage());             if (jedis != null)                 jedis.close();             //遇到异常后关闭连接重新订阅             log.info("监听遇到异常,四秒后重新订阅频道:");             Arrays.asList(channels).forEach(s -> {log.info(s);});             try {                 Thread.sleep(4000);             } catch (InterruptedException interruptedException) {                 interruptedException.printStackTrace();             }             subscribe(channels);         }     }     //模糊订阅     public void psubscribe(String... channels) {         Jedis jedis = jedisPool.getResource();         try {             jedis.psubscribe(this, channels);         } catch (ArithmeticException e) {//取消订阅故意造成的异常             if (jedis != null)                 jedis.close();         } catch (Exception e) {             log.error(e.getMessage());             if (jedis != null)                 jedis.close();             //遇到异常后关闭连接重新订阅             log.info("监听遇到异常,四秒后重新订阅频道:");             Arrays.asList(channels).forEach(s -> {log.info(s);});             try {                 Thread.sleep(4000);             } catch (InterruptedException interruptedException) {                 interruptedException.printStackTrace();             }             psubscribe(channels);         }     }     public void unsubscribeAndClose(String... channels){         unsubscribe(channels);         if (jedis != null && !isSubscribed())             jedis.close();     }     public void punsubscribeAndClose(String... channels){         punsubscribe(channels);         if (jedis != null && !isSubscribed())             jedis.close();     }     @Override     public void onSubscribe(String channel, int subscribedChannels) {         log.info("subscribe redis channel:" + channel + ", 线程id:" + Thread.currentThread().getId());     }     @Override     public void onPSubscribe(String pattern, int subscribedChannels) {         log.info("psubscribe redis channel:" + pattern + ", 线程id:" + Thread.currentThread().getId());     }     @Override     public void onPMessage(String pattern, String channel, String message) {         log.info("receive from redis channal: " + channel + ",pattern: " + pattern + ",message:" + message + ", 线程id:" + Thread.currentThread().getId());         WebSocketServer.publish(message, pattern);         WebSocketServer.publish(message, channel);     }     @Override     public void onMessage(String channel, String message) {         log.info("receive from redis channal: " + channel + ",message:" + message + ", 线程id:" + Thread.currentThread().getId());         WebSocketServer.publish(message, channel);     }     @Override     public void onUnsubscribe(String channel, int subscribedChannels) {         log.info("unsubscribe redis channel:" + channel);     }     @Override     public void onPUnsubscribe(String pattern, int subscribedChannels) {         log.info("punsubscribe redis channel:" + pattern);     } }

1.jedis监听redis频道的时候如果遇见异常会关闭连接导致后续没有监听该频道,所以这里在subscribe捕获到异常的时候会重新创建一个jedis连接订阅该redis频道。

webSocket订阅推送类

这个类会有两个ConcurrentHashMap>类型类变量,分别存储订阅和模糊订阅的信息。

外面一层的String对应的值是topic_name,里面一层的String对应的值是sessionId。前端发送过来的消息里面对应的这三类操作其实就是对这两个map里面的。

还有个ConcurrentHashMap类型的变量,存储的是事件-RedisPubSub,便于取消订阅的时候找到监听该频道(事件)的RedisPubSub对象。

信息进行增加或者删除;后端往前端推送数据也会根据不同的topic_name推送到不同的订阅者这边。

package com.curtain.core; import com.alibaba.fastjson.JSON; import com.curtain.config.WebsocketProperties; import com.curtain.service.Cancelable; import com.curtain.service.impl.TaskExecuteService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.ConcurrentHashMap; /**  * @Author Curtain  * @Date 2021/5/14 16:49  * @Description  */ @ServerEndpoint("/ws") @Component @Slf4j public class WebSocketServer {     /**      * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。      */     private static volatile ConcurrentHashMap> webSocketMap = new ConcurrentHashMap<>();     /**      * 存放psub的事件      **/     private static volatile ConcurrentHashMap> pWebSocketMap = new ConcurrentHashMap<>();     /**      * 存放topic(pattern)-对应的RedisPubsub      */     private static volatile ConcurrentHashMap redisPubSubMap = new ConcurrentHashMap<>();     /**      * 与某个客户端的连接会话,需要通过它来给客户端发送数据      */     private Session session;     private String sessionId = "";     //要注入的对象     private static TaskExecuteService executeService;     private static WebsocketProperties properties;     private Cancelable cancelable;     @Autowired     public void setTaskExecuteService(TaskExecuteService taskExecuteService) {         WebSocketServer.executeService = taskExecuteService;     }     @Autowired     public void setWebsocketProperties(WebsocketProperties properties) {         WebSocketServer.properties = properties;     }     /**      * 连接建立成功调用的方法      */     @OnOpen     public void onOpen(Session session) {         this.session = session;         this.sessionId = session.getId();         //构造推送数据         Map pubHeader = new HashMap();         pubHeader.put("name", "connect_status");         pubHeader.put("type", "create");         pubHeader.put("from", "pubsub");         pubHeader.put("time", new Date().getTime() / 1000);         Map pubPayload = new HashMap();         pubPayload.put("status", "success");         Map pubMap = new HashMap();         pubMap.put("header", pubHeader);         pubMap.put("payload", pubPayload);         sendMessage(JSON.toJSONString(pubMap));         cancelable = executeService.runPeriodly(() -> {             try {                 if (cancelable != null && !session.isOpen()) {                     log.info("断开连接,停止发送ping");                     cancelable.cancel();                 } else {                     String data = "ping";                     ByteBuffer payload = ByteBuffer.wrap(data.getBytes());                     session.getBasicRemote().sendPing(payload);                 }             } catch (IOException e) {                 e.printStackTrace();             }         }, properties.getPeriod());     }     @OnMessage     public void onMessage(String message) {         synchronized (session) {             Map msgMap = (Map) JSON.parse(message);             String cmd = (String) msgMap.get("cmd");             //订阅消息             if ("subscribe".equals(cmd)) {                 List topics = (List) msgMap.get("topic");                 //本地记录订阅信息                 for (int i = 0; i < topics.size(); i++) {                     String topic = topics.get(i);                     log.info("============================subscribe-start============================");                     log.info("sessionId:" + this.sessionId + ",开始订阅:" + topic);                     if (webSocketMap.containsKey(topic)) {//有人订阅过了                         webSocketMap.get(topic).put(this.sessionId, this);                     } else {//之前还没人订阅过,所以需要订阅redis频道                         ConcurrentHashMap map = new ConcurrentHashMap<>();                         map.put(this.sessionId, this);                         webSocketMap.put(topic, map);                         new Thread(() -> {                             RedisPubSub redisPubSub = new RedisPubSub();                             //存入map                             redisPubSubMap.put(topic, redisPubSub);                             redisPubSub.subscribe(topic);                         }).start();                     }                     log.info("sessionId:" + this.sessionId + ",完成订阅:" + topic);                     log();                     log.info("============================subscribe-end============================");                 }             }             //psubscribe             if ("psubscribe".equals(cmd)) {                 List topics = (List) msgMap.get("topic"); 
                
                

-六神源码网