Spring实现WebSocket服务端和客户端。

1.业务场景描述

这个案例主要是用来监控用的,就是把接口中的数据实时的展示到页面上,比如一个下订单的接口,在外界访问订单接口时,订单存入到数据库后需要将订单数据展示到页面上。因此就可以在接口中使用Java客户端将订单信息发送到websocket服务器,然后页面作为客户端把接口订单数据暂时到页面上。在接口中将Java客户端的连接传入接口中,直接通过连接发送订单信息到服务器。连接websocket的客户端代码通过静态代码块的方式加载,然后将建立连接后的websocket连接存到静态变量,在接口中通过静态变量,就可以获取websocket连接,进而达到在接口中发送订单信息的目的。

2.所需jar包

服务端:spring-websocket,spring-messaging

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-websocket</artifactId>
    <version>4.3.4.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-messaging</artifactId>
    <version>4.3.4.RELEASE</version>
</dependency>

客户端:javax.websocket-api,tyrus-standalone-client

<dependency>
    <groupId>javax.websocket</groupId>
    <artifactId>javax.websocket-api</artifactId>
    <version>1.1</version>
</dependency>
<dependency>
    <groupId>org.glassfish.tyrus.bundles</groupId>
    <artifactId>tyrus-standalone-client</artifactId>
    <version>1.9</version>
</dependency>

服务端

3.websocket配置文件

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
 
/**
 * websocket配置文件
 * 有了该配置文件,就不用在spring-mvc.xml中进行websocket的配置
 * EnableWebSocket注解 :开启websocket服务
 * @author nowjava
 */
@Configuration
@EnableWebMvc
@EnableWebSocket
public class WebSocketConfig extends WebMvcConfigurerAdapter implements WebSocketConfigurer {
 
    /**
     *添加处理器和拦截器,处理器后面的地址就是websocket的访问路径
     * setAllowedOrigins:指定的域名或IP,如果不限制使用"*"就可以了
     * @param registry
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(myWebSocketHandler(), "/websocket/demo")
                .addInterceptors(myInterceptor()).setAllowedOrigins("*");
 
    }
 
    /**
     * 直接注入自己定义的websocket处理器
     * @return
     */
    @Bean
    public WebSocketHandler myWebSocketHandler(){
        return new MyWebSocketHandler();
    }
 
    /**
     * 直接注入自己定义的websocket拦截器
     * @return
     */
    @Bean
    public WebSocketHandshakeInterceptor myInterceptor(){
        return new WebSocketHandshakeInterceptor();
    }
}

4.websocket拦截器

import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
 
import java.util.Map;
 
/**
 * websocket拦截器:一般情况下不做处理
 */
public class WebSocketHandshakeInterceptor implements HandshakeInterceptor {
 
    @Override
    public void afterHandshake(ServerHttpRequest arg0, ServerHttpResponse arg1, WebSocketHandler arg2, Exception arg3) {
 
    }
 
    @Override
    public boolean beforeHandshake(ServerHttpRequest arg0, ServerHttpResponse arg1, WebSocketHandler arg2,
                                   Map<String, Object> arg3) throws Exception {
        return true;
    }
 
}

5.websocket处理器

import org.springframework.scheduling.annotation.Async;
import org.springframework.web.socket.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;
 
/**
 * websocket处理器:功能实现的核心代码编写类
 * @author nowjava
 */
public class MyWebSocketHandler implements WebSocketHandler{
 
    /**
     * 定义一个全局的初始化值count=0,记录连接数
     */
    private static int count = 0;
 
    /**
     * 记录所有的客户端连接
     */
 
    private volatile static List<WebSocketSession> sessions = Collections.synchronizedList(new ArrayList());
 
    
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
        sessions.remove(session);
        if (session.isOpen()){
            session.close();
        }
        count = count-1;
        System.out.println(count);
    }
 
    /**
     * 建立连接后的操作
     * @param session
     * @throws Exception
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        count++;
        sessions.add(session);
 
    }
 
    /**
     * 消息处理,在客户端通过Websocket API发送的消息会经过这里,然后进行相应的处理
     * @param session
     * @param message
     * @throws Exception
     */
    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
        String data = message.getPayload().toString();
        System.out.println("客户端" + message.getPayload().toString());
        if(!data.equals("B")) {
            sessions.remove(session);
            try {
                 for (WebSocketSession ws : sessions) {
                     if (ws.isOpen()) {
                         synchronized (ws) {
                            ws.sendMessage(new TextMessage("" + data));
                            System.out.println("服务端" + data);
                         }
 
                      }
                 }
            } catch (IOException e) {
                e.printStackTrace();
             
            }
        }
    }
 
    /**
     * 消息传输错误处理
     * @param session
     * @param throwable
     * @throws Exception
     */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable throwable) throws Exception {
        if(session.isOpen()){
            sessions.remove(session);
            session.close();
        }
    }
 
    @Override
    public boolean supportsPartialMessages() {
        return false;
    }
 
}

客户端

6.Java客户端代码

展开阅读全文

本文系作者在时代Java发表,未经许可,不得转载。

如有侵权,请联系nowjava@qq.com删除。

编辑于

关注时代Java

关注时代Java