Phoenix indexes do not support full-text search, so we need to modify the source code to add it. Before adding full-text search we first have to understand how indexes are currently created and maintained. This article walks through the source code of Phoenix's native indexes step by step.
Creating an Index
From an earlier, brief read of the Phoenix source code, every DDL/DML statement is compiled into a MutationPlan by some ***Compiler class. The logic for creating an index lives in:
org.apache.phoenix.compile.CreateIndexCompiler
public MutationPlan compile(final CreateIndexStatement create) throws SQLException {
    final PhoenixConnection connection = statement.getConnection();
    final ColumnResolver resolver = FromCompiler.getResolver(create, connection, create.getUdfParseNodes());
    Scan scan = new Scan();
    final StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement));
    ExpressionCompiler expressionCompiler = new ExpressionCompiler(context);
    List<ParseNode> splitNodes = create.getSplitNodes();
    if (create.getIndexType() == IndexType.LOCAL) {
        if (!splitNodes.isEmpty()) {
            throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SPLIT_LOCAL_INDEX)
                    .build().buildException();
        }
        List<Pair<String, Object>> list = create.getProps() != null ? create.getProps().get("") : null;
        if (list != null) {
            for (Pair<String, Object> pair : list) {
                if (pair.getFirst().equals(PhoenixDatabaseMetaData.SALT_BUCKETS)) {
                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SALT_LOCAL_INDEX)
                            .build().buildException();
                }
            }
        }
    }
    final byte[][] splits = new byte[splitNodes.size()][];
    for (int i = 0; i < splits.length; i++) {
        ParseNode node = splitNodes.get(i);
        if (!node.isStateless()) {
            throw new SQLExceptionInfo.Builder(SQLExceptionCode.SPLIT_POINT_NOT_CONSTANT)
                    .setMessage("Node: " + node).build().buildException();
        }
        LiteralExpression expression = (LiteralExpression) node.accept(expressionCompiler);
        splits[i] = expression.getBytes();
    }
    final MetaDataClient client = new MetaDataClient(connection);
    return new BaseMutationPlan(context, operation) {
        @Override
        public MutationState execute() throws SQLException {
            return client.createIndex(create, splits);
        }

        @Override
        public ExplainPlan getExplainPlan() throws SQLException {
            return new ExplainPlan(Collections.singletonList("CREATE INDEX"));
        }
    };
}
Analyzing this code leads to the following conclusions:
- The "create.getIndexType() == IndexType.LOCAL" block is mainly validation; we can ignore it and not dig any deeper.
- The "for (int i = 0; i < splits.length; i++)" loop handles split points and does not need further study for now.
- The compile function finally returns a BaseMutationPlan; this is the actual executable plan, and the index-creation logic lives there.
BaseMutationPlan is the final executable plan; its execute function is what ultimately creates the index. The source is as follows:
public MutationState execute() throws SQLException {
    return client.createIndex(create, splits);
}
The logic is very simple: it calls createIndex on client, where client is a MetaDataClient created from the current connection. The createIndex source looks like this:
/**
 * Create an index table by morphing the CreateIndexStatement into a CreateTableStatement and calling
 * MetaDataClient.createTable. In doing so, we perform the following translations:
 * 1) Change the type of any columns being indexed to types that support null if the column is nullable.
 *    For example, a BIGINT type would be coerced to a DECIMAL type, since a DECIMAL type supports null
 *    when it's in the row key while a BIGINT does not.
 * 2) Append any row key column from the data table that is not in the indexed column list. Our indexes
 *    rely on having a 1:1 correspondence between the index and data rows.
 * 3) Change the name of the columns to include the column family. For example, if you have a column
 *    named "B" in a column family named "A", the indexed column name will be "A:B". This makes it easy
 *    to translate the column references in a query to the correct column references in an index table
 *    regardless of whether the column reference is prefixed with the column family name or not. It also
 *    has the side benefit of allowing the same named column in different column families to both be
 *    listed as an index column.
 * @param statement
 * @param splits
 * @return MutationState from population of index table from data table
 * @throws SQLException
 */
public MutationState createIndex(CreateIndexStatement statement, byte[][] splits) throws SQLException {
    ...
}
From this comment we learn that creating an index is really done through a CreateTableStatement, with the following three translations along the way:
- Change the indexed columns to types that support null values; for example, BIGINT is coerced to DECIMAL.
- Append the data table's row key columns to the indexed column list; index rows have a 1:1 correspondence with data rows.
- Rename the indexed columns to include the column family name, in the form columnFamilyName:columnName. For example, a column B in column family A becomes the indexed column "A:B", which makes column references easy to translate and avoids collisions between same-named columns in different families (a small naming sketch follows this list).
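To make the naming rule concrete, here is a minimal sketch of the translation described above. The helper is purely illustrative and is not the Phoenix API (comparable logic lives in Phoenix's IndexUtil); it only covers the documented case of a column with an explicit column family.

// Illustrative only: mirrors the "A:B" naming rule described in the javadoc above.
public final class IndexColumnNameDemo {

    // column "B" in column family "A"  ->  indexed column name "A:B"
    static String indexColumnName(String family, String column) {
        return family + ":" + column;
    }

    public static void main(String[] args) {
        System.out.println(indexColumnName("A", "B")); // prints A:B
    }
}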
The logic of this function is quite complex, so we will keep moving without going into detail and first map out its structure; the code we skip over should be what implements the three translations above.
Looking at the return of createIndex, it calls yet another function: buildIndex.
buildIndex is also fairly long; one of its more important pieces of logic is:
MutationPlan mutationPlan = getMutationPlanForBuildingIndex(index, dataTableRef);
The source of getMutationPlanForBuildingIndex is:
private MutationPlan getMutationPlanForBuildingIndex(PTable index, TableRef dataTableRef) throws SQLException {
    if (index.getIndexType() == IndexType.LOCAL) {
        PostLocalIndexDDLCompiler compiler =
                new PostLocalIndexDDLCompiler(connection, getFullTableName(dataTableRef));
        return compiler.compile(index);
    } else if (dataTableRef.getTable().isTransactional()) {
        PostIndexDDLCompiler compiler = new PostIndexDDLCompiler(connection, dataTableRef);
        return compiler.compile(index);
    } else {
        ServerBuildIndexCompiler compiler =
                new ServerBuildIndexCompiler(connection, getFullTableName(dataTableRef));
        return compiler.compile(index);
    }
}
To simplify the analysis we will not look at local indexes. For our purposes this code essentially creates a PostIndexDDLCompiler (for non-transactional tables a ServerBuildIndexCompiler is used instead).
/**
 * Class that compiles plan to generate initial data values after a DDL command for
 * index table.
 */
public class PostIndexDDLCompiler {
    ...
}
The official comment explains it clearly: this plan generates the initial data values for the index table. Creating an index really consists of two steps: first, insert the index metadata into the system tables; second, generate the index data from that metadata. PostIndexDDLCompiler is what produces the initial data.
StringBuilder updateStmtStr = new StringBuilder();
updateStmtStr.append("UPSERT /*+ NO_INDEX */ INTO ")
        .append(schemaName.length() == 0 ? "" : '"' + schemaName + "\".")
        .append('"').append(tableName).append("\"(")
        .append(indexColumns).append(") ");
final StringBuilder selectQueryBuilder = new StringBuilder();
selectQueryBuilder.append(" SELECT /*+ NO_INDEX */ ").append(dataColumns).append(" FROM ")
        .append(schemaName.length() == 0 ? "" : '"' + schemaName + "\".")
        .append('"').append(dataTable.getTableName().getString()).append('"');
this.selectQuery = selectQueryBuilder.toString();
updateStmtStr.append(this.selectQuery);

try (final PhoenixStatement statement = new PhoenixStatement(connection)) {
    DelegateMutationPlan delegate =
            new DelegateMutationPlan(statement.compileMutation(updateStmtStr.toString())) {
        @Override
        public MutationState execute() throws SQLException {
            connection.getMutationState().commitDDLFence(dataTable);
            return super.execute();
        }
    };
    return delegate;
}
As usual we only look at the compile function; the code above is its key logic. It simply builds an UPSERT ... SELECT statement, which is then executed:
Scan scan = mutationPlan.getContext().getScan();
Long scn = connection.getSCN();
try {
    if (ScanUtil.isDefaultTimeRange(scan.getTimeRange())) {
        if (scn == null) {
            scn = mutationPlan.getContext().getCurrentTime();
        }
        scan.setTimeRange(dataTableRef.getLowerBoundTimeStamp(), scn);
    }
} catch (IOException e) {
    throw new SQLException(e);
}

// execute index population upsert select
long startTime = EnvironmentEdgeManager.currentTimeMillis();
MutationState state = connection.getQueryServices().updateData(mutationPlan);
long firstUpsertSelectTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;

// for global indexes on non transactional tables we might have to
// run a second index population upsert select to handle data rows
// that were being written on the server while the index was created.
// TODO: this sleep time is really arbitrary. If any query is in progress
// while the index is being built, we're depending on this sleep
// waiting them out. Instead we should have a means of waiting until
// all in progress queries are complete (though I'm not sure that's
// feasible). See PHOENIX-4092.
long sleepTime = connection.getQueryServices().getProps()
        .getLong(QueryServices.INDEX_POPULATION_SLEEP_TIME,
                QueryServicesOptions.DEFAULT_INDEX_POPULATION_SLEEP_TIME);
if (!dataTableRef.getTable().isTransactional() && sleepTime > 0) {
    long delta = sleepTime - firstUpsertSelectTime;
    if (delta > 0) {
        try {
            Thread.sleep(delta);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION)
                    .setRootCause(e).build().buildException();
        }
    }
    // set the min timestamp of second index upsert select some time before the index
    // was created
    long minTimestamp = index.getTimeStamp() - firstUpsertSelectTime;
    try {
        // TODO: Use scn or LATEST_TIMESTAMP here? It's possible that a DML statement
        // ran and ended up with timestamps later than this time. If we use a later
        // timestamp, we'll need to run the partial index rebuilder here as it's
        // possible that the updates to the table were made (such as deletes) after
        // the scn which would not be properly reflected correctly this mechanism.
        // See PHOENIX-4092.
        mutationPlan.getContext().getScan().setTimeRange(minTimestamp, scn);
    } catch (IOException e) {
        throw new SQLException(e);
    }
    MutationState newMutationState = connection.getQueryServices().updateData(mutationPlan);
    state.join(newMutationState);
}

indexStatement = FACTORY.alterIndex(
        FACTORY.namedTable(null,
                TableName.create(index.getSchemaName().getString(), index.getTableName().getString())),
        dataTableRef.getTable().getTableName().getString(), false, PIndexState.ACTIVE);
alterIndex(indexStatement);

return state;
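For intuition, here is roughly what the statement assembled by PostIndexDDLCompiler would look like, assuming a data table "S"."MY_TABLE" with an index "S"."MY_INDEX" (all names invented; the actual column lists come from the compiler's indexColumns/dataColumns fields, so they are left as placeholders):

// Hypothetical rendering of the generated UPSERT ... SELECT; the quoting mirrors the
// StringBuilder logic shown above, and <indexColumns>/<dataColumns> are placeholders.
String populateIndexSql =
        "UPSERT /*+ NO_INDEX */ INTO \"S\".\"MY_INDEX\"(<indexColumns>) "
      + " SELECT /*+ NO_INDEX */ <dataColumns> FROM \"S\".\"MY_TABLE\"";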
Back in the main logic of buildIndex: in short, after the SQL that populates the index data has run, an alterIndex command sets the index state to PIndexState.ACTIVE. There is also other logic, such as setting the Scan time range, which we will not dig into here.
To summarize, creating an index roughly takes two steps:
- Build the index metadata from the statement and insert it into the Phoenix system tables.
- Based on that metadata, generate the logic that populates the initial index data.
So what would index creation need if we were to add a full-text index type?
- Extend the set of index types, so that a "full-text" index type can be written into the Phoenix system tables.
- Store the full-text search parameters in the Phoenix system tables; if the system tables cannot hold them, they need to be extended. The author plans to use Solr as the full-text engine, so the Solr connection information has to be stored.
- Add logic to populate the initial index data: read the indexed columns from the source table and insert them into the full-text engine, such as Solr or ElasticSearch (see the sketch after this list).
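As a rough illustration of the third point, the sketch below bootstraps a hypothetical full-text index by scanning the source table over Phoenix JDBC and pushing documents into Solr with SolrJ. The connection URLs, table/column names and Solr collection are all made up, and a real implementation would live inside the index-build MutationPlan rather than in a standalone program.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.common.SolrInputDocument;

// Hypothetical initial-population sketch: read the indexed column(s) from the data table
// and push one Solr document per row. Names and URLs are assumptions for illustration.
public class FullTextBootstrap {
    public static void main(String[] args) throws Exception {
        try (Connection phoenix = DriverManager.getConnection("jdbc:phoenix:zk-host:2181");
             SolrClient solr = new HttpSolrClient.Builder("http://solr-host:8983/solr/my_index").build();
             Statement stmt = phoenix.createStatement();
             // NO_INDEX hint mirrors what PostIndexDDLCompiler's upsert/select does
             ResultSet rs = stmt.executeQuery(
                     "SELECT /*+ NO_INDEX */ ID, DOC_TEXT FROM MY_SCHEMA.MY_TABLE")) {
            while (rs.next()) {
                SolrInputDocument doc = new SolrInputDocument();
                doc.addField("id", rs.getString("ID"));              // row key -> Solr unique key
                doc.addField("doc_text", rs.getString("DOC_TEXT"));  // full-text indexed column
                solr.add(doc);
            }
            solr.commit();
        }
    }
}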
Is that all it takes to add a full-text index type? Far from it. We have only roughly analyzed index creation; how to automatically keep the full-text data in sync as rows are inserted, deleted, and updated afterwards is another hard problem.
There is also the question of how to extend the current DDL syntax, i.e. the existing CREATE INDEX grammar (its ANTLR definition appears later in this article).
Roughly speaking there are two ways to add a full-text index type: the first is to extend the DDL grammar with a Solr-backed alternative alongside LOCAL; the second is to store the index type in indexOptions. Which option to choose will be studied later.
That concludes our first look at how indexes are created; we will continue with the other related logic later. This is only meant to start the discussion, so if you have a better approach, feel free to leave a comment.
Global Index Writes
The previous chapter briefly analyzed the index creation flow; this chapter analyzes when and how global index writes happen.
As mentioned in the referenced article 《Phoenix重磅 | Phoenix(云HBase SQL)核心功能原理及应用场景介绍》, global index maintenance falls into two categories: mutable tables and immutable tables. For mutable tables the global index is maintained on the HBase RegionServer, implemented through an HBase coprocessor and asynchronous from the client's point of view; for immutable tables it is done on the client side, i.e. the index is written together with the data.
So to find the index-maintenance code in Phoenix we should start from the coprocessor, i.e. the classes that implement the coprocessor interfaces. Based on package names and the inheritance hierarchy, we find the class:
org.apache.phoenix.hbase.index.Indexer
/**
 * Do all the work of managing index updates from a single coprocessor. All Puts/Delets are passed
 * to an {@link IndexBuilder} to determine the actual updates to make.
 * <p>
 * If the WAL is enabled, these updates are then added to the WALEdit and attempted to be written to
 * the WAL after the WALEdit has been saved. If any of the index updates fail, this server is
 * immediately terminated and we rely on WAL replay to attempt the index updates again (see
 * {@link #preWALRestore(ObserverContext, HRegionInfo, HLogKey, WALEdit)}).
 * <p>
 * If the WAL is disabled, the updates are attempted immediately. No consistency guarantees are made
 * if the WAL is disabled - some or none of the index updates may be successful. All updates in a
 * single batch must have the same durability level - either everything gets written to the WAL or
 * nothing does. Currently, we do not support mixed-durability updates within a single batch. If you
 * want to have different durability levels, you only need to split the updates into two different
 * batches.
 * <p>
 * We don't need to implement {@link #postPut(ObserverContext, Put, WALEdit, Durability)} and
 * {@link #postDelete(ObserverContext, Delete, WALEdit, Durability)} hooks because
 * Phoenix always does batch mutations.
 * <p>
 */
public class Indexer implements RegionObserver, RegionCoprocessor {
    ...
}
From the class comment we learn that Indexer intercepts writes and deletes and hands the relevant data to an IndexBuilder, which determines the actual index updates; Indexer only intercepts batched mutation operations.
@Override
public void start(CoprocessorEnvironment e) throws IOException

@Override
public void stop(CoprocessorEnvironment e) throws IOException

/**
 * We use an Increment to serialize the ON DUPLICATE KEY clause so that the HBase plumbing
 * sets up the necessary locks and mvcc to allow an atomic update. The Increment is not a
 * real increment, though, it's really more of a Put. We translate the Increment into a
 * list of mutations, at most a single Put and Delete that are the changes upon executing
 * the list of ON DUPLICATE KEY clauses for this row.
 */
@Override
public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> e, final Increment inc)

@Override
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp)

@Override
public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success)

@Override
public void postOpen(final ObserverContext<RegionCoprocessorEnvironment> c)

@Override
public void preWALRestore(ObserverContext<RegionCoprocessorEnvironment> env, HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException

/**
 * Create a custom {@link InternalScanner} for a compaction that tracks the versions of rows that
 * are removed so we can clean then up from the the index table(s).
 * <p>
 * This is not yet implemented - its not clear if we should even mess around with the Index table
 * for these rows as those points still existed. TODO: v2 of indexing
 */
@Override
public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
        final List<? extends KeyValueScanner> scanners, final ScanType scanType, final long earliestPutTs,
        final InternalScanner s) throws IOException
Going through the coprocessor hooks it implements and setting aside the unimportant ones, we focus on preBatchMutate:
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
        MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
    if (this.disabled) {
        return;
    }
    long start = EnvironmentEdgeManager.currentTimeMillis();
    try {
        preBatchMutateWithExceptions(c, miniBatchOp);
        return;
    } catch (Throwable t) {
        rethrowIndexingException(t);
    } finally {
        long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
        if (duration >= slowIndexPrepareThreshold) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(getCallTooSlowMessage("preBatchMutate", duration, slowIndexPrepareThreshold));
            }
            metricSource.incrementNumSlowIndexPrepareCalls();
        }
        metricSource.updateIndexPrepareTime(duration);
    }
    throw new RuntimeException(
            "Somehow didn't return an index update but also didn't propagate the failure to the client!");
}
This code is straightforward; the important logic is in the preBatchMutateWithExceptions function.
preBatchMutateWithExceptions is rather long, so we will not analyze it line by line, but pay special attention to the calls made on the builder field: its type is IndexBuildManager, and all index-maintenance operations go through it.
/**
 * Manage the building of index updates from primary table updates.
 */
public class IndexBuildManager implements Stoppable {

    private final IndexBuilder delegate;
}
This class has one important field: delegate. All concrete index maintenance is forwarded to delegate.
/**
 * Interface to build updates ({@link Mutation}s) to the index tables, based on the primary table
 * updates.
 * <p>
 * Either all the index updates will be applied to all tables or the primary table will kill itself
 * and will attempt to replay the index edits through the WAL replay mechanism.
 */
public interface IndexBuilder extends Stoppable {
    ...
}
According to its definition, the getIndexUpdate function is what performs the actual index maintenance:
/**
 * Your opportunity to update any/all index tables based on the update of the primary table row.
 * Its up to your implementation to ensure that timestamps match between the primary and index
 * tables.
 * <p>
 * The mutation is a generic mutation (not a {@link Put} or a {@link Delete}), as it actually
 * corresponds to a batch update. Its important to note that {@link Put}s always go through the
 * batch update code path, so a single {@link Put} will come through here and update the primary
 * table as the only update in the mutation.
 * <p>
 * Implementers must ensure that this method is thread-safe - it could (and probably will) be
 * called concurrently for different mutations, which may or may not be part of the same batch.
 * @param mutation update to the primary table to be indexed.
 * @param context index meta data for the mutation
 * @return a Map of the mutations to make -> target index table name
 * @throws IOException on failure
 */
public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation, IndexMetaData context) throws IOException;
Returning to preBatchMutateWithExceptions, we find where getIndexUpdate is called:
// get the current span, or just use a null-span to avoid a bunch of if statements
try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
    Span current = scope.getSpan();
    if (current == null) {
        current = NullSpan.INSTANCE;
    }
    long start = EnvironmentEdgeManager.currentTimeMillis();

    // get the index updates for all elements in this batch
    Collection<Pair<Mutation, byte[]>> indexUpdates = this.builder.getIndexUpdate(miniBatchOp, mutations);

    long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
    if (duration >= slowIndexPrepareThreshold) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(getCallTooSlowMessage("indexPrepare", duration, slowIndexPrepareThreshold));
        }
        metricSource.incrementNumSlowIndexPrepareCalls();
    }
    metricSource.updateIndexPrepareTime(duration);
    current.addTimelineAnnotation("Built index updates, doing preStep");
    TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
    byte[] tableName = c.getEnvironment().getRegion().getTableDescriptor().getTableName().getName();
    Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = indexUpdates.iterator();
    List<Mutation> localUpdates = new ArrayList<Mutation>(indexUpdates.size());
    while (indexUpdatesItr.hasNext()) {
        Pair<Mutation, byte[]> next = indexUpdatesItr.next();
        if (Bytes.compareTo(next.getSecond(), tableName) == 0) {
            localUpdates.add(next.getFirst());
            indexUpdatesItr.remove();
        }
    }
    if (!localUpdates.isEmpty()) {
        miniBatchOp.addOperationsFromCP(0, localUpdates.toArray(new Mutation[localUpdates.size()]));
    }
    if (!indexUpdates.isEmpty()) {
        context.indexUpdates = indexUpdates;
        // write index updates to WAL
        if (durability != Durability.SKIP_WAL) {
            // we have all the WAL durability, so we just update the WAL entry and move on
            for (Pair<Mutation, byte[]> entry : indexUpdates) {
                edit.add(new IndexedKeyValue(entry.getSecond(), entry.getFirst()));
            }
        }
    }
}
The "index update count" annotation further confirms our analysis: getIndexUpdate produces the final index updates.
So which class actually implements delegate?
private static IndexBuilder getIndexBuilder(RegionCoprocessorEnvironment e) throws IOException {
    Configuration conf = e.getConfiguration();
    Class<? extends IndexBuilder> builderClass =
            conf.getClass(Indexer.INDEX_BUILDER_CONF_KEY, null, IndexBuilder.class);
    try {
        IndexBuilder builder = builderClass.newInstance();
        builder.setup(e);
        return builder;
    } catch (InstantiationException e1) {
        throw new IOException("Couldn't instantiate index builder:" + builderClass
                + ", disabling indexing on table "
                + e.getRegion().getTableDescriptor().getTableName().getNameAsString());
    } catch (IllegalAccessException e1) {
        throw new IOException("Couldn't instantiate index builder:" + builderClass
                + ", disabling indexing on table "
                + e.getRegion().getTableDescriptor().getTableName().getNameAsString());
    }
}
This code shows that the builder class is specified by the "index.builder" parameter. Let's look at the table's attributes:
'PHOENIX:CONTRACT_GPS', {TABLE_ATTRIBUTES => {
    coprocessor$1 => '|org.apache.phoenix.coprocessor.ScanRegionObserver|805306366|',
    coprocessor$2 => '|org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver|805306366|',
    coprocessor$3 => '|org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver|805306366|',
    coprocessor$4 => '|org.apache.phoenix.coprocessor.ServerCachingEndpointImpl|805306366|',
    coprocessor$5 => '|org.apache.phoenix.hbase.index.Indexer|805306366|org.apache.hadoop.hbase.index.codec.class=org.apache.phoenix.index.PhoenixIndexCodec,index.builder=org.apache.phoenix.index.PhoenixIndexBuilder'},
    {NAME => 'CF_1', BLOOMFILTER => 'NONE', DATA_BLOCK_ENCODING => 'FAST_DIFF'}
Clearly the actual type of delegate is:
org.apache.phoenix.index.PhoenixIndexBuilder
We will stop the deep dive here. Combined with the previous chapter, index creation and maintenance can be briefly summarized as:
- Build the index metadata and insert it into the system tables.
- Build the index data for the existing rows based on that metadata.
- Intercept data changes on the source table through a coprocessor and maintain the index table data.
These three steps are of course greatly simplified and many details remain unexamined, but that does not stop us from implementing full-text indexing on top of Solr or ElasticSearch.
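As a thought experiment for the maintenance side, here is a hypothetical, heavily simplified RegionObserver that forwards committed Puts to Solr. It is not Phoenix code: a real implementation would plug into Indexer/IndexBuilder as analyzed above, handle Deletes and failures, and batch the Solr writes asynchronously; the column family/qualifier, Solr URL and field names are assumptions.

import java.io.IOException;
import java.util.Optional;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.common.SolrInputDocument;

// Hypothetical sketch only: push the full-text column of committed Puts to Solr.
public class SolrFullTextObserver implements RegionObserver, RegionCoprocessor {

    private static final byte[] CF  = Bytes.toBytes("CF_1");      // assumed column family
    private static final byte[] COL = Bytes.toBytes("DOC_TEXT");  // assumed full-text column
    private SolrClient solr;

    @Override
    public Optional<RegionObserver> getRegionObserver() {
        return Optional.of(this);
    }

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        solr = new HttpSolrClient.Builder("http://solr-host:8983/solr/my_index").build();
    }

    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
        if (solr != null) solr.close();
    }

    @Override
    public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> ctx,
            MiniBatchOperationInProgress<Mutation> miniBatchOp, boolean success) throws IOException {
        if (!success) return; // only index mutations that were actually applied
        try {
            for (int i = 0; i < miniBatchOp.size(); i++) {
                Mutation m = miniBatchOp.getOperation(i);
                if (!(m instanceof Put)) continue;
                for (Cell cell : ((Put) m).get(CF, COL)) {
                    SolrInputDocument doc = new SolrInputDocument();
                    doc.addField("id", Bytes.toString(m.getRow()));
                    doc.addField("doc_text", Bytes.toString(CellUtil.cloneValue(cell)));
                    solr.add(doc);
                }
            }
            solr.commit(); // naive per-batch commit; real code would batch or use commitWithin
        } catch (Exception e) {
            throw new IOException(e);
        }
    }
}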
Finally, a diagram borrowed from the referenced article (not reproduced here) concludes this part of the analysis.
Using Indexes
The previous two chapters covered how indexes are created and maintained. Next we will briefly look at how indexes are used, but before that we need to examine how Phoenix compiles SQL.
Like most SQL engines, Phoenix uses ANTLR (https://www.antlr.org) to implement SQL compilation, so we will briefly analyze the PhoenixSQL.g grammar file.
grammar PhoenixSQL;

tokens {}

@parser::header {}
@lexer::header {}
@parser::members {}
@rulecatch {}
@lexer::members {}

nextStatement returns [BindableStatement ret]
......

HINT_START: '/*+' ;
....

fragment FIELDCHAR
......
A few parts deserve attention:
- tokens: defines the keywords. If we add a full-text index type on top of LOCAL, the new keywords must be declared here first.
- @parser::header and @lexer::header: package and import declarations automatically carried into the generated classes; essentially the header of the generated Java files.
- @parser::members and @lexer::members: extend the generated parser/lexer classes; in short, a block of Java code inserted into the generated files.
- @rulecatch: defines what happens when a parse error occurs; per this file, Phoenix chooses to throw the exception directly.
The rest of the file defines the grammar and lexer rules of the DSL.
According to the ANTLR conventions, this file generates:
PhoenixSQL.tokens/PhoenixSQLLexer/PhoenixSQLParser
We need to know where these three classes are used in order to find where client SQL gets compiled. Of course, debugging other classes (such as PhoenixStatement) would also get us there.
<plugin>
    <groupId>org.antlr</groupId>
    <artifactId>antlr3-maven-plugin</artifactId>
    <version>3.5.2</version>
    <executions>
        <execution>
            <goals>
                <goal>antlr</goal>
            </goals>
        </execution>
    </executions>
    <configuration>
        <outputDirectory>${antlr-output.dir}/org/apache/phoenix/parse</outputDirectory>
    </configuration>
</plugin>

<antlr-output.dir>target/generated-sources/antlr3</antlr-output.dir>
From the plugin configuration in the POM we can see that the generated files live under target/generated-sources/antlr3. Interested readers can inspect the structure of these three classes; we won't describe them further.
Searching the codebase, we find where PhoenixSQLParser is created:
org.apache.phoenix.parse.SQLParser
public SQLParser(String query, ParseNodeFactory factory) {
    PhoenixSQLLexer lexer;
    try {
        lexer = new PhoenixSQLLexer(new CaseInsensitiveReaderStream(new StringReader(query)));
    } catch (IOException e) {
        throw new RuntimeException(e); // Impossible
    }
    CommonTokenStream cts = new CommonTokenStream(lexer);
    parser = new PhoenixSQLParser(cts);
    parser.setParseNodeFactory(factory);
}
This part of the SQLParser constructor is important: it sets a factory field. Its type and definition are:
/**
 *
 * Factory used by parser to construct object model while parsing a SQL statement
 */
public class ParseNodeFactory {
    ...
}
From the comment we know it is the factory the parser uses to build the object model while parsing a SQL statement.
// Parse a create index statement.
create_index_node returns [CreateIndexStatement ret]
    :   CREATE l=LOCAL? INDEX (IF NOT ex=EXISTS)? i=index_name ON t=from_table_name
        (LPAREN ik=ik_constraint RPAREN)
        (INCLUDE (LPAREN icrefs=column_names RPAREN))?
        (async=ASYNC)?
        (p=fam_properties)?
        (SPLIT ON v=value_expression_list)?
        {ret = factory.createIndex(i, factory.namedTable(null,t), ik, icrefs, v, p, ex!=null,
                l==null ? IndexType.getDefault() : IndexType.LOCAL, async != null, getBindCount(),
                new HashMap<String, UDFParseNode>(udfParseNodes)); }
    ;
Above is the ANTLR definition of create_index_node. In short, once the syntax has been parsed, factory.createIndex is called to build and return a CreateIndexStatement; this is how a CREATE INDEX statement is turned into a CreateIndexStatement object by the ParseNodeFactory.
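As a quick sanity check of this flow, the snippet below parses a CREATE INDEX string directly with SQLParser. The statement text is made up, and parseStatement() is assumed to be the entry point in the Phoenix version examined here (verify the method name against your checkout):

import org.apache.phoenix.parse.BindableStatement;
import org.apache.phoenix.parse.CreateIndexStatement;
import org.apache.phoenix.parse.SQLParser;

public class ParseCreateIndexDemo {
    public static void main(String[] args) throws Exception {
        SQLParser parser = new SQLParser(
                "CREATE LOCAL INDEX IDX_T ON MY_SCHEMA.MY_TABLE (COL1) INCLUDE (COL2)");
        BindableStatement stmt = parser.parseStatement();
        // Expected: the parser hands back the object built by ParseNodeFactory.createIndex
        System.out.println(stmt instanceof CreateIndexStatement); // true
    }
}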
To add full-text indexing we would have to modify this rule, for example adding a FULL TEXT alternative next to LOCAL and adding Solr or ElasticSearch connection properties after SPLIT, and change factory.createIndex to accept those parameters.
The generated create_index_node method is quite long; looking only at its final return logic, it matches what we expect.
try {
    {
        ...
        if (state.backtracking==0) {
            ret = factory.createIndex(i, factory.namedTable(null,t), ik, icrefs, v, p, ex!=null,
                    l==null ? IndexType.getDefault() : IndexType.LOCAL, async != null, getBindCount(),
                    new HashMap<String, UDFParseNode>(udfParseNodes));
        }
    }
} catch (RecognitionException re) {
    throw re;
} finally {
    // do for sure before leaving
}
return ret;
That should give a basic picture of how SQL is parsed and compiled. Next we need to study how SQLParser is initialized and created:
Again we simply search the codebase for SQLParser. It is a crude approach but very direct. Careful readers will notice that SQLParser is called from many places, and that different scenarios call different methods; this means that when compiling SQL, Phoenix first roughly determines the type of the current statement: DDL, DML, or something else.
Because SQLParser is used in so many places, tracing every call site would be tedious. Instead we will switch strategies and start from the source, i.e. where a PreparedStatement is compiled, which will be covered in a later article.