内容简介:—END—
点击箭头处 “蓝色字” ,关注我们哦!!
继上一篇中 使用Calcite解析Sql做维表关联(一) 介绍了建表语句解析方式以及使用calcite解析解析流表join维表方法,这一篇将会介绍如何使用代码去实现将sql变为可执行的代码。
实现流程分析:
-
注册表
根据对create语句解析的结果:表名称、字段信息、表属性,注册成为相应的源表、结果表;
-
join 拆解
使用calcite 解析后得到两个部分join部分、insert部分,join部分得到的流表先转换为流,然后根据维表配置的属性(维表来源、查询方式等)选择不同的维表关联策略,得到一个关联之后的流,最后将这个流注册为一张表;对于insert部分就比较简单,insert部分的select的表直接更换为关联之后的流表,然后执行即可。
经过以上分析之后,接下来看下具体的实现。
注册表
注册表包括源表、结果表。实时处理的数据源通常是kafka,针对不同的数据格式需要制定不同的反序列化方式,以json格式为例,如何将kafka的数据反序列化,将流转换为表,通常流的数据类型为Pojo、Tuple、Row等,为了能够通用化选择Row类型;结果表通常是 mysql 、hbase、es等,需要定义AppendStreamTableSink或者RetractStreamTableSink。
//以json格式为例
public class JsonDeserilization implements DeserializationSchema<Row> {
private Map<String,String> fields; //fieldName->fieldType
private RowTypeInfo rowTypeInfo;
private TypeInformation<?>[] typeInformations;
private String[] fieldNames;
//传入的参数解析create语句得到
public JsonDeserilization(String[] fieldNames,TypeInformation<?>[] typeInformations){
this.fieldNames=fieldNames;
this.typeInformations=typeInformations;
this.rowTypeInfo=new RowTypeInfo(typeInformations,fieldNames);
}
@Override public Row deserialize(byte[] message) throws IOException {
String msg=new String(message);
Row row=new Row(fieldNames.length);
JSONObject jsonObject=JSONObject.parseObject(msg);
for(int i=0;i<fieldNames.length;i++){
if(typeInformations[i].getTypeClass()==String.class){
row.setField(i,jsonObject.getString(fieldNames[i]));
}
if(typeInformations[i].getTypeClass()==Integer.class){
row.setField(i,jsonObject.getInteger(fieldNames[i]));
}
}
return row;
}
......
}
注册表:
//kafka json 类型
public static void registerSourceTable(TableInfo tableInfo,StreamTableEnvironment tblEnv){
Properties props=tableInfo.getProps();
String tableName=tableInfo.getTableName();
StreamExecutionEnvironment env=tblEnv.execEnv();
if("kafka".equals(props.getProperty("type"))) {
Properties kafkaPros = new Properties();
props.forEach((k, v) -> {
if (k.toString().startsWith("kafka.")) {
kafkaPros.setProperty(k.toString().replace("kafka.", ""), v.toString());
}
});
String topic = props.getProperty("kafka.topic");
FlinkKafkaConsumer<Row> consumer011 = new FlinkKafkaConsumer<Row>(topic,
new JsonDeserilization(tableInfo.getFieldNames(), tableInfo.getFieldTypes()), kafkaPros);
DataStream<Row> ds = env.addSource(consumer011);
tblEnv.registerDataStreamInternal(tableName, ds);
}
}
public static void registerSinkTable(TableInfo tableInfo,StreamTableEnvironment tblEnv){
Properties props=tableInfo.getProps();
String tableName=tableInfo.getTableName();
if("console".equals(props.getProperty("type"))){
ConsoleTableSink consoleTableSink=new ConsoleTableSink();
tblEnv.registerTableSink(tableName,tableInfo.getFieldNames(),tableInfo.getFieldTypes(),consoleTableSink);
}
}
ConsoleTableSink 实现了RetractStreamTableSink <Row> ,直接将数据原样输出到控制台。
Join实现
得到解析后的SqlJoin节点,获取源表、维表信息,首先将源表转换为流:
SqlJoin sqlJoin=(SqlJoin)sqlNode1;
String leftTableName=parseTableName(sqlJoin.getLeft()); //表名称
String rightTableName=parseTableName(sqlJoin.getRight());
TableInfo leftTableInfo=tableInfoMap.get(leftTableName);//表信息
TableInfo rightTable=tableInfoMap.get(rightTableName);
String leftAlias=paserAliasTableName(sqlJoin.getLeft()); //别名
String rightAlias=paserAliasTableName(sqlJoin.getRight());
Table leftTable=tblEnv.sqlQuery("select * from " + leftTableName);
DataStream leftStream = tblEnv.toAppendStream(leftTable,Row.class); //转换后的流
接下来将流表与维表进行关联查询,根据维表根据设置的不同属性:同步/异步查询、cache/nocache方式、查询不同的外部存储等,需要实现不同的查询方式。以异步查询mysql为例分析:需要根据维表定义的字段、join的关联条件解析生成一条sql语句,根据流入数据解析出sql的查询条件值,然后查询得到对应的维表值,将流入数据与查询得到的维表数据拼接起来输出到下游:
public class MySqlAsyncFunction extends RichAsyncFunction<Row,Row> {
private Connection connection;
private String sqlTemplate;
private String url;
private String username;
private String password;
private String tableName;
private int idx; //条件值在流入数据的位置
private int inLength; //流入字段数
private int outLength; //输出字段数
private int sideLength; //维表查询字段数
public MySqlAsyncFunction(SqlJoin sqlJoin,TableInfo sideTableInfo,TableInfo leftTableInfo){
Properties props=sideTableInfo.getProps();
this.url=props.getProperty("url");
this.username=props.getProperty("username");
this.password=props.getProperty("password");
this.tableName=props.getProperty("tableName");
String rightField=parseCondition(sqlJoin,false);
genSqlTemplate(sideTableInfo,rightField);
String leftField=parseCondition(sqlJoin,true);
for (int i = leftTableInfo.getFieldNames().length - 1; i >= 0; i--) {
if(leftField.equals(leftTableInfo.getFieldNames()[i])){
this.idx=i;
break;
}
}
inLength=leftTableInfo.getFieldNames().length;
sideLength=sideTableInfo.getFieldNames().length;
outLength=inLength+sideLength;
}
@Override public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.connection= DriverManager.getConnection(url,username,password);
}
//这里还是一个同步查询,没有使用异步方式,需要使用一部mysql客户端
@Override public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Exception {
String v=(String)input.getField(idx); //获取条件值
PreparedStatement preparedStatement=connection.prepareStatement(sqlTemplate);
preparedStatement.setString(1,v);
ResultSet rs=preparedStatement.executeQuery();
boolean isJoin=false;
while (rs.next()){
isJoin=true;
Row row=new Row(outLength);
for(int i=0;i<input.getArity();i++){
row.setField(i,input.getField(i));
}
//直接将维表数据补齐在流数据后面
for(int i=0;i<sideLength;i++){
row.setField(inLength+i,rs.getObject(i+1));
}
resultFuture.complete(Collections.singletonList(row));
}
if(!isJoin) resultFuture.complete(null);
}
//解析on 条件的左右表字段名称, 这里只解析了一个关联条件
private String parseCondition(SqlJoin sqlJoin,boolean isLeft){
SqlNode condition=sqlJoin.getCondition();
SqlBasicCall sqlBasicCall=(SqlBasicCall)condition;
String name=SqlExec.paserAliasTableName(isLeft?sqlJoin.getLeft():sqlJoin.getRight());
SqlIdentifier sqlIdentifier1=(SqlIdentifier)sqlBasicCall.operands[0];
if(name.equals(sqlIdentifier1.names.get(0))){
return sqlIdentifier1.names.get(1);
}
SqlIdentifier sqlIdentifier2=(SqlIdentifier)sqlBasicCall.operands[1];
if(name.equals(sqlIdentifier2.names.get(0))){
return sqlIdentifier2.names.get(1);
}
return null;
}
//查询sql
private void genSqlTemplate(TableInfo tableInfo,String condition){
StringBuilder sql=new StringBuilder();
StringBuilder selects=new StringBuilder();
sql.append("select ");
for(String field : tableInfo.getFieldNames()){
selects.append(field);
selects.append(",");
}
sql.append(selects.substring(0,selects.lastIndexOf(",")));
sql.append(" from ").append(this.tableName);
if(condition!=null)
sql.append(" where ").append(condition).append("=?");
this.sqlTemplate=sql.toString();
}
}
到目前为止获取了join之后的结果数据,但是有一点需要考虑,流表定义字段名称与维表定义的字段名称可能会相同,那么在将流转换为表时就存在相同的字段,因此需要对相同的字段重命名:
TableInfo leftTableInfo=tableInfoMap.get(leftTableName);
TableInfo rightTable=tableInfoMap.get(rightTableName);
List<String> newFields=new ArrayList<>(); //join之后流的字段名称
List<TypeInformation> newTypes=new ArrayList<>(); //join之后流的数据类型
//需要做字段解析 原始表名-原始字段名称-新字段名称
HashBasedTable hashBasedTable=HashBasedTable.create();
int i=0;
for(String field:leftTable.getSchema().getFieldNames()){
hashBasedTable.put(leftAlias,field,field);
newFields.add(field);
newTypes.add(leftTable.getSchema().getFieldType(i).get());
i++;
}
i=0;
for(String field:rightTable.getFieldNames()){
String newField=field;
if(hashBasedTable.containsColumn(field)){
newField=field+"0";
}
hashBasedTable.put(rightAlias,field,newField);
newFields.add(newField);
newTypes.add(rightTable.getFieldTypes()[i]);
i++;
}
String newTableNameAlias=leftAlias+"_"+rightAlias;
String newTableName=leftTableName+"_"+rightTableName;
hashBasedTableMap.put(newTableNameAlias,hashBasedTable);
//outType 表示关联之后的流数据类型
RowTypeInfo outType=new RowTypeInfo(newTypes.toArray(new TypeInformation[]{}),newFields.toArray(new String[]{}));
DataStream dsOut=AsyncDataStream.unorderedWait(leftStream,new MySqlAsyncFunction(sqlJoin,rightTable,leftTableInfo),10,
TimeUnit.SECONDS);
dsOut.getTransformation().setOutputType(outType);
tblEnv.registerDataStream(newTableName,dsOut); //将join之后的流注册成为表
同样也需要对insert部分的select字段根据上面得到新的字段名称hashBasedTable进行替换:
SqlInsert sqlInsert=(SqlInsert)sqlNode1;
SqlSelect source=(SqlSelect)sqlInsert.getSource();
SqlBasicCall sqlBasicCall=(SqlBasicCall) source.getFrom();
String newAlias=sqlBasicCall.operands[1].toString(); //新表的别名
HashBasedTable hashBasedTable=hashBasedTableMap.get(newAlias);
SqlNodeList sqlNodeList=source.getSelectList();
int i=0;
for(SqlNode x: sqlNodeList.getList()){
SqlIdentifier sqlIdentifier=(SqlIdentifier)x;
String tableAlias=sqlIdentifier.names.get(0);
String field=sqlIdentifier.names.get(1); String newFieldName=hashBasedTable.get(tableAlias,field).toString();
sqlIdentifier=sqlIdentifier.setName(0,newAlias);//替换为新的表别名称
sqlIdentifier=sqlIdentifier.setName(1,newFieldName);//替换为新的字段名称
sqlNodeList.set(i,sqlIdentifier);
i++;
}
tblEnv.sqlUpdate(sqlInsert.toString()); //执行insert 语句
总结
以上提供了流表join维表的sql实现思路以及部分demo代码的参考,但是其远远达不到工程上的要求,在实际使用中需要要考虑更多的因素:复杂嵌套的sql、时间语义支持、自定义函数支持等。推荐一个开源项flinkStreamSql, 地址为: https://github.com/DTStack/flinkStreamSQL , 丰富的语义支持、不同类型的源插件支持等。
—END—
关注回复 Flink
获取更多系列
原创不易,好看,就点个"在看"
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 使用Calcite解析Sql做维表关联(一)
- Flink 维表关联系列之 Kafka 维表关联:广播方式
- Flink 维表关联系列之 Redis 维表关联:实时查询
- Flink 维表关联系列之 MySQL 维表关联:全量加载
- Flink 维表关联系列之 Hbase 维表关联:LRU 策略
- GORM 关联查询
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
算法分析-有效的学习方法(影印版)
Jeffrey J.McConnell / 高等教育出版社 / 2003-03-01 / 28.0
本书主要目标是提高读者关于算法对程序效率的影响等问题的认知水平,并培养读者分析程序中的算法所必需的技巧。各章材料以激发读者有效的、协同的学习方法的形式讲述。通过全面的论述和完整的数学推导,本书帮助读者最大限度地理解基本概念。 本书内容包括促使学生参与其中的大量程序设计课题。书中所有算法以伪码形式给出,使得具备条件表达式、循环与递归方面知识的读者均易于理解。本书以简洁的写作风格向读者介绍了兼具......一起来看看 《算法分析-有效的学习方法(影印版)》 这本书的介绍吧!