Phoenix解读 | Phoenix源码解读之索引

Phoenix 索引不支持全文检索,我们需要修改源码达到这一目的。添加全文检索的前提是我们需要了解当前索引创建、维护的机制。今天将带大家,逐步探索 Phoenix 原生索引的源码。根据之前对 Phoenix 源码简略了解的经验来看,每一个 DDL/DML 等语句都会通过一个 ***Compiler 类编译成一个 MutationPlan ,创建索引的逻辑在:

Phoenix 索引不支持全文检索,我们需要修改源码达到这一目的。添加全文检索的前提是我们需要了解当前索引创建、维护的机制。今天将带大家,逐步探索 Phoenix 原生索引的源码。


根据之前对 Phoenix 源码简略了解的经验来看,每一个 DDL/DML 等语句都会通过一个 ***Compiler 类编译成一个 MutationPlan ,创建索引的逻辑在:

public MutationPlan compile(final CreateIndexStatement create) throws SQLException {
       final PhoenixConnection connection = statement.getConnection();
       final ColumnResolver resolver = FromCompiler.getResolver(create, connection, create.getUdfParseNodes());
       Scan scan = new Scan();
       final StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement));
       ExpressionCompiler expressionCompiler = new ExpressionCompiler(context);
       List<ParseNode> splitNodes = create.getSplitNodes();
       if (create.getIndexType() == IndexType.LOCAL) {
            if (!splitNodes.isEmpty()) {
                throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SPLIT_LOCAL_INDEX)
       List<Pair<String, Object>> list = create.getProps() != null ? create.getProps().get("") : null;
       if (list != null) {
                for (Pair<String, Object> pair : list) {
                    if (pair.getFirst().equals(PhoenixDatabaseMetaData.SALT_BUCKETS)) {
                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SALT_LOCAL_INDEX)
       final byte[][] splits = new byte[splitNodes.size()][];
       for (int i = 0; i < splits.length; i++) {
            ParseNode node = splitNodes.get(i);
            if (!node.isStateless()) {
                throw new SQLExceptionInfo.Builder(SQLExceptionCode.SPLIT_POINT_NOT_CONSTANT)
                    .setMessage("Node: " + node).build().buildException();
            LiteralExpression expression = (LiteralExpression)node.accept(expressionCompiler);
            splits[i] = expression.getBytes();
       final MetaDataClient client = new MetaDataClient(connection);

       return new BaseMutationPlan(context, operation) {
            public MutationState execute() throws SQLException {
                return client.createIndex(create, splits);

            public ExplainPlan getExplainPlan() throws SQLException {
                return new ExplainPlan(Collections.singletonList("CREATE INDEX"));



  1. “create.getIndexType() == IndexType.LOCAL” 这段代码主要用来校验,可忽略,不再深入研究。

  2. “for (int i = 0; i < splits.length; i++) ” 这段代码是有关 split 的逻辑,目前不用深入研究。

  3. compile 函数最后返回了一个 BaseMutationPlan 类,这是最终的可执行计划,创建索引的逻辑在这里。

BaseMutationPlan 是最终可执行的执行计划,execute 函数最终创建索引,源码如下:

public MutationState execute() throws SQLException {
       return client.createIndex(create, splits);

逻辑非常简单,就是调用 client 的 createIndex ,client 是根据当前连接创建的 MetaDataClient 类。createIndex 源码如下:

  * Create an index table by morphing the CreateIndexStatement into a CreateTableStatement and calling
  * MetaDataClient.createTable. In doing so, we perform the following translations:
  * 1) Change the type of any columns being indexed to types that support null if the column is nullable.
  *    For example, a BIGINT type would be coerced to a DECIMAL type, since a DECIMAL type supports null
  *    when it's in the row key while a BIGINT does not.
  * 2) Append any row key column from the data table that is not in the indexed column list. Our indexes
  *    rely on having a 1:1 correspondence between the index and data rows.
  * 3) Change the name of the columns to include the column family. For example, if you have a column
  *    named "B" in a column family named "A", the indexed column name will be "A:B". This makes it easy
  *    to translate the column references in a query to the correct column references in an index table
  *    regardless of whether the column reference is prefixed with the column family name or not. It also
  *    has the side benefit of allowing the same named column in different column families to both be
  *    listed as an index column.
  * @param statement
  * @param splits
  * @return MutationState from population of index table from data table
  * @throws SQLException
public MutationState createIndex(CreateIndexStatement statement, byte[][] splits) throws SQLException {

通过这段注释我们知道,其实创建一个索引,就是通过 CreateTableStatement 来实现的,这期间有以下三个转换:

  1. 把索引字段转换成支持 null 值的类型。比如会把 bigint 转换成 decimal 。

  2. 把数据表的 row key 字段添加到索引字段列表。索引的数据与源表数据是11的关系

  3. 修改索引字段名包含列簇名。索引名的格式就是:


    比如列簇 A 里面有一个 B 字段,那么索引列的名称就是 “A:B” 。主要是为了引用方便和防止同名。


我们来看 createIndex 的 return ,这里又调用一个函数 : buildIndex 。

buildIndex 的代码也比较长,其中一个比较重要的代码逻辑如下:

MutationPlan mutationPlan = getMutationPlanForBuildingIndex(index, dataTableRef);

getMutationPlanForBuildingIndex 源码如下:

private MutationPlan getMutationPlanForBuildingIndex(PTable index, TableRef dataTableRef) throws SQLException {
        if (index.getIndexType() == IndexType.LOCAL) {
            PostLocalIndexDDLCompiler compiler =
                    new PostLocalIndexDDLCompiler(connection, getFullTableName(dataTableRef));
            return compiler.compile(index);
        } else if (dataTableRef.getTable().isTransactional()){
            PostIndexDDLCompiler compiler = new PostIndexDDLCompiler(connection, dataTableRef);
            return compiler.compile(index);
        } else {
            ServerBuildIndexCompiler compiler = new ServerBuildIndexCompiler(connection, getFullTableName(dataTableRef));
            return compiler.compile(index);

为了简化分析流程,我们不分析 local 的索引。这段代码其实就是创建了一个 PostIndexDDLCompiler 类。

 * Class that compiles plan to generate initial data values after a DDL command for
 * index table.
public class PostIndexDDLCompiler {

官方注释也讲解的很清楚了,这个执行计划是为了给索引表生成初始化数据。其实创建索引分两步,第一步向系统表插入索引的元数据信息,第二步根据索引元数据生成索引数据。PostIndexDDLCompiler 就是用来生成初始化数据的。

StringBuilder updateStmtStr = new StringBuilder();
updateStmtStr.append("UPSERT /*+ NO_INDEX */ INTO ").append(schemaName.length() == 0 ? "" : '"' + schemaName + "\".").append('"').append(tableName).append("\"(")
           .append(indexColumns).append(") ");
final StringBuilder selectQueryBuilder = new StringBuilder();
selectQueryBuilder.append(" SELECT /*+ NO_INDEX */ ").append(dataColumns).append(" FROM ")
        .append(schemaName.length() == 0 ? "" : '"' + schemaName + "\".").append('"').append(dataTable.getTableName().getString()).append('"');
this.selectQuery = selectQueryBuilder.toString();

try (final PhoenixStatement statement = new PhoenixStatement(connection)) {
           DelegateMutationPlan delegate = new DelegateMutationPlan(statement.compileMutation(updateStmtStr.toString())) {
                   public MutationState execute() throws SQLException {
                          return super.execute();
           return delegate;

根据经验,我们还是只看 compile 函数,上面是 compile 函数的重点逻辑,其实就是生成了一个 upsert select 的 SQL ,然后执行:

Scan scan = mutationPlan.getContext().getScan();
Long scn = connection.getSCN();
try {
     if (ScanUtil.isDefaultTimeRange(scan.getTimeRange())) {
            if (scn == null) {
                       scn = mutationPlan.getContext().getCurrentTime();
            scan.setTimeRange(dataTableRef.getLowerBoundTimeStamp(), scn);
} catch (IOException e) {
         throw new SQLException(e);

// execute index population upsert select
long startTime = EnvironmentEdgeManager.currentTimeMillis();
MutationState state = connection.getQueryServices().updateData(mutationPlan);
long firstUpsertSelectTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;

// for global indexes on non transactional tables we might have to
// run a second index population upsert select to handle data rows
// that were being written on the server while the index was created.
// TODO: this sleep time is really arbitrary. If any query is in progress
// while the index is being built, we're depending on this sleep
// waiting them out. Instead we should have a means of waiting until
// all in progress queries are complete (though I'm not sure that's
// feasible). See PHOENIX-4092.
long sleepTime = connection.getQueryServices().getProps()
if (!dataTableRef.getTable().isTransactional() && sleepTime > 0) {
         long delta = sleepTime - firstUpsertSelectTime;
         if (delta > 0) {
                  try {
                  } catch (InterruptedException e) {
                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION)
          // set the min timestamp of second index upsert select some time before the index
          // was created
          long minTimestamp = index.getTimeStamp() - firstUpsertSelectTime;
          try {
                    // TODO: Use scn or LATEST_TIMESTAMP here? It's possible that a DML statement
                    // ran and ended up with timestamps later than this time. If we use a later
                    // timestamp, we'll need to run the partial index rebuilder here as it's
                    // possible that the updates to the table were made (such as deletes) after
                    // the scn which would not be properly reflected correctly this mechanism.
                    // See PHOENIX-4092.
                    mutationPlan.getContext().getScan().setTimeRange(minTimestamp, scn);
          } catch (IOException e) {
                    throw new SQLException(e);
          MutationState newMutationState =

indexStatement = FACTORY.alterIndex(FACTORY.namedTable(null,
                      TableName.create(index.getSchemaName().getString(), index.getTableName().getString())),
                        dataTableRef.getTable().getTableName().getString(), false, PIndexState.ACTIVE);

return state;

我们在回到 buildIndex 的主逻辑,简单来看就是执行完生成索引数据的 sql 之后,通过 alterIndex 命令修改索引的状态为 PIndexState.ACTIVE 。当然了,还有设置 Scan 的时间范围等其他逻辑,这里也先不深入研究了。


  1. 根据语句创建索引元数据信息,然后插入 Phoenix 系统表。

  2. 根据元数据信息,生成初始化索引数据的逻辑。


  1. 扩展索引类型。这样“全文索引”类型就会插入到 Phoenix 系统表。

  2. 将全文检索的参数保存到 Phoenix 系统表,如果 Phoenix 系统表不支持,则需要对其进行扩展。作者计划用 Solr 作为全文检索的引擎,那么 Solr 的连接信息就需要保存。

  3. 增加初始化索引数据的逻辑。需要读取源表索引列的数据,将其插入到全文检索引擎中,比如 Solr 或 ElasticSearch 。


另外,如何扩展当前的 DDL 语句也是问题,比如目前的创建索引的语法如下:

简单来看有两种方案增加全文检索类型,第一个就是扩展 DDL 语法,使其在 local 的基础之上增加 solr ;第二个就是把索引的类型保存到 indexOptions 中。具体选择哪种方案就后面再深入研究了。




在引用的文章 《Phoenix重磅 | Phoenix(云HBase SQL)核心功能原理及应用场景介绍》 中有提到,全局索引的维护分为两类:Multable 表、Immutable 表。Mutable 表全局索引在 HBase 的 RegionServer 完成,其实就是通过 HBase 的 Coprocessor 实现,是异步的;Immutable 表是在客户端完成,也就是写数据的同时写入索引。

那我们寻找 Phoenix 索引维护的源码时,就要从 Coprocessor 入手,也就是继承并实现 Coprocessor 的类。根据包名和继承关系,我们找到了类

 * Do all the work of managing index updates from a single coprocessor. All Puts/Delets are passed
 * to an {@link IndexBuilder} to determine the actual updates to make.
 * <p>
 * If the WAL is enabled, these updates are then added to the WALEdit and attempted to be written to
 * the WAL after the WALEdit has been saved. If any of the index updates fail, this server is
 * immediately terminated and we rely on WAL replay to attempt the index updates again (see
 * {@link #preWALRestore(ObserverContext, HRegionInfo, HLogKey, WALEdit)}).
 * <p>
 * If the WAL is disabled, the updates are attempted immediately. No consistency guarantees are made
 * if the WAL is disabled - some or none of the index updates may be successful. All updates in a
 * single batch must have the same durability level - either everything gets written to the WAL or
 * nothing does. Currently, we do not support mixed-durability updates within a single batch. If you
 * want to have different durability levels, you only need to split the updates into two different
 * batches.
 * <p>
 * We don't need to implement {@link #postPut(ObserverContext, Put, WALEdit, Durability)} and
 * {@link #postDelete(ObserverContext, Delete, WALEdit, Durability)} hooks because 
 * Phoenix always does batch mutations.
 * <p>
public class Indexer implements RegionObserver, RegionCoprocessor {

简单分析这个类的注释,就得知 Indexer 拦截数据的写入和删除,然后将先关数据传值给 IndexBuilder 类来完成实际的索引更新;Indexer 只拦截批量提交的数据更新操作。

public void start(CoprocessorEnvironment e) throws IOException 

public void stop(CoprocessorEnvironment e) throws IOException 

 * We use an Increment to serialize the ON DUPLICATE KEY clause so that the HBase plumbing
 * sets up the necessary locks and mvcc to allow an atomic update. The Increment is not a
 * real increment, though, it's really more of a Put. We translate the Increment into a
 * list of mutations, at most a single Put and Delete that are the changes upon executing
 * the list of ON DUPLICATE KEY clauses for this row.
public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Increment inc)

public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
    MiniBatchOperationInProgress<Mutation> miniBatchOp)

public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c,
    MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success)

public void postOpen(final ObserverContext<RegionCoprocessorEnvironment> c)

public void preWALRestore(ObserverContext<RegionCoprocessorEnvironment> env, HRegionInfo info,
    HLogKey logKey, WALEdit logEdit) throws IOException

 * Create a custom {@link InternalScanner} for a compaction that tracks the versions of rows that
 * are removed so we can clean then up from the the index table(s).
 * <p>
 * This is not yet implemented - its not clear if we should even mess around with the Index table
 * for these rows as those points still existed. TODO: v2 of indexing
public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
        final Store store, final List<? extends KeyValueScanner> scanners, final ScanType scanType,
        final long earliestPutTs, final InternalScanner s) throws IOException

分析其实现的 Coprocessor 各个函数,排除掉不重要的函数,我们着重分析 preBatchMutate:

public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
       if (this.disabled) {
       long start = EnvironmentEdgeManager.currentTimeMillis();
       try {
            preBatchMutateWithExceptions(c, miniBatchOp);
       } catch (Throwable t) {
       } finally {
            long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
            if (duration >= slowIndexPrepareThreshold) {
                if (LOGGER.isDebugEnabled()) {
                  LOGGER.debug(getCallTooSlowMessage("preBatchMutate", duration, slowIndexPrepareThreshold));
       throw new RuntimeException(
        "Somehow didn't return an index update but also didn't propagate the failure to the client!");

这段代码比较简单,重要的逻辑都在  preBatchMutateWithExceptions 函数中。

preBatchMutateWithExceptions 代码比较多,不再具体分析,但我们要特别关注代码中对 builder 对象的相关调用,因为这个类的类型是 IndexBuildManager ,所有索引维护的操作都由其完成。

 * Manage the building of index updates from primary table updates.
public class IndexBuildManager implements Stoppable {
       private final IndexBuilder delegate;

这个类有一个很重要的变量:delegate 。所有索引维护的具体实现又转发给了 delegate 。

 * Interface to build updates ({@link Mutation}s) to the index tables, based on the primary table
 * updates.
 * <p>
 * Either all the index updates will be applied to all tables or the primary table will kill itself
 * and will attempt to replay the index edits through the WAL replay mechanism.
public interface IndexBuilder extends Stoppable {

根据其定义,getIndexUpdate 函数完成最终的索引维护:

  * Your opportunity to update any/all index tables based on the update of the primary table row.
  * Its up to your implementation to ensure that timestamps match between the primary and index
  * tables.
  * <p>
  * The mutation is a generic mutation (not a {@link Put} or a {@link Delete}), as it actually
  * corresponds to a batch update. Its important to note that {@link Put}s always go through the
  * batch update code path, so a single {@link Put} will come through here and update the primary
  * table as the only update in the mutation.
  * <p>
  * Implementers must ensure that this method is thread-safe - it could (and probably will) be
  * called concurrently for different mutations, which may or may not be part of the same batch.
  * @param mutation update to the primary table to be indexed.
  * @param context index meta data for the mutation
  * @return a Map of the mutations to make -> target index table name
  * @throws IOException on failure
public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData context) throws IOException;

我们在回到 preBatchMutateWithExceptions 函数,找到 getIndexUpdate 调用的地方。

// get the current span, or just use a null-span to avoid a bunch of if statements
try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
     Span current = scope.getSpan();
     if (current == null) {
         current = NullSpan.INSTANCE;
     long start = EnvironmentEdgeManager.currentTimeMillis();

     // get the index updates for all elements in this batch
     Collection<Pair<Mutation, byte[]>> indexUpdates =
                this.builder.getIndexUpdate(miniBatchOp, mutations);

     long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
     if (duration >= slowIndexPrepareThreshold) {
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug(getCallTooSlowMessage("indexPrepare", duration, slowIndexPrepareThreshold));
     current.addTimelineAnnotation("Built index updates, doing preStep");
     TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
     byte[] tableName = c.getEnvironment().getRegion().getTableDescriptor().getTableName().getName();
     Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = indexUpdates.iterator();
     List<Mutation> localUpdates = new ArrayList<Mutation>(indexUpdates.size());
     while(indexUpdatesItr.hasNext()) {
           Pair<Mutation, byte[]> next =;
           if (Bytes.compareTo(next.getSecond(), tableName) == 0) {
     if (!localUpdates.isEmpty()) {
                localUpdates.toArray(new Mutation[localUpdates.size()]));
     if (!indexUpdates.isEmpty()) {
         context.indexUpdates = indexUpdates;
         // write index updates to WAL
         if (durability != Durability.SKIP_WAL) {
             // we have all the WAL durability, so we just update the WAL entry and move on
             for (Pair<Mutation, byte[]> entry : indexUpdates) {
                  edit.add(new IndexedKeyValue(entry.getSecond(), entry.getFirst()));

“index update count” 这段英文消息,也更加证实了我们的分析,getIndexUpdate 函数维护了最终的索引数据。

delegate 最终由哪个类实现呢?

private static IndexBuilder getIndexBuilder(RegionCoprocessorEnvironment e) throws IOException {
        Configuration conf = e.getConfiguration();
        Class<? extends IndexBuilder> builderClass =
        conf.getClass(Indexer.INDEX_BUILDER_CONF_KEY, null, IndexBuilder.class);
        try {
             IndexBuilder builder = builderClass.newInstance();
             return builder;
        } catch (InstantiationException e1) {
            throw new IOException("Couldn't instantiate index builder:" + builderClass
                + ", disabling indexing on table " + e.getRegion().getTableDescriptor().getTableName().getNameAsString());
        } catch (IllegalAccessException e1) {
            throw new IOException("Couldn't instantiate index builder:" + builderClass
                + ", disabling indexing on table " + e.getRegion().getTableDescriptor().getTableName().getNameAsString());

这段代码由显示 “index.builder” 这个参数指定,我们看一下建表的参数:

'PHOENIX:CONTRACT_GPS', {TABLE_ATTRIBUTES => {coprocessor$1 => '|org.apache.phoenix.coprocessor.ScanRegionObserver|805306366|', coprocessor$2 => '|org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver|805306366|', coprocessor$3 => '|org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver|805306366|', coprocessor$4 => '|org.apache.phoenix.coprocessor.ServerCachingEndpointImpl|805306366|', coprocessor$5 => '|org.apache.phoenix.hbase.index.Indexer|805306366|org.apache.hadoop.hbase.index.codec.class=org.apache.phoenix.index.PhoenixIndexCodec,index.builder=org.apache.phoenix.index.PhoenixIndexBuilder'}, {NAME => 'CF_1', BLOOMFILTER => 'NONE', DATA_BLOCK_ENCODING => 'FAST_DIFF'}

很明显 delegate 的实际类型就是:



  1. 创建索引元数据,插入系统表

  2. 根据索引元数据,构造当前数据的索引数据

  3. 通过 Coprocessor 拦截源表的数据变更,维护索引表数据。

当然上面三个步骤是非常简化的,还要很多细节没有理清,但这并不妨碍我们实现基于 Solr 或 ElasticSearch 的全文索引。


前两章我们分析了索引的创建和维护过程,后面我们会简要分析一下索引的使用,而在这之前需要先分析一下 PhoenixSQL 编译的过程。

与大部分的 SQL 引擎一样,Phoenix 也是用 ANTLR:

实现 SQL 编译的,那我们就要简要分析一下 PhoenixSQL.g 这个文件。

grammar PhoenixSQL;
tokens {}
@parser::header {}
@lexer::header {}
@parser::members {}
@rulecatch {}
@lexer::members {}

nextStatement returns [BindableStatement ret]

HINT_START: '/*+' ;


  1. tokens 。定义关键字。如果我们在 local 的基础上新增 full text 类型的索引,就需要在这里提前定义好。

  2. @parser::header、@lexer::header。在生成对应的类中,自动带上这些包路径。其实就是定义最终生成的 java 文件的头

  3. @parser::members、@lexer::members。其实就是扩展最终生成的parser类,简单来说就是在 java 文件中,插入一段代码

  4. @rulecatch。定义语法解析异常时的规则,根据文件定义,Phoenix选择直接抛出异常。

剩余部分就是定义 DSL 语法和词法。

根据 antlr 语法,这个文件会生成:


我们需要知道这三个类在哪里被使用,才方便找到客户端提供的 SQL 在哪里编译。当然了,直接 debug 其他类 ( 比如 PhoenixStatement ) 也是可以跟踪到的。



根据 POM 文件的 plugin 可以得知,编译后的文件在 target

target/generated-sources/antlr3 目录。感兴趣的读者可以看一下这三个类的结构,我们就不再过多介绍。

经过全文检索,我们找到 PhoenixSQLParser 创建的地方:

public SQLParser(String query, ParseNodeFactory factory) {
       PhoenixSQLLexer lexer;
       try {
            lexer = new PhoenixSQLLexer(new CaseInsensitiveReaderStream(new StringReader(query)));
       } catch (IOException e) {
            throw new RuntimeException(e); // Impossible
       CommonTokenStream cts = new CommonTokenStream(lexer);
       parser = new PhoenixSQLParser(cts);

SQLParser 构造函数中,这段代码还是比较重要的。这里设置了一个 factory 变量。下面是这个变量的类型和定义:

  * Factory used by parser to construct object model while parsing a SQL statement
public class ParseNodeFactory {

根据注释我们知道,它是把 SQL 语句转化为模型对象的。

// Parse a create index statement.
create_index_node returns [CreateIndexStatement ret]
    :   CREATE l=LOCAL? INDEX (IF NOT ex=EXISTS)? i=index_name ON t=from_table_name
        (LPAREN ik=ik_constraint RPAREN)
        (INCLUDE (LPAREN icrefs=column_names RPAREN))?
        (SPLIT ON v=value_expression_list)?
        {ret = factory.createIndex(i, factory.namedTable(null,t), ik, icrefs, v, p, ex!=null, l==null ? IndexType.getDefault() : IndexType.LOCAL, async != null, getBindCount(), new HashMap<String, UDFParseNode>(udfParseNodes)); }

上面是 create_index_node的antlr 定义语句,简单来看就是解析完语法之后,调用 factory.createIndex 创建并返回一个 CreateIndexStatement 。这样我们的 create index 语句就被 ParseNodeFactory 转化成了 CreateIndexStatement 对象。

如果我们要增加全文索引,就需要修改这个定义语句的。比如在 LOCAL 的地方添加 FULL TEXT ,在 SPLIT 后面添加 Solr 或 ElasticSearch 的连接属性等参数。同时修改 factory.createIndex 方法,接收这些参数。

编译后的 create_index_node 函数代码还是很长的,我们只看最后的返回逻辑,可以发现,跟预期的一样。

       if (state.backtracking==0) {
           ret = factory.createIndex(i, factory.namedTable(null,t), ik, icrefs, v, p, ex!=null, l==null ? IndexType.getDefault() : IndexType.LOCAL, async != null, getBindCount(), new HashMap<String, UDFParseNode>(udfParseNodes)); 
} catch (RecognitionException re) {
     throw re;
} finally {
     // do for sure before leaving
return ret;

SQL 解析编译的过程,相信大家已经有了简单的了解。下面就要研究 SQLParser 的初始化和创建的过程:

还是全文检索 SQLParser ,这样虽然比较笨,但还是非常直观的。细心的读者会发现,有很多地方会调用 SQLParser ,而且不同的场景会调用不同的方法。这就说明,Phoenix 在编译 SQL 时,会大概判断一下当前的 SQL 类型,DDL/DML 还是其他。

由于 SQLParser 使用的地方太多,如果一个个跟踪研究,过于繁琐。我们会转换一下思路,从源头开始,也就是编译 PreparedStatement 的地方,后面将会介绍。


