Flask 偏函数、g对象、flask-session、数据库连接池、信号、自制命令、flask-admin

栏目: IT技术 · 发布时间: 4年前

内容简介:当函数的参数个数太多,需要简化时,使用专门用来存储用户信息的g对象,g的全称的为globalg对象在一次请求中的所有的代码的地方,都是可以使用的

一、偏函数

当函数的参数个数太多,需要简化时,使用 functools.partial 可以创建一个新的函数,这个新函数可以固定住原函数的部分参数,从而在调用时更简单。

from functools import partial
def func(a1,a2,a3):
    print(a1,a2,a3)


new_func1 = partial(func,a1=1,a2=2)
new_func1(a3=3)  # 额外传值

new_func2 = partial(func,1,2)
new_func2(3)

new_func3 = partial(func,a1=1)
new_func3(a2=2,a3=3)

二、g对象

专门用来存储用户信息的g对象,g的全称的为global

g对象在一次请求中的所有的代码的地方,都是可以使用的

作用:防止和request内部的属性混乱,比如不知道request里面有没有name属性,可以直接g.name设值。只对这个请求设值

from flask import Flask, g

app = Flask(__name__)


@app.before_request
def be():
    g.name = 'jeff'

    print('befor1')


@app.route('/')
def index():
    print(g.name)
    return 'ok'


@app.route('/login')
def login():
    print(g.name)
    return 'ok'


if __name__ == '__main__':
    app.run()

g对象和session的区别

session对象是可以跨request的,只要session还未失效,不同的request的请求会获取到同一个session,但是g对象不是,g对象不需要管过期时间,请求一次就g对象就改变了一次,或者重新赋值了一次

三、flask-session

作用:将默认保存的签名cookie中的值 保存到 redis/memcached/file/Mongodb/SQLAlchemy

安装:pip3 install flask-session

使用1:

from flask import Flask,session
from flask_session import RedisSessionInterface
import redis
app = Flask(__name__)
conn=redis.Redis(host='127.0.0.1',port=6379)
#use_signer是否对key签名
app.session_interface=RedisSessionInterface(conn,key_prefix='lqz')
@app.route('/')
def hello_world():
    session['name']='lqz'
    return 'Hello World!'

if __name__ == '__main__':
    app.run()

使用2:

from redis import Redis
from flask.ext.session import Session
app.config['SESSION_TYPE'] = 'redis'
app.config['SESSION_REDIS'] = Redis(host='192.168.0.94',port='6379')
Session(app)

问题:设置cookie时,如何设定关闭浏览器则cookie失效。

response.set_cookie('k','v',exipre=None)#这样设置即可
#在session中设置
app.session_interface=RedisSessionInterface(conn,key_prefix='lqz',permanent=False)
#一般不用,我们一般都设置超时时间,多长时间后失效

问题:cookie默认超时时间是多少?如何设置超时时间

#源码expires = self.get_expiration_time(app, session)
'PERMANENT_SESSION_LIFETIME':           timedelta(days=31),#这个配置文件控制

四、数据库连接池

pymsql链接数据库

import pymysql

conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123456', db='s8day127db')
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
# cursor.execute("select id,name from users where name=%s and pwd=%s",['lqz','123',])
cursor.execute("select id,name from users where name=%(user)s and pwd=%(pwd)s",{'user':'lqz','pwd':'123'})
obj = cursor.fetchone()
conn.commit()
cursor.close()
conn.close()

print(obj)

数据库连接池版

setting.py

from datetime import timedelta
from redis import Redis
import pymysql
from DBUtils.PooledDB import PooledDB, SharedDBConnection

class Config(object):
    DEBUG = True
    SECRET_KEY = "umsuldfsdflskjdf"
    PERMANENT_SESSION_LIFETIME = timedelta(minutes=20)
    SESSION_REFRESH_EACH_REQUEST= True
    SESSION_TYPE = "redis"
    PYMYSQL_POOL = PooledDB(
        creator=pymysql,  # 使用链接数据库的模块
        maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
        mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
        maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
        maxshared=3,
        # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
        blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
        maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
        setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
        ping=0,
        # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
        host='127.0.0.1',
        port=3306,
        user='root',
        password='123456',
        database='s8day127db',
        charset='utf8'
    )

class ProductionConfig(Config):
    SESSION_REDIS = Redis(host='192.168.0.94', port='6379')



class DevelopmentConfig(Config):
    SESSION_REDIS = Redis(host='127.0.0.1', port='6379')


class TestingConfig(Config):
    pass

utils/sql.py

import pymysql
from settings import Config
class SQLHelper(object):

    @staticmethod
    def open(cursor):
        POOL = Config.PYMYSQL_POOL
        conn = POOL.connection()
        cursor = conn.cursor(cursor=cursor)
        return conn,cursor

    @staticmethod
    def close(conn,cursor):
        conn.commit()
        cursor.close()
        conn.close()

    @classmethod
    def fetch_one(cls,sql,args,cursor =pymysql.cursors.DictCursor):
        conn,cursor = cls.open(cursor)
        cursor.execute(sql, args)
        obj = cursor.fetchone()
        cls.close(conn,cursor)
        return obj

    @classmethod
    def fetch_all(cls,sql, args,cursor =pymysql.cursors.DictCursor):
        conn, cursor = cls.open(cursor)
        cursor.execute(sql, args)
        obj = cursor.fetchall()
        cls.close(conn, cursor)
        return obj

五、信号

Flask框架中的信号基于blinker,其主要就是让开发者可是在flask请求过程中定制一些用户行为

安装: pip3 install blinker

内置信号:

request_started = _signals.signal('request-started')                # 请求到来前执行
request_finished = _signals.signal('request-finished')              # 请求结束后执行
 
before_render_template = _signals.signal('before-render-template')  # 模板渲染前执行
template_rendered = _signals.signal('template-rendered')            # 模板渲染后执行
 
got_request_exception = _signals.signal('got-request-exception')    # 请求执行出现异常时执行
 
request_tearing_down = _signals.signal('request-tearing-down')      # 请求执行完毕后自动执行(无论成功与否)
appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 应用上下文执行完毕后自动执行(无论成功与否)
 
appcontext_pushed = _signals.signal('appcontext-pushed')            # 应用上下文push时执行
appcontext_popped = _signals.signal('appcontext-popped')            # 应用上下文pop时执行
message_flashed = _signals.signal('message-flashed')                # 调用flask在其中添加数据时,自动触发

使用信号:

from flask import Flask,signals,render_template

app = Flask(__name__)

# 往信号中注册函数
def func(*args,**kwargs):
    print('触发型号',args,kwargs)
signals.request_started.connect(func)

# 触发信号: signals.request_started.send()
@app.before_first_request
def before_first1(*args,**kwargs):
    pass
@app.before_first_request
def before_first2(*args,**kwargs):
    pass

@app.before_request
def before_first3(*args,**kwargs):
    pass

@app.route('/',methods=['GET',"POST"])
def index():
    print('视图')
    return render_template('index.html')


if __name__ == '__main__':
    app.wsgi_app
    app.run()

一个流程中的信号触发点(了解)

a. before_first_request
b. 触发 request_started 信号
c. before_request
d. 模板渲染
    渲染前的信号 before_render_template.send(app, template=template, context=context)
        rv = template.render(context) # 模板渲染
    渲染后的信号 template_rendered.send(app, template=template, context=context)
e. after_request
f. session.save_session()
g. 触发 request_finished信号        
    如果上述过程出错:
        触发错误处理信号 got_request_exception.send(self, exception=e)
            
h. 触发信号 request_tearing_down

自定义信号(了解):

from flask import Flask, current_app, flash, render_template
from flask.signals import _signals
app = Flask(import_name=__name__)

# 自定义信号
xxxxx = _signals.signal('xxxxx')
 
def func(sender, *args, **kwargs):
    print(sender)
# 自定义信号中注册函数
xxxxx.connect(func)
@app.route("/x")
def index():
    # 触发信号
    xxxxx.send('123123', k1='v1')
    return 'Index' 
 
if __name__ == '__main__':
    app.run()

六、命令flask-script

用于实现类似于django中 python 3 manage.py runserver ...类似的命令

安装:pip3 install flask-script

使用

from flask_script import Manager
app = Flask(__name__)
manager=Manager(app)
...
if __name__ == '__main__':
    manager.run()
#以后在执行,直接:python3 manage.py runserver
#python3 manage.py runserver --help

自定制命令

@manager.command
def custom(arg):
    """
    自定义命令
    python manage.py custom 123
    :param arg:
    :return:
    """
    print(arg)


@manager.option('-n', '--name', dest='name')
#@manager.option('-u', '--url', dest='url')
def cmd(name, url):
    """
    自定义命令(-n也可以写成--name)
    执行: python manage.py  cmd -n lqz -u http://www.oldboyedu.com
    执行: python manage.py  cmd --name lqz --url http://www.oldboyedu.com
    :param name:
    :param url:
    :return:
    """
    print(name, url)
#有什么用?
#把excel的数据导入数据库,定制个命令,去执行

七、flask-admin

安装

pip3 install flask_admin

简单使用

from flask import Flask
from flask_admin import Admin

app = Flask(__name__)
#将app注册到adminzhong 
admin = Admin(app)

if __name__=="mian":

    app.run()
#访问
#127.0.0.1:5000/admin端口,会得到一个空白的页面

将表模型注册到admin中

#在将表注册之前应该对app进行配置
SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:@127.0.0.1:3307/py9api?charset=utf8mb4"
SQLALCHEMY_POOL_SIZE = 5
SQLALCHEMY_POOL_TIMEOUT = 30
SQLALCHEMY_POOL_RECYCLE = -1

#导入models文件的中的表模型
from flask_admin.contrib.sqla import ModelView
from api.models import Stock,Product,Images,Category,Wxuser,Banner

admin.add_view(ModelView(Stock, db.session))
admin.add_view(ModelView(Product, db.session))

admin.add_view(ModelView(Category, db.session))

如果有个字段是图片指端

#配置上传文件的路径
#导入from flask_admin.contrib.fileadmin import FileAdmin
from flask_admin.contrib.fileadmin import FileAdmin,form
file_path = op.join(op.dirname(__file__), 'static')
admin = Admin(app)
admin.add_view(FileAdmin(file_path, '/static/', name='文件'))

#如果有个字段要是上传文件重写该方法的modleView类,假设imgae_url是文件图片的字段
class ImagesView(ModelView):

    form_extra_fields = {
        'image_url': form.ImageUploadField('Image',
                                          base_path=file_path,
                                          relative_path='uploadFile/'
                                          )
    }

admin.add_view(ImagesView(Images, db.session))

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

即将到来的场景时代

即将到来的场景时代

罗伯特•斯考伯、谢尔•伊斯雷尔 / 赵乾坤 周宝曜 / 北京联合出版公司 / 2014-5-1 / 42

科技大神、全球科技创新领域最知名记者 罗伯特·斯考伯:“技术越了解你,就会为你提供越多好处!” 互联网的炒作点一个一个不停出现,大数据、3D打印、O2O等,无不宣扬要颠覆商业模式。但是,互联网进入移动时代,接下来到底会发生什么?移动互联网时代真正带来哪些改变?这具体会怎样影响我们每一个人的生活?商业真的会被颠覆?目前为止没有一本书给出答案。 《即将到来的场景时代》不是就一个炒作点大加谈......一起来看看 《即将到来的场景时代》 这本书的介绍吧!

URL 编码/解码
URL 编码/解码

URL 编码/解码

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具