o2o-castad-backend/app/database/like_cache.py

236 lines
7.8 KiB
Python

"""
좋아요 Redis 캐시 클라이언트
Write-Behind 패턴 적용:
- 토글 시 Redis를 즉시 업데이트하고 dirty SET에 표시
- 스케줄러가 1분마다 dirty 항목을 MySQL에 bulk write
Key 패턴:
- video:like:count:{video_id} INT — 좋아요 카운트
- video:like:users:{video_id} SET — 좋아요 누른 user_uuid 목록
- video:reaction:dirty SET — DB 동기화 대기 "{video_id}:{user_uuid}"
- video:reaction:dirty:processing SET — 플러시 중 임시 (크래시 복구용)
캐시 미스(Redis 재시작 등) 시 호출부에서 DB 조회 후 backfill_user_set() / set_like_count()로 복구합니다.
"""
import redis.asyncio as aioredis
from config import db_settings
_client: aioredis.Redis | None = None
# 원자적 토글 Lua 스크립트 — 동시 더블클릭 race condition 방지
_TOGGLE_LIKE_SCRIPT = """
local user_key = KEYS[1]
local count_key = KEYS[2]
local user_uuid = ARGV[1]
if redis.call('SISMEMBER', user_key, user_uuid) == 1 then
redis.call('SREM', user_key, user_uuid)
local c = tonumber(redis.call('DECR', count_key))
if c < 0 then
redis.call('SET', count_key, 0)
c = 0
end
return {0, c}
else
redis.call('SADD', user_key, user_uuid)
local c = tonumber(redis.call('INCR', count_key))
return {1, c}
end
"""
_DIRTY_KEY = "video:reaction:dirty"
_DIRTY_PROCESSING_KEY = "video:reaction:dirty:processing"
def get_like_cache() -> aioredis.Redis:
global _client
if _client is None:
_client = aioredis.Redis(
host=db_settings.REDIS_HOST,
port=db_settings.REDIS_PORT,
db=2,
decode_responses=True,
)
return _client
async def close_like_cache() -> None:
global _client
if _client:
await _client.aclose()
_client = None
# ──────────────────────────────────────────────
# Key 헬퍼
# ──────────────────────────────────────────────
def _key(video_id: int) -> str:
return f"video:like:count:{video_id}"
def _user_key(video_id: int) -> str:
return f"video:like:users:{video_id}"
# ──────────────────────────────────────────────
# 카운트 (기존 API 유지)
# ──────────────────────────────────────────────
async def get_like_count(video_id: int) -> int | None:
"""Redis에서 like_count 조회. 캐시 미스 시 None 반환."""
val = await get_like_cache().get(_key(video_id))
if val is None:
return None
return max(int(val), 0)
async def get_like_counts(video_ids: list[int]) -> dict[int, int | None]:
"""여러 영상의 like_count를 한 번에 조회 (mget).
캐시 미스인 video_id는 None으로 반환."""
if not video_ids:
return {}
keys = [_key(vid) for vid in video_ids]
values = await get_like_cache().mget(*keys)
return {
vid: max(int(v), 0) if v is not None else None
for vid, v in zip(video_ids, values)
}
async def set_like_count(video_id: int, count: int) -> None:
"""like_count를 Redis에 저장 (음수 방지)."""
await get_like_cache().set(_key(video_id), max(count, 0))
async def mset_like_counts(counts: dict[int, int]) -> None:
"""여러 영상의 like_count를 한 번에 저장 (mset)."""
if not counts:
return
await get_like_cache().mset({_key(vid): max(cnt, 0) for vid, cnt in counts.items()})
async def incr_like_count(video_id: int) -> int:
"""like_count를 1 증가 후 반환."""
return max(int(await get_like_cache().incr(_key(video_id))), 0)
async def decr_like_count(video_id: int) -> int:
"""like_count를 1 감소 후 반환 (음수 방지)."""
count = int(await get_like_cache().decr(_key(video_id)))
if count < 0:
await get_like_cache().set(_key(video_id), 0)
return 0
return count
# ──────────────────────────────────────────────
# 유저 SET (is_liked_by_me source of truth)
# ──────────────────────────────────────────────
async def toggle_like_atomic(video_id: int, user_uuid: str) -> tuple[bool, int]:
"""Lua 스크립트로 원자적 좋아요 토글.
Returns:
(is_liked, new_count) 튜플
"""
result = await get_like_cache().eval(
_TOGGLE_LIKE_SCRIPT,
2,
_user_key(video_id),
_key(video_id),
user_uuid,
)
return bool(result[0]), int(result[1])
async def is_user_liked(video_id: int, user_uuid: str) -> bool | None:
"""Redis user-set에서 좋아요 여부 조회.
Returns:
True/False: 조회 성공
None: user-set 키가 없음 (cold-start backfill 필요 신호)
"""
client = get_like_cache()
key = _user_key(video_id)
if not await client.exists(key):
return None
return bool(await client.sismember(key, user_uuid))
async def is_user_set_exists(video_id: int) -> bool:
"""Redis user-set 키 존재 여부 확인."""
return bool(await get_like_cache().exists(_user_key(video_id)))
async def bulk_is_user_liked(
video_ids: list[int], user_uuid: str
) -> dict[int, bool | None]:
"""여러 영상의 is_liked 여부를 한 번에 조회 (pipeline).
Returns:
{video_id: True/False} — user-set 키가 없는 영상은 None
"""
if not video_ids:
return {}
client = get_like_cache()
async with client.pipeline(transaction=False) as pipe:
for vid in video_ids:
pipe.exists(_user_key(vid))
pipe.sismember(_user_key(vid), user_uuid)
responses = await pipe.execute()
return {
vid: (bool(responses[i * 2 + 1]) if responses[i * 2] else None)
for i, vid in enumerate(video_ids)
}
async def backfill_user_set(video_id: int, user_uuids: list[str]) -> None:
"""DB에서 가져온 유저 목록을 Redis SET에 일괄 적재."""
if user_uuids:
await get_like_cache().sadd(_user_key(video_id), *user_uuids)
# ──────────────────────────────────────────────
# Dirty SET (Write-Behind 큐)
# ──────────────────────────────────────────────
async def mark_dirty(video_id: int, user_uuid: str) -> None:
"""DB 동기화 대기 목록에 추가."""
await get_like_cache().sadd(_DIRTY_KEY, f"{video_id}:{user_uuid}")
async def drain_dirty() -> list[tuple[int, str]]:
"""dirty SET을 processing으로 RENAME 후 전체 반환.
이전 실행 중 크래시로 남은 processing 항목은 먼저 병합하여 유실 방지.
"""
client = get_like_cache()
# 이전 크래시 잔여 항목 병합
if await client.exists(_DIRTY_PROCESSING_KEY):
await client.sunionstore(_DIRTY_KEY, _DIRTY_KEY, _DIRTY_PROCESSING_KEY)
await client.delete(_DIRTY_PROCESSING_KEY)
if not await client.exists(_DIRTY_KEY):
return []
# RENAME으로 플러시 중 새로 들어오는 토글과 분리
await client.rename(_DIRTY_KEY, _DIRTY_PROCESSING_KEY)
members = await client.smembers(_DIRTY_PROCESSING_KEY)
result = []
for member in members:
vid_str, user_uuid = member.split(":", 1)
result.append((int(vid_str), user_uuid))
return result
async def commit_dirty_processing() -> None:
"""DB 반영 완료 후 processing SET 삭제."""
await get_like_cache().delete(_DIRTY_PROCESSING_KEY)