Movatterモバイル変換


[0]ホーム

URL:


Skip to content
Join theFastAPI Cloud waiting list 🚀
Follow@fastapi onX (Twitter) to stay updated
FollowFastAPI onLinkedIn to stay updated
Subscribe to theFastAPI and friends newsletter 🎉
sponsor
sponsor
sponsor
sponsor
sponsor
sponsor
sponsor
sponsor
sponsor
sponsor
sponsor

Simple OAuth2 with Password and Bearer

Now let's build from the previous chapter and add the missing parts to have a complete security flow.

Get theusername andpassword

We are going to useFastAPI security utilities to get theusername andpassword.

OAuth2 specifies that when using the "password flow" (that we are using) the client/user must send ausername andpassword fields as form data.

And the spec says that the fields have to be named like that. Souser-name oremail wouldn't work.

But don't worry, you can show it as you wish to your final users in the frontend.

And your database models can use any other names you want.

But for the loginpath operation, we need to use these names to be compatible with the spec (and be able to, for example, use the integrated API documentation system).

The spec also states that theusername andpassword must be sent as form data (so, no JSON here).

scope

The spec also says that the client can send another form field "scope".

The form field name isscope (in singular), but it is actually a long string with "scopes" separated by spaces.

Each "scope" is just a string (without spaces).

They are normally used to declare specific security permissions, for example:

  • users:read orusers:write are common examples.
  • instagram_basic is used by Facebook / Instagram.
  • https://www.googleapis.com/auth/drive is used by Google.

Info

In OAuth2 a "scope" is just a string that declares a specific permission required.

It doesn't matter if it has other characters like: or if it is a URL.

Those details are implementation specific.

For OAuth2 they are just strings.

Code to get theusername andpassword

Now let's use the utilities provided byFastAPI to handle this.

OAuth2PasswordRequestForm

First, importOAuth2PasswordRequestForm, and use it as a dependency withDepends in thepath operation for/token:

fromtypingimportAnnotatedfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModelfake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"fakehashedsecret","disabled":False,},"alice":{"username":"alice","full_name":"Alice Wonderson","email":"alice@example.com","hashed_password":"fakehashedsecret2","disabled":True,},}app=FastAPI()deffake_hash_password(password:str):return"fakehashed"+passwordoauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strdefget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)deffake_decode_token(token):# This doesn't provide any security at all# Check the next versionuser=get_user(fake_users_db,token)returnuserasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):user=fake_decode_token(token)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Not authenticated",headers={"WWW-Authenticate":"Bearer"},)returnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin(form_data:Annotated[OAuth2PasswordRequestForm,Depends()]):user_dict=fake_users_db.get(form_data.username)ifnotuser_dict:raiseHTTPException(status_code=400,detail="Incorrect username or password")user=UserInDB(**user_dict)hashed_password=fake_hash_password(form_data.password)ifnothashed_password==user.hashed_password:raiseHTTPException(status_code=400,detail="Incorrect username or password")return{"access_token":user.username,"token_type":"bearer"}@app.get("/users/me")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user
🤓 Other versions and variants

Tip

Prefer to use theAnnotated version if possible.

fromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModelfake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"fakehashedsecret","disabled":False,},"alice":{"username":"alice","full_name":"Alice Wonderson","email":"alice@example.com","hashed_password":"fakehashedsecret2","disabled":True,},}app=FastAPI()deffake_hash_password(password:str):return"fakehashed"+passwordoauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strdefget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)deffake_decode_token(token):# This doesn't provide any security at all# Check the next versionuser=get_user(fake_users_db,token)returnuserasyncdefget_current_user(token:str=Depends(oauth2_scheme)):user=fake_decode_token(token)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Not authenticated",headers={"WWW-Authenticate":"Bearer"},)returnuserasyncdefget_current_active_user(current_user:User=Depends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin(form_data:OAuth2PasswordRequestForm=Depends()):user_dict=fake_users_db.get(form_data.username)ifnotuser_dict:raiseHTTPException(status_code=400,detail="Incorrect username or password")user=UserInDB(**user_dict)hashed_password=fake_hash_password(form_data.password)ifnothashed_password==user.hashed_password:raiseHTTPException(status_code=400,detail="Incorrect username or password")return{"access_token":user.username,"token_type":"bearer"}@app.get("/users/me")asyncdefread_users_me(current_user:User=Depends(get_current_active_user)):returncurrent_user

OAuth2PasswordRequestForm is a class dependency that declares a form body with:

  • Theusername.
  • Thepassword.
  • An optionalscope field as a big string, composed of strings separated by spaces.
  • An optionalgrant_type.

Tip

The OAuth2 spec actuallyrequires a fieldgrant_type with a fixed value ofpassword, butOAuth2PasswordRequestForm doesn't enforce it.

If you need to enforce it, useOAuth2PasswordRequestFormStrict instead ofOAuth2PasswordRequestForm.

  • An optionalclient_id (we don't need it for our example).
  • An optionalclient_secret (we don't need it for our example).

Info

TheOAuth2PasswordRequestForm is not a special class forFastAPI as isOAuth2PasswordBearer.

OAuth2PasswordBearer makesFastAPI know that it is a security scheme. So it is added that way to OpenAPI.

ButOAuth2PasswordRequestForm is just a class dependency that you could have written yourself, or you could have declaredForm parameters directly.

But as it's a common use case, it is provided byFastAPI directly, just to make it easier.

Use the form data

Tip

The instance of the dependency classOAuth2PasswordRequestForm won't have an attributescope with the long string separated by spaces, instead, it will have ascopes attribute with the actual list of strings for each scope sent.

We are not usingscopes in this example, but the functionality is there if you need it.

Now, get the user data from the (fake) database, using theusername from the form field.

If there is no such user, we return an error saying "Incorrect username or password".

For the error, we use the exceptionHTTPException:

fromtypingimportAnnotatedfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModelfake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"fakehashedsecret","disabled":False,},"alice":{"username":"alice","full_name":"Alice Wonderson","email":"alice@example.com","hashed_password":"fakehashedsecret2","disabled":True,},}app=FastAPI()deffake_hash_password(password:str):return"fakehashed"+passwordoauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strdefget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)deffake_decode_token(token):# This doesn't provide any security at all# Check the next versionuser=get_user(fake_users_db,token)returnuserasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):user=fake_decode_token(token)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Not authenticated",headers={"WWW-Authenticate":"Bearer"},)returnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin(form_data:Annotated[OAuth2PasswordRequestForm,Depends()]):user_dict=fake_users_db.get(form_data.username)ifnotuser_dict:raiseHTTPException(status_code=400,detail="Incorrect username or password")user=UserInDB(**user_dict)hashed_password=fake_hash_password(form_data.password)ifnothashed_password==user.hashed_password:raiseHTTPException(status_code=400,detail="Incorrect username or password")return{"access_token":user.username,"token_type":"bearer"}@app.get("/users/me")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user
🤓 Other versions and variants

Tip

Prefer to use theAnnotated version if possible.

fromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModelfake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"fakehashedsecret","disabled":False,},"alice":{"username":"alice","full_name":"Alice Wonderson","email":"alice@example.com","hashed_password":"fakehashedsecret2","disabled":True,},}app=FastAPI()deffake_hash_password(password:str):return"fakehashed"+passwordoauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strdefget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)deffake_decode_token(token):# This doesn't provide any security at all# Check the next versionuser=get_user(fake_users_db,token)returnuserasyncdefget_current_user(token:str=Depends(oauth2_scheme)):user=fake_decode_token(token)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Not authenticated",headers={"WWW-Authenticate":"Bearer"},)returnuserasyncdefget_current_active_user(current_user:User=Depends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin(form_data:OAuth2PasswordRequestForm=Depends()):user_dict=fake_users_db.get(form_data.username)ifnotuser_dict:raiseHTTPException(status_code=400,detail="Incorrect username or password")user=UserInDB(**user_dict)hashed_password=fake_hash_password(form_data.password)ifnothashed_password==user.hashed_password:raiseHTTPException(status_code=400,detail="Incorrect username or password")return{"access_token":user.username,"token_type":"bearer"}@app.get("/users/me")asyncdefread_users_me(current_user:User=Depends(get_current_active_user)):returncurrent_user

Check the password

At this point we have the user data from our database, but we haven't checked the password.

Let's put that data in the PydanticUserInDB model first.

You should never save plaintext passwords, so, we'll use the (fake) password hashing system.

If the passwords don't match, we return the same error.

Password hashing

"Hashing" means: converting some content (a password in this case) into a sequence of bytes (just a string) that looks like gibberish.

Whenever you pass exactly the same content (exactly the same password) you get exactly the same gibberish.

But you cannot convert from the gibberish back to the password.

Why use password hashing

If your database is stolen, the thief won't have your users' plaintext passwords, only the hashes.

So, the thief won't be able to try to use those same passwords in another system (as many users use the same password everywhere, this would be dangerous).

fromtypingimportAnnotatedfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModelfake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"fakehashedsecret","disabled":False,},"alice":{"username":"alice","full_name":"Alice Wonderson","email":"alice@example.com","hashed_password":"fakehashedsecret2","disabled":True,},}app=FastAPI()deffake_hash_password(password:str):return"fakehashed"+passwordoauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strdefget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)deffake_decode_token(token):# This doesn't provide any security at all# Check the next versionuser=get_user(fake_users_db,token)returnuserasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):user=fake_decode_token(token)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Not authenticated",headers={"WWW-Authenticate":"Bearer"},)returnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin(form_data:Annotated[OAuth2PasswordRequestForm,Depends()]):user_dict=fake_users_db.get(form_data.username)ifnotuser_dict:raiseHTTPException(status_code=400,detail="Incorrect username or password")user=UserInDB(**user_dict)hashed_password=fake_hash_password(form_data.password)ifnothashed_password==user.hashed_password:raiseHTTPException(status_code=400,detail="Incorrect username or password")return{"access_token":user.username,"token_type":"bearer"}@app.get("/users/me")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user
🤓 Other versions and variants

Tip

Prefer to use theAnnotated version if possible.

fromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModelfake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"fakehashedsecret","disabled":False,},"alice":{"username":"alice","full_name":"Alice Wonderson","email":"alice@example.com","hashed_password":"fakehashedsecret2","disabled":True,},}app=FastAPI()deffake_hash_password(password:str):return"fakehashed"+passwordoauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strdefget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)deffake_decode_token(token):# This doesn't provide any security at all# Check the next versionuser=get_user(fake_users_db,token)returnuserasyncdefget_current_user(token:str=Depends(oauth2_scheme)):user=fake_decode_token(token)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Not authenticated",headers={"WWW-Authenticate":"Bearer"},)returnuserasyncdefget_current_active_user(current_user:User=Depends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin(form_data:OAuth2PasswordRequestForm=Depends()):user_dict=fake_users_db.get(form_data.username)ifnotuser_dict:raiseHTTPException(status_code=400,detail="Incorrect username or password")user=UserInDB(**user_dict)hashed_password=fake_hash_password(form_data.password)ifnothashed_password==user.hashed_password:raiseHTTPException(status_code=400,detail="Incorrect username or password")return{"access_token":user.username,"token_type":"bearer"}@app.get("/users/me")asyncdefread_users_me(current_user:User=Depends(get_current_active_user)):returncurrent_user

About**user_dict

UserInDB(**user_dict) means:

Pass the keys and values of theuser_dict directly as key-value arguments, equivalent to:

UserInDB(username=user_dict["username"],email=user_dict["email"],full_name=user_dict["full_name"],disabled=user_dict["disabled"],hashed_password=user_dict["hashed_password"],)

Info

For a more complete explanation of**user_dict check back inthe documentation forExtra Models.

Return the token

The response of thetoken endpoint must be a JSON object.

It should have atoken_type. In our case, as we are using "Bearer" tokens, the token type should be "bearer".

And it should have anaccess_token, with a string containing our access token.

For this simple example, we are going to just be completely insecure and return the sameusername as the token.

Tip

In the next chapter, you will see a real secure implementation, with password hashing andJWT tokens.

But for now, let's focus on the specific details we need.

fromtypingimportAnnotatedfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModelfake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"fakehashedsecret","disabled":False,},"alice":{"username":"alice","full_name":"Alice Wonderson","email":"alice@example.com","hashed_password":"fakehashedsecret2","disabled":True,},}app=FastAPI()deffake_hash_password(password:str):return"fakehashed"+passwordoauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strdefget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)deffake_decode_token(token):# This doesn't provide any security at all# Check the next versionuser=get_user(fake_users_db,token)returnuserasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):user=fake_decode_token(token)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Not authenticated",headers={"WWW-Authenticate":"Bearer"},)returnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin(form_data:Annotated[OAuth2PasswordRequestForm,Depends()]):user_dict=fake_users_db.get(form_data.username)ifnotuser_dict:raiseHTTPException(status_code=400,detail="Incorrect username or password")user=UserInDB(**user_dict)hashed_password=fake_hash_password(form_data.password)ifnothashed_password==user.hashed_password:raiseHTTPException(status_code=400,detail="Incorrect username or password")return{"access_token":user.username,"token_type":"bearer"}@app.get("/users/me")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user
🤓 Other versions and variants

Tip

Prefer to use theAnnotated version if possible.

fromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModelfake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"fakehashedsecret","disabled":False,},"alice":{"username":"alice","full_name":"Alice Wonderson","email":"alice@example.com","hashed_password":"fakehashedsecret2","disabled":True,},}app=FastAPI()deffake_hash_password(password:str):return"fakehashed"+passwordoauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strdefget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)deffake_decode_token(token):# This doesn't provide any security at all# Check the next versionuser=get_user(fake_users_db,token)returnuserasyncdefget_current_user(token:str=Depends(oauth2_scheme)):user=fake_decode_token(token)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Not authenticated",headers={"WWW-Authenticate":"Bearer"},)returnuserasyncdefget_current_active_user(current_user:User=Depends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin(form_data:OAuth2PasswordRequestForm=Depends()):user_dict=fake_users_db.get(form_data.username)ifnotuser_dict:raiseHTTPException(status_code=400,detail="Incorrect username or password")user=UserInDB(**user_dict)hashed_password=fake_hash_password(form_data.password)ifnothashed_password==user.hashed_password:raiseHTTPException(status_code=400,detail="Incorrect username or password")return{"access_token":user.username,"token_type":"bearer"}@app.get("/users/me")asyncdefread_users_me(current_user:User=Depends(get_current_active_user)):returncurrent_user

Tip

By the spec, you should return a JSON with anaccess_token and atoken_type, the same as in this example.

This is something that you have to do yourself in your code, and make sure you use those JSON keys.

It's almost the only thing that you have to remember to do correctly yourself, to be compliant with the specifications.

For the rest,FastAPI handles it for you.

Update the dependencies

Now we are going to update our dependencies.

We want to get thecurrent_useronly if this user is active.

So, we create an additional dependencyget_current_active_user that in turn usesget_current_user as a dependency.

Both of these dependencies will just return an HTTP error if the user doesn't exist, or if is inactive.

So, in our endpoint, we will only get a user if the user exists, was correctly authenticated, and is active:

fromtypingimportAnnotatedfromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModelfake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"fakehashedsecret","disabled":False,},"alice":{"username":"alice","full_name":"Alice Wonderson","email":"alice@example.com","hashed_password":"fakehashedsecret2","disabled":True,},}app=FastAPI()deffake_hash_password(password:str):return"fakehashed"+passwordoauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strdefget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)deffake_decode_token(token):# This doesn't provide any security at all# Check the next versionuser=get_user(fake_users_db,token)returnuserasyncdefget_current_user(token:Annotated[str,Depends(oauth2_scheme)]):user=fake_decode_token(token)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Not authenticated",headers={"WWW-Authenticate":"Bearer"},)returnuserasyncdefget_current_active_user(current_user:Annotated[User,Depends(get_current_user)],):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin(form_data:Annotated[OAuth2PasswordRequestForm,Depends()]):user_dict=fake_users_db.get(form_data.username)ifnotuser_dict:raiseHTTPException(status_code=400,detail="Incorrect username or password")user=UserInDB(**user_dict)hashed_password=fake_hash_password(form_data.password)ifnothashed_password==user.hashed_password:raiseHTTPException(status_code=400,detail="Incorrect username or password")return{"access_token":user.username,"token_type":"bearer"}@app.get("/users/me")asyncdefread_users_me(current_user:Annotated[User,Depends(get_current_active_user)],):returncurrent_user
🤓 Other versions and variants

Tip

Prefer to use theAnnotated version if possible.

fromfastapiimportDepends,FastAPI,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModelfake_users_db={"johndoe":{"username":"johndoe","full_name":"John Doe","email":"johndoe@example.com","hashed_password":"fakehashedsecret","disabled":False,},"alice":{"username":"alice","full_name":"Alice Wonderson","email":"alice@example.com","hashed_password":"fakehashedsecret2","disabled":True,},}app=FastAPI()deffake_hash_password(password:str):return"fakehashed"+passwordoauth2_scheme=OAuth2PasswordBearer(tokenUrl="token")classUser(BaseModel):username:stremail:str|None=Nonefull_name:str|None=Nonedisabled:bool|None=NoneclassUserInDB(User):hashed_password:strdefget_user(db,username:str):ifusernameindb:user_dict=db[username]returnUserInDB(**user_dict)deffake_decode_token(token):# This doesn't provide any security at all# Check the next versionuser=get_user(fake_users_db,token)returnuserasyncdefget_current_user(token:str=Depends(oauth2_scheme)):user=fake_decode_token(token)ifnotuser:raiseHTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Not authenticated",headers={"WWW-Authenticate":"Bearer"},)returnuserasyncdefget_current_active_user(current_user:User=Depends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code=400,detail="Inactive user")returncurrent_user@app.post("/token")asyncdeflogin(form_data:OAuth2PasswordRequestForm=Depends()):user_dict=fake_users_db.get(form_data.username)ifnotuser_dict:raiseHTTPException(status_code=400,detail="Incorrect username or password")user=UserInDB(**user_dict)hashed_password=fake_hash_password(form_data.password)ifnothashed_password==user.hashed_password:raiseHTTPException(status_code=400,detail="Incorrect username or password")return{"access_token":user.username,"token_type":"bearer"}@app.get("/users/me")asyncdefread_users_me(current_user:User=Depends(get_current_active_user)):returncurrent_user

Info

The additional headerWWW-Authenticate with valueBearer we are returning here is also part of the spec.

Any HTTP (error) status code 401 "UNAUTHORIZED" is supposed to also return aWWW-Authenticate header.

In the case of bearer tokens (our case), the value of that header should beBearer.

You can actually skip that extra header and it would still work.

But it's provided here to be compliant with the specifications.

Also, there might be tools that expect and use it (now or in the future) and that might be useful for you or your users, now or in the future.

That's the benefit of standards...

See it in action

Open the interactive docs:http://127.0.0.1:8000/docs.

Authenticate

Click the "Authorize" button.

Use the credentials:

User:johndoe

Password:secret

After authenticating in the system, you will see it like:

Get your own user data

Now use the operationGET with the path/users/me.

You will get your user's data, like:

{"username":"johndoe","email":"johndoe@example.com","full_name":"John Doe","disabled":false,"hashed_password":"fakehashedsecret"}

If you click the lock icon and logout, and then try the same operation again, you will get an HTTP 401 error of:

{"detail":"Not authenticated"}

Inactive user

Now try with an inactive user, authenticate with:

User:alice

Password:secret2

And try to use the operationGET with the path/users/me.

You will get an "Inactive user" error, like:

{"detail":"Inactive user"}

Recap

You now have the tools to implement a complete security system based onusername andpassword for your API.

Using these tools, you can make the security system compatible with any database and with any user or data model.

The only detail missing is that it is not actually "secure" yet.

In the next chapter you'll see how to use a secure password hashing library andJWT tokens.


[8]ページ先頭

©2009-2026 Movatter.jp