内容简介:作者:Jia Zhai翻译:Jennifer编辑:Anonymitaet
作者:Jia Zhai
翻译:Jennifer
编辑:Anonymitaet
本文于 2019 年 5 月 23 日发表在 Debezium 社区。
Bot,向诸位爱卿介绍一下朕的 新晋宠妃 Debezium connector 。
Pulsar IO 框架运行 Debezium connector 能捕获数据变化,并能将不同数据库的数据变化保存至 Pulsar。
本文介绍了如何在 Pulsar 中使用 Debezium connector 捕获 MySQL 表的数据变化,并将这些变化保存至 Pulsar。
如何在 Pulsar 中使用 Debezium Connector
Debezium( https://debezium.io/ )是捕获数据变化(Change Data Capture)的一个开源项目。
Debezium 基于 Apache Kafka Connect (https://kafka.apache.org/documentation/#connectapi)开发,支持多种数据库,例如, MySQL、 MongoDB 、PostgreSQL、Oracle 以及 SQL Server。
Apache Pulsar(http://pulsar.apache.org)包含一套基于 Pulsar IO 框架的内置 connector (https://pulsar.apache.org/docs/en/io-connectors),它与 Apache Kafka Connect 功能相同。
从 2.3.0 版本开始,Pulsar IO 支持开箱即用的 Debezium 源连接器(http://pulsar.apache.org/docs/en/2.3.0/io-cdc-debezium),你可以利用 Debezium 将数据库中的变化瞬时保存至 Apache Pulsar。
教程步骤预览
除了事件流的存储从 Kafka 转到 Pulsar,其他步骤类似于 Debezium 教程(https://debezium.io/docs/tutorial)。
本教程主要包含六个步骤:
-
启动 MySQL。
-
启动 standalone Pulsar。
-
在 Pulsar IO 中启动 Debezium connector。Pulsar IO 从 MySQL 读取数据变化。
-
订阅 Pulsar topic,监控 MySQL 数据变化。
-
更改 MySQL 表的数据 ,验证更改是否立刻记录 在 Pulsar topic 中。
-
清除环境。
1. 启动 MySQL
启动 MySQL, Debezium 会从 MySQL 捕获数据变化。
在新的终端窗口中启动容器,运行 MySQL。
MySQL 有一个预配置好的数据库,名为 inventory。
docker run --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.8
结果如下所示:
2019-03-25T14:12:41.178325Z 0 [Note] Event Scheduler: Loaded 0 events
2019-03-25T14:12:41.178670Z 0 [Note] mysqld: ready for connections.
Version: '5.7.25-log' socket: '/var/run/mysqld/mysqld.sock' port: 3306 MySQL Community Server (GPL)
2. 启动 Pulsar
在本地用 standalone 模式启动 Pulsar。 Pulsar 2.3.0 版本新增了在 Pulsar IO 中运行 Debezium connector 的功能。
下载以下文件:
-
Apache Pulsar 2.3.0 二进制包,点击 https://archive.apache.org/dist/pulsar/pulsar-2.3.0/apache-pulsar-2.3.0-bin.tar.gz。
-
Kafka Adapter Connector,点击 (https://archive.apache.org/dist/pulsar/pulsar-2.3.0/connectors/pulsar-io-kafka-connect-adaptor-2.3.0.nar。
在 Pulsar 中,所有 Pulsar IO connector 都独立封装为 NAR(https://medium.com/hashmapinc/nifi-nar-files-explained-14113f7796fd)文件。
$ wget https://archive.apache.org/dist/pulsar/pulsar-2.3.0/apache-pulsar-2.3.0-bin.tar.gz
$ wget https://archive.apache.org/dist/pulsar/pulsar-2.3.0/connectors/pulsar-io-kafka-connect-adaptor-2.3.0.nar
$ tar zxf apache-pulsar-2.3.0-bin.tar.gz
$ cd apache-pulsar-2.3.0
$ mkdir connectors
$ cp ../pulsar-io-kafka-connect-adaptor-2.3.0.nar connectors
$ bin/pulsar standalone
3. 启动 Debezium Connector
在另一个终端窗口,用 local run 模式启动 Debezium MySQL connector。
debezium-mysql-source-config.yaml 文件包含所有配置,主要参数列在 configs 节点下。 yaml 格式的文件有 task.class 参数。配置文件还包含 MySQL 相关参数(例如,服务器、端口、用户、密码)以及 history 和 offset 存储文件中 Pulsar topics 的两个名字。
以下是 debezium-mysql-source-config.yaml 文件的内容:
表会自动创建在 MySQL 中,Debezium connector 一开始就会从 MySQL binlog 文件读取历史记录。输出结果显示 connector 被触发后,处理了 47 条记录。
更多关于如何管理 connector 的信息,查阅 Pulsar IO 文档(http://pulsar.apache.org/docs/en/io-managing/)。
Debezium 捕获和读取的记录会自动发布至 Pulsar topic。在另一终端窗口运行以下命令,将显示 Pulsar 当前的 topic。
$ bin/pulsar-admin topics list public/default
每个表格的变化数据会单独存储在 Pulsar topic 中。除了与数据库表相关的 topic,另外两个名为 history-topic 和 offset-topic 的 topic 分别储存与历史和偏移量相关的数据。
persistent://public/default/history-topic
persistent://public/default/offset-topic
4. 订阅 Pulsar topic
以
persistent://public/default/dbserver1.inventory.productstopic 为例。使用 CLI 命令消费该 topic,并监控 products 表的变化。
$ bin/pulsar-client consume -s "sub-products" public/default/dbserver1.inventory.products -n 0
输出结果如下:
22:17:41.201 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [public/default/dbserver1.inventory.products][sub-products] Subscribing to topic on cnx [id: 0xfe0b4feb, L:/127.0.0.1:55585 - R:localhost/127.0.0.1:6650]
22:17:41.223 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [public/default/dbserver1.inventory.products][sub-products] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
也可以使用 offset topic 监控偏移量变化,表的变化存储在
persistent://public/default/dbserver1.inventory.products topic 中。
$ bin/pulsar-client consume -s "sub-offset" offset-topic -n 0
5. 验证 Pulsar topic
在 Docker 中启动 MySQL, 更改 MySQL 的 product 表的数据。
$ docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
运行命令后,出现 MySQL CLI,重命名 products 表中的两个项目。
mysql> use inventory;
mysql> show tables;
mysql> SELECT * FROM products ;
mysql> UPDATE products SET name='1111111111' WHERE id=101;
mysql> UPDATE products SET name='1111111111' WHERE id=107;
此时,消费 products topic 的终端页面显示已添加了两个变化。
此时,消费 offset topic 的终端页面显示已添加了两个偏移量。
此时, local-run 连接器的终端显示已处理了两条记录。
6. 清除环境
使用 Ctrl+C 关闭终端。
使用 docker ps 和 docker kill 命令关闭与 MySQL 相关的容器。
mysql> quit
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
84d66c2f591d debezium/example-mysql:0.8 "docker-entrypoint.s…" About an hour ago Up About an hour 0.0.0.0:3306->3306/tcp, 33060/tcp mysql
$ docker kill 84d66c2f591d
如需删除 Pulsar 数据,删除 Pulsar binary 目录中的数据目录。
$ pwd
/Users/jia/ws/releases/apache-pulsar-2.3.0
$ rm -rf data
总结
Pulsar IO 框架运行 Debezium connector 能捕获数据变化,并能将不同数据库的数据变化保存至 Pulsar。
本文介绍了如何在 Pulsar 中使用Debezium connector 捕获 MySQL 表的数据变化,并将这些变化保存至 Pulsar。
我们会持续改进在 Pulsar 中使用 Debezium 连接器的体验。在 Pulsar 2.4.0 版本后,操作会变得更加简单。
作者介绍
翟佳是 StreamNative 的核心工程师、开源项目 Apache Pulsar 和 Apache BookKeeper 的 PMC成员,并持续为这两个开源项目作出了杰出贡献。
点击 阅读原文 ,查看英语原文。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- RecyclerView使用指南(一)—— 基本使用
- 如何使用Meteorjs使用URL参数
- 使用 defer 还是不使用 defer?
- 使用 Typescript 加强 Vuex 使用体验
- [译] 何时使用 Rust?何时使用 Go?
- UDP协议的正确使用场合(谨慎使用)
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Beginning Google Maps API 3
Gabriel Svennerberg / Apress / 2010-07-27 / $39.99
This book is about the next generation of the Google Maps API. It will provide the reader with the skills and knowledge necessary to incorporate Google Maps v3 on web pages in both desktop and mobile ......一起来看看 《Beginning Google Maps API 3》 这本书的介绍吧!
CSS 压缩/解压工具
在线压缩/解压 CSS 代码
URL 编码/解码
URL 编码/解码