使用Calcite解析Sql做维表关联(一)

栏目: IT技术 · 发布时间: 4年前

内容简介:—END—

使用Calcite解析 <a href='https://www.codercto.com/topics/18630.html'>Sql</a> 做维表关联(一)

点击箭头处 “蓝色字” ,关注我们哦!!

维表关联是离线计算或者实时计算里面常见的一种处理逻辑,常常用于字段补齐、规则过滤等,一般情况下维表数据放在 MySql 等数据库里面,对于离线计算直接通过ETL方式加载到Hive表中,然后通过sql方式关联查询即可,但是对于实时计算中Flink、SparkStreaming的表都是抽象的、虚拟的表,那么就没法使用加载方式完成。透过维表服务系列里面讲到的维表关联都是使用编码方式完成,使用Map或者AsyncIO方式完成,但是这种硬编码方式开发效率很低,特别是在实时数仓里面,我们希望能够使用跟离线一样sql方式完成维表关联操作。

在Flink1.9中提供了使用sql化方式完成维表关联,只需要实现LookupableTableSource接口即可,可以实现同步或者异步关联。在1.9之前就需要自己实现sql语法解析,然后在转换为API方式,对上层提供sql语法。看一个sql语句:

select * from orders o join gdsInfo g on o.gdsId=g.gdsId

orders表示流表,gdsInfo 表示维表。根据sql解析顺序先 from 部分、然后where 部分、最后select,那么对于join 方式,相当于join生成了一张临时表,然后去select 这张临时表,因此可以确认

sql解析流程:

1. 识别出流表与维表

3. select 临时表

现在使用calcite解析这条语句

public class ParseDemo {


public static void main(String[] args) {

//假设gdsInfo就是维表

String sql = "select * from orders o join gdsInfo g on o.gdsId=g.gdsId";


SqlParser.Config config = SqlParser.configBuilder().setLex(Lex.MYSQL).build();

SqlParser sqlParser = SqlParser.create(sql, config);

SqlSelect sqlSelect = null;

try {

sqlSelect = (SqlSelect) sqlParser.parseStmt();

} catch (Exception e) {

e.printStackTrace();

}


SqlNode sqlFrom = sqlSelect.getFrom();

boolean isSideJoin = false;

String leftTable = "";

String rightTable = "";

String newName = ""; //临时表

SqlJoin sqlJoin = null;

//解析join

if (sqlFrom.getKind() == SqlKind.JOIN) {

sqlJoin = (SqlJoin) sqlFrom;

SqlNode left = sqlJoin.getLeft();

SqlNode right = sqlJoin.getRight();

isSideJoin = true;

leftTable = paserTableName(left);

rightTable = paserTableName(right);

}

//生成新的select

if (isSideJoin) {

newName = leftTable + "_" + rightTable;

SqlParserPos pos = new SqlParserPos(0, 0);

SqlIdentifier sqlIdentifier = new SqlIdentifier(newName, pos);

sqlSelect.setFrom(sqlIdentifier);

}

}

//解析表

private static String paserTableName(SqlNode tbl) {

if (tbl.getKind() == SqlKind.AS) {

SqlBasicCall sqlBasicCall = (SqlBasicCall) tbl;

return sqlBasicCall.operands[1].toString();

}

return ((SqlIdentifier) tbl).toString();

}

}

那么我们需要的就是生成新的select节点与SqlJoin节点,执行逻辑就是根据SqlJoin节点做维表关联之后生成新的表,然后去select这样新的表。

sql解析部分已经完成,既然使用sql化方式,因此也需要定义源表与维表,数据源一般是kafka, 定义源表需要:表名称、字段名称、字段类型、数据格式、topic;维表假设为mysql,需要定义:表名称、字段类型、字段名称、关联方式(同步/异步)、缓存方式(LRU/全部缓存、无缓存)。

源表定义:

CREATE TABLE orders(

orderId varchar,

gdsId varchar,

orderTime varchar

)WITH(

type = 'kafka',

kafka.bootstrap.servers = 'localhost:9092',

kafka.topic = 'topic1',

kafka.group.id = 'gId1',

sourcedatatype ='json'

);

维表定义:

CREATE TABLE gdsInfo(

gdsId varchar,

gdsName varchar,

price double

)WITH(

type='mysql',

url='jdbc:mysql://localhost:3306/paul',

userName='root',

password='123456',

tableName='gdsInfo',

cache = 'LRU',

isSideTable='true'

);

现在就是要如何解析这些语句,正则表达式是首选,需要解析出表名称、字段、属性三个部分:creat table xxx (xxx) with(xxx);正则表达式可为:

(?i)create\s+table\s+(\S+)\s*\((.+)\)\s*with\s*\((.+)\)

?i表示后面的匹配忽略大小写,\s+ 表示匹配多个空格,\S+表示匹配多个字符,.+ 表示匹配任意字符。

定义一个table类:

class TableInfo{

private String tableName; // 表名称

private Map<String,String> fieldsInfo; //字段名称->类型

private Properties props; //表属性

private boolean isSideTable; //是否为维表

}

public class ParseCreate {


public static final String REG_CREATE="(?i)create\\s+table\\s+(\\S+)\\s*\\((.+)\\)\\s*with\\s*\\((.+)\\)";


public static void main(String[] args) {


String createSql="CREATE TABLE orders(" + " orderId varchar," + " gdsId varchar,"

+ " orderTime varchar" + " )WITH(" + " type = 'kafka',"

+ " kafka.bootstrap.servers = 'localhost:9092'," + " kafka.topic = 'topic1',"

+ " kafka.group.id = 'gId1'," + " sourcedatatype ='json'" + " );";

Pattern pattern=Pattern.compile(REG_CREATE);


TableInfo tableInfo=new TableInfo();

Matcher matcher=pattern.matcher(createSql);

if(matcher.find()){

tableInfo.setTableName(matcher.group(1));

String fieldsStr=matcher.group(2);

String propsStr=matcher.group(3);

tableInfo.setFieldsInfo(parseFiles(fieldsStr));

tableInfo.setProps(parseProps(propsStr));

if(Boolean.valueOf(tableInfo.getProps().getProperty("isSideTable","false"))){

tableInfo.setSideTable(true);

}

}


}


public static Map<String,String> parseFiles(String fieldsStr){

Map<String,String> fieldsInfo=new HashMap<>();

String[] fieldsArray=fieldsStr.split(",");

for(String field: fieldsArray){

String[] fieldInfo=field.trim().split(" ");

fieldsInfo.put(fieldInfo[0],fieldInfo[1]);

}

return fieldsInfo;

}


public static Properties parseProps(String propsStr){

Properties props=new Properties();

String[] propsArray=propsStr.split(",");

for(String prop: propsArray){

String[] propInfo=prop.trim().split("=");

props.setProperty(propInfo[0],propInfo[1]);

}

return props;

}


}

至此完成了简易的create语句解析,下一篇将介绍如何将解析后的create与维表关联转换为可执行代码。

—END—

使用Calcite解析Sql做维表关联(一)

关注回复 Flink

获取更多系列

原创不易,好看,就点个"在看"


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

微积分的历程

微积分的历程

William Dunham / 李伯民、汪军、张怀勇 / 人民邮电出版社 / 2010-8 / 29.00元

“微积分”这一名称最早出现在哪本书中?第一本微积分教科书又是谁人所写?微积分究竟是谁人发明的?著名的洛必达法则居然是伯努利的研究成果?谁被誉为“分析学的化身”?谁又被誉为“现代分析学之父”?哪些数学天才使微积分的创建过程终于画上完美的句号?……本书将带你一一探究上述问题。 本书宛如一座陈列室,汇聚了十多位数学大师的杰作,当你徜徉其中时会对人类的想象力惊叹不已,当你离去时必然满怀对天才们的钦佩......一起来看看 《微积分的历程》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码