时间:2023-04-18 20:32:02 | 来源:网站运营
时间:2023-04-18 20:32:02 来源:网站运营
WebSocket 集群解决方案:protected void handleTextMessage(WebSocketSession session, TextMessage message) { System.out.println("服务器接收到的消息: "+ message ); //send message to client session.sendMessage(new TextMessage("message"));}
那么问题来了:ws的session无法序列化到redis,因此在集群中,我们无法将所有WebSocketSession都缓存到redis进行session共享。每台服务器都有各自的session。于此相反的是HttpSession,redis可以支持httpsession共享,但是目前没有websocket session共享的方案,因此走redis websocket session共享这条路是行不通的。/** * TODO 根据服务器传进来的id,分配到不同的group */ private static final ChannelGroup GROUP = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { //retain增加引用计数,防止接下来的调用引用失效 System.out.println("服务器接收到来自 " + ctx.channel().id() + " 的消息: " + msg.text()); //将消息发送给group里面的所有channel,也就是发送消息给客户端 GROUP.writeAndFlush(msg.retain()); }
那么,服务端用netty还是用spring websocket?以下我将从几个方面列举这两种实现方式的优缺点<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId></dependency>
第二步:添加配置类@Configurationpublic class WebSocketConfig implements WebSocketConfigurer {@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(myHandler(), "/") .setAllowedOrigins("*");} @Bean public WebSocketHandler myHandler() { return new MessageHandler(); }}
第三步:实现消息监听类@Component@SuppressWarnings("unchecked")public class MessageHandler extends TextWebSocketHandler { private List<WebSocketSession> clients = new ArrayList<>(); @Override public void afterConnectionEstablished(WebSocketSession session) { clients.add(session); System.out.println("uri :" + session.getUri()); System.out.println("连接建立: " + session.getId()); System.out.println("current seesion: " + clients.size()); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { clients.remove(session); System.out.println("断开连接: " + session.getId()); } @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) { String payload = message.getPayload(); Map<String, String> map = JSONObject.parseObject(payload, HashMap.class); System.out.println("接受到的数据" + map); clients.forEach(s -> { try { System.out.println("发送消息给: " + session.getId()); s.sendMessage(new TextMessage("服务器返回收到的信息," + payload)); } catch (Exception e) { e.printStackTrace(); } }); }}
从这个demo中,使用spring websocket实现ws服务的便利性大家可想而知了。为了能更好地向spring cloud大家族看齐,我最终采用了spring websocket实现ws服务。server: port: 443 ssl: enabled: true key-store: classpath:xxx.jks key-store-password: xxxx key-store-type: JKS key-alias: aliasspring: application: name: api-gateway cloud: gateway: httpclient: ssl: handshake-timeout-millis: 10000 close-notify-flush-timeout-millis: 3000 close-notify-read-timeout-millis: 0 useInsecureTrustManager: true discovery: locator: enabled: true lower-case-service-id: true routes: - id: dc uri: lb://dc predicates: - Path=/dc/** - id: wecheck uri: lb://wecheck predicates: - Path=/wecheck/**
如果要愉快地玩https卸载,我们还需要配置一个filter,否则请求网关时会出现错误not an SSL/TLS record@Componentpublic class HttpsToHttpFilter implements GlobalFilter, Ordered { private static final int HTTPS_TO_HTTP_FILTER_ORDER = 10099; @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { URI originalUri = exchange.getRequest().getURI(); ServerHttpRequest request = exchange.getRequest(); ServerHttpRequest.Builder mutate = request.mutate(); String forwardedUri = request.getURI().toString(); if (forwardedUri != null && forwardedUri.startsWith("https")) { try { URI mutatedUri = new URI("http", originalUri.getUserInfo(), originalUri.getHost(), originalUri.getPort(), originalUri.getPath(), originalUri.getQuery(), originalUri.getFragment()); mutate.uri(mutatedUri); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } ServerHttpRequest build = mutate.build(); ServerWebExchange webExchange = exchange.mutate().request(build).build(); return chain.filter(webExchange); } @Override public int getOrder() { return HTTPS_TO_HTTP_FILTER_ORDER; }}
这样子我们就可以使用gateway来卸载https请求了,到目前为止,我们的基本框架已经搭建完毕,网关既可以转发https请求,也可以转发wss请求。接下来就是用户多对多之间session互通的通讯解决方案了。接下来,我将根据方案的优雅性,从最不优雅的方案开始讲起。@Resourceprivate EurekaClient eurekaClient; Application app = eurekaClient.getApplication("service-name");//instanceInfo包括了一台服务器ip,port等消息InstanceInfo instanceInfo = app.getInstances().get(0);System.out.println("ip address: " + instanceInfo.getIPAddr());
服务器需要维护关系映射表,将用户的id与session做映射,session建立时在映射表中添加映射关系,session断开后要删除映射表内关联关系现假设集群中有服务 CacheB上线了,该服务器的ip地址刚好被映射到key1和 cacheA之间。那么key1对应的用户每次要发消息时都跑去 CacheB发送消息,结果明显是发送不了消息,因为 CacheB没有key1对应的session。
关键词:方案,解决