Movatterモバイル変換


[0]ホーム

URL:


Skip to content
Join theFastAPI Cloud waiting list 🚀
Follow@fastapi onX (Twitter) to stay updated
FollowFastAPI onLinkedIn to stay updated
Subscribe to theFastAPI and friends newsletter 🎉
sponsor
sponsor
sponsor
sponsor
sponsor
sponsor
sponsor
sponsor
sponsor
sponsor
sponsor

OAuth2 scopes

You can use OAuth2 scopes directly withFastAPI, they are integrated to work seamlessly.

This would allow you to have a more fine-grained permission system, following the OAuth2 standard, integrated into your OpenAPI application (and the API docs).

OAuth2 with scopes is the mechanism used by many big authentication providers, like Facebook, Google, GitHub, Microsoft, X (Twitter), etc. They use it to provide specific permissions to users and applications.

Every time you "log in with" Facebook, Google, GitHub, Microsoft, X (Twitter), that application is using OAuth2 with scopes.

In this section you will see how to manage authentication and authorization with the same OAuth2 with scopes in yourFastAPI application.

Warning

This is a more or less advanced section. If you are just starting, you can skip it.

You don't necessarily need OAuth2 scopes, and you can handle authentication and authorization however you want.

But OAuth2 with scopes can be nicely integrated into your API (with OpenAPI) and your API docs.

Nevertheless, you still enforce those scopes, or any other security/authorization requirement, however you need, in your code.

In many cases, OAuth2 with scopes can be an overkill.

But if you know you need it, or you are curious, keep reading.

OAuth2 scopes and OpenAPI

The OAuth2 specification defines "scopes" as a list of strings separated by spaces.

The content of each of these strings can have any format, but should not contain spaces.

These scopes represent "permissions".

In OpenAPI (e.g. the API docs), you can define "security schemes".

When one of these security schemes uses OAuth2, you can also declare and use scopes.

Each "scope" is just a string (without spaces).

They are normally used to declare specific security permissions, for example:

  • users:read orusers:write are common examples.
  • instagram_basic is used by Facebook / Instagram.
  • https://www.googleapis.com/auth/drive is used by Google.

Info

In OAuth2 a "scope" is just a string that declares a specific permission required.

It doesn't matter if it has other characters like: or if it is a URL.

Those details are implementation specific.

For OAuth2 they are just strings.

Global view

First, let's quickly see the parts that change from the examples in the mainTutorial - User Guide forOAuth2 with Password (and hashing), Bearer with JWT tokens. Now using OAuth2 scopes:

fromdatetimeimportdatetime,timedelta,timezonefromtypingimportAnnotatedimportjwtfromfastapiimportDepends,FastAPI,HTTPException,Security,statusfromfastapi.securityimport(OAuth2PasswordBearer,OAuth2PasswordRequestForm,SecurityScopes,)fromjwt.exceptionsimportInvalidTokenErrorfrompwdlibimportPasswordHashfrompydanticimportBaseModel,ValidationError# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc","disabled":False,},"alice":{"username":"alice","full_name":"Alice Chains","email":"alicechains@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$g2/AV1zwopqUntPKJavBFw$BwpRGDCyUHLvHICnwijyX8ROGoiUPwNKZ7915MeYfCE","disabled":True,},}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=Nonescopes:list[str]=[]classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpassword_hash=PasswordHash.recommended()DUMMY_HASH=password_hash.hash("dummypassword")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token",scopes={"me":"Read information about the current user.","items":"Read items."},)app=FastAPI()defverify_password(plain_password,hashed_password):returnpassword_hash.verify(plain_password,hashed_password)defget_password_hash(password):returnpassword_hash.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:verify_password(password,DUMMY_HASH)returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(security_scopes:SecurityScopes,token:Annotated[str,Depends(oauth2_scheme)]):ifsecurity_scopes.scopes:authenticate_value=f'Bearer scope="{security_scopes.scope_str}"'else:authenticate_value="Bearer"credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":authenticate_value},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptionscope:str=payload.get("scope","")token_scopes=scope.split(" ")token_data=TokenData(scopes=token_scopes,username=username)except(InvalidTokenError,ValidationError):raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionforscopeinsecurity_scopes.scopes:ifscopenotintoken_data.scopes:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Not enough permissions",headers={"WWW-Authenticate":authenticate_value},)returnuserasyncdefget_current_active_user(current_user:Annotated[User,Security(get_current_user,scopes=["me"])],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:Annotated[OAuth2PasswordRequestForm,Depends()],)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=400,detail="Incorrect username or password")access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username,"scope":" ".join(form_data.scopes)},expires_delta=access_token_expires,)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],)->User:returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:Annotated[User,Security(get_current_active_user,scopes=["items"])],):return[{"item_id":"Foo","owner":current_user.username}]@app.get("/status/")asyncdefread_system_status(current_user:Annotated[User,Depends(get_current_user)]):return{"status":"ok"}
🤓 Other versions and variants

Tip

Prefer to use theAnnotated version if possible.

fromdatetimeimportdatetime,timedelta,timezoneimportjwtfromfastapiimportDepends,FastAPI,HTTPException,Security,statusfromfastapi.securityimport(OAuth2PasswordBearer,OAuth2PasswordRequestForm,SecurityScopes,)fromjwt.exceptionsimportInvalidTokenErrorfrompwdlibimportPasswordHashfrompydanticimportBaseModel,ValidationError# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc","disabled":False,},"alice":{"username":"alice","full_name":"Alice Chains","email":"alicechains@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$g2/AV1zwopqUntPKJavBFw$BwpRGDCyUHLvHICnwijyX8ROGoiUPwNKZ7915MeYfCE","disabled":True,},}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=Nonescopes:list[str]=[]classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpassword_hash=PasswordHash.recommended()DUMMY_HASH=password_hash.hash("dummypassword")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token",scopes={"me":"Read information about the current user.","items":"Read items."},)app=FastAPI()defverify_password(plain_password,hashed_password):returnpassword_hash.verify(plain_password,hashed_password)defget_password_hash(password):returnpassword_hash.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:verify_password(password,DUMMY_HASH)returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(security_scopes:SecurityScopes,token:str=Depends(oauth2_scheme)):ifsecurity_scopes.scopes:authenticate_value=f'Bearer scope="{security_scopes.scope_str}"'else:authenticate_value="Bearer"credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":authenticate_value},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username:str=payload.get("sub")ifusernameisNone:raisecredentials_exceptionscope:str=payload.get("scope","")token_scopes=scope.split(" ")token_data=TokenData(scopes=token_scopes,username=username)except(InvalidTokenError,ValidationError):raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionforscopeinsecurity_scopes.scopes:ifscopenotintoken_data.scopes:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Not enough permissions",headers={"WWW-Authenticate":authenticate_value},)returnuserasyncdefget_current_active_user(current_user:User=Security(get_current_user,scopes=["me"]),):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:OAuth2PasswordRequestForm=Depends(),)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=400,detail="Incorrect username or password")access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username,"scope":" ".join(form_data.scopes)},expires_delta=access_token_expires,)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/")asyncdefread_users_me(current_user:User=Depends(get_current_active_user))->User:returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:User=Security(get_current_active_user,scopes=["items"]),):return[{"item_id":"Foo","owner":current_user.username}]@app.get("/status/")asyncdefread_system_status(current_user:User=Depends(get_current_user)):return{"status":"ok"}

Now let's review those changes step by step.

OAuth2 Security scheme

The first change is that now we are declaring the OAuth2 security scheme with two available scopes,me anditems.

Thescopes parameter receives adict with each scope as a key and the description as the value:

fromdatetimeimportdatetime,timedelta,timezonefromtypingimportAnnotatedimportjwtfromfastapiimportDepends,FastAPI,HTTPException,Security,statusfromfastapi.securityimport(OAuth2PasswordBearer,OAuth2PasswordRequestForm,SecurityScopes,)fromjwt.exceptionsimportInvalidTokenErrorfrompwdlibimportPasswordHashfrompydanticimportBaseModel,ValidationError# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc","disabled":False,},"alice":{"username":"alice","full_name":"Alice Chains","email":"alicechains@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$g2/AV1zwopqUntPKJavBFw$BwpRGDCyUHLvHICnwijyX8ROGoiUPwNKZ7915MeYfCE","disabled":True,},}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=Nonescopes:list[str]=[]classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpassword_hash=PasswordHash.recommended()DUMMY_HASH=password_hash.hash("dummypassword")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token",scopes={"me":"Read information about the current user.","items":"Read items."},)app=FastAPI()defverify_password(plain_password,hashed_password):returnpassword_hash.verify(plain_password,hashed_password)defget_password_hash(password):returnpassword_hash.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:verify_password(password,DUMMY_HASH)returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(security_scopes:SecurityScopes,token:Annotated[str,Depends(oauth2_scheme)]):ifsecurity_scopes.scopes:authenticate_value=f'Bearer scope="{security_scopes.scope_str}"'else:authenticate_value="Bearer"credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":authenticate_value},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptionscope:str=payload.get("scope","")token_scopes=scope.split(" ")token_data=TokenData(scopes=token_scopes,username=username)except(InvalidTokenError,ValidationError):raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionforscopeinsecurity_scopes.scopes:ifscopenotintoken_data.scopes:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Not enough permissions",headers={"WWW-Authenticate":authenticate_value},)returnuserasyncdefget_current_active_user(current_user:Annotated[User,Security(get_current_user,scopes=["me"])],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:Annotated[OAuth2PasswordRequestForm,Depends()],)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=400,detail="Incorrect username or password")access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username,"scope":" ".join(form_data.scopes)},expires_delta=access_token_expires,)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],)->User:returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:Annotated[User,Security(get_current_active_user,scopes=["items"])],):return[{"item_id":"Foo","owner":current_user.username}]@app.get("/status/")asyncdefread_system_status(current_user:Annotated[User,Depends(get_current_user)]):return{"status":"ok"}
🤓 Other versions and variants

Tip

Prefer to use theAnnotated version if possible.

fromdatetimeimportdatetime,timedelta,timezoneimportjwtfromfastapiimportDepends,FastAPI,HTTPException,Security,statusfromfastapi.securityimport(OAuth2PasswordBearer,OAuth2PasswordRequestForm,SecurityScopes,)fromjwt.exceptionsimportInvalidTokenErrorfrompwdlibimportPasswordHashfrompydanticimportBaseModel,ValidationError# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc","disabled":False,},"alice":{"username":"alice","full_name":"Alice Chains","email":"alicechains@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$g2/AV1zwopqUntPKJavBFw$BwpRGDCyUHLvHICnwijyX8ROGoiUPwNKZ7915MeYfCE","disabled":True,},}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=Nonescopes:list[str]=[]classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpassword_hash=PasswordHash.recommended()DUMMY_HASH=password_hash.hash("dummypassword")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token",scopes={"me":"Read information about the current user.","items":"Read items."},)app=FastAPI()defverify_password(plain_password,hashed_password):returnpassword_hash.verify(plain_password,hashed_password)defget_password_hash(password):returnpassword_hash.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:verify_password(password,DUMMY_HASH)returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(security_scopes:SecurityScopes,token:str=Depends(oauth2_scheme)):ifsecurity_scopes.scopes:authenticate_value=f'Bearer scope="{security_scopes.scope_str}"'else:authenticate_value="Bearer"credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":authenticate_value},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username:str=payload.get("sub")ifusernameisNone:raisecredentials_exceptionscope:str=payload.get("scope","")token_scopes=scope.split(" ")token_data=TokenData(scopes=token_scopes,username=username)except(InvalidTokenError,ValidationError):raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionforscopeinsecurity_scopes.scopes:ifscopenotintoken_data.scopes:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Not enough permissions",headers={"WWW-Authenticate":authenticate_value},)returnuserasyncdefget_current_active_user(current_user:User=Security(get_current_user,scopes=["me"]),):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:OAuth2PasswordRequestForm=Depends(),)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=400,detail="Incorrect username or password")access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username,"scope":" ".join(form_data.scopes)},expires_delta=access_token_expires,)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/")asyncdefread_users_me(current_user:User=Depends(get_current_active_user))->User:returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:User=Security(get_current_active_user,scopes=["items"]),):return[{"item_id":"Foo","owner":current_user.username}]@app.get("/status/")asyncdefread_system_status(current_user:User=Depends(get_current_user)):return{"status":"ok"}

Because we are now declaring those scopes, they will show up in the API docs when you log-in/authorize.

And you will be able to select which scopes you want to give access to:me anditems.

This is the same mechanism used when you give permissions while logging in with Facebook, Google, GitHub, etc:

JWT token with scopes

Now, modify the tokenpath operation to return the scopes requested.

We are still using the sameOAuth2PasswordRequestForm. It includes a propertyscopes with alist ofstr, with each scope it received in the request.

And we return the scopes as part of the JWT token.

Danger

For simplicity, here we are just adding the scopes received directly to the token.

But in your application, for security, you should make sure you only add the scopes that the user is actually able to have, or the ones you have predefined.

fromdatetimeimportdatetime,timedelta,timezonefromtypingimportAnnotatedimportjwtfromfastapiimportDepends,FastAPI,HTTPException,Security,statusfromfastapi.securityimport(OAuth2PasswordBearer,OAuth2PasswordRequestForm,SecurityScopes,)fromjwt.exceptionsimportInvalidTokenErrorfrompwdlibimportPasswordHashfrompydanticimportBaseModel,ValidationError# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc","disabled":False,},"alice":{"username":"alice","full_name":"Alice Chains","email":"alicechains@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$g2/AV1zwopqUntPKJavBFw$BwpRGDCyUHLvHICnwijyX8ROGoiUPwNKZ7915MeYfCE","disabled":True,},}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=Nonescopes:list[str]=[]classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpassword_hash=PasswordHash.recommended()DUMMY_HASH=password_hash.hash("dummypassword")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token",scopes={"me":"Read information about the current user.","items":"Read items."},)app=FastAPI()defverify_password(plain_password,hashed_password):returnpassword_hash.verify(plain_password,hashed_password)defget_password_hash(password):returnpassword_hash.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:verify_password(password,DUMMY_HASH)returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(security_scopes:SecurityScopes,token:Annotated[str,Depends(oauth2_scheme)]):ifsecurity_scopes.scopes:authenticate_value=f'Bearer scope="{security_scopes.scope_str}"'else:authenticate_value="Bearer"credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":authenticate_value},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptionscope:str=payload.get("scope","")token_scopes=scope.split(" ")token_data=TokenData(scopes=token_scopes,username=username)except(InvalidTokenError,ValidationError):raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionforscopeinsecurity_scopes.scopes:ifscopenotintoken_data.scopes:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Not enough permissions",headers={"WWW-Authenticate":authenticate_value},)returnuserasyncdefget_current_active_user(current_user:Annotated[User,Security(get_current_user,scopes=["me"])],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:Annotated[OAuth2PasswordRequestForm,Depends()],)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=400,detail="Incorrect username or password")access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username,"scope":" ".join(form_data.scopes)},expires_delta=access_token_expires,)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],)->User:returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:Annotated[User,Security(get_current_active_user,scopes=["items"])],):return[{"item_id":"Foo","owner":current_user.username}]@app.get("/status/")asyncdefread_system_status(current_user:Annotated[User,Depends(get_current_user)]):return{"status":"ok"}
🤓 Other versions and variants

Tip

Prefer to use theAnnotated version if possible.

fromdatetimeimportdatetime,timedelta,timezoneimportjwtfromfastapiimportDepends,FastAPI,HTTPException,Security,statusfromfastapi.securityimport(OAuth2PasswordBearer,OAuth2PasswordRequestForm,SecurityScopes,)fromjwt.exceptionsimportInvalidTokenErrorfrompwdlibimportPasswordHashfrompydanticimportBaseModel,ValidationError# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc","disabled":False,},"alice":{"username":"alice","full_name":"Alice Chains","email":"alicechains@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$g2/AV1zwopqUntPKJavBFw$BwpRGDCyUHLvHICnwijyX8ROGoiUPwNKZ7915MeYfCE","disabled":True,},}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=Nonescopes:list[str]=[]classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpassword_hash=PasswordHash.recommended()DUMMY_HASH=password_hash.hash("dummypassword")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token",scopes={"me":"Read information about the current user.","items":"Read items."},)app=FastAPI()defverify_password(plain_password,hashed_password):returnpassword_hash.verify(plain_password,hashed_password)defget_password_hash(password):returnpassword_hash.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:verify_password(password,DUMMY_HASH)returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(security_scopes:SecurityScopes,token:str=Depends(oauth2_scheme)):ifsecurity_scopes.scopes:authenticate_value=f'Bearer scope="{security_scopes.scope_str}"'else:authenticate_value="Bearer"credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":authenticate_value},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username:str=payload.get("sub")ifusernameisNone:raisecredentials_exceptionscope:str=payload.get("scope","")token_scopes=scope.split(" ")token_data=TokenData(scopes=token_scopes,username=username)except(InvalidTokenError,ValidationError):raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionforscopeinsecurity_scopes.scopes:ifscopenotintoken_data.scopes:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Not enough permissions",headers={"WWW-Authenticate":authenticate_value},)returnuserasyncdefget_current_active_user(current_user:User=Security(get_current_user,scopes=["me"]),):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:OAuth2PasswordRequestForm=Depends(),)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=400,detail="Incorrect username or password")access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username,"scope":" ".join(form_data.scopes)},expires_delta=access_token_expires,)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/")asyncdefread_users_me(current_user:User=Depends(get_current_active_user))->User:returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:User=Security(get_current_active_user,scopes=["items"]),):return[{"item_id":"Foo","owner":current_user.username}]@app.get("/status/")asyncdefread_system_status(current_user:User=Depends(get_current_user)):return{"status":"ok"}

Declare scopes inpath operations and dependencies

Now we declare that thepath operation for/users/me/items/ requires the scopeitems.

For this, we import and useSecurity fromfastapi.

You can useSecurity to declare dependencies (just likeDepends), butSecurity also receives a parameterscopes with a list of scopes (strings).

In this case, we pass a dependency functionget_current_active_user toSecurity (the same way we would do withDepends).

But we also pass alist of scopes, in this case with just one scope:items (it could have more).

And the dependency functionget_current_active_user can also declare sub-dependencies, not only withDepends but also withSecurity. Declaring its own sub-dependency function (get_current_user), and more scope requirements.

In this case, it requires the scopeme (it could require more than one scope).

Note

You don't necessarily need to add different scopes in different places.

We are doing it here to demonstrate howFastAPI handles scopes declared at different levels.

fromdatetimeimportdatetime,timedelta,timezonefromtypingimportAnnotatedimportjwtfromfastapiimportDepends,FastAPI,HTTPException,Security,statusfromfastapi.securityimport(OAuth2PasswordBearer,OAuth2PasswordRequestForm,SecurityScopes,)fromjwt.exceptionsimportInvalidTokenErrorfrompwdlibimportPasswordHashfrompydanticimportBaseModel,ValidationError# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc","disabled":False,},"alice":{"username":"alice","full_name":"Alice Chains","email":"alicechains@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$g2/AV1zwopqUntPKJavBFw$BwpRGDCyUHLvHICnwijyX8ROGoiUPwNKZ7915MeYfCE","disabled":True,},}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=Nonescopes:list[str]=[]classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpassword_hash=PasswordHash.recommended()DUMMY_HASH=password_hash.hash("dummypassword")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token",scopes={"me":"Read information about the current user.","items":"Read items."},)app=FastAPI()defverify_password(plain_password,hashed_password):returnpassword_hash.verify(plain_password,hashed_password)defget_password_hash(password):returnpassword_hash.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:verify_password(password,DUMMY_HASH)returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(security_scopes:SecurityScopes,token:Annotated[str,Depends(oauth2_scheme)]):ifsecurity_scopes.scopes:authenticate_value=f'Bearer scope="{security_scopes.scope_str}"'else:authenticate_value="Bearer"credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":authenticate_value},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptionscope:str=payload.get("scope","")token_scopes=scope.split(" ")token_data=TokenData(scopes=token_scopes,username=username)except(InvalidTokenError,ValidationError):raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionforscopeinsecurity_scopes.scopes:ifscopenotintoken_data.scopes:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Not enough permissions",headers={"WWW-Authenticate":authenticate_value},)returnuserasyncdefget_current_active_user(current_user:Annotated[User,Security(get_current_user,scopes=["me"])],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:Annotated[OAuth2PasswordRequestForm,Depends()],)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=400,detail="Incorrect username or password")access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username,"scope":" ".join(form_data.scopes)},expires_delta=access_token_expires,)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],)->User:returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:Annotated[User,Security(get_current_active_user,scopes=["items"])],):return[{"item_id":"Foo","owner":current_user.username}]@app.get("/status/")asyncdefread_system_status(current_user:Annotated[User,Depends(get_current_user)]):return{"status":"ok"}
🤓 Other versions and variants

Tip

Prefer to use theAnnotated version if possible.

fromdatetimeimportdatetime,timedelta,timezoneimportjwtfromfastapiimportDepends,FastAPI,HTTPException,Security,statusfromfastapi.securityimport(OAuth2PasswordBearer,OAuth2PasswordRequestForm,SecurityScopes,)fromjwt.exceptionsimportInvalidTokenErrorfrompwdlibimportPasswordHashfrompydanticimportBaseModel,ValidationError# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc","disabled":False,},"alice":{"username":"alice","full_name":"Alice Chains","email":"alicechains@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$g2/AV1zwopqUntPKJavBFw$BwpRGDCyUHLvHICnwijyX8ROGoiUPwNKZ7915MeYfCE","disabled":True,},}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=Nonescopes:list[str]=[]classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpassword_hash=PasswordHash.recommended()DUMMY_HASH=password_hash.hash("dummypassword")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token",scopes={"me":"Read information about the current user.","items":"Read items."},)app=FastAPI()defverify_password(plain_password,hashed_password):returnpassword_hash.verify(plain_password,hashed_password)defget_password_hash(password):returnpassword_hash.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:verify_password(password,DUMMY_HASH)returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(security_scopes:SecurityScopes,token:str=Depends(oauth2_scheme)):ifsecurity_scopes.scopes:authenticate_value=f'Bearer scope="{security_scopes.scope_str}"'else:authenticate_value="Bearer"credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":authenticate_value},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username:str=payload.get("sub")ifusernameisNone:raisecredentials_exceptionscope:str=payload.get("scope","")token_scopes=scope.split(" ")token_data=TokenData(scopes=token_scopes,username=username)except(InvalidTokenError,ValidationError):raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionforscopeinsecurity_scopes.scopes:ifscopenotintoken_data.scopes:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Not enough permissions",headers={"WWW-Authenticate":authenticate_value},)returnuserasyncdefget_current_active_user(current_user:User=Security(get_current_user,scopes=["me"]),):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:OAuth2PasswordRequestForm=Depends(),)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=400,detail="Incorrect username or password")access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username,"scope":" ".join(form_data.scopes)},expires_delta=access_token_expires,)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/")asyncdefread_users_me(current_user:User=Depends(get_current_active_user))->User:returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:User=Security(get_current_active_user,scopes=["items"]),):return[{"item_id":"Foo","owner":current_user.username}]@app.get("/status/")asyncdefread_system_status(current_user:User=Depends(get_current_user)):return{"status":"ok"}

Technical Details

Security is actually a subclass ofDepends, and it has just one extra parameter that we'll see later.

But by usingSecurity instead ofDepends,FastAPI will know that it can declare security scopes, use them internally, and document the API with OpenAPI.

But when you importQuery,Path,Depends,Security and others fromfastapi, those are actually functions that return special classes.

UseSecurityScopes

Now update the dependencyget_current_user.

This is the one used by the dependencies above.

Here's where we are using the same OAuth2 scheme we created before, declaring it as a dependency:oauth2_scheme.

Because this dependency function doesn't have any scope requirements itself, we can useDepends withoauth2_scheme, we don't have to useSecurity when we don't need to specify security scopes.

We also declare a special parameter of typeSecurityScopes, imported fromfastapi.security.

ThisSecurityScopes class is similar toRequest (Request was used to get the request object directly).

fromdatetimeimportdatetime,timedelta,timezonefromtypingimportAnnotatedimportjwtfromfastapiimportDepends,FastAPI,HTTPException,Security,statusfromfastapi.securityimport(OAuth2PasswordBearer,OAuth2PasswordRequestForm,SecurityScopes,)fromjwt.exceptionsimportInvalidTokenErrorfrompwdlibimportPasswordHashfrompydanticimportBaseModel,ValidationError# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc","disabled":False,},"alice":{"username":"alice","full_name":"Alice Chains","email":"alicechains@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$g2/AV1zwopqUntPKJavBFw$BwpRGDCyUHLvHICnwijyX8ROGoiUPwNKZ7915MeYfCE","disabled":True,},}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=Nonescopes:list[str]=[]classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpassword_hash=PasswordHash.recommended()DUMMY_HASH=password_hash.hash("dummypassword")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token",scopes={"me":"Read information about the current user.","items":"Read items."},)app=FastAPI()defverify_password(plain_password,hashed_password):returnpassword_hash.verify(plain_password,hashed_password)defget_password_hash(password):returnpassword_hash.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:verify_password(password,DUMMY_HASH)returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(security_scopes:SecurityScopes,token:Annotated[str,Depends(oauth2_scheme)]):ifsecurity_scopes.scopes:authenticate_value=f'Bearer scope="{security_scopes.scope_str}"'else:authenticate_value="Bearer"credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":authenticate_value},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptionscope:str=payload.get("scope","")token_scopes=scope.split(" ")token_data=TokenData(scopes=token_scopes,username=username)except(InvalidTokenError,ValidationError):raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionforscopeinsecurity_scopes.scopes:ifscopenotintoken_data.scopes:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Not enough permissions",headers={"WWW-Authenticate":authenticate_value},)returnuserasyncdefget_current_active_user(current_user:Annotated[User,Security(get_current_user,scopes=["me"])],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:Annotated[OAuth2PasswordRequestForm,Depends()],)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=400,detail="Incorrect username or password")access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username,"scope":" ".join(form_data.scopes)},expires_delta=access_token_expires,)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],)->User:returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:Annotated[User,Security(get_current_active_user,scopes=["items"])],):return[{"item_id":"Foo","owner":current_user.username}]@app.get("/status/")asyncdefread_system_status(current_user:Annotated[User,Depends(get_current_user)]):return{"status":"ok"}
🤓 Other versions and variants

Tip

Prefer to use theAnnotated version if possible.

fromdatetimeimportdatetime,timedelta,timezoneimportjwtfromfastapiimportDepends,FastAPI,HTTPException,Security,statusfromfastapi.securityimport(OAuth2PasswordBearer,OAuth2PasswordRequestForm,SecurityScopes,)fromjwt.exceptionsimportInvalidTokenErrorfrompwdlibimportPasswordHashfrompydanticimportBaseModel,ValidationError# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc","disabled":False,},"alice":{"username":"alice","full_name":"Alice Chains","email":"alicechains@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$g2/AV1zwopqUntPKJavBFw$BwpRGDCyUHLvHICnwijyX8ROGoiUPwNKZ7915MeYfCE","disabled":True,},}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=Nonescopes:list[str]=[]classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpassword_hash=PasswordHash.recommended()DUMMY_HASH=password_hash.hash("dummypassword")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token",scopes={"me":"Read information about the current user.","items":"Read items."},)app=FastAPI()defverify_password(plain_password,hashed_password):returnpassword_hash.verify(plain_password,hashed_password)defget_password_hash(password):returnpassword_hash.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:verify_password(password,DUMMY_HASH)returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(security_scopes:SecurityScopes,token:str=Depends(oauth2_scheme)):ifsecurity_scopes.scopes:authenticate_value=f'Bearer scope="{security_scopes.scope_str}"'else:authenticate_value="Bearer"credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":authenticate_value},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username:str=payload.get("sub")ifusernameisNone:raisecredentials_exceptionscope:str=payload.get("scope","")token_scopes=scope.split(" ")token_data=TokenData(scopes=token_scopes,username=username)except(InvalidTokenError,ValidationError):raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionforscopeinsecurity_scopes.scopes:ifscopenotintoken_data.scopes:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Not enough permissions",headers={"WWW-Authenticate":authenticate_value},)returnuserasyncdefget_current_active_user(current_user:User=Security(get_current_user,scopes=["me"]),):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:OAuth2PasswordRequestForm=Depends(),)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=400,detail="Incorrect username or password")access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username,"scope":" ".join(form_data.scopes)},expires_delta=access_token_expires,)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/")asyncdefread_users_me(current_user:User=Depends(get_current_active_user))->User:returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:User=Security(get_current_active_user,scopes=["items"]),):return[{"item_id":"Foo","owner":current_user.username}]@app.get("/status/")asyncdefread_system_status(current_user:User=Depends(get_current_user)):return{"status":"ok"}

Use thescopes

The parametersecurity_scopes will be of typeSecurityScopes.

It will have a propertyscopes with a list containing all the scopes required by itself and all the dependencies that use this as a sub-dependency. That means, all the "dependants"... this might sound confusing, it is explained again later below.

Thesecurity_scopes object (of classSecurityScopes) also provides ascope_str attribute with a single string, containing those scopes separated by spaces (we are going to use it).

We create anHTTPException that we can reuse (raise) later at several points.

In this exception, we include the scopes required (if any) as a string separated by spaces (usingscope_str). We put that string containing the scopes in theWWW-Authenticate header (this is part of the spec).

fromdatetimeimportdatetime,timedelta,timezonefromtypingimportAnnotatedimportjwtfromfastapiimportDepends,FastAPI,HTTPException,Security,statusfromfastapi.securityimport(OAuth2PasswordBearer,OAuth2PasswordRequestForm,SecurityScopes,)fromjwt.exceptionsimportInvalidTokenErrorfrompwdlibimportPasswordHashfrompydanticimportBaseModel,ValidationError# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc","disabled":False,},"alice":{"username":"alice","full_name":"Alice Chains","email":"alicechains@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$g2/AV1zwopqUntPKJavBFw$BwpRGDCyUHLvHICnwijyX8ROGoiUPwNKZ7915MeYfCE","disabled":True,},}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=Nonescopes:list[str]=[]classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpassword_hash=PasswordHash.recommended()DUMMY_HASH=password_hash.hash("dummypassword")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token",scopes={"me":"Read information about the current user.","items":"Read items."},)app=FastAPI()defverify_password(plain_password,hashed_password):returnpassword_hash.verify(plain_password,hashed_password)defget_password_hash(password):returnpassword_hash.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:verify_password(password,DUMMY_HASH)returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(security_scopes:SecurityScopes,token:Annotated[str,Depends(oauth2_scheme)]):ifsecurity_scopes.scopes:authenticate_value=f'Bearer scope="{security_scopes.scope_str}"'else:authenticate_value="Bearer"credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":authenticate_value},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptionscope:str=payload.get("scope","")token_scopes=scope.split(" ")token_data=TokenData(scopes=token_scopes,username=username)except(InvalidTokenError,ValidationError):raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionforscopeinsecurity_scopes.scopes:ifscopenotintoken_data.scopes:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Not enough permissions",headers={"WWW-Authenticate":authenticate_value},)returnuserasyncdefget_current_active_user(current_user:Annotated[User,Security(get_current_user,scopes=["me"])],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:Annotated[OAuth2PasswordRequestForm,Depends()],)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=400,detail="Incorrect username or password")access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username,"scope":" ".join(form_data.scopes)},expires_delta=access_token_expires,)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],)->User:returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:Annotated[User,Security(get_current_active_user,scopes=["items"])],):return[{"item_id":"Foo","owner":current_user.username}]@app.get("/status/")asyncdefread_system_status(current_user:Annotated[User,Depends(get_current_user)]):return{"status":"ok"}
🤓 Other versions and variants

Tip

Prefer to use theAnnotated version if possible.

fromdatetimeimportdatetime,timedelta,timezoneimportjwtfromfastapiimportDepends,FastAPI,HTTPException,Security,statusfromfastapi.securityimport(OAuth2PasswordBearer,OAuth2PasswordRequestForm,SecurityScopes,)fromjwt.exceptionsimportInvalidTokenErrorfrompwdlibimportPasswordHashfrompydanticimportBaseModel,ValidationError# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc","disabled":False,},"alice":{"username":"alice","full_name":"Alice Chains","email":"alicechains@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$g2/AV1zwopqUntPKJavBFw$BwpRGDCyUHLvHICnwijyX8ROGoiUPwNKZ7915MeYfCE","disabled":True,},}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=Nonescopes:list[str]=[]classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpassword_hash=PasswordHash.recommended()DUMMY_HASH=password_hash.hash("dummypassword")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token",scopes={"me":"Read information about the current user.","items":"Read items."},)app=FastAPI()defverify_password(plain_password,hashed_password):returnpassword_hash.verify(plain_password,hashed_password)defget_password_hash(password):returnpassword_hash.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:verify_password(password,DUMMY_HASH)returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(security_scopes:SecurityScopes,token:str=Depends(oauth2_scheme)):ifsecurity_scopes.scopes:authenticate_value=f'Bearer scope="{security_scopes.scope_str}"'else:authenticate_value="Bearer"credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":authenticate_value},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username:str=payload.get("sub")ifusernameisNone:raisecredentials_exceptionscope:str=payload.get("scope","")token_scopes=scope.split(" ")token_data=TokenData(scopes=token_scopes,username=username)except(InvalidTokenError,ValidationError):raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionforscopeinsecurity_scopes.scopes:ifscopenotintoken_data.scopes:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Not enough permissions",headers={"WWW-Authenticate":authenticate_value},)returnuserasyncdefget_current_active_user(current_user:User=Security(get_current_user,scopes=["me"]),):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:OAuth2PasswordRequestForm=Depends(),)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=400,detail="Incorrect username or password")access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username,"scope":" ".join(form_data.scopes)},expires_delta=access_token_expires,)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/")asyncdefread_users_me(current_user:User=Depends(get_current_active_user))->User:returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:User=Security(get_current_active_user,scopes=["items"]),):return[{"item_id":"Foo","owner":current_user.username}]@app.get("/status/")asyncdefread_system_status(current_user:User=Depends(get_current_user)):return{"status":"ok"}

Verify theusername and data shape

We verify that we get ausername, and extract the scopes.

And then we validate that data with the Pydantic model (catching theValidationError exception), and if we get an error reading the JWT token or validating the data with Pydantic, we raise theHTTPException we created before.

For that, we update the Pydantic modelTokenData with a new propertyscopes.

By validating the data with Pydantic we can make sure that we have, for example, exactly alist ofstr with the scopes and astr with theusername.

Instead of, for example, adict, or something else, as it could break the application at some point later, making it a security risk.

We also verify that we have a user with that username, and if not, we raise that same exception we created before.

fromdatetimeimportdatetime,timedelta,timezonefromtypingimportAnnotatedimportjwtfromfastapiimportDepends,FastAPI,HTTPException,Security,statusfromfastapi.securityimport(OAuth2PasswordBearer,OAuth2PasswordRequestForm,SecurityScopes,)fromjwt.exceptionsimportInvalidTokenErrorfrompwdlibimportPasswordHashfrompydanticimportBaseModel,ValidationError# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc","disabled":False,},"alice":{"username":"alice","full_name":"Alice Chains","email":"alicechains@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$g2/AV1zwopqUntPKJavBFw$BwpRGDCyUHLvHICnwijyX8ROGoiUPwNKZ7915MeYfCE","disabled":True,},}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=Nonescopes:list[str]=[]classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpassword_hash=PasswordHash.recommended()DUMMY_HASH=password_hash.hash("dummypassword")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token",scopes={"me":"Read information about the current user.","items":"Read items."},)app=FastAPI()defverify_password(plain_password,hashed_password):returnpassword_hash.verify(plain_password,hashed_password)defget_password_hash(password):returnpassword_hash.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:verify_password(password,DUMMY_HASH)returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(security_scopes:SecurityScopes,token:Annotated[str,Depends(oauth2_scheme)]):ifsecurity_scopes.scopes:authenticate_value=f'Bearer scope="{security_scopes.scope_str}"'else:authenticate_value="Bearer"credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":authenticate_value},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptionscope:str=payload.get("scope","")token_scopes=scope.split(" ")token_data=TokenData(scopes=token_scopes,username=username)except(InvalidTokenError,ValidationError):raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionforscopeinsecurity_scopes.scopes:ifscopenotintoken_data.scopes:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Not enough permissions",headers={"WWW-Authenticate":authenticate_value},)returnuserasyncdefget_current_active_user(current_user:Annotated[User,Security(get_current_user,scopes=["me"])],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:Annotated[OAuth2PasswordRequestForm,Depends()],)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=400,detail="Incorrect username or password")access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username,"scope":" ".join(form_data.scopes)},expires_delta=access_token_expires,)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],)->User:returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:Annotated[User,Security(get_current_active_user,scopes=["items"])],):return[{"item_id":"Foo","owner":current_user.username}]@app.get("/status/")asyncdefread_system_status(current_user:Annotated[User,Depends(get_current_user)]):return{"status":"ok"}
🤓 Other versions and variants

Tip

Prefer to use theAnnotated version if possible.

fromdatetimeimportdatetime,timedelta,timezoneimportjwtfromfastapiimportDepends,FastAPI,HTTPException,Security,statusfromfastapi.securityimport(OAuth2PasswordBearer,OAuth2PasswordRequestForm,SecurityScopes,)fromjwt.exceptionsimportInvalidTokenErrorfrompwdlibimportPasswordHashfrompydanticimportBaseModel,ValidationError# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc","disabled":False,},"alice":{"username":"alice","full_name":"Alice Chains","email":"alicechains@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$g2/AV1zwopqUntPKJavBFw$BwpRGDCyUHLvHICnwijyX8ROGoiUPwNKZ7915MeYfCE","disabled":True,},}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=Nonescopes:list[str]=[]classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpassword_hash=PasswordHash.recommended()DUMMY_HASH=password_hash.hash("dummypassword")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token",scopes={"me":"Read information about the current user.","items":"Read items."},)app=FastAPI()defverify_password(plain_password,hashed_password):returnpassword_hash.verify(plain_password,hashed_password)defget_password_hash(password):returnpassword_hash.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:verify_password(password,DUMMY_HASH)returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(security_scopes:SecurityScopes,token:str=Depends(oauth2_scheme)):ifsecurity_scopes.scopes:authenticate_value=f'Bearer scope="{security_scopes.scope_str}"'else:authenticate_value="Bearer"credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":authenticate_value},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username:str=payload.get("sub")ifusernameisNone:raisecredentials_exceptionscope:str=payload.get("scope","")token_scopes=scope.split(" ")token_data=TokenData(scopes=token_scopes,username=username)except(InvalidTokenError,ValidationError):raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionforscopeinsecurity_scopes.scopes:ifscopenotintoken_data.scopes:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Not enough permissions",headers={"WWW-Authenticate":authenticate_value},)returnuserasyncdefget_current_active_user(current_user:User=Security(get_current_user,scopes=["me"]),):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:OAuth2PasswordRequestForm=Depends(),)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=400,detail="Incorrect username or password")access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username,"scope":" ".join(form_data.scopes)},expires_delta=access_token_expires,)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/")asyncdefread_users_me(current_user:User=Depends(get_current_active_user))->User:returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:User=Security(get_current_active_user,scopes=["items"]),):return[{"item_id":"Foo","owner":current_user.username}]@app.get("/status/")asyncdefread_system_status(current_user:User=Depends(get_current_user)):return{"status":"ok"}

Verify thescopes

We now verify that all the scopes required, by this dependency and all the dependants (includingpath operations), are included in the scopes provided in the token received, otherwise raise anHTTPException.

For this, we usesecurity_scopes.scopes, that contains alist with all these scopes asstr.

fromdatetimeimportdatetime,timedelta,timezonefromtypingimportAnnotatedimportjwtfromfastapiimportDepends,FastAPI,HTTPException,Security,statusfromfastapi.securityimport(OAuth2PasswordBearer,OAuth2PasswordRequestForm,SecurityScopes,)fromjwt.exceptionsimportInvalidTokenErrorfrompwdlibimportPasswordHashfrompydanticimportBaseModel,ValidationError# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc","disabled":False,},"alice":{"username":"alice","full_name":"Alice Chains","email":"alicechains@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$g2/AV1zwopqUntPKJavBFw$BwpRGDCyUHLvHICnwijyX8ROGoiUPwNKZ7915MeYfCE","disabled":True,},}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=Nonescopes:list[str]=[]classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpassword_hash=PasswordHash.recommended()DUMMY_HASH=password_hash.hash("dummypassword")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token",scopes={"me":"Read information about the current user.","items":"Read items."},)app=FastAPI()defverify_password(plain_password,hashed_password):returnpassword_hash.verify(plain_password,hashed_password)defget_password_hash(password):returnpassword_hash.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:verify_password(password,DUMMY_HASH)returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(security_scopes:SecurityScopes,token:Annotated[str,Depends(oauth2_scheme)]):ifsecurity_scopes.scopes:authenticate_value=f'Bearer scope="{security_scopes.scope_str}"'else:authenticate_value="Bearer"credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":authenticate_value},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username=payload.get("sub")ifusernameisNone:raisecredentials_exceptionscope:str=payload.get("scope","")token_scopes=scope.split(" ")token_data=TokenData(scopes=token_scopes,username=username)except(InvalidTokenError,ValidationError):raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionforscopeinsecurity_scopes.scopes:ifscopenotintoken_data.scopes:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Not enough permissions",headers={"WWW-Authenticate":authenticate_value},)returnuserasyncdefget_current_active_user(current_user:Annotated[User,Security(get_current_user,scopes=["me"])],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:Annotated[OAuth2PasswordRequestForm,Depends()],)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=400,detail="Incorrect username or password")access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username,"scope":" ".join(form_data.scopes)},expires_delta=access_token_expires,)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],)->User:returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:Annotated[User,Security(get_current_active_user,scopes=["items"])],):return[{"item_id":"Foo","owner":current_user.username}]@app.get("/status/")asyncdefread_system_status(current_user:Annotated[User,Depends(get_current_user)]):return{"status":"ok"}
🤓 Other versions and variants

Tip

Prefer to use theAnnotated version if possible.

fromdatetimeimportdatetime,timedelta,timezoneimportjwtfromfastapiimportDepends,FastAPI,HTTPException,Security,statusfromfastapi.securityimport(OAuth2PasswordBearer,OAuth2PasswordRequestForm,SecurityScopes,)fromjwt.exceptionsimportInvalidTokenErrorfrompwdlibimportPasswordHashfrompydanticimportBaseModel,ValidationError# to get a string like this run:# openssl rand -hex 32SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"ALGORITHM="HS256"ACCESS_TOKEN_EXPIRE_MINUTES=30fake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc","disabled":False,},"alice":{"username":"alice","full_name":"Alice Chains","email":"alicechains@example.com","hashed_password":"$argon2id$v=19$m=65536,t=3,p=4$g2/AV1zwopqUntPKJavBFw$BwpRGDCyUHLvHICnwijyX8ROGoiUPwNKZ7915MeYfCE","disabled":True,},}classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:str|None=Nonescopes:list[str]=[]classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strpassword_hash=PasswordHash.recommended()DUMMY_HASH=password_hash.hash("dummypassword")oauth2_scheme=OAuth2PasswordBearer(tokenUrl="token",scopes={"me":"Read information about the current user.","items":"Read items."},)app=FastAPI()defverify_password(plain_password,hashed_password):returnpassword_hash.verify(plain_password,hashed_password)defget_password_hash(password):returnpassword_hash.hash(password)defget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)defauthenticate_user(fake_db,username:str,password:str):user=get_user(fake_db,username)ifnotuser:verify_password(password,DUMMY_HASH)returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:timedelta|None=None):to_encode=data.copy()ifexpires_delta:expire=datetime.now(timezone.utc)+expires_deltaelse:expire=datetime.now(timezone.utc)+timedelta(minutes=15)to_encode.update({"exp":expire})encoded_jwt=jwt.encode(to_encode,SECRET_KEY,algorithm=ALGORITHM)returnencoded_jwtasyncdefget_current_user(security_scopes:SecurityScopes,token:str=Depends(oauth2_scheme)):ifsecurity_scopes.scopes:authenticate_value=f'Bearer scope="{security_scopes.scope_str}"'else:authenticate_value="Bearer"credentials_exception=HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials",headers={"WWW-Authenticate":authenticate_value},)try:payload=jwt.decode(token,SECRET_KEY,algorithms=[ALGORITHM])username:str=payload.get("sub")ifusernameisNone:raisecredentials_exceptionscope:str=payload.get("scope","")token_scopes=scope.split(" ")token_data=TokenData(scopes=token_scopes,username=username)except(InvalidTokenError,ValidationError):raisecredentials_exceptionuser=get_user(fake_users_db,username=token_data.username)ifuserisNone:raisecredentials_exceptionforscopeinsecurity_scopes.scopes:ifscopenotintoken_data.scopes:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Not enough permissions",headers={"WWW-Authenticate":authenticate_value},)returnuserasyncdefget_current_active_user(current_user:User=Security(get_current_user,scopes=["me"]),):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin_for_access_token(form_data:OAuth2PasswordRequestForm=Depends(),)->Token:user=authenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_code=400,detail="Incorrect username or password")access_token_expires=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)access_token=create_access_token(data={"sub":user.username,"scope":" ".join(form_data.scopes)},expires_delta=access_token_expires,)returnToken(access_token=access_token,token_type="bearer")@app.get("/users/me/")asyncdefread_users_me(current_user:User=Depends(get_current_active_user))->User:returncurrent_user@app.get("/users/me/items/")asyncdefread_own_items(current_user:User=Security(get_current_active_user,scopes=["items"]),):return[{"item_id":"Foo","owner":current_user.username}]@app.get("/status/")asyncdefread_system_status(current_user:User=Depends(get_current_user)):return{"status":"ok"}

Dependency tree and scopes

Let's review again this dependency tree and the scopes.

As theget_current_active_user dependency has as a sub-dependency onget_current_user, the scope"me" declared atget_current_active_user will be included in the list of required scopes in thesecurity_scopes.scopes passed toget_current_user.

Thepath operation itself also declares a scope,"items", so this will also be in the list ofsecurity_scopes.scopes passed toget_current_user.

Here's how the hierarchy of dependencies and scopes looks like:

  • Thepath operationread_own_items has:
    • Required scopes["items"] with the dependency:
    • get_current_active_user:
      • The dependency functionget_current_active_user has:
        • Required scopes["me"] with the dependency:
        • get_current_user:
          • The dependency functionget_current_user has:
            • No scopes required by itself.
            • A dependency usingoauth2_scheme.
            • Asecurity_scopes parameter of typeSecurityScopes:
              • Thissecurity_scopes parameter has a propertyscopes with alist containing all these scopes declared above, so:
                • security_scopes.scopes will contain["me", "items"] for thepath operationread_own_items.
                • security_scopes.scopes will contain["me"] for thepath operationread_users_me, because it is declared in the dependencyget_current_active_user.
                • security_scopes.scopes will contain[] (nothing) for thepath operationread_system_status, because it didn't declare anySecurity withscopes, and its dependency,get_current_user, doesn't declare anyscopes either.

Tip

The important and "magic" thing here is thatget_current_user will have a different list ofscopes to check for eachpath operation.

All depending on thescopes declared in eachpath operation and each dependency in the dependency tree for that specificpath operation.

More details aboutSecurityScopes

You can useSecurityScopes at any point, and in multiple places, it doesn't have to be at the "root" dependency.

It will always have the security scopes declared in the currentSecurity dependencies and all the dependants forthat specificpath operation andthat specific dependency tree.

Because theSecurityScopes will have all the scopes declared by dependants, you can use it to verify that a token has the required scopes in a central dependency function, and then declare different scope requirements in differentpath operations.

They will be checked independently for eachpath operation.

Check it

If you open the API docs, you can authenticate and specify which scopes you want to authorize.

If you don't select any scope, you will be "authenticated", but when you try to access/users/me/ or/users/me/items/ you will get an error saying that you don't have enough permissions. You will still be able to access/status/.

And if you select the scopeme but not the scopeitems, you will be able to access/users/me/ but not/users/me/items/.

That's what would happen to a third party application that tried to access one of thesepath operations with a token provided by a user, depending on how many permissions the user gave the application.

About third party integrations

In this example we are using the OAuth2 "password" flow.

This is appropriate when we are logging in to our own application, probably with our own frontend.

Because we can trust it to receive theusername andpassword, as we control it.

But if you are building an OAuth2 application that others would connect to (i.e., if you are building an authentication provider equivalent to Facebook, Google, GitHub, etc.) you should use one of the other flows.

The most common is the implicit flow.

The most secure is the code flow, but it's more complex to implement as it requires more steps. As it is more complex, many providers end up suggesting the implicit flow.

Note

It's common that each authentication provider names their flows in a different way, to make it part of their brand.

But in the end, they are implementing the same OAuth2 standard.

FastAPI includes utilities for all these OAuth2 authentication flows infastapi.security.oauth2.

Security in decoratordependencies

The same way you can define alist ofDepends in the decorator'sdependencies parameter (as explained inDependencies in path operation decorators), you could also useSecurity withscopes there.


[8]ページ先頭

©2009-2026 Movatter.jp