O2Sound_ver2_final/backend/app/infra/google/redis.py

128 lines
4.6 KiB
Python

import redis.asyncio as redis
import json
from typing import Optional, Dict, Any
from datetime import timedelta
from app.core.env_setting import EnvSetting
settings = EnvSetting()
class RedisManager:
def __init__(self, redis_url: Optional[str] = None):
self.redis_url = redis_url or settings.RedisManager_URL
self.client : Optional[redis.Redis] = None
async def connect(self):
'''Redis 연결'''
if self.client is None:
self.client = redis.from_url(
self.redis_url,
encoding="utf-8",
decode_responses=True,
max_connections=20,
socket_connect_timeout=5,
socket_timeout=5,
)
return self.client
async def close(self):
'''Redis 연결 종료'''
if self.client:
await self.client.close()
self.client = None
async def get_client(self) -> redis.Redis:
'''Redis 클라이언트 반환'''
if self.client is None:
await self.connect()
return self.client
class RedisOAuthStorage:
"""OAuth 관련 Redis 저장소"""
def __init__(self, redis_manager: RedisManager):
self.redis_manager = redis_manager
self.state_prefix = "oauth:state:"
self.temp_token_prefix = "oauth:temp_token:"
self.updated_token_prefix = "oauth:updated_token:"
async def store_oauth_state(self, state: str, flow_data: Dict[str, Any], ttl: int = 600):
"""OAuth 상태 저장 (기본 10분 TTL)"""
client = await self.redis_manager.get_client()
key = f"{self.state_prefix}{state}"
await client.setex(key, ttl, json.dumps(flow_data))
async def get_oauth_state(self, state: str) -> Optional[Dict[str, Any]]:
"""OAuth 상태 조회 및 삭제"""
client = await self.redis_manager.get_client()
key = f"{self.state_prefix}{state}"
data = await client.get(key)
if data:
await client.delete(key) # 일회용이므로 삭제
return json.loads(data)
return None
async def store_temp_token(self, temp_token_id: str, token_data: Dict[str, Any], ttl: int = 3600):
"""임시 토큰 저장 (기본 1시간 TTL)"""
client = await self.redis_manager.get_client()
key = f"{self.temp_token_prefix}{temp_token_id}"
await client.setex(key, ttl, json.dumps(token_data))
async def get_temp_token(self, temp_token_id: str) -> Optional[Dict[str, Any]]:
"""임시 토큰 조회 및 삭제"""
client = await self.redis_manager.get_client()
key = f"{self.temp_token_prefix}{temp_token_id}"
data = await client.get(key)
if data:
await client.delete(key) # 일회용이므로 삭제
return json.loads(data)
return None
async def store_updated_token(self, token_key: str, token_data: Dict[str, Any], ttl: int = 3600):
"""갱신된 토큰 저장"""
client = await self.redis_manager.get_client()
key = f"{self.updated_token_prefix}{token_key}"
await client.setex(key, ttl, json.dumps(token_data))
class RedisYouTubeStorage:
"""YouTube 관련 Redis 저장소"""
def __init__(self, redis_manager: RedisManager):
self.redis_manager = redis_manager
self.channels_prefix = "youtube:channels:"
self.service_prefix = "youtube:service:"
async def get_cached_channels(self, cache_key: str) -> Optional[Dict[str, Any]]:
"""캐시된 채널 정보 조회"""
client = await self.redis_manager.get_client()
key = f"{self.channels_prefix}{cache_key}"
data = await client.get(key)
if data:
return json.loads(data)
return None
async def cache_channels(self, cache_key: str, channels_data: Dict[str, Any], ttl: int = 1800):
"""채널 정보 캐싱 (기본 30분 TTL)"""
client = await self.redis_manager.get_client()
key = f"{self.channels_prefix}{cache_key}"
await client.setex(key, ttl, json.dumps(channels_data))
# 전역 Redis 매니저 인스턴스
redis_manager = RedisManager()
oauth_storage = RedisOAuthStorage(redis_manager)
youtube_storage = RedisYouTubeStorage(redis_manager)
async def get_redis_manager() -> RedisManager:
"""Redis 매니저 의존성"""
return redis_manager
async def get_oauth_storage() -> RedisOAuthStorage:
"""OAuth 저장소 의존성"""
return oauth_storage
async def get_youtube_storage() -> RedisYouTubeStorage:
"""YouTube 저장소 의존성"""
return youtube_storage