Authenticate a User
Authentication
auth.py modification to show it as a basic example to understand it.
# This file is to define authenticator and authorization for the Todo application.
# python-multipart is the package which will help to
have forms for OAuth and which will make password acceptance secure.
from venv import create
from fastapi import APIRouter, status, Depends, HTTPException
from UsersValidator import UserRequest
from models import Users
from database import SessionLocal
from sqlalchemy.orm import Session
from typing import Annotated
# OAuth2 and password hashing form which has it's own swagger documentation.
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
# Method to encrypt the password using bcrypt
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# creating db dependency
def get_db():
# We need to fetch this SessionLocal before each request made to database.
db = SessionLocal()
try:
# Only the code prio to and including the
# yield statement will execute before sending the response.
yield db
finally:
# It will be executed after sending the response.
# It makes fastAPI quicker as we can fetch information
# from the database and return it to client
# and then can close the db connection.
# It makes the connection extreamly safe.
# As it will open the database connect only at the time of usage.
db.close()
db_dependency = Annotated[Session, Depends(get_db)]
def authenticate_user(username: str, password: str, db):
user = db.query(Users).filter(Users.username == username).first()
if not user:
return False
if not pwd_context.verify(password, user.hashed_password):
return False
return True
router = APIRouter()
@router.get("/auth", status_code=status.HTTP_200_OK)
async def get_user():
return {'user': 'authenticated_user'}
@router.post("/auth/createUser", status_code=status.HTTP_201_CREATED)
async def create_user(db: db_dependency,
create_user_request: UserRequest):
# create_user_model = create_user_request.model_dump() but it
# will not work as table has hashed_password and input as plain password.
create_user_model = Users(
email=create_user_request.email,
username=create_user_request.username,
first_name=create_user_request.first_name,
last_name=create_user_request.last_name,
hashed_password=pwd_context.hash(create_user_request.password),
role=create_user_request.role,
is_active=True # Default value for new users
)
db.add(create_user_model)
db.commit()
@router.post("/auth/login", status_code=status.HTTP_200_OK)
async def login_user(form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
db: db_dependency):
if not authenticate_user(form_data.username, form_data.password, db):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
# This header is used for OAuth2
)
return 'Authenticated successfully'
JWT concept and usage:
JSON Web Token -
- Self-contained way to securely transmit data and information between two parties using a JSON Object.
- JWT can be trusted because each JWT can be digitally signed, which in return allow the server to know if the JWT has been changed at all.
- JWT should be used when dealing with authorization. Helps to maintain client and server relationship which will allow to exchange information without making login with each request.
- Great way to exchange information between server and client.
- Client will use web token to communicate with server for information.
JSON WEB TOKEN Structure:
- A JSON Web Token is created of three separate parts separated by dots(.) which include:
- Header: (a)
- Payload: (b)
- Signature:(c)
JWT Header:
- Header usually consist of two parts:
- (alg) The algorithm for signing
- (typ) The specific type of token
- Header is then encoded using Base64 to create the first part of the JWT(aaaaaaa part of image)
JWT Payload:
- Payload consists of the data. The Payloads data contains claims, and there are three different types of claims.
- Registered - Claims which are pre-defined, recommended but not mandatory.
- Top three registered claims are
- ISS - Issuer
- Person or principle that issue the JWT
- sub - subject
- Statements about the subject
- Values of subject must be scope either globally or locally.
- They are like id for web token.
- exp - expression time
- It defines expiry of a JWT.
- It ensures that current date and time must be less than the expiry date and time.
- It is not mandatory but are most useful things.
- It helps to know that user is authorized for how much time on server for communication.
- example to have a expiry rule like if a user make no activity for next 1 hour then expire the JWT.
- Public -
- Private -
- JWT Payload is then encoded using Base64 to create the second part of the JWT(bbbbbbbb part of image).
JWT Signature:
- Created by using the algorithm in the header to hash out the encoded header, encoded payload with a secret.
- The Secret can be anything, but is saved somewhere on the server that the client does not have access to.
- It is the third and final part of JWT(cccccc par to image).
Example of JWT:
- JWT Header:
{
"alg" : "HS256",
"typ" : "JWT"
}
- JWT Payload
{
"sub" : "123456789",
"name" : "Puneet Kumar Singh",
"given_name" : "Puneet",
"family_name" : "Singh",
"email" : "puneet@gamil.com",
"admin" : true
}
- JWT Signature
HMACSHA256(
base64UrlEncode(header) + "." +
base64UrlEncode(payload),
learnonline)
All this above information is encoded and a unique link is created for client to use information.
JSON Web Token - adjhfkjahflakshdlkfajhsjlkd.haksdjhfkhiuerwahjdkygiueb.akjhuekfjadhouihfsadkj987325akdjhfgj
It is always unique for a client based on their authentication.
User usually send JWT with Authorization header in pair schema. It is the basic schema to authenticate the user and to authenticate him/her.
Due to security concerns never send sensitive data back to client. However, JWT is safe until or unless client do not found the secret key. As secret key will release entire control to client for data.
Place to learn it online with visualization is:
https://jwt.io
As if any character of random string is compromised asap server will deauthorize the authority of a client. As he/she tried to temper the data.
Example of use case:
Generating JWT:
Task is to return a JWT if authentication is successful.
install python-jose[cryptography] as package for JWT token creation.
Use command openssl rand -hex 32 to create a 32 characters unique string as secret key and openssl rand -hex 64 for 64 bit unique string as secret key.
TokenValidator.py file to validate the type of output.
# This file is for Token creation
from pydantic import BaseModel
class Token(BaseModel):
access_token: str
token_type: str
auth.py file updated to encode jwt:
# This file is to define authenticator and authorization for the Todo application.
# python-multipart is the package which will
help to have forms for OAuth and which will make password acceptance secure.
from venv import create
from fastapi import APIRouter, status, Depends, HTTPException
from UsersValidator import UserRequest
from models import Users
from database import SessionLocal
from sqlalchemy.orm import Session
from typing import Annotated
from datetime import timedelta, datetime, timezone
# OAuth2 and password hashing form which has it's own swagger documentation.
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
# Method to encrypt the password using bcrypt
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
from jose import JWTError, jwt
from TokenValidator import Token
SECRET_KEY = "c5829be4cda3301fd160e251fbd87d607c7056fbcedc258bf26edb1d02919b50"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# creating db dependency
def get_db():
# We need to fetch this SessionLocal before each request made to database.
db = SessionLocal()
try:
# Only the code prio to and including the
# yield statement will execute before sending the response.
yield db
finally:
# It will be executed after sending the response.
# It makes fastAPI quicker as we can fetch information
# from the database and return it to client
# and then can close the db connection.
# It makes the connection extreamly safe.
# As it will open the database connect only at the time of usage.
db.close()
db_dependency = Annotated[Session, Depends(get_db)]
def authenticate_user(username: str, password: str, db):
user = db.query(Users).filter(Users.username == username).first()
if not user:
return False
if not pwd_context.verify(password, user.hashed_password):
return False
return user
def create_access_token(username: str, user_id: int, expires_delta: timedelta):
encode = {
"sub": username,
"id": user_id
}
expires = datetime.now(timezone.utc) + expires_delta
encode.update({"exp": expires})
return jwt.encode(encode, SECRET_KEY, algorithm=ALGORITHM)
router = APIRouter()
@router.get("/auth", status_code=status.HTTP_200_OK)
async def get_user():
return {'user': 'authenticated_user'}
@router.post("/auth/createUser", status_code=status.HTTP_201_CREATED)
async def create_user(db: db_dependency,
create_user_request: UserRequest):
# create_user_model = create_user_request.model_dump() but it
# will not work as table has hashed_password and input as plain password.
create_user_model = Users(
email=create_user_request.email,
username=create_user_request.username,
first_name=create_user_request.first_name,
last_name=create_user_request.last_name,
hashed_password=pwd_context.hash(create_user_request.password),
role=create_user_request.role,
is_active=True # Default value for new users
)
db.add(create_user_model)
db.commit()
@router.post("/token", response_model=Token,status_code=status.HTTP_200_OK)
async def login_for_access_token(form_data: Annotated[OAuth2PasswordRequestForm,
Depends()],
db: db_dependency):
user = authenticate_user(form_data.username, form_data.password, db)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"}, # This header is used for OAuth2
)
# print(f"User {user.username} with user_id {user.id}
authenticated successfully.")
token = create_access_token(user.username, user.id, timedelta (minutes=20))
return Token(access_token=token, token_type='bearer')
Decoding the JWT: OAuth2PasswordBearer help to authenticate the token
# This file is to define authenticator and authorization for the Todo application.
# python-multipart is the package which will help to have forms for OAuth and which will make password acceptance secure.
from venv import create
from fastapi import APIRouter, status, Depends, HTTPException
from UsersValidator import UserRequest
from models import Users
from database import SessionLocal
from sqlalchemy.orm import Session
from typing import Annotated
from datetime import timedelta, datetime, timezone
# OAuth2 and password hashing form which has it's own swagger documentation.
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
# Method to encrypt the password using bcrypt
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_bearer = OAuth2PasswordBearer(tokenUrl="auth/token")
from jose import JWTError, jwt
from TokenValidator import Token
SECRET_KEY = "c5829be4cda3301fd160e251fbd87d607c7056fbcedc258bf26edb1d02919b50"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# creating db dependency
def get_db():
# We need to fetch this SessionLocal before each request made to database.
db = SessionLocal()
try:
# Only the code prio to and including the
# yield statement will execute before sending the response.
yield db
finally:
# It will be executed after sending the response.
# It makes fastAPI quicker as we can fetch information
# from the database and return it to client
# and then can close the db connection.
# It makes the connection extreamly safe.
# As it will open the database connect only at the time of usage.
db.close()
db_dependency = Annotated[Session, Depends(get_db)]
def authenticate_user(username: str, password: str, db):
user = db.query(Users).filter(Users.username == username).first()
if not user:
return False
if not pwd_context.verify(password, user.hashed_password):
return False
return user
def create_access_token(username: str, user_id: int, expires_delta: timedelta):
encode = {
"sub": username,
"id": user_id
}
expires = datetime.now(timezone.utc) + expires_delta
encode.update({"exp": expires})
return jwt.encode(encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(token: Annotated[str, Depends(oauth2_bearer)]):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
user_id: int = payload.get("id")
if username is None or user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return {"username": username, "id": user_id}
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
router = APIRouter(
prefix="/auth",
tags=["auth"]
)
@router.get("/", status_code=status.HTTP_200_OK)
async def get_user():
return {'user': 'authenticated_user'}
@router.post("/createUser", status_code=status.HTTP_201_CREATED)
async def create_user(db: db_dependency,
create_user_request: UserRequest):
# create_user_model = create_user_request.model_dump() but it
# will not work as table has hashed_password and input as plain password.
create_user_model = Users(
email=create_user_request.email,
username=create_user_request.username,
first_name=create_user_request.first_name,
last_name=create_user_request.last_name,
hashed_password=pwd_context.hash(create_user_request.password),
role=create_user_request.role,
is_active=True # Default value for new users
)
db.add(create_user_model)
db.commit()
@router.post("/token", response_model=Token,status_code=status.HTTP_200_OK)
async def login_for_access_token(form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
db: db_dependency):
user = authenticate_user(form_data.username, form_data.password, db)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"}, # This header is used for OAuth2
)
# print(f"User {user.username} with user_id {user.id} authenticated successfully.")
token = create_access_token(user.username, user.id, timedelta (minutes=20))
return Token(access_token=token, token_type='bearer')
Modifying all Todos functions to use Authentication module:
# This file will contain todos router details for main file to execute on call.
from fastapi import APIRouter, Depends, HTTPException, Path
from models import Todos
from database import SessionLocal
from typing import Annotated
from sqlalchemy.orm import Session
from starlette import status
from TodoValidator import TodoRequest
from .auth import get_current_user
router = APIRouter()
# creating db dependency
def get_db():
# We need to fetch this SessionLocal before each request made to database.
db = SessionLocal()
try:
# Only the code prio to and including the
# yield statement will execute before sending the response.
yield db
finally:
# It will be executed after sending the response.
# It makes fastAPI quicker as we can fetch information
# from the database and return it to client
# and then can close the db connection.
# It makes the connection extreamly safe.
# As it will open the database connect only at the time of usage.
db.close()
db_dependency = Annotated[Session, Depends(get_db)]
user_dependency = Annotated[dict, Depends(get_current_user)]
# Creating asynchronous API Endpoint for database interaction.
@router.get("/", status_code=status.HTTP_200_OK)
#async def read_all(db: Annotated[Session, Depends(get_db)]):
async def read_all(user: user_dependency, db: db_dependency):
if user is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not authenticated")
return db.query(Todos).filter(Todos.owner_id == user.get("id")).all()
@router.get("/todo/{todo_id}", status_code=status.HTTP_200_OK)
async def read_todo_by_id(user: user_dependency, db: db_dependency, todo_id: int = Path(gt=0)):
if user is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not authenticated")
todo_model= db.query(Todos).filter(Todos.id == todo_id).filter(Todos.owner_id == user.get("id")).first()
if todo_model is not None:
return todo_model
raise HTTPException(status_code=404, detail="id not found in table Todo")
@router.post("/todo",status_code=status.HTTP_201_CREATED)
async def create_todo(user: user_dependency,
db: db_dependency,
todo_request: TodoRequest):
if user is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not authenticated")
todo_model = Todos(**todo_request.model_dump(), owner_id=user.get("id"))
db.add(todo_model)
db.commit()
@router.put("/todo/{todo_id}", status_code=status.HTTP_204_NO_CONTENT)
async def update_todo(user: user_dependency,
db: db_dependency,
todo_request: TodoRequest,
todo_id: int = Path(gt=0)
):
if user is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not authenticated")
todo_model = db.query(Todos).filter(Todos.id == todo_id).filter(Todos.owner_id == user.get("id")).first()
if todo_model is None:
raise HTTPException(status_code=404, detail='Id not found in Todo table')
# todo_model must be the same which we are creating above in this method
# As sqlalchemy will use it to update the table.
# As if we are using the same model then it will think it as a new record
# will try to insert it with new id as autoincremented one.
# or might create an identical record with same id.
todo_model.title = todo_request.title
todo_model.description = todo_request.description
todo_model.priority = todo_request.priority
todo_model.complete = todo_request.complete
# As a result we are modifying the todo_model value before adding to todo table.
db.add(todo_model)
db.commit()
@router.delete("/todo/{todo_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_todo(user: user_dependency, db: db_dependency, todo_id: int = Path(gt=0)):
if user is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not authenticated")
todo_model = db.query(Todos).filter(Todos.id == todo_id).filter(Todos.owner_id == user.get("id")).first()
if todo_model is None:
raise HTTPException(status_code=404, detail="Id not found in Todo table")
db.query(Todos).filter(Todos.id == todo_id).filter(Todos.owner_id == user.get("id")).delete()
db.commit()
Admin route:
admin.py
# code for admin.py in TodoApp/routers/admin.py
from fastapi import APIRouter, Depends, HTTPException, Path
from models import Todos
from database import SessionLocal
from typing import Annotated
from sqlalchemy.orm import Session
from starlette import status
from TodoValidator import TodoRequest
from .auth import get_current_user
router = APIRouter(
prefix="/admin",
tags=["admin"]
)
# creating db dependency
def get_db():
# We need to fetch this SessionLocal before each request made to database.
db = SessionLocal()
try:
# Only the code prio to and including the
# yield statement will execute before sending the response.
yield db
finally:
# It will be executed after sending the response.
# It makes fastAPI quicker as we can fetch information
# from the database and return it to client
# and then can close the db connection.
# It makes the connection extreamly safe.
# As it will open the database connect only at the time of usage.
db.close()
db_dependency = Annotated[Session, Depends(get_db)]
user_dependency = Annotated[dict, Depends(get_current_user)]
@router.get("/todos", status_code=status.HTTP_200_OK)
async def read_all_todos(user: user_dependency, db: db_dependency):
if user is None or user.get("role") != "admin":
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not authenticated or not an admin")
return db.query(Todos).all()
@router.delete("/todo/{todo_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_todo(user: user_dependency, db: db_dependency, todo_id: int = Path(gt=0)):
if user is None or user.get("role") != "admin":
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not authenticated or not an admin")
todo_model = db.query(Todos).filter(Todos.id == todo_id).first()
if todo_model is None:
raise HTTPException(status_code=404, detail="Todo not found")
db.query(Todos).filter(Todos.id == todo_id).delete()
db.commit()
main.py
# This file it to perform all the activity which we required for Todo application.
# Depends means dependency injection
from fastapi import FastAPI
import models
from database import engine
from routers import auth, todos, admin
app = FastAPI()
# it will execute only when todos.db is not existing.
# side effect if you enhance the table in model file then it will not update
# it in database automatically.
models.Base.metadata.create_all(bind=engine)
app.include_router(auth.router)
app.include_router(todos.router)
app.include_router(admin.router)
Comments
Post a Comment