[Java]SpringBoot实现WebSocketServer(可分不同的客户端收发消息)

2022-01-11 208点热度 0人点赞 0条评论

本文分享我在SpringBoot下使用websocket的示例代码,可管理每个客户端的session,给不同的客户端收发信息,可管理心跳时间,以及注入service,方便业务逻辑的调用。

干货代码实现

pom.xml

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

新建WebSocketConfig.java

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * @author Terry E-mail: yaoxinghuo at 126 dot com
 * @date 2020-12-28 16:04
 * @description
 */
@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

新建WebSocketServer.java

客户端连接就使用SpringBoot下(application.properties里的server.port所设定)开放的端口,例如http://127.0.0.1:8080/ws/abc123 或 ws://127.0.0.1:8080/ws/abc123

import javax.websocket.*

@Component
@ServerEndpoint(value = "/ws/{sid}")
public class WebSocketServer {
    // private static final Log log = LogFactory.getLog(WebSocketServer.class);

    private static long commandSerial = 0; //发送消息的序列号,比如每发一次,+1

    // 可以在这里调用service,来实现业务逻辑的调用
    // @Resource(name = "deviceService")
    // private final IDeviceService deviceService = SpringContext.getBean(DeviceServiceImpl.class);

    /**
     * concurrent包的线程安全Set,用来存放每个客户端对应的WebSocketServer对象。
     */
    public static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<>();
    /**
     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    private Session session;

    public String sid = null;

    // 最近活跃(心跳)时间,可以启一个排程,每个多少时间,检测webSocketSet的List里面的最后活跃时间,来断开是否假死的连接
    public long lastActive = System.currentTimeMillis(); 
    public long lastConnect = 0;

    /**
     * 连接建立成功调用的方法
     * 每次客户连接传一个sid,这个sid让客户端生成,作为一个客户端唯一的标记
     **/
    @OnOpen
    public void onOpen(Session session, @PathParam("sid") String sid) {

        // 这里onOpen可以根据自己的业务需求做一些客户端的验证工作

        this.session = session;

        this.sid = sid;
        this.lastConnect = System.currentTimeMillis();

        webSocketSet.stream().filter(item -> (item.app + "-" + item.id).equals(this.app + "-" + this.id)).findFirst().ifPresent(item -> {
            try {
                item.session.close();
            } catch (IOException ignored) {
            }
            item.sid = null;
            item.session = null;
        });

        webSocketSet.add(this);
        // deviceService.wsUpdateClients(sid, webSocketSet);
        log.info("websocket session connected, sid: " + sid);
    }

    @OnClose
    public void onClose() {
        //从set中删除
        if (this.id != null) {
            webSocketSet.remove(this);
            // deviceService.wsUpdateClients(this.sid, webSocketSet);
            log.warn("websocket session closed, token: " + this.sid);
            this.sid = null;
            this.session = null;
        }
    }

    @OnMessage
    public void onMessage(String message, Session session) {
        if (this.sid == null) {
            log.warn("websocket receive message from null sid, abort");
            return;
        }
        log.debug("websocket receive message from: " + sid + ", message: " + message);
        updateLastActive();
//        下面可以处理客户端发过来的消息
    }

    @OnError
    public void onError(Session session, Throwable e) {
        log.error("websocket error, app: " + app + ", sid: " + sid, e);
        closeSession();
    }

    public void closeSession() {
        try {
            if (session != null) {
                session.close();
            }
        } catch (IOException ignored) {
        }
        this.onClose();
    }

    public long sendMessage(JSONObject message) throws IOException {
        long l = ++commandSerial;
        if (l >= Long.MAX_VALUE - 1) {
            commandSerial = 0;
        }
        message.put("serial", l);
        sendMessage(message.toString());
        return l;
    }

    private void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }

    /**
     * 发消息,可以给不同的客户端发送不同的消息
     */
    public static JsonResult sendMessage(String message, @PathParam("sid") String sid) {
        log.info("websocket send message to: " + sid + ", message:" + message);
        WebSocketServer webSocketServer = webSocketSet.stream().filter(item -> sid.equals(item.sid)).findFirst().orElse(null);
        if (webSocketServer == null) {
            return JsonResult.fail("找不到连接的客户端");
        }
        try {
            webSocketServer.sendMessage(message);
        } catch (IOException e) {
            log.error("websocket error send message to: " + sid + ", app: " + webSocketServer.app, e);
            return JsonResult.fail("发送消息错误:" + e.getMessage());
        }
        return JsonResult.success();
    }

    public void updateLastActive() {
        this.lastActive = System.currentTimeMillis();
    }

    public static WebSocketServer findWebSocketServer(String sid) {
        return webSocketSet.stream().filter(item -> sid.equals(item.sid)).findFirst().orElse(null);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        WebSocketServer that = (WebSocketServer) o;
        return (sid).equals(that.sid);
    }

    @Override
    public int hashCode() {
        return Objects.hash(sid);
    }
}

SpringContext.java

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
 * @author Terry E-mail: yaoxinghuo at 126 dot com
 * @date 2020-12-28 18:51
 * @description
 */
@Component
public class SpringContext implements ApplicationContextAware {
    private static ApplicationContext context;

    @Override
    public void setApplicationContext(ApplicationContext context) throws BeansException {
        SpringContext.context = context;
    }

    public ApplicationContext getApplicationContext() {
        return context;
    }

    public static <T> T getBean(Class<T> beanClass) {
        return context.getBean(beanClass);
    }
}

用Java实现的WebSocket客户端来连接,请看:https://blog.terrynow.com/2022/01/19/java-okhttp3-implement-websocket-client/

admin

这个人很懒,什么都没留下

文章评论

您需要 登录 之后才可以评论