Skip to content

WebSocket 使用指南

本文档详细介绍如何在 Vue Vben Admin + Spring Boot 项目中实现 WebSocket 实时通信。

📚 目录


WebSocket 简介

什么是 WebSocket?

WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议,允许服务端主动向客户端推送数据。

WebSocket vs HTTP

特性HTTPWebSocket
通信方式单向(客户端请求)双向(服务端可主动推送)
连接短连接长连接
开销每次请求都有 HTTP 头建立连接后开销小
实时性需要轮询实时推送

应用场景

  • 💬 即时聊天
  • 📊 实时数据监控
  • 🔔 消息通知
  • 🎮 在线游戏
  • 📹 视频会议

后端实现

1. 添加依赖

xml
<!-- pom.xml -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

2. WebSocket 配置

java
package com.vben.common.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
public class WebSocketConfig {
    
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

3. WebSocket 服务端

java
package com.vben.websocket.server;

import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@Component
@ServerEndpoint("/ws/{userId}")
public class WebSocketServer {
    
    private static final AtomicInteger ONLINE_COUNT = new AtomicInteger(0);
    private static final ConcurrentHashMap<String, WebSocketServer> WEBSOCKET_MAP = new ConcurrentHashMap<>();
    
    private Session session;
    private String userId;
    
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        this.session = session;
        this.userId = userId;
        WEBSOCKET_MAP.put(userId, this);
        
        int count = ONLINE_COUNT.incrementAndGet();
        log.info("用户连接:{},当前在线人数:{}", userId, count);
        
        try {
            sendMessage("连接成功,欢迎用户:" + userId);
        } catch (IOException e) {
            log.error("用户:{},网络异常!", userId, e);
        }
    }
    
    @OnClose
    public void onClose() {
        WEBSOCKET_MAP.remove(userId);
        int count = ONLINE_COUNT.decrementAndGet();
        log.info("用户退出:{},当前在线人数:{}", userId, count);
    }
    
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("用户消息:{},报文:{}", userId, message);
        
        try {
            WebSocketMessage msg = JSONUtil.toBean(message, WebSocketMessage.class);
            
            switch (msg.getType()) {
                case "heartbeat":
                    sendMessage("pong");
                    break;
                case "chat":
                    sendToUser(msg.getToUserId(), message);
                    break;
                case "broadcast":
                    sendToAll(message);
                    break;
                default:
                    sendMessage("未知消息类型");
            }
        } catch (Exception e) {
            log.error("消息处理异常", e);
        }
    }
    
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("用户错误:{},原因:{}", userId, error.getMessage());
    }
    
    public void sendMessage(String message) throws IOException {
        synchronized (session) {
            this.session.getBasicRemote().sendText(message);
        }
    }
    
    public static void sendToUser(String userId, String message) {
        try {
            WebSocketServer webSocketServer = WEBSOCKET_MAP.get(userId);
            if (webSocketServer != null) {
                webSocketServer.sendMessage(message);
            } else {
                log.warn("用户:{} 不在线", userId);
            }
        } catch (IOException e) {
            log.error("发送消息失败", e);
        }
    }
    
    public static void sendToAll(String message) {
        WEBSOCKET_MAP.forEach((userId, webSocketServer) -> {
            try {
                webSocketServer.sendMessage(message);
            } catch (IOException e) {
                log.error("发送消息失败:{}", userId, e);
            }
        });
    }
    
    public static int getOnlineCount() {
        return ONLINE_COUNT.get();
    }
}

4. 消息实体类

java
package com.vben.websocket.domain;

import lombok.Data;

@Data
public class WebSocketMessage {
    private String type;
    private String fromUserId;
    private String toUserId;
    private String content;
    private Long timestamp;
    private Object data;
}

5. WebSocket 控制器

java
package com.vben.websocket.controller;

import com.vben.common.core.domain.R;
import com.vben.websocket.server.WebSocketServer;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/websocket")
public class WebSocketController {
    
    @GetMapping("/online-count")
    public R<Integer> getOnlineCount() {
        return R.ok(WebSocketServer.getOnlineCount());
    }
    
    @PostMapping("/send-to-user")
    public R<Void> sendToUser(@RequestParam String userId, @RequestParam String message) {
        WebSocketServer.sendToUser(userId, message);
        return R.ok();
    }
    
    @PostMapping("/broadcast")
    public R<Void> broadcast(@RequestParam String message) {
        WebSocketServer.sendToAll(message);
        return R.ok();
    }
}

前端实现

1. WebSocket 工具类

typescript
// src/utils/websocket.ts
import { ref } from 'vue';

export interface WebSocketMessage {
  type: string;
  fromUserId?: string;
  toUserId?: string;
  content: string;
  timestamp?: number;
  data?: any;
}

export class WebSocketClient {
  private ws: WebSocket | null = null;
  private url: string;
  private reconnectTimer: any = null;
  private heartbeatTimer: any = null;
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 5;
  private reconnectInterval = 3000;
  private heartbeatInterval = 30000;
  
  public isConnected = ref(false);
  private messageHandlers: Map<string, Function[]> = new Map();
  
  constructor(url: string) {
    this.url = url;
  }
  
  connect(token?: string) {
    try {
      let wsUrl = this.url;
      if (token) {
        wsUrl += `?token=${token}`;
      }
      
      this.ws = new WebSocket(wsUrl);
      
      this.ws.onopen = () => {
        console.log('WebSocket 连接成功');
        this.isConnected.value = true;
        this.reconnectAttempts = 0;
        this.startHeartbeat();
      };
      
      this.ws.onmessage = (event) => {
        try {
          const message = JSON.parse(event.data);
          this.handleMessage(message);
        } catch (e) {
          this.handleMessage({ type: 'text', content: event.data });
        }
      };
      
      this.ws.onclose = () => {
        console.log('WebSocket 连接关闭');
        this.isConnected.value = false;
        this.stopHeartbeat();
        this.reconnect();
      };
      
      this.ws.onerror = (error) => {
        console.error('WebSocket 错误', error);
        this.isConnected.value = false;
      };
    } catch (error) {
      console.error('WebSocket 连接失败', error);
    }
  }
  
  disconnect() {
    this.stopHeartbeat();
    
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    
    if (this.ws) {
      this.ws.close();
      this.ws = null;
    }
    
    this.isConnected.value = false;
  }
  
  send(message: WebSocketMessage | string) {
    if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
      console.error('WebSocket 未连接');
      return false;
    }
    
    try {
      const data = typeof message === 'string' ? message : JSON.stringify(message);
      this.ws.send(data);
      return true;
    } catch (error) {
      console.error('发送消息失败', error);
      return false;
    }
  }
  
  on(type: string, handler: Function) {
    if (!this.messageHandlers.has(type)) {
      this.messageHandlers.set(type, []);
    }
    this.messageHandlers.get(type)!.push(handler);
  }
  
  off(type: string, handler?: Function) {
    if (!handler) {
      this.messageHandlers.delete(type);
    } else {
      const handlers = this.messageHandlers.get(type);
      if (handlers) {
        const index = handlers.indexOf(handler);
        if (index > -1) {
          handlers.splice(index, 1);
        }
      }
    }
  }
  
  private handleMessage(message: any) {
    const handlers = this.messageHandlers.get(message.type);
    if (handlers) {
      handlers.forEach(handler => handler(message));
    }
    
    const allHandlers = this.messageHandlers.get('*');
    if (allHandlers) {
      allHandlers.forEach(handler => handler(message));
    }
  }
  
  private reconnect() {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error('WebSocket 重连失败,已达到最大重连次数');
      return;
    }
    
    this.reconnectAttempts++;
    console.log(`WebSocket 尝试重连 (${this.reconnectAttempts}/${this.maxReconnectAttempts})`);
    
    this.reconnectTimer = setTimeout(() => {
      this.connect();
    }, this.reconnectInterval);
  }
  
  private startHeartbeat() {
    this.heartbeatTimer = setInterval(() => {
      this.send({
        type: 'heartbeat',
        content: 'ping',
        timestamp: Date.now(),
      });
    }, this.heartbeatInterval);
  }
  
  private stopHeartbeat() {
    if (this.heartbeatTimer) {
      clearInterval(this.heartbeatTimer);
      this.heartbeatTimer = null;
    }
  }
}

2. WebSocket Composable

typescript
// src/composables/useWebSocket.ts
import { ref, onMounted, onUnmounted } from 'vue';
import { WebSocketClient, WebSocketMessage } from '@/utils/websocket';
import { useUserStore } from '@/store/user';

export function useWebSocket() {
  const userStore = useUserStore();
  const wsClient = ref<WebSocketClient | null>(null);
  const messages = ref<WebSocketMessage[]>([]);
  
  function init() {
    const userId = userStore.userInfo?.id;
    if (!userId) {
      console.error('用户未登录');
      return;
    }
    
    const wsUrl = `ws://localhost:8080/ws/${userId}`;
    wsClient.value = new WebSocketClient(wsUrl);
    
    wsClient.value.on('chat', handleChatMessage);
    wsClient.value.on('notification', handleNotification);
    
    const token = userStore.token;
    wsClient.value.connect(token);
  }
  
  function sendMessage(message: WebSocketMessage) {
    if (wsClient.value) {
      wsClient.value.send(message);
    }
  }
  
  function handleChatMessage(message: WebSocketMessage) {
    console.log('收到聊天消息', message);
    messages.value.push(message);
  }
  
  function handleNotification(message: WebSocketMessage) {
    console.log('收到系统通知', message);
  }
  
  function disconnect() {
    if (wsClient.value) {
      wsClient.value.disconnect();
      wsClient.value = null;
    }
  }
  
  onMounted(() => {
    init();
  });
  
  onUnmounted(() => {
    disconnect();
  });
  
  return {
    wsClient,
    messages,
    sendMessage,
    disconnect,
  };
}

3. 在组件中使用

vue
<script setup lang="ts">
import { ref } from 'vue';
import { useWebSocket } from '@/composables/useWebSocket';
import { useUserStore } from '@/store/user';

const userStore = useUserStore();
const { wsClient, messages, sendMessage } = useWebSocket();

const inputMessage = ref('');
const toUserId = ref('');

function handleSend() {
  if (!inputMessage.value.trim()) {
    return;
  }
  
  sendMessage({
    type: 'chat',
    fromUserId: userStore.userInfo?.id,
    toUserId: toUserId.value,
    content: inputMessage.value,
    timestamp: Date.now(),
  });
  
  inputMessage.value = '';
}
</script>

<template>
  <div class="chat-container">
    <!-- 连接状态 -->
    <div class="status-bar">
      <a-tag :color="wsClient?.isConnected.value ? 'success' : 'error'">
        {{ wsClient?.isConnected.value ? '已连接' : '未连接' }}
      </a-tag>
    </div>
    
    <!-- 消息列表 -->
    <div class="message-list">
      <div v-for="(msg, index) in messages" :key="index" class="message-item">
        <div class="message-header">
          <span>{{ msg.fromUserId }}</span>
          <span>{{ new Date(msg.timestamp!).toLocaleString() }}</span>
        </div>
        <div class="message-content">{{ msg.content }}</div>
      </div>
    </div>
    
    <!-- 输入框 -->
    <div class="input-area">
      <a-input
        v-model:value="toUserId"
        placeholder="接收者 ID"
        style="width: 200px; margin-right: 10px"
      />
      <a-input
        v-model:value="inputMessage"
        placeholder="输入消息"
        @pressEnter="handleSend"
        style="flex: 1; margin-right: 10px"
      />
      <a-button type="primary" @click="handleSend">发送</a-button>
    </div>
  </div>
</template>

<style scoped>
.chat-container {
  display: flex;
  flex-direction: column;
  height: 600px;
  padding: 20px;
  background: #fff;
  border-radius: 8px;
}

.message-list {
  flex: 1;
  overflow-y: auto;
  border: 1px solid #e8e8e8;
  border-radius: 4px;
  padding: 10px;
  margin: 20px 0;
}

.message-item {
  margin-bottom: 15px;
  padding: 10px;
  background: #f5f5f5;
  border-radius: 4px;
}

.input-area {
  display: flex;
  align-items: center;
}
</style>

应用场景

1. 实时聊天

java
// 后端:聊天服务
@Service
public class ChatService {
    
    public void sendPrivateMessage(String fromUserId, String toUserId, String content) {
        WebSocketMessage message = new WebSocketMessage();
        message.setType("chat");
        message.setFromUserId(fromUserId);
        message.setToUserId(toUserId);
        message.setContent(content);
        message.setTimestamp(System.currentTimeMillis());
        
        WebSocketServer.sendToUser(toUserId, JSONUtil.toJsonStr(message));
    }
}

2. 系统通知

java
// 后端:通知服务
@Service
public class NotificationService {
    
    public void sendNotification(String userId, String title, String content) {
        WebSocketMessage message = new WebSocketMessage();
        message.setType("notification");
        message.setContent(content);
        
        Map<String, Object> data = new HashMap<>();
        data.put("title", title);
        data.put("content", content);
        message.setData(data);
        
        WebSocketServer.sendToUser(userId, JSONUtil.toJsonStr(message));
    }
}

3. 实时数据监控

java
// 后端:定时推送监控数据
@Service
public class MonitorService {
    
    @Scheduled(fixedRate = 5000)
    public void pushSystemMetrics() {
        Map<String, Object> metrics = new HashMap<>();
        metrics.put("cpu", getCpuUsage());
        metrics.put("memory", getMemoryUsage());
        metrics.put("onlineUsers", WebSocketServer.getOnlineCount());
        
        WebSocketMessage message = new WebSocketMessage();
        message.setType("monitor");
        message.setData(metrics);
        
        WebSocketServer.sendToAll(JSONUtil.toJsonStr(message));
    }
}

最佳实践

1. 连接管理

typescript
// 单例模式管理 WebSocket
class WebSocketManager {
  private static instance: WebSocketClient | null = null;
  
  static getInstance(url: string): WebSocketClient {
    if (!this.instance) {
      this.instance = new WebSocketClient(url);
    }
    return this.instance;
  }
  
  static destroy() {
    if (this.instance) {
      this.instance.disconnect();
      this.instance = null;
    }
  }
}

2. 错误处理

java
// 后端:统一错误处理
@OnError
public void onError(Session session, Throwable error) {
    log.error("WebSocket 错误", error);
    
    try {
        WebSocketMessage errorMsg = new WebSocketMessage();
        errorMsg.setType("error");
        errorMsg.setContent("服务器错误:" + error.getMessage());
        session.getBasicRemote().sendText(JSONUtil.toJsonStr(errorMsg));
    } catch (IOException e) {
        log.error("发送错误消息失败", e);
    }
}

3. 性能优化

java
// 使用线程池处理消息
@Configuration
public class WebSocketConfig {
    
    @Bean
    public Executor webSocketExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("websocket-");
        executor.initialize();
        return executor;
    }
}

4. 安全性

java
// IP 白名单
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
    String remoteAddress = session.getUserProperties().get("remoteAddress").toString();
    
    if (!isAllowedIp(remoteAddress)) {
        log.warn("非法 IP 尝试连接:{}", remoteAddress);
        try {
            session.close();
        } catch (IOException e) {
            log.error("关闭连接失败", e);
        }
        return;
    }
}

// 消息大小限制
@OnMessage
public void onMessage(String message, Session session) {
    if (message.length() > 10240) {
        log.warn("消息过大,拒绝处理");
        return;
    }
}

常见问题

Q1: WebSocket 连接失败?

解决方案:

nginx
# Nginx 配置 WebSocket 代理
location /ws/ {
    proxy_pass http://localhost:8080/ws/;
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
    proxy_set_header Host $host;
    proxy_connect_timeout 60s;
    proxy_read_timeout 600s;
    proxy_send_timeout 600s;
}

Q2: 连接频繁断开?

解决方案:

java
// 后端:设置超时时间
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
    session.setMaxIdleTimeout(300000); // 5 分钟
}
typescript
// 前端:实现心跳
private startHeartbeat() {
  this.heartbeatTimer = setInterval(() => {
    if (this.ws?.readyState === WebSocket.OPEN) {
      this.send({ type: 'heartbeat', content: 'ping' });
    }
  }, 30000);
}

Q3: 消息丢失?

解决方案:

typescript
// 实现消息队列和重发机制
class ReliableWebSocketClient extends WebSocketClient {
  private messageQueue: WebSocketMessage[] = [];
  
  send(message: WebSocketMessage) {
    if (this.ws?.readyState === WebSocket.OPEN) {
      super.send(message);
    } else {
      this.messageQueue.push(message);
    }
  }
  
  private onReconnected() {
    while (this.messageQueue.length > 0) {
      const message = this.messageQueue.shift();
      if (message) {
        this.send(message);
      }
    }
  }
}

参考资源


祝你开发顺利! 🚀

MIT License