128 lines
4.6 KiB
Python
128 lines
4.6 KiB
Python
import redis.asyncio as redis
|
|
import json
|
|
from typing import Optional, Dict, Any
|
|
from datetime import timedelta
|
|
from app.core.env_setting import EnvSetting
|
|
|
|
|
|
settings = EnvSetting()
|
|
|
|
class RedisManager:
|
|
def __init__(self, redis_url: Optional[str] = None):
|
|
self.redis_url = redis_url or settings.RedisManager_URL
|
|
self.client : Optional[redis.Redis] = None
|
|
|
|
async def connect(self):
|
|
'''Redis 연결'''
|
|
if self.client is None:
|
|
self.client = redis.from_url(
|
|
self.redis_url,
|
|
encoding="utf-8",
|
|
decode_responses=True,
|
|
max_connections=20,
|
|
socket_connect_timeout=5,
|
|
socket_timeout=5,
|
|
)
|
|
return self.client
|
|
|
|
async def close(self):
|
|
'''Redis 연결 종료'''
|
|
if self.client:
|
|
await self.client.close()
|
|
self.client = None
|
|
|
|
async def get_client(self) -> redis.Redis:
|
|
'''Redis 클라이언트 반환'''
|
|
if self.client is None:
|
|
await self.connect()
|
|
return self.client
|
|
|
|
class RedisOAuthStorage:
|
|
"""OAuth 관련 Redis 저장소"""
|
|
|
|
def __init__(self, redis_manager: RedisManager):
|
|
self.redis_manager = redis_manager
|
|
self.state_prefix = "oauth:state:"
|
|
self.temp_token_prefix = "oauth:temp_token:"
|
|
self.updated_token_prefix = "oauth:updated_token:"
|
|
|
|
async def store_oauth_state(self, state: str, flow_data: Dict[str, Any], ttl: int = 600):
|
|
"""OAuth 상태 저장 (기본 10분 TTL)"""
|
|
client = await self.redis_manager.get_client()
|
|
key = f"{self.state_prefix}{state}"
|
|
await client.setex(key, ttl, json.dumps(flow_data))
|
|
|
|
async def get_oauth_state(self, state: str) -> Optional[Dict[str, Any]]:
|
|
"""OAuth 상태 조회 및 삭제"""
|
|
client = await self.redis_manager.get_client()
|
|
key = f"{self.state_prefix}{state}"
|
|
|
|
data = await client.get(key)
|
|
if data:
|
|
await client.delete(key) # 일회용이므로 삭제
|
|
return json.loads(data)
|
|
return None
|
|
|
|
async def store_temp_token(self, temp_token_id: str, token_data: Dict[str, Any], ttl: int = 3600):
|
|
"""임시 토큰 저장 (기본 1시간 TTL)"""
|
|
client = await self.redis_manager.get_client()
|
|
key = f"{self.temp_token_prefix}{temp_token_id}"
|
|
await client.setex(key, ttl, json.dumps(token_data))
|
|
|
|
async def get_temp_token(self, temp_token_id: str) -> Optional[Dict[str, Any]]:
|
|
"""임시 토큰 조회 및 삭제"""
|
|
client = await self.redis_manager.get_client()
|
|
key = f"{self.temp_token_prefix}{temp_token_id}"
|
|
|
|
data = await client.get(key)
|
|
if data:
|
|
await client.delete(key) # 일회용이므로 삭제
|
|
return json.loads(data)
|
|
return None
|
|
|
|
async def store_updated_token(self, token_key: str, token_data: Dict[str, Any], ttl: int = 3600):
|
|
"""갱신된 토큰 저장"""
|
|
client = await self.redis_manager.get_client()
|
|
key = f"{self.updated_token_prefix}{token_key}"
|
|
await client.setex(key, ttl, json.dumps(token_data))
|
|
|
|
class RedisYouTubeStorage:
|
|
"""YouTube 관련 Redis 저장소"""
|
|
|
|
def __init__(self, redis_manager: RedisManager):
|
|
self.redis_manager = redis_manager
|
|
self.channels_prefix = "youtube:channels:"
|
|
self.service_prefix = "youtube:service:"
|
|
|
|
async def get_cached_channels(self, cache_key: str) -> Optional[Dict[str, Any]]:
|
|
"""캐시된 채널 정보 조회"""
|
|
client = await self.redis_manager.get_client()
|
|
key = f"{self.channels_prefix}{cache_key}"
|
|
|
|
data = await client.get(key)
|
|
if data:
|
|
return json.loads(data)
|
|
return None
|
|
|
|
async def cache_channels(self, cache_key: str, channels_data: Dict[str, Any], ttl: int = 1800):
|
|
"""채널 정보 캐싱 (기본 30분 TTL)"""
|
|
client = await self.redis_manager.get_client()
|
|
key = f"{self.channels_prefix}{cache_key}"
|
|
await client.setex(key, ttl, json.dumps(channels_data))
|
|
|
|
# 전역 Redis 매니저 인스턴스
|
|
redis_manager = RedisManager()
|
|
oauth_storage = RedisOAuthStorage(redis_manager)
|
|
youtube_storage = RedisYouTubeStorage(redis_manager)
|
|
|
|
async def get_redis_manager() -> RedisManager:
|
|
"""Redis 매니저 의존성"""
|
|
return redis_manager
|
|
|
|
async def get_oauth_storage() -> RedisOAuthStorage:
|
|
"""OAuth 저장소 의존성"""
|
|
return oauth_storage
|
|
|
|
async def get_youtube_storage() -> RedisYouTubeStorage:
|
|
"""YouTube 저장소 의존성"""
|
|
return youtube_storage |