内容简介:众所周知,Redis的所有数据都存储在内存中,但是内存是一种有限的资源,所以为了防止Redis无限制的使用内存,在启动Redis时可以通过配置项当Redis使用的内存超过配置的可以在启动Redis时,通过配置项
众所周知,Redis的所有数据都存储在内存中,但是内存是一种有限的资源,所以为了防止 Redis 无限制的使用内存,在启动Redis时可以通过配置项 maxmemory
来指定其最大能使用的内存容量。例如可以通过以下配置来设置Redis最大能使用 1G 内存:
maxmemory 1G
当Redis使用的内存超过配置的 maxmemory
时,便会触发数据淘汰策略。Redis提供了多种数据淘汰的策略,如下:
-
volatile-lru
: 最近最少使用算法,从设置了过期时间的键中选择空转时间最长的键值对清除掉 -
volatile-lfu
: 最近最不经常使用算法,从设置了过期时间的键中选择某段时间之内使用频次最小的键值对清除掉 -
volatile-ttl
: 从设置了过期时间的键中选择过期时间最早的键值对清除 -
volatile-random
: 从设置了过期时间的键中,随机选择键进行清除 -
allkeys-lru
: 最近最少使用算法,从所有的键中选择空转时间最长的键值对清除 -
allkeys-lfu
: 最近最不经常使用算法,从所有的键中选择某段时间之内使用频次最少的键值对清除 -
allkeys-random
: 所有的键中,随机选择键进行删除 -
noeviction
: 不做任何的清理工作,在redis的内存超过限制之后,所有的写入操作都会返回错误;但是读操作都能正常的进行
可以在启动Redis时,通过配置项 maxmemory_policy
来指定要使用的数据淘汰策略。例如要使用 volatile-lru
策略可以通过以下配置来指定:
maxmemory_policy volatile-lru
LRU算法
LRU是 Least Recently Used
的缩写,即最近最少使用,很多缓存系统都使用此算法作为淘汰策略。
最简单的实现方式就是把所有缓存通过一个链表连接起来,新创建的缓存添加到链表的头部,如果有缓存被访问了,就把缓存移动到链表的头部。由于被访问的缓存会移动到链表的头部,所以没有被访问的缓存会随着时间的推移移动的链表的尾部,淘汰数据时只需要从链表的尾部开始即可。下图展示了这个过程:
Redis的LRU算法
Redis使用了结构体 robj
来存储缓存对象,而 robj
结构有个名为 lru
的字段,用于记录缓存对象最后被访问的时间,Redis就是以 lru
字段的值作为淘汰依据。 robj
结构如下:
typedef struct redisObject { ... unsigned lru:24; ... } robj;
当缓存对象被访问时,便会更新此字段的值。代码如下:
robj *lookupKey(redisDb *db, robj *key, int flags) { dictEntry *de = dictFind(db->dict,key->ptr); if (de) { robj *val = dictGetVal(de); /* Update the access time for the ageing algorithm. * Don't do it if we have a saving child, as this will trigger * a copy on write madness. */ if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 && !(flags & LOOKUP_NOTOUCH)) { if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { updateLFU(val); } else { val->lru = LRU_CLOCK(); // 更新lru字段的值 } } return val; } else { return NULL; } }
lookupKey()
函数用于查找key对应的缓存对象,所以当缓存对象被访问时便会调用此函数。
Redis数据淘汰
接下来我们分析一下当Redis内存使用超过配置的最大内存使用限制时的处理方式。
Redis在处理每一个命令时都会检查内存的使用是否超过了限制的最大值,处理命令是通过 processCommand()
函数进行的,检查内存使用情况的代码如下:
int processCommand(client *c) { ... if (server.maxmemory && !server.lua_timedout) { int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR; if (server.current_client == NULL) return C_ERR; if (out_of_memory && (c->cmd->flags & CMD_DENYOOM || (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand))) { flagTransaction(c); addReply(c, shared.oomerr); return C_OK; } } ... }
检查内存的使用情况主要通过 freeMemoryIfNeededAndSafe()
函数进行,而 freeMemoryIfNeededAndSafe()
函数最终会调用 freeMemoryIfNeeded()
函数进行处理,由于 freeMemoryIfNeeded()
函数比较庞大,所以我们分段来进行分析:
int freeMemoryIfNeeded(void) { ... size_t mem_reported, mem_tofree, mem_freed; mstime_t latency, eviction_latency; long long delta; int slaves = listLength(server.slaves); ... if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL) == C_OK) return C_OK; mem_freed = 0; if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION) goto cant_free;
freeMemoryIfNeeded()
函数首先会调用 getMaxmemoryState()
函数来获取Redis的内存使用情况,如果 getMaxmemoryState()
函数返回 C_OK
,表示内存使用总量还没有超出限制,直接返回 C_OK
就可以了。如果 getMaxmemoryState()
函数不是返回 C_OK
,表示内存使用总量已经超出限制,需要进行数据淘汰,需要淘汰数据的大小通过 mem_tofree
参数返回。
当然,如果配置的淘汰策略为 noeviction
,表示不能进行数据淘汰,所以需要返回 C_ERR
表示有错误。
接着分析剩余的代码片段:
latencyStartMonitor(latency); while (mem_freed < mem_tofree) { int j, k, i, keys_freed = 0; static unsigned int next_db = 0; sds bestkey = NULL; int bestdbid; redisDb *db; dict *dict; dictEntry *de; if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) || server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) { struct evictionPoolEntry *pool = EvictionPoolLRU; while(bestkey == NULL) { unsigned long total_keys = 0, keys; for (i = 0; i < server.dbnum; i++) { db = server.db+i; dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ? db->dict : db->expires; if ((keys = dictSize(dict)) != 0) { evictionPoolPopulate(i, dict, db->dict, pool); total_keys += keys; } } if (!total_keys) break; /* No keys to evict. */ for (k = EVPOOL_SIZE-1; k >= 0; k--) { if (pool[k].key == NULL) continue; bestdbid = pool[k].dbid; if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) { de = dictFind(server.db[pool[k].dbid].dict, pool[k].key); } else { de = dictFind(server.db[pool[k].dbid].expires, pool[k].key); } if (pool[k].key != pool[k].cached) sdsfree(pool[k].key); pool[k].key = NULL; pool[k].idle = 0; if (de) { bestkey = dictGetKey(de); break; } else { /* Ghost... Iterate again. */ } } } }
如果内存使用总量超出限制,并且配置了淘汰策略,那么就开始数据淘汰过程。在上面的代码中, mem_tofree
变量表示要淘汰的数据总量,而 mem_freed
变量表示已经淘汰的数据总量。所以在 while
循环中的条件是 mem_freed < mem_tofree
,表示淘汰的数据总量一定要达到 mem_tofree
为止。
前面介绍过,Redis的淘汰策略有很多中,所以进行数据淘汰时需要根据配置的策略进行。如果配置的淘汰策略是 LRU/LFU/TTL
的话,那么就进入 if
代码块。在 if
代码块里,首先调用 evictionPoolPopulate()
函数选择一些缓存对象样本放置到 EvictionPoolLRU
数组中。 evictionPoolPopulate()
函数后面会进行分析,现在只需要知道 evictionPoolPopulate()
函数是选取一些缓存对象样本就可以了。
获取到缓存对象样本后,还需要从样本中获取最合适的缓存对象进行淘汰,因为在选择样本时会把最合适的缓存对象放置在 EvictionPoolLRU
数组的尾部,所以只需要从 EvictionPoolLRU
数组的尾部开始查找一个不为空的缓存对象即可。
else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM || server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM) { for (i = 0; i < server.dbnum; i++) { j = (++next_db) % server.dbnum; db = server.db+j; dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ? db->dict : db->expires; if (dictSize(dict) != 0) { de = dictGetRandomKey(dict); bestkey = dictGetKey(de); bestdbid = j; break; } } }
如果使用随机淘汰策略,那么就进入 else if
代码块,这部分代码的逻辑很简单,如果配置的淘汰策略是 volatile-random
,那么就从有过期时间的缓存对象中随机获取,否则就从所有的缓存对象中随机获取。
if (bestkey) { db = server.db+bestdbid; robj *keyobj = createStringObject(bestkey,sdslen(bestkey)); propagateExpire(db,keyobj,server.lazyfree_lazy_eviction); delta = (long long) zmalloc_used_memory(); latencyStartMonitor(eviction_latency); // 删除缓存对象 if (server.lazyfree_lazy_eviction) dbAsyncDelete(db,keyobj); else dbSyncDelete(db,keyobj); latencyEndMonitor(eviction_latency); latencyAddSampleIfNeeded("eviction-del",eviction_latency); latencyRemoveNestedEvent(latency,eviction_latency); delta -= (long long) zmalloc_used_memory(); mem_freed += delta; server.stat_evictedkeys++; notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted", keyobj, db->id); decrRefCount(keyobj); keys_freed++; if (slaves) flushSlavesOutputBuffers(); if (server.lazyfree_lazy_eviction && !(keys_freed % 16)) { if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) { mem_freed = mem_tofree; } } }
如果找到要淘汰的缓存对象,那么就开始释放缓存对象所占用的内存空间。除了需要释放缓存对象占用的内存空间外,还需要进行一些其他的操作,比如把淘汰的缓存对象同步到从服务器和把淘汰的缓存对象追加到 AOF文件
中等。
当条件 mem_freed < mem_tofree
为假时便会退出 while
循环,说明Redis的内存使用总量已经小于最大的内存使用限制, freeMemoryIfNeeded()
函数便会返回 C_OK
表示成功执行。
淘汰数据样本采集
前面说了,当使用非随机淘汰策略时需要进行数据采样( volatile-lru/volatile-lfu/volatile-ttl/allkeys-lru/allkeys-lfu
),数据采样通过 evictionPoolPopulate()
函数进行,由于此函数比较庞大,所以对代码分段分析:
void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) { int j, k, count; dictEntry *samples[server.maxmemory_samples]; count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples);
evictionPoolPopulate()
函数首先调用 dictGetSomeKeys()
函数从缓存对象集合中获取一些样本,并保存在 samples
数组中。
for (j = 0; j < count; j++) { unsigned long long idle; sds key; robj *o; dictEntry *de; de = samples[j]; key = dictGetKey(de); if (server.maxmemory_policy != MAXMEMORY_VOLATILE_TTL) { if (sampledict != keydict) de = dictFind(keydict, key); o = dictGetVal(de); } if (server.maxmemory_policy & MAXMEMORY_FLAG_LRU) { idle = estimateObjectIdleTime(o); } else if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { idle = 255-LFUDecrAndReturn(o); } else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) { idle = ULLONG_MAX - (long)dictGetVal(de); } else { serverPanic("Unknown eviction policy in evictionPoolPopulate()"); }
上面的代码主要是获取样本缓存对象的 排序 权值 idel
,如果使用 LRU淘汰算法
,那么就调用 estimateObjectIdleTime()
函数获取排序权值, estimateObjectIdleTime()
函数用于获取缓存对象有多长时间没有被访问。排序按照 idle
的值升序排序,就是说 idle
的值越大,就排到越后。
k = 0; while (k < EVPOOL_SIZE && pool[k].key && pool[k].idle < idle) k++; if (k == 0 && pool[EVPOOL_SIZE-1].key != NULL) { continue; } else if (k < EVPOOL_SIZE && pool[k].key == NULL) { } else { if (pool[EVPOOL_SIZE-1].key == NULL) { sds cached = pool[EVPOOL_SIZE-1].cached; memmove(pool+k+1,pool+k, sizeof(pool[0])*(EVPOOL_SIZE-k-1)); pool[k].cached = cached; } else { k--; sds cached = pool[0].cached; if (pool[0].key != pool[0].cached) sdsfree(pool[0].key); memmove(pool,pool+1,sizeof(pool[0])*k); pool[k].cached = cached; } } int klen = sdslen(key); if (klen > EVPOOL_CACHED_SDS_SIZE) { pool[k].key = sdsdup(key); } else { memcpy(pool[k].cached,key,klen+1); sdssetlen(pool[k].cached,klen); pool[k].key = pool[k].cached; } pool[k].idle = idle; pool[k].dbid = dbid; } }
上面这段代码的作用是:根据 idle
的值找到当前缓存对象所在 EvictionPoolLRU
数组的位置,然后把缓存对象保存到 EvictionPoolLRU
数组中。以下插图解释了数据采样的过程:
所以 EvictionPoolLRU
数组的最后一个元素便是最优的淘汰缓存对象。
从上面的分析可知,淘汰数据时只是从样本中找到最优的淘汰缓存对象,并不是从所有缓存对象集合中查找。由于前面介绍的 LRU算法
需要维护一个LRU链表,而维护一个LRU链表的成本比较大,所以Redis才出此下策。
以上所述就是小编给大家介绍的《Redis数据淘汰算法》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 算法 - 06 | 链表(上):如何实现LRU缓存淘汰算法?
- Redis中内存淘汰算法实现
- 数据结构与算法 | 如何实现LRU缓存淘汰算法
- golang实现LRU缓存淘汰算法
- 聊聊缓存淘汰算法:LRU 实现原理
- 这几个缓存淘汰算法你知道吗?
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。