Hive 与 ElasticSearch 的数据交互

栏目: 服务器 · 发布时间: 5年前

内容简介:本文将详细介绍利用 ES 与 Hive 直接的数据交互;通过 Hive 外部表的方式,可以快速将 ES 索引数据映射到 Hive 中,使用易于上手的 Hive SQL 实现对数据的进一步加工。Hive 在 Hadoop 生态系统中扮演着数据仓库的角色,借助 Hive 可以方便地进行数据汇总、即席查询以及分析存储在 Hadoop 文件系统中的大型数据集。Hive 通过类 SQL 语言(HSQL)对 Hadoop 上的数据进行抽象,这样用户可以通过 SQL 语句对数据进行定义、组织、操作和分析;在 Hive 中

本文将详细介绍利用 ES 与 Hive 直接的数据交互;通过 Hive 外部表的方式,可以快速将 ES 索引数据映射到 Hive 中,使用易于上手的 Hive SQL 实现对数据的进一步加工。

一、开发环境

1、组件版本

  • CDH 集群版本:6.0.1
  • ES 版本:6.5.1
  • Hive 版本:2.1.1
  • ES-Hadoop 版本:6.5.1

2、Hive 简介

Hive 在 Hadoop 生态系统中扮演着数据仓库的角色,借助 Hive 可以方便地进行数据汇总、即席查询以及分析存储在 Hadoop 文件系统中的大型数据集。

Hive 通过类 SQL 语言(HSQL)对 Hadoop 上的数据进行抽象,这样用户可以通过 SQL 语句对数据进行定义、组织、操作和分析;在 Hive 中,数据集是通过表(定义了数据类型相关信息)进行定义的,用户可以通过内置运算符或用户自定义函数(UDF)对数据进行加载、查询和转换。

3、Hive 安装 ES-Hadoop

官方推荐的安装方式:

使用 add jar

add jar /path/elasticsearch-hadoop.jar
复制代码

使用 hive.aux.jars.path

$ bin/hive --auxpath=/path/elasticsearch-hadoop.jar
复制代码

修改配置( hive-site.xml )

<property>
  <name>hive.aux.jars.path</name>
  <value>/path/elasticsearch-hadoop.jar</value>
  <description>A comma separated list (with no spaces) of the jar files</description>
</property>
复制代码

CDH6.X 推荐的安装方法

elasticsearch-hadoop.jar 复制到 Hive 的 auxlib 目录中,然后重启 Hive 即可。

cp elasticsearch-hadoop.jar /opt/cloudera/parcels/CDH/lib/hive/auxlib/
复制代码

二、Hive 与 ElasticSearch 的数据交互

1、数据类型对照表

请务必注意,ES 中的类型是 index/_mapping 中对应的数据类型,非 _source 里面数据的类型。

Hive type Elasticsearch type
void null
boolean boolean
tinyint byte
smallint short
int int
bigint long
double double
float float
string string
binary binary
timestamp date
struct map
map map
array array
union not supported (yet)
decimal string
date date
varchar string
char string

2、建立 Hive 外部表

CREATE EXTERNAL TABLE default.surface(
    water_type STRING,
    water_level STRING,
    monitor_time TIMESTAMP,
    sitecode STRING,
    p492 DOUBLE,
    p311 DOUBLE,
    status STRING
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
    'es.resource'='ods_data_day_surface*/doc',
    'es.query'='?q=status:001'
    'es.nodes'='sky-01','es.port'='9200',
    'es.net.http.auth.user'='sky',
    'es.net.http.auth.pass'='jointsky',
    'es.date.format'='yyyy-MM-dd HH:mm:ss',
    'es.ser.reader.value.class'='com.jointsky.bigdata.hive.EsValueReader'
    'es.mapping.names'='waterType:water_type,monitortime:monitor_time'
);
复制代码

3、配置项说明

es.resource

es.resource 用于设置 ES 资源的位置,默认该配置项同时设置了读和写的索引,当然也可以分别设置读写索引名称:

es.resource.read
es.resource.write

es.query

es.query 设置查询过滤条件,目前支持 uri queryquery dslexternal resource 三种设置方式。

# uri (or parameter) query
es.query = ?q=costinl

# query dsl
es.query = { "query" : { "term" : { "user" : "costinl" } } }

# external resource
es.query = org/mypackage/myquery.json
复制代码

es.mapping.names

es.mapping.names 用于设置 Hive 与 ES 的字段映射关系,如果不设置,则默认字段名不发生变化(即为 data type 区域定义的字段名);此外该部分还用于定义 Hive 到 ES 的数据映射类型。

'es.mapping.names' = 'date:@timestamp , url:url_123 ')
复制代码

其他通用字段的说明请参考文章: 使用 ES-Hadoop 将 Spark Streaming 流数据写入 ES

4、自定义日期类型解析

目前将 ES 的 date 类型映射到 Hive 的 TIMESTAMP 类型时,ES-Hadoop 组件只能识别时间戳格式或者标准的 XSD 格式的日期字符串:

@Override
protected Object parseDate(Long value, boolean richDate) {
    return (richDate ? new TimestampWritable(new Timestamp(value)) : processLong(value));
}

@Override
protected Object parseDate(String value, boolean richDate) {
    return (richDate ? new TimestampWritable(new Timestamp(DatatypeConverter.parseDateTime(value).getTimeInMillis())) : parseString(value));
}
复制代码

关于 XSD(XML Schema Date/Time Datatypes)可用参考文章: www.w3schools.com/xml/schema_…

为了兼容自定义的日期格式,需要编写自定义的日期读取类:

import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.hive.HiveValueReader;

import java.sql.Timestamp;
import java.text.ParseException;
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.util.Date;

public class EsValueReader extends HiveValueReader {
    private String dateFormat;
    private static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
    private static final String DEFAULT_DATE_FORMAT_MIN = "yyyy-MM-dd HH:mm";
    private static final String DEFAULT_DATE_FORMAT_HOUR = "yyyy-MM-dd HH";
    private static final String DEFAULT_DATE_FORMAT_DAY = "yyyy-MM-dd";

    @Override
    public void setSettings(Settings settings) {
        super.setSettings(settings);
        dateFormat = settings.getProperty("es.date.format");
    }

    @Override
    protected Object parseDate(String value, boolean richDate) {
        if (value != null && value.trim().length() > 0 && DEFAULT_DATE_FORMAT.equalsIgnoreCase(dateFormat)) {
            if (richDate){
                if (value.length() == 16){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_MIN).getTime()));
                }
                if (value.length() == 13){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_HOUR).getTime()));
                }
                if (value.length() == 10){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_DAY).getTime()));
                }
                return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT).getTime()));
            }
            return parseString(value);
        }
        return super.parseDate(value, richDate);
    }

    /**
     * 解析日期,根據指定的格式進行解析.<br>
     * 如果解析錯誤,則返回null
     * @param stringDate 日期字串
     * @param format 日期格式
     * @return 日期型別
     */
    private static Date parseDate(String stringDate, String format) {
        if (stringDate == null) {
            return null;
        }
        try {
            return parseDate(stringDate, new String[] { format });
        } catch (ParseException e) {
            return null;
        }
    }

    public static Date parseDate(String str, String... parsePatterns) throws ParseException {
        return parseDateWithLeniency(str, parsePatterns, true);
    }

    private static Date parseDateWithLeniency(
            String str, String[] parsePatterns, boolean lenient) throws ParseException {
        if (str == null || parsePatterns == null) {
            throw new IllegalArgumentException("Date and Patterns must not be null");
        }

        SimpleDateFormat parser = new SimpleDateFormat();
        parser.setLenient(lenient);
        ParsePosition pos = new ParsePosition(0);
        for (String parsePattern : parsePatterns) {
            String pattern = parsePattern;
            if (parsePattern.endsWith("ZZ")) {
                pattern = pattern.substring(0, pattern.length() - 1);
            }
            parser.applyPattern(pattern);
            pos.setIndex(0);
            String str2 = str;
            if (parsePattern.endsWith("ZZ")) {
                str2 = str.replaceAll("([-+][0-9][0-9]):([0-9][0-9])$", "$1$2");
            }
            Date date = parser.parse(str2, pos);
            if (date != null && pos.getIndex() == str2.length()) {
                return date;
            }
        }
        throw new ParseException("Unable to parse the date: " + str, -1);
    }
}
复制代码

上述代码的 Maven 依赖

<dependencies>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>2.1.1</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-hadoop</artifactId>
        <version>6.5.4</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
复制代码

自定义日期解析包的部署

代码编写完成后,将代码进行打包,然后将打包好的 jar 包放置到 Hive 的 auxlib 目录中,然后重启 Hive 即可;该步骤与 ES-Hadoop 的安装步骤一样。

在编写 Spark 程序从 Hive 中读取数据的时候,需要添加对该包的依赖以及对 ES-Hadoop 的依赖。

三、总结

经过上述的步骤,Hive 与 ES 的映射已经不成问题,如果想从 ES 中导出数据,可用借助 HSQL insert into table XXX select * from XXXXX; 的方式从 ES 中读取数据写入到 HDFS;当然通过更为复杂的 HSQL 可以将数据进行处理,并将数据重新写入到 ES 或者存储到 HDFS。

充分利用 ES 的查询、过滤和聚合,可以很好的去服务数据标准化、数据清洗、数据分布情况等 ETL 流程。

Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

Hive 与 ElasticSearch 的数据交互

以上所述就是小编给大家介绍的《Hive 与 ElasticSearch 的数据交互》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

引爆点

引爆点

【加】马尔科姆•格拉德威尔(Malcolm Gladwell) / 钱清、覃爱冬 / 中信出版社 / 2014-4 / 36.00元

《引爆点》是《纽约客》怪才格拉德威尔的一部才华横溢之作。他以社会上突如其来的流行潮为切入点,从全新角度探索了控制科学和营销模式。他认为,思想、行为、信息及产品常会像传染病暴发一样迅速传播。正如一个病人就能引起全城流感;几位涂鸦爱好者能在地铁掀起犯罪浪潮;一位满意而归的顾客还能让新开张的餐馆座无虚席;发起小规模流行的团队能引发大规模流行风暴。这些现象均属“社会流行潮”,它达到临界水平并爆发的那一刻,......一起来看看 《引爆点》 这本书的介绍吧!

随机密码生成器
随机密码生成器

多种字符组合密码

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具