内容简介:—END—
点击箭头处 “蓝色字” ,关注我们哦!!
维表关联是离线计算或者实时计算里面常见的一种处理逻辑,常常用于字段补齐、规则过滤等,一般情况下维表数据放在 MySql 等数据库里面,对于离线计算直接通过ETL方式加载到Hive表中,然后通过sql方式关联查询即可,但是对于实时计算中Flink、SparkStreaming的表都是抽象的、虚拟的表,那么就没法使用加载方式完成。透过维表服务系列里面讲到的维表关联都是使用编码方式完成,使用Map或者AsyncIO方式完成,但是这种硬编码方式开发效率很低,特别是在实时数仓里面,我们希望能够使用跟离线一样sql方式完成维表关联操作。
在Flink1.9中提供了使用sql化方式完成维表关联,只需要实现LookupableTableSource接口即可,可以实现同步或者异步关联。在1.9之前就需要自己实现sql语法解析,然后在转换为API方式,对上层提供sql语法。看一个sql语句:
select * from orders o join gdsInfo g on o.gdsId=g.gdsId
orders表示流表,gdsInfo 表示维表。根据sql解析顺序先 from 部分、然后where 部分、最后select,那么对于join 方式,相当于join生成了一张临时表,然后去select 这张临时表,因此可以确认
sql解析流程:
1. 识别出流表与维表
3. select 临时表
现在使用calcite解析这条语句
public class ParseDemo {
public static void main(String[] args) {
//假设gdsInfo就是维表
String sql = "select * from orders o join gdsInfo g on o.gdsId=g.gdsId";
SqlParser.Config config = SqlParser.configBuilder().setLex(Lex.MYSQL).build();
SqlParser sqlParser = SqlParser.create(sql, config);
SqlSelect sqlSelect = null;
try {
sqlSelect = (SqlSelect) sqlParser.parseStmt();
} catch (Exception e) {
e.printStackTrace();
}
SqlNode sqlFrom = sqlSelect.getFrom();
boolean isSideJoin = false;
String leftTable = "";
String rightTable = "";
String newName = ""; //临时表
SqlJoin sqlJoin = null;
//解析join
if (sqlFrom.getKind() == SqlKind.JOIN) {
sqlJoin = (SqlJoin) sqlFrom;
SqlNode left = sqlJoin.getLeft();
SqlNode right = sqlJoin.getRight();
isSideJoin = true;
leftTable = paserTableName(left);
rightTable = paserTableName(right);
}
//生成新的select
if (isSideJoin) {
newName = leftTable + "_" + rightTable;
SqlParserPos pos = new SqlParserPos(0, 0);
SqlIdentifier sqlIdentifier = new SqlIdentifier(newName, pos);
sqlSelect.setFrom(sqlIdentifier);
}
}
//解析表
private static String paserTableName(SqlNode tbl) {
if (tbl.getKind() == SqlKind.AS) {
SqlBasicCall sqlBasicCall = (SqlBasicCall) tbl;
return sqlBasicCall.operands[1].toString();
}
return ((SqlIdentifier) tbl).toString();
}
}
那么我们需要的就是生成新的select节点与SqlJoin节点,执行逻辑就是根据SqlJoin节点做维表关联之后生成新的表,然后去select这样新的表。
sql解析部分已经完成,既然使用sql化方式,因此也需要定义源表与维表,数据源一般是kafka, 定义源表需要:表名称、字段名称、字段类型、数据格式、topic;维表假设为mysql,需要定义:表名称、字段类型、字段名称、关联方式(同步/异步)、缓存方式(LRU/全部缓存、无缓存)。
源表定义:
CREATE TABLE orders(
orderId varchar,
gdsId varchar,
orderTime varchar
)WITH(
type = 'kafka',
kafka.bootstrap.servers = 'localhost:9092',
kafka.topic = 'topic1',
kafka.group.id = 'gId1',
sourcedatatype ='json'
);
维表定义:
CREATE TABLE gdsInfo(
gdsId varchar,
gdsName varchar,
price double
)WITH(
type='mysql',
url='jdbc:mysql://localhost:3306/paul',
userName='root',
password='123456',
tableName='gdsInfo',
cache = 'LRU',
isSideTable='true'
);
现在就是要如何解析这些语句,正则表达式是首选,需要解析出表名称、字段、属性三个部分:creat table xxx (xxx) with(xxx);正则表达式可为:
(?i)create\s+table\s+(\S+)\s*\((.+)\)\s*with\s*\((.+)\)
?i表示后面的匹配忽略大小写,\s+ 表示匹配多个空格,\S+表示匹配多个字符,.+ 表示匹配任意字符。
定义一个table类:
class TableInfo{
private String tableName; // 表名称
private Map<String,String> fieldsInfo; //字段名称->类型
private Properties props; //表属性
private boolean isSideTable; //是否为维表
}
public class ParseCreate {
public static final String REG_CREATE="(?i)create\\s+table\\s+(\\S+)\\s*\\((.+)\\)\\s*with\\s*\\((.+)\\)";
public static void main(String[] args) {
String createSql="CREATE TABLE orders(" + " orderId varchar," + " gdsId varchar,"
+ " orderTime varchar" + " )WITH(" + " type = 'kafka',"
+ " kafka.bootstrap.servers = 'localhost:9092'," + " kafka.topic = 'topic1',"
+ " kafka.group.id = 'gId1'," + " sourcedatatype ='json'" + " );";
Pattern pattern=Pattern.compile(REG_CREATE);
TableInfo tableInfo=new TableInfo();
Matcher matcher=pattern.matcher(createSql);
if(matcher.find()){
tableInfo.setTableName(matcher.group(1));
String fieldsStr=matcher.group(2);
String propsStr=matcher.group(3);
tableInfo.setFieldsInfo(parseFiles(fieldsStr));
tableInfo.setProps(parseProps(propsStr));
if(Boolean.valueOf(tableInfo.getProps().getProperty("isSideTable","false"))){
tableInfo.setSideTable(true);
}
}
}
public static Map<String,String> parseFiles(String fieldsStr){
Map<String,String> fieldsInfo=new HashMap<>();
String[] fieldsArray=fieldsStr.split(",");
for(String field: fieldsArray){
String[] fieldInfo=field.trim().split(" ");
fieldsInfo.put(fieldInfo[0],fieldInfo[1]);
}
return fieldsInfo;
}
public static Properties parseProps(String propsStr){
Properties props=new Properties();
String[] propsArray=propsStr.split(",");
for(String prop: propsArray){
String[] propInfo=prop.trim().split("=");
props.setProperty(propInfo[0],propInfo[1]);
}
return props;
}
}
至此完成了简易的create语句解析,下一篇将介绍如何将解析后的create与维表关联转换为可执行代码。
—END—
关注回复 Flink
获取更多系列
原创不易,好看,就点个"在看"
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 使用Calcite解析Sql做维表关联(二)
- Flink 维表关联系列之 Kafka 维表关联:广播方式
- Flink 维表关联系列之 Redis 维表关联:实时查询
- Flink 维表关联系列之 MySQL 维表关联:全量加载
- Flink 维表关联系列之 Hbase 维表关联:LRU 策略
- GORM 关联查询
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
数据结构(C语言版)
严蔚敏、吴伟民 / 清华大学出版社 / 2012-5 / 29.00元
《数据结构》(C语言版)是为“数据结构”课程编写的教材,也可作为学习数据结构及其算法的C程序设计的参数教材。 本书的前半部分从抽象数据类型的角度讨论各种基本类型的数据结构及其应用;后半部分主要讨论查找和排序的各种实现方法及其综合分析比较。其内容和章节编排1992年4月出版的《数据结构》(第二版)基本一致,但在本书中更突出了抽象数据类型的概念。全书采用类C语言作为数据结构和算法的描述语言。 ......一起来看看 《数据结构(C语言版)》 这本书的介绍吧!