flink-table-sql-demo1

栏目: 数据库 · 发布时间: 6年前

内容简介:一.背景flink 这个东西,后面会尝试走纯SQL 统计路线,这个阿里和华为都搞了一套,这里就简单记录下测试效果。用SQL统计用户点击数,每隔5秒统计一次。暂时去掉了复杂逻辑。

一.背景

flink 这个东西,后面会尝试走纯 SQL 统计路线,这个阿里和华为都搞了一套,这里就简单记录下测试效果。

用SQL统计用户点击数,每隔5秒统计一次。暂时去掉了复杂逻辑。

二.直接看代码

// lombok 插件,这里主要写一个简单的数据产生的对象
// 表是时间,用户,以及商品3个字段
@Data
@ToString
public class UserInfo implements Serializable {
    private Timestamp pTime;
    private String userId;
    private String itemId;

    public UserInfo() {
    }

    public UserInfo(String userId, String itemId) {
        this.userId = userId;
        this.itemId = itemId;
        this.pTime = new Timestamp(System.currentTimeMillis());
    }
}
/**
 * 模拟数据产生,没隔1秒发送一个数据
 * @author <a href="mailto:huoguo@2dfire.com">火锅</a>
 * @time 2019/2/22
 */
public class UserDataSource implements SourceFunction<UserInfo> {
    static String[] items = {"i-1", "i-2", "i-3"};
    static String[] users = {"a", "b", "c"};
    @Override
    public void run(SourceContext sc) throws Exception {
        while (true) {
            TimeUnit.SECONDS.sleep(1);
            int m = (int) (System.currentTimeMillis() % 3);
            sc.collect(new UserInfo(users[m], items[m]));
        }
    }
    @Override
    public void cancel() {
        System.out.println("cancel to do ...");
    }
}
/**
 * 这就主函数,负责统计,引用是 java 的,别引错了
 */
public class UserApp {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<UserInfo> userInfoDataStream = env.addSource(new UserDataSource());

        DataStream<UserInfo> timedData = userInfoDataStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserInfo>() {
            @Override
            public long extractAscendingTimestamp(UserInfo element) {
                return element.getPTime().getTime();
            }
        });
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        // pTime.rowtime  = pTime as rowTime(proctime)
        tableEnv.registerDataStream("test", timedData, "userId,itemId,pTime.rowtime");
        Table result = tableEnv.sqlQuery("SELECT userId,TUMBLE_END(pTime, INTERVAL '5' SECOND) as pTime,count(1) as cnt FROM  test" +
                " GROUP BY TUMBLE(pTime, INTERVAL '5' SECOND),userId ");
        // deal with (Tuple2<Boolean, Row> value) -> out.collect(row)
        SingleOutputStreamOperator allClick = tableEnv.toRetractStream(result, Row.class)
                .flatMap((Tuple2<Boolean, Row> value, Collector<Row> out) -> {
                    out.collect(value.f1);
                }).returns(Row.class);
        // add sink or print
        allClick.print();
        env.execute("test");
    }

}

结果:

c,2019-03-06 13:55:20.0,4

a,2019-03-06 13:55:20.0,1

--- 手动分开好看

c,2019-03-06 13:55:25.0,2

b,2019-03-06 13:55:25.0,2

a,2019-03-06 13:55:25.0,1

---

a,2019-03-06 13:55:30.0,2

c,2019-03-06 13:55:30.0,2

b,2019-03-06 13:55:30.0,1

小结:

1.demo很简单,仅为了测试使用

2.具体的原理,和很多东西 有时间再写吧


以上所述就是小编给大家介绍的《flink-table-sql-demo1》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

算法(第4版)

算法(第4版)

塞奇威克 (Robert Sedgewick)、韦恩 (Kevin Wayne) / 谢路云 / 人民邮电出版社 / 2012-10-1 / 99.00元

本书全面讲述算法和数据结构的必备知识,具有以下几大特色。  算法领域的经典参考书 Sedgewick畅销著作的最新版,反映了经过几十年演化而成的算法核心知识体系  内容全面 全面论述排序、搜索、图处理和字符串处理的算法和数据结构,涵盖每位程序员应知应会的50种算法  全新修订的代码 全新的Java实现代码,采用模块化的编程风格,所有代码均可供读者使......一起来看看 《算法(第4版)》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

MD5 加密
MD5 加密

MD5 加密工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具