前端小纠结--WebSocket实战

栏目: Html5 · 发布时间: 5年前

内容简介:工作中需要使用到先说结论:接口定义来自

工作中需要使用到 WebSocket ,以前没有用过,最近找时间实践了下,顺便了解一下和 WebSocket 的相关知识,在这里记录一下使用过程中的一些问题。

先说说WebSocket中遇到的问题

  1. WebSocket 支持跨域吗?
  2. WebSocket 的事件处理函数 onxxx 系列和使用 addEventListener 绑定的处理函数冲突吗?可以共存吗?
  3. WebSocket 需要自己维护心跳检测吗?
  4. 浏览器对同一个 url 的创建的 WebSocket 个数有限制吗?
  5. 没有消息的情况下浏览器会关闭 WebSocket 链接吗?

先说结论:

  • WebSocket 支持跨域,但是 WebSocket 第一次建立握手链接使用的是 http 请求,如果跨域 headers 设置不正确,还是出现 握手失败

  • WebSocket 的事件处理函数 onxxx 系列和使用 addEventListener 绑定的处理函数 不冲突可以共存

    所以最好使用 addEventListener ,例如 message 事件,处理函数可以有多个。

    看这里 Why_use_addEventListener

  • WebSocket 需要自己维护心跳检测吗?

    如果客户端和服务端都实现了 WebSocket 协议中的 ping/pong 的机制,是不需要心跳维护的,这个需要自己测试一下,需要抓包工具。保险起见还是自己维护一个心跳检测机制,还有自动重试机制。

  • 浏览器对同一个 url 的创建的 WebSocket 个数有限制吗?有限制,看 max-parallel-http-connections-in-a-browserWebSocket 资源是一个很大的开销,所以不要对同一个域创建多个 WebSocket .

  • 如果没有消息,并且client和server有一方没有实现 ping/pong 机制,浏览器会主动关闭 WebSocket

    测试了一下,如果没有消息,server没有主动close, 并且没有实现ping/pong的时候firefox 67和chrome 74都是60秒关闭 WebSocket , ie 11和edge没有主动关闭。

了解WebSocket API

接口定义来自 TypeScript 的接口定义文件

interface WebSocket extends EventTarget {
    // 二进制类型blob或者arraybuffer
    binaryType: BinaryType; // 默认blob
    readonly bufferedAmount: number; // 缓冲区还剩余的数据(发二进制情况下)
    readonly extensions: string;
    // 关闭时触发
    onclose: ((this: WebSocket, ev: CloseEvent) => any) | null;
    // 出错时触发
    onerror: ((this: WebSocket, ev: Event) => any) | null;
    // 有消息时触发
    onmessage: ((this: WebSocket, ev: MessageEvent) => any) | null;
    // websocket open成功时触发
    onopen: ((this: WebSocket, ev: Event) => any) | null;
    // 属性
    readonly protocol: string;  // new WebSocket时的协议
    readonly readyState: number; // WebSocket的状态(像XMLHttpRequest)
    readonly url: string; // websocket的地址
    // 方法
    close(code?: number, reason?: string): void; // 主动关闭websocket
    // 发送数据
    send(data: string | ArrayBufferLike | Blob | ArrayBufferView): void;
    // 状态常量
    readonly CLOSED: number;
    readonly CLOSING: number;
    readonly CONNECTING: number;
    readonly OPEN: number;
    // 添加事件的接口,事件有:close, message, open, error,和onxxx系列对应
    addEventListener<K extends keyof WebSocketEventMap>(type: K, listener: (this: WebSocket, ev: WebSocketEventMap[K]) => any, options?: boolean | AddEventListenerOptions): void;
    addEventListener(type: string, listener: EventListenerOrEventListenerObject, options?: boolean | AddEventListenerOptions): void;
    removeEventListener<K extends keyof WebSocketEventMap>(type: K, listener: (this: WebSocket, ev: WebSocketEventMap[K]) => any, options?: boolean | EventListenerOptions): void;
    removeEventListener(type: string, listener: EventListenerOrEventListenerObject, options?: boolean | EventListenerOptions): void;
}
复制代码

WebSocket 状态常量

declare var WebSocket: {
    prototype: WebSocket;
    new(url: string, protocols?: string | string[]): WebSocket;
    readonly CLOSED: number;
    readonly CLOSING: number;
    readonly CONNECTING: number;
    readonly OPEN: number;
};
复制代码

发现写 TypeScript 的一个好处就是,写代码看 MDN 的次数变少了。

封装Socket和SocketManager使用

先看使用,然后再看内部的实现

import { SocketManager } from './socket/SocketManager';

const wsUri = 'wss://echo.websocket.org/';
SocketManager.create(wsUri, {
  pingMessage: 'ping',
  pongMessage: 'pong',
  type: 'WebSocket', // 内部封装了SockJs,如果服务端支持可以使用SockJs
  onopen: (evt, socket) => {
    // 如果服务端是使用队列维护客户端socket,可能需要你先注册
    // 这里可以发送注册信息
    socket.send('registe');
  },
  // onmessage: (evt) => {
    // 建议消息格式定义为元数据模型 evt.data = { code: number, data: any, type: string }
    // 这样就解耦的目的
    // SocketManager内部也是这么做的 eventBus.$emit(`ws-${JSON.parse(evt.data).code}`, JSON.parse(evt.data))
  // }
});

// 监听SocketManager触发的事件
eventBus.$on('ws-110000', (data: any) => {
  console.log(data);
});

// 可以随时从SocketManager中取出已经创建的socket
const socket = SocketManager.find(wsUri);

复制代码

Socket实现

Socket代码基于 github.com/zimv/websoc… 进行重构和改造。

Socket功能

  1. heartbeat功能:client和server之间固定间隔发送一个固定的消息 pingpong (自定义),来检测网络状态
  2. 必须有 重试机制(retry) ,如果client不是主动关闭,需要有 reconnect机制
  3. openmessage 事件中,重置retry计数,同时开始下一次心跳检测 nextHeartbeat
  4. closeerror 事件中,添加重活机制 reconnect (主动close,不再激活reconnect)
  5. 封装的 Socket 对象尽量和原生 WebSocket 主要接口保持一致(兼容 WebSocket 接口)。
  6. Socket 内置一个message queue,如果socket发送(send)时,socket已经close状态,把message缓存,当open事件触发时,重新发送(flush).

Socket对象结构

这里的代码使用 TypeScript 写的,所以 sendclose 是public方法, urlws(原生WebSocket) 是public属性.

前端小纠结--WebSocket实战
/**
 * Socket
 *
 * based on @see https://github.com/zimv/websocket-heartbeat-js
 */

export class Socket {
  public ws!: WebSocket;
  public url: string;
  private messageQueue: string[] = [];
  private retries: number = 0;
  private opts: SocketOptions;
  private sockJsOpts: SockJs.Options;
  private stopRetry: boolean = false;
  private pongTimerId!: number;
  private pingTimerId!: number;
  private lock: boolean = false;

  constructor(
    url: string,
    options: SocketOptions,
    sockJsOptions?: SockJs.Options
  ) {
    this.url = url;
    this.opts = merge({}, defaultSocketOptions, options || {});
    this.sockJsOpts = sockJsOptions || {};
    this.createWebSocket();
  }

  private createWebSocket() {
    const self = this;
    try {
      if (this.opts.protocols) {
        this.ws =
          this.opts.type === 'WebSocket'
            ? new WebSocket(this.url, this.opts.protocols)
            : new SockJs(this.url, this.opts.protocols, this.sockJsOpts);
      } else {
        this.ws =
          this.opts.type === 'WebSocket'
            ? new WebSocket(this.url)
            : new SockJs(this.url, null, this.sockJsOpts);
      }
      this.bindEvents();
    } catch (e) {
      self.reconnect();
      logger.error(e);
    }
  }

  private bindEvents() {
    this.ws.addEventListener('open', evt => {
      this.retries = 0;
      this.opts.onopen && this.opts.onopen(evt, this);
      this.flush(); // 清空消息队列
      // 心跳检测
      this.nextHeartbeat();
    });

    this.ws.addEventListener('close', (evt: CloseEvent) => {
      this.opts.onclose && this.opts.onclose(evt, this);
      const closeCode = isFunction(this.opts.closeCode)
        ? this.opts.closeCode()
        : this.opts.closeCode;
      // 1.服务端主动关闭,发送closeCode这样客户端不会reconnect
      // 2.如果客户端主动关闭,即使evt.code !== closeCode也不会重活
      if (evt.code !== closeCode) {
        this.reconnect();
      }
    });

    this.ws.addEventListener('error', (evt: Event) => {
      this.opts.onerror && this.opts.onerror(evt, this);
      this.reconnect();
    });

    this.ws.addEventListener('message', (evt: MessageEvent) => {
      const pongMessage = isFunction(this.opts.pongMessage)
        ? this.opts.pongMessage()
        : this.opts.pongMessage;
      if (evt.data === pongMessage) {
        logger.log('socket heartbeat');
      } else {
        this.opts.onmessage && this.opts.onmessage(evt, this);
      }

      // 如果获取到消息,心跳检测重置
      // 拿到任何消息都说明当前连接是正常的
      this.nextHeartbeat();
    });
  }

  send(message: string, retry = true) {
    if (isSocketOpen(this.ws.readyState)) {
      this.ws.send(message);
    } else if (retry) {
      this.addMessage(message);
    }
  }

  close(code?: number, reason?: string) {
    // 如果手动关闭连接,不再重连
    this.stopRetry = true;
    this.flush(); // 清空消息
    this.ws.close(code, reason);
    this.cleanup();
  }

  private nextHeartbeat() {
    this.cleanup();
    this.startBeating();
  }

  private startBeating() {
    if (this.stopRetry) return; // 不再重连就不再执行心跳
    this.pingTimerId = setTimeout(() => {
      // 这里发送一个心跳,后端收到后,返回一个心跳消息,
      // onmessage拿到返回的心跳就说明连接正常
      this.ws.send(
        isFunction(this.opts.pingMessage)
          ? this.opts.pingMessage()
          : this.opts.pingMessage
      );

      // onmessage -> nextBeat -> cleanup
      // 1. 如果没有消息触发onmessage, 这里的pongTimer会执行进行reconnect
      // 2. onclose -> reconnect
      this.pongTimerId = setTimeout(() => {
        // 如果onclose会执行reconnect,我们执行ws.close()就行了.如果直接执行reconnect 会触发onclose导致重连两次
        this.ws.close();
      }, this.opts.pongDelay);
    }, this.opts.pingDelay);
  }

  private cleanup() {
    clearTimeout(this.pingTimerId);
    clearTimeout(this.pongTimerId);
  }

  private reconnect() {
    if (
      // 不是无限大,且重试次数已经大于maxRetryCount
      !Number.isFinite(this!.opts!.maxRetryCount as number) &&
      this.retries >= (this.opts!.maxRetryCount as number)
    ) {
      return;
    }
    if (this.lock || this.stopRetry) {
      return;
    }
    this.lock = true;
    this.retries++; // 必须在lock之后,避免进行无效计数
    this.opts.onreconnect && this.opts.onreconnect();

    // 没连接上会一直重连,设置延迟避免请求过多
    setTimeout(() => {
      this.createWebSocket();
      this.lock = false;
    }, this.opts.retryDelay);
  }

  private flush() {
    while (this.messageQueue.length) {
      const message = this.messageQueue.shift() as string;
      this.send(message, false /* no cache */);
    }
  }

  private addMessage(message: string) {
    if (this.messageQueue.length >= (this.opts!.maxQueueLength as number)) {
      this.messageQueue.shift();
    }
    this.messageQueue.push(message);
  }
}

复制代码

注意:

WebSocket
WebSocket
error

Socket的配置参数

pingMessage :用来发送心跳时候使用

pongMessage :用来回复服务端心跳检测时使用

有时都是client主动ping,服务端被动pong,所以 pongMessage 可以用来验证message是否时心跳消息,如果是心跳消息,就不触发相关的事件处理函数。

前端小纠结--WebSocket实战
// 事件处理函数
export type SocketEventHandler = (
  evt: CloseEvent | MessageEvent | Event,
  socket: Socket
) => any;

export type SocketType = 'WebSocket' | 'SockJs';

export interface SocketOptions {
  type: SocketType;
  protocols?: string | string[];
  pingMessage: string | (() => string);
  pongMessage: string | (() => string);
  // 4000–4999 Available for use by applications.
  // Reserved code. @see https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent#Properties
  closeCode: number | (() => number);
  pingDelay?: number;
  pongDelay?: number;
  maxRetryCount?: number;
  retryDelay?: number;
  onclose?: SocketEventHandler;
  onerror?: SocketEventHandler;
  onopen?: SocketEventHandler;
  onmessage?: SocketEventHandler;
  onreconnect?: () => void;
  maxQueueLength?: number;
}
复制代码

SocketManager 管理器

WebSocket 就像数据库连接一样,属于有限的资源,应该进行合理的管理,防止同一个 域名 重复创建。而且重复创建还可能导致事件重复触发,导致服务端资源紧张。

SocketManager 实现了对 Socket 增删改查的管理,防止同一个 域名 重复创建;

同时使用 EventBus 解耦其他组件,使用的时候只需要创建 Socket ,随便在任何地方监听事件就可以了。

SocketManager 结构

前端小纠结--WebSocket实战
export class SocketManager {
  private static sockets: Socket[] = [];

  static isEmpty() {
    return !SocketManager.sockets.length;
  }
  static create(
    url: string,
    socketOptions: SocketOptions,
    sockJsOptions?: SockJs.Options
  ): Socket {
    let socket: Socket;
    const existSocket = SocketManager.find(url);

    // @see https://stackoverflow.com/questions/985431/max-parallel-http-connections-in-a-browser
    if (existSocket && isSocketActive(existSocket.ws.readyState)) {
      return existSocket;
    }

    if (existSocket && isSocketClose(existSocket.ws.readyState)) {
      SocketManager.remove(url);
    }

    socketOptions.onopen = mergeHandler(
      socketOptions.onopen as SocketEventHandler,
      (evt: Event, socket: Socket) => {
        logger.log('socket onopen');
        SocketManager.remove(socket.url);
        SocketManager.add(socket);
      }
    );

    socketOptions.onclose = mergeHandler(
      socketOptions.onclose as SocketEventHandler,
      (evt: Event, socket: Socket) => {
        logger.warn('socket onclose');
        SocketManager.remove(socket.url);
      }
    );

    socketOptions.onerror = mergeHandler(
      socketOptions.onerror as SocketEventHandler,
      (evt: Event, socket: Socket) => {
        logger.warn('socket onerror');
        SocketManager.remove(socket.url);
      }
    );

    socketOptions.onmessage = mergeHandler(
      socketOptions.onmessage as SocketEventHandler,
      ((evt: MessageEvent) => {
        logger.log('socket onmessage: ', evt.data);
        // 链接成功时候返回的消息
        if (typeof evt.data === 'string') {
          try {
            // data = {code: number, data: any}
            const msg = JSON.parse(evt.data);
            // 例如: ws-10010
            eventBus.$emit(`${WEB_SOCKET_EVENT_PREFIX}-${msg.code}`, msg);
          } catch (err) {
            logger.error(err);
          }
        } else if (evt.data instanceof Blob || evt.data instanceof ArrayBuffer) {
          // 二进制
          eventBus.$emit(`${WEB_SOCKET_EVENT_PREFIX}-binary`, evt.data);
        } else {
          // unknown
          eventBus.$emit(`${WEB_SOCKET_EVENT_PREFIX}-unknown`, evt.data);
        }
      }) as SocketEventHandler,
    );

    socket = new Socket(url, socketOptions, sockJsOptions);
    SocketManager.add(socket);

    return socket;
  }

  static find(url: string): Socket | undefined {
    return SocketManager.sockets.find(item => {
      return item.url === url;
    });
  }

  static add(socket: Socket) {
    if (isObject(socket)) {
      SocketManager.sockets.push(socket);
    }
  }

  static remove(url: string) {
    return _remove(SocketManager.sockets, socket => url === socket.url);
  }

  static closeAll() {
    SocketManager.sockets.forEach(socket => {
      SocketManager.close(socket);
    });
    SocketManager.sockets = []; // reset sockets
  };

  static closeBy(url: string) {
    if (isString(trim(url)) && !isEmpty(url)) {
      const socket = SocketManager.find(url);
      socket && SocketManager.close(socket);
    }
  }

  static close(socket: Socket) {
    try {
      socket.close();
    } catch (err) {
      logger.error(err);
    }

    SocketManager.remove(socket.url);
  }
}

复制代码

建议快速浏览一下《HTML5 WebSocket权威指南》里面说了WebSocket协议方面的知识,有利于HTTP知识的扩展。


以上所述就是小编给大家介绍的《前端小纠结--WebSocket实战》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Python金融大数据分析

Python金融大数据分析

[德] 伊夫·希尔皮斯科 / 姚军 / 人民邮电出版社 / 2015-12 / CNY 99.00

唯一一本详细讲解使用Python分析处理金融大数据的专业图书;金融应用开发领域从业人员必读。 Python凭借其简单、易读、可扩展性以及拥有巨大而活跃的科学计算社区,在需要分析、处理大量数据的金融行业得到了广泛而迅速的应用,并且成为该行业开发核心应用的首选编程语言。《Python金融大数据分析》提供了使用Python进行数据分析,以及开发相关应用程序的技巧和工具。 《Python金融大......一起来看看 《Python金融大数据分析》 这本书的介绍吧!

随机密码生成器
随机密码生成器

多种字符组合密码

SHA 加密
SHA 加密

SHA 加密工具

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具