Phoenix Deep Dive | Reading the Phoenix Source Code: Indexes



Phoenix indexes do not support full-text search, so we need to modify the source code to add it. Before adding full-text search, we first have to understand how indexes are currently created and maintained. This article walks through the source code of Phoenix's native indexes step by step.

Creating an Index

From earlier, brief explorations of the Phoenix source code, we know that every DDL/DML statement is compiled into a MutationPlan by a corresponding ***Compiler class. The logic for creating an index lives in:

org.apache.phoenix.compile.CreateIndexCompiler
public MutationPlan compile(final CreateIndexStatement create) throws SQLException {
       final PhoenixConnection connection = statement.getConnection();
       final ColumnResolver resolver = FromCompiler.getResolver(create, connection, create.getUdfParseNodes());
       Scan scan = new Scan();
       final StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement));
       ExpressionCompiler expressionCompiler = new ExpressionCompiler(context);
       List<ParseNode> splitNodes = create.getSplitNodes();
       if (create.getIndexType() == IndexType.LOCAL) {
            if (!splitNodes.isEmpty()) {
                throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SPLIT_LOCAL_INDEX)
                    .build().buildException();
            }
            List<Pair<String, Object>> list = create.getProps() != null ? create.getProps().get("") : null;
            if (list != null) {
                for (Pair<String, Object> pair : list) {
                    if (pair.getFirst().equals(PhoenixDatabaseMetaData.SALT_BUCKETS)) {
                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SALT_LOCAL_INDEX)
                            .build().buildException();
                    }
                }
            }
       }
       final byte[][] splits = new byte[splitNodes.size()][];
       for (int i = 0; i < splits.length; i++) {
            ParseNode node = splitNodes.get(i);
            if (!node.isStateless()) {
                throw new SQLExceptionInfo.Builder(SQLExceptionCode.SPLIT_POINT_NOT_CONSTANT)
                    .setMessage("Node: " + node).build().buildException();
            }
            LiteralExpression expression = (LiteralExpression)node.accept(expressionCompiler);
            splits[i] = expression.getBytes();
       }
       final MetaDataClient client = new MetaDataClient(connection);

       return new BaseMutationPlan(context, operation) {
            @Override
            public MutationState execute() throws SQLException {
                return client.createIndex(create, splits);
            }

            @Override
            public ExplainPlan getExplainPlan() throws SQLException {
                return new ExplainPlan(Collections.singletonList("CREATE INDEX"));
            }

        };
}

Analyzing this code leads to the following conclusions:

  1. The "create.getIndexType() == IndexType.LOCAL" block is mainly validation; we can ignore it and will not dig into it further.

  2. The "for (int i = 0; i < splits.length; i++)" loop handles split points; we do not need to study it in depth for now.

  3. At the end, compile returns a BaseMutationPlan. This is the final executable plan, and the index-creation logic lives there.

BaseMutationPlan is the plan that is ultimately executed; its execute function is what actually creates the index:

public MutationState execute() throws SQLException {
       return client.createIndex(create, splits);
}

The logic is very simple: it calls createIndex on client, a MetaDataClient constructed from the current connection. The source of createIndex is as follows:

/**
  * Create an index table by morphing the CreateIndexStatement into a CreateTableStatement and calling
  * MetaDataClient.createTable. In doing so, we perform the following translations:
  * 1) Change the type of any columns being indexed to types that support null if the column is nullable.
  *    For example, a BIGINT type would be coerced to a DECIMAL type, since a DECIMAL type supports null
  *    when it's in the row key while a BIGINT does not.
  * 2) Append any row key column from the data table that is not in the indexed column list. Our indexes
  *    rely on having a 1:1 correspondence between the index and data rows.
  * 3) Change the name of the columns to include the column family. For example, if you have a column
  *    named "B" in a column family named "A", the indexed column name will be "A:B". This makes it easy
  *    to translate the column references in a query to the correct column references in an index table
  *    regardless of whether the column reference is prefixed with the column family name or not. It also
  *    has the side benefit of allowing the same named column in different column families to both be
  *    listed as an index column.
  * @param statement
  * @param splits
  * @return MutationState from population of index table from data table
  * @throws SQLException
  */
public MutationState createIndex(CreateIndexStatement statement, byte[][] splits) throws SQLException {
       ...
}

From this comment we learn that creating an index is really implemented by morphing the statement into a CreateTableStatement, with three translations along the way:

  1. Indexed columns are coerced to types that support null values; for example, BIGINT is coerced to DECIMAL.

  2. Row key columns of the data table that are not already in the indexed column list are appended to it, because index rows and data rows have a 1:1 correspondence.

  3. Column names are rewritten to include the column family. The index column name takes the form:

    columnFamilyName:columnName

    For example, a column "B" in a column family "A" becomes the index column "A:B". This makes it easy to resolve column references in queries against the index table and, as a side benefit, lets same-named columns from different families both be indexed.
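
To make translation 3 concrete, here is a tiny, hypothetical helper (not Phoenix code) that mirrors the naming rule just described; row-key columns, which have no column family, simply keep their own name.

// Hypothetical illustration of translation 3; this is not Phoenix code.
public final class IndexColumnNames {
    static String toIndexColumnName(String familyName, String columnName) {
        // Columns in a family become "family:column"; row-key columns keep their plain name.
        return (familyName == null || familyName.isEmpty())
                ? columnName
                : familyName + ":" + columnName;
    }

    public static void main(String[] args) {
        System.out.println(toIndexColumnName("A", "B"));   // prints A:B
        System.out.println(toIndexColumnName(null, "ID")); // prints ID
    }
}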

The logic of this function is quite involved, so we will keep moving and sort out its structure first without going into every detail; the code we skip should be what implements the three translations above.

Now look at the return of createIndex: it calls yet another function, buildIndex.

buildIndex is also fairly long; one of its more important pieces of logic is:

MutationPlan mutationPlan = getMutationPlanForBuildingIndex(index, dataTableRef);

The source of getMutationPlanForBuildingIndex is:

private MutationPlan getMutationPlanForBuildingIndex(PTable index, TableRef dataTableRef) throws SQLException {
        if (index.getIndexType() == IndexType.LOCAL) {
            PostLocalIndexDDLCompiler compiler =
                    new PostLocalIndexDDLCompiler(connection, getFullTableName(dataTableRef));
            return compiler.compile(index);
        } else if (dataTableRef.getTable().isTransactional()){
            PostIndexDDLCompiler compiler = new PostIndexDDLCompiler(connection, dataTableRef);
            return compiler.compile(index);
        } else {
            ServerBuildIndexCompiler compiler = new ServerBuildIndexCompiler(connection, getFullTableName(dataTableRef));
            return compiler.compile(index);
        }
}

To keep the analysis simple we will not cover local indexes, and we follow the branch that creates a PostIndexDDLCompiler.

/**
 * Class that compiles plan to generate initial data values after a DDL command for
 * index table.
 */
public class PostIndexDDLCompiler {
       ...
}

The official comment makes it clear: this plan generates the initial data for the index table. Index creation really has two steps: first, insert the index's metadata into the system tables; second, generate the index data from that metadata. PostIndexDDLCompiler is responsible for the second step, producing the initial data.

StringBuilder updateStmtStr = new StringBuilder();
updateStmtStr.append("UPSERT /*+ NO_INDEX */ INTO ").append(schemaName.length() == 0 ? "" : '"' + schemaName + "\".").append('"').append(tableName).append("\"(")
           .append(indexColumns).append(") ");
final StringBuilder selectQueryBuilder = new StringBuilder();
selectQueryBuilder.append(" SELECT /*+ NO_INDEX */ ").append(dataColumns).append(" FROM ")
        .append(schemaName.length() == 0 ? "" : '"' + schemaName + "\".").append('"').append(dataTable.getTableName().getString()).append('"');
this.selectQuery = selectQueryBuilder.toString();
updateStmtStr.append(this.selectQuery);

try (final PhoenixStatement statement = new PhoenixStatement(connection)) {
           DelegateMutationPlan delegate = new DelegateMutationPlan(statement.compileMutation(updateStmtStr.toString())) {
                   @Override
                   public MutationState execute() throws SQLException {
                          connection.getMutationState()
                                  .commitDDLFence(dataTable);
                          return super.execute();
                   }
           };
           return delegate;
}
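
Putting the two snippets together: the string built above is compiled into a mutation plan and executed. For a hypothetical schema MY_SCHEMA, data table DATA_TABLE, and index IDX_B on column "A"."B", the generated statement would look roughly like the output of the sketch below. The index column list (including the ":ID" name for the appended row key column) is an assumption made only to show the shape of the statement.

// Standalone sketch that mimics the string-building above with made-up names.
public class UpsertSelectSketch {
    public static void main(String[] args) {
        String indexColumns = "\"A:B\", \":ID\"";     // assumed index column list
        String dataColumns  = "\"A\".\"B\", \"ID\"";  // assumed data column list
        String upsert = "UPSERT /*+ NO_INDEX */ INTO \"MY_SCHEMA\".\"IDX_B\"(" + indexColumns + ") "
                + "SELECT /*+ NO_INDEX */ " + dataColumns + " FROM \"MY_SCHEMA\".\"DATA_TABLE\"";
        // The index table gets its initial rows by running this UPSERT ... SELECT over the data table.
        System.out.println(upsert);
    }
}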

As before, we only need to look at the compile function, and the snippets above are its key logic: it simply builds an UPSERT ... SELECT statement and wraps it in a plan that executes it. Back in buildIndex, that plan is then run:

Scan scan = mutationPlan.getContext().getScan();
Long scn = connection.getSCN();
try {
     if (ScanUtil.isDefaultTimeRange(scan.getTimeRange())) {
            if (scn == null) {
                       scn = mutationPlan.getContext().getCurrentTime();
            }
            scan.setTimeRange(dataTableRef.getLowerBoundTimeStamp(), scn);
    }
} catch (IOException e) {
         throw new SQLException(e);
}

// execute index population upsert select
long startTime = EnvironmentEdgeManager.currentTimeMillis();
MutationState state = connection.getQueryServices().updateData(mutationPlan);
long firstUpsertSelectTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;

// for global indexes on non transactional tables we might have to
// run a second index population upsert select to handle data rows
// that were being written on the server while the index was created.
// TODO: this sleep time is really arbitrary. If any query is in progress
// while the index is being built, we're depending on this sleep
// waiting them out. Instead we should have a means of waiting until
// all in progress queries are complete (though I'm not sure that's
// feasible). See PHOENIX-4092.
long sleepTime = connection.getQueryServices().getProps()
          .getLong(QueryServices.INDEX_POPULATION_SLEEP_TIME,
              QueryServicesOptions.DEFAULT_INDEX_POPULATION_SLEEP_TIME);
if (!dataTableRef.getTable().isTransactional() && sleepTime > 0) {
         long delta = sleepTime - firstUpsertSelectTime;
         if (delta > 0) {
                  try {
                        Thread.sleep(delta);
                  } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION)
                        .setRootCause(e).build().buildException();
                  }
          }
          // set the min timestamp of second index upsert select some time before the index
          // was created
          long minTimestamp = index.getTimeStamp() - firstUpsertSelectTime;
          try {
                    // TODO: Use scn or LATEST_TIMESTAMP here? It's possible that a DML statement
                    // ran and ended up with timestamps later than this time. If we use a later
                    // timestamp, we'll need to run the partial index rebuilder here as it's
                    // possible that the updates to the table were made (such as deletes) after
                    // the scn which would not be properly reflected correctly this mechanism.
                    // See PHOENIX-4092.
                    mutationPlan.getContext().getScan().setTimeRange(minTimestamp, scn);
          } catch (IOException e) {
                    throw new SQLException(e);
          }
          MutationState newMutationState =
                        connection.getQueryServices().updateData(mutationPlan);
          state.join(newMutationState);
}

indexStatement = FACTORY.alterIndex(FACTORY.namedTable(null,
                      TableName.create(index.getSchemaName().getString(), index.getTableName().getString())),
                        dataTableRef.getTable().getTableName().getString(), false, PIndexState.ACTIVE);
alterIndex(indexStatement);

return state;

Returning to the main logic of buildIndex: in short, after the SQL that populates the index has run, an alterIndex command sets the index state to PIndexState.ACTIVE. There is other logic as well, such as setting the Scan time range, which we will not explore further here.

To summarize, creating an index roughly breaks down into two steps:

  1. Build the index metadata from the statement and insert it into the Phoenix system tables.

  2. Based on that metadata, generate the logic that populates the index with its initial data.

If we want to add a full-text index type, what logic does index creation need?

  1. Extend the index type so that a "full-text" type can be recorded in the Phoenix system tables.

  2. Store the full-text search parameters in the Phoenix system tables, extending those tables if they cannot hold such parameters today. Since the plan is to use Solr as the full-text engine, the Solr connection information would need to be stored.

  3. Add logic to populate the initial index data: read the indexed columns from the source table and insert them into the full-text engine, such as Solr or Elasticsearch (see the sketch after this list).
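
As a thought experiment for step 3, the sketch below reads an indexed column of a source table through the Phoenix JDBC driver and pushes it into Solr with SolrJ. Every name in it (the ZooKeeper quorum, table, columns, Solr core URL, and field names) is invented for illustration and error handling is omitted; it is a sketch of the idea, not part of Phoenix.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.common.SolrInputDocument;

public class FullTextIndexBootstrap {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:zk1:2181");
             Statement stmt = conn.createStatement();
             SolrClient solr = new HttpSolrClient.Builder("http://solr1:8983/solr/my_fulltext_index").build()) {
            // Counterpart of the initial upsert-select population, with Solr as the target.
            try (ResultSet rs = stmt.executeQuery(
                    "SELECT /*+ NO_INDEX */ ID, \"A\".\"CONTENT\" FROM MY_TABLE")) {
                while (rs.next()) {
                    SolrInputDocument doc = new SolrInputDocument();
                    doc.addField("id", rs.getString("ID"));           // data row key -> Solr document id
                    doc.addField("content", rs.getString("CONTENT")); // indexed column -> full-text field
                    solr.add(doc);
                }
            }
            solr.commit();
        }
    }
}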

Does adding a full-text index type end there? Far from it. We have only roughly analyzed index creation; automatically maintaining the full-text data as rows are inserted, deleted, and updated after creation is the harder problem.

Extending the current DDL syntax is another open question. For reference, the current CREATE INDEX grammar looks like this:

[Figure: the current CREATE INDEX grammar definition]

Broadly there are two ways to add a full-text index type: extend the DDL grammar so that, alongside LOCAL, a Solr-backed type can be declared; or store the index type in indexOptions. Which approach to take is something to study later.

That concludes our first pass over index creation; later sections continue with the related logic. This is only meant to start the discussion, and other ideas are welcome in the comments.

Global Index Writes

The previous section briefly analyzed how indexes are created; this section looks at the timing and logic of global index writes.

As mentioned in the referenced article 《Phoenix重磅 | Phoenix(云HBase SQL)核心功能原理及应用场景介绍》, global index maintenance falls into two cases: mutable tables and immutable tables. For mutable tables, the global index is maintained on the HBase RegionServer, implemented through an HBase coprocessor, asynchronously from the client's perspective; for immutable tables it is done on the client, that is, the index is written at the same time as the data.
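
For readers less familiar with HBase coprocessors, here is a minimal, self-contained RegionObserver skeleton (written against the HBase 2.x coprocessor API, independent of Phoenix) showing the hook this kind of server-side index maintenance builds on; the class name and the body of the hook are placeholders.

import java.io.IOException;
import java.util.Optional;

import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;

public class BatchMutateObserver implements RegionCoprocessor, RegionObserver {

    @Override
    public Optional<RegionObserver> getRegionObserver() {
        // Expose this class as the region-level observer.
        return Optional.of(this);
    }

    @Override
    public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
            MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
        // Runs on the RegionServer before a batch of Puts/Deletes is applied;
        // this is the point where an indexer can derive index updates from the batch.
        for (int i = 0; i < miniBatchOp.size(); i++) {
            Mutation mutation = miniBatchOp.getOperation(i);
            // ... inspect the mutation and build index updates here ...
        }
    }
}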

So when looking for Phoenix's index-maintenance code, we should start from the coprocessors, that is, the classes that implement the coprocessor interfaces. Based on package names and the class hierarchy, we find:

org.apache.phoenix.hbase.index.Indexer
/**
 * Do all the work of managing index updates from a single coprocessor. All Puts/Delets are passed
 * to an {@link IndexBuilder} to determine the actual updates to make.
 * <p>
 * If the WAL is enabled, these updates are then added to the WALEdit and attempted to be written to
 * the WAL after the WALEdit has been saved. If any of the index updates fail, this server is
 * immediately terminated and we rely on WAL replay to attempt the index updates again (see
 * {@link #preWALRestore(ObserverContext, HRegionInfo, HLogKey, WALEdit)}).
 * <p>
 * If the WAL is disabled, the updates are attempted immediately. No consistency guarantees are made
 * if the WAL is disabled - some or none of the index updates may be successful. All updates in a
 * single batch must have the same durability level - either everything gets written to the WAL or
 * nothing does. Currently, we do not support mixed-durability updates within a single batch. If you
 * want to have different durability levels, you only need to split the updates into two different
 * batches.
 * <p>
 * We don't need to implement {@link #postPut(ObserverContext, Put, WALEdit, Durability)} and
 * {@link #postDelete(ObserverContext, Delete, WALEdit, Durability)} hooks because 
 * Phoenix always does batch mutations.
 * <p>
 */
public class Indexer implements RegionObserver, RegionCoprocessor {
      ...
}

A quick read of the class comment tells us that Indexer intercepts puts and deletes and hands the relevant data to an IndexBuilder, which computes the actual index updates; Indexer only intercepts batched mutations.

@Override
public void start(CoprocessorEnvironment e) throws IOException 

@Override
public void stop(CoprocessorEnvironment e) throws IOException 

/**
 * We use an Increment to serialize the ON DUPLICATE KEY clause so that the HBase plumbing
 * sets up the necessary locks and mvcc to allow an atomic update. The Increment is not a
 * real increment, though, it's really more of a Put. We translate the Increment into a
 * list of mutations, at most a single Put and Delete that are the changes upon executing
 * the list of ON DUPLICATE KEY clauses for this row.
 */
@Override
public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Increment inc)

@Override
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
    MiniBatchOperationInProgress<Mutation> miniBatchOp)

@Override
public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c,
    MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success)

@Override
public void postOpen(final ObserverContext<RegionCoprocessorEnvironment> c)

@Override
public void preWALRestore(ObserverContext<RegionCoprocessorEnvironment> env, HRegionInfo info,
    HLogKey logKey, WALEdit logEdit) throws IOException

/**
 * Create a custom {@link InternalScanner} for a compaction that tracks the versions of rows that
 * are removed so we can clean then up from the the index table(s).
 * <p>
 * This is not yet implemented - its not clear if we should even mess around with the Index table
 * for these rows as those points still existed. TODO: v2 of indexing
 */
@Override
public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
        final Store store, final List<? extends KeyValueScanner> scanners, final ScanType scanType,
        final long earliestPutTs, final InternalScanner s) throws IOException

Going through the coprocessor hooks it implements and setting aside the less important ones, we focus on preBatchMutate:

public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
       if (this.disabled) {
           return;
       }
       long start = EnvironmentEdgeManager.currentTimeMillis();
       try {
            preBatchMutateWithExceptions(c, miniBatchOp);
            return;
       } catch (Throwable t) {
            rethrowIndexingException(t);
       } finally {
            long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
            if (duration >= slowIndexPrepareThreshold) {
                if (LOGGER.isDebugEnabled()) {
                  LOGGER.debug(getCallTooSlowMessage("preBatchMutate", duration, slowIndexPrepareThreshold));
                }
              metricSource.incrementNumSlowIndexPrepareCalls();
          }
            metricSource.updateIndexPrepareTime(duration);
       }
       throw new RuntimeException(
        "Somehow didn't return an index update but also didn't propagate the failure to the client!");
}

This code is straightforward; the important logic all lives in preBatchMutateWithExceptions.

preBatchMutateWithExceptions is long, so we will not go through it line by line, but pay special attention to the calls on the builder field: its type is IndexBuildManager, and all index maintenance goes through it.

/**
 * Manage the building of index updates from primary table updates.
 */
public class IndexBuildManager implements Stoppable {
       private final IndexBuilder delegate;
}

This class has one important field, delegate, to which the concrete index-maintenance work is forwarded in turn.

/**
 * Interface to build updates ({@link Mutation}s) to the index tables, based on the primary table
 * updates.
 * <p>
 * Either all the index updates will be applied to all tables or the primary table will kill itself
 * and will attempt to replay the index edits through the WAL replay mechanism.
 */
public interface IndexBuilder extends Stoppable {
       ...
}

According to its definition, the getIndexUpdate function performs the final index maintenance:

/**
  * Your opportunity to update any/all index tables based on the update of the primary table row.
  * Its up to your implementation to ensure that timestamps match between the primary and index
  * tables.
  * <p>
  * The mutation is a generic mutation (not a {@link Put} or a {@link Delete}), as it actually
  * corresponds to a batch update. Its important to note that {@link Put}s always go through the
  * batch update code path, so a single {@link Put} will come through here and update the primary
  * table as the only update in the mutation.
  * <p>
  * Implementers must ensure that this method is thread-safe - it could (and probably will) be
  * called concurrently for different mutations, which may or may not be part of the same batch.
  * @param mutation update to the primary table to be indexed.
  * @param context index meta data for the mutation
  * @return a Map of the mutations to make -> target index table name
  * @throws IOException on failure
  */
public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData context) throws IOException;

Going back to preBatchMutateWithExceptions, we find where getIndexUpdate is called:

// get the current span, or just use a null-span to avoid a bunch of if statements
try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
     Span current = scope.getSpan();
     if (current == null) {
         current = NullSpan.INSTANCE;
     }
     long start = EnvironmentEdgeManager.currentTimeMillis();

     // get the index updates for all elements in this batch
     Collection<Pair<Mutation, byte[]>> indexUpdates =
                this.builder.getIndexUpdate(miniBatchOp, mutations);

     long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
     if (duration >= slowIndexPrepareThreshold) {
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug(getCallTooSlowMessage("indexPrepare", duration, slowIndexPrepareThreshold));
         }
         metricSource.incrementNumSlowIndexPrepareCalls();
     }
     metricSource.updateIndexPrepareTime(duration);
     current.addTimelineAnnotation("Built index updates, doing preStep");
     TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
     byte[] tableName = c.getEnvironment().getRegion().getTableDescriptor().getTableName().getName();
     Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = indexUpdates.iterator();
     List<Mutation> localUpdates = new ArrayList<Mutation>(indexUpdates.size());
     while(indexUpdatesItr.hasNext()) {
           Pair<Mutation, byte[]> next = indexUpdatesItr.next();
           if (Bytes.compareTo(next.getSecond(), tableName) == 0) {
               localUpdates.add(next.getFirst());
               indexUpdatesItr.remove();
           }
     }
     if (!localUpdates.isEmpty()) {
         miniBatchOp.addOperationsFromCP(0,
                localUpdates.toArray(new Mutation[localUpdates.size()]));
     }
     if (!indexUpdates.isEmpty()) {
         context.indexUpdates = indexUpdates;
         // write index updates to WAL
         if (durability != Durability.SKIP_WAL) {
             // we have all the WAL durability, so we just update the WAL entry and move on
             for (Pair<Mutation, byte[]> entry : indexUpdates) {
                  edit.add(new IndexedKeyValue(entry.getSecond(), entry.getFirst()));
             }              
         }
     }
}

The "index update count" trace annotation further confirms our analysis: getIndexUpdate is what produces the final index updates.

So which class ultimately implements delegate?

private static IndexBuilder getIndexBuilder(RegionCoprocessorEnvironment e) throws IOException {
        Configuration conf = e.getConfiguration();
        Class<? extends IndexBuilder> builderClass =
        conf.getClass(Indexer.INDEX_BUILDER_CONF_KEY, null, IndexBuilder.class);
        try {
             IndexBuilder builder = builderClass.newInstance();
             builder.setup(e);
             return builder;
        } catch (InstantiationException e1) {
            throw new IOException("Couldn't instantiate index builder:" + builderClass
                + ", disabling indexing on table " + e.getRegion().getTableDescriptor().getTableName().getNameAsString());
        } catch (IllegalAccessException e1) {
            throw new IOException("Couldn't instantiate index builder:" + builderClass
                + ", disabling indexing on table " + e.getRegion().getTableDescriptor().getTableName().getNameAsString());
        }
}

This code shows that the concrete builder class is given by the "index.builder" configuration parameter. Let's look at the attributes the table was created with:

'PHOENIX:CONTRACT_GPS', {TABLE_ATTRIBUTES => {coprocessor$1 => '|org.apache.phoenix.coprocessor.ScanRegionObserver|805306366|', coprocessor$2 => '|org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver|805306366|', coprocessor$3 => '|org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver|805306366|', coprocessor$4 => '|org.apache.phoenix.coprocessor.ServerCachingEndpointImpl|805306366|', coprocessor$5 => '|org.apache.phoenix.hbase.index.Indexer|805306366|org.apache.hadoop.hbase.index.codec.class=org.apache.phoenix.index.PhoenixIndexCodec,index.builder=org.apache.phoenix.index.PhoenixIndexBuilder'}, {NAME => 'CF_1', BLOOMFILTER => 'NONE', DATA_BLOCK_ENCODING => 'FAST_DIFF'}

Clearly the actual type of delegate is:

org.apache.phoenix.index.PhoenixIndexBuilder

We will stop digging here. Combined with the previous section, the process of creating and maintaining an index can be summarized briefly:

  1. Create the index metadata and insert it into the system tables.

  2. Based on that metadata, build the index data for the existing rows.

  3. Intercept data changes on the source table through a coprocessor and maintain the index table accordingly.

These three steps are of course highly simplified and many details remain to be worked out, but that does not stop us from implementing a full-text index backed by Solr or Elasticsearch; a rough sketch of the maintenance side follows.
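
To make that claim a bit more concrete, here is a hedged sketch of how the mutations intercepted in step 3 could be translated into Solr documents. It is not tied to Phoenix's IndexBuilder contract (which involves considerably more), and the id/field mapping is invented; it only shows that the data flowing through the coprocessor hook carries everything a full-text engine needs.

import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.solr.common.SolrInputDocument;

public final class MutationToSolrDocument {

    /** Maps one intercepted data-table mutation to one Solr document (illustrative only). */
    static SolrInputDocument toDocument(Mutation mutation) {
        SolrInputDocument doc = new SolrInputDocument();
        // Reuse the data row key as the document id, mirroring the 1:1 relationship
        // between data rows and index rows described earlier.
        doc.addField("id", Bytes.toStringBinary(mutation.getRow()));
        for (Map.Entry<byte[], List<Cell>> family : mutation.getFamilyCellMap().entrySet()) {
            for (Cell cell : family.getValue()) {
                String field = Bytes.toString(family.getKey()) + ":"
                        + Bytes.toString(CellUtil.cloneQualifier(cell));
                doc.addField(field, Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
        return doc;
    }
}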

Finally, here is a diagram referenced from elsewhere to close out this analysis.

[Figure: referenced diagram]

Using Indexes

The previous two sections analyzed how indexes are created and maintained. Next we will briefly look at how indexes are used, but before that we need to examine how Phoenix SQL is compiled.

Like most SQL engines, Phoenix uses ANTLR (https://www.antlr.org) to compile SQL, so we start with a brief look at the PhoenixSQL.g grammar file.

grammar PhoenixSQL;
tokens {}
@parser::header {}
@lexer::header {}
@parser::members {}
@rulecatch {}
@lexer::members {}

nextStatement returns [BindableStatement ret]
......

HINT_START: '/*+' ;
....
fragment
FIELDCHAR
......

A few parts of this file deserve attention:

  1. tokens: defines the keywords. If we add a FULL TEXT index type alongside LOCAL, the new keywords must be declared here first.

  2. @parser::header and @lexer::header: the package and import statements carried into the generated classes; in other words, the header of the generated Java files.

  3. @parser::members and @lexer::members: extensions to the generated parser and lexer classes; simply put, code that gets inserted into the generated Java files.

  4. @rulecatch: defines how parse errors are handled; per this file, Phoenix chooses to rethrow the exception directly.

The rest of the file defines the grammar and lexer rules of the DSL.

Running ANTLR on this grammar generates three artifacts:

PhoenixSQL.tokens/PhoenixSQLLexer/PhoenixSQLParser

We need to know where these three classes are used in order to find where the SQL supplied by the client gets compiled. Of course, debugging other classes directly (for example PhoenixStatement) would also lead us there.

<plugin>
  <groupId>org.antlr</groupId>
  <artifactId>antlr3-maven-plugin</artifactId>
  <version>3.5.2</version>
  <executions>
    <execution>
      <goals>
        <goal>antlr</goal>
      </goals>
    </execution>
  </executions>
  <configuration>
    <outputDirectory>${antlr-output.dir}/org/apache/phoenix/parse</outputDirectory>
  </configuration>
</plugin>

<antlr-output.dir>target/generated-sources/antlr3</antlr-output.dir>

From this plugin in the POM we can tell that the generated files end up under target/generated-sources/antlr3. Interested readers can inspect the structure of the three classes; we will not go into more detail here.

A full-text search of the code base finds where PhoenixSQLParser is created:

org.apache.phoenix.parse.SQLParser
public SQLParser(String query, ParseNodeFactory factory) {
       PhoenixSQLLexer lexer;
       try {
            lexer = new PhoenixSQLLexer(new CaseInsensitiveReaderStream(new StringReader(query)));
       } catch (IOException e) {
            throw new RuntimeException(e); // Impossible
       }
       CommonTokenStream cts = new CommonTokenStream(lexer);
       parser = new PhoenixSQLParser(cts);
       parser.setParseNodeFactory(factory);
}

This part of the SQLParser constructor matters: it sets a factory on the parser. Here is the type of that variable and its definition:

/**
  * 
  * Factory used by parser to construct object model while parsing a SQL statement
  */
public class ParseNodeFactory {
       ...
}

According to the comment, it is the factory the parser uses to build the object model while parsing a SQL statement.

// Parse a create index statement.
create_index_node returns [CreateIndexStatement ret]
    :   CREATE l=LOCAL? INDEX (IF NOT ex=EXISTS)? i=index_name ON t=from_table_name
        (LPAREN ik=ik_constraint RPAREN)
        (INCLUDE (LPAREN icrefs=column_names RPAREN))?
        (async=ASYNC)?
        (p=fam_properties)?
        (SPLIT ON v=value_expression_list)?
        {ret = factory.createIndex(i, factory.namedTable(null,t), ik, icrefs, v, p, ex!=null, l==null ? IndexType.getDefault() : IndexType.LOCAL, async != null, getBindCount(), new HashMap<String, UDFParseNode>(udfParseNodes)); }
    ;

The above is the ANTLR rule for create_index_node. In short, once the syntax has been parsed, factory.createIndex is called to build and return a CreateIndexStatement; this is how a CREATE INDEX statement is turned into a CreateIndexStatement object by ParseNodeFactory.
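
As a quick sanity check of this mapping, the following sketch drives SQLParser directly. It assumes SQLParser exposes a parseStatement() entry point and that ParseNodeFactory has a public no-arg constructor, which is how other Phoenix callers appear to use them; the table and index names are made up.

import org.apache.phoenix.parse.BindableStatement;
import org.apache.phoenix.parse.CreateIndexStatement;
import org.apache.phoenix.parse.ParseNodeFactory;
import org.apache.phoenix.parse.SQLParser;

public class ParseCreateIndexSketch {
    public static void main(String[] args) throws Exception {
        String sql = "CREATE LOCAL INDEX IDX_B ON MY_TABLE(\"A\".\"B\")";
        SQLParser parser = new SQLParser(sql, new ParseNodeFactory());
        BindableStatement stmt = parser.parseStatement();   // assumed entry point
        // If the grammar rule above fired, we should get back a CreateIndexStatement.
        System.out.println(stmt instanceof CreateIndexStatement);
    }
}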

To add a full-text index we would have to modify this rule, for example allowing FULL TEXT where LOCAL appears and adding Solr or Elasticsearch connection properties after SPLIT, and change factory.createIndex to accept those new parameters.

The generated create_index_node method is quite long; looking only at its final return logic, we can see it matches our expectation.

try{
     {
       ...
       if (state.backtracking==0) {
           ret = factory.createIndex(i, factory.namedTable(null,t), ik, icrefs, v, p, ex!=null, l==null ? IndexType.getDefault() : IndexType.LOCAL, async != null, getBindCount(), new HashMap<String, UDFParseNode>(udfParseNodes)); 
       }
     }
} catch (RecognitionException re) {
     throw re;
} finally {
     // do for sure before leaving
}
return ret;

By now you should have a basic picture of how SQL is parsed and compiled. The next thing to study is how SQLParser is created and where it is used.


Again we simply search the code base for SQLParser; crude, but very direct. Careful readers will notice that SQLParser is called from many places, and different scenarios call different methods. This tells us that when compiling SQL, Phoenix roughly classifies the statement first: DDL, DML, or something else.

Because SQLParser is used in so many places, tracing every call site would be tedious. Instead we will change tack and start from the source, the place where a PreparedStatement is compiled, which later installments will cover.
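
If you want to follow along with a debugger, the natural starting point is an ordinary JDBC call through the Phoenix driver; a breakpoint inside PhoenixStatement / PhoenixPreparedStatement while running something like the snippet below (the ZooKeeper quorum and table are placeholders) lands right on the path where the SQL gets parsed and compiled.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class PhoenixCompileEntryPoint {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:zk1:2181");
             PreparedStatement ps = conn.prepareStatement("SELECT * FROM MY_TABLE WHERE ID = ?")) {
            ps.setLong(1, 1L);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    // Consume rows; the interesting part for us is the compile path
                    // exercised by prepareStatement/executeQuery above.
                }
            }
        }
    }
}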


