Hiredis 实现 Redis 流水线

栏目: 数据库 · 发布时间: 8年前

内容简介:Hiredis 实现 Redis 流水线

Pipelining(流水线)允许 Redis 客户端一次向 Redis 发送多个命令,Redis 在接收到这些命令后,按顺序进行处理,然后将请求的处理结果一次性返回给客户端。流水线可以减少客户端与 Redis 之间的网络通信次数来提升 Redis 客户端在发送多个命令时的性能,可谓提升客户端性能的一个利器。

我们熟悉的 Python 版本的 Redis 客户端 redis-py 提供了 StrictPipeline 对象来实现流水线,使用起来很是方便,具体用法可以参考文章 《Redis 事务学习笔记》 。作为 C/C++ 版本的 Redis 客户端,hiredis 实现流水线稍显有点复杂,不过通过使用 hiredis 来实现流水线却可以更深刻了解流水线的内部实现原理。

Hiredis 提供 redisCommand() 函数来向 Redis 服务端发送命令, redisCommand() 函数的原型如下:

void *redisCommand(redisContext *c, const char *format, ...);

redisCommand() 执行后,返回一个 redisReply * 指针,指向 redisReply 结构体,该结构体包含了返回的结果信息。

redisCommand() 函数是阻塞的(是指使用阻塞版的 redisContext 对象,下文我们同样有这个假定),每调用一次,都会等待 Redis 服务端的返回,然后再继续执行程序下面的逻辑。

redisCommand() 函数的使用示例如下所示,完整的代码和编译可以参考文章 《Redis C 语言客户端 hiredis 的使用》

redisReply *reply;
reply = redisCommand(conn, "SET %s %s", "foo", "bar");
freeReplyObject(reply);

reply = redisCommand(conn, "GET %s", "foo");
printf("%s\n", reply->str);
freeReplyObject(reply);

如果我们需要向 Redis 服务端发送多次命令,如果都是使用 redisCommand() 函数来发送,那么每次发送后都得等待返回结果后才能继续下一次发送,这性能显然不是我们能接受的。Hiredis 提供了 redisAppendCommand() 函数来实现流水线的命令发送方案。

int redisAppendCommand(redisContext *c, const char *format, ...);

redisAppendCommand() 函数执行成功时返回 REDIS_OK ,失败时返回 REDIS_ERR

#define REDIS_ERR -1
#define REDIS_OK 0

redisCommand() 函数一样, redisAppendCommand() 函数在 hiredis 中也有其他变体,这里为了描述的简便,仅以 redisCommand() 函数为例说明。

redisAppendCommand() 函数执行后,并没有立刻将命令发送到 Redis 执行,而是先将命令缓存到 redisContext 对象中。那么, redisContext 对象中被缓存起来的命令什么时候会被发送出去呢?Hiredis 提供了 redisGetReply() 函数来将缓存的命令发送出去的功能。 redisGetReply() 函数的处理过程如下:

  1. 查看 结果缓冲区 是否还有结果没被取出,如果有,则取出结果后直接返回;如果没有,则执行步骤2
  2. 命令缓冲区 的所有命令发送到 Redis 处理,然后一直等待,直到有一个 Redis 的处理结果返回

上面我们提到的 redisCommand() 函数执行后可以直接获取 Redis 的返回结果,这是由于其内部先调用 redisAppendCommand() 函数,然后再调用 redisGetReply() 函数实现的。

说到这里,hiredis 实现流水线的过程就很清晰了。无论 redisCommand() 函数还是 redisAppendCommand() 函数,都会先将命令缓存起来,然后再发送到 Redis 执行。不同的是 redisCommand() 函数会马上发送命令然后取得返回结果,而 redisAppendCommand() 函数则在调用 redisGetReply() 函数才将所有命令一次性发送,并取得第一个命令的返回结果。

下面是使用 redisAppendCommand() 函数实现流水线方案的示例。

redisReply *reply;
redisAppendCommand(context,"SET foo bar");
redisAppendCommand(context,"GET foo");
redisGetReply(context,&reply); // SET命令的返回
freeReplyObject(reply);
redisGetReply(context,&reply); // GET命令的返回
freeReplyObject(reply);

值得注意的是,调用 redisAppendCommand() 函数的次数需要与调用 redisGetReply() 的次数要一致,否则会出现获取的 Redis 处理结果跟预期不一致的情况。

// 测试 redisGetReply 与 redisAppendCommand 调用次数不一致的情况
redisAppendCommand(conn, "get t");
// 本来想取得 set a ddd 的返回,却获取了 get t 的返回
reply = redisCommand(conn, "set a ddd");
printf("set a res: %s\n", reply->str);

输出的结果将会是 get t 命令的返回,而不是 set a ddd 命令的返回。

参考资料

  1. https://github.com/redis/hiredis
  2. https://gist.github.com/dspezia/1893378
  3. http://www.leoox.com/?p=316
  4. http://www.redis.cn/topics/pipelining.html

附:示例程序 testhiredis.c

编译:

gcc -o testhiredis testhiredis.c -L/usr/local/lib -lhiredis

执行:

./testhiredis

输出:

bar

res: OK

res: b

watch res: OK

res: OK, num: 0, type: 5

res: QUEUED, num: 0, type: 5

res: QUEUED, num: 0, type: 5

res: QUEUED, num: 0, type: 5

res: (null), num: 3, type: 2

set a res: tt

源程序:

#include <stdio.h>
#include <hiredis/hiredis.h>

int main() {
    // 阻塞 redisContext
    redisContext *conn = redisConnect("127.0.0.1", 6379);
    if (conn != NULL && conn->err) {
        printf("connection error: %s\n", conn->errstr);
        return 0;
    }

    // 使用 redisCommand 发送命令并获取返回
    redisReply *reply;
    reply = redisCommand(conn, "SET %s %s", "foo", "bar");
    freeReplyObject(reply);

    reply = redisCommand(conn, "GET %s", "foo");
    printf("%s\n", reply->str);
    freeReplyObject(reply);

    // 使用 redisAppendCommand 实现流水线
    redisAppendCommand(conn, "set a b");
    redisAppendCommand(conn,"get a");
    int r = redisGetReply(conn, (void **)&reply);
    if (r == REDIS_ERR) {
        printf("ERROR\n");
    }
    printf("res: %s\n", reply->str);
    freeReplyObject(reply);

    r = redisGetReply(conn, (void **)&reply);
    if (r == REDIS_ERR) {
        printf("ERROR\n");
    }
    printf("res: %s\n", reply->str);
    freeReplyObject(reply);

    // 使用 watch 命令监控键 a
    reply = redisCommand(conn, "watch a");
    printf("watch res: %s\n", reply->str);
    freeReplyObject(reply);

    // 事务流水线,总共5个命令
    redisAppendCommand(conn, "multi");
    redisAppendCommand(conn, "get foo");
    redisAppendCommand(conn, "set t tt");
    redisAppendCommand(conn, "set a aa");
    redisAppendCommand(conn, "exec");

    for (int i = 0; i < 5; ++i) {
        r = redisGetReply(conn, (void **)&reply);
        if (r == REDIS_ERR) {
            printf("ERROR\n");
        }
        printf("res: %s, num: %zu, type: %d\n", reply->str, reply->elements, reply->type);
        freeReplyObject(reply);
    }

    // 测试 redisGetReply 与 redisAppendCommand 调用次数不一致的情况
    redisAppendCommand(conn, "get t");
    // 本来想取得 set a ddd 的返回,却获取了 get t 的返回
    reply = redisCommand(conn, "set a ddd");
    printf("set a res: %s\n", reply->str);

    redisFree(conn);

    return 0;
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

第三次工业革命

第三次工业革命

[美] 杰里米•里夫金(Jeremy Rifkin) / 张体伟 / 中信出版社 / 2012-5 / 45.00元

第一次工业革命使19世纪的世界发生了翻天覆地的变化 第二次工业革命为20世纪的人们开创了新世界 第三次工业革命同样也将在21世纪从根本上改变人们的生活和工作 在这本书中,作者为我们描绘了一个宏伟的蓝图:数亿计的人们将在自己家里、办公室里、工厂里生产出自己的绿色能源,并在“能源互联网”上与大家分享,这就好像现在我们在网上发布、分享消息一样。能源民主化将从根本上重塑人际关系,它将影响......一起来看看 《第三次工业革命》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

URL 编码/解码
URL 编码/解码

URL 编码/解码

SHA 加密
SHA 加密

SHA 加密工具