Flink Catalog 介绍

栏目: IT技术 · 发布时间: 4年前

内容简介:这篇文章我们介绍了一下 Flink 的 Catalog,基于 Flink 1.11,熟悉 Flink 或者 Spark 等大数据引擎的同学应该都知道这两个计算引擎都有一个共同的组件叫 Catalog。下面是 Flink 的 Catalog 的官方定义。Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。数据处理最关键的方面之一是管理元数据。 元数据可以是临时的,例如临时表、或者通过 TableEnvironment 注册的 UDF。 元数据也可以是持久化的

这篇文章我们介绍了一下 Flink 的 Catalog,基于 Flink 1.11,熟悉 Flink 或者 Spark 等大数据引擎的同学应该都知道这两个计算引擎都有一个共同的组件叫 Catalog。下面是 Flink 的 Catalog 的官方定义。

Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。

数据处理最关键的方面之一是管理元数据。 元数据可以是临时的,例如临时表、或者通过 TableEnvironment 注册的 UDF。 元数据也可以是持久化的,例如 Hive Metastore 中的元数据。Catalog 提供了一个统一的API,用于管理元数据,并使其可以从 Table API 和 SQL 查询语句中来访问。

简单来说,Catalog 就是元数据管理中心,其中元数据包括数据库、表、表结构等信息。

1. Catalog 定义

Flink 的 Catalog 相关代码定义在 catalog.java 文件中,是一个 interface,如下。

/**
 * This interface is responsible for reading and writing metadata such as database/table/views/UDFs
 * from a registered catalog. It connects a registered catalog and Flink's Table API.
 */
@PublicEvolving
public interface Catalog {
  ...
}

既然是 interface,我们来看一下支持的操作。

Flink Catalog 介绍

我们可以将这些接口做一个简单的分类。

  • Database 相关操作

    • getDefaultDataBase:获取默认的 database
    • getDatabase:获取特定的 database
    • listDatabases:列出所有的 database
    • databaseExists:判断 database 是否存在
    • createDatabases:创建 database
    • dropDatabases:删除 database
    • alterDatabases:修改 database
  • Table 相关操作,一般都会有个参数是 database

    • listTables:列出所有的 table 和 view
    • getTable:获取指定的 table 或者 view
    • tableExist:判断 table 或者 view 是否存在
    • dropTable:删除 table 或者 view
    • createTable:创建 table 或者 view
    • renameTable:重命名 table 或者 view
    • alterTable:修改 table 或者 view
  • View 相关操作,除了和 table 共用方法外,还有一个独有的方法

    • listViews:列出所有的 view
  • Partition 相关操作,partition 是 table 的一个属性,所以参数一般都会带有 table 信息

    • listPartition:列出 table 的所有 partition
    • getPartition:获取指定的 partition
    • partitionExist:判断 parition 是否存在
    • createPartition:创建 partition
    • dropPartition:删除 partition
    • alterPartition:修改 parition
  • Function 相关操作,这里的 function 知道的是用户自定义的 function,也就是 Udf

    • listFunctions:列出所有的 function

    • getFunction:获取指定的 func

    • functionExist:判断 function 是否存在

    • dropFunction:删除 function

    • alterFunction:修改 function

2. Catalog 的实现

Catalog 作为 interface,我们看一下有哪些实现类。下图是通过 Idea 查看 Type Hierarchy 的结果。

Flink Catalog 介绍

从上图我们可以看到 Catalog 的最终实现有三个类:

  • HiveCatalog:使用 Hive 的元数据来作为 Flink 的 HiveCatalog
  • GenericInMemoryCatalog:使用内存实现 Catalog
  • JdbcCatalog:使用其他支持 jdbc 协议的关系型数据库来存储元数据
  • PostgresCatalog:使用 Postgres 数据库来作为 Catalog 存储元数据

3. Catalog 使用举例

下面的示例是 Flink SQL 使用 Catalog 的示例。

TableEnvironment tableEnv = ...

// Create a HiveCatalog 
Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>", "<hive_version>");

// Register the catalog
tableEnv.registerCatalog("myhive", catalog);

// Create a catalog database
tableEnv.executeSql("CREATE DATABASE mydb WITH (...)");

// Create a catalog table
tableEnv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)");

tableEnv.listTables(); // should return the tables in current catalog and database.

下面是 api 的方式来使用 Catalog

import org.apache.flink.table.api.*;
import org.apache.flink.table.catalog.*;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.descriptors.Kafka;

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());

// Create a HiveCatalog
Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>", "<hive_version>");

// Register the catalog
tableEnv.registerCatalog("myhive", catalog);

// Create a catalog database
catalog.createDatabase("mydb", new CatalogDatabaseImpl(...));

// Create a catalog table
TableSchema schema = TableSchema.builder()
    .field("name", DataTypes.STRING())
    .field("age", DataTypes.INT())
    .build();

catalog.createTable(
        new ObjectPath("mydb", "mytable"),
        new CatalogTableImpl(
            schema,
            new Kafka()
                .version("0.11")
                ....
                .startFromEarlist()
                .toProperties(),
            "my comment"
        ),
        false
    );

List<String> tables = catalog.listTables("mydb"); // tables should contain "mytable"

4. 自定义 Catalog

Catalog 是可扩展的,用户可以通过实现 Catalog 接口来开发自定义 Catalog。 想要在 SQL CLI 中使用自定义 Catalog,用户除了需要实现自定义的 Catalog 之外,还需要为这个 Catalog 实现对应的 CatalogFactory 接口。

CatalogFactory 定义了一组属性,用于 SQL CLI 启动时配置 Catalog。 这组属性集将传递给发现服务,在该服务中,服务会尝试将属性关联到 CatalogFactory 并初始化相应的 Catalog 实例。

5. 总结

这篇文章写的比较简单,相当于自己的学习笔记,下一篇文章我们比较一下 Spark 的 Catalog 实现。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

编程卓越之道

编程卓越之道

Hyde R / 韩东海 / 电子工业出版社 / 2006-4-1 / 49.80

各位程序员一定希望自己编写的代码是能让老板赞赏、满意的代码;是能让客户乐意掏钱购买的代码;是能让使用者顺利使用的代码;是能让同行欣赏赞誉的代码;是能让自己引以为豪的卓越代码。本书作者为希望能编写出卓越代码的人提供了自己积累的关于卓越编程的真知灼见。它弥补了计算机科学和工程课程中被忽略的一个部分——底层细节,而这正是构建卓越代码的基石。具体内容包括:计算机数据表示法,二进制数学运算与位运算,内存组织......一起来看看 《编程卓越之道》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具