Flink SQL Table: Let's Watch the 2018 Chinese Super League Together (A First Taste of Flink)



Copyright notice: This technical column is a summary and distillation of the author's (Qin Kaixin) day-to-day work, built from cases drawn from real business environments, with tuning advice for commercial applications and cluster capacity planning. Please keep following this blog. Reposting is prohibited; learning from it is welcome. QQ email: 1120746959@qq.com, feel free to contact me with any questions.

A Few Words Up Front

Flink is a new stream-processing engine. I am personally more familiar with Spark internals; if you are interested, see my source-code walkthrough series on Spark Core, Spark Streaming, and Spark SQL. Here we only get a taste of Flink, the so-called fourth-generation big-data processing engine, without going down to the source-code level, so please bear with me if you are already a Flink expert. First, have a look at the 2018 Chinese Super League table:

[Figure: 2018 Chinese Super League table]

1 SQL Table (A First Taste)

The Table API is a declarative DSL centered around tables, which may be dynamically changing tables (when representing streams). The Table API follows the (extended) relational model: Tables have a schema attached (similar to tables in relational databases) and the API offers comparable operations, such as select, project, join, group-by, aggregate, etc. Table API programs declaratively define what logical operation should be done rather than specifying exactly how the code for the operation looks. Though the Table API is extensible by various types of user-defined functions, it is less expressive than the Core APIs, but more concise to use (less code to write). In addition, Table API programs also go through an optimizer that applies optimization rules before execution.

One can seamlessly convert between tables and DataStream/DataSet, allowing programs to mix the Table API with the DataStream and DataSet APIs.


The highest level abstraction offered by Flink is SQL. This abstraction is similar to the Table API both in semantics and expressiveness, but represents programs as SQL query expressions. The SQL abstraction closely interacts with the Table API, and SQL queries can be executed over tables defined in the Table API.
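
Putting the two levels together: a table defined with the Table API can be registered and then queried with plain SQL. The following is a minimal, hedged sketch of that interplay, not taken from this post's project; the class name SqlOverTableApi and the toy data are made up, and it assumes the same pre-1.9 Java Table API used in the rest of the post.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class SqlOverTableApi {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // A tiny in-memory data set: (club, goals scored in one match).
        DataSet<Tuple2<String, Integer>> goals = env.fromElements(
                Tuple2.of("Guangzhou Evergrande", 3),
                Tuple2.of("Shanghai SIPG", 2),
                Tuple2.of("Guangzhou Evergrande", 2));

        // Define a Table with the Table API and name its columns ...
        Table goalsTable = tableEnv.fromDataSet(goals, "club, goals");
        tableEnv.registerTable("goals", goalsTable);

        // ... then query the very same table with SQL.
        Table totals = tableEnv.sqlQuery(
                "SELECT club, SUM(goals) AS total FROM goals GROUP BY club");

        // Convert back to a DataSet and print.
        DataSet<Row> result = tableEnv.toDataSet(totals, Row.class);
        result.print();
    }
}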

2 Walking Through the Code (Aggregating and Ranking Goals per Club)

  • 1 Wrap the raw data in a POJO (TopScorers).
  • 2 Create the BatchTableEnvironment: BatchTableEnvironment.getTableEnvironment(env);
  • 3 Create a Table from the DataSet: Table topScore = tableEnv.fromDataSet(topInput)
  • 4 Register the Table: tableEnv.registerTable("topScore",topScore);
  • 5 Query the Table: tableEnv.sqlQuery
  • 6 Convert the Table back to a DataSet: tableEnv.toDataSet

2.1 Full Code

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.BatchTableEnvironment;
    
    public class TableSQL {
    
        public static void main(String[] args) throws Exception{
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);
            
            DataSet<String> input = env.readTextFile("C:\\CoreForBigData\\FLINK\\TopCore.csv");
            input.print();
            DataSet<TopScorers> topInput = input.map(new MapFunction<String, TopScorers>() {
                @Override
                public TopScorers map(String s) throws Exception {
                    String[] splits = s.split("\t");
                    return new TopScorers(Integer.valueOf(splits[0]),splits[1],splits[2],Integer.valueOf(splits[3]),Integer.valueOf(splits[4]),Integer.valueOf(splits[5]),Integer.valueOf(splits[6]),Integer.valueOf(splits[7]),Integer.valueOf(splits[8]),Integer.valueOf(splits[9]),Integer.valueOf(splits[10]));
                }
            });

            // Convert the DataSet into a Table
            Table topScore = tableEnv.fromDataSet(topInput);
            // Register the Table under the name "topScore"
            tableEnv.registerTable("topScore",topScore);
    
            Table tapiResult = tableEnv.scan("topScore").select("club");
            tapiResult.printSchema();
    
            Table groupedByCountry = tableEnv.sqlQuery("select club, sum(jinqiu) as sum_score from topScore group by club order by sum_score desc");
            
            // Convert the result Table back into a DataSet
            DataSet<Result> result = tableEnv.toDataSet(groupedByCountry, Result.class);

            // Map the DataSet into tuples and print the result
            result.map(new MapFunction<Result, Tuple2<String,Integer>>() {
                @Override
                public Tuple2<String, Integer> map(Result result) throws Exception {
                    String country = result.club;
                    int sum_total_score = result.sum_score;
                    return Tuple2.of(country,sum_total_score);
                }
            }).print();
    
        }
    
        /**
         * POJO that maps the raw source data
         */
        public static class TopScorers {
            /**
             * rank, player, club, appearances, goals, assists, shots on target, free kicks, fouls, yellow cards, red cards
             */
            public Integer rank;
            public String player;
            public String club;
            public Integer chuchang;
            public Integer jinqiu;
            public Integer zhugong;
            public Integer shezheng;
            public Integer renyiqiu;
            public Integer fangui;
            public Integer huangpai;
            public Integer hongpai;
    
            public TopScorers() {
                super();
            }
    
            public TopScorers(Integer rank, String player, String club, Integer chuchang, Integer jinqiu, Integer zhugong, Integer shezheng, Integer renyiqiu, Integer fangui, Integer huangpai, Integer hongpai) {
                this.rank = rank;
                this.player = player;
                this.club = club;
                this.chuchang = chuchang;
                this.jinqiu = jinqiu;
                this.zhugong = zhugong;
                this.shezheng = shezheng;
                this.renyiqiu = renyiqiu;
                this.fangui = fangui;
                this.huangpai = huangpai;
                this.hongpai = hongpai;
            }
        }
    
        /**
         * POJO for the aggregated result
         */
        public static class Result {
            public String club;
            public Integer sum_score;
    
            public Result() {}
        }
    }
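
As a hedged aside on the conversion step: to my understanding of this generation of the Table API, the dedicated Result POJO is not strictly required; the query result can also be converted into a DataSet of Row and the columns read by position. The fragment below is a sketch of a drop-in replacement for the toDataSet/map part of the program above (it additionally needs import org.apache.flink.types.Row).

// Convert the query result into a DataSet<Row> instead of a POJO ...
DataSet<Row> rows = tableEnv.toDataSet(groupedByCountry, Row.class);

// ... and read the two columns by position: 0 = club, 1 = sum_score.
rows.map(new MapFunction<Row, Tuple2<String, Integer>>() {
    @Override
    public Tuple2<String, Integer> map(Row row) throws Exception {
        return Tuple2.of((String) row.getField(0), (Integer) row.getField(1));
    }
}).print();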

2.2 Results (Guangzhou Evergrande led 2018 with 55 goals)

[Figure: query output — goals aggregated per club]

3 A Bit of Theory

3.1 Create a TableEnvironment

// ***************
// STREAMING QUERY
// ***************
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// create a TableEnvironment for streaming queries
StreamTableEnvironment sTableEnv = TableEnvironment.getTableEnvironment(sEnv);

// ***********
// BATCH QUERY
// ***********
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
// create a TableEnvironment for batch queries
BatchTableEnvironment bTableEnv = TableEnvironment.getTableEnvironment(bEnv);
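
For reference, here is the same snippet as a self-contained class with the imports it needs, assuming the pre-1.9 Table API used throughout this post (the class name CreateTableEnvironments is made up):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class CreateTableEnvironments {
    public static void main(String[] args) {
        // Streaming: a StreamTableEnvironment wraps a StreamExecutionEnvironment.
        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment sTableEnv = TableEnvironment.getTableEnvironment(sEnv);

        // Batch: a BatchTableEnvironment wraps an ExecutionEnvironment.
        ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment bTableEnv = TableEnvironment.getTableEnvironment(bEnv);
    }
}

Note that newer Flink releases replaced these static getTableEnvironment factories with create(...) methods, so the exact call depends on your Flink version.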

3.2 Table API Usage (Query a Table)

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// register Orders table
// scan registered Orders table
Table orders = tableEnv.scan("Orders");
// compute revenue for all customers from France
Table revenue = orders
  .filter("cCountry === 'FRANCE'")
  .groupBy("cID, cName")
  .select("cID, cName, revenue.sum AS revSum");

// emit or convert Table
// execute query
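
The same revenue computation can also be written at the SQL level. A minimal, hedged sketch, assuming the Orders table (with columns cID, cName, cCountry, revenue) has already been registered in tableEnv as in the snippet above:

// SQL version of the Table API query above
Table revenueSql = tableEnv.sqlQuery(
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName");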

3.3 Register a DataStream or DataSet as a Table

// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Long, String>> stream = ...

// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream);

// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, "myLong, myString");
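
For the batch side, the registration that the comment above refers to looks essentially the same; a hedged sketch with the same pre-1.9 API (batchEnv is assumed to be an ExecutionEnvironment, and the `...` placeholder follows the style of the snippet above):

// get BatchTableEnvironment
BatchTableEnvironment batchTableEnv = TableEnvironment.getTableEnvironment(batchEnv);

DataSet<Tuple2<Long, String>> dataSet = ...

// register the DataSet as table "myTable" with default fields "f0", "f1"
batchTableEnv.registerDataSet("myTable", dataSet);

// register the DataSet as table "myTable2" with fields "myLong", "myString"
batchTableEnv.registerDataSet("myTable2", dataSet, "myLong, myString");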

3.4 Convert a DataStream or DataSet into a Table

// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Long, String>> stream = ...

// Convert the DataStream into a Table with default fields "f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);

// Convert the DataStream into a Table with fields "myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
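
Conversion also works in the opposite direction, from a Table back into a DataStream or DataSet. A hedged sketch with the same pre-1.9 API (table, tableEnv, batchTableEnv and batchTable are assumed to already exist):

// Streaming: convert an append-only Table into a DataStream of Row
DataStream<Row> appendStream = tableEnv.toAppendStream(table, Row.class);

// Streaming: convert an updating Table into a retract stream; the Boolean flags add (true) vs. retract (false)
DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);

// Batch: convert a Table into a DataSet of Row
DataSet<Row> dataSet = batchTableEnv.toDataSet(batchTable, Row.class);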

4 Wrapping Up

Through the 2018 Chinese Super League we have caught a glimpse of the core ideas behind Flink SQL and the Table API. This article is far from complete, but I hope it brings you something of value. It took real effort to write up, so thank you for reading!

Copyright notice: This technical column is a summary and distillation of the author's (Qin Kaixin) day-to-day work, built from cases drawn from real business environments, with tuning advice for commercial applications and cluster capacity planning. Please keep following this blog. Reposting is prohibited; learning from it is welcome. QQ email: 1120746959@qq.com, feel free to contact me with any questions.

Qin Kaixin, Shenzhen, 2018-11-26 22:52

