import redis.asyncio as redis import json from typing import Optional, Dict, Any from datetime import timedelta from app.core.env_setting import EnvSetting settings = EnvSetting() class RedisManager: def __init__(self, redis_url: Optional[str] = None): self.redis_url = redis_url or settings.RedisManager_URL self.client : Optional[redis.Redis] = None async def connect(self): '''Redis 연결''' if self.client is None: self.client = redis.from_url( self.redis_url, encoding="utf-8", decode_responses=True, max_connections=20, socket_connect_timeout=5, socket_timeout=5, ) return self.client async def close(self): '''Redis 연결 종료''' if self.client: await self.client.close() self.client = None async def get_client(self) -> redis.Redis: '''Redis 클라이언트 반환''' if self.client is None: await self.connect() return self.client class RedisOAuthStorage: """OAuth 관련 Redis 저장소""" def __init__(self, redis_manager: RedisManager): self.redis_manager = redis_manager self.state_prefix = "oauth:state:" self.temp_token_prefix = "oauth:temp_token:" self.updated_token_prefix = "oauth:updated_token:" async def store_oauth_state(self, state: str, flow_data: Dict[str, Any], ttl: int = 600): """OAuth 상태 저장 (기본 10분 TTL)""" client = await self.redis_manager.get_client() key = f"{self.state_prefix}{state}" await client.setex(key, ttl, json.dumps(flow_data)) async def get_oauth_state(self, state: str) -> Optional[Dict[str, Any]]: """OAuth 상태 조회 및 삭제""" client = await self.redis_manager.get_client() key = f"{self.state_prefix}{state}" data = await client.get(key) if data: await client.delete(key) # 일회용이므로 삭제 return json.loads(data) return None async def store_temp_token(self, temp_token_id: str, token_data: Dict[str, Any], ttl: int = 3600): """임시 토큰 저장 (기본 1시간 TTL)""" client = await self.redis_manager.get_client() key = f"{self.temp_token_prefix}{temp_token_id}" await client.setex(key, ttl, json.dumps(token_data)) async def get_temp_token(self, temp_token_id: str) -> Optional[Dict[str, Any]]: """임시 토큰 조회 및 삭제""" client = await self.redis_manager.get_client() key = f"{self.temp_token_prefix}{temp_token_id}" data = await client.get(key) if data: await client.delete(key) # 일회용이므로 삭제 return json.loads(data) return None async def store_updated_token(self, token_key: str, token_data: Dict[str, Any], ttl: int = 3600): """갱신된 토큰 저장""" client = await self.redis_manager.get_client() key = f"{self.updated_token_prefix}{token_key}" await client.setex(key, ttl, json.dumps(token_data)) class RedisYouTubeStorage: """YouTube 관련 Redis 저장소""" def __init__(self, redis_manager: RedisManager): self.redis_manager = redis_manager self.channels_prefix = "youtube:channels:" self.service_prefix = "youtube:service:" async def get_cached_channels(self, cache_key: str) -> Optional[Dict[str, Any]]: """캐시된 채널 정보 조회""" client = await self.redis_manager.get_client() key = f"{self.channels_prefix}{cache_key}" data = await client.get(key) if data: return json.loads(data) return None async def cache_channels(self, cache_key: str, channels_data: Dict[str, Any], ttl: int = 1800): """채널 정보 캐싱 (기본 30분 TTL)""" client = await self.redis_manager.get_client() key = f"{self.channels_prefix}{cache_key}" await client.setex(key, ttl, json.dumps(channels_data)) # 전역 Redis 매니저 인스턴스 redis_manager = RedisManager() oauth_storage = RedisOAuthStorage(redis_manager) youtube_storage = RedisYouTubeStorage(redis_manager) async def get_redis_manager() -> RedisManager: """Redis 매니저 의존성""" return redis_manager async def get_oauth_storage() -> RedisOAuthStorage: """OAuth 저장소 의존성""" return oauth_storage async def get_youtube_storage() -> RedisYouTubeStorage: """YouTube 저장소 의존성""" return youtube_storage