FastAPI, 인증 API (Authentication APIs)
동영상 댓글을 보면 이런 글이 있습니다.
1. 새로 고침 토큰에는 만료 시간이 없습니다.
2. 새로 고침 토큰과 액세스 토큰을 모두 사용하여 보호된 엔드포인트에 액세스할 수 있습니다.
새로 고침 토큰은 새 액세스 토큰을 얻는 데에만 사용해야 합니다.
그리고 유튜버가 올린 답글은 다음과 같으니 참고하시기 바랍니다.
새로 고침 토큰에 TTL을 추가할 수 있습니다(액세스 토큰과 동일한 논리를 따름).
그리고 제공된 코드에서 사용자 세부 정보를 가져오는 데 Refresh_token을 사용할 수 있습니다. 페이로드가 동일하기 때문입니다. 요구 사항에 따라 토큰의 페이로드를 변경할 수 있다고 강의에서 언급한 바와 같습니다. 이 튜토리얼은 이러한 것들이 어떻게 작동하는지 설명하기 위한 것이니 참고하시기 바랍니다.
https://www.youtube.com/watch?v=z3nwf7wGdUw
깃허브 소스 : https://github.com/Describly/fastapi-auth-custom
API
/users | Create new user account |
/auth/token | Create JWT Token |
/auth/refresh | Refresh JWT Token |
/users/me | Get Authenticated User Detail |
[ requirements.txt ]
fastapi==0.103.1
uvicorn==0.23.2
python-dotenv==1.0.0
pydantic-settings==2.0.3
passlib[bcrypt]==1.7.4
python-jose[cryptography]==3.3.0
pymysql==1.1.0
SQLAlchemy==2.0.20
[ main.py ]
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from users.routes import router as guest_router, user_router
from auth.route import router as auth_router
from core.security import JWTAuth
from starlette.middleware.authentication import AuthenticationMiddleware
# Application instance that the routers and middleware below attach to.
app = FastAPI()
# guest_router and user_router both come from users.routes; auth_router
# comes from auth.route.
app.include_router(guest_router)
app.include_router(user_router)
app.include_router(auth_router)
# Add Middleware
# JWTAuth (core.security) is passed as the backend of the authentication
# middleware.
app.add_middleware(AuthenticationMiddleware, backend=JWTAuth())
@app.get('/')
def health_check():
    """Answer the root path with a fixed JSON status document."""
    status_body = {"status": "Running!"}
    return JSONResponse(content=status_body)
[ auth/responses.py ]
from pydantic import BaseModel
# Body returned by the token endpoints (built in auth.services._get_user_token).
class TokenResponse(BaseModel):
    access_token: str
    refresh_token: str
    # Falls back to 'Bearer' when the caller supplies no value.
    token_type: str = 'Bearer'
    # Access-token lifetime, filled in by _get_user_token (in seconds).
    expires_in: int
[ auth/route.py ]
from fastapi import APIRouter, status, Depends, Header
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from core.database import get_db
from auth.services import get_token, get_refresh_token
# Router for the authentication endpoints; every path registered on it is
# served under the "/auth" prefix and tagged "Auth".
router = APIRouter(
    prefix="/auth",
    tags=["Auth"],
    responses={404: {"description": "Not found"}},
)
@router.post("/token", status_code=status.HTTP_200_OK)
async def authenticate_user(data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    """Log a user in from the submitted form and return the issued tokens.

    The credential checks and token creation are delegated to
    ``auth.services.get_token``.
    """
    issued_tokens = await get_token(data=data, db=db)
    return issued_tokens
@router.post("/refresh", status_code=status.HTTP_200_OK)
async def refresh_access_token(refresh_token: str = Header(), db: Session = Depends(get_db)):
    """Issue a new access token for the refresh token sent as a request header.

    The token check and the new token are handled by
    ``auth.services.get_refresh_token``.
    """
    renewed_tokens = await get_refresh_token(token=refresh_token, db=db)
    return renewed_tokens
[ auth/services.py ]
from users.models import UserModel
from fastapi.exceptions import HTTPException
from core.security import verify_password
from core.config import get_settings
from datetime import timedelta
from auth.responses import TokenResponse
from core.security import create_access_token, create_refresh_token, get_token_payload
# Settings object for this module; _get_user_token reads
# ACCESS_TOKEN_EXPIRE_MINUTES from it.
settings = get_settings()
async def get_token(data, db):
    """Authenticate a user by email and password and return their tokens.

    Args:
        data: Object exposing ``username`` (matched against the user's
            email) and ``password``.
        db: Database session used to look the user up.

    Returns:
        The result of ``_get_user_token`` for the matched user.

    Raises:
        HTTPException: 400 when the email is unknown or the password does
            not verify; ``_verify_user_access`` may raise as well.
    """
    bearer_headers = {"WWW-Authenticate": "Bearer"}

    account = (
        db.query(UserModel)
        .filter(UserModel.email == data.username)
        .first()
    )
    if not account:
        raise HTTPException(
            status_code=400,
            detail="Email is not registered with us.",
            headers=bearer_headers,
        )

    password_matches = verify_password(data.password, account.password)
    if not password_matches:
        raise HTTPException(
            status_code=400,
            detail="Invalid Login Credentials.",
            headers=bearer_headers,
        )

    _verify_user_access(user=account)
    return await _get_user_token(user=account)
async def get_refresh_token(token, db):
    """Exchange a refresh token for a fresh access token.

    Args:
        token: The refresh token string presented by the client.
        db: Database session used to look the user up.

    Returns:
        The result of ``_get_user_token``; the presented refresh token is
        passed through and reused in the response.

    Raises:
        HTTPException: 401 when the token cannot be decoded, carries no
            ``id`` claim, or names a user that does not exist.
    """
    invalid_token = HTTPException(
        status_code=401,
        detail="Invalid refresh token.",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # get_token_payload returns None when decoding fails. Without this guard
    # the .get() below raised AttributeError and the client saw a 500.
    payload = get_token_payload(token=token)
    if not payload or not isinstance(payload, dict):
        raise invalid_token

    user_id = payload.get('id', None)
    if not user_id:
        raise invalid_token

    user = db.query(UserModel).filter(UserModel.id == user_id).first()
    if not user:
        raise invalid_token

    return await _get_user_token(user=user, refresh_token=token)
def _verify_user_access(user: UserModel):
    """Reject users whose account is inactive or not yet verified.

    Returns None when both ``is_active`` and ``is_verified`` are truthy.

    Raises:
        HTTPException: 400 with a message naming the failed check. The
            active check runs first.
    """
    if not user.is_active:
        detail = "Your account is inactive. Please contact support."
    elif not user.is_verified:
        # Placeholder kept from the original listing: trigger the user
        # account verification email here (nothing is sent by this code).
        detail = "Your account is unverified. We have resend the account verification email."
    else:
        return

    raise HTTPException(
        status_code=400,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )
async def _get_user_token(user: UserModel, refresh_token = None):
    """Build the TokenResponse for ``user``.

    Args:
        user: The user the tokens are issued for; only ``user.id`` is put
            into the token payload.
        refresh_token: An existing refresh token to hand back unchanged.
            When falsy, a new refresh token is created.

    Returns:
        TokenResponse with the access token, the refresh token and the
        access-token lifetime in seconds.
    """
    payload = {"id": user.id}

    access_token_expiry = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = await create_access_token(payload, access_token_expiry)
    if not refresh_token:
        refresh_token = await create_refresh_token(payload)
    return TokenResponse(
        access_token=access_token,
        refresh_token=refresh_token,
        # total_seconds() covers the whole lifetime; timedelta.seconds holds
        # only the sub-day remainder and is wrong for lifetimes >= 24 hours.
        expires_in=int(access_token_expiry.total_seconds())  # in seconds
    )
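[ core\config.py ] (참고: 아래에 실린 코드는 위의 auth/services.py와 동일한 내용이 중복 게재된 것입니다. 실제 core/config.py는 get_settings()를 제공하며, 다른 모듈들은 여기서 DATABASE_URL, JWT_SECRET, JWT_ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES 설정을 읽습니다. 원본 파일은 위의 깃허브 소스를 참고하시기 바랍니다.)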
from users.models import UserModel
from fastapi.exceptions import HTTPException
from core.security import verify_password
from core.config import get_settings
from datetime import timedelta
from auth.responses import TokenResponse
from core.security import create_access_token, create_refresh_token, get_token_payload
# Settings object for this module; _get_user_token reads
# ACCESS_TOKEN_EXPIRE_MINUTES from it.
settings = get_settings()
async def get_token(data, db):
    """Authenticate a user by email and password and return their tokens.

    Args:
        data: Object exposing ``username`` (matched against the user's
            email) and ``password``.
        db: Database session used to look the user up.

    Returns:
        The result of ``_get_user_token`` for the matched user.

    Raises:
        HTTPException: 400 when the email is unknown or the password does
            not verify; ``_verify_user_access`` may raise as well.
    """
    bearer_headers = {"WWW-Authenticate": "Bearer"}

    account = (
        db.query(UserModel)
        .filter(UserModel.email == data.username)
        .first()
    )
    if not account:
        raise HTTPException(
            status_code=400,
            detail="Email is not registered with us.",
            headers=bearer_headers,
        )

    password_matches = verify_password(data.password, account.password)
    if not password_matches:
        raise HTTPException(
            status_code=400,
            detail="Invalid Login Credentials.",
            headers=bearer_headers,
        )

    _verify_user_access(user=account)
    return await _get_user_token(user=account)
async def get_refresh_token(token, db):
    """Exchange a refresh token for a fresh access token.

    Args:
        token: The refresh token string presented by the client.
        db: Database session used to look the user up.

    Returns:
        The result of ``_get_user_token``; the presented refresh token is
        passed through and reused in the response.

    Raises:
        HTTPException: 401 when the token cannot be decoded, carries no
            ``id`` claim, or names a user that does not exist.
    """
    invalid_token = HTTPException(
        status_code=401,
        detail="Invalid refresh token.",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # get_token_payload returns None when decoding fails. Without this guard
    # the .get() below raised AttributeError and the client saw a 500.
    payload = get_token_payload(token=token)
    if not payload or not isinstance(payload, dict):
        raise invalid_token

    user_id = payload.get('id', None)
    if not user_id:
        raise invalid_token

    user = db.query(UserModel).filter(UserModel.id == user_id).first()
    if not user:
        raise invalid_token

    return await _get_user_token(user=user, refresh_token=token)
def _verify_user_access(user: UserModel):
    """Reject users whose account is inactive or not yet verified.

    Returns None when both ``is_active`` and ``is_verified`` are truthy.

    Raises:
        HTTPException: 400 with a message naming the failed check. The
            active check runs first.
    """
    if not user.is_active:
        detail = "Your account is inactive. Please contact support."
    elif not user.is_verified:
        # Placeholder kept from the original listing: trigger the user
        # account verification email here (nothing is sent by this code).
        detail = "Your account is unverified. We have resend the account verification email."
    else:
        return

    raise HTTPException(
        status_code=400,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )
async def _get_user_token(user: UserModel, refresh_token = None):
    """Build the TokenResponse for ``user``.

    Args:
        user: The user the tokens are issued for; only ``user.id`` is put
            into the token payload.
        refresh_token: An existing refresh token to hand back unchanged.
            When falsy, a new refresh token is created.

    Returns:
        TokenResponse with the access token, the refresh token and the
        access-token lifetime in seconds.
    """
    payload = {"id": user.id}

    access_token_expiry = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = await create_access_token(payload, access_token_expiry)
    if not refresh_token:
        refresh_token = await create_refresh_token(payload)
    return TokenResponse(
        access_token=access_token,
        refresh_token=refresh_token,
        # total_seconds() covers the whole lifetime; timedelta.seconds holds
        # only the sub-day remainder and is wrong for lifetimes >= 24 hours.
        expires_in=int(access_token_expiry.total_seconds())  # in seconds
    )
[ core\database.py ]
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
from typing import Generator
from core.config import get_settings
# Settings object for this module; DATABASE_URL is read from it below.
settings = get_settings()
# Engine shared by every session created through SessionLocal.
engine = create_engine(
    settings.DATABASE_URL,
    pool_pre_ping=True,
    pool_recycle=300,
    pool_size=5,
    max_overflow=0
)
# Session factory bound to the engine, with autocommit and autoflush disabled.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Declarative base class that the ORM models (e.g. users.models.UserModel) extend.
Base = declarative_base()
def get_db() -> Generator:
    """Yield a new SessionLocal session and close it afterwards.

    The ``finally`` clause closes the session whether the consumer finishes
    normally or the generator is closed or raises.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
[ core\security.py ]
from passlib.context import CryptContext
from fastapi.security import OAuth2PasswordBearer
from starlette.authentication import AuthCredentials, UnauthenticatedUser
from datetime import timedelta, datetime
from jose import jwt, JWTError
from core.config import get_settings
from fastapi import Depends
from core.database import get_db
from users.models import UserModel
# Settings object for this module; JWT_SECRET and JWT_ALGORITHM are read from it.
settings = get_settings()
# Password hashing context configured with the bcrypt scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# OAuth2 password-bearer scheme whose token URL is the login endpoint.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")
def get_password_hash(password):
    """Return the hash of ``password`` produced by the module's ``pwd_context``."""
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain_password, hashed_password):
    """Return the result of checking ``plain_password`` against ``hashed_password``.

    The comparison is delegated to ``pwd_context.verify``.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
async def create_access_token(data, expiry: timedelta):
    """Encode ``data`` plus an ``exp`` claim into a signed JWT.

    Args:
        data: Mapping of claims to embed; it is not modified.
        expiry: Lifetime added to the current UTC time to form ``exp``.

    Returns:
        The token produced by ``jwt.encode`` with the configured secret and
        algorithm.
    """
    claims = data.copy()
    claims["exp"] = datetime.utcnow() + expiry
    return jwt.encode(
        claims,
        settings.JWT_SECRET,
        algorithm=settings.JWT_ALGORITHM,
    )
async def create_refresh_token(data):
    """Encode ``data`` into a signed JWT without adding any claims.

    No ``exp`` claim is added here, so the token carries only what ``data``
    contains.
    """
    signing_key = settings.JWT_SECRET
    signing_algorithm = settings.JWT_ALGORITHM
    return jwt.encode(data, signing_key, algorithm=signing_algorithm)
def get_token_payload(token):
    """Decode ``token`` and return its payload, or None when decoding fails.

    Only ``JWTError`` is treated as a decoding failure; any other exception
    propagates.
    """
    try:
        return jwt.decode(
            token,
            settings.JWT_SECRET,
            algorithms=[settings.JWT_ALGORITHM],
        )
    except JWTError:
        return None
def get_current_user(token: str = Depends(oauth2_scheme), db = None):
    """Resolve the user identified by a JWT.

    Args:
        token: Encoded JWT whose payload is expected to carry an ``id`` claim.
        db: Optional database session. When omitted, a session is borrowed
            from ``get_db()`` for the lookup and closed again before
            returning.

    Returns:
        The matching UserModel row, or None when the token cannot be
        decoded, has no ``id`` claim, or no such user exists.
    """
    payload = get_token_payload(token)
    if not payload or not isinstance(payload, dict):
        return None

    user_id = payload.get('id', None)
    if not user_id:
        return None

    if db:
        return db.query(UserModel).filter(UserModel.id == user_id).first()

    # No session supplied: drive the get_db() generator explicitly and close
    # it in ``finally`` so its cleanup (db.close()) runs after the query.
    # The original ``next(get_db())`` discarded the generator at once, which
    # left the session used for the query without an explicit close.
    db_generator = get_db()
    try:
        session = next(db_generator)
        return session.query(UserModel).filter(UserModel.id == user_id).first()
    finally:
        db_generator.close()
class JWTAuth:
    """Authentication backend passed to the authentication middleware in main.py."""

    async def authenticate(self, conn):
        """Return ``(credentials, user)`` for the incoming connection.

        A connection with no usable ``Authorization`` header, or whose token
        does not resolve to a user, is treated as a guest with the
        ``unauthenticated`` scope.
        """
        guest = AuthCredentials(['unauthenticated']), UnauthenticatedUser()

        header = conn.headers.get('authorization')
        if not header:
            return guest

        # Expected shape: "Bearer <token>". A header without a second part
        # used to raise IndexError; it is now handled as a guest request.
        parts = header.split(' ')
        token = parts[1] if len(parts) > 1 else ''
        if not token:
            return guest

        user = get_current_user(token=token)
        if not user:
            return guest

        # Scopes must be a list of strings: a bare string would be turned
        # into a list of single characters by AuthCredentials.
        return AuthCredentials(['authenticated']), user
[ users\models.py ]
from sqlalchemy import Boolean, Column, Integer, String, DateTime, func
from datetime import datetime
from core.database import Base
# ORM model mapped to the "users" table.
class UserModel(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True, index=True)
    first_name = Column(String(100))
    last_name = Column(String(100))
    # Unique and indexed; auth.services.get_token looks users up by this column.
    email = Column(String(255), unique=True, index=True)
    # Holds the hashed password (users.services stores get_password_hash output).
    password = Column(String(100))
    # Both flags default to False and are checked by _verify_user_access at login.
    is_active = Column(Boolean, default=False)
    is_verified = Column(Boolean, default=False)
    verified_at = Column(DateTime, nullable=True, default=None)
    registered_at = Column(DateTime, nullable=True, default=None)
    # onupdate passes datetime.now as the callable used when the row is updated.
    updated_at = Column(DateTime, nullable=True, default=None, onupdate=datetime.now)
    # Default is generated on the database side via func.now().
    created_at = Column(DateTime, nullable=False, server_default=func.now())
[ users\responses.py ]
from pydantic import BaseModel, EmailStr
from typing import Union
from datetime import datetime
# Base schema that carries pydantic configuration for response models.
class BaseResponse(BaseModel):
    class Config:
        # Enables populating the model from an object's attributes.
        from_attributes = True
        # Permits field types pydantic has no built-in validator for.
        arbitrary_types_allowed = True
# Response schema for a user, used as the response_model of the /users/me
# endpoint. It extends BaseResponse (instead of BaseModel directly) so the
# shared configuration, including from_attributes, applies and the schema can
# be built from a UserModel instance.
class UserResponse(BaseResponse):
    id: int
    first_name: str
    last_name: str
    email: EmailStr
    # None until a registration timestamp is available.
    registered_at: Union[None, datetime] = None
[ users\routes.py ]
from fastapi import APIRouter, status, Depends, Request
from fastapi.responses import JSONResponse
from sqlalchemy.orm import Session
from core.database import get_db
from users.schemas import CreateUserRequest
from users.services import create_user_account
from core.security import oauth2_scheme
from users.responses import UserResponse;
# Router for endpoints that need no token (imported as guest_router in main.py).
router = APIRouter(
    prefix="/users",
    tags=["Users"],
    responses={404: {"description": "Not found"}},
)
# Router for protected endpoints: same prefix and tags, but every route on it
# also depends on oauth2_scheme.
user_router = APIRouter(
    prefix="/users",
    tags=["Users"],
    responses={404: {"description": "Not found"}},
    dependencies=[Depends(oauth2_scheme)]
)
@router.post('', status_code=status.HTTP_201_CREATED)
async def create_user(data: CreateUserRequest, db: Session = Depends(get_db)):
    """Register a new user account and confirm it with a JSON message.

    Account creation is delegated to ``create_user_account``, which raises
    an HTTPException when the email is already registered.
    """
    await create_user_account(data=data, db=db)
    payload = {"message": "User account has been succesfully created."}
    # A Response object returned directly is sent as-is, so the decorator's
    # status_code is not applied to it. Set 201 on the response itself;
    # otherwise the endpoint answers with JSONResponse's default 200.
    return JSONResponse(content=payload, status_code=status.HTTP_201_CREATED)
@user_router.post('/me', status_code=status.HTTP_200_OK, response_model=UserResponse)
def get_user_detail(request: Request):
    """Return the user attached to the request by the authentication middleware.

    Raises:
        HTTPException: 401 when the middleware could not resolve the token
            to a user (for example an invalid or expired token).
    """
    # Imported locally so this block does not depend on the module's import list.
    from fastapi import HTTPException
    from starlette.authentication import UnauthenticatedUser

    # The oauth2_scheme dependency does not decode the token. When decoding
    # fails, JWTAuth leaves an UnauthenticatedUser on the request, which
    # cannot be serialised as UserResponse and previously caused a 500.
    if isinstance(request.user, UnauthenticatedUser):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials.",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return request.user
[ users\schemas.py ]
from pydantic import BaseModel, EmailStr
# Request body accepted by the user-registration endpoint (POST /users).
class CreateUserRequest(BaseModel):
    first_name: str
    last_name: str
    email: EmailStr
    # Plain-text password; users.services hashes it before storing.
    password: str
[ users\services.py ]
from users.models import UserModel
from fastapi.exceptions import HTTPException
from core.security import get_password_hash
from datetime import datetime
async def create_user_account(data, db):
    """Create and persist a new user from the registration data.

    Args:
        data: Object exposing ``first_name``, ``last_name``, ``email`` and
            ``password``.
        db: Database session used for the lookup and the insert.

    Returns:
        The newly stored UserModel, refreshed from the database.

    Raises:
        HTTPException: 422 when a user with the same email already exists.
    """
    existing = db.query(UserModel).filter(UserModel.email == data.email).first()
    if existing:
        raise HTTPException(status_code=422, detail="Email is already registered with us.")

    # New accounts start inactive and unverified; the password is stored hashed.
    account = UserModel(
        first_name=data.first_name,
        last_name=data.last_name,
        email=data.email,
        password=get_password_hash(data.password),
        is_active=False,
        is_verified=False,
        registered_at=datetime.now(),
        updated_at=datetime.now(),
    )

    db.add(account)
    db.commit()
    db.refresh(account)
    return account
'FastAPI' 카테고리의 다른 글
파이썬 FastAPI + JAVA (Jpype) 예제 (1) | 2023.11.24 |
---|---|
Python Unit Testing | FastAPI with Pytest Tutorial (0) | 2023.11.14 |
How to Use FastAPI: A Detailed Python Tutorial (1) | 2023.10.23 |
FastAPI, NGINX 사용 시 Client IP 확인 (0) | 2023.10.18 |
FastAPI, 현재 실행 프로세스 및 환경 정보 예제 (0) | 2023.08.18 |