如何在 Pulsar 中使用 Debezium Connector

栏目: 数据库 · 发布时间: 5年前

内容简介:作者:Jia Zhai翻译:Jennifer编辑:Anonymitaet

作者:Jia Zhai



本文于 2019 年 5 月 23 日发表在  Debezium 社区。

Bot,向诸位爱卿介绍一下朕的 新晋宠妃 Debezium connector 。

Pulsar IO 框架运行 Debezium connector 能捕获数据变化,并能将不同数据库的数据变化保存至 Pulsar。

本文介绍了如何在 Pulsar 中使用 Debezium connector 捕获 MySQL 表的数据变化,并将这些变化保存至 Pulsar。

Debezium( https://debezium.io/ )是捕获数据变化(Change Data Capture)的一个开源项目。

Debezium 基于 Apache Kafka Connect (https://kafka.apache.org/documentation/#connectapi)开发,支持多种数据库,例如, MySQL、 MongoDB 、PostgreSQL、Oracle 以及 SQL Server。

Apache Pulsar(http://pulsar.apache.org)包含一套基于 Pulsar IO 框架的内置 connector (https://pulsar.apache.org/docs/en/io-connectors),它与 Apache Kafka Connect 功能相同。

从 2.3.0 版本开始,Pulsar IO 支持开箱即用的 Debezium 源连接器(http://pulsar.apache.org/docs/en/2.3.0/io-cdc-debezium),你可以利用 Debezium 将数据库中的变化瞬时保存至 Apache Pulsar。


除了事件流的存储从 Kafka 转到 Pulsar,其他步骤类似于 Debezium 教程(https://debezium.io/docs/tutorial)。


  1. 启动 MySQL。

  2. 启动 standalone Pulsar。

  3. 在 Pulsar IO 中启动 Debezium connector。Pulsar IO 从 MySQL 读取数据变化。

  4. 订阅 Pulsar topic,监控 MySQL 数据变化。

  5. 更改 MySQL 表的数据 ,验证更改是否立刻记录 在 Pulsar topic 中。

  6. 清除环境。

1. 启动 MySQL

启动 MySQL, Debezium 会从 MySQL 捕获数据变化。

在新的终端窗口中启动容器,运行 MySQL。

MySQL 有一个预配置好的数据库,名为 inventory。

docker run --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.8


2019-03-25T14:12:41.178325Z 0 [Note] Event Scheduler: Loaded 0 events

2019-03-25T14:12:41.178670Z 0 [Note] mysqld: ready for connections.

Version: '5.7.25-log'  socket: '/var/run/mysqld/mysqld.sock'  port: 3306  MySQL Community Server (GPL)

2. 启动 Pulsar

在本地用 standalone 模式启动 Pulsar。 Pulsar 2.3.0 版本新增了在 Pulsar IO 中运行 Debezium connector 的功能。


  • Apache Pulsar 2.3.0 二进制包,点击 https://archive.apache.org/dist/pulsar/pulsar-2.3.0/apache-pulsar-2.3.0-bin.tar.gz。

  • Kafka Adapter Connector,点击 (https://archive.apache.org/dist/pulsar/pulsar-2.3.0/connectors/pulsar-io-kafka-connect-adaptor-2.3.0.nar。

在 Pulsar 中,所有 Pulsar IO connector 都独立封装为  NAR(https://medium.com/hashmapinc/nifi-nar-files-explained-14113f7796fd)文件。

$ wget https://archive.apache.org/dist/pulsar/pulsar-2.3.0/apache-pulsar-2.3.0-bin.tar.gz

$ wget https://archive.apache.org/dist/pulsar/pulsar-2.3.0/connectors/pulsar-io-kafka-connect-adaptor-2.3.0.nar

$ tar zxf apache-pulsar-2.3.0-bin.tar.gz

$ cd apache-pulsar-2.3.0

$ mkdir connectors

$ cp ../pulsar-io-kafka-connect-adaptor-2.3.0.nar connectors

$ bin/pulsar standalone

3. 启动 Debezium Connector

在另一个终端窗口,用 local run 模式启动 Debezium MySQL connector。

debezium-mysql-source-config.yaml  文件包含所有配置,主要参数列在 configs 节点下。 yaml 格式的文件有 task.class 参数。配置文件还包含 MySQL 相关参数(例如,服务器、端口、用户、密码)以及 history 和 offset 存储文件中 Pulsar topics 的两个名字。

以下是 debezium-mysql-source-config.yaml 文件的内容:

表会自动创建在 MySQL 中,Debezium connector 一开始就会从 MySQL binlog 文件读取历史记录。输出结果显示 connector 被触发后,处理了 47 条记录。

更多关于如何管理 connector 的信息,查阅 Pulsar IO 文档(http://pulsar.apache.org/docs/en/io-managing/)。

Debezium 捕获和读取的记录会自动发布至 Pulsar topic。在另一终端窗口运行以下命令,将显示 Pulsar 当前的 topic。

$ bin/pulsar-admin topics list public/default

每个表格的变化数据会单独存储在 Pulsar topic 中。除了与数据库表相关的 topic,另外两个名为 history-topic 和 offset-topic 的 topic 分别储存与历史和偏移量相关的数据。



4. 订阅 Pulsar topic

persistent://public/default/dbserver1.inventory.productstopic 为例。使用 CLI 命令消费该 topic,并监控 products 表的变化。

$ bin/pulsar-client consume -s "sub-products" public/default/dbserver1.inventory.products -n 0


22:17:41.201 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [public/default/dbserver1.inventory.products][sub-products] Subscribing to topic on cnx [id: 0xfe0b4feb, L:/ - R:localhost/]

22:17:41.223 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [public/default/dbserver1.inventory.products][sub-products] Subscribed to topic on localhost/ -- consumer: 0

也可以使用 offset topic 监控偏移量变化,表的变化存储在

persistent://public/default/dbserver1.inventory.products topic 中。

$ bin/pulsar-client consume -s "sub-offset" offset-topic -n 0  

5. 验证 Pulsar topic 

Docker 中启动 MySQL, 更改 MySQL 的 product 表的数据。

$ docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'

运行命令后,出现 MySQL CLI,重命名 products 表中的两个项目。

mysql> use inventory;

mysql> show tables;

mysql> SELECT * FROM  products ;

mysql> UPDATE products SET name='1111111111' WHERE id=101;

mysql> UPDATE products SET name='1111111111' WHERE id=107;

此时,消费 products topic 的终端页面显示已添加了两个变化。

此时,消费 offset topic 的终端页面显示已添加了两个偏移量。

此时, local-run 连接器的终端显示已处理了两条记录。

6. 清除环境

使用 Ctrl+C 关闭终端。

使用 docker ps 和 docker kill 命令关闭与 MySQL 相关的容器。

mysql> quit

$ docker ps


84d66c2f591d  debezium/example-mysql:0.8 "docker-entrypoint.s…"   About an hour ago   Up About an hour>3306/tcp, 33060/tcp mysql

$ docker kill 84d66c2f591d

如需删除 Pulsar 数据,删除 Pulsar binary 目录中的数据目录。

$ pwd


$ rm -rf data


Pulsar IO 框架运行 Debezium connector 能捕获数据变化,并能将不同数据库的数据变化保存至 Pulsar。

本文介绍了如何在 Pulsar 中使用Debezium connector 捕获 MySQL 表的数据变化,并将这些变化保存至 Pulsar。

我们会持续改进在 Pulsar 中使用 Debezium 连接器的体验。在 Pulsar 2.4.0 版本后,操作会变得更加简单。


翟佳是 StreamNative 的核心工程师、开源项目 Apache Pulsar 和 Apache BookKeeper 的 PMC成员,并持续为这两个开源项目作出了杰出贡献。

点击 阅读原文 ,查看英语原文。

