“日常研究”之动态刷新 respage01 百度热力图

栏目: Python · 发布时间: 6年前

内容简介:因为这个DashBoard自己会一直开发维护下去,原本会演变成一个《试用 vue-admin-template 写一个自己的dashboard》+ 1、2、3、、N。但是今天还是把标题改掉了,因为其实重点并不是dashboard本身,真正的重点是我日常想去做的业余研究,顺便研究些华丽的前后端技术。Respage01的起因是老婆大人对周边各种早教和教育培训机构春笋般出现的现象感到费解,然后向我提了一个问题,“如果我们入股类似的一个小教育培训机构,会不会有前途?”。这是一个很难回答的问题,因为我们都没有在培训

因为这个DashBoard自己会一直开发维护下去,原本会演变成一个《试用 vue-admin-template 写一个自己的dashboard》+ 1、2、3、、N。但是今天还是把标题改掉了,因为其实重点并不是dashboard本身,真正的重点是我日常想去做的业余研究,顺便研究些华丽的前后端技术。

respage01 再说明

Respage01的起因是老婆大人对周边各种早教和教育培训机构春笋般出现的现象感到费解,然后向我提了一个问题,“如果我们入股类似的一个小教育培训机构,会不会有前途?”。这是一个很难回答的问题,因为我们都没有在培训教育行业里待过,但是既然问题已经出现了,自己也觉得这个问题有些意思,便想要做一个日常研究。Respage01会从宏观角度去分析并监测教育培训机构(在金华地区)相关的数据,数据都来自互联网。

预加功能

上一次已经留下了一个壳子,类似这样:

“日常研究”之动态刷新 respage01 百度热力图

但是从代码上也看到了,我是把某一天的坐标数据硬编码到了这个页面中,所以今天就是要来做成动态,刚好结合《部署Django REST Framework服务(Nginx + uWSGI + Django)》,将接口部署到线上。

既然做了接口化,那我们就可以定时做好每天的数据爬取 -> 数据清洗 -> 数据入库 -> 接口吐出。那么页面便可每天获取到最新的数据。

问题又来了,既然每天有了最新的数据,那最好是有方式可以看到每天的变化。可能在一定的时间跨度内,可以看到一些明显的变化,所以我打算做一个数据回看功能,做法可能很简单: 触发了某个按钮后,定时刷新所有的数据,或者做成一个短视频形式

在一个较短的时间跨度内,有可能很难从热力图上看到变化,所以打算加一个折线图来标识每天的数据。

部署自动爬取和数据清洗并入库

目前的爬虫目录很简单:

.
├── config.json
├── log
│   └── train-2018-09-03.log
├── result
│   └── train-2018-09-03.txt
└── spiker.py
复制代码

那完成这个事情定时运行类似这样的脚本就可以:

python spiker.py | python format.py | python writeToRedis.py && python redisToMysql.py
复制代码

虽然有人会建议可以直接在爬取时完成清洗、去重和入库的动作,但是我还是喜欢用这种流式的方法来处理,这样更加清晰,功能也更加解耦。而且了解管道的同学可以看出来,这其实就是同时完成了爬取、清洗和入库动作,只不过是每条数据串行完成了这系列动作。这里的writeToRedis.py是为了利用 redis 天然的去重功能,redis的读写性能也会让效率更高些。

修改spiker.py

修改就只有两个:

  • 将原先的查询关键字等配置信息写到config.json中,方便各管道节点获取到统一的信息
  • 在原先写文件的地方,直接加个print,将数据标准输出。
"""
    查询关键字:移到config.json
"""
FileKey = 'train'
KeyWord = u"早教$培训"
复制代码
## 设置标准输出的编码格式
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf8')

for r in res['results']:
  file.writelines(str(r).strip() + '\n')
  # 增加标准输出
  print(str(r).strip(), flush=True)
复制代码

新增format.py 过滤无用信息

首先读取上一级管道的标准输出作为输入,使用fileinput便可实现:

def main():
    for line in fileinput.input():
         format(line)
复制代码

分析一条的数据的结构,只保留感兴趣的数据(不用担心丢失,因为在第一个节点上已经保存了原始数据),并尽量把json结构拉平成只有一级:

{'name': '英伦艺术培训', 'lat': 29.109614, 'lng': 119.662018, 'address': '解放东路238号福莲汇8层', 'province': '浙江省', 'city': '金华市', 'area': '婺城区', 'street_id': '15ca1ce6773a95f7a2a9343c', 'detail': 1, 'uid': '15ca1ce6773a95f7a2a9343c', 'detail_info': {'tag': '教育培训;培训机构', 'type': 'education', 'detail_url': 'http://api.map.baidu.com/place/detail?uid=15ca1ce6773a95f7a2a9343c&output=html&source=placeapi_v2', 'overall_rating': '0.0', 'children': []}}
复制代码

由于该数据一级比较简单,所以format也只是做了很小的处理,另外,这样的好处时,不同的数据结构可以写不同的format就可以。

# coding: utf-8
import fileinput
import io
import sys
import chardet
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf8')

def format(line):
    """
    :param line:
    :return:
    """
    result = {}
    tmp = eval(line.decode('utf-8'))
    try:
        result = {
            "name": str(tmp["name"]),
            "lat": tmp["location"]["lat"],
            "lng": tmp["location"]["lng"],
            "address": str(tmp["address"]),
            "tag": str(tmp["detail_info"]["tag"]),
        }

        # 部分数据可能缺失字段
        if "detail_url" in tmp["detail_info"]:
            result["detail_url"] = tmp["detail_info"]["detail_url"]
        else:
            result["detail_url"] = ""

        if "overall_rating" in tmp["detail_info"]:
            result["rate"] = tmp["detail_info"]["overall_rating"]
        else:
            result["rate"] = "0.0"

        print(str(result).strip(), flush=True)

    except Exception as e:
        print(e)
        pass


def main():
    try:
        for line in fileinput.input(mode='rb'):
            format(line)
        sys.stderr.close()
    except Exception as e:
        print(e)
        pass


if __name__ == '__main__':
    main()

复制代码

如果数据量大,可以用类似的方法来调试:

cat node1.txt | head -n 1 | python format.py
复制代码

其实使用 python 的set也可以完成去重的事情,代码中也可以尝试这样的操作。关于去重的方式,在不同场景下有各式的方案,我们这属于简单场景,因为数据量不大。

系统安装redis服务,并配置密码:

sudo apt-get install redis-server
复制代码

在虚拟环境下安装redis库:

pip install redis
vi /etc/redis/redis.conf
# 打开 requirepass 配置项,并后面跟上密码
requirepass xxxx
复制代码

登录测试:

redis-cli -a xxxx
复制代码

redis有String、List、Set、Hash、Sort Hash几种类型,由于我们只是要做去重,那就用Set结构就可以:

train_2018_09_07(key) -> (数据1,数据2 ... 数据n)
复制代码

writeToRedis的简单实现:

# coding: utf-8
import fileinput
import redis
import time
from tool.tool import tool
import io
import sys
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf8')


def connectRedis():
    re = redis.Redis(
        host=tool.getRedisHost(),
        port=tool.getRedisPort(),
        password=tool.getRedisPass(),
        decode_responses=True)
    return re


def main():
    today = time.strftime("%Y_%m_%d")
    setName = tool.getFileKey() + "_" + today
    try:
        re = connectRedis()
        for line in fileinput.input(mode='rb'):
            re.sadd(setName, line.decode('utf-8').strip())
        exit(0)
    except Exception as e:
        print(e)
        exit(-1)

if __name__ == '__main__':
    main()

复制代码

使用redis的set还有一个好处就是,可以理由set的差集等功能,快速获取每天发生变化的数据。这个需求打算后面加上

执行:

python spiker.py | python format.py | python writeToRedis.py
复制代码

运行后,原始数据文件条目:

cat train-2018-09-07.txt | wc -l                                            663
复制代码

Redis Set 内的条目数:

127.0.0.1:6379> SCARD train_2018_09_07
(integer) 640
复制代码

说明确实还是有重复的数据,原因可能是我们使用了10*10小矩形扫描的方式,不可避免地会有交界处重叠的问题。

关于使用了redis后还是否需要 Mysql 的讨论也有很多,大家可以去参与讨论。我个人的考虑是Django上可以更好地支持Mysql来做序列化和反序列化,毕竟Mysql的查询功能也更加舒适一些。

首先从redis中读出数据的形式,可以使用迭代器的方式,好处是稍微省内存些,但是问题是如果单条数据单独写入mysql的话,IO上估计也不太合算,所以我使用pandas的DataFrame写入mysql的方式来分批写入,使用pandas的好处是字典数据写入数据操作比一般的mysql库要简洁很多。

虚拟环境下安装pandas:

pip install pandas sqlalchemy
复制代码
# coding: utf-8
import redis
from tool.tool import tool
import time
import pandas as pd
from sqlalchemy import create_engine
import pymysql


def connectRedis():
    """
    连接Redis
    :return: redis connect
    """
    re = redis.Redis(
        host=tool.getRedisHost(),
        port=tool.getRedisPort(),
        password=tool.getRedisPass(),
        decode_responses=True)
    return re


def connectMysql():
    """
    连接mysql数据库
    :return: engine connect
    """
    config = tool.getMysqlConfig()
    engine = create_engine(str(r"mysql+pymysql://%s:%s@%s/%s?charset=utf8") %
                           (config['User'],
                            config['Pass'],
                            config['Host'],
                            config['Name']))
    return engine


def redisToMysql(re, en):
    """
    :param re: redis connect
    :param en: mysql engine connect
    :return:
    """
    today = time.strftime("%Y_%m_%d")
    tableName = tool.getFileKey() + '_' + today
    res = []
    index = 0
    for item in re.sscan_iter(tool.getFileKey() + '_' + today):
        tmp = eval(item.encode('utf-8').decode('utf-8'))
        tmp['time'] = today
        res.append(tmp)
        index += 1
        if index >= 100:
            df = pd.DataFrame(res)
            df.to_sql('respage01', con=en, if_exists='append', index=False,)
            index = 0
            res = []
    if index != 0:
        df = pd.DataFrame(res)
        df.to_sql(name='respage01', con=en, if_exists='append', index=False)

    # 添加主键
    # print("xxxxxxxx")
    # with en.connect() as con:
    #     con.execute("alter table respage01 add COLUMN id INT NOT NULL AUTO_INCREMENT  primary key first")


def main():
    re = connectRedis()
    en = connectMysql()
    redisToMysql(re, en)


if __name__ == '__main__':
    main()

复制代码

为了后面django处理方便,我后面临时加入了一个自增id作为主键。方法可以是:

alter table respage01 add COLUMN id INT NOT NULL AUTO_INCREMENT  primary key first;
复制代码

编写相关apis

我们设计两个api: * 获取某个时间段内的所有坐标数据 * 获取某个时间段内每天的数量值

model实现

class Respage01Info(models.Model):
    """
    respage 01 相关的数据
    """
    time = models.CharField(max_length=100)
    name = models.CharField(max_length=200)
    address = models.CharField(max_length=500)
    detail_url = models.URLField(max_length=500)
    rate = models.FloatField()
    lat = models.FloatField()
    lng = models.FloatField()

    class Meta:
			# 指定数据表
        db_table = "respage01"
复制代码

需要注意的是,我们已经拥有了数据库,并且表里已经有了数据,所以在执行migrate的时候,需要指明fake掉该项目的数据迁移:

python manage.py migrate --fake rouboapi
复制代码

Serializer实现

由于我们的计数接口是需要使用聚合类查询功能,简单说,就是需要返回数据库字段以外的字段给客户端,所以需要使用serializers的Field方法。

class Respage01Serializer(serializers.HyperlinkedModelSerializer):
    """
    序列化Respage01相关的数据
    """

    class Meta:
        model = Respage01Info
        fields = ('time', 'lat', 'lng', 'name', 'address', 'detail_url', 'rate')


class Respage01CountSerializer(serializers.HyperlinkedModelSerializer):
    """
    序列化计数数据,用于序列化聚合类查询的结果
    """
    time = serializers.StringRelatedField()
    count = serializers.IntegerField()

    class Meta:
        model = Respage01Info
        fields = ('time', 'count')
复制代码

view实现

这里需要用到django的数据库查询相关的知识,我们这里用到了fiter、values、annotate几个函数,具体的可以参考官方文档,基本用法还是比较简单。

class Respage01(APIView):
    """
    获取respage01相关的数据
    """

    authentication_classes = []
    permission_classes = []

    def rangeTime(self, start_time, end_time):
        """
        获取时间区间
        :param start_time:
        :param end_time:
        :return:
        """
        print("------------")
        dateList = [datetime.strftime(x, "%Y_%m_%d")
                    for x in list(pd.date_range(start=start_time.replace('_',''), end=end_time.replace('_','')))]
        return dateList

    def get(self, request, format=None):
        req = request.query_params
        if 'type' not in req or 'start_time' not in req or 'end_time' not in req:
            return Response({}, status=status.HTTP_400_BAD_REQUEST)
        if req['type'] == 'location':
            dateList = self.rangeTime(start_time=req['start_time'], end_time=req['end_time'])
            queryset = Respage01Info.objects.filter(time__in=dateList)
            serializer = Respage01Serializer(queryset, many=True)
        elif req['type'] == 'count':
            dateList = self.rangeTime(start_time=req['start_time'], end_time=req['end_time'])
            queryset = Respage01Info.objects.filter(time__in=dateList).values('time').annotate(count=Count('id'))
            serializer = Respage01CountSerializer(queryset, many=True)
        return Response(serializer.data, status=status.HTTP_200_OK)
复制代码

接口上线后发现的异常

在接口上线后测试过程中,发现接口极其不稳定,查了一下发现mysql会异常地退出,查看了日志发现是内存不足导致。

我的vps是1G内存的基础配置,虽然小,但是不至于这么紧张。通过top【M】排序后惊奇地发现uwsgi开了10个进程,每个进程占用了7%左右的内存。修改uwsgi ini文件重启后故障排除(我们这种小服务,两个进程足够了)。

# mysite_uwsgi.ini file
[uwsgi]

# Django-related settings
# the base directory (full path)
chdir           = /data/django/rouboApi
# Django's wsgi file
module          = rouboinfo.wsgi
# the virtualenv (full path)
home            = /data/django/env3

# process-related settings
# master
master          = true
# maximum number of worker processes
processes       = 2
# the socket (use the full path to be safe
socket          = /data/django/rouboApi/rouboapi.scok
# ... with appropriate permissions - may be needed
chmod-socket    = 666
# clear environment on exit
vacuum          = true
复制代码

修改respage01页面

roubo’s dashboard 主要是增加了两个接口请求,并将v-charts的数据动态化。这里也简单加了一个“复盘”按钮,定时刷新数据,可以大概看到一些变化。

<template>
  <div>
    <div style="height: 100%">
      <button @click="onPlay">复盘</button>
      <ve-heatmap :data="chartDataMap" :settings="chartSettingsMap" height="600px"/>
    </div>
    <div>
      <ve-line :data="chartDataChart" :settings="chartSettingsChart"/>
    </div>
  </div>
</template>
复制代码
/**
 * 获取某个时间区间的位置信息
 * @param start_time
 * @param end_time
 */
getLocations: function(start_time, end_time) {
  this.rouboapis.getRespage01Info('location', start_time, end_time, {
    success: (res) => {
      this.chartDataMap.rows = res
    },
    fail: (err) => {
      console.log(err)
    }
  })
},
/**
 * 获取某个时间段的统计数据
 * @param start_time
 * @param end_time
 */
getCount: function(start_time, end_time) {
  this.rouboapis.getRespage01Info('count', start_time, end_time, {
    success: (res) => {
      this.chartDataChart.rows = res
    }
  })
},

/**
 * 点击复盘按钮事件
 */
onPlay: function() {
  const dateList = this.getDateList('2018_09_13', this.today('_'))
  let index = 0
  const timer = setInterval(() => {
    this.getLocations(dateList[index], dateList[index])
    this.getCount('2018_09_13', dateList[index])
    index = index + 1
    if (index >= dateList.length) {
      clearInterval(timer)
      return
    }
  }, 5000)
}

复制代码

以上所述就是小编给大家介绍的《“日常研究”之动态刷新 respage01 百度热力图》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

引爆点

引爆点

[美] 马尔科姆·格拉德威尔 / 钱清、覃爱冬 / 中信出版社 / 2006-1 / 29.80元

这本书是《纽约客》杂志专职作家马尔科姆·格拉德威尔的一部才华横溢之作。他以社会上突如其来的流行风潮研究为切入点,从一个全新的角度探索了控制科学和营销模式。他认为,思想、行为、信息以及产品常常会像传染病爆发一样,迅速传播蔓延。正如一个病人就能引起一场全城流感;如果个别工作人员对顾客大打出手,或几位涂鸦爱好者管不住自己,也能在地铁里掀起一场犯罪浪潮;一位满意而归的顾客还能让新开张的餐馆座无虚席。这些现......一起来看看 《引爆点》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具