内容简介:即时通讯框架 T-io 之 WebSocket 协议再之 HelloWorld
一.t-io是干嘛的
这个框架已经开源到码云上面,介绍比较详细,链接: https://git.oschina.net/tywo45/t-io ,由于官方介绍中的入门程序客户端和服务端都是用的 java 写的,而且是用简单的自定义TCP协议进行通信的,也有一个websocket协议的列子,感觉有点复杂,不好入门。就想着使用javascript来做客户端,t-io做服务端,采用websocket协议搞一个helloworld。
二.WebSocket协议咋回事
首先,t-io框架简单说就是封装一下些个:
- 内置心跳检测
- 内置心跳发送
- 各种便捷的绑定API
- 各种便捷的发送API
- 一行代码拥有自动重连功能
- 各项消息统计等功能,全部一键内置搞定,省却各种烦恼
所以我们收到客户端发来的websocket协议的包,在服务端就要做对应的解包,那我们还要了解websocket包的结构和通信建立的过程:
①握手阶段:
客户端和服务器建立TCP连接之后,客户端发送握手请求,随后服务器发送握手响应即完成握手阶段。如下:
客户端握手请求类似如下:
HTTP/1.1 101 Switching Protocols Upgrade: websocket Connection: Upgrade Sec-WebSocket-Accept: HSmrc0sMlYUkAGmm5OPpG2HaGWk= Sec-WebSocket-Protocol: chat
服务器的握手响应类似如下:
HTTP/1.1 101 Switching Protocols Upgrade: websocket Connection: Upgrade Sec-WebSocket-Accept: HSmrc0sMlYUkAGmm5OPpG2HaGWk= Sec-WebSocket-Protocol: chat
②握手成功后开始发送数据帧
这是Websocket的数据传输协议,聊天信息一般会按照这个协议的规则来传输,下图中的一整个东西称为一个数据帧,数据帧的成帧和解析是处理这个协议时最麻烦的一部分了。具体这个表怎么看可以参照
0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-------+-+-------------+-------------------------------+ |F|R|R|R| opcode|M| Payload len | Extended payload length | |I|S|S|S| (4) |A| (7) | (16/64) | |N|V|V|V| |S| | (if payload len==126/127) | | |1|2|3| |K| | | +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - + | Extended payload length continued, if payload len == 127 | + - - - - - - - - - - - - - - - +-------------------------------+ | |Masking-key, if MASK set to 1 | +-------------------------------+-------------------------------+ | Masking-key (continued) | Payload Data | +-------------------------------- - - - - - - - - - - - - - - - + : Payload Data continued ... : + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + | Payload Data continued ... | +---------------------------------------------------------------+ 具体每一bit的意思 FIN 1bit 表示信息的最后一帧 RSV 1-3 1bit each 以后备用的 默认都为 0 Opcode 4bit 帧类型,稍后细说 Mask 1bit 掩码,是否加密数据,默认必须置为1 (这里很蛋疼) Payload 7bit 数据的长度 Masking-key 1 or 4 bit 掩码 Payload data (x + y) bytes 数据 Extension data x bytes 扩展数据 Application data y bytes 程序数据
三.T-io服务端编写
整体的编写和框架自带的Helloworld程序模式一致,需要改变的就是encode()编码方法和decode()解码方法,因为需要根据websocket协议来。
收到消息后先解码,再判断包的类型,交给响应类型的handler。发送消息先编码。
①握手
decode()解码方法:
由于客户端收到的第一个包是握手包,所以:
if (!barrageSessionContext.isHandshaked()) // 如果还没有握手,则先进行握手操作 { if (BarragePacket.HANDSHAKE_BYTE == firstbyte) { buffer.position(1 + initPosition); return handshakePacket; } else { HttpRequestPacket httpRequestPacket = HttpRequestDecoder.decode(buffer); if (httpRequestPacket == null) { return null; } //交给握手包的Handler httpRequestPacket.setType(Type.COMMAND_HANDSHAKE_REQ); barrageSessionContext.setWebsocket(true); return httpRequestPacket; } } //握手成功后在barrageSessionContext中保存此次连接的握手情况. boolean isWebsocket = barrageSessionContext.isWebsocket();
握手包的Handler,返回握手响应包,让客户端知道握手成功,可以进行数据传输了,:
public class HandshakeReqHandler implements BarrageHandlerIntf<HandshakeBody> { private BarragePacket handshakeRespPacket = new BarragePacket(Type.COMMAND_HANDSHAKE_RESP); @Override public Object handler( BarragePacket packet, String body, ChannelContext<BarrageSessionContext, BarragePacket, Object> channelContext) throws Exception { BarrageSessionContext barrageSessionContext = channelContext.getSessionContext(); barrageSessionContext.setHandshaked(true); boolean isWebsocket = barrageSessionContext.isWebsocket(); if (isWebsocket) { HttpRequestPacket httpRequestPacket = (HttpRequestPacket) packet; HttpResponsePacket httpResponsePacket = updateWebSocketProtocol(httpRequestPacket); if (httpResponsePacket != null) { //发送握手响应包,交给握手响应的handler httpResponsePacket.setType(Type.COMMAND_HANDSHAKE_RESP); Aio.send(channelContext, httpResponsePacket); } else { Aio.remove(channelContext, "不是websocket协议"); } } else { Aio.send(channelContext, handshakeRespPacket); } return null; } public HttpResponsePacket updateWebSocketProtocol(HttpRequestPacket httpRequestPacket) { Map<String, String> headers = httpRequestPacket.getHeaders(); String Sec_WebSocket_Key = headers.get("Sec-WebSocket-Key"); if (StringUtils.isNotBlank(Sec_WebSocket_Key)) { String Sec_WebSocket_Key_Magic = Sec_WebSocket_Key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; byte[] key_array = SHA1Util.SHA1(Sec_WebSocket_Key_Magic); String acceptKey = BASE64Util.byteArrayToBase64(key_array); HttpResponsePacket httpResponsePacket = new HttpResponsePacket(); HttpResponseStatus httpResponseStatus = HttpResponseStatus.C101; httpResponsePacket.setHttpResponseStatus(httpResponseStatus); Map<String, String> respHeaders = new HashMap<>(); respHeaders.put("Connection", "Upgrade"); respHeaders.put("Upgrade", "WebSocket"); respHeaders.put("Sec-WebSocket-Accept", acceptKey); httpResponsePacket.setHeaders(respHeaders); return httpResponsePacket; } return null; } }
encode()编码方法:
上面在握手包的handler中,调用了send方法,发送了一个握手响应包,所以要在编码的时候先判断是否是握手响应包,对其编码:
if (packet.getType() == Type.COMMAND_HANDSHAKE_RESP) { if (isWebsocket) { return HttpResponseEncoder.encode((HttpResponsePacket) packet, groupContext, channelContext); } else { ByteBuffer buffer = ByteBuffer.allocate(1); buffer.put(BarragePacket.HANDSHAKE_BYTE); return buffer; } }
握手响应包的编码
public static ByteBuffer encode(HttpResponsePacket httpResponsePacket, GroupContext<BarrageSessionContext, BarragePacket, Object> groupContext, ChannelContext<BarrageSessionContext, BarragePacket, Object> channelContext) { int bodyLength = 0; byte[] httpResponseBody = httpResponsePacket.getHttpResponseBody(); if (httpResponseBody != null) { bodyLength = httpResponseBody.length; } StringBuilder sb = new StringBuilder(128); HttpResponseStatus httpResponseStatus = httpResponsePacket.getHttpResponseStatus(); // httpResponseStatus.get sb.append("HTTP/1.1 ").append(httpResponseStatus.getStatus()).append(" ").append(httpResponseStatus.getDescription()).append("\r\n"); Map<String, String> headers = httpResponsePacket.getHeaders(); if (headers != null && headers.size() > 0) { headers.put("Content-Length", bodyLength + ""); Set<Entry<String, String>> set = headers.entrySet(); for (Entry<String, String> entry : set) { sb.append(entry.getKey()).append(": ").append(entry.getValue()).append("\r\n"); } } sb.append("\r\n"); byte[] headerBytes = null; try { headerBytes = sb.toString().getBytes("utf-8"); } catch (Exception e) { throw new RuntimeException(e); } ByteBuffer buffer = ByteBuffer.allocate(headerBytes.length + bodyLength); buffer.put(headerBytes); if (bodyLength > 0) { buffer.put(httpResponseBody); } return buffer; }
②数据包
decode()解码方法:
上面握手成功的时候会在BarrageSessionContext中保存握手的情况。
if (isWebsocket) // 走的websocket协议 { WebsocketPacket websocketPacket = WebsocketDecoder.decode(buffer, channelContext); if (websocketPacket == null) { return null; } Opcode opcode = websocketPacket.getWsOpcode(); if (opcode == Opcode.BINARY) { byte[] wsBody = websocketPacket.getWsBody(); if (wsBody == null || wsBody.length == 0) { throw new AioDecodeException("错误的websocket包,body为空"); } BarragePacket barragePacket = new BarragePacket(Type.COMMAND_HANDSHAKE_RESP); if (wsBody.length > 1) { byte[] dst = new byte[wsBody.length - 1]; System.arraycopy(wsBody, 1, dst, 0, dst.length); barragePacket.setBody(dst); } return barragePacket; } else if (opcode == Opcode.PING || opcode == Opcode.PONG) { return heartbeatPacket; } else if (opcode == Opcode.CLOSE) { BarragePacket barragePacket = new BarragePacket(Type.COMMAND_CLOSE_REQ); return barragePacket; } else if (opcode == Opcode.TEXT) { byte[] wsBody = websocketPacket.getWsBody(); if (wsBody == null || wsBody.length == 0) { throw new AioDecodeException("错误的websocket包,body为空"); } BarragePacket barragePacket = new BarragePacket(Type.P2P_REQ); if (wsBody.length > 1) { barragePacket.setBody(wsBody); } return barragePacket; } else { throw new AioDecodeException("错误的websocket包,错误的Opcode"); } } else { if (BarragePacket.HEARTBEAT_BYTE == firstbyte) { buffer.position(1 + initPosition); return heartbeatPacket; } }
数据包的handler:
public class P2PReqHandler implements BarrageHandlerIntf<P2PReqBody>{ @Override public Object handler( BarragePacket packet, String jsonStr, ChannelContext<BarrageSessionContext, BarragePacket, Object> channelContext) throws Exception { System.out.println(jsonStr); BarragePacket barragePacket = new BarragePacket(Type.P2P_REQ); barragePacket.setBody(("收到了你的消息,你的消息是:" + jsonStr).getBytes(BarragePacket.CHARSET)); Aio.send(channelContext, barragePacket); return null; } }
这个收到消息后的处理是,发送收到的消息到客户端。
encode()编码方法:
如果不是握手响应包,走websocket包的编码 if (isWebsocket) { return WebsocketEncoder.encode(packet, groupContext, channelContext); }
public static ByteBuffer encode(BarragePacket barragePacket, GroupContext<BarrageSessionContext, BarragePacket, Object> groupContext, ChannelContext<BarrageSessionContext, BarragePacket, Object> channelContext) { byte[] websocketHeader; byte[] imBody = barragePacket.getBody(); int wsBodyLength = 1; //固定有一个命令码,占一位 if (imBody != null) { wsBodyLength += imBody.length; } byte header0 = (byte) (0x8f & (Opcode.BINARY.getCode() | 0xf0)); if (wsBodyLength < 126) { websocketHeader = new byte[2]; websocketHeader[0] = header0; websocketHeader[1] = (byte) wsBodyLength; } else if (wsBodyLength < ((1 << 16) - 1)) { websocketHeader = new byte[4]; websocketHeader[0] = header0; websocketHeader[1] = 126; websocketHeader[3] = (byte) (wsBodyLength & 0xff); websocketHeader[2] = (byte) ((wsBodyLength >> 8) & 0x80); } else { websocketHeader = new byte[6]; websocketHeader[0] = header0; websocketHeader[1] = 127; int2Byte(websocketHeader, wsBodyLength, 2); } ByteBuffer buf = ByteBuffer.allocate(websocketHeader.length + wsBodyLength); buf.put(websocketHeader); buf.put(barragePacket.getType()); if (imBody != null) { buf.put(imBody); } return buf; } public static void int2Byte(byte[] bytes, int value, int offset) { checkLength(bytes, 4, offset); bytes[offset + 3] = (byte) ((value & 0xff)); bytes[offset + 2] = (byte) ((value >> 8 * 1) & 0xff); bytes[offset + 1] = (byte) ((value >> 8 * 2) & 0xff); bytes[offset + 0] = (byte) ((value >> 8 * 3)); } private static void checkLength(byte[] bytes, int length, int offset) { if (bytes == null) { throw new IllegalArgumentException("null"); } if (offset < 0) { throw new IllegalArgumentException("invalidate offset " + offset); } if (bytes.length - offset < length) { throw new IllegalArgumentException("invalidate length " + bytes.length); } }
websocket协议的服务器的处理流程大致是上面这个流程,但是代码我只贴了关键部分,完整的代码可以去我的码云上下载。
四.javascript客户端编写
<!DOCTYPE html> <html> <head> <title>Testing websockets</title> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> </head> <body> <div> <input type="submit" value="Start" onclick="start()" /> </div> <div id="messages"></div> <script type="text/javascript"> var webSocket = new WebSocket('ws://localhost:5678'); webSocket.onerror = function(event) { onError(event) }; webSocket.onopen = function(event) { onOpen(event) }; webSocket.onmessage = function(event) { onMessage(event) }; function onMessage(event) { var blob = event.data; var reader = new FileReader(); reader.readAsText(blob, 'utf-8'); reader.onload = function (e) { document.getElementById('messages').innerHTML += '<br />' + reader.result; } } function onOpen(event) { document.getElementById('messages').innerHTML = 'Connection established'; } function onError(event) { alert(event.data); } function start() { webSocket.send('hellohellohellohellohellohellohellohellohe'); } </script> </body> </html>
最好是下载程序下来,打上断点,debug去跟一下,每一步包做了那些处理,变成了什么样子,传输的二进制数据转换成字符串是甚么样子等等。
·······················································································································································
个人博客: http://z77z.oschina.io/
此项目下载地址: https://git.oschina.net/z77z/springboot_mybatisplus
·······················································································································································
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 分布式服务框架之远程通讯技术及原理分析
- gim 1.0.0 版本发布,基于 getty 的即时通讯框架
- Swoole 分布式通讯框架 SwooleDistributed 2.6 正式版发布
- Spark内置框架rpc通讯机制及RpcEnv基础设施-Spark商业环境实战
- t-io 3.2.3发布,OSC官方人员也在使用的通讯框架
- Vue组件通讯
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。