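Below is how I set up incremental synchronization of MySQL data to Elasticsearch from scratch on a single machine, using canal adapter (binlog-based).
Implementation steps
(1) Install MySQL
(2) Enable MySQL binlog in ROW mode and start MySQL
(3) Install the JDK
(4) Install and start Elasticsearch (I installed 6.4.0, mainly because canal adapter 1.1.3 does not yet support 7.0.0)
(5) Install and start Kibana
(6) Install and start canal-server
(7) Install and start canal-adapter
My operating system is CentOS 7.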
1. Install MySQL via yum
(1) Check the official site for the latest repository package
https://dev.mysql.com/downloa...
(2) Download the MySQL repository package
wget http://dev.mysql.com/get/mysql57-community-release-el7-11.noarch.rpm
Much newer versions exist now, but I used 5.7.
Install the MySQL repository:
yum -y install mysql57-community-release-el7-11.noarch.rpm
Verify the result:
yum repolist enabled | grep "mysql.*"
(3) Install the MySQL server
yum install mysql-community-server
(4) Start the MySQL service
systemctl start mysqld.service
Check the status of the MySQL service:
systemctl status mysqld.service
(5) Look up the initial (temporary) root password
grep "password" /var/log/mysqld.log
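The grep above prints the whole log line. As a small sketch, assuming the MySQL 5.7 wording "A temporary password is generated for root@localhost: ...", the password itself can be extracted with Python's standard `re` module; `find_temp_password` is a hypothetical helper, not part of MySQL:

```python
import re
from typing import Optional

# MySQL 5.7 logs a line like:
# "... [Note] A temporary password is generated for root@localhost: <password>"
TEMP_PASSWORD_RE = re.compile(r"temporary password is generated for \S+: (\S+)\s*$")


def find_temp_password(log_text: str) -> Optional[str]:
    """Return the last temporary root password found in mysqld.log text, or None."""
    password = None
    for line in log_text.splitlines():
        match = TEMP_PASSWORD_RE.search(line)
        if match:
            # Keep the last hit: re-initialising the data directory logs a new password.
            password = match.group(1)
    return password
```

Pass it the contents of /var/log/mysqld.log; it returns None if the server has not logged a temporary password.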
Log in:
mysql -u root -p
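(6) Grant database privileges (do not skip this step; for convenience I use the root account throughout instead of creating a dedicated canal account)
Without this grant, the database only accepts connections from localhost.
GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY '123456' WITH GRANT OPTION;
FLUSH PRIVILEGES;
Username: root
Password: 123456
Host: % means any IP; you can also enter a specific IP here to allow only that host.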
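2. Enable MySQL binlog
Find my.cnf; on my machine it is /etc/my.cnf.
Add the following lines under the [mysqld] section:
log-bin=mysql-bin
binlog-format=ROW
server-id=1
Then restart MySQL (systemctl restart mysqld.service) and check that binlog is enabled: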
show variables like '%log_bin%';
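To check this from a script, `mysql -N -B` prints one tab-separated `name<TAB>value` row per variable. The sketch below assumes you feed it the output of `mysql -uroot -p -N -B -e "show variables where Variable_name in ('log_bin','binlog_format','server_id')"`; `parse_variables` and `binlog_problems` are hypothetical helper names. It reports which of the three my.cnf settings did not take effect:

```python
from typing import Dict, List


def parse_variables(batch_output: str) -> Dict[str, str]:
    """Parse `mysql -N -B` output (one "name<TAB>value" row per line) into a dict."""
    variables = {}
    for line in batch_output.splitlines():
        name, sep, value = line.partition("\t")
        if sep:  # skip blank or malformed lines
            variables[name.strip()] = value.strip()
    return variables


def binlog_problems(variables: Dict[str, str]) -> List[str]:
    """List the binlog settings from my.cnf that did not take effect (empty = OK)."""
    problems = []
    if variables.get("log_bin", "").upper() != "ON":
        problems.append("log_bin is not ON")
    if variables.get("binlog_format", "").upper() != "ROW":
        problems.append("binlog_format is not ROW")
    if variables.get("server_id", "0") in ("", "0"):
        problems.append("server_id is not set")
    return problems
```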
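3. Install the JDK
I installed JDK 1.8.0_202.
Download page:
https://www.oracle.com/techne...
(1) Put jdk-8u202-linux-x64.tar.gz in /usr/local
(2) Extract it and clean up
cd /usr/local
tar -xzvf jdk-8u202-linux-x64.tar.gz
mv jdk1.8.0_202 jdk
rm -rf jdk-8u202-linux-x64.tar.gz
The archive extracts to jdk1.8.0_202, which is renamed here, so afterwards /usr/local contains a jdk directory.
(3) Configure environment variables
vi /etc/profile
Add:
export JAVA_HOME=/usr/local/jdk
export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar
export PATH=$JAVA_HOME/bin:$PATH
Then run source /etc/profile so the current shell picks up the new variables.
(4) Verify the JDK installation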
java -version
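`java -version` writes its banner to stderr, which makes it awkward to check in scripts. A minimal sketch for extracting the major version (8 for the 1.8.0_202 used here), assuming the usual `version "..."` banner format; the helper names are hypothetical:

```python
import re
import subprocess
from typing import Optional

# Oracle JDK prints  java version "1.8.0_202"; OpenJDK prints  openjdk version "11.0.2" ...
VERSION_RE = re.compile(r'version "([^"]+)"')


def java_major_version(version_output: str) -> Optional[int]:
    """Extract the major Java version from `java -version` output."""
    match = VERSION_RE.search(version_output)
    if not match:
        return None
    parts = match.group(1).split(".")
    if parts[0] == "1" and len(parts) > 1:
        # Java 8 and earlier use the legacy "1.<major>" numbering.
        return int(parts[1])
    leading = re.match(r"\d+", parts[0])
    return int(leading.group()) if leading else None


def installed_java_major() -> Optional[int]:
    """Run `java -version` (it writes to stderr) and return the major version."""
    proc = subprocess.run(["java", "-version"], stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE, universal_newlines=True)
    return java_major_version(proc.stderr)
```

`installed_java_major()` raises FileNotFoundError if `java` is not on the PATH, which usually means /etc/profile has not been sourced yet.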
4. Install and start Elasticsearch
Official site: https://www.elastic.co/downlo...
Run the following commands (you can also download the package manually and upload it):
cd /usr/local
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.4.0.tar.gz
tar -xzvf elasticsearch-6.4.0.tar.gz
mv elasticsearch-6.4.0 elasticsearch
rm -rf elasticsearch-6.4.0.tar.gz
Afterwards /usr/local contains an elasticsearch directory.
Elasticsearch cannot be started as root, so run the following commands:
useradd elasticsearch
chown -R elasticsearch /usr/local/elasticsearch
su elasticsearch
Elasticsearch is started as this elasticsearch user; the system settings below still have to be changed as root.
(1) Adjust Linux limits
vim /etc/security/limits.conf
Add:
* soft nofile 65536
* hard nofile 65536
* soft nproc 2048
* hard nproc 4096
# To lock memory and prevent swapping, also add these two lines:
elasticsearch soft memlock unlimited
elasticsearch hard memlock unlimited
vim /etc/sysctl.conf
Add:
vm.max_map_count=655360
fs.file-max=655360
Note: afterwards run sysctl -p (as root) to apply the kernel settings.
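These settings only help if they actually reach the Elasticsearch process. A minimal sketch for checking them, assuming the minimums enforced by Elasticsearch's bootstrap checks (262144 for vm.max_map_count, 65536 open files); the helper names are hypothetical. limits.conf is applied at login, so run `current_problems()` in a new session as the elasticsearch user:

```python
import os
import resource
from typing import List

MIN_MAX_MAP_COUNT = 262144  # minimum required by the Elasticsearch bootstrap checks
MIN_NOFILE = 65536          # file-descriptor minimum required by the same checks


def read_sysctl(name: str, proc_root: str = "/proc/sys") -> int:
    """Read an integer sysctl such as 'vm.max_map_count' via /proc/sys/vm/max_map_count."""
    path = os.path.join(proc_root, *name.split("."))
    with open(path) as f:
        return int(f.read().split()[0])


def es_limit_problems(max_map_count: int, nofile_soft: int) -> List[str]:
    """Compare the two values against the minimums above; empty list = OK."""
    problems = []
    if max_map_count < MIN_MAX_MAP_COUNT:
        problems.append(f"vm.max_map_count={max_map_count} < {MIN_MAX_MAP_COUNT}")
    if nofile_soft != resource.RLIM_INFINITY and nofile_soft < MIN_NOFILE:
        problems.append(f"nofile={nofile_soft} < {MIN_NOFILE}")
    return problems


def current_problems() -> List[str]:
    """Check the limits seen by *this* process (run it as the elasticsearch user)."""
    soft_nofile, _hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    return es_limit_problems(read_sysctl("vm.max_map_count"), soft_nofile)
```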
(2) Edit the ES configuration file (my IP is 192.168.254.131; replace it with your own)
vim /usr/local/elasticsearch/config/elasticsearch.yml
# ======================== Elasticsearch Configuration =========================
#
# NOTE: Elasticsearch comes with reasonable defaults for most settings.
#       Before you set out to tweak and tune the configuration, make sure you
#       understand what are you trying to accomplish and the consequences.
#
# The primary way of configuring a node is via this file. This template lists
# the most important settings you may want to configure for a production cluster.
#
# Please consult the documentation for further information on configuration options:
# https://www.elastic.co/guide/en/elasticsearch/reference/index.html
#
# ---------------------------------- Cluster -----------------------------------
#
# Use a descriptive name for your cluster:
#
cluster.name: my-application
#
# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node:
#
node.name: node-1
#
# Add custom attributes to the node:
#
#node.attr.rack: r1
#
# ----------------------------------- Paths ------------------------------------
#
# Path to directory where to store the data (separate multiple locations by comma):
#
path.data: /usr/local/elasticsearch/data
#
# Path to log files:
#
path.logs: /usr/local/elasticsearch/logs
#
# ----------------------------------- Memory -----------------------------------
#
# Lock the memory on startup:
#
#bootstrap.memory_lock: true
#
# Make sure that the heap size is set to about half the memory available
# on the system and that the owner of the process is allowed to use this
# limit.
#
# Elasticsearch performs poorly when the system is swapping the memory.
#
# ---------------------------------- Network -----------------------------------
#
# Set the bind address to a specific IP (IPv4 or IPv6):
#
network.host: 192.168.254.131
#
# Set a custom port for HTTP:
#
http.port: 9200
#
# For more information, consult the network module documentation.
#
# --------------------------------- Discovery ----------------------------------
#
# Pass an initial list of hosts to perform discovery when new node is started:
# The default list of hosts is ["127.0.0.1", "[::1]"]
#
discovery.zen.ping.unicast.hosts: ["192.168.254.131"]
#
# Prevent the "split brain" by configuring the majority of nodes (total number of master-eligible nodes / 2 + 1):
#
#discovery.zen.minimum_master_nodes:
#
# For more information, consult the zen discovery module documentation.
#
# ---------------------------------- Gateway -----------------------------------
#
# Block initial recovery after a full cluster restart until N nodes are started:
#
#gateway.recover_after_nodes: 3
#
# For more information, consult the gateway module documentation.
#
# ---------------------------------- Various -----------------------------------
#
# Require explicit names when deleting indices:
#
#action.destructive_requires_name: true
transport.tcp.port: 9300
transport.tcp.compress: true
http.cors.enabled: true
http.cors.allow-origin: "*"
(3) Start Elasticsearch (as the elasticsearch user)
cd /usr/local/elasticsearch
./bin/elasticsearch -d
Check that it started:
curl http://192.168.254.131:9200
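The curl response is a JSON document. Two fields are worth checking: `cluster_name` should match the `cluster.name: my-application` that canal-adapter passes to the ES transport client later, and `version.number` matters because of the 7.x pitfall described at the end of this article. A sketch using only the standard library; the helper names are hypothetical:

```python
import json
import urllib.request
from typing import Tuple


def es_root_info(base_url: str, timeout: float = 5.0) -> dict:
    """GET the root endpoint, the same request as `curl http://192.168.254.131:9200`."""
    with urllib.request.urlopen(base_url, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def cluster_and_version(info: dict) -> Tuple[str, str]:
    """Pick out cluster_name and version.number from the root response."""
    return info["cluster_name"], info["version"]["number"]


def is_es6(version_number: str) -> bool:
    """The setup in this article (canal-adapter 1.1.3) only worked against 6.x."""
    return version_number.split(".")[0] == "6"
```

For example, `cluster_and_version(es_root_info("http://192.168.254.131:9200"))` should give `("my-application", "6.4.0")` for the configuration above.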
5. Install and start Kibana
Official site: https://www.elastic.co/downlo...
Run the following commands (you can also download the package manually and upload it):
cd /usr/local
wget https://artifacts.elastic.co/downloads/kibana/kibana-6.4.0-linux-x86_64.tar.gz
tar -xzvf kibana-6.4.0-linux-x86_64.tar.gz
mv kibana-6.4.0-linux-x86_64 kibana
rm -rf kibana-6.4.0-linux-x86_64.tar.gz
Afterwards /usr/local contains a kibana directory.
Edit the Kibana configuration file:
vim /usr/local/kibana/config/kibana.yml
# Kibana is served by a back end server. This setting specifies the port to use.
server.port: 5601
# Specifies the address to which the Kibana server will bind. IP addresses and host names are both valid values.
# The default is 'localhost', which usually means remote machines will not be able to connect.
# To allow connections from remote users, set this parameter to a non-loopback address.
server.host: "192.168.254.131"
# Enables you to specify a path to mount Kibana at if you are running behind a proxy.
# Use the `server.rewriteBasePath` setting to tell Kibana if it should remove the basePath
# from requests it receives, and to prevent a deprecation warning at startup.
# This setting cannot end in a slash.
#server.basePath: ""
# Specifies whether Kibana should rewrite requests that are prefixed with
# `server.basePath` or require that they are rewritten by your reverse proxy.
# This setting was effectively always `false` before Kibana 6.3 and will
# default to `true` starting in Kibana 7.0.
#server.rewriteBasePath: false
# The maximum payload size in bytes for incoming server requests.
#server.maxPayloadBytes: 1048576
# The Kibana server's name. This is used for display purposes.
#server.name: "your-hostname"
# The URL of the Elasticsearch instance to use for all your queries.
elasticsearch.url: "http://192.168.254.131:9200"
# When this setting's value is true Kibana uses the hostname specified in the server.host
# setting. When the value of this setting is false, Kibana uses the hostname of the host
# that connects to this Kibana instance.
#elasticsearch.preserveHost: true
# Kibana uses an index in Elasticsearch to store saved searches, visualizations and
# dashboards. Kibana creates a new index if the index doesn't already exist.
kibana.index: ".kibana6"
# The default application to load.
#kibana.defaultAppId: "home"
# If your Elasticsearch is protected with basic authentication, these settings provide
# the username and password that the Kibana server uses to perform maintenance on the Kibana
# index at startup. Your Kibana users still need to authenticate with Elasticsearch, which
# is proxied through the Kibana server.
#elasticsearch.username: "user"
#elasticsearch.password: "pass"
# Enables SSL and paths to the PEM-format SSL certificate and SSL key files, respectively.
# These settings enable SSL for outgoing requests from the Kibana server to the browser.
#server.ssl.enabled: false
#server.ssl.certificate: /path/to/your/server.crt
#server.ssl.key: /path/to/your/server.key
# Optional settings that provide the paths to the PEM-format SSL certificate and key files.
# These files validate that your Elasticsearch backend uses the same key files.
#elasticsearch.ssl.certificate: /path/to/your/client.crt
#elasticsearch.ssl.key: /path/to/your/client.key
# Optional setting that enables you to specify a path to the PEM file for the certificate
# authority for your Elasticsearch instance.
#elasticsearch.ssl.certificateAuthorities: [ "/path/to/your/CA.pem" ]
# To disregard the validity of SSL certificates, change this setting's value to 'none'.
#elasticsearch.ssl.verificationMode: full
# Time in milliseconds to wait for Elasticsearch to respond to pings. Defaults to the value of
# the elasticsearch.requestTimeout setting.
#elasticsearch.pingTimeout: 1500
# Time in milliseconds to wait for responses from the back end or Elasticsearch. This value
# must be a positive integer.
#elasticsearch.requestTimeout: 30000
# List of Kibana client-side headers to send to Elasticsearch. To send *no* client-side
# headers, set this value to [] (an empty list).
#elasticsearch.requestHeadersWhitelist: [ authorization ]
# Header names and values that are sent to Elasticsearch. Any custom headers cannot be overwritten
# by client-side headers, regardless of the elasticsearch.requestHeadersWhitelist configuration.
#elasticsearch.customHeaders: {}
# Time in milliseconds for Elasticsearch to wait for responses from shards. Set to 0 to disable.
#elasticsearch.shardTimeout: 30000
# Time in milliseconds to wait for Elasticsearch at Kibana startup before retrying.
#elasticsearch.startupTimeout: 5000
# Logs queries sent to Elasticsearch. Requires logging.verbose set to true.
#elasticsearch.logQueries: false
# Specifies the path where Kibana creates the process ID file.
#pid.file: /var/run/kibana.pid
# Enables you specify a file where Kibana stores log output.
#logging.dest: stdout
# Set the value of this setting to true to suppress all logging output.
#logging.silent: false
# Set the value of this setting to true to suppress all logging output other than error messages.
#logging.quiet: false
# Set the value of this setting to true to log all events, including system usage information
# and all requests.
#logging.verbose: false
# Set the interval in milliseconds to sample system and process performance
# metrics. Minimum is 100ms. Defaults to 5000.
#ops.interval: 5000
# The default locale. This locale can be used in certain circumstances to substitute any missing
# translations.
#i18n.defaultLocale: "en"
Start Kibana:
cd /usr/local/kibana
nohup ./bin/kibana &
Check that it started:
Open http://192.168.254.131:5601 in a browser.
6. Install and start canal-server
For details, see the official documentation:
https://github.com/alibaba/ca...
(1) Download canal
Option 1, download directly: https://github.com/alibaba/canal/releases lists all released packages. For example, for version 1.1.3:
wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz
Option 2, build it yourself:
git clone git@github.com:alibaba/canal.git
cd canal; mvn clean install -Dmaven.test.skip -Denv=release
After the build, target/canal.deployer-$version.tar.gz is generated under the project root.
(2) Extract
mkdir /usr/local/canal
tar zxvf canal.deployer-$version.tar.gz -C /usr/local/canal
(3) Edit the configuration
cd /usr/local/canal
vim conf/example/instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=192.168.254.131:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
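In this file, canal.instance.filter.regex=.*\\..* means "every table in every schema": the properties loader turns the doubled backslash into one, giving the regex .*\..*, which canal logs at startup in anchored form and matches against the schema.table name. The sketch below approximates that behaviour with Python's `re` so you can try a narrower, comma-separated filter before editing the file. It is not canal's matcher (which is Java and may differ in details such as case sensitivity), and the helper names are hypothetical:

```python
import re


def unescape_properties(raw: str) -> str:
    """Simplified .properties handling: a doubled backslash stands for one backslash."""
    return raw.replace("\\\\", "\\")


def compile_table_filter(raw_value: str) -> list:
    """Turn a comma-separated filter value into anchored, case-sensitive regexes."""
    value = unescape_properties(raw_value)
    return [re.compile("^" + part.strip() + "$")
            for part in value.split(",") if part.strip()]


def table_matches(patterns: list, schema: str, table: str) -> bool:
    """Match against the 'schema.table' name, as the filter value implies."""
    name = f"{schema}.{table}"
    return any(p.match(name) for p in patterns)
```

For example, the value mytest\\.user,mytest2\\..* would limit the instance to mytest.user plus every table in mytest2.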
(4) Start canal-server
cd /usr/local/canal
./bin/startup.sh
cat logs/canal/canal.log
2019-05-03 10:58:31.938 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2019-05-03 10:58:32.106 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2019-05-03 10:58:32.120 [main] INFO c.a.o.c.d.monitor.remote.RemoteConfigLoaderFactory - ## load local canal configurations
2019-05-03 10:58:32.143 [main] INFO com.alibaba.otter.canal.deployer.CanalStater - ## start the canal server.
2019-05-03 10:58:32.277 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.254.131:11111]
2019-05-03 10:58:34.235 [main] WARN o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]
2019-05-03 10:58:35.470 [main] ERROR com.alibaba.druid.pool.DruidDataSource - testWhileIdle is true, validationQuery not set
2019-05-03 10:58:36.317 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2019-05-03 10:58:36.317 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter :
2019-05-03 10:58:37.106 [main] INFO com.alibaba.otter.canal.deployer.CanalStater - ## the canal server is running now ......
2019-05-03 10:58:37.239 [destination = example , address = /192.168.254.131:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2019-05-03 10:58:37.241 [destination = example , address = /192.168.254.131:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position by switch ::1556597413000
2019-05-03 10:58:39.239 [destination = example , address = /192.168.254.131:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000004,position=4450,serverId=1,gtid=,timestamp=1556596874000] cost : 1915ms , the next step is binlog dump
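The last log line records where canal started reading: binlog file mysql-bin.000004, offset 4450, and an event timestamp in milliseconds. These are the same kind of values the canal.instance.master.journal.name / position / timestamp keys in instance.properties accept. A sketch, assuming the `EntryPosition[key=value,...]` format shown above; the helper names are hypothetical:

```python
import re
from datetime import datetime, timezone
from typing import Dict, Optional

POSITION_RE = re.compile(r"find start position successfully, EntryPosition\[([^\]]*)\]")


def last_start_position(log_text: str) -> Optional[Dict[str, str]]:
    """Return the fields of the last EntryPosition[...] canal logged, or None."""
    found = POSITION_RE.findall(log_text)
    if not found:
        return None
    # "included=false,journalName=...,gtid=,..." -> {"included": "false", ..., "gtid": ""}
    return dict(item.split("=", 1) for item in found[-1].split(","))


def binlog_coordinates(fields: Dict[str, str]):
    """(binlog file, offset, event time in UTC); canal logs the timestamp in milliseconds."""
    when = datetime.fromtimestamp(int(fields["timestamp"]) / 1000, tz=timezone.utc)
    return fields["journalName"], int(fields["position"]), when
```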
7. Install and start canal-adapter
(1) Download canal-adapter
Visit https://github.com/alibaba/canal/releases , which lists all released packages. For example, for version 1.1.3:
wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.adapter-1.1.3.tar.gz
(2) Extract
mkdir /usr/local/canal-adapter
tar zxvf canal.adapter-1.1.3.tar.gz -C /usr/local/canal-adapter
(3) Edit the configuration
cd /usr/local/canal-adapter
vim conf/application.yml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp # kafka rocketMQ
  canalServerHost: 192.168.254.131:11111
#  zookeeperHosts: slave1:2181
#  mqServers: 127.0.0.1:9092 #or rocketmq
#  flatMessage: true
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.254.131:3306/mytest?useUnicode=true
      username: root
      password: 123456
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
      - name: es
        hosts: 192.168.254.131:9300
        properties:
          cluster.name: my-application
vim conf/es/mytest_user.yml
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: mytest_user
  _type: _doc
  _id: _id
  upsert: true
#  pk: id
  sql: "select a.id as _id, a.name, a.role_id, a.c_time from user a"
#  objFields:
#    _labels: array:;
#  etlCondition: "where a.c_time>='{0}'"
  commitBatch: 3000
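This mapping file ties the SQL to the index: the column aliased as `_id` becomes the document id, and the other selected columns (name, role_id, c_time) become document fields, which matches the GET responses in the test below; c_utime is never selected, so it never reaches ES. The Python sketch is a simplified model of that mapping, not canal-adapter's implementation (which is Java and uses the ES transport client). `row_to_doc` and `to_rest_call` are hypothetical names, and in this model INSERT and UPDATE both write the whole document:

```python
from typing import Any, Dict, Optional, Tuple


def row_to_doc(row: Dict[str, Any], id_field: str = "_id") -> Tuple[str, Dict[str, Any]]:
    """Split a row returned by the mapping SQL into (document id, _source)."""
    doc_id = str(row[id_field])
    source = {k: v for k, v in row.items() if k != id_field}
    return doc_id, source


def to_rest_call(dml_type: str, row: Dict[str, Any], index: str = "mytest_user",
                 doc_type: str = "_doc") -> Tuple[str, str, Optional[Dict[str, Any]]]:
    """Roughly equivalent REST call for one DML event (illustration only)."""
    doc_id, source = row_to_doc(row)
    path = f"/{index}/{doc_type}/{doc_id}"
    if dml_type == "DELETE":
        return "DELETE", path, None
    # Simplified model: INSERT and UPDATE both write the full document.
    return "PUT", path, source
```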
(4) Create the MySQL table user and the index mytest_user first; otherwise canal-adapter reports errors on startup.
create database mytest;
use mytest;
create table user (
  `id` int(10) NOT NULL,
  `name` varchar(100) DEFAULT NULL,
  `role_id` int(10) NOT NULL,
  `c_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  `c_utime` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
);
Then create the index, for example from the Kibana Dev Tools console:
PUT /mytest_user
{
  "mappings": {
    "_doc": {
      "properties": {
        "name": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword"
            }
          }
        },
        "role_id": {
          "type": "long"
        },
        "c_time": {
          "type": "date"
        }
      }
    }
  }
}
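If you prefer a script to Kibana's console, the same PUT /mytest_user request can be sent with Python's standard urllib. A minimal sketch, assuming ES at http://192.168.254.131:9200; the explicit Content-Type header matters because urllib would otherwise send application/x-www-form-urlencoded, which Elasticsearch 6.x rejects. The helper names are hypothetical:

```python
import json
import urllib.request

# Same body as the Kibana console request above.
MAPPING = {
    "mappings": {
        "_doc": {
            "properties": {
                "name": {"type": "text", "fields": {"keyword": {"type": "keyword"}}},
                "role_id": {"type": "long"},
                "c_time": {"type": "date"},
            }
        }
    }
}


def build_create_index_request(es_url: str, index: str, body: dict) -> urllib.request.Request:
    """Build PUT <es_url>/<index> with a JSON body."""
    return urllib.request.Request(
        url=f"{es_url.rstrip('/')}/{index}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


def create_index(es_url: str, index: str = "mytest_user", body: dict = MAPPING,
                 timeout: float = 10.0) -> dict:
    """Send the request and return ES's JSON reply (HTTPError if the index already exists)."""
    req = build_create_index_request(es_url, index, body)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```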
(5) Start canal-adapter
cd /usr/local/canal-adapter
./bin/startup.sh
Check the log:
cat logs/adapter/adapter.log
(6) Test whether incremental sync works
Before any data is changed:
GET /mytest_user/_search
{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 0,
    "max_score": null,
    "hits": []
  }
}
Insert a row:
insert into user(id, name, role_id) values (7, 'test', 7);
GET /mytest_user/_doc/7
{
  "_index": "mytest_user",
  "_type": "_doc",
  "_id": "7",
  "_version": 1,
  "found": true,
  "_source": {
    "name": "test",
    "role_id": 7,
    "c_time": "2019-05-04T06:11:31-05:00"
  }
}
Update the row:
update user set name = 'zhengguo' where id = 7;
GET /mytest_user/_doc/7
{
  "_index": "mytest_user",
  "_type": "_doc",
  "_id": "7",
  "_version": 2,
  "found": true,
  "_source": {
    "name": "zhengguo",
    "role_id": 7,
    "c_time": "2019-05-04T06:11:31-05:00"
  }
}
Delete the row:
delete from user where id = 7;
GET /mytest_user/_doc/7
{
  "_index": "mytest_user",
  "_type": "_doc",
  "_id": "7",
  "found": false
}
As you can see, the insert, update, and delete were all synchronized to Elasticsearch.
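Because the adapter applies binlog events asynchronously, a scripted version of this test should poll instead of reading once. A sketch with hypothetical helpers `get_doc` and `wait_for` (standard library only); `wait_for` takes the fetch function, clock, and sleep as parameters so it can be exercised without a live cluster:

```python
import json
import time
import urllib.error
import urllib.request


def get_doc(es_url: str, index: str, doc_id, doc_type: str = "_doc") -> dict:
    """GET /<index>/<type>/<id>; a missing document comes back as a 404 with {"found": false}."""
    url = f"{es_url.rstrip('/')}/{index}/{doc_type}/{doc_id}"
    try:
        with urllib.request.urlopen(url, timeout=5) as resp:
            return json.loads(resp.read().decode("utf-8"))
    except urllib.error.HTTPError as err:
        if err.code == 404:
            return json.loads(err.read().decode("utf-8"))
        raise


def wait_for(fetch, predicate, timeout: float = 10.0, interval: float = 0.5,
             clock=time.monotonic, sleep=time.sleep):
    """Call fetch() until predicate(result) is truthy or the timeout expires."""
    deadline = clock() + timeout
    while True:
        doc = fetch()
        if predicate(doc):
            return doc
        if clock() >= deadline:
            raise TimeoutError(f"condition not met within {timeout}s; last response: {doc}")
        sleep(interval)
```

For the update step, for example: `wait_for(lambda: get_doc("http://192.168.254.131:9200", "mytest_user", 7), lambda d: d.get("found") and d["_source"]["name"] == "zhengguo")`.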
A pitfall I ran into
(canal may fix this in a later release)
With adapter 1.1.3, if Elasticsearch is version 7.x, incremental sync fails with errors like the following, meaning the adapter cannot connect to ES:
ESSyncService - sync error, es index: mytest_user, DML : Dml{destination='example', database='mytest', table='user', type='INSERT', es=1556597413000, ts=1556597414139, sql='', data=[{id=4, name=junge, role_id=4, c_time=2019-04-30 00:10:13.0, c_utime=2019-04-30 00:10:13.0}], old=null}
ERROR c.a.o.canal.adapter.launcher.loader.CanalAdapterWorker - NoNodeAvailableException[None of the configured nodes are available: [{#transport#-1}{lTIHs6ZsTe-PqHs9CToQYQ}{192.168.254.131}{192.168.254.131:9300}]]
In other words, incremental sync to ES 7 is not supported yet; switching to 6.x solves it.