반응형

 <참조> https://fastapi.tiangolo.com/tutorial/sql-databases/

 

FastAPI, 데이터베이스 연동

정리. 수알치 오상문

 

이 내용을 따라하려면 Python 3.6 이상이어야 하고 3.7 이상 권장한다.

SQL (관계형) 데이터베이스

FastAPI에서 Python 기반의 데이터베이스 연동이 모두 가능하다. 여기서는 SQLAlchemy 기반을 사용하며 다음과 같은 데이터베이스 연동이 가능하다.

  • PostgreSQL
  • MySQL
  • SQLite
  • Oracle
  • Microsoft SQL Server 등

기본 예제는 파이썬에서 제공하는 SQLite 데이터베이스를 다루지만 PostgreSQL 참고 사항도 다룬다.

 

프로젝트 디렉터리 / 파일 구조

예제에서 다음과 같은 파일 구조로 예제를 진행한다. 참고로 예제 프로젝트 기본 디렉터리는 my_project이며 다른 이름이어도 상관없지만 아래 하위 구조는 같아야 한다.

.
└── sql_app
    ├── __init__.py
    ├── crud.py
    ├── database.py
    ├── main.py
    ├── models.py
    └── schemas.py

__init__.py 파일은 빈 파일이다. sql_app 디렉터리가 파이썬 파일 패키지임을 나타내는 파일이다. 

 

SQLAlchemy 부분 생성 (database.py)

- SQLAlchemy 임포트  및 데이터베이스 엔진 초기화

- SessionLocal 클래스 생성

 

SessionLocal 클래스의 각 인스턴스는 데이터베이스 세션이 된다(DB 연결 통로가 세션이다). SessionLocal 클래스의 인스턴스를 생성하면 이 인스턴스가 실제 데이터베이스와 연결되는 세션이 된다. SQLAlchemy에서 가져오는 세션과 구별하기 위해 SessionLocal이라는 이름을 지정한다. 나중에 Session(SQLAlchemy에서 가져온 것)을 사용할 것이다. SessionLocal 클래스 인스턴스를 생성할 때  sessionmaker() 함수를 사용한다.

- Base 클래스 생성 

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"
#                                         ID:PASSWORD@HOST/DATABASENAME  
engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

[ 참고]  SQLite에만 적용하는 부분

connect_args={"check_same_thread": False}

FastAPI에서 일반 함수(def)를 사용하면 둘 이상 스레드가 동일한 요청에 대해 데이터베이스와 상호 작용할 수 있으므로 SQLite에서 connect_args={"check_same_thread": False}를 사용하여 이를 허용해야 한다.

 

데이터베이스 모델 생성  (models.py)

- Base 클래스로 SQLAlchemy 모델 생성

- 모델 속성/컬럼 생성

- 관계(relationship) 생성 

 

SQLAlchemy ORM에서 제공하는 관계를 사용한다. 관련된 다른 테이블의 값을 포함하는 속성을 이용한다. my_user.items 같이 사용자의 속성 항목에 접근할 때 사용자 테이블의 이 레코드를 가리키는 외래 키가 있는 항목 SQLAlchemy 모델 목록(항목 테이블에서)이 있다. my_user.items에 접근하면 SQLAlchemy는 실제로 항목 테이블의 데이터베이스에서 항목을 가져와 여기에 채운다. 그리고 항목 속성 소유자에 접근할 때 사용자 테이블의 사용자 SQLAlchemy 모델이 포함된다. 사용자 테이블에서 가져올 레코드를 알기 위해 외부 키와 함께 owner_id 속성/컬럼을 사용한다.

from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from .database import Base
class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)
    items = relationship("Item", back_populates="owner")
class Item(Base):
    __tablename__ = "items"
    id = Column(Integer, primary_key=True, index=True)
    title = Column(String, index=True)
    description = Column(String, index=True)
    owner_id = Column(Integer, ForeignKey("users.id"))
    owner = relationship("User", back_populates="items")

__tablename__ 속성은 SQLAlchemy에게 이러한 각 모델의 데이터베이스에서 사용할 테이블 이름을 알려준다.

 

Pydantic 모델 생성 (schemas.py)

- 초기 Pydantic 모델 / 스키마 생성 

from typing import Union
from pydantic import BaseModel
class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None
class ItemCreate(ItemBase):
    pass
class Item(ItemBase):
    id: int
    owner_id: int
    class Config:
        orm_mode = True
class UserBase(BaseModel):
    email: str
class UserCreate(UserBase):
    password: str
class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []
    class Config:
        orm_mode = True

참고로 SQLAlchemy 모델은 속성 정의에 =를 사용하고 Column()을 이용하여 속성을 지정한다.

name = Column(String)

Pydantic 모델에서는 : 을 사용하고 자료형으로 속성을 지정한다.

name: str

Pydantic의 orm_mode는 dict가 아니라 ORM 모델(또는 속성이 있는 다른 임의의 객체)이더라도 데이터를 읽도록 Pydantic 모델에 지시한다. 그래서 다음과 같이 사용할 수 있다.

id = data["id"]

이렇게 표현할 수도 있다.

id = data.id

SQLAlchemy는 기본적으로 "지연 로딩(lazy loading)"이다. 예를 들어 해당 데이터를 포함할 속성에 접근하려고 시도하지 않는 한 데이터베이스에서 관계에 대한 데이터를 즉시 가져오지는 않는다. 예를 들어 속성 ​​항목에 접근하는 경우 특정한 실제 작업이 이루어지지 않으면 데이터베이스에 접근한지 않는다.

current_user.items

 

생성/읽기/갱신/삭제 작업 CRUD (crud.py)

CRUD =  Create, Read, Update, Delete.

 

데이터 가져오기 Read

  • ID / 이메일을 이용하여 사용자 한명 정보 가져오기
  • 여러 사용자 정보 가져오기
  • 여러 아이템 정보 가져오기

 

데이터 생성 Create

  • 데이터로 SQLAlchemy 모델 인스턴스 생성
  • 인스턴스 개체를 데이터베이스 세션에 추가
  • 변경 사항을 데이터베이스에 커밋(실제로 DB에 저장됨).
  • 인스턴스를 새로 고침(앞에서 생성된 ID 같은 데이터베이스 새 데이터 정보도 포함되도록)
from sqlalchemy.orm import Session
from . import models, schemas
def get_user(db: Session, user_id: int):
    return db.query(models.User).filter(models.User.id == user_id).first()
def get_user_by_email(db: Session, email: str):
    return db.query(models.User).filter(models.User.email == email).first()
def get_users(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.User).offset(skip).limit(limit).all()
def create_user(db: Session, user: schemas.UserCreate):
    fake_hashed_password = user.password + "notreallyhashed"
    db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user
def get_items(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.Item).offset(skip).limit(limit).all()
def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
    db_item = models.Item(**item.dict(), owner_id=user_id)
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    return db_item

[주의] 예제에서 비밀번호는 해시(암호화) 처리가 안되는데 이것은 보안에 취약한 방식이다.

[팁] 각 키워드 인수를 Item에 전달하고 Pydantic 모델에서 각 인수를 읽는 대신,

다음처럼 Pydantic 모델 데이터로 사전을 생성한다. 

item.dict()

dict의 키-값 쌍을 다음을 사용하여 SQLAlchemy 항목에 대한 키워드 인수로 전달한다.

Item(**item.dict())

Pydantic 모델에서 제공하지 않는 추가 인수 owner_id를 다음과 함께 전달한다.

Item(**item.dict(), owner_id=user_id)

 

FastAPI main (main.py), Python 3.6 이상

- 데이터베이스 테이블 생성

- 의존성 생성

- 경로 생성

 

sql_app/database.py 파일에서 만든 SessionLocal 클래스를 사용하여 종속성을 만든다. 요청마다 독립적인 데이터베이스 세션/연결(SessionLocal)이 필요하고 모든 요청을 통해 동일한 세션을 사용하고 요청이 완료된 후 닫는다. 그 후에 다음 요청을 위해 새 세션이 생성된다. 이를 위해 이전에 yield가 있는 종속성에 대한 섹션에서 설명한 대로 yield로 새 종속성을 생성한다. 우리 의존성은 단일 요청에서 사용될 새로운 SQLAlchemy SessionLocal을 생성하고 요청이 완료되면 닫는다.

 

[참고] 데이터베이스와  Alembic

 Alembic은 데이터베이스 초기화(테이블 생성 등)나 "마이그레이션"(주요 작업)에 사용할 수 있다. "마이그레이션"은 SQLAlchemy 모델 구조를 변경하거나, 새 속성을 추가하는 등 데이터베이스 변경 사항을 복제하고, 새 열, 새 테이블 등을 추가할 때마다 필요한 일련의 작업 단계이다.

 

[참고] SessionLocal() 생성과 요청 처리를 try 블록에 넣는다. 그런 다음에 finally 블록에서 닫는다.

이렇게 하면 요청 후에 데이터베이스 세션이 항상 닫힌다. 요청을 처리하는 동안 예외가 발생한 경우에도 마찬가지이다. 그러나 종료 코드(yield 이후)에서 다른 예외를 발생시킬 수 없다. 참고 Dependencies with yield and HTTPException
그런 다음 경로 연산 함수에서 종속성을 사용할 때 SQLAlchemy에서 직접 가져온 Session 유형으로 선언한다. 그러면 편집기가 db 매개변수가 Session 유형임을 알기 때문에 경로 작업 함수 내에서 더 나은 편집 지원을 제공한다.

 

[참고] 

db 매개변수는 실제로 SessionLocal 유형이지만 이 클래스(sessionmaker()로 생성)는 SQLAlchemy 세션의 "proxy(프록시)"이므로 편집기는 실제로 제공되는 메소드를 알지 못한다. 그러나 유형을 Session으로 선언함으로써 편집기는 이제 사용 가능한 메소드(.add(), .query(), .commit() 등)를 알 수 있고 더 나은 지원(예: 완료)을 제공할 수 있다. 유형 선언은 실제 개체에 영향을 주지 않는다.

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session
from . import crud, models, schemas
from .database import SessionLocal, engine
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)
@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users
@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user
@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)
@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items

yield 를 사용하여 종속성에서 각 요청 전에 데이터베이스 세션을 만든 다음 나중에 닫는다. 그런 다음에 경로 작업 함수에서 필요한 종속성을 생성하여 해당 세션을 직접 가져올 수 있다. 이를 통해 경로 연산 함수 내부에서 직접 crud.get_user를 호출하고 해당 세션을 사용할 수 있다.

 

def 함수와 async def 함수 

경로 연산 함수 내부와 종속성에서 SQLAlchemy 코드를 사용하고 있으며 차례로 외부 데이터베이스와 통신한다. 잠재적으로 약간의 "대기"가 필요할 수 있다. 그러나 SQLAlchemy는 다음과 같이 await를 직접 사용할 수 있는 호환성이 없다.

user = await db.query(User).first()

대신에 이렇게 사용한다.

user = db.query(User).first()

그런 다음 다음과 같이 일반 def와 함께 async def 없이 경로 작업 함수와 종속성을 선언해야 한다.

@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    ...
 

전체 파일 소스 코드 보기

프로젝트 디렉토리에 sql_app 하위 디렉토리가 있다는 것을 기억하자. sql_app에는 다음 파일이 있어야 한다.

  • sql_app/__init__.py : 그냥 빈 파일임
  • sql_app/database.py: 아래 파일 

[sql_app/database.py]

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"
engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

 

[sql_app/models.py]

from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from .database import Base
class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)
    items = relationship("Item", back_populates="owner")
class Item(Base):
    __tablename__ = "items"
    id = Column(Integer, primary_key=True, index=True)
    title = Column(String, index=True)
    description = Column(String, index=True)
    owner_id = Column(Integer, ForeignKey("users.id"))
    owner = relationship("User", back_populates="items")

 

[sql_app/schemas.py]

from typing import Union
from pydantic import BaseModel
class ItemBase(BaseModel):
    title: str
    description: Union[str, None] = None
class ItemCreate(ItemBase):
    pass
class Item(ItemBase):
    id: int
    owner_id: int
    class Config:
        orm_mode = True
class UserBase(BaseModel):
    email: str
class UserCreate(UserBase):
    password: str
class User(UserBase):
    id: int
    is_active: bool
    items: list[Item] = []
    class Config:
        orm_mode = True

 

[sql_app/crud.py]

from sqlalchemy.orm import Session
from . import models, schemas
def get_user(db: Session, user_id: int):
    return db.query(models.User).filter(models.User.id == user_id).first()
def get_user_by_email(db: Session, email: str):
    return db.query(models.User).filter(models.User.email == email).first()
def get_users(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.User).offset(skip).limit(limit).all()
def create_user(db: Session, user: schemas.UserCreate):
    fake_hashed_password = user.password + "notreallyhashed"
    db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user
def get_items(db: Session, skip: int = 0, limit: int = 100):
    return db.query(models.Item).offset(skip).limit(limit).all()
def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
    db_item = models.Item(**item.dict(), owner_id=user_id)
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    return db_item

 

[sql_app/main.py]

 
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session
from . import crud, models, schemas
from .database import SessionLocal, engine
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)
@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users
@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user
@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)
@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items

 

Uvicorn으로 FastAPI 실행하기

uvicorn sql_app.main:app --reload
 
INFO: Uvicorn running on http://127.0.0.1:8000 (press CTRL+C, quit)

이제 웹브라우저에서 다음 주소에 접속하자.

http://127.0.0.1:8000/docs.

 

FastAPI 에서 제공하는 API 테스트 화면(스웨거 기능)이 나타날 것이다. 

 

미들웨어 기능을 이용한 DB session 처리

yield 와 함께 종속성을 사용할 수 없는 경우(예를 들어 Python 3.7을 사용하지 않고 Python 3.6에서 "backports"를 설치할 수 없을 때) "미들웨어"에서 세션을 설정할 수 있다. "미들웨어"는 기본적으로 각 요청에 대해 항상 실행되는 기능으로, 일부 코드는 이전에 실행되고 일부 코드는 엔드포인트 기능 후에 실행된다.

 

미들웨어(middleware) 생성 

from fastapi import Depends, FastAPI, HTTPException, Request, Response
from sqlalchemy.orm import Session
from . import crud, models, schemas
from .database import SessionLocal, engine
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
@app.middleware("http")
async def db_session_middleware(request: Request, call_next):
    response = Response("Internal server error", status_code=500)
    try:
        request.state.db = SessionLocal()
        response = await call_next(request)
    finally:
        request.state.db.close()
    return response
# Dependency
def get_db(request: Request):
    return request.state.db
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db=db, user=user)
@app.get("/users/", response_model=list[schemas.User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users
@app.get("/users/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user
@app.post("/users/{user_id}/items/", response_model=schemas.Item)
def create_item_for_user(
    user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
):
    return crud.create_user_item(db=db, item=item, user_id=user_id)
@app.get("/items/", response_model=list[schemas.Item])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items
여기에 미들웨어를 추가하는 것은 yield를 사용한 종속성과 유사하다. 다만, 미들웨어 방식은 다음과 같은 차이가 있다.

- 더 많은 코드가 필요하고 약간 더 복잡하다.
- 미들웨어는 비동기 기능이어야 한다.
     - 네트워크 "대기" 코드가 있는 경우 거기에서 애플리케이션을 "차단"하고 성능을 저하시킬 수 있다.
     - SQLAlchemy가 작동하는 방식에 대해서는 여기에서 그다지 문제가 되지 않을 수 있다.
     - 하지만 대기 중인 I/O가 많은 미들웨어에 코드를 더 추가하면 문제가 될 수 있다.
- 모든 요청에 ​​대해 미들웨어가 실행된다.
     - 모든 요청에 ​​대해 연결이 생성된다.
     - 해당 요청을 처리하는 경로 작업에 DB가 필요하지 않은 경우에도 마찬가지다.
 

 

반응형

+ Recent posts