本文分享我在SpringBoot下使用websocket的示例代码,可管理每个客户端的session,给不同的客户端收发信息,可管理心跳时间,以及注入service,方便业务逻辑的调用。
干货代码实现
pom.xml
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
新建WebSocketConfig.java
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; /** * @author Terry E-mail: yaoxinghuo at 126 dot com * @date 2020-12-28 16:04 * @description */ @Configuration public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
新建WebSocketServer.java
客户端连接就使用SpringBoot下(application.properties里的server.port所设定)开放的端口,例如http://127.0.0.1:8080/ws/abc123 或 ws://127.0.0.1:8080/ws/abc123
import javax.websocket.* @Component @ServerEndpoint(value = "/ws/{sid}") public class WebSocketServer { // private static final Log log = LogFactory.getLog(WebSocketServer.class); private static long commandSerial = 0; //发送消息的序列号,比如每发一次,+1 // 可以在这里调用service,来实现业务逻辑的调用 // @Resource(name = "deviceService") // private final IDeviceService deviceService = SpringContext.getBean(DeviceServiceImpl.class); /** * concurrent包的线程安全Set,用来存放每个客户端对应的WebSocketServer对象。 */ public static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<>(); /** * 与某个客户端的连接会话,需要通过它来给客户端发送数据 */ private Session session; public String sid = null; // 最近活跃(心跳)时间,可以启一个排程,每个多少时间,检测webSocketSet的List里面的最后活跃时间,来断开是否假死的连接 public long lastActive = System.currentTimeMillis(); public long lastConnect = 0; /** * 连接建立成功调用的方法 * 每次客户连接传一个sid,这个sid让客户端生成,作为一个客户端唯一的标记 **/ @OnOpen public void onOpen(Session session, @PathParam("sid") String sid) { // 这里onOpen可以根据自己的业务需求做一些客户端的验证工作 this.session = session; this.sid = sid; this.lastConnect = System.currentTimeMillis(); webSocketSet.stream().filter(item -> (item.app + "-" + item.id).equals(this.app + "-" + this.id)).findFirst().ifPresent(item -> { try { item.session.close(); } catch (IOException ignored) { } item.sid = null; item.session = null; }); webSocketSet.add(this); // deviceService.wsUpdateClients(sid, webSocketSet); log.info("websocket session connected, sid: " + sid); } @OnClose public void onClose() { //从set中删除 if (this.id != null) { webSocketSet.remove(this); // deviceService.wsUpdateClients(this.sid, webSocketSet); log.warn("websocket session closed, token: " + this.sid); this.sid = null; this.session = null; } } @OnMessage public void onMessage(String message, Session session) { if (this.sid == null) { log.warn("websocket receive message from null sid, abort"); return; } log.debug("websocket receive message from: " + sid + ", message: " + message); updateLastActive(); // 下面可以处理客户端发过来的消息 } @OnError public void onError(Session session, Throwable e) { log.error("websocket error, app: " + app + ", sid: " + sid, e); closeSession(); } public void closeSession() { try { if (session != null) { session.close(); } } catch (IOException ignored) { } this.onClose(); } public long sendMessage(JSONObject message) throws IOException { long l = ++commandSerial; if (l >= Long.MAX_VALUE - 1) { commandSerial = 0; } message.put("serial", l); sendMessage(message.toString()); return l; } private void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } /** * 发消息,可以给不同的客户端发送不同的消息 */ public static JsonResult sendMessage(String message, @PathParam("sid") String sid) { log.info("websocket send message to: " + sid + ", message:" + message); WebSocketServer webSocketServer = webSocketSet.stream().filter(item -> sid.equals(item.sid)).findFirst().orElse(null); if (webSocketServer == null) { return JsonResult.fail("找不到连接的客户端"); } try { webSocketServer.sendMessage(message); } catch (IOException e) { log.error("websocket error send message to: " + sid + ", app: " + webSocketServer.app, e); return JsonResult.fail("发送消息错误:" + e.getMessage()); } return JsonResult.success(); } public void updateLastActive() { this.lastActive = System.currentTimeMillis(); } public static WebSocketServer findWebSocketServer(String sid) { return webSocketSet.stream().filter(item -> sid.equals(item.sid)).findFirst().orElse(null); } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; WebSocketServer that = (WebSocketServer) o; return (sid).equals(that.sid); } @Override public int hashCode() { return Objects.hash(sid); } }
SpringContext.java
import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; /** * @author Terry E-mail: yaoxinghuo at 126 dot com * @date 2020-12-28 18:51 * @description */ @Component public class SpringContext implements ApplicationContextAware { private static ApplicationContext context; @Override public void setApplicationContext(ApplicationContext context) throws BeansException { SpringContext.context = context; } public ApplicationContext getApplicationContext() { return context; } public static <T> T getBean(Class<T> beanClass) { return context.getBean(beanClass); } }
用Java实现的WebSocket客户端来连接,请看:https://blog.terrynow.com/2022/01/19/java-okhttp3-implement-websocket-client/
文章评论