Skip to content

WebSocket

socket.io

socket.io支持http长链接和websocket,内部缓存了request链接,通过rooms和namespace功能可以让我们很方便的定向推送和集中推送消息,也正因为它不是简单的websocket,所以浏览器上的 WebSocket跟socket.io无法兼容,需要使用 socket.io-cilent作为客户端. 它的适配器可选性比较多,有redis和mongodb等,都可以在集群情况下实现消息发送.

服务端

js
// server.js
import { createServer } from "http";
import { Server } from "socket.io";
import url from "url";

const httpServer = createServer();
const io = new Server(httpServer, {
  path: "/websocket",
});

const ioUser = io.of("user");

ioUser.use((socket, next) => {
  if (socket.handshake.auth?.authorization !== "test") {
    socket.send("auth failed");
    socket.disconnect();
    return next(new Error("auth failed"));
  }
  const userId = url.parse(socket.request.url)?.userId;
  socket.userId = userId;
  next();
});

ioUser.on("connection", (socket) => {
  const userId = socket.userId;
  socket.join(userId); // join socket to userId

  socket.on("message", (data) => {
    console.log("receive", data);
  });

  console.log("userId connect");
  ioUser.to(userId).emit("message", "another page is opened");
});

httpServer.listen(8080, () => {
  console.log("server start with port: 8080");
});

客户端

js
// client.js
import { io } from "socket.io-client";

const socket = io("ws://localhost:8080/user?userId=userId", {
  timeout: 5 * 1000,
  auth: { authorization: "test" },
  autoConnect: false,
  path: "/websocket",
  transports: ["websocket"],
});

socket.on("connect", () => {
  console.log("test");
  socket.emit("message", "hello world!!!!");
});

socket.on("message", (data) => {
  console.log(data);
});

socket.on("error", console.error);

socket.connect();

服务端地址是 ws://localhost:8080, /user是namespace,/websocket是path, 由于http long-pulling已经过时了,所以客户端只使用websocket连接(transports)。另外一点, 这里使用了auth认证和userId区分request, 在web实际应用里非常重要,当打开多页面时可以共享信息。客户端支持断开重连,所以我们也不需要做其余操作了。

js
const userSockets = await io.in(userId).allSockets();

打开多页面时如何判断这个用户所有的request都断开链接了呢?当 userSockets.length === 0时,表示断开了所有链接。

ws

ws是node的另一个websocket库,官网明确指出它只能应用在服务端,客户端可以使用native WebSocket, 缺点也很明显,多集群的情况下需要我们自己写适配器. 下面这个例子实现了简单的消息发送

js
// server.js
import { WebSocketServer } from "ws";
const wss = new WebSocketServer({ port: 8080 });

wss.on("connection", function connection(ws) {
  ws.on("error", console.error);

  ws.on("message", function message(data) {
    console.log("received: %s", data);
  });

  ws.send("something from server");
});
js
// client.js
const ws = new WebSocket("ws://localhost:8080");

ws.addEventListener("error", (event) => {
  console.error(event.data);
});

ws.addEventListener("open", function open() {
  ws.send("something");
});

ws.addEventListener("message", function message(event) {
  console.log("received: %s", event.data);
});

下面我们来说说如何实适配器, 适配器的目的一方面实现数据存储,另外一方面可以在集群条件下实现消息定向发送

uml diagram

我们的集群中有N个节点,当用户访问集群时,只能触发一个节点,想象一下当user2发送消息给user1, 应该如何实现呢?这就是adapter的作用, 下面我开始对 server.js 进行改进

js
// server.js
import { WebSocketServer } from "ws";
import Redis from "ioredis";

const wss = new WebSocketServer({ port: 8080 });
const redisPub = new Redis();
const redisSub = new Redis();

const clientMap = new Map();
const addClient = (userId, ws) => {
  if (!clientMap.has(userId)) {
    clientMap.set(userId, []);
  }
  clientMap.get(userId).push(ws);
};
const sendMessage = (userId, data) => {
  clientMap.get(userId).forEach((ws) => {
    ws.send(data);
  });
};
const channel = "message";

redisSub.subscribe(channel);
redisSub.on("message", (channel, data) => {
  const obj = JSON.parse(data);
  sendMessage(obj.userId, obj.data);
});

wss.on("connection", function connection(ws, request) {
  const userId = new URL(request.url, "http://example.com").searchParams.get("userId");
  addClient(userId, ws);

  ws.on("error", console.error);

  ws.on("message", (data) => {
    if (data.toString() === "ping") {
      console.log('length %s', clientMap.get(userId).length);
      return ws.send("pong");
    }
  });

  ws.on("close", () => {
    const clients = clientMap.get(userId);
    console.log(clients.length, 'close');
    clients.splice(clients.indexOf(ws), 1);

    if (!clients.length) {
      clientMap.delete(userId);
      console.log("delete userId");
    }
  });
  redisPub.publish(channel, JSON.stringify({ userId, data: "something from server on connection" }));
});

wss.on("close", () => {
  console.log("close");
});

console.log("socket start on port: 8080");

我们使用userId标志位把client集中到一起, 再使用redis的发布订阅功能把消息发布到各个node节点,就可以实现多节点部署了。
socket.io给每个socket增加了一个独立的id用来标识连接,有兴趣也可以实现一下, 这里不再赘述。

WebSocket

node的Websocket在20.10.0版本增加的,并且还处于实验阶段,需要使用的话增加 --experimental-websocket flag, 还可以使用浏览器的WebSocket

在写WebSocket时遇到几个坑

  1. 心跳,官网说为了保持稳定性和健壮性,需要增加心跳
  2. ws连接不成功也会触发 close 事件,通过监听 close 事件可以继续尝试链接, 有的浏览器触发 error 事件, 有些时候会两个都触发,这里增加了一个节流方法, 保证重连一致性
js
// client.js
class Socket {
  constructor(url) {
    this.url = url;
    this.ws = null;
  }
  createWebSocket() {
    this.ws = new WebSocket(this.url);
  }
  send(message) {
    this.ws.send(message);
  }
  keepConnect() {
    clearTimeout(this.keepConnectTimeout);
    this.resetHeartbeat();

    this.keepConnectTimeout = setTimeout(() => {
      try {
        // 连接不成功会触发 "close" 事件
        this.createWebSocket();
        this.addEventListener();
      } catch (e) {
        console.error("error", e);
      }
    }, 6 * 1000);
  }
  connect() {
    this.createWebSocket();
    this.addEventListener();
  }
  resetHeartbeat() {
    clearTimeout(this.pingTimeout);
    clearTimeout(this.heartbeatTimeout);
  }
  heartbeat() {
    this.resetHeartbeat();
    this.heartbeatTimeout = setTimeout(() => {
      this.send("ping");

      this.pingTimeout = setTimeout(() => {
        console.log("close websocket due to without pong");
        // close 方法会触发 "close" 事件
        this.ws.close();
      }, 6 * 1000);
    }, 6 * 1000);
  }
  addEventListener() {
    if (!this.ws) {
      return;
    }
    const ws = this.ws;

    ws.onerror = () => {
      console.log("onerror", new Date());
      this.keepConnect();
    };

    ws.onopen = () => {
      this.heartbeat();
    };

    ws.onmessage = (event) => {
      console.log("received: %s, %s", event.data, new Date());
      if (event.data === "pong") {
        return this.heartbeat();
      }
    };

    ws.onclose = () => {
      console.log("onclose", new Date());
      this.keepConnect();
    };
  }
}

const ws = new Socket("ws://localhost:8080?userId=userId");
ws.connect();

Reference

  1. https://developer.mozilla.org/en-US/docs/Web/API/WebSocket
  2. https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API
  3. https://nodejs.org/docs/v20.14.0/api/globals.html#websocket