springboot整合websocket集群

websocket的session无法序列化到redis,当多个服务启动时,A用户在服务1中,B用户在服务2时,A用户和B用户则无法进行通讯。
可以依赖mq或者redis消息的发布订阅实现,原理是每个服务收到消息都会发送一条mq消息,所有服务都会收到这个消息,然后每个服务都会向当前session的用户发送websocket消息(如果用户在当前服务中)。

本文以redis发布订阅消息演示

结合redis发布订阅消息和websocket

引入依赖

1
2
3
4
5
6
7
8
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

部分修改

  1. websocket
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    @RestController
    public class ImController {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
    * 发送消息,一对一
    */
    @MessageMapping("/send2user")
    public String send2user(WsMessage wsMessage, Principal principal) {
    // 获取用户的uid
    String uid = principal.getName();
    // 设置发送信息的uid
    wsMessage.setUid(uid);
    System.out.println(uid + ":" + wsMessage);
    // 发送订阅消息
    // 不直接发送websocket消息,发布redis消息
    stringRedisTemplate.convertAndSend("ws-msg", JSON.toJSONString(wsMessage));
    return "success";
    }
    }
  2. redis订阅消息
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @Component
    public class MyMessageListener implements MessageListener {

    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    @Override
    public void onMessage(Message message, byte[] pattern) {
    WsMessage wsMessage = JSON.parseObject(message.toString(), WsMessage.class);
    // 发送websocket消息
    simpMessagingTemplate.convertAndSendToUser(wsMessage.getToUid(), "msg", wsMessage);
    }
    }