Collecting MySQL Data into HBase in Real Time with StreamSets

Category: IT Technology · Published: 4 years ago

Summary: local HBase environment; local environment demo

Local HBase environment

$ jps
4082 Jps
3556 NameNode
3813 QuorumPeerMain
3911 HMaster
3642 DataNode
3739 SecondaryNameNode
3999 HRegionServer

Local environment demo

MySQL environment

$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
09849f9303ae 1fdf3806e715 "/entrypoint.sh mysq…" 22 months ago Up About a minute (healthy) 0.0.0.0:3306->3306/tcp, 33060/tcp mysql

MySQL version: 8.0.12

HBase environment versions:

Apache Hadoop: hadoop-3.1.1

Apache HBase: hbase-2.1.0

Apache Phoenix: apache-phoenix-5.0.0-HBase-2.0-bin

Local SDC environment

$ docker run --restart on-failure -p 18630:18630 -d --name streamsets-dc streamsets/datacollector
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
cd2d89509457 streamsets/datacollector "/docker-entrypoint.…" 35 minutes ago Up 35 minutes 0.0.0.0:18630->18630/tcp streamsets-dc
09849f9303ae 1fdf3806e715 "/entrypoint.sh mysq…" 22 months ago Up About an hour (healthy) 0.0.0.0:3306->3306/tcp, 33060/tcp mysql

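Docker is convenient, but connecting to HBase from a container requires network connectivity between the container and the host. The local HBase registers itself as localhost, which the containerized SDC cannot reach, so I dropped the Docker version of SDC and switched to a locally extracted install.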

[zk: localhost:2181(CONNECTED) 4] get /hbase/master
[binary protobuf payload; readable fragments: "master:16000", "PBUF", "localhost"]
cZxid = 0x1a725
ctime = Tue Jul 21 11:02:46 CST 2020
mZxid = 0x1a725
mtime = Tue Jul 21 11:02:46 CST 2020
pZxid = 0x1a725
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1000069b14e0000
dataLength = 57
numChildren = 0
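
The znode payload above is binary, but the host name the master registered under is stored inside it as plain text. A minimal Python sketch for pulling the readable fragments out of such a payload follows. The sample bytes are made up for illustration and are not the actual znode content, and `printable_runs` is a hypothetical helper name.

```python
import re
from typing import List

def printable_runs(payload: bytes, min_len: int = 4) -> List[str]:
    """Return runs of printable ASCII that are at least min_len bytes long."""
    pattern = rb"[\x20-\x7e]{%d,}" % min_len
    return [m.decode("ascii") for m in re.findall(pattern, payload)]

# Illustrative bytes only: a fake payload shaped like the output above.
sample = b"\xff\x00\x00\x00\x14master:16000\x01\x02PBUF\n\x0b\n\tlocalhost\x10\x80}\x18"
print(printable_runs(sample))
```

If one of the fragments is `localhost`, a client running inside a container will resolve that name to the container itself rather than to the host running HBase.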

SDC, locally extracted version:

$ ./bin/streamsets dc
Java 1.8 detected; adding $SDC_JAVA8_OPTS of "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -Djdk.nio.maxCachedBufferSize=262144" to $SDC_JAVA_OPTS
Bypass activation because SDC contains only basic stage libraries.
Logging initialized @2901ms to org.eclipse.jetty.util.log.Slf4jLog
Running on URI : 'http://192.168.31.29:18630'

Documentation:

https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Getting_Started/GettingStarted_Title.html#concept_htw_ghg_jq

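The HBase versions that SDC supports can be seen in the demo below.

Integrating Phoenix with the HBase service:

Copy the two jars phoenix-5.0.0-HBase-2.0-server.jar and phoenix-core-5.0.0-HBase-2.0.jar from the extracted Phoenix package into the HBase lib directory, add the following configuration to hbase-site.xml, and restart the HBase cluster.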

<property>
  <name>phoenix.schema.isNamespaceMappingEnabled</name>
  <value>true</value>
</property>
<property>
  <name>phoenix.schema.mapSystemTablesToNamespace</name>
  <value>true</value>
</property>
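
Because the same hbase-site.xml has to be present in more than one place, it can help to check that a given copy really contains both properties. The following is a small sketch using only the Python standard library; `missing_properties` is a hypothetical helper and the sample XML is deliberately incomplete for illustration.

```python
import xml.etree.ElementTree as ET

# The two Phoenix namespace-mapping properties from the snippet above.
REQUIRED = {
    "phoenix.schema.isNamespaceMappingEnabled": "true",
    "phoenix.schema.mapSystemTablesToNamespace": "true",
}

def missing_properties(xml_text: str) -> dict:
    """Return required properties that are absent or set to another value."""
    root = ET.fromstring(xml_text)
    found = {
        prop.findtext("name", "").strip(): prop.findtext("value", "").strip()
        for prop in root.iter("property")
    }
    return {k: v for k, v in REQUIRED.items() if found.get(k) != v}

# Illustrative input: only one of the two properties is set.
sample = """<configuration>
  <property>
    <name>phoenix.schema.isNamespaceMappingEnabled</name>
    <value>true</value>
  </property>
</configuration>"""
print(missing_properties(sample))
```

An empty result means the file carries both settings. To check a real file, read its contents and pass the text to the function.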

To make it easy to reach Phoenix through sqlline.py, copy hbase-site.xml into the Phoenix bin directory as well.

$ ./bin/sqlline.py 
Setting property: [incremental, false]
Setting property: [isolation, TRANSACTION_READ_COMMITTED]
issuing: !connect jdbc:phoenix: none none org.apache.phoenix.jdbc.PhoenixDriver
Connecting to jdbc:phoenix:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/shaozhipeng/Development/apache-phoenix-5.0.0-HBase-2.0-bin/phoenix-5.0.0-HBase-2.0-client.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/shaozhipeng/Development/hadoop-3.1.1/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
20/07/20 16:28:09 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Connected to: Phoenix (version 5.0)
Driver: PhoenixEmbeddedDriver (version 5.0)
Autocommit status: true
Transaction isolation: TRANSACTION_READ_COMMITTED
Building list of tables and columns for tab-completion (set fastconnect to true to skip)...
133/133 (100%) Done
Done
sqlline version 1.2.0
0: jdbc:phoenix:> !tables
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---------------+-+
| TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TABLE_TYPE | REMARKS | TYPE_NAME | SELF_REFERENCING_COL_NAME | REF_GENERATION | INDEX_STATE | IMMUTABLE_ROWS | SALT_BUCKETS | |
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---------------+-+
| | SYSTEM | CATALOG | SYSTEM TABLE | | | | | | false | null | |
| | SYSTEM | FUNCTION | SYSTEM TABLE | | | | | | false | null | |
| | SYSTEM | LOG | SYSTEM TABLE | | | | | | true | 32 | |
| | SYSTEM | SEQUENCE | SYSTEM TABLE | | | | | | false | null | |
| | SYSTEM | STATS | SYSTEM TABLE | | | | | | false | null | |
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---------------+-+
0: jdbc:phoenix:>

Create the user table in MySQL

CREATE TABLE `user` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
  `user_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL DEFAULT '' COMMENT 'user name',
  `update_time` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

Create the namespace and table in HBase

create_namespace 'ZLXX'
create 'ZLXX:USER', 'INFO'


hbase(main):001:0> create_namespace 'ZLXX'
Took 0.9431 seconds
hbase(main):002:0> create 'ZLXX:USER', 'INFO'
Created table ZLXX:USER
Took 1.4457 seconds
=> Hbase::Table - ZLXX:USER

Create the schema and table mapping in Phoenix

create schema ZLXX;
create table ZLXX.USER (
id varchar primary key,
info.id varchar,
info.user_name varchar,
info.update_time varchar
) column_encoded_bytes=0;


0: jdbc:phoenix:> !tables
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---------+
| TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TABLE_TYPE | REMARKS | TYPE_NAME | SELF_REFERENCING_COL_NAME | REF_GENERATION | INDEX_STATE | IMMUTABLE_ROWS | SALT_BU |
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---------+
| | SYSTEM | CATALOG | SYSTEM TABLE | | | | | | false | null |
| | SYSTEM | FUNCTION | SYSTEM TABLE | | | | | | false | null |
| | SYSTEM | LOG | SYSTEM TABLE | | | | | | true | 32 |
| | SYSTEM | SEQUENCE | SYSTEM TABLE | | | | | | false | null |
| | SYSTEM | STATS | SYSTEM TABLE | | | | | | false | null |
| | ZLXX | USER | TABLE | | | | | | false | null |
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---------+
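
Phoenix folds unquoted identifiers to upper case, so the Phoenix DDL above maps to the HBase column family INFO with qualifiers ID, USER_NAME and UPDATE_TIME, and whatever writes into HBase has to use exactly those names. The sketch below derives the target column names from the MySQL column names. The `/field` keys mimic record field paths and are an assumption about how the pipeline's field mapping is laid out, since the original configuration is only shown as a screenshot; `hbase_column` is a hypothetical helper.

```python
def hbase_column(family: str, mysql_column: str) -> str:
    """Build the family:qualifier name that an unquoted Phoenix column maps to."""
    return f"{family.upper()}:{mysql_column.upper()}"

# Column names from the MySQL user table.
mysql_columns = ["id", "user_name", "update_time"]

# Hypothetical field-path-to-column mapping for the HBase destination.
mapping = {f"/{c}": hbase_column("info", c) for c in mysql_columns}
for field_path, column in mapping.items():
    print(field_path, "->", column)
```

Writing lower-case qualifiers instead would put the data into HBase cells that the Phoenix table does not read.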

Create the pipeline in SDC

The JDBC and CDH stage libraries must be installed first.

Select JDBC Query Consumer as the origin and HBase as the destination.

The complete pipeline is simple, as the figure shows.

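As shown in the figure above, upload the MySQL JDBC driver to SDC directly from the management UI, then restart SDC when prompted.

Once the upload succeeds, the driver appears in the list.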

MySQL is version 8.0.12, so make sure the driver jar version matches:

https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.12/mysql-connector-java-8.0.12.jar

Main configuration of the data source and the destination system

JDBC-MySQL:

HBase-CDH6.3.0:

Once Validate succeeds, click Start. The UI while the pipeline is running:

As a quick demo, insert and update data in the MySQL test.user table.

INSERT INTO `test`.`user`(`id`, `user_name`, `update_time`) VALUES (2, 'ZLXX_INSERT', SYSDATE());

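Record Count (since last startup) shows a number greater than 1 because the query selects rows whose update time is greater than or equal to OFFSET and runs every 10 seconds, so the same row is picked up again on each scan. The SQL also restricts results to the last five minutes, so the number of repeats stays small. With a strict greater-than comparison against OFFSET, data could be lost, for example when another row is committed with the same update time after a scan has already advanced the offset.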
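
The trade-off between the two comparisons can be reproduced with a small simulation. This is only a sketch: it uses an in-memory SQLite table as a stand-in for MySQL, leaves out the five-minute window, and the `poll` and `simulate` helpers are hypothetical. The pipeline's actual query is only visible in the screenshot.

```python
import sqlite3

def poll(conn, offset, inclusive):
    """Run one incremental scan on update_time; return (rows, new_offset)."""
    op = ">=" if inclusive else ">"
    rows = conn.execute(
        f"SELECT id, update_time FROM user WHERE update_time {op} ? "
        "ORDER BY update_time, id",
        (offset,),
    ).fetchall()
    new_offset = rows[-1][1] if rows else offset
    return rows, new_offset

def simulate(inclusive):
    """Return the ids seen across two scans, with a late same-second insert."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE user (id INTEGER PRIMARY KEY, update_time TEXT)")
    conn.execute("INSERT INTO user VALUES (1, '2020-07-22 11:16:40')")
    seen, offset = set(), "1970-01-01 00:00:00"
    rows, offset = poll(conn, offset, inclusive)  # first scan reads row 1
    seen.update(r[0] for r in rows)
    # Row 2 lands in the same second, after the first scan has finished.
    conn.execute("INSERT INTO user VALUES (2, '2020-07-22 11:16:40')")
    rows, offset = poll(conn, offset, inclusive)  # second scan
    seen.update(r[0] for r in rows)
    return sorted(seen)

print(simulate(inclusive=True))   # row 1 is read twice, row 2 is picked up
print(simulate(inclusive=False))  # row 2 is never read
```

Re-reading a row is harmless here because the HBase write is keyed by row key, so a repeated record overwrites the same cells instead of creating a new row.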

Data on the HBase side

hbase(main):004:0> scan 'ZLXX:USER', {LIMIT=>5}
ROW COLUMN+CELL
1 column=INFO:ID, timestamp=1595384382829, value=1
1 column=INFO:UPDATE_TIME, timestamp=1595384382829, value=20180903190809
1 column=INFO:USER_NAME, timestamp=1595384382829, value=\xE9\x82\xB5\xE5\xBF\x97\xE9\xB9\x8F
1 column=INFO:_0, timestamp=1595384382829, value=
2 column=INFO:ID, timestamp=1595388092263, value=2
2 column=INFO:UPDATE_TIME, timestamp=1595388092263, value=20200722111640
2 column=INFO:USER_NAME, timestamp=1595388092263, value=ZLXX_INSERT
2 row(s)
Took 0.0598 seconds
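
In the scan output, the cell timestamp is when the cell was last written, while INFO:UPDATE_TIME holds the value copied from MySQL. The sketch below compares the two for row 2. It assumes the update_time strings are in the same UTC+8 zone as the CST timestamps shown earlier; `cell_time` and `parse_update_time` are hypothetical helpers.

```python
from datetime import datetime, timedelta, timezone

# Assumption: update_time values are in China Standard Time (UTC+8).
CST = timezone(timedelta(hours=8))

def cell_time(ms: int) -> datetime:
    """Convert an HBase cell timestamp (epoch milliseconds) to CST."""
    whole = datetime.fromtimestamp(ms // 1000, tz=CST)
    return whole + timedelta(milliseconds=ms % 1000)

def parse_update_time(value: str) -> datetime:
    """Parse the yyyyMMddHHmmss string stored in INFO:UPDATE_TIME."""
    return datetime.strptime(value, "%Y%m%d%H%M%S").replace(tzinfo=CST)

written = cell_time(1595388092263)             # timestamp of row 2's cells
updated = parse_update_time("20200722111640")  # row 2's INFO:UPDATE_TIME
print(written.strftime("%Y-%m-%d %H:%M:%S"))
print(written - updated)
```

Under that assumption, the last write to row 2 happened a little under five minutes after the MySQL update, which would be consistent with the row being rescanned and rewritten until it fell out of the five-minute window.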

Query from Phoenix

0: jdbc:phoenix:> select * from zlxx.user limit 5;
+-----+-----+--------------+-----------------+
| ID | ID | USER_NAME | UPDATE_TIME |
+-----+-----+--------------+-----------------+
| 1 | 1 | 邵志鹏 | 20180903190809 |
| 2 | 2 | ZLXX_INSERT | 20200722111640 |
+-----+-----+--------------+-----------------+
2 rows selected (0.134 seconds)

Update test.user in MySQL

UPDATE `test`.`user` SET `user_name` = 'ZLXX_IN_UPDATE', `update_time` = SYSDATE() WHERE `id` = 2;

Query from Phoenix and HBase

0: jdbc:phoenix:> select * from zlxx.user limit 5;
+-----+-----+-----------------+-----------------+
| ID | ID | USER_NAME | UPDATE_TIME |
+-----+-----+-----------------+-----------------+
| 1 | 1 | 邵志鹏 | 20180903190809 |
| 2 | 2 | ZLXX_IN_UPDATE | 20200722113033 |
+-----+-----+-----------------+-----------------+
2 rows selected (0.128 seconds)
hbase(main):005:0> scan 'ZLXX:USER', {LIMIT=>5}
ROW COLUMN+CELL
1 column=INFO:ID, timestamp=1595384382829, value=1
1 column=INFO:UPDATE_TIME, timestamp=1595384382829, value=20180903190809
1 column=INFO:USER_NAME, timestamp=1595384382829, value=\xE9\x82\xB5\xE5\xBF\x97\xE9\xB9\x8F
1 column=INFO:_0, timestamp=1595384382829, value=
2 column=INFO:ID, timestamp=1595388662927, value=2
2 column=INFO:UPDATE_TIME, timestamp=1595388662927, value=20200722113033
2 column=INFO:USER_NAME, timestamp=1595388662927, value=ZLXX_IN_UPDATE
2 row(s)
Took 0.0761 seconds

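Note that the Phoenix table mapping above produces two ID columns, the row key ID and INFO.ID, so take care with column naming when mapping the table.

[Local environment: real-time collection of MySQL data into HBase with SDC, queried through a mapped Phoenix table. END]

Appendix: HBase versions in CDH:

HBase version for each CDH release (important)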
