内容简介:在之前的文章中我们提到过如果通过Hangout将Kafka中的数据接入ClickHouse中,数据的QPS大或者单条数据长度特别大这两种场景下很容易达到
前言
Base on Teleport 2.2.0
在之前的文章中我们提到过如果通过Hangout将Kafka中的数据接入ClickHouse中, 相关文章 。Hangout固然是一个很好的工具,能够快速的实现将Kafka里的数据经过处理写入ClickHouse。但是在以下场景下Hangout无法很好满足我们的需求
数据的QPS大或者单条数据长度特别大
这两种场景下很容易达到 hangout-output-clickhouse 插件的瓶颈,因为说到底,hangout-output-clickhosue插件底层使用的是 clickhouse-jdbc ,而clickhouse-jdbc是通过http请求发送数据,hangout-output-clickhouse最先达到的瓶颈其实就是http接口发送数据的瓶颈。
在这种背景下,我们尝试通过TCP协议来完成数据的写入。于是我们通过golang完成了 Teleport ,通过Teleport将数据从Kafka中导入ClickHouse
Teleport
Teleport是由golang编写的专门用于从Kafka中拉取数据进行处理最后写入ClickHouse中的ETL工具。其中写入ClickHouse的部分是通过 clickhouse 实现的。
接下来还是以Nginx日志为例说明如何使用Teleport。
Prerequisites
- 下载Teleport并给予权限
wget https://github.com/RickyHuo/teleport/releases/download/v2.2.0/teleport chomd 777 teleport
- 安装Kafka依赖(librdkafka)
librdkafka.x86_64 0.11.4-1.el7
yum install librdkafka
Configuration Example: Nginx Log
Log Sample
001.cms.msina..sinanode.com [27/Dec/2017:16:01:03 +0800]
- ”GET /n/front/w636h3606893220.jpg/w720q75apl.webp HTTP/1.1”
”SinaNews/201706071542.1 CFNetwork/758.1.6 Darwin/15.0.0” 200
[127.0.0.1] -
”-“ 0.021
10640 -
127.0.0.1 l.sinaimg.cn
-
Kafka
如下所示, 指定kafka数据源
kafka: kafka.bootstrap.servers: "localhost:9092" kafka.group.id: "teleport_nginx_sample" kafka.auto.offset.reset: "earliest" topic: "teleport_nginx"
Input
指定输入数据处理方式,目前支持JSON和Grok
input: format: "line" match: "%{NOTSPACE:_hostname}`\[%{HTTPDATE:timestamp}\]`%{NOTSPACE:upstream}`\"%{NOTSPACE:_method}\s%{NOTSPACE:_uri}\s%{NOTSPACE:httpversion}\"`%{QS:_ua}`%{NUMBER:_http_code}`\[%{IP:_remote_addr}\]`%{NOTSPACE:unknow1}`%{QS:_reference}`%{NUMBER:_request_time}`%{NUMBER:_data_size}`%{NOTSPACE:unknow3}`%{IP:_http_x_forwarded_for}`%{NOTSPACE:_domain}`%{DATA:unknow4}$"
Filter
在Filter部分,这里有一系列转化的步骤,包括时间转换、类型转换等
filter: - date: source_field: "timestamp" format: "02/Jan/2006:15:04:05 -0700" - add: idc_ip: "{{Split ._hostname \".\" |Slice 0 2 | Join \".\"}}"
Output
最后我们将处理好的结构化数据写入ClickHouse
output: - clickhouse: table: "cms_msg" host: "localhost:9000" clickhouse.read_timeout: 10 clickhouse.write_timeout: 20 clickhouse.debug: "false" clickhouse.compress: "true" clickhouse.database: "cms" fields: ['date', 'datetime','hour', '_hostname', '_domain', '_data_size', '_uri', '_request_time', '_ua', '_http_code', '_remote_addr', '_method', '_reference', '_url', 'idc_ip'] bulk_size: 30000
ClickHouse Schema
当然, ClickHouse存储这些数据的前提是我们已经建立好了这些数据表。具体建表操作如下:
CREATE TABLE cms.cms_msg ( date Date, datetime DateTime, hour Int8, _uri String, _url String, _request_time Float32, _http_code String, _hostname String, _domain String, _http_x_forwarded_for String, _remote_addr String, _reference String, _data_size Int32, _method String, _rs String, _rs_time Float32, _ua String, idc_ip String ) ENGINE = MergeTree(date, (hour, date), 8192) CREATE TABLE cms.cms_msg_all ( date Date, datetime DateTime, hour Int8, _uri String, _url String, _request_time Float32, _http_code String, _hostname String, _domain String, _http_x_forwarded_for String, _remote_addr String, _reference String, _data_size Int32, _method String, _ua String, idc_ip String ) ENGINE = Distributed(bip_ck_cluster, 'cms', 'cms_msg', rand())
Conclusion
在这篇文章中,我们介绍了如何使用Teleport将Nginx日志文件写入ClickHouse中,整个配置与Hangout一样灵活,入手成本不高。目前Teleport支持的插件较少,后期会逐步完善,有任何问题都可以在Teleport中提 ISSUE 。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Squid: The Definitive Guide
Duane Wessels / O'Reilly Media / 2004 / $44.95 US, $65.95 CA, £31.95 UK
Squid is the most popular Web caching software in use today, and it works on a variety of platforms including Linux, FreeBSD, and Windows. Squid improves network performance by reducing the amount of......一起来看看 《Squid: The Definitive Guide》 这本书的介绍吧!