Sphinx源码学习笔记(一):索引创建

栏目: 编程工具 · 发布时间: 6年前

内容简介:因为项目开发需要在游戏内部实现玩家名称的模糊查找功能,本身直接使用Sphinx配置mysql可以直接搭建一套模糊匹配的即可支持功能的实现。但是因为目前公司游戏都使用是tcaplus作为存储引擎,本着不想在项目中额外的引入mysql环境,因此准备参考Sphinx的源码自己实现或者在源码基础上做二次开发实现一套类似的模糊匹配服务功能,因此此笔记主要是自己在理解Sphinx源码记录,方便以后回顾总结。   参考Sphinx源码的版本号为sphinx-2.3.2-beta。索引创建主要源码是 indexer.cp

前言

  因为项目开发需要在游戏内部实现玩家名称的模糊查找功能,本身直接使用Sphinx配置 mysql 可以直接搭建一套模糊匹配的即可支持功能的实现。但是因为目前公司游戏都使用是tcaplus作为存储引擎,本着不想在项目中额外的引入mysql环境,因此准备参考Sphinx的源码自己实现或者在源码基础上做二次开发实现一套类似的模糊匹配服务功能,因此此笔记主要是自己在理解Sphinx源码记录,方便以后回顾总结。   参考Sphinx源码的版本号为sphinx-2.3.2-beta。

源码结构

  • 重要的目录:
    • api 这个目录主要是包含了各种sphinx的native客户端
    • config 这个目录包含了configure需要的一些文件
    • cmake 这个目录包含了cmake构建需要的一些模块
    • mysqlse 这个目录包含了SphinxSE(用于直接集成于mysql引擎的工具)
    • src 这个目录就是最主要的源码目录
    • src/http search服务的http接口
  • Sphinx最终会生成5个可执行文件,分别是:
    • indexer 主要是操作索引文件,比如合并索引,重新构建索引等等
    • indextool dump索引的一些信息,比如统计信息等
    • searchd 对外提供搜索服务
    • spelldump 拼写检查的工具
    • wordbreaker 自带一个分词工具

索引创建

 索引创建主要源码是 indexer.cpp文件,源码的学习与理解只关注关键重点方面的代码,因此忽略掉一部分个人认为不是很重要的代码,文中对代码的理解属于个人的理解可能存在偏差误解,如遇这方面的问题欢迎指出讨论。

首先加在配置文件sphinx.conf

//
    CSphConfigParser cp;
    CSphConfig & hConf = cp.m_tConf;
    sOptConfig = sphLoadConfig ( sOptConfig, g_bQuiet, cp );

    if ( !hConf ( "source" ) )
        sphDie ( "no indexes found in config file '%s'", sOptConfig );

    sphCheckDuplicatePaths ( hConf );
    if ( hConf("indexer") && hConf["indexer"]("indexer") )
    {
        CSphConfigSection & hIndexer = hConf["indexer"]["indexer"];
        g_iMemLimit = hIndexer.GetSize ( "mem_limit", g_iMemLimit );
        g_iMaxXmlpipe2Field = hIndexer.GetSize ( "max_xmlpipe2_field", 2*1024*1024 );
        g_iWriteBuffer = hIndexer.GetSize ( "write_buffer", 1024*1024 );
        g_iMaxFileFieldBuffer = Max ( 1024*1024, hIndexer.GetSize ( "max_file_field_buffer", 8*1024*1024 ) );

        if ( hIndexer("on_file_field_error") )
        {
            const CSphString & sVal = hIndexer["on_file_field_error"].strval();
            if ( sVal=="ignore_field" )
                g_eOnFileFieldError = FFE_IGNORE_FIELD;
            else if ( sVal=="skip_document" )
                g_eOnFileFieldError = FFE_SKIP_DOCUMENT;
            else if ( sVal=="fail_index" )
                g_eOnFileFieldError = FFE_FAIL_INDEX;
            else
                sphDie ( "unknown on_field_field_error value (must be one of ignore_field, skip_document, fail_index)" );
        }

        bool bJsonStrict = false;
        bool bJsonAutoconvNumbers = false;
        bool bJsonKeynamesToLowercase = false;
        if ( hIndexer("on_json_attr_error") )
        {
            const CSphString & sVal = hIndexer["on_json_attr_error"].strval();
            if ( sVal=="ignore_attr" )
                bJsonStrict = false;
            else if ( sVal=="fail_index" )
                bJsonStrict = true;
            else
                sphDie ( "unknown on_json_attr_error value (must be one of ignore_attr, fail_index)" );
        }

        if ( hIndexer("json_autoconv_keynames") )
        {
            const CSphString & sVal = hIndexer["json_autoconv_keynames"].strval();
            if ( sVal=="lowercase" )
                bJsonKeynamesToLowercase = true;
            else
                sphDie ( "unknown json_autoconv_keynames value (must be 'lowercase')" );
        }

        bJsonAutoconvNumbers = ( hIndexer.GetInt ( "json_autoconv_numbers", 0 )!=0 );
        sphSetJsonOptions ( bJsonStrict, bJsonAutoconvNumbers, bJsonKeynamesToLowercase );

        sphSetThrottling ( hIndexer.GetInt ( "max_iops", 0 ), hIndexer.GetSize ( "max_iosize", 0 ) );

        sphAotSetCacheSize ( hIndexer.GetSize ( "lemmatizer_cache", 262144 ) );
    }
  • CSphConfigParser是一个sphinx解析保存配置文件的数据结构,它的关键结构定义如下,里面关键的字段是m_tConf字段,这个字段通过解析配置文件保存了所有相关配置信息的内容,其它一些字段主要是记录配置文件名称以以及为解析配置起作用的,后续读取各种配置项都是通过m_tConf这个字段读取的。m_tConf字段定义是CSphConfig结构体,而CSphConfig定义是一个二层嵌入的SmallStringHash_T类型的hash映射表。
  • SmallStringHash_T本身是以短字符串为KEY,vlaue是模板化的任意类型数据结构,在这里hash的value是CSphConfigSection结构,CSphConfigSection可以理解成是配置文件中的一个段落章节,比如indexer,source,searchd之类的配置,每个章节里面可以包含多个字配置项,它内部是一种支持可变类型的变量通过继承SmallStringHash_T结构以字符串为key,value是多重类型的实现的数据结构。
  • 最终CSphConfigParser解析出配置文件中的某一行名称作为key,读取对应值插入到vlaue中,提供后面创建索引的时候使用。
// simple config file
class CSphConfigParser
{
public:
    CSphConfig        m_tConf;
protected:
    CSphString        m_sFileName;
    int                m_iLine;
    CSphString        m_sSectionType;
    CSphString        m_sSectionName;
    char            m_sError [ 1024 ];
    int                    m_iWarnings;
    static const int    WARNS_THRESH    = 5;
};
/// config section type (hash of sections)
typedef SmallStringHash_T < CSphConfigSection >    CSphConfigType;
/// config (hash of section types)
typedef SmallStringHash_T < CSphConfigType >    CSphConfig;
/// small hash with string keys
template < typename T >
class SmallStringHash_T : public CSphOrderedHash < T, CSphString, CSphStrHashFunc, 256 > {};
/// config section (hash of variant values)
class CSphConfigSection : public SmallStringHash_T < CSphVariant >
  • 我们来梳理关于配置涉及相关的关键数据结构定义:
    • CSphConfigParser 解析并保存所有配置的信息总得数据结构
    • CSphConfig CSphConfigParser结构体中实际保存数据信息的数据结构
    • CSphString Sphinx内部自己定义的一个支持字符串各种处理的结构体
    • CSphConfigSection 保存配置文件中一个章节比如indexer,source,searchd之类的配置
    • SmallStringHash_T 以短字符串为key,value支持各种数据结构的hash表
    • CSphOrderedHash 一个内部以桶加链接实现的hash表,内部保持有序。

开始创建索引

根据启动参数选择对应业务处理流程

int iIndexed = 0;
    int iFailed = 0;
    if ( bMerge )
    {
        if ( dIndexes.GetLength()!=2 )
            sphDie ( "there must be 2 indexes to merge specified" );

        if ( !hConf["index"](dIndexes[0]) )
            sphDie ( "no merge destination index '%s'", dIndexes[0] );

        if ( !hConf["index"](dIndexes[1]) )
            sphDie ( "no merge source index '%s'", dIndexes[1] );

        bool bLastOk = DoMerge (
            hConf["index"][dIndexes[0]], dIndexes[0],
            hConf["index"][dIndexes[1]], dIndexes[1], dMergeDstFilters, g_bRotate, bMergeKillLists );
        if ( bLastOk )
            iIndexed++;
        else
            iFailed++;
    } else if ( bIndexAll )
    {
        uint64_t tmRotated = sphMicroTimer();
        hConf["index"].IterateStart ();
        while ( hConf["index"].IterateNext() )
        {
            bool bLastOk = DoIndex ( hConf["index"].IterateGet (), hConf["index"].IterateGetKey().cstr(), hConf["source"], bVerbose, fpDumpRows );
            if ( bLastOk && ( sphMicroTimer() - tmRotated > ROTATE_MIN_INTERVAL ) && g_bSendHUP && SendRotate ( hConf, false ) )
                tmRotated = sphMicroTimer();
            if ( bLastOk )
                iIndexed++;
        }
    } else
    {
        uint64_t tmRotated = sphMicroTimer();
        ARRAY_FOREACH ( j, dIndexes )
        {
            if ( !hConf["index"](dIndexes[j]) )
                fprintf ( stdout, "WARNING: no such index '%s', skipping.\n", dIndexes[j] );
            else
            {
                bool bLastOk = DoIndex ( hConf["index"][dIndexes[j]], dIndexes[j], hConf["source"], bVerbose, fpDumpRows );
                if ( bLastOk && ( sphMicroTimer() - tmRotated > ROTATE_MIN_INTERVAL ) && g_bSendHUP && SendRotate ( hConf, false ) )
                    tmRotated = sphMicroTimer();
                if ( bLastOk )
                    iIndexed++;
                else
                    iFailed++;
            }
        }
    }
  • bMerge路径代表是否是合并索引,这个一般是在增量的更新索引的时候使用。bIndexAll路径代表是否创建配置文件中的所有索引内容,这个参数意味着将会创建sphinx.conf所有index项指定的索引内容。如果这两个参数都不是那一般是指定创建一个特定的索引。我们一般用指定创建特定的索引和增量索引比较多。
  • DoIndex是真正创建索引的函数,下面我们来具体看看该函数内部做的一些关键一些流程处理。

创建分词器

CSphTokenizerSettings tTokSettings;
    sphConfTokenizer ( hIndex, tTokSettings );

    CSphDictSettings tDictSettings;
    sphConfDictionary ( hIndex, tDictSettings );

    ISphTokenizer * pTokenizer = ISphTokenizer::Create ( tTokSettings, NULL, sError );
    if ( !pTokenizer )
        sphDie ( "index '%s': %s", sIndexName, sError.cstr() );

    // enable sentence indexing on tokenizer
    // (not in Create() because search time tokenizer does not care)
    bool bIndexSP = ( hIndex.GetInt ( "index_sp" )!=0 );
    if ( bIndexSP )
        if ( !pTokenizer->EnableSentenceIndexing ( sError ) )
            sphDie ( "index '%s': %s", sIndexName, sError.cstr() );

    if ( hIndex("index_zones") )
        if ( !pTokenizer->EnableZoneIndexing ( sError ) )
            sphDie ( "index '%s': %s", sIndexName, sError.cstr() );
  • 首先读取配置文件中的设置到tTokSettings变量中,然后通过来ISphTokenizer::Create接口来创建对应的分词器对象。这一步一般是为后面读入需要索引的词语做分词使用,比如输入一个“中国人民”,sphinx如果配置一元分词会分成“中,国,人,民”等这第四个词,然后分别对这四个词做处理,当然如果有其它多元分词就需要配置对应词库文件。后面index_sp选项代表是否检测并索引句子和段落边界,如果配置会进一步设置对应的分词器对象。对于分词不是我们的重点研究对象,我们目前业务也只需要做一元分词即可,因此先对分词有个初步了解后面需要在深入研究。

解析配置文件中数据源

// parse all sources
    CSphVector<CSphSource*> dSources;
    bool bGotAttrs = false;
    bool bSpawnFailed = false;

    for ( CSphVariant * pSourceName = hIndex("source"); pSourceName; pSourceName = pSourceName->m_pNext )
    {
        if ( !hSources ( pSourceName->cstr() ) )
        {
            fprintf ( stdout, "ERROR: index '%s': source '%s' not found.\n", sIndexName, pSourceName->cstr() );
            continue;
        }
        const CSphConfigSection & hSource = hSources [ pSourceName->cstr() ];

        CSphSource * pSource = SpawnSource ( hSource, pSourceName->cstr(), tSettings.m_eChineseRLP==SPH_RLP_BATCHED );
        if ( !pSource )
        {
            bSpawnFailed = true;
            continue;
        }

        if ( pSource->HasAttrsConfigured() )
            bGotAttrs = true;

        if ( bHtmlStrip )
        {
            if ( !pSource->SetStripHTML ( sHtmlIndexAttrs.cstr(), sHtmlRemoveElements.cstr(), bIndexSP, hIndex.GetStr("index_zones"), sError ) )
            {
                fprintf ( stdout, "ERROR: source '%s': %s.\n", pSourceName->cstr(), sError.cstr() );
                return false;
            }
        }

        pSource->SetTokenizer ( pTokenizer );
        pSource->SetFieldFilter ( pFieldFilter );
        pSource->SetDumpRows ( fpDumpRows );
        dSources.Add ( pSource );
    }
  • CSphSource是一个数据源的基类,sphinx目前支持mysql、odbc、xmlpipe2、tsvpipe、csvpipe各种类型的数据源。目前主流使用的都是mysql的数据源。在sphinx.conf配置文件中可以配置多个数据源,此步逻辑就是根据配置文件中“source”段读取每一个配置源,并且根据源类型创建对应处理各种源的类对象。创建成功后会把之前已经创建好的分词器pTokenizer设置进去供后面读取数据后分词使用。最后把所有创建好的数据源对象加到dSources数组中提供后面代码使用。

创建索引解析器

// if searchd is running, we want to reindex to .tmp files
        CSphString sIndexPath;
        sIndexPath.SetSprintf ( g_bRotate ? "%s.tmp" : "%s", hIndex["path"].cstr() );

        // do index
        CSphIndex * pIndex = sphCreateIndexPhrase ( sIndexName, sIndexPath.cstr() );
        assert ( pIndex );

        // check lock file
        if ( !pIndex->Lock() )
        {
            fprintf ( stdout, "FATAL: %s, will not index. Try --rotate option.\n", pIndex->GetLastError().cstr() );
            exit ( 1 );
        }

        pIndex->SetFieldFilter ( pFieldFilter );
        pIndex->SetTokenizer ( pTokenizer );
        pIndex->SetDictionary ( pDict );
        if ( g_bKeepAttrs )
        {
            if ( g_sKeepAttrsPath.IsEmpty() )
                pIndex->SetKeepAttrs ( hIndex["path"].strval(), g_dKeepAttrs );
            else
                pIndex->SetKeepAttrs ( g_sKeepAttrsPath, g_dKeepAttrs );
        }
        pIndex->Setup ( tSettings );

        bOK = pIndex->Build ( dSources, g_iMemLimit, g_iWriteBuffer )!=0;
        if ( bOK && g_bRotate && g_bSendHUP )
        {
            sIndexPath.SetSprintf ( "%s.new", hIndex["path"].cstr() );
            bOK = pIndex->Rename ( sIndexPath.cstr() );
        }

        pIndex->Unlock ();
        SafeDelete ( pIndex );
  • 准备好分词器和数据源紧接着就是创建真正执行解析索引工作的索引解析器了,对于正在运行使用的searchd服务程序,会创建一个临时路径中间文件来操作。sphCreateIndexPhrase函数是创建一个实际处理业务的CSphIndex索引解析器对象,在sphinx中对应实际处理的类是CSphIndex_VLN类对象,该类从CSphIndex继承实现真正处理业务逻辑的代码。
  • 创建好CSphIndex对象后,设置对应的 pFieldFilter(设置通过正则方式映射一些查询),pTokenizer(分词规则),pDict(根据配置文件配置中的规则分词)后,就开始执行构建索引程序,对应的函数就是Build函数,实际执行的是CSphIndex_VLN::Build()函数。

连接数据源

int CSphIndex_VLN::Build ( const CSphVector<CSphSource*> & dSources, int iMemoryLimit, int iWriteBuffer )
{
    // setup sources
    ARRAY_FOREACH ( iSource, dSources )
    {
        CSphSource * pSource = dSources[iSource];
        assert ( pSource );

        pSource->SetDict ( m_pDict );
        pSource->Setup ( m_tSettings );
    }

    // connect 1st source and fetch its schema
    if ( !dSources[0]->Connect ( m_sLastError )
        || !dSources[0]->IterateStart ( m_sLastError )
        || !dSources[0]->UpdateSchema ( &m_tSchema, m_sLastError ) )
    {
        return 0;
    }
  • 进入Build函数后首先对数据源设置参数,然后开始连接第一个数据源即调用dSources[0]->Connect()函数,连接成功以后紧接着执行IterateStart()函数,此函数里面主要做一些初始化各项参数的基本工作,比如记录需要索引的字段,以及字段属性信息。UpdateSchema()函数是将刚刚pSource里面记录的索引相关字段信息更新到CSphIndex_VLN类里面的m_tSchema变量需要在后面使用。

计算创建索引需要内存大小

// adjust memory requirements
    int iOldLimit = iMemoryLimit;

    // book memory to store at least 64K attribute rows
    const int iDocinfoStride = DOCINFO_IDSIZE + m_tSchema.GetRowSize();
    int iDocinfoMax = Max ( iMemoryLimit/16/iDocinfoStride/sizeof(DWORD), 65536ul );
    if ( m_tSettings.m_eDocinfo==SPH_DOCINFO_NONE )
        iDocinfoMax = 1;

    // book at least 32 KB for field MVAs, if needed
    int iFieldMVAPoolSize = Max ( 32768, iMemoryLimit/16 );
    if ( bHaveFieldMVAs==0 )
        iFieldMVAPoolSize = 0;

    // book at least 2 MB for keywords dict, if needed
    int iDictSize = 0;
    if ( m_pDict->GetSettings().m_bWordDict )
        iDictSize = Max ( MIN_KEYWORDS_DICT, iMemoryLimit/8 );

    // do we have enough left for hits?
    int iHitsMax = 1048576;

    iMemoryLimit -= iDocinfoMax*iDocinfoStride*sizeof(DWORD) + iFieldMVAPoolSize + iDictSize;
    if ( iMemoryLimit < iHitsMax*(int)sizeof(CSphWordHit) )
    {
        iMemoryLimit = iOldLimit + iHitsMax*sizeof(CSphWordHit) - iMemoryLimit;
        sphWarn ( "collect_hits: mem_limit=%d kb too low, increasing to %d kb",
            iOldLimit/1024, iMemoryLimit/1024 );
    } else
    {
        iHitsMax = iMemoryLimit / sizeof(CSphWordHit);
    }

    // allocate raw hits block
    CSphFixedVector<CSphWordHit> dHits ( iHitsMax + MAX_SOURCE_HITS );
    CSphWordHit * pHits = dHits.Begin();
    CSphWordHit * pHitsMax = dHits.Begin() + iHitsMax;

    // after finishing with hits this pool will be used to sort strings
    int iPoolSize = dHits.GetSizeBytes();

    // allocate docinfos buffer
    CSphFixedVector<DWORD> dDocinfos ( iDocinfoMax*iDocinfoStride );
    DWORD * pDocinfo = dDocinfos.Begin();
    const DWORD * pDocinfoMax = dDocinfos.Begin() + iDocinfoMax*iDocinfoStride;
    if ( m_tSettings.m_eDocinfo==SPH_DOCINFO_NONE )
    {
        pDocinfo = NULL;
        pDocinfoMax = NULL;
    }
  • 此段代码主要计算创建索引所需要的最小内存信息,在sphinx.conf配置文件中我们一般会指定使用内存的大小,此段计算主要检查配置的内存大小是否满足实际所需要的内存,如果不满足会给出告警信息。其中iFieldMVAPoolSize和iDictSize并不是必须的,iFieldMVAPoolSize俗称多值属性,它是文档属性的一种重要的特例,主要作用是向文档附加一些列的值作为属性,例如文档的标签,产品类别信息等。iDictSize是为配置文件中指定的分词列表申请空间的,如果配置文件没有指定则不用申请。

创建索引需要临时文件

// create temp files
    CSphAutofile fdLock ( GetIndexFileName("tmp0"), SPH_O_NEW, m_sLastError, true );
    CSphAutofile fdHits ( GetIndexFileName ( m_bInplaceSettings ? "spp" : "tmp1" ), SPH_O_NEW, m_sLastError, !m_bInplaceSettings );
    CSphAutofile fdDocinfos ( GetIndexFileName ( m_bInplaceSettings ? "spa" : "tmp2" ), SPH_O_NEW, m_sLastError, !m_bInplaceSettings );
    CSphAutofile fdTmpFieldMVAs ( GetIndexFileName("tmp7"), SPH_O_NEW, m_sLastError, true );
    CSphWriter tStrWriter;
    CSphWriter tStrFinalWriter;

    if ( !tStrWriter.OpenFile ( GetIndexFileName("tmps"), m_sLastError ) )
        return 0;
    tStrWriter.PutByte ( 0 ); // dummy byte, to reserve magic zero offset

    if ( !tStrFinalWriter.OpenFile ( GetIndexFileName("sps"), m_sLastError ) )
        return 0;
    tStrFinalWriter.PutByte ( 0 ); // dummy byte, to reserve magic zero offset
  • 创建程序过程中使用的临文件,主要有一下几个文件:
    • tmp0 存储的是上锁的Index文件,主要是考虑有些Index正在查询使用因此需要上锁。
    • tmp1 存储词汇的位置信息,包含该词所在的文档ID,在词典映射的ID,以及该词在文档中的位置信息。
    • tmo2 存储文档ID及文档本身信息的内容。
    • tmp7 存储文档对应多值查询的信息。
    • tmps 存储创建索引过程中临时用的缓冲区(待定)。
    • sps 存储字符串属性的信息。

循环读取数据源

// fetch documents
        for ( ;; )
        {
            // get next doc, and handle errors
            bool bGotDoc = pSource->IterateDocument ( m_sLastError );
            if ( !bGotDoc )
                return 0;

            // ensure docid is sane
            if ( pSource->m_tDocInfo.m_uDocID==DOCID_MAX )
            {
                m_sLastError.SetSprintf ( "docid==DOCID_MAX (source broken?)" );
                return 0;
            }

            // check for eof
            if ( !pSource->m_tDocInfo.m_uDocID )
                break;

            const DWORD * pPrevDocinfo = NULL;
            if ( m_tSettings.m_eDocinfo==SPH_DOCINFO_EXTERN && pPrevIndex.Ptr() )
                pPrevDocinfo = pPrevIndex->FindDocinfo ( pSource->m_tDocInfo.m_uDocID );
        }
  • 准备好以上流程就开始循环遍历读取数据了,以上是开始读取数据的关键代码,最重要的就是
bool CSphSource_Document::IterateDocument ( CSphString & sError )
{
  // fetch next document
  for ( ;; )
  {
      m_tState.m_dFields = NextDocument ( sError );
      if ( m_tDocInfo.m_uDocID==0 )
          return true;

      const int * pFieldLengths = GetFieldLengths ();
      for ( int iField=0; iField<m_tState.m_iEndField; iField++ )
          m_tState.m_dFieldLengths[iField] = pFieldLengths[iField];

      // moved that here as docid==0 means eof for regular query
      // but joined might produce doc with docid==0 and breaks delta packing
      if ( HasJoinedFields() )
          m_dAllIds.Add ( m_tDocInfo.m_uDocID );

      if ( !m_tState.m_dFields )
          return false;
      // we're good
      break;
  }

  m_tStats.m_iTotalDocuments++;
  return true;
}
  • 可以看到IterateDocument里面重点调用了NextDocument()函数,该函数是在父类是一个虚函数,实际上执行的是子类实现的CSphSource_SQL::NextDocument()函数,该函数调用成功会返回一个字段的指针地址,该函数的内部关键逻辑如下:
BYTE ** CSphSource_SQL::NextDocument ( CSphString & sError )
{
  assert ( m_bSqlConnected );

  // get next non-zero-id row
  do
  {
      // try to get next row
      bool bGotRow = SqlFetchRow ();

      // get him!
      m_tDocInfo.m_uDocID = VerifyID ( sphToDocid ( SqlColumn(0) ) );
      m_uMaxFetchedID = Max ( m_uMaxFetchedID, m_tDocInfo.m_uDocID );
  } while ( !m_tDocInfo.m_uDocID );

  // cleanup attrs
  for ( int i=0; i<m_tSchema.GetRowSize(); i++ )
      m_tDocInfo.m_pDynamic[i] = 0;

  // split columns into fields and attrs
  for ( int i=0; i<m_iPlainFieldsLength; i++ )
  {
      // get that field
      #if USE_ZLIB
      if ( m_dUnpack[i]!=SPH_UNPACK_NONE )
      {
          DWORD uUnpackedLen = 0;
          m_dFields[i] = (BYTE*) SqlUnpackColumn ( i, uUnpackedLen, m_dUnpack[i] );
          m_dFieldLengths[i] = (int)uUnpackedLen;
          continue;
      }
      #endif
      m_dFields[i] = (BYTE*) SqlColumn ( m_tSchema.m_dFields[i].m_iIndex );
      m_dFieldLengths[i] = SqlColumnLength ( m_tSchema.m_dFields[i].m_iIndex );
  }

  for ( int i=0; i<m_tSchema.GetAttrsCount(); i++ )
  {
      const CSphColumnInfo & tAttr = m_tSchema.GetAttr(i); // shortcut

      if ( tAttr.m_eAttrType==SPH_ATTR_UINT32SET || tAttr.m_eAttrType==SPH_ATTR_INT64SET )
      {
          int uOff = 0;
          if ( tAttr.m_eSrc==SPH_ATTRSRC_FIELD )
          {
              uOff = ParseFieldMVA ( m_dMva, SqlColumn ( tAttr.m_iIndex ), tAttr.m_eAttrType==SPH_ATTR_INT64SET );
          }
          m_tDocInfo.SetAttr ( tAttr.m_tLocator, uOff );
          continue;
      }

      switch ( tAttr.m_eAttrType )
      {
          case SPH_ATTR_STRING:
          case SPH_ATTR_JSON:
              // memorize string, fixup NULLs
              m_dStrAttrs[i] = SqlColumn ( tAttr.m_iIndex );
              if ( !m_dStrAttrs[i].cstr() )
                  m_dStrAttrs[i] = "";

              m_tDocInfo.SetAttr ( tAttr.m_tLocator, 0 );
              break;

          case SPH_ATTR_FLOAT:
              m_tDocInfo.SetAttrFloat ( tAttr.m_tLocator, sphToFloat ( SqlColumn ( tAttr.m_iIndex ) ) ); // FIXME? report conversion errors maybe?
              break;

          case SPH_ATTR_BIGINT:
              m_tDocInfo.SetAttr ( tAttr.m_tLocator, sphToInt64 ( SqlColumn ( tAttr.m_iIndex ) ) ); // FIXME? report conversion errors maybe?
              break;

          case SPH_ATTR_TOKENCOUNT:
              // reset, and the value will be filled by IterateHits()
              m_tDocInfo.SetAttr ( tAttr.m_tLocator, 0 );
              break;

          default:
              // just store as uint by default
              m_tDocInfo.SetAttr ( tAttr.m_tLocator, sphToDword ( SqlColumn ( tAttr.m_iIndex ) ) ); // FIXME? report conversion errors maybe?
              break;
      }
  }

  return m_dFields;
}
  • CSphSource_SQL::NextDocument()函数里面主要处理以下流程:
    • 首先通过SqlFetchRow ()函数调用mysql接口取出一条记录数据。
    • 遍历mysql返回的数据源将数据放在成员变量m_dFields数组中。
    • 判断是否有需要文档属性字段内容需要保存,如果有保存到成员遍历m_tDocInfo中。

开始分词构建索引

// store hits
            while ( const ISphHits * pDocHits = pSource->IterateHits ( m_sLastWarning ) )
            {
                int iDocHits = pDocHits->Length();
                #if PARANOID
                for ( int i=0; i<iDocHits; i++ )
                {
                    assert ( pDocHits->m_dData[i].m_uDocID==pSource->m_tDocInfo.m_uDocID );
                    assert ( pDocHits->m_dData[i].m_uWordID );
                    assert ( pDocHits->m_dData[i].m_iWordPos );
                }
                #endif
                assert ( ( pHits+iDocHits )<=( pHitsMax+MAX_SOURCE_HITS ) );
                memcpy ( pHits, pDocHits->First(), iDocHits*sizeof(CSphWordHit) );
                pHits += iDocHits;

                // sort hits
                int iHits = pHits - dHits.Begin();
                {
                    sphSort ( dHits.Begin(), iHits, CmpHit_fn() );
                    m_pDict->HitblockPatch ( dHits.Begin(), iHits );
                }
                pHits = dHits.Begin();

                {
                    // we're not inlining, so only flush hits, docs are flushed independently
                    dHitBlocks.Add ( tHitBuilder.cidxWriteRawVLB ( fdHits.GetFD(), dHits.Begin(), iHits,
                        NULL, 0, 0 ) );
                }
                m_pDict->HitblockReset ();

                if ( dHitBlocks.Last()<0 )
                    return 0;

            }
/// hit info
struct CSphWordHit
{
    SphDocID_t        m_uDocID;        ///< document ID
    SphWordID_t        m_uWordID;        ///< word ID in current dictionary
    Hitpos_t        m_uWordPos;        ///< word position in current document
};
class ISphHits
{
public:
    CSphVector<CSphWordHit> m_dData;
};
  • 取出一行记录以后就开始对数据进行分词操作了,这里我们重点关注一下ISphHits结构体,ISphHits里面最重要的一个成员变量是CSphWordHit类型的数组m_dData,CSphWordHit里面有三个成员变量,解析如下:
    • m_uDocID 保存分词以后该词对应的文档ID,索引的时候就是根据这个变量找到对应的文档信息。
    • m_uWordID 该词本身在词典中的位置信息,对单词的hash值,主要用于标志唯一的一个词信息。
    • m_uWordPos 该词在对应文档中的位置信息,一般索引时使用。
    • m_dData 保存一个数据源分词后所有的CSphWordHit信息的数组。
  • 具体分词逻辑是在IterateHits函数里面做的,因为本身分词逻辑比较复杂,并且分词不是我们关注的重点,因此不做深入分析研究。简单介绍IterateHits函数流程如下:
    • 准备分词器对象,根据配置参数设置不同的分词类型,并且把源数据传进给分词器对象。
    • 分词器对象根据一定的规则对传入的源数据进行分词处理,记录分词对应的文档id,词id,在文档中的位置信息。
    • 把分好的词加入到内部定义的ISphHits数组,最后通过函数返回值返回给上层使用。
  • 上层函数获取到分词后CSphWordHit的数组信息后,即对应的pDocHits变量后,首先把新获取的分词信息加入到自己已经存在pHits数组中,然后进行一遍 排序 处理,最后在做一些修复操作比如检测是否存在词id重复之类的问题。至此关键的索引数据结构已经建立好了,后面还剩下的就是一些数据更新和写到磁盘文件上的操作等等。

存储hits信息到tmp1文件

if ( m_tSettings.m_eDocinfo==SPH_DOCINFO_INLINE )
    {
        // we're inlining, so let's flush both hits and docs
        int iDocs = ( pDocinfo - dDocinfos.Begin() ) / iDocinfoStride;
        pDocinfo = dDocinfos.Begin();

        sphSortDocinfos ( dDocinfos.Begin(), iDocs, iDocinfoStride );

        dHitBlocks.Add ( tHitBuilder.cidxWriteRawVLB ( fdHits.GetFD(), dHits.Begin(), iHits,
            dDocinfos.Begin(), iDocs, iDocinfoStride ) );

        // we are inlining, so if there are more hits in this document,
        // we'll need to know it's info next flush
        if ( iDocHits )
        {
            DOCINFOSETID ( pDocinfo, pSource->m_tDocInfo.m_uDocID );
            memcpy ( DOCINFO2ATTRS ( pDocinfo ), pSource->m_tDocInfo.m_pDynamic, sizeof(CSphRowitem)*m_tSchema.GetRowSize() );
            pDocinfo += iDocinfoStride;
        }
    } else
    {
        // we're not inlining, so only flush hits, docs are flushed independently
        dHitBlocks.Add ( tHitBuilder.cidxWriteRawVLB ( fdHits.GetFD(), dHits.Begin(), iHits,
            NULL, 0, 0 ) );
    }
  • 经过分词提取出单个词以后,然后就开始存储这些信息到文件了。首先判断配置文件文档类型是否是SPH_DOCINFO_INLINE模式,如果是SPH_DOCINFO_INLINE模式则文档信息和索引信息放在一个文件存储,否则先存储索引文件,后面再单独存储文档信息,对应的存储函数是cidxWriteRawVLB()函数,该函数内部主要是对hits数据进行排序编码存储,文件格式如下:

item

itme

itme

m_uWordID1

m_uDocID1

m_uWordPos1,m_uWordPos2,m_uWordPos3

m_uDocID2

m_uWordPos1,m_uWordPos2,m_uWordPos3

m_uDocID3

m_uWordPos1,m_uWordPos2,m_uWordPos3

m_uWordID2

m_uDocID1

m_uWordPos1,m_uWordPos2,m_uWordPos3

m_uDocID2

m_uWordPos1,m_uWordPos2,m_uWordPos3

m_uDocID3

m_uWordPos1,m_uWordPos2,m_uWordPos3

……

……

……

  • 主要有一下几个特点:
    • 在文件内按块存储,快内按照递增排序。
    • 首先按照m_uWordID排序,然后按照m_uDocID排序,最后按照m_uWordPos排序。
    • 实际保存的时候,后面Item保存的是前一个的差分值,主要用于节省空间。

存储docinfo信息到tmp2文件

// store docinfo
    // with the advent of SPH_ATTR_TOKENCOUNT, now MUST be done AFTER iterating the hits
    // because field lengths are computed during that iterating
    if ( m_tSettings.m_eDocinfo==SPH_DOCINFO_EXTERN )
    {
        // store next entry
        DOCINFOSETID ( pDocinfo, pSource->m_tDocInfo.m_uDocID );

        CSphRowitem * pAttr = DOCINFO2ATTRS ( pDocinfo );
        if ( !pPrevDocinfo )
        {
            memcpy ( pAttr, pSource->m_tDocInfo.m_pDynamic, sizeof(CSphRowitem)*m_tSchema.GetRowSize() );
        } else
        {
            if ( !m_dKeepAttrs.GetLength() )
            {
                // copy whole row from old index
                memcpy ( pAttr, DOCINFO2ATTRS ( pPrevDocinfo ), sizeof(CSphRowitem)*m_tSchema.GetRowSize() );

                // copy some strings attributes
                // 2nd stage - copy offsets from source, data already copied at string indexing
                if ( dStringAttrs.GetLength() )
                    CopyRow ( pSource->m_tDocInfo.m_pDynamic, m_tSchema, dStringAttrs, pAttr );

            } else
            {
                // copy new attributes, however keep some of them from old index
                memcpy ( pAttr, pSource->m_tDocInfo.m_pDynamic, sizeof(CSphRowitem)*m_tSchema.GetRowSize() );

                // copy some plain attributes
                if ( dPrevAttrsPlain.GetLength() )
                    CopyRow ( DOCINFO2ATTRS ( pPrevDocinfo ), m_tSchema, dPrevAttrsPlain, pAttr );

                // copy some strings attributes
                // 2nd stage - copy offsets from source, data already copied at string indexing
                if ( dStringAttrs.GetLength() )
                    CopyRow ( pSource->m_tDocInfo.m_pDynamic, m_tSchema, dStringAttrs, pAttr );
            }
        }

        pDocinfo += iDocinfoStride;

        // if not inlining, flush buffer if it's full
        // (if inlining, it will flushed later, along with the hits)
        if ( pDocinfo>=pDocinfoMax )
        {
            assert ( pDocinfo==pDocinfoMax );
            int iLen = iDocinfoMax*iDocinfoStride*sizeof(DWORD);

            sphSortDocinfos ( dDocinfos.Begin(), iDocinfoMax, iDocinfoStride );
            if ( !sphWriteThrottled ( fdDocinfos.GetFD(), dDocinfos.Begin(), iLen, "raw_docinfos", m_sLastError, &g_tThrottle ) )
                return 0;

            pDocinfo = dDocinfos.Begin();
            iDocinfoBlocks++;
        }
    }
  • 存储完成hits信息后,紧接着就开始存储docinfo信息,从代码可以看到docinfo信息主要保存两个重要信息一个m_uDocID信息,另一个是文档属性信息,存储格式简要如下:

item

itme

itme

m_uDocID1

attr0

attr1

m_uDocID2

attr0

attr1

……

  • 这个只是最重要的字段信息,还有一些可选信息比如mva值方面的信息,因为对mva多值不做重点研究因此没有深入研究代码,存储文件也是按照DocID排序处理。

创建最终索引文件

// vars shared between phases
    CSphVector<CSphBin*> dBins;
    SphOffset_t iSharedOffset = -1;

    int iBinSize = CSphBin::CalcBinSize ( int ( iMemoryLimit * fReadFactor ),
        dHitBlocks.GetLength() + m_pDict->GetSettings().m_bWordDict, "sort_hits" );

    CSphFixedVector <BYTE> dRelocationBuffer ( iRelocationSize );
    iSharedOffset = -1;

    ARRAY_FOREACH ( i, dHitBlocks )
    {
        dBins.Add ( new CSphBin ( m_tSettings.m_eHitless, m_pDict->GetSettings().m_bWordDict ) );
        dBins[i]->m_iFileLeft = dHitBlocks[i];
        dBins[i]->m_iFilePos = ( i==0 ) ? iHitsGap : dBins[i-1]->m_iFilePos + dBins[i-1]->m_iFileLeft;
        dBins[i]->Init ( fdHits.GetFD(), &iSharedOffset, iBinSize );
    }
  • 首先使用一个CSphBin*数组dBins来保存前面所有写入的临时文件信息,每个临时文件按照块的方式来加入数组,主要记录了文件块的大小iBinSize,文件块开始的偏移指针m_iFilePos,以及当前块唯读取数据的指针位置m_iFileLeft,文件句柄对象等等信息,用于后面合并写入最终的文件。
//////////////////////////////
// create new index files set
//////////////////////////////

tHitBuilder.CreateIndexFiles ( GetIndexFileName("spd").cstr(), GetIndexFileName("spp").cstr(),
    GetIndexFileName("spe").cstr(), m_bInplaceSettings, iWriteBuffer, fdHits, &iSharedOffset );

// dict files
CSphAutofile fdTmpDict ( GetIndexFileName("tmp8"), SPH_O_NEW, m_sLastError, true );
CSphAutofile fdDict ( GetIndexFileName("spi"), SPH_O_NEW, m_sLastError, false );

bool CSphHitBuilder::CreateIndexFiles ( const char * sDocName, const char * sHitName, const char * sSkipName,
    bool bInplace, int iWriteBuffer, CSphAutofile & tHit, SphOffset_t * pSharedOffset )
{
    if ( !m_wrDoclist.OpenFile ( sDocName, *m_pLastError ) )
        return false;

    if ( bInplace )
    {
        sphSeek ( tHit.GetFD(), 0, SEEK_SET );
        m_wrHitlist.SetFile ( tHit, pSharedOffset, *m_pLastError );
    } else
    {
        if ( !m_wrHitlist.OpenFile ( sHitName, *m_pLastError ) )
            return false;
    }

    if ( !m_wrSkiplist.OpenFile ( sSkipName, *m_pLastError ) )
        return false;

    return true;
}
  • 创建最终保存索引的spd,spp,spe等文件句柄信息,在代码中对应的是tHitBuilder对象里面三个成员变量m_wrDoclist,m_wrHitlist,m_wrSkiplis文件句柄,后续读取对应数据都是通过这些文件对象写入到最终磁盘索引文件中。
  • 创建tmp8临时文件,这个文件是在生成spi文件中使用的。然后创建spi文件句柄fdDict,spi文件存储的是词列表信息即词id和指向spd文件的偏移信息,通过spi文件可以定位一个词对应的文档id列表。
CSphHitQueue tQueue ( iRawBlocks );
    CSphAggregateHit tHit;

    // initial fill
    int iRowitems = ( m_tSettings.m_eDocinfo==SPH_DOCINFO_INLINE ) ? m_tSchema.GetRowSize() : 0;
    CSphFixedVector<CSphRowitem> dInlineAttrs ( iRawBlocks*iRowitems );

    CSphFixedVector<BYTE> dActive ( iRawBlocks );
    for ( int i=0; i<iRawBlocks; i++ )
    {
        if ( !dBins[i]->ReadHit ( &tHit, iRowitems, dInlineAttrs.Begin() + i * iRowitems ) )
        {
            m_sLastError.SetSprintf ( "sort_hits: warmup failed (io error?)" );
            return 0;
        }
        dActive[i] = ( tHit.m_uWordID!=0 );
        if ( dActive[i] )
            tQueue.Push ( tHit, i );
    }
  • 重新读取刚刚写入临时文件的hit信息到内存,这里看代码只是读取每个文件块的第一个hit信息放入到tQueue队列中,后面的代码会根据第一个hit记录的信息继续循环读取信息放入tQueue队列进行处理。
// while the queue has data for us
    // FIXME! analyze binsRead return code
    int iHitsSorted = 0;
    iMinBlock = -1;
    while ( tQueue.m_iUsed )
    {
        int iBin = tQueue.m_pData->m_iBin;

        // pack and emit queue root
        tQueue.m_pData->m_uDocID -= m_uMinDocid;

        tHitBuilder.cidxHit ( tQueue.m_pData, iRowitems ? dInlineAttrs.Begin() + iBin * iRowitems : NULL );
        if ( tHitBuilder.IsError() )
            return 0;

        // pop queue root and push next hit from popped bin
        tQueue.Pop ();
        if ( dActive[iBin] )
        {
            dBins[iBin]->ReadHit ( &tHit, iRowitems, dInlineAttrs.Begin() + iBin * iRowitems );
            dActive[iBin] = ( tHit.m_uWordID!=0 );
            if ( dActive[iBin] )
                tQueue.Push ( tHit, iBin );
        }

    }
  • 此部分代码就开始对刚刚放入的tQueue队列中的数据进行弹出处理了,可以看到在处理的过程中,紧接着又会从刚临时缓存文件读取数据插入,直到所有的数据都读取处理完毕。真正创建索引的函数就是tHitBuilder.cidxHit()函数。
void CSphHitBuilder::cidxHit ( CSphAggregateHit * pHit, const CSphRowitem * pAttrs )
/////////////
    // next word
    /////////////

    bool bNextWord = ( m_tLastHit.m_uWordID!=pHit->m_uWordID ||
        ( m_pDict->GetSettings().m_bWordDict && strcmp ( (char*)m_tLastHit.m_sKeyword, (char*)pHit->m_sKeyword ) ) ); // OPTIMIZE?
    bool bNextDoc = bNextWord || ( m_tLastHit.m_uDocID!=pHit->m_uDocID );
  • 首先判断处理的词信息是延续上一个继续处理的词信息还是新的下一个要处理的词信息,主要是根据CSphHitBuilder对象里面保存的上一个处理的m_tLastHit变量,很据WordID和m_uDocID,m_sKeyword来确定是否是下一个要处理的词。
if ( bNextDoc )
    {
        // finish hitlist, if any
        Hitpos_t uLastPos = m_tLastHit.m_iWordPos;
        if ( m_tLastHit.m_iWordPos!=EMPTY_HIT )
        {
            m_wrHitlist.ZipInt ( 0 );
            m_tLastHit.m_iWordPos = EMPTY_HIT;
            m_iPrevHitPos = EMPTY_HIT;
        }

        // finish doclist entry, if any
        if ( m_tLastHit.m_uDocID )
            DoclistEndEntry ( uLastPos );
    }

    void CSphHitBuilder::DoclistEndEntry ( Hitpos_t uLastPos )
    {
        // end doclist entry
        {
            assert ( m_eHitFormat==SPH_HIT_FORMAT_PLAIN );
            m_wrDoclist.ZipOffset ( m_iLastHitlistDelta );
            m_wrDoclist.ZipInt ( m_dLastDocFields.GetMask32() );
            m_wrDoclist.ZipInt ( m_uLastDocHits );
        }
        m_dLastDocFields.UnsetAll();
        m_uLastDocHits = 0;

        // update keyword stats
        m_tWord.m_iDocs++;
    }
  • 如果是处理新的文档ID,检查上一个hit信息m_iWordPos是否有值,如果有说明上一个hit信息处理结束,则首先在spp文件句柄m_wrHitlist当前位置写入0,0代表是一个词位置信息的结束或间隔。
  • 检查上一个文档id是否存在,如果存在开始在spd文件句柄m_wrDoclist写入上一个hit信息在spp的偏移值,上个文档字段信息,上一个文档分词的数量信息。
if ( bNextWord )
  {
      // finish doclist, if any
      if ( m_tLastHit.m_uDocID )
      {
          // emit end-of-doclist marker
          DoclistEndList ();

          // emit dict entry
          m_tWord.m_uWordID = m_tLastHit.m_uWordID;
          m_tWord.m_sKeyword = m_tLastHit.m_sKeyword;
          m_tWord.m_iDoclistLength = m_wrDoclist.GetPos() - m_tWord.m_iDoclistOffset;
          m_pDict->DictEntry ( m_tWord );

          // reset trackers
          m_tWord.m_iDocs = 0;
          m_tWord.m_iHits = 0;

          m_tLastHit.m_uDocID = 0;
          m_iLastHitlistPos = 0;
      }

      // flush wordlist, if this is the end
      if ( pHit->m_iWordPos==EMPTY_HIT )
      {
          m_pDict->DictEndEntries ( m_wrDoclist.GetPos() );
          return;
      }

  }
  • 判断如果是开始下一个分词信息,则首先将上一个分词的信息调用m_pDict->DictEntry写入到m_pDict字典对象中。
void CSphDictKeywords::DictEntry ( const CSphDictEntry & tEntry )
{
    DictKeyword_t * pWord = NULL;
    int iLen = strlen ( (char*)tEntry.m_sKeyword ) + 1;
    for ( ;; )
    {
        // alloc dict entry
        if ( !m_iDictChunkFree )
        {
            if ( m_iDictLimit && ( m_iMemUse + (int)sizeof(DictKeyword_t)*DICT_CHUNK )>m_iDictLimit )
                DictFlush ();

            m_pDictChunk = new DictKeyword_t [ DICT_CHUNK ];
            m_iDictChunkFree = DICT_CHUNK;
            m_dDictChunks.Add ( m_pDictChunk );
            m_iMemUse += sizeof(DictKeyword_t)*DICT_CHUNK;
        }

        // alloc keyword
        if ( m_iKeywordChunkFree < iLen )
        {
            if ( m_iDictLimit && ( m_iMemUse + KEYWORD_CHUNK )>m_iDictLimit )
            {
                DictFlush ();
                continue; // because we just flushed pWord
            }

            m_pKeywordChunk = new BYTE [ KEYWORD_CHUNK ];
            m_iKeywordChunkFree = KEYWORD_CHUNK;
            m_dKeywordChunks.Add ( m_pKeywordChunk );
            m_iMemUse += KEYWORD_CHUNK;
        }
        // aw kay
        break;
    }

    pWord = m_pDictChunk++;
    m_iDictChunkFree--;
    pWord->m_sKeyword = (char*)m_pKeywordChunk;
    memcpy ( m_pKeywordChunk, tEntry.m_sKeyword, iLen );
    m_pKeywordChunk[iLen-1] = '\0';
    m_pKeywordChunk += iLen;
    m_iKeywordChunkFree -= iLen;

    pWord->m_uOff = tEntry.m_iDoclistOffset;
    pWord->m_iDocs = tEntry.m_iDocs;
    pWord->m_iHits = tEntry.m_iHits;
    pWord->m_uHint = sphDoclistHintPack ( tEntry.m_iDocs, tEntry.m_iDoclistLength );
    pWord->m_iSkiplistPos = 0;
    if ( tEntry.m_iDocs > SPH_SKIPLIST_BLOCK )
        pWord->m_iSkiplistPos = (int)( tEntry.m_iSkiplistOffset );
}

    // write em
    DictBlock_t & tBlock = m_dDictBlocks.Add();
    tBlock.m_iPos = m_wrTmpDict.GetPos ();

    ARRAY_FOREACH ( i, dWords )
    {
        const DictKeyword_t * pWord = dWords[i];
        int iLen = strlen ( pWord->m_sKeyword );
        m_wrTmpDict.PutByte ( iLen );
        m_wrTmpDict.PutBytes ( pWord->m_sKeyword, iLen );
        m_wrTmpDict.ZipOffset ( pWord->m_uOff );
        m_wrTmpDict.ZipInt ( pWord->m_iDocs );
        m_wrTmpDict.ZipInt ( pWord->m_iHits );
        m_wrTmpDict.PutByte ( pWord->m_uHint );
        assert ( ( pWord->m_iDocs > SPH_SKIPLIST_BLOCK )==( pWord->m_iSkiplistPos!=0 ) );
        if ( pWord->m_iDocs > SPH_SKIPLIST_BLOCK )
            m_wrTmpDict.ZipInt ( pWord->m_iSkiplistPos );
    }

    tBlock.m_iLen = (int)( m_wrTmpDict.GetPos() - tBlock.m_iPos );
  • 在CSphDictKeywords::DictEntry函数中首先在内存中开辟一块空间用于存储DictKeyword_t结构信息,当然这块内存的大小是有限的,当词的信息超过内存的大小以后会将前面存储的信息通过一定的排序规则更新到磁盘文件,对应的文件名称是刚刚创建的tmp8临时文件,对应的处理函数是DictFlush函数。
  • 写入m_pDict缓存中主要有以下几个字段:
    • m_sKeyword 对应分词信息。
    • m_uOff 该词对应spd文件偏移的差值,通过该偏移可以找到该词可匹配文档ID的列表。
    • m_iDocs 对应文档的计数值。
    • m_iHits 对应hit信息的计数值。
    • m_uHint 当对应的hit计数值超过一定阀值,根据一定规则记录这个字段信息。
    • m_iSkiplistPos 当对应的文档计数值超过一定阀值,根据一定规则记录这个字段信息。
  • 贴一下刷新到tmp8临时文件部分关键代码,注意在刷新到磁盘信息同时,程序通过一个磁盘块数组记录每一个块对应磁盘位置的指针以及该块的大小信息,这两个信息很关键后面需要用到。
void CSphHitBuilder::DoclistEndList ()
{
    if ( m_tWord.m_iDocs>SPH_SKIPLIST_BLOCK )
    {

        m_tWord.m_iSkiplistOffset = m_wrSkiplist.GetPos();

        for ( int i=1; i<m_dSkiplist.GetLength(); i++ )
        {
            const SkiplistEntry_t & t = m_dSkiplist[i];
            assert ( t.m_iBaseDocid - tLast.m_iBaseDocid>=SPH_SKIPLIST_BLOCK );
            assert ( t.m_iOffset - tLast.m_iOffset>=4*SPH_SKIPLIST_BLOCK );
            m_wrSkiplist.ZipOffset ( t.m_iBaseDocid - tLast.m_iBaseDocid - SPH_SKIPLIST_BLOCK );
            m_wrSkiplist.ZipOffset ( t.m_iOffset - tLast.m_iOffset - 4*SPH_SKIPLIST_BLOCK );
            m_wrSkiplist.ZipOffset ( t.m_iBaseHitlistPos - tLast.m_iBaseHitlistPos );
            tLast = t;
        }
    }

    // in any event, reset skiplist
    m_dSkiplist.Resize ( 0 );
}
  • 将文档id、文档信息在spd文件中的偏移,hit信息的偏移写入spe文件,spe文件这个主要是用于后面的查询使用,从描述看将来是用这些数据构造一个排序的跳表用来快速查到spp文档信息使用。
//
    if ( bNextDoc )
    {
        // begin new doclist entry for new doc id
        assert ( pHit->m_uDocID>m_tLastHit.m_uDocID );
        assert ( m_wrHitlist.GetPos()>=m_iLastHitlistPos );

        DoclistBeginEntry ( pHit->m_uDocID, pAttrs );
        m_iLastHitlistDelta = m_wrHitlist.GetPos() - m_iLastHitlistPos;

        m_tLastHit.m_uDocID = pHit->m_uDocID;
        m_iLastHitlistPos = m_wrHitlist.GetPos();
    }

    void CSphHitBuilder::DoclistBeginEntry ( SphDocID_t uDocid, const DWORD * pAttrs )
    {
    // build skiplist
    // that is, save decoder state and doclist position per every 128 documents
    if ( ( m_tWord.m_iDocs & ( SPH_SKIPLIST_BLOCK-1 ) )==0 )
    {
        SkiplistEntry_t & tBlock = m_dSkiplist.Add();
        tBlock.m_iBaseDocid = m_tLastHit.m_uDocID;
        tBlock.m_iOffset = m_wrDoclist.GetPos();
        tBlock.m_iBaseHitlistPos = m_iLastHitlistPos;
    }

    // begin doclist entry
    m_wrDoclist.ZipOffset ( uDocid - m_tLastHit.m_uDocID );
    assert ( !pAttrs || m_dMinRow.GetLength() );
    if ( pAttrs )
    {
        ARRAY_FOREACH ( i, m_dMinRow )
            m_wrDoclist.ZipInt ( pAttrs[i] - m_dMinRow[i] );
    }
  • 如果是新的文档ID,则调用DoclistBeginEntry函数开始写文档信息到文件,可以看到是调用m_wrDoclist.ZipOffset()函数写入到m_wrDoclist对应的文件句柄中即spd索引文件。第一个字段写的是(uDocid-m_tLastHit.m_uDocID),即相对上一个Docid的差值,然后检查该文档是否有属性信息,如果有则按照字段个数顺序写入属性信息。
Hitpos_t iHitPosPure = HITMAN::GetPosWithField ( pHit->m_iWordPos );

    // add hit delta without field end marker
    // or postpone adding to hitlist till got another uniq hit
    if ( iHitPosPure==pHit->m_iWordPos )
    {
        m_wrHitlist.ZipInt ( pHit->m_iWordPos - m_tLastHit.m_iWordPos );
        m_tLastHit.m_iWordPos = pHit->m_iWordPos;
    } else
    {
        assert ( HITMAN::IsEnd ( pHit->m_iWordPos ) );
        m_bGotFieldEnd = true;
        m_iPrevHitPos = m_tLastHit.m_iWordPos;
        m_tLastHit.m_iWordPos = HITMAN::GetPosWithField ( pHit->m_iWordPos );
    }

    // update matched fields mask
    m_dLastDocFields.Set ( HITMAN::GetField ( pHit->m_iWordPos ) );

    m_uLastDocHits++;
    m_tWord.m_iHits++;
  • 开始在spp文件存储每个词在文档中的位置信息,从代码看存储的是差值,也就是这一个字段信息,当文档切换后会在后面插入一个0值代表结束间隔。
bool CSphHitBuilder::cidxDone ( int iMemLimit, int iMinInfixLen, int iMaxCodepointLen, DictHeader_t * pDictHeader )
{
    assert ( pDictHeader );

    if ( m_bGotFieldEnd )
    {
        HITMAN::SetEndMarker ( &m_tLastHit.m_iWordPos );
        m_wrHitlist.ZipInt ( m_tLastHit.m_iWordPos - m_iPrevHitPos );
        m_bGotFieldEnd = false;
    }

    // finalize dictionary
    // in dict=crc mode, just flushes wordlist checkpoints
    // in dict=keyword mode, also creates infix index, if needed

    if ( iMinInfixLen>0 && m_pDict->GetSettings().m_bWordDict )
        pDictHeader->m_iInfixCodepointBytes = iMaxCodepointLen;

    if ( !m_pDict->DictEnd ( pDictHeader, iMemLimit, *m_pLastError, m_pThrottle ) )
        return false;

    // close all data files
    m_wrDoclist.CloseFile ();
    m_wrHitlist.CloseFile ( true );
    return !IsError();
}
  • 当以上流程循环读取词信息写入完成以后,开始进入CSphHitBuilder::cidxDone做一些收尾工作了,这里最关键的地方是开始构建真正的spi文件信息,对应的处理函数是m_pDict->DictEnd。spi文件可以说是整个搜索关键文件信息,下面我们重点分析下spi文件的生成流程,以下代码在CSphDictKeywords::DictEnd函数中。
bool CSphDictKeywords::DictEnd ( DictHeader_t * pHeader, int iMemLimit, CSphString & sError, ThrottleState_t * pThrottle )
// initialize readers
    CSphVector<CSphBin*> dBins ( m_dDictBlocks.GetLength() );

    int iMaxBlock = 0;
    ARRAY_FOREACH ( i, m_dDictBlocks )
        iMaxBlock = Max ( iMaxBlock, m_dDictBlocks[i].m_iLen );

    iMemLimit = Max ( iMemLimit, iMaxBlock*m_dDictBlocks.GetLength() );
    int iBinSize = CSphBin::CalcBinSize ( iMemLimit, m_dDictBlocks.GetLength(), "sort_dict" );

    SphOffset_t iSharedOffset = -1;
    ARRAY_FOREACH ( i, m_dDictBlocks )
    {
        dBins[i] = new CSphBin();
        dBins[i]->m_iFileLeft = m_dDictBlocks[i].m_iLen;
        dBins[i]->m_iFilePos = m_dDictBlocks[i].m_iPos;
        dBins[i]->Init ( m_iTmpFD, &iSharedOffset, iBinSize );
        dBins[i]->SetThrottle ( pThrottle );
    }
  • 定义一个二进制文件块文件读取缓存数组,把刚刚记录的m_dDictBlocks磁盘块信息对应的设置每个dBins元素中。
// sort em
    int iTotalWords = m_dDictChunks.GetLength()*DICT_CHUNK - m_iDictChunkFree;
    CSphVector<DictKeyword_t*> dWords ( iTotalWords );

    int iIdx = 0;
    ARRAY_FOREACH ( i, m_dDictChunks )
    {
        int iWords = DICT_CHUNK;
        if ( i==m_dDictChunks.GetLength()-1 )
            iWords -= m_iDictChunkFree;

        DictKeyword_t * pWord = m_dDictChunks[i];
        for ( int j=0; j<iWords; j++ )
            dWords[iIdx++] = pWord++;
    }

    dWords.Sort ( DictKeywordCmp_fn() );
  • 因为每个块中的数据已经是排好序的,因此开始读取每个块的第一个数据到dWords数组中进行块与块之间的排序逻辑。
bool bHasMorphology = HasMorphology();
    CSphKeywordDeltaWriter tLastKeyword;
    int iWords = 0;
    while ( qWords.GetLength() )
    {
        const DictKeywordTagged_t & tWord = qWords.Root();
        const int iLen = strlen ( tWord.m_sKeyword ); // OPTIMIZE?

        // store checkpoints as needed
        if ( ( iWords % SPH_WORDLIST_CHECKPOINT )==0 )
        {
            // emit a checkpoint, unless we're at the very dict beginning
            if ( iWords )
            {
                m_wrDict.ZipInt ( 0 );
                m_wrDict.ZipInt ( 0 );
            }

            BYTE * sClone = new BYTE [ iLen+1 ]; // OPTIMIZE? pool these?
            memcpy ( sClone, tWord.m_sKeyword, iLen+1 );
            sClone[iLen] = '\0';

            CSphWordlistCheckpoint & tCheckpoint = m_dCheckpoints.Add ();
            tCheckpoint.m_sWord = (char*) sClone;
            tCheckpoint.m_iWordlistOffset = m_wrDict.GetPos();

            tLastKeyword.Reset();
        }
        iWords++;


        tLastKeyword.PutDelta ( m_wrDict, (const BYTE *)tWord.m_sKeyword, iLen );
        m_wrDict.ZipOffset ( tWord.m_uOff );
        m_wrDict.ZipInt ( tWord.m_iDocs );
        m_wrDict.ZipInt ( tWord.m_iHits );
        if ( tWord.m_uHint )
            m_wrDict.PutByte ( tWord.m_uHint );
        if ( tWord.m_iDocs > SPH_SKIPLIST_BLOCK )
            m_wrDict.ZipInt ( tWord.m_iSkiplistPos );

        // build infixes
        if ( pInfixer )
            pInfixer->AddWord ( (const BYTE*)tWord.m_sKeyword, iLen, m_dCheckpoints.GetLength(), bHasMorphology );

        // next
        int iBin = tWord.m_iBlock;
        qWords.Pop ();

        if ( !dBins[iBin]->IsDone() )
        {
            DictReadEntry ( dBins[iBin], tEntry, pKeywords + iBin*MAX_KEYWORD_BYTES );
            if ( dBins[iBin]->IsError() )
            {
                sError.SetSprintf ( "entry read error in dictionary sort (bin %d of %d)", iBin, dBins.GetLength() );
                LOC_CLEANUP();
                return false;
            }

            tEntry.m_iBlock = iBin;
            qWords.Push ( tEntry );
        }
    }
  • 根据dWords排好的顺序,开始一次读取每个块中的数据进行api文件的写入,主要写入下面几个字段:
    • m_sKeyword 根据语法第一次分词后的关键词,之所以提第一次是因为后面还要一次最基本一元分词。
    • m_uOff 对应spd文件的偏移地址
    • m_iDocs 对应文档列表的个数
    • m_iHits 对应hit信息的个数
  • 注意这里面对每64个关键词,记录一次Checkpoints元素信息,主要记录每一块第一个关键词信息和该块在spi文件的位置信息。Checkpoints信息后面会统一写入到spi文件中,该块记录主要是为了在查询时定位某一个关键词信息位置使用。
  • 这里需要重点注意一下pInfixer->AddWord函数,如果配置文件中配置了支持中缀索引,该函数会把分词后的关键词创建一个中缀索引哈希表,中缀索引主要是为了支持模糊查询使用(也可以通过一元分词实现),每个中缀索引项会记录基本词单元对应所有的Checkpoints的位置信息,同样pInfixer记录的数据最后也会写入到spi文件中供后续基本查询使用。
// flush infix hash entries, if any
    if ( pInfixer )
        pInfixer->SaveEntries ( m_wrDict );

    // flush wordlist checkpoints (blocks)
    pHeader->m_iDictCheckpointsOffset = m_wrDict.GetPos();
    pHeader->m_iDictCheckpoints = m_dCheckpoints.GetLength();

    ARRAY_FOREACH ( i, m_dCheckpoints )
    {
        const int iLen = strlen ( m_dCheckpoints[i].m_sWord );

        assert ( m_dCheckpoints[i].m_iWordlistOffset>0 );
        assert ( iLen>0 && iLen<MAX_KEYWORD_BYTES );

        m_wrDict.PutDword ( iLen );
        m_wrDict.PutBytes ( m_dCheckpoints[i].m_sWord, iLen );
        m_wrDict.PutOffset ( m_dCheckpoints[i].m_iWordlistOffset );

        SafeDeleteArray ( m_dCheckpoints[i].m_sWord );
    }
  • SaveEntries保存刚刚创建中缀索引哈希表每一个项的信息到spi文件中,此项非必须当启用中缀索引时才会写入。
  • 将刚刚生成的Checkpoints信息写入到spi文件中,主要写入块第一个关键词长度,关键词,对应块信息的文件偏移信息。
// flush infix hash blocks
    if ( pInfixer )
    {
        pHeader->m_iInfixBlocksOffset = pInfixer->SaveEntryBlocks ( m_wrDict );
        pHeader->m_iInfixBlocksWordsSize = pInfixer->GetBlocksWordsSize();
        if ( pHeader->m_iInfixBlocksOffset>UINT_MAX ) // FIXME!!! change to int64
            sphDie ( "INTERNAL ERROR: dictionary size " INT64_FMT " overflow at dictend save", pHeader->m_iInfixBlocksOffset );
    }

    // flush header
    // mostly for debugging convenience
    // primary storage is in the index wide header
    m_wrDict.PutBytes ( "dict-header", 11 );
    m_wrDict.ZipInt ( pHeader->m_iDictCheckpoints );
    m_wrDict.ZipOffset ( pHeader->m_iDictCheckpointsOffset );
    m_wrDict.ZipInt ( pHeader->m_iInfixCodepointBytes );
    m_wrDict.ZipInt ( (DWORD)pHeader->m_iInfixBlocksOffset );
  • SaveEntryBlocks是为把中缀索引每64个项分成一个块记录的块信息,类似Checkpoint信息作用。
  • 最后写入”dict-header”头字段信息,至此spi文件构造完成。
  • 因为spi文件在查询中起到关键重要的作用,因此简单总结画下spi文件实际存储示意关系图,其它spd和spp之类相对就简单很多。
tBuildHeader.m_sHeaderExtension = "sph";
    tBuildHeader.m_pMinRow = m_dMinRow.Begin();
    tBuildHeader.m_uMinDocid = m_uMinDocid;
    tBuildHeader.m_pThrottle = &g_tThrottle;
    tBuildHeader.m_uKillListSize = uKillistSize;
    tBuildHeader.m_iMinMaxIndex = m_iMinMaxIndex;
    tBuildHeader.m_iTotalDups = iDupes;

    // we're done
    if ( !BuildDone ( tBuildHeader, m_sLastError ) )
        return 0;
  • 开始写入索引的文件头信息到sph文件,主要是一些属性配置信息,非重点就不做深入研究了。至此创建索引的几个重要文件sph,spi,spd,spp,spa,spe文件就生成了,我们在对最终生成的这些索引文件总结一下。
    • sph 索引头文件信息,保存索引需要的配置属性字段信息。
    • spi 存储词列表信息,以及每个词id对应指向spd文件的偏移,通过这个偏移可以定位该词可匹配所有文档ID信息。
    • spd 存储每个词ID可匹配文档ID列表的信息,如果有属性信息也存储属性信息。
    • spp 存储每次词在文档的位置信息。
    • spa 存储文档属性信息,这个一般在extern模式下使用。
    • spe 构造存储文档id类似跳表的结构信息,方便后续查询使用。
  • 有这写索引文件可以简单思考下查询一个关键词流程:
    • 首先对搜索关键词进行分词处理。
    • 对于分词后的单词求得词ID,然后定位该词在spi文件的位置。
    • 定位到spi文件位置后,开始根据spi文件记录对应spd文件的偏移定位spd文件文档ID列表位置。
    • 定位到spd文件位置后,根据记录hitpos位置的偏移定位spp文件中保存词位置信息。
    • 对每次词查找定位后文档id求交集,在根据词位置信息做优先排序处理返回结果。
  • 以上查询只是个人理解,后续再分析sphinx源码中实际查询的逻辑流程。

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Algorithms

Algorithms

Robert Sedgewick、Kevin Wayne / Addison-Wesley Professional / 2011-3-19 / USD 89.99

Essential Information about Algorithms and Data Structures A Classic Reference The latest version of Sedgewick,s best-selling series, reflecting an indispensable body of knowledge developed over the ......一起来看看 《Algorithms》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换