""" 좋아요 Redis 캐시 클라이언트 Write-Behind 패턴 적용: - 토글 시 Redis를 즉시 업데이트하고 dirty SET에 표시 - 스케줄러가 1분마다 dirty 항목을 MySQL에 bulk write Key 패턴: - video:like:count:{video_id} INT — 좋아요 카운트 - video:like:users:{video_id} SET — 좋아요 누른 user_uuid 목록 - video:reaction:dirty SET — DB 동기화 대기 "{video_id}:{user_uuid}" - video:reaction:dirty:processing SET — 플러시 중 임시 (크래시 복구용) 캐시 미스(Redis 재시작 등) 시 호출부에서 DB 조회 후 backfill_user_set() / set_like_count()로 복구합니다. """ import redis.asyncio as aioredis from config import db_settings _client: aioredis.Redis | None = None # 원자적 토글 Lua 스크립트 — 동시 더블클릭 race condition 방지 _TOGGLE_LIKE_SCRIPT = """ local user_key = KEYS[1] local count_key = KEYS[2] local user_uuid = ARGV[1] if redis.call('SISMEMBER', user_key, user_uuid) == 1 then redis.call('SREM', user_key, user_uuid) local c = tonumber(redis.call('DECR', count_key)) if c < 0 then redis.call('SET', count_key, 0) c = 0 end return {0, c} else redis.call('SADD', user_key, user_uuid) local c = tonumber(redis.call('INCR', count_key)) return {1, c} end """ _DIRTY_KEY = "video:reaction:dirty" _DIRTY_PROCESSING_KEY = "video:reaction:dirty:processing" def get_like_cache() -> aioredis.Redis: global _client if _client is None: _client = aioredis.Redis( host=db_settings.REDIS_HOST, port=db_settings.REDIS_PORT, db=2, decode_responses=True, ) return _client async def close_like_cache() -> None: global _client if _client: await _client.aclose() _client = None # ────────────────────────────────────────────── # Key 헬퍼 # ────────────────────────────────────────────── def _key(video_id: int) -> str: return f"video:like:count:{video_id}" def _user_key(video_id: int) -> str: return f"video:like:users:{video_id}" # ────────────────────────────────────────────── # 카운트 (기존 API 유지) # ────────────────────────────────────────────── async def get_like_count(video_id: int) -> int | None: """Redis에서 like_count 조회. 캐시 미스 시 None 반환.""" val = await get_like_cache().get(_key(video_id)) if val is None: return None return max(int(val), 0) async def get_like_counts(video_ids: list[int]) -> dict[int, int | None]: """여러 영상의 like_count를 한 번에 조회 (mget). 캐시 미스인 video_id는 None으로 반환.""" if not video_ids: return {} keys = [_key(vid) for vid in video_ids] values = await get_like_cache().mget(*keys) return { vid: max(int(v), 0) if v is not None else None for vid, v in zip(video_ids, values) } async def set_like_count(video_id: int, count: int) -> None: """like_count를 Redis에 저장 (음수 방지).""" await get_like_cache().set(_key(video_id), max(count, 0)) async def mset_like_counts(counts: dict[int, int]) -> None: """여러 영상의 like_count를 한 번에 저장 (mset).""" if not counts: return await get_like_cache().mset({_key(vid): max(cnt, 0) for vid, cnt in counts.items()}) async def incr_like_count(video_id: int) -> int: """like_count를 1 증가 후 반환.""" return max(int(await get_like_cache().incr(_key(video_id))), 0) async def decr_like_count(video_id: int) -> int: """like_count를 1 감소 후 반환 (음수 방지).""" count = int(await get_like_cache().decr(_key(video_id))) if count < 0: await get_like_cache().set(_key(video_id), 0) return 0 return count # ────────────────────────────────────────────── # 유저 SET (is_liked_by_me source of truth) # ────────────────────────────────────────────── async def toggle_like_atomic(video_id: int, user_uuid: str) -> tuple[bool, int]: """Lua 스크립트로 원자적 좋아요 토글. Returns: (is_liked, new_count) 튜플 """ result = await get_like_cache().eval( _TOGGLE_LIKE_SCRIPT, 2, _user_key(video_id), _key(video_id), user_uuid, ) return bool(result[0]), int(result[1]) async def is_user_liked(video_id: int, user_uuid: str) -> bool | None: """Redis user-set에서 좋아요 여부 조회. Returns: True/False: 조회 성공 None: user-set 키가 없음 (cold-start backfill 필요 신호) """ client = get_like_cache() key = _user_key(video_id) if not await client.exists(key): return None return bool(await client.sismember(key, user_uuid)) async def is_user_set_exists(video_id: int) -> bool: """Redis user-set 키 존재 여부 확인.""" return bool(await get_like_cache().exists(_user_key(video_id))) async def bulk_is_user_liked( video_ids: list[int], user_uuid: str ) -> dict[int, bool | None]: """여러 영상의 is_liked 여부를 한 번에 조회 (pipeline). Returns: {video_id: True/False} — user-set 키가 없는 영상은 None """ if not video_ids: return {} client = get_like_cache() async with client.pipeline(transaction=False) as pipe: for vid in video_ids: pipe.exists(_user_key(vid)) pipe.sismember(_user_key(vid), user_uuid) responses = await pipe.execute() return { vid: (bool(responses[i * 2 + 1]) if responses[i * 2] else None) for i, vid in enumerate(video_ids) } async def backfill_user_set(video_id: int, user_uuids: list[str]) -> None: """DB에서 가져온 유저 목록을 Redis SET에 일괄 적재.""" if user_uuids: await get_like_cache().sadd(_user_key(video_id), *user_uuids) # ────────────────────────────────────────────── # Dirty SET (Write-Behind 큐) # ────────────────────────────────────────────── async def mark_dirty(video_id: int, user_uuid: str) -> None: """DB 동기화 대기 목록에 추가.""" await get_like_cache().sadd(_DIRTY_KEY, f"{video_id}:{user_uuid}") async def drain_dirty() -> list[tuple[int, str]]: """dirty SET을 processing으로 RENAME 후 전체 반환. 이전 실행 중 크래시로 남은 processing 항목은 먼저 병합하여 유실 방지. """ client = get_like_cache() # 이전 크래시 잔여 항목 병합 if await client.exists(_DIRTY_PROCESSING_KEY): await client.sunionstore(_DIRTY_KEY, _DIRTY_KEY, _DIRTY_PROCESSING_KEY) await client.delete(_DIRTY_PROCESSING_KEY) if not await client.exists(_DIRTY_KEY): return [] # RENAME으로 플러시 중 새로 들어오는 토글과 분리 await client.rename(_DIRTY_KEY, _DIRTY_PROCESSING_KEY) members = await client.smembers(_DIRTY_PROCESSING_KEY) result = [] for member in members: vid_str, user_uuid = member.split(":", 1) result.append((int(vid_str), user_uuid)) return result async def commit_dirty_processing() -> None: """DB 반영 완료 후 processing SET 삭제.""" await get_like_cache().delete(_DIRTY_PROCESSING_KEY)