内容简介:由于它们是相当通用的,因此Flask-Login提供了一个叫做UserMixin的mixin类来将它们归纳其中
整体目录
os_basic
├── app
│   ├── __init__.py
│   ├── forms.py
│   ├── routes.py
│   └── templates
│       ├── base.html
│       ├── index.html
│       └── login.html
├── config.py
└── index.py
app/templates/index.html
{% extends "base.html" %}
{% block content %}
<h1>hello, {{ user.username }}</h1>
{% for post in posts %}
<div><p>{{ post.author.username }} says: <b>{{ post.body }}</b></p></div>
{% endfor %}
{% endblock %}
index.py
from app import app
if __name__ == '__main__':
    # Development entry point only.
    # WARNING: debug=True enables the interactive debugger and
    # host='0.0.0.0' listens on every network interface; together they
    # expose the debugger to the network. Never use this combination on an
    # untrusted network or in production.
    app.run(
        host='0.0.0.0',
        port=8080,
        debug=True,
    )
表单 flask form
pip install flask-wtf
在工程根目录新建 config.py
import os
class Config(object):
    """Application settings, loaded with ``app.config.from_object(Config)``."""

    # Falls back to a fixed development key when SECRET_KEY is unset or
    # empty; set the environment variable for any real deployment.
    SECRET_KEY = os.environ.get('SECRET_KEY') or 'my-secret'
在子项目编辑 app/__init__.py
from flask import Flask
from config import Config

app = Flask(__name__)
app.config.from_object(Config)

# Imported last on purpose: app/routes.py does ``from app import app``, so
# the ``app`` object must already exist when that module is loaded.
from app import routes
声明一个 app/forms.py
from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField, BooleanField, SubmitField
from wtforms.validators import DataRequired
class LoginForm(FlaskForm):
    """Sign-in form: username, password, remember-me flag and submit button."""

    # DataRequired rejects an empty submission for the field.
    username = StringField('Username', validators=[DataRequired()])
    password = PasswordField('Password', validators=[DataRequired()])
    remember_me = BooleanField('Remember Me')
    submit = SubmitField('Sign In')
加载表单 app/templates/base.html
<!DOCTYPE html>
<html lang="zh">
<head>
<meta charset="UTF-8">
{% if title %}
<title>{{ title }}</title>
{% else %}
<title>default - title</title>
{% endif %}
</head>
<body>
<div>
Index: <a href="{{ url_for('index') }}">Index</a><br />
Login: <a href="{{ url_for('login') }}">Login</a>
</div>
<hr />
{% with messages = get_flashed_messages() %}
{% if messages %}
<ul>
{% for message in messages %}
<li>{{ message }}</li>
{% endfor %}
</ul>
{% endif %}
{% endwith %}
{% block content %}{% endblock %}
</body>
</html>
加载表单 app/templates/login.html
{% extends "base.html" %}
{% block content %}
<h1>Sign In</h1>
<form action="" method="post" novalidate>
{{ form.hidden_tag() }}
<p>
{{ form.username.label }}<br>
{{ form.username(size=32) }}<br>
{% for error in form.username.errors %}
<span style="color: red;">[{{ error }}]</span>
{% endfor %}
</p>
<p>
{{ form.password.label }}<br>
{{ form.password(size=32) }}<br>
{% for error in form.password.errors %}
<span style="color: red;">[{{ error }}]</span>
{% endfor %}
</p>
<p>{{ form.remember_me() }} {{ form.remember_me.label }}</p>
<p>{{ form.submit() }}</p>
</form>
{% endblock %}
app/routes.py
from flask import render_template, flash, redirect, url_for
from app import app
from app.forms import LoginForm
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the sign-in form; on a valid POST flash the input and go to index.

    No real authentication happens yet: the submitted username and the
    remember-me flag are only echoed back through ``flash``.
    """
    form = LoginForm()
    if form.validate_on_submit():
        flash('Login requested for user {}, remember_me={}'.format(
            form.username.data, form.remember_me.data))
        return redirect(url_for('index'))
    return render_template('login.html', title='Sign In', form=form)
@app.route('/')
@app.route('/index')
def index():
    """Render the home page with a hard-coded user and two mock posts."""
    user = {'username': 'lhh'}
    posts = [
        {
            'author': {'username': 'user01'},
            'body': 'user01 - body',
        },
        {
            'author': {'username': 'user02'},
            'body': 'user02 - body',
        },
    ]
    return render_template('index.html', title='Home', user=user, posts=posts)
pip install flask-sqlalchemy flask-migrate
config.py
import os
# added: directory containing this config file, used for the SQLite path
basedir = os.path.abspath(os.path.dirname(__file__))


class Config(object):
    """Application settings, loaded with ``app.config.from_object(Config)``."""

    SECRET_KEY = os.environ.get('SECRET_KEY') or 'my-secret'
    # added: use DATABASE_URL when set, otherwise a SQLite file next to this
    # module. Parentheses are used for the line break because a comment may
    # not follow a backslash continuation (that is a SyntaxError).
    SQLALCHEMY_DATABASE_URI = (
        os.environ.get('DATABASE_URL')
        or 'sqlite:///' + os.path.join(basedir, 'app.db')
    )
    SQLALCHEMY_TRACK_MODIFICATIONS = False  # added
app/__init__.py
from flask import Flask
from config import Config
from flask_sqlalchemy import SQLAlchemy  # added
from flask_migrate import Migrate  # added

app = Flask(__name__)
app.config.from_object(Config)
db = SQLAlchemy(app)  # added
migrate = Migrate(app, db)  # added

# Imported last on purpose: routes and models import ``app`` / ``db`` from
# this package, so those objects must already exist.
from app import routes, models
app/models.py
from datetime import datetime
from app import db
class User(db.Model):
    """User account table."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), index=True, unique=True)
    email = db.Column(db.String(120), index=True, unique=True)
    password_hash = db.Column(db.String(128))
    # backref='author' adds a ``Post.author`` attribute; lazy='dynamic' makes
    # ``posts`` a query, so callers fetch rows with ``user.posts.all()``.
    posts = db.relationship('Post', backref='author', lazy='dynamic')

    def __repr__(self):
        return '<User {}>'.format(self.username)
class Post(db.Model):
    """Blog post table; each row references its author through ``user_id``."""

    id = db.Column(db.Integer, primary_key=True)
    body = db.Column(db.String(140))
    # The function itself is passed (not its result), so the default is
    # computed when a row is created rather than once at import time.
    timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'))

    def __repr__(self):
        return '<Post {}>'.format(self.body)
执行数据库命令
# 创建数据库迁移存储库, 生成migrations目录
# FLASK_APP=index.py flask db init
# 第一次数据库迁移, 生成迁移脚本
flask db migrate -m "users table"
# 回滚操作(开发时候应用)
flask db downgrade
# 把应用更改到数据库
flask db upgrade
python 交互操作数据库
# 导入数据库实例
>>> from app import db
# 导入模型
>>> from app.models import User, Post
# 创建一个新用户
>>> u = User(username='user01', email='user01@qq.com')
>>> db.session.add(u)
>>> db.session.commit()
# 查询所有用户
>>> users = User.query.all()
>>> users
[<User user01>]
>>> for user in users:
...     print(user.id, user.username)
...
1 user01
>>> u = User.query.get(1)
>>> u
<User user01>
>>> p = Post(body='1-post', author=u)
>>> db.session.add(p)
>>> db.session.commit()
# 查所有
>>> posts = u.posts.all()
>>> posts
[<Post 1-post>]
>>> for post in posts:
...     print(post.id, post.author.username, post.body)
...
1 user01 1-post
# 排序
>>> User.query.order_by(User.username.desc()).all()
[<User user01>]
# 删所有
>>> users = User.query.all()
>>> for u in users:
...     db.session.delete(u)
...
>>> posts = Post.query.all()
>>> for post in posts:
...     db.session.delete(post)
...
>>> db.session.commit()
shell 上下文 index.py
# export FLASK_APP=index.py
from app import app, db # 增加项
from app.models import User, Post # 增加项
@app.shell_context_processor  # added
def make_shell_context():  # added
    """Return the names made available automatically inside ``flask shell``."""
    return {'db': db, 'User': User, 'Post': Post}  # added
if __name__ == '__main__':
    # Development entry point only.
    # WARNING: debug=True enables the interactive debugger and
    # host='0.0.0.0' listens on every network interface; together they
    # expose the debugger to the network. Never use this combination on an
    # untrusted network or in production.
    app.run(
        host='0.0.0.0',
        port=8080,
        debug=True,
    )
# flask shell
Python 3.7.0 (default, Oct 2 2018, 09:18:58)
[Clang 10.0.0 (clang-1000.11.45.2)] on darwin
App: app [production]
Instance: /Users/lhh/vdir/initial/py/os_basic/instance
>>> db
<SQLAlchemy engine=sqlite:////Users/lhh/vdir/initial/py/os_basic/app.db>
>>> User
<class 'app.models.User'>
>>> Post
<class 'app.models.Post'>
用户登录
密码哈希
>>> from werkzeug.security import generate_password_hash
>>> hash = generate_password_hash('string')
>>> hash
'pbkdf2:sha256:50000$OB1szOsd$634688cdbb02a04643127ed02bf20049ba6c2a20beb632490cfd065ba8a113fb'
>>> from werkzeug.security import check_password_hash
>>> check_password_hash(hash, 'string') # 验证
True
>>> check_password_hash(hash, 'string1')
False
app/models.py
from werkzeug.security import generate_password_hash, check_password_hash
# ...
class User(db.Model):
    # ...

    def set_password(self, password):
        """Hash *password* and store the result in ``password_hash``."""
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        """Return True when *password* matches the stored ``password_hash``."""
        return check_password_hash(self.password_hash, password)
flask-login
pip install flask-login
app/__init__.py
from flask import Flask
from config import Config
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_login import LoginManager  # added

app = Flask(__name__)
app.config.from_object(Config)
db = SQLAlchemy(app)
migrate = Migrate(app, db)
login = LoginManager(app)  # added

# Imported last on purpose: routes and models import ``app`` / ``db`` from
# this package, so those objects must already exist.
from app import routes, models
flask-login 准备用户模型
- is_authenticated: 一个用来表示用户是否通过登录认证的属性,用True和False表示。
- is_active: 如果用户账户是活跃的,那么这个属性是True,否则就是False(注:“活跃”指账户已激活且未被停用或封禁;它与用户是通过用户名密码登录还是通过“记住我”功能保持登录无关,后者对应的是Flask-Login中“fresh login”的概念)。
- is_anonymous: 常规用户的该属性是False,对特定的匿名用户是True。
- get_id(): 返回用户的唯一id的方法,返回值类型是字符串(Python 2下返回unicode字符串).
由于它们是相当通用的,因此Flask-Login提供了一个叫做UserMixin的mixin类来将它们归纳其中
app/models.py
# ...
from flask_login import UserMixin
class User(UserMixin, db.Model):
# ...
用户加载函数 app/models.py
from app import login
# Register the user-loader callback with Flask-Login through the
# @login.user_loader decorator.
@login.user_loader
def load_user(id):
    """Return the User for the id Flask-Login stored in the session.

    Flask-Login passes the id as a string, so it is converted to int before
    the primary-key lookup because the database uses numeric ids.
    """
    return User.query.get(int(id))
用户登入 app/routes.py
from flask_login import current_user, login_user # 增加项
from app.models import User # 增加项
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Authenticate the submitted credentials and start a login session."""
    # A user who is already logged in is sent straight to the index page.
    if current_user.is_authenticated:
        return redirect(url_for('index'))
    form = LoginForm()
    if form.validate_on_submit():
        user = User.query.filter_by(username=form.username.data).first()
        # One message for both failures, so the response does not reveal
        # whether the username exists.
        if user is None or not user.check_password(form.password.data):
            flash('Invalid username or password')
            return redirect(url_for('login'))
        # Replaces the earlier placeholder that only flashed the form data.
        login_user(user, remember=form.remember_me.data)
        return redirect(url_for('index'))
    return render_template('login.html', title='Sign In', form=form)
用户登出 app/routes.py
from flask_login import logout_user
@app.route('/logout')
def logout():
    """End the current login session and return to the index page."""
    logout_user()
    return redirect(url_for('index'))
模版 app/templates/base.html
<div>
Index: <a href="{{ url_for('index') }}">Index</a>
{% if current_user.is_anonymous %}
Login: <a href="{{ url_for('login') }}">Login</a>
{% else %}
Logout: <a href="{{ url_for('logout') }}">Logout</a>
{% endif %}
</div>
要求用户登录
app/__init__.py
login = LoginManager(app)
# The value is the endpoint (view function) name of the login view, i.e. the
# name that url_for() accepts to build the corresponding URL.
login.login_view = 'login'
app/routes.py
# 使用名为@login_required的装饰器来拒绝匿名用户的访问以保护某个视图函数
from flask_login import login_required
@app.route('/')
@app.route('/index')
@login_required
def index():
# ...
处理next查询字符串参数 app/routes.py
- 如果登录URL中不含next参数,那么将会重定向到本应用的主页。
- 如果登录URL中包含next参数,其值是一个相对路径(换句话说,该URL不含域名信息),那么将会重定向到本应用的这个相对路径。
- 如果登录URL中包含next参数,其值是一个包含域名的完整URL,那么重定向到本应用的主页。
from urllib.parse import urlsplit

from flask import request


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Authenticate the user, then redirect to a safe ``next`` target.

    The ``next`` query argument is honoured only when it is a relative URL.
    A missing, absolute (scheme or host present) or malformed value falls
    back to the index page, so the login form cannot be used as an open
    redirect.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))
    form = LoginForm()
    if form.validate_on_submit():
        user = User.query.filter_by(username=form.username.data).first()
        if user is None or not user.check_password(form.password.data):
            flash('Invalid username or password')
            return redirect(url_for('login'))
        login_user(user, remember=form.remember_me.data)
        next_page = request.args.get('next')  # added
        # urllib.parse.urlsplit replaces werkzeug.urls.url_parse, which was
        # removed in Werkzeug 3.0.
        try:
            target = urlsplit(next_page or '')
            is_external = bool(target.scheme or target.netloc)
        except ValueError:
            # urlsplit rejects some malformed URLs; treat them as unsafe.
            is_external = True
        if not next_page or is_external:  # added
            next_page = url_for('index')  # added
        return redirect(next_page)  # added
    return render_template('login.html', title='Sign In', form=form)
模版中显示已登录的用户 app/templates/index.html
{% extends "base.html" %}
{% block content %}
<h1>hello, {{ current_user.username }}</h1>
{% for post in posts %}
<div><p>{{ post.author.username }} says: <b>{{ post.body }}</b></p></div>
{% endfor %}
{% endblock %}
在模版函数中删除 user: app/routes.py
@app.route('/')
@app.route('/index')
@login_required
def index():
# ...
return render_template("index.html", title='Home Page', posts=posts)
手动添加一个用户, 相当于注册
# flask shell
>>> u = User(username='user01', email='user01@qq.com')
>>> u.set_password('1234567')
>>> db.session.add(u)
>>> db.session.commit()
用户注册
app/forms.py
from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField, BooleanField, SubmitField
from wtforms.validators import ValidationError, DataRequired, Email, EqualTo
from app.models import User
class RegistrationForm(FlaskForm):
    """Sign-up form that rejects usernames and e-mail addresses already in use."""

    username = StringField('Username', validators=[DataRequired()])
    email = StringField('Email', validators=[DataRequired(), Email()])
    password = PasswordField('Password', validators=[DataRequired()])
    # EqualTo('password') makes the repeated password match the first one.
    password2 = PasswordField(
        'Repeat Password', validators=[DataRequired(), EqualTo('password')])
    submit = SubmitField('Register')

    # WTForms calls methods named validate_<field name> as extra validators
    # for that field.
    def validate_username(self, username):
        """Raise ValidationError when the username already exists."""
        user = User.query.filter_by(username=username.data).first()
        if user is not None:
            raise ValidationError('Please use a different username.')

    def validate_email(self, email):
        """Raise ValidationError when the e-mail address already exists."""
        user = User.query.filter_by(email=email.data).first()
        if user is not None:
            raise ValidationError('Please use a different email address.')
app/templates/register.html
{% extends "base.html" %}
{% block content %}
<h1>Register</h1>
<form action="" method="post">
{{ form.hidden_tag() }}
<p>
{{ form.username.label }}<br>
{{ form.username(size=32) }}<br>
{% for error in form.username.errors %}
<span style="color: red;">[{{ error }}]</span>
{% endfor %}
</p>
<p>
{{ form.email.label }}<br>
{{ form.email(size=64) }}<br>
{% for error in form.email.errors %}
<span style="color: red;">[{{ error }}]</span>
{% endfor %}
</p>
<p>
{{ form.password.label }}<br>
{{ form.password(size=32) }}<br>
{% for error in form.password.errors %}
<span style="color: red;">[{{ error }}]</span>
{% endfor %}
</p>
<p>
{{ form.password2.label }}<br>
{{ form.password2(size=32) }}<br>
{% for error in form.password2.errors %}
<span style="color: red;">[{{ error }}]</span>
{% endfor %}
</p>
<p>{{ form.submit() }}</p>
</form>
{% endblock %}
app/templates/login.html
{% extends "base.html" %}
{% block content %}
<h1>Sign In</h1>
<form action="" method="post" novalidate>
{{ form.hidden_tag() }}
<p>
{{ form.username.label }}<br>
{{ form.username(size=32) }}<br>
{% for error in form.username.errors %}
<span style="color: red;">[{{ error }}]</span>
{% endfor %}
</p>
<p>
{{ form.password.label }}<br>
{{ form.password(size=32) }}<br>
{% for error in form.password.errors %}
<span style="color: red;">[{{ error }}]</span>
{% endfor %}
</p>
<p>{{ form.remember_me() }} {{ form.remember_me.label }}</p>
<p>{{ form.submit() }}</p>
</form>
<p>New User? <a href="{{ url_for('register') }}">Click to Register!</a></p> {# 增加注册 #}
{% endblock %}
app/routes.py
from app import db
from app.forms import RegistrationForm
# ...
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Create a new user from the registration form, then go to the login page."""
    if current_user.is_authenticated:
        return redirect(url_for('index'))
    form = RegistrationForm()
    if form.validate_on_submit():
        user = User(username=form.username.data, email=form.email.data)
        # Only the hash is stored; see User.set_password.
        user.set_password(form.password.data)
        db.session.add(user)
        db.session.commit()
        flash('Congratulations, you are now a registered user!')
        return redirect(url_for('login'))
    return render_template('register.html', title='Register', form=form)
个人主页和头像
个人主页
app/routes.py
@app.route('/user/<username>')
@login_required
def user(username):
    """Render the profile page of *username*; unknown names give a 404."""
    user = User.query.filter_by(username=username).first_or_404()
    # Placeholder posts until real ones are read from the database.
    posts = [
        {'author': user, 'body': 'Test post #1'},
        {'author': user, 'body': 'Test post #2'},
    ]
    return render_template('user.html', user=user, posts=posts)
app/templates/user.html
{% extends "base.html" %}
{% block content %}
<h1>User: {{ user.username }}</h1>
<hr />
{% for post in posts %}
<p>
{{ post.author.username }} says: <b>{{ post.body }}</b>
</p>
{% endfor %}
{% endblock %}
app/templates/base.html
<div>
Index: <a href="{{ url_for('index') }}">Index</a>
{% if current_user.is_anonymous %}
Login: <a href="{{ url_for('login') }}">Login</a>
{% else %}
Profile: <a href="{{ url_for('user', username=current_user.username ) }}">Profile</a>
Logout: <a href="{{ url_for('logout') }}">Logout</a>
{% endif %}
</div>
Gravatar头像
>>> from hashlib import md5
>>> 'https://www.gravatar.com/avatar/' + md5(b'sma.flirt@gmail.com').hexdigest()
'https://www.gravatar.com/avatar/98d1c884548d73ac4f55cb23bd4af8d8'
- 头像链接 80X80 https://www.gravatar.com/avatar/98d1c884548d73ac4f55cb23bd4af8d8
- 头像链接 128X128 https://www.gravatar.com/avatar/98d1c884548d73ac4f55cb23bd4af8d8?s=128
app/models.py
from hashlib import md5
# ...
class User(UserMixin, db.Model):
    # ...

    def avatar(self, size):
        """Return the Gravatar URL for this user's e-mail at *size* pixels.

        The address is trimmed and lower-cased before hashing. A user with
        no e-mail stored hashes the empty string instead of raising.
        """
        address = (self.email or '').strip().lower()
        digest = md5(address.encode('utf-8')).hexdigest()
        # d=identicon: users without a registered avatar get a generated
        # "identicon" image.
        return 'https://www.gravatar.com/avatar/{}?d=identicon&s={}'.format(
            digest, size)
app/templates/user.html
{% extends "base.html" %}
{% block content %}
<table>
<tr valign="top">
<td><img src="{{ user.avatar(128) }}"></td>
<td><h1>User: {{ user.username }}</h1></td>
</tr>
</table>
<hr>
{% for post in posts %}
<table>
<tr valign="top">
<td><img src="{{ post.author.avatar(36) }}"></td>
<td>{{ post.author.username }} says:<br>{{ post.body }}</td>
</tr>
</table>
{% endfor %}
{% endblock %}
更多个人资料
app/models.py
class User(UserMixin, db.Model):
# ...
about_me = db.Column(db.String(140))
last_seen = db.Column(db.DateTime, default=datetime.utcnow)
执行数据库更改
flask db migrate -m "new fields in user table"
flask db upgrade
app/templates/user.html
{% extends "base.html" %}
{% block content %}
<table>
<tr valign="top">
<td><img src="{{ user.avatar(128) }}"></td>
<td>
<h1>User: {{ user.username }}</h1>
{% if user.about_me %}<p>{{ user.about_me }}</p>{% endif %}
{% if user.last_seen %}<p>Last seen on: {{ user.last_seen }}</p>{% endif %}
</td>
</tr>
</table>
...
{% endblock %}
app/routes.py
from datetime import datetime
@app.before_request
def before_request():
    """Record the time of the request as the logged-in user's ``last_seen``."""
    if current_user.is_authenticated:
        current_user.last_seen = datetime.utcnow()
        # No db.session.add() here: presumably current_user is already
        # attached to the session by the user loader's query - verify.
        db.session.commit()
个人资料编辑
app/forms.py
from wtforms import StringField, TextAreaField, SubmitField
from wtforms.validators import DataRequired, Length
# ...
class EditProfileForm(FlaskForm):
    """Profile form: username plus an optional "about me" text (max 140 chars)."""

    username = StringField('Username', validators=[DataRequired()])
    about_me = TextAreaField('About me', validators=[Length(min=0, max=140)])
    submit = SubmitField('Submit')
app/templates/edit_profile.html
{% extends "base.html" %}
{% block content %}
<h1>Edit Profile</h1>
<form action="" method="post">
{{ form.hidden_tag() }}
<p>
{{ form.username.label }}<br>
{{ form.username(size=32) }}<br>
{% for error in form.username.errors %}
<span style="color: red;">[{{ error }}]</span>
{% endfor %}
</p>
<p>
{{ form.about_me.label }}<br>
{{ form.about_me(cols=50, rows=4) }}<br>
{% for error in form.about_me.errors %}
<span style="color: red;">[{{ error }}]</span>
{% endfor %}
</p>
<p>{{ form.submit() }}</p>
</form>
{% endblock %}
app/routes.py
from app.forms import EditProfileForm
@app.route('/edit_profile', methods=['GET', 'POST'])
@login_required
def edit_profile():
    """Show the profile form and save submitted changes for the current user."""
    form = EditProfileForm()
    if form.validate_on_submit():
        current_user.username = form.username.data
        current_user.about_me = form.about_me.data
        db.session.commit()
        flash('Your changes have been saved.')
        return redirect(url_for('edit_profile'))
    elif request.method == 'GET':
        # First display: pre-fill the form with the stored values. A POST
        # that failed validation skips this and keeps the submitted input.
        form.username.data = current_user.username
        form.about_me.data = current_user.about_me
    return render_template('edit_profile.html', title='Edit Profile',
                           form=form)
错误处理
app/errors.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
# Create Time: 2018-11-15 10:33:24
# Last Modified: 2018-11-15 10:33:45
from flask import render_template
from app import app, db
@app.errorhandler(404)
def not_found_error(error):
    """Render the custom 404 page, returning status 404."""
    return render_template('404.html'), 404
@app.errorhandler(500)
def internal_error(error):
    """Roll back the database session and render the custom 500 page."""
    # Discard whatever the failed request left pending in the session before
    # the error template is rendered.
    db.session.rollback()
    return render_template('500.html'), 500
app/templates/404.html
{% extends "base.html" %}
{% block content %}
<h1>File Not Found</h1>
<p><a href="{{ url_for('index') }}">Back</a></p>
{% endblock %}
app/templates/500.html
{% extends "base.html" %}
{% block content %}
<h1>An unexpected error has occurred</h1>
<p>The administrator has been notified. Sorry for the inconvenience!</p>
<p><a href="{{ url_for('index') }}">Back</a></p>
{% endblock %}
使用邮件发送错误日志, 日志记录到文件中
config.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
# Create Time: 2018-11-14 11:18:47
# Last Modified: 2018-11-15 10:42:24
import os
# Directory containing this config file, used for the SQLite path.
basedir = os.path.abspath(os.path.dirname(__file__))


class Config(object):
    """Application settings: secret key, database and error-mail options."""

    SECRET_KEY = os.environ.get('SECRET_KEY') or 'my-secret'
    SQLALCHEMY_DATABASE_URI = (
        os.environ.get('DATABASE_URL')
        or 'sqlite:///' + os.path.join(basedir, 'app.db')
    )
    SQLALCHEMY_TRACK_MODIFICATIONS = False

    # Mail settings used for error reports. The fallbacks are placeholders:
    # set the environment variables to real values.
    MAIL_SERVER = os.environ.get('MAIL_SERVER') or "smtp.email.com"
    MAIL_PORT = int(os.environ.get('MAIL_PORT') or 25)
    # TLS is hard-coded off here. To make it configurable use:
    #     MAIL_USE_TLS = os.environ.get('MAIL_USE_TLS') is not None
    MAIL_USE_TLS = 0
    MAIL_USERNAME = os.environ.get('MAIL_USERNAME') or "username@"
    MAIL_PASSWORD = os.environ.get('MAIL_PASSWORD') or "password"
    # Recipients of the error e-mails.
    ADMINS = ['i@liuhonghe.me']
app/__init__.py
import logging
from logging.handlers import SMTPHandler
from logging.handlers import RotatingFileHandler
import os
# ...
# Error reporting is configured only when the app is not in debug mode.
if not app.debug:
    # 1) E-mail every ERROR-level record to the administrators.
    if app.config['MAIL_SERVER']:
        auth = None
        if app.config['MAIL_USERNAME'] or app.config['MAIL_PASSWORD']:
            auth = (app.config['MAIL_USERNAME'], app.config['MAIL_PASSWORD'])
        secure = None
        if app.config['MAIL_USE_TLS']:
            # An empty tuple asks SMTPHandler to use TLS without a
            # key file or certificate file.
            secure = ()
        mail_handler = SMTPHandler(
            mailhost=(app.config['MAIL_SERVER'], app.config['MAIL_PORT']),
            fromaddr='no-reply@' + app.config['MAIL_SERVER'],
            toaddrs=app.config['ADMINS'], subject='Microblog Failure',
            credentials=auth, secure=secure)
        mail_handler.setLevel(logging.ERROR)
        app.logger.addHandler(mail_handler)

    # 2) Write INFO and above to a rotating log file: 10 KB per file,
    #    keeping the 10 most recent backups.
    os.makedirs('logs', exist_ok=True)
    file_handler = RotatingFileHandler('logs/os_basic.log', maxBytes=10240,
                                       backupCount=10)
    file_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]'))
    file_handler.setLevel(logging.INFO)
    app.logger.addHandler(file_handler)

    app.logger.setLevel(logging.INFO)
    app.logger.info('os_basic startup')
from app import routes, models, errors
修复用户名重复的 bug
app/forms.py
class EditProfileForm(FlaskForm):
    """Profile form that rejects a username already taken by another user."""

    username = StringField('Username', validators=[DataRequired()])
    about_me = TextAreaField('About me', validators=[Length(min=0, max=140)])
    submit = SubmitField('Submit')

    def __init__(self, original_username, *args, **kwargs):
        """Remember the user's current name so keeping it unchanged is allowed."""
        super(EditProfileForm, self).__init__(*args, **kwargs)
        self.original_username = original_username

    def validate_username(self, username):
        """Raise ValidationError when the new username belongs to someone else."""
        # The database is queried only when the name actually changed.
        if username.data != self.original_username:
            user = User.query.filter_by(username=username.data).first()
            if user is not None:
                raise ValidationError('Please use a different username.')
app/routes.py
@app.route('/edit_profile', methods=['GET', 'POST'])
@login_required
def edit_profile():
form = EditProfileForm(current_user.username)
# ...
邮件支持
pipenv install flask-mail pyjwt
app/__init__.py
# ...
from flask_mail import Mail

app = Flask(__name__)
# ...
mail = Mail(app)
app/email.py
from flask_mail import Message
from app import mail
def send_email(subject, sender, recipients, text_body, html_body):
    """Build a message with plain-text and HTML bodies and send it synchronously."""
    msg = Message(subject, sender=sender, recipients=recipients)
    msg.body = text_body
    msg.html = html_body
    mail.send(msg)
app/templates/login.html
....
<p>New User? <a href="{{ url_for('register') }}">Click to Register!</a></p>
<p>
Forgot Your Password?
<a href="{{ url_for('reset_password_request') }}">Click to Reset It</a>
</p>
{% endblock %}
app/forms.py
class ResetPasswordRequestForm(FlaskForm):
    """Form asking for the e-mail address that should receive a reset link."""

    email = StringField('Email', validators=[DataRequired(), Email()])
    submit = SubmitField('Request Password Reset')
app/templates/reset_password_request.html
{% extends "base.html" %}
{% block content %}
<h1>Reset Password</h1>
<form action="" method="post">
{{ form.hidden_tag() }}
<p>
{{ form.email.label }}<br>
{{ form.email(size=64) }}<br>
{% for error in form.email.errors %}
<span style="color: red;">[{{ error }}]</span>
{% endfor %}
</p>
<p>{{ form.submit() }}</p>
</form>
{% endblock %}
app/routes.py
from app.forms import ResetPasswordRequestForm
from app.email import send_password_reset_email
@app.route('/reset_password_request', methods=['GET', 'POST'])
def reset_password_request():
    """Send a password-reset e-mail to the address entered in the form."""
    if current_user.is_authenticated:
        return redirect(url_for('index'))
    form = ResetPasswordRequestForm()
    if form.validate_on_submit():
        user = User.query.filter_by(email=form.email.data).first()
        if user:
            send_password_reset_email(user)
        # The same message is flashed whether or not the address is known,
        # so the form cannot be used to probe for registered e-mails.
        flash('Check your email for the instructions to reset your password')
        return redirect(url_for('login'))
    return render_template('reset_password_request.html',
                           title='Reset Password', form=form)
app/models.py
from time import time
import jwt
from app import app
class User(UserMixin, db.Model):
    # ...

    def get_reset_password_token(self, expires_in=600):
        """Return a signed JWT string naming this user as the reset target.

        The token carries the user id under ``reset_password`` and expires
        *expires_in* seconds from now (default 600).
        """
        token = jwt.encode(
            {'reset_password': self.id, 'exp': time() + expires_in},
            app.config['SECRET_KEY'], algorithm='HS256')
        # PyJWT < 2.0 returns bytes, PyJWT >= 2.0 already returns str;
        # calling .decode() unconditionally fails on the newer versions.
        if isinstance(token, bytes):
            token = token.decode('utf-8')
        return token

    @staticmethod
    def verify_reset_password_token(token):
        """Return the User a valid token refers to, or None otherwise.

        None is returned when the token is malformed, has a bad signature,
        has expired, or lacks the ``reset_password`` claim.
        """
        try:
            user_id = jwt.decode(token, app.config['SECRET_KEY'],
                                 algorithms=['HS256'])['reset_password']
        except (jwt.InvalidTokenError, KeyError):
            return None
        return User.query.get(user_id)
app/email.py
from flask import render_template
from app import app
# ...
def send_password_reset_email(user):
    """E-mail *user* a password-reset link built from a fresh reset token."""
    token = user.get_reset_password_token()
    send_email('[Microblog] Reset Your Password',
               sender=app.config['ADMINS'][0],
               recipients=[user.email],
               text_body=render_template('email/reset_password.txt',
                                         user=user, token=token),
               html_body=render_template('email/reset_password.html',
                                         user=user, token=token))
app/templates/email/reset_password.txt
Dear {{ user.username }},
To reset your password click on the following link:
{{ url_for('reset_password', token=token, _external=True) }}
If you have not requested a password reset simply ignore this message.
Sincerely,
The Microblog Team
app/templates/email/reset_password.html
<p>Dear {{ user.username }},</p>
<p>
To reset your password
<a href="{{ url_for('reset_password', token=token, _external=True) }}">
click here
</a>.
</p>
<p>Alternatively, you can paste the following link in your browser's address bar:</p>
<p>{{ url_for('reset_password', token=token, _external=True) }}</p>
<p>If you have not requested a password reset simply ignore this message.</p>
<p>Sincerely,</p>
<p>The Microblog Team</p>
app/routes.py
from app.forms import ResetPasswordForm
@app.route('/reset_password/<token>', methods=['GET', 'POST'])
def reset_password(token):
    """Let the user identified by *token* choose a new password."""
    if current_user.is_authenticated:
        return redirect(url_for('index'))
    user = User.verify_reset_password_token(token)
    if not user:
        # No user could be resolved from the token: go back to the index
        # page without showing the form.
        return redirect(url_for('index'))
    form = ResetPasswordForm()
    if form.validate_on_submit():
        user.set_password(form.password.data)
        db.session.commit()
        flash('Your password has been reset.')
        return redirect(url_for('login'))
    return render_template('reset_password.html', form=form)
app/forms.py
class ResetPasswordForm(FlaskForm):
    """Form for entering the new password twice."""

    password = PasswordField('Password', validators=[DataRequired()])
    # EqualTo('password') makes the repeated password match the first one.
    password2 = PasswordField(
        'Repeat Password', validators=[DataRequired(), EqualTo('password')])
    submit = SubmitField('Request Password Reset')
app/templates/reset_password.html
{% extends "base.html" %}
{% block content %}
<h1>Reset Your Password</h1>
<form action="" method="post">
{{ form.hidden_tag() }}
<p>
{{ form.password.label }}<br>
{{ form.password(size=32) }}<br>
{% for error in form.password.errors %}
<span style="color: red;">[{{ error }}]</span>
{% endfor %}
</p>
<p>
{{ form.password2.label }}<br>
{{ form.password2(size=32) }}<br>
{% for error in form.password2.errors %}
<span style="color: red;">[{{ error }}]</span>
{% endfor %}
</p>
<p>{{ form.submit() }}</p>
</form>
{% endblock %}
异步电子邮件 app/email.py
from threading import Thread
def send_async_email(app, msg):
    """Thread target: send *msg* inside an application context of *app*."""
    # The context is pushed manually because this function runs in a
    # separate thread (see the Thread(...) call in send_email).
    with app.app_context():
        mail.send(msg)
def send_email(subject, sender, recipients, text_body, html_body):
    """Build the message and send it from a background thread.

    The call returns as soon as the thread is started, so the request is not
    blocked while the message is delivered. This replaces the direct
    ``mail.send(msg)`` call of the synchronous version.
    """
    msg = Message(subject, sender=sender, recipients=recipients)
    msg.body = text_body
    msg.html = html_body
    Thread(target=send_async_email, args=(app, msg)).start()
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 【每日笔记】【Go学习笔记】2019-01-04 Codis笔记
- 【每日笔记】【Go学习笔记】2019-01-02 Codis笔记
- 【每日笔记】【Go学习笔记】2019-01-07 Codis笔记
- Golang学习笔记-调度器学习
- Vue学习笔记(二)------axios学习
- 算法/NLP/深度学习/机器学习面试笔记
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。