OAuth2 scopes
You can use OAuth2 scopes directly with FastAPI; they are integrated to work seamlessly.
This lets you have a more fine-grained permission system, following the OAuth2 standard, integrated into your OpenAPI application (and the API docs).
OAuth2 with scopes is the mechanism used by many big authentication providers, like Facebook, Google, GitHub, Microsoft, X (Twitter), etc. They use it to grant specific permissions to users and applications.
Every time you "log in with" Facebook, Google, GitHub, Microsoft, or X (Twitter), that application is using OAuth2 with scopes.
In this section you will see how to manage authentication and authorization with the same OAuth2 with scopes in your FastAPI application.
Warning
This is a more or less advanced section. If you are just starting, you can skip it.
You don't necessarily need OAuth2 scopes, and you can handle authentication and authorization however you want.
But OAuth2 with scopes can be nicely integrated into your API (with OpenAPI) and your API docs.
Nevertheless, you still enforce those scopes, or any other security/authorization requirement, however you need, in your code.
In many cases, OAuth2 with scopes can be overkill.
But if you know you need it, or you are curious, keep reading.
OAuth2 scopes and OpenAPI
The OAuth2 specification defines "scopes" as a list of strings separated by spaces.
The content of each of these strings can have any format, but should not contain spaces.
These scopes represent "permissions".
In OpenAPI (for example, in the API docs), you can define "security schemes".
When one of these security schemes uses OAuth2, you can also declare and use scopes.
Each "scope" is just a string (without spaces).
They are normally used to declare specific security permissions, for example:
- users:read or users:write are common examples.
- instagram_basic is used by Facebook / Instagram.
- https://www.googleapis.com/auth/drive is used by Google.
Info
In OAuth2 a "scope" is just a string that declares a specific permission required.
It doesn't matter if it has other characters like ":" or if it is a URL.
Those details are implementation specific.
For OAuth2 they are just strings.
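As a small illustration of the rule above, the following sketch splits a scope string into individual scopes and joins them back, rejecting any scope that contains whitespace. The function names parse_scopes and format_scopes are hypothetical helpers for this example, not part of OAuth2 or FastAPI.

```python
def parse_scopes(scope_str: str) -> list[str]:
    """Split a space-separated OAuth2 scope string into individual scopes."""
    # str.split() with no argument also drops empty entries from extra spaces
    return scope_str.split()


def format_scopes(scopes: list[str]) -> str:
    """Join scopes into a single space-separated string."""
    for scope in scopes:
        # A scope may use ":" or be a URL, but it must not contain whitespace
        if not scope or any(ch.isspace() for ch in scope):
            raise ValueError(f"invalid scope: {scope!r}")
    return " ".join(scopes)


if __name__ == "__main__":
    scopes = parse_scopes("users:read https://www.googleapis.com/auth/drive")
    print(scopes)
    print(format_scopes(scopes))
```

Splitting with no separator argument is a design choice for this sketch: it tolerates repeated spaces, so an empty string yields an empty list rather than a list with one empty scope.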
Global view
First, let's quickly look at the parts that change from the examples in the main Tutorial - User Guide for OAuth2 with Password (and hashing), Bearer with JWT tokens. Now using OAuth2 scopes:
```python
from datetime import datetime, timedelta, timezone
from typing import Annotated

import jwt
from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
    OAuth2PasswordBearer,
    OAuth2PasswordRequestForm,
    SecurityScopes,
)
from jwt.exceptions import InvalidTokenError
from pwdlib import PasswordHash
from pydantic import BaseModel, ValidationError

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Chains",
        "email": "alicechains@example.com",
        "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$g2/AV1zwopqUntPKJavBFw$BwpRGDCyUHLvHICnwijyX8ROGoiUPwNKZ7915MeYfCE",
        "disabled": True,
    },
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None
    scopes: list[str] = []


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserInDB(User):
    hashed_password: str


password_hash = PasswordHash.recommended()

DUMMY_HASH = password_hash.hash("dummypassword")

# Declare the OAuth2 scheme together with the scopes it offers
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return password_hash.verify(plain_password, hashed_password)


def get_password_hash(password):
    return password_hash.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        verify_password(password, DUMMY_HASH)
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(
    security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)]
):
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = payload.get("sub")
        if username is None:
            raise credentials_exception
        scope: str = payload.get("scope", "")
        token_scopes = scope.split(" ")
        token_data = TokenData(scopes=token_scopes, username=username)
    except (InvalidTokenError, ValidationError):
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    # Every scope required by the dependants must be present in the token
    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


async def get_current_active_user(
    current_user: Annotated[User, Security(get_current_user, scopes=["me"])],
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scope": " ".join(form_data.scopes)},
        expires_delta=access_token_expires,
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/")
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)],
) -> User:
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: Annotated[User, Security(get_current_active_user, scopes=["items"])],
):
    return [{"item_id": "Foo", "owner": current_user.username}]


@app.get("/status/")
async def read_system_status(current_user: Annotated[User, Depends(get_current_user)]):
    return {"status": "ok"}
```
Tip
Prefer the Annotated version if possible. The non-Annotated variant is the same code with each dependency declared as a parameter default instead, for example token: str = Depends(oauth2_scheme) in place of token: Annotated[str, Depends(oauth2_scheme)].
Now let's review those changes step by step.
OAuth2 Security scheme
The first change is that we are now declaring the OAuth2 security scheme with two available scopes, me and items.
The scopes parameter receives a dict with each scope as a key and its description as the value:
```python
from fastapi.security import OAuth2PasswordBearer

# Each key is a scope name; each value is the description shown in the API docs
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)
```
Because we are now declaring those scopes, they will show up in the API docs when you log in/authorize.
And you will be able to select which scopes you want to give access to: me and items.
This is the same mechanism used when you give permissions while logging in with Facebook, Google, GitHub, etc.
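For reference, the declared scopes end up in the OpenAPI schema as part of a security scheme object. The sketch below builds roughly that shape by hand as a plain dict, following the OAuth2 password flow object from the OpenAPI specification. build_oauth2_password_scheme is a hypothetical helper for illustration, not a FastAPI function, and the schema generated by your application may contain additional fields.

```python
import json


def build_oauth2_password_scheme(token_url: str, scopes: dict[str, str]) -> dict:
    """Build an OpenAPI 3 security scheme object for the OAuth2 password flow."""
    return {
        "type": "oauth2",
        "flows": {
            "password": {
                "tokenUrl": token_url,
                # Each key is a scope name, each value its description
                "scopes": dict(scopes),
            }
        },
    }


scheme = build_oauth2_password_scheme(
    "token",
    {"me": "Read information about the current user.", "items": "Read items."},
)

if __name__ == "__main__":
    print(json.dumps(scheme, indent=2))
```

The docs UI reads this scopes mapping to render the checkboxes offered when you authorize.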

JWT token with scopes
Now, modify the token path operation to return the scopes requested.
We are still using the same OAuth2PasswordRequestForm. It includes a property scopes with a list of str, containing each scope it received in the request.
And we return the scopes as part of the JWT token.
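To make the data flow concrete, here is a stdlib-only sketch of what happens to the scopes between the request and the token payload. It assumes the client sends a form-encoded body with a single scope field, as the OAuth2 password flow does. The example body and the helper names scopes_from_form and build_token_claims are made up for illustration; in the real application, OAuth2PasswordRequestForm does the parsing for you and jwt.encode signs the payload.

```python
from urllib.parse import parse_qs


def scopes_from_form(body: str) -> list[str]:
    """Extract the requested scopes from a form-encoded token request body."""
    fields = parse_qs(body)
    # "scope" is a single form field holding space-separated scopes
    scope_str = fields.get("scope", [""])[0]
    return scope_str.split()


def build_token_claims(username: str, scopes: list[str]) -> dict:
    """Build the (unsigned) JWT claims, storing the scopes as one string."""
    return {"sub": username, "scope": " ".join(scopes)}


# Hypothetical request body from a client asking for two scopes
body = "grant_type=password&username=johndoe&password=secret&scope=me+items"
requested = scopes_from_form(body)
claims = build_token_claims("johndoe", requested)

if __name__ == "__main__":
    print(requested)
    print(claims)
```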
Danger
For simplicity, here we are just adding the scopes received directly to the token.
But in your application, for security, you should make sure you only add the scopes that the user is actually allowed to have, or the ones you have predefined.
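One possible way to follow that advice is to intersect the requested scopes with the scopes each user is allowed to have before creating the token. This is only a sketch: the USER_ALLOWED_SCOPES mapping and the grant_scopes helper are hypothetical and not part of the example application, and a real system might reject the request instead of silently dropping scopes.

```python
# Hypothetical mapping of usernames to the scopes they may be granted
USER_ALLOWED_SCOPES: dict[str, set[str]] = {
    "johndoe": {"me", "items"},
    "alice": {"me"},
}


def grant_scopes(username: str, requested: list[str]) -> list[str]:
    """Return only the requested scopes that this user is allowed to have."""
    allowed = USER_ALLOWED_SCOPES.get(username, set())
    # Preserve the request order and drop duplicates
    granted: list[str] = []
    for scope in requested:
        if scope in allowed and scope not in granted:
            granted.append(scope)
    return granted
```

In the token path operation, the filtered list could then take the place of form_data.scopes when building the scope claim.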
```python
# Excerpt from the full example: the token path operation
@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        # The requested scopes are stored in the token as one space-separated string
        data={"sub": user.username, "scope": " ".join(form_data.scopes)},
        expires_delta=access_token_expires,
    )
    return Token(access_token=access_token, token_type="bearer")
```
Declare scopes in path operations and dependencies
Now we declare that the path operation for /users/me/items/ requires the scope items.
For this, we import and use Security from fastapi.
You can use Security to declare dependencies (just like Depends), but Security also receives a parameter scopes with a list of scopes (strings).
In this case, we pass a dependency function get_current_active_user to Security (the same way we would with Depends).
But we also pass a list of scopes, in this case with just one scope: items (it could have more).
And the dependency function get_current_active_user can also declare sub-dependencies, not only with Depends but also with Security, declaring its own sub-dependency function (get_current_user) and more scope requirements.
In this case, it requires the scope me (it could require more than one scope).
Note
You don't necessarily need to add different scopes in different places.
We are doing it here to demonstrate how FastAPI handles scopes declared at different levels.
from datetime import datetime, timedelta, timezone
from typing import Annotated

import jwt
from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
    OAuth2PasswordBearer,
    OAuth2PasswordRequestForm,
    SecurityScopes,
)
from jwt.exceptions import InvalidTokenError
from pwdlib import PasswordHash
from pydantic import BaseModel, ValidationError

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Chains",
        "email": "alicechains@example.com",
        "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$g2/AV1zwopqUntPKJavBFw$BwpRGDCyUHLvHICnwijyX8ROGoiUPwNKZ7915MeYfCE",
        "disabled": True,
    },
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None
    scopes: list[str] = []


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserInDB(User):
    hashed_password: str


password_hash = PasswordHash.recommended()

DUMMY_HASH = password_hash.hash("dummypassword")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return password_hash.verify(plain_password, hashed_password)


def get_password_hash(password):
    return password_hash.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        verify_password(password, DUMMY_HASH)
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(
    security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)]
):
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = payload.get("sub")
        if username is None:
            raise credentials_exception
        scope: str = payload.get("scope", "")
        token_scopes = scope.split(" ")
        token_data = TokenData(scopes=token_scopes, username=username)
    except (InvalidTokenError, ValidationError):
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


async def get_current_active_user(
    current_user: Annotated[User, Security(get_current_user, scopes=["me"])],
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scope": " ".join(form_data.scopes)},
        expires_delta=access_token_expires,
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/")
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)],
) -> User:
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: Annotated[User, Security(get_current_active_user, scopes=["items"])],
):
    return [{"item_id": "Foo", "owner": current_user.username}]


@app.get("/status/")
async def read_system_status(current_user: Annotated[User, Depends(get_current_user)]):
    return {"status": "ok"}
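To make the layering concrete, here is a toy, framework-free model of the dependency chain used in this example. The Node class and scopes_seen_by_innermost are hypothetical names invented for this sketch. It is not how FastAPI is implemented, only an illustration of how the scopes from each Security(...) declaration can add up by the time the innermost dependency runs.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Node:
    """Hypothetical stand-in for a path operation or a dependency function."""

    name: str
    # Scopes this node passes when declaring its sub-dependency, as in
    # Security(sub_dependency, scopes=[...]). Left empty for Depends(...).
    scopes: list[str] = field(default_factory=list)
    sub: Optional["Node"] = None


def scopes_seen_by_innermost(entry: Node) -> list[str]:
    """Collect every scope declared from the entry point down the chain."""
    collected: list[str] = []
    node: Optional[Node] = entry
    while node is not None:
        collected.extend(node.scopes)
        node = node.sub
    return collected


# The chain from this tutorial, rebuilt with the toy model.
get_current_user = Node("get_current_user")
get_current_active_user = Node("get_current_active_user", ["me"], get_current_user)

read_own_items = Node("read_own_items", ["items"], get_current_active_user)
read_users_me = Node("read_users_me", [], get_current_active_user)
read_system_status = Node("read_system_status", [], get_current_user)

for route in (read_own_items, read_users_me, read_system_status):
    print(route.name, scopes_seen_by_innermost(route))
```

In this toy model, read_own_items ends up with both scopes, read_users_me with only "me", and read_system_status with none. The order of the collected list is a detail of the sketch; the checks in this tutorial only test membership.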
Technical Details
Security is actually a subclass of Depends, and it has just one extra parameter: scopes.
By using Security instead of Depends, FastAPI knows that the dependency can declare security scopes, can use them internally, and can document the API with OpenAPI.
Note that when you import Query, Path, Depends, Security and others from fastapi, those are actually functions that return instances of special classes.
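The pattern described here (a function that returns an instance of a marker class, with the security variant subclassing the plain one) can be sketched without FastAPI. All names below (DependsMarker, SecurityMarker, depends, security) are hypothetical and exist only for this illustration; FastAPI's real classes carry more behaviour.

```python
from typing import Callable, Optional, Sequence


class DependsMarker:
    """Hypothetical marker that records which callable should be resolved."""

    def __init__(self, dependency: Optional[Callable] = None) -> None:
        self.dependency = dependency


class SecurityMarker(DependsMarker):
    """Same as DependsMarker, plus the scopes required at this level."""

    def __init__(
        self,
        dependency: Optional[Callable] = None,
        scopes: Optional[Sequence[str]] = None,
    ) -> None:
        super().__init__(dependency)
        self.scopes = list(scopes or [])


def depends(dependency: Optional[Callable] = None) -> DependsMarker:
    # A plain function whose only job is to build the marker instance.
    return DependsMarker(dependency)


def security(
    dependency: Optional[Callable] = None,
    scopes: Optional[Sequence[str]] = None,
) -> SecurityMarker:
    return SecurityMarker(dependency, scopes)


def get_user() -> str:
    return "johndoe"


plain = depends(get_user)
scoped = security(get_user, scopes=["items"])
```

Because the scoped marker is also an instance of the plain marker class, code that only understands plain dependencies can still resolve it, while scope-aware code can read the extra attribute.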
Use SecurityScopes¶
Now update the dependency get_current_user.
This is the one used by the dependencies above.
Here we use the same OAuth2 scheme we created before, declaring it as a dependency: oauth2_scheme.
Because this dependency function has no scope requirements of its own, we can use Depends with oauth2_scheme; we don't have to use Security when we don't need to specify security scopes.
We also declare a special parameter of type SecurityScopes, imported from fastapi.security.
This SecurityScopes class is similar to Request (Request was used to get the request object directly).
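As a rough analogy for what "special parameter" means, the following framework-free sketch fills in a parameter purely because of its type annotation. ScopesInfo and call_with_injection are hypothetical names for this illustration, not FastAPI APIs, and FastAPI's own resolution logic is more involved.

```python
import inspect


class ScopesInfo:
    """Hypothetical stand-in for fastapi.security.SecurityScopes."""

    def __init__(self, scopes=None):
        self.scopes = list(scopes or [])
        self.scope_str = " ".join(self.scopes)


def call_with_injection(func, required_scopes, **kwargs):
    """Call func, supplying any parameter annotated as ScopesInfo."""
    for name, param in inspect.signature(func).parameters.items():
        if param.annotation is ScopesInfo:
            # The caller never passes this argument; it is injected by type.
            kwargs[name] = ScopesInfo(required_scopes)
    return func(**kwargs)


def get_current_user(security_scopes: ScopesInfo, token: str):
    return {"token": token, "needs": security_scopes.scopes}


result = call_with_injection(get_current_user, ["items", "me"], token="abc")
```

The caller only supplies token; the security_scopes argument appears because the function asked for it by type, which is the same idea as asking for Request.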
Use the scopes¶
The security_scopes parameter will be of type SecurityScopes.
It has a scopes property with a list containing all the scopes required by itself and by all the dependencies that use it as a sub-dependency. That means all the "dependants". This might sound confusing; it is explained again further below.
The security_scopes object (of class SecurityScopes) also provides a scope_str attribute: a single string containing those scopes separated by spaces (we are going to use it).
We create an HTTPException that we can reuse (raise) later at several points.
In this exception, we include the required scopes (if any) as a space-separated string (using scope_str). We put that string in the WWW-Authenticate header (this is part of the spec).
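The header value can be sketched in isolation as a small helper. build_authenticate_value is a hypothetical name for this illustration; it takes a plain list of scopes rather than a SecurityScopes object, and joins them with spaces in the way scope_str is described above.

```python
def build_authenticate_value(required_scopes: list[str]) -> str:
    """Return the WWW-Authenticate value for the given required scopes."""
    # Equivalent of scope_str: the scopes joined by single spaces.
    scope_str = " ".join(required_scopes)
    if required_scopes:
        return f'Bearer scope="{scope_str}"'
    # No scopes required: fall back to the bare scheme name.
    return "Bearer"


headers_with_scopes = {"WWW-Authenticate": build_authenticate_value(["items", "me"])}
headers_without_scopes = {"WWW-Authenticate": build_authenticate_value([])}
```

Building the value once and reusing it keeps the "invalid credentials" and "not enough permissions" responses consistent about which scopes the endpoint expects.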
Verify the username and data shape¶
We verify that we get a username, and extract the scopes.
Then we validate that data with the Pydantic model (catching the ValidationError exception). If we get an error reading the JWT token or validating the data with Pydantic, we raise the HTTPException we created before.
For that, we update the Pydantic model TokenData with a new scopes property.
By validating the data with Pydantic we can make sure that we have, for example, exactly a list of str with the scopes and a str with the username.
Otherwise we might get, for example, a dict or something else, which could break the application at some later point and make it a security risk.
We also verify that we have a user with that username, and if not, we raise that same exception we created before.
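The sequence of checks can be sketched in plain Python, without JWT decoding or Pydantic. This is only an approximation of the shape checks that the Pydantic model performs in the tutorial code; CredentialsError and parse_token_payload are hypothetical names, and the payload is assumed to be an already-decoded dict.

```python
class CredentialsError(Exception):
    """Hypothetical stand-in for the reusable HTTPException."""


def parse_token_payload(
    payload: dict, known_users: set[str]
) -> tuple[str, list[str]]:
    """Return (username, scopes) or raise CredentialsError."""
    username = payload.get("sub")
    if not isinstance(username, str) or not username:
        raise CredentialsError("missing or malformed 'sub'")

    scope = payload.get("scope", "")
    if not isinstance(scope, str):
        # Rejects, for example, a dict where a string was expected.
        raise CredentialsError("'scope' must be a space-separated string")

    if username not in known_users:
        raise CredentialsError("unknown user")

    # split() with no argument drops empty pieces, so "" gives [].
    return username, scope.split()


username, scopes = parse_token_payload(
    {"sub": "johndoe", "scope": "me items"}, known_users={"johndoe"}
)
```

The sketch uses scope.split() with no argument so that an empty scope string yields an empty list; scope.split(" ") would instead yield a list containing one empty string.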
🤓 Other versions and variants
Tip
Prefer to use the `Annotated` version if possible.
```python
from datetime import datetime, timedelta, timezone

import jwt
from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
    OAuth2PasswordBearer,
    OAuth2PasswordRequestForm,
    SecurityScopes,
)
from jwt.exceptions import InvalidTokenError
from pwdlib import PasswordHash
from pydantic import BaseModel, ValidationError

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Chains",
        "email": "alicechains@example.com",
        "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$g2/AV1zwopqUntPKJavBFw$BwpRGDCyUHLvHICnwijyX8ROGoiUPwNKZ7915MeYfCE",
        "disabled": True,
    },
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None
    scopes: list[str] = []


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserInDB(User):
    hashed_password: str


password_hash = PasswordHash.recommended()

DUMMY_HASH = password_hash.hash("dummypassword")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return password_hash.verify(plain_password, hashed_password)


def get_password_hash(password):
    return password_hash.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        verify_password(password, DUMMY_HASH)
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(
    security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
):
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        scope: str = payload.get("scope", "")
        token_scopes = scope.split(" ")
        token_data = TokenData(scopes=token_scopes, username=username)
    except (InvalidTokenError, ValidationError):
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


async def get_current_active_user(
    current_user: User = Security(get_current_user, scopes=["me"]),
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username, "scope": " ".join(form_data.scopes)},
        expires_delta=access_token_expires,
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/")
async def read_users_me(current_user: User = Depends(get_current_active_user)) -> User:
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: User = Security(get_current_active_user, scopes=["items"]),
):
    return [{"item_id": "Foo", "owner": current_user.username}]


@app.get("/status/")
async def read_system_status(current_user: User = Depends(get_current_user)):
    return {"status": "ok"}
```

Verify the scopes
We now verify that every scope required by this dependency and by all of its dependants (including path operations) is included in the scopes provided in the received token. If any is missing, we raise an `HTTPException`.
For this, we use `security_scopes.scopes`, which contains a `list` with all of these scopes as `str`.
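The check itself can be read in isolation from FastAPI. The following is a minimal sketch, not the tutorial's code: `find_missing_scopes` and `build_authenticate_value` are hypothetical helper names, and the sketch assumes the token carries its scopes as a single space-separated string, as the `scope` claim in this example does.

```python
def find_missing_scopes(required_scopes: list[str], token_scope_claim: str) -> list[str]:
    """Return the required scopes that the token does not grant, in order."""
    # str.split() with no argument drops empty strings, so an empty claim
    # yields an empty set of granted scopes.
    granted = set(token_scope_claim.split())
    return [scope for scope in required_scopes if scope not in granted]


def build_authenticate_value(required_scopes: list[str]) -> str:
    """Build a WWW-Authenticate value in the same shape the example uses."""
    if required_scopes:
        scope_str = " ".join(required_scopes)
        return f'Bearer scope="{scope_str}"'
    return "Bearer"


# A token that only grants "me" is missing "items" for read_own_items.
print(find_missing_scopes(["me", "items"], "me"))
print(build_authenticate_value(["me", "items"]))
```

An empty list from `find_missing_scopes` corresponds to the case where the loop in `get_current_user` finishes without raising. Note that the tutorial code calls `scope.split(" ")`, which turns an empty claim into `[""]`; that is harmless there, because an empty string never matches a required scope.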
Dependency tree and scopes
Let's review this dependency tree and its scopes again.
Because the `get_current_active_user` dependency has `get_current_user` as a sub-dependency, the scope `"me"` declared at `get_current_active_user` is included in the list of required scopes in the `security_scopes.scopes` passed to `get_current_user`.
The path operation itself also declares a scope, `"items"`, so this is also in the `security_scopes.scopes` list passed to `get_current_user`.
Here is how the hierarchy of dependencies and scopes looks:
- The path operation `read_own_items` has:
    - Required scopes `["items"]` with the dependency:
    - `get_current_active_user`:
        - The dependency function `get_current_active_user` has:
            - Required scopes `["me"]` with the dependency:
            - `get_current_user`:
                - The dependency function `get_current_user` has:
                    - No scopes required by itself.
                    - A dependency using `oauth2_scheme`.
                    - A `security_scopes` parameter of type `SecurityScopes`:
                        - This `security_scopes` parameter has a `scopes` property with a `list` containing all the scopes declared above, so:
                            - `security_scopes.scopes` will contain `["me", "items"]` for the path operation `read_own_items`.
                            - `security_scopes.scopes` will contain `["me"]` for the path operation `read_users_me`, because it is declared in the dependency `get_current_active_user`.
                            - `security_scopes.scopes` will contain `[]` (nothing) for the path operation `read_system_status`, because it did not declare any `Security` with `scopes`, and its dependency, `get_current_user`, does not declare any `scopes` either.
Tip
The important and "magic" thing here is that `get_current_user` will have a different list of `scopes` to check for each path operation.
It all depends on the `scopes` declared in each path operation and in each dependency in the dependency tree for that specific path operation.
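The hierarchy above can be modeled in a few lines of plain Python. This is only an illustrative sketch and not how FastAPI is implemented: the `DECLARED` table and the `collect_scopes` helper are hypothetical, and the result is a set because the sketch makes no claim about the order in which scopes are accumulated.

```python
# Each entry maps a function name to a pair:
# (scopes it declares for its dependency, the name of that dependency).
# None marks the end of the chain.
DECLARED = {
    "read_own_items": (["items"], "get_current_active_user"),
    "read_users_me": ([], "get_current_active_user"),
    "read_system_status": ([], "get_current_user"),
    "get_current_active_user": (["me"], "get_current_user"),
    "get_current_user": ([], None),
}


def collect_scopes(path_operation: str) -> set[str]:
    """Gather every scope declared on the way down to get_current_user."""
    collected: set[str] = set()
    node = path_operation
    while node is not None:
        scopes, dependency = DECLARED[node]
        collected.update(scopes)
        node = dependency
    return collected


for name in ("read_own_items", "read_users_me", "read_system_status"):
    print(name, sorted(collect_scopes(name)))
# read_own_items ['items', 'me']
# read_users_me ['me']
# read_system_status []
```

The same `get_current_user` node sits at the bottom of all three chains, yet the collected scopes differ per path operation, which is the behavior the tip describes.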
More details about SecurityScopes
You can use `SecurityScopes` at any point, and in multiple places; it does not have to be in the "root" dependency.
It will always contain the security scopes declared in the current `Security` dependencies and in all the dependants for that specific path operation and that specific dependency tree.
Because `SecurityScopes` holds all the scopes declared by dependants, you can use it to verify that a token has the required scopes in one central dependency function, and then declare different scope requirements in different path operations.
They will be checked independently for each path operation.
Check it
If you open the API docs, you can authenticate and specify which scopes you want to authorize.

If you don't select any scope, you will be "authenticated", but when you try to access `/users/me/` or `/users/me/items/` you will get an error saying that you don't have enough permissions. You will still be able to access `/status/`.
And if you select the scope `me` but not the scope `items`, you will be able to access `/users/me/` but not `/users/me/items/`.
That is what would happen to a third-party application that tried to access one of these path operations with a token provided by a user, depending on how many permissions the user granted to the application.
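To make these outcomes concrete, here is a small plain-Python sketch of which endpoints a token could reach for a given set of granted scopes. The `REQUIRED_SCOPES` table restates the requirements of this example's three path operations, and `accessible_paths` is a hypothetical helper. The sketch deliberately ignores everything else the real dependencies check (token signature, expiry, disabled users).

```python
# Scopes each path ends up requiring in this example's dependency tree.
REQUIRED_SCOPES = {
    "/users/me/": {"me"},
    "/users/me/items/": {"me", "items"},
    "/status/": set(),
}


def accessible_paths(granted_scopes: set[str]) -> list[str]:
    """Return the paths whose required scopes are all granted, sorted by name."""
    return sorted(
        path
        for path, required in REQUIRED_SCOPES.items()
        if required <= granted_scopes  # subset test: nothing required is missing
    )


print(accessible_paths(set()))            # ['/status/']
print(accessible_paths({"me"}))           # ['/status/', '/users/me/']
print(accessible_paths({"me", "items"}))  # ['/status/', '/users/me/', '/users/me/items/']
```

Selecting only `items` would still leave `/users/me/items/` out of reach, because that path also inherits the `me` requirement from `get_current_active_user`.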
About third-party integrations
In this example we are using the OAuth2 "password" flow.
This is appropriate when we are logging in to our own application, probably with our own frontend.
That is because we can trust it to receive the `username` and `password`, as we control it.
But if you are building an OAuth2 application that others would connect to (that is, if you are building an authentication provider equivalent to Facebook, Google, GitHub, etc.) you should use one of the other flows.
The most common is the implicit flow.
The most secure is the code flow, but it is more complex to implement because it requires more steps. As it is more complex, many providers end up suggesting the implicit flow.
Note
It is common for each authentication provider to name its flows differently, to make them part of its brand.
But in the end, they are all implementing the same OAuth2 standard.
FastAPI includes utilities for all of these OAuth2 authentication flows in `fastapi.security.oauth2`.
Security in decorator dependencies
In the same way that you can define a `list` of `Depends` in the decorator's `dependencies` parameter (as explained in Dependencies in path operation decorators), you could also use `Security` with `scopes` there.







