1.通常实现方式
1.引入依赖
<!-- 引入 Spring Boot 的 WebSocket starter 依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2.添加controller
package com.example.test_demo.websocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
/**
 * Minimal JSR-356 WebSocket endpoint published at {@code /web/socket}.
 *
 * <p>Every lifecycle callback is logged, and each received text message is echoed back to the
 * sending session with a fixed prefix.
 */
@Controller
@ServerEndpoint("/web/socket")
public class WebController {

    // Shared by all endpoint instances; a logger does not need to be created per instance.
    private static final Logger logger = LoggerFactory.getLogger(WebController.class);

    /**
     * Called when a client opens a connection.
     *
     * @param session the newly opened session
     * @param config  the endpoint configuration supplied by the container
     */
    @OnOpen
    public void onOpen(Session session, EndpointConfig config) {
        logger.info("[onOpen][session({}) 接入]", session);
    }

    /**
     * Called for every text message; logs it and echoes it back asynchronously.
     *
     * @param session the session the message arrived on
     * @param message the received text payload
     */
    @OnMessage
    public void onMessage(Session session, String message) {
        logger.info("[onMessage][session({}) 接收到一条消息({})]", session, message);
        session.getAsyncRemote().sendText("这是你发送的消息吗"+message);
    }

    /**
     * Called when the connection is closed.
     *
     * @param session     the session that was closed
     * @param closeReason the reason reported for the close
     */
    @OnClose
    public void onClose(Session session, CloseReason closeReason) {
        logger.info("[onClose][session({}) 连接关闭。关闭原因是({})]", session, closeReason);
    }

    /**
     * Called when an error occurs on the connection.
     *
     * @param session   the session on which the error occurred
     * @param throwable the error; passed as the trailing argument so SLF4J logs its stack trace
     */
    @OnError
    public void onError(Session session, Throwable throwable) {
        logger.error("[onError][session({}) 发生异常]", session, throwable);
    }
}
3.添加配置类
package com.example.test_demo.websocket;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
 * Spring configuration for annotation-based WebSocket endpoints.
 */
@Configuration
public class WebSocketConfiguration {

    /**
     * Exposes a {@link ServerEndpointExporter} bean, whose role is to scan for beans annotated
     * with {@code @ServerEndpoint}.
     *
     * @return a new exporter instance
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
4.启动测试
前端测试网址:https://www.easyswoole.com/wstool.html
2.stomp协议实现
1.导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2.添加配置类
package com.websocketdemo.demos.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
/**
 * STOMP-over-WebSocket configuration.
 *
 * <p>{@code @EnableWebSocketMessageBroker} is required: without it Spring never invokes the
 * {@link WebSocketMessageBrokerConfigurer} callbacks below, so neither the {@code /ws} endpoint
 * nor the message broker would be set up.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    /**
     * Registers the STOMP endpoint that clients connect to.
     *
     * @param registry the registry to add endpoints to
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws")            // connection endpoint
                // Allows every origin; NOTE(review): restrict this outside of local testing.
                .setAllowedOriginPatterns("*")
                .withSockJS();                 // enable the SockJS fallback
    }

    /**
     * Configures the message broker and the application destination prefix.
     *
     * @param registry the broker registry to configure
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // In-memory broker for destinations starting with /topic or /queue.
        registry.enableSimpleBroker("/topic","/queue");
        // Destinations starting with /app are routed to @MessageMapping methods.
        registry.setApplicationDestinationPrefixes("/app");
    }
}
3.添加消息类
package com.websocketdemo.demos.entity;
import lombok.Data;
/**
 * Chat message payload exchanged between the test page and the STOMP controller.
 *
 * <p>Accessors, {@code equals}/{@code hashCode} and {@code toString} are generated by Lombok's
 * {@code @Data}.
 */
@Data
public class ChatMessage {
/** Sender label of the message. */
private String sender;
/** Text content of the message. */
private String content;
}
4.添加controller
package com.websocketdemo.demos.controller;
import com.websocketdemo.demos.entity.ChatMessage;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.annotation.SendToUser;
import org.springframework.messaging.simp.annotation.SubscribeMapping;
import org.springframework.stereotype.Controller;
import java.security.Principal;
import java.util.List;
/**
 * STOMP message handlers for the chat demo: one broadcast mapping and one reply-to-sender mapping.
 */
@Controller
public class ChatController {

    /** Maximum number of leading session-id characters used in the broadcast sender label. */
    private static final int SENDER_ID_LENGTH = 4;

    /**
     * Handles messages sent to {@code /app/chat} and broadcasts the result to
     * {@code /topic/messages}.
     *
     * <p>The client-supplied sender is overwritten with {@code "User-"} plus a short prefix of the
     * session id.
     *
     * @param message   the incoming chat message; its sender field is modified in place
     * @param sessionId the value of the {@code simpSessionId} message header
     * @return the same message instance with the sender replaced
     */
    @MessageMapping("/chat")
    @SendTo("/topic/messages")
    public ChatMessage handleMessage(@Payload ChatMessage message, @Header("simpSessionId") String sessionId) {
        // Clamp the prefix length so a session id shorter than 4 characters cannot throw.
        int prefixLength = Math.min(SENDER_ID_LENGTH, sessionId.length());
        message.setSender("User-" + sessionId.substring(0, prefixLength));
        return message;
    }

    // Example (disabled): handle a client subscription to /app/init and return initial data
    // directly to the subscriber without broadcasting.
    // @SubscribeMapping("/init")
    // public List<ChatMessage> handleSubscribe() {
    //     // return history messages or other initial data
    //     return messageService.getRecentMessages();
    // }

    /**
     * Handles messages sent to {@code /app/private} and replies on the user destination
     * {@code /queue/private}.
     *
     * <p>The sender is set to the session id. Using a {@code Principal} parameter instead would
     * require integrating Spring Security and configuring user login.
     *
     * @param message   the incoming chat message; its sender field is modified in place
     * @param sessionId the value of the {@code simpSessionId} message header
     * @return the same message instance with the sender replaced
     */
    @MessageMapping("/private")
    @SendToUser("/queue/private")
    public ChatMessage sendPrivateMessage(
            @Payload ChatMessage message,
            @Header("simpSessionId") String sessionId) {
        message.setSender(sessionId);
        return message;
    }
}
5.测试前端代码
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>WebSocket Test</title>
<script src="https://cdn.jsdelivr.net/npm/sockjs-client@1/dist/sockjs.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/stompjs@2.3.3/lib/stomp.min.js"></script>
</head>
<body>
<h1>WebSocket 测试</h1>
<button onclick="connect()">连接</button>
<button onclick="disconnect()">断开连接</button>
<button onclick="sendChatMessage()">发送广播消息</button>
<button onclick="sendPrivateMessage()">发送私有消息</button>
<div id="output"></div>
<script>
var stompClient = null;
// Replaces the content of the #output element with the current connection state.
function setConnected(connected) {
    var statusText;
    if (connected) {
        statusText = '已连接';
    } else {
        statusText = '断开连接';
    }
    document.getElementById('output').innerHTML = statusText;
}
// Opens a SockJS connection to the server, wraps it in a STOMP client and, once connected,
// subscribes to the broadcast topic and to the per-user private queue.
function connect() {
    // SockJS is used because the server endpoint enables it; without SockJS support on the
    // server a plain WebSocket could be used instead.
    var socket = new SockJS('http://localhost:8080/ws');
    stompClient = Stomp.over(socket);

    // Shared handler: parse the frame body as JSON and render it.
    function renderFrame(message) {
        showMessage(JSON.parse(message.body));
    }

    function onConnected(frame) {
        setConnected(true);
        console.log('Connected: ' + frame);
        // Broadcast topic.
        stompClient.subscribe('/topic/messages', renderFrame);
        // Point-to-point queue (note the required "/user" prefix).
        stompClient.subscribe('/user/queue/private', renderFrame);
    }

    stompClient.connect({}, onConnected);
}
// Disconnects the STOMP client (if any) and updates the displayed connection state.
function disconnect() {
    if (stompClient !== null) {
        stompClient.disconnect();
        // Drop the reference so the `if (stompClient)` guards in the send functions stop
        // passing once the connection is closed, and a second click is a no-op.
        stompClient = null;
    }
    setConnected(false);
    console.log("Disconnected");
}
// Sends a fixed chat message to /app/chat; the server broadcasts it to /topic/messages.
// Does nothing when no STOMP client exists.
function sendChatMessage() {
    if (!stompClient) {
        return;
    }
    var payload = JSON.stringify({
        sender: "测试客户端",
        content: "Hello WebSocket"
    });
    stompClient.send("/app/chat", {}, payload);
}
// Sends a fixed message to /app/private. Does nothing when no STOMP client exists.
function sendPrivateMessage() {
    if (!stompClient) {
        return;
    }
    var payload = JSON.stringify({
        sender: "testprivate",
        content: "Hello!"
    });
    stompClient.send("/app/private", {}, payload);
}
// Appends one "<sender>: <content>" paragraph to the #output element.
// The text is assigned via textContent rather than concatenated into innerHTML, so markup
// inside a message sent by another client is displayed literally instead of being executed.
function showMessage(message) {
    var output = document.getElementById('output');
    var line = document.createElement('p');
    line.textContent = message.sender + ": " + message.content;
    output.appendChild(line);
}
</script>
</body>
</html>
3.监听redis队列,并返回给对应用户
# 待修改:需将下面基于原生 WebSocket 的实现改为 STOMP 协议
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import org.springframework.context.event.EventListener;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.messaging.SessionDisconnectEvent;
import javax.annotation.Resource;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import static cn.iocoder.yudao.framework.common.enums.RedisKeyEnum.*;
@Component
public class VerificationCodesMessageListener implements WebSocketMessageListener<String> {
@Resource
private RedisMessageListenerContainer container;
# 监听器列表
private final ConcurrentHashMap<String, RedisPubSubListener> activeListeners = new ConcurrentHashMap<>();
@Resource
private WebSocketMessageSender webSocketMessageSender;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public void onMessage(WebSocketSession session, String message) {
String key = String.format(VERIFICATION_SUCCESS.getPrefix(), message);
String value = stringRedisTemplate.opsForValue().get(key);
if (StringUtils.isBlank(value)) {
String channel = String.format(VERIFICATION_SUCCESS_CHANNEL.getPrefix(), message);
RedisPubSubListener listener = new RedisPubSubListener(session, channel);
container.addMessageListener(listener, new ChannelTopic(channel));
activeListeners.put(message, listener);
} else {
webSocketMessageSender.send(session.getId(), "verificationCodes", "success");
}
}
private class RedisPubSubListener implements MessageListener {
private final WebSocketSession session;
private final String channel;
private final String listenerId;
public RedisPubSubListener(WebSocketSession session, String channel) {
this.session = session;
this.channel = channel;
this.listenerId = UUID.randomUUID().toString();
}
@Override
public void onMessage(Message message, byte[] pattern) {
String receivedChannel = new String(message.getChannel());
String receivedMessage = new String(message.getBody());
if (channel.equals(receivedChannel)) {
webSocketMessageSender.send(session.getId(), "verificationCodes", receivedMessage);
container.removeMessageListener(this);
}
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RedisPubSubListener that = (RedisPubSubListener) o;
return listenerId.equals(that.listenerId);
}
@Override
public int hashCode() {
return Objects.hash(listenerId);
}
}
@Override
public String getType() {
return "verificationCodes";
}
// 链接结束后自动销毁
@EventListener
public void handleWebSocketDisconnect(SessionDisconnectEvent event) {
String sessionId = event.getSessionId();
activeListeners.entrySet().removeIf(entry -> {
RedisPubSubListener listener = entry.getValue();
if (listener.session.getId().equals(sessionId)) {
container.removeMessageListener(listener);
return true;
}
return false;
});
}
}