内容简介:redis当中的rio的话就是在后端执行读写操作,bio相当于它的一个补充操作,它使用一个大结构体将操作全部封装起来,有点像linux当中驱动的写法,方便统一接口,具体结构体如下:由结构可知,rio会处理三种不同的read和write,分别为1. 内存buffer,内部由sds表示
redis当中的rio的话就是在后端执行读写操作,bio相当于它的一个补充操作,它使用一个大结构体将操作全部封装起来,有点像 linux 当中驱动的写法,方便统一接口,具体结构体如下:
点击( 此处 )折叠或打开
- struct _rio {
- / * Backend functions .
- * Since this functions do not tolerate short writes or reads the return
- * value is simplified to : zero on error , non zero on complete success . * /
- size_t ( * read ) ( struct _rio * , void * buf , size_t len ) ;
- size_t ( * write ) ( struct _rio * , const void * buf , size_t len ) ;
- off_t ( * tell ) ( struct _rio * ) ;
- int ( * flush ) ( struct _rio * ) ;
- / * The update_cksum method if not NULL is used to compute the checksum of
- * all the data that was read or written so far . The method should be
- * designed so that can be called with the current checksum , and the buf
- * and len fields pointing to the new block of data to add to the checksum
- * computation . * /
- void ( * update_cksum ) ( struct _rio * , const void * buf , size_t len ) ;
- / * The current checksum * /
- uint64_t cksum ;
- / * number of bytes read or written * /
- size_t processed_bytes ;
- / * maximum single read or write chunk size * /
- size_t max_processing_chunk ;
- / * Backend - specific vars . * /
- union {
- / * In - memory buffer target . * /
- struct {
- sds ptr ;
- off_t pos ;
- } buffer ;
- / * Stdio file pointer target . * /
- struct {
- FILE * fp ;
- off_t buffered ; / * Bytes written since last fsync . * /
- off_t autosync ; / * fsync after ' autosync ' bytes written . * /
- } file ;
- / * Multiple FDs target ( used to write to N sockets ) . * /
- struct {
- int * fds ; / * File descriptors . * /
- int * state ; / * Error state of each fd . 0 ( if ok ) or errno . * /
- int numfds ;
- off_t pos ;
- sds buf ;
- } fdset ;
- } io ;
- } ;
- typedef struct _rio rio ;
由结构可知,rio会处理三种不同的read和write,分别为
1. 内存buffer,内部由sds表示
2. 文件指针,内部由FILE*表示
3. 文件描述符数组,内部由int *表示
rio结构 留给外部调用的接口有read,write,tell,flush和update_cksum,而整个rio留给外部调用的接口是将结构体的接口进一步封装了:
点击( 此处 )折叠或打开
- static inline size_t rioWrite ( rio * r , const void * buf , size_t len ) {
- while ( len ) {
- size_t bytes_to_write = ( r - > max_processing_chunk & & r - > max_processing_chunk < len ) ? r - > max_processing_chunk : len ;
- if ( r - > update_cksum ) r - > update_cksum ( r , buf , bytes_to_write ) ;
- if ( r - > write ( r , buf , bytes_to_write ) = = 0 ) / * 0表示失败;非0表示成功 * /
- return 0 ;
- buf = ( char * ) buf + bytes_to_write ;
- len - = bytes_to_write ;
- r - > processed_bytes + = bytes_to_write ;
- }
- return 1 ;
- }
- static inline size_t rioRead ( rio * r , void * buf , size_t len ) {
- while ( len ) {
- size_t bytes_to_read = ( r - > max_processing_chunk & & r - > max_processing_chunk < len ) ? r - > max_processing_chunk : len ;
- if ( r - > read ( r , buf , bytes_to_read ) = = 0 ) / * 0表示失败;非0表示成功 * /
- return 0 ;
- if ( r - > update_cksum ) r - > update_cksum ( r , buf , bytes_to_read ) ;
- buf = ( char * ) buf + bytes_to_read ;
- len - = bytes_to_read ;
- r - > processed_bytes + = bytes_to_read ;
- }
- return 1 ;
- }
- static inline off_t rioTell ( rio * r ) {
- return r - > tell ( r ) ;
- }
- static inline int rioFlush ( rio * r ) {
- return r - > flush ( r ) ;
- }
接下来聊聊内部实现,还是按照之前说过的顺序内存buffer-> 文件指针->文件描述符数组来进行,其总体思路就是自定义对应类别的io函数,然后实例化结构体,将对应io函数填入结构体
1. 内存buffer主要是针对sds的封装
点击( 此处 )折叠或打开
- / * Returns 1 or 0 for success / failure . * /
- static size_t rioBufferWrite ( rio * r , const void * buf , size_t len ) {
- r - > io . buffer . ptr = sdscatlen ( r - > io . buffer . ptr , ( char * ) buf , len ) ;
- r - > io . buffer . pos + = len ;
- return 1 ;
- }
- / * Returns 1 or 0 for success / failure . * /
- static size_t rioBufferRead ( rio * r , void * buf , size_t len ) {
- if ( sdslen ( r - > io . buffer . ptr ) - r - > io . buffer . pos < len )
- return 0 ; / * not enough buffer to return len bytes . * /
- memcpy ( buf , r - > io . buffer . ptr + r - > io . buffer . pos , len ) ;
- r - > io . buffer . pos + = len ;
- return 1 ;
- }
- / * Returns read / write position in buffer . * /
- static off_t rioBufferTell ( rio * r ) {
- return r - > io . buffer . pos ;
- }
- / * Flushes any buffer to target device if applicable . Returns 1 on success
- * and 0 on failures . * /
- static int rioBufferFlush ( rio * r ) {
- UNUSED ( r ) ;
- return 1 ; / * Nothing to do , our write just appends to the buffer . * /
- }
- static const rio rioBufferIO = {
- rioBufferRead ,
- rioBufferWrite ,
- rioBufferTell ,
- rioBufferFlush ,
- NULL , / * update_checksum * /
- 0 , / * current checksum * /
- 0 , / * bytes read or written * /
- 0 , / * read / write chunk size * /
- { { NULL , 0 } } / * union for io - specific vars * /
- } ;
- void rioInitWithBuffer ( rio * r , sds s ) {
- * r = rioBufferIO ;
- r - > io . buffer . ptr = s ;
- r - > io . buffer . pos = 0 ;
- }
2. 文件指针操作的封装:
点击( 此处 )折叠或打开
- / * Returns 1 or 0 for success / failure . * /
- static size_t rioFileWrite ( rio * r , const void * buf , size_t len ) {
- size_t retval ;
- retval = fwrite ( buf , len , 1 , r - > io . file . fp ) ;
- r - > io . file . buffered + = len ;
- if ( r - > io . file . autosync & &
- r - > io . file . buffered > = r - > io . file . autosync )
- {
- fflush ( r - > io . file . fp ) ;
- aof_fsync ( fileno ( r - > io . file . fp ) ) ;
- r - > io . file . buffered = 0 ;
- }
- return retval ;
- }
- / * Returns 1 or 0 for success / failure . * /
- static size_t rioFileRead ( rio * r , void * buf , size_t len ) {
- return fread ( buf , len , 1 , r - > io . file . fp ) ;
- }
- / * Returns read / write position in file . * /
- static off_t rioFileTell ( rio * r ) {
- return ftello ( r - > io . file . fp ) ;
- }
- / * Flushes any buffer to target device if applicable . Returns 1 on success
- * and 0 on failures . * /
- static int rioFileFlush ( rio * r ) {
- return ( fflush ( r - > io . file . fp ) = = 0 ) ? 1 : 0 ;
- }
- static const rio rioFileIO = {
- rioFileRead ,
- rioFileWrite ,
- rioFileTell ,
- rioFileFlush ,
- NULL , / * update_checksum * /
- 0 , / * current checksum * /
- 0 , / * bytes read or written * /
- 0 , / * read / write chunk size * /
- { { NULL , 0 } } / * union for io - specific vars * /
- } ;
- void rioInitWithFile ( rio * r , FILE * fp ) {
- * r = rioFileIO ;
- r - > io . file . fp = fp ;
- r - > io . file . buffered = 0 ;
- r - > io . file . autosync = 0 ;
- }
3. 文件描述符数组,这里要注意的是写操作,如果要写的buffer为空,会写入文件描述符数组自身的buffer,并且每个文件描述符最多写1024个字节,可以并行执行,代码如下:
点击( 此处 )折叠或打开
- / * Returns 1 or 0 for success / failure .
- * The function returns success as long as we are able to correctly write
- * to at least one file descriptor .
- *
- * When buf is NULL and len is 0 , the function performs a flush operation
- * if there is some pending buffer , so this function is also used in order
- * to implement rioFdsetFlush ( ) . * /
- static size_t rioFdsetWrite ( rio * r , const void * buf , size_t len ) {
- ssize_t retval ;
- int j ;
- unsigned char * p = ( unsigned char * ) buf ;
- int doflush = ( buf = = NULL & & len = = 0 ) ;
- / * To start we always append to our buffer . If it gets larger than
- * a given size , we actually write to the sockets . * /
- if ( len ) {
- r - > io . fdset . buf = sdscatlen ( r - > io . fdset . buf , buf , len ) ;
- len = 0 ; / * Prevent entering the while below if we don ' t flush . * /
- if ( sdslen ( r - > io . fdset . buf ) > PROTO_IOBUF_LEN ) doflush = 1 ;
- }
- if ( doflush ) {
- p = ( unsigned char * ) r - > io . fdset . buf ;
- len = sdslen ( r - > io . fdset . buf ) ;
- }
- / * Write in little chunchs so that when there are big writes we
- * parallelize while the kernel is sending data in background to
- * the TCP socket . * /
- while ( len ) {
- size_t count = len < 1024 ? len : 1024 ;
- int broken = 0 ;
- for ( j = 0 ; j < r - > io . fdset . numfds ; j + + ) {
- if ( r - > io . fdset . state [ j ] ! = 0 ) {
- / * Skip FDs alraedy in error . * /
- broken + + ;
- continue ;
- }
- / * Make sure to write ' count ' bytes to the socket regardless
- * of short writes . * /
- size_t nwritten = 0 ;
- while ( nwritten ! = count ) {
- retval = write ( r - > io . fdset . fds [ j ] , p + nwritten , count - nwritten ) ;
- if ( retval < = 0 ) {
- / * With blocking sockets , which is the sole user of this
- * rio target , EWOULDBLOCK is returned only because of
- * the SO_SNDTIMEO socket option , so we translate the error
- * into one more recognizable by the user . * /
- if ( retval = = - 1 & & errno = = EWOULDBLOCK ) errno = ETIMEDOUT ;
- break ;
- }
- nwritten + = retval ;
- }
- if ( nwritten ! = count ) {
- / * Mark this FD as broken . * /
- r - > io . fdset . state [ j ] = errno ;
- if ( r - > io . fdset . state [ j ] = = 0 ) r - > io . fdset . state [ j ] = EIO ;
- }
- }
- if ( broken = = r - > io . fdset . numfds ) return 0 ; / * All the FDs in error . * /
- p + = count ;
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 【源码阅读】AndPermission源码阅读
- 【源码阅读】Gson源码阅读
- 如何阅读Java源码 ,阅读java的真实体会
- 我的源码阅读之路:redux源码剖析
- JDK源码阅读(六):HashMap源码分析
- 如何阅读源码?
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。