FastAPI 백그라운드 작업 (이메일 전송, 파일 저장 등 백그라운드 태스크)
다음 두 예제는 백그라운드 작업을 이용하는 예를 보여줍니다.
[ 예제 1 ] https://www.youtube.com/watch?v=_yXOJvr5vOM
'''
fastapi==0.63.0
h11==0.12.0
h2==4.0.0
hpack==4.0.0
Hypercorn==0.11.2
hyperframe==6.0.0
pkg-resources==0.0.0
priority==1.3.0
pydantic==1.8.1
starlette==0.13.6
toml==0.10.2
typing-extensions==3.7.4.3
wsproto==1.0.0
'''
from fastapi import FastAPI, BackgroundTasks
from time import sleep
app = FastAPI()
def send_email(message):
sleep(5)
print('Sending email: ', message)
@app.get('/')
async def index(background_tasks: BackgroundTasks):
background_tasks.add_task(send_email, "Hello there.")
return {'result' : 'success'}
[ 예제 2 ] https://www.youtube.com/watch?v=Y3NRCfPKp3c
from fastapi import FastAPI, Body, Request, File, UploadFile, Form, Depends, BackgroundTasks
from pydantic import BaseModel
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
import time
app = FastAPI()
list_of_usernames = list()
templates = Jinja2Templates(directory="htmldirectory")
oauth_scheme = OAuth2PasswordBearer(tokenUrl="token")
def handle_email_background(email: str, data: str):
print(email)
print(data)
for i in range(100):
print(i)
time.sleep(0.1)
@app.get("/users/email")
async def handle_email(email: str, background_task: BackgroundTasks):
print(email)
background_task.add_task(handle_email_background, email, "THIS IS A SAMPLE BACKGROUND TASK MANAGER")
return {"user": "baradwaj", "message": "Mail Sent"}
@app.post("/token")
async def token_generate(form_data: OAuth2PasswordRequestForm = Depends()):
print(form_data)
return {"access_token": form_data.username, "token_type": "bearer"}
@app.get("/users/profilepic")
async def profile_pic(token: str = Depends(oauth_scheme)):
print(token)
return {
"user": "baradwaj",
"profile_pic": "my_face"
}
class NameValues(BaseModel):
name: str = None
country: str
age: int
base_salary: float
@app.get("/home/{user_name}", response_class=HTMLResponse)
def write_home(request: Request, user_name: str):
return templates.TemplateResponse("home.html", {"request": request, "username": user_name})
@app.post("/submitform")
async def handle_form(assignment: str = Form(...), assignment_file: UploadFile = File(...)):
print(assignment)
print(assignment_file.filename)
content_assignment = await assignment_file.read()
print(content_assignment)
@app.post("/postData")
def post_data(name_value: NameValues, spousal_status: str = Body(...)):
print(name_value)
return {
"name": name_value.name,
"spousal_status": spousal_status
}
'FastAPI' 카테고리의 다른 글
FastAPI, 워커(worker) 개수 자동 계산하기 (0) | 2022.02.04 |
---|---|
FastAPI - A python framework | Full Course (0) | 2022.01.28 |
Nginx로 FastAPI 배포하기 (0) | 2022.01.25 |
FastAPI 시작, 중지, 재시작 (2) | 2022.01.25 |
FastAPI 세 가지 에러 핸들링 방법 (0) | 2022.01.23 |