内容简介:本文介绍如何使用canal增量同步mysql数据库信息到ElasticSearch。(注意:是增量!!!)Canal是一个基于MySQL二进制日志的高性能数据同步系统。Canal广泛用于阿里巴巴集团(包括www.taobao.com),以提供可靠的低延迟增量数据管道,github地址:Canal Server能够解析MySQL binlog并订阅数据更改,而Canal Client可以实现将更改广播到任何地方,例如数据库和Apache Kafka。
本文介绍如何使用canal增量同步 mysql 数据库信息到ElasticSearch。(注意:是增量!!!)
1.简介
1.1 canal介绍
Canal是一个基于MySQL二进制日志的高性能数据同步系统。Canal广泛用于阿里巴巴集团(包括www.taobao.com),以提供可靠的低延迟增量数据管道,github地址: github.com/alibaba/can…
Canal Server能够解析MySQL binlog并订阅数据更改,而Canal Client可以实现将更改广播到任何地方,例如数据库和Apache Kafka。
它具有以下功能:
- 支持所有平台。
- 支持由Prometheus提供支持的细粒度系统监控。
- 支持通过不同方式解析和订阅MySQL binlog,例如通过GTID。
- 支持高性能,实时数据同步。(详见Performance)
- Canal Server和Canal Client都支持HA / Scalability,由Apache ZooKeeper提供支持
- Docker支持。
缺点:
不支持全量更新,只支持增量更新。
完整wiki地址: github.com/alibaba/can…
1.2 运作原理
原理很简单:
- Canal模拟MySQL的slave的交互协议,伪装成mysql slave,并将转发协议发送到MySQL Master服务器。
- MySQL Master接收到转储请求并开始将二进制日志推送到slave(即canal)。
- Canal将二进制日志对象解析为自己的数据类型(原始字节流)
如图所示:
1.3 同步es
在同步数据到es的时候需要使用适配器:canal adapter。目前最新版本1.1.3,下载地址: github.com/alibaba/can… 。
目前es貌似支持6.x版本,不支持7.x版本!!!
2.准备工作
2.1 es和jdk
安装es可以参考: www.dalaoyang.cn/article/78
安装jdk可以参考: www.dalaoyang.cn/article/16
2.2 安装canal server
下载canal.deployer-1.1.3.tar.gz
wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz 复制代码
解压文件
tar -zxvf canal.deployer-1.1.3.tar.gz 复制代码
进入解压后的文件夹
cd canal.deployer-1.1.3 复制代码
修改conf/example/instance.properties文件,主要注意以下几处:
- canal.instance.master.address:数据库地址,例如127.0.0.1:3306
- canal.instance.dbUsername:数据库用户
- canal.instance.dbPassword:数据库密码
完整内容如下:
################################################# ## mysql serverId , v1.0.26+ will autoGen # canal.instance.mysql.slaveId=0 # enable gtid use true/false canal.instance.gtidon=false # position info canal.instance.master.address=127.0.0.1:3306 canal.instance.master.journal.name= canal.instance.master.position= canal.instance.master.timestamp= canal.instance.master.gtid= # rds oss binlog canal.instance.rds.accesskey= canal.instance.rds.secretkey= canal.instance.rds.instanceId= # table meta tsdb info canal.instance.tsdb.enable=true #canal.instance.tsdb.url= #canal.instance.tsdb.dbUsername= #canal.instance.tsdb.dbPassword= #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = #canal.instance.standby.gtid= # username/password canal.instance.dbUsername=root canal.instance.dbPassword=12345678 canal.instance.connectionCharset = UTF-8 # enable druid Decrypt database password canal.instance.enableDruid=false #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ== # table regex canal.instance.filter.regex=.*\\..* # table black regex canal.instance.filter.black.regex= # mq config #canal.mq.topic=example # dynamic topic route by schema or table regex #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..* #canal.mq.partition=0 # hash partition config #canal.mq.partitionsNum=3 #canal.mq.partitionHash=test.table:id^name,.*\\..* ################################################# 复制代码
回到canal.deployer-1.1.3目录下,启动canal:
sh bin/startup.sh 复制代码
查看日志:
vi logs/canal/canal.log 复制代码
查看具体instance日志:
vi logs/example/example.log 复制代码
关闭命令
sh bin/stop.sh 复制代码
2.3 安装canal-adapter
下载canal.adapter-1.1.3.tar.gz
wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.adapter-1.1.3.tar.gz 复制代码
解压
tar -zxvf canal.adapter-1.1.3.tar.gz 复制代码
进入解压后的文件夹
cd canal.adapter-1.1.3 复制代码
修改conf/application.yml文件,主要注意如下内容,由于是yml文件,注意我这里说明的属性名称:
- server.port:canal-adapter端口号
- canal.conf.canalServerHost:canal-server地址和ip
- canal.conf.srcDataSources.defaultDS.url:数据库地址
- canal.conf.srcDataSources.defaultDS.username:数据库用户名
- canal.conf.srcDataSources.defaultDS.password:数据库密码
- canal.conf.canalAdapters.groups.outerAdapters.hosts:es主机地址,tcp端口
完整内容如下:
server: port: 8081 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 default-property-inclusion: non_null canal.conf: mode: tcp canalServerHost: 127.0.0.1:11111 batchSize: 500 syncBatchSize: 1000 retries: 0 timeout: accessKey: secretKey: srcDataSources: defaultDS: url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true username: root password: 12345678 canalAdapters: - instance: example groups: - groupId: g1 outerAdapters: - name: es hosts: 127.0.0.1:9300 properties: cluster.name: elasticsearch 复制代码
另外需要配置conf/es/*.yml文件,adapter将会自动加载conf / es下的所有.yml结尾的配置文件。在介绍配置前,需要先介绍一下本案例使用的表结构,如下:
CREATE TABLE `test` ( `id` int(11) NOT NULL, `name` varchar(200) NOT NULL, `address` varchar(1000) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; 复制代码
需要手动去es中创建索引,比如这里使用es-head创建,如下图:
test索引结构如下:
{ "mappings":{ "_doc":{ "properties":{ "name":{ "type":"text" }, "address":{ "type":"text" } } } } } 复制代码
接下来创建test.yml(文件名随意),内容很好理解_index为索引名称,sql为对应语句,内容如下:
dataSourceKey: defaultDS destination: example groupId: esMapping: _index: test _type: _doc _id: _id upsert: true sql: "select a.id as _id,a.name,a.address from test a" commitBatch: 3000 复制代码
配置完成后,回到canal-adapter根目录,执行命令启动
bin/startup.sh 复制代码
查看日志
vi logs/adapter/adapter.log 复制代码
关闭canal-adapter命令
bin/stop.sh 复制代码
3.测试
都启动成功后,先查看一下es-head,如图,现在是没有任何数据的。
接下来,我们在数据库中插入一条数据进行测试,语句如下:
INSERT INTO `test`.`test`(`id`, `name`, `address`) VALUES (7, '北京', '北京市朝阳区'); 复制代码
然后在看一下es-head,如下
接下来看一下日志,如下:
2019-06-22 17:54:15.385 [pool-2-thread-1] DEBUG c.a.otter.canal.client.adapter.es.service.ESSyncService - DML: {"data":[{"id":7,"name":"北京","address":"北京市朝阳区"}],"database":"test","destination":"example","es":1561197255000,"groupId":null,"isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"test","ts":1561197255384,"type":"INSERT"} Affected indexes: test 复制代码
小知识点:上面介绍的查看日志的方法可能不是很好用,推荐使用如下语法,比如查看日志最后200行:
tail -200f logs/adapter/adapter.log 复制代码
4.总结
1.全量更新不能实现,但是增删改都是可以的。 2.一定要提前创建好索引。 3.es配置的是tcp端口,比如默认的9300
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- *根据时间戳,增量同步数据的解决办法
- 基于 Canal 的实时数据增量同步架构实现
- 原 荐 使用KETTLE从mysql同步增量数据到oracle
- 增量同步mysql 数据到elasticsearch canal adapter方式(binlog)实现(从零到一超级详细全面)
- 细说HTTP增量更新
- 增量更新
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。