Flink SQL Table: Let's Check Out the 2018 Chinese Super League Together - A First Taste of Flink



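Copyright notice: This technical column is a summary and distillation of the author's (Qin Kaixin) day-to-day work. It draws cases from real business environments, shares the lessons learned, and offers tuning advice for production applications as well as capacity planning for cluster environments. Please keep following this blog series. Copyright notice: reproduction is prohibited; studying is welcome. QQ email: 1120746959@qq.com. Feel free to get in touch with any questions.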

A Few Words Up Front

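Flink is a new stream processing engine. I am personally more familiar with Spark internals; if you are interested, see my source-code walkthrough series on Spark Core, Spark Streaming, and Spark SQL. Here we only get a taste of Flink, billed as the fourth-generation big data processing engine; I have not dug into Flink at the source-code level. Please bear with me if you are already a Flink expert! First, a look at the 2018 Chinese Super League table: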

[Image: 2018 Chinese Super League table]

1 SQL and the Table API (a first taste)

The Table API is a declarative DSL centered around tables, which may be dynamically changing tables (when representing streams). The Table API follows the (extended) relational model: Tables have a schema attached (similar to tables in relational databases) and the API offers comparable operations, such as select, project, join, group-by, aggregate, etc. Table API programs declaratively define what logical operation should be done rather than specifying exactly how the code for the operation looks. Though the Table API is extensible by various types of user-defined functions, it is less expressive than the Core APIs, but more concise to use (less code to write). In addition, Table API programs also go through an optimizer that applies optimization rules before execution.

One can seamlessly convert between tables and DataStream/DataSet, allowing programs to mix the Table API with the DataStream and DataSet APIs.


The highest level abstraction offered by Flink is SQL. This abstraction is similar to the Table API both in semantics and expressiveness, but represents programs as SQL query expressions. The SQL abstraction closely interacts with the Table API, and SQL queries can be executed over tables defined in the Table API.

2 Code Walkthrough (Aggregating and Ranking Goals per Club)

  • 1 Wrap each input line in a POJO.
  • 2 Create the BatchTableEnvironment tableEnv: BatchTableEnvironment.getTableEnvironment(env);
  • 3 Create a Table: Table topScore = tableEnv.fromDataSet(topInput)
  • 4 Register the Table: tableEnv.registerTable("topScore",topScore);
  • 5 Query the Table: tableEnv.sqlQuery
  • 6 Convert the Table back into a DataSet: tableEnv.toDataSet
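To make clear what the SQL query in step 5 computes, here is a plain-Java model of the same logic with no Flink involved: split each tab-separated line, group by club, sum the goals column, and sort by total in descending order. It is a sketch for understanding the semantics only; the class name `GoalsByClub` and the sample rows are hypothetical (not the real 2018 data), and the column positions (club at index 2, goals at index 4) follow the `TopScorers` constructor order used in the code below.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Plain-Java model (no Flink) of
 *   select club, sum(jinqiu) as sum_score from topScore group by club order by sum_score desc
 */
public class GoalsByClub {

    /** Sums column 4 (goals) per column 2 (club) and returns clubs ordered by total, highest first. */
    public static Map<String, Integer> goalsByClub(List<String> lines) {
        // GROUP BY club, SUM(jinqiu); a TreeMap makes ties come out in alphabetical club order
        Map<String, Integer> totals = new TreeMap<>();
        for (String line : lines) {
            String[] cols = line.split("\t");
            totals.merge(cols[2], Integer.parseInt(cols[4].trim()), Integer::sum);
        }
        // ORDER BY sum_score DESC (List.sort is stable, so ties keep the alphabetical order)
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(totals.entrySet());
        entries.sort(Map.Entry.<String, Integer>comparingByValue().reversed());
        // LinkedHashMap preserves the sorted order
        Map<String, Integer> ranked = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : entries) {
            ranked.put(e.getKey(), e.getValue());
        }
        return ranked;
    }

    public static void main(String[] args) {
        // Hypothetical rows (not the real 2018 data): rank, player, club, appearances, goals
        List<String> lines = List.of(
                "1\tPlayerA\tClubX\t28\t20",
                "2\tPlayerB\tClubY\t27\t15",
                "3\tPlayerC\tClubX\t25\t12");
        System.out.println(goalsByClub(lines)); // prints {ClubX=32, ClubY=15}
    }
}
```

In the Flink job the same grouping and sorting is declared once in SQL and left to the optimizer, which is exactly the "what, not how" point of the Table API description above.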

2.1 See the code for details

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.BatchTableEnvironment;
    
    public class TableSQL {
    
        public static void main(String[] args) throws Exception{
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);
            
            DataSet<String> input = env.readTextFile("C:\\CoreForBigData\\FLINK\\TopCore.csv");
            // In the DataSet API, print() immediately triggers a job execution
            input.print();
            DataSet<TopScorers> topInput = input.map(new MapFunction<String, TopScorers>() {
                @Override
                public TopScorers map(String s) throws Exception {
                    // each line holds 11 tab-separated columns, in TopScorers constructor order
                    String[] splits = s.split("\t");
                    return new TopScorers(Integer.valueOf(splits[0]),splits[1],splits[2],Integer.valueOf(splits[3]),Integer.valueOf(splits[4]),Integer.valueOf(splits[5]),Integer.valueOf(splits[6]),Integer.valueOf(splits[7]),Integer.valueOf(splits[8]),Integer.valueOf(splits[9]),Integer.valueOf(splits[10]));
                }
            });

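            // Convert the DataSet into a Table
            Table topScore = tableEnv.fromDataSet(topInput);
            // Register topScore under the name "topScore" so SQL can refer to it
            tableEnv.registerTable("topScore",topScore);
    
            // Table API: scan the registered table, project the club column, print its schema
            Table tapiResult = tableEnv.scan("topScore").select("club");
            tapiResult.printSchema();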
    
            // SQL: total goals per club, highest first
            Table groupedByClub = tableEnv.sqlQuery("select club, sum(jinqiu) as sum_score from topScore group by club order by sum_score desc");
            
            // Convert back to a DataSet of Result POJOs (fields matched by name: club, sum_score)
            DataSet<Result> result = tableEnv.toDataSet(groupedByClub, Result.class);
    
            // Map each Result to a Tuple2 and print it
            result.map(new MapFunction<Result, Tuple2<String,Integer>>() {
                @Override
                public Tuple2<String, Integer> map(Result result) throws Exception {
                    String club = result.club;
                    int sumScore = result.sum_score;
                    return Tuple2.of(club, sumScore);
                }
            }).print();
    
        }
    
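        /**
         * POJO mapped from each line of the source file
         */
        public static class TopScorers {
            /**
             * rank, player, club, appearances (chuchang), goals (jinqiu), assists (zhugong),
             * shots on target (shezheng), free kicks (renyiqiu), fouls (fangui),
             * yellow cards (huangpai), red cards (hongpai)
             */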
            public Integer rank;
            public String player;
            public String club;
            public Integer chuchang;
            public Integer jinqiu;
            public Integer zhugong;
            public Integer shezheng;
            public Integer renyiqiu;
            public Integer fangui;
            public Integer huangpai;
            public Integer hongpai;
    
            public TopScorers() {
                super();
            }
    
            public TopScorers(Integer rank, String player, String club, Integer chuchang, Integer jinqiu, Integer zhugong, Integer shezheng, Integer renyiqiu, Integer fangui, Integer huangpai, Integer hongpai) {
                this.rank = rank;
                this.player = player;
                this.club = club;
                this.chuchang = chuchang;
                this.jinqiu = jinqiu;
                this.zhugong = zhugong;
                this.shezheng = shezheng;
                this.renyiqiu = renyiqiu;
                this.fangui = fangui;
                this.huangpai = huangpai;
                this.hongpai = hongpai;
            }
        }
    
        /**
         * POJO holding the aggregated result
         */
        public static class Result {
            public String club;
            public Integer sum_score;
    
            public Result() {}
        }
    }
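The map function above assumes every line has at least 11 tab-separated columns with integers in the numeric positions; a header row, a blank line, or stray spaces would throw a NumberFormatException or ArrayIndexOutOfBoundsException and fail the whole job. Below is a hedged sketch of a stricter line check as a plain static method. The class `TopScorerLineParser` and its `parse` method are hypothetical helpers, not part of Flink or of the original code; the column order is assumed to match the `TopScorers` constructor.

```java
import java.util.Optional;

/**
 * Hypothetical stricter parser for one input line (11 tab-separated columns:
 * rank, player, club, appearances, goals, assists, shots on target,
 * free kicks, fouls, yellow cards, red cards).
 */
public class TopScorerLineParser {

    static final int EXPECTED_COLUMNS = 11;

    /** Returns the trimmed columns, or empty for blank, short, or non-numeric lines (e.g. a header). */
    public static Optional<String[]> parse(String line) {
        if (line == null || line.trim().isEmpty()) {
            return Optional.empty();
        }
        String[] cols = line.split("\t");
        if (cols.length < EXPECTED_COLUMNS) {
            return Optional.empty();
        }
        for (int i = 0; i < EXPECTED_COLUMNS; i++) {
            cols[i] = cols[i].trim();
            // every column except player (1) and club (2) must be an integer
            if (i != 1 && i != 2 && !cols[i].matches("-?\\d+")) {
                return Optional.empty();
            }
        }
        return Optional.of(cols);
    }

    public static void main(String[] args) {
        // Hypothetical lines: a header row and one data row
        String header = "rank\tplayer\tclub\tapps\tgoals\tassists\tshots\tfk\tfouls\tyc\trc";
        String row = "1\tPlayerA\tClubX\t28\t20\t5\t40\t3\t10\t2\t0";
        System.out.println(parse(header).isPresent()); // prints false
        System.out.println(parse(row).isPresent());    // prints true
    }
}
```

In the job itself, such a helper fits a FlatMapFunction (which may emit zero records for an input) better than a MapFunction, which must return exactly one record per line.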

2.2 Results (Evergrande was formidable in 2018, with 55 goals)

[Screenshot: output of the job]

3 A Bit of Theory

3.1 Create a TableEnvironment

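import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

// ***************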
// STREAMING QUERY
// ***************
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// create a TableEnvironment for streaming queries
StreamTableEnvironment sTableEnv = TableEnvironment.getTableEnvironment(sEnv);

// ***********
// BATCH QUERY
// ***********
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
// create a TableEnvironment for batch queries
BatchTableEnvironment bTableEnv = TableEnvironment.getTableEnvironment(bEnv);

3.2 Table API usage

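import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// register Orders table first, e.g. with registerDataStream (see 3.3)
// scan registered Orders table
Table orders = tableEnv.scan("Orders");
// compute revenue for all customers from France
Table revenue = orders
  .filter("cCountry === 'FRANCE'")
  .groupBy("cID, cName")
  .select("cID, cName, revenue.sum AS revSum");

// emit or convert Table: a grouped streaming aggregate produces updates,
// so convert it to a retract stream rather than an append stream
tableEnv.toRetractStream(revenue, Row.class).print();
// execute query
env.execute();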
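To spell out what the filter / groupBy / select chain above computes, here is a plain-Java model of the same query with no Flink involved. It is only a sketch of the relational semantics: the `Order` class, its field types (a `long` revenue), and the sample data are assumptions; only the field names `cID`, `cName`, `cCountry`, and `revenue` come from the snippet.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model (no Flink) of the Table API query in 3.2. */
public class FranceRevenue {

    /** Hypothetical row type; field names mirror the snippet, types are assumptions. */
    public static final class Order {
        final long cID;
        final String cName;
        final String cCountry;
        final long revenue;

        public Order(long cID, String cName, String cCountry, long revenue) {
            this.cID = cID;
            this.cName = cName;
            this.cCountry = cCountry;
            this.revenue = revenue;
        }
    }

    /** filter(cCountry === 'FRANCE') -> groupBy(cID, cName) -> select(cID, cName, revenue.sum AS revSum) */
    public static Map<String, Long> revSumByCustomer(List<Order> orders) {
        Map<String, Long> revSum = new TreeMap<>();
        for (Order o : orders) {
            if (!"FRANCE".equals(o.cCountry)) {
                continue; // filter
            }
            String key = o.cID + "," + o.cName; // composite group key (cID, cName)
            revSum.merge(key, o.revenue, Long::sum); // revenue.sum
        }
        return revSum;
    }

    public static void main(String[] args) {
        List<Order> orders = List.of(
                new Order(1, "Alice", "FRANCE", 100),
                new Order(2, "Bob", "GERMANY", 50),
                new Order(1, "Alice", "FRANCE", 30));
        System.out.println(revSumByCustomer(orders)); // prints {1,Alice=130}
    }
}
```

Unlike this one-shot loop, the streaming version keeps updating each customer's revSum as new orders arrive, which is why its result has to be emitted as a retract stream.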

3.3 Register a DataStream or DataSet as a Table

// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Long, String>> stream = ...

// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream);

// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, "myLong, myString");

3.4 Convert a DataStream or DataSet into a Table

// get StreamTableEnvironment
// converting a DataSet in a BatchTableEnvironment works the same way
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Long, String>> stream = ...

// Convert the DataStream into a Table with default fields "f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);

// Convert the DataStream into a Table with fields "myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");

4 Wrapping Up

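Using the 2018 Chinese Super League as an example, we got a glimpse of the core ideas behind Flink SQL and the Table API. This article is of course far from complete, but I hope it gives you something useful. Writing it took real effort, so thank you for reading!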


Qin Kaixin, Shenzhen, 2018-11-26 22:52

