import json
from typing import Optional

from app.core.redis.redis_manager import RedisManager
from app.shared.logger import setup_logger

logger = setup_logger(__name__)


class Process:
    """Store and query per-task progress flags in Redis.

    Each task's progress is kept as a JSON object under the key
    ``task_progress:<task_id>``; every step name maps to a boolean that
    flips to ``True`` once the step is reported done.
    """

    # Prefix shared by every progress key written by this class.
    _KEY_PREFIX = "task_progress:"

    # Steps present (all ``False``) in a freshly initialised status, in order.
    _STEPS = ("metadata", "lyrics", "songs", "images", "movies", "combined")

    def __init__(self, redis_manager: RedisManager):
        """Keep a reference to the manager used to obtain the Redis client."""
        self.redis = redis_manager

    @classmethod
    def _key(cls, task_id: str) -> str:
        """Return the Redis key that holds the progress of *task_id*."""
        return f"{cls._KEY_PREFIX}{task_id}"

    @staticmethod
    def _parse_status(raw) -> dict:
        """Decode a raw Redis value (``str`` or ``bytes``) into a status dict.

        Raises:
            ValueError: If the payload is not valid UTF-8 / JSON, or if it is
                valid JSON but not an object.
            TypeError: If ``raw`` is not something ``json.loads`` accepts.
        """
        if isinstance(raw, bytes):
            raw = raw.decode()
        parsed = json.loads(raw)
        if not isinstance(parsed, dict):
            # A list/number/string payload cannot carry per-step flags;
            # reject it here so callers never index into a non-mapping.
            raise ValueError(
                f"expected a JSON object, got {type(parsed).__name__}"
            )
        return parsed

    async def init_task_status(self, task_id: str) -> None:
        """Write the initial status for *task_id* with every step ``False``.

        Any status already stored under the task's key is overwritten.
        """
        status = dict.fromkeys(self._STEPS, False)
        r = await self.redis.get_client()
        await r.set(self._key(task_id), json.dumps(status))

    async def set_task_step_done(self, task_id: str, step_name: str) -> None:
        """Mark *step_name* as done (``True``) in the status of *task_id*.

        If no status exists yet it is initialised first. Failures (status
        still missing after initialisation, or an undecodable payload) are
        logged and the method returns without writing.

        NOTE(review): the update is a separate GET followed by a SET, so two
        concurrent callers for the same task may overwrite each other's
        update — confirm whether callers are serialised per task.
        """
        r = await self.redis.get_client()
        key = self._key(task_id)

        data = await r.get(key)
        if not data:
            await self.init_task_status(task_id)
            # Re-read to confirm the initial status was actually stored.
            data = await r.get(key)
            if not data:
                logger.error(f"[set_task_step_done] Redis에 상태가 저장되지 않았습니다. key={key}")
                return

        try:
            status_data = self._parse_status(data)
        except (ValueError, TypeError) as e:
            logger.exception(f"[set_task_step_done] JSON decode 실패: {e}")
            return

        status_data[step_name] = True
        # r.set is a coroutine on the async client; it must be awaited or
        # the write never happens.
        await r.set(key, json.dumps(status_data))

    async def get_task_status(self, task_id: str) -> dict:
        """Return the stored status dict for *task_id*.

        Returns an empty dict when nothing is stored or when the stored
        payload cannot be decoded into a JSON object (the latter is logged).
        """
        r = await self.redis.get_client()
        key = self._key(task_id)

        data = await r.get(key)
        if not data:
            return {}

        try:
            return self._parse_status(data)
        except (ValueError, TypeError) as e:
            logger.exception(f"[get_task_status] JSON decode error for {key}: {e}")
            return {}