用Data-pipeline模式将nginx日志存储到elasticsearch中(续)

栏目: 后端 · 发布时间: 7年前

内容简介:接上篇的内容,这篇我们要解决第二个问题,就是如何将我们转换完成的json数据发送到es中去,上篇提到了要存储到es中,我们要使用kafka来做消息队列,实现发布和订阅消息流模式,因为涉及kafka内容,所以我们先说一下Kafka的一些基本知识,然后再看代码不然一是一头雾水。kafka介绍:1、什么是kafka?

接上篇的内容,这篇我们要解决第二个问题,就是如何将我们转换完成的json数据发送到es中去,上篇提到了要存储到es中,我们要使用kafka来做消息队列,实现发布和订阅消息流模式,因为涉及kafka内容,所以我们先说一下Kafka的一些基本知识,然后再看代码不然一是一头雾水。

kafka介绍:

1、什么是kafka?

Kafka是最初由Linkedin公司开发,是一个分布式、支
持分区的(partition)、多副本的(replica),基于
zookeeper协调的分布式消息系统,它的最大的特性就是可
以实时的处理大量数据以满足各种需求场景:比如基于
hadoop的批处理系统、低延迟的实时系统、storm/Spark
流式处理引擎,web/nginx日志、访问日志,消息服务等
等,用scala语言编写,Linkedin于2010年贡献给了Apache
基金会并成为顶级开源 项目。

2、kafka特性

- 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,
它的延迟最低只有几毫秒,每个topic可以分多个partition,
consumer group 对partition进行consume操作。
- 可扩展性:kafka集群支持热扩展
- 持久性、可靠性:消息被持久化到本地磁盘,并且支持
数据备份防止数据丢失
- 容错性:允许集群中节点失败(若副本数量为n,则允许n1个节点失败)
- 高并发:支持数千个客户端同时读写

3、kafka解决了什么问题?

kafka能在系统或应用程序之间构建可靠的用于传输实时数据的管道,
是一种高吞吐量的分布式发布订阅消息系统,
将不同的数据通过不同的topic实现发布订阅,
生产者生成的数据发布到对应的topic中,订阅这个topic的消费者
都可以消费这个topic中的数据。

4、kafka的一些概念和名词

1)Broker
Kafka集群包含一个或多个服务器,这种服务器被称为broker。broker端不维护数据的消费状态,提升了性能。直接使用磁盘进行存储,线
性读写,速度快:避免了数据在JVM内存和系统内存之间的复制,减少耗性能的创建对象和垃圾回收。
2)Producer
负责发布消息到Kafka broke
3)Consumer
消息消费者,向Kafka broker读取消息的客户端,consumer从broker拉取(pull)数据并进行处理。
4)Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保
存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
5)Partition
Parition是物理上的概念,每个Topic包含一个或多个Partition.
6)Consumer Group
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)
7)Topic & Partition 关系
Topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的Topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使
得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个
Partition的所有消息和索引文件

除了上面的一些内容外,我也谈一下我自己的一些理解,为什么kafka变得如此流行,很大程度上因为的特性决定的,

比如数据的聚合,怎么理解,就是我们可以把产生数据的系统就做生产者,有多少生产者无所谓,只要建立对应的topic,把数据发送到这个topic中就可以,如果有不同部门向要相同的数据,就去订阅这个topic即可,这样最大的好处是使我们系统结构清晰,同时能减少我们很多重复的工作。

另外一个重要的特性就是高并发性,并支持分布式部署,在一个繁忙的系统中,产生的日志或者其它数据是非常庞大的,要对这些数据进行处理,必须要一个高吞吐量、低延迟的处理系统,Kafka正好满足这个需求,每个topic可以建立一个或多个分区,每个分区你可以简单理解为一个公路上的多个车道,每个车就是数据,因为车道多所以它可以加速数据的传输。

经过上面的介绍,相信大家都对Kafka有了一个基本的了解,那接下来回到我们上面的问题,看如何在我们这个例子中使用:

1、安装kafka,这个因为比较简单就不在费篇幅写了。

2、安装完毕后建立一个topic, 命令:

bin/kafka-topics.sh –create –zookeeper 127.0.0.1:2181 –replication-factor 1 –partitions 1 –topic www_logs

3、安装:pip install kafka-python

4、实现生产者类:

from kafka import KafkaProducer
import json
 
 
class MyKafka(object):
 
    def __init__(self, kafka_brokers):
        self.producer = KafkaProducer(
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            bootstrap_servers=kafka_brokers
        )
 
    def send_page_data(self, json_data, topic):
        result = self.producer.send(topic, key=b'log', value=json_data)
        print("kafka send result: {}".format(result.get()))

代码不是很多,先定义了初始化类的__init__方法,需要两个参数,一个就是broker这个没什么可说的, value_serializer的意思是用于将用户提供的消息值转换为字节,send_page_data()方法是将数据发送给对应的topic。

5、修改下上篇中的脚本,使其实现转换后发送到kafka中,修改如下:

import time
import datetime
import socket
import json
from mykafka import MyKafka   #add
 
 
def parse_log_line(line):
    strptime = datetime.datetime.strptime
    hostname = socket.gethostname()
    time = line.split(' ')[3][1::]
    entry = {}
    entry['datetime'] = strptime(
        time, "%d/%b/%Y:%H:%M:%S").strftime("%Y-%m-%d %H:%M")
    entry['source'] = "{}".format(hostname)
    entry['type'] = "www_access"
    entry['log'] = "'{}'".format(line.rstrip())
    return entry
 
 
def show_entry(entry):
    temp = ",".join([
        entry['datetime'],
        entry['source'],
        entry['type'],
        entry['log']
    ])
    log_entry = {'log': entry}
    temp = json.dumps(log_entry)
    print("{}".format(temp))
    return temp
 
 
def follow(syslog_file):
    syslog_file.seek(0, 2)
    pubsub = MyKafka(["10.51.117.28:9092"])   #add
    while True:
        line = syslog_file.readline()
        if not line:
            time.sleep(0.1)
            continue
        else:
            entry = parse_log_line(line)
            if not entry:
                continue
            json_entry = show_entry(entry)
            print(json_entry)
            pubsub.send_page_data(json_entry, 'www_logs')  #add
 
 
f = open("access.log", "rt")
follow(f)

增加的行都已注释,不多解释了。

到这我再梳理下我们都干了什么。

第一,我们实现了access.log日志的实时读取,并传唤成json格式。

第二,我们讲转换完毕的数据发送到kafka中,topic名词是www_logs

以上我们都已完成,接下来的问题是要怎么把数据存储到es中,不过先别急,先让我们验证下我们之前的工作是否正确吧,要验证是否可以从topic中读取数据,我们还需要一个消费者程序,为简单验证,我这边实现一个最简单的消费者程序,如下:

from kafka import KafkaConsumer
consumer = KafkaConsumer('www_logs',bootstrap_servers=['127.0.0.1:9092'])
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key, message.value))

就是从topic中读取数据,然后打印,如果没问题,就可以证明这条通路是通的,首先在一个终端运行,我们的日志分析程序,结果如下:

{"log": {"datetime": "2016-12-19 12:00", "source": "iZ258ml0cx5Z", "type": "www_access", "log": "'10.1.1.2 - - [19/Dec/2016:12:00:30 +0800] \"GET / HTTP/1.1\" 502 602 \"-\" \"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Geco) Chrome/54.0.2840.99 Safari/537.36\"'"}}
{"log": {"datetime": "2016-12-19 12:00", "source": "iZ258ml0cx5Z", "type": "www_access", "log": "'10.1.1.2 - - [19/Dec/2016:12:00:30 +0800] \"GET / HTTP/1.1\" 502 602 \"-\" \"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Geco) Chrome/54.0.2840.99 Safari/537.36\"'"}}

行太多,我只截取了2行,因为我们有打印,说明转换是没有问题的,但是否发送到了kafka呢,我们在另一个终端运行我们的消费程序,结果如下:

www_logs:0:21: key=b'log' value=b'"{\\"log\\": {\\"datetime\\": \\"2016-12-19 12:00\\", \\"source\\": \\"iZ258ml0cx5Z\\", \\"type\\": \\"www_access\\", \\"log\\": \\"\'10.1.1.2 - - [19/Dec/2016:12:00:30 +0800] \\\\\\"GET / HTTP/1.1\\\\\\" 502 602 \\\\\\"-\\\\\\" \\\\\\"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Geco) Chrome/54.0.2840.99 Safari/537.36\\\\\\"\'\\"}}"'
www_logs:0:22: key=b'log' value=b'"{\\"log\\": {\\"datetime\\": \\"2016-12-19 12:00\\", \\"source\\": \\"iZ258ml0cx5Z\\", \\"type\\": \\"www_access\\", \\"log\\": \\"\'10.1.1.2 - - [19/Dec/2016:12:00:30 +0800] \\\\\\"GET / HTTP/1.1\\\\\\" 502 602 \\\\\\"-\\\\\\" \\\\\\"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Geco) Chrome/54.0.2840.99 Safari/537.36\\\\\\"\'\\"}}"'

可以看到我们消费者成功的从kafka中取得了数据,说明这条通路我们已经打通了,这条数据流已经没有问题,那接下来就剩下最后存储到es中的问题了,我们后续再接续,这篇已经很长了。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

About Face 3

About Face 3

Alan Cooper、Robert Reimann、David Cronin / John Wiley & Sons / 2007-5-15 / GBP 28.99

* The return of the authoritative bestseller includes all new content relevant to the popularization of how About Face maintains its relevance to new Web technologies such as AJAX and mobile platforms......一起来看看 《About Face 3》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

MD5 加密
MD5 加密

MD5 加密工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器