本文共 6404 字,大约阅读时间需要 21 分钟。
0.写个消息实体类

/**
 * Chat message exchanged over the WebSocket connection.
 *
 * <p>The public fields are kept for backward compatibility; the accessors
 * are required because {@code MyWebSocketHandler} calls {@link #getTo()}.
 */
public class Message {
    // Sender
    public String from;
    // Recipient; MyWebSocketHandler uses this value as the key into its session registry
    public String to;
    // Text that was sent
    public String text;

    public String getFrom() {
        return from;
    }

    public void setFrom(String from) {
        this.from = from;
    }

    public String getTo() {
        return to;
    }

    public void setTo(String to) {
        this.to = to;
    }

    public String getText() {
        return text;
    }

    public void setText(String text) {
        this.text = text;
    }
}

1.先配置

import javax.annotation.Resource;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

/**
 * WebSocket configuration: registers the handler on a plain WebSocket
 * endpoint ({@code /ws}) and on a SockJS endpoint ({@code /ws/sockjs}),
 * each with its own {@link HandShake} interceptor.
 */
@Component
@EnableWebSocket
public class WebSocketConfig extends WebMvcConfigurerAdapter implements WebSocketConfigurer {

    @Resource
    MyWebSocketHandler handler;

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(handler, "/ws").addInterceptors(new HandShake());
        registry.addHandler(handler, "/ws/sockjs").addInterceptors(new HandShake()).withSockJS();
    }
}

2.握手

import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import com.fm.daimler.listener.constant.WS;
import com.fm.daimler.listener.service.WechatListenerService;

/**
 * WebSocket handshake interceptor: extracts the {@code token} query
 * parameter from the request URI and stores it in the session attributes
 * under {@code WS.TOKEN}.
 *
 * @author swt
 **/
public class HandShake implements HandshakeInterceptor {

    // Log under this class's own category (the original used WechatListenerService.class).
    private static final Logger LOG = LoggerFactory.getLogger(HandShake.class);

    private static final String TOKEN_PARAM = "token=";

    /**
     * Reads the token from the request URI and records it as a session attribute.
     *
     * @return {@code true} to proceed with the handshake; {@code false} (with a
     *         400 status) when the URI carries no token, since the handler
     *         keys its session registry by token
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
            WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        String token = getToken(request.getURI().toString());
        if (token.isEmpty()) {
            LOG.warn("Rejecting WebSocket handshake without a token: {}", request.getURI());
            response.setStatusCode(HttpStatus.BAD_REQUEST);
            return false;
        }
        LOG.info("TOKEN为【{}】的用户准备建立消息连接", token);
        attributes.put(WS.TOKEN, token);
        return true;
    }

    /** Logs that the handshake for the token in the request URI has completed. */
    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
            WebSocketHandler wsHandler, Exception exception) {
        String token = getToken(request.getURI().toString());
        LOG.info("TOKEN为【{}】的用户建立消息连接已完成", token);
    }

    /**
     * Extracts the value following {@code token=} in the URI.
     *
     * @param uri the full request URI as a string
     * @return the token value up to the next {@code &} (or the end of the
     *         URI), or an empty string when {@code token=} does not occur
     */
    private String getToken(String uri) {
        int index = uri.indexOf(TOKEN_PARAM);
        if (index < 0) {
            // Without this guard substring(-1 + 6) would return arbitrary URL text as the token.
            return "";
        }
        int start = index + TOKEN_PARAM.length();
        int end = uri.indexOf('&', start);
        return end < 0 ? uri.substring(start) : uri.substring(start, end);
    }
}

3.对话

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import com.alibaba.fastjson.JSONObject;
import com.fm.daimler.listener.constant.Symbol;
import com.fm.daimler.listener.constant.WS;
import com.fm.daimler.listener.service.WechatListenerService;

/**
 * WebSocket handler that keeps a token-to-session registry and relays each
 * incoming {@link Message} to the session registered under its {@code to} value.
 *
 * @author swt
 **/
@Component
public class MyWebSocketHandler implements WebSocketHandler {

    // Log under this class's own category (the original used WechatListenerService.class).
    private static final Logger LOG = LoggerFactory.getLogger(MyWebSocketHandler.class);

    /**
     * Open sessions keyed by token. A concurrent map is used because the
     * registry is read from the threads started by {@link #broadcast} while
     * the connection callbacks add and remove entries.
     */
    public static final ConcurrentMap<String, WebSocketSession> userSocketSessionMap =
            new ConcurrentHashMap<String, WebSocketSession>();

    /**
     * After the connection is established: registers the session under its
     * token unless a session is already registered for that token.
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        System.out.println("3建立连接后,再之后WebSocketHandler");
        String token = session.getAttributes().get(WS.TOKEN).toString();
        // Atomic replacement for the original get()==null / put() pair.
        userSocketSessionMap.putIfAbsent(token, session);
    }

    /**
     * Message handling: messages sent by the client through the WebSocket API
     * arrive here. A non-empty payload is parsed as a {@link Message} and
     * forwarded to the recipient named in it.
     */
    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
        String msgStr = message.getPayloadLength() != 0 ? message.getPayload().toString() : Symbol.EMPT;
        LOG.info("TOKEN为【{}】的用户发送了一条内容为【{}】的消息",
                session.getAttributes().get(WS.TOKEN).toString(), msgStr);
        if (msgStr.equals(Symbol.EMPT)) {
            return;
        }
        Message msg = JSONObject.parseObject(msgStr, Message.class);
        if (msg == null) {
            // e.g. the JSON literal "null"; nothing to forward.
            return;
        }
        sendMessageToUser(msg.getTo(), new TextMessage(JSONObject.toJSONBytes(msg)));
    }

    /**
     * Transport error handling: closes the session if it is still open and
     * removes it from the registry.
     */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        System.out.println(3);
        try {
            if (session.isOpen()) {
                session.close();
            }
        } finally {
            // Remove the entry even when close() throws, so no dead session stays registered.
            removeSession(session);
        }
    }

    /**
     * After the connection is closed: removes the session from the registry.
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
        System.out.println("5Websocket:" + session.getId() + "已经关闭");
        removeSession(session);
    }

    @Override
    public boolean supportsPartialMessages() {
        return false;
    }

    /**
     * Sends a message to every registered session that is open, using one
     * thread per session.
     *
     * @param message the message to send
     * @throws IOException declared for interface compatibility; send failures
     *         occur on the worker threads and are logged there
     */
    public void broadcast(final TextMessage message) throws IOException {
        for (Entry<String, WebSocketSession> entry : userSocketSessionMap.entrySet()) {
            final WebSocketSession target = entry.getValue();
            if (!target.isOpen()) {
                continue;
            }
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        // Re-check: the session may have closed before this thread ran.
                        if (target.isOpen()) {
                            target.sendMessage(message);
                        }
                    } catch (IOException e) {
                        LOG.error("Failed to broadcast to session {}", target.getId(), e);
                    }
                }
            }).start();
        }
    }

    /**
     * Sends a message to the session registered under the given token, if
     * that session exists and is open.
     *
     * @param token   registry key of the recipient
     * @param message the message to send
     * @throws IOException if sending fails
     */
    public void sendMessageToUser(String token, TextMessage message) throws IOException {
        LOG.info("TOKEN为【{}】的用户接收了一条内容为【{}】的消息", token, message.getPayload().toString());
        if (token == null) {
            // ConcurrentHashMap rejects null keys; a message without a recipient is dropped.
            return;
        }
        WebSocketSession session = userSocketSessionMap.get(token);
        if (session != null && session.isOpen()) {
            session.sendMessage(message);
        }
    }

    /**
     * Removes the first registry entry whose session id equals the id of the
     * given session.
     */
    private void removeSession(WebSocketSession session) {
        Iterator<Entry<String, WebSocketSession>> it = userSocketSessionMap.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, WebSocketSession> entry = it.next();
            if (entry.getValue().getId().equals(session.getId())) {
                it.remove();
                System.out.println("Socket会话已经移除:用户ID" + entry.getKey());
                break;
            }
        }
    }
}

4.前端的JS
转载地址:http://dvhws.baihongyu.com/