99 lines
3.2 KiB
Python
99 lines
3.2 KiB
Python
"""
|
|
내부 전용 좋아요 반응 플러시 API
|
|
|
|
스케줄러가 1분마다 호출하여 Redis dirty SET의 좋아요 토글을 MySQL에 bulk write합니다.
|
|
X-Internal-Secret 헤더로 인증합니다.
|
|
"""
|
|
|
|
import logging
|
|
|
|
from fastapi import APIRouter, Depends, Header, HTTPException, status
|
|
from sqlalchemy import delete, insert, tuple_
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.database.like_cache import (
|
|
commit_dirty_processing,
|
|
drain_dirty,
|
|
is_user_liked,
|
|
)
|
|
from app.database.session import get_session
|
|
from app.video.models import VideoReaction
|
|
from config import internal_settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
router = APIRouter(prefix="/internal/video", tags=["Internal"])
|
|
|
|
|
|
@router.post(
|
|
"/reactions/flush",
|
|
summary="[내부] 좋아요 반응 DB 플러시",
|
|
description="스케줄러 서버에서 1분마다 호출하는 내부 전용 엔드포인트입니다. "
|
|
"Redis dirty SET의 항목을 MySQL video_reaction 테이블에 bulk write합니다.",
|
|
)
|
|
async def flush_reactions(
|
|
session: AsyncSession = Depends(get_session),
|
|
x_internal_secret: str = Header(...),
|
|
) -> dict:
|
|
"""Redis dirty SET → MySQL bulk write.
|
|
|
|
1. drain_dirty(): dirty SET을 processing으로 RENAME 후 항목 조회
|
|
2. 각 항목의 현재 Redis 상태(is_liked) 확인
|
|
3. is_liked=True → INSERT IGNORE, is_liked=False → DELETE
|
|
4. commit_dirty_processing(): processing SET 삭제
|
|
"""
|
|
if x_internal_secret != internal_settings.INTERNAL_SECRET_KEY:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Invalid internal secret",
|
|
)
|
|
|
|
pairs = await drain_dirty()
|
|
if not pairs:
|
|
logger.info("[REACTION_FLUSH] dirty 항목 없음, 종료")
|
|
return {"flushed": 0, "adds": 0, "dels": 0}
|
|
|
|
logger.info(f"[REACTION_FLUSH] START - dirty 항목 {len(pairs)}건")
|
|
|
|
adds: list[dict] = []
|
|
dels: list[tuple[int, str]] = []
|
|
|
|
# Redis 현재 상태 기준으로 add / delete 분류
|
|
for video_id, user_uuid in pairs:
|
|
liked = await is_user_liked(video_id, user_uuid)
|
|
if liked:
|
|
adds.append({"video_id": video_id, "user_uuid": user_uuid})
|
|
else:
|
|
dels.append((video_id, user_uuid))
|
|
|
|
try:
|
|
# Bulk INSERT IGNORE — UniqueConstraint 보장으로 멱등 처리
|
|
if adds:
|
|
await session.execute(
|
|
insert(VideoReaction).prefix_with("IGNORE").values(adds)
|
|
)
|
|
|
|
# Bulk DELETE
|
|
if dels:
|
|
await session.execute(
|
|
delete(VideoReaction).where(
|
|
tuple_(
|
|
VideoReaction.video_id,
|
|
VideoReaction.user_uuid,
|
|
).in_(dels)
|
|
)
|
|
)
|
|
|
|
await session.commit()
|
|
await commit_dirty_processing()
|
|
|
|
logger.info(
|
|
f"[REACTION_FLUSH] SUCCESS - adds: {len(adds)}, dels: {len(dels)}"
|
|
)
|
|
return {"flushed": len(pairs), "adds": len(adds), "dels": len(dels)}
|
|
|
|
except Exception as e:
|
|
await session.rollback()
|
|
logger.error(f"[REACTION_FLUSH] EXCEPTION - error: {e}")
|
|
raise HTTPException(status_code=500, detail=f"플러시 실패: {str(e)}")
|