flink-table-sql-demo1

Category: Databases · Published: 6 years ago

1. Background

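Going forward I plan to try a pure-SQL approach to computing statistics with Flink. Both Alibaba and Huawei have built their own take on this. Here I just briefly record the results of a test.

The task: use SQL to count user clicks, computing the count once every 5 seconds. Complex logic has been stripped out for now.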
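For reference, the "count per user every 5 seconds" logic can be modeled in plain Java without Flink. This is a minimal, hypothetical sketch: the class TumbleCountSketch and its methods are illustrative names, not Flink API. It assumes tumbling windows aligned to multiples of 5 seconds since the epoch, which matches the :20/:25/:30 window ends in the output further down. Each record is assigned to the window that starts at ts - ts % 5000. Counts are then grouped by window end and user, mirroring TUMBLE_END(...) and GROUP BY TUMBLE(...), userId in the query.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TumbleCountSketch {
    // Window size from the query, INTERVAL '5' SECOND, in milliseconds
    static final long WINDOW_MS = 5_000L;

    // Start of the epoch-aligned tumbling window containing ts (epoch millis, assumed >= 0)
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_MS);
    }

    // Counts records per (window end, userId); the outer key plays the role of TUMBLE_END
    static Map<Long, Map<String, Long>> countPerWindow(List<Long> timestamps, List<String> users) {
        Map<Long, Map<String, Long>> result = new TreeMap<>();
        for (int i = 0; i < timestamps.size(); i++) {
            long windowEnd = windowStart(timestamps.get(i)) + WINDOW_MS;
            result.computeIfAbsent(windowEnd, k -> new TreeMap<>())
                  .merge(users.get(i), 1L, Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> ts = List.of(1_000L, 2_000L, 4_999L, 5_000L, 7_500L);
        List<String> users = List.of("a", "b", "a", "a", "c");
        // prints {5000={a=2, b=1}, 10000={a=1, c=1}}
        System.out.println(countPerWindow(ts, users));
    }
}
```

A timestamp exactly on a boundary (5000 above) belongs to the next window, because the window end is exclusive.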

2. Straight to the code

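// Uses the Lombok plugin; this is just a simple object for the generated data.
// The table has 3 fields: time, user, and item.
import java.io.Serializable;
import java.sql.Timestamp;

import lombok.Data;
import lombok.ToString;

@Data
@ToString
public class UserInfo implements Serializable {
    private Timestamp pTime;   // event time, later used as the rowtime attribute
    private String userId;
    private String itemId;

    // Flink's POJO rules require a public no-arg constructor
    public UserInfo() {
    }

    public UserInfo(String userId, String itemId) {
        this.userId = userId;
        this.itemId = itemId;
        this.pTime = new Timestamp(System.currentTimeMillis());
    }
}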
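import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * Simulated data source: emits one record every second.
 * @author <a href="mailto:huoguo@2dfire.com">火锅</a>
 * @time 2019/2/22
 */
public class UserDataSource implements SourceFunction<UserInfo> {
    static String[] items = {"i-1", "i-2", "i-3"};
    static String[] users = {"a", "b", "c"};

    // cancel() is called from a different thread, so the flag must be volatile
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<UserInfo> sc) throws Exception {
        while (running) {
            TimeUnit.SECONDS.sleep(1);
            // pick a user/item pair pseudo-randomly from the current millisecond
            int m = (int) (System.currentTimeMillis() % 3);
            sc.collect(new UserInfo(users[m], items[m]));
        }
    }

    @Override
    public void cancel() {
        // end the loop in run() so the job can shut down cleanly
        running = false;
    }
}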
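import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

/**
 * The main program, which does the counting. The imports must be the Java API ones
 * (e.g. org.apache.flink.table.api.java.StreamTableEnvironment), not the Scala ones.
 */
public class UserApp {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<UserInfo> userInfoDataStream = env.addSource(new UserDataSource());

        // the source stamps records with the current time, so timestamps do not decrease
        DataStream<UserInfo> timedData = userInfoDataStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserInfo>() {
            @Override
            public long extractAscendingTimestamp(UserInfo element) {
                return element.getPTime().getTime();
            }
        });
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        // "pTime.rowtime" turns the existing pTime field into the event-time (rowtime) attribute
        tableEnv.registerDataStream("test", timedData, "userId,itemId,pTime.rowtime");
        // count clicks per user in 5-second tumbling event-time windows
        Table result = tableEnv.sqlQuery("SELECT userId,TUMBLE_END(pTime, INTERVAL '5' SECOND) as pTime,count(1) as cnt FROM test" +
                " GROUP BY TUMBLE(pTime, INTERVAL '5' SECOND),userId");
        // the retract stream carries (flag, row) pairs: keep the rows whose flag is true (inserts);
        // a windowed aggregate never retracts, so every result row passes
        SingleOutputStreamOperator<Row> allClick = tableEnv.toRetractStream(result, Row.class)
                .flatMap((Tuple2<Boolean, Row> value, Collector<Row> out) -> {
                    if (value.f0) {
                        out.collect(value.f1);
                    }
                }).returns(Row.class);
        // attach a real sink here; print() is enough for testing
        allClick.print();
        env.execute("test");
    }

}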
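Why does each window's result appear only after the window has closed? The following is a simplified, hypothetical model, not Flink code. It assumes two behaviors. First, AscendingTimestampExtractor produces a watermark that trails the highest timestamp seen by 1 ms. Second, an event-time window [start, end) fires once the watermark reaches end - 1. With one record per second, the window ending at :25 therefore fires only when the first record stamped :25 or later arrives. The class and method names are illustrative.

```java
public class WatermarkFiringSketch {
    // Models AscendingTimestampExtractor: watermark = highest timestamp seen - 1 ms
    static long watermarkAfter(long maxTimestampSeen) {
        return maxTimestampSeen - 1;
    }

    // An event-time window [start, end) fires once the watermark reaches end - 1
    static boolean windowFires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    // Returns the timestamp of the first record (in ascending input order) whose watermark
    // fires the window ending at windowEnd, or -1 if no record does
    static long firstFiringRecord(long[] ascendingTimestamps, long windowEnd) {
        for (long ts : ascendingTimestamps) {
            if (windowFires(windowEnd, watermarkAfter(ts))) {
                return ts;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // one record per second, slightly past each second boundary, like UserDataSource
        long[] ts = {21_300L, 22_300L, 23_300L, 24_300L, 25_300L, 26_300L};
        System.out.println(firstFiringRecord(ts, 25_000L)); // prints 25300
    }
}
```

In a real job the watermark is emitted periodically, every 200 ms by default once event time is enabled. The window therefore fires at the first watermark emission after that record, not at the instant the record arrives.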

Output (each line is userId, window end from TUMBLE_END, click count):

c,2019-03-06 13:55:20.0,4
a,2019-03-06 13:55:20.0,1
--- (separators added by hand for readability)
c,2019-03-06 13:55:25.0,2
b,2019-03-06 13:55:25.0,2
a,2019-03-06 13:55:25.0,1
---
a,2019-03-06 13:55:30.0,2
c,2019-03-06 13:55:30.0,2
b,2019-03-06 13:55:30.0,1
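Every row in the output above is a final result. A tumbling-window aggregate emits each (user, window) row once and never revises it, so the toRetractStream output can safely be reduced to its rows here. A non-windowed GROUP BY userId would behave differently. The hypothetical simulation below is plain Java, not Flink code, and only a simplified model of retract-stream semantics. It shows the (flag, row) pairs such a query would produce: each new click first retracts the user's previous count (false), then emits the updated count (true).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RetractSketch {
    // One retract-stream message: add == true is an insert, add == false a retraction
    record Change(boolean add, String user, long count) {
        @Override
        public String toString() {
            return "(" + add + "," + user + "," + count + ")";
        }
    }

    // Models a non-windowed "SELECT userId, COUNT(1) FROM test GROUP BY userId"
    static List<Change> unwindowedCount(List<String> clicks) {
        Map<String, Long> counts = new HashMap<>();
        List<Change> out = new ArrayList<>();
        for (String user : clicks) {
            Long old = counts.get(user);
            if (old != null) {
                out.add(new Change(false, user, old)); // retract the previous result
            }
            long updated = (old == null) ? 1 : old + 1;
            counts.put(user, updated);
            out.add(new Change(true, user, updated)); // emit the new result
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [(true,a,1), (true,b,1), (false,a,1), (true,a,2)]
        System.out.println(unwindowedCount(List.of("a", "b", "a")));
    }
}
```

Passing such a stream on without looking at the flag would re-emit the stale (a,1) row as if it were a new result.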

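Summary:

1. The demo is deliberately simple and meant only for testing.

2. The underlying principles and many other details will have to wait for a later post.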

