Using Debezium for CDC (Part 1)

Category: Databases · Published: 5 years ago

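Debezium is an open-source distributed platform for change data capture (CDC). Start it and point it at a database, and your application is notified quickly whenever other applications run insert, update, or delete operations against that database. Debezium is durable and fast, so your application can respond quickly without missing a single event. Debezium records row-level change events on database tables. It is built on top of Kafka and tightly coupled to it, so it is used through Kafka connectors. Supported databases are MySQL, MongoDB, PostgreSQL, Oracle, and SQL Server. This article uses MySQL as the data source and listens to the MySQL binlog, which requires a change to the MySQL configuration. At the time of writing the current version is 0.9.5.Final, and 0.10 is under development.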

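Configuration

This article mainly uses the embedded engine to listen for events and sync the changes into a database.

Part 2 mainly uses a Kafka connector to sync the changes into a database.

MySQL needs the binlog enabled as shown below. If you use the debezium/mysql image, this is already configured.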

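log-bin=mysql-bin # adding this line enables the binlog
binlog-format=ROW # use ROW format
server_id=1 # required for MySQL replication; must differ from the ID of any replication client (canal's slaveId in the original note; for Debezium, database.server.id)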

Tutorial

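Let's start with a quick demo that configures a Kafka connector to receive Debezium event records. Three services are needed: ZooKeeper, Kafka, and the Debezium connector (Kafka Connect). They are started with Docker here, so install Docker first.

Start ZooKeeper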

docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:0.9

Start Kafka

docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:0.9

Start MySQL

docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.9

Start Kafka Connect

docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:0.9

Create the Debezium connector with an HTTP request to Kafka Connect

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'
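The same registration can be done from Java. The following is a minimal sketch, assuming Java 11+ (for `java.net.http`) and that Kafka Connect is reachable at `http://localhost:8083` as in the curl call above. The class `ConnectorRegistration` and its helper methods are hypothetical names introduced for illustration, and the hand-rolled JSON rendering only escapes quotes and backslashes, so a real project would more likely use a JSON library.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper: builds the same request as the curl command above.
final class ConnectorRegistration {

    /** Quotes a string for JSON; escapes only backslash and double quote. */
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /** Renders a flat string map as a JSON object. */
    static String toJson(Map<String, String> map) {
        return map.entrySet().stream()
                .map(e -> quote(e.getKey()) + ": " + quote(e.getValue()))
                .collect(Collectors.joining(", ", "{ ", " }"));
    }

    /** Builds the body Kafka Connect expects: a name plus a config object. */
    static String requestBody(String name, Map<String, String> config) {
        return "{ " + quote("name") + ": " + quote(name) + ", "
                + quote("config") + ": " + toJson(config) + " }";
    }

    /** Builds (but does not send) the POST request to the connectors endpoint. */
    static HttpRequest buildRequest(String connectUrl, String body) {
        return HttpRequest.newBuilder(URI.create(connectUrl + "/connectors/"))
                .header("Accept", "application/json")
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    /** The connector settings used in the curl command above. */
    static Map<String, String> inventoryConfig() {
        Map<String, String> c = new LinkedHashMap<>();
        c.put("connector.class", "io.debezium.connector.mysql.MySqlConnector");
        c.put("tasks.max", "1");
        c.put("database.hostname", "mysql");
        c.put("database.port", "3306");
        c.put("database.user", "debezium");
        c.put("database.password", "dbz");
        c.put("database.server.id", "184054");
        c.put("database.server.name", "dbserver1");
        c.put("database.whitelist", "inventory");
        c.put("database.history.kafka.bootstrap.servers", "kafka:9092");
        c.put("database.history.kafka.topic", "dbhistory.inventory");
        return c;
    }
}
```

To actually send it, pass the request to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())` and check the returned status code.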

Make changes from a MySQL client

Modify the data in any table of the inventory database.

Start a watcher to view the Debezium event records

docker run -it --name watcher --rm --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:0.9 watch-topic -a -k dbserver1.inventory.customers

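Embedded mode

Here we use the embedded approach to receive CDC events without Kafka, consuming the Debezium event stream directly. The scenario: when a table in one MySQL database changes, the change is synced to another MySQL database. In this example we listen to the inventory database and sync its data to inventory_back.

Debezium configuration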

connector.class=io.debezium.connector.mysql.MySqlConnector
offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
offset.storage.file.filename=offset.dat
offset.flush.interval.ms=60000

name=debezium-kafka-source
database.hostname=localhost
database.port=3306
database.user=debezium
database.password=dbz
#database.dbname=inventory
database.whitelist=inventory
#database.whitelist=inventory,inventory_back
database.server.id=184054
database.server.name=dbserver1
#transforms=unwrap
#transforms.unwrap.type=io.debezium.transforms.UnwrapFromEnvelope
#transforms.unwrap.drop.tombstones=false

database.history=io.debezium.relational.history.FileDatabaseHistory
database.history.file.filename=dbhistory.dat
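Before handing such a file to the engine, it can help to fail fast on missing settings. The sketch below loads a properties source with plain `java.util.Properties` and reports which keys are absent. The class `EmbeddedConfigCheck` is hypothetical, and the list of "required" keys is simply an assumption based on the keys the listing above sets, not an official Debezium list.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check for the properties file shown above.
final class EmbeddedConfigCheck {

    // Assumption: keys this sketch treats as mandatory, taken from the listing above.
    static final List<String> REQUIRED = List.of(
            "name", "connector.class",
            "offset.storage", "offset.storage.file.filename",
            "database.hostname", "database.port",
            "database.user", "database.password",
            "database.server.name",
            "database.history", "database.history.file.filename");

    /** Loads properties; lines starting with '#' are comments and are skipped. */
    static Properties load(Reader reader) {
        Properties props = new Properties();
        try {
            props.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Returns the required keys that are absent or blank. */
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

A caller could run `missingKeys(load(reader))` at startup and refuse to start the engine when the returned list is not empty.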

Properties and converter configuration

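import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.json.JsonConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// PropertyLoader is the author's own helper class (not listed in this post).

@Slf4j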
@Configuration
public class DebeziumEmbeddedAutoConfiguration {

    @Bean
    public Properties embeddedProperties() {
        Properties propConfig = new Properties();
        try(InputStream propsInputStream = getClass().getClassLoader().getResourceAsStream("config.properties")) {
            propConfig.load(propsInputStream);
        } catch (IOException e) {
            log.error("Couldn't load properties", e);
        }
        PropertyLoader.loadEnvironmentValues(propConfig);
        return propConfig;
    }

    @Bean
    public io.debezium.config.Configuration embeddedConfig(Properties embeddedProperties) {
         return io.debezium.config.Configuration.from(embeddedProperties);
    }

    @Bean
    public JsonConverter keyConverter(io.debezium.config.Configuration embeddedConfig) {
        JsonConverter converter = new JsonConverter();
        converter.configure(embeddedConfig.asMap(), true);
        return converter;
    }

    @Bean
    public JsonConverter valueConverter(io.debezium.config.Configuration embeddedConfig) {
        JsonConverter converter = new JsonConverter();
        converter.configure(embeddedConfig.asMap(), false);
        return converter;
    }

}
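The post does not show `PropertyLoader.loadEnvironmentValues`. One plausible reading is that it lets environment variables override values from `config.properties`. The sketch below illustrates that idea only: the class `EnvOverrides`, the naming rule (`database.hostname` becomes `DATABASE_HOSTNAME`), and the override behavior are all assumptions, not the author's implementation.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Properties;

// Hypothetical stand-in for the author's PropertyLoader (which is not shown).
final class EnvOverrides {

    /** Maps a property key to an environment variable name (assumed convention). */
    static String envName(String key) {
        return key.toUpperCase(Locale.ROOT).replace('.', '_').replace('-', '_');
    }

    /** Overrides every property that has a matching entry in the given environment. */
    static void apply(Properties props, Map<String, String> env) {
        // stringPropertyNames() returns a snapshot, so updating props here is safe.
        for (String key : props.stringPropertyNames()) {
            String override = env.get(envName(key));
            if (override != null) {
                props.setProperty(key, override);
            }
        }
    }
}
```

In an application this would be called as `EnvOverrides.apply(propConfig, System.getenv())`. Taking the environment as a parameter keeps the method easy to test.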

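Syncing DDL and DML

This relies on Spring Boot's CommandLineRunner to start Debezium's EmbeddedEngine. Once a CDC event arrives, handleRecord processes the DDL or DML, which requires parsing the key and value of the event's SourceRecord.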

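import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.data.Envelope;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.util.Clock;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; // assumed: commons-lang3; the original does not show which StringUtils it uses
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.stereotype.Component;

// DebeziumRecordUtils, DebeziumSqlProviderFactory and AbstractDebeziumSqlProvider are the
// author's own classes (not listed in this post).

@Slf4j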
@Order(2)
@Component
public class DebeziumEmbeddedRunner implements CommandLineRunner {

    @Autowired
    private io.debezium.config.Configuration embeddedConfig;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private NamedParameterJdbcTemplate namedTemplate;

    @Autowired
    private JsonConverter keyConverter;

    @Autowired
    private JsonConverter valueConverter;

    @Override
    public void run(String... args) throws Exception {
        EmbeddedEngine engine = EmbeddedEngine.create()
                .using(embeddedConfig)
                .using(this.getClass().getClassLoader())
                .using(Clock.SYSTEM)
                .notifying(this::handleRecord)
                .build();

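        ExecutorService executor = Executors.newSingleThreadExecutor();

        executor.execute(engine);
        // Accept no further tasks, so awaitTermination() can return once the engine stops.
        executor.shutdown();

        shutdownHook(engine);

        awaitTermination(executor);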
    }

    /**
     * For every record this method will be invoked.
     */
    private void handleRecord(SourceRecord record) {
        logRecord(record);

        Struct payload = (Struct) record.value();
        if (Objects.isNull(payload)) {
            return;
        }
        String table = Optional.ofNullable(DebeziumRecordUtils.getRecordStructValue(payload, "source"))
                .map(s->s.getString("table")).orElse(null);

        // Handle DML (data changes)
        Envelope.Operation operation = DebeziumRecordUtils.getOperation(payload);
        if (Objects.nonNull(operation)) {
            Struct key = (Struct) record.key();
            handleDML(key, payload, table, operation);
            return;
        }

        // Handle DDL (schema changes)
        String ddl = getDDL(payload);
        if (StringUtils.isNotBlank(ddl)) {
            handleDDL(ddl);
        }
    }

    private String getDDL(Struct payload) {
        String ddl = DebeziumRecordUtils.getDDL(payload);
        if (StringUtils.isBlank(ddl)) {
            return null;
        }
        String db = DebeziumRecordUtils.getDatabaseName(payload);
        if (StringUtils.isBlank(db)) {
            db = embeddedConfig.getString(MySqlConnectorConfig.DATABASE_WHITELIST);
        }
        ddl = ddl.replace(db + ".", "");
        ddl = ddl.replace("`" + db + "`.", "");
        return ddl;
    }

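    /**
     * Executes a DDL statement against the target database.
     *
     * @param ddl the DDL statement to run
     */
    private void handleDDL(String ddl) {
        log.info("ddl语句 : {}", ddl); // "DDL statement"
        try {
            jdbcTemplate.execute(ddl);
        } catch (Exception e) {
            log.error("数据库操作DDL语句失败,", e); // "failed to execute the DDL statement"
        }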
    }

    /**
     * Handles DML changes: insert, update, and delete.
     *
     * @param key       key struct of the change event (the row's primary key)
     * @param payload   value struct (body) of the change event
     * @param table     table name
     * @param operation DML operation type
     */
    private void handleDML(Struct key, Struct payload, String table, Envelope.Operation operation) {
        AbstractDebeziumSqlProvider provider = DebeziumSqlProviderFactory.getProvider(operation);
        if (Objects.isNull(provider)) {
            log.error("没有找到 sql 处理器提供者."); // "no SQL provider found"
            return;
        }

        String sql = provider.getSql(key, payload, table);
        if (StringUtils.isBlank(sql)) {
            log.error("找不到sql."); // "no SQL generated"
            return;
        }

        try {
            log.info("dml语句 : {}", sql); // "DML statement"
            namedTemplate.update(sql, provider.getSqlParameterMap());
        } catch (Exception e) {
            log.error("数据库DML操作失败,", e); // "DML operation failed"
        }
    }

    /**
     * Logs the record's topic, key, and payload as JSON.
     *
     * @param record the change event to log
     */
    private void logRecord(SourceRecord record) {
        final byte[] payload = valueConverter.fromConnectData("dummy", record.valueSchema(), record.value());
        final byte[] key = keyConverter.fromConnectData("dummy", record.keySchema(), record.key());
        log.info("Publishing Topic --> {}", record.topic());
        log.info("Key --> {}", new String(key));
        log.info("Payload --> {}", new String(payload));
    }

    private void shutdownHook(EmbeddedEngine engine) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            log.info("Requesting embedded engine to shut down");
            engine.stop();
        }));
    }

    private void awaitTermination(ExecutorService executor) {
        try {
            while (!executor.awaitTermination(10L, TimeUnit.SECONDS)) {
                log.info("Waiting another 10 seconds for the embedded engine to shut down");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag instead of clearing it
        }
    }

}
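The routing in `handleRecord` depends on the `op` field of the event envelope: Debezium uses the codes `c` (create), `u` (update), `d` (delete), and `r` (snapshot read), while schema-change events carry a `ddl` field and no `op`. The dependency-free sketch below mirrors that decision. `ChangeOp` and `RecordRouter` are hypothetical names for illustration, not the author's `DebeziumRecordUtils`.

```java
import java.util.Optional;

// Hypothetical mirror of Debezium's operation codes, without the Debezium dependency.
enum ChangeOp {
    READ("r"), CREATE("c"), UPDATE("u"), DELETE("d");

    private final String code;

    ChangeOp(String code) {
        this.code = code;
    }

    /** Returns the operation for an envelope "op" code, or empty if absent or unknown. */
    static Optional<ChangeOp> forCode(String code) {
        for (ChangeOp op : values()) {
            if (op.code.equals(code)) {
                return Optional.of(op);
            }
        }
        return Optional.empty();
    }
}

final class RecordRouter {

    /** Decides how a record is handled: DML first, then DDL, otherwise skipped. */
    static String route(String opCode, String ddl) {
        Optional<ChangeOp> op = ChangeOp.forCode(opCode);
        if (op.isPresent()) {
            return "DML:" + op.get();
        }
        if (ddl != null && !ddl.trim().isEmpty()) {
            return "DDL";
        }
        return "SKIP";
    }
}
```

Checking for `op` before `ddl` matches the order used in `handleRecord` above.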

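There are too many providers and table field parsers to list one by one here (the original post showed them in a screenshot); they support most MySQL column types. If you need them, follow the WeChat official account, or get in touch by email or in the comments.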
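Although the providers are not listed, the insert statement logged in the results has the shape `insert into demo values (:id,:bigint_id,...)`, with one named parameter per column. A minimal sketch of how such a statement could be assembled from the column names of the event's `after` struct follows. `InsertSqlSketch` is a hypothetical name, and this is not the author's provider code.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: builds a named-parameter INSERT like the one in the log output.
final class InsertSqlSketch {

    /** One ":column" placeholder per column, in the order given. */
    static String insertSql(String table, List<String> columns) {
        String placeholders = columns.stream()
                .map(c -> ":" + c)
                .collect(Collectors.joining(","));
        return "insert into " + table + " values (" + placeholders + ")";
    }
}
```

Because the statement has no explicit column list, it only works while source and target tables keep the same column order. Emitting the column names as well would make it more robust.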

Test table schema

CREATE TABLE `demo` (
  `id` int(10) NOT NULL AUTO_INCREMENT,
  `bigint_id` bigint(20) NOT NULL,
  `var_name` varchar(255) NOT NULL,
  `ex_tinyint` tinyint(4) DEFAULT NULL,
  `ex_char` char(255) DEFAULT NULL,
  `ex_json` json DEFAULT NULL COMMENT '水电费',
  `ex_text` text,
  `ex_year` year(4) DEFAULT NULL,
  `ex_time` time DEFAULT NULL,
  `ex_date` date DEFAULT NULL,
  `ex_datetime` datetime DEFAULT NULL,
  `ex_timestamp` timestamp NULL DEFAULT NULL,
  `ex_blob` blob,
  `ex_tinyblob` tinyblob,
  `ex_binary` binary(255) DEFAULT NULL,
  `ex_double` double(10,4) DEFAULT NULL,
  `ex_float` float(10,2) DEFAULT NULL,
  `ex_decimal` decimal(10,2) DEFAULT NULL,
  `ex_numeric` decimal(10,4) DEFAULT NULL,
  `ex_real` double(10,4) DEFAULT NULL,
  `ex_bit` bit(1) DEFAULT NULL,
  `ex_enum` enum('123','@@','22','水电费') DEFAULT '123',
  `ex_set` set('a','b','c','d') DEFAULT NULL,
  `ex_geometry` geometry DEFAULT NULL,
  `ex_point` point DEFAULT NULL,
  `ex_linestring` linestring DEFAULT NULL,
  `ex_polygon` polygon DEFAULT NULL,
  `ex_geometrycollection` geometrycollection DEFAULT NULL,
  `ex_multipoint` multipoint DEFAULT NULL,
  PRIMARY KEY (`id`,`bigint_id`,`var_name`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
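
Results

DDL event

As the log shows, the length of the bigint_id column in the source table was changed to 21. After the event was captured, the DDL statement was executed, and the bigint_id column of the demo table in the inventory_back database was changed to length 21 as well.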

Publishing Topic --> dbserver1
2019-06-24 16:22:21.230  INFO 14995 --- [pool-1-thread-1] c.e.embedded.DebeziumEmbeddedRunner      : Key --> {"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"databaseName"}],"optional":false,"name":"io.debezium.connector.mysql.SchemaChangeKey"},"payload":{"databaseName":"inventory"}}
2019-06-24 16:22:21.230  INFO 14995 --- [pool-1-thread-1] c.e.embedded.DebeziumEmbeddedRunner      : Payload --> {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"default":false,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"databaseName"},{"type":"string","optional":false,"field":"ddl"}],"optional":false,"name":"io.debezium.connector.mysql.SchemaChangeValue"},"payload":{"source":{"version":"0.9.3.Final","connector":"mysql","name":"dbserver1","server_id":223344,"ts_sec":1561364540,"gtid":null,"file":"mysql-bin.000006","pos":22530,"row":0,"snapshot":false,"thread":null,"db":null,"table":null,"query":null},"databaseName":"inventory","ddl":"ALTER TABLE `inventory`.`demo` \nMODIFY COLUMN `bigint_id` bigint(21) NOT NULL AFTER `id`"}}
2019-06-24 16:22:21.230 ERROR 14995 --- [pool-1-thread-1] c.example.embedded.DebeziumRecordUtils   : not find op field.
2019-06-24 16:22:21.231  INFO 14995 --- [pool-1-thread-1] c.e.embedded.DebeziumEmbeddedRunner      : ddl语句 : ALTER TABLE `demo` 
MODIFY COLUMN `bigint_id` bigint(21) NOT NULL AFTER `id`
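The last log line shows the statement after `getDDL` has removed the source database qualifier, which is what allows it to run against inventory_back. As a worked example of that step, the sketch below applies the same two `replace` calls to the logged DDL. `DdlQualifierSketch` is a hypothetical name used only here.

```java
// Hypothetical worked example of the qualifier stripping done in getDDL above.
final class DdlQualifierSketch {

    /** Removes "db." and "`db`." prefixes so the DDL applies to the current database. */
    static String stripDatabase(String ddl, String db) {
        return ddl.replace(db + ".", "")
                  .replace("`" + db + "`.", "");
    }

    public static void main(String[] args) {
        String logged = "ALTER TABLE `inventory`.`demo` \n"
                + "MODIFY COLUMN `bigint_id` bigint(21) NOT NULL AFTER `id`";
        System.out.println(stripDatabase(logged, "inventory"));
    }
}
```

Plain string replacement is simple but blunt: it would also rewrite the text `inventory.` inside a string literal or comment in the DDL, so it suits controlled schemas better than arbitrary ones.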

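DML insert event

After a new row is added to demo in the inventory database, the following log entries appear, showing the topic, key, payload, and the DML insert statement. As a result, the row is synced to demo in the inventory_back database.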

2019-06-24 16:27:14.735  INFO 14995 --- [pool-1-thread-1] i.debezium.connector.mysql.BinlogReader  : 1 records sent during previous 00:04:53.506, last recorded offset: {ts_sec=1561364834, file=mysql-bin.000006, pos=23002, row=1, server_id=223344, event=2}
2019-06-24 16:27:14.737  INFO 14995 --- [pool-1-thread-1] c.e.embedded.DebeziumEmbeddedRunner      : Publishing Topic --> dbserver1.inventory.demo
2019-06-24 16:27:14.737  INFO 14995 --- [pool-1-thread-1] c.e.embedded.DebeziumEmbeddedRunner      : Key --> {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"bigint_id"},{"type":"string","optional":false,"field":"var_name"}],"optional":false,"name":"dbserver1.inventory.demo.Key"},"payload":{"id":2,"bigint_id":1,"var_name":"老王"}}
2019-06-24 16:27:14.738  INFO 14995 --- [pool-1-thread-1] c.e.embedded.DebeziumEmbeddedRunner      : Payload --> {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"bigint_id"},{"type":"string","optional":false,"field":"var_name"},{"type":"int16","optional":true,"field":"ex_tinyint"},{"type":"string","optional":true,"field":"ex_char"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"ex_json"},{"type":"string","optional":true,"field":"ex_text"},{"type":"int32","optional":true,"name":"io.debezium.time.Year","version":1,"field":"ex_year"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTime","version":1,"field":"ex_time"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"ex_date"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"ex_datetime"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"ex_timestamp"},{"type":"bytes","optional":true,"field":"ex_blob"},{"type":"bytes","optional":true,"field":"ex_tinyblob"},{"type":"bytes","optional":true,"field":"ex_binary"},{"type":"double","optional":true,"field":"ex_double"},{"type":"double","optional":true,"field":"ex_float"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"10"},"field":"ex_decimal"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"4","connect.decimal.precision":"10"},"field":"ex_numeric"},{"type":"double","optional":true,"field":"ex_real"},{"type":"boolean","optional":true,"field":"ex_bit"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"123,22"},"default":"123","field":"ex_enum"},{"type":"string","optional":true,"name":"io.debezium.d
ata.EnumSet","version":1,"parameters":{"allowed":"a,b,c,d"},"field":"ex_set"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_geometry"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"ex_point"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_linestring"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_polygon"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_geometrycollection"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_multipoint"}],"optional":true,"name":"dbserver1.inventory.demo.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"bigint_id"},{"type":"string","optional":false,"field":"var_name"},{"type":"int16","optional":true,"field":"ex_tinyint"},{"type":"string","optional":true,"field":"ex_char"},{"type":"string","optional":true,"n
ame":"io.debezium.data.Json","version":1,"field":"ex_json"},{"type":"string","optional":true,"field":"ex_text"},{"type":"int32","optional":true,"name":"io.debezium.time.Year","version":1,"field":"ex_year"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTime","version":1,"field":"ex_time"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"ex_date"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"ex_datetime"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"ex_timestamp"},{"type":"bytes","optional":true,"field":"ex_blob"},{"type":"bytes","optional":true,"field":"ex_tinyblob"},{"type":"bytes","optional":true,"field":"ex_binary"},{"type":"double","optional":true,"field":"ex_double"},{"type":"double","optional":true,"field":"ex_float"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"10"},"field":"ex_decimal"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"4","connect.decimal.precision":"10"},"field":"ex_numeric"},{"type":"double","optional":true,"field":"ex_real"},{"type":"boolean","optional":true,"field":"ex_bit"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"123,22"},"default":"123","field":"ex_enum"},{"type":"string","optional":true,"name":"io.debezium.data.EnumSet","version":1,"parameters":{"allowed":"a,b,c,d"},"field":"ex_set"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_geometry"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field
":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"ex_point"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_linestring"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_polygon"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_geometrycollection"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_multipoint"}],"optional":true,"name":"dbserver1.inventory.demo.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"default":false,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector
.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.demo.Envelope"},"payload":{"before":null,"after":{"id":2,"bigint_id":1,"var_name":"老王","ex_tinyint":1,"ex_char":"a","ex_json":"{\"abc\":123}","ex_text":"ert","ex_year":2019,"ex_time":59224000000,"ex_date":null,"ex_datetime":null,"ex_timestamp":null,"ex_blob":null,"ex_tinyblob":null,"ex_binary":null,"ex_double":null,"ex_float":null,"ex_decimal":null,"ex_numeric":null,"ex_real":null,"ex_bit":null,"ex_enum":"123","ex_set":null,"ex_geometry":null,"ex_point":null,"ex_linestring":null,"ex_polygon":null,"ex_geometrycollection":null,"ex_multipoint":null},"source":{"version":"0.9.3.Final","connector":"mysql","name":"dbserver1","server_id":223344,"ts_sec":1561364834,"gtid":null,"file":"mysql-bin.000006","pos":23194,"row":0,"snapshot":false,"thread":9,"db":"inventory","table":"demo","query":null},"op":"c","ts_ms":1561364834477}}
2019-06-24 16:27:14.738  INFO 14995 --- [pool-1-thread-1] c.e.embedded.DebeziumEmbeddedRunner      : dml语句 : insert into demo values (:id,:bigint_id,:var_name,:ex_tinyint,:ex_char,:ex_json,:ex_text,:ex_year,:ex_time,:ex_date,:ex_datetime,:ex_timestamp,:ex_blob,:ex_tinyblob,:ex_binary,:ex_double,:ex_float,:ex_decimal,:ex_numeric,:ex_real,:ex_bit,:ex_enum,:ex_set,:ex_geometry,:ex_point,:ex_linestring,:ex_polygon,:ex_geometrycollection,:ex_multipoint) 

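The payload carries temporal columns as numbers, and the schema names in the log say how to read them: `io.debezium.time.MicroTime` is microseconds past midnight, `io.debezium.time.Date` is days since the epoch, and `io.debezium.time.Timestamp` is milliseconds since the epoch. A field parser has to convert these before binding them as SQL parameters. The sketch below shows the conversions with `java.time`. `TemporalDecodeSketch` is a hypothetical name, and interpreting the timestamp in UTC is an assumption of this sketch.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;

// Hypothetical decoders for the temporal encodings seen in the payload above.
final class TemporalDecodeSketch {

    /** io.debezium.time.MicroTime: microseconds past midnight. */
    static LocalTime microTime(long micros) {
        return LocalTime.ofNanoOfDay(micros * 1_000L);
    }

    /** io.debezium.time.Date: days since 1970-01-01. */
    static LocalDate date(int epochDays) {
        return LocalDate.ofEpochDay(epochDays);
    }

    /** io.debezium.time.Timestamp: milliseconds since the epoch (read as UTC here). */
    static LocalDateTime timestamp(long epochMillis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        // ex_time from the insert event above
        System.out.println(microTime(59224000000L)); // prints 16:27:04
    }
}
```

The `ex_time` value 59224000000 from the log decodes to 16:27:04, which lines up with the local time at which the row was inserted.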

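DML update event

After the row just added to demo in the inventory database is modified, the log shows the topic, key, payload, and the update applied as a delete followed by an insert. As a result, the change is synced to demo in the inventory_back database.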
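Applying an update as delete-then-insert needs a delete that targets exactly one row, which is what the record key is for. The sketch below builds such a statement from the key column names. `DeleteSqlSketch` is a hypothetical name, and the exact SQL the author's provider generates is not shown in the post.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: the delete half of a delete-then-insert update.
final class DeleteSqlSketch {

    /** Builds "delete from t where k1 = :k1 and k2 = :k2" from the key columns. */
    static String deleteByKeySql(String table, List<String> keyColumns) {
        if (keyColumns.isEmpty()) {
            throw new IllegalArgumentException("a key is required to target a single row");
        }
        String where = keyColumns.stream()
                .map(c -> c + " = :" + c)
                .collect(Collectors.joining(" and "));
        return "delete from " + table + " where " + where;
    }
}
```

For the demo table the key columns are `id`, `bigint_id`, and `var_name`, so the parameter values would come from the key struct of the event. Running the delete and the following insert in one transaction keeps the target from briefly losing the row.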
