内容简介:实际业务场景中,会遇到基础数据存在Mysql中,实时写入数据量比较大的情景。迁移至kafka是一种比较好的业务选型方案。而mysql写入kafka的选型方案有:方案一:logstash_output_kafka 插件。
0、题记
实际业务场景中,会遇到基础数据存在 Mysql 中,实时写入数据量比较大的情景。迁移至kafka是一种比较好的业务选型方案。
而mysql写入kafka的选型方案有:
方案一:logstash_output_kafka 插件。
方案二:kafka_connector。
方案三:debezium 插件。
方案四:flume。
方案五:其他类似方案。
其中:debezium和flume是基于 mysql binlog
实现的。
如果需要同步历史全量数据+实时更新数据,建议使用logstash。
1、logstash同步原理
常用的logstash的插件是:logstash_input_jdbc实现关系型数据库到Elasticsearch等的同步。
实际上, 核心logstash的同步原理的掌握 ,有助于大家理解类似的各种库之间的同步。
logstash 核心原理
:输入生成事件,过滤器修改它们,输出将它们发送到其他地方。
logstash核心三部分组成:input、filter、output。
input { } filter { } output { }
1.1 input输入
包含但远不限于:
-
jdbc:关系型数据库:mysql、oracle等。
-
file:从文件系统上的文件读取。
-
syslog:在已知端口514上侦听syslog消息。
-
redis:redis消息。beats:处理 Beats发送的事件。
-
kafka:kafka实时数据流。
1.2 filter过滤器
过滤器是Logstash管道中的中间处理设备。您可以将过滤器与条件组合,以便在事件满足特定条件时对其执行操作。
可以把它比作数据处理的 ETL
环节。
一些有用的过滤包括:
-
grok:解析并构造任意文本。
Grok是目前Logstash中将非结构化日志数据解析为结构化和可查询内容的最佳方式
。有了内置于Logstash的120种模式,您很可能会找到满足您需求的模式! -
mutate:对事件字段执行常规转换。您可以重命名,删除,替换和修改事件中的字段。
-
drop:完全删除事件,例如调试事件。
-
clone:制作事件的副本,可能添加或删除字段。
-
geoip:添加有关IP地址的地理位置的信息。
1.3 output输出
输出是Logstash管道的最后阶段。一些常用的输出包括:
-
elasticsearch:将事件数据发送到Elasticsearch。
-
file:将事件数据写入磁盘上的文件。
-
kafka:将事件写入Kafka。
详细的filter demo参考:http://t.cn/EaAt4zP
2、同步Mysql到kafka配置参考
input { jdbc { jdbc_connection_string => "jdbc:mysql://192.168.1.12:3306/news_base" jdbc_user => "root" jdbc_password => "xxxxxxx" jdbc_driver_library => "/home/logstash-6.4.0/lib/mysql-connector-java-5.1.47.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" #schedule => "* * * * *" statement => "SELECT * from news_info WHERE id > :sql_last_value order by id" use_column_value => true tracking_column => "id" tracking_column_type => "numeric" record_last_run => true last_run_metadata_path => "/home/logstash-6.4.0/sync_data/news_last_run" } } filter { ruby{ code => "event.set('gather_time_unix',event.get('gather_time').to_i*1000)" } ruby{ code => "event.set('publish_time_unix',event.get('publish_time').to_i*1000)" } mutate { remove_field => [ "@version" ] remove_field => [ "@timestamp" ] remove_field => [ "gather_time" ] remove_field => [ "publish_time" ] } } output { kafka { bootstrap_servers => "192.168.1.13:9092" codec => json_lines topic_id => "mytopic" } file { codec => json_lines path => "/tmp/output_a.log" } }
以上内容不复杂,不做细讲。
注意:
Mysql借助logstash同步后,日期类型格式:“2019-04-20 13:55:53”已经被识别为日期格式。
code => "event.set('gather_time_unix',event.get('gather_time').to_i*1000)",
是将Mysql中的时间格式转化为时间戳格式。
3、坑总结
3.1 坑1字段大小写问题
from星友:使用logstash同步mysql数据的,因为在jdbc.conf里面没有添加 lowercase_column_names
=> "false" 这个属性,所以logstash默认把查询结果的列明改为了小写,同步进了es,所以就导致es里面看到的字段名称全是小写。
最后总结:es是支持大写字段名称的,问题出在logstash没用好,需要在同步配置中加上 lowercase_column_names => "false" 。记录下来希望可以帮到更多人。
3.2 同步到ES中的数据会不会重复?
想将关系数据库的数据同步至ES中,如果在集群的多台服务器上同时启动logstash。
解读:实际项目中就是没用随机id 使用指定id作为es的_id ,指定id可以是url的md5.这样相同数据就会走更新覆盖以前数据
3.3 相同配置logstash,升级6.3之后不能同步数据。
解读:高版本基于时间增量有优化。
tracking_column_type => "timestamp"
应该是需要指定标识为时间类型,默认为数字类型numeric
3.4 ETL字段统一在哪处理?
解读:可以logstash同步mysql的时候 sql 查询阶段处理,如: select a_value as avalue***
。
或者filter阶段处理, mutate rename
处理。
mutate { rename => ["shortHostname", "hostname" ] }
或者kafka阶段借助kafka stream处理。
4、小结
-
相关配置和同步都不复杂,复杂点往往在于filter阶段的解析还有logstash性能问题。
-
需要结合实际业务场景做深入的研究和性能分析。
-
有问题,欢迎留言讨论。
推荐阅读:
1、 实战 | canal 实现Mysql到Elasticsearch实时增量同步
2、 干货 | Debezium实现Mysql到Elasticsearch高效实时同步
3、 一张图理清楚关系型数据库与Elasticsearch同步 http://t.cn/EaAceD3
4、新的实现:http://t.cn/EaAt60O
5、mysql2mysql: http://t.cn/EaAtK7r 6、推荐开源实现: http://t.cn/EaAtjqN
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 运维必备!Linux 远程数据同步工具详解
- 五大实例详解,携程 Redis 跨机房双向同步实践
- 五大实例详解,携程 Redis 跨机房双向同步实践
- 五大实例详解,携程 Redis 跨机房双向同步实践
- MySQL 与 Hadoop 数据同步之 Sqoop 详解
- Go语言同步与异步执行多个任务封装详解(Runner和RunnerAsync)
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
UNIX编程艺术
Eric S. Raymond / 姜宏、何源、蔡晓俊 / 电子工业出版社 / 2011-1 / 69.00元
本书主要介绍了Unix系统领域中的设计和开发哲学、思想文化体系、原则与经验,由公认的Unix编程大师、开源运动领袖人物之一Eric S. Raymond倾力多年写作而成。包括Unix设计者在内的多位领域专家也为本书贡献了宝贵的内容。本书内容涉及社群文化、软件开发设计与实现,覆盖面广、内容深邃,完全展现了作者极其深厚的经验积累和领域智慧。一起来看看 《UNIX编程艺术》 这本书的介绍吧!