O2Sound_ver2_final/backend/app/shared/progress.py

69 lines
2.1 KiB
Python

from app.core.redis.redis_manager import RedisManager
import json
from app.shared.logger import setup_logger
from typing import Optional
logger = setup_logger(__name__)
class Process:
"""Redis 상태 저장 및 조회 (예: 진행률 트래킹)"""
def __init__(self, redis_manager: RedisManager):
self.redis = redis_manager
async def init_task_status(self, task_id: str) -> None:
"""처음 상태 초기화"""
status = {
"metadata": False,
"lyrics": False,
"songs": False,
"images": False,
"movies": False,
"combined": False
}
r = await self.redis.get_client()
await r.set(f"task_progress:{task_id}", json.dumps(status))
async def set_task_step_done(self, task_id: str, step_name: str) -> None:
"""단계별 상태 True로 업데이트"""
r = await self.redis.get_client()
key = f"task_progress:{task_id}"
data = await r.get(key)
if not data:
await self.init_task_status(task_id)
data = await r.get(key)
if not data:
logger.error(f"[set_task_step_done] Redis에 상태가 저장되지 않았습니다. key={key}")
return
try:
if isinstance(data, bytes):
data = data.decode()
status_data = json.loads(data)
except Exception as e:
logger.exception(f"[set_task_step_done] JSON decode 실패: {e}")
return
# 단계 상태 업데이트
status_data[step_name] = True
await r.set(key, json.dumps(status_data)) # ✅ await 추가
async def get_task_status(self, task_id: str) -> dict:
"""전체 상태 조회"""
r = await self.redis.get_client()
key = f"task_progress:{task_id}"
data = await r.get(key)
if not data:
return {}
try:
if isinstance(data, bytes):
data = data.decode()
return json.loads(data)
except Exception as e:
logger.exception(f"[get_task_status] JSON decode error for {key}: {e}")
return {}