使用canal增量同步mysql数据库信息到ElasticSearch

栏目: 数据库 · 发布时间: 6年前

内容简介:本文介绍如何使用canal增量同步mysql数据库信息到ElasticSearch。(注意:是增量!!!)Canal是一个基于MySQL二进制日志的高性能数据同步系统。Canal广泛用于阿里巴巴集团(包括www.taobao.com),以提供可靠的低延迟增量数据管道,github地址:Canal Server能够解析MySQL binlog并订阅数据更改,而Canal Client可以实现将更改广播到任何地方,例如数据库和Apache Kafka。

本文介绍如何使用canal增量同步 mysql 数据库信息到ElasticSearch。(注意:是增量!!!)

1.简介

1.1 canal介绍

Canal是一个基于MySQL二进制日志的高性能数据同步系统。Canal广泛用于阿里巴巴集团(包括www.taobao.com),以提供可靠的低延迟增量数据管道,github地址: github.com/alibaba/can…

Canal Server能够解析MySQL binlog并订阅数据更改,而Canal Client可以实现将更改广播到任何地方,例如数据库和Apache Kafka。

它具有以下功能:

  1. 支持所有平台。
  2. 支持由Prometheus提供支持的细粒度系统监控。
  3. 支持通过不同方式解析和订阅MySQL binlog,例如通过GTID。
  4. 支持高性能,实时数据同步。(详见Performance)
  5. Canal Server和Canal Client都支持HA / Scalability,由Apache ZooKeeper提供支持
  6. Docker支持。

缺点:

不支持全量更新,只支持增量更新。

完整wiki地址: github.com/alibaba/can…

1.2 运作原理

原理很简单:

  1. Canal模拟MySQL的slave的交互协议,伪装成mysql slave,并将转发协议发送到MySQL Master服务器。
  2. MySQL Master接收到转储请求并开始将二进制日志推送到slave(即canal)。
  3. Canal将二进制日志对象解析为自己的数据类型(原始字节流)

如图所示:

使用canal增量同步mysql数据库信息到ElasticSearch

1.3 同步es

在同步数据到es的时候需要使用适配器:canal adapter。目前最新版本1.1.3,下载地址: github.com/alibaba/can…

目前es貌似支持6.x版本,不支持7.x版本!!!

2.准备工作

2.1 es和jdk

安装es可以参考: www.dalaoyang.cn/article/78

安装jdk可以参考: www.dalaoyang.cn/article/16

2.2 安装canal server

下载canal.deployer-1.1.3.tar.gz

wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz
复制代码

解压文件

tar -zxvf canal.deployer-1.1.3.tar.gz
复制代码

进入解压后的文件夹

cd canal.deployer-1.1.3
复制代码

修改conf/example/instance.properties文件,主要注意以下几处:

  • canal.instance.master.address:数据库地址,例如127.0.0.1:3306
  • canal.instance.dbUsername:数据库用户
  • canal.instance.dbPassword:数据库密码

完整内容如下:

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=
#canal.instance.tsdb.dbUsername=
#canal.instance.tsdb.dbPassword=

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=12345678
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=

# mq config
#canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
#canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

复制代码

回到canal.deployer-1.1.3目录下,启动canal:

sh bin/startup.sh
复制代码

查看日志:

vi logs/canal/canal.log
复制代码

查看具体instance日志:

vi logs/example/example.log
复制代码

关闭命令

sh bin/stop.sh
复制代码

2.3 安装canal-adapter

下载canal.adapter-1.1.3.tar.gz

wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.adapter-1.1.3.tar.gz
复制代码

解压

tar -zxvf canal.adapter-1.1.3.tar.gz
复制代码

进入解压后的文件夹

cd canal.adapter-1.1.3
复制代码

修改conf/application.yml文件,主要注意如下内容,由于是yml文件,注意我这里说明的属性名称:

  • server.port:canal-adapter端口号
  • canal.conf.canalServerHost:canal-server地址和ip
  • canal.conf.srcDataSources.defaultDS.url:数据库地址
  • canal.conf.srcDataSources.defaultDS.username:数据库用户名
  • canal.conf.srcDataSources.defaultDS.password:数据库密码
  • canal.conf.canalAdapters.groups.outerAdapters.hosts:es主机地址,tcp端口

完整内容如下:

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null


canal.conf:
  mode: tcp
  canalServerHost: 127.0.0.1:11111
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true
      username: root
      password: 12345678
  canalAdapters:
  - instance: example
    groups:
    - groupId: g1
      outerAdapters:
      - name: es
        hosts: 127.0.0.1:9300
        properties:
         cluster.name: elasticsearch
复制代码

另外需要配置conf/es/*.yml文件,adapter将会自动加载conf / es下的所有.yml结尾的配置文件。在介绍配置前,需要先介绍一下本案例使用的表结构,如下:

CREATE TABLE `test` (
  `id` int(11) NOT NULL,
  `name` varchar(200) NOT NULL,
  `address` varchar(1000) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
复制代码

需要手动去es中创建索引,比如这里使用es-head创建,如下图:

使用canal增量同步mysql数据库信息到ElasticSearch

test索引结构如下:

{
    "mappings":{
        "_doc":{
            "properties":{
                "name":{
                    "type":"text"
                },
                "address":{
                    "type":"text"
                }
            }
        }
    }
}
复制代码

接下来创建test.yml(文件名随意),内容很好理解_index为索引名称,sql为对应语句,内容如下:

dataSourceKey: defaultDS
destination: example
groupId:
esMapping:
  _index: test
  _type: _doc
  _id: _id
  upsert: true
  sql: "select a.id as _id,a.name,a.address from test a"
  commitBatch: 3000
复制代码

配置完成后,回到canal-adapter根目录,执行命令启动

bin/startup.sh
复制代码

查看日志

vi logs/adapter/adapter.log
复制代码

关闭canal-adapter命令

bin/stop.sh
复制代码

3.测试

都启动成功后,先查看一下es-head,如图,现在是没有任何数据的。

使用canal增量同步mysql数据库信息到ElasticSearch

接下来,我们在数据库中插入一条数据进行测试,语句如下:

INSERT INTO `test`.`test`(`id`, `name`, `address`) VALUES (7, '北京', '北京市朝阳区');
复制代码

然后在看一下es-head,如下

使用canal增量同步mysql数据库信息到ElasticSearch

接下来看一下日志,如下:

2019-06-22 17:54:15.385 [pool-2-thread-1] DEBUG c.a.otter.canal.client.adapter.es.service.ESSyncService - DML: {"data":[{"id":7,"name":"北京","address":"北京市朝阳区"}],"database":"test","destination":"example","es":1561197255000,"groupId":null,"isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"test","ts":1561197255384,"type":"INSERT"} 
Affected indexes: test 
复制代码

小知识点:上面介绍的查看日志的方法可能不是很好用,推荐使用如下语法,比如查看日志最后200行:

tail -200f logs/adapter/adapter.log
复制代码

4.总结

1.全量更新不能实现,但是增删改都是可以的。 2.一定要提前创建好索引。 3.es配置的是tcp端口,比如默认的9300


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

ANSI Common Lisp

ANSI Common Lisp

Paul Graham / Prentice Hall / 1995-11-12 / USD 116.40

For use as a core text supplement in any course covering common LISP such as Artificial Intelligence or Concepts of Programming Languages. Teaching students new and more powerful ways of thinking abo......一起来看看 《ANSI Common Lisp》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具