FastAPI用户安全性解决方案

1.需求分析

  1. 用户登录验证
  2. 用户登录保持

2.实现思路

  1. 用户发送用户名和密码到服务器,服务器端验证后,生成并返回令牌(token)。
  2. 用户再次访问时,每次请求携带token,访问私有信息。

3.支持模块

后端支持:

  1. 用于校验密码的哈希算法。
  2. 令牌计算和解译工具。
  3. 数据库和数据库连接工具,为简化操作,这里使用文件存储代替。
  4. 路由处理。
  5. 跨越请求。

前端支持:

  1. ajax请求模块。
  2. 本地端口服务。

4.文件组织

采用前后端分离方法。前端和后端分别放在不同的文件夹。
后端文件如下:

main.py //主文件
my_fake_db_connect.py//模拟数据库连接工具
fake_data.json//用json文件模拟数据库
my_tools.py//密码校验和令牌创建、解译工具,包含类PasswordCheck,TokenCreator

实现代码

main.py

main.py文件主要作用是处理路由请求,配置跨域许可,调用各类对象和方法,其中关键代码在路由方法中。

#文件名:main.py
from fastapi import FastAPI,Depends,HTTPException, status
from fastapi.middleware.cors import CORSMiddleware#跨域请求
from my_fake_db_connect import fake_db_connect#虚拟数据库连接
from my_tools import PasswordCheck,TokenCreator
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")#初始化安全性方案对象

#初始化工具类
tokencreator=TokenCreator()
passwordcheck=PasswordCheck('fake_data.json')
db=fake_db_connect('fake_data.json')

# 放行跨域请求的域名

origins = [
    "http://localhost.tiangolo.com",
    "https://localhost.tiangolo.com",
    "http://localhost",
    "http://localhost:8080",
    "http://localhost:8848",
    "http://127.0.0.1:8848",
    "http://127.0.0.1:5500"
]

app=FastAPI()

# 添加跨域方案
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)



#游客的访问
@app.get("/hello")
async def hello(name:str):
    return "hello,"+name+"!"

#登录,返回令牌
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    '''
    处理登录请求
    '''
    #user = authenticate_user(
    #    fake_users_db, form_data.username, form_data.password)
    username=form_data.username
    password=form_data.password

    if not passwordcheck.check(username, password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token=tokencreator.create(username)
    print('返回用户令牌',access_token)
    # 返回用户令牌
    return {"access_token": access_token, "token_type": "bearer"}


@app.get("/user/me")
async def get_current_user(token: str = Depends(oauth2_scheme)):
    '''
    获取当前用户信息。
    '''
    # 默认权限错误信息
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    # 尝试解码token获取username
    username=tokencreator.decode(token)
    
    if username is None: raise credentials_exception

	# 尝试从数据库取用户信息
    try: userinfo=db.get(username)
    except: raise credentials_exception
       
    if userinfo is None: raise credentials_exception
    
    return userinfo
    

my_tools.py

这里封装了本案例中最核心的方法。即密码的校验、令牌的生成和解译。

class PasswordCheck():
    '''
    密码校验工具。
    初始化方法:password_check=PasswordCheck("数据库名")。
    密码检查方法:.check("用户名","密码"),返回布尔类型。
    '''
    from my_fake_db_connect import fake_db_connect#数据库连接模块
    from passlib.context import CryptContext#哈希算法模块
    pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")#定义一个哈希算法

    def __init__(self,db:str):#
        self.db=self.fake_db_connect(db)
        pass


    def check(self,username:str,password:str)->bool:
        try:
            userinfo=self.db.get(username)#尝试取数据
            hashed_password=userinfo.get('hashed_password')
            if self.pwd_context.verify(password, hashed_password):
                return True
            else: return False
        except:
            return False
      

    def __call__(self,username:str,password:str)->bool:
        self.check(username, password)


class TokenCreator():
    '''
    令牌生成器。
    '''
    #算法库
    from jose import JWTError, jwt

    from datetime import datetime, timedelta
    from typing import Optional

    #令牌加密方法和有效期
    SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
    ALGORITHM = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES = 30


    def __init__(self):
        pass
    
    def __call__(self,username:str):
        self.create(username)

    def create(self,username:str,expires_delta:Optional[timedelta] = None)->str:
        if expires_delta:
            # 失效时间等于当前时间加有效期
            expire = self.datetime.utcnow()+expires_delta
        else:
            expire = self.datetime.utcnow()+self.timedelta(minutes=15)

        data={"sub":username,'exp':expire}#写入令牌的数据
        encode_jwt = self.jwt.encode(data, self.SECRET_KEY, algorithm=self.ALGORITHM)#生成令牌
        return encode_jwt

    def decode(self,token:str)->str:
        '''
        解码Token,返回用户名
        '''
        try:
            payload = self.jwt.decode(token, self.SECRET_KEY, algorithms=[self.ALGORITHM])
            username: str = payload.get("sub")
            if username is None:
                return None

        except self.JWTError:
            return None
            print('解码令牌出错')
        return username

my_fake_db_connect.py

这里使用JSON文件模拟数据库存储,使用python自带功能模拟数据库的增删改查,这样做数据丢失风险极大,这里仅作为测试时的替代方法,实际工作中务必使用真正的数据库。

class fake_db_connect():
    '''
    模拟数据库连接工具
    '''
    import json
    def __init__(self,filename:str):
        self.filename=filename
        with open(filename, 'r',encoding='utf-8') as f:
            self.fake_user_table = self.json.load(f)
    
    def get(self,primaryKey:str)->dict:
        return self.fake_user_table.get(primaryKey)

    def insert(self,info:dict):
        primaryKey=info['username']
        self.fake_user_table[primaryKey]=info

    def commit(self):
        with open(self.filename, 'w',encoding='utf-8') as f:
            self.json.dump(self.fake_user_table, f, ensure_ascii=False,indent=2)
    
    def delete(self,primaryKey:str):
        del self.fake_user_table[primaryKey]

fake_data.json

{
  "johndoe": {
    "username": "johndoe",
    "full_name": "John Doe",
    "email": "johndoe@example.com",
    "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
    "disabled": false
  },
  "alice": {
    "username": "alice",
    "full_name": "Alice Wonderson",
    "email": "alice@example.com",
    "hashed_password": "fakehashedsecret2",
    "disabled": true
  },
  "lihua": {
    "username": "lihua",
    "full_name": "Li Hua",
    "email": "lihua@example.com",
    "hashed_password": "fakehashedmima",
    "disabled": true
  }
}
上一篇:微信小程序---自定义三级联动


下一篇:【Arduino实验15 红外遥控电风扇】