用Data-pipeline模式将nginx日志存储到elasticsearch中(1)

栏目: 服务器 · Nginx · 发布时间: 6年前

内容简介:做运维的小伙伴应该都知道nginx日志的重要性,一般出现访问问题,我们可能第一时间要去看日志去分析问题,但除了协助我们排查问题外,如果对nginx日志进一步分析可以得到更有用的数据,例如可以监控某站点的http状态码、PV,UV情况,request_time和response_time等,如果辅助其它工具进一步分析可以预防一些安全问题,比如同一个IP的访问某页面超出了限制,我们可以设置策略发现后可以自动拒绝有危险的IP访问,这也是我们经常说的防刷功能,今天要跟大家分享的是如何将nginx日志存储到es中,

做运维的小伙伴应该都知道nginx日志的重要性,一般出现访问问题,我们可能第一时间要去看日志去分析问题,但除了协助我们排查问题外,如果对nginx日志进一步分析可以得到更有用的数据,例如可以监控某站点的http状态码、PV,UV情况,request_time和response_time等,如果辅助其它 工具 进一步分析可以预防一些安全问题,比如同一个IP的访问某页面超出了限制,我们可以设置策略发现后可以自动拒绝有危险的IP访问,这也是我们经常说的防刷功能,今天要跟大家分享的是如何将nginx日志存储到es中,存储的目的当时是为了更好的分析它,大家都知道,如果要存储到es中,数据必须是json格式的,但如果当前日志不是json格式的就需要进行一个转换,要把每行数据转成json格式的这里有几个问题要明确:

首先,access.log是不断增长的,这就需要有个程序能实时读取新的日志进行转换,这是第一个要解决的问题。

其次,对转换完的数据我们能不能直接存储到es中,答案是不建议,因为如果转一次存一次,在程序中我们势必要判断每次存储是否成功,在高并发的网站中日志的产生量是非常巨大的,这样一来会影响整体的效率,那怎么办?在这种情况下,最好的解决办法就是用消息队列的方式去解决,这里我们选择kafka,这个后续会说。

根据上面的内容大家可以看到其实在分析nginx日志的过程就是一个数据流的处理,这也是我们标题所说的data-pipline,pipline故名意思就是管道的意思,加一个data就是一个数据管道,我们就是要建立一个这样的管道将数据源源不断的产生出来,就像水管的水一样。

因为要解决的二个问题,今天我们先解决第一个,先实现access.log实时读取和转换成json格式,先看我们的access.log的文件内容:

111.20.21.22 - - [19/Dec/2016:12:00:30 +0800] "GET / HTTP/1.1" 502 602 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Geco) Chrome/54.0.2840.99 Safari/537.36"

划重点了,请看代码:

import time
import datetime
import json
import socket
 
 
def parse_log_line(line):
    strptime = datetime.datetime.strptime
    hostname = socket.gethostname()
    time = line.split(' ')[3][1::]
    entry = {}
    entry['datetime'] = strptime(
        time, "%d/%b/%Y:%H:%M:%S").strftime("%Y-%m-%d %H:%M")
    entry['source'] = "{}".format(hostname)
    entry['type'] = "www_access"
    entry['log'] = "'{}'".format(line.rstrip())
    return entry
 
 
def show_entry(entry):
    temp = ",".join([
        entry['datetime'],
        entry['source'],
        entry['type'],
        entry['log']
    ])
    log_entry = {'log': entry}
    temp = json.dumps(log_entry)
    print("{}".format(temp))
    return temp
 
 
def follow(syslog_file):
    syslog_file.seek(0, 2)
    
    while True:
        line = syslog_file.readline()
        if not line:
            time.sleep(0.1)
            continue
        else:
            entry = parse_log_line(line)
            if not entry:
                continue
            json_entry = show_entry(entry)
            print(json_entry)
            
 
 
f = open("access.log", "rt")
follow(f)

运行结果:

{
	"log": {
		"datetime": "2016-12-19 12:00",
		"source": "iZ258ml0cx5Z",
		"type": "www_access",
		"log": "'111.20.21.22 - - [19/Dec/2016:12:00:30 +0800] \"GET / HTTP/1.1\" 502 602 \"-\" \"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Geco) Chrome/54.0.2840.99 Safari/537.36\"'"
	}
}

脚本整体逻辑是先将行日志转换成一个字典(parse_log_line()函数),然后将字典转成json格式(show_entry()函数),最后follow函数是开始从尾部读取日志文件注意seek(0,2)参数是2从文件底部读取。

写到这我们第一个问题就算是解决了,接下来的工作就是如果把数据传给kafka了,因为kafka本身就是一块比较大的内容限于篇幅我们下次继续,多谢各位观看,如果觉得还不错,还请帮转发。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

计算机程序设计艺术

计算机程序设计艺术

Donald E.Knuth / 苏运霖 / 机械工业出版社 / 2006-4 / 45.00元

《计算机程序设计艺术》(经典计算机科学著作最新版)(第1卷第1册双语版)更新了《计算机程序设计艺术,第1卷,基本算法》(第3版),并且最终将成为该书第4版的一部分。具体地说,它向程序员提供了盼望已久的MMIX,代替原来的MIX的一个以RISC为基础的计算机,并且描述了MMIX汇编语言。一起来看看 《计算机程序设计艺术》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具