""" 내부 전용 좋아요 반응 플러시 API 스케줄러가 1분마다 호출하여 Redis dirty SET의 좋아요 토글을 MySQL에 bulk write합니다. X-Internal-Secret 헤더로 인증합니다. """ import logging from fastapi import APIRouter, Depends, Header, HTTPException, status from sqlalchemy import delete, insert, tuple_ from sqlalchemy.ext.asyncio import AsyncSession from app.database.like_cache import ( commit_dirty_processing, drain_dirty, is_user_liked, ) from app.database.session import get_session from app.video.models import VideoReaction from config import internal_settings logger = logging.getLogger(__name__) router = APIRouter(prefix="/internal/video", tags=["Internal"]) @router.post( "/reactions/flush", summary="[내부] 좋아요 반응 DB 플러시", description="스케줄러 서버에서 1분마다 호출하는 내부 전용 엔드포인트입니다. " "Redis dirty SET의 항목을 MySQL video_reaction 테이블에 bulk write합니다.", ) async def flush_reactions( session: AsyncSession = Depends(get_session), x_internal_secret: str = Header(...), ) -> dict: """Redis dirty SET → MySQL bulk write. 1. drain_dirty(): dirty SET을 processing으로 RENAME 후 항목 조회 2. 각 항목의 현재 Redis 상태(is_liked) 확인 3. is_liked=True → INSERT IGNORE, is_liked=False → DELETE 4. commit_dirty_processing(): processing SET 삭제 """ if x_internal_secret != internal_settings.INTERNAL_SECRET_KEY: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Invalid internal secret", ) pairs = await drain_dirty() if not pairs: logger.info("[REACTION_FLUSH] dirty 항목 없음, 종료") return {"flushed": 0, "adds": 0, "dels": 0} logger.info(f"[REACTION_FLUSH] START - dirty 항목 {len(pairs)}건") adds: list[dict] = [] dels: list[tuple[int, str]] = [] # Redis 현재 상태 기준으로 add / delete 분류 for video_id, user_uuid in pairs: liked = await is_user_liked(video_id, user_uuid) if liked: adds.append({"video_id": video_id, "user_uuid": user_uuid}) else: dels.append((video_id, user_uuid)) try: # Bulk INSERT IGNORE — UniqueConstraint 보장으로 멱등 처리 if adds: await session.execute( insert(VideoReaction).prefix_with("IGNORE").values(adds) ) # Bulk DELETE if dels: await session.execute( delete(VideoReaction).where( tuple_( VideoReaction.video_id, VideoReaction.user_uuid, ).in_(dels) ) ) await session.commit() await commit_dirty_processing() logger.info( f"[REACTION_FLUSH] SUCCESS - adds: {len(adds)}, dels: {len(dels)}" ) return {"flushed": len(pairs), "adds": len(adds), "dels": len(dels)} except Exception as e: await session.rollback() logger.error(f"[REACTION_FLUSH] EXCEPTION - error: {e}") raise HTTPException(status_code=500, detail=f"플러시 실패: {str(e)}")