Fastapi教程六:项目妹子图(5)-用户认证篇

in fastapi with 1 comment

一、JWT验证:
首先安装pyjwt(pip install pyjwt
core/token.py

from datetime import timedelta, datetime
import jwt

from fastapi.security import OAuth2PasswordBearer
from fastapi import Depends, HTTPException
from sqlalchemy.orm import Session  
from passlib.context import CryptContext

from core.database import get_db, SessionLocal
from apps.users.models import User, Token

SECRET_KEY = "09d25e034faa6caxxxxc318166b7a9563b93f7099xxxxf4caa6cf63b28e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")

#验证用户
def authenticate_user(db:Session, username: str, password: str):
    try:
        user = db.query(User).filter(User.username == username).first()
        if not user:
            return False
        if not user.check_password(password):
            return False
        return user
    except Exception as e:
        print('err:%s' %e)
        # raise HTTPException(status_code=400, detail='用户不存在')
        return False

#创建JWT access_token
def create_access_token(*, data: dict, expires_delta: timedelta = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, secret_key, algorithm=algorithm)
    return encoded_jwt

#decode token
def decode_access_token(*, data: str):
    to_decode = data
    return jwt.decode(to_decode, secret_key, algorithm=algorithm)

#获取当前用户,可以在视图中利用Depends依赖,当成登录视图,类似django @login_required.
#但是fastapi可以通过Depends函数依赖来使用。不需要用装饰器。
async def get_current_user(token: str = Depends(oauth2_scheme), db:Session = Depends(get_db)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail={'msg':'Could not validate credentials', 'code':999},
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = schemas.TokenData(username=username)
    except PyJWTError:
        raise credentials_exception
    user =  db.query(User).filter(User.username == token_data.username).first()
    if user is None:
        raise credentials_exception
    return user

#获取is_active的用户
async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if not current_user.is_active:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

二、登录接口:
apps/router.py:

from datetime import datetime, timedelta

import jwt

from fastapi import APIRouter, Depends, HTTPException, status, Request, Form, Response
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm 
from fastapi.encoders import jsonable_encoder
from starlette.responses import RedirectResponse


from sqlalchemy.orm import Session  
from jwt import PyJWTError
from pydantic import BaseModel

from core.token import oauth2_scheme, create_access_token, authenticate_user, get_current_user,ACCESS_TOKEN_EXPIRE_MINUTES, SECRET_KEY, ALGORITHM
from core.database import get_db
from core import render 

from .users.crud import get_user_by_username, update_user, get_user_by_ssesion
from .users import schemas
from .users.models import User

router = APIRouter() 

#改视图Depends(get_current_user),需要登录才能查看,否则返回403
@router.get("/")
async def read_root(request:Request,user = Depends(get_current_user)):
    return {'msg':ok}

#请求TOken路由。这里要注意,使用OAuth2PasswordRequestForm必须安装python-multipart插件(pip install python-multipart)
@router.post("/token", response_model=schemas.Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db:Session = Depends(get_db)):
# async def login_for_access_token(*, username: str = Body(...), password: str = Body(...), db:Session = Depends(get_db)):
    user = authenticate_user(db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

我们测试下,通过POSTMAN请求路由。
1、请求token
请输入图片描述
2、测试登录路由,header不带token[返回Not authenticated]:
请输入图片描述
3、测试登录路由,header带jwt token
请输入图片描述
成功返回{'msg':ok}

其实到这里后端的API差不多完成了,因为是一个简单的妹子图,只有用户、文章【可以细分分类标签这些】。
fastapi构建就是这么简单方便。后续我们继续前端妹子图项目。

Comments are closed.