flink-watermark

栏目: 编程工具 · 发布时间: 6年前

阅读更多

一.背景

当我们统计用户点击的时候,有时候会因为各种情况数据延迟,我们需要一个允许最大的延迟范围进行统计。这里的延迟统计分为两种:

模拟初始数据:早上10:00 11.10 用户点击了一次,但是延迟到10:00 11.15 才发送过来,允许最大延迟5秒, 5秒窗口统计。我们希望还是能统计到

二.基本代码

@Data
public class UserTimeInfo implements Serializable {

    private String userId;
    /** 实际时间-偏移量 偏移后的时间*/
    private Timestamp pTime;
    public UserTimeInfo() {
    }
    public UserTimeInfo(String userId, Timestamp pTime) {
        this.userId = userId;
        this.pTime = pTime;
    }
}
public class UserTimeSource implements SourceFunction<UserTimeInfo> {


    /**
     * 为了id 统计方便,我们只留一个id
     */
    static String[] userIds = {"id->"};
    Random random = new Random();
    /**
     * 模拟发送20次
     */
    int times = 20;

    @Override
    public void run(SourceContext sc) throws Exception {
        while (true) {
            TimeUnit.SECONDS.sleep(1);
            int m = (int) (System.currentTimeMillis() % userIds.length);
            // 随机延迟几秒
            int defTime = random.nextInt(5);
            // 发送时间
            DateTime dateTime = new DateTime();
            // 计算延迟后的时间,并且打印时间
            DateTime dateTimePrint = dateTime.plusSeconds(-defTime);
            System.out.println("实际时间:" + print(dateTime) + ",延迟:" + defTime + ":-->" + print(dateTimePrint));
            // 发送延迟时间
            dateTime = dateTime.plusSeconds(-defTime);
            sc.collect(new UserTimeInfo(userIds[m], new Timestamp(dateTime.getMillis())));
            // 只持续固定时间方便观察
            if (--times == 0) {
                break;
            }
        }
    }

    @Override
    public void cancel() {
        System.out.println("cancel to do ...");
    }

    private static String print(DateTime dateTime) {
        return dateTime.toString("yyyy-MM-dd hh:mm:ss");
    }
}

三.定义我们的两种watermark

a. 基于系统时间

/**
 * 这里逻辑,模拟按系统时间进行统计
 * 所有数据和系统自身时间有关
 */
public class UserTimeWaterMarkBySystem implements AssignerWithPeriodicWatermarks<UserTimeInfo> {
    /**
     * 默认允许 5秒延迟
     */
    long maxDelayTime = 5000;
    /**
     * 该时间由于基于系统时间来做,
     * 如果10:00 11:10 秒用户点击的数据,然后延迟,实际收到的时间是10.00 11:15  
     * a.根据系统时间 想减,小于5秒就会统计到
     * b.注意,如果程序挂了,12点重启消费这个数据,就统计不到了
     * @return
     */
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(System.currentTimeMillis() - maxDelayTime);
    }
    @Override
    public long extractTimestamp(UserTimeInfo element, long previousElementTimestamp) {
        long timestamp = element.getPTime().getTime();
        return timestamp;
    }
}

b.根据数据自生时间进行做延迟判断

public class UserTimeWaterMarkByRowTime implements AssignerWithPeriodicWatermarks<UserTimeInfo> {
    /**
     * 默认允许 5秒延迟
     */
    long maxDelayTime = 5000;

    /**
     * 该时间由于基于数据时间来做,
     * 如果10:00 11:10 秒用户点击的数据,然后延迟,实际收到的时间是10.00 11:15
     * a.根据系统时间 想减,小于5秒就会统计到
     * b.只要消息 时间延迟小于5 就能被统计。 
     * 这种对点击事件来说,更符合要求
     * @return
     */
    private long currentMaxTimestamp;
    
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxDelayTime);
    }
    @Override
    public long extractTimestamp(UserTimeInfo element, long previousElementTimestamp) {
        long timestamp = element.getPTime().getTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }
}

四.source 类,和以前一样

public class UserTimeWaterMarkApp {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<UserTimeInfo> userInfoDataStream = env.addSource(new UserTimeSource());
        //  UserTimeWaterMarkByRowTime 这个时间可以替换
        DataStream<UserTimeInfo> timedData = userInfoDataStream.assignTimestampsAndWatermarks(new UserTimeWaterMarkByRowTime());
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        tableEnv.registerDataStream("test", timedData, "userId,pTime.rowtime");
        Table result = tableEnv.sqlQuery("SELECT userId,TUMBLE_END(pTime, INTERVAL '5' SECOND) as pTime,count(1) as cnt FROM  test" +
                " GROUP BY TUMBLE(pTime, INTERVAL '5' SECOND),userId ");
        // deal with (Tuple2<Boolean, Row> value) -> out.collect(row)
        SingleOutputStreamOperator allClick = tableEnv.toRetractStream(result, Row.class)
                .flatMap((Tuple2<Boolean, Row> value, Collector<Row> out) -> {
                    out.collect(value.f1);
                }).returns(Row.class);
        // add sink or print
        allClick.print();
        env.execute("test");
    }

}
public class UserTimeWaterMarkApp {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<UserTimeInfo> userInfoDataStream = env.addSource(new UserTimeSource());

        DataStream<UserTimeInfo> timedData = userInfoDataStream.assignTimestampsAndWatermarks(new UserTimeWaterMarkByRowTime());
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        tableEnv.registerDataStream("test", timedData, "userId,pTime.rowtime");
        Table result = tableEnv.sqlQuery("SELECT userId,TUMBLE_END(pTime, INTERVAL '5' SECOND) as pTime,count(1) as cnt FROM  test" +
                " GROUP BY TUMBLE(pTime, INTERVAL '5' SECOND),userId ");
        // deal with (Tuple2<Boolean, Row> value) -> out.collect(row)
        SingleOutputStreamOperator allClick = tableEnv.toRetractStream(result, Row.class)
                .flatMap((Tuple2<Boolean, Row> value, Collector<Row> out) -> {
                    out.collect(value.f1);
                }).returns(Row.class);
        // add sink or print
        allClick.print();
        env.execute("test");
    }

小结:

1.这个是基于flink 1.7 跑的

2.代码比较简单,也好理解,有问题直接私信我

0顶

0踩

分享到:

flink-table-sql-demo2

评论


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Convergence Culture

Convergence Culture

Henry Jenkins / NYU Press / 2006-08-01 / USD 30.00

"Convergence Culture" maps a new territory: where old and new media intersect, where grassroots and corporate media collide, where the power of the media producer, and the power of the consumer intera......一起来看看 《Convergence Culture》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

随机密码生成器
随机密码生成器

多种字符组合密码