Appearance
WebSocket
socket.io
socket.io支持http长链接和websocket,内部缓存了request链接,通过rooms和namespace功能可以让我们很方便的定向推送和集中推送消息,也正因为它不是简单的websocket,所以浏览器上的 WebSocket
跟socket.io无法兼容,需要使用 socket.io-cilent作为客户端. 它的适配器可选性比较多,有redis和mongodb等,都可以在集群情况下实现消息发送.
服务端
js
// server.js
import { createServer } from "http";
import { Server } from "socket.io";
import url from "url";
const httpServer = createServer();
const io = new Server(httpServer, {
path: "/websocket",
});
const ioUser = io.of("user");
ioUser.use((socket, next) => {
if (socket.handshake.auth?.authorization !== "test") {
socket.send("auth failed");
socket.disconnect();
return next(new Error("auth failed"));
}
const userId = url.parse(socket.request.url)?.userId;
socket.userId = userId;
next();
});
ioUser.on("connection", (socket) => {
const userId = socket.userId;
socket.join(userId); // join socket to userId
socket.on("message", (data) => {
console.log("receive", data);
});
console.log("userId connect");
ioUser.to(userId).emit("message", "another page is opened");
});
httpServer.listen(8080, () => {
console.log("server start with port: 8080");
});
客户端
js
// client.js
import { io } from "socket.io-client";
const socket = io("ws://localhost:8080/user?userId=userId", {
timeout: 5 * 1000,
auth: { authorization: "test" },
autoConnect: false,
path: "/websocket",
transports: ["websocket"],
});
socket.on("connect", () => {
console.log("test");
socket.emit("message", "hello world!!!!");
});
socket.on("message", (data) => {
console.log(data);
});
socket.on("error", console.error);
socket.connect();
服务端地址是 ws://localhost:8080
, /user
是namespace,/websocket
是path, 由于http long-pulling已经过时了,所以客户端只使用websocket连接(transports)。另外一点, 这里使用了auth认证和userId区分request, 在web实际应用里非常重要,当打开多页面时可以共享信息。客户端支持断开重连,所以我们也不需要做其余操作了。
js
const userSockets = await io.in(userId).allSockets();
打开多页面时如何判断这个用户所有的request都断开链接了呢?当 userSockets.length === 0
时,表示断开了所有链接。
ws
ws是node的另一个websocket库,官网明确指出它只能应用在服务端,客户端可以使用native WebSocket, 缺点也很明显,多集群的情况下需要我们自己写适配器. 下面这个例子实现了简单的消息发送
js
// server.js
import { WebSocketServer } from "ws";
const wss = new WebSocketServer({ port: 8080 });
wss.on("connection", function connection(ws) {
ws.on("error", console.error);
ws.on("message", function message(data) {
console.log("received: %s", data);
});
ws.send("something from server");
});
js
// client.js
const ws = new WebSocket("ws://localhost:8080");
ws.addEventListener("error", (event) => {
console.error(event.data);
});
ws.addEventListener("open", function open() {
ws.send("something");
});
ws.addEventListener("message", function message(event) {
console.log("received: %s", event.data);
});
下面我们来说说如何实适配器, 适配器的目的一方面实现数据存储,另外一方面可以在集群条件下实现消息定向发送
我们的集群中有N个节点,当用户访问集群时,只能触发一个节点,想象一下当user2发送消息给user1, 应该如何实现呢?这就是adapter的作用, 下面我开始对 server.js
进行改进
js
// server.js
import { WebSocketServer } from "ws";
import Redis from "ioredis";
const wss = new WebSocketServer({ port: 8080 });
const redisPub = new Redis();
const redisSub = new Redis();
const clientMap = new Map();
const addClient = (userId, ws) => {
if (!clientMap.has(userId)) {
clientMap.set(userId, []);
}
clientMap.get(userId).push(ws);
};
const sendMessage = (userId, data) => {
clientMap.get(userId).forEach((ws) => {
ws.send(data);
});
};
const channel = "message";
redisSub.subscribe(channel);
redisSub.on("message", (channel, data) => {
const obj = JSON.parse(data);
sendMessage(obj.userId, obj.data);
});
wss.on("connection", function connection(ws, request) {
const userId = new URL(request.url, "http://example.com").searchParams.get("userId");
addClient(userId, ws);
ws.on("error", console.error);
ws.on("message", (data) => {
if (data.toString() === "ping") {
console.log('length %s', clientMap.get(userId).length);
return ws.send("pong");
}
});
ws.on("close", () => {
const clients = clientMap.get(userId);
console.log(clients.length, 'close');
clients.splice(clients.indexOf(ws), 1);
if (!clients.length) {
clientMap.delete(userId);
console.log("delete userId");
}
});
redisPub.publish(channel, JSON.stringify({ userId, data: "something from server on connection" }));
});
wss.on("close", () => {
console.log("close");
});
console.log("socket start on port: 8080");
我们使用userId标志位把client集中到一起, 再使用redis的发布订阅功能把消息发布到各个node节点,就可以实现多节点部署了。
socket.io给每个socket增加了一个独立的id用来标识连接,有兴趣也可以实现一下, 这里不再赘述。
WebSocket
node
的Websocket在20.10.0
版本增加的,并且还处于实验阶段,需要使用的话增加 --experimental-websocket
flag, 还可以使用浏览器的WebSocket
在写WebSocket时遇到几个坑
- 心跳,官网说为了保持稳定性和健壮性,需要增加心跳
- ws连接不成功也会触发
close
事件,通过监听 close 事件可以继续尝试链接, 有的浏览器触发 error 事件, 有些时候会两个都触发,这里增加了一个节流方法, 保证重连一致性
js
// client.js
class Socket {
constructor(url) {
this.url = url;
this.ws = null;
}
createWebSocket() {
this.ws = new WebSocket(this.url);
}
send(message) {
this.ws.send(message);
}
keepConnect() {
clearTimeout(this.keepConnectTimeout);
this.resetHeartbeat();
this.keepConnectTimeout = setTimeout(() => {
try {
// 连接不成功会触发 "close" 事件
this.createWebSocket();
this.addEventListener();
} catch (e) {
console.error("error", e);
}
}, 6 * 1000);
}
connect() {
this.createWebSocket();
this.addEventListener();
}
resetHeartbeat() {
clearTimeout(this.pingTimeout);
clearTimeout(this.heartbeatTimeout);
}
heartbeat() {
this.resetHeartbeat();
this.heartbeatTimeout = setTimeout(() => {
this.send("ping");
this.pingTimeout = setTimeout(() => {
console.log("close websocket due to without pong");
// close 方法会触发 "close" 事件
this.ws.close();
}, 6 * 1000);
}, 6 * 1000);
}
addEventListener() {
if (!this.ws) {
return;
}
const ws = this.ws;
ws.onerror = () => {
console.log("onerror", new Date());
this.keepConnect();
};
ws.onopen = () => {
this.heartbeat();
};
ws.onmessage = (event) => {
console.log("received: %s, %s", event.data, new Date());
if (event.data === "pong") {
return this.heartbeat();
}
};
ws.onclose = () => {
console.log("onclose", new Date());
this.keepConnect();
};
}
}
const ws = new Socket("ws://localhost:8080?userId=userId");
ws.connect();