内容简介:Hiredis 实现 Redis 流水线
Pipelining(流水线)允许 Redis 客户端一次向 Redis 发送多个命令,Redis 在接收到这些命令后,按顺序进行处理,然后将请求的处理结果一次性返回给客户端。流水线可以减少客户端与 Redis 之间的网络通信次数来提升 Redis 客户端在发送多个命令时的性能,可谓提升客户端性能的一个利器。
我们熟悉的 Python 版本的 Redis 客户端 redis-py 提供了 StrictPipeline 对象来实现流水线,使用起来很是方便,具体用法可以参考文章 《Redis 事务学习笔记》 。作为 C/C++ 版本的 Redis 客户端,hiredis 实现流水线稍显有点复杂,不过通过使用 hiredis 来实现流水线却可以更深刻了解流水线的内部实现原理。
Hiredis 提供 redisCommand()
函数来向 Redis 服务端发送命令, redisCommand()
函数的原型如下:
void *redisCommand(redisContext *c, const char *format, ...);
redisCommand()
执行后,返回一个 redisReply *
指针,指向 redisReply
结构体,该结构体包含了返回的结果信息。
redisCommand()
函数是阻塞的(是指使用阻塞版的 redisContext
对象,下文我们同样有这个假定),每调用一次,都会等待 Redis 服务端的返回,然后再继续执行程序下面的逻辑。
redisCommand()
函数的使用示例如下所示,完整的代码和编译可以参考文章 《Redis C 语言客户端 hiredis 的使用》
。
redisReply *reply; reply = redisCommand(conn, "SET %s %s", "foo", "bar"); freeReplyObject(reply); reply = redisCommand(conn, "GET %s", "foo"); printf("%s\n", reply->str); freeReplyObject(reply);
如果我们需要向 Redis 服务端发送多次命令,如果都是使用 redisCommand()
函数来发送,那么每次发送后都得等待返回结果后才能继续下一次发送,这性能显然不是我们能接受的。Hiredis 提供了 redisAppendCommand()
函数来实现流水线的命令发送方案。
int redisAppendCommand(redisContext *c, const char *format, ...);
redisAppendCommand()
函数执行成功时返回 REDIS_OK
,失败时返回 REDIS_ERR
。
#define REDIS_ERR -1 #define REDIS_OK 0
跟 redisCommand()
函数一样, redisAppendCommand()
函数在 hiredis 中也有其他变体,这里为了描述的简便,仅以 redisCommand()
函数为例说明。
redisAppendCommand()
函数执行后,并没有立刻将命令发送到 Redis 执行,而是先将命令缓存到 redisContext
对象中。那么, redisContext
对象中被缓存起来的命令什么时候会被发送出去呢?Hiredis 提供了 redisGetReply()
函数来将缓存的命令发送出去的功能。 redisGetReply()
函数的处理过程如下:
- 查看 结果缓冲区 是否还有结果没被取出,如果有,则取出结果后直接返回;如果没有,则执行步骤2
- 将 命令缓冲区 的所有命令发送到 Redis 处理,然后一直等待,直到有一个 Redis 的处理结果返回
上面我们提到的 redisCommand()
函数执行后可以直接获取 Redis 的返回结果,这是由于其内部先调用 redisAppendCommand()
函数,然后再调用 redisGetReply()
函数实现的。
说到这里,hiredis 实现流水线的过程就很清晰了。无论 redisCommand()
函数还是 redisAppendCommand()
函数,都会先将命令缓存起来,然后再发送到 Redis 执行。不同的是 redisCommand()
函数会马上发送命令然后取得返回结果,而 redisAppendCommand()
函数则在调用 redisGetReply()
函数才将所有命令一次性发送,并取得第一个命令的返回结果。
下面是使用 redisAppendCommand()
函数实现流水线方案的示例。
redisReply *reply; redisAppendCommand(context,"SET foo bar"); redisAppendCommand(context,"GET foo"); redisGetReply(context,&reply); // SET命令的返回 freeReplyObject(reply); redisGetReply(context,&reply); // GET命令的返回 freeReplyObject(reply);
值得注意的是,调用 redisAppendCommand()
函数的次数需要与调用 redisGetReply()
的次数要一致,否则会出现获取的 Redis 处理结果跟预期不一致的情况。
// 测试 redisGetReply 与 redisAppendCommand 调用次数不一致的情况 redisAppendCommand(conn, "get t"); // 本来想取得 set a ddd 的返回,却获取了 get t 的返回 reply = redisCommand(conn, "set a ddd"); printf("set a res: %s\n", reply->str);
输出的结果将会是 get t
命令的返回,而不是 set a ddd
命令的返回。
参考资料
- https://github.com/redis/hiredis
- https://gist.github.com/dspezia/1893378
- http://www.leoox.com/?p=316
- http://www.redis.cn/topics/pipelining.html
附:示例程序 testhiredis.c
编译:
gcc -o testhiredis testhiredis.c -L/usr/local/lib -lhiredis
执行:
./testhiredis
输出:
bar
res: OK
res: b
watch res: OK
res: OK, num: 0, type: 5
res: QUEUED, num: 0, type: 5
res: QUEUED, num: 0, type: 5
res: QUEUED, num: 0, type: 5
res: (null), num: 3, type: 2
set a res: tt
源程序:
#include <stdio.h> #include <hiredis/hiredis.h> int main() { // 阻塞 redisContext redisContext *conn = redisConnect("127.0.0.1", 6379); if (conn != NULL && conn->err) { printf("connection error: %s\n", conn->errstr); return 0; } // 使用 redisCommand 发送命令并获取返回 redisReply *reply; reply = redisCommand(conn, "SET %s %s", "foo", "bar"); freeReplyObject(reply); reply = redisCommand(conn, "GET %s", "foo"); printf("%s\n", reply->str); freeReplyObject(reply); // 使用 redisAppendCommand 实现流水线 redisAppendCommand(conn, "set a b"); redisAppendCommand(conn,"get a"); int r = redisGetReply(conn, (void **)&reply); if (r == REDIS_ERR) { printf("ERROR\n"); } printf("res: %s\n", reply->str); freeReplyObject(reply); r = redisGetReply(conn, (void **)&reply); if (r == REDIS_ERR) { printf("ERROR\n"); } printf("res: %s\n", reply->str); freeReplyObject(reply); // 使用 watch 命令监控键 a reply = redisCommand(conn, "watch a"); printf("watch res: %s\n", reply->str); freeReplyObject(reply); // 事务流水线,总共5个命令 redisAppendCommand(conn, "multi"); redisAppendCommand(conn, "get foo"); redisAppendCommand(conn, "set t tt"); redisAppendCommand(conn, "set a aa"); redisAppendCommand(conn, "exec"); for (int i = 0; i < 5; ++i) { r = redisGetReply(conn, (void **)&reply); if (r == REDIS_ERR) { printf("ERROR\n"); } printf("res: %s, num: %zu, type: %d\n", reply->str, reply->elements, reply->type); freeReplyObject(reply); } // 测试 redisGetReply 与 redisAppendCommand 调用次数不一致的情况 redisAppendCommand(conn, "get t"); // 本来想取得 set a ddd 的返回,却获取了 get t 的返回 reply = redisCommand(conn, "set a ddd"); printf("set a res: %s\n", reply->str); redisFree(conn); return 0; }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
R语言实战(第2版)
[美] Robert I. Kabacoff / 王小宁、刘撷芯、黄俊文等 / 人民邮电出版社 / 2016-5 / 99.00元
本书注重实用性,是一本全面而细致的R指南,高度概括了该软件和它的强大功能,展示了使用的统计示例,且对于难以用传统方法处理的凌乱、不完整和非正态的数据给出了优雅的处理方法。作者不仅仅探讨统计分析,还阐述了大量探索和展示数据的图形功能。新版做了大量更新和修正,新增了近200页内容,介绍数据挖掘、预测性分析和高级编程。一起来看看 《R语言实战(第2版)》 这本书的介绍吧!