内容简介:接上篇的内容,这篇我们要解决第二个问题,就是如何将我们转换完成的json数据发送到es中去,上篇提到了要存储到es中,我们要使用kafka来做消息队列,实现发布和订阅消息流模式,因为涉及kafka内容,所以我们先说一下Kafka的一些基本知识,然后再看代码不然一是一头雾水。kafka介绍:1、什么是kafka?
接上篇的内容,这篇我们要解决第二个问题,就是如何将我们转换完成的json数据发送到es中去,上篇提到了要存储到es中,我们要使用kafka来做消息队列,实现发布和订阅消息流模式,因为涉及kafka内容,所以我们先说一下Kafka的一些基本知识,然后再看代码不然一是一头雾水。
kafka介绍:
1、什么是kafka?
Kafka是最初由Linkedin公司开发,是一个分布式、支 持分区的(partition)、多副本的(replica),基于 zookeeper协调的分布式消息系统,它的最大的特性就是可 以实时的处理大量数据以满足各种需求场景:比如基于 hadoop的批处理系统、低延迟的实时系统、storm/Spark 流式处理引擎,web/nginx日志、访问日志,消息服务等 等,用scala语言编写,Linkedin于2010年贡献给了Apache 基金会并成为顶级开源 项目。
2、kafka特性
- 高吞吐量、低延迟:kafka每秒可以处理几十万条消息, 它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。 - 可扩展性:kafka集群支持热扩展 - 持久性、可靠性:消息被持久化到本地磁盘,并且支持 数据备份防止数据丢失 - 容错性:允许集群中节点失败(若副本数量为n,则允许n1个节点失败) - 高并发:支持数千个客户端同时读写
3、kafka解决了什么问题?
kafka能在系统或应用程序之间构建可靠的用于传输实时数据的管道, 是一种高吞吐量的分布式发布订阅消息系统, 将不同的数据通过不同的topic实现发布订阅, 生产者生成的数据发布到对应的topic中,订阅这个topic的消费者 都可以消费这个topic中的数据。
4、kafka的一些概念和名词
1)Broker Kafka集群包含一个或多个服务器,这种服务器被称为broker。broker端不维护数据的消费状态,提升了性能。直接使用磁盘进行存储,线 性读写,速度快:避免了数据在JVM内存和系统内存之间的复制,减少耗性能的创建对象和垃圾回收。 2)Producer 负责发布消息到Kafka broke 3)Consumer 消息消费者,向Kafka broker读取消息的客户端,consumer从broker拉取(pull)数据并进行处理。 4)Topic 每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保 存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处) 5)Partition Parition是物理上的概念,每个Topic包含一个或多个Partition. 6)Consumer Group 每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group) 7)Topic & Partition 关系 Topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的Topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使 得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个 Partition的所有消息和索引文件
除了上面的一些内容外,我也谈一下我自己的一些理解,为什么kafka变得如此流行,很大程度上因为的特性决定的,
比如数据的聚合,怎么理解,就是我们可以把产生数据的系统就做生产者,有多少生产者无所谓,只要建立对应的topic,把数据发送到这个topic中就可以,如果有不同部门向要相同的数据,就去订阅这个topic即可,这样最大的好处是使我们系统结构清晰,同时能减少我们很多重复的工作。
另外一个重要的特性就是高并发性,并支持分布式部署,在一个繁忙的系统中,产生的日志或者其它数据是非常庞大的,要对这些数据进行处理,必须要一个高吞吐量、低延迟的处理系统,Kafka正好满足这个需求,每个topic可以建立一个或多个分区,每个分区你可以简单理解为一个公路上的多个车道,每个车就是数据,因为车道多所以它可以加速数据的传输。
经过上面的介绍,相信大家都对Kafka有了一个基本的了解,那接下来回到我们上面的问题,看如何在我们这个例子中使用:
1、安装kafka,这个因为比较简单就不在费篇幅写了。
2、安装完毕后建立一个topic, 命令:
bin/kafka-topics.sh –create –zookeeper 127.0.0.1:2181 –replication-factor 1 –partitions 1 –topic www_logs
3、安装:pip install kafka-python
4、实现生产者类:
from kafka import KafkaProducer import json class MyKafka(object): def __init__(self, kafka_brokers): self.producer = KafkaProducer( value_serializer=lambda v: json.dumps(v).encode('utf-8'), bootstrap_servers=kafka_brokers ) def send_page_data(self, json_data, topic): result = self.producer.send(topic, key=b'log', value=json_data) print("kafka send result: {}".format(result.get()))
代码不是很多,先定义了初始化类的__init__方法,需要两个参数,一个就是broker这个没什么可说的, value_serializer的意思是用于将用户提供的消息值转换为字节,send_page_data()方法是将数据发送给对应的topic。
5、修改下上篇中的脚本,使其实现转换后发送到kafka中,修改如下:
import time import datetime import socket import json from mykafka import MyKafka #add def parse_log_line(line): strptime = datetime.datetime.strptime hostname = socket.gethostname() time = line.split(' ')[3][1::] entry = {} entry['datetime'] = strptime( time, "%d/%b/%Y:%H:%M:%S").strftime("%Y-%m-%d %H:%M") entry['source'] = "{}".format(hostname) entry['type'] = "www_access" entry['log'] = "'{}'".format(line.rstrip()) return entry def show_entry(entry): temp = ",".join([ entry['datetime'], entry['source'], entry['type'], entry['log'] ]) log_entry = {'log': entry} temp = json.dumps(log_entry) print("{}".format(temp)) return temp def follow(syslog_file): syslog_file.seek(0, 2) pubsub = MyKafka(["10.51.117.28:9092"]) #add while True: line = syslog_file.readline() if not line: time.sleep(0.1) continue else: entry = parse_log_line(line) if not entry: continue json_entry = show_entry(entry) print(json_entry) pubsub.send_page_data(json_entry, 'www_logs') #add f = open("access.log", "rt") follow(f)
增加的行都已注释,不多解释了。
到这我再梳理下我们都干了什么。
第一,我们实现了access.log日志的实时读取,并传唤成json格式。
第二,我们讲转换完毕的数据发送到kafka中,topic名词是www_logs
以上我们都已完成,接下来的问题是要怎么把数据存储到es中,不过先别急,先让我们验证下我们之前的工作是否正确吧,要验证是否可以从topic中读取数据,我们还需要一个消费者程序,为简单验证,我这边实现一个最简单的消费者程序,如下:
from kafka import KafkaConsumer consumer = KafkaConsumer('www_logs',bootstrap_servers=['127.0.0.1:9092']) for message in consumer: print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key, message.value))
就是从topic中读取数据,然后打印,如果没问题,就可以证明这条通路是通的,首先在一个终端运行,我们的日志分析程序,结果如下:
{"log": {"datetime": "2016-12-19 12:00", "source": "iZ258ml0cx5Z", "type": "www_access", "log": "'10.1.1.2 - - [19/Dec/2016:12:00:30 +0800] \"GET / HTTP/1.1\" 502 602 \"-\" \"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Geco) Chrome/54.0.2840.99 Safari/537.36\"'"}} {"log": {"datetime": "2016-12-19 12:00", "source": "iZ258ml0cx5Z", "type": "www_access", "log": "'10.1.1.2 - - [19/Dec/2016:12:00:30 +0800] \"GET / HTTP/1.1\" 502 602 \"-\" \"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Geco) Chrome/54.0.2840.99 Safari/537.36\"'"}}
行太多,我只截取了2行,因为我们有打印,说明转换是没有问题的,但是否发送到了kafka呢,我们在另一个终端运行我们的消费程序,结果如下:
www_logs:0:21: key=b'log' value=b'"{\\"log\\": {\\"datetime\\": \\"2016-12-19 12:00\\", \\"source\\": \\"iZ258ml0cx5Z\\", \\"type\\": \\"www_access\\", \\"log\\": \\"\'10.1.1.2 - - [19/Dec/2016:12:00:30 +0800] \\\\\\"GET / HTTP/1.1\\\\\\" 502 602 \\\\\\"-\\\\\\" \\\\\\"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Geco) Chrome/54.0.2840.99 Safari/537.36\\\\\\"\'\\"}}"' www_logs:0:22: key=b'log' value=b'"{\\"log\\": {\\"datetime\\": \\"2016-12-19 12:00\\", \\"source\\": \\"iZ258ml0cx5Z\\", \\"type\\": \\"www_access\\", \\"log\\": \\"\'10.1.1.2 - - [19/Dec/2016:12:00:30 +0800] \\\\\\"GET / HTTP/1.1\\\\\\" 502 602 \\\\\\"-\\\\\\" \\\\\\"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Geco) Chrome/54.0.2840.99 Safari/537.36\\\\\\"\'\\"}}"'
可以看到我们消费者成功的从kafka中取得了数据,说明这条通路我们已经打通了,这条数据流已经没有问题,那接下来就剩下最后存储到es中的问题了,我们后续再接续,这篇已经很长了。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 树形结构数据存储方案(一):邻接列表模式
- Repository存储库模式 – Abhishek Chaudhary
- 存储过程的in、out、inout模式
- 大数据分布式存储的部署模式:分离式or超融合
- 用Data-pipeline模式将nginx日志存储到elasticsearch中(1)
- 分离式or超融合,分布式存储建设时的两种部署模式
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
基于MVC的JavaScript Web富应用开发
麦卡劳(Alex MacCaw) / 李晶、张散集 / 电子工业出版社 / 2012-5 / 59.00元
《JavaScript Web 富应用开发》Developing JavaScript Web Applications是 Alex MacCaw 的新作(由O'Reilly出版发行),本书系统而深入的讲解了如何使用最前沿的Web技术构建下一代互联网富应用程序。作者 Alex MacCaw 是一名Ruby/JavaScript 程序员,在开源社区中很有名望,是Spine框架的作者,同时活跃在纽约、......一起来看看 《基于MVC的JavaScript Web富应用开发》 这本书的介绍吧!