Movatterモバイル変換


[0]ホーム

URL:


Перейти к содержанию
Join theFastAPI Cloud waiting list 🚀
Follow@fastapi onX (Twitter) to stay updated
FollowFastAPI onLinkedIn to stay updated
Subscribe to theFastAPI and friends newsletter 🎉
sponsor
sponsor
sponsor
sponsor
sponsor
sponsor
sponsor
sponsor
sponsor
sponsor
sponsor

OAuth2 с паролем (и хешированием), Bearer с JWT-токенами

🌐 Перевод выполнен с помощью ИИ и людей

Этот перевод был сделан ИИ под руководством людей. 🤝

В нем могут быть ошибки из-за неправильного понимания оригинального смысла или неестественности и т. д. 🤖

Вы можете улучшить этот перевод,помогая нам лучше направлять ИИ LLM.

Английская версия

Теперь, когда у нас определен процесс обеспечения безопасности, давайте сделаем приложение действительно безопасным, используя токеныJWT и безопасное хеширование паролей.

Этот код можно реально использовать в своем приложении, сохранять хэши паролей в базе данных и т.д.

Мы продолжим разбираться, начиная с того места, на котором остановились в предыдущей главе, и расширим его.

Про JWT

JWT означает "JSON Web Tokens".

Это стандарт для кодирования JSON-объекта в виде длинной строки без пробелов. Выглядит это следующим образом:

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c

Он не зашифрован, поэтому любой человек может восстановить информацию из его содержимого.

Но он подписан. Следовательно, когда вы получаете токен, который вы эмитировали (выдавали), вы можете убедиться, что это именно вы его эмитировали.

Таким образом, можно создать токен со сроком действия, скажем, 1 неделя. А когда пользователь вернется на следующий день с тем же токеном, вы будете знать, что он все еще авторизован в вашей системе.

Через неделю срок действия токена истечет, пользователь не будет авторизован и ему придется заново входить в систему, чтобы получить новый токен. А если пользователь (или третье лицо) попытается модифицировать токен, чтобы изменить срок действия, вы сможете это обнаружить, поскольку подписи не будут совпадать.

Если вы хотите поиграть с JWT-токенами и посмотреть, как они работают, посмотритеhttps://jwt.io.

УстановкаPyJWT

Нам необходимо установитьpyjwt для генерации и проверки JWT-токенов на языке Python.

Убедитесь, что вы создаливиртуальное окружение, активируйте его, а затем установитеpyjwt:

$pipinstallpyjwt---> 100%

Дополнительная информация

Если вы планируете использовать алгоритмы цифровой подписи, такие как RSA или ECDSA, вам следует установить зависимость библиотеки криптографииpyjwt[crypto].

Подробнее об этом можно прочитать вдокументации по установке PyJWT.

Хеширование паролей

"Хеширование" означает преобразование некоторого содержимого (в данном случае пароля) в последовательность байтов (просто строку), которая выглядит как тарабарщина.

Каждый раз, когда вы передаете точно такое же содержимое (точно такой же пароль), вы получаете точно такую же тарабарщину.

Но преобразовать тарабарщину обратно в пароль невозможно.

Для чего нужно хеширование паролей

Если ваша база данных будет украдена, то вор не получит пароли пользователей в открытом виде, а только их хэши.

Таким образом, вор не сможет использовать этот пароль в другой системе (поскольку многие пользователи везде используют один и тот же пароль, это было бы опасно).

Установкаpwdlib

pwdlib — это отличный пакет Python для работы с хэшами паролей.

Он поддерживает множество безопасных алгоритмов хеширования и утилит для работы с ними.

Рекомендуемый алгоритм — "Argon2".

Убедитесь, что вы создаливиртуальное окружение, активируйте его, и затем установите pwdlib вместе с Argon2:

$pipinstall"pwdlib[argon2]"---> 100%

Подсказка

С помощьюpwdlib можно даже настроить его на чтение паролей, созданныхDjango, плагином безопасностиFlask или многими другими библиотеками.

Таким образом, вы сможете, например, совместно использовать одни и те же данные из приложения Django в базе данных с приложением FastAPI. Или постепенно мигрировать Django-приложение, используя ту же базу данных.

При этом пользователи смогут одновременно входить в систему как из приложения Django, так и из приложенияFastAPI.

Хеширование и проверка паролей

Импортируйте необходимые инструменты изpwdlib.

Создайте экземпляр PasswordHash с рекомендованными настройками — он будет использоваться для хэширования и проверки паролей.

Подсказка

pwdlib также поддерживает алгоритм хеширования bcrypt, но не включает устаревшие алгоритмы — для работы с устаревшими хэшами рекомендуется использовать библиотеку passlib.

Например, вы можете использовать ее для чтения и проверки паролей, сгенерированных другой системой (например, Django), но хэшировать все новые пароли другим алгоритмом, например Argon2 или Bcrypt.

И при этом быть совместимым со всеми этими системами.

Создайте служебную функцию для хэширования пароля, поступающего от пользователя.

А затем создайте другую — для проверки соответствия полученного пароля и хранимого хэша.

И еще одну — для аутентификации и возврата пользователя.

fromdatetimeimportdatetime,timedelta,timezonefromtypingimportAnnotatedimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompwdlibimportPasswordHashfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=NoneclassUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpassword_hash=PasswordHash.recommended()DUMMY_HASH=password_hash.hash("dummypassword")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpassword_hash.verify(plain_password,hashed_password)defget_password_hash(password):returnpassword_hash.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:verify_password(password,DUMMY_HASH)returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:Annotated[OAuth2PasswordRequestForm,Depends()],)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],)->User:returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:Annotated[User,Depends(get_current_active_user)],):return[{"item_id":"Foo","owner":current_user.username}]
🤓 Other versions and variants

Tip

Prefer to use theAnnotated version if possible.

fromdatetimeimportdatetime,timedelta,timezoneimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompwdlibimportPasswordHashfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=NoneclassUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpassword_hash=PasswordHash.recommended()DUMMY_HASH=password_hash.hash("dummypassword")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpassword_hash.verify(plain_password,hashed_password)defget_password_hash(password):returnpassword_hash.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:verify_password(password,DUMMY_HASH)returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:str=Depends(oauth2_scheme)):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:User=Depends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:OAuth2PasswordRequestForm=Depends(),)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/")asyncdefread_users_me(current_user:User=Depends(get_current_active_user))->User:returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:User=Depends(get_current_active_user)):return[{"item_id":"Foo","owner":current_user.username}]

Когдаauthenticate_user вызывается с именем пользователя, которого нет в базе данных, мы все равно запускаемverify_password с использованием фиктивного хэша.

Это гарантирует, что эндпоинт отвечает примерно за одно и то же время вне зависимости от того, существует имя пользователя или нет, предотвращая тайминговые атаки (атака по времени), с помощью которых можно было бы перечислять существующие имена пользователей.

Технические детали

Если проверить новую (фальшивую) базу данныхfake_users_db, то можно увидеть, как теперь выглядит хэшированный пароль:"$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc".

Работа с JWT токенами

Импортируйте установленные модули.

Создайте случайный секретный ключ, который будет использоваться для подписи JWT-токенов.

Для генерации безопасного случайного секретного ключа используйте команду:

$opensslrand-hex3209d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7

И скопируйте полученный результат в переменнуюSECRET_KEY (не используйте тот, что в примере).

Создайте переменнуюALGORITHM с алгоритмом, используемым для подписи JWT-токена, и установите для нее значение"HS256".

Создайте переменную для срока действия токена.

Определите Pydantic-модель, которая будет использоваться для формирования ответа на запрос на получение токена.

Создайте служебную функцию для генерации нового токена доступа.

fromdatetimeimportdatetime,timedelta,timezonefromtypingimportAnnotatedimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompwdlibimportPasswordHashfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=NoneclassUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpassword_hash=PasswordHash.recommended()DUMMY_HASH=password_hash.hash("dummypassword")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpassword_hash.verify(plain_password,hashed_password)defget_password_hash(password):returnpassword_hash.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:verify_password(password,DUMMY_HASH)returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:Annotated[OAuth2PasswordRequestForm,Depends()],)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],)->User:returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:Annotated[User,Depends(get_current_active_user)],):return[{"item_id":"Foo","owner":current_user.username}]
🤓 Other versions and variants

Tip

Prefer to use theAnnotated version if possible.

fromdatetimeimportdatetime,timedelta,timezoneimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompwdlibimportPasswordHashfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=NoneclassUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpassword_hash=PasswordHash.recommended()DUMMY_HASH=password_hash.hash("dummypassword")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpassword_hash.verify(plain_password,hashed_password)defget_password_hash(password):returnpassword_hash.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:verify_password(password,DUMMY_HASH)returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:str=Depends(oauth2_scheme)):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:User=Depends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:OAuth2PasswordRequestForm=Depends(),)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/")asyncdefread_users_me(current_user:User=Depends(get_current_active_user))->User:returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:User=Depends(get_current_active_user)):return[{"item_id":"Foo","owner":current_user.username}]

Обновление зависимостей

Обновитеget_current_user для получения того же токена, что и раньше, но на этот раз с использованием JWT-токенов.

Декодируйте полученный токен, проверьте его и верните текущего пользователя.

Если токен недействителен, то сразу же верните HTTP-ошибку.

fromdatetimeimportdatetime,timedelta,timezonefromtypingimportAnnotatedimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompwdlibimportPasswordHashfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=NoneclassUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpassword_hash=PasswordHash.recommended()DUMMY_HASH=password_hash.hash("dummypassword")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpassword_hash.verify(plain_password,hashed_password)defget_password_hash(password):returnpassword_hash.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:verify_password(password,DUMMY_HASH)returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:Annotated[OAuth2PasswordRequestForm,Depends()],)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],)->User:returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:Annotated[User,Depends(get_current_active_user)],):return[{"item_id":"Foo","owner":current_user.username}]
🤓 Other versions and variants

Tip

Prefer to use theAnnotated version if possible.

fromdatetimeimportdatetime,timedelta,timezoneimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompwdlibimportPasswordHashfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=NoneclassUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpassword_hash=PasswordHash.recommended()DUMMY_HASH=password_hash.hash("dummypassword")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpassword_hash.verify(plain_password,hashed_password)defget_password_hash(password):returnpassword_hash.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:verify_password(password,DUMMY_HASH)returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:str=Depends(oauth2_scheme)):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:User=Depends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:OAuth2PasswordRequestForm=Depends(),)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/")asyncdefread_users_me(current_user:User=Depends(get_current_active_user))->User:returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:User=Depends(get_current_active_user)):return[{"item_id":"Foo","owner":current_user.username}]

Обновлениеоперации пути/token

Создайтеtimedelta со временем истечения срока действия токена.

Создайте реальный токен доступа JWT и верните его

fromdatetimeimportdatetime,timedelta,timezonefromtypingimportAnnotatedimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompwdlibimportPasswordHashfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=NoneclassUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpassword_hash=PasswordHash.recommended()DUMMY_HASH=password_hash.hash("dummypassword")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpassword_hash.verify(plain_password,hashed_password)defget_password_hash(password):returnpassword_hash.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:verify_password(password,DUMMY_HASH)returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:Annotated[OAuth2PasswordRequestForm,Depends()],)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],)->User:returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:Annotated[User,Depends(get_current_active_user)],):return[{"item_id":"Foo","owner":current_user.username}]
🤓 Other versions and variants

Tip

Prefer to use theAnnotated version if possible.

fromdatetimeimportdatetime,timedelta,timezoneimportjwtfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfromjwt.exceptionsimportInvalidTokenErrorfrompwdlibimportPasswordHashfrompydanticimportBaseModel# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc","disabled":False,}}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=NoneclassUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpassword_hash=PasswordHash.recommended()DUMMY_HASH=password_hash.hash("dummypassword")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")app=FastAPI()defverify_password(plain_password,hashed_password):returnpassword_hash.verify(plain_password,hashed_password)defget_password_hash(password):returnpassword_hash.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:verify_password(password,DUMMY_HASH)returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(token:str=Depends(oauth2_scheme)):credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":"Bearer"},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptiontoken_data=TokenData(username=username)exceptInvalidTokenError:raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:User=Depends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:OAuth2PasswordRequestForm=Depends(),)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Incorrect username or password",headers={"WWW-Authenticate":"Bearer"},)access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username},expires_delta=access_token_expires)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/")asyncdefread_users_me(current_user:User=Depends(get_current_active_user))->User:returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:User=Depends(get_current_active_user)):return[{"item_id":"Foo","owner":current_user.username}]

Технические подробности о JWT ключеsub

В спецификации JWT говорится, что существует ключsub, содержащий субъект токена.

Его использование необязательно, но это именно то место, куда вы должны поместить идентификатор пользователя, и поэтому мы здесь его и используем.

JWT может использоваться и для других целей, помимо идентификации пользователя и предоставления ему возможности выполнять операции непосредственно в вашем API.

Например, вы могли бы определить "автомобиль" или "запись в блоге".

Затем вы могли бы добавить права доступа к этой сущности, например "управлять" (для автомобиля) или "редактировать" (для блога).

Затем вы могли бы передать этот JWT-токен пользователю (или боту), и они использовали бы его для выполнения определенных действий (управление автомобилем или редактирование запись в блоге), даже не имея учетной записи, просто используя JWT-токен, сгенерированный вашим API.

Используя эти идеи, JWT можно применять для гораздо более сложных сценариев.

В отдельных случаях несколько сущностей могут иметь один и тот же идентификатор, скажем,foo (пользовательfoo, автомобильfoo и запись в блогеfoo).

Поэтому, чтобы избежать коллизий идентификаторов, при создании JWT-токена для пользователя можно добавить префиксusername к значению ключаsub. Таким образом, в данном примере значениеsub было быusername:johndoe.

Важно помнить, что ключsub должен иметь уникальный идентификатор для всего приложения и представлять собой строку.

Проверка в действии

Запустите сервер и перейдите к документации:http://127.0.0.1:8000/docs.

Вы увидите пользовательский интерфейс вида:

Пройдите авторизацию так же, как делали раньше.

Используя учетные данные пользователя:

Username:johndoePassword:secret

Проверка

Обратите внимание, что нигде в коде не используется открытый текст пароля "secret", мы используем только его хэшированную версию.

Вызвав эндпоинт/users/me/, вы получите ответ в виде:

{"username":"johndoe","email":"johndoe@example.com","full_name":"John Doe","disabled":false}

Если открыть инструменты разработчика, то можно увидеть, что передаваемые данные включают только токен, пароль передается только в первом запросе для аутентификации пользователя и получения токена доступа, но не в последующих:

Техническая информация

Обратите внимание на HTTP-заголовокAuthorization, значение которого начинается сBearer.

Продвинутое использованиеscopes

В OAuth2 существует понятие "диапазоны" ("scopes").

С их помощью можно добавить определенный набор разрешений к JWT-токену.

Затем вы можете передать этот токен непосредственно пользователю или третьей стороне для взаимодействия с вашим API с определенным набором ограничений.

О том, как их использовать и как они интегрированы вFastAPI, читайте далее вРасширенном руководстве пользователя.

Резюме

С учетом того, что вы видели до сих пор, вы можете создать безопасное приложениеFastAPI, используя такие стандарты, как OAuth2 и JWT.

Практически в любом фреймворке работа с безопасностью довольно быстро превращается в сложную тему.

Многие пакеты, сильно упрощающие эту задачу, вынуждены идти на многочисленные компромиссы с моделью данных, с базой данных и с доступным функционалом. Некоторые из этих пакетов, которые пытаются уж слишком все упростить, имеют даже "дыры" в системе безопасности.


FastAPI не делает уступок ни одной базе данных, модели данных или инструментарию.

Он предоставляет вам полную свободу действий, позволяя выбирать то, что лучше всего подходит для вашего проекта.

Вы можете напрямую использовать многие хорошо поддерживаемые и широко распространенные пакеты, такие какpwdlib иPyJWT, посколькуFastAPI не требует сложных механизмов для интеграции внешних пакетов.

Напротив, он предоставляет инструменты, позволяющие максимально упростить этот процесс без ущерба для гибкости, надежности и безопасности.

При этом вы можете использовать и реализовывать безопасные стандартные протоколы, такие как OAuth2, относительно простым способом.

ВРасширенном руководстве пользователя вы можете узнать больше о том, как использовать "диапазоны" ("scopes") OAuth2 для создания более точно настроенной системы разрешений в соответствии с теми же стандартами. OAuth2 с диапазонами — это механизм, используемый многими крупными провайдерами сервиса аутентификации, такими как Facebook, Google, GitHub, Microsoft, X (Twitter) и др., для авторизации сторонних приложений на взаимодействие с их API от имени их пользователей.


[8]ページ先頭

©2009-2026 Movatter.jp