使用PHP Socket开发Yar TCP服务

栏目: IT技术 · 发布时间: 5年前

内容简介:Yar支持HTTP和TCP俩种Transporter, HTTP的是基于CURL,PHP中的Yar默认就是走的HTTP Transporter, 这个大家应该都不陌生, 但是基于TCP的, 可能大家会用的少一些。事实上,我6年前也写过一个C的Yar server框架,叫做Yar-c, 代码地址在只不过,Yar C需要用C来写Handle, 可能对于不少PHPer来说,会稍微有点陌生,那今天我们尝试用PHP来写一个TCP的Server,来介绍下如何实现对Yar RPC协议的处理, 这个例子可以方便的结合

Yar支持HTTP和TCP俩种Transporter, HTTP的是基于CURL,PHP中的Yar默认就是走的HTTP Transporter, 这个大家应该都不陌生, 但是基于TCP的, 可能大家会用的少一些。

事实上,我6年前也写过一个C的Yar server框架,叫做Yar-c, 代码地址在 Yar-C at Github , 它提供了服务启动,worker进程管理,Yar打包协议等。当时我们用这个框架,实现了高性能的微博白名单等服务,以供 PHP 端使用Yar Client来调用。

只不过,Yar C需要用C来写Handle, 可能对于不少PHPer来说,会稍微有点陌生,那今天我们尝试用PHP来写一个TCP的Server,来介绍下如何实现对Yar RPC协议的处理, 这个例子可以方便的结合 Swoole 等异步PHP框架,实现一个高性能的Yar TCP Server。 这个过程中, 会让大家了解Yar的RPC通信协议,以及捎带了解下Socket编程。

我们今天还是用“白名单”服务作为例子,我们提供一个接口,接受RPC客户端的请求,参数是一个用户ID,返回bool,表示是否在白名单:

function query(int $id) : bool;

首先,我们建立一个文件yar_server, 为了方便的直接执行,我们在文件写下:

#!/bin/env php7
<?php
class WhiteList {
}

然后,通过chmod a+x 给这个文件增加可执行的权限。

第一步我们需要处理服务的启动参数处理, 接受一个参数S表示要监听的IP和端口,值的格式是host:port, 我们使用PHP的 getopt 函数来处理命令行参数:

class WhiteList {
    protected $host;

    public function __construct() {
        $options = getOpt("S:");
        if (!isset($options["S"])) {
            $this->usage();
        }
    }

    protected function usage() {
        exit("Usage: yar_server -S hostname:port\n");
    }
}

这样,当用户启动yar_server的时候,没有指定S参数,我们就退出,并提示Usage。 我们还需要另外一个配置,就是指向一个词表文件,词表文件中每一行是一个在白名单中的用户ID, 我们用F表示:

class WhiteList {
    protected $host;
    protected $dicts;

    public function __construct() {
        $options = getOpt("S:F:");
        if (!isset($options["S"]) || !isset($options["F"])) {
            $this->usage();
        }
        $this->host = $options["S"];
        $this->dicts = $options["F"];
    }

    protected function usage() {
        exit("Usage: yar_server -F path_to_dict -S hostname:port\n");
    }
}

好了, 现在启动参数处理完成, 当然为了简单,我省去了对输入参数的有效性检查。

接下来, 我们需要完成俩个函数, 第一个是读取-F指定的词表文件,把所有的用户ID读入到一个数组中,因为我们的这个服务会是常驻进行, 所以不用担心性能, 它只会在启动阶段处理这个词表文件:

protected function loadDict() {
	$this->ids = array();

	$fp = fopen($this->dicts, "r");
	while (!feof($fp)) {
		$line = trim(fgets($fp));
		if ($line) {
			$this->ids[$line] = true;
		}
	}
	fclose($fp);
	echo "Loading dict successfully, ", count($this->ids), " loaded\n";

	return $this;
}

因为用户ID是整型,所以我们把它当作Hashtable的key,这样在将来查找的时候,使用isset会非常高效。 需要注意的是因为文件处理不是我们今天要讲的重点,也就省去了对文件存在行,可读性,合法性的检查。

好了, 接下来是重点了, 我们要启动一个IPV4 TCP Socket服务,监听在$host指定的地方, 为了方便大家了解Socket API,我们不采用PHP的Stream系列函数,而是采用PHP直接包装的Socket系列API, 首先我们用 socket_create 创建一个Socket套接字:

protected function listen() {
	$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
	if ($socket == false) {
		throw new Exception("socket_create() failed: reason: " . socket_strerror(socket_last_error()));
	}
}

然后,我们需要使用 socket_bind 绑定这个Socket到我们需要监听的地址, 并且使用 socket_listen 来监听请求:

protected function listen() {
	$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
	if ($socket == false) {
		throw new Exception("socket_create() failed: reason: " . socket_strerror(socket_last_error()));
	}
	list($hostname, $port) = explode(":", $this->host);
	if (socket_bind($socket, $hostname, $port) == false) {
		throw new Exception("socket_bind() failed: reason: " . socket_strerror(socket_last_error()));
	}
	if (socket_listen($socket, 64) === false) {
		throw new Exception("socket_listen() failed: reason: " . socket_strerror(socket_last_error()));
	}
	echo "Starting Yar_Server at {$this->host}\nPresss Ctrl + C to quit\n";

	$this->socket = $socket;
	return $this;
}

好了, 如果一切没问题,接下来我们就可以 socket_accept 来监听请求了, 默认的socket是阻塞模式,如果没有请求,进程会一直阻塞等待, 对于高性能的服务来说, 最好采用非阻塞+select或者epoll的模式来同时处理多个请求, 但是我们的这个例子主要是为了介绍Yar的协议, 所以还是采用简单的阻塞模式。

接下来,我们来编写真正的RPC处理部分,首先我们通过accept接受一个请求, 然后读取请求的的内容,分析请求头中的Yar RPC Header信息, Yar RPC的协议头定义如下:

typedef struct _yar_header {
    uint32_t       id;            // transaction id
    uint16_t       version;       // protocl version
    uint32_t       magic_num;     // default is: 0x80DFEC60
    uint32_t       reserved;
    unsigned char  provider[32];  // reqeust from who
    unsigned char  token[32];     // request token, used for authentication
    uint32_t       body_len;      // request body len
}

其中, magic_num是用来验证请求有效性的一个特殊值, 合法的Yar RPC请求都会设置这个值为0x80DFEC60(我很想告诉你为啥是这个值,但我真不记得当时我为啥用这个数字了),这个头部是82个字节,可能有同学会问,不对啊一看这个Struct不应该是82啊,那是因为头部申明的时候采用pack模式,也就是不对齐, 所以确实是82个字节.

provider是一个字符串,标明了客户端的名字, 比如对于Yar扩展的Yar_Client就是"Yar PHP Cient-x.x.x"

token在设计的最初是为了做API key验证的,但是后来没用上,因为大部分都是内网应用,可以有多种办法来保证请求来源的合法性。

id是一个唯一请求id,这个是为了排查请求问题的, version默认为0,或者1,目前我没有升级过协议头,所以这个暂时我们也不用关心,reserved可以用来传递一些请求参数, 比如客户端可以说明是否保持连接。

body_len是我们需要关心的, 这个字段表明了这次请求,请求体一共多大(不包括Yar协议头部)。

所有的这些数字, 都是以网络字节序传递的, 我们采用PHP处理二进制流的 unpack 函数来解析读取进来的二进制流:

protected function parseHeader($header) {
   return 
     unpack("Nid/nversion/Nmagic_num/Nreserved/A32provider/A32token/Nbody_len", $header);
}

这个函数会返回一个上面说到的头部结构体的数组。

对应的我们也需要使用 pack 来实现生成Yar Header的方法:

protected function genHeader($id, $len) {
	$bin = pack("NnNNA32A32N",
		$id, 0, 0x80DFEC60,
		0, "Yar PHP TCP Server",
		"", $len
	);
	return $bin;
}

如刚才说的,我们需要在接受一个请求以前, 验证请求的合法性:

const YAR_MAGIC_NUM = 0x80DFEC60;
protected function validRequest($header) {
	if ($header["magic_num"] != self::YAR_MAGIC_NUM) {
		return false;
	}
	return true;
}

所以大概请求的处理整个逻辑框架是:

protected function accept() {
	while (($conn = socket_accept($this->socket))) {
		$buf = socket_read($conn, self::HEADER_SIZE, PHP_BINARY_READ);
		if ($buf === false) {
			socket_shutdown($conn);
			continue;
		}

		if (!$this->validHeader($header = $this->parseHeader($buf))) {
			$output = $this->response(1, "illegal Yar RPC request");
			goto response;
		}

		$buf = socket_read($conn, $header["body_len"], PHP_BINARY_READ);
		if ($buf === false) {
			$output = $this->response(1, "insufficient request body");
			goto response;
		}

		if (!$this->validPackager($buf)) {
			$output = $this->response(1, "unsupported packager");
			goto response;
		}

		$buf = substr($buf, 8); /* 跳过打包信息的8个字节 */
		$request = $this->parseRequest($buf);
		if ($request == false) {
			$this->response(1, "malformed request body");
			goto response;
		}

		$status = $this->handle($request, $ret);

		$output = $this->response($status, $ret);
response:
		socket_write($conn, $output, strlen($output));

		socket_shutdown($conn); /* 关闭写 */
	}
}

现在整体的框架就算完成了,我们需要完成handle,response方法就可以了,handle是要根据用户的请求中的m, 来调用指定的方法

protected function handle($request, &$ret) {
	if ($request["m"] == "query") {
		$ret = $this->query(...$request["p"]);
	} else {
		$ret = "unsupported method '" . $request["m"]. "'";
		return 1;
	}
	return 0;
}

现在来实现query方法本身, 这个会很简单,就检查下id是不是在白名单数组:

protected function query($id) {
	return isset($this->ids[$id]);
}

好了,接下来我们要完成response方法,这个方法是打包一个符合Yar协议的返回体,包括82个字节的头部,8个字节的打包信息,以及序列化后的响应体, 我们需要根据status不同,来选择设置响应体中的r还是e字段:

protected function response($status, $ret) {
	$body = array();

	$body["i"] = 0;
	$body["s"] = $status;
	if ($status == 0) {
		$body["r"] = $ret;
	} else {
		$body["e"] = $ret;
	}

	$packed = serialize($body);
	$header = $this->genHeader(0, strlen($packed) + 8);

	return $header . str_pad("PHP", 8, "\0") . $packed;
}

好了, 马上就要大功告成了,我们最后完成启动方法和析构函数(关闭socket):

public function run() {
	$this->loadDict()->listen()->accept();
}
public function __destruct() {
	if ($this->socket) {
		socket_close($this->socket);
	}
}

现在一切就绪, 我们最后在文件末尾加入:

(new Whitelist)->run();

在测试之前,我们先准备一个测试词表,比如1到1000的id:

seq 1, 1, 10000 > user_id.dict

然后启动服务, 监听在本机的9000端口:

$ ./yar_server -F user_id.dict -S127.0.0.1:9000
Loading dict successfully, 1000 loaded
Starting Yar_Server at 127.0.0.1:9000
Presss Ctrl + C to quit

不错,服务启动成功,然后我们使用Yar扩展来编写客户端(你需要首先安装好 Yar扩展 ), 测试下用户id 999和99999的调用效果:

<?php
$yar = new Yar_Client("tcp://127.0.0.1:9000");
var_dump($yar->query("999"));
var_dump($yar->query("99999"));
?>

和调用HTTP的Yar服务不同,此处我们应该使用tcp://做地址头,表示这是一个TCP的服务。

来,运行一下看看:

php7 client.php
bool(true)
bool(false)

看起来不错, 符合预期!

你也可以尝试故意构造一些错误的可能,比如调用不存在的方法之类的,来看看服务器的反应, 这个例子的代码你可以在 这里 找到.

到这里我就算介绍完了如何采用PHP来编写Yar的TCP服务, 大家应该可以很方便的把这个例子修改完善成自己希望的格式,或者嵌入Swoole。

还是要再次说明,因为本文的主要目的是为了介绍Yar RPC通信协议,所以在服务管理这块并没有做的很完善,比如socket_accept, socket_read/write等都默认采用了阻塞模式,也没有加入超时设计,服务进程也只有一个,这个如果真的想用做实际服务的话,还是需要一些功课的,不过我相信你有兴趣的话,都是可以搞定的。:)

当然,最简单的是,你可以直接使用 Yar-C服务框架 来编写C Yar TCP服务。

在这里也有一个Yar-C Server的例子 yar_server in C .

enjoy!


以上所述就是小编给大家介绍的《使用PHP Socket开发Yar TCP服务》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Sass and Compass in Action

Sass and Compass in Action

Wynn Netherland、Nathan Weizenbaum、Chris Eppstein、Brandon Mathis / Manning Publications / 2013-8-2 / USD 44.99

Written by Sass and Compass creators * Complete Sass language reference * Covers prominent Compass community plug-ins * Innovative approach to creating stylesheets Cascading Style Sheets paint the we......一起来看看 《Sass and Compass in Action》 这本书的介绍吧!

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具