知识树
1.Python发送email
2.token处理
3.用户账号管理
一个微型的Email工具
因为当用户注册时,可能会涉及到给用户发送邮件,所以这里先写一个发送Email的小工具。
直接使用Python自带的smtplib库和email库,我这里使用的是Google的smtp服务,并且把登陆用户名和密码设置在环境变量中,毕竟属于敏感信息嘛。然后就是定义消息体、邮件主题等信息。另外为了提高发送邮件的效率,采用了多线程的方式。
def sendmail(to, subject, text): # 第三方 SMTP 服务 mail_host = "smtp.gmail.com" # 设置服务器 mail_user = os.environ.get('MAIL_USERNAME') # 用户名, 从环境变量中获取 mail_pass = os.environ.get('MAIL_PASSWORD') # 口令 sender = os.environ.get('MAIL_USERNAME') receivers = [to] message = MIMEText(text, 'plain', 'utf-8') message['From'] = Header('萝卜大杂烩', 'utf-8') message['to'] = Header(to, 'utf-8') subject = subject message['Subject'] = Header(subject, 'utf-8') smtpobj = smtplib.SMTP(mail_host, 25) smtpobj.ehlo() smtpobj.starttls() smtpobj.login(mail_user, mail_pass) thr = Thread(target=smtpobj.sendmail, args=[sender, receivers, message.as_string()]) # smtpobj.sendmail(sender, receivers, message.as_string()) thr.start() return thr
更新表结构
生成token的工具
这里使用itsdangerous来生成命令牌,它的TimedJSONWebSignatureSerializer 类可以生成具有过期时间的 JSON Web 签名(JSON Web Signatures,JWS)。这个类的构造函数接收的参数是一个密钥,在 Flask 程序中可使用 SECRET_KEY 设置。
dumps() 方法为指定的数据生成一个加密签名,然后再对数据和签名进行序列化,生成令牌字符串。expires_in 参数设置令牌的过期时间,单位为秒。
为了解码令牌,序列化对象提供了 loads() 方法,其唯一的参数是令牌字符串。这个方法会检验签名和过期时间,如果通过,返回原始数据。如果提供给 loads() 方法的令牌不正 确或过期了,则抛出异常。
新的表结构
把生成命令牌的动作定义在WebUser里面,同时再为该类增加一个confirmed字段,用来标识用户是否确认,所以现在完整的WebUser类定义为
class WebUser(UserMixin, db.Model): __tablename__ = 'webuser' id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.String(64), unique=True, index=True) email = db.Column(db.String(64), unique=True, index=True) username = db.Column(db.String(64), unique=True, index=True) password_hash = db.Column(db.String(128)) confirmed = db.Column(db.Boolean, default=False) @staticmethod def init_user(): users = WebUser.query.filter_by(username='admin').first() if users is None: users = WebUser(user_id=time.time(), email='admin@123.com', username='admin', confirmed=True) users.password = '123456' db.session.add(users) db.session.commit() @property def password(self): raise AttributeError('password is not readable attribute') @password.setter def password(self, password): self.password_hash = generate_password_hash(password) def verify_password(self, password): if self.password_hash is not None: return check_password_hash(self.password_hash, password) def generate_confirmation_token(self, expiration=3600): s = Serializer(current_app.config['SECRET_KEY'], expiration) return s.dumps({'confirm': self.id}) def confirm(self, token): s = Serializer(current_app.config['SECRET_KEY']) try: data = s.loads(token) except: return False if data.get('confirm') != self.id: return False self.confirmed = True db.session.add(self) db.session.commit() return True
也可以看到,初始化用户的代码里,我设置了confirmed字段为True,即他默认为确认状态。
注册路由函数
这里定义了两个注册的路由函数,一个是需要用户使用邮件确认的,一个是不需要的。
其实也很好实现,不需要确认的函数,只要设置该用户的confirmed字段为True即可
@auth.route('/register', methods=['GET', 'POST']) def register(): form = RegisterForm() if form.validate_on_submit(): user = WebUser(email=form.email.data, username=form.username.data, password=form.password.data, user_id=time.time(), confirmed=True) db.session.add(user) db.session.commit() flash('You can login now.') return redirect(url_for('auth.login')) return render_template('auth/register.html', form=form)
对于需要确认的函数,需要发送邮件,这里就使用到我们刚刚定义的sendemail函数了
@auth.route('/registerconfirm', methods=['GET', 'POST']) def register_confirm(): form = RegisterForm() if form.validate_on_submit(): user = WebUser(email=form.email.data, username=form.username.data, password=form.password.data, user_id=time.time()) db.session.add(user) db.session.commit() token = user.generate_confirmation_token() text = render_template('auth/email/confirm.txt', user=user, token=token) sendmail(user.email, 'Confirm Your Account', text) flash('A Confirmation email has been sent to you by your registered email.') return redirect(url_for('auth.login')) return render_template('auth/registerconfirm.html', form=form)
而当用户点击确认链接后,跳转的确认函数逻辑为
@auth.route('/confirm/<token>') @login_required def confirm(token): if current_user.confirmed: return redirect(url_for('main.index')) if current_user.confirm(token): flash('You have confirmed your account') else: flash('The confirmation link is invalid or has expired.') return redirect(url_for('main.index'))
拿着用户确认链接里的token,调用confirm方法来更新confirmed字段。
重新发送confirm邮件
定义确认函数
如果用户没有在token超时时间内完成确认,而某些页面又是必须要确认才能访问的,所以就需要重新发送一个确认链接给用户,所以定义一个resend confirm函数,重新生成一个确认链接。因为这个是用户的主动行为,所以用login_required函数限制只有在用户登陆的情况下才可以使用。
@auth.route('/resendconfirm') @login_required def resend_confirmation(): token = current_user.generate_confirmation_token() text = render_template('auth/email/confirm.txt', user=current_user, token=token) sendmail(current_user.email, 'Confirm Your Account', text) flash('A new confirmation email has been sent to you by your registered email') return redirect(url_for('main.index'))
定义哪些页面非确认用户不可访问
Flask有两个钩子函数,分别是 before_request和before_app_request 修饰器。
before_request:注册一个函数,在每次请求之前运行。
before_app_request:针对程序全局,每次请求之前运行。
这里使用before_app_request注册一个函数,用来区别对待不同的页面
@auth.before_app_request def before_request(): if current_user.is_authenticated and not current_user.confirmed and request.endpoint == 'main.needconfirm': return redirect(url_for('auth.unconfirmed'))
可以看到,只有当用户为登陆状态,且没有确认,并且endpoint为main.needconfirm时,before_app_request才会拦截请求,并跳转到auth.unconfirmed页面。
其中needconfirm函数定义如下
@main.route('/needconfirm') @login_required def needconfirm(): print(request.endpoint) return 'Only confirmed users are allowed!'
unconfirmed函数定义如下
@auth.route('/unconfirmed') def unconfirmed(): if current_user.is_anonymous or current_user.confirmed: return redirect(url_for('main.index')) return render_template('auth/unconfirmed.html')
模板定义
现在新增了几个模板,分别为register.html、confirm.txt和unconfirmed.html等
对于register.html模板,是从login页面跳转过来的,在login页面添加不同的提示,指向不同的逻辑函数
<div class="col-md-12"> <a href="{{ url_for('auth.register') }}"> 点击此处注册,不需要确认邮箱 </a> </div> <div class="col-md-12"> <a href="{{ url_for('auth.register_confirm') }}"> 点击此处注册,需要确认邮箱 </a> </div>
confirm.txt是邮件模板,里面需要展示用户的确认url,而且这个url需要是绝对url,故使用_external=True来生成绝对url
{{ url_for('auth.confirm', token=token, _external=True) }}
而unconfirmed.html模板只需要调用resend_confirmation函数即可
<a href="{{ url_for('auth.resend_confirmation') }}">Click here</a>
用户账号管理
修改密码
这个功能相对简单一些,只要用户在登陆状态下,就可以展示一个表单,供用户修改密码
class ChangePwdForm(FlaskForm): oldpwd = PasswordField('Old Password', validators=[DataRequired()]) newpwd = PasswordField('New Password', validators=[DataRequired(), EqualTo('newpwd2', message='两次输入的密码需要一致。')]) newpwd2 = PasswordField('Confirm New Password', validators=[DataRequired()]) submit = SubmitField('Change Password')
然后使用WebUser模型的verity_password方法来检测老的密码是否正确,如果正确,则更新新密码
@auth.route('/changepwd', methods=['GET', 'POST']) @login_required def changepwd(): form = ChangePwdForm() if form.validate_on_submit(): if current_user.verify_password(form.oldpwd.data): current_user.password = form.newpwd.data db.session.add(current_user) db.session.commit() flash('You have changed your password!') return redirect(url_for('main.index')) else: flash('Invalid password') return render_template('auth/changepwd.html', form=form)
重设密码
重设密码相对复杂些,首先要在WebUser模型中增加两个静态方法,分别用来产生新的token和重置密码
def generate_reset_token(self, expiration=3600): s = Serializer(current_app.config['SECRET_KEY'], expiration) return s.dumps({'reset': self.id}) def reset_password(self, token, newpwd): s = Serializer(current_app.config['SECRET_KEY']) try: data = s.loads(token) except: return False if data.get('reset') != self.id: return False self.password = newpwd db.session.add(self) db.session.commit() return True
然后定义两个表单,一个为重置密码请求表单,包含一个Email输入框,用来输入用户注册的邮箱;另一个为重置密码表单,如果在重设密码时输入的Email是错误的邮箱,则直接报错
class ResetPwdEmailForm(FlaskForm): email = StringField('Your Register Email', validators=[DataRequired(), Length(1, 64), Email()]) submit = SubmitField('Reset Password') class ResetPwdForm(FlaskForm): email = StringField('Email', validators=[DataRequired(), Length(1, 64), Email()]) newpwd = PasswordField('New Password', validators=[DataRequired(), EqualTo('newpwd2', message='两次输入的密码需要一致。')]) newpwd2 = PasswordField('Confirm New Password', validators=[DataRequired()]) submit = SubmitField('Reset Password') def validate_email(self, field): if WebUser.query.filter_by(email=field.data).first() is None: raise ValidationError('This Email is Invalid')
页面演示
我将完整的代码上传到GitHub上了,有兴趣的同学可以戳这里:
https://github.com/zhouwei713/flask-webauth
另外还配置了一个演示web,地址为:
感兴趣的同学可以来玩玩