内容简介:工作中需要使用到先说结论:接口定义来自
工作中需要使用到 WebSocket
,以前没有用过,最近找时间实践了下,顺便了解一下和 WebSocket
的相关知识,在这里记录一下使用过程中的一些问题。
先说说WebSocket中遇到的问题
-
WebSocket
支持跨域吗? -
WebSocket
的事件处理函数onxxx
系列和使用addEventListener
绑定的处理函数冲突吗?可以共存吗? -
WebSocket
需要自己维护心跳检测吗? - 浏览器对同一个
url
的创建的WebSocket
个数有限制吗? - 没有消息的情况下浏览器会关闭
WebSocket
链接吗?
先说结论:
-
WebSocket
支持跨域,但是WebSocket
第一次建立握手链接使用的是http
请求,如果跨域headers
设置不正确,还是出现 握手失败 。 -
WebSocket
的事件处理函数onxxx
系列和使用addEventListener
绑定的处理函数 不冲突可以共存所以最好使用
addEventListener
,例如message
事件,处理函数可以有多个。 -
WebSocket
需要自己维护心跳检测吗?如果客户端和服务端都实现了
WebSocket
协议中的ping/pong
的机制,是不需要心跳维护的,这个需要自己测试一下,需要抓包工具。保险起见还是自己维护一个心跳检测机制,还有自动重试机制。 -
浏览器对同一个
url
的创建的WebSocket
个数有限制吗?有限制,看 max-parallel-http-connections-in-a-browser ,WebSocket
资源是一个很大的开销,所以不要对同一个域创建多个WebSocket
. -
如果没有消息,并且client和server有一方没有实现
ping/pong
机制,浏览器会主动关闭WebSocket
。测试了一下,如果没有消息,server没有主动close, 并且没有实现ping/pong的时候firefox 67和chrome 74都是60秒关闭
WebSocket
, ie 11和edge没有主动关闭。
了解WebSocket API
接口定义来自 TypeScript
的接口定义文件
interface WebSocket extends EventTarget { // 二进制类型blob或者arraybuffer binaryType: BinaryType; // 默认blob readonly bufferedAmount: number; // 缓冲区还剩余的数据(发二进制情况下) readonly extensions: string; // 关闭时触发 onclose: ((this: WebSocket, ev: CloseEvent) => any) | null; // 出错时触发 onerror: ((this: WebSocket, ev: Event) => any) | null; // 有消息时触发 onmessage: ((this: WebSocket, ev: MessageEvent) => any) | null; // websocket open成功时触发 onopen: ((this: WebSocket, ev: Event) => any) | null; // 属性 readonly protocol: string; // new WebSocket时的协议 readonly readyState: number; // WebSocket的状态(像XMLHttpRequest) readonly url: string; // websocket的地址 // 方法 close(code?: number, reason?: string): void; // 主动关闭websocket // 发送数据 send(data: string | ArrayBufferLike | Blob | ArrayBufferView): void; // 状态常量 readonly CLOSED: number; readonly CLOSING: number; readonly CONNECTING: number; readonly OPEN: number; // 添加事件的接口,事件有:close, message, open, error,和onxxx系列对应 addEventListener<K extends keyof WebSocketEventMap>(type: K, listener: (this: WebSocket, ev: WebSocketEventMap[K]) => any, options?: boolean | AddEventListenerOptions): void; addEventListener(type: string, listener: EventListenerOrEventListenerObject, options?: boolean | AddEventListenerOptions): void; removeEventListener<K extends keyof WebSocketEventMap>(type: K, listener: (this: WebSocket, ev: WebSocketEventMap[K]) => any, options?: boolean | EventListenerOptions): void; removeEventListener(type: string, listener: EventListenerOrEventListenerObject, options?: boolean | EventListenerOptions): void; } 复制代码
WebSocket
状态常量
declare var WebSocket: { prototype: WebSocket; new(url: string, protocols?: string | string[]): WebSocket; readonly CLOSED: number; readonly CLOSING: number; readonly CONNECTING: number; readonly OPEN: number; }; 复制代码
发现写 TypeScript
的一个好处就是,写代码看 MDN
的次数变少了。
封装Socket和SocketManager使用
先看使用,然后再看内部的实现
import { SocketManager } from './socket/SocketManager'; const wsUri = 'wss://echo.websocket.org/'; SocketManager.create(wsUri, { pingMessage: 'ping', pongMessage: 'pong', type: 'WebSocket', // 内部封装了SockJs,如果服务端支持可以使用SockJs onopen: (evt, socket) => { // 如果服务端是使用队列维护客户端socket,可能需要你先注册 // 这里可以发送注册信息 socket.send('registe'); }, // onmessage: (evt) => { // 建议消息格式定义为元数据模型 evt.data = { code: number, data: any, type: string } // 这样就解耦的目的 // SocketManager内部也是这么做的 eventBus.$emit(`ws-${JSON.parse(evt.data).code}`, JSON.parse(evt.data)) // } }); // 监听SocketManager触发的事件 eventBus.$on('ws-110000', (data: any) => { console.log(data); }); // 可以随时从SocketManager中取出已经创建的socket const socket = SocketManager.find(wsUri); 复制代码
Socket实现
Socket代码基于 github.com/zimv/websoc… 进行重构和改造。
Socket功能
- heartbeat功能:client和server之间固定间隔发送一个固定的消息
ping
和pong
(自定义),来检测网络状态 - 必须有 重试机制(retry) ,如果client不是主动关闭,需要有 reconnect机制
-
open
和message
事件中,重置retry计数,同时开始下一次心跳检测nextHeartbeat
-
close
和error
事件中,添加重活机制reconnect
(主动close,不再激活reconnect) - 封装的
Socket
对象尽量和原生WebSocket
主要接口保持一致(兼容WebSocket
接口)。 -
Socket
内置一个message queue,如果socket发送(send)时,socket已经close状态,把message缓存,当open事件触发时,重新发送(flush).
Socket对象结构
这里的代码使用 TypeScript
写的,所以 send
和 close
是public方法, url
和 ws(原生WebSocket)
是public属性.
/** * Socket * * based on @see https://github.com/zimv/websocket-heartbeat-js */ export class Socket { public ws!: WebSocket; public url: string; private messageQueue: string[] = []; private retries: number = 0; private opts: SocketOptions; private sockJsOpts: SockJs.Options; private stopRetry: boolean = false; private pongTimerId!: number; private pingTimerId!: number; private lock: boolean = false; constructor( url: string, options: SocketOptions, sockJsOptions?: SockJs.Options ) { this.url = url; this.opts = merge({}, defaultSocketOptions, options || {}); this.sockJsOpts = sockJsOptions || {}; this.createWebSocket(); } private createWebSocket() { const self = this; try { if (this.opts.protocols) { this.ws = this.opts.type === 'WebSocket' ? new WebSocket(this.url, this.opts.protocols) : new SockJs(this.url, this.opts.protocols, this.sockJsOpts); } else { this.ws = this.opts.type === 'WebSocket' ? new WebSocket(this.url) : new SockJs(this.url, null, this.sockJsOpts); } this.bindEvents(); } catch (e) { self.reconnect(); logger.error(e); } } private bindEvents() { this.ws.addEventListener('open', evt => { this.retries = 0; this.opts.onopen && this.opts.onopen(evt, this); this.flush(); // 清空消息队列 // 心跳检测 this.nextHeartbeat(); }); this.ws.addEventListener('close', (evt: CloseEvent) => { this.opts.onclose && this.opts.onclose(evt, this); const closeCode = isFunction(this.opts.closeCode) ? this.opts.closeCode() : this.opts.closeCode; // 1.服务端主动关闭,发送closeCode这样客户端不会reconnect // 2.如果客户端主动关闭,即使evt.code !== closeCode也不会重活 if (evt.code !== closeCode) { this.reconnect(); } }); this.ws.addEventListener('error', (evt: Event) => { this.opts.onerror && this.opts.onerror(evt, this); this.reconnect(); }); this.ws.addEventListener('message', (evt: MessageEvent) => { const pongMessage = isFunction(this.opts.pongMessage) ? this.opts.pongMessage() : this.opts.pongMessage; if (evt.data === pongMessage) { logger.log('socket heartbeat'); } else { this.opts.onmessage && this.opts.onmessage(evt, this); } // 如果获取到消息,心跳检测重置 // 拿到任何消息都说明当前连接是正常的 this.nextHeartbeat(); }); } send(message: string, retry = true) { if (isSocketOpen(this.ws.readyState)) { this.ws.send(message); } else if (retry) { this.addMessage(message); } } close(code?: number, reason?: string) { // 如果手动关闭连接,不再重连 this.stopRetry = true; this.flush(); // 清空消息 this.ws.close(code, reason); this.cleanup(); } private nextHeartbeat() { this.cleanup(); this.startBeating(); } private startBeating() { if (this.stopRetry) return; // 不再重连就不再执行心跳 this.pingTimerId = setTimeout(() => { // 这里发送一个心跳,后端收到后,返回一个心跳消息, // onmessage拿到返回的心跳就说明连接正常 this.ws.send( isFunction(this.opts.pingMessage) ? this.opts.pingMessage() : this.opts.pingMessage ); // onmessage -> nextBeat -> cleanup // 1. 如果没有消息触发onmessage, 这里的pongTimer会执行进行reconnect // 2. onclose -> reconnect this.pongTimerId = setTimeout(() => { // 如果onclose会执行reconnect,我们执行ws.close()就行了.如果直接执行reconnect 会触发onclose导致重连两次 this.ws.close(); }, this.opts.pongDelay); }, this.opts.pingDelay); } private cleanup() { clearTimeout(this.pingTimerId); clearTimeout(this.pongTimerId); } private reconnect() { if ( // 不是无限大,且重试次数已经大于maxRetryCount !Number.isFinite(this!.opts!.maxRetryCount as number) && this.retries >= (this.opts!.maxRetryCount as number) ) { return; } if (this.lock || this.stopRetry) { return; } this.lock = true; this.retries++; // 必须在lock之后,避免进行无效计数 this.opts.onreconnect && this.opts.onreconnect(); // 没连接上会一直重连,设置延迟避免请求过多 setTimeout(() => { this.createWebSocket(); this.lock = false; }, this.opts.retryDelay); } private flush() { while (this.messageQueue.length) { const message = this.messageQueue.shift() as string; this.send(message, false /* no cache */); } } private addMessage(message: string) { if (this.messageQueue.length >= (this.opts!.maxQueueLength as number)) { this.messageQueue.shift(); } this.messageQueue.push(message); } } 复制代码
注意:
WebSocket WebSocket error
Socket的配置参数
pingMessage
:用来发送心跳时候使用
pongMessage
:用来回复服务端心跳检测时使用
有时都是client主动ping,服务端被动pong,所以 pongMessage
可以用来验证message是否时心跳消息,如果是心跳消息,就不触发相关的事件处理函数。
// 事件处理函数 export type SocketEventHandler = ( evt: CloseEvent | MessageEvent | Event, socket: Socket ) => any; export type SocketType = 'WebSocket' | 'SockJs'; export interface SocketOptions { type: SocketType; protocols?: string | string[]; pingMessage: string | (() => string); pongMessage: string | (() => string); // 4000–4999 Available for use by applications. // Reserved code. @see https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent#Properties closeCode: number | (() => number); pingDelay?: number; pongDelay?: number; maxRetryCount?: number; retryDelay?: number; onclose?: SocketEventHandler; onerror?: SocketEventHandler; onopen?: SocketEventHandler; onmessage?: SocketEventHandler; onreconnect?: () => void; maxQueueLength?: number; } 复制代码
SocketManager
管理器
WebSocket
就像数据库连接一样,属于有限的资源,应该进行合理的管理,防止同一个 域名 重复创建。而且重复创建还可能导致事件重复触发,导致服务端资源紧张。
SocketManager
实现了对 Socket
增删改查的管理,防止同一个 域名 重复创建;
同时使用 EventBus
解耦其他组件,使用的时候只需要创建 Socket
,随便在任何地方监听事件就可以了。
SocketManager
结构
export class SocketManager { private static sockets: Socket[] = []; static isEmpty() { return !SocketManager.sockets.length; } static create( url: string, socketOptions: SocketOptions, sockJsOptions?: SockJs.Options ): Socket { let socket: Socket; const existSocket = SocketManager.find(url); // @see https://stackoverflow.com/questions/985431/max-parallel-http-connections-in-a-browser if (existSocket && isSocketActive(existSocket.ws.readyState)) { return existSocket; } if (existSocket && isSocketClose(existSocket.ws.readyState)) { SocketManager.remove(url); } socketOptions.onopen = mergeHandler( socketOptions.onopen as SocketEventHandler, (evt: Event, socket: Socket) => { logger.log('socket onopen'); SocketManager.remove(socket.url); SocketManager.add(socket); } ); socketOptions.onclose = mergeHandler( socketOptions.onclose as SocketEventHandler, (evt: Event, socket: Socket) => { logger.warn('socket onclose'); SocketManager.remove(socket.url); } ); socketOptions.onerror = mergeHandler( socketOptions.onerror as SocketEventHandler, (evt: Event, socket: Socket) => { logger.warn('socket onerror'); SocketManager.remove(socket.url); } ); socketOptions.onmessage = mergeHandler( socketOptions.onmessage as SocketEventHandler, ((evt: MessageEvent) => { logger.log('socket onmessage: ', evt.data); // 链接成功时候返回的消息 if (typeof evt.data === 'string') { try { // data = {code: number, data: any} const msg = JSON.parse(evt.data); // 例如: ws-10010 eventBus.$emit(`${WEB_SOCKET_EVENT_PREFIX}-${msg.code}`, msg); } catch (err) { logger.error(err); } } else if (evt.data instanceof Blob || evt.data instanceof ArrayBuffer) { // 二进制 eventBus.$emit(`${WEB_SOCKET_EVENT_PREFIX}-binary`, evt.data); } else { // unknown eventBus.$emit(`${WEB_SOCKET_EVENT_PREFIX}-unknown`, evt.data); } }) as SocketEventHandler, ); socket = new Socket(url, socketOptions, sockJsOptions); SocketManager.add(socket); return socket; } static find(url: string): Socket | undefined { return SocketManager.sockets.find(item => { return item.url === url; }); } static add(socket: Socket) { if (isObject(socket)) { SocketManager.sockets.push(socket); } } static remove(url: string) { return _remove(SocketManager.sockets, socket => url === socket.url); } static closeAll() { SocketManager.sockets.forEach(socket => { SocketManager.close(socket); }); SocketManager.sockets = []; // reset sockets }; static closeBy(url: string) { if (isString(trim(url)) && !isEmpty(url)) { const socket = SocketManager.find(url); socket && SocketManager.close(socket); } } static close(socket: Socket) { try { socket.close(); } catch (err) { logger.error(err); } SocketManager.remove(socket.url); } } 复制代码
建议快速浏览一下《HTML5 WebSocket权威指南》里面说了WebSocket协议方面的知识,有利于HTTP知识的扩展。
以上所述就是小编给大家介绍的《前端小纠结--WebSocket实战》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 前端小纠结--VS Code调试配置分享
- 前端小纠结--约定式提交和自动生成`changelog`
- 前端小纠结--集成gitflow和standard-version使用
- 前端小纠结--IE11下SVG元素默认focusable=true
- 悟懂 MapReduce,不纠结
- 为什么你还在纠结于语法糖?
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Python金融大数据分析
[德] 伊夫·希尔皮斯科 / 姚军 / 人民邮电出版社 / 2015-12 / CNY 99.00
唯一一本详细讲解使用Python分析处理金融大数据的专业图书;金融应用开发领域从业人员必读。 Python凭借其简单、易读、可扩展性以及拥有巨大而活跃的科学计算社区,在需要分析、处理大量数据的金融行业得到了广泛而迅速的应用,并且成为该行业开发核心应用的首选编程语言。《Python金融大数据分析》提供了使用Python进行数据分析,以及开发相关应用程序的技巧和工具。 《Python金融大......一起来看看 《Python金融大数据分析》 这本书的介绍吧!