480 lines
21 KiB
Python
480 lines
21 KiB
Python
import json
|
|
import uuid
|
|
import httpx
|
|
from typing import Optional, Dict, Any, List
|
|
from datetime import datetime, timezone
|
|
from fastapi import Request, HTTPException
|
|
from app.core.oauth_setting import get_oauth_client
|
|
from app.core.redis.oauth_storage import GoogleOAuthStorage
|
|
from app.core.redis.redis_manager import RedisManager
|
|
from app.shared.exception.custom_exception import OAuthConfigurationError, OAuthTokenExchangeError, OAuthStateInvalidError, OAuthUserInfoError, OAuthTempTokenError
|
|
from app.presentation.schemas.social.google_schema import GoogleCallbackResponse, GoogleTokenResponse, GoogleUserInfo
|
|
|
|
class GoogleService:
|
|
'''Google 서비스'''
|
|
|
|
def __init__(self, redis_manager: RedisManager):
|
|
self.oauth_storage = GoogleOAuthStorage(redis_manager)
|
|
|
|
async def get_login_url(self, return_url: Optional[str] = None, request: Request = None):
|
|
'''Google 로그인 URL 생성 및 리다이렉트'''
|
|
try:
|
|
# OAuth 클라이언트 확인
|
|
oauth_client = get_oauth_client()
|
|
if not hasattr(oauth_client, 'google'):
|
|
raise OAuthConfigurationError()
|
|
|
|
# 콜백 URL 설정
|
|
if request:
|
|
base_url = f"{request.url.scheme}://{request.url.netloc}"
|
|
redirect_uri = f"{base_url}/api/v1/social/google/callback"
|
|
else:
|
|
# fallback URL (env에서 가져올 수도 있음)
|
|
redirect_uri = "http://localhost:8000/api/v1/social/google/callback"
|
|
|
|
state_data = {
|
|
"return_url": return_url,
|
|
"provider": "google",
|
|
"created_at": datetime.now(timezone.utc).isoformat(),
|
|
"redirect_uri": redirect_uri # 검증용
|
|
}
|
|
state = str(uuid.uuid4())
|
|
|
|
# Redis에 state 저장
|
|
await self.oauth_storage.store_oauth_state(state, state_data)
|
|
|
|
# Google OAuth 클라이언트로 리다이렉트 URL 생성
|
|
return await oauth_client.google.authorize_redirect(request, redirect_uri, state=state)
|
|
except Exception as e:
|
|
raise OAuthConfigurationError(f"Google 로그인 URL 생성 중 오류 발생: {e}")
|
|
|
|
async def handle_callback(self, request: Request):
|
|
'''Google OAuth 콜백 처리'''
|
|
try:
|
|
# 에러 파라미터 확인
|
|
error = request.query_params.get('error')
|
|
if error:
|
|
error_description = request.query_params.get('error_description', '')
|
|
raise OAuthTokenExchangeError(f"Google OAuth 에러: {error} - {error_description}")
|
|
|
|
# state 검증
|
|
state = request.query_params.get('state')
|
|
if not state:
|
|
raise OAuthStateInvalidError("State 파라미터가 없습니다.")
|
|
|
|
# Redis에서 state 데이터 조회
|
|
state_data = await self.oauth_storage.get_oauth_state(state)
|
|
if not state_data:
|
|
raise OAuthStateInvalidError("State가 유효하지 않거나 만료되었습니다.")
|
|
|
|
return_url = state_data.get("return_url")
|
|
stored_redirect_uri = state_data.get("redirect_uri")
|
|
|
|
# state 삭제 (일회용)
|
|
await self.oauth_storage.delete_oauth_state(state)
|
|
|
|
# OAuth 클라이언트 확인
|
|
oauth_client = get_oauth_client()
|
|
if not hasattr(oauth_client, 'google'):
|
|
raise OAuthConfigurationError("Google OAuth 클라이언트가 설정되지 않았습니다.")
|
|
|
|
# Google에서 토큰 교환
|
|
try:
|
|
token = await oauth_client.google.authorize_access_token(request)
|
|
print(f"✅ Google 토큰 교환 성공")
|
|
except Exception as e:
|
|
print(f"❌ Google 토큰 교환 실패: {e}")
|
|
raise OAuthTokenExchangeError(f"토큰 교환 실패: {str(e)}")
|
|
|
|
# 사용자 정보 추출
|
|
try:
|
|
user_info = await self._extract_google_user_info(token)
|
|
print(f"✅ Google 사용자 정보 추출 성공: {user_info.email}")
|
|
except Exception as e:
|
|
print(f"❌ Google 사용자 정보 추출 실패: {e}")
|
|
raise OAuthUserInfoError(f"사용자 정보 추출 실패: {str(e)}")
|
|
|
|
# 토큰 데이터 준비
|
|
token_data = {
|
|
"access_token": token.get('access_token'),
|
|
"refresh_token": token.get('refresh_token'),
|
|
"expires_at": self._calculate_expires_at(token),
|
|
"token_type": token.get('token_type', 'Bearer'),
|
|
"user_info": user_info.dict(),
|
|
"scopes": token.get('scope', '').split() if token.get('scope') else [],
|
|
"provider": "google",
|
|
"created_at": datetime.now(timezone.utc).isoformat()
|
|
}
|
|
|
|
# 임시 토큰 ID 생성 및 저장
|
|
temp_token_id = str(uuid.uuid4())
|
|
await self.oauth_storage.store_temp_token(temp_token_id, token_data)
|
|
|
|
print(f"✅ Google OAuth 콜백 처리 완료")
|
|
print(f" Temp Token ID: {temp_token_id[:8]}...")
|
|
|
|
return GoogleCallbackResponse(
|
|
message="Google 로그인 성공",
|
|
temp_token_id=temp_token_id,
|
|
return_url=return_url,
|
|
user_info=user_info
|
|
)
|
|
|
|
except (OAuthStateInvalidError, OAuthTokenExchangeError, OAuthUserInfoError, OAuthConfigurationError):
|
|
raise
|
|
except Exception as e:
|
|
print(f"❌ Google OAuth 콜백 처리 실패: {e}")
|
|
raise OAuthTokenExchangeError(f"콜백 처리 실패: {str(e)}")
|
|
|
|
async def get_token_by_temp_id(self, temp_token_id: str) -> GoogleTokenResponse:
|
|
"""임시 토큰 ID로 실제 토큰 정보 조회"""
|
|
try:
|
|
token_data = await self.oauth_storage.get_temp_token(temp_token_id)
|
|
if not token_data:
|
|
print(f"⚠️ 임시 토큰 없음: {temp_token_id[:8]}...")
|
|
raise OAuthTempTokenError("임시 토큰이 유효하지 않거나 만료되었습니다.")
|
|
|
|
# 임시 토큰 삭제 (일회용)
|
|
await self.oauth_storage.delete_temp_token(temp_token_id)
|
|
|
|
print(f"✅ 임시 토큰 조회 및 삭제 완료: {temp_token_id[:8]}...")
|
|
|
|
# 올바른 데이터 구조로 반환 (GoogleTokenResponse를 직접 반환)
|
|
return GoogleTokenResponse(
|
|
access_token=token_data['access_token'],
|
|
refresh_token=token_data.get('refresh_token'),
|
|
expires_at=token_data.get('expires_at'),
|
|
token_type=token_data['token_type'],
|
|
user_info=GoogleUserInfo(**token_data['user_info']),
|
|
scopes=token_data.get('scopes')
|
|
)
|
|
|
|
except OAuthTempTokenError:
|
|
raise
|
|
except Exception as e:
|
|
print(f"❌ 임시 토큰 조회 실패: {e}")
|
|
raise OAuthTempTokenError(f"토큰 조회 실패: {str(e)}")
|
|
|
|
async def _extract_google_user_info(self, token: Dict[str, Any]) -> GoogleUserInfo:
|
|
"""Google 사용자 정보 추출"""
|
|
try:
|
|
# userinfo에서 먼저 시도
|
|
userinfo = token.get('userinfo', {})
|
|
|
|
# userinfo가 없으면 id_token에서 추출
|
|
if not userinfo and 'id_token' in token:
|
|
from authlib.jose import jwt
|
|
userinfo = jwt.decode(token['id_token'], options={"verify_signature": False})
|
|
|
|
if not userinfo:
|
|
raise ValueError("사용자 정보를 찾을 수 없습니다.")
|
|
|
|
provider_id = userinfo.get('sub')
|
|
if not provider_id:
|
|
raise ValueError("Google 사용자 ID(sub)를 찾을 수 없습니다.")
|
|
|
|
return GoogleUserInfo(
|
|
provider_id=provider_id,
|
|
email=userinfo.get('email'),
|
|
name=userinfo.get('name'),
|
|
picture=userinfo.get('picture'),
|
|
raw_data=userinfo
|
|
)
|
|
|
|
except Exception as e:
|
|
raise ValueError(f"Google 사용자 정보 추출 실패: {str(e)}")
|
|
|
|
def _calculate_expires_at(self, token: Dict[str, Any]) -> Optional[str]:
|
|
"""토큰 만료 시간 계산"""
|
|
try:
|
|
expires_in = token.get('expires_in')
|
|
if expires_in:
|
|
expires_at = datetime.now(timezone.utc).timestamp() + expires_in
|
|
return datetime.fromtimestamp(expires_at, timezone.utc).isoformat()
|
|
return None
|
|
except Exception as e:
|
|
print(f"⚠️ 토큰 만료 시간 계산 실패: {e}")
|
|
return None
|
|
|
|
|
|
|
|
|
|
# Google Service 클래스에 추가할 메서드
|
|
async def get_all_youtube_channel_info(self, request: Request):
|
|
'''YouTube 모든 채널 정보 조회'''
|
|
try:
|
|
# Authorization 헤더에서 토큰 추출
|
|
authorization = request.headers.get("Authorization")
|
|
if not authorization:
|
|
raise HTTPException(status_code=401, detail="Authorization 헤더가 필요합니다.")
|
|
|
|
if not authorization.startswith("Bearer "):
|
|
raise HTTPException(status_code=401, detail="Bearer 토큰이 필요합니다.")
|
|
|
|
access_token = authorization.replace("Bearer ", "")
|
|
|
|
# YouTube API로 채널 정보 조회
|
|
channels_data = await self._fetch_youtube_channels(access_token)
|
|
|
|
print(f"✅ YouTube 채널 정보 조회 완료: {len(channels_data)}개 채널")
|
|
|
|
return {
|
|
"message": "YouTube 채널 정보 조회 성공",
|
|
"data": channels_data,
|
|
"total_count": len(channels_data)
|
|
}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
print(f"❌ YouTube 채널 정보 조회 실패: {e}")
|
|
raise HTTPException(status_code=500, detail=f"채널 정보 조회 실패: {str(e)}")
|
|
|
|
async def _fetch_youtube_channels(self, access_token: str) -> List[Dict[str, Any]]:
|
|
"""YouTube Data API를 사용하여 채널 정보 조회"""
|
|
try:
|
|
url = "https://www.googleapis.com/youtube/v3/channels"
|
|
headers = {
|
|
"Authorization": f"Bearer {access_token}",
|
|
"Accept": "application/json"
|
|
}
|
|
params = {
|
|
"part": "snippet,statistics,contentDetails",
|
|
"mine": "true",
|
|
"maxResults": 50
|
|
}
|
|
|
|
async with httpx.AsyncClient() as client:
|
|
response = await client.get(url, headers=headers, params=params)
|
|
|
|
if response.status_code == 401:
|
|
raise HTTPException(status_code=401, detail="YouTube API 인증 실패")
|
|
elif response.status_code == 403:
|
|
raise HTTPException(status_code=403, detail="YouTube API 접근 권한 없음")
|
|
elif response.status_code != 200:
|
|
raise HTTPException(status_code=response.status_code, detail=f"YouTube API 요청 실패: {response.text}")
|
|
|
|
data = response.json()
|
|
channels = data.get('items', [])
|
|
|
|
# 채널 정보 정리
|
|
result = []
|
|
for channel in channels:
|
|
snippet = channel.get('snippet', {})
|
|
statistics = channel.get('statistics', {})
|
|
content_details = channel.get('contentDetails', {})
|
|
|
|
channel_info = {
|
|
"channel_id": channel.get('id'),
|
|
"title": snippet.get('title'),
|
|
"description": snippet.get('description'),
|
|
"thumbnail_url": snippet.get('thumbnails', {}).get('default', {}).get('url'),
|
|
"custom_url": snippet.get('customUrl'),
|
|
"published_at": snippet.get('publishedAt'),
|
|
"country": snippet.get('country'),
|
|
"view_count": int(statistics.get('viewCount', 0)),
|
|
"subscriber_count": int(statistics.get('subscriberCount', 0)),
|
|
"video_count": int(statistics.get('videoCount', 0)),
|
|
"uploads_playlist_id": content_details.get('relatedPlaylists', {}).get('uploads')
|
|
}
|
|
result.append(channel_info)
|
|
|
|
return result
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
print(f"❌ YouTube API 요청 실패: {e}")
|
|
raise Exception(f"YouTube API 요청 실패: {str(e)}")
|
|
|
|
|
|
|
|
async def upload_youtube_video(self, request: Request):
|
|
'''YouTube 비디오 업로드'''
|
|
try:
|
|
# Authorization 헤더에서 토큰 추출
|
|
authorization = request.headers.get("Authorization")
|
|
if not authorization or not authorization.startswith("Bearer "):
|
|
raise HTTPException(status_code=401, detail="Bearer 토큰이 필요합니다.")
|
|
|
|
access_token = authorization.replace("Bearer ", "")
|
|
|
|
# 요청 본문에서 데이터 추출
|
|
body = await request.json()
|
|
|
|
channel_id = body.get('channel_id')
|
|
title = body.get('title')
|
|
description = body.get('description', '')
|
|
hashtags = body.get('hashtags', []) # 리스트 형태
|
|
video_url = body.get('video_url')
|
|
privacy_status = body.get('privacy_status', 'private') # 기본 비공개
|
|
category_id = body.get('category_id', '22') # 기본 People & Blogs
|
|
default_language = body.get('default_language', 'ko')
|
|
|
|
# 필수 필드 검증 (access_token 제외)
|
|
if not all([channel_id, title, video_url]):
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="필수 필드가 누락되었습니다: channel_id, title, video_url"
|
|
)
|
|
|
|
# 비디오 파일 다운로드
|
|
video_content = await self._download_video_file(video_url)
|
|
|
|
# YouTube API로 비디오 업로드
|
|
upload_result = await self._upload_to_youtube(
|
|
access_token=access_token,
|
|
title=title,
|
|
description=description,
|
|
hashtags=hashtags,
|
|
video_content=video_content,
|
|
privacy_status=privacy_status,
|
|
category_id=category_id,
|
|
default_language=default_language
|
|
)
|
|
|
|
video_id = upload_result['video_id']
|
|
youtube_url = f"https://www.youtube.com/watch?v={video_id}"
|
|
youtube_short_url = f"https://youtu.be/{video_id}"
|
|
studio_url = f"https://studio.youtube.com/video/{video_id}/edit"
|
|
|
|
print(f"✅ YouTube 비디오 업로드 성공: {video_id}")
|
|
|
|
return {
|
|
"message": "YouTube 비디오 업로드 성공",
|
|
"video_id": video_id,
|
|
"video_info": {
|
|
"title": title,
|
|
"description": description,
|
|
"hashtags": hashtags,
|
|
"privacy_status": privacy_status,
|
|
"upload_status": upload_result.get('upload_status')
|
|
},
|
|
"links": {
|
|
"youtube_url": youtube_url,
|
|
"youtube_short_url": youtube_short_url,
|
|
"studio_url": studio_url
|
|
},
|
|
"channel_id": channel_id,
|
|
"uploaded_at": datetime.now(timezone.utc).isoformat()
|
|
}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
print(f"❌ YouTube 비디오 업로드 실패: {e}")
|
|
raise HTTPException(status_code=500, detail=f"비디오 업로드 실패: {str(e)}")
|
|
|
|
async def _download_video_file(self, video_url: str) -> bytes:
|
|
"""비디오 파일 다운로드"""
|
|
try:
|
|
async with httpx.AsyncClient(timeout=300.0) as client: # 5분 타임아웃
|
|
response = await client.get(video_url)
|
|
|
|
if response.status_code != 200:
|
|
raise Exception(f"비디오 다운로드 실패: {response.status_code}")
|
|
|
|
print(f"✅ 비디오 파일 다운로드 완료: {len(response.content)} bytes")
|
|
return response.content
|
|
|
|
except Exception as e:
|
|
print(f"❌ 비디오 다운로드 실패: {e}")
|
|
raise Exception(f"비디오 다운로드 실패: {str(e)}")
|
|
|
|
async def _upload_to_youtube(
|
|
self,
|
|
access_token: str,
|
|
title: str,
|
|
description: str,
|
|
hashtags: List[str],
|
|
video_content: bytes,
|
|
privacy_status: str = 'private',
|
|
category_id: str = '22',
|
|
default_language: str = 'ko'
|
|
) -> Dict[str, Any]:
|
|
"""YouTube API로 비디오 업로드"""
|
|
try:
|
|
# 해시태그를 설명에 추가
|
|
if hashtags:
|
|
hashtag_text = ' '.join([f"#{tag}" for tag in hashtags])
|
|
description = f"{description}\n\n{hashtag_text}".strip()
|
|
|
|
# 메타데이터 설정
|
|
metadata = {
|
|
"snippet": {
|
|
"title": title,
|
|
"description": description,
|
|
"categoryId": category_id,
|
|
"defaultLanguage": default_language,
|
|
"tags": hashtags # 태그는 별도로도 설정
|
|
},
|
|
"status": {
|
|
"privacyStatus": privacy_status,
|
|
"selfDeclaredMadeForKids": False
|
|
}
|
|
}
|
|
|
|
# multipart/form-data로 업로드
|
|
url = "https://www.googleapis.com/upload/youtube/v3/videos"
|
|
headers = {
|
|
"Authorization": f"Bearer {access_token}",
|
|
"Accept": "application/json"
|
|
}
|
|
params = {
|
|
"part": "snippet,status",
|
|
"uploadType": "multipart"
|
|
}
|
|
|
|
# multipart 데이터 준비
|
|
files = {
|
|
'metadata': (None, json.dumps(metadata), 'application/json'),
|
|
'media': ('video.mp4', video_content, 'video/mp4')
|
|
}
|
|
|
|
async with httpx.AsyncClient(timeout=600.0) as client: # 10분 타임아웃
|
|
response = await client.post(
|
|
url,
|
|
headers=headers,
|
|
params=params,
|
|
files=files
|
|
)
|
|
|
|
print(f"🔍 YouTube 업로드 응답 상태: {response.status_code}")
|
|
|
|
if response.status_code == 401:
|
|
raise HTTPException(status_code=401, detail="YouTube API 인증 실패")
|
|
elif response.status_code == 403:
|
|
error_data = response.json() if response.headers.get('content-type', '').startswith('application/json') else {}
|
|
error_reason = error_data.get('error', {}).get('errors', [{}])[0].get('reason', 'unknown')
|
|
|
|
if error_reason == 'quotaExceeded':
|
|
raise HTTPException(status_code=403, detail="YouTube API 할당량 초과")
|
|
elif error_reason == 'uploadLimitExceeded':
|
|
raise HTTPException(status_code=403, detail="일일 업로드 한도 초과")
|
|
else:
|
|
raise HTTPException(status_code=403, detail=f"YouTube API 권한 거부: {error_reason}")
|
|
elif response.status_code != 200:
|
|
error_text = response.text
|
|
print(f"❌ YouTube 업로드 에러: {error_text}")
|
|
raise Exception(f"YouTube 업로드 실패 ({response.status_code}): {error_text}")
|
|
|
|
result = response.json()
|
|
video_id = result.get('id')
|
|
upload_status = result.get('status', {}).get('uploadStatus')
|
|
|
|
if not video_id:
|
|
raise Exception("YouTube 응답에서 video_id를 찾을 수 없습니다")
|
|
|
|
print(f"✅ YouTube 업로드 성공: video_id={video_id}, status={upload_status}")
|
|
|
|
return {
|
|
"video_id": video_id,
|
|
"upload_status": upload_status,
|
|
"raw_response": result
|
|
}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
print(f"❌ YouTube API 업로드 실패: {e}")
|
|
raise Exception(f"YouTube API 업로드 실패: {str(e)}") |