1.需求分析
- 用户登录验证
- 用户登录保持
2.实现思路
- 用户发送用户名和密码到服务器,服务器端验证后,生成并返回令牌(token)。
- 用户再次访问时,每次请求携带token,访问私有信息。
3.支持模块
后端支持:
- 用于校验密码的哈希算法。
- 令牌计算和解译工具。
- 数据库和数据库连接工具,为简化操作,这里使用文件存储代替。
- 路由处理。
- 跨越请求。
前端支持:
- ajax请求模块。
- 本地端口服务。
4.文件组织
采用前后端分离方法。前端和后端分别放在不同的文件夹。
后端文件如下:
main.py //主文件
my_fake_db_connect.py//模拟数据库连接工具
fake_data.json//用json文件模拟数据库
my_tools.py//密码校验和令牌创建、解译工具,包含类PasswordCheck,TokenCreator
实现代码
main.py
main.py
文件主要作用是处理路由请求,配置跨域许可,调用各类对象和方法,其中关键代码在路由方法中。
#文件名:main.py
from fastapi import FastAPI,Depends,HTTPException, status
from fastapi.middleware.cors import CORSMiddleware#跨域请求
from my_fake_db_connect import fake_db_connect#虚拟数据库连接
from my_tools import PasswordCheck,TokenCreator
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")#初始化安全性方案对象
#初始化工具类
tokencreator=TokenCreator()
passwordcheck=PasswordCheck('fake_data.json')
db=fake_db_connect('fake_data.json')
# 放行跨域请求的域名
origins = [
"http://localhost.tiangolo.com",
"https://localhost.tiangolo.com",
"http://localhost",
"http://localhost:8080",
"http://localhost:8848",
"http://127.0.0.1:8848",
"http://127.0.0.1:5500"
]
app=FastAPI()
# 添加跨域方案
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
#游客的访问
@app.get("/hello")
async def hello(name:str):
return "hello,"+name+"!"
#登录,返回令牌
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
'''
处理登录请求
'''
#user = authenticate_user(
# fake_users_db, form_data.username, form_data.password)
username=form_data.username
password=form_data.password
if not passwordcheck.check(username, password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token=tokencreator.create(username)
print('返回用户令牌',access_token)
# 返回用户令牌
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/user/me")
async def get_current_user(token: str = Depends(oauth2_scheme)):
'''
获取当前用户信息。
'''
# 默认权限错误信息
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
# 尝试解码token获取username
username=tokencreator.decode(token)
if username is None: raise credentials_exception
# 尝试从数据库取用户信息
try: userinfo=db.get(username)
except: raise credentials_exception
if userinfo is None: raise credentials_exception
return userinfo
my_tools.py
这里封装了本案例中最核心的方法。即密码的校验、令牌的生成和解译。
class PasswordCheck():
'''
密码校验工具。
初始化方法:password_check=PasswordCheck("数据库名")。
密码检查方法:.check("用户名","密码"),返回布尔类型。
'''
from my_fake_db_connect import fake_db_connect#数据库连接模块
from passlib.context import CryptContext#哈希算法模块
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")#定义一个哈希算法
def __init__(self,db:str):#
self.db=self.fake_db_connect(db)
pass
def check(self,username:str,password:str)->bool:
try:
userinfo=self.db.get(username)#尝试取数据
hashed_password=userinfo.get('hashed_password')
if self.pwd_context.verify(password, hashed_password):
return True
else: return False
except:
return False
def __call__(self,username:str,password:str)->bool:
self.check(username, password)
class TokenCreator():
'''
令牌生成器。
'''
#算法库
from jose import JWTError, jwt
from datetime import datetime, timedelta
from typing import Optional
#令牌加密方法和有效期
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def __init__(self):
pass
def __call__(self,username:str):
self.create(username)
def create(self,username:str,expires_delta:Optional[timedelta] = None)->str:
if expires_delta:
# 失效时间等于当前时间加有效期
expire = self.datetime.utcnow()+expires_delta
else:
expire = self.datetime.utcnow()+self.timedelta(minutes=15)
data={"sub":username,'exp':expire}#写入令牌的数据
encode_jwt = self.jwt.encode(data, self.SECRET_KEY, algorithm=self.ALGORITHM)#生成令牌
return encode_jwt
def decode(self,token:str)->str:
'''
解码Token,返回用户名
'''
try:
payload = self.jwt.decode(token, self.SECRET_KEY, algorithms=[self.ALGORITHM])
username: str = payload.get("sub")
if username is None:
return None
except self.JWTError:
return None
print('解码令牌出错')
return username
my_fake_db_connect.py
这里使用JSON文件模拟数据库存储,使用python自带功能模拟数据库的增删改查,这样做数据丢失风险极大,这里仅作为测试时的替代方法,实际工作中务必使用真正的数据库。
class fake_db_connect():
'''
模拟数据库连接工具
'''
import json
def __init__(self,filename:str):
self.filename=filename
with open(filename, 'r',encoding='utf-8') as f:
self.fake_user_table = self.json.load(f)
def get(self,primaryKey:str)->dict:
return self.fake_user_table.get(primaryKey)
def insert(self,info:dict):
primaryKey=info['username']
self.fake_user_table[primaryKey]=info
def commit(self):
with open(self.filename, 'w',encoding='utf-8') as f:
self.json.dump(self.fake_user_table, f, ensure_ascii=False,indent=2)
def delete(self,primaryKey:str):
del self.fake_user_table[primaryKey]
fake_data.json
{
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": false
},
"alice": {
"username": "alice",
"full_name": "Alice Wonderson",
"email": "alice@example.com",
"hashed_password": "fakehashedsecret2",
"disabled": true
},
"lihua": {
"username": "lihua",
"full_name": "Li Hua",
"email": "lihua@example.com",
"hashed_password": "fakehashedmima",
"disabled": true
}
}