1.通常实现方式

1.引入依赖

        <!-- 实现对 WebSocket 相关依赖的引入,方便~ -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

2.添加controller

package com.example.test_demo.websocket;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;

@Controller
@ServerEndpoint("/web/socket")
public class WebController {
    private Logger logger = LoggerFactory.getLogger(getClass());

    @OnOpen
    public void onOpen(Session session, EndpointConfig config) {
        logger.info("[onOpen][session({}) 接入]", session);
    }

    @OnMessage
    public void onMessage(Session session, String message) {
        logger.info("[onOpen][session({}) 接收到一条消息({})]", session, message);
        session.getAsyncRemote().sendText("这是你发送的消息吗"+message);
    }

    @OnClose
    public void onClose(Session session, CloseReason closeReason) {
        logger.info("[onClose][session({}) 连接关闭。关闭原因是({})}]", session, closeReason);
    }

    @OnError
    public void onError(Session session, Throwable throwable) {
        logger.info("[onClose][session({}) 发生异常]", session, throwable);
    }
}

3.添加配置类

package com.example.test_demo.websocket;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
public class WebSocketConfiguration {
    // 该bean的作用是 扫描加有 @ServerEndPoint 注解的bean
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

4.启动测试

前段测试网址:https://www.easyswoole.com/wstool.html

2.stomp协议实现

注解

作用

@MessageMapping

将方法映射到STOMP消息的目标地址(类似HTTP的@RequestMapping

@SubscribeMapping

处理客户端的订阅请求(如初次订阅时返回初始化数据)

@SendTo

将方法返回值广播到指定目标地址(群发)

@SendToUser

将返回值发送给特定用户(点对点)

@Header

获取STOMP消息的头部信息

@Payload

获取消息的负载内容(消息体)

1.导入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

2.添加配置类

package com.websocketdemo.demos.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // 注册STOMP端点,客户端通过该URL连接WebSocket
        registry.addEndpoint("/ws") // 连接端点
                .setAllowedOriginPatterns("*") // 允许跨域
                .withSockJS(); // 支持SockJS
        ;
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // 配置消息代理
        registry.enableSimpleBroker("/topic","/queue"); // 启用内存消息代理,处理以/topic为前缀的目标
        registry.setApplicationDestinationPrefixes("/app"); // 应用处理以/app为前缀的消息
    }
}

3.添加消息类

package com.websocketdemo.demos.entity;

import lombok.Data;

@Data
public class ChatMessage {
    private String sender;
    private String content;
}

4.添加controller

package com.websocketdemo.demos.controller;

import com.websocketdemo.demos.entity.ChatMessage;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.annotation.SendToUser;
import org.springframework.messaging.simp.annotation.SubscribeMapping;
import org.springframework.stereotype.Controller;

import java.security.Principal;
import java.util.List;

@Controller
public class ChatController {

    // 处理发送到/app/chat的消息,并将结果广播到/topic/messages
    @MessageMapping("/chat")
    @SendTo("/topic/messages")
    public ChatMessage handleMessage(@Payload ChatMessage message, @Header("simpSessionId") String sessionId) {
        message.setSender("User-" + sessionId.substring(0, 4));
        return message;
    }

    // 处理客户端订阅/app/init时的请求,返回初始化数据(不广播)
//    @SubscribeMapping("/init")
//    public List<ChatMessage> handleSubscribe() {
//        // 返回历史消息或初始化数据
//        return messageService.getRecentMessages();
//    }

    // 点对点发送消息到特定用户
    @MessageMapping("/private")
    @SendToUser("/queue/private")
    public ChatMessage sendPrivateMessage(
            @Payload ChatMessage message,
//            Principal principal 需要 集成 Spring Security 并配置用户登录
            @Header("simpSessionId") String sessionId) {
        message.setSender(sessionId);
        return message;
    }
}

5.测试前端代码

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>WebSocket Test</title>
    <script src="https://cdn.jsdelivr.net/npm/sockjs-client@1/dist/sockjs.min.js"></script>
    <script src="https://cdn.jsdelivr.net/npm/stompjs@2.3.3/lib/stomp.min.js"></script>
</head>
<body>
    <h1>WebSocket 测试</h1>
    <button onclick="connect()">连接</button>
    <button onclick="disconnect()">断开连接</button>
    <button onclick="sendChatMessage()">发送广播消息</button>
    <button onclick="sendPrivateMessage()">发送私有消息</button>
    <div id="output"></div>
    <script>
        var stompClient = null;
        function setConnected(connected) {
            document.getElementById('output').innerHTML = connected ? '已连接' : '断开连接';
        }
        function connect() {
            // 如果你启用了 SockJS 支持,就使用 withSockJS,如果没有可以直接用 WebSocket
            var socket = new SockJS('http://localhost:8080/ws');
            stompClient = Stomp.over(socket);
            stompClient.connect({}, function(frame) {
                setConnected(true);
                console.log('Connected: ' + frame);
                // 订阅群发主题
                stompClient.subscribe('/topic/messages', function(message) {
                    showMessage(JSON.parse(message.body));
                });
                // 订阅点对点主题(注意需要加上"/user"前缀)
                stompClient.subscribe('/user/queue/private', function(message) {
                    showMessage(JSON.parse(message.body));
                });
            });
        }
        function disconnect() {
            if (stompClient !== null) {
                stompClient.disconnect();
            }
            setConnected(false);
            console.log("Disconnected");
        }
        function sendChatMessage() {
            if (stompClient) {
                var msg = {
                    sender: "测试客户端",
                    content: "Hello WebSocket"
                };
                // 发送消息到/app/chat,服务器收到后会广播到/topic/messages
                stompClient.send("/app/chat", {}, JSON.stringify(msg));
            }
        }
		function sendPrivateMessage() {
            if (stompClient) {
                var msg = {
                    sender: "testprivate",
                    content: "Hello!"
                };
                stompClient.send("/app/private", {}, JSON.stringify(msg));
            }
        }
        function showMessage(message) {
            var output = document.getElementById('output');
            output.innerHTML += "<p>" + message.sender + ": " + message.content + "</p>";
        }
    </script>
</body>
</html>

3.监听redis队列,并返回给对应用户

# 需要修改 将原生的ws 修改为stomp协议的
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import org.springframework.context.event.EventListener;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.messaging.SessionDisconnectEvent;

import javax.annotation.Resource;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import static cn.iocoder.yudao.framework.common.enums.RedisKeyEnum.*;


@Component
public class VerificationCodesMessageListener implements WebSocketMessageListener<String> {

    @Resource
    private RedisMessageListenerContainer container;
    # 监听器列表
    private final ConcurrentHashMap<String, RedisPubSubListener> activeListeners = new ConcurrentHashMap<>();

    @Resource
    private WebSocketMessageSender webSocketMessageSender;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public void onMessage(WebSocketSession session, String message) {
        String key = String.format(VERIFICATION_SUCCESS.getPrefix(), message);
        String value = stringRedisTemplate.opsForValue().get(key);

        if (StringUtils.isBlank(value)) {
            String channel = String.format(VERIFICATION_SUCCESS_CHANNEL.getPrefix(), message);
            RedisPubSubListener listener = new RedisPubSubListener(session, channel);
            container.addMessageListener(listener, new ChannelTopic(channel));
            activeListeners.put(message, listener);
        } else {
            webSocketMessageSender.send(session.getId(), "verificationCodes", "success");
        }
    }

    private class RedisPubSubListener implements MessageListener {
        private final WebSocketSession session;
        private final String channel;
        private final String listenerId;

        public RedisPubSubListener(WebSocketSession session, String channel) {
            this.session = session;
            this.channel = channel;
            this.listenerId = UUID.randomUUID().toString();
        }

        @Override
        public void onMessage(Message message, byte[] pattern) {
            String receivedChannel = new String(message.getChannel());
            String receivedMessage = new String(message.getBody());

            if (channel.equals(receivedChannel)) {
                webSocketMessageSender.send(session.getId(), "verificationCodes", receivedMessage);
                container.removeMessageListener(this);
            }
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            RedisPubSubListener that = (RedisPubSubListener) o;
            return listenerId.equals(that.listenerId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(listenerId);
        }
    }

    @Override
    public String getType() {
        return "verificationCodes";
    }
    // 链接结束后自动销毁
    @EventListener
    public void handleWebSocketDisconnect(SessionDisconnectEvent event) {
        String sessionId = event.getSessionId();
        activeListeners.entrySet().removeIf(entry -> {
            RedisPubSubListener listener = entry.getValue();
            if (listener.session.getId().equals(sessionId)) {
                container.removeMessageListener(listener);
                return true;
            }
            return false;
        });
    }
}