时间:2023-04-18 20:22:01 | 来源:网站运营
时间:2023-04-18 20:22:01 来源:网站运营
Websocket集群解决方案:最近在项目中在做一个消息推送的功能,比如客户下单之后通知给给对应的客户发送系统通知,这种消息推送需要使用到全双工的websocket
推送消息。所谓的全双工表示客户端和服务端都能向对方发送消息。不使用同样是全双工的上一篇文章Spring Boot 整合单机websocket介绍了http
是因为http
只能由客户端主动发起请求,服务接收后返回消息。websocket
建立起连接之后,客户端和服务端都能主动向对方发送消息。
websocket
在单机模式下进行消息的发送和接收:用户A
和用户B
和web
服务器建立连接之后,用户A
发送一条消息到服务器,服务器再推送给用户B
,在单机系统上所有的用户都和同一个服务器建立连接,所有的session
都存储在同一个服务器中。session
,服务器的保存维持连接的session
。客户端每次只能和集群服务器其中的一个服务器连接,后续也是和该服务器进行数据传输。session共享
的问题,客户端成功连接服务器之后,其他服务器也知道客户端连接成功。websocket
类似的http
是如何解决集群问题的?解决方案之一就是共享session
,客户端登录服务端之后,将session
信息存储在Redis
数据库中,连接其他服务器时,从Redis
获取session
,实际就是将session
信息存储在Redis
中,实现redis的共享。session
可以被共享的前提是可以被序列化,而websocket
的session
是无法被序列化的,http
的session
记录的是请求的数据,而websocket
的session
对应的是连接,连接到不同的服务器,session
也不同,无法被序列化。http
不使用session
共享,就可以使用Nginx
负载均衡的ip hash
算法,客户端每次都是请求同一个服务器,客户端的session
都保存在服务器上,而后续请求都是请求该服务器,都能获取到session
,就不存在分布式session
问题了。websocket
相对http
来说,可以由服务端主动推动消息给客户端,如果接收消息的服务端和发送消息消息的服务端不是同一个服务端,发送消息的服务端无法找到接收消息对应的session
,即两个session不处于同一个服务端,也就无法推送消息。如下图所示:解决问题的方法是将所有消息的发送方和接收方都处于同一个服务器下,而消息发送方和接收方都是不确定的,显然是无法实现的。
websocket
实现消息的推送。<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-freemarker</artifactId></dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId></dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId></dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>
@Configurationpublic class WebSocketConfig { //tomcat启动无需该配置 @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); }}
@Component@ServerEndpoint(value = "/message")@Slf4jpublic class WebSocket { private static Map<String, WebSocket> webSocketSet = new ConcurrentHashMap<>(); private Session session; @OnOpen public void onOpen(Session session) throws SocketException { this.session = session; webSocketSet.put(this.session.getId(),this); log.info("【websocket】有新的连接,总数:{}",webSocketSet.size()); } @OnClose public void onClose(){ String id = this.session.getId(); if (id != null){ webSocketSet.remove(id); log.info("【websocket】连接断开:总数:{}",webSocketSet.size()); } } @OnMessage public void onMessage(String message){ if (!message.equals("ping")){ log.info("【wesocket】收到客户端发送的消息,message={}",message); sendMessage(message); } } /** * 发送消息 * @param message * @return */ public void sendMessage(String message){ for (WebSocket webSocket : webSocketSet.values()) { webSocket.session.getAsyncRemote().sendText(message); } log.info("【wesocket】发送消息,message={}", message); }}
<div> <input type="text" name="message" id="message"> <button id="sendBtn">发送</button></div><div style="width:100px;height: 500px;" id="content"></div><script src="https://cdn.bootcdn.net/ajax/libs/jquery/3.6.0/jquery.js"></script><script type="text/javascript"> var ws = new WebSocket("ws://127.0.0.1:8080/message"); ws.onopen = function(evt) { console.log("Connection open ..."); }; ws.onmessage = function(evt) { console.log( "Received Message: " + evt.data); var p = $("<p>"+evt.data+"</p>") $("#content").prepend(p); $("#message").val(""); }; ws.onclose = function(evt) { console.log("Connection closed."); }; $("#sendBtn").click(function(){ var aa = $("#message").val(); ws.send(aa); })</script>
服务端和客户端中的OnOpen
、onclose
、onmessage
都是一一对应的。ws.onopen
调用服务端的@OnOpen
注解的方法,储存客户端的session信息,握手建立连接。ws.send
发送消息,对应服务端的@OnMessage
注解下面的方法接收消息。session.getAsyncRemote().sendText
发送消息,对应的客户端ws.onmessage
接收消息。@GetMapping({"","index.html"})public ModelAndView index() { ModelAndView view = new ModelAndView("index"); return view;}
RabbitMQ
作为消息中间件,而RabbitMQ
支持发布订阅模式:@Configurationpublic class RabbitConfig { @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("PUBLISH_SUBSCRIBE_EXCHANGE"); } @Bean public Queue psQueue() throws SocketException { // ip + 端口 为队列名 String ip = IpUtils.getServerIp() + "_" + IpUtils.getPort(); return new Queue("ps_" + ip); } @Bean public Binding routingFirstBinding() throws SocketException { return BindingBuilder.bind(psQueue()).to(fanoutExchange()); }}
获取服务器IP和端口可以具体查看Github源码,这里就不做详细描述了。
WebSocket
添加消息的接收方法,@RabbitListener
接收消息,队列名称使用常量命名,动态队列名称使用 #{name}
,其中的name
是Queue
的bean
名称:@RabbitListener(queues= "#{psQueue.name}")public void pubsubQueueFirst(String message) { System.out.println(message); sendMessage(message);}
然后再调用sendMessage
方法发送给所在连接的客户端。WebSocket
类的onMessage
方法将消息发送改成RabbitMQ
方式发送:@OnMessagepublic void onMessage(String message){ if (!message.equals("ping")){ log.info("【wesocket】收到客户端发送的消息,message={}",message); //sendMessage(message); if (rabbitTemplate == null) { rabbitTemplate = (RabbitTemplate) SpringContextUtil.getBean("rabbitTemplate"); } rabbitTemplate.convertAndSend("PUBLISH_SUBSCRIBE_EXCHANGE", null, message); }}
消息通知流程如下所示:Edit Configurations
:server.port=8081
:8080
和8081
。在启动8081
端口的服务,将前端连接端口改成8081
:var ws = new WebSocket("ws://127.0.0.1:8081/message");
关键词:方案,解决