modify set ver1

insta
Dohyun Lim 2026-02-02 15:30:26 +09:00
parent 19bd12d581
commit 08d47a6990
21 changed files with 2435 additions and 398 deletions

View File

@ -4,47 +4,224 @@ SNS API 라우터
Instagram 업로드 관련 엔드포인트를 제공합니다. Instagram 업로드 관련 엔드포인트를 제공합니다.
""" """
from fastapi import APIRouter from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.sns.schemas.sns_schema import InstagramUploadResponse from app.database.session import get_session
from app.sns.schemas.sns_schema import InstagramUploadRequest, InstagramUploadResponse
# =============================================================================
# SNS 예외 클래스 정의
# =============================================================================
class SNSException(HTTPException):
"""SNS 관련 기본 예외"""
def __init__(self, status_code: int, code: str, message: str):
super().__init__(status_code=status_code, detail={"code": code, "message": message})
class SocialAccountNotFoundError(SNSException):
"""소셜 계정 없음"""
def __init__(self, message: str = "연동된 소셜 계정을 찾을 수 없습니다."):
super().__init__(status.HTTP_404_NOT_FOUND, "SOCIAL_ACCOUNT_NOT_FOUND", message)
class VideoNotFoundError(SNSException):
"""비디오 없음"""
def __init__(self, message: str = "해당 작업 ID에 대한 비디오를 찾을 수 없습니다."):
super().__init__(status.HTTP_404_NOT_FOUND, "VIDEO_NOT_FOUND", message)
class VideoUrlNotReadyError(SNSException):
"""비디오 URL 미준비"""
def __init__(self, message: str = "비디오가 아직 준비되지 않았습니다."):
super().__init__(status.HTTP_400_BAD_REQUEST, "VIDEO_URL_NOT_READY", message)
class InstagramUploadError(SNSException):
"""Instagram 업로드 실패"""
def __init__(self, message: str = "Instagram 업로드에 실패했습니다."):
super().__init__(status.HTTP_500_INTERNAL_SERVER_ERROR, "INSTAGRAM_UPLOAD_ERROR", message)
class InstagramRateLimitError(SNSException):
"""Instagram API Rate Limit"""
def __init__(self, message: str = "Instagram API 호출 제한을 초과했습니다.", retry_after: int = 60):
super().__init__(
status.HTTP_429_TOO_MANY_REQUESTS,
"INSTAGRAM_RATE_LIMIT",
f"{message} {retry_after}초 후 다시 시도해주세요.",
)
class InstagramAuthError(SNSException):
"""Instagram 인증 오류"""
def __init__(self, message: str = "Instagram 인증에 실패했습니다. 계정을 다시 연동해주세요."):
super().__init__(status.HTTP_401_UNAUTHORIZED, "INSTAGRAM_AUTH_ERROR", message)
class InstagramContainerTimeoutError(SNSException):
"""Instagram 미디어 처리 타임아웃"""
def __init__(self, message: str = "Instagram 미디어 처리 시간이 초과되었습니다."):
super().__init__(status.HTTP_504_GATEWAY_TIMEOUT, "INSTAGRAM_CONTAINER_TIMEOUT", message)
class InstagramContainerError(SNSException):
"""Instagram 미디어 컨테이너 오류"""
def __init__(self, message: str = "Instagram 미디어 처리에 실패했습니다."):
super().__init__(status.HTTP_500_INTERNAL_SERVER_ERROR, "INSTAGRAM_CONTAINER_ERROR", message)
from app.user.dependencies.auth import get_current_user
from app.user.models import Platform, SocialAccount, User
from app.utils.instagram import ErrorState, InstagramClient, parse_instagram_error
from app.utils.logger import get_logger from app.utils.logger import get_logger
from app.video.models import Video
logger = get_logger(__name__) logger = get_logger(__name__)
router = APIRouter(prefix="/sns", tags=["SNS"]) router = APIRouter(prefix="/sns", tags=["SNS"])
@router.get( @router.post(
"/instagram/upload/{task_id}", "/instagram/upload/{task_id}",
summary="Instagram 업로드 상태 조회", summary="Instagram 비디오 업로드",
description=""" description="""
## 개요 ## 개요
task_id에 해당하는 Instagram 업로드 작업의 상태를 조회합니다. task_id에 해당하는 비디오를 Instagram 업로드합니다.
## 경로 파라미터 ## 경로 파라미터
- **task_id**: 업로드 작업 고유 식별자 - **task_id**: 비디오 생성 작업 고유 식별자
## 요청 본문
- **caption**: 게시물 캡션 (선택, 최대 2200)
- **share_to_feed**: 피드에 공유 여부 (기본값: true)
## 인증
- Bearer 토큰 필요 (Authorization: Bearer <token>)
- 사용자의 Instagram 계정이 연동되어 있어야 합니다.
## 반환 정보 ## 반환 정보
- **task_id**: 작업 고유 식별자 - **task_id**: 작업 고유 식별자
- **state**: 업로드 상태 (pending, processing, completed, failed) - **state**: 업로드 상태 (completed, failed)
- **message**: 상태 메시지 - **message**: 상태 메시지
- **error**: 에러 메시지 (실패 , 기본값: null) - **media_id**: Instagram 미디어 ID (성공 )
- **permalink**: Instagram 게시물 URL (성공 )
- **error**: 에러 메시지 (실패 )
""", """,
response_model=InstagramUploadResponse, response_model=InstagramUploadResponse,
responses={ responses={
200: {"description": "상태 조회 성공"}, 200: {"description": "업로드 성공"},
400: {"description": "비디오 URL 미준비"},
401: {"description": "인증 실패"},
404: {"description": "비디오 또는 소셜 계정 없음"},
429: {"description": "Instagram API Rate Limit"},
500: {"description": "업로드 실패"},
504: {"description": "타임아웃"},
}, },
) )
async def get_instagram_upload_status(task_id: str) -> InstagramUploadResponse: async def upload_to_instagram(
"""Instagram 업로드 작업의 상태를 반환합니다.""" task_id: str,
logger.info(f"[get_instagram_upload_status] START - task_id: {task_id}") request: InstagramUploadRequest,
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
) -> InstagramUploadResponse:
"""Instagram에 비디오를 업로드합니다."""
logger.info(f"[upload_to_instagram] START - task_id: {task_id}, user_uuid: {current_user.user_uuid}")
response = InstagramUploadResponse( # Step 1: 사용자의 Instagram 소셜 계정 조회
task_id=task_id, social_account_result = await session.execute(
state="pending", select(SocialAccount).where(
message="업로드 대기 중입니다.", SocialAccount.user_uuid == current_user.user_uuid,
error=None, SocialAccount.platform == Platform.INSTAGRAM,
SocialAccount.is_active == True, # noqa: E712
SocialAccount.is_deleted == False, # noqa: E712
)
) )
social_account = social_account_result.scalar_one_or_none()
logger.info(f"[get_instagram_upload_status] SUCCESS - task_id: {task_id}, state: {response.state}") if social_account is None:
return response logger.warning(f"[upload_to_instagram] Instagram 계정 없음 - user_uuid: {current_user.user_uuid}")
raise SocialAccountNotFoundError("연동된 Instagram 계정을 찾을 수 없습니다.")
logger.info(f"[upload_to_instagram] 소셜 계정 확인 - social_account_id: {social_account.id}")
# Step 2: task_id로 비디오 조회 (가장 최근 것)
video_result = await session.execute(
select(Video)
.where(
Video.task_id == task_id,
Video.is_deleted == False, # noqa: E712
)
.order_by(Video.created_at.desc())
.limit(1)
)
video = video_result.scalar_one_or_none()
if video is None:
logger.warning(f"[upload_to_instagram] 비디오 없음 - task_id: {task_id}")
raise VideoNotFoundError(f"task_id '{task_id}'에 해당하는 비디오를 찾을 수 없습니다.")
if video.result_movie_url is None:
logger.warning(f"[upload_to_instagram] 비디오 URL 미준비 - task_id: {task_id}, status: {video.status}")
raise VideoUrlNotReadyError("비디오가 아직 처리 중입니다. 잠시 후 다시 시도해주세요.")
logger.info(f"[upload_to_instagram] 비디오 확인 - video_id: {video.id}, url: {video.result_movie_url[:50]}...")
# Step 3: Instagram 업로드
try:
async with InstagramClient(access_token=social_account.access_token) as client:
# 접속 테스트 (계정 ID 조회)
await client.get_account_id()
logger.info("[upload_to_instagram] Instagram 접속 확인 완료")
# 비디오 업로드
media = await client.publish_video(
video_url=video.result_movie_url,
caption=request.caption,
share_to_feed=request.share_to_feed,
)
logger.info(
f"[upload_to_instagram] SUCCESS - task_id: {task_id}, "
f"media_id: {media.id}, permalink: {media.permalink}"
)
return InstagramUploadResponse(
task_id=task_id,
state="completed",
message="Instagram 업로드 완료",
media_id=media.id,
permalink=media.permalink,
error=None,
)
except Exception as e:
error_state, message, extra_info = parse_instagram_error(e)
logger.error(f"[upload_to_instagram] FAILED - task_id: {task_id}, error_state: {error_state}, message: {message}")
match error_state:
case ErrorState.RATE_LIMIT:
retry_after = extra_info.get("retry_after", 60)
raise InstagramRateLimitError(retry_after=retry_after)
case ErrorState.AUTH_ERROR:
raise InstagramAuthError()
case ErrorState.CONTAINER_TIMEOUT:
raise InstagramContainerTimeoutError()
case ErrorState.CONTAINER_ERROR:
status = extra_info.get("status", "UNKNOWN")
raise InstagramContainerError(f"미디어 처리 실패: {status}")
case _:
raise InstagramUploadError(f"Instagram 업로드 실패: {message}")

View File

@ -0,0 +1,183 @@
"""
SNS 모듈 SQLAlchemy 모델 정의
SNS 업로드 작업 관리 모델입니다.
"""
from datetime import datetime
from typing import TYPE_CHECKING, Optional
from sqlalchemy import Boolean, DateTime, ForeignKey, Index, Integer, String, Text, func
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database.session import Base
if TYPE_CHECKING:
from app.user.models import SocialAccount, User
class SNSUploadTask(Base):
"""
SNS 업로드 작업 테이블
SNS 플랫폼에 콘텐츠를 업로드하는 작업을 관리합니다.
즉시 업로드 또는 예약 업로드를 지원합니다.
Attributes:
id: 고유 식별자 (자동 증가)
user_uuid: 사용자 UUID (User.user_uuid 참조)
task_id: 외부 작업 식별자 (비디오 생성 작업 )
is_scheduled: 예약 작업 여부 (True: 예약, False: 즉시)
scheduled_at: 예약 발행 일시 ( 단위까지)
social_account_id: 소셜 계정 외래키 (SocialAccount.id 참조)
url: 업로드할 미디어 URL
caption: 게시물 캡션/설명
status: 발행 상태 (pending: 예약 대기, completed: 완료, error: 에러)
uploaded_at: 실제 업로드 완료 일시
created_at: 작업 생성 일시
발행 상태 (status):
- pending: 예약 대기 (예약 작업이거나 처리 )
- processing: 처리
- completed: 발행 완료
- error: 에러 발생
Relationships:
user: 작업 소유 사용자 (User 테이블 참조)
social_account: 발행 대상 소셜 계정 (SocialAccount 테이블 참조)
"""
__tablename__ = "sns_upload_task"
__table_args__ = (
Index("idx_sns_upload_task_user_uuid", "user_uuid"),
Index("idx_sns_upload_task_task_id", "task_id"),
Index("idx_sns_upload_task_social_account_id", "social_account_id"),
Index("idx_sns_upload_task_status", "status"),
Index("idx_sns_upload_task_is_scheduled", "is_scheduled"),
Index("idx_sns_upload_task_scheduled_at", "scheduled_at"),
Index("idx_sns_upload_task_created_at", "created_at"),
{
"mysql_engine": "InnoDB",
"mysql_charset": "utf8mb4",
"mysql_collate": "utf8mb4_unicode_ci",
},
)
# ==========================================================================
# 기본 식별자
# ==========================================================================
id: Mapped[int] = mapped_column(
Integer,
primary_key=True,
nullable=False,
autoincrement=True,
comment="고유 식별자",
)
# ==========================================================================
# 사용자 및 작업 식별
# ==========================================================================
user_uuid: Mapped[str] = mapped_column(
String(36),
ForeignKey("user.user_uuid", ondelete="CASCADE"),
nullable=False,
comment="사용자 UUID (User.user_uuid 참조)",
)
task_id: Mapped[Optional[str]] = mapped_column(
String(100),
nullable=True,
comment="외부 작업 식별자 (비디오 생성 작업 ID 등)",
)
# ==========================================================================
# 예약 설정
# ==========================================================================
is_scheduled: Mapped[bool] = mapped_column(
Boolean,
nullable=False,
default=False,
comment="예약 작업 여부 (True: 예약 발행, False: 즉시 발행)",
)
scheduled_at: Mapped[Optional[datetime]] = mapped_column(
DateTime,
nullable=True,
comment="예약 발행 일시 (분 단위까지 지정)",
)
# ==========================================================================
# 소셜 계정 연결
# ==========================================================================
social_account_id: Mapped[int] = mapped_column(
Integer,
ForeignKey("social_account.id", ondelete="CASCADE"),
nullable=False,
comment="소셜 계정 외래키 (SocialAccount.id 참조)",
)
# ==========================================================================
# 업로드 콘텐츠
# ==========================================================================
url: Mapped[str] = mapped_column(
String(2048),
nullable=False,
comment="업로드할 미디어 URL",
)
caption: Mapped[Optional[str]] = mapped_column(
Text,
nullable=True,
comment="게시물 캡션/설명",
)
# ==========================================================================
# 발행 상태
# ==========================================================================
status: Mapped[str] = mapped_column(
String(20),
nullable=False,
default="pending",
comment="발행 상태 (pending: 예약 대기, processing: 처리 중, completed: 완료, error: 에러)",
)
# ==========================================================================
# 시간 정보
# ==========================================================================
uploaded_at: Mapped[Optional[datetime]] = mapped_column(
DateTime,
nullable=True,
comment="실제 업로드 완료 일시",
)
created_at: Mapped[datetime] = mapped_column(
DateTime,
nullable=False,
server_default=func.now(),
comment="작업 생성 일시",
)
# ==========================================================================
# Relationships
# ==========================================================================
user: Mapped["User"] = relationship(
"User",
foreign_keys=[user_uuid],
primaryjoin="SNSUploadTask.user_uuid == User.user_uuid",
)
social_account: Mapped["SocialAccount"] = relationship(
"SocialAccount",
foreign_keys=[social_account_id],
)
def __repr__(self) -> str:
return (
f"<SNSUploadTask("
f"id={self.id}, "
f"user_uuid='{self.user_uuid}', "
f"social_account_id={self.social_account_id}, "
f"status='{self.status}', "
f"is_scheduled={self.is_scheduled}"
f")>"
)

View File

@ -3,24 +3,61 @@ SNS API Schemas
Instagram 업로드 관련 Pydantic 스키마를 정의합니다. Instagram 업로드 관련 Pydantic 스키마를 정의합니다.
""" """
from datetime import datetime
from typing import Any, Optional
from typing import Optional
from pydantic import BaseModel, ConfigDict, Field from pydantic import BaseModel, ConfigDict, Field
class InstagramUploadResponse(BaseModel): class InstagramUploadRequest(BaseModel):
"""Instagram 업로드 상태 응답 스키마 """Instagram 업로드 요청 스키마
Usage: Usage:
GET /sns/instagram/upload/{task_id} POST /sns/instagram/upload/{task_id}
Instagram 업로드 작업의 상태를 반환합니다. Instagram비디오를 업로드합니다.
Example Response: Example Request:
{
"caption": "Test video from Instagram POC #test",
"share_to_feed": true
}
"""
model_config = ConfigDict(
json_schema_extra={
"example": {
"caption": "Test video from Instagram POC #test",
"share_to_feed": True,
}
}
)
caption: str = Field(
default="",
description="게시물 캡션",
max_length=2200,
)
share_to_feed: bool = Field(
default=True,
description="피드에 공유 여부",
)
class InstagramUploadResponse(BaseModel):
"""Instagram 업로드 응답 스키마
Usage:
POST /sns/instagram/upload/{task_id}
Instagram 업로드 작업의 결과를 반환합니다.
Example Response (성공):
{ {
"task_id": "0694b716-dbff-7219-8000-d08cb5fce431", "task_id": "0694b716-dbff-7219-8000-d08cb5fce431",
"state": "pending", "state": "completed",
"message": "업로드 대기 중입니다.", "message": "Instagram 업로드 완료",
"media_id": "17841405822304914",
"permalink": "https://www.instagram.com/p/ABC123/",
"error": null "error": null
} }
""" """
@ -29,8 +66,10 @@ class InstagramUploadResponse(BaseModel):
json_schema_extra={ json_schema_extra={
"example": { "example": {
"task_id": "0694b716-dbff-7219-8000-d08cb5fce431", "task_id": "0694b716-dbff-7219-8000-d08cb5fce431",
"state": "pending", "state": "completed",
"message": "업로드 대기 중입니다.", "message": "Instagram 업로드 완료",
"media_id": "17841405822304914",
"permalink": "https://www.instagram.com/p/ABC123/",
"error": None, "error": None,
} }
} }
@ -39,4 +78,71 @@ class InstagramUploadResponse(BaseModel):
task_id: str = Field(..., description="작업 고유 식별자") task_id: str = Field(..., description="작업 고유 식별자")
state: str = Field(..., description="업로드 상태 (pending, processing, completed, failed)") state: str = Field(..., description="업로드 상태 (pending, processing, completed, failed)")
message: str = Field(..., description="상태 메시지") message: str = Field(..., description="상태 메시지")
media_id: Optional[str] = Field(default=None, description="Instagram 미디어 ID (성공 시)")
permalink: Optional[str] = Field(default=None, description="Instagram 게시물 URL (성공 시)")
error: Optional[str] = Field(default=None, description="에러 메시지 (실패 시)") error: Optional[str] = Field(default=None, description="에러 메시지 (실패 시)")
class Media(BaseModel):
"""Instagram 미디어 정보"""
id: str
media_type: Optional[str] = None
media_url: Optional[str] = None
thumbnail_url: Optional[str] = None
caption: Optional[str] = None
timestamp: Optional[datetime] = None
permalink: Optional[str] = None
like_count: int = 0
comments_count: int = 0
children: Optional[list["Media"]] = None
class MediaList(BaseModel):
"""미디어 목록 응답"""
data: list[Media] = Field(default_factory=list)
paging: Optional[dict[str, Any]] = None
@property
def next_cursor(self) -> Optional[str]:
"""다음 페이지 커서"""
if self.paging and "cursors" in self.paging:
return self.paging["cursors"].get("after")
return None
class MediaContainer(BaseModel):
"""미디어 컨테이너 상태"""
id: str
status_code: Optional[str] = None
status: Optional[str] = None
@property
def is_finished(self) -> bool:
return self.status_code == "FINISHED"
@property
def is_error(self) -> bool:
return self.status_code == "ERROR"
@property
def is_in_progress(self) -> bool:
return self.status_code == "IN_PROGRESS"
class APIError(BaseModel):
"""API 에러 응답"""
message: str
type: Optional[str] = None
code: Optional[int] = None
error_subcode: Optional[int] = None
fbtrace_id: Optional[str] = None
class ErrorResponse(BaseModel):
"""에러 응답 래퍼"""
error: APIError

View File

@ -0,0 +1,307 @@
"""
SocialAccount API 라우터
소셜 계정 연동 CRUD 엔드포인트를 제공합니다.
"""
import logging
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.database.session import get_session
from app.user.dependencies import get_current_user
from app.user.models import User
from app.user.schemas.social_account_schema import (
SocialAccountCreateRequest,
SocialAccountDeleteResponse,
SocialAccountListResponse,
SocialAccountResponse,
SocialAccountUpdateRequest,
)
from app.user.services.social_account import SocialAccountService
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/social-accounts", tags=["Social Account"])
# =============================================================================
# 소셜 계정 목록 조회
# =============================================================================
@router.get(
"",
response_model=SocialAccountListResponse,
summary="소셜 계정 목록 조회",
description="""
## 개요
현재 로그인한 사용자의 연동된 소셜 계정 목록을 조회합니다.
## 인증
- Bearer 토큰 필수
## 반환 정보
- **items**: 소셜 계정 목록
- **total**: 계정
""",
)
async def get_social_accounts(
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
) -> SocialAccountListResponse:
"""소셜 계정 목록 조회"""
logger.info(f"[get_social_accounts] START - user_uuid: {current_user.user_uuid}")
try:
service = SocialAccountService(session)
accounts = await service.get_list(current_user)
response = SocialAccountListResponse(
items=[SocialAccountResponse.model_validate(acc) for acc in accounts],
total=len(accounts),
)
logger.info(f"[get_social_accounts] SUCCESS - user_uuid: {current_user.user_uuid}, count: {len(accounts)}")
return response
except Exception as e:
logger.error(f"[get_social_accounts] ERROR - user_uuid: {current_user.user_uuid}, error: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="소셜 계정 목록 조회 중 오류가 발생했습니다.",
)
# =============================================================================
# 소셜 계정 상세 조회
# =============================================================================
@router.get(
"/{account_id}",
response_model=SocialAccountResponse,
summary="소셜 계정 상세 조회",
description="""
## 개요
특정 소셜 계정의 상세 정보를 조회합니다.
## 인증
- Bearer 토큰 필수
- 본인 소유의 계정만 조회 가능
## 경로 파라미터
- **account_id**: 소셜 계정 ID
""",
responses={
404: {"description": "소셜 계정을 찾을 수 없음"},
},
)
async def get_social_account(
account_id: int,
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
) -> SocialAccountResponse:
"""소셜 계정 상세 조회"""
logger.info(f"[get_social_account] START - user_uuid: {current_user.user_uuid}, account_id: {account_id}")
try:
service = SocialAccountService(session)
account = await service.get_by_id(current_user, account_id)
if not account:
logger.warning(f"[get_social_account] NOT_FOUND - account_id: {account_id}")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="소셜 계정을 찾을 수 없습니다.",
)
logger.info(f"[get_social_account] SUCCESS - account_id: {account_id}, platform: {account.platform}")
return SocialAccountResponse.model_validate(account)
except HTTPException:
raise
except Exception as e:
logger.error(f"[get_social_account] ERROR - account_id: {account_id}, error: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="소셜 계정 조회 중 오류가 발생했습니다.",
)
# =============================================================================
# 소셜 계정 생성
# =============================================================================
@router.post(
"",
response_model=SocialAccountResponse,
status_code=status.HTTP_201_CREATED,
summary="소셜 계정 연동",
description="""
## 개요
새로운 소셜 계정을 연동합니다.
## 인증
- Bearer 토큰 필수
## 요청 본문
- **platform**: 플랫폼 구분 (youtube, instagram, facebook, tiktok)
- **access_token**: OAuth 액세스 토큰
- **platform_user_id**: 플랫폼 사용자 고유 ID
- 기타 선택 필드
## 주의사항
- 동일한 플랫폼의 동일한 계정은 중복 연동할 없습니다.
""",
responses={
400: {"description": "이미 연동된 계정"},
},
)
async def create_social_account(
data: SocialAccountCreateRequest,
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
) -> SocialAccountResponse:
"""소셜 계정 연동"""
logger.info(
f"[create_social_account] START - user_uuid: {current_user.user_uuid}, "
f"platform: {data.platform}, platform_user_id: {data.platform_user_id}"
)
try:
service = SocialAccountService(session)
account = await service.create(current_user, data)
logger.info(
f"[create_social_account] SUCCESS - account_id: {account.id}, "
f"platform: {account.platform}"
)
return SocialAccountResponse.model_validate(account)
except ValueError as e:
logger.warning(f"[create_social_account] DUPLICATE - error: {e}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e),
)
except Exception as e:
logger.error(f"[create_social_account] ERROR - error: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="소셜 계정 연동 중 오류가 발생했습니다.",
)
# =============================================================================
# 소셜 계정 수정
# =============================================================================
@router.patch(
"/{account_id}",
response_model=SocialAccountResponse,
summary="소셜 계정 정보 수정",
description="""
## 개요
소셜 계정 정보를 수정합니다. (토큰 갱신 )
## 인증
- Bearer 토큰 필수
- 본인 소유의 계정만 수정 가능
## 경로 파라미터
- **account_id**: 소셜 계정 ID
## 요청 본문
- 수정할 필드만 전송 (PATCH 방식)
""",
responses={
404: {"description": "소셜 계정을 찾을 수 없음"},
},
)
async def update_social_account(
account_id: int,
data: SocialAccountUpdateRequest,
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
) -> SocialAccountResponse:
"""소셜 계정 정보 수정"""
logger.info(
f"[update_social_account] START - user_uuid: {current_user.user_uuid}, "
f"account_id: {account_id}, data: {data.model_dump(exclude_unset=True)}"
)
try:
service = SocialAccountService(session)
account = await service.update(current_user, account_id, data)
if not account:
logger.warning(f"[update_social_account] NOT_FOUND - account_id: {account_id}")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="소셜 계정을 찾을 수 없습니다.",
)
logger.info(f"[update_social_account] SUCCESS - account_id: {account_id}")
return SocialAccountResponse.model_validate(account)
except HTTPException:
raise
except Exception as e:
logger.error(f"[update_social_account] ERROR - account_id: {account_id}, error: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="소셜 계정 수정 중 오류가 발생했습니다.",
)
# =============================================================================
# 소셜 계정 삭제
# =============================================================================
@router.delete(
"/{account_id}",
response_model=SocialAccountDeleteResponse,
summary="소셜 계정 연동 해제",
description="""
## 개요
소셜 계정 연동을 해제합니다. (소프트 삭제)
## 인증
- Bearer 토큰 필수
- 본인 소유의 계정만 삭제 가능
## 경로 파라미터
- **account_id**: 소셜 계정 ID
""",
responses={
404: {"description": "소셜 계정을 찾을 수 없음"},
},
)
async def delete_social_account(
account_id: int,
current_user: User = Depends(get_current_user),
session: AsyncSession = Depends(get_session),
) -> SocialAccountDeleteResponse:
"""소셜 계정 연동 해제"""
logger.info(f"[delete_social_account] START - user_uuid: {current_user.user_uuid}, account_id: {account_id}")
try:
service = SocialAccountService(session)
deleted_id = await service.delete(current_user, account_id)
if not deleted_id:
logger.warning(f"[delete_social_account] NOT_FOUND - account_id: {account_id}")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="소셜 계정을 찾을 수 없습니다.",
)
logger.info(f"[delete_social_account] SUCCESS - deleted_id: {deleted_id}")
return SocialAccountDeleteResponse(
message="소셜 계정이 삭제되었습니다.",
deleted_id=deleted_id,
)
except HTTPException:
raise
except Exception as e:
logger.error(f"[delete_social_account] ERROR - account_id: {account_id}, error: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="소셜 계정 삭제 중 오류가 발생했습니다.",
)

View File

View File

@ -12,15 +12,14 @@ from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from app.database.session import get_session from app.database.session import get_session
from app.user.exceptions import ( from app.user.models import User
from app.user.services.auth import (
AdminRequiredError, AdminRequiredError,
InvalidTokenError, InvalidTokenError,
MissingTokenError, MissingTokenError,
TokenExpiredError,
UserInactiveError, UserInactiveError,
UserNotFoundError, UserNotFoundError,
) )
from app.user.models import User
from app.user.services.jwt import decode_token from app.user.services.jwt import decode_token
security = HTTPBearer(auto_error=False) security = HTTPBearer(auto_error=False)

View File

@ -1,141 +0,0 @@
"""
User 모듈 커스텀 예외 정의
인증 사용자 관련 에러를 처리하기 위한 예외 클래스들입니다.
"""
from fastapi import HTTPException, status
class AuthException(HTTPException):
"""인증 관련 기본 예외"""
def __init__(
self,
status_code: int,
code: str,
message: str,
):
super().__init__(
status_code=status_code,
detail={"code": code, "message": message},
)
# =============================================================================
# 카카오 OAuth 관련 예외
# =============================================================================
class InvalidAuthCodeError(AuthException):
"""유효하지 않은 인가 코드"""
def __init__(self, message: str = "유효하지 않은 인가 코드입니다."):
super().__init__(
status_code=status.HTTP_400_BAD_REQUEST,
code="INVALID_CODE",
message=message,
)
class KakaoAuthFailedError(AuthException):
"""카카오 인증 실패"""
def __init__(self, message: str = "카카오 인증에 실패했습니다."):
super().__init__(
status_code=status.HTTP_400_BAD_REQUEST,
code="KAKAO_AUTH_FAILED",
message=message,
)
class KakaoAPIError(AuthException):
"""카카오 API 호출 오류"""
def __init__(self, message: str = "카카오 API 호출 중 오류가 발생했습니다."):
super().__init__(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
code="KAKAO_API_ERROR",
message=message,
)
# =============================================================================
# JWT 토큰 관련 예외
# =============================================================================
class TokenExpiredError(AuthException):
"""토큰 만료"""
def __init__(self, message: str = "토큰이 만료되었습니다. 다시 로그인해주세요."):
super().__init__(
status_code=status.HTTP_401_UNAUTHORIZED,
code="TOKEN_EXPIRED",
message=message,
)
class InvalidTokenError(AuthException):
"""유효하지 않은 토큰"""
def __init__(self, message: str = "유효하지 않은 토큰입니다."):
super().__init__(
status_code=status.HTTP_401_UNAUTHORIZED,
code="INVALID_TOKEN",
message=message,
)
class TokenRevokedError(AuthException):
"""취소된 토큰"""
def __init__(self, message: str = "취소된 토큰입니다. 다시 로그인해주세요."):
super().__init__(
status_code=status.HTTP_401_UNAUTHORIZED,
code="TOKEN_REVOKED",
message=message,
)
class MissingTokenError(AuthException):
"""토큰 누락"""
def __init__(self, message: str = "인증 토큰이 필요합니다."):
super().__init__(
status_code=status.HTTP_401_UNAUTHORIZED,
code="MISSING_TOKEN",
message=message,
)
# =============================================================================
# 사용자 관련 예외
# =============================================================================
class UserNotFoundError(AuthException):
"""사용자 없음"""
def __init__(self, message: str = "가입되지 않은 사용자 입니다."):
super().__init__(
status_code=status.HTTP_404_NOT_FOUND,
code="USER_NOT_FOUND",
message=message,
)
class UserInactiveError(AuthException):
"""비활성화된 계정"""
def __init__(self, message: str = "활성화 상태가 아닌 사용자 입니다."):
super().__init__(
status_code=status.HTTP_403_FORBIDDEN,
code="USER_INACTIVE",
message=message,
)
class AdminRequiredError(AuthException):
"""관리자 권한 필요"""
def __init__(self, message: str = "관리자 권한이 필요합니다."):
super().__init__(
status_code=status.HTTP_403_FORBIDDEN,
code="ADMIN_REQUIRED",
message=message,
)

View File

@ -5,6 +5,7 @@ User 모듈 SQLAlchemy 모델 정의
""" """
from datetime import date, datetime from datetime import date, datetime
from enum import Enum
from typing import TYPE_CHECKING, List, Optional from typing import TYPE_CHECKING, List, Optional
from sqlalchemy import BigInteger, Boolean, Date, DateTime, ForeignKey, Index, Integer, String, Text, func from sqlalchemy import BigInteger, Boolean, Date, DateTime, ForeignKey, Index, Integer, String, Text, func
@ -13,6 +14,7 @@ from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database.session import Base from app.database.session import Base
if TYPE_CHECKING: if TYPE_CHECKING:
from app.home.models import Project from app.home.models import Project
@ -403,6 +405,15 @@ class RefreshToken(Base):
) )
class Platform(str, Enum):
"""소셜 플랫폼 구분"""
YOUTUBE = "youtube"
INSTAGRAM = "instagram"
FACEBOOK = "facebook"
TIKTOK = "tiktok"
class SocialAccount(Base): class SocialAccount(Base):
""" """
소셜 계정 연동 테이블 소셜 계정 연동 테이블
@ -475,10 +486,10 @@ class SocialAccount(Base):
# ========================================================================== # ==========================================================================
# 플랫폼 구분 # 플랫폼 구분
# ========================================================================== # ==========================================================================
platform: Mapped[str] = mapped_column( platform: Mapped[Platform] = mapped_column(
String(20), String(20),
nullable=False, nullable=False,
comment="플랫폼 구분 (youtube, instagram, facebook)", comment="플랫폼 구분 (youtube, instagram, facebook, tiktok)",
) )
# ========================================================================== # ==========================================================================
@ -513,7 +524,7 @@ class SocialAccount(Base):
# ========================================================================== # ==========================================================================
platform_user_id: Mapped[str] = mapped_column( platform_user_id: Mapped[str] = mapped_column(
String(100), String(100),
nullable=False, nullable=True,
comment="플랫폼 내 사용자 고유 ID", comment="플랫폼 내 사용자 고유 ID",
) )
@ -539,7 +550,7 @@ class SocialAccount(Base):
Boolean, Boolean,
nullable=False, nullable=False,
default=True, default=True,
comment="연동 활성화 상태 (비활성화 시 사용 중지)", comment="활성화 상태 (비활성화 시 사용 중지)",
) )
is_deleted: Mapped[bool] = mapped_column( is_deleted: Mapped[bool] = mapped_column(
@ -552,13 +563,6 @@ class SocialAccount(Base):
# ========================================================================== # ==========================================================================
# 시간 정보 # 시간 정보
# ========================================================================== # ==========================================================================
connected_at: Mapped[datetime] = mapped_column(
DateTime,
nullable=False,
server_default=func.now(),
comment="연동 일시",
)
updated_at: Mapped[datetime] = mapped_column( updated_at: Mapped[datetime] = mapped_column(
DateTime, DateTime,
nullable=False, nullable=False,
@ -567,6 +571,13 @@ class SocialAccount(Base):
comment="정보 수정 일시", comment="정보 수정 일시",
) )
created_at: Mapped[datetime] = mapped_column(
DateTime,
nullable=False,
server_default=func.now(),
comment="생성 일시",
)
# ========================================================================== # ==========================================================================
# User 관계 # User 관계
# ========================================================================== # ==========================================================================

View File

@ -0,0 +1,152 @@
"""
SocialAccount 모듈 Pydantic 스키마 정의
소셜 계정 연동 API 요청/응답 검증을 위한 스키마들입니다.
"""
from datetime import datetime
from typing import Any, Optional
from pydantic import BaseModel, Field
from app.user.models import Platform
# =============================================================================
# 요청 스키마
# =============================================================================
class SocialAccountCreateRequest(BaseModel):
"""소셜 계정 연동 요청"""
platform: Platform = Field(..., description="플랫폼 구분 (youtube, instagram, facebook, tiktok)")
access_token: str = Field(..., min_length=1, description="OAuth 액세스 토큰")
refresh_token: Optional[str] = Field(None, description="OAuth 리프레시 토큰")
token_expires_at: Optional[datetime] = Field(None, description="토큰 만료 일시")
scope: Optional[str] = Field(None, description="허용된 권한 범위")
platform_user_id: str = Field(..., min_length=1, description="플랫폼 내 사용자 고유 ID")
platform_username: Optional[str] = Field(None, description="플랫폼 내 사용자명/핸들")
platform_data: Optional[dict[str, Any]] = Field(None, description="플랫폼별 추가 정보")
model_config = {
"json_schema_extra": {
"example": {
"platform": "instagram",
"access_token": "IGQWRPcG...",
"refresh_token": None,
"token_expires_at": "2026-03-15T10:30:00",
"scope": "instagram_basic,instagram_content_publish",
"platform_user_id": "17841400000000000",
"platform_username": "my_instagram_account",
"platform_data": {
"business_account_id": "17841400000000000",
"facebook_page_id": "123456789"
}
}
}
}
class SocialAccountUpdateRequest(BaseModel):
"""소셜 계정 정보 수정 요청"""
access_token: Optional[str] = Field(None, min_length=1, description="OAuth 액세스 토큰")
refresh_token: Optional[str] = Field(None, description="OAuth 리프레시 토큰")
token_expires_at: Optional[datetime] = Field(None, description="토큰 만료 일시")
scope: Optional[str] = Field(None, description="허용된 권한 범위")
platform_username: Optional[str] = Field(None, description="플랫폼 내 사용자명/핸들")
platform_data: Optional[dict[str, Any]] = Field(None, description="플랫폼별 추가 정보")
is_active: Optional[bool] = Field(None, description="활성화 상태")
model_config = {
"json_schema_extra": {
"example": {
"access_token": "IGQWRPcG_NEW_TOKEN...",
"token_expires_at": "2026-04-15T10:30:00",
"is_active": True
}
}
}
# =============================================================================
# 응답 스키마
# =============================================================================
class SocialAccountResponse(BaseModel):
"""소셜 계정 정보 응답"""
account_id: int = Field(..., validation_alias="id", description="소셜 계정 ID")
platform: Platform = Field(..., description="플랫폼 구분")
platform_user_id: str = Field(..., description="플랫폼 내 사용자 고유 ID")
platform_username: Optional[str] = Field(None, description="플랫폼 내 사용자명/핸들")
platform_data: Optional[dict[str, Any]] = Field(None, description="플랫폼별 추가 정보")
scope: Optional[str] = Field(None, description="허용된 권한 범위")
token_expires_at: Optional[datetime] = Field(None, description="토큰 만료 일시")
is_active: bool = Field(..., description="활성화 상태")
created_at: datetime = Field(..., description="연동 일시")
updated_at: datetime = Field(..., description="수정 일시")
model_config = {
"from_attributes": True,
"populate_by_name": True,
"json_schema_extra": {
"example": {
"account_id": 1,
"platform": "instagram",
"platform_user_id": "17841400000000000",
"platform_username": "my_instagram_account",
"platform_data": {
"business_account_id": "17841400000000000"
},
"scope": "instagram_basic,instagram_content_publish",
"token_expires_at": "2026-03-15T10:30:00",
"is_active": True,
"created_at": "2026-01-15T10:30:00",
"updated_at": "2026-01-15T10:30:00"
}
}
}
class SocialAccountListResponse(BaseModel):
"""소셜 계정 목록 응답"""
items: list[SocialAccountResponse] = Field(..., description="소셜 계정 목록")
total: int = Field(..., description="총 계정 수")
model_config = {
"json_schema_extra": {
"example": {
"items": [
{
"account_id": 1,
"platform": "instagram",
"platform_user_id": "17841400000000000",
"platform_username": "my_instagram_account",
"platform_data": None,
"scope": "instagram_basic",
"token_expires_at": "2026-03-15T10:30:00",
"is_active": True,
"created_at": "2026-01-15T10:30:00",
"updated_at": "2026-01-15T10:30:00"
}
],
"total": 1
}
}
}
class SocialAccountDeleteResponse(BaseModel):
"""소셜 계정 삭제 응답"""
message: str = Field(..., description="결과 메시지")
deleted_id: int = Field(..., description="삭제된 계정 ID")
model_config = {
"json_schema_extra": {
"example": {
"message": "소셜 계정이 삭제되었습니다.",
"deleted_id": 1
}
}
}

View File

@ -8,6 +8,7 @@ import logging
from datetime import datetime from datetime import datetime
from typing import Optional from typing import Optional
from fastapi import HTTPException, status
from sqlalchemy import select, update from sqlalchemy import select, update
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
@ -16,13 +17,66 @@ from config import prj_settings
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
from app.user.exceptions import (
InvalidTokenError, # =============================================================================
TokenExpiredError, # 인증 예외 클래스 정의
TokenRevokedError, # =============================================================================
UserInactiveError, class AuthException(HTTPException):
UserNotFoundError, """인증 관련 기본 예외"""
)
def __init__(self, status_code: int, code: str, message: str):
super().__init__(status_code=status_code, detail={"code": code, "message": message})
class TokenExpiredError(AuthException):
"""토큰 만료"""
def __init__(self, message: str = "토큰이 만료되었습니다. 다시 로그인해주세요."):
super().__init__(status.HTTP_401_UNAUTHORIZED, "TOKEN_EXPIRED", message)
class InvalidTokenError(AuthException):
"""유효하지 않은 토큰"""
def __init__(self, message: str = "유효하지 않은 토큰입니다."):
super().__init__(status.HTTP_401_UNAUTHORIZED, "INVALID_TOKEN", message)
class TokenRevokedError(AuthException):
"""취소된 토큰"""
def __init__(self, message: str = "취소된 토큰입니다. 다시 로그인해주세요."):
super().__init__(status.HTTP_401_UNAUTHORIZED, "TOKEN_REVOKED", message)
class MissingTokenError(AuthException):
"""토큰 누락"""
def __init__(self, message: str = "인증 토큰이 필요합니다."):
super().__init__(status.HTTP_401_UNAUTHORIZED, "MISSING_TOKEN", message)
class UserNotFoundError(AuthException):
"""사용자 없음"""
def __init__(self, message: str = "가입되지 않은 사용자 입니다."):
super().__init__(status.HTTP_404_NOT_FOUND, "USER_NOT_FOUND", message)
class UserInactiveError(AuthException):
"""비활성화된 계정"""
def __init__(self, message: str = "활성화 상태가 아닌 사용자 입니다."):
super().__init__(status.HTTP_403_FORBIDDEN, "USER_INACTIVE", message)
class AdminRequiredError(AuthException):
"""관리자 권한 필요"""
def __init__(self, message: str = "관리자 권한이 필요합니다."):
super().__init__(status.HTTP_403_FORBIDDEN, "ADMIN_REQUIRED", message)
from app.user.models import RefreshToken, User from app.user.models import RefreshToken, User
from app.utils.common import generate_uuid from app.utils.common import generate_uuid
from app.user.schemas.user_schema import ( from app.user.schemas.user_schema import (

View File

@ -7,15 +7,39 @@
import logging import logging
import aiohttp import aiohttp
from fastapi import HTTPException, status
from config import kakao_settings from config import kakao_settings
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
from app.user.exceptions import KakaoAPIError, KakaoAuthFailedError
from app.user.schemas.user_schema import KakaoTokenResponse, KakaoUserInfo from app.user.schemas.user_schema import KakaoTokenResponse, KakaoUserInfo
# =============================================================================
# 카카오 OAuth 예외 클래스 정의
# =============================================================================
class KakaoException(HTTPException):
"""카카오 관련 기본 예외"""
def __init__(self, status_code: int, code: str, message: str):
super().__init__(status_code=status_code, detail={"code": code, "message": message})
class KakaoAuthFailedError(KakaoException):
"""카카오 인증 실패"""
def __init__(self, message: str = "카카오 인증에 실패했습니다."):
super().__init__(status.HTTP_400_BAD_REQUEST, "KAKAO_AUTH_FAILED", message)
class KakaoAPIError(KakaoException):
"""카카오 API 호출 오류"""
def __init__(self, message: str = "카카오 API 호출 중 오류가 발생했습니다."):
super().__init__(status.HTTP_500_INTERNAL_SERVER_ERROR, "KAKAO_API_ERROR", message)
class KakaoOAuthClient: class KakaoOAuthClient:
""" """
카카오 OAuth API 클라이언트 카카오 OAuth API 클라이언트

View File

@ -0,0 +1,259 @@
"""
SocialAccount 서비스 레이어
소셜 계정 연동 관련 비즈니스 로직을 처리합니다.
"""
import logging
from typing import Optional
from sqlalchemy import and_, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.user.models import Platform, SocialAccount, User
from app.user.schemas.social_account_schema import (
SocialAccountCreateRequest,
SocialAccountUpdateRequest,
)
logger = logging.getLogger(__name__)
class SocialAccountService:
"""소셜 계정 서비스"""
def __init__(self, session: AsyncSession):
self.session = session
async def get_list(self, user: User) -> list[SocialAccount]:
"""
사용자의 소셜 계정 목록 조회
Args:
user: 현재 로그인한 사용자
Returns:
list[SocialAccount]: 소셜 계정 목록
"""
logger.debug(f"[SocialAccountService.get_list] START - user_uuid: {user.user_uuid}")
result = await self.session.execute(
select(SocialAccount).where(
and_(
SocialAccount.user_uuid == user.user_uuid,
SocialAccount.is_deleted == False, # noqa: E712
)
).order_by(SocialAccount.created_at.desc())
)
accounts = list(result.scalars().all())
logger.debug(f"[SocialAccountService.get_list] SUCCESS - count: {len(accounts)}")
return accounts
async def get_by_id(self, user: User, account_id: int) -> Optional[SocialAccount]:
"""
ID로 소셜 계정 조회
Args:
user: 현재 로그인한 사용자
account_id: 소셜 계정 ID
Returns:
SocialAccount | None: 소셜 계정 또는 None
"""
logger.debug(f"[SocialAccountService.get_by_id] START - user_uuid: {user.user_uuid}, account_id: {account_id}")
result = await self.session.execute(
select(SocialAccount).where(
and_(
SocialAccount.id == account_id,
SocialAccount.user_uuid == user.user_uuid,
SocialAccount.is_deleted == False, # noqa: E712
)
)
)
account = result.scalar_one_or_none()
if account:
logger.debug(f"[SocialAccountService.get_by_id] SUCCESS - platform: {account.platform}")
else:
logger.debug(f"[SocialAccountService.get_by_id] NOT_FOUND - account_id: {account_id}")
return account
async def get_by_platform(
self,
user: User,
platform: Platform,
platform_user_id: Optional[str] = None,
) -> Optional[SocialAccount]:
"""
플랫폼별 소셜 계정 조회
Args:
user: 현재 로그인한 사용자
platform: 플랫폼
platform_user_id: 플랫폼 사용자 ID (선택)
Returns:
SocialAccount | None: 소셜 계정 또는 None
"""
logger.debug(
f"[SocialAccountService.get_by_platform] START - user_uuid: {user.user_uuid}, "
f"platform: {platform}, platform_user_id: {platform_user_id}"
)
conditions = [
SocialAccount.user_uuid == user.user_uuid,
SocialAccount.platform == platform,
SocialAccount.is_deleted == False, # noqa: E712
]
if platform_user_id:
conditions.append(SocialAccount.platform_user_id == platform_user_id)
result = await self.session.execute(
select(SocialAccount).where(and_(*conditions))
)
account = result.scalar_one_or_none()
if account:
logger.debug(f"[SocialAccountService.get_by_platform] SUCCESS - id: {account.id}")
else:
logger.debug(f"[SocialAccountService.get_by_platform] NOT_FOUND")
return account
async def create(
self,
user: User,
data: SocialAccountCreateRequest,
) -> SocialAccount:
"""
소셜 계정 생성
Args:
user: 현재 로그인한 사용자
data: 생성 요청 데이터
Returns:
SocialAccount: 생성된 소셜 계정
Raises:
ValueError: 이미 연동된 계정이 존재하는 경우
"""
logger.debug(
f"[SocialAccountService.create] START - user_uuid: {user.user_uuid}, "
f"platform: {data.platform}, platform_user_id: {data.platform_user_id}"
)
# 중복 확인
existing = await self.get_by_platform(user, data.platform, data.platform_user_id)
if existing:
logger.warning(
f"[SocialAccountService.create] DUPLICATE - "
f"platform: {data.platform}, platform_user_id: {data.platform_user_id}"
)
raise ValueError(f"이미 연동된 {data.platform.value} 계정입니다.")
account = SocialAccount(
user_uuid=user.user_uuid,
platform=data.platform,
access_token=data.access_token,
refresh_token=data.refresh_token,
token_expires_at=data.token_expires_at,
scope=data.scope,
platform_user_id=data.platform_user_id,
platform_username=data.platform_username,
platform_data=data.platform_data,
is_active=True,
is_deleted=False,
)
self.session.add(account)
await self.session.commit()
await self.session.refresh(account)
logger.info(
f"[SocialAccountService.create] SUCCESS - id: {account.id}, "
f"platform: {account.platform}, platform_username: {account.platform_username}"
)
return account
async def update(
self,
user: User,
account_id: int,
data: SocialAccountUpdateRequest,
) -> Optional[SocialAccount]:
"""
소셜 계정 수정
Args:
user: 현재 로그인한 사용자
account_id: 소셜 계정 ID
data: 수정 요청 데이터
Returns:
SocialAccount | None: 수정된 소셜 계정 또는 None
"""
logger.debug(
f"[SocialAccountService.update] START - user_uuid: {user.user_uuid}, account_id: {account_id}"
)
account = await self.get_by_id(user, account_id)
if not account:
logger.warning(f"[SocialAccountService.update] NOT_FOUND - account_id: {account_id}")
return None
# 변경된 필드만 업데이트
update_data = data.model_dump(exclude_unset=True)
for field, value in update_data.items():
if value is not None:
setattr(account, field, value)
await self.session.commit()
await self.session.refresh(account)
logger.info(
f"[SocialAccountService.update] SUCCESS - id: {account.id}, "
f"updated_fields: {list(update_data.keys())}"
)
return account
async def delete(self, user: User, account_id: int) -> Optional[int]:
"""
소셜 계정 소프트 삭제
Args:
user: 현재 로그인한 사용자
account_id: 소셜 계정 ID
Returns:
int | None: 삭제된 계정 ID 또는 None
"""
logger.debug(
f"[SocialAccountService.delete] START - user_uuid: {user.user_uuid}, account_id: {account_id}"
)
account = await self.get_by_id(user, account_id)
if not account:
logger.warning(f"[SocialAccountService.delete] NOT_FOUND - account_id: {account_id}")
return None
account.is_deleted = True
account.is_active = False
await self.session.commit()
logger.info(
f"[SocialAccountService.delete] SUCCESS - id: {account_id}, platform: {account.platform}"
)
return account_id
# =============================================================================
# 의존성 주입용 함수
# =============================================================================
async def get_social_account_service(session: AsyncSession) -> SocialAccountService:
"""SocialAccountService 인스턴스 반환"""
return SocialAccountService(session)

398
app/utils/instagram.py Normal file
View File

@ -0,0 +1,398 @@
"""
Instagram Graph API Client
Instagram Graph API를 사용한 비디오/릴스 게시를 위한 비동기 클라이언트입니다.
Example:
```python
async with InstagramClient(access_token="YOUR_TOKEN") as client:
media = await client.publish_video(
video_url="https://example.com/video.mp4",
caption="Hello Instagram!"
)
print(f"게시 완료: {media.permalink}")
```
"""
import asyncio
import logging
import re
import time
from enum import Enum
from typing import Any, Optional
import httpx
from app.sns.schemas.sns_schema import ErrorResponse, Media, MediaContainer
logger = logging.getLogger(__name__)
# ============================================================
# Error State & Parser
# ============================================================
class ErrorState(str, Enum):
"""Instagram API 에러 상태"""
RATE_LIMIT = "rate_limit"
AUTH_ERROR = "auth_error"
CONTAINER_TIMEOUT = "container_timeout"
CONTAINER_ERROR = "container_error"
API_ERROR = "api_error"
UNKNOWN = "unknown"
def parse_instagram_error(e: Exception) -> tuple[ErrorState, str, dict]:
"""
Instagram 예외를 파싱하여 상태, 메시지, 추가 정보를 반환
Args:
e: 발생한 예외
Returns:
tuple: (error_state, message, extra_info)
Example:
>>> error_state, message, extra_info = parse_instagram_error(e)
>>> if error_state == ErrorState.RATE_LIMIT:
... retry_after = extra_info.get("retry_after", 60)
"""
error_str = str(e)
extra_info = {}
# Rate Limit 에러
if "[RateLimit]" in error_str:
match = re.search(r"retry_after=(\d+)s", error_str)
if match:
extra_info["retry_after"] = int(match.group(1))
return ErrorState.RATE_LIMIT, "API 호출 제한 초과", extra_info
# 인증 에러 (code=190)
if "code=190" in error_str:
return ErrorState.AUTH_ERROR, "인증 실패 (토큰 만료 또는 무효)", extra_info
# 컨테이너 타임아웃
if "[ContainerTimeout]" in error_str:
match = re.search(r"\((\d+)초 초과\)", error_str)
if match:
extra_info["timeout"] = int(match.group(1))
return ErrorState.CONTAINER_TIMEOUT, "미디어 처리 시간 초과", extra_info
# 컨테이너 상태 에러
if "[ContainerStatus]" in error_str:
match = re.search(r"처리 실패: (\w+)", error_str)
if match:
extra_info["status"] = match.group(1)
return ErrorState.CONTAINER_ERROR, "미디어 컨테이너 처리 실패", extra_info
# Instagram API 에러
if "[InstagramAPI]" in error_str:
match = re.search(r"code=(\d+)", error_str)
if match:
extra_info["code"] = int(match.group(1))
return ErrorState.API_ERROR, "Instagram API 오류", extra_info
return ErrorState.UNKNOWN, str(e), extra_info
# ============================================================
# Instagram Client
# ============================================================
class InstagramClient:
"""
Instagram Graph API 비동기 클라이언트 (비디오 업로드 전용)
Example:
```python
async with InstagramClient(access_token="USER_TOKEN") as client:
media = await client.publish_video(
video_url="https://example.com/video.mp4",
caption="My video!"
)
print(f"게시됨: {media.permalink}")
```
"""
DEFAULT_BASE_URL = "https://graph.instagram.com/v21.0"
def __init__(
self,
access_token: str,
*,
base_url: Optional[str] = None,
timeout: float = 30.0,
max_retries: int = 3,
container_timeout: float = 300.0,
container_poll_interval: float = 5.0,
):
"""
클라이언트 초기화
Args:
access_token: Instagram 액세스 토큰 (필수)
base_url: API 기본 URL (기본값: https://graph.instagram.com/v21.0)
timeout: HTTP 요청 타임아웃 ()
max_retries: 최대 재시도 횟수
container_timeout: 컨테이너 처리 대기 타임아웃 ()
container_poll_interval: 컨테이너 상태 확인 간격 ()
"""
if not access_token:
raise ValueError("access_token은 필수입니다.")
self.access_token = access_token
self.base_url = base_url or self.DEFAULT_BASE_URL
self.timeout = timeout
self.max_retries = max_retries
self.container_timeout = container_timeout
self.container_poll_interval = container_poll_interval
self._client: Optional[httpx.AsyncClient] = None
self._account_id: Optional[str] = None
self._account_id_lock: asyncio.Lock = asyncio.Lock()
async def __aenter__(self) -> "InstagramClient":
"""비동기 컨텍스트 매니저 진입"""
self._client = httpx.AsyncClient(
timeout=httpx.Timeout(self.timeout),
follow_redirects=True,
)
logger.debug("[InstagramClient] HTTP 클라이언트 초기화 완료")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
"""비동기 컨텍스트 매니저 종료"""
if self._client:
await self._client.aclose()
self._client = None
logger.debug("[InstagramClient] HTTP 클라이언트 종료")
def _get_client(self) -> httpx.AsyncClient:
"""HTTP 클라이언트 반환"""
if self._client is None:
raise RuntimeError(
"InstagramClient는 비동기 컨텍스트 매니저로 사용해야 합니다. "
"예: async with InstagramClient(access_token=...) as client:"
)
return self._client
def _build_url(self, endpoint: str) -> str:
"""API URL 생성"""
return f"{self.base_url}/{endpoint}"
async def _request(
self,
method: str,
endpoint: str,
params: Optional[dict[str, Any]] = None,
data: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
"""
공통 HTTP 요청 처리
- Rate Limit 지수 백오프 재시도
- 에러 응답 InstagramAPIError 발생
"""
client = self._get_client()
url = self._build_url(endpoint)
params = params or {}
params["access_token"] = self.access_token
retry_base_delay = 1.0
last_exception: Optional[Exception] = None
for attempt in range(self.max_retries + 1):
try:
logger.debug(
f"[API] {method} {endpoint} (attempt {attempt + 1}/{self.max_retries + 1})"
)
response = await client.request(
method=method,
url=url,
params=params,
data=data,
)
# Rate Limit 체크 (429)
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 60))
if attempt < self.max_retries:
wait_time = max(retry_base_delay * (2**attempt), retry_after)
logger.warning(f"Rate limit 초과. {wait_time}초 후 재시도...")
await asyncio.sleep(wait_time)
continue
raise Exception(
f"[RateLimit] Rate limit 초과 (최대 재시도 횟수 도달) | retry_after={retry_after}s"
)
# 서버 에러 재시도 (5xx)
if response.status_code >= 500:
if attempt < self.max_retries:
wait_time = retry_base_delay * (2**attempt)
logger.warning(f"서버 에러 {response.status_code}. {wait_time}초 후 재시도...")
await asyncio.sleep(wait_time)
continue
response.raise_for_status()
# JSON 파싱
response_data = response.json()
# API 에러 체크 (Instagram API는 200 응답에도 error 포함 가능)
if "error" in response_data:
error_response = ErrorResponse.model_validate(response_data)
err = error_response.error
logger.error(f"[API Error] code={err.code}, message={err.message}")
error_msg = f"[InstagramAPI] {err.message} | code={err.code}"
if err.error_subcode:
error_msg += f" | subcode={err.error_subcode}"
if err.fbtrace_id:
error_msg += f" | fbtrace_id={err.fbtrace_id}"
raise Exception(error_msg)
return response_data
except httpx.HTTPError as e:
last_exception = e
if attempt < self.max_retries:
wait_time = retry_base_delay * (2**attempt)
logger.warning(f"HTTP 에러: {e}. {wait_time}초 후 재시도...")
await asyncio.sleep(wait_time)
continue
raise
raise last_exception or Exception("[InstagramAPI] 최대 재시도 횟수 초과")
async def _wait_for_container(
self,
container_id: str,
timeout: Optional[float] = None,
) -> MediaContainer:
"""컨테이너 상태가 FINISHED가 될 때까지 대기"""
timeout = timeout or self.container_timeout
start_time = time.monotonic()
logger.debug(f"[Container] 대기 시작: {container_id}, timeout={timeout}s")
while True:
elapsed = time.monotonic() - start_time
if elapsed >= timeout:
raise Exception(
f"[ContainerTimeout] 컨테이너 처리 타임아웃 ({timeout}초 초과): {container_id}"
)
response = await self._request(
method="GET",
endpoint=container_id,
params={"fields": "status_code,status"},
)
container = MediaContainer.model_validate(response)
logger.debug(f"[Container] status={container.status_code}, elapsed={elapsed:.1f}s")
if container.is_finished:
logger.info(f"[Container] 완료: {container_id}")
return container
if container.is_error:
raise Exception(f"[ContainerStatus] 컨테이너 처리 실패: {container.status}")
await asyncio.sleep(self.container_poll_interval)
async def get_account_id(self) -> str:
"""계정 ID 조회 (접속 테스트용)"""
if self._account_id:
return self._account_id
async with self._account_id_lock:
if self._account_id:
return self._account_id
response = await self._request(
method="GET",
endpoint="me",
params={"fields": "id"},
)
account_id: str = response["id"]
self._account_id = account_id
logger.debug(f"[Account] ID 조회 완료: {account_id}")
return account_id
async def get_media(self, media_id: str) -> Media:
"""
미디어 상세 조회
Args:
media_id: 미디어 ID
Returns:
Media: 미디어 상세 정보
"""
logger.info(f"[get_media] media_id={media_id}")
response = await self._request(
method="GET",
endpoint=media_id,
params={
"fields": "id,media_type,media_url,thumbnail_url,caption,timestamp,permalink,like_count,comments_count",
},
)
result = Media.model_validate(response)
logger.info(f"[get_media] 완료: type={result.media_type}, permalink={result.permalink}")
return result
async def publish_video(
self,
video_url: str,
caption: Optional[str] = None,
share_to_feed: bool = True,
) -> Media:
"""
비디오/릴스 게시
Args:
video_url: 공개 접근 가능한 비디오 URL (MP4 권장)
caption: 게시물 캡션
share_to_feed: 피드에 공유 여부
Returns:
Media: 게시된 미디어 정보
"""
logger.info(f"[publish_video] 시작: {video_url[:50]}...")
account_id = await self.get_account_id()
# Step 1: Container 생성
container_params: dict[str, Any] = {
"media_type": "REELS",
"video_url": video_url,
"share_to_feed": str(share_to_feed).lower(),
}
if caption:
container_params["caption"] = caption
container_response = await self._request(
method="POST",
endpoint=f"{account_id}/media",
params=container_params,
)
container_id = container_response["id"]
logger.debug(f"[publish_video] Container 생성: {container_id}")
# Step 2: Container 상태 대기 (비디오는 더 오래 걸림)
await self._wait_for_container(container_id, timeout=self.container_timeout * 2)
# Step 3: 게시
publish_response = await self._request(
method="POST",
endpoint=f"{account_id}/media_publish",
params={"creation_id": container_id},
)
media_id = publish_response["id"]
result = await self.get_media(media_id)
logger.info(f"[publish_video] 완료: {result.permalink}")
return result

558
insta_plan.md Normal file
View File

@ -0,0 +1,558 @@
# Instagram POC 예외 처리 단순화 작업 계획서
## 개요
`poc/instagram/exceptions.py` 파일을 삭제하고, `client.py` 상단에 **ErrorState Enum과 에러 처리 유틸리티**를 정의하여 일관된 에러 처리 구조를 구현합니다.
---
## 최종 파일 구조
```
poc/instagram/
├── client.py # ErrorState + parse_instagram_error + InstagramClient
├── models.py
├── __init__.py # client.py에서 ErrorState, parse_instagram_error export
└── (exceptions.py 삭제)
```
---
## 작업 계획
### 1단계: client.py 상단에 에러 처리 코드 추가
**파일**: `poc/instagram/client.py`
**위치**: import 문 다음, InstagramClient 클래스 이전
**추가할 코드**:
```python
import re
from enum import Enum
# ============================================================
# Error State & Parser
# ============================================================
class ErrorState(str, Enum):
"""Instagram API 에러 상태"""
RATE_LIMIT = "rate_limit"
AUTH_ERROR = "auth_error"
CONTAINER_TIMEOUT = "container_timeout"
CONTAINER_ERROR = "container_error"
API_ERROR = "api_error"
UNKNOWN = "unknown"
def parse_instagram_error(e: Exception) -> tuple[ErrorState, str, dict]:
"""
Instagram 예외를 파싱하여 상태, 메시지, 추가 정보를 반환
Args:
e: 발생한 예외
Returns:
tuple: (error_state, message, extra_info)
Example:
>>> error_state, message, extra_info = parse_instagram_error(e)
>>> if error_state == ErrorState.RATE_LIMIT:
... retry_after = extra_info.get("retry_after", 60)
"""
error_str = str(e)
extra_info = {}
# Rate Limit 에러
if "[RateLimit]" in error_str:
match = re.search(r"retry_after=(\d+)s", error_str)
if match:
extra_info["retry_after"] = int(match.group(1))
return ErrorState.RATE_LIMIT, "API 호출 제한 초과", extra_info
# 인증 에러 (code=190)
if "code=190" in error_str:
return ErrorState.AUTH_ERROR, "인증 실패 (토큰 만료 또는 무효)", extra_info
# 컨테이너 타임아웃
if "[ContainerTimeout]" in error_str:
match = re.search(r"\((\d+)초 초과\)", error_str)
if match:
extra_info["timeout"] = int(match.group(1))
return ErrorState.CONTAINER_TIMEOUT, "미디어 처리 시간 초과", extra_info
# 컨테이너 상태 에러
if "[ContainerStatus]" in error_str:
match = re.search(r"처리 실패: (\w+)", error_str)
if match:
extra_info["status"] = match.group(1)
return ErrorState.CONTAINER_ERROR, "미디어 컨테이너 처리 실패", extra_info
# Instagram API 에러
if "[InstagramAPI]" in error_str:
match = re.search(r"code=(\d+)", error_str)
if match:
extra_info["code"] = int(match.group(1))
return ErrorState.API_ERROR, "Instagram API 오류", extra_info
return ErrorState.UNKNOWN, str(e), extra_info
```
---
### 2단계: client.py import 문 수정
**파일**: `poc/instagram/client.py`
**변경 전** (line 24-30):
```python
from .exceptions import (
ContainerStatusError,
ContainerTimeoutError,
InstagramAPIError,
RateLimitError,
create_exception_from_error,
)
```
**변경 후**:
```python
# (삭제 - ErrorState와 parse_instagram_error를 직접 정의)
```
**import 추가**:
```python
import re
from enum import Enum
```
---
### 3단계: 예외 발생 코드 수정
#### 3-1. Rate Limit 에러 (line 159-162)
**변경 전**:
```python
raise RateLimitError(
message="Rate limit 초과 (최대 재시도 횟수 도달)",
retry_after=retry_after,
)
```
**변경 후**:
```python
raise Exception(f"[RateLimit] Rate limit 초과 (최대 재시도 횟수 도달) | retry_after={retry_after}s")
```
---
#### 3-2. API 에러 응답 (line 177-186)
**변경 전**:
```python
if "error" in response_data:
error_response = ErrorResponse.model_validate(response_data)
err = error_response.error
logger.error(f"[API Error] code={err.code}, message={err.message}")
raise create_exception_from_error(
message=err.message,
code=err.code,
subcode=err.error_subcode,
fbtrace_id=err.fbtrace_id,
)
```
**변경 후**:
```python
if "error" in response_data:
error_response = ErrorResponse.model_validate(response_data)
err = error_response.error
logger.error(f"[API Error] code={err.code}, message={err.message}")
error_msg = f"[InstagramAPI] {err.message} | code={err.code}"
if err.error_subcode:
error_msg += f" | subcode={err.error_subcode}"
if err.fbtrace_id:
error_msg += f" | fbtrace_id={err.fbtrace_id}"
raise Exception(error_msg)
```
---
#### 3-3. 예외 재발생 (line 190-191)
**변경 전**:
```python
except InstagramAPIError:
raise
```
**변경 후**:
```python
except Exception:
raise
```
---
#### 3-4. 최대 재시도 초과 (line 201)
**변경 전**:
```python
raise last_exception or InstagramAPIError("최대 재시도 횟수 초과")
```
**변경 후**:
```python
raise last_exception or Exception("[InstagramAPI] 최대 재시도 횟수 초과")
```
---
#### 3-5. 컨테이너 타임아웃 (line 217-218)
**변경 전**:
```python
raise ContainerTimeoutError(
f"컨테이너 처리 타임아웃 ({timeout}초 초과): {container_id}"
)
```
**변경 후**:
```python
raise Exception(f"[ContainerTimeout] 컨테이너 처리 타임아웃 ({timeout}초 초과): {container_id}")
```
---
#### 3-6. 컨테이너 상태 에러 (line 235)
**변경 전**:
```python
raise ContainerStatusError(f"컨테이너 처리 실패: {container.status}")
```
**변경 후**:
```python
raise Exception(f"[ContainerStatus] 컨테이너 처리 실패: {container.status}")
```
---
### 4단계: __init__.py 수정
**파일**: `poc/instagram/__init__.py`
**변경 전** (line 18-25):
```python
from poc.instagram.client import InstagramClient
from poc.instagram.exceptions import (
InstagramAPIError,
AuthenticationError,
RateLimitError,
ContainerStatusError,
ContainerTimeoutError,
)
```
**변경 후**:
```python
from poc.instagram.client import (
InstagramClient,
ErrorState,
parse_instagram_error,
)
```
**__all__ 수정**:
```python
__all__ = [
# Client
"InstagramClient",
# Error handling
"ErrorState",
"parse_instagram_error",
# Models
"Media",
"MediaList",
"MediaContainer",
"APIError",
"ErrorResponse",
]
```
---
### 5단계: main.py 수정
**파일**: `poc/instagram/main.py`
**변경 전** (line 13):
```python
from poc.instagram.exceptions import InstagramAPIError
```
**변경 후**:
```python
from poc.instagram import ErrorState, parse_instagram_error
```
**예외 처리 수정**:
```python
# 변경 전
except InstagramAPIError as e:
logger.error(f"API 에러: {e}")
# 변경 후
except Exception as e:
error_state, message, extra_info = parse_instagram_error(e)
if error_state == ErrorState.RATE_LIMIT:
retry_after = extra_info.get("retry_after", 60)
logger.error(f"Rate Limit: {message} (재시도: {retry_after}초)")
elif error_state == ErrorState.AUTH_ERROR:
logger.error(f"인증 에러: {message}")
elif error_state == ErrorState.CONTAINER_TIMEOUT:
logger.error(f"타임아웃: {message}")
elif error_state == ErrorState.CONTAINER_ERROR:
status = extra_info.get("status", "UNKNOWN")
logger.error(f"컨테이너 에러: {message} (상태: {status})")
else:
logger.error(f"API 에러: {message}")
```
---
### 6단계: main_ori.py 수정
**파일**: `poc/instagram/main_ori.py`
**변경 전** (line 271-274):
```python
from poc.instagram.exceptions import (
AuthenticationError,
InstagramAPIError,
RateLimitError,
)
```
**변경 후**:
```python
from poc.instagram import ErrorState, parse_instagram_error
```
**예외 처리 수정** (line 289-298):
```python
# 변경 전
except AuthenticationError as e:
print(f"[성공] AuthenticationError 발생: {e}")
except RateLimitError as e:
print(f"[성공] RateLimitError 발생: {e}")
except InstagramAPIError as e:
print(f"[성공] InstagramAPIError 발생: {e}")
# 변경 후
except Exception as e:
error_state, message, extra_info = parse_instagram_error(e)
match error_state:
case ErrorState.RATE_LIMIT:
print(f"[성공] Rate Limit 에러: {message}")
case ErrorState.AUTH_ERROR:
print(f"[성공] 인증 에러: {message}")
case ErrorState.CONTAINER_TIMEOUT:
print(f"[성공] 타임아웃 에러: {message}")
case ErrorState.CONTAINER_ERROR:
print(f"[성공] 컨테이너 에러: {message}")
case _:
print(f"[성공] API 에러: {message}")
```
---
### 7단계: exceptions.py 삭제
**파일**: `poc/instagram/exceptions.py`
**작업**: 파일 삭제
---
## 최종 client.py 구조
```python
"""
Instagram Graph API Client
"""
import asyncio
import logging
import re
import time
from enum import Enum
from typing import Any, Optional
import httpx
from .models import ErrorResponse, Media, MediaContainer
logger = logging.getLogger(__name__)
# ============================================================
# Error State & Parser
# ============================================================
class ErrorState(str, Enum):
"""Instagram API 에러 상태"""
RATE_LIMIT = "rate_limit"
AUTH_ERROR = "auth_error"
CONTAINER_TIMEOUT = "container_timeout"
CONTAINER_ERROR = "container_error"
API_ERROR = "api_error"
UNKNOWN = "unknown"
def parse_instagram_error(e: Exception) -> tuple[ErrorState, str, dict]:
"""Instagram 예외를 파싱하여 상태, 메시지, 추가 정보를 반환"""
# ... (구현부)
# ============================================================
# Instagram Client
# ============================================================
class InstagramClient:
"""Instagram Graph API 비동기 클라이언트"""
# ... (기존 코드)
```
---
## 에러 메시지 형식
| 에러 유형 | 메시지 prefix | ErrorState | 예시 |
|----------|--------------|------------|------|
| Rate Limit | `[RateLimit]` | `RATE_LIMIT` | `[RateLimit] Rate limit 초과 \| retry_after=60s` |
| 인증 에러 | `[InstagramAPI]` + code=190 | `AUTH_ERROR` | `[InstagramAPI] Invalid token \| code=190` |
| API 에러 | `[InstagramAPI]` | `API_ERROR` | `[InstagramAPI] Error \| code=100` |
| 컨테이너 타임아웃 | `[ContainerTimeout]` | `CONTAINER_TIMEOUT` | `[ContainerTimeout] 타임아웃 (300초 초과)` |
| 컨테이너 에러 | `[ContainerStatus]` | `CONTAINER_ERROR` | `[ContainerStatus] 처리 실패: ERROR` |
---
## 작업 체크리스트
- [ ] 1단계: client.py 상단에 ErrorState Enum 및 parse_instagram_error 추가
- [ ] 2단계: client.py import 문 수정 (re, Enum 추가, exceptions import 삭제)
- [ ] 3단계: client.py 예외 발생 코드 6곳 수정
- [ ] line 159-162: RateLimitError → Exception
- [ ] line 177-186: create_exception_from_error → Exception
- [ ] line 190-191: InstagramAPIError → Exception
- [ ] line 201: InstagramAPIError → Exception
- [ ] line 217-218: ContainerTimeoutError → Exception
- [ ] line 235: ContainerStatusError → Exception
- [ ] 4단계: __init__.py 수정 (ErrorState, parse_instagram_error export)
- [ ] 5단계: main.py 수정 (ErrorState 활용)
- [ ] 6단계: main_ori.py 수정 (ErrorState 활용)
- [ ] 7단계: exceptions.py 파일 삭제
---
## 사용 예시
### 기본 사용법
```python
from poc.instagram import InstagramClient, ErrorState, parse_instagram_error
async def publish_video(video_url: str, caption: str):
async with InstagramClient(access_token="TOKEN") as client:
try:
media = await client.publish_video(video_url=video_url, caption=caption)
return {"success": True, "state": "completed", "data": media}
except Exception as e:
error_state, message, extra_info = parse_instagram_error(e)
return {
"success": False,
"state": error_state.value,
"message": message,
**extra_info
}
```
### match-case 활용 (Python 3.10+)
```python
except Exception as e:
error_state, message, extra_info = parse_instagram_error(e)
match error_state:
case ErrorState.RATE_LIMIT:
retry_after = extra_info.get("retry_after", 60)
await asyncio.sleep(retry_after)
# 재시도 로직...
case ErrorState.AUTH_ERROR:
# 토큰 갱신 로직...
case ErrorState.CONTAINER_TIMEOUT:
# 재시도 또는 알림...
case ErrorState.CONTAINER_ERROR:
# 실패 처리...
case _:
# 기본 에러 처리...
```
### 응답 예시
```python
# Rate Limit 에러
{
"success": False,
"state": "rate_limit",
"message": "API 호출 제한 초과",
"retry_after": 60
}
# 인증 에러
{
"success": False,
"state": "auth_error",
"message": "인증 실패 (토큰 만료 또는 무효)"
}
# 컨테이너 타임아웃
{
"success": False,
"state": "container_timeout",
"message": "미디어 처리 시간 초과",
"timeout": 300
}
# 컨테이너 에러
{
"success": False,
"state": "container_error",
"message": "미디어 컨테이너 처리 실패",
"status": "ERROR"
}
# API 에러
{
"success": False,
"state": "api_error",
"message": "Instagram API 오류",
"code": 100
}
```
---
## 장점
1. **단일 파일 관리**: client.py 하나에서 클라이언트와 에러 처리 모두 관리
2. **일관된 에러 형식**: ErrorState Enum으로 타입 안전한 에러 구분
3. **IDE 지원**: 자동완성, 타입 힌트 지원
4. **파싱 유틸리티**: parse_instagram_error로 에러 메시지에서 정보 추출
5. **유연한 처리**: match-case 또는 if-elif로 에러 타입별 처리 가능

17
main.py
View File

@ -14,6 +14,7 @@ from app.user.models import User, RefreshToken # noqa: F401
from app.archive.api.routers.v1.archive import router as archive_router from app.archive.api.routers.v1.archive import router as archive_router
from app.home.api.routers.v1.home import router as home_router from app.home.api.routers.v1.home import router as home_router
from app.user.api.routers.v1.auth import router as auth_router, test_router as auth_test_router from app.user.api.routers.v1.auth import router as auth_router, test_router as auth_test_router
from app.user.api.routers.v1.social_account import router as social_account_router
from app.lyric.api.routers.v1.lyric import router as lyric_router from app.lyric.api.routers.v1.lyric import router as lyric_router
from app.song.api.routers.v1.song import router as song_router from app.song.api.routers.v1.song import router as song_router
from app.sns.api.routers.v1.sns import router as sns_router from app.sns.api.routers.v1.sns import router as sns_router
@ -46,6 +47,21 @@ tags_metadata = [
3. `access_token` 입력 (Bearer 접두사 없이 토큰만 입력) 3. `access_token` 입력 (Bearer 접두사 없이 토큰만 입력)
4. **Authorize** 클릭하여 저장 4. **Authorize** 클릭하여 저장
5. 이후 인증이 필요한 API 호출 자동으로 토큰이 포함됨 5. 이후 인증이 필요한 API 호출 자동으로 토큰이 포함됨
""",
},
{
"name": "Social Account",
"description": """소셜 계정 연동 API - YouTube, Instagram, Facebook, TikTok
**인증: 필요** - `Authorization: Bearer {access_token}` 헤더 필수
## 주요 기능
- `GET /user/social-accounts` - 연동된 소셜 계정 목록 조회
- `GET /user/social-accounts/{account_id}` - 소셜 계정 상세 조회
- `POST /user/social-accounts` - 소셜 계정 연동
- `PATCH /user/social-accounts/{account_id}` - 소셜 계정 정보 수정
- `DELETE /user/social-accounts/{account_id}` - 소셜 계정 연동 해제
""", """,
}, },
# { # {
@ -275,6 +291,7 @@ def get_scalar_docs():
app.include_router(home_router) app.include_router(home_router)
app.include_router(auth_router, prefix="/user") # Auth API 라우터 추가 app.include_router(auth_router, prefix="/user") # Auth API 라우터 추가
app.include_router(social_account_router, prefix="/user") # Social Account API 라우터 추가
app.include_router(lyric_router) app.include_router(lyric_router)
app.include_router(song_router) app.include_router(song_router)
app.include_router(video_router) app.include_router(video_router)

View File

@ -15,15 +15,12 @@ Example:
``` ```
""" """
from poc.instagram.client import InstagramClient from poc.instagram.client import (
from poc.instagram.exceptions import ( InstagramClient,
InstagramAPIError, ErrorState,
AuthenticationError, parse_instagram_error,
RateLimitError,
ContainerStatusError,
ContainerTimeoutError,
) )
from poc.instagram.models import ( from poc.instagram.sns_schema import (
Media, Media,
MediaList, MediaList,
MediaContainer, MediaContainer,
@ -34,12 +31,9 @@ from poc.instagram.models import (
__all__ = [ __all__ = [
# Client # Client
"InstagramClient", "InstagramClient",
# Exceptions # Error handling
"InstagramAPIError", "ErrorState",
"AuthenticationError", "parse_instagram_error",
"RateLimitError",
"ContainerStatusError",
"ContainerTimeoutError",
# Models # Models
"Media", "Media",
"MediaList", "MediaList",

View File

@ -16,23 +16,92 @@ Example:
import asyncio import asyncio
import logging import logging
import re
import time import time
from enum import Enum
from typing import Any, Optional from typing import Any, Optional
import httpx import httpx
from .exceptions import ( from .sns_schema import ErrorResponse, Media, MediaContainer
ContainerStatusError,
ContainerTimeoutError,
InstagramAPIError,
RateLimitError,
create_exception_from_error,
)
from .models import ErrorResponse, Media, MediaContainer
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# ============================================================
# Error State & Parser
# ============================================================
class ErrorState(str, Enum):
"""Instagram API 에러 상태"""
RATE_LIMIT = "rate_limit"
AUTH_ERROR = "auth_error"
CONTAINER_TIMEOUT = "container_timeout"
CONTAINER_ERROR = "container_error"
API_ERROR = "api_error"
UNKNOWN = "unknown"
def parse_instagram_error(e: Exception) -> tuple[ErrorState, str, dict]:
"""
Instagram 예외를 파싱하여 상태, 메시지, 추가 정보를 반환
Args:
e: 발생한 예외
Returns:
tuple: (error_state, message, extra_info)
Example:
>>> error_state, message, extra_info = parse_instagram_error(e)
>>> if error_state == ErrorState.RATE_LIMIT:
... retry_after = extra_info.get("retry_after", 60)
"""
error_str = str(e)
extra_info = {}
# Rate Limit 에러
if "[RateLimit]" in error_str:
match = re.search(r"retry_after=(\d+)s", error_str)
if match:
extra_info["retry_after"] = int(match.group(1))
return ErrorState.RATE_LIMIT, "API 호출 제한 초과", extra_info
# 인증 에러 (code=190)
if "code=190" in error_str:
return ErrorState.AUTH_ERROR, "인증 실패 (토큰 만료 또는 무효)", extra_info
# 컨테이너 타임아웃
if "[ContainerTimeout]" in error_str:
match = re.search(r"\((\d+)초 초과\)", error_str)
if match:
extra_info["timeout"] = int(match.group(1))
return ErrorState.CONTAINER_TIMEOUT, "미디어 처리 시간 초과", extra_info
# 컨테이너 상태 에러
if "[ContainerStatus]" in error_str:
match = re.search(r"처리 실패: (\w+)", error_str)
if match:
extra_info["status"] = match.group(1)
return ErrorState.CONTAINER_ERROR, "미디어 컨테이너 처리 실패", extra_info
# Instagram API 에러
if "[InstagramAPI]" in error_str:
match = re.search(r"code=(\d+)", error_str)
if match:
extra_info["code"] = int(match.group(1))
return ErrorState.API_ERROR, "Instagram API 오류", extra_info
return ErrorState.UNKNOWN, str(e), extra_info
# ============================================================
# Instagram Client
# ============================================================
class InstagramClient: class InstagramClient:
""" """
Instagram Graph API 비동기 클라이언트 (비디오 업로드 전용) Instagram Graph API 비동기 클라이언트 (비디오 업로드 전용)
@ -156,9 +225,8 @@ class InstagramClient:
logger.warning(f"Rate limit 초과. {wait_time}초 후 재시도...") logger.warning(f"Rate limit 초과. {wait_time}초 후 재시도...")
await asyncio.sleep(wait_time) await asyncio.sleep(wait_time)
continue continue
raise RateLimitError( raise Exception(
message="Rate limit 초과 (최대 재시도 횟수 도달)", f"[RateLimit] Rate limit 초과 (최대 재시도 횟수 도달) | retry_after={retry_after}s"
retry_after=retry_after,
) )
# 서버 에러 재시도 (5xx) # 서버 에러 재시도 (5xx)
@ -178,17 +246,15 @@ class InstagramClient:
error_response = ErrorResponse.model_validate(response_data) error_response = ErrorResponse.model_validate(response_data)
err = error_response.error err = error_response.error
logger.error(f"[API Error] code={err.code}, message={err.message}") logger.error(f"[API Error] code={err.code}, message={err.message}")
raise create_exception_from_error( error_msg = f"[InstagramAPI] {err.message} | code={err.code}"
message=err.message, if err.error_subcode:
code=err.code, error_msg += f" | subcode={err.error_subcode}"
subcode=err.error_subcode, if err.fbtrace_id:
fbtrace_id=err.fbtrace_id, error_msg += f" | fbtrace_id={err.fbtrace_id}"
) raise Exception(error_msg)
return response_data return response_data
except InstagramAPIError:
raise
except httpx.HTTPError as e: except httpx.HTTPError as e:
last_exception = e last_exception = e
if attempt < self.max_retries: if attempt < self.max_retries:
@ -198,7 +264,7 @@ class InstagramClient:
continue continue
raise raise
raise last_exception or InstagramAPIError("최대 재시도 횟수 초과") raise last_exception or Exception("[InstagramAPI] 최대 재시도 횟수 초과")
async def _wait_for_container( async def _wait_for_container(
self, self,
@ -214,8 +280,8 @@ class InstagramClient:
while True: while True:
elapsed = time.monotonic() - start_time elapsed = time.monotonic() - start_time
if elapsed >= timeout: if elapsed >= timeout:
raise ContainerTimeoutError( raise Exception(
f"컨테이너 처리 타임아웃 ({timeout}초 초과): {container_id}" f"[ContainerTimeout] 컨테이너 처리 타임아웃 ({timeout}초 초과): {container_id}"
) )
response = await self._request( response = await self._request(
@ -232,7 +298,7 @@ class InstagramClient:
return container return container
if container.is_error: if container.is_error:
raise ContainerStatusError(f"컨테이너 처리 실패: {container.status}") raise Exception(f"[ContainerStatus] 컨테이너 처리 실패: {container.status}")
await asyncio.sleep(self.container_poll_interval) await asyncio.sleep(self.container_poll_interval)

View File

@ -1,142 +0,0 @@
"""
Instagram Graph API 커스텀 예외 모듈
Instagram API 에러 코드에 맞는 예외 클래스를 정의합니다.
"""
from typing import Optional
class InstagramAPIError(Exception):
"""
Instagram API 기본 예외
모든 Instagram API 관련 예외의 기본 클래스입니다.
Attributes:
message: 에러 메시지
code: Instagram API 에러 코드
subcode: Instagram API 에러 서브코드
fbtrace_id: Facebook 트레이스 ID (디버깅용)
"""
def __init__(
self,
message: str,
code: Optional[int] = None,
subcode: Optional[int] = None,
fbtrace_id: Optional[str] = None,
):
self.message = message
self.code = code
self.subcode = subcode
self.fbtrace_id = fbtrace_id
super().__init__(self.message)
def __str__(self) -> str:
parts = [self.message]
if self.code is not None:
parts.append(f"code={self.code}")
if self.subcode is not None:
parts.append(f"subcode={self.subcode}")
if self.fbtrace_id:
parts.append(f"fbtrace_id={self.fbtrace_id}")
return " | ".join(parts)
class AuthenticationError(InstagramAPIError):
"""
인증 관련 에러
토큰이 만료되었거나, 유효하지 않거나, 권한이 없는 경우 발생합니다.
"""
pass
class RateLimitError(InstagramAPIError):
"""
Rate Limit 초과 에러
시간당 API 호출 제한을 초과한 경우 발생합니다.
Attributes:
retry_after: 재시도까지 대기해야 하는 시간 ()
"""
def __init__(
self,
message: str,
retry_after: Optional[int] = None,
code: Optional[int] = 4,
subcode: Optional[int] = None,
fbtrace_id: Optional[str] = None,
):
super().__init__(message, code, subcode, fbtrace_id)
self.retry_after = retry_after
def __str__(self) -> str:
base = super().__str__()
if self.retry_after is not None:
return f"{base} | retry_after={self.retry_after}s"
return base
class ContainerStatusError(InstagramAPIError):
"""
컨테이너 상태 에러
미디어 컨테이너가 ERROR 상태가 되었을 발생합니다.
"""
pass
class ContainerTimeoutError(InstagramAPIError):
"""
컨테이너 타임아웃 에러
미디어 컨테이너가 지정된 시간 내에 FINISHED 상태가 되지 않은 경우 발생합니다.
"""
pass
# 에러 코드 → 예외 클래스 매핑
ERROR_CODE_MAPPING: dict[int, type[InstagramAPIError]] = {
4: RateLimitError,
17: RateLimitError,
190: AuthenticationError,
341: RateLimitError,
}
def create_exception_from_error(
message: str,
code: Optional[int] = None,
subcode: Optional[int] = None,
fbtrace_id: Optional[str] = None,
) -> InstagramAPIError:
"""
API 에러 응답에서 적절한 예외 객체 생성
Args:
message: 에러 메시지
code: API 에러 코드
subcode: API 에러 서브코드
fbtrace_id: Facebook 트레이스 ID
Returns:
적절한 예외 클래스의 인스턴스
"""
exception_class = InstagramAPIError
if code is not None:
exception_class = ERROR_CODE_MAPPING.get(code, InstagramAPIError)
return exception_class(
message=message,
code=code,
subcode=subcode,
fbtrace_id=fbtrace_id,
)

View File

@ -9,8 +9,7 @@ import asyncio
import logging import logging
import sys import sys
from poc.instagram.client import InstagramClient from poc.instagram import InstagramClient, ErrorState, parse_instagram_error
from poc.instagram.exceptions import InstagramAPIError
# 로깅 설정 # 로깅 설정
logging.basicConfig( logging.basicConfig(
@ -42,8 +41,12 @@ async def main():
account_id = await client.get_account_id() account_id = await client.get_account_id()
print("[성공] 접속 확인 완료") print("[성공] 접속 확인 완료")
print(f" Account ID: {account_id}") print(f" Account ID: {account_id}")
except InstagramAPIError as e: except Exception as e:
print(f"[실패] 접속 실패: {e}") error_state, message, extra_info = parse_instagram_error(e)
if error_state == ErrorState.AUTH_ERROR:
print(f"[실패] 인증 실패: {message}")
else:
print(f"[실패] 접속 실패: {message}")
return return
# Step 2: 비디오 업로드 # Step 2: 비디오 업로드
@ -62,8 +65,18 @@ async def main():
print("\n[성공] 비디오 업로드 완료!") print("\n[성공] 비디오 업로드 완료!")
print(f" 미디어 ID: {media.id}") print(f" 미디어 ID: {media.id}")
print(f" 링크: {media.permalink}") print(f" 링크: {media.permalink}")
except InstagramAPIError as e: except Exception as e:
print(f"\n[실패] 업로드 실패: {e}") error_state, message, extra_info = parse_instagram_error(e)
if error_state == ErrorState.RATE_LIMIT:
retry_after = extra_info.get("retry_after", 60)
print(f"\n[실패] Rate Limit: {message} (재시도: {retry_after}초)")
elif error_state == ErrorState.CONTAINER_TIMEOUT:
print(f"\n[실패] 타임아웃: {message}")
elif error_state == ErrorState.CONTAINER_ERROR:
status = extra_info.get("status", "UNKNOWN")
print(f"\n[실패] 컨테이너 에러: {message} (상태: {status})")
else:
print(f"\n[실패] 업로드 실패: {message}")
return return
# Step 3: 업로드 확인 # Step 3: 업로드 확인
@ -79,8 +92,9 @@ async def main():
print(f" 게시일: {verified_media.timestamp}") print(f" 게시일: {verified_media.timestamp}")
if verified_media.caption: if verified_media.caption:
print(f" 캡션: {verified_media.caption}") print(f" 캡션: {verified_media.caption}")
except InstagramAPIError as e: except Exception as e:
print(f"[실패] 확인 실패: {e}") error_state, message, _ = parse_instagram_error(e)
print(f"[실패] 확인 실패: {message}")
return return
print("\n" + "=" * 60) print("\n" + "=" * 60)

View File

@ -267,12 +267,7 @@ async def test_publish_carousel():
async def test_error_handling(): async def test_error_handling():
"""에러 처리 테스트""" """에러 처리 테스트"""
from poc.instagram.client import InstagramClient from poc.instagram import InstagramClient, ErrorState, parse_instagram_error
from poc.instagram.exceptions import (
AuthenticationError,
InstagramAPIError,
RateLimitError,
)
print("\n" + "=" * 60) print("\n" + "=" * 60)
print("6. 에러 처리 테스트") print("6. 에러 처리 테스트")
@ -286,20 +281,26 @@ async def test_error_handling():
await client.get_media_list(limit=1) await client.get_media_list(limit=1)
print("[실패] 예외가 발생하지 않음") print("[실패] 예외가 발생하지 않음")
except AuthenticationError as e:
print(f"[성공] AuthenticationError 발생: {e}")
except RateLimitError as e:
print(f"[성공] RateLimitError 발생: {e}")
if e.retry_after:
print(f" 재시도 대기 시간: {e.retry_after}")
except InstagramAPIError as e:
print(f"[성공] InstagramAPIError 발생: {e}")
print(f" 코드: {e.code}, 서브코드: {e.subcode}")
except Exception as e: except Exception as e:
print(f"[성공] 예외 발생: {type(e).__name__}: {e}") error_state, message, extra_info = parse_instagram_error(e)
match error_state:
case ErrorState.RATE_LIMIT:
retry_after = extra_info.get("retry_after", 60)
print(f"[성공] Rate Limit 에러: {message}")
print(f" 재시도 대기 시간: {retry_after}")
case ErrorState.AUTH_ERROR:
print(f"[성공] 인증 에러: {message}")
case ErrorState.CONTAINER_TIMEOUT:
print(f"[성공] 타임아웃 에러: {message}")
case ErrorState.CONTAINER_ERROR:
status = extra_info.get("status", "UNKNOWN")
print(f"[성공] 컨테이너 에러: {message} (상태: {status})")
case _:
code = extra_info.get("code")
print(f"[성공] API 에러: {message}")
if code:
print(f" 코드: {code}")
async def main(): async def main():