[Java]SpringBoot实现WebSocketServer(可分不同的客户端收发消息)

2022-01-11 881点热度 0人点赞 0条评论

本文分享我在SpringBoot下使用websocket的示例代码,可管理每个客户端的session,给不同的客户端收发信息,可管理心跳时间,以及注入service,方便业务逻辑的调用。

干货代码实现

pom.xml

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

新建WebSocketConfig.java

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
 * Spring configuration that contributes the {@link ServerEndpointExporter} bean.
 *
 * <p>Per the Spring documentation, the exporter registers beans annotated with
 * {@code @ServerEndpoint} with the WebSocket runtime of the servlet container.
 *
 * @author Terry E-mail: yaoxinghuo at 126 dot com
 * @date 2020-12-28 16:04
 */
@Configuration
public class WebSocketConfig {

    /**
     * Creates the exporter bean.
     *
     * @return a fresh {@link ServerEndpointExporter}
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * Spring configuration that contributes the {@link ServerEndpointExporter} bean.
 *
 * <p>Per the Spring documentation, the exporter registers beans annotated with
 * {@code @ServerEndpoint} with the WebSocket runtime of the servlet container.
 *
 * @author Terry E-mail: yaoxinghuo at 126 dot com
 * @date 2020-12-28 16:04
 */
@Configuration
public class WebSocketConfig {

    /**
     * Creates the exporter bean.
     *
     * @return a fresh {@link ServerEndpointExporter}
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * Spring configuration that contributes the {@link ServerEndpointExporter} bean.
 *
 * <p>Per the Spring documentation, the exporter registers beans annotated with
 * {@code @ServerEndpoint} with the WebSocket runtime of the servlet container.
 *
 * @author Terry E-mail: yaoxinghuo at 126 dot com
 * @date 2020-12-28 16:04
 */
@Configuration
public class WebSocketConfig {

    /**
     * Creates the exporter bean.
     *
     * @return a fresh {@link ServerEndpointExporter}
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}

新建WebSocketServer.java

客户端连接时使用 SpringBoot 开放的端口(即 application.properties 里 server.port 所设定的端口),路径的最后一段就是客户端自己生成的 sid,例如 ws://127.0.0.1:8080/ws/abc123(部分客户端库也接受 http://127.0.0.1:8080/ws/abc123 这种写法)。

import javax.websocket.*
@Component
@ServerEndpoint(value = "/ws/{sid}")
public class WebSocketServer {
// private static final Log log = LogFactory.getLog(WebSocketServer.class);
private static long commandSerial = 0; //发送消息的序列号,比如每发一次,+1
// 可以在这里调用service,来实现业务逻辑的调用
// @Resource(name = "deviceService")
// private final IDeviceService deviceService = SpringContext.getBean(DeviceServiceImpl.class);
/**
* concurrent包的线程安全Set,用来存放每个客户端对应的WebSocketServer对象。
*/
public static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<>();
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
public String sid = null;
// 最近活跃(心跳)时间,可以启一个排程,每个多少时间,检测webSocketSet的List里面的最后活跃时间,来断开是否假死的连接
public long lastActive = System.currentTimeMillis();
public long lastConnect = 0;
/**
* 连接建立成功调用的方法
* 每次客户连接传一个sid,这个sid让客户端生成,作为一个客户端唯一的标记
**/
@OnOpen
public void onOpen(Session session, @PathParam("sid") String sid) {
// 这里onOpen可以根据自己的业务需求做一些客户端的验证工作
this.session = session;
this.sid = sid;
this.lastConnect = System.currentTimeMillis();
webSocketSet.stream().filter(item -> (item.app + "-" + item.id).equals(this.app + "-" + this.id)).findFirst().ifPresent(item -> {
try {
item.session.close();
} catch (IOException ignored) {
}
item.sid = null;
item.session = null;
});
webSocketSet.add(this);
// deviceService.wsUpdateClients(sid, webSocketSet);
log.info("websocket session connected, sid: " + sid);
}
@OnClose
public void onClose() {
//从set中删除
if (this.id != null) {
webSocketSet.remove(this);
// deviceService.wsUpdateClients(this.sid, webSocketSet);
log.warn("websocket session closed, token: " + this.sid);
this.sid = null;
this.session = null;
}
}
@OnMessage
public void onMessage(String message, Session session) {
if (this.sid == null) {
log.warn("websocket receive message from null sid, abort");
return;
}
log.debug("websocket receive message from: " + sid + ", message: " + message);
updateLastActive();
// 下面可以处理客户端发过来的消息
}
@OnError
public void onError(Session session, Throwable e) {
log.error("websocket error, app: " + app + ", sid: " + sid, e);
closeSession();
}
public void closeSession() {
try {
if (session != null) {
session.close();
}
} catch (IOException ignored) {
}
this.onClose();
}
public long sendMessage(JSONObject message) throws IOException {
long l = ++commandSerial;
if (l >= Long.MAX_VALUE - 1) {
commandSerial = 0;
}
message.put("serial", l);
sendMessage(message.toString());
return l;
}
private void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
/**
* 发消息,可以给不同的客户端发送不同的消息
*/
public static JsonResult sendMessage(String message, @PathParam("sid") String sid) {
log.info("websocket send message to: " + sid + ", message:" + message);
WebSocketServer webSocketServer = webSocketSet.stream().filter(item -> sid.equals(item.sid)).findFirst().orElse(null);
if (webSocketServer == null) {
return JsonResult.fail("找不到连接的客户端");
}
try {
webSocketServer.sendMessage(message);
} catch (IOException e) {
log.error("websocket error send message to: " + sid + ", app: " + webSocketServer.app, e);
return JsonResult.fail("发送消息错误:" + e.getMessage());
}
return JsonResult.success();
}
public void updateLastActive() {
this.lastActive = System.currentTimeMillis();
}
public static WebSocketServer findWebSocketServer(String sid) {
return webSocketSet.stream().filter(item -> sid.equals(item.sid)).findFirst().orElse(null);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
WebSocketServer that = (WebSocketServer) o;
return (sid).equals(that.sid);
}
@Override
public int hashCode() {
return Objects.hash(sid);
}
}
import javax.websocket.* @Component @ServerEndpoint(value = "/ws/{sid}") public class WebSocketServer { // private static final Log log = LogFactory.getLog(WebSocketServer.class); private static long commandSerial = 0; //发送消息的序列号,比如每发一次,+1 // 可以在这里调用service,来实现业务逻辑的调用 // @Resource(name = "deviceService") // private final IDeviceService deviceService = SpringContext.getBean(DeviceServiceImpl.class); /** * concurrent包的线程安全Set,用来存放每个客户端对应的WebSocketServer对象。 */ public static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<>(); /** * 与某个客户端的连接会话,需要通过它来给客户端发送数据 */ private Session session; public String sid = null; // 最近活跃(心跳)时间,可以启一个排程,每个多少时间,检测webSocketSet的List里面的最后活跃时间,来断开是否假死的连接 public long lastActive = System.currentTimeMillis(); public long lastConnect = 0; /** * 连接建立成功调用的方法 * 每次客户连接传一个sid,这个sid让客户端生成,作为一个客户端唯一的标记 **/ @OnOpen public void onOpen(Session session, @PathParam("sid") String sid) { // 这里onOpen可以根据自己的业务需求做一些客户端的验证工作 this.session = session; this.sid = sid; this.lastConnect = System.currentTimeMillis(); webSocketSet.stream().filter(item -> (item.app + "-" + item.id).equals(this.app + "-" + this.id)).findFirst().ifPresent(item -> { try { item.session.close(); } catch (IOException ignored) { } item.sid = null; item.session = null; }); webSocketSet.add(this); // deviceService.wsUpdateClients(sid, webSocketSet); log.info("websocket session connected, sid: " + sid); } @OnClose public void onClose() { //从set中删除 if (this.id != null) { webSocketSet.remove(this); // deviceService.wsUpdateClients(this.sid, webSocketSet); log.warn("websocket session closed, token: " + this.sid); this.sid = null; this.session = null; } } @OnMessage public void onMessage(String message, Session session) { if (this.sid == null) { log.warn("websocket receive message from null sid, abort"); return; } log.debug("websocket receive message from: " + sid + ", message: " + message); updateLastActive(); // 下面可以处理客户端发过来的消息 } @OnError public void onError(Session session, 
Throwable e) { log.error("websocket error, app: " + app + ", sid: " + sid, e); closeSession(); } public void closeSession() { try { if (session != null) { session.close(); } } catch (IOException ignored) { } this.onClose(); } public long sendMessage(JSONObject message) throws IOException { long l = ++commandSerial; if (l >= Long.MAX_VALUE - 1) { commandSerial = 0; } message.put("serial", l); sendMessage(message.toString()); return l; } private void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } /** * 发消息,可以给不同的客户端发送不同的消息 */ public static JsonResult sendMessage(String message, @PathParam("sid") String sid) { log.info("websocket send message to: " + sid + ", message:" + message); WebSocketServer webSocketServer = webSocketSet.stream().filter(item -> sid.equals(item.sid)).findFirst().orElse(null); if (webSocketServer == null) { return JsonResult.fail("找不到连接的客户端"); } try { webSocketServer.sendMessage(message); } catch (IOException e) { log.error("websocket error send message to: " + sid + ", app: " + webSocketServer.app, e); return JsonResult.fail("发送消息错误:" + e.getMessage()); } return JsonResult.success(); } public void updateLastActive() { this.lastActive = System.currentTimeMillis(); } public static WebSocketServer findWebSocketServer(String sid) { return webSocketSet.stream().filter(item -> sid.equals(item.sid)).findFirst().orElse(null); } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; WebSocketServer that = (WebSocketServer) o; return (sid).equals(that.sid); } @Override public int hashCode() { return Objects.hash(sid); } }
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicLong;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.stereotype.Component;

/**
 * WebSocket endpoint mapped to {@code /ws/{sid}}.
 *
 * <p>Each connected client is represented by one instance of this class, registered in
 * {@link #webSocketSet} under the client-chosen {@code sid}. Server-side code can push a
 * message to one particular client through {@link #sendMessage(String, String)}.
 *
 * <p>NOTE(review): {@code JSONObject} and {@code JsonResult} are not declared in this snippet;
 * they have to be supplied by the surrounding project.
 */
@Component
@ServerEndpoint(value = "/ws/{sid}")
public class WebSocketServer {
    private static final Log log = LogFactory.getLog(WebSocketServer.class);

    /**
     * Serial number stamped on outgoing JSON messages, incremented once per message.
     * Atomic because the counter is shared by all endpoint instances.
     */
    private static final AtomicLong commandSerial = new AtomicLong();

    // Business services can be looked up here so the endpoint can call business logic, e.g.:
    // private final IDeviceService deviceService = SpringContext.getBean(DeviceServiceImpl.class);

    /** Thread-safe set holding the endpoint instance of every connected client. */
    public static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<>();

    /** Session of this client's connection, used to send data to it; {@code null} once closed. */
    private Session session;

    /** Client-supplied identifier; {@code null} while this endpoint is not connected. */
    public String sid = null;

    /**
     * Time of the most recent message (heartbeat). A scheduled job can scan
     * {@link #webSocketSet} and close connections that have been silent for too long.
     */
    public long lastActive = System.currentTimeMillis();

    /** Time at which the connection was opened; 0 until {@link #onOpen} has run. */
    public long lastConnect = 0;

    /**
     * Called when a connection has been established.
     *
     * <p>The client generates the {@code sid} itself and uses it as its unique marker. If an
     * endpoint with the same {@code sid} is still registered, it is closed and replaced.
     *
     * @param session the session of the new connection
     * @param sid     the identifier taken from the request path
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("sid") String sid) {
        // Client validation required by the business logic can be performed here.
        this.session = session;
        this.sid = sid;
        this.lastConnect = System.currentTimeMillis();

        // The set's iterator works on a snapshot, so entries may be removed while looping.
        for (WebSocketServer previous : webSocketSet) {
            if (previous != this && Objects.equals(sid, previous.sid)) {
                previous.discard();
            }
        }

        webSocketSet.add(this);
        // deviceService.wsUpdateClients(sid, webSocketSet);
        log.info("websocket session connected, sid: " + sid);
    }

    /** Unregisters an endpoint that has been displaced by a reconnect and closes its session. */
    private void discard() {
        Session stale = this.session;
        // Remove by identity: equals() compares sid, which the replacing endpoint shares.
        webSocketSet.removeIf(entry -> entry == this);
        // Clearing sid before closing makes the old session's onClose callback a no-op,
        // so it cannot remove the replacing endpoint from the set.
        this.sid = null;
        this.session = null;
        if (stale != null) {
            try {
                stale.close();
            } catch (IOException e) {
                log.debug("websocket error closing replaced session", e);
            }
        }
    }

    /** Called when the connection is closed; removes this endpoint from {@link #webSocketSet}. */
    @OnClose
    public void onClose() {
        if (this.sid == null) {
            // Already closed or displaced by a newer connection.
            return;
        }
        webSocketSet.remove(this);
        // deviceService.wsUpdateClients(this.sid, webSocketSet);
        log.warn("websocket session closed, token: " + this.sid);
        this.sid = null;
        this.session = null;
    }

    /**
     * Called for every text message received from the client; refreshes the heartbeat time.
     *
     * @param message the text payload
     * @param session the session the message arrived on
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        if (this.sid == null) {
            log.warn("websocket receive message from null sid, abort");
            return;
        }
        log.debug("websocket receive message from: " + sid + ", message: " + message);
        updateLastActive();
        // Messages sent by the client can be processed below.
    }

    /**
     * Called when the connection reports an error; logs it and closes the session.
     *
     * @param session the affected session
     * @param e       the reported error
     */
    @OnError
    public void onError(Session session, Throwable e) {
        log.error("websocket error, sid: " + sid, e);
        closeSession();
    }

    /** Closes the underlying session (best effort) and unregisters this endpoint. */
    public void closeSession() {
        Session current = this.session;
        if (current != null) {
            try {
                current.close();
            } catch (IOException e) {
                // Best effort: the endpoint is unregistered below regardless.
                log.debug("websocket error closing session, sid: " + sid, e);
            }
        }
        this.onClose();
    }

    /**
     * Sends a JSON message to this client after stamping it with the next serial number.
     *
     * @param message the message; a {@code "serial"} entry is added to it
     * @return the serial number that was assigned
     * @throws IOException if the session is closed or the message cannot be sent
     */
    public long sendMessage(JSONObject message) throws IOException {
        // Counts 1, 2, ... and wraps back to 1 after Long.MAX_VALUE - 1.
        long serial = commandSerial.updateAndGet(v -> v >= Long.MAX_VALUE - 1 ? 1 : v + 1);
        message.put("serial", serial);
        sendMessage(message.toString());
        return serial;
    }

    /**
     * Sends a text message to this client.
     *
     * @param message the text to send
     * @throws IOException if the session is closed or the message cannot be sent
     */
    private void sendMessage(String message) throws IOException {
        Session current = this.session;
        if (current == null) {
            throw new IOException("websocket session is closed");
        }
        // A Basic remote may throw IllegalStateException when two threads send at the same
        // time, so senders are serialized per session.
        synchronized (current) {
            current.getBasicRemote().sendText(message);
        }
    }

    /**
     * Sends a message to the client registered under {@code sid}, so that different clients
     * can be sent different messages.
     *
     * @param message the text to send
     * @param sid     the identifier of the target client
     * @return a success result, or a failure result if the client is not connected or the
     *         send failed
     */
    public static JsonResult sendMessage(String message, String sid) {
        log.info("websocket send message to: " + sid + ", message:" + message);
        WebSocketServer target = findWebSocketServer(sid);
        if (target == null) {
            return JsonResult.fail("找不到连接的客户端");
        }
        try {
            target.sendMessage(message);
        } catch (IOException e) {
            log.error("websocket error send message to: " + sid, e);
            return JsonResult.fail("发送消息错误:" + e.getMessage());
        }
        return JsonResult.success();
    }

    /** Records the current time as the moment this client was last active. */
    public void updateLastActive() {
        this.lastActive = System.currentTimeMillis();
    }

    /**
     * Looks up the endpoint registered under {@code sid}.
     *
     * @param sid the client identifier; may be {@code null}
     * @return the matching endpoint, or {@code null} if there is none
     */
    public static WebSocketServer findWebSocketServer(String sid) {
        if (sid == null) {
            return null;
        }
        return webSocketSet.stream().filter(item -> sid.equals(item.sid)).findFirst().orElse(null);
    }

    /** Two endpoints are equal when they carry the same {@code sid}. */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        WebSocketServer that = (WebSocketServer) o;
        // Null-safe: sid is cleared when the connection closes.
        return Objects.equals(sid, that.sid);
    }

    @Override
    public int hashCode() {
        return Objects.hash(sid);
    }
}

SpringContext.java

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
/**
 * Static holder for the Spring {@link ApplicationContext}, so that code which cannot use
 * injection can still look up beans through {@link #getBean(Class)}.
 *
 * @author Terry E-mail: yaoxinghuo at 126 dot com
 * @date 2020-12-28 18:51
 */
@Component
public class SpringContext implements ApplicationContextAware {
    /** The context handed over by Spring; {@code null} until the callback has run. */
    private static ApplicationContext context;

    /**
     * Callback through which Spring supplies the context; stores it in the static holder.
     *
     * @param context the application context this bean runs in
     */
    @Override
    public void setApplicationContext(ApplicationContext context) throws BeansException {
        SpringContext.context = context;
    }

    /**
     * @return the stored context, or {@code null} if it has not been set yet
     */
    public ApplicationContext getApplicationContext() {
        return context;
    }

    /**
     * Looks up a bean by type in the stored context.
     *
     * @param beanClass the type of the bean to fetch
     * @param <T>       the bean type
     * @return the bean returned by the context for {@code beanClass}
     * @throws IllegalStateException if the context has not been set yet
     */
    public static <T> T getBean(Class<T> beanClass) {
        if (context == null) {
            // Fail with a clear message instead of a bare NullPointerException.
            throw new IllegalStateException(
                    "Spring ApplicationContext is not available yet; SpringContext has not been initialised");
        }
        return context.getBean(beanClass);
    }
}
import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; /** * @author Terry E-mail: yaoxinghuo at 126 dot com * @date 2020-12-28 18:51 * @description */ @Component public class SpringContext implements ApplicationContextAware { private static ApplicationContext context; @Override public void setApplicationContext(ApplicationContext context) throws BeansException { SpringContext.context = context; } public ApplicationContext getApplicationContext() { return context; } public static <T> T getBean(Class<T> beanClass) { return context.getBean(beanClass); } }
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
 * Static holder for the Spring {@link ApplicationContext}, so that code which cannot use
 * injection can still look up beans through {@link #getBean(Class)}.
 *
 * @author Terry E-mail: yaoxinghuo at 126 dot com
 * @date 2020-12-28 18:51
 */
@Component
public class SpringContext implements ApplicationContextAware {
    /** The context handed over by Spring; {@code null} until the callback has run. */
    private static ApplicationContext context;

    /**
     * Callback through which Spring supplies the context; stores it in the static holder.
     *
     * @param context the application context this bean runs in
     */
    @Override
    public void setApplicationContext(ApplicationContext context) throws BeansException {
        SpringContext.context = context;
    }

    /**
     * @return the stored context, or {@code null} if it has not been set yet
     */
    public ApplicationContext getApplicationContext() {
        return context;
    }

    /**
     * Looks up a bean by type in the stored context.
     *
     * @param beanClass the type of the bean to fetch
     * @param <T>       the bean type
     * @return the bean returned by the context for {@code beanClass}
     * @throws IllegalStateException if the context has not been set yet
     */
    public static <T> T getBean(Class<T> beanClass) {
        if (context == null) {
            // Fail with a clear message instead of a bare NullPointerException.
            throw new IllegalStateException(
                    "Spring ApplicationContext is not available yet; SpringContext has not been initialised");
        }
        return context.getBean(beanClass);
    }
}

用Java实现的WebSocket客户端来连接,请看:https://blog.terrynow.com/2022/01/19/java-okhttp3-implement-websocket-client/

admin

这个人很懒,什么都没留下

文章评论

您需要 登录 之后才可以评论