首先,你需要安装 PyJWT 库,可以通过以下命令安装:
pip install PyJWT
然后,你可以使用以下代码:
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
app = FastAPI()
# 加密算法
ALGORITHM = "HS256"
# 模拟用户数据
fake_users_db = {
"fakeuser": {
"username": "fakeuser",
"hashed_password": "$2b$12$TjC5p6E9JOHvDj.k2yMz4O7cw86FT0oRbyHM79X0IZoQdV5bkyb2a",
"email": "fakeuser@example.com",
}
}
# 密码哈希
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# OAuth2PasswordBearer 类用于处理密码和 Bearer Token
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# JWT 配置
SECRET_KEY = "supersecretkey"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# 生成 JWT Token
def create_jwt_token(data: dict, expires_delta: int):
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=expires_delta)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
# 模拟函数,用于验证用户名和密码
def verify_user(username: str, password: str):
user = fake_users_db.get(username)
if user and pwd_context.verify(password, user["hashed_password"]):
return user
# 模拟函数,用于获取当前用户信息
def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user_info = fake_users_db.get(username)
if user_info is None:
raise credentials_exception
return user_info
# 登录路径操作,使用 OAuth2PasswordBearer 从表单获取用户名和密码
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user_info = verify_user(form_data.username, form_data.password)
if user_info is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid credentials",
headers={"WWW-Authenticate": "Bearer"},
)
# 生成 JWT Token
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_jwt_token(data={"sub": user_info["username"]}, expires_delta=access_token_expires.total_seconds() // 60)
return {"access_token": access_token, "token_type": "bearer"}
# 受保护的路径操作,使用 get_current_user 依赖项获取当前用户信息
@app.get("/users/me", response_model=dict)
async def read_users_me(current_user: dict = Depends(get_current_user)):
return current_user
在这个示例中:
- 使用 PyJWT 库来处理 JWT。
- 使用 passlib 库来进行密码哈希验证。
- 在 /token 路径操作中,用户提交用户名和密码,如果验证通过,将生成一个带有用户信息的 JWT Token。
- 在 /users/me 路径操作中,使用 get_current_user 依赖项验证 JWT Token 并获取当前用户信息。
确保在实际应用中采用适当的安全措施,如使用 HTTPS 等。此外,存储密码哈希时,建议使用更安全的哈希算法,并使用适当的盐值。
转载请注明出处:http://www.pingtaimeng.com/article/detail/7393/FastAPI