因为这个DashBoard自己会一直开发维护下去,原本会演变成一个《试用 vue-admin-template 写一个自己的dashboard》+ 1、2、3、、N。但是今天还是把标题改掉了,因为其实重点并不是dashboard本身,真正的重点是我日常想去做的业余研究,顺便研究些华丽的前后端技术。
respage01 再说明
但是从代码上也看到了,我是把某一天的坐标数据硬编码到了这个页面中,所以今天就是要来做成动态,刚好结合《部署Django REST Framework服务(Nginx + uWSGI + Django)》,将接口部署到线上。
既然做了接口化,那我们就可以定时做好每天的数据爬取 -> 数据清洗 -> 数据入库 -> 接口吐出。那么页面便可每天获取到最新的数据。
问题又来了,既然每天有了最新的数据,那最好是有方式可以看到每天的变化。可能在一定的时间跨度内,可以看到一些明显的变化,所以我打算做一个数据回看功能,做法可能很简单: 触发了某个按钮后,定时刷新所有的数据,或者做成一个短视频形式
. ├── config.json ├── log │ └── train-2018-09-03.log ├── result │ └── train-2018-09-03.txt └── spiker.py 复制代码
python spiker.py | python format.py | python writeToRedis.py && python redisToMysql.py 复制代码
虽然有人会建议可以直接在爬取时完成清洗、去重和入库的动作,但是我还是喜欢用这种流式的方法来处理,这样更加清晰,功能也更加解耦。而且了解管道的同学可以看出来,这其实就是同时完成了爬取、清洗和入库动作,只不过是每条数据串行完成了这系列动作。这里的writeToRedis.py是为了利用 redis 天然的去重功能,redis的读写性能也会让效率更高些。
- 将原先的查询关键字等配置信息写到config.json中,方便各管道节点获取到统一的信息
- 在原先写文件的地方,直接加个print,将数据标准输出。
""" 查询关键字:移到config.json """ FileKey = 'train' KeyWord = u"早教$培训" 复制代码
## 设置标准输出的编码格式 sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf8') for r in res['results']: file.writelines(str(r).strip() + '\n') # 增加标准输出 print(str(r).strip(), flush=True) 复制代码
新增format.py 过滤无用信息
def main(): for line in fileinput.input(): format(line) 复制代码
{'name': '英伦艺术培训', 'lat': 29.109614, 'lng': 119.662018, 'address': '解放东路238号福莲汇8层', 'province': '浙江省', 'city': '金华市', 'area': '婺城区', 'street_id': '15ca1ce6773a95f7a2a9343c', 'detail': 1, 'uid': '15ca1ce6773a95f7a2a9343c', 'detail_info': {'tag': '教育培训;培训机构', 'type': 'education', 'detail_url': 'http://api.map.baidu.com/place/detail?uid=15ca1ce6773a95f7a2a9343c&output=html&source=placeapi_v2', 'overall_rating': '0.0', 'children': []}} 复制代码
# coding: utf-8 import fileinput import io import sys import chardet sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf8') def format(line): """ :param line: :return: """ result = {} tmp = eval(line.decode('utf-8')) try: result = { "name": str(tmp["name"]), "lat": tmp["location"]["lat"], "lng": tmp["location"]["lng"], "address": str(tmp["address"]), "tag": str(tmp["detail_info"]["tag"]), } # 部分数据可能缺失字段 if "detail_url" in tmp["detail_info"]: result["detail_url"] = tmp["detail_info"]["detail_url"] else: result["detail_url"] = "" if "overall_rating" in tmp["detail_info"]: result["rate"] = tmp["detail_info"]["overall_rating"] else: result["rate"] = "0.0" print(str(result).strip(), flush=True) except Exception as e: print(e) pass def main(): try: for line in fileinput.input(mode='rb'): format(line) sys.stderr.close() except Exception as e: print(e) pass if __name__ == '__main__': main() 复制代码
cat node1.txt | head -n 1 | python format.py 复制代码
其实使用 python 的set也可以完成去重的事情,代码中也可以尝试这样的操作。关于去重的方式,在不同场景下有各式的方案,我们这属于简单场景,因为数据量不大。
sudo apt-get install redis-server 复制代码
pip install redis vi /etc/redis/redis.conf # 打开 requirepass 配置项,并后面跟上密码 requirepass xxxx 复制代码
redis-cli -a xxxx 复制代码
redis有String、List、Set、Hash、Sort Hash几种类型,由于我们只是要做去重,那就用Set结构就可以:
train_2018_09_07(key) -> (数据1,数据2 ... 数据n) 复制代码
# coding: utf-8 import fileinput import redis import time from tool.tool import tool import io import sys sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf8') def connectRedis(): re = redis.Redis( host=tool.getRedisHost(), port=tool.getRedisPort(), password=tool.getRedisPass(), decode_responses=True) return re def main(): today = time.strftime("%Y_%m_%d") setName = tool.getFileKey() + "_" + today try: re = connectRedis() for line in fileinput.input(mode='rb'): re.sadd(setName, line.decode('utf-8').strip()) exit(0) except Exception as e: print(e) exit(-1) if __name__ == '__main__': main() 复制代码
python spiker.py | python format.py | python writeToRedis.py 复制代码
cat train-2018-09-07.txt | wc -l 663 复制代码
Redis Set 内的条目数:> SCARD train_2018_09_07 (integer) 640 复制代码
关于使用了redis后还是否需要 Mysql 的讨论也有很多,大家可以去参与讨论。我个人的考虑是Django上可以更好地支持Mysql来做序列化和反序列化,毕竟Mysql的查询功能也更加舒适一些。
pip install pandas sqlalchemy 复制代码
# coding: utf-8 import redis from tool.tool import tool import time import pandas as pd from sqlalchemy import create_engine import pymysql def connectRedis(): """ 连接Redis :return: redis connect """ re = redis.Redis( host=tool.getRedisHost(), port=tool.getRedisPort(), password=tool.getRedisPass(), decode_responses=True) return re def connectMysql(): """ 连接mysql数据库 :return: engine connect """ config = tool.getMysqlConfig() engine = create_engine(str(r"mysql+pymysql://%s:%s@%s/%s?charset=utf8") % (config['User'], config['Pass'], config['Host'], config['Name'])) return engine def redisToMysql(re, en): """ :param re: redis connect :param en: mysql engine connect :return: """ today = time.strftime("%Y_%m_%d") tableName = tool.getFileKey() + '_' + today res = [] index = 0 for item in re.sscan_iter(tool.getFileKey() + '_' + today): tmp = eval(item.encode('utf-8').decode('utf-8')) tmp['time'] = today res.append(tmp) index += 1 if index >= 100: df = pd.DataFrame(res) df.to_sql('respage01', con=en, if_exists='append', index=False,) index = 0 res = [] if index != 0: df = pd.DataFrame(res) df.to_sql(name='respage01', con=en, if_exists='append', index=False) # 添加主键 # print("xxxxxxxx") # with en.connect() as con: # con.execute("alter table respage01 add COLUMN id INT NOT NULL AUTO_INCREMENT primary key first") def main(): re = connectRedis() en = connectMysql() redisToMysql(re, en) if __name__ == '__main__': main() 复制代码
alter table respage01 add COLUMN id INT NOT NULL AUTO_INCREMENT primary key first; 复制代码
我们设计两个api: * 获取某个时间段内的所有坐标数据 * 获取某个时间段内每天的数量值
class Respage01Info(models.Model): """ respage 01 相关的数据 """ time = models.CharField(max_length=100) name = models.CharField(max_length=200) address = models.CharField(max_length=500) detail_url = models.URLField(max_length=500) rate = models.FloatField() lat = models.FloatField() lng = models.FloatField() class Meta: # 指定数据表 db_table = "respage01" 复制代码
python manage.py migrate --fake rouboapi 复制代码
class Respage01Serializer(serializers.HyperlinkedModelSerializer): """ 序列化Respage01相关的数据 """ class Meta: model = Respage01Info fields = ('time', 'lat', 'lng', 'name', 'address', 'detail_url', 'rate') class Respage01CountSerializer(serializers.HyperlinkedModelSerializer): """ 序列化计数数据,用于序列化聚合类查询的结果 """ time = serializers.StringRelatedField() count = serializers.IntegerField() class Meta: model = Respage01Info fields = ('time', 'count') 复制代码
class Respage01(APIView): """ 获取respage01相关的数据 """ authentication_classes = [] permission_classes = [] def rangeTime(self, start_time, end_time): """ 获取时间区间 :param start_time: :param end_time: :return: """ print("------------") dateList = [datetime.strftime(x, "%Y_%m_%d") for x in list(pd.date_range(start=start_time.replace('_',''), end=end_time.replace('_','')))] return dateList def get(self, request, format=None): req = request.query_params if 'type' not in req or 'start_time' not in req or 'end_time' not in req: return Response({}, status=status.HTTP_400_BAD_REQUEST) if req['type'] == 'location': dateList = self.rangeTime(start_time=req['start_time'], end_time=req['end_time']) queryset = Respage01Info.objects.filter(time__in=dateList) serializer = Respage01Serializer(queryset, many=True) elif req['type'] == 'count': dateList = self.rangeTime(start_time=req['start_time'], end_time=req['end_time']) queryset = Respage01Info.objects.filter(time__in=dateList).values('time').annotate(count=Count('id')) serializer = Respage01CountSerializer(queryset, many=True) return Response(serializer.data, status=status.HTTP_200_OK) 复制代码
我的vps是1G内存的基础配置,虽然小,但是不至于这么紧张。通过top【M】排序后惊奇地发现uwsgi开了10个进程,每个进程占用了7%左右的内存。修改uwsgi ini文件重启后故障排除(我们这种小服务,两个进程足够了)。
# mysite_uwsgi.ini file [uwsgi] # Django-related settings # the base directory (full path) chdir = /data/django/rouboApi # Django's wsgi file module = rouboinfo.wsgi # the virtualenv (full path) home = /data/django/env3 # process-related settings # master master = true # maximum number of worker processes processes = 2 # the socket (use the full path to be safe socket = /data/django/rouboApi/rouboapi.scok # ... with appropriate permissions - may be needed chmod-socket = 666 # clear environment on exit vacuum = true 复制代码
roubo’s dashboard 主要是增加了两个接口请求,并将v-charts的数据动态化。这里也简单加了一个“复盘”按钮,定时刷新数据,可以大概看到一些变化。
<template> <div> <div style="height: 100%"> <button @click="onPlay">复盘</button> <ve-heatmap :data="chartDataMap" :settings="chartSettingsMap" height="600px"/> </div> <div> <ve-line :data="chartDataChart" :settings="chartSettingsChart"/> </div> </div> </template> 复制代码
/** * 获取某个时间区间的位置信息 * @param start_time * @param end_time */ getLocations: function(start_time, end_time) { this.rouboapis.getRespage01Info('location', start_time, end_time, { success: (res) => { this.chartDataMap.rows = res }, fail: (err) => { console.log(err) } }) }, /** * 获取某个时间段的统计数据 * @param start_time * @param end_time */ getCount: function(start_time, end_time) { this.rouboapis.getRespage01Info('count', start_time, end_time, { success: (res) => { this.chartDataChart.rows = res } }) }, /** * 点击复盘按钮事件 */ onPlay: function() { const dateList = this.getDateList('2018_09_13', this.today('_')) let index = 0 const timer = setInterval(() => { this.getLocations(dateList[index], dateList[index]) this.getCount('2018_09_13', dateList[index]) index = index + 1 if (index >= dateList.length) { clearInterval(timer) return } }, 5000) } 复制代码
