内容简介:由于它们是相当通用的,因此Flask-Login提供了一个叫做UserMixin的mixin类来将它们归纳其中
整体目录
os_basic
├── app
│   ├── __init__.py
│   ├── forms.py
│   ├── routes.py
│   └── templates
│       ├── base.html
│       ├── index.html
│       └── login.html
├── config.py
└── index.py
app/templates/index.html
{% extends "base.html" %}
{% block content %}
<h1>hello, {{ user.username }}</h1>
{% for post in posts %}
<div><p>{{ post.author.username }} says: <b>{{ post.body }}</b></p></div>
{% endfor %}
{% endblock %}
index.py
from app import app
# Script entry point: start the Flask development server when this file is
# executed directly (not when it is imported).
if __name__ == '__main__':
    # Listen on every interface, port 8080, with debug mode switched on.
    run_options = {'host': '0.0.0.0', 'port': 8080, 'debug': True}
    app.run(**run_options)
表单 flask form
pip install flask-wtf
在工程根目录新建 config.py
import os
class Config:
    """Application settings, loaded with ``app.config.from_object(Config)``."""

    # Read from the SECRET_KEY environment variable; when that is unset or
    # empty a hard-coded value is used instead (fine for local experiments,
    # not for a deployed application).
    SECRET_KEY = os.environ.get('SECRET_KEY') or 'my-secret'
在子项目编辑 app/__init__.py
from flask import Flask
from config import Config

# Create the application object and load its settings from the Config class.
app = Flask(__name__)
app.config.from_object(Config)

# Imported last, once ``app`` exists, because app/routes.py itself does
# ``from app import app``.
from app import routes
声明一个 app/forms.py
from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField, BooleanField, SubmitField
from wtforms.validators import DataRequired
class LoginForm(FlaskForm):
    """Sign-in form: username, password, a "remember me" checkbox and a submit button."""

    # Both text inputs carry a DataRequired validator.
    username = StringField('Username', validators=[DataRequired()])
    password = PasswordField('Password', validators=[DataRequired()])
    remember_me = BooleanField('Remember Me')
    submit = SubmitField('Sign In')
加载表单 app/templates/base.html
<!DOCTYPE html>
<html lang="zh">
<head>
<meta charset="UTF-8">
{% if title %}
<title>{{ title }}</title>
{% else %}
<title>default - title</title>
{% endif %}
</head>
<body>
<div>
Index: <a href="{{ url_for('index') }}">Index</a><br />
Login: <a href="{{ url_for('login') }}">Login</a>
</div>
<hr />
{% with messages = get_flashed_messages() %}
{% if messages %}
<ul>
{% for message in messages %}
<li>{{ message }}</li>
{% endfor %}
</ul>
{% endif %}
{% endwith %}
{% block content %}{% endblock %}
</body>
</html>
加载表单 app/templates/login.html
{% extends "base.html" %}
{% block content %}
<h1>Sign In</h1>
<form action="" method="post" novalidate>
{{ form.hidden_tag() }}
<p>
{{ form.username.label }}<br>
{{ form.username(size=32) }}<br>
{% for error in form.username.errors %}
<span style="color: red;">[{{ error }}]</span>
{% endfor %}
</p>
<p>
{{ form.password.label }}<br>
{{ form.password(size=32) }}<br>
{% for error in form.password.errors %}
<span style="color: red;">[{{ error }}]</span>
{% endfor %}
</p>
<p>{{ form.remember_me() }} {{ form.remember_me.label }}</p>
<p>{{ form.submit() }}</p>
</form>
{% endblock %}
from flask import render_template, flash, redirect, url_for
from app import app
from app.forms import LoginForm
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Display the sign-in form; on a valid submission flash a notice and redirect to the index page."""
    form = LoginForm()
    if not form.validate_on_submit():
        # Plain GET, or a POST that failed validation: (re)display the form.
        return render_template('login.html', title='Sign In', form=form)
    notice = 'Login requested for user {}, remember_me={}'.format(
        form.username.data, form.remember_me.data)
    flash(notice)
    return redirect(url_for('index'))
@app.route('/')
@app.route('/index')
def index():
    """Render the home page with a hard-coded user and two sample posts."""
    first_post = {'author': {'username': 'user01'}, 'body': 'user01 - body'}
    second_post = {'author': {'username': 'user02'}, 'body': 'user02 - body'}
    context = {
        'title': 'Home',
        'user': {'username': 'lhh'},
        'posts': [first_post, second_post],
    }
    return render_template('index.html', **context)
pip install flask-sqlalchemy flask-migrate
config.py
import os
# Directory that contains this file; the default SQLite database lives here.
basedir = os.path.abspath(os.path.dirname(__file__))  # added


class Config:
    """Application settings, loaded with ``app.config.from_object(Config)``."""

    # Environment value if set, otherwise a hard-coded development fallback.
    SECRET_KEY = os.environ.get('SECRET_KEY') or 'my-secret'
    # Database URL from the DATABASE_URL environment variable, otherwise a
    # SQLite file named app.db next to this file.  Parentheses are used for
    # the continuation because a comment after a trailing backslash is a
    # syntax error.
    SQLALCHEMY_DATABASE_URI = (  # added
        os.environ.get('DATABASE_URL')
        or 'sqlite:///' + os.path.join(basedir, 'app.db')
    )
    SQLALCHEMY_TRACK_MODIFICATIONS = False  # added
app/__init__.py
from flask import Flask
from config import Config
from flask_sqlalchemy import SQLAlchemy  # added
from flask_migrate import Migrate  # added

app = Flask(__name__)
app.config.from_object(Config)

# Database handle and migration support, both bound to the application.
db = SQLAlchemy(app)  # added
migrate = Migrate(app, db)  # added

# Imported last: these modules import ``app`` / ``db`` from this package.
from app import routes, models
app/models.py
from datetime import datetime
from app import db
class User(db.Model):
    """User account row with a link to the posts written by that user."""

    id = db.Column(db.Integer, primary_key=True)
    # Username and email are both indexed and declared unique.
    username = db.Column(db.String(64), index=True, unique=True)
    email = db.Column(db.String(120), index=True, unique=True)
    password_hash = db.Column(db.String(128))
    # Relationship to Post; ``backref='author'`` is what provides
    # ``post.author`` on the Post side.
    posts = db.relationship('Post', backref='author', lazy='dynamic')

    def __repr__(self):
        """Return a short debugging representation, e.g. ``<User user01>``."""
        name = self.username
        return '<User {}>'.format(name)
class Post(db.Model):
    """A post row: body text, creation timestamp and the id of its author."""

    id = db.Column(db.Integer, primary_key=True)
    body = db.Column(db.String(140))
    # The callable itself is passed as the default, not the result of calling it.
    timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow)
    # Foreign key to the ``id`` column of the ``user`` table.
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'))

    def __repr__(self):
        """Return a short debugging representation, e.g. ``<Post 1-post>``."""
        text = self.body
        return '<Post {}>'.format(text)
执行数据库命令
# 创建数据库迁移存储库, 生成migrations目录
# FLASK_APP=index.py flask db init
# 第一次数据库迁移, 生成迁移脚本
flask db migrate -m "users table"
# 回滚操作(开发时候应用)
flask db downgrade
# 把应用更改到数据库
flask db upgrade
python 交互操作数据库
# 导入数据库实例
>>> from app import db
# 导入模型
>>> from app.models import User, Post
# 创建一个新用户
>>> u = User(username='user01', email='user01@qq.com')
>>> db.session.add(u)
>>> db.session.commit()
# 查询所有用户
>>> users = User.query.all()
>>> users
[<User user01>]
>>> for user in users:
...     print(user.id, user.username)
...
1 user01
>>> u = User.query.get(1)
>>> u
<User user01>
>>> p = Post(body='1-post', author=u)
>>> db.session.add(p)
>>> db.session.commit()
# 查所有
>>> posts = u.posts.all()
>>> posts
[<Post 1-post>]
>>> for post in posts:
...     print(post.id, post.author.username, post.body)
...
1 user01 1-post
# 排序
>>> User.query.order_by(User.username.desc()).all()
[<User user01>]
# 删所有
>>> users = User.query.all()
>>> for u in users:
...     db.session.delete(u)
...
>>> posts = Post.query.all()
>>> for post in posts:
...     db.session.delete(post)
...
>>> db.session.commit()
shell 上下文 index.py
# export FLASK_APP=index.py
from app import app, db # 增加项
from app.models import User, Post # 增加项
@app.shell_context_processor  # added
def make_shell_context():  # added
    """Return the names made available in ``flask shell``: ``db``, ``User`` and ``Post``."""
    shell_names = dict(db=db, User=User, Post=Post)
    return shell_names  # added
# Script entry point: start the Flask development server when this file is
# executed directly (not when it is imported).
if __name__ == '__main__':
    # Listen on every interface, port 8080, with debug mode switched on.
    run_options = {'host': '0.0.0.0', 'port': 8080, 'debug': True}
    app.run(**run_options)
# flask shell
Python 3.7.0 (default, Oct 2 2018, 09:18:58)
[Clang 10.0.0 (clang-1000.11.45.2)] on darwin
App: app [production]
Instance: /Users/lhh/vdir/initial/py/os_basic/instance
>>> db
<SQLAlchemy engine=sqlite:////Users/lhh/vdir/initial/py/os_basic/app.db>
>>> User
<class 'app.models.User'>
>>> Post
<class 'app.models.Post'>
用户登录
密码哈希
>>> from werkzeug.security import generate_password_hash
>>> hash = generate_password_hash('string')
>>> hash
'pbkdf2:sha256:50000$OB1szOsd$634688cdbb02a04643127ed02bf20049ba6c2a20beb632490cfd065ba8a113fb'
>>> from werkzeug.security import check_password_hash
>>> check_password_hash(hash, 'string') # 验证
True
>>> check_password_hash(hash, 'string1')
False
app/models.py
from werkzeug.security import generate_password_hash, check_password_hash
# ...
class User(db.Model):
    # ... (columns and other members elided in this excerpt)

    def set_password(self, password):
        """Hash ``password`` and keep only the hash in ``password_hash``."""
        hashed = generate_password_hash(password)
        self.password_hash = hashed

    def check_password(self, password):
        """Return whether ``password`` matches the stored ``password_hash``."""
        stored_hash = self.password_hash
        return check_password_hash(stored_hash, password)
flask-login
pip install flask-login
app/__init__.py
from flask import Flask
from config import Config
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_login import LoginManager  # added

app = Flask(__name__)
app.config.from_object(Config)

# Extensions bound to the application object.
db = SQLAlchemy(app)
migrate = Migrate(app, db)
login = LoginManager(app)  # added

# Imported last: these modules import names from this package.
from app import routes, models
flask-login 准备用户模型
- is_authenticated: 一个用来表示用户是否通过登录认证的属性,用True和False表示。
- is_active: 如果用户账户是活跃的,那么这个属性是True,否则就是False(译者注:活跃用户的定义是该用户的登录状态是否通过用户名密码登录,通过“记住我”功能保持登录状态的用户是非活跃的)。
- is_anonymous: 常规用户的该属性是False,对特定的匿名用户是True。
- get_id(): 返回用户的唯一id的方法,返回值类型是字符串(Python 2下返回unicode字符串).
由于它们是相当通用的,因此Flask-Login提供了一个叫做UserMixin的mixin类来将它们归纳其中
app/models.py
# ...
from flask_login import UserMixin
class User(UserMixin, db.Model):
# ...
用户加载函数 app/models.py
from app import login
# Flask-Login's @login.user_loader decorator registers the function used to
# load a user.  Flask-Login passes the id to that function as a string, so a
# database with numeric ids has to convert it to an integer, as done below.
@login.user_loader
def load_user(id):
    """Return the User whose primary key is ``int(id)``."""
    numeric_id = int(id)
    return User.query.get(numeric_id)
用户登入 app/routes.py
from flask_login import current_user, login_user # 增加项
from app.models import User # 增加项
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Sign a user in with username and password.

    A visitor who is already authenticated is redirected to the index page.
    A valid form with wrong credentials flashes an error and redirects back
    to the login page; correct credentials log the user in and redirect to
    the index page.  Otherwise the login form is rendered.
    """
    # A user is already signed in: go straight to the index page.
    if current_user.is_authenticated:
        return redirect(url_for('index'))
    form = LoginForm()
    if not form.validate_on_submit():
        return render_template('login.html', title='Sign In', form=form)
    account = User.query.filter_by(username=form.username.data).first()
    credentials_ok = (account is not None
                      and account.check_password(form.password.data))
    if not credentials_ok:
        flash('Invalid username or password')
        return redirect(url_for('login'))
    # This call takes the place of the placeholder "Login requested ..."
    # flash message used by the earlier version of this view.
    login_user(account, remember=form.remember_me.data)
    return redirect(url_for('index'))
用户登出 app/routes.py
from flask_login import logout_user
@app.route('/logout')
def logout():
    """Log the current user out and redirect to the index page."""
    logout_user()
    home_url = url_for('index')
    return redirect(home_url)
模版 app/templates/base.html
<div>
Index: <a href="{{ url_for('index') }}">Index</a>
{% if current_user.is_anonymous %}
Login: <a href="{{ url_for('login') }}">Login</a>
{% else %}
Logout: <a href="{{ url_for('logout') }}">Logout</a>
{% endif %}
</div>
要求用户登录
app/__init__.py
login = LoginManager(app)
# The value is the endpoint (view function) name of the login view, i.e. the
# name that can be passed to url_for() to obtain the corresponding URL.
login.login_view = 'login'
app/routes.py
# 使用名为@login_required的装饰器来拒绝匿名用户的访问以保护某个视图函数
from flask_login import login_required
@app.route('/')
@app.route('/index')
@login_required
def index():
# ...
处理next查询字符串参数 app/routes.py
- 如果登录URL中不含next参数,那么将会重定向到本应用的主页。
- 如果登录URL中包含next参数,其值是一个相对路径(换句话说,该URL不含域名信息),那么将会重定向到本应用的这个相对路径。
- 如果登录URL中包含next参数,其值是一个包含域名的完整URL,那么重定向到本应用的主页。
from flask import request
# werkzeug.urls.url_parse was removed in Werkzeug 3.0; the standard-library
# urlsplit exposes the same ``netloc`` attribute.
from urllib.parse import urlsplit


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Sign a user in and honour a safe ``next`` query-string parameter.

    A visitor who is already authenticated is redirected to the index page.
    After a successful login the user is redirected to ``next`` only when
    that value is a relative path; when it is missing, or names a host or a
    scheme, the index page is used instead.  Wrong credentials flash an
    error and redirect back to the login page.  Otherwise the login form is
    rendered.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))
    form = LoginForm()
    if form.validate_on_submit():
        user = User.query.filter_by(username=form.username.data).first()
        if user is None or not user.check_password(form.password.data):
            flash('Invalid username or password')
            return redirect(url_for('login'))
        login_user(user, remember=form.remember_me.data)
        next_page = request.args.get('next')  # added
        if next_page:
            target = urlsplit(next_page)
            # ``next`` comes from the request, so anything that is not a
            # plain relative path (it has a host or a scheme) is discarded
            # to avoid redirecting to another site.
            if target.netloc != '' or target.scheme != '':
                next_page = None
        if not next_page:  # added
            next_page = url_for('index')  # added
        return redirect(next_page)  # added
    return render_template('login.html', title='Sign In', form=form)
模版中显示已登录的用户 app/templates/index.html
{% extends "base.html" %}
{% block content %}
<h1>hello, {{ current_user.username }}</h1>
{% for post in posts %}
<div><p>{{ post.author.username }} says: <b>{{ post.body }}</b></p></div>
{% endfor %}
{% endblock %}
在模版函数中删除 user: app/routes.py
@app.route('/')
@app.route('/index')
@login_required
def index():
# ...
return render_template("index.html", title='Home Page', posts=posts)
手动添加一个用户, 相当于注册
# flask shell
>>> u = User(username='user01', email='user01@qq.com')
>>> u.set_password('1234567')
>>> db.session.add(u)
>>> db.session.commit()
用户注册
app/forms.py
from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField, BooleanField, SubmitField
from wtforms.validators import ValidationError, DataRequired, Email, EqualTo
from app.models import User
class RegistrationForm(FlaskForm):
    """Registration form: username, email, password entered twice, submit.

    The ``validate_username`` / ``validate_email`` methods reject values
    that already belong to a stored user.
    """

    username = StringField('Username', validators=[DataRequired()])
    email = StringField('Email', validators=[DataRequired(), Email()])
    password = PasswordField('Password', validators=[DataRequired()])
    # Must be filled in and equal to the ``password`` field.
    password2 = PasswordField(
        'Repeat Password',
        validators=[DataRequired(), EqualTo('password')],
    )
    submit = SubmitField('Register')

    def validate_username(self, username):
        """Raise ValidationError when a user with this username already exists."""
        existing = User.query.filter_by(username=username.data).first()
        if existing is None:
            return
        raise ValidationError('Please use a different username.')

    def validate_email(self, email):
        """Raise ValidationError when a user with this email already exists."""
        existing = User.query.filter_by(email=email.data).first()
        if existing is None:
            return
        raise ValidationError('Please use a different email address.')
app/templates/register.html
{% extends "base.html" %}
{% block content %}
<h1>Register</h1>
<form action="" method="post">
{{ form.hidden_tag() }}
<p>
{{ form.username.label }}<br>
{{ form.username(size=32) }}<br>
{% for error in form.username.errors %}
<span style="color: red;">[{{ error }}]</span>
{% endfor %}
</p>
<p>
{{ form.email.label }}<br>
{{ form.email(size=64) }}<br>
{% for error in form.email.errors %}
<span style="color: red;">[{{ error }}]</span>
{% endfor %}
</p>
<p>
{{ form.password.label }}<br>
{{ form.password(size=32) }}<br>
{% for error in form.password.errors %}
<span style="color: red;">[{{ error }}]</span>
{% endfor %}
</p>
<p>
{{ form.password2.label }}<br>
{{ form.password2(size=32) }}<br>
{% for error in form.password2.errors %}
<span style="color: red;">[{{ error }}]</span>
{% endfor %}
</p>
<p>{{ form.submit() }}</p>
</form>
{% endblock %}
app/templates/login.html
{% extends "base.html" %}
{% block content %}
<h1>Sign In</h1>
<form action="" method="post" novalidate>
{{ form.hidden_tag() }}
<p>
{{ form.username.label }}<br>
{{ form.username(size=32) }}<br>
{% for error in form.username.errors %}
<span style="color: red;">[{{ error }}]</span>
{% endfor %}
</p>
<p>
{{ form.password.label }}<br>
{{ form.password(size=32) }}<br>
{% for error in form.password.errors %}
<span style="color: red;">[{{ error }}]</span>
{% endfor %}
</p>
<p>{{ form.remember_me() }} {{ form.remember_me.label }}</p>
<p>{{ form.submit() }}</p>
</form>
<p>New User? <a href="{{ url_for('register') }}">Click to Register!</a></p> {# 增加注册 #}
{% endblock %}
app/routes.py
from app import db
from app.forms import RegistrationForm
# ...
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Create a new user from the registration form.

    Authenticated visitors are redirected to the index page.  A valid
    submission stores the user with a hashed password, flashes a
    confirmation and redirects to the login page; otherwise the form is
    rendered.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))
    form = RegistrationForm()
    if not form.validate_on_submit():
        return render_template('register.html', title='Register', form=form)
    new_user = User(username=form.username.data, email=form.email.data)
    new_user.set_password(form.password.data)
    db.session.add(new_user)
    db.session.commit()
    flash('Congratulations, you are now a registered user!')
    return redirect(url_for('login'))
个人主页和头像
个人主页
app/routes.py
@app.route('/user/<username>')
@login_required
def user(username):
    """Render the profile page of ``username`` with two placeholder posts.

    The lookup uses ``first_or_404()``, so an unknown username ends in a
    404 response.
    """
    profile = User.query.filter_by(username=username).first_or_404()
    placeholder_posts = [
        {'author': profile, 'body': 'Test post #1'},
        {'author': profile, 'body': 'Test post #2'},
    ]
    return render_template('user.html', user=profile, posts=placeholder_posts)
app/templates/user.html
{% extends "base.html" %}
{% block content %}
<h1>User: {{ user.username }}</h1>
<hr />
{% for post in posts %}
<p>
{{ post.author.username }} says: <b>{{ post.body }}</b>
</p>
{% endfor %}
{% endblock %}
app/templates/base.html
<div>
Index: <a href="{{ url_for('index') }}">Index</a>
{% if current_user.is_anonymous %}
Login: <a href="{{ url_for('login') }}">Login</a>
{% else %}
Profile: <a href="{{ url_for('user', username=current_user.username ) }}">Profile</a>
Logout: <a href="{{ url_for('logout') }}">Logout</a>
{% endif %}
</div>
Gravatar头像
>>> from hashlib import md5
>>> 'https://www.gravatar.com/avatar/' + md5(b'sma.flirt@gmail.com').hexdigest()
'https://www.gravatar.com/avatar/98d1c884548d73ac4f55cb23bd4af8d8'
- 头像链接 80X80 https://www.gravatar.com/avatar/98d1c884548d73ac4f55cb23bd4af8d8
- 头像链接 128X128 https://www.gravatar.com/avatar/98d1c884548d73ac4f55cb23bd4af8d8?s=128
app/models.py
from hashlib import md5
# ...
class User(UserMixin, db.Model):
    # ... (columns and other members elided in this excerpt)

    def avatar(self, size):
        """Return the Gravatar URL for this user's email, requesting ``size`` as the ``s`` parameter."""
        email_bytes = self.email.lower().encode('utf-8')
        digest = md5(email_bytes).hexdigest()
        # d=identicon: users without a registered avatar get a generated
        # "identicon" image.
        url_template = 'https://www.gravatar.com/avatar/{}?d=identicon&s={}'
        return url_template.format(digest, size)
app/templates/user.html
{% extends "base.html" %}
{% block content %}
<table>
<tr valign="top">
<td><img src="{{ user.avatar(128) }}"></td>
<td><h1>User: {{ user.username }}</h1></td>
</tr>
</table>
<hr>
{% for post in posts %}
<table>
<tr valign="top">
<td><img src="{{ post.author.avatar(36) }}"></td>
<td>{{ post.author.username }} says:<br>{{ post.body }}</td>
</tr>
</table>
{% endfor %}
{% endblock %}
更多个人资料
app/models.py
class User(UserMixin, db.Model):
    # ... (existing columns and methods elided in this excerpt)

    # Free-text self description, a string column of length 140.
    about_me = db.Column(db.String(140))
    # Column default is the ``datetime.utcnow`` callable; the before_request
    # hook in app/routes.py also assigns this field for signed-in users.
    last_seen = db.Column(db.DateTime, default=datetime.utcnow)
执行数据库更改
flask db migrate -m "new fields in user table"
flask db upgrade
app/templates/user.html
{% extends "base.html" %}
{% block content %}
<table>
<tr valign="top">
<td><img src="{{ user.avatar(128) }}"></td>
<td>
<h1>User: {{ user.username }}</h1>
{% if user.about_me %}<p>{{ user.about_me }}</p>{% endif %}
{% if user.last_seen %}<p>Last seen on: {{ user.last_seen }}</p>{% endif %}
</td>
</tr>
</table>
...
{% endblock %}
app/routes.py
from datetime import datetime
@app.before_request
def before_request():
    """Stamp ``last_seen`` with the current UTC time for a signed-in user and commit it."""
    if not current_user.is_authenticated:
        # Anonymous visitor: nothing to record.
        return
    current_user.last_seen = datetime.utcnow()
    db.session.commit()
个人资料编辑
app/forms.py
from wtforms import StringField, TextAreaField, SubmitField
from wtforms.validators import DataRequired, Length
# ...
class EditProfileForm(FlaskForm):
    """Profile editing form: username, an "about me" text area and a submit button."""

    username = StringField('Username', validators=[DataRequired()])
    # Length validator: between 0 and 140 characters.
    about_me = TextAreaField(
        'About me',
        validators=[Length(min=0, max=140)],
    )
    submit = SubmitField('Submit')
app/templates/edit_profile.html
{% extends "base.html" %}
{% block content %}
<h1>Edit Profile</h1>
<form action="" method="post">
{{ form.hidden_tag() }}
<p>
{{ form.username.label }}<br>
{{ form.username(size=32) }}<br>
{% for error in form.username.errors %}
<span style="color: red;">[{{ error }}]</span>
{% endfor %}
</p>
<p>
{{ form.about_me.label }}<br>
{{ form.about_me(cols=50, rows=4) }}<br>
{% for error in form.about_me.errors %}
<span style="color: red;">[{{ error }}]</span>
{% endfor %}
</p>
<p>{{ form.submit() }}</p>
</form>
{% endblock %}
app/routes.py
from app.forms import EditProfileForm
@app.route('/edit_profile', methods=['GET', 'POST'])
@login_required
def edit_profile():
    """Let the signed-in user edit username and "about me".

    A valid submission saves both fields, flashes a confirmation and
    redirects back to this page.  On GET the form is pre-filled with the
    current values.  In every other case the form is rendered as submitted.
    """
    form = EditProfileForm()
    if form.validate_on_submit():
        current_user.username = form.username.data
        current_user.about_me = form.about_me.data
        db.session.commit()
        flash('Your changes have been saved.')
        return redirect(url_for('edit_profile'))
    if request.method == 'GET':
        # Initial display: show what is currently stored.
        form.username.data = current_user.username
        form.about_me.data = current_user.about_me
    page = render_template('edit_profile.html', title='Edit Profile',
                           form=form)
    return page
错误处理
app/errors.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
# Create Time: 2018-11-15 10:33:24
# Last Modified: 2018-11-15 10:33:45
from flask import render_template
from app import app, db
@app.errorhandler(404)
def not_found_error(error):
    """Render the custom 404 page and return it with status code 404."""
    body = render_template('404.html')
    return body, 404
@app.errorhandler(500)
def internal_error(error):
    """Roll back the database session, then return the custom 500 page with status code 500."""
    # Roll back first so the session is reset before the template is rendered.
    db.session.rollback()
    body = render_template('500.html')
    return body, 500
app/templates/404.html
{% extends "base.html" %}
{% block content %}
<h1>File Not Found</h1>
<p><a href="{{ url_for('index') }}">Back</a></p>
{% endblock %}
app/templates/500.html
{% extends "base.html" %}
{% block content %}
<h1>An unexpected error has occurred</h1>
<p>The administrator has been notified. Sorry for the inconvenience!</p>
<p><a href="{{ url_for('index') }}">Back</a></p>
{% endblock %}
使用邮件发送错误日志, 日志记录到文件中
config.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
# Create Time: 2018-11-14 11:18:47
# Last Modified: 2018-11-15 10:42:24
import os
# Directory that contains this file; the default SQLite database lives here.
basedir = os.path.abspath(os.path.dirname(__file__))


class Config:
    """Application settings: secret key, database, and outgoing mail.

    Every value except ``SQLALCHEMY_TRACK_MODIFICATIONS`` and ``ADMINS`` can
    be overridden through an environment variable of the same name
    (``DATABASE_URL`` for the database URI).
    """

    SECRET_KEY = os.environ.get('SECRET_KEY') or 'my-secret'
    SQLALCHEMY_DATABASE_URI = (
        os.environ.get('DATABASE_URL')
        or 'sqlite:///' + os.path.join(basedir, 'app.db')
    )
    SQLALCHEMY_TRACK_MODIFICATIONS = False

    # NOTE(review): the fallbacks below are placeholders, so MAIL_SERVER and
    # the credentials are never empty - confirm that is intended wherever
    # these values are tested for truthiness.
    MAIL_SERVER = os.environ.get('MAIL_SERVER') or "smtp.email.com"
    MAIL_PORT = int(os.environ.get('MAIL_PORT') or 25)
    # TLS is opt-in through the environment (1/true/yes/on, case-insensitive)
    # and stays off when the variable is unset, matching the previously
    # hard-coded value of 0.
    MAIL_USE_TLS = os.environ.get('MAIL_USE_TLS', '').strip().lower() in (
        '1', 'true', 'yes', 'on')
    MAIL_USERNAME = os.environ.get('MAIL_USERNAME') or "username@"
    MAIL_PASSWORD = os.environ.get('MAIL_PASSWORD') or "password"
    # Recipients of error reports; the first entry is also used as the sender
    # of password-reset mails.
    ADMINS = ['i@liuhonghe.me']
app/__init__.py
import logging
from logging.handlers import SMTPHandler
from logging.handlers import RotatingFileHandler
import os
# ...
# Error reporting, configured only when the app is not in debug mode:
# ERROR records are e-mailed to the admins, INFO and above go to a rotating
# log file.
if not app.debug:
    if app.config['MAIL_SERVER']:
        smtp_user = app.config['MAIL_USERNAME']
        smtp_password = app.config['MAIL_PASSWORD']
        # Credentials are passed only when at least one of them is set.
        auth = (smtp_user, smtp_password) if (smtp_user or smtp_password) else None
        # An empty tuple is passed as ``secure`` when TLS is requested.
        secure = () if app.config['MAIL_USE_TLS'] else None
        mail_handler = SMTPHandler(
            mailhost=(app.config['MAIL_SERVER'], app.config['MAIL_PORT']),
            fromaddr='no-reply@' + app.config['MAIL_SERVER'],
            toaddrs=app.config['ADMINS'],
            subject='Microblog Failure',
            credentials=auth,
            secure=secure,
        )
        mail_handler.setLevel(logging.ERROR)
        app.logger.addHandler(mail_handler)

    # Rotating file log: logs/os_basic.log, 10240 bytes per file, 10 backups.
    os.makedirs('logs', exist_ok=True)
    log_format = logging.Formatter(
        '%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]')
    file_handler = RotatingFileHandler(
        'logs/os_basic.log', maxBytes=10240, backupCount=10)
    file_handler.setFormatter(log_format)
    file_handler.setLevel(logging.INFO)
    app.logger.addHandler(file_handler)

    app.logger.setLevel(logging.INFO)
    app.logger.info('os_basic startup')
from app import routes, models, errors
修复用户名重复的 bug
app/forms.py
class EditProfileForm(FlaskForm):
    """Profile editing form that rejects a username already used by another user."""

    username = StringField('Username', validators=[DataRequired()])
    about_me = TextAreaField(
        'About me',
        validators=[Length(min=0, max=140)],
    )
    submit = SubmitField('Submit')

    def __init__(self, original_username, *args, **kwargs):
        """Remember ``original_username`` so an unchanged name is not flagged as a duplicate."""
        super().__init__(*args, **kwargs)
        self.original_username = original_username

    def validate_username(self, username):
        """Raise ValidationError if the name was changed to one that already exists."""
        if username.data == self.original_username:
            # Name left unchanged: no lookup needed.
            return
        clash = User.query.filter_by(username=self.username.data).first()
        if clash is not None:
            raise ValidationError('Please use a different username.')
app/routes.py
@app.route('/edit_profile', methods=['GET', 'POST'])
@login_required
def edit_profile():
form = EditProfileForm(current_user.username)
# ...
邮件支持
pipenv install flask-mail pyjwt
app/__init__.py
# ...
from flask_mail import Mail

app = Flask(__name__)
# ...
# Mail extension bound to the application object.
mail = Mail(app)
app/email.py
from flask_mail import Message
from app import mail
def send_email(subject, sender, recipients, text_body, html_body):
    """Build a message with a plain-text and an HTML body and send it through ``mail``."""
    message = Message(subject, sender=sender, recipients=recipients)
    message.body, message.html = text_body, html_body
    mail.send(message)
app/templates/login.html
....
<p>New User? <a href="{{ url_for('register') }}">Click to Register!</a></p>
<p>
Forgot Your Password?
<a href="{{ url_for('reset_password_request') }}">Click to Reset It</a>
</p>
{% endblock %}
app/forms.py
class ResetPasswordRequestForm(FlaskForm):
email = StringField('Email', validators=[DataRequired(), Email()])
submit = SubmitField('Request Password Reset')
app/templates/reset_password_request.html
{% extends "base.html" %}
{% block content %}
<h1>Reset Password</h1>
<form action="" method="post">
{{ form.hidden_tag() }}
<p>
{{ form.email.label }}<br>
{{ form.email(size=64) }}<br>
{% for error in form.email.errors %}
<span style="color: red;">[{{ error }}]</span>
{% endfor %}
</p>
<p>{{ form.submit() }}</p>
</form>
{% endblock %}
app/routes.py
from app.forms import ResetPasswordRequestForm
from app.email import send_password_reset_email
@app.route('/reset_password_request', methods=['GET', 'POST'])
def reset_password_request():
    """Ask for an email address and send reset instructions to a matching user.

    Authenticated visitors are redirected to the index page.  After a valid
    submission the visitor is redirected to the login page; otherwise the
    request form is rendered.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))
    form = ResetPasswordRequestForm()
    if not form.validate_on_submit():
        return render_template('reset_password_request.html',
                               title='Reset Password', form=form)
    account = User.query.filter_by(email=form.email.data).first()
    if account:
        send_password_reset_email(account)
    # The same notice is flashed whether or not an account matched.
    flash('Check your email for the instructions to reset your password')
    return redirect(url_for('login'))
app/models.py
from time import time
import jwt
from app import app
class User(UserMixin, db.Model):
    # ... (columns and other members elided in this excerpt)

    def get_reset_password_token(self, expires_in=600):
        """Return a signed password-reset token for this user as ``str``.

        The token is an HS256 JWT signed with the application's SECRET_KEY.
        It carries the user id under ``reset_password`` and an ``exp`` claim
        set ``expires_in`` seconds from now.
        """
        token = jwt.encode(
            {'reset_password': self.id, 'exp': time() + expires_in},
            app.config['SECRET_KEY'], algorithm='HS256')
        # PyJWT 1.x returns bytes while PyJWT >= 2.0 already returns str, so
        # decode only when needed (an unconditional .decode() fails on 2.x).
        if isinstance(token, bytes):
            token = token.decode('utf-8')
        return token

    @staticmethod
    def verify_reset_password_token(token):
        """Return the User a reset token was issued for, or None if the token is unusable.

        None is returned when the token cannot be decoded or verified, or
        when it carries no ``reset_password`` claim.
        """
        try:
            user_id = jwt.decode(token, app.config['SECRET_KEY'],
                                 algorithms=['HS256'])['reset_password']
        except (jwt.InvalidTokenError, KeyError):
            # Only token problems are swallowed; unrelated errors propagate.
            return None
        return User.query.get(user_id)
app/email.py
from flask import render_template
from app import app
# ...
def send_password_reset_email(user):
    """Email ``user`` a password-reset message rendered from the text and HTML templates."""
    token = user.get_reset_password_token()
    # Both templates receive the same variables.
    template_args = {'user': user, 'token': token}
    send_email(
        '[Microblog] Reset Your Password',
        sender=app.config['ADMINS'][0],
        recipients=[user.email],
        text_body=render_template('email/reset_password.txt', **template_args),
        html_body=render_template('email/reset_password.html', **template_args),
    )
app/templates/email/reset_password.txt
Dear {{ user.username }},
To reset your password click on the following link:
{{ url_for('reset_password', token=token, _external=True) }}
If you have not requested a password reset simply ignore this message.
Sincerely,
The Microblog Team
app/templates/email/reset_password.html
<p>Dear {{ user.username }},</p>
<p>
To reset your password
<a href="{{ url_for('reset_password', token=token, _external=True) }}">
click here
</a>.
</p>
<p>Alternatively, you can paste the following link in your browser's address bar:</p>
<p>{{ url_for('reset_password', token=token, _external=True) }}</p>
<p>If you have not requested a password reset simply ignore this message.</p>
<p>Sincerely,</p>
<p>The Microblog Team</p>
app/routes.py
from app.forms import ResetPasswordForm
@app.route('/reset_password/<token>', methods=['GET', 'POST'])
def reset_password(token):
    """Set a new password for the user identified by a reset ``token``.

    Authenticated visitors, and visitors whose token does not resolve to a
    user, are redirected to the index page.  A valid submission stores the
    new password, flashes a confirmation and redirects to the login page;
    otherwise the form is rendered.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))
    account = User.verify_reset_password_token(token)
    if not account:
        return redirect(url_for('index'))
    form = ResetPasswordForm()
    if not form.validate_on_submit():
        return render_template('reset_password.html', form=form)
    account.set_password(form.password.data)
    db.session.commit()
    flash('Your password has been reset.')
    return redirect(url_for('login'))
app/forms.py
class ResetPasswordForm(FlaskForm):
    """Form for choosing a new password: the password entered twice, plus submit."""

    password = PasswordField('Password', validators=[DataRequired()])
    # Must be filled in and equal to the ``password`` field.
    password2 = PasswordField(
        'Repeat Password',
        validators=[DataRequired(), EqualTo('password')],
    )
    submit = SubmitField('Request Password Reset')
app/templates/reset_password.html
{% extends "base.html" %}
{% block content %}
<h1>Reset Your Password</h1>
<form action="" method="post">
{{ form.hidden_tag() }}
<p>
{{ form.password.label }}<br>
{{ form.password(size=32) }}<br>
{% for error in form.password.errors %}
<span style="color: red;">[{{ error }}]</span>
{% endfor %}
</p>
<p>
{{ form.password2.label }}<br>
{{ form.password2(size=32) }}<br>
{% for error in form.password2.errors %}
<span style="color: red;">[{{ error }}]</span>
{% endfor %}
</p>
<p>{{ form.submit() }}</p>
</form>
{% endblock %}
异步电子邮件 app/email.py
from threading import Thread
def send_async_email(app, msg):
    """Send ``msg`` through ``mail`` inside an application context of ``app``."""
    # Used as a thread target; the context is entered explicitly here.
    context = app.app_context()
    with context:
        mail.send(msg)
def send_email(subject, sender, recipients, text_body, html_body):
    """Build a message with text and HTML bodies and send it from a new thread."""
    message = Message(subject, sender=sender, recipients=recipients)
    message.body, message.html = text_body, html_body
    # Hand the send to a background thread instead of calling mail.send()
    # directly in the caller.
    worker = Thread(target=send_async_email, args=(app, message))
    worker.start()
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 【每日笔记】【Go学习笔记】2019-01-04 Codis笔记
- 【每日笔记】【Go学习笔记】2019-01-02 Codis笔记
- 【每日笔记】【Go学习笔记】2019-01-07 Codis笔记
- Golang学习笔记-调度器学习
- Vue学习笔记(二)------axios学习
- 算法/NLP/深度学习/机器学习面试笔记
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Machine Learning in Action
Peter Harrington / Manning Publications / 2012-4-19 / GBP 29.99
It's been said that data is the new "dirt"—the raw material from which and on which you build the structures of the modern world. And like dirt, data can seem like a limitless, undifferentiated mass. ......一起来看看 《Machine Learning in Action》 这本书的介绍吧!