内容简介:做运维的小伙伴应该都知道nginx日志的重要性,一般出现访问问题,我们可能第一时间要去看日志去分析问题,但除了协助我们排查问题外,如果对nginx日志进一步分析可以得到更有用的数据,例如可以监控某站点的http状态码、PV,UV情况,request_time和response_time等,如果辅助其它工具进一步分析可以预防一些安全问题,比如同一个IP的访问某页面超出了限制,我们可以设置策略发现后可以自动拒绝有危险的IP访问,这也是我们经常说的防刷功能,今天要跟大家分享的是如何将nginx日志存储到es中,
做运维的小伙伴应该都知道nginx日志的重要性,一般出现访问问题,我们可能第一时间要去看日志去分析问题,但除了协助我们排查问题外,如果对nginx日志进一步分析可以得到更有用的数据,例如可以监控某站点的http状态码、PV,UV情况,request_time和response_time等,如果辅助其它 工具 进一步分析可以预防一些安全问题,比如同一个IP的访问某页面超出了限制,我们可以设置策略发现后可以自动拒绝有危险的IP访问,这也是我们经常说的防刷功能,今天要跟大家分享的是如何将nginx日志存储到es中,存储的目的当时是为了更好的分析它,大家都知道,如果要存储到es中,数据必须是json格式的,但如果当前日志不是json格式的就需要进行一个转换,要把每行数据转成json格式的这里有几个问题要明确:
首先,access.log是不断增长的,这就需要有个程序能实时读取新的日志进行转换,这是第一个要解决的问题。
其次,对转换完的数据我们能不能直接存储到es中,答案是不建议,因为如果转一次存一次,在程序中我们势必要判断每次存储是否成功,在高并发的网站中日志的产生量是非常巨大的,这样一来会影响整体的效率,那怎么办?在这种情况下,最好的解决办法就是用消息队列的方式去解决,这里我们选择kafka,这个后续会说。
根据上面的内容大家可以看到其实在分析nginx日志的过程就是一个数据流的处理,这也是我们标题所说的data-pipline,pipline故名意思就是管道的意思,加一个data就是一个数据管道,我们就是要建立一个这样的管道将数据源源不断的产生出来,就像水管的水一样。
因为要解决的二个问题,今天我们先解决第一个,先实现access.log实时读取和转换成json格式,先看我们的access.log的文件内容:
111.20.21.22 - - [19/Dec/2016:12:00:30 +0800] "GET / HTTP/1.1" 502 602 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Geco) Chrome/54.0.2840.99 Safari/537.36"
划重点了,请看代码:
import time
import datetime
import json
import socket
def parse_log_line(line):
strptime = datetime.datetime.strptime
hostname = socket.gethostname()
time = line.split(' ')[3][1::]
entry = {}
entry['datetime'] = strptime(
time, "%d/%b/%Y:%H:%M:%S").strftime("%Y-%m-%d %H:%M")
entry['source'] = "{}".format(hostname)
entry['type'] = "www_access"
entry['log'] = "'{}'".format(line.rstrip())
return entry
def show_entry(entry):
temp = ",".join([
entry['datetime'],
entry['source'],
entry['type'],
entry['log']
])
log_entry = {'log': entry}
temp = json.dumps(log_entry)
print("{}".format(temp))
return temp
def follow(syslog_file):
syslog_file.seek(0, 2)
while True:
line = syslog_file.readline()
if not line:
time.sleep(0.1)
continue
else:
entry = parse_log_line(line)
if not entry:
continue
json_entry = show_entry(entry)
print(json_entry)
f = open("access.log", "rt")
follow(f)
运行结果:
{
"log": {
"datetime": "2016-12-19 12:00",
"source": "iZ258ml0cx5Z",
"type": "www_access",
"log": "'111.20.21.22 - - [19/Dec/2016:12:00:30 +0800] \"GET / HTTP/1.1\" 502 602 \"-\" \"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Geco) Chrome/54.0.2840.99 Safari/537.36\"'"
}
}
脚本整体逻辑是先将行日志转换成一个字典(parse_log_line()函数),然后将字典转成json格式(show_entry()函数),最后follow函数是开始从尾部读取日志文件注意seek(0,2)参数是2从文件底部读取。
写到这我们第一个问题就算是解决了,接下来的工作就是如果把数据传给kafka了,因为kafka本身就是一块比较大的内容限于篇幅我们下次继续,多谢各位观看,如果觉得还不错,还请帮转发。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 树形结构数据存储方案(一):邻接列表模式
- Repository存储库模式 – Abhishek Chaudhary
- 存储过程的in、out、inout模式
- 大数据分布式存储的部署模式:分离式or超融合
- 用Data-pipeline模式将nginx日志存储到elasticsearch中(续)
- 分离式or超融合,分布式存储建设时的两种部署模式
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Java 8函数式编程
[英] Richard Warburton / 王群锋 / 人民邮电出版社 / 2015-3 / 39.00元
通过每一章的练习快速掌握Java 8中的Lambda表达式 分析流、高级集合和其他Java 8类库的改进 利用多核CPU提高数据并发的性能 将现有代码库和库代码Lambda化 学习Lambda表达式单元测试和调试的实践解决方案 用Lambda表达式实现面向对象编程的SOLID原则 编写能有效执行消息传送和非阻塞I/O的并发应用一起来看看 《Java 8函数式编程》 这本书的介绍吧!