Hiredis 实现 Redis 流水线

栏目: 数据库 · 发布时间: 7年前

内容简介:Hiredis 实现 Redis 流水线

Pipelining(流水线)允许 Redis 客户端一次向 Redis 发送多个命令,Redis 在接收到这些命令后,按顺序进行处理,然后将请求的处理结果一次性返回给客户端。流水线可以减少客户端与 Redis 之间的网络通信次数来提升 Redis 客户端在发送多个命令时的性能,可谓提升客户端性能的一个利器。

我们熟悉的 Python 版本的 Redis 客户端 redis-py 提供了 StrictPipeline 对象来实现流水线,使用起来很是方便,具体用法可以参考文章 《Redis 事务学习笔记》 。作为 C/C++ 版本的 Redis 客户端,hiredis 实现流水线稍显有点复杂,不过通过使用 hiredis 来实现流水线却可以更深刻了解流水线的内部实现原理。

Hiredis 提供 redisCommand() 函数来向 Redis 服务端发送命令, redisCommand() 函数的原型如下:

void *redisCommand(redisContext *c, const char *format, ...);

redisCommand() 执行后,返回一个 redisReply * 指针,指向 redisReply 结构体,该结构体包含了返回的结果信息。

redisCommand() 函数是阻塞的(是指使用阻塞版的 redisContext 对象,下文我们同样有这个假定),每调用一次,都会等待 Redis 服务端的返回,然后再继续执行程序下面的逻辑。

redisCommand() 函数的使用示例如下所示,完整的代码和编译可以参考文章 《Redis C 语言客户端 hiredis 的使用》

redisReply *reply;
reply = redisCommand(conn, "SET %s %s", "foo", "bar");
freeReplyObject(reply);

reply = redisCommand(conn, "GET %s", "foo");
printf("%s\n", reply->str);
freeReplyObject(reply);

如果我们需要向 Redis 服务端发送多次命令,如果都是使用 redisCommand() 函数来发送,那么每次发送后都得等待返回结果后才能继续下一次发送,这性能显然不是我们能接受的。Hiredis 提供了 redisAppendCommand() 函数来实现流水线的命令发送方案。

int redisAppendCommand(redisContext *c, const char *format, ...);

redisAppendCommand() 函数执行成功时返回 REDIS_OK ,失败时返回 REDIS_ERR

#define REDIS_ERR -1
#define REDIS_OK 0

redisCommand() 函数一样, redisAppendCommand() 函数在 hiredis 中也有其他变体,这里为了描述的简便,仅以 redisCommand() 函数为例说明。

redisAppendCommand() 函数执行后,并没有立刻将命令发送到 Redis 执行,而是先将命令缓存到 redisContext 对象中。那么, redisContext 对象中被缓存起来的命令什么时候会被发送出去呢?Hiredis 提供了 redisGetReply() 函数来将缓存的命令发送出去的功能。 redisGetReply() 函数的处理过程如下:

  1. 查看 结果缓冲区 是否还有结果没被取出,如果有,则取出结果后直接返回;如果没有,则执行步骤2
  2. 命令缓冲区 的所有命令发送到 Redis 处理,然后一直等待,直到有一个 Redis 的处理结果返回

上面我们提到的 redisCommand() 函数执行后可以直接获取 Redis 的返回结果,这是由于其内部先调用 redisAppendCommand() 函数,然后再调用 redisGetReply() 函数实现的。

说到这里,hiredis 实现流水线的过程就很清晰了。无论 redisCommand() 函数还是 redisAppendCommand() 函数,都会先将命令缓存起来,然后再发送到 Redis 执行。不同的是 redisCommand() 函数会马上发送命令然后取得返回结果,而 redisAppendCommand() 函数则在调用 redisGetReply() 函数才将所有命令一次性发送,并取得第一个命令的返回结果。

下面是使用 redisAppendCommand() 函数实现流水线方案的示例。

redisReply *reply;
redisAppendCommand(context,"SET foo bar");
redisAppendCommand(context,"GET foo");
redisGetReply(context,&reply); // SET命令的返回
freeReplyObject(reply);
redisGetReply(context,&reply); // GET命令的返回
freeReplyObject(reply);

值得注意的是,调用 redisAppendCommand() 函数的次数需要与调用 redisGetReply() 的次数要一致,否则会出现获取的 Redis 处理结果跟预期不一致的情况。

// 测试 redisGetReply 与 redisAppendCommand 调用次数不一致的情况
redisAppendCommand(conn, "get t");
// 本来想取得 set a ddd 的返回,却获取了 get t 的返回
reply = redisCommand(conn, "set a ddd");
printf("set a res: %s\n", reply->str);

输出的结果将会是 get t 命令的返回,而不是 set a ddd 命令的返回。

参考资料

  1. https://github.com/redis/hiredis
  2. https://gist.github.com/dspezia/1893378
  3. http://www.leoox.com/?p=316
  4. http://www.redis.cn/topics/pipelining.html

附:示例程序 testhiredis.c

编译:

gcc -o testhiredis testhiredis.c -L/usr/local/lib -lhiredis

执行:

./testhiredis

输出:

bar

res: OK

res: b

watch res: OK

res: OK, num: 0, type: 5

res: QUEUED, num: 0, type: 5

res: QUEUED, num: 0, type: 5

res: QUEUED, num: 0, type: 5

res: (null), num: 3, type: 2

set a res: tt

源程序:

#include <stdio.h>
#include <hiredis/hiredis.h>

int main() {
    // 阻塞 redisContext
    redisContext *conn = redisConnect("127.0.0.1", 6379);
    if (conn != NULL && conn->err) {
        printf("connection error: %s\n", conn->errstr);
        return 0;
    }

    // 使用 redisCommand 发送命令并获取返回
    redisReply *reply;
    reply = redisCommand(conn, "SET %s %s", "foo", "bar");
    freeReplyObject(reply);

    reply = redisCommand(conn, "GET %s", "foo");
    printf("%s\n", reply->str);
    freeReplyObject(reply);

    // 使用 redisAppendCommand 实现流水线
    redisAppendCommand(conn, "set a b");
    redisAppendCommand(conn,"get a");
    int r = redisGetReply(conn, (void **)&reply);
    if (r == REDIS_ERR) {
        printf("ERROR\n");
    }
    printf("res: %s\n", reply->str);
    freeReplyObject(reply);

    r = redisGetReply(conn, (void **)&reply);
    if (r == REDIS_ERR) {
        printf("ERROR\n");
    }
    printf("res: %s\n", reply->str);
    freeReplyObject(reply);

    // 使用 watch 命令监控键 a
    reply = redisCommand(conn, "watch a");
    printf("watch res: %s\n", reply->str);
    freeReplyObject(reply);

    // 事务流水线,总共5个命令
    redisAppendCommand(conn, "multi");
    redisAppendCommand(conn, "get foo");
    redisAppendCommand(conn, "set t tt");
    redisAppendCommand(conn, "set a aa");
    redisAppendCommand(conn, "exec");

    for (int i = 0; i < 5; ++i) {
        r = redisGetReply(conn, (void **)&reply);
        if (r == REDIS_ERR) {
            printf("ERROR\n");
        }
        printf("res: %s, num: %zu, type: %d\n", reply->str, reply->elements, reply->type);
        freeReplyObject(reply);
    }

    // 测试 redisGetReply 与 redisAppendCommand 调用次数不一致的情况
    redisAppendCommand(conn, "get t");
    // 本来想取得 set a ddd 的返回,却获取了 get t 的返回
    reply = redisCommand(conn, "set a ddd");
    printf("set a res: %s\n", reply->str);

    redisFree(conn);

    return 0;
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

ES6标准入门(第3版)

ES6标准入门(第3版)

阮一峰 / 电子工业出版社 / 2017-9 / 99.00

ES6是下一代JavaScript语言标准的统称,每年6月发布一次修订版,迄今为止已经发布了3个版本,分别是ES2015、ES2016、ES2017。本书根据ES2017标准,详尽介绍了所有新增的语法,对基本概念、设计目的和用法进行了清晰的讲解,给出了大量简单易懂的示例。本书为中级难度,适合那些已经对JavaScript语言有一定了解的读者,可以作为学习这门语言最新进展的工具书,也可以作为参考手册......一起来看看 《ES6标准入门(第3版)》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

MD5 加密
MD5 加密

MD5 加密工具

SHA 加密
SHA 加密

SHA 加密工具