redis源码阅读之aof

栏目: 数据库 · 发布时间: 5年前

内容简介:本来今天想写点别的,但是心想之前一篇既然已经提到持久化这边了,而且也说了之后会讲到,索性,今天就说说这个。所谓持久化,就是将内存中的内容同步到磁盘当中,redis提供了两种持久化机制:aof和rdb。今天的主角是aof。aof持久化将被执行的命令写到AOF的末尾,以此来记录数据发生的变化,它一共有三个配置选项,在redis.conf这个配置文件中,具体如下:

本来今天想写点别的,但是心想之前一篇既然已经提到持久化这边了,而且也说了之后会讲到,索性,今天就说说这个。

所谓持久化,就是将内存中的内容同步到磁盘当中,redis提供了两种持久化机制:aof和rdb。今天的主角是aof。

aof持久化将被执行的命令写到AOF的末尾,以此来记录数据发生的变化,它一共有三个配置选项,在redis.conf这个配置文件中,具体如下:

点击( 此处 )折叠或打开

  1. #
  2. # If unsure , use "everysec" .
  3. # appendfsync always
  4. appendfsync everysec
  5. # appendfsync no

其意义也比较明显了,就不再多说了。

redis开启aof的开关代码如下:

点击( 此处 )折叠或打开

  1. / * Called when the user switches from "appendonly no" to "appendonly yes"
  2.   * at runtime using the CONFIG command . * /
  3. int startAppendOnly ( void ) {
  4.     char cwd [ MAXPATHLEN ] ; / * Current working dir path for error messages . * /
  5.      int newfd ;
  6.     newfd = open ( server . aof_filename , O_WRONLY | O_APPEND | O_CREAT , 0644 ) ;
  7.     serverAssert ( server . aof_state = = AOF_OFF ) ;
  8.      if ( newfd = = - 1 ) {
  9.         char * cwdp = getcwd ( cwd , MAXPATHLEN ) ;
  10.         serverLog ( LL_WARNING ,
  11.              "Redis needs to enable the AOF but can't open the "
  12.              "append only file %s (in server root dir %s): %s" ,
  13.             server . aof_filename ,
  14.             cwdp ? cwdp : "unknown" ,
  15.             strerror ( errno ) ) ;
  16.         return C_ERR ;
  17.      }
  18.      if ( server . rdb_child_pid ! = - 1 ) {
  19.         server . aof_rewrite_scheduled = 1 ;
  20.         serverLog ( LL_WARNING , "AOF was enabled but there is already a child process saving an RDB file on disk. An AOF background was scheduled to start when possible." ) ;
  21.      } else {
  22.          / * If there is a pending AOF rewrite , we need to switch it off and
  23.           * start a new one : the old one cannot be reused becuase it is not
  24.           * accumulating the AOF buffer . * /
  25.          if ( server . aof_child_pid ! = - 1 ) {
  26.             serverLog ( LL_WARNING , "AOF was enabled but there is already an AOF rewriting in background. Stopping background AOF and starting a rewrite now." ) ;
  27.             killAppendOnlyChild ( ) ;
  28.          }
  29.          if ( rewriteAppendOnlyFileBackground ( ) = = C_ERR ) {
  30.             close ( newfd ) ;
  31.             serverLog ( LL_WARNING , "Redis needs to enable the AOF but can't trigger a background AOF rewrite operation. Check the above logs for more info about the error." ) ;
  32.             return C_ERR ;
  33.          }
  34.      }
  35.      / * We correctly switched on AOF , now wait for the rewrite to be complete
  36.       * in order to append data on disk . * /
  37.     server . aof_state = AOF_WAIT_REWRITE ;
  38.     server . aof_last_fsync = server . unixtime ;
  39.     server . aof_fd = newfd ;
  40.     return C_OK ;
  41. }

aof写的代码如下:

点击( 此处 )折叠或打开

  1. ssize_t aofWrite ( int fd , const char * buf , size_t len ) {
  2.     ssize_t nwritten = 0 , totwritten = 0 ;
  3.      while ( len ) {
  4.         nwritten = write ( fd , buf , len ) ;
  5.          if ( nwritten < 0 ) {
  6.              if ( errno = = EINTR ) {
  7.                 continue ;
  8.              }
  9.             return totwritten ? totwritten : - 1 ;
  10.          }
  11.          len - = nwritten ;
  12.         buf + = nwritten ;
  13.         totwritten + = nwritten ;
  14.      }
  15.     return totwritten ;
  16. }

将aof的缓冲区(将aof选项置为 everysec的时候,中间会有缓冲),写入磁盘的代码如下,其中需要判断后台是否有fsync正在执行(如果正在执行,会阻塞write调用),如果有,则会延迟,但是如果force参数被设置的话,就啥都不管不顾了,直接开整~

点击( 此处 )折叠或打开

  1. #define AOF_WRITE_LOG_ERROR_RATE 30 / * Seconds between errors logging . * /
  2. void flushAppendOnlyFile ( int force ) {
  3.     ssize_t nwritten ;
  4.      int sync_in_progress = 0 ;
  5.     mstime_t latency ;
  6.      if ( sdslen ( server . aof_buf ) = = 0 ) return ;
  7.      if ( server . aof_fsync = = AOF_FSYNC_EVERYSEC )
  8.         sync_in_progress = bioPendingJobsOfType ( BIO_AOF_FSYNC ) ! = 0 ;//bio有讲过
  9.      if ( server . aof_fsync = = AOF_FSYNC_EVERYSEC & & ! force ) {
  10.          / * With this append fsync policy we do background fsyncing .
  11.           * If the fsync is still in progress we can try to delay
  12.           * the write for a couple of seconds . * /
  13.          if ( sync_in_progress ) {
  14.              if ( server . aof_flush_postponed_start = = 0 ) {
  15.                  / * No previous write postponing , remember that we are
  16.                   * postponing the flush and return . * /
  17.                 server . aof_flush_postponed_start = server . unixtime ;
  18.                 return ;
  19.              } else if ( server . unixtime - server . aof_flush_postponed_start < 2 ) {
  20.                  / * We were already waiting for fsync to finish , but for less
  21.                   * than two seconds this is still ok . Postpone again . * /
  22.                 return ;
  23.              }
  24.              / * Otherwise fall trough , and go write since we can ' t wait
  25.               * over two seconds . * /
  26.             server . aof_delayed_fsync + + ;
  27.             serverLog ( LL_NOTICE , "Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis." ) ;
  28.          }
  29.      }
  30.      / * We want to perform a single write . This should be guaranteed atomic
  31.       * at least if the filesystem we are writing is a real physical one .
  32.       * While this will save us against the server being killed I don ' t think
  33.       * there is much to do about the whole server stopping for power problems
  34.       * or alike * /
  35.     latencyStartMonitor ( latency ) ;
  36.     nwritten = aofWrite ( server . aof_fd , server . aof_buf , sdslen ( server . aof_buf ) ) ;
  37.     latencyEndMonitor ( latency ) ;
  38.      / * We want to capture different events for delayed writes :
  39.       * when the delay happens with a pending fsync , or with a saving child
  40.       * active , and when the above two conditions are missing .
  41.       * We also use an additional event name to save all samples which is
  42.       * useful for graphing / monitoring purposes . * /
  43.      if ( sync_in_progress ) {
  44.         latencyAddSampleIfNeeded ( "aof-write-pending-fsync" , latency ) ;
  45.      } else if ( server . aof_child_pid ! = - 1 | | server . rdb_child_pid ! = - 1 ) {
  46.         latencyAddSampleIfNeeded ( "aof-write-active-child" , latency ) ;
  47.      } else {
  48.         latencyAddSampleIfNeeded ( "aof-write-alone" , latency ) ;
  49.      }
  50.     latencyAddSampleIfNeeded ( "aof-write" , latency ) ;
  51.      / * We performed the write so reset the postponed flush sentinel to zero . * /
  52.     server . aof_flush_postponed_start = 0 ;
  53.      if ( nwritten ! = ( ssize_t ) sdslen ( server . aof_buf ) ) {
  54.         static time_t last_write_error_log = 0 ;
  55.          int can_log = 0 ;
  56.          / * Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds . * /
  57.          if ( ( server . unixtime - last_write_error_log ) > AOF_WRITE_LOG_ERROR_RATE ) {
  58.             can_log = 1 ;
  59.             last_write_error_log = server . unixtime ;
  60.          }
  61.          / * Log the AOF write error and record the error code . * /
  62.          if ( nwritten = = - 1 ) {
  63.              if ( can_log ) {
  64.                 serverLog ( LL_WARNING , "Error writing to the AOF file: %s" ,
  65.                     strerror ( errno ) ) ;
  66.                 server . aof_last_write_errno = errno ;
  67.              }
  68.          } else {
  69.              if ( can_log ) {
  70.                 serverLog ( LL_WARNING , "Short write while writing to "
  71.                                         "the AOF file: (nwritten=%lld, "
  72.                                         "expected=%lld)" ,
  73.                                         ( long long ) nwritten ,
  74.                                         ( long long ) sdslen ( server . aof_buf ) ) ;
  75.              }
  76.              if ( ftruncate ( server . aof_fd , server . aof_current_size ) = = - 1 ) {
  77.                  if ( can_log ) {
  78.                     serverLog ( LL_WARNING , "Could not remove short write "
  79.                               "from the append-only file. Redis may refuse "
  80.                               "to load the AOF the next time it starts. "
  81.                               "ftruncate: %s" , strerror ( errno ) ) ;
  82.                  }
  83.              } else {
  84.                  / * If the ftruncate ( ) succeeded we can set nwritten to
  85.                   * - 1 since there is no longer partial data into the AOF . * /
  86.                 nwritten = - 1 ;
  87.              }
  88.             server . aof_last_write_errno = ENOSPC ;
  89.          }
  90.          / * Handle the AOF write error . * /
  91.          if ( server . aof_fsync = = AOF_FSYNC_ALWAYS ) {
  92.              / * We can ' t recover when the fsync policy is ALWAYS since the
  93.               * reply for the client is already in the output buffers , and we
  94.               * have the contract with the user that on acknowledged write data
  95.               * is synced on disk . * /
  96.             serverLog ( LL_WARNING , "Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting..." ) ;
  97.              exit ( 1 ) ;
  98.          } else {
  99.              / * Recover from failed write leaving data into the buffer . However
  100.               * set an error to stop accepting writes as long as the error
  101.               * condition is not cleared . * /
  102.             server . aof_last_write_status = C_ERR ;
  103.              / * Trim the sds buffer if there was a partial write , and there
  104.               * was no way to undo it with ftruncate ( 2 ) . * /
  105.              if ( nwritten > 0 ) {
  106.                 server . aof_current_size + = nwritten ;
  107.                 sdsrange ( server . aof_buf , nwritten , - 1 ) ;
  108.              }
  109.             return ; / * We ' ll try again on the next call . . . * /
  110.          }
  111.      } else {
  112.          / * Successful write ( 2 ) . If AOF was in error state , restore the
  113.           * OK state and log the event . * /
  114.          if ( server . aof_last_write_status = = C_ERR ) {
  115.             serverLog ( LL_WARNING ,
  116.                  "AOF write error looks solved, Redis can write again." ) ;
  117.             server . aof_last_write_status = C_OK ;
  118.          }
  119.      }
  120.     server . aof_current_size + = nwritten ;
  121.      / * Re - use AOF buffer when it is small enough . The maximum comes from the
  122.       * arena size of 4k minus some overhead ( but is otherwise arbitrary ) . * /
  123.      if ( ( sdslen ( server . aof_buf ) + sdsavail ( server . aof_buf ) ) < 4000 ) {
  124.         sdsclear ( server . aof_buf ) ;
  125.      } else {
  126.         sdsfree ( server . aof_buf ) ;
  127.         server . aof_buf = sdsempty ( ) ;
  128.      }
  129.      / * Don ' t fsync if no - appendfsync - on - rewrite is set to yes and there are
  130.       * children doing I / O in the background . * /
  131.      if ( server . aof_no_fsync_on_rewrite & &
  132.          ( server . aof_child_pid ! = - 1 | | server . rdb_child_pid ! = - 1 ) )
  133.             return ;
  134.      / * Perform the fsync if needed . * /
  135.      if ( server . aof_fsync = = AOF_FSYNC_ALWAYS ) {
  136.          / * aof_fsync is defined as fdatasync ( ) for Linux in order to avoid
  137.           * flushing metadata . * /
  138.         latencyStartMonitor ( latency ) ;
  139.         aof_fsync ( server . aof_fd ) ; / * Let ' s try to get this data on the disk * /
  140.         latencyEndMonitor ( latency ) ;
  141.         latencyAddSampleIfNeeded ( "aof-fsync-always" , latency ) ;
  142.         server . aof_last_fsync = server . unixtime ;
  143.      } else if ( ( server . aof_fsync = = AOF_FSYNC_EVERYSEC & &
  144.                 server . unixtime > server . aof_last_fsync ) ) {
  145.          if ( ! sync_in_progress ) aof_background_fsync ( server . aof_fd ) ;
  146.         server . aof_last_fsync = server . unixtime ;
  147.      }
  148. }

另一个需要注意的点就是BGREWRITEAOF命令会重写AOF文件,使AOF文件尽可能的小,其中的大部分操作都是尽可能的使用占用空间小的内存类型,在此不再赘述了。在此期间的命令缓存是通过如下机制实现的。

1.  使用多个缓存block而非一整块大缓存,每个block10M,如下所示

点击( 此处 )折叠或打开

  1. #define AOF_RW_BUF_BLOCK_SIZE ( 1024 * 1024 * 10 ) / * 10 MB per block * /
  2. typedef struct aofrwblock {
  3.     unsigned long used , free ;
  4.     char buf [ AOF_RW_BUF_BLOCK_SIZE ] ;
  5. } aofrwblock ;

2. 向缓存中写数据的时候是先找到当前链表的最后一个元素,若满足,直接写,若不满足,填上空缺,重新建,继续写

点击( 此处 )折叠或打开

  1. / * Append data to the AOF rewrite buffer , allocating new blocks if needed . * /
  2. void aofRewriteBufferAppend ( unsigned char * s , unsigned long len ) {
  3.     listNode * ln = listLast ( server . aof_rewrite_buf_blocks ) ;
  4.     aofrwblock * block = ln ? ln - > value : NULL ;
  5.      while ( len ) {
  6.          / * If we already got at least an allocated block , try appending
  7.           * at least some piece into it . * /
  8.          if ( block ) {
  9.             unsigned long thislen = ( block - > free < len ) ? block -

以上所述就是小编给大家介绍的《redis源码阅读之aof》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

统计学习方法

统计学习方法

李航 / 清华大学出版社 / 2012-3 / 38.00元

详细介绍支持向量机、Boosting、最大熵、条件随机场等十个统计学习方法。一起来看看 《统计学习方法》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

在线进制转换器
在线进制转换器

各进制数互转换器

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器