内容简介:最近在做一个画像的任务,sql实现的,其中有一个udf,会做很多事情,包括将从redis读出历史值加权,并将中间结果和加权后的结果更新到redis。大家都知道,flink 是可以支持事件处理的,也就是可以没有时间的概念,那么在聚合,join等操作的时候,flink内部会维护一个状态,假如此时你也用redis维护了历史状态,也即是类似 result = currentState(flink)+lastState(redis),且此时要针对计算的结果用where进行筛选.
场景
最近在做一个画像的任务,sql实现的,其中有一个udf,会做很多事情,包括将从 redis 读出历史值加权,并将中间结果和加权后的结果更新到redis。
大家都知道,flink 是可以支持事件处理的,也就是可以没有时间的概念,那么在聚合,join等操作的时候,flink内部会维护一个状态,假如此时你也用redis维护了历史状态,也即是类似 result = currentState(flink)+lastState(redis),且此时要针对计算的结果用where进行筛选.
SQL如下
CREATE VIEW view_count AS select `time`, gid, cid, count(feed_id) * 1 as strength FROM view_cid GROUP BY gid, cid,`time`; CREATE VIEW view_strength AS select `time`, gid, cid , Get_Strength_Weaken(gid, cid, cast(strength as double), `time`, 0.95) as `result` FROM view_count ; insert into hx_app_server_sink_common SELECT gid, cid, `result` FROM view_strength where `result` <> '0.0' GROUP BY gid, cid, `result`;
业务分析
第一个 sql 视图完成的是首先分组,然后统计某一个字段并乘以权重;
第二个sql视图,udf :Get_Strength_Weaken完成当前值和历史值叠加工作,历史值存储在redis,同时将结果返回并更新redis,返回值作为result字段。
第三个sql在输出的时候,result字段作为了where的条件和group by里的字段。
这时候生成的flink概图如下:
观察中间的结构图可以发现,Get_Strength_Weaken被调用两次:
1. where条件,这个的生成是由于第三条sql
where `result` <> '0.0'
产生的执行计划,是不是看起来很懵逼。。。
2. select里面还有一次调用Get_Strength_Weaken,这个很明显。
当然,可以打印一下flink udf里eval函数的调用细节日志,很容易发现重复调用的问题,浪院长这个也是通过分析日志,对比输出结果来得出的论。
综合上面分析和udf调用日志,结论就是udf被调用了两次。
对于这个flink的udf被多次调用引起的结果偏大,整整调试了一下午。
由于上面分析可以得出结论,flink将where条件下推了,where 条件判断会先执行,而select里后执行,那么可以调整SQL,如下:
CREATE VIEW view_count AS select `time`, gid, cid, count(feed_id) * 1 as strength FROM view_cid GROUP BY gid, cid,`time`; CREATE VIEW view_strength AS select `time`, gid, cid , getResult(gid,cid) as `result` FROM view_count where Get_Strength_Weaken(gid, cid, cast(strength as double), `time`, 0.95) as `result` <> '0.0' ; insert into hx_app_server_sink_common SELECT gid, cid, `result` FROM view_strength GROUP BY gid, cid, `result`;
那么实际上,select里的udf主要目的是取出来计算结果,那么这个时候可以写个简单的udf--getResult,只让他从redis获取 where条件里更新到redis里的结果,由于该udf是无状态的即使多次调用,也无所谓。
所以,总结一下,对于flink 来说,由于基于事件的处理,聚合、join等操作会有状态缓存,那么此时再用到含有外部存储状态的udf,一定要慎重,结合执行计划,来合理放置udf的位置,避免出错。
当然,调试阶段最好是有详细的日志,便于分析和定位问题。
flink 状态删除
其实,flink聚合等内部状态有配置可以使其自动删除的,具体配置使用如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // obtain query configuration from TableEnvironment StreamQueryConfig qConfig = tableEnv.queryConfig(); // set query parameters qConfig.withIdleStateRetentionTime(Time.hours(12)); // define query Table result = ... // create TableSink TableSink<Row> sink = ... // emit result Table via a TableSink result.writeToSink(sink, qConfig); // convert result Table into a DataStream<Row> DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class, qConfig);
[完]
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Think Python
Allen B. Downey / O'Reilly Media / 2012-8-23 / GBP 29.99
Think Python is an introduction to Python programming for students with no programming experience. It starts with the most basic concepts of programming, and is carefully designed to define all terms ......一起来看看 《Think Python》 这本书的介绍吧!