使用密码(及哈希)的 OAuth2,基于 JWT 的 Bearer 令牌¶
现在我们已经有了完整的安全流程,接下来用JWT 令牌和安全的密码哈希,让应用真正安全起来。
这些代码可以直接用于你的应用,你可以把密码哈希保存到数据库中,等等。
我们将从上一章结束的地方继续,逐步完善。
关于 JWT¶
JWT 意为 “JSON Web Tokens”。
它是一种标准,把一个 JSON 对象编码成没有空格、很密集的一长串字符串。看起来像这样:
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c它不是加密的,所以任何人都可以从内容中恢复信息。
但它是“签名”的。因此,当你收到一个自己签发的令牌时,你可以验证它确实是你签发的。
这样你就可以创建一个例如有效期为 1 周的令牌。然后当用户第二天带着这个令牌回来时,你能知道该用户仍然处于登录状态。
一周后令牌过期,用户将不再被授权,需要重新登录以获取新令牌。而如果用户(或第三方)尝试修改令牌来更改过期时间,你也能发现,因为签名将不匹配。
如果你想动手体验 JWT 令牌并了解它的工作方式,请访问https://jwt.io。
安装PyJWT¶
我们需要安装PyJWT,以便在 Python 中生成和校验 JWT 令牌。
请确保创建并激活一个虚拟环境,然后安装pyjwt:
$pipinstallpyjwt---> 100%密码哈希¶
“哈希”是指把一些内容(这里是密码)转换成看起来像乱码的一串字节(其实就是字符串)。
当你每次传入完全相同的内容(完全相同的密码)时,都会得到完全相同的“乱码”。
但你无法从这个“乱码”反向还原出密码。
为什么使用密码哈希¶
如果你的数据库被盗,窃贼拿到的不会是用户的明文密码,而只是哈希值。
因此,窃贼无法把该密码拿去尝试登录另一个系统(很多用户在各处都用相同的密码,这将非常危险)。
安装pwdlib¶
pwdlib 是一个用于处理密码哈希的优秀 Python 包。
它支持多种安全的哈希算法以及相关工具。
推荐的算法是 “Argon2”。
请确保创建并激活一个虚拟环境,然后安装带 Argon2 的 pwdlib:
$pipinstall"pwdlib[argon2]"---> 100%提示
使用pwdlib,你甚至可以把它配置为能够读取由Django、Flask 安全插件或其他许多工具创建的密码。
例如,你可以在数据库中让一个 Django 应用和一个 FastAPI 应用共享同一份数据。或者在使用同一个数据库的前提下,逐步迁移一个 Django 应用到 FastAPI。
同时,你的用户既可以从 Django 应用登录,也可以从FastAPI 应用登录。
哈希并校验密码¶
从pwdlib 导入所需工具。
用推荐设置创建一个 PasswordHash 实例——它将用于哈希与校验密码。
提示
pwdlib 也支持 bcrypt 哈希算法,但不包含遗留算法——如果需要处理过时的哈希,建议使用 passlib 库。
例如,你可以用它读取并校验其他系统(如 Django)生成的密码,但对任何新密码使用不同的算法(如 Argon2 或 Bcrypt)进行哈希。
并且能够同时与它们全部兼容。
创建一个工具函数来哈希用户传入的密码。
再创建一个工具函数来校验接收的密码是否匹配已存储的哈希。
再创建一个工具函数来进行身份验证并返回用户。
fromdatetimeimportdatetime,timedelta,timezonefromtypingimportAnnotatedimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompwdlibimportPasswordHashfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=NoneclassUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpassword_hash=PasswordHash.recommended()DUMMY_HASH=password_hash.hash("dummypassword")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpassword_hash.verify(plain_password,hashed_password)defget_password_hash(password):returnpassword_hash.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:verify_password(password,DUMMY_HASH)returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:Annotated[OAuth2PasswordRequestForm,Depends()],)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],)->User:returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:Annotated[User,Depends(get_current_active_user)],):return[{"item_id":"Foo","owner":current_user.username}]🤓 Other versions and variants
Tip
Prefer to use theAnnotated version if possible.
fromdatetimeimportdatetime,timedelta,timezoneimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompwdlibimportPasswordHashfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=NoneclassUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpassword_hash=PasswordHash.recommended()DUMMY_HASH=password_hash.hash("dummypassword")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpassword_hash.verify(plain_password,hashed_password)defget_password_hash(password):returnpassword_hash.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:verify_password(password,DUMMY_HASH)returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:str=Depends(oauth2_scheme)):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:User=Depends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:OAuth2PasswordRequestForm=Depends(),)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/")asyncdefread_users_me(current_user:User=Depends(get_current_active_user))->User:returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:User=Depends(get_current_active_user)):return[{"item_id":"Foo","owner":current_user.username}]当使用一个在数据库中不存在的用户名调用authenticate_user 时,我们仍然会针对一个虚拟哈希运行verify_password。
这可以确保无论用户名是否有效,端点的响应时间大致相同,从而防止可用于枚举已存在用户名的“时间攻击”(timing attacks)。
注意
如果你查看新的(伪)数据库fake_users_db,现在你会看到哈希后的密码类似这样:"$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc"。
处理 JWT 令牌¶
导入已安装的模块。
创建一个用于对 JWT 令牌进行签名的随机密钥。
使用下列命令生成一个安全的随机密钥:
$opensslrand-hex3209d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7把输出复制到变量SECRET_KEY(不要使用示例中的那个)。
创建变量ALGORITHM,设置用于签名 JWT 令牌的算法,这里设为"HS256"。
创建一个变量用于设置令牌的过期时间。
定义一个用于令牌端点响应的 Pydantic 模型。
创建一个生成新访问令牌的工具函数。
fromdatetimeimportdatetime,timedelta,timezonefromtypingimportAnnotatedimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompwdlibimportPasswordHashfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=NoneclassUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpassword_hash=PasswordHash.recommended()DUMMY_HASH=password_hash.hash("dummypassword")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpassword_hash.verify(plain_password,hashed_password)defget_password_hash(password):returnpassword_hash.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:verify_password(password,DUMMY_HASH)returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:Annotated[OAuth2PasswordRequestForm,Depends()],)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],)->User:returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:Annotated[User,Depends(get_current_active_user)],):return[{"item_id":"Foo","owner":current_user.username}]🤓 Other versions and variants
Tip
Prefer to use theAnnotated version if possible.
fromdatetimeimportdatetime,timedelta,timezoneimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompwdlibimportPasswordHashfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=NoneclassUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpassword_hash=PasswordHash.recommended()DUMMY_HASH=password_hash.hash("dummypassword")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpassword_hash.verify(plain_password,hashed_password)defget_password_hash(password):returnpassword_hash.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:verify_password(password,DUMMY_HASH)returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:str=Depends(oauth2_scheme)):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:User=Depends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:OAuth2PasswordRequestForm=Depends(),)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/")asyncdefread_users_me(current_user:User=Depends(get_current_active_user))->User:returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:User=Depends(get_current_active_user)):return[{"item_id":"Foo","owner":current_user.username}]更新依赖项¶
更新get_current_user 以接收与之前相同的令牌,但这次使用的是 JWT 令牌。
解码接收到的令牌,进行校验,并返回当前用户。
如果令牌无效,立即返回一个 HTTP 错误。
fromdatetimeimportdatetime,timedelta,timezonefromtypingimportAnnotatedimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompwdlibimportPasswordHashfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=NoneclassUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpassword_hash=PasswordHash.recommended()DUMMY_HASH=password_hash.hash("dummypassword")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpassword_hash.verify(plain_password,hashed_password)defget_password_hash(password):returnpassword_hash.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:verify_password(password,DUMMY_HASH)returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:Annotated[OAuth2PasswordRequestForm,Depends()],)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],)->User:returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:Annotated[User,Depends(get_current_active_user)],):return[{"item_id":"Foo","owner":current_user.username}]🤓 Other versions and variants
Tip
Prefer to use theAnnotated version if possible.
fromdatetimeimportdatetime,timedelta,timezoneimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompwdlibimportPasswordHashfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=NoneclassUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpassword_hash=PasswordHash.recommended()DUMMY_HASH=password_hash.hash("dummypassword")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpassword_hash.verify(plain_password,hashed_password)defget_password_hash(password):returnpassword_hash.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:verify_password(password,DUMMY_HASH)returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:str=Depends(oauth2_scheme)):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:User=Depends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:OAuth2PasswordRequestForm=Depends(),)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/")asyncdefread_users_me(current_user:User=Depends(get_current_active_user))->User:returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:User=Depends(get_current_active_user)):return[{"item_id":"Foo","owner":current_user.username}]更新/token 路径操作¶
用令牌的过期时间创建一个timedelta。
创建一个真正的 JWT 访问令牌并返回它。
fromdatetimeimportdatetime,timedelta,timezonefromtypingimportAnnotatedimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompwdlibimportPasswordHashfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=NoneclassUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpassword_hash=PasswordHash.recommended()DUMMY_HASH=password_hash.hash("dummypassword")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpassword_hash.verify(plain_password,hashed_password)defget_password_hash(password):returnpassword_hash.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:verify_password(password,DUMMY_HASH)returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:Annotated[OAuth2PasswordRequestForm,Depends()],)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],)->User:returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:Annotated[User,Depends(get_current_active_user)],):return[{"item_id":"Foo","owner":current_user.username}]🤓 Other versions and variants
Tip
Prefer to use theAnnotated version if possible.
fromdatetimeimportdatetime,timedelta,timezoneimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompwdlibimportPasswordHashfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=NoneclassUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpassword_hash=PasswordHash.recommended()DUMMY_HASH=password_hash.hash("dummypassword")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpassword_hash.verify(plain_password,hashed_password)defget_password_hash(password):returnpassword_hash.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:verify_password(password,DUMMY_HASH)returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:str=Depends(oauth2_scheme)):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:User=Depends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:OAuth2PasswordRequestForm=Depends(),)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/")asyncdefread_users_me(current_user:User=Depends(get_current_active_user))->User:returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:User=Depends(get_current_active_user)):return[{"item_id":"Foo","owner":current_user.username}]关于 JWT “主题”sub 的技术细节¶
JWT 规范中有一个sub 键,表示令牌的“主题”(subject)。
使用它是可选的,但通常会把用户的标识放在这里,所以本例中我们使用它。
JWT 除了用于识别用户并允许其直接在你的 API 上执行操作之外,还可能用于其他场景。
例如,你可以用它来标识一辆“车”或一篇“博客文章”。
然后你可以为该实体添加权限,比如“drive”(用于车)或“edit”(用于博客)。
接着,你可以把这个 JWT 令牌交给一个用户(或机器人),他们就可以在没有账户的前提下,仅凭你的 API 生成的 JWT 令牌来执行这些操作(开车、编辑文章)。
基于这些想法,JWT 可以用于更复杂的场景。
在这些情况下,多个实体可能会有相同的 ID,比如都叫foo(用户foo、车foo、博客文章foo)。
因此,为了避免 ID 冲突,在为用户创建 JWT 令牌时,你可以给sub 键的值加一个前缀,例如username:。所以在这个例子中,sub 的值可以是:username:johndoe。
需要牢记的一点是,sub 键在整个应用中应该是一个唯一标识符,并且它应该是字符串。
检查¶
运行服务器并打开文档:http://127.0.0.1:8000/docs。
你会看到这样的用户界面:

像之前一样进行授权。
使用以下凭证:
用户名:johndoe密码:secret
检查
注意,代码中的任何地方都没有明文密码 “secret”,我们只有它的哈希版本。

调用/users/me/ 端点,你将得到如下响应:
{"username":"johndoe","email":"johndoe@example.com","full_name":"John Doe","disabled":false}
如果你打开开发者工具,你会看到发送的数据只包含令牌。密码只会在第一个请求中用于认证用户并获取访问令牌,之后就不会再发送密码了:

注意
注意Authorization 请求头,其值以Bearer 开头。
使用scopes 的高级用法¶
OAuth2 支持 “scopes”(作用域)。
你可以用它们为 JWT 令牌添加一组特定的权限。
然后你可以把这个令牌直接交给用户或第三方,在一组限制条件下与 API 交互。
在高级用户指南中你将学习如何使用它们,以及它们如何集成进FastAPI。
小结¶
通过目前所学内容,你可以使用 OAuth2 和 JWT 等标准来搭建一个安全的FastAPI 应用。
在几乎任何框架中,处理安全问题都会很快变得相当复杂。
许多把安全流程大幅简化的包,往往要在数据模型、数据库和可用特性上做出大量妥协。而有些过度简化的包实际上在底层存在安全隐患。
FastAPI 不会在任何数据库、数据模型或工具上做妥协。
它给予你完全的灵活性,选择最适合你项目的方案。
而且你可以直接使用许多维护良好、广泛使用的包,比如pwdlib 和PyJWT,因为FastAPI 不需要复杂机制来集成外部包。
同时它也为你提供尽可能简化流程的工具,而不牺牲灵活性、健壮性或安全性。
你可以以相对简单的方式使用和实现像 OAuth2 这样的安全、标准协议。
在高级用户指南中,你可以进一步了解如何使用 OAuth2 的 “scopes”,以遵循相同标准实现更细粒度的权限系统。带作用域的 OAuth2 是许多大型身份认证提供商(如 Facebook、Google、GitHub、Microsoft、X(Twitter)等)用来授权第三方应用代表其用户与其 API 交互的机制。







