内容简介:一.背景flink 这个东西,后面会尝试走纯SQL 统计路线,这个阿里和华为都搞了一套,这里就简单记录下测试效果。用SQL统计用户点击数,每隔5秒统计一次。暂时去掉了复杂逻辑。
一.背景
flink 这个东西,后面会尝试走纯 SQL 统计路线,这个阿里和华为都搞了一套,这里就简单记录下测试效果。
用SQL统计用户点击数,每隔5秒统计一次。暂时去掉了复杂逻辑。
二.直接看代码
// lombok 插件,这里主要写一个简单的数据产生的对象
// 表是时间,用户,以及商品3个字段
@Data
@ToString
public class UserInfo implements Serializable {
private Timestamp pTime;
private String userId;
private String itemId;
public UserInfo() {
}
public UserInfo(String userId, String itemId) {
this.userId = userId;
this.itemId = itemId;
this.pTime = new Timestamp(System.currentTimeMillis());
}
}
/**
* 模拟数据产生,没隔1秒发送一个数据
* @author <a href="mailto:huoguo@2dfire.com">火锅</a>
* @time 2019/2/22
*/
public class UserDataSource implements SourceFunction<UserInfo> {
static String[] items = {"i-1", "i-2", "i-3"};
static String[] users = {"a", "b", "c"};
@Override
public void run(SourceContext sc) throws Exception {
while (true) {
TimeUnit.SECONDS.sleep(1);
int m = (int) (System.currentTimeMillis() % 3);
sc.collect(new UserInfo(users[m], items[m]));
}
}
@Override
public void cancel() {
System.out.println("cancel to do ...");
}
}
/** * 这就主函数,负责统计,引用是 java 的,别引错了 */ public class UserApp { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStream<UserInfo> userInfoDataStream = env.addSource(new UserDataSource()); DataStream<UserInfo> timedData = userInfoDataStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserInfo>() { @Override public long extractAscendingTimestamp(UserInfo element) { return element.getPTime().getTime(); } }); StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // pTime.rowtime = pTime as rowTime(proctime) tableEnv.registerDataStream("test", timedData, "userId,itemId,pTime.rowtime"); Table result = tableEnv.sqlQuery("SELECT userId,TUMBLE_END(pTime, INTERVAL '5' SECOND) as pTime,count(1) as cnt FROM test" + " GROUP BY TUMBLE(pTime, INTERVAL '5' SECOND),userId "); // deal with (Tuple2<Boolean, Row> value) -> out.collect(row) SingleOutputStreamOperator allClick = tableEnv.toRetractStream(result, Row.class) .flatMap((Tuple2<Boolean, Row> value, Collector<Row> out) -> { out.collect(value.f1); }).returns(Row.class); // add sink or print allClick.print(); env.execute("test"); } }
结果:
c,2019-03-06 13:55:20.0,4
a,2019-03-06 13:55:20.0,1
--- 手动分开好看
c,2019-03-06 13:55:25.0,2
b,2019-03-06 13:55:25.0,2
a,2019-03-06 13:55:25.0,1
---
a,2019-03-06 13:55:30.0,2
c,2019-03-06 13:55:30.0,2
b,2019-03-06 13:55:30.0,1
小结:
1.demo很简单,仅为了测试使用
2.具体的原理,和很多东西 有时间再写吧
以上所述就是小编给大家介绍的《flink-table-sql-demo1》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。