o2o-castad-backend/app/video/api/routers/internal/reactions.py

99 lines
3.2 KiB
Python

"""
내부 전용 좋아요 반응 플러시 API
스케줄러가 1분마다 호출하여 Redis dirty SET의 좋아요 토글을 MySQL에 bulk write합니다.
X-Internal-Secret 헤더로 인증합니다.
"""
import logging
from fastapi import APIRouter, Depends, Header, HTTPException, status
from sqlalchemy import delete, insert, tuple_
from sqlalchemy.ext.asyncio import AsyncSession
from app.database.like_cache import (
commit_dirty_processing,
drain_dirty,
is_user_liked,
)
from app.database.session import get_session
from app.video.models import VideoReaction
from config import internal_settings
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/internal/video", tags=["Internal"])
@router.post(
"/reactions/flush",
summary="[내부] 좋아요 반응 DB 플러시",
description="스케줄러 서버에서 1분마다 호출하는 내부 전용 엔드포인트입니다. "
"Redis dirty SET의 항목을 MySQL video_reaction 테이블에 bulk write합니다.",
)
async def flush_reactions(
session: AsyncSession = Depends(get_session),
x_internal_secret: str = Header(...),
) -> dict:
"""Redis dirty SET → MySQL bulk write.
1. drain_dirty(): dirty SET을 processing으로 RENAME 후 항목 조회
2. 각 항목의 현재 Redis 상태(is_liked) 확인
3. is_liked=True → INSERT IGNORE, is_liked=False → DELETE
4. commit_dirty_processing(): processing SET 삭제
"""
if x_internal_secret != internal_settings.INTERNAL_SECRET_KEY:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid internal secret",
)
pairs = await drain_dirty()
if not pairs:
logger.info("[REACTION_FLUSH] dirty 항목 없음, 종료")
return {"flushed": 0, "adds": 0, "dels": 0}
logger.info(f"[REACTION_FLUSH] START - dirty 항목 {len(pairs)}")
adds: list[dict] = []
dels: list[tuple[int, str]] = []
# Redis 현재 상태 기준으로 add / delete 분류
for video_id, user_uuid in pairs:
liked = await is_user_liked(video_id, user_uuid)
if liked:
adds.append({"video_id": video_id, "user_uuid": user_uuid})
else:
dels.append((video_id, user_uuid))
try:
# Bulk INSERT IGNORE — UniqueConstraint 보장으로 멱등 처리
if adds:
await session.execute(
insert(VideoReaction).prefix_with("IGNORE").values(adds)
)
# Bulk DELETE
if dels:
await session.execute(
delete(VideoReaction).where(
tuple_(
VideoReaction.video_id,
VideoReaction.user_uuid,
).in_(dels)
)
)
await session.commit()
await commit_dirty_processing()
logger.info(
f"[REACTION_FLUSH] SUCCESS - adds: {len(adds)}, dels: {len(dels)}"
)
return {"flushed": len(pairs), "adds": len(adds), "dels": len(dels)}
except Exception as e:
await session.rollback()
logger.error(f"[REACTION_FLUSH] EXCEPTION - error: {e}")
raise HTTPException(status_code=500, detail=f"플러시 실패: {str(e)}")