WebSocket 使用指南
本文档详细介绍如何在 Vue Vben Admin + Spring Boot 项目中实现 WebSocket 实时通信。
📚 目录
WebSocket 简介
什么是 WebSocket?
WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议,允许服务端主动向客户端推送数据。
WebSocket vs HTTP
| 特性 | HTTP | WebSocket |
|---|---|---|
| 通信方式 | 单向(客户端请求) | 双向(服务端可主动推送) |
| 连接 | 短连接 | 长连接 |
| 开销 | 每次请求都有 HTTP 头 | 建立连接后开销小 |
| 实时性 | 需要轮询 | 实时推送 |
应用场景
- 💬 即时聊天
- 📊 实时数据监控
- 🔔 消息通知
- 🎮 在线游戏
- 📹 视频会议
后端实现
1. 添加依赖
xml
<!-- pom.xml -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2. WebSocket 配置
java
package com.vben.common.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
 * Spring configuration for the annotation-based WebSocket endpoints of this guide.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Exposes a {@link ServerEndpointExporter} bean.
     *
     * <p>Per the Spring documentation this exporter registers {@code @ServerEndpoint}
     * classes with the WebSocket container of an embedded server.
     *
     * @return a new exporter instance
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
3. WebSocket 服务端
java
package com.vben.websocket.server;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * WebSocket endpoint served at {@code /ws/{userId}}.
 *
 * <p>Each instance stores the {@link Session} and user id of one connection (assigned in
 * {@link #onOpen}). The static registry maps a user id to the endpoint that most recently
 * connected with it; {@link #sendToUser} and {@link #sendToAll} deliver through that registry.
 */
@Slf4j
@Component
@ServerEndpoint("/ws/{userId}")
public class WebSocketServer {

    /** Number of user ids currently present in {@link #WEBSOCKET_MAP}. */
    private static final AtomicInteger ONLINE_COUNT = new AtomicInteger(0);

    /** Live endpoints keyed by user id; a reconnect replaces the earlier entry. */
    private static final ConcurrentHashMap<String, WebSocketServer> WEBSOCKET_MAP = new ConcurrentHashMap<>();

    private Session session;
    private String userId;

    /**
     * Registers the new connection and greets the client.
     *
     * @param session the session of the connection being opened
     * @param userId  user id taken from the endpoint path
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        this.session = session;
        this.userId = userId;
        // Only a user id that was not registered before counts as a new online user;
        // a second connection for the same id replaces the entry without inflating the count.
        WebSocketServer previous = WEBSOCKET_MAP.put(userId, this);
        int count = previous == null ? ONLINE_COUNT.incrementAndGet() : ONLINE_COUNT.get();
        log.info("用户连接:{},当前在线人数:{}", userId, count);
        try {
            sendMessage("连接成功,欢迎用户:" + userId);
        } catch (IOException e) {
            log.error("用户:{},网络异常!", userId, e);
        }
    }

    /**
     * Unregisters this connection.
     *
     * <p>The entry is removed only if it still points at this instance, so a stale
     * connection closing late cannot evict a newer connection of the same user.
     */
    @OnClose
    public void onClose() {
        boolean removed = userId != null && WEBSOCKET_MAP.remove(userId, this);
        int count = removed ? ONLINE_COUNT.decrementAndGet() : ONLINE_COUNT.get();
        log.info("用户退出:{},当前在线人数:{}", userId, count);
    }

    /**
     * Parses an incoming text frame as a {@code WebSocketMessage} and dispatches on its type:
     * {@code heartbeat} is answered with {@code pong}, {@code chat} is forwarded to the
     * addressed user, {@code broadcast} goes to every registered user, anything else is
     * answered with an "unknown type" notice.
     *
     * @param message raw text received from the client
     * @param session session the message arrived on
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("用户消息:{},报文:{}", userId, message);
        try {
            WebSocketMessage msg = JSONUtil.toBean(message, WebSocketMessage.class);
            String type = msg == null ? null : msg.getType();
            if (type == null) {
                // switch on a null String would throw; treat it like any other unknown type.
                sendMessage("未知消息类型");
                return;
            }
            switch (type) {
                case "heartbeat":
                    sendMessage("pong");
                    break;
                case "chat":
                    // The sender is the authenticated path user, never what the client claims.
                    msg.setFromUserId(userId);
                    sendToUser(msg.getToUserId(), JSONUtil.toJsonStr(msg));
                    break;
                case "broadcast":
                    msg.setFromUserId(userId);
                    sendToAll(JSONUtil.toJsonStr(msg));
                    break;
                default:
                    sendMessage("未知消息类型");
            }
        } catch (Exception e) {
            // Boundary catch: malformed JSON or a failed send must not tear down the endpoint.
            log.error("消息处理异常", e);
        }
    }

    /**
     * Logs a transport or handler error for this connection, including its stack trace.
     *
     * @param session session the error occurred on
     * @param error   the failure reported by the container
     */
    @OnError
    public void onError(Session session, Throwable error) {
        // A trailing Throwable argument makes SLF4J log the stack trace as well.
        log.error("用户错误:{},原因:{}", userId, error.getMessage(), error);
    }

    /**
     * Sends a text frame to this connection's client.
     *
     * @param message text to send
     * @throws IOException if the session is missing or closed, or the write fails
     */
    public void sendMessage(String message) throws IOException {
        Session current = this.session;
        if (current == null || !current.isOpen()) {
            // Surface a closed session as IOException so callers' existing handling applies.
            throw new IOException("WebSocket session is not open, userId=" + userId);
        }
        // Serialize writers on the same session; the blocking remote is used from several threads.
        synchronized (current) {
            current.getBasicRemote().sendText(message);
        }
    }

    /**
     * Sends a text frame to the registered connection of one user; logs a warning when the
     * user has no registered connection (including a {@code null} id).
     *
     * @param userId  recipient user id
     * @param message text to send
     */
    public static void sendToUser(String userId, String message) {
        // ConcurrentHashMap rejects null keys, so resolve a null id to "not online".
        WebSocketServer target = userId == null ? null : WEBSOCKET_MAP.get(userId);
        if (target == null) {
            log.warn("用户:{} 不在线", userId);
            return;
        }
        try {
            target.sendMessage(message);
        } catch (IOException | IllegalStateException e) {
            // IllegalStateException: the session may close between the open check and the write.
            log.error("发送消息失败", e);
        }
    }

    /**
     * Sends a text frame to every registered connection; a failure for one recipient is
     * logged and does not stop delivery to the others.
     *
     * @param message text to send
     */
    public static void sendToAll(String message) {
        WEBSOCKET_MAP.forEach((userId, webSocketServer) -> {
            try {
                webSocketServer.sendMessage(message);
            } catch (IOException | IllegalStateException e) {
                log.error("发送消息失败:{}", userId, e);
            }
        });
    }

    /**
     * @return the current number of registered (online) user ids
     */
    public static int getOnlineCount() {
        return ONLINE_COUNT.get();
    }
}
4. 消息实体类
java
package com.vben.websocket.domain;
import lombok.Data;
/**
 * Message envelope exchanged over the WebSocket as JSON.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class WebSocketMessage {
    /** Message kind; this guide uses "heartbeat", "chat", "broadcast", "notification", "monitor" and "error". */
    private String type;
    /** Id of the sending user. */
    private String fromUserId;
    /** Id of the receiving user (used for "chat" messages). */
    private String toUserId;
    /** Text content of the message. */
    private String content;
    /** Creation time; the services in this guide fill it with System.currentTimeMillis(). */
    private Long timestamp;
    /** Optional structured payload; the services in this guide store a Map here. */
    private Object data;
}
5. WebSocket 控制器
java
package com.vben.websocket.controller;
import com.vben.common.core.domain.R;
import com.vben.websocket.server.WebSocketServer;
import org.springframework.web.bind.annotation.*;
/**
 * REST facade over {@link WebSocketServer}: exposes the online count and lets HTTP callers
 * push text to one user or to all connected users.
 */
@RestController
@RequestMapping("/websocket")
public class WebSocketController {

    /**
     * @return the number of online users as reported by {@link WebSocketServer#getOnlineCount()}
     */
    @GetMapping("/online-count")
    public R<Integer> getOnlineCount() {
        final Integer online = WebSocketServer.getOnlineCount();
        return R.ok(online);
    }

    /**
     * Pushes a text message to a single user.
     *
     * @param userId  recipient user id
     * @param message text to deliver
     * @return an empty success result
     */
    @PostMapping("/send-to-user")
    public R<Void> sendToUser(@RequestParam String userId, @RequestParam String message) {
        WebSocketServer.sendToUser(userId, message);
        final R<Void> result = R.ok();
        return result;
    }

    /**
     * Pushes a text message to every connected user.
     *
     * @param message text to deliver
     * @return an empty success result
     */
    @PostMapping("/broadcast")
    public R<Void> broadcast(@RequestParam String message) {
        WebSocketServer.sendToAll(message);
        final R<Void> result = R.ok();
        return result;
    }
}
前端实现
1. WebSocket 工具类
typescript
// src/utils/websocket.ts
import { ref } from 'vue';
/** Message envelope exchanged with the server as JSON. */
export interface WebSocketMessage {
  /** Message kind, e.g. 'heartbeat', 'chat', 'notification'; also the key handlers are registered under. */
  type: string;
  /** Id of the sending user. */
  fromUserId?: string;
  /** Id of the receiving user. */
  toUserId?: string;
  /** Text content of the message. */
  content: string;
  /** Creation time; the senders in this guide use Date.now(). */
  timestamp?: number;
  /** Optional structured payload. */
  data?: any;
}
/**
 * Browser WebSocket wrapper with automatic reconnection, a periodic heartbeat and
 * dispatch of incoming messages to handlers registered per message type.
 */
export class WebSocketClient {
  private ws: WebSocket | null = null;
  private url: string;
  /** Token from the most recent connect() call; reused when reconnecting. */
  private token?: string;
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
  private heartbeatTimer: ReturnType<typeof setInterval> | null = null;
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 5;
  private reconnectInterval = 3000;
  private heartbeatInterval = 30000;
  /** Reactive flag: true between a successful open and the next close/error. */
  public isConnected = ref(false);
  private messageHandlers: Map<string, Function[]> = new Map();

  /**
   * @param url WebSocket endpoint URL (ws:// or wss://)
   */
  constructor(url: string) {
    this.url = url;
  }

  /**
   * Opens the connection. Any socket still held is closed first so that only one
   * connection is live at a time.
   *
   * @param token optional auth token appended as a `token` query parameter; it is
   *              remembered and reused by automatic reconnects
   */
  connect(token?: string) {
    if (token) {
      this.token = token;
    }
    this.closeSocket();
    try {
      const socket = new WebSocket(this.buildUrl());
      this.ws = socket;
      socket.onopen = () => {
        if (this.ws !== socket) {
          return; // superseded or deliberately closed in the meantime
        }
        console.log('WebSocket 连接成功');
        this.isConnected.value = true;
        this.reconnectAttempts = 0;
        this.startHeartbeat();
      };
      socket.onmessage = (event) => {
        try {
          const message = JSON.parse(event.data);
          this.handleMessage(message);
        } catch (e) {
          // Non-JSON frames (e.g. the server's plain "pong") are delivered as type 'text'.
          this.handleMessage({ type: 'text', content: event.data });
        }
      };
      socket.onclose = () => {
        if (this.ws !== socket) {
          // Closed via disconnect() or replaced by a newer socket: do not reconnect.
          return;
        }
        console.log('WebSocket 连接关闭');
        this.isConnected.value = false;
        this.stopHeartbeat();
        this.reconnect();
      };
      socket.onerror = (error) => {
        if (this.ws !== socket) {
          return;
        }
        console.error('WebSocket 错误', error);
        this.isConnected.value = false;
      };
    } catch (error) {
      console.error('WebSocket 连接失败', error);
    }
  }

  /** Closes the connection and cancels the heartbeat and any pending reconnect. */
  disconnect() {
    this.stopHeartbeat();
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    this.closeSocket();
    this.isConnected.value = false;
  }

  /**
   * Sends a message; objects are serialized with JSON.stringify, strings are sent as-is.
   *
   * @returns true if the message was handed to the socket, false if the socket is not
   *          open or the send threw
   */
  send(message: WebSocketMessage | string) {
    if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
      console.error('WebSocket 未连接');
      return false;
    }
    try {
      const data = typeof message === 'string' ? message : JSON.stringify(message);
      this.ws.send(data);
      return true;
    } catch (error) {
      console.error('发送消息失败', error);
      return false;
    }
  }

  /**
   * Registers a handler for a message type; use '*' to receive every message.
   */
  on(type: string, handler: Function) {
    if (!this.messageHandlers.has(type)) {
      this.messageHandlers.set(type, []);
    }
    this.messageHandlers.get(type)!.push(handler);
  }

  /**
   * Removes one handler, or all handlers of the type when no handler is given.
   */
  off(type: string, handler?: Function) {
    if (!handler) {
      this.messageHandlers.delete(type);
      return;
    }
    const handlers = this.messageHandlers.get(type);
    if (!handlers) {
      return;
    }
    const index = handlers.indexOf(handler);
    if (index > -1) {
      handlers.splice(index, 1);
    }
  }

  /** Invokes the handlers of message.type, then the '*' handlers. */
  private handleMessage(message: any) {
    const handlers = this.messageHandlers.get(message.type);
    if (handlers) {
      handlers.forEach((handler) => handler(message));
    }
    const allHandlers = this.messageHandlers.get('*');
    if (allHandlers) {
      allHandlers.forEach((handler) => handler(message));
    }
  }

  /** Builds the connection URL, appending the URL-encoded token when one is known. */
  private buildUrl(): string {
    if (!this.token) {
      return this.url;
    }
    const separator = this.url.includes('?') ? '&' : '?';
    return `${this.url}${separator}token=${encodeURIComponent(this.token)}`;
  }

  /** Closes the held socket after detaching it, so its own events are treated as stale. */
  private closeSocket() {
    const socket = this.ws;
    if (socket) {
      this.ws = null;
      socket.close();
    }
  }

  /** Schedules the next reconnect attempt until maxReconnectAttempts is reached. */
  private reconnect() {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error('WebSocket 重连失败,已达到最大重连次数');
      return;
    }
    this.reconnectAttempts++;
    console.log(`WebSocket 尝试重连 (${this.reconnectAttempts}/${this.maxReconnectAttempts})`);
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
    }
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      // connect() reuses the remembered token, so the reconnect stays authenticated.
      this.connect();
    }, this.reconnectInterval);
  }

  /** Starts the periodic heartbeat, replacing any timer that is still running. */
  private startHeartbeat() {
    this.stopHeartbeat();
    this.heartbeatTimer = setInterval(() => {
      this.send({
        type: 'heartbeat',
        content: 'ping',
        timestamp: Date.now(),
      });
    }, this.heartbeatInterval);
  }

  /** Stops the heartbeat timer if one is running. */
  private stopHeartbeat() {
    if (this.heartbeatTimer) {
      clearInterval(this.heartbeatTimer);
      this.heartbeatTimer = null;
    }
  }
}
2. WebSocket Composable
typescript
// src/composables/useWebSocket.ts
import { ref, shallowRef, onMounted, onUnmounted } from 'vue';
import { WebSocketClient, WebSocketMessage } from '@/utils/websocket';
import { useUserStore } from '@/store/user';
/**
 * Composable that opens a WebSocket for the logged-in user when the component mounts
 * and closes it when the component unmounts.
 *
 * @returns the client holder, the list of received chat messages, and send/disconnect helpers
 */
export function useWebSocket() {
  const userStore = useUserStore();
  // shallowRef on purpose: a deep ref() would wrap the client in a reactive proxy that
  // unwraps its nested `isConnected` ref, so `wsClient.value.isConnected.value` would be undefined.
  const wsClient = shallowRef<WebSocketClient | null>(null);
  const messages = ref<WebSocketMessage[]>([]);

  /** Creates the client for the current user, registers handlers and connects. */
  function init() {
    const userId = userStore.userInfo?.id;
    if (!userId) {
      console.error('用户未登录');
      return;
    }
    // NOTE(review): the host is hard-coded for local development; confirm how it should be configured.
    const wsUrl = `ws://localhost:8080/ws/${encodeURIComponent(String(userId))}`;
    const client = new WebSocketClient(wsUrl);
    client.on('chat', handleChatMessage);
    client.on('notification', handleNotification);
    wsClient.value = client;
    const token = userStore.token;
    client.connect(token);
  }

  /** Sends a message through the current client; a no-op when no client exists. */
  function sendMessage(message: WebSocketMessage) {
    if (wsClient.value) {
      wsClient.value.send(message);
    }
  }

  /** Appends an incoming chat message to the reactive message list. */
  function handleChatMessage(message: WebSocketMessage) {
    console.log('收到聊天消息', message);
    messages.value.push(message);
  }

  /** Logs an incoming system notification. */
  function handleNotification(message: WebSocketMessage) {
    console.log('收到系统通知', message);
  }

  /** Closes the connection and drops the client. */
  function disconnect() {
    if (wsClient.value) {
      wsClient.value.disconnect();
      wsClient.value = null;
    }
  }

  onMounted(() => {
    init();
  });

  onUnmounted(() => {
    disconnect();
  });

  return {
    wsClient,
    messages,
    sendMessage,
    disconnect,
  };
}
3. 在组件中使用
vue
<script setup lang="ts">
import { ref } from 'vue';
import { useWebSocket } from '@/composables/useWebSocket';
import { useUserStore } from '@/store/user';
const userStore = useUserStore();
const { wsClient, messages, sendMessage } = useWebSocket();
const inputMessage = ref('');
const toUserId = ref('');
/**
 * Sends the typed text as a chat message to the entered recipient and clears the input.
 * Does nothing when the text or the recipient id is blank, so the typed text is not
 * thrown away on a message that has no addressee.
 */
function handleSend() {
  const recipient = toUserId.value.trim();
  if (!inputMessage.value.trim() || !recipient) {
    return;
  }
  sendMessage({
    type: 'chat',
    fromUserId: userStore.userInfo?.id,
    toUserId: recipient,
    content: inputMessage.value,
    timestamp: Date.now(),
  });
  inputMessage.value = '';
}
</script>
<template>
<div class="chat-container">
<!-- 连接状态 -->
<div class="status-bar">
<a-tag :color="wsClient?.isConnected.value ? 'success' : 'error'">
{{ wsClient?.isConnected.value ? '已连接' : '未连接' }}
</a-tag>
</div>
<!-- 消息列表 -->
<div class="message-list">
<div v-for="(msg, index) in messages" :key="index" class="message-item">
<div class="message-header">
<span>{{ msg.fromUserId }}</span>
<span>{{ new Date(msg.timestamp!).toLocaleString() }}</span>
</div>
<div class="message-content">{{ msg.content }}</div>
</div>
</div>
<!-- 输入框 -->
<div class="input-area">
<a-input
v-model:value="toUserId"
placeholder="接收者 ID"
style="width: 200px; margin-right: 10px"
/>
<a-input
v-model:value="inputMessage"
placeholder="输入消息"
@pressEnter="handleSend"
style="flex: 1; margin-right: 10px"
/>
<a-button type="primary" @click="handleSend">发送</a-button>
</div>
</div>
</template>
<style scoped>
.chat-container {
display: flex;
flex-direction: column;
height: 600px;
padding: 20px;
background: #fff;
border-radius: 8px;
}
.message-list {
flex: 1;
overflow-y: auto;
border: 1px solid #e8e8e8;
border-radius: 4px;
padding: 10px;
margin: 20px 0;
}
.message-item {
margin-bottom: 15px;
padding: 10px;
background: #f5f5f5;
border-radius: 4px;
}
.input-area {
display: flex;
align-items: center;
}
</style>
应用场景
1. 实时聊天
java
// 后端:聊天服务
/**
 * Backend chat service: wraps a private message in a {@code WebSocketMessage} and pushes
 * it to the recipient through {@code WebSocketServer}.
 */
@Service
public class ChatService {

    /**
     * Sends a "chat" message from one user to another.
     *
     * @param fromUserId id of the sending user
     * @param toUserId   id of the receiving user
     * @param content    message text
     */
    public void sendPrivateMessage(String fromUserId, String toUserId, String content) {
        final WebSocketMessage chat = new WebSocketMessage();
        chat.setType("chat");
        chat.setContent(content);
        chat.setFromUserId(fromUserId);
        chat.setToUserId(toUserId);
        chat.setTimestamp(System.currentTimeMillis());

        final String payload = JSONUtil.toJsonStr(chat);
        WebSocketServer.sendToUser(toUserId, payload);
    }
}
2. 系统通知
java
// 后端:通知服务
/**
 * Backend notification service: pushes a "notification" message to one user through
 * {@code WebSocketServer}.
 */
@Service
public class NotificationService {

    /**
     * Sends a notification to a user. The title and content are also placed in the
     * message's {@code data} map under the keys "title" and "content".
     *
     * @param userId  id of the receiving user
     * @param title   notification title
     * @param content notification text
     */
    public void sendNotification(String userId, String title, String content) {
        WebSocketMessage message = new WebSocketMessage();
        message.setType("notification");
        message.setContent(content);
        // Stamp the message like ChatService does, so clients can order and display it.
        message.setTimestamp(System.currentTimeMillis());
        Map<String, Object> data = new HashMap<>();
        data.put("title", title);
        data.put("content", content);
        message.setData(data);
        WebSocketServer.sendToUser(userId, JSONUtil.toJsonStr(message));
    }
}
3. 实时数据监控
java
// 后端:定时推送监控数据
/**
 * Backend monitoring service: periodically broadcasts system metrics as a "monitor"
 * message to every connected user.
 */
@Service
public class MonitorService {

    /**
     * Runs every 5 seconds and broadcasts CPU usage, memory usage and the online user
     * count. Nothing is collected or sent while no user is online.
     */
    @Scheduled(fixedRate = 5000)
    public void pushSystemMetrics() {
        int onlineUsers = WebSocketServer.getOnlineCount();
        if (onlineUsers == 0) {
            // No recipients: skip sampling and serialization altogether.
            return;
        }
        Map<String, Object> metrics = new HashMap<>();
        metrics.put("cpu", getCpuUsage());
        metrics.put("memory", getMemoryUsage());
        metrics.put("onlineUsers", onlineUsers);
        WebSocketMessage message = new WebSocketMessage();
        message.setType("monitor");
        message.setTimestamp(System.currentTimeMillis());
        message.setData(metrics);
        WebSocketServer.sendToAll(JSONUtil.toJsonStr(message));
    }
}
最佳实践
1. 连接管理
typescript
// 单例模式管理 WebSocket
/** Holds a single shared WebSocketClient for the application. */
class WebSocketManager {
  private static instance: WebSocketClient | null = null;
  /** URL the current instance was created for. */
  private static instanceUrl: string | null = null;

  /**
   * Returns the shared client for the given URL, creating it on first use.
   * If a client for a different URL exists it is disconnected and replaced, so the
   * caller never receives a client bound to another endpoint.
   */
  static getInstance(url: string): WebSocketClient {
    if (this.instance && this.instanceUrl !== url) {
      this.destroy();
    }
    if (!this.instance) {
      this.instance = new WebSocketClient(url);
      this.instanceUrl = url;
    }
    return this.instance;
  }

  /** Disconnects and discards the shared client, if any. */
  static destroy() {
    if (this.instance) {
      this.instance.disconnect();
      this.instance = null;
      this.instanceUrl = null;
    }
  }
}
2. 错误处理
java
// 后端:统一错误处理
/**
 * Logs the error and, while the session is still open, reports it to the client as an
 * "error" message.
 *
 * @param session session the error occurred on
 * @param error   the failure reported by the container
 */
@OnError
public void onError(Session session, Throwable error) {
    log.error("WebSocket 错误", error);
    if (session == null || !session.isOpen()) {
        // Errors often arrive because the connection is already gone; nothing to notify.
        return;
    }
    try {
        WebSocketMessage errorMsg = new WebSocketMessage();
        errorMsg.setType("error");
        errorMsg.setContent("服务器错误:" + error.getMessage());
        session.getBasicRemote().sendText(JSONUtil.toJsonStr(errorMsg));
    } catch (IOException | IllegalStateException e) {
        // IllegalStateException: the session may close between the open check and the write.
        log.error("发送错误消息失败", e);
    }
}
3. 性能优化
java
// 使用线程池处理消息
/**
 * Spring configuration that provides a thread pool for WebSocket message processing.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Builds the executor: 10 core threads, up to 50 threads, a queue of 100 tasks and
     * thread names starting with "websocket-".
     *
     * @return the initialized executor
     */
    @Bean
    public Executor webSocketExecutor() {
        final ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setThreadNamePrefix("websocket-");
        pool.setQueueCapacity(100);
        pool.setMaxPoolSize(50);
        pool.setCorePoolSize(10);
        pool.initialize();
        return pool;
    }
}
4. 安全性
java
// IP 白名单
/**
 * Rejects connections whose remote address is not on the whitelist.
 *
 * <p>The address is read from the session user property "remoteAddress".
 * NOTE(review): that property must be populated elsewhere (e.g. during the handshake);
 * when it is absent the connection is treated as not allowed instead of failing with a
 * NullPointerException.
 *
 * @param session the session of the connection being opened
 * @param userId  user id taken from the endpoint path
 */
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
    Object addressProperty = session.getUserProperties().get("remoteAddress");
    String remoteAddress = addressProperty == null ? null : addressProperty.toString();
    if (remoteAddress == null || !isAllowedIp(remoteAddress)) {
        log.warn("非法 IP 尝试连接:{}", remoteAddress);
        try {
            session.close();
        } catch (IOException e) {
            log.error("关闭连接失败", e);
        }
        return;
    }
}
// 消息大小限制
/**
 * Drops messages longer than 10240 characters ({@code String.length()}, i.e. UTF-16
 * code units, not bytes) and logs a warning.
 *
 * @param message raw text received from the client
 * @param session session the message arrived on
 */
@OnMessage
public void onMessage(String message, Session session) {
    final int maxLength = 10240;
    final boolean oversized = message.length() > maxLength;
    if (oversized) {
        log.warn("消息过大,拒绝处理");
        return;
    }
}
常见问题
Q1: WebSocket 连接失败?
解决方案:
nginx
# Nginx 配置 WebSocket 代理
location /ws/ {
proxy_pass http://localhost:8080/ws/;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_connect_timeout 60s;
proxy_read_timeout 600s;
proxy_send_timeout 600s;
}
Q2: 连接频繁断开?
解决方案:
java
// 后端:设置超时时间
/**
 * Sets the session's maximum idle timeout to five minutes when the connection opens.
 *
 * @param session the session of the connection being opened
 * @param userId  user id taken from the endpoint path
 */
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
    // 5 minutes, expressed in milliseconds
    final long idleTimeoutMillis = 300000L;
    session.setMaxIdleTimeout(idleTimeoutMillis);
}
typescript
// 前端:实现心跳
/**
 * Starts sending a 'heartbeat' message every 30 seconds while the socket is open.
 * A timer that is still running is cleared first so repeated calls never stack timers.
 */
private startHeartbeat() {
  if (this.heartbeatTimer) {
    clearInterval(this.heartbeatTimer);
  }
  this.heartbeatTimer = setInterval(() => {
    if (this.ws?.readyState === WebSocket.OPEN) {
      this.send({ type: 'heartbeat', content: 'ping', timestamp: Date.now() });
    }
  }, 30000);
}
Q3: 消息丢失?
解决方案:
typescript
// 实现消息队列和重发机制
/**
 * WebSocketClient that queues messages which cannot be sent and replays them, in order,
 * once the connection is available again.
 */
class ReliableWebSocketClient extends WebSocketClient {
  private messageQueue: Array<WebSocketMessage | string> = [];

  /**
   * Sends the message, or queues it when the connection is down or the send fails.
   * Any backlog is flushed first so messages keep their original order.
   *
   * @returns true if the message was sent now, false if it was queued
   */
  send(message: WebSocketMessage | string): boolean {
    // Uses the public isConnected flag; the base class socket is private.
    if (this.isConnected.value) {
      this.onReconnected();
      if (this.messageQueue.length === 0 && super.send(message)) {
        return true;
      }
    }
    this.messageQueue.push(message);
    return false;
  }

  /**
   * Replays queued messages. Call it when the connection has been re-established;
   * send() also calls it before every new message. Stops at the first failed send so a
   * dropped connection cannot make the loop spin forever.
   */
  onReconnected(): void {
    while (this.messageQueue.length > 0 && this.isConnected.value) {
      if (!super.send(this.messageQueue[0])) {
        break;
      }
      this.messageQueue.shift();
    }
  }
}
参考资源
祝你开发顺利! 🚀