sns, social 비즈니스 로직 분리 .

subtitle
hbyang 2026-03-09 09:21:34 +09:00
parent d7a649809f
commit ce79cb5d04
12 changed files with 778 additions and 630 deletions

View File

@ -238,7 +238,7 @@ async def get_account_by_platform(
raise SocialAccountNotFoundError(platform=platform.value) raise SocialAccountNotFoundError(platform=platform.value)
return social_account_service._to_response(account) return social_account_service.to_response(account)
@router.delete( @router.delete(

View File

@ -1,131 +1,37 @@
"""
소셜 SEO API 라우터
import logging, json SEO 관련 엔드포인트를 제공합니다.
비즈니스 로직은 SeoService에 위임합니다.
"""
from redis.asyncio import Redis import logging
from config import social_oauth_settings, db_settings from fastapi import APIRouter, Depends
from app.social.constants import YOUTUBE_SEO_HASH
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from app.social.schemas import (
YoutubeDescriptionRequest,
YoutubeDescriptionResponse,
)
from app.database.session import get_session from app.database.session import get_session
from app.social.schemas import YoutubeDescriptionRequest, YoutubeDescriptionResponse
from app.social.services import seo_service
from app.user.dependencies import get_current_user from app.user.dependencies import get_current_user
from app.user.models import User from app.user.models import User
from app.home.models import Project, MarketingIntel
from fastapi import APIRouter, BackgroundTasks, Depends, Query
from fastapi import HTTPException, status
from app.utils.prompts.prompts import yt_upload_prompt
from app.utils.chatgpt_prompt import ChatgptService, ChatGPTResponseError
redis_seo_client = Redis(
host=db_settings.REDIS_HOST,
port=db_settings.REDIS_PORT,
db=0,
decode_responses=True,
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
router = APIRouter(prefix="/seo", tags=["Social SEO"]) router = APIRouter(prefix="/seo", tags=["Social SEO"])
@router.post( @router.post(
"/youtube", "/youtube",
response_model=YoutubeDescriptionResponse, response_model=YoutubeDescriptionResponse,
summary="유튜브 SEO descrption 생성", summary="유튜브 SEO description 생성",
description="유튜브 업로드 시 사용할 descrption을 SEO 적용하여 생성", description="유튜브 업로드 시 사용할 description을 SEO 적용하여 생성",
) )
async def youtube_seo_description( async def youtube_seo_description(
request_body: YoutubeDescriptionRequest, request_body: YoutubeDescriptionRequest,
current_user: User = Depends(get_current_user), current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session), session: AsyncSession = Depends(get_session),
) -> YoutubeDescriptionResponse: ) -> YoutubeDescriptionResponse:
return await seo_service.get_youtube_seo_description(
# TODO : 나중에 Session Task_id 검증 미들웨어 만들면 추가해주세요. request_body.task_id, current_user, session
logger.info(
f"[youtube_seo_description] Try Cache - user: {current_user.user_uuid} / task_id : {request_body.task_id}"
) )
cached = await get_yt_seo_in_redis(request_body.task_id)
if cached: # redis hit
return cached
logger.info(
f"[youtube_seo_description] Cache miss - user: {current_user.user_uuid} "
)
updated_seo = await make_youtube_seo_description(request_body.task_id, current_user, session)
await set_yt_seo_in_redis(request_body.task_id, updated_seo)
return updated_seo
async def make_youtube_seo_description(
task_id: str,
current_user: User,
session: AsyncSession,
) -> YoutubeDescriptionResponse:
logger.info(
f"[make_youtube_seo_description] START - user: {current_user.user_uuid} "
)
try:
project_query = await session.execute(
select(Project)
.where(
Project.task_id == task_id,
Project.user_uuid == current_user.user_uuid)
.order_by(Project.created_at.desc())
.limit(1)
)
project = project_query.scalar_one_or_none()
marketing_query = await session.execute(
select(MarketingIntel)
.where(MarketingIntel.id == project.marketing_intelligence)
)
marketing_intelligence = marketing_query.scalar_one_or_none()
hashtags = marketing_intelligence.intel_result["target_keywords"]
yt_seo_input_data = {
"customer_name" : project.store_name,
"detail_region_info" : project.detail_region_info,
"marketing_intelligence_summary" : json.dumps(marketing_intelligence.intel_result, ensure_ascii=False),
"language" : project.language,
"target_keywords" : hashtags
}
chatgpt = ChatgptService(timeout = 180)
yt_seo_output = await chatgpt.generate_structured_output(yt_upload_prompt, yt_seo_input_data)
result_dict = {
"title" : yt_seo_output.title,
"description" : yt_seo_output.description,
"keywords": hashtags
}
result = YoutubeDescriptionResponse(**result_dict)
return result
except Exception as e:
logger.error(f"[youtube_seo_description] EXCEPTION - error: {e}")
raise HTTPException(
status_code=500,
detail=f"유튜브 SEO 생성에 실패했습니다. : {str(e)}",
)
async def get_yt_seo_in_redis(task_id:str) -> YoutubeDescriptionResponse | None:
field = f"task_id:{task_id}"
yt_seo_info = await redis_seo_client.hget(YOUTUBE_SEO_HASH, field)
if yt_seo_info:
yt_seo = json.loads(yt_seo_info)
else:
return None
return YoutubeDescriptionResponse(**yt_seo)
async def set_yt_seo_in_redis(task_id:str, yt_seo : YoutubeDescriptionResponse) -> None:
field = f"task_id:{task_id}"
yt_seo_info = json.dumps(yt_seo.model_dump(), ensure_ascii=False)
await redis_seo_client.hsetex(YOUTUBE_SEO_HASH, field, yt_seo_info, ex=3600)
return

View File

@ -2,39 +2,34 @@
소셜 업로드 API 라우터 소셜 업로드 API 라우터
소셜 미디어 영상 업로드 관련 엔드포인트를 제공합니다. 소셜 미디어 영상 업로드 관련 엔드포인트를 제공합니다.
비즈니스 로직은 SocialUploadService에 위임합니다.
""" """
import logging import logging
from datetime import datetime, timezone
from typing import Optional from typing import Optional
from fastapi import APIRouter, BackgroundTasks, Depends, Query from fastapi import APIRouter, BackgroundTasks, Depends, Query
from fastapi import HTTPException, status
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from app.database.session import get_session from app.database.session import get_session
from app.social.constants import SocialPlatform, UploadStatus from app.social.constants import SocialPlatform
from app.social.exceptions import SocialAccountNotFoundError, VideoNotFoundError
from app.social.models import SocialUpload
from app.social.schemas import ( from app.social.schemas import (
MessageResponse, MessageResponse,
SocialUploadHistoryItem,
SocialUploadHistoryResponse, SocialUploadHistoryResponse,
SocialUploadRequest, SocialUploadRequest,
SocialUploadResponse, SocialUploadResponse,
SocialUploadStatusResponse, SocialUploadStatusResponse,
) )
from app.social.services import social_account_service from app.social.services import SocialUploadService, social_account_service
from app.social.worker.upload_task import process_social_upload
from app.user.dependencies import get_current_user from app.user.dependencies import get_current_user
from app.user.models import User from app.user.models import User
from app.video.models import Video
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
router = APIRouter(prefix="/upload", tags=["Social Upload"]) router = APIRouter(prefix="/upload", tags=["Social Upload"])
upload_service = SocialUploadService(account_service=social_account_service)
@router.post( @router.post(
"", "",
@ -71,129 +66,7 @@ async def upload_to_social(
current_user: User = Depends(get_current_user), current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session), session: AsyncSession = Depends(get_session),
) -> SocialUploadResponse: ) -> SocialUploadResponse:
""" return await upload_service.request_upload(body, current_user, session, background_tasks)
소셜 플랫폼에 영상 업로드 요청
백그라운드에서 영상을 다운로드하고 소셜 플랫폼에 업로드합니다.
"""
logger.info(
f"[UPLOAD_API] 업로드 요청 - "
f"user_uuid: {current_user.user_uuid}, "
f"video_id: {body.video_id}, "
f"social_account_id: {body.social_account_id}"
)
# 1. 영상 조회 및 검증
video_result = await session.execute(
select(Video).where(Video.id == body.video_id)
)
video = video_result.scalar_one_or_none()
if not video:
logger.warning(f"[UPLOAD_API] 영상 없음 - video_id: {body.video_id}")
raise VideoNotFoundError(video_id=body.video_id)
if not video.result_movie_url:
logger.warning(f"[UPLOAD_API] 영상 URL 없음 - video_id: {body.video_id}")
raise VideoNotFoundError(
video_id=body.video_id,
detail="영상이 아직 준비되지 않았습니다. 영상 생성이 완료된 후 시도해주세요.",
)
# 2. 소셜 계정 조회 (social_account_id로 직접 조회, 소유권 검증 포함)
account = await social_account_service.get_account_by_id(
user_uuid=current_user.user_uuid,
account_id=body.social_account_id,
session=session,
)
if not account:
logger.warning(
f"[UPLOAD_API] 연동 계정 없음 - "
f"user_uuid: {current_user.user_uuid}, social_account_id: {body.social_account_id}"
)
raise SocialAccountNotFoundError()
# 3. 진행 중인 업로드 확인 (pending 또는 uploading 상태만)
in_progress_result = await session.execute(
select(SocialUpload).where(
SocialUpload.video_id == body.video_id,
SocialUpload.social_account_id == account.id,
SocialUpload.status.in_([UploadStatus.PENDING.value, UploadStatus.UPLOADING.value]),
)
)
in_progress_upload = in_progress_result.scalar_one_or_none()
if in_progress_upload:
logger.info(
f"[UPLOAD_API] 진행 중인 업로드 존재 - upload_id: {in_progress_upload.id}"
)
return SocialUploadResponse(
success=True,
upload_id=in_progress_upload.id,
platform=account.platform,
status=in_progress_upload.status,
message="이미 업로드가 진행 중입니다.",
)
# 4. 업로드 순번 계산 (동일 video + account 조합에서 최대 순번 + 1)
max_seq_result = await session.execute(
select(func.coalesce(func.max(SocialUpload.upload_seq), 0)).where(
SocialUpload.video_id == body.video_id,
SocialUpload.social_account_id == account.id,
)
)
max_seq = max_seq_result.scalar() or 0
next_seq = max_seq + 1
# 5. 새 업로드 레코드 생성 (항상 새로 생성하여 이력 보존)
social_upload = SocialUpload(
user_uuid=current_user.user_uuid,
video_id=body.video_id,
social_account_id=account.id,
upload_seq=next_seq,
platform=account.platform,
status=UploadStatus.PENDING.value,
upload_progress=0,
title=body.title,
description=body.description,
tags=body.tags,
privacy_status=body.privacy_status.value,
scheduled_at=body.scheduled_at,
platform_options={
**(body.platform_options or {}),
"scheduled_at": body.scheduled_at.isoformat() if body.scheduled_at else None,
},
retry_count=0,
)
session.add(social_upload)
await session.commit()
await session.refresh(social_upload)
logger.info(
f"[UPLOAD_API] 업로드 레코드 생성 - "
f"upload_id: {social_upload.id}, video_id: {body.video_id}, "
f"account_id: {account.id}, upload_seq: {next_seq}, platform: {account.platform}"
)
# 6. 백그라운드 태스크 등록 (즉시 업로드 or 예약 없을 때만)
# scheduled_at은 naive datetime (클라이언트에서 KST 기준으로 전송)
# config의 TIMEZONE(KST) 기준 현재 시간과 비교
from config import TIMEZONE
now_kst_naive = datetime.now(TIMEZONE).replace(tzinfo=None)
is_scheduled = body.scheduled_at and body.scheduled_at > now_kst_naive
if not is_scheduled:
background_tasks.add_task(process_social_upload, social_upload.id)
message = "예약 업로드가 등록되었습니다." if is_scheduled else "업로드 요청이 접수되었습니다."
return SocialUploadResponse(
success=True,
upload_id=social_upload.id,
platform=account.platform,
status=social_upload.status,
message=message,
)
@router.get( @router.get(
@ -207,43 +80,7 @@ async def get_upload_status(
current_user: User = Depends(get_current_user), current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session), session: AsyncSession = Depends(get_session),
) -> SocialUploadStatusResponse: ) -> SocialUploadStatusResponse:
""" return await upload_service.get_upload_status(upload_id, current_user, session)
업로드 상태 조회
"""
logger.info(f"[UPLOAD_API] 상태 조회 - upload_id: {upload_id}")
result = await session.execute(
select(SocialUpload).where(
SocialUpload.id == upload_id,
SocialUpload.user_uuid == current_user.user_uuid,
)
)
upload = result.scalar_one_or_none()
if not upload:
from fastapi import HTTPException, status
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="업로드 정보를 찾을 수 없습니다.",
)
return SocialUploadStatusResponse(
upload_id=upload.id,
video_id=upload.video_id,
social_account_id=upload.social_account_id,
upload_seq=upload.upload_seq,
platform=upload.platform,
status=UploadStatus(upload.status),
upload_progress=upload.upload_progress,
title=upload.title,
platform_video_id=upload.platform_video_id,
platform_url=upload.platform_url,
error_message=upload.error_message,
retry_count=upload.retry_count,
created_at=upload.created_at,
uploaded_at=upload.uploaded_at,
)
@router.get( @router.get(
@ -270,98 +107,8 @@ async def get_upload_history(
page: int = Query(1, ge=1, description="페이지 번호"), page: int = Query(1, ge=1, description="페이지 번호"),
size: int = Query(20, ge=1, le=100, description="페이지 크기"), size: int = Query(20, ge=1, le=100, description="페이지 크기"),
) -> SocialUploadHistoryResponse: ) -> SocialUploadHistoryResponse:
""" return await upload_service.get_upload_history(
업로드 이력 조회 current_user, session, tab, platform, year, month, page, size
"""
now = datetime.now(timezone.utc)
target_year = year or now.year
target_month = month or now.month
logger.info(
f"[UPLOAD_API] 이력 조회 - "
f"user_uuid: {current_user.user_uuid}, tab: {tab}, "
f"year: {target_year}, month: {target_month}, page: {page}, size: {size}"
)
# 월 범위 계산
from calendar import monthrange
last_day = monthrange(target_year, target_month)[1]
month_start = datetime(target_year, target_month, 1, 0, 0, 0)
month_end = datetime(target_year, target_month, last_day, 23, 59, 59)
# 기본 쿼리 (cancelled 제외)
query = select(SocialUpload).where(
SocialUpload.user_uuid == current_user.user_uuid,
SocialUpload.created_at >= month_start,
SocialUpload.created_at <= month_end,
SocialUpload.status != UploadStatus.CANCELLED.value,
)
count_query = select(func.count(SocialUpload.id)).where(
SocialUpload.user_uuid == current_user.user_uuid,
SocialUpload.created_at >= month_start,
SocialUpload.created_at <= month_end,
SocialUpload.status != UploadStatus.CANCELLED.value,
)
# 탭 필터 적용
if tab == "completed":
query = query.where(SocialUpload.status == UploadStatus.COMPLETED.value)
count_query = count_query.where(SocialUpload.status == UploadStatus.COMPLETED.value)
elif tab == "scheduled":
query = query.where(
SocialUpload.status == UploadStatus.PENDING.value,
SocialUpload.scheduled_at.isnot(None),
)
count_query = count_query.where(
SocialUpload.status == UploadStatus.PENDING.value,
SocialUpload.scheduled_at.isnot(None),
)
elif tab == "failed":
query = query.where(SocialUpload.status == UploadStatus.FAILED.value)
count_query = count_query.where(SocialUpload.status == UploadStatus.FAILED.value)
# 플랫폼 필터 적용
if platform:
query = query.where(SocialUpload.platform == platform.value)
count_query = count_query.where(SocialUpload.platform == platform.value)
# 총 개수 조회
total_result = await session.execute(count_query)
total = total_result.scalar() or 0
# 페이지네이션 적용
query = (
query.order_by(SocialUpload.created_at.desc())
.offset((page - 1) * size)
.limit(size)
)
result = await session.execute(query)
uploads = result.scalars().all()
items = [
SocialUploadHistoryItem(
upload_id=upload.id,
video_id=upload.video_id,
social_account_id=upload.social_account_id,
upload_seq=upload.upload_seq,
platform=upload.platform,
status=upload.status,
title=upload.title,
platform_url=upload.platform_url,
error_message=upload.error_message,
scheduled_at=upload.scheduled_at,
created_at=upload.created_at,
uploaded_at=upload.uploaded_at,
)
for upload in uploads
]
return SocialUploadHistoryResponse(
items=items,
total=total,
page=page,
size=size,
) )
@ -377,53 +124,7 @@ async def retry_upload(
current_user: User = Depends(get_current_user), current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session), session: AsyncSession = Depends(get_session),
) -> SocialUploadResponse: ) -> SocialUploadResponse:
""" return await upload_service.retry_upload(upload_id, current_user, session, background_tasks)
업로드 재시도
실패한 업로드를 다시 시도합니다.
"""
logger.info(f"[UPLOAD_API] 재시도 요청 - upload_id: {upload_id}")
result = await session.execute(
select(SocialUpload).where(
SocialUpload.id == upload_id,
SocialUpload.user_uuid == current_user.user_uuid,
)
)
upload = result.scalar_one_or_none()
if not upload:
from fastapi import HTTPException, status
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="업로드 정보를 찾을 수 없습니다.",
)
if upload.status not in [UploadStatus.FAILED.value, UploadStatus.CANCELLED.value]:
from fastapi import HTTPException, status
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="실패하거나 취소된 업로드만 재시도할 수 있습니다.",
)
# 상태 초기화
upload.status = UploadStatus.PENDING.value
upload.upload_progress = 0
upload.error_message = None
await session.commit()
# 백그라운드 태스크 등록
background_tasks.add_task(process_social_upload, upload.id)
return SocialUploadResponse(
success=True,
upload_id=upload.id,
platform=upload.platform,
status=upload.status,
message="업로드 재시도가 요청되었습니다.",
)
@router.delete( @router.delete(
@ -437,38 +138,4 @@ async def cancel_upload(
current_user: User = Depends(get_current_user), current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session), session: AsyncSession = Depends(get_session),
) -> MessageResponse: ) -> MessageResponse:
""" return await upload_service.cancel_upload(upload_id, current_user, session)
업로드 취소
대기 중인 업로드를 취소합니다.
이미 진행 중이거나 완료된 업로드는 취소할 없습니다.
"""
logger.info(f"[UPLOAD_API] 취소 요청 - upload_id: {upload_id}")
result = await session.execute(
select(SocialUpload).where(
SocialUpload.id == upload_id,
SocialUpload.user_uuid == current_user.user_uuid,
)
)
upload = result.scalar_one_or_none()
if not upload:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="업로드 정보를 찾을 수 없습니다.",
)
if upload.status != UploadStatus.PENDING.value:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="대기 중인 업로드만 취소할 수 있습니다.",
)
upload.status = UploadStatus.CANCELLED.value
await session.commit()
return MessageResponse(
success=True,
message="업로드가 취소되었습니다.",
)

View File

@ -0,0 +1,35 @@
from app.social.schemas.oauth_schema import (
SocialConnectResponse,
SocialAccountResponse,
SocialAccountListResponse,
OAuthTokenResponse,
PlatformUserInfo,
MessageResponse,
)
from app.social.schemas.upload_schema import (
SocialUploadRequest,
SocialUploadResponse,
SocialUploadStatusResponse,
SocialUploadHistoryItem,
SocialUploadHistoryResponse,
)
from app.social.schemas.seo_schema import (
YoutubeDescriptionRequest,
YoutubeDescriptionResponse,
)
__all__ = [
"SocialConnectResponse",
"SocialAccountResponse",
"SocialAccountListResponse",
"OAuthTokenResponse",
"PlatformUserInfo",
"MessageResponse",
"SocialUploadRequest",
"SocialUploadResponse",
"SocialUploadStatusResponse",
"SocialUploadHistoryItem",
"SocialUploadHistoryResponse",
"YoutubeDescriptionRequest",
"YoutubeDescriptionResponse",
]

View File

@ -0,0 +1,125 @@
"""
소셜 OAuth 관련 Pydantic 스키마
"""
from datetime import datetime
from typing import Any, Optional
from pydantic import BaseModel, ConfigDict, Field
class SocialConnectResponse(BaseModel):
"""소셜 계정 연동 시작 응답"""
auth_url: str = Field(..., description="OAuth 인증 URL")
state: str = Field(..., description="CSRF 방지용 state 토큰")
platform: str = Field(..., description="플랫폼명")
model_config = ConfigDict(
json_schema_extra={
"example": {
"auth_url": "https://accounts.google.com/o/oauth2/v2/auth?...",
"state": "abc123xyz",
"platform": "youtube",
}
}
)
class SocialAccountResponse(BaseModel):
"""연동된 소셜 계정 정보"""
id: int = Field(..., description="소셜 계정 ID")
platform: str = Field(..., description="플랫폼명")
platform_user_id: str = Field(..., description="플랫폼 내 사용자 ID")
platform_username: Optional[str] = Field(None, description="플랫폼 내 사용자명")
display_name: Optional[str] = Field(None, description="표시 이름")
profile_image_url: Optional[str] = Field(None, description="프로필 이미지 URL")
is_active: bool = Field(..., description="활성화 상태")
connected_at: datetime = Field(..., description="연동 일시")
platform_data: Optional[dict[str, Any]] = Field(
None, description="플랫폼별 추가 정보 (채널ID, 구독자 수 등)"
)
model_config = ConfigDict(
from_attributes=True,
json_schema_extra={
"example": {
"id": 1,
"platform": "youtube",
"platform_user_id": "UC1234567890",
"platform_username": "my_channel",
"display_name": "My Channel",
"profile_image_url": "https://...",
"is_active": True,
"connected_at": "2024-01-15T12:00:00",
"platform_data": {
"channel_id": "UC1234567890",
"channel_title": "My Channel",
"subscriber_count": 1000,
},
}
}
)
class SocialAccountListResponse(BaseModel):
"""연동된 소셜 계정 목록 응답"""
accounts: list[SocialAccountResponse] = Field(..., description="연동 계정 목록")
total: int = Field(..., description="총 연동 계정 수")
model_config = ConfigDict(
json_schema_extra={
"example": {
"accounts": [
{
"id": 1,
"platform": "youtube",
"platform_user_id": "UC1234567890",
"platform_username": "my_channel",
"display_name": "My Channel",
"is_active": True,
"connected_at": "2024-01-15T12:00:00",
}
],
"total": 1,
}
}
)
class OAuthTokenResponse(BaseModel):
"""OAuth 토큰 응답 (내부 사용)"""
access_token: str
refresh_token: Optional[str] = None
expires_in: int
token_type: str = "Bearer"
scope: Optional[str] = None
class PlatformUserInfo(BaseModel):
"""플랫폼 사용자 정보 (내부 사용)"""
platform_user_id: str
username: Optional[str] = None
display_name: Optional[str] = None
profile_image_url: Optional[str] = None
platform_data: dict[str, Any] = Field(default_factory=dict)
class MessageResponse(BaseModel):
"""단순 메시지 응답"""
success: bool = Field(..., description="성공 여부")
message: str = Field(..., description="응답 메시지")
model_config = ConfigDict(
json_schema_extra={
"example": {
"success": True,
"message": "작업이 완료되었습니다.",
}
}
)

View File

@ -0,0 +1,37 @@
"""
소셜 SEO 관련 Pydantic 스키마
"""
from pydantic import BaseModel, ConfigDict, Field
class YoutubeDescriptionRequest(BaseModel):
"""유튜브 SEO Description 제안 요청"""
task_id: str = Field(..., description="작업 고유 식별자")
model_config = ConfigDict(
json_schema_extra={
"example": {
"task_id": "019c739f-65fc-7d15-8c88-b31be00e588e"
}
}
)
class YoutubeDescriptionResponse(BaseModel):
"""유튜브 SEO Description 제안 응답"""
title: str = Field(..., description="유튜브 영상 제목 - SEO/AEO 최적화")
description: str = Field(..., description="제안된 유튜브 SEO Description")
keywords: list[str] = Field(..., description="해시태그 리스트")
model_config = ConfigDict(
json_schema_extra={
"example": {
"title": "여기에 더미 타이틀",
"description": "여기에 더미 텍스트",
"keywords": ["여기에", "더미", "해시태그"]
}
}
)

View File

@ -1,7 +1,5 @@
""" """
Social Media Schemas 소셜 업로드 관련 Pydantic 스키마
소셜 미디어 연동 관련 Pydantic 스키마를 정의합니다.
""" """
from datetime import datetime from datetime import datetime
@ -9,123 +7,7 @@ from typing import Any, Optional
from pydantic import BaseModel, ConfigDict, Field from pydantic import BaseModel, ConfigDict, Field
from app.social.constants import PrivacyStatus, SocialPlatform, UploadStatus from app.social.constants import PrivacyStatus, UploadStatus
# =============================================================================
# OAuth 관련 스키마
# =============================================================================
class SocialConnectResponse(BaseModel):
"""소셜 계정 연동 시작 응답"""
auth_url: str = Field(..., description="OAuth 인증 URL")
state: str = Field(..., description="CSRF 방지용 state 토큰")
platform: str = Field(..., description="플랫폼명")
model_config = ConfigDict(
json_schema_extra={
"example": {
"auth_url": "https://accounts.google.com/o/oauth2/v2/auth?...",
"state": "abc123xyz",
"platform": "youtube",
}
}
)
class SocialAccountResponse(BaseModel):
"""연동된 소셜 계정 정보"""
id: int = Field(..., description="소셜 계정 ID")
platform: str = Field(..., description="플랫폼명")
platform_user_id: str = Field(..., description="플랫폼 내 사용자 ID")
platform_username: Optional[str] = Field(None, description="플랫폼 내 사용자명")
display_name: Optional[str] = Field(None, description="표시 이름")
profile_image_url: Optional[str] = Field(None, description="프로필 이미지 URL")
is_active: bool = Field(..., description="활성화 상태")
connected_at: datetime = Field(..., description="연동 일시")
platform_data: Optional[dict[str, Any]] = Field(
None, description="플랫폼별 추가 정보 (채널ID, 구독자 수 등)"
)
model_config = ConfigDict(
from_attributes=True,
json_schema_extra={
"example": {
"id": 1,
"platform": "youtube",
"platform_user_id": "UC1234567890",
"platform_username": "my_channel",
"display_name": "My Channel",
"profile_image_url": "https://...",
"is_active": True,
"connected_at": "2024-01-15T12:00:00",
"platform_data": {
"channel_id": "UC1234567890",
"channel_title": "My Channel",
"subscriber_count": 1000,
},
}
}
)
class SocialAccountListResponse(BaseModel):
"""연동된 소셜 계정 목록 응답"""
accounts: list[SocialAccountResponse] = Field(..., description="연동 계정 목록")
total: int = Field(..., description="총 연동 계정 수")
model_config = ConfigDict(
json_schema_extra={
"example": {
"accounts": [
{
"id": 1,
"platform": "youtube",
"platform_user_id": "UC1234567890",
"platform_username": "my_channel",
"display_name": "My Channel",
"is_active": True,
"connected_at": "2024-01-15T12:00:00",
}
],
"total": 1,
}
}
)
# =============================================================================
# 내부 사용 스키마 (OAuth 토큰 응답)
# =============================================================================
class OAuthTokenResponse(BaseModel):
"""OAuth 토큰 응답 (내부 사용)"""
access_token: str
refresh_token: Optional[str] = None
expires_in: int
token_type: str = "Bearer"
scope: Optional[str] = None
class PlatformUserInfo(BaseModel):
"""플랫폼 사용자 정보 (내부 사용)"""
platform_user_id: str
username: Optional[str] = None
display_name: Optional[str] = None
profile_image_url: Optional[str] = None
platform_data: dict[str, Any] = Field(default_factory=dict)
# =============================================================================
# 업로드 관련 스키마
# =============================================================================
class SocialUploadRequest(BaseModel): class SocialUploadRequest(BaseModel):
@ -159,7 +41,7 @@ class SocialUploadRequest(BaseModel):
"privacy_status": "public", "privacy_status": "public",
"scheduled_at": "2026-02-02T15:00:00", "scheduled_at": "2026-02-02T15:00:00",
"platform_options": { "platform_options": {
"category_id": "22", # YouTube 카테고리 "category_id": "22",
}, },
} }
} }
@ -278,50 +160,3 @@ class SocialUploadHistoryResponse(BaseModel):
} }
} }
) )
class YoutubeDescriptionRequest(BaseModel):
"""유튜브 SEO Description 제안 (자동완성) Request 모델"""
model_config = ConfigDict(
json_schema_extra={
"example": {
"task_id" : "019c739f-65fc-7d15-8c88-b31be00e588e"
}
}
)
task_id: str = Field(..., description="작업 고유 식별자")
class YoutubeDescriptionResponse(BaseModel):
"""유튜브 SEO Description 제안 (자동완성) Response 모델"""
title:str = Field(..., description="유튜브 영상 제목 - SEO/AEO 최적화")
description : str = Field(..., description="제안된 유튜브 SEO Description")
keywords : list[str] = Field(..., description="해시태그 리스트")
model_config = ConfigDict(
json_schema_extra={
"example": {
"title" : "여기에 더미 타이틀",
"description": "여기에 더미 텍스트",
"keywords": ["여기에", "더미", "해시태그"]
}
}
)
# =============================================================================
# 공통 응답 스키마
# =============================================================================
class MessageResponse(BaseModel):
"""단순 메시지 응답"""
success: bool = Field(..., description="성공 여부")
message: str = Field(..., description="응답 메시지")
model_config = ConfigDict(
json_schema_extra={
"example": {
"success": True,
"message": "작업이 완료되었습니다.",
}
}
)

View File

@ -0,0 +1,11 @@
from app.social.services.account_service import SocialAccountService, social_account_service
from app.social.services.upload_service import SocialUploadService
from app.social.services.seo_service import SeoService, seo_service
__all__ = [
"SocialAccountService",
"social_account_service",
"SocialUploadService",
"SeoService",
"seo_service",
]

View File

@ -188,7 +188,7 @@ class SocialAccountService:
session=session, session=session,
update_connected_at=is_reactivation, # 재활성화 시에만 연결 시간 업데이트 update_connected_at=is_reactivation, # 재활성화 시에만 연결 시간 업데이트
) )
return self._to_response(existing_account) return self.to_response(existing_account)
# 5. 새 소셜 계정 저장 (기존 계정이 없는 경우에만) # 5. 새 소셜 계정 저장 (기존 계정이 없는 경우에만)
social_account = await self._create_social_account( social_account = await self._create_social_account(
@ -204,7 +204,7 @@ class SocialAccountService:
f"account_id: {social_account.id}, platform: {platform.value}" f"account_id: {social_account.id}, platform: {platform.value}"
) )
return self._to_response(social_account) return self.to_response(social_account)
async def get_connected_accounts( async def get_connected_accounts(
self, self,
@ -241,7 +241,7 @@ class SocialAccountService:
for account in accounts: for account in accounts:
await self._try_refresh_token(account, session) await self._try_refresh_token(account, session)
return [self._to_response(account) for account in accounts] return [self.to_response(account) for account in accounts]
async def refresh_all_tokens( async def refresh_all_tokens(
self, self,
@ -713,7 +713,7 @@ class SocialAccountService:
return account return account
def _to_response(self, account: SocialAccount) -> SocialAccountResponse: def to_response(self, account: SocialAccount) -> SocialAccountResponse:
""" """
SocialAccount를 SocialAccountResponse로 변환 SocialAccount를 SocialAccountResponse로 변환

View File

@ -0,0 +1,12 @@
"""
소셜 서비스 베이스 클래스
"""
from sqlalchemy.ext.asyncio import AsyncSession
class BaseService:
"""서비스 레이어 베이스 클래스"""
def __init__(self, session: AsyncSession | None = None):
self.session = session

View File

@ -0,0 +1,129 @@
"""
유튜브 SEO 서비스
SEO description 생성 Redis 캐싱 로직을 처리합니다.
"""
import json
import logging
from fastapi import HTTPException
from redis.asyncio import Redis
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from config import db_settings
from app.home.models import MarketingIntel, Project
from app.social.constants import YOUTUBE_SEO_HASH
from app.social.schemas import YoutubeDescriptionResponse
from app.user.models import User
from app.utils.chatgpt_prompt import ChatgptService
from app.utils.prompts.prompts import yt_upload_prompt
logger = logging.getLogger(__name__)
redis_seo_client = Redis(
host=db_settings.REDIS_HOST,
port=db_settings.REDIS_PORT,
db=0,
decode_responses=True,
)
class SeoService:
"""유튜브 SEO 비즈니스 로직 서비스"""
async def get_youtube_seo_description(
self,
task_id: str,
current_user: User,
session: AsyncSession,
) -> YoutubeDescriptionResponse:
"""
유튜브 SEO description 생성
Redis 캐시 확인 miss이면 GPT로 생성하고 캐싱.
"""
logger.info(
f"[SEO_SERVICE] Try Cache - user: {current_user.user_uuid} / task_id: {task_id}"
)
cached = await self._get_from_redis(task_id)
if cached:
return cached
logger.info(f"[SEO_SERVICE] Cache miss - user: {current_user.user_uuid}")
result = await self._generate_seo_description(task_id, current_user, session)
await self._set_to_redis(task_id, result)
return result
async def _generate_seo_description(
self,
task_id: str,
current_user: User,
session: AsyncSession,
) -> YoutubeDescriptionResponse:
"""GPT를 사용하여 SEO description 생성"""
logger.info(f"[SEO_SERVICE] Generating SEO - user: {current_user.user_uuid}")
try:
project_result = await session.execute(
select(Project)
.where(
Project.task_id == task_id,
Project.user_uuid == current_user.user_uuid,
)
.order_by(Project.created_at.desc())
.limit(1)
)
project = project_result.scalar_one_or_none()
marketing_result = await session.execute(
select(MarketingIntel).where(MarketingIntel.id == project.marketing_intelligence)
)
marketing_intelligence = marketing_result.scalar_one_or_none()
hashtags = marketing_intelligence.intel_result["target_keywords"]
yt_seo_input_data = {
"customer_name": project.store_name,
"detail_region_info": project.detail_region_info,
"marketing_intelligence_summary": json.dumps(
marketing_intelligence.intel_result, ensure_ascii=False
),
"language": project.language,
"target_keywords": hashtags,
}
chatgpt = ChatgptService(timeout=180)
yt_seo_output = await chatgpt.generate_structured_output(yt_upload_prompt, yt_seo_input_data)
return YoutubeDescriptionResponse(
title=yt_seo_output.title,
description=yt_seo_output.description,
keywords=hashtags,
)
except Exception as e:
logger.error(f"[SEO_SERVICE] EXCEPTION - error: {e}")
raise HTTPException(
status_code=500,
detail=f"유튜브 SEO 생성에 실패했습니다. : {str(e)}",
)
async def _get_from_redis(self, task_id: str) -> YoutubeDescriptionResponse | None:
field = f"task_id:{task_id}"
yt_seo_info = await redis_seo_client.hget(YOUTUBE_SEO_HASH, field)
if yt_seo_info:
return YoutubeDescriptionResponse(**json.loads(yt_seo_info))
return None
async def _set_to_redis(self, task_id: str, yt_seo: YoutubeDescriptionResponse) -> None:
field = f"task_id:{task_id}"
yt_seo_info = json.dumps(yt_seo.model_dump(), ensure_ascii=False)
await redis_seo_client.hset(YOUTUBE_SEO_HASH, field, yt_seo_info)
await redis_seo_client.expire(YOUTUBE_SEO_HASH, 3600)
seo_service = SeoService()

View File

@ -0,0 +1,391 @@
"""
소셜 업로드 서비스
업로드 요청, 상태 조회, 이력 조회, 재시도, 취소 관련 비즈니스 로직을 처리합니다.
"""
import logging
from calendar import monthrange
from datetime import datetime
from typing import Optional
from fastapi import BackgroundTasks, HTTPException, status
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from config import TIMEZONE
from app.social.constants import SocialPlatform, UploadStatus
from app.social.exceptions import SocialAccountNotFoundError, VideoNotFoundError
from app.social.models import SocialUpload
from app.social.schemas import (
MessageResponse,
SocialUploadHistoryItem,
SocialUploadHistoryResponse,
SocialUploadResponse,
SocialUploadStatusResponse,
SocialUploadRequest,
)
from app.social.services.account_service import SocialAccountService
from app.social.worker.upload_task import process_social_upload
from app.user.models import User
from app.video.models import Video
logger = logging.getLogger(__name__)
class SocialUploadService:
"""소셜 업로드 비즈니스 로직 서비스"""
def __init__(self, account_service: SocialAccountService):
self._account_service = account_service
async def request_upload(
self,
body: SocialUploadRequest,
current_user: User,
session: AsyncSession,
background_tasks: BackgroundTasks,
) -> SocialUploadResponse:
"""
소셜 플랫폼 업로드 요청
영상 검증, 계정 확인, 중복 확인 업로드 레코드 생성.
즉시 업로드이면 백그라운드 태스크 등록, 예약이면 스케줄러가 처리.
"""
logger.info(
f"[UPLOAD_SERVICE] 업로드 요청 - "
f"user_uuid: {current_user.user_uuid}, "
f"video_id: {body.video_id}, "
f"social_account_id: {body.social_account_id}"
)
# 1. 영상 조회 및 검증
video_result = await session.execute(
select(Video).where(Video.id == body.video_id)
)
video = video_result.scalar_one_or_none()
if not video:
logger.warning(f"[UPLOAD_SERVICE] 영상 없음 - video_id: {body.video_id}")
raise VideoNotFoundError(video_id=body.video_id)
if not video.result_movie_url:
logger.warning(f"[UPLOAD_SERVICE] 영상 URL 없음 - video_id: {body.video_id}")
raise VideoNotFoundError(
video_id=body.video_id,
detail="영상이 아직 준비되지 않았습니다. 영상 생성이 완료된 후 시도해주세요.",
)
# 2. 소셜 계정 조회 및 소유권 검증
account = await self._account_service.get_account_by_id(
user_uuid=current_user.user_uuid,
account_id=body.social_account_id,
session=session,
)
if not account:
logger.warning(
f"[UPLOAD_SERVICE] 연동 계정 없음 - "
f"user_uuid: {current_user.user_uuid}, social_account_id: {body.social_account_id}"
)
raise SocialAccountNotFoundError()
# 3. 진행 중인 업로드 확인 (pending 또는 uploading 상태만)
in_progress_result = await session.execute(
select(SocialUpload).where(
SocialUpload.video_id == body.video_id,
SocialUpload.social_account_id == account.id,
SocialUpload.status.in_([UploadStatus.PENDING.value, UploadStatus.UPLOADING.value]),
)
)
in_progress_upload = in_progress_result.scalar_one_or_none()
if in_progress_upload:
logger.info(
f"[UPLOAD_SERVICE] 진행 중인 업로드 존재 - upload_id: {in_progress_upload.id}"
)
return SocialUploadResponse(
success=True,
upload_id=in_progress_upload.id,
platform=account.platform,
status=in_progress_upload.status,
message="이미 업로드가 진행 중입니다.",
)
# 4. 업로드 순번 계산
max_seq_result = await session.execute(
select(func.coalesce(func.max(SocialUpload.upload_seq), 0)).where(
SocialUpload.video_id == body.video_id,
SocialUpload.social_account_id == account.id,
)
)
next_seq = (max_seq_result.scalar() or 0) + 1
# 5. 새 업로드 레코드 생성
social_upload = SocialUpload(
user_uuid=current_user.user_uuid,
video_id=body.video_id,
social_account_id=account.id,
upload_seq=next_seq,
platform=account.platform,
status=UploadStatus.PENDING.value,
upload_progress=0,
title=body.title,
description=body.description,
tags=body.tags,
privacy_status=body.privacy_status.value,
scheduled_at=body.scheduled_at,
platform_options={
**(body.platform_options or {}),
"scheduled_at": body.scheduled_at.isoformat() if body.scheduled_at else None,
},
retry_count=0,
)
session.add(social_upload)
await session.commit()
await session.refresh(social_upload)
logger.info(
f"[UPLOAD_SERVICE] 업로드 레코드 생성 - "
f"upload_id: {social_upload.id}, video_id: {body.video_id}, "
f"account_id: {account.id}, upload_seq: {next_seq}, platform: {account.platform}"
)
# 6. 즉시 업로드이면 백그라운드 태스크 등록
now_kst_naive = datetime.now(TIMEZONE).replace(tzinfo=None)
is_scheduled = body.scheduled_at and body.scheduled_at > now_kst_naive
if not is_scheduled:
background_tasks.add_task(process_social_upload, social_upload.id)
message = "예약 업로드가 등록되었습니다." if is_scheduled else "업로드 요청이 접수되었습니다."
return SocialUploadResponse(
success=True,
upload_id=social_upload.id,
platform=account.platform,
status=social_upload.status,
message=message,
)
async def get_upload_status(
self,
upload_id: int,
current_user: User,
session: AsyncSession,
) -> SocialUploadStatusResponse:
"""업로드 상태 조회"""
logger.info(f"[UPLOAD_SERVICE] 상태 조회 - upload_id: {upload_id}")
result = await session.execute(
select(SocialUpload).where(
SocialUpload.id == upload_id,
SocialUpload.user_uuid == current_user.user_uuid,
)
)
upload = result.scalar_one_or_none()
if not upload:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="업로드 정보를 찾을 수 없습니다.",
)
return SocialUploadStatusResponse(
upload_id=upload.id,
video_id=upload.video_id,
social_account_id=upload.social_account_id,
upload_seq=upload.upload_seq,
platform=upload.platform,
status=UploadStatus(upload.status),
upload_progress=upload.upload_progress,
title=upload.title,
platform_video_id=upload.platform_video_id,
platform_url=upload.platform_url,
error_message=upload.error_message,
retry_count=upload.retry_count,
scheduled_at=upload.scheduled_at,
created_at=upload.created_at,
uploaded_at=upload.uploaded_at,
)
async def get_upload_history(
self,
current_user: User,
session: AsyncSession,
tab: str = "all",
platform: Optional[SocialPlatform] = None,
year: Optional[int] = None,
month: Optional[int] = None,
page: int = 1,
size: int = 20,
) -> SocialUploadHistoryResponse:
"""업로드 이력 조회 (탭/년월/플랫폼 필터, 페이지네이션)"""
now_kst = datetime.now(TIMEZONE)
target_year = year or now_kst.year
target_month = month or now_kst.month
logger.info(
f"[UPLOAD_SERVICE] 이력 조회 - "
f"user_uuid: {current_user.user_uuid}, tab: {tab}, "
f"year: {target_year}, month: {target_month}, page: {page}, size: {size}"
)
# 월 범위 계산
last_day = monthrange(target_year, target_month)[1]
month_start = datetime(target_year, target_month, 1, 0, 0, 0)
month_end = datetime(target_year, target_month, last_day, 23, 59, 59)
# 기본 쿼리 (cancelled 제외)
base_conditions = [
SocialUpload.user_uuid == current_user.user_uuid,
SocialUpload.created_at >= month_start,
SocialUpload.created_at <= month_end,
SocialUpload.status != UploadStatus.CANCELLED.value,
]
query = select(SocialUpload).where(*base_conditions)
count_query = select(func.count(SocialUpload.id)).where(*base_conditions)
# 탭 필터 적용
if tab == "completed":
query = query.where(SocialUpload.status == UploadStatus.COMPLETED.value)
count_query = count_query.where(SocialUpload.status == UploadStatus.COMPLETED.value)
elif tab == "scheduled":
query = query.where(
SocialUpload.status == UploadStatus.PENDING.value,
SocialUpload.scheduled_at.isnot(None),
)
count_query = count_query.where(
SocialUpload.status == UploadStatus.PENDING.value,
SocialUpload.scheduled_at.isnot(None),
)
elif tab == "failed":
query = query.where(SocialUpload.status == UploadStatus.FAILED.value)
count_query = count_query.where(SocialUpload.status == UploadStatus.FAILED.value)
# 플랫폼 필터 적용
if platform:
query = query.where(SocialUpload.platform == platform.value)
count_query = count_query.where(SocialUpload.platform == platform.value)
# 총 개수 조회
total_result = await session.execute(count_query)
total = total_result.scalar() or 0
# 페이지네이션 적용
query = (
query.order_by(SocialUpload.created_at.desc())
.offset((page - 1) * size)
.limit(size)
)
result = await session.execute(query)
uploads = result.scalars().all()
items = [
SocialUploadHistoryItem(
upload_id=upload.id,
video_id=upload.video_id,
social_account_id=upload.social_account_id,
upload_seq=upload.upload_seq,
platform=upload.platform,
status=upload.status,
title=upload.title,
platform_url=upload.platform_url,
error_message=upload.error_message,
scheduled_at=upload.scheduled_at,
created_at=upload.created_at,
uploaded_at=upload.uploaded_at,
)
for upload in uploads
]
return SocialUploadHistoryResponse(
items=items,
total=total,
page=page,
size=size,
)
async def retry_upload(
self,
upload_id: int,
current_user: User,
session: AsyncSession,
background_tasks: BackgroundTasks,
) -> SocialUploadResponse:
"""실패한 업로드 재시도"""
logger.info(f"[UPLOAD_SERVICE] 재시도 요청 - upload_id: {upload_id}")
result = await session.execute(
select(SocialUpload).where(
SocialUpload.id == upload_id,
SocialUpload.user_uuid == current_user.user_uuid,
)
)
upload = result.scalar_one_or_none()
if not upload:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="업로드 정보를 찾을 수 없습니다.",
)
if upload.status not in [UploadStatus.FAILED.value, UploadStatus.CANCELLED.value]:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="실패하거나 취소된 업로드만 재시도할 수 있습니다.",
)
# 상태 초기화
upload.status = UploadStatus.PENDING.value
upload.upload_progress = 0
upload.error_message = None
await session.commit()
background_tasks.add_task(process_social_upload, upload.id)
return SocialUploadResponse(
success=True,
upload_id=upload.id,
platform=upload.platform,
status=upload.status,
message="업로드 재시도가 요청되었습니다.",
)
async def cancel_upload(
self,
upload_id: int,
current_user: User,
session: AsyncSession,
) -> MessageResponse:
"""대기 중인 업로드 취소"""
logger.info(f"[UPLOAD_SERVICE] 취소 요청 - upload_id: {upload_id}")
result = await session.execute(
select(SocialUpload).where(
SocialUpload.id == upload_id,
SocialUpload.user_uuid == current_user.user_uuid,
)
)
upload = result.scalar_one_or_none()
if not upload:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="업로드 정보를 찾을 수 없습니다.",
)
if upload.status != UploadStatus.PENDING.value:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="대기 중인 업로드만 취소할 수 있습니다.",
)
upload.status = UploadStatus.CANCELLED.value
await session.commit()
return MessageResponse(
success=True,
message="업로드가 취소되었습니다.",
)