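Debezium is an open-source distributed platform for change data capture (CDC). Start it, point it at your databases, and your application can react quickly whenever other applications run inserts, updates, or deletes against them. Debezium is durable and fast, so your application can respond quickly without missing a single event. Each Debezium record is a row-level change event for a database table. Debezium is built on top of Kafka and tightly integrated with it, so it is delivered as Kafka Connect connectors. Supported databases are MySQL, MongoDB, PostgreSQL, Oracle, and SQL Server. This article uses MySQL as the data source and listens to the MySQL binlog, which requires a change to the MySQL configuration. The current version is 0.9.5.Final; 0.10 is under development.
Configuration
This article mainly uses the embedded mode to listen for events and sync the changes to another database.
The next article uses the Kafka connector to sync the changes to a database.
MySQL must have the binlog enabled as shown below. If you use the debezium/mysql image, this is already configured.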
log-bin=mysql-bin   # adding this line enables the binlog
binlog-format=ROW   # use ROW mode
server_id=1         # required for MySQL replication; must not duplicate canal's slaveId
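Before starting a connector it can help to sanity-check these three settings. The following is only a minimal sketch: `BinlogConfigCheck` is a hypothetical helper (not part of Debezium or MySQL) that scans a my.cnf-style fragment and reports which of the settings above look missing or wrong. It relies on MySQL treating `-` and `_` in option names as interchangeable.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only.
class BinlogConfigCheck {

    // Parses "key=value # comment" lines; '-' and '_' in keys are normalized to '_'.
    static Map<String, String> parse(String cnf) {
        Map<String, String> settings = new HashMap<>();
        for (String line : cnf.split("\\R")) {
            int hash = line.indexOf('#');
            String body = (hash >= 0 ? line.substring(0, hash) : line).trim();
            int eq = body.indexOf('=');
            if (eq <= 0) {
                continue; // blank line, section header, or option without a value
            }
            String key = body.substring(0, eq).trim().replace('-', '_').toLowerCase();
            settings.put(key, body.substring(eq + 1).trim());
        }
        return settings;
    }

    // Returns the problems found; an empty list means the three settings look fine.
    static List<String> problems(String cnf) {
        Map<String, String> s = parse(cnf);
        List<String> problems = new ArrayList<>();
        if (!s.containsKey("log_bin")) {
            problems.add("log-bin is not set");
        }
        if (!"ROW".equalsIgnoreCase(s.get("binlog_format"))) {
            problems.add("binlog-format is not ROW");
        }
        if (!s.containsKey("server_id")) {
            problems.add("server_id is not set");
        }
        return problems;
    }

    public static void main(String[] args) {
        String cnf = "log-bin=mysql-bin\nbinlog-format=ROW\nserver_id=1\n";
        System.out.println(problems(cnf));
    }
}
```

On a running server the same information is available from `SHOW VARIABLES LIKE 'binlog_format'` and `SHOW VARIABLES LIKE 'log_bin'`.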
Tutorial
Let's start with a quick demo that configures a Kafka connector to receive Debezium event records. It needs three services: ZooKeeper, Kafka, and the Debezium connector. They are started with Docker here, so install Docker first.
Start ZooKeeper
docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:0.9
Start Kafka
docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:0.9
Start MySQL
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.9
Start Kafka Connect
docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:0.9
Create the Debezium connector through the Kafka Connect HTTP API
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.inventory"
  }
}'
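If you would rather register the connector from Java than from curl, the same request can be built with the JDK's `java.net.http` client (Java 11 or later). This is a rough sketch, not the article's code: `ConnectorRequestSketch` and its tiny JSON writer are hypothetical, the writer only handles flat string maps whose values need no escaping, and only a few of the config keys are shown for brevity.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only.
class ConnectorRequestSketch {

    // Minimal JSON object writer for flat string maps (no escaping is performed).
    static String toJson(Map<String, String> map) {
        return map.entrySet().stream()
                .map(e -> "\"" + e.getKey() + "\": \"" + e.getValue() + "\"")
                .collect(Collectors.joining(", ", "{", "}"));
    }

    // Builds the POST request against the Kafka Connect REST endpoint.
    static HttpRequest build(String connectUrl, String name, Map<String, String> config) {
        String body = "{\"name\": \"" + name + "\", \"config\": " + toJson(config) + "}";
        return HttpRequest.newBuilder(URI.create(connectUrl + "/connectors/"))
                .header("Accept", "application/json")
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("connector.class", "io.debezium.connector.mysql.MySqlConnector");
        config.put("database.hostname", "mysql");
        config.put("database.server.name", "dbserver1");
        config.put("database.whitelist", "inventory");
        HttpRequest request = build("http://localhost:8083", "inventory-connector", config);
        System.out.println(request.method() + " " + request.uri());
        // To actually send it (the connect container must be running):
        // HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    }
}
```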
MySQL client operations
Modify data in any table of the inventory database.
Start a watcher to view the Debezium event records
docker run -it --name watcher --rm --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:0.9 watch-topic -a -k dbserver1.inventory.customers
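The topic name in the watcher command follows the MySQL connector's naming scheme: the logical server name (`database.server.name`), the database, and the table, joined with dots. A small sketch of that convention follows; `TopicNameSketch` is a hypothetical helper, not a Debezium class.

```java
// Hypothetical helper for illustration only.
class TopicNameSketch {

    // Data-change topic for one table: <serverName>.<database>.<table>
    static String topicFor(String serverName, String database, String table) {
        return serverName + "." + database + "." + table;
    }

    // Splits a data-change topic back into its three parts.
    static String[] parts(String topic) {
        return topic.split("\\.", 3);
    }

    public static void main(String[] args) {
        System.out.println(topicFor("dbserver1", "inventory", "customers"));
    }
}
```

Schema-change (DDL) events are published to a topic named after the server alone, which is why the DDL log later in this article shows the topic `dbserver1`.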
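Embedded mode
This part uses the embedded mode to receive CDC events by consuming the Debezium event stream directly, without Kafka. The scenario: when a table in one MySQL database changes, the change is synced to another MySQL database. Here the inventory database is monitored and its data is synced to inventory_back.
### Debezium configuration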
connector.class=io.debezium.connector.mysql.MySqlConnector
offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
offset.storage.file.filename=offset.dat
offset.flush.interval.ms=60000
name=debezium-kafka-source
database.hostname=localhost
database.port=3306
database.user=debezium
database.password=dbz
#database.dbname=inventory
database.whitelist=inventory
#database.whitelist=inventory,inventory_back
database.server.id=184054
database.server.name=dbserver1
#transforms=unwrap
#transforms.unwrap.type=io.debezium.transforms.UnwrapFromEnvelope
#transforms.unwrap.drop.tombstones=false
database.history=io.debezium.relational.history.FileDatabaseHistory
database.history.file.filename=dbhistory.dat
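Properties and converter configuration
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.json.JsonConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class DebeziumEmbeddedAutoConfiguration {

    /** Loads config.properties from the classpath, then applies environment overrides. */
    @Bean
    public Properties embeddedProperties() {
        Properties propConfig = new Properties();
        try (InputStream propsInputStream = getClass().getClassLoader().getResourceAsStream("config.properties")) {
            if (propsInputStream == null) {
                log.error("config.properties not found on the classpath");
            } else {
                propConfig.load(propsInputStream);
            }
        } catch (IOException e) {
            log.error("Couldn't load properties", e);
        }
        // PropertyLoader is a project helper that is not listed in this article.
        PropertyLoader.loadEnvironmentValues(propConfig);
        return propConfig;
    }

    @Bean
    public io.debezium.config.Configuration embeddedConfig(Properties embeddedProperties) {
        return io.debezium.config.Configuration.from(embeddedProperties);
    }

    /** Converter for record keys (isKey = true); used only to log events as JSON. */
    @Bean
    public JsonConverter keyConverter(io.debezium.config.Configuration embeddedConfig) {
        JsonConverter converter = new JsonConverter();
        converter.configure(embeddedConfig.asMap(), true);
        return converter;
    }

    /** Converter for record values (isKey = false); used only to log events as JSON. */
    @Bean
    public JsonConverter valueConverter(io.debezium.config.Configuration embeddedConfig) {
        JsonConverter converter = new JsonConverter();
        converter.configure(embeddedConfig.asMap(), false);
        return converter;
    }
}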
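The `PropertyLoader.loadEnvironmentValues` helper called above is not listed in the article, so its exact behavior is unknown. One plausible reading is that environment variables override values from config.properties. The sketch below shows that idea under an assumed naming convention (`database.hostname` is overridden by `DATABASE_HOSTNAME`); both the class `EnvOverrideSketch` and the convention are hypothetical.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical stand-in for the unlisted PropertyLoader helper.
class EnvOverrideSketch {

    // Assumed convention: upper-case the key and replace '.' and '-' with '_'.
    static String envName(String key) {
        return key.toUpperCase().replace('.', '_').replace('-', '_');
    }

    // Replaces each property whose matching environment variable is defined.
    static void applyOverrides(Properties props, Map<String, String> env) {
        // stringPropertyNames() returns a snapshot, so updating props while iterating is safe.
        for (String key : props.stringPropertyNames()) {
            String override = env.get(envName(key));
            if (override != null) {
                props.setProperty(key, override);
            }
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("database.hostname", "localhost");
        applyOverrides(props, System.getenv());
        System.out.println(props.getProperty("database.hostname"));
    }
}
```

Passing the environment in as a map keeps the method easy to test and avoids reading `System.getenv()` deep inside the configuration code.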
Syncing DDL and DML
This part uses Spring Boot's CommandLineRunner to start Debezium's EmbeddedEngine. Each CDC event is passed to handleRecord, which handles DDL and DML by parsing the key and value of the SourceRecord.
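import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.data.Envelope;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.util.Clock;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; // assumed: Apache Commons Lang 3
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Order(2)
@Component
public class DebeziumEmbeddedRunner implements CommandLineRunner {
    @Autowired
    private io.debezium.config.Configuration embeddedConfig;
    @Autowired
    private JdbcTemplate jdbcTemplate;
    @Autowired
    private NamedParameterJdbcTemplate namedTemplate;
    @Autowired
    private JsonConverter keyConverter;
    @Autowired
    private JsonConverter valueConverter;

    @Override
    public void run(String... args) throws Exception {
        EmbeddedEngine engine = EmbeddedEngine.create()
                .using(embeddedConfig)
                .using(this.getClass().getClassLoader())
                .using(Clock.SYSTEM)
                .notifying(this::handleRecord)
                .build();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);
        shutdownHook(engine);
        // Accept no further tasks, so awaitTermination can return once the engine stops.
        executor.shutdown();
        awaitTermination(executor);
    }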
    /**
     * For every record this method will be invoked.
     */
    private void handleRecord(SourceRecord record) {
        logRecord(record);
        Struct payload = (Struct) record.value();
        if (Objects.isNull(payload)) {
            return;
        }
        String table = Optional.ofNullable(DebeziumRecordUtils.getRecordStructValue(payload, "source"))
                .map(s -> s.getString("table")).orElse(null);

        // Handle DML (data changes): these events carry an "op" field.
        Envelope.Operation operation = DebeziumRecordUtils.getOperation(payload);
        if (Objects.nonNull(operation)) {
            Struct key = (Struct) record.key();
            handleDML(key, payload, table, operation);
            return;
        }

        // Handle DDL (schema changes).
        String ddl = getDDL(payload);
        if (StringUtils.isNotBlank(ddl)) {
            handleDDL(ddl);
        }
    }

    private String getDDL(Struct payload) {
        String ddl = DebeziumRecordUtils.getDDL(payload);
        if (StringUtils.isBlank(ddl)) {
            return null;
        }
        String db = DebeziumRecordUtils.getDatabaseName(payload);
        if (StringUtils.isBlank(db)) {
            db = embeddedConfig.getString(MySqlConnectorConfig.DATABASE_WHITELIST);
        }
        // Strip the source database qualifier so the statement runs against the target database.
        ddl = ddl.replace(db + ".", "");
        ddl = ddl.replace("`" + db + "`.", "");
        return ddl;
    }
    /**
     * Executes a DDL statement against the target database.
     *
     * @param ddl the DDL statement
     */
    private void handleDDL(String ddl) {
        log.info("ddl语句 : {}", ddl);
        try {
            jdbcTemplate.execute(ddl);
        } catch (Exception e) {
            log.error("数据库操作DDL语句失败,", e);
        }
    }

    /**
     * Handles DML statements such as insert, update, and delete.
     *
     * @param key       struct holding the primary key of the changed row
     * @param payload   the event value (envelope)
     * @param table     table name
     * @param operation type of DML operation
     */
    private void handleDML(Struct key, Struct payload, String table, Envelope.Operation operation) {
        AbstractDebeziumSqlProvider provider = DebeziumSqlProviderFactory.getProvider(operation);
        if (Objects.isNull(provider)) {
            log.error("没有找到 sql 处理器提供者.");
            return;
        }
        String sql = provider.getSql(key, payload, table);
        if (StringUtils.isBlank(sql)) {
            log.error("找不到sql.");
            return;
        }
        try {
            log.info("dml语句 : {}", sql);
            namedTemplate.update(sql, provider.getSqlParameterMap());
        } catch (Exception e) {
            log.error("数据库DML操作失败,", e);
        }
    }
    /**
     * Logs the record's topic, key, and payload as JSON.
     *
     * @param record the change event
     */
    private void logRecord(SourceRecord record) {
        final byte[] payload = valueConverter.fromConnectData("dummy", record.valueSchema(), record.value());
        final byte[] key = keyConverter.fromConnectData("dummy", record.keySchema(), record.key());
        log.info("Publishing Topic --> {}", record.topic());
        log.info("Key --> {}", new String(key, java.nio.charset.StandardCharsets.UTF_8));
        log.info("Payload --> {}", new String(payload, java.nio.charset.StandardCharsets.UTF_8));
    }

    private void shutdownHook(EmbeddedEngine engine) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            log.info("Requesting embedded engine to shut down");
            engine.stop();
        }));
    }

    private void awaitTermination(ExecutorService executor) {
        try {
            while (!executor.awaitTermination(10L, TimeUnit.SECONDS)) {
                log.info("Waiting another 10 seconds for the embedded engine to shut down");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of clearing it.
            Thread.currentThread().interrupt();
        }
    }
}
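There are too many SQL providers and table field parsers to list them one by one here. As shown in the figure below, most MySQL column types are supported. If you need the code, follow the WeChat official account, send an email, or leave a comment.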
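Since the providers are not listed, here is a rough sketch of what an insert provider might look like. It is a guess rather than the author's implementation: `InsertSqlSketch` and its `Op` enum are hypothetical. The one-letter codes mirror the `op` field of Debezium events (c, u, d, r), and the generated statement has the same named-parameter shape as the DML log line shown later in this article.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of an insert SQL provider.
class InsertSqlSketch {

    // Mirrors the one-letter "op" codes carried by Debezium change events.
    enum Op {
        CREATE("c"), UPDATE("u"), DELETE("d"), READ("r");

        final String code;

        Op(String code) {
            this.code = code;
        }

        // Returns the matching operation, or null for an unknown code.
        static Op forCode(String code) {
            for (Op op : values()) {
                if (op.code.equals(code)) {
                    return op;
                }
            }
            return null;
        }
    }

    // Builds "insert into <table> values (:col1,:col2,...)" for NamedParameterJdbcTemplate.
    static String insertSql(String table, List<String> columns) {
        String params = columns.stream().map(c -> ":" + c).collect(Collectors.joining(","));
        return "insert into " + table + " values (" + params + ")";
    }

    public static void main(String[] args) {
        System.out.println(Op.forCode("c"));
        System.out.println(insertSql("demo", Arrays.asList("id", "bigint_id", "var_name")));
    }
}
```

A statement without an explicit column list only works while source and target tables keep the same column order; listing the columns explicitly would be more robust.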
Test table schema
CREATE TABLE `demo` (
  `id` int(10) NOT NULL AUTO_INCREMENT,
  `bigint_id` bigint(20) NOT NULL,
  `var_name` varchar(255) NOT NULL,
  `ex_tinyint` tinyint(4) DEFAULT NULL,
  `ex_char` char(255) DEFAULT NULL,
  `ex_json` json DEFAULT NULL COMMENT '水电费',
  `ex_text` text,
  `ex_year` year(4) DEFAULT NULL,
  `ex_time` time DEFAULT NULL,
  `ex_date` date DEFAULT NULL,
  `ex_datetime` datetime DEFAULT NULL,
  `ex_timestamp` timestamp NULL DEFAULT NULL,
  `ex_blob` blob,
  `ex_tinyblob` tinyblob,
  `ex_binary` binary(255) DEFAULT NULL,
  `ex_double` double(10,4) DEFAULT NULL,
  `ex_float` float(10,2) DEFAULT NULL,
  `ex_decimal` decimal(10,2) DEFAULT NULL,
  `ex_numeric` decimal(10,4) DEFAULT NULL,
  `ex_real` double(10,4) DEFAULT NULL,
  `ex_bit` bit(1) DEFAULT NULL,
  `ex_enum` enum('123','@@','22','水电费') DEFAULT '123',
  `ex_set` set('a','b','c','d') DEFAULT NULL,
  `ex_geometry` geometry DEFAULT NULL,
  `ex_point` point DEFAULT NULL,
  `ex_linestring` linestring DEFAULT NULL,
  `ex_polygon` polygon DEFAULT NULL,
  `ex_geometrycollection` geometrycollection DEFAULT NULL,
  `ex_multipoint` multipoint DEFAULT NULL,
  PRIMARY KEY (`id`,`bigint_id`,`var_name`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
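Results
#### DDL events
The log below shows what happens when the length of the bigint_id column in the source table is changed to 21: the event is captured and the DDL statement is executed, so the bigint_id column of the demo table in the inventory_back database is changed to 21 as well.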
Publishing Topic --> dbserver1
2019-06-24 16:22:21.230 INFO 14995 --- [pool-1-thread-1] c.e.embedded.DebeziumEmbeddedRunner : Key --> {"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"databaseName"}],"optional":false,"name":"io.debezium.connector.mysql.SchemaChangeKey"},"payload":{"databaseName":"inventory"}}
2019-06-24 16:22:21.230 INFO 14995 --- [pool-1-thread-1] c.e.embedded.DebeziumEmbeddedRunner : Payload --> {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"default":false,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"databaseName"},{"type":"string","optional":false,"field":"ddl"}],"optional":false,"name":"io.debezium.connector.mysql.SchemaChangeValue"},"payload":{"source":{"version":"0.9.3.Final","connector":"mysql","name":"dbserver1","server_id":223344,"ts_sec":1561364540,"gtid":null,"file":"mysql-bin.000006","pos":22530,"row":0,"snapshot":false,"thread":null,"db":null,"table":null,"query":null},"databaseName":"inventory","ddl":"ALTER TABLE `inventory`.`demo` \nMODIFY COLUMN `bigint_id` bigint(21) NOT NULL AFTER `id`"}}
2019-06-24 16:22:21.230 ERROR 14995 --- [pool-1-thread-1] c.example.embedded.DebeziumRecordUtils : not find op field.
2019-06-24 16:22:21.231 INFO 14995 --- [pool-1-thread-1] c.e.embedded.DebeziumEmbeddedRunner : ddl语句 : ALTER TABLE `demo`
MODIFY COLUMN `bigint_id` bigint(21) NOT NULL AFTER `id`
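The difference between the `ddl` field in the payload and the statement that was finally executed comes from the two `replace` calls in `getDDL`. The sketch below isolates that step using the statement from the log above; `DdlQualifierSketch` is a hypothetical name for illustration.

```java
// Hypothetical helper that isolates the qualifier-stripping step of getDDL.
class DdlQualifierSketch {

    // Removes "db." and "`db`." prefixes so the DDL can run in another database.
    static String stripDatabase(String ddl, String db) {
        return ddl.replace(db + ".", "").replace("`" + db + "`.", "");
    }

    public static void main(String[] args) {
        String ddl = "ALTER TABLE `inventory`.`demo` \n"
                + "MODIFY COLUMN `bigint_id` bigint(21) NOT NULL AFTER `id`";
        System.out.println(stripDatabase(ddl, "inventory"));
    }
}
```

Plain string replacement is simple, but it also rewrites any other text that happens to contain `inventory.`, for example inside a string literal or a longer database name, so a real implementation may want a stricter match.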
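#### DML insert events
After a row is inserted into the demo table of the inventory database, the following log output appears, showing the topic, key, payload, and the generated DML insert statement. The row is then synced to the demo table in inventory_back.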
2019-06-24 16:27:14.735 INFO 14995 --- [pool-1-thread-1] i.debezium.connector.mysql.BinlogReader : 1 records sent during previous 00:04:53.506, last recorded offset: {ts_sec=1561364834, file=mysql-bin.000006, pos=23002, row=1, server_id=223344, event=2}
2019-06-24 16:27:14.737 INFO 14995 --- [pool-1-thread-1] c.e.embedded.DebeziumEmbeddedRunner : Publishing Topic --> dbserver1.inventory.demo
2019-06-24 16:27:14.737 INFO 14995 --- [pool-1-thread-1] c.e.embedded.DebeziumEmbeddedRunner : Key --> {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"bigint_id"},{"type":"string","optional":false,"field":"var_name"}],"optional":false,"name":"dbserver1.inventory.demo.Key"},"payload":{"id":2,"bigint_id":1,"var_name":"老王"}}
2019-06-24 16:27:14.738 INFO 14995 --- [pool-1-thread-1] c.e.embedded.DebeziumEmbeddedRunner : Payload --> {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"bigint_id"},{"type":"string","optional":false,"field":"var_name"},{"type":"int16","optional":true,"field":"ex_tinyint"},{"type":"string","optional":true,"field":"ex_char"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"ex_json"},{"type":"string","optional":true,"field":"ex_text"},{"type":"int32","optional":true,"name":"io.debezium.time.Year","version":1,"field":"ex_year"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTime","version":1,"field":"ex_time"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"ex_date"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"ex_datetime"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"ex_timestamp"},{"type":"bytes","optional":true,"field":"ex_blob"},{"type":"bytes","optional":true,"field":"ex_tinyblob"},{"type":"bytes","optional":true,"field":"ex_binary"},{"type":"double","optional":true,"field":"ex_double"},{"type":"double","optional":true,"field":"ex_float"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"10"},"field":"ex_decimal"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"4","connect.decimal.precision":"10"},"field":"ex_numeric"},{"type":"double","optional":true,"field":"ex_real"},{"type":"boolean","optional":true,"field":"ex_bit"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"123,22"},"default":"123","field":"ex_enum"},{"type":"string","optional":true,"name":"io.debezium.data.EnumSet","version":1,"parameters":{"allowed":"a,b,c,d"},"field":"ex_set"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_geometry"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"ex_point"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_linestring"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_polygon"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_geometrycollection"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_multipoint"}],"optional":true,"name":"dbserver1.inventory.demo.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"bigint_id"},{"type":"string","optional":false,"field":"var_name"},{"type":"int16","optional":true,"field":"ex_tinyint"},{"type":"string","optional":true,"field":"ex_char"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"ex_json"},{"type":"string","optional":true,"field":"ex_text"},{"type":"int32","optional":true,"name":"io.debezium.time.Year","version":1,"field":"ex_year"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTime","version":1,"field":"ex_time"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"ex_date"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"ex_datetime"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"ex_timestamp"},{"type":"bytes","optional":true,"field":"ex_blob"},{"type":"bytes","optional":true,"field":"ex_tinyblob"},{"type":"bytes","optional":true,"field":"ex_binary"},{"type":"double","optional":true,"field":"ex_double"},{"type":"double","optional":true,"field":"ex_float"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"10"},"field":"ex_decimal"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"4","connect.decimal.precision":"10"},"field":"ex_numeric"},{"type":"double","optional":true,"field":"ex_real"},{"type":"boolean","optional":true,"field":"ex_bit"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"123,22"},"default":"123","field":"ex_enum"},{"type":"string","optional":true,"name":"io.debezium.data.EnumSet","version":1,"parameters":{"allowed":"a,b,c,d"},"field":"ex_set"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_geometry"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"ex_point"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_linestring"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_polygon"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_geometrycollection"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_multipoint"}],"optional":true,"name":"dbserver1.inventory.demo.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"default":false,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.demo.Envelope"},"payload":{"before":null,"after":{"id":2,"bigint_id":1,"var_name":"老王","ex_tinyint":1,"ex_char":"a","ex_json":"{\"abc\":123}","ex_text":"ert","ex_year":2019,"ex_time":59224000000,"ex_date":null,"ex_datetime":null,"ex_timestamp":null,"ex_blob":null,"ex_tinyblob":null,"ex_binary":null,"ex_double":null,"ex_float":null,"ex_decimal":null,"ex_numeric":null,"ex_real":null,"ex_bit":null,"ex_enum":"123","ex_set":null,"ex_geometry":null,"ex_point":null,"ex_linestring":null,"ex_polygon":null,"ex_geometrycollection":null,"ex_multipoint":null},"source":{"version":"0.9.3.Final","connector":"mysql","name":"dbserver1","server_id":223344,"ts_sec":1561364834,"gtid":null,"file":"mysql-bin.000006","pos":23194,"row":0,"snapshot":false,"thread":9,"db":"inventory","table":"demo","query":null},"op":"c","ts_ms":1561364834477}}
2019-06-24 16:27:14.738 INFO 14995 --- [pool-1-thread-1] c.e.embedded.DebeziumEmbeddedRunner : dml语句 : insert into demo values (:id,:bigint_id,:var_name,:ex_tinyint,:ex_char,:ex_json,:ex_text,:ex_year,:ex_time,:ex_date,:ex_datetime,:ex_timestamp,:ex_blob,:ex_tinyblob,:ex_binary,:ex_double,:ex_float,:ex_decimal,:ex_numeric,:ex_real,:ex_bit,:ex_enum,:ex_set,:ex_geometry,:ex_point,:ex_linestring,:ex_polygon,:ex_geometrycollection,:ex_multipoint)
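Several columns in this payload are not plain SQL values. According to the schema names in the log, `ex_time` is an `io.debezium.time.MicroTime` (microseconds past midnight), `ex_date` an `io.debezium.time.Date` (days since the epoch), and `ex_datetime` an `io.debezium.time.Timestamp` (milliseconds since the epoch), so a field parser has to convert them before binding SQL parameters. A minimal sketch with `java.time` follows; `TemporalDecodeSketch` is a hypothetical name, and interpreting the timestamp at UTC is an assumption.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;

// Hypothetical field parsers for Debezium's temporal logical types.
class TemporalDecodeSketch {

    // io.debezium.time.MicroTime: microseconds past midnight.
    static LocalTime fromMicroTime(long micros) {
        return LocalTime.ofNanoOfDay(micros * 1_000L);
    }

    // io.debezium.time.Date: days since 1970-01-01.
    static LocalDate fromDate(int epochDays) {
        return LocalDate.ofEpochDay(epochDays);
    }

    // io.debezium.time.Timestamp: milliseconds since the epoch, read here at UTC (assumption).
    static LocalDateTime fromTimestamp(long epochMillis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        // ex_time value taken from the payload above
        System.out.println(fromMicroTime(59224000000L));
    }
}
```

The `ex_time` value 59224000000 from the payload decodes to 16:27:04.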
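#### DML update events
After the newly inserted row in the demo table of the inventory database is modified, the log shows the topic, key, payload, and a delete statement followed by an insert statement. The change is then synced to the demo table in inventory_back.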
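The delete-then-insert approach to updates can be sketched as follows. This is an illustration, not the author's provider: `UpdateAsDeleteInsertSketch` is a hypothetical class that builds a delete keyed on the primary-key columns from the event key, followed by an insert of the full `after` row.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: apply an update event as delete + insert.
class UpdateAsDeleteInsertSketch {

    // "delete from <table> where k1 = :k1 and k2 = :k2"
    static String deleteSql(String table, List<String> keyColumns) {
        String where = keyColumns.stream()
                .map(c -> c + " = :" + c)
                .collect(Collectors.joining(" and "));
        return "delete from " + table + " where " + where;
    }

    // "insert into <table> values (:c1,:c2,...)"
    static String insertSql(String table, List<String> columns) {
        String params = columns.stream().map(c -> ":" + c).collect(Collectors.joining(","));
        return "insert into " + table + " values (" + params + ")";
    }

    // The two statements to run, in order, for one update event.
    static List<String> updateStatements(String table, List<String> keyColumns, List<String> allColumns) {
        return Arrays.asList(deleteSql(table, keyColumns), insertSql(table, allColumns));
    }

    public static void main(String[] args) {
        List<String> key = Arrays.asList("id", "bigint_id", "var_name");
        List<String> all = Arrays.asList("id", "bigint_id", "var_name", "ex_tinyint");
        updateStatements("demo", key, all).forEach(System.out::println);
    }
}
```

Running both statements in one transaction keeps the target table from briefly losing the row if the insert fails.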