""" Instagram Graph API Client Instagram Graph API를 사용한 비디오/릴스 게시를 위한 비동기 클라이언트입니다. Example: ```python async with InstagramClient(access_token="YOUR_TOKEN") as client: media = await client.publish_video( video_url="https://example.com/video.mp4", caption="Hello Instagram!" ) print(f"게시 완료: {media.permalink}") ``` """ import asyncio import logging import re import time from enum import Enum from typing import Any, Optional import httpx from app.sns.schemas.sns_schema import ErrorResponse, Media, MediaContainer logger = logging.getLogger(__name__) # ============================================================ # Error State & Parser # ============================================================ class ErrorState(str, Enum): """Instagram API 에러 상태""" RATE_LIMIT = "rate_limit" AUTH_ERROR = "auth_error" CONTAINER_TIMEOUT = "container_timeout" CONTAINER_ERROR = "container_error" API_ERROR = "api_error" UNKNOWN = "unknown" def parse_instagram_error(e: Exception) -> tuple[ErrorState, str, dict]: """ Instagram 예외를 파싱하여 상태, 메시지, 추가 정보를 반환 Args: e: 발생한 예외 Returns: tuple: (error_state, message, extra_info) Example: >>> error_state, message, extra_info = parse_instagram_error(e) >>> if error_state == ErrorState.RATE_LIMIT: ... retry_after = extra_info.get("retry_after", 60) """ error_str = str(e) extra_info = {} # Rate Limit 에러 if "[RateLimit]" in error_str: match = re.search(r"retry_after=(\d+)s", error_str) if match: extra_info["retry_after"] = int(match.group(1)) return ErrorState.RATE_LIMIT, "API 호출 제한 초과", extra_info # 인증 에러 (code=190) if "code=190" in error_str: return ErrorState.AUTH_ERROR, "인증 실패 (토큰 만료 또는 무효)", extra_info # 컨테이너 타임아웃 if "[ContainerTimeout]" in error_str: match = re.search(r"\((\d+)초 초과\)", error_str) if match: extra_info["timeout"] = int(match.group(1)) return ErrorState.CONTAINER_TIMEOUT, "미디어 처리 시간 초과", extra_info # 컨테이너 상태 에러 if "[ContainerStatus]" in error_str: match = re.search(r"처리 실패: (\w+)", error_str) if match: extra_info["status"] = match.group(1) return ErrorState.CONTAINER_ERROR, "미디어 컨테이너 처리 실패", extra_info # Instagram API 에러 if "[InstagramAPI]" in error_str: match = re.search(r"code=(\d+)", error_str) if match: extra_info["code"] = int(match.group(1)) return ErrorState.API_ERROR, "Instagram API 오류", extra_info return ErrorState.UNKNOWN, str(e), extra_info # ============================================================ # Instagram Client # ============================================================ class InstagramClient: """ Instagram Graph API 비동기 클라이언트 (비디오 업로드 전용) Example: ```python async with InstagramClient(access_token="USER_TOKEN") as client: media = await client.publish_video( video_url="https://example.com/video.mp4", caption="My video!" ) print(f"게시됨: {media.permalink}") ``` """ DEFAULT_BASE_URL = "https://graph.instagram.com/v21.0" def __init__( self, access_token: str, *, base_url: Optional[str] = None, timeout: float = 30.0, max_retries: int = 3, container_timeout: float = 300.0, container_poll_interval: float = 5.0, ): """ 클라이언트 초기화 Args: access_token: Instagram 액세스 토큰 (필수) base_url: API 기본 URL (기본값: https://graph.instagram.com/v21.0) timeout: HTTP 요청 타임아웃 (초) max_retries: 최대 재시도 횟수 container_timeout: 컨테이너 처리 대기 타임아웃 (초) container_poll_interval: 컨테이너 상태 확인 간격 (초) """ if not access_token: raise ValueError("access_token은 필수입니다.") self.access_token = access_token self.base_url = base_url or self.DEFAULT_BASE_URL self.timeout = timeout self.max_retries = max_retries self.container_timeout = container_timeout self.container_poll_interval = container_poll_interval self._client: Optional[httpx.AsyncClient] = None self._account_id: Optional[str] = None self._account_id_lock: asyncio.Lock = asyncio.Lock() async def __aenter__(self) -> "InstagramClient": """비동기 컨텍스트 매니저 진입""" self._client = httpx.AsyncClient( timeout=httpx.Timeout(self.timeout), follow_redirects=True, ) logger.debug("[InstagramClient] HTTP 클라이언트 초기화 완료") return self async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: """비동기 컨텍스트 매니저 종료""" if self._client: await self._client.aclose() self._client = None logger.debug("[InstagramClient] HTTP 클라이언트 종료") def _get_client(self) -> httpx.AsyncClient: """HTTP 클라이언트 반환""" if self._client is None: raise RuntimeError( "InstagramClient는 비동기 컨텍스트 매니저로 사용해야 합니다. " "예: async with InstagramClient(access_token=...) as client:" ) return self._client def _build_url(self, endpoint: str) -> str: """API URL 생성""" return f"{self.base_url}/{endpoint}" async def _request( self, method: str, endpoint: str, params: Optional[dict[str, Any]] = None, data: Optional[dict[str, Any]] = None, ) -> dict[str, Any]: """ 공통 HTTP 요청 처리 - Rate Limit 시 지수 백오프 재시도 - 에러 응답 시 InstagramAPIError 발생 """ client = self._get_client() url = self._build_url(endpoint) params = params or {} params["access_token"] = self.access_token retry_base_delay = 1.0 last_exception: Optional[Exception] = None for attempt in range(self.max_retries + 1): try: logger.debug( f"[API] {method} {endpoint} (attempt {attempt + 1}/{self.max_retries + 1})" ) response = await client.request( method=method, url=url, params=params, data=data, ) # Rate Limit 체크 (429) if response.status_code == 429: retry_after = int(response.headers.get("Retry-After", 60)) if attempt < self.max_retries: wait_time = max(retry_base_delay * (2**attempt), retry_after) logger.warning(f"Rate limit 초과. {wait_time}초 후 재시도...") await asyncio.sleep(wait_time) continue raise Exception( f"[RateLimit] Rate limit 초과 (최대 재시도 횟수 도달) | retry_after={retry_after}s" ) # 서버 에러 재시도 (5xx) if response.status_code >= 500: if attempt < self.max_retries: wait_time = retry_base_delay * (2**attempt) logger.warning(f"서버 에러 {response.status_code}. {wait_time}초 후 재시도...") await asyncio.sleep(wait_time) continue response.raise_for_status() # JSON 파싱 response_data = response.json() # API 에러 체크 (Instagram API는 200 응답에도 error 포함 가능) if "error" in response_data: error_response = ErrorResponse.model_validate(response_data) err = error_response.error logger.error(f"[API Error] code={err.code}, message={err.message}") error_msg = f"[InstagramAPI] {err.message} | code={err.code}" if err.error_subcode: error_msg += f" | subcode={err.error_subcode}" if err.fbtrace_id: error_msg += f" | fbtrace_id={err.fbtrace_id}" raise Exception(error_msg) return response_data except httpx.HTTPError as e: last_exception = e if attempt < self.max_retries: wait_time = retry_base_delay * (2**attempt) logger.warning(f"HTTP 에러: {e}. {wait_time}초 후 재시도...") await asyncio.sleep(wait_time) continue raise raise last_exception or Exception("[InstagramAPI] 최대 재시도 횟수 초과") async def _wait_for_container( self, container_id: str, timeout: Optional[float] = None, ) -> MediaContainer: """컨테이너 상태가 FINISHED가 될 때까지 대기""" timeout = timeout or self.container_timeout start_time = time.monotonic() logger.debug(f"[Container] 대기 시작: {container_id}, timeout={timeout}s") while True: elapsed = time.monotonic() - start_time if elapsed >= timeout: raise Exception( f"[ContainerTimeout] 컨테이너 처리 타임아웃 ({timeout}초 초과): {container_id}" ) response = await self._request( method="GET", endpoint=container_id, params={"fields": "status_code,status"}, ) container = MediaContainer.model_validate(response) logger.debug(f"[Container] status={container.status_code}, elapsed={elapsed:.1f}s") if container.is_finished: logger.info(f"[Container] 완료: {container_id}") return container if container.is_error: raise Exception(f"[ContainerStatus] 컨테이너 처리 실패: {container.status}") await asyncio.sleep(self.container_poll_interval) async def get_account_id(self) -> str: """계정 ID 조회 (접속 테스트용)""" if self._account_id: return self._account_id async with self._account_id_lock: if self._account_id: return self._account_id response = await self._request( method="GET", endpoint="me", params={"fields": "id"}, ) account_id: str = response["id"] self._account_id = account_id logger.debug(f"[Account] ID 조회 완료: {account_id}") return account_id async def get_media(self, media_id: str) -> Media: """ 미디어 상세 조회 Args: media_id: 미디어 ID Returns: Media: 미디어 상세 정보 """ logger.info(f"[get_media] media_id={media_id}") response = await self._request( method="GET", endpoint=media_id, params={ "fields": "id,media_type,media_url,thumbnail_url,caption,timestamp,permalink,like_count,comments_count", }, ) result = Media.model_validate(response) logger.info(f"[get_media] 완료: type={result.media_type}, permalink={result.permalink}") return result async def publish_video( self, video_url: str, caption: Optional[str] = None, share_to_feed: bool = True, ) -> Media: """ 비디오/릴스 게시 Args: video_url: 공개 접근 가능한 비디오 URL (MP4 권장) caption: 게시물 캡션 share_to_feed: 피드에 공유 여부 Returns: Media: 게시된 미디어 정보 """ logger.info(f"[publish_video] 시작: {video_url[:50]}...") account_id = await self.get_account_id() # Step 1: Container 생성 container_params: dict[str, Any] = { "media_type": "REELS", "video_url": video_url, "share_to_feed": str(share_to_feed).lower(), } if caption: container_params["caption"] = caption container_response = await self._request( method="POST", endpoint=f"{account_id}/media", params=container_params, ) container_id = container_response["id"] logger.debug(f"[publish_video] Container 생성: {container_id}") # Step 2: Container 상태 대기 (비디오는 더 오래 걸림) await self._wait_for_container(container_id, timeout=self.container_timeout * 2) # Step 3: 게시 publish_response = await self._request( method="POST", endpoint=f"{account_id}/media_publish", params={"creation_id": container_id}, ) media_id = publish_response["id"] result = await self.get_media(media_id) logger.info(f"[publish_video] 완료: {result.permalink}") return result