flink 有状态udf 引起血案一-spark高级玩法(主要分享spark使用及源码,spark 机器学习,图计算,同...

栏目: 服务器 · 发布时间: 7年前

内容简介:最近在做一个画像的任务,sql实现的,其中有一个udf,会做很多事情,包括将从redis读出历史值加权,并将中间结果和加权后的结果更新到redis。大家都知道,flink 是可以支持事件处理的,也就是可以没有时间的概念,那么在聚合,join等操作的时候,flink内部会维护一个状态,假如此时你也用redis维护了历史状态,也即是类似 result = currentState(flink)+lastState(redis),且此时要针对计算的结果用where进行筛选.

flink 有状态udf 引起血案一-spark高级玩法(主要分享spark使用及源码,spark 机器学习,图计算,同...

场景

最近在做一个画像的任务,sql实现的,其中有一个udf,会做很多事情,包括将从 redis 读出历史值加权,并将中间结果和加权后的结果更新到redis。

大家都知道,flink 是可以支持事件处理的,也就是可以没有时间的概念,那么在聚合,join等操作的时候,flink内部会维护一个状态,假如此时你也用redis维护了历史状态,也即是类似 result = currentState(flink)+lastState(redis),且此时要针对计算的结果用where进行筛选.

SQL如下

CREATE VIEW view_count AS
select
  `time`,
  gid,
  cid,
  count(feed_id) * 1 as strength
FROM
  view_cid
GROUP BY
  gid,
  cid,`time`;

CREATE VIEW view_strength AS select
  `time`,
  gid,
  cid ,
  Get_Strength_Weaken(gid, cid, cast(strength as double), `time`, 0.95)  as `result`
FROM
  view_count
;

insert into
  hx_app_server_sink_common
SELECT
  gid,
  cid,
  `result`
FROM
  view_strength
where `result` <> '0.0' 
GROUP BY
  gid,
  cid,
  `result`;

业务分析

第一个 sql 视图完成的是首先分组,然后统计某一个字段并乘以权重;

第二个sql视图,udf :Get_Strength_Weaken完成当前值和历史值叠加工作,历史值存储在redis,同时将结果返回并更新redis,返回值作为result字段。

第三个sql在输出的时候,result字段作为了where的条件和group by里的字段。

这时候生成的flink概图如下:

flink 有状态udf 引起血案一-spark高级玩法(主要分享spark使用及源码,spark 机器学习,图计算,同...

观察中间的结构图可以发现,Get_Strength_Weaken被调用两次:

1. where条件,这个的生成是由于第三条sql

where `result` <> '0.0'

产生的执行计划,是不是看起来很懵逼。。。

2. select里面还有一次调用Get_Strength_Weaken,这个很明显。

当然,可以打印一下flink udf里eval函数的调用细节日志,很容易发现重复调用的问题,浪院长这个也是通过分析日志,对比输出结果来得出的论。

综合上面分析和udf调用日志,结论就是udf被调用了两次。

对于这个flink的udf被多次调用引起的结果偏大,整整调试了一下午。

由于上面分析可以得出结论,flink将where条件下推了,where 条件判断会先执行,而select里后执行,那么可以调整SQL,如下:

CREATE VIEW view_count AS
select
`time`,
 gid,
 cid,
count(feed_id) * 1 as strength
FROM
 view_cid
GROUP BY
 gid,
 cid,`time`;

CREATE VIEW view_strength AS select
`time`,
 gid,
 cid ,
getResult(gid,cid) as `result`
FROM
 view_count
where Get_Strength_Weaken(gid, cid, cast(strength as double), `time`, 0.95)  as `result` <> '0.0'
;

insert into
 hx_app_server_sink_common
SELECT
 gid,
 cid,
`result`
FROM
 view_strength
GROUP BY
 gid,
 cid,
`result`;

那么实际上,select里的udf主要目的是取出来计算结果,那么这个时候可以写个简单的udf--getResult,只让他从redis获取 where条件里更新到redis里的结果,由于该udf是无状态的即使多次调用,也无所谓。

所以,总结一下,对于flink 来说,由于基于事件的处理,聚合、join等操作会有状态缓存,那么此时再用到含有外部存储状态的udf,一定要慎重,结合执行计划,来合理放置udf的位置,避免出错。

当然,调试阶段最好是有详细的日志,便于分析和定位问题。

flink 状态删除

其实,flink聚合等内部状态有配置可以使其自动删除的,具体配置使用如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// obtain query configuration from TableEnvironment
StreamQueryConfig qConfig = tableEnv.queryConfig();
// set query parameters
qConfig.withIdleStateRetentionTime(Time.hours(12));

// define query
Table result = ...

// create TableSink
TableSink<Row> sink = ...

// emit result Table via a TableSink
result.writeToSink(sink, qConfig);

// convert result Table into a DataStream<Row>
DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class, qConfig);

[完]


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Think Python

Think Python

Allen B. Downey / O'Reilly Media / 2012-8-23 / GBP 29.99

Think Python is an introduction to Python programming for students with no programming experience. It starts with the most basic concepts of programming, and is carefully designed to define all terms ......一起来看看 《Think Python》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具