O2Sound_ver2_final/backend/app/services/social/google_service.py

480 lines
21 KiB
Python

import json
import uuid
import httpx
from typing import Optional, Dict, Any, List
from datetime import datetime, timezone
from fastapi import Request, HTTPException
from app.core.oauth_setting import get_oauth_client
from app.core.redis.oauth_storage import GoogleOAuthStorage
from app.core.redis.redis_manager import RedisManager
from app.shared.exception.custom_exception import OAuthConfigurationError, OAuthTokenExchangeError, OAuthStateInvalidError, OAuthUserInfoError, OAuthTempTokenError
from app.presentation.schemas.social.google_schema import GoogleCallbackResponse, GoogleTokenResponse, GoogleUserInfo
class GoogleService:
'''Google 서비스'''
def __init__(self, redis_manager: RedisManager):
self.oauth_storage = GoogleOAuthStorage(redis_manager)
async def get_login_url(self, return_url: Optional[str] = None, request: Request = None):
'''Google 로그인 URL 생성 및 리다이렉트'''
try:
# OAuth 클라이언트 확인
oauth_client = get_oauth_client()
if not hasattr(oauth_client, 'google'):
raise OAuthConfigurationError()
# 콜백 URL 설정
if request:
base_url = f"{request.url.scheme}://{request.url.netloc}"
redirect_uri = f"{base_url}/api/v1/social/google/callback"
else:
# fallback URL (env에서 가져올 수도 있음)
redirect_uri = "http://localhost:8000/api/v1/social/google/callback"
state_data = {
"return_url": return_url,
"provider": "google",
"created_at": datetime.now(timezone.utc).isoformat(),
"redirect_uri": redirect_uri # 검증용
}
state = str(uuid.uuid4())
# Redis에 state 저장
await self.oauth_storage.store_oauth_state(state, state_data)
# Google OAuth 클라이언트로 리다이렉트 URL 생성
return await oauth_client.google.authorize_redirect(request, redirect_uri, state=state)
except Exception as e:
raise OAuthConfigurationError(f"Google 로그인 URL 생성 중 오류 발생: {e}")
async def handle_callback(self, request: Request):
'''Google OAuth 콜백 처리'''
try:
# 에러 파라미터 확인
error = request.query_params.get('error')
if error:
error_description = request.query_params.get('error_description', '')
raise OAuthTokenExchangeError(f"Google OAuth 에러: {error} - {error_description}")
# state 검증
state = request.query_params.get('state')
if not state:
raise OAuthStateInvalidError("State 파라미터가 없습니다.")
# Redis에서 state 데이터 조회
state_data = await self.oauth_storage.get_oauth_state(state)
if not state_data:
raise OAuthStateInvalidError("State가 유효하지 않거나 만료되었습니다.")
return_url = state_data.get("return_url")
stored_redirect_uri = state_data.get("redirect_uri")
# state 삭제 (일회용)
await self.oauth_storage.delete_oauth_state(state)
# OAuth 클라이언트 확인
oauth_client = get_oauth_client()
if not hasattr(oauth_client, 'google'):
raise OAuthConfigurationError("Google OAuth 클라이언트가 설정되지 않았습니다.")
# Google에서 토큰 교환
try:
token = await oauth_client.google.authorize_access_token(request)
print(f"✅ Google 토큰 교환 성공")
except Exception as e:
print(f"❌ Google 토큰 교환 실패: {e}")
raise OAuthTokenExchangeError(f"토큰 교환 실패: {str(e)}")
# 사용자 정보 추출
try:
user_info = await self._extract_google_user_info(token)
print(f"✅ Google 사용자 정보 추출 성공: {user_info.email}")
except Exception as e:
print(f"❌ Google 사용자 정보 추출 실패: {e}")
raise OAuthUserInfoError(f"사용자 정보 추출 실패: {str(e)}")
# 토큰 데이터 준비
token_data = {
"access_token": token.get('access_token'),
"refresh_token": token.get('refresh_token'),
"expires_at": self._calculate_expires_at(token),
"token_type": token.get('token_type', 'Bearer'),
"user_info": user_info.dict(),
"scopes": token.get('scope', '').split() if token.get('scope') else [],
"provider": "google",
"created_at": datetime.now(timezone.utc).isoformat()
}
# 임시 토큰 ID 생성 및 저장
temp_token_id = str(uuid.uuid4())
await self.oauth_storage.store_temp_token(temp_token_id, token_data)
print(f"✅ Google OAuth 콜백 처리 완료")
print(f" Temp Token ID: {temp_token_id[:8]}...")
return GoogleCallbackResponse(
message="Google 로그인 성공",
temp_token_id=temp_token_id,
return_url=return_url,
user_info=user_info
)
except (OAuthStateInvalidError, OAuthTokenExchangeError, OAuthUserInfoError, OAuthConfigurationError):
raise
except Exception as e:
print(f"❌ Google OAuth 콜백 처리 실패: {e}")
raise OAuthTokenExchangeError(f"콜백 처리 실패: {str(e)}")
async def get_token_by_temp_id(self, temp_token_id: str) -> GoogleTokenResponse:
"""임시 토큰 ID로 실제 토큰 정보 조회"""
try:
token_data = await self.oauth_storage.get_temp_token(temp_token_id)
if not token_data:
print(f"⚠️ 임시 토큰 없음: {temp_token_id[:8]}...")
raise OAuthTempTokenError("임시 토큰이 유효하지 않거나 만료되었습니다.")
# 임시 토큰 삭제 (일회용)
await self.oauth_storage.delete_temp_token(temp_token_id)
print(f"✅ 임시 토큰 조회 및 삭제 완료: {temp_token_id[:8]}...")
# 올바른 데이터 구조로 반환 (GoogleTokenResponse를 직접 반환)
return GoogleTokenResponse(
access_token=token_data['access_token'],
refresh_token=token_data.get('refresh_token'),
expires_at=token_data.get('expires_at'),
token_type=token_data['token_type'],
user_info=GoogleUserInfo(**token_data['user_info']),
scopes=token_data.get('scopes')
)
except OAuthTempTokenError:
raise
except Exception as e:
print(f"❌ 임시 토큰 조회 실패: {e}")
raise OAuthTempTokenError(f"토큰 조회 실패: {str(e)}")
async def _extract_google_user_info(self, token: Dict[str, Any]) -> GoogleUserInfo:
"""Google 사용자 정보 추출"""
try:
# userinfo에서 먼저 시도
userinfo = token.get('userinfo', {})
# userinfo가 없으면 id_token에서 추출
if not userinfo and 'id_token' in token:
from authlib.jose import jwt
userinfo = jwt.decode(token['id_token'], options={"verify_signature": False})
if not userinfo:
raise ValueError("사용자 정보를 찾을 수 없습니다.")
provider_id = userinfo.get('sub')
if not provider_id:
raise ValueError("Google 사용자 ID(sub)를 찾을 수 없습니다.")
return GoogleUserInfo(
provider_id=provider_id,
email=userinfo.get('email'),
name=userinfo.get('name'),
picture=userinfo.get('picture'),
raw_data=userinfo
)
except Exception as e:
raise ValueError(f"Google 사용자 정보 추출 실패: {str(e)}")
def _calculate_expires_at(self, token: Dict[str, Any]) -> Optional[str]:
"""토큰 만료 시간 계산"""
try:
expires_in = token.get('expires_in')
if expires_in:
expires_at = datetime.now(timezone.utc).timestamp() + expires_in
return datetime.fromtimestamp(expires_at, timezone.utc).isoformat()
return None
except Exception as e:
print(f"⚠️ 토큰 만료 시간 계산 실패: {e}")
return None
# Google Service 클래스에 추가할 메서드
async def get_all_youtube_channel_info(self, request: Request):
'''YouTube 모든 채널 정보 조회'''
try:
# Authorization 헤더에서 토큰 추출
authorization = request.headers.get("Authorization")
if not authorization:
raise HTTPException(status_code=401, detail="Authorization 헤더가 필요합니다.")
if not authorization.startswith("Bearer "):
raise HTTPException(status_code=401, detail="Bearer 토큰이 필요합니다.")
access_token = authorization.replace("Bearer ", "")
# YouTube API로 채널 정보 조회
channels_data = await self._fetch_youtube_channels(access_token)
print(f"✅ YouTube 채널 정보 조회 완료: {len(channels_data)}개 채널")
return {
"message": "YouTube 채널 정보 조회 성공",
"data": channels_data,
"total_count": len(channels_data)
}
except HTTPException:
raise
except Exception as e:
print(f"❌ YouTube 채널 정보 조회 실패: {e}")
raise HTTPException(status_code=500, detail=f"채널 정보 조회 실패: {str(e)}")
async def _fetch_youtube_channels(self, access_token: str) -> List[Dict[str, Any]]:
"""YouTube Data API를 사용하여 채널 정보 조회"""
try:
url = "https://www.googleapis.com/youtube/v3/channels"
headers = {
"Authorization": f"Bearer {access_token}",
"Accept": "application/json"
}
params = {
"part": "snippet,statistics,contentDetails",
"mine": "true",
"maxResults": 50
}
async with httpx.AsyncClient() as client:
response = await client.get(url, headers=headers, params=params)
if response.status_code == 401:
raise HTTPException(status_code=401, detail="YouTube API 인증 실패")
elif response.status_code == 403:
raise HTTPException(status_code=403, detail="YouTube API 접근 권한 없음")
elif response.status_code != 200:
raise HTTPException(status_code=response.status_code, detail=f"YouTube API 요청 실패: {response.text}")
data = response.json()
channels = data.get('items', [])
# 채널 정보 정리
result = []
for channel in channels:
snippet = channel.get('snippet', {})
statistics = channel.get('statistics', {})
content_details = channel.get('contentDetails', {})
channel_info = {
"channel_id": channel.get('id'),
"title": snippet.get('title'),
"description": snippet.get('description'),
"thumbnail_url": snippet.get('thumbnails', {}).get('default', {}).get('url'),
"custom_url": snippet.get('customUrl'),
"published_at": snippet.get('publishedAt'),
"country": snippet.get('country'),
"view_count": int(statistics.get('viewCount', 0)),
"subscriber_count": int(statistics.get('subscriberCount', 0)),
"video_count": int(statistics.get('videoCount', 0)),
"uploads_playlist_id": content_details.get('relatedPlaylists', {}).get('uploads')
}
result.append(channel_info)
return result
except HTTPException:
raise
except Exception as e:
print(f"❌ YouTube API 요청 실패: {e}")
raise Exception(f"YouTube API 요청 실패: {str(e)}")
async def upload_youtube_video(self, request: Request):
'''YouTube 비디오 업로드'''
try:
# Authorization 헤더에서 토큰 추출
authorization = request.headers.get("Authorization")
if not authorization or not authorization.startswith("Bearer "):
raise HTTPException(status_code=401, detail="Bearer 토큰이 필요합니다.")
access_token = authorization.replace("Bearer ", "")
# 요청 본문에서 데이터 추출
body = await request.json()
channel_id = body.get('channel_id')
title = body.get('title')
description = body.get('description', '')
hashtags = body.get('hashtags', []) # 리스트 형태
video_url = body.get('video_url')
privacy_status = body.get('privacy_status', 'private') # 기본 비공개
category_id = body.get('category_id', '22') # 기본 People & Blogs
default_language = body.get('default_language', 'ko')
# 필수 필드 검증 (access_token 제외)
if not all([channel_id, title, video_url]):
raise HTTPException(
status_code=400,
detail="필수 필드가 누락되었습니다: channel_id, title, video_url"
)
# 비디오 파일 다운로드
video_content = await self._download_video_file(video_url)
# YouTube API로 비디오 업로드
upload_result = await self._upload_to_youtube(
access_token=access_token,
title=title,
description=description,
hashtags=hashtags,
video_content=video_content,
privacy_status=privacy_status,
category_id=category_id,
default_language=default_language
)
video_id = upload_result['video_id']
youtube_url = f"https://www.youtube.com/watch?v={video_id}"
youtube_short_url = f"https://youtu.be/{video_id}"
studio_url = f"https://studio.youtube.com/video/{video_id}/edit"
print(f"✅ YouTube 비디오 업로드 성공: {video_id}")
return {
"message": "YouTube 비디오 업로드 성공",
"video_id": video_id,
"video_info": {
"title": title,
"description": description,
"hashtags": hashtags,
"privacy_status": privacy_status,
"upload_status": upload_result.get('upload_status')
},
"links": {
"youtube_url": youtube_url,
"youtube_short_url": youtube_short_url,
"studio_url": studio_url
},
"channel_id": channel_id,
"uploaded_at": datetime.now(timezone.utc).isoformat()
}
except HTTPException:
raise
except Exception as e:
print(f"❌ YouTube 비디오 업로드 실패: {e}")
raise HTTPException(status_code=500, detail=f"비디오 업로드 실패: {str(e)}")
async def _download_video_file(self, video_url: str) -> bytes:
"""비디오 파일 다운로드"""
try:
async with httpx.AsyncClient(timeout=300.0) as client: # 5분 타임아웃
response = await client.get(video_url)
if response.status_code != 200:
raise Exception(f"비디오 다운로드 실패: {response.status_code}")
print(f"✅ 비디오 파일 다운로드 완료: {len(response.content)} bytes")
return response.content
except Exception as e:
print(f"❌ 비디오 다운로드 실패: {e}")
raise Exception(f"비디오 다운로드 실패: {str(e)}")
async def _upload_to_youtube(
self,
access_token: str,
title: str,
description: str,
hashtags: List[str],
video_content: bytes,
privacy_status: str = 'private',
category_id: str = '22',
default_language: str = 'ko'
) -> Dict[str, Any]:
"""YouTube API로 비디오 업로드"""
try:
# 해시태그를 설명에 추가
if hashtags:
hashtag_text = ' '.join([f"#{tag}" for tag in hashtags])
description = f"{description}\n\n{hashtag_text}".strip()
# 메타데이터 설정
metadata = {
"snippet": {
"title": title,
"description": description,
"categoryId": category_id,
"defaultLanguage": default_language,
"tags": hashtags # 태그는 별도로도 설정
},
"status": {
"privacyStatus": privacy_status,
"selfDeclaredMadeForKids": False
}
}
# multipart/form-data로 업로드
url = "https://www.googleapis.com/upload/youtube/v3/videos"
headers = {
"Authorization": f"Bearer {access_token}",
"Accept": "application/json"
}
params = {
"part": "snippet,status",
"uploadType": "multipart"
}
# multipart 데이터 준비
files = {
'metadata': (None, json.dumps(metadata), 'application/json'),
'media': ('video.mp4', video_content, 'video/mp4')
}
async with httpx.AsyncClient(timeout=600.0) as client: # 10분 타임아웃
response = await client.post(
url,
headers=headers,
params=params,
files=files
)
print(f"🔍 YouTube 업로드 응답 상태: {response.status_code}")
if response.status_code == 401:
raise HTTPException(status_code=401, detail="YouTube API 인증 실패")
elif response.status_code == 403:
error_data = response.json() if response.headers.get('content-type', '').startswith('application/json') else {}
error_reason = error_data.get('error', {}).get('errors', [{}])[0].get('reason', 'unknown')
if error_reason == 'quotaExceeded':
raise HTTPException(status_code=403, detail="YouTube API 할당량 초과")
elif error_reason == 'uploadLimitExceeded':
raise HTTPException(status_code=403, detail="일일 업로드 한도 초과")
else:
raise HTTPException(status_code=403, detail=f"YouTube API 권한 거부: {error_reason}")
elif response.status_code != 200:
error_text = response.text
print(f"❌ YouTube 업로드 에러: {error_text}")
raise Exception(f"YouTube 업로드 실패 ({response.status_code}): {error_text}")
result = response.json()
video_id = result.get('id')
upload_status = result.get('status', {}).get('uploadStatus')
if not video_id:
raise Exception("YouTube 응답에서 video_id를 찾을 수 없습니다")
print(f"✅ YouTube 업로드 성공: video_id={video_id}, status={upload_status}")
return {
"video_id": video_id,
"upload_status": upload_status,
"raw_response": result
}
except HTTPException:
raise
except Exception as e:
print(f"❌ YouTube API 업로드 실패: {e}")
raise Exception(f"YouTube API 업로드 실패: {str(e)}")