399 lines
14 KiB
Python
399 lines
14 KiB
Python
"""
|
|
Instagram Graph API Client
|
|
|
|
Instagram Graph API를 사용한 비디오/릴스 게시를 위한 비동기 클라이언트입니다.
|
|
|
|
Example:
|
|
```python
|
|
async with InstagramClient(access_token="YOUR_TOKEN") as client:
|
|
media = await client.publish_video(
|
|
video_url="https://example.com/video.mp4",
|
|
caption="Hello Instagram!"
|
|
)
|
|
print(f"게시 완료: {media.permalink}")
|
|
```
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import re
|
|
import time
|
|
from enum import Enum
|
|
from typing import Any, Optional
|
|
|
|
import httpx
|
|
|
|
from .sns_schema import ErrorResponse, Media, MediaContainer
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ============================================================
|
|
# Error State & Parser
|
|
# ============================================================
|
|
|
|
|
|
class ErrorState(str, Enum):
|
|
"""Instagram API 에러 상태"""
|
|
|
|
RATE_LIMIT = "rate_limit"
|
|
AUTH_ERROR = "auth_error"
|
|
CONTAINER_TIMEOUT = "container_timeout"
|
|
CONTAINER_ERROR = "container_error"
|
|
API_ERROR = "api_error"
|
|
UNKNOWN = "unknown"
|
|
|
|
|
|
def parse_instagram_error(e: Exception) -> tuple[ErrorState, str, dict]:
|
|
"""
|
|
Instagram 예외를 파싱하여 상태, 메시지, 추가 정보를 반환
|
|
|
|
Args:
|
|
e: 발생한 예외
|
|
|
|
Returns:
|
|
tuple: (error_state, message, extra_info)
|
|
|
|
Example:
|
|
>>> error_state, message, extra_info = parse_instagram_error(e)
|
|
>>> if error_state == ErrorState.RATE_LIMIT:
|
|
... retry_after = extra_info.get("retry_after", 60)
|
|
"""
|
|
error_str = str(e)
|
|
extra_info = {}
|
|
|
|
# Rate Limit 에러
|
|
if "[RateLimit]" in error_str:
|
|
match = re.search(r"retry_after=(\d+)s", error_str)
|
|
if match:
|
|
extra_info["retry_after"] = int(match.group(1))
|
|
return ErrorState.RATE_LIMIT, "API 호출 제한 초과", extra_info
|
|
|
|
# 인증 에러 (code=190)
|
|
if "code=190" in error_str:
|
|
return ErrorState.AUTH_ERROR, "인증 실패 (토큰 만료 또는 무효)", extra_info
|
|
|
|
# 컨테이너 타임아웃
|
|
if "[ContainerTimeout]" in error_str:
|
|
match = re.search(r"\((\d+)초 초과\)", error_str)
|
|
if match:
|
|
extra_info["timeout"] = int(match.group(1))
|
|
return ErrorState.CONTAINER_TIMEOUT, "미디어 처리 시간 초과", extra_info
|
|
|
|
# 컨테이너 상태 에러
|
|
if "[ContainerStatus]" in error_str:
|
|
match = re.search(r"처리 실패: (\w+)", error_str)
|
|
if match:
|
|
extra_info["status"] = match.group(1)
|
|
return ErrorState.CONTAINER_ERROR, "미디어 컨테이너 처리 실패", extra_info
|
|
|
|
# Instagram API 에러
|
|
if "[InstagramAPI]" in error_str:
|
|
match = re.search(r"code=(\d+)", error_str)
|
|
if match:
|
|
extra_info["code"] = int(match.group(1))
|
|
return ErrorState.API_ERROR, "Instagram API 오류", extra_info
|
|
|
|
return ErrorState.UNKNOWN, str(e), extra_info
|
|
|
|
|
|
# ============================================================
|
|
# Instagram Client
|
|
# ============================================================
|
|
|
|
|
|
class InstagramClient:
|
|
"""
|
|
Instagram Graph API 비동기 클라이언트 (비디오 업로드 전용)
|
|
|
|
Example:
|
|
```python
|
|
async with InstagramClient(access_token="USER_TOKEN") as client:
|
|
media = await client.publish_video(
|
|
video_url="https://example.com/video.mp4",
|
|
caption="My video!"
|
|
)
|
|
print(f"게시됨: {media.permalink}")
|
|
```
|
|
"""
|
|
|
|
DEFAULT_BASE_URL = "https://graph.instagram.com/v21.0"
|
|
|
|
def __init__(
|
|
self,
|
|
access_token: str,
|
|
*,
|
|
base_url: Optional[str] = None,
|
|
timeout: float = 30.0,
|
|
max_retries: int = 3,
|
|
container_timeout: float = 300.0,
|
|
container_poll_interval: float = 5.0,
|
|
):
|
|
"""
|
|
클라이언트 초기화
|
|
|
|
Args:
|
|
access_token: Instagram 액세스 토큰 (필수)
|
|
base_url: API 기본 URL (기본값: https://graph.instagram.com/v21.0)
|
|
timeout: HTTP 요청 타임아웃 (초)
|
|
max_retries: 최대 재시도 횟수
|
|
container_timeout: 컨테이너 처리 대기 타임아웃 (초)
|
|
container_poll_interval: 컨테이너 상태 확인 간격 (초)
|
|
"""
|
|
if not access_token:
|
|
raise ValueError("access_token은 필수입니다.")
|
|
|
|
self.access_token = access_token
|
|
self.base_url = base_url or self.DEFAULT_BASE_URL
|
|
self.timeout = timeout
|
|
self.max_retries = max_retries
|
|
self.container_timeout = container_timeout
|
|
self.container_poll_interval = container_poll_interval
|
|
|
|
self._client: Optional[httpx.AsyncClient] = None
|
|
self._account_id: Optional[str] = None
|
|
self._account_id_lock: asyncio.Lock = asyncio.Lock()
|
|
|
|
async def __aenter__(self) -> "InstagramClient":
|
|
"""비동기 컨텍스트 매니저 진입"""
|
|
self._client = httpx.AsyncClient(
|
|
timeout=httpx.Timeout(self.timeout),
|
|
follow_redirects=True,
|
|
)
|
|
logger.debug("[InstagramClient] HTTP 클라이언트 초기화 완료")
|
|
return self
|
|
|
|
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
|
|
"""비동기 컨텍스트 매니저 종료"""
|
|
if self._client:
|
|
await self._client.aclose()
|
|
self._client = None
|
|
logger.debug("[InstagramClient] HTTP 클라이언트 종료")
|
|
|
|
def _get_client(self) -> httpx.AsyncClient:
|
|
"""HTTP 클라이언트 반환"""
|
|
if self._client is None:
|
|
raise RuntimeError(
|
|
"InstagramClient는 비동기 컨텍스트 매니저로 사용해야 합니다. "
|
|
"예: async with InstagramClient(access_token=...) as client:"
|
|
)
|
|
return self._client
|
|
|
|
def _build_url(self, endpoint: str) -> str:
|
|
"""API URL 생성"""
|
|
return f"{self.base_url}/{endpoint}"
|
|
|
|
async def _request(
|
|
self,
|
|
method: str,
|
|
endpoint: str,
|
|
params: Optional[dict[str, Any]] = None,
|
|
data: Optional[dict[str, Any]] = None,
|
|
) -> dict[str, Any]:
|
|
"""
|
|
공통 HTTP 요청 처리
|
|
|
|
- Rate Limit 시 지수 백오프 재시도
|
|
- 에러 응답 시 InstagramAPIError 발생
|
|
"""
|
|
client = self._get_client()
|
|
url = self._build_url(endpoint)
|
|
params = params or {}
|
|
params["access_token"] = self.access_token
|
|
|
|
retry_base_delay = 1.0
|
|
last_exception: Optional[Exception] = None
|
|
|
|
for attempt in range(self.max_retries + 1):
|
|
try:
|
|
logger.debug(
|
|
f"[API] {method} {endpoint} (attempt {attempt + 1}/{self.max_retries + 1})"
|
|
)
|
|
|
|
response = await client.request(
|
|
method=method,
|
|
url=url,
|
|
params=params,
|
|
data=data,
|
|
)
|
|
|
|
# Rate Limit 체크 (429)
|
|
if response.status_code == 429:
|
|
retry_after = int(response.headers.get("Retry-After", 60))
|
|
if attempt < self.max_retries:
|
|
wait_time = max(retry_base_delay * (2**attempt), retry_after)
|
|
logger.warning(f"Rate limit 초과. {wait_time}초 후 재시도...")
|
|
await asyncio.sleep(wait_time)
|
|
continue
|
|
raise Exception(
|
|
f"[RateLimit] Rate limit 초과 (최대 재시도 횟수 도달) | retry_after={retry_after}s"
|
|
)
|
|
|
|
# 서버 에러 재시도 (5xx)
|
|
if response.status_code >= 500:
|
|
if attempt < self.max_retries:
|
|
wait_time = retry_base_delay * (2**attempt)
|
|
logger.warning(f"서버 에러 {response.status_code}. {wait_time}초 후 재시도...")
|
|
await asyncio.sleep(wait_time)
|
|
continue
|
|
response.raise_for_status()
|
|
|
|
# JSON 파싱
|
|
response_data = response.json()
|
|
|
|
# API 에러 체크 (Instagram API는 200 응답에도 error 포함 가능)
|
|
if "error" in response_data:
|
|
error_response = ErrorResponse.model_validate(response_data)
|
|
err = error_response.error
|
|
logger.error(f"[API Error] code={err.code}, message={err.message}")
|
|
error_msg = f"[InstagramAPI] {err.message} | code={err.code}"
|
|
if err.error_subcode:
|
|
error_msg += f" | subcode={err.error_subcode}"
|
|
if err.fbtrace_id:
|
|
error_msg += f" | fbtrace_id={err.fbtrace_id}"
|
|
raise Exception(error_msg)
|
|
|
|
return response_data
|
|
|
|
except httpx.HTTPError as e:
|
|
last_exception = e
|
|
if attempt < self.max_retries:
|
|
wait_time = retry_base_delay * (2**attempt)
|
|
logger.warning(f"HTTP 에러: {e}. {wait_time}초 후 재시도...")
|
|
await asyncio.sleep(wait_time)
|
|
continue
|
|
raise
|
|
|
|
raise last_exception or Exception("[InstagramAPI] 최대 재시도 횟수 초과")
|
|
|
|
async def _wait_for_container(
|
|
self,
|
|
container_id: str,
|
|
timeout: Optional[float] = None,
|
|
) -> MediaContainer:
|
|
"""컨테이너 상태가 FINISHED가 될 때까지 대기"""
|
|
timeout = timeout or self.container_timeout
|
|
start_time = time.monotonic()
|
|
|
|
logger.debug(f"[Container] 대기 시작: {container_id}, timeout={timeout}s")
|
|
|
|
while True:
|
|
elapsed = time.monotonic() - start_time
|
|
if elapsed >= timeout:
|
|
raise Exception(
|
|
f"[ContainerTimeout] 컨테이너 처리 타임아웃 ({timeout}초 초과): {container_id}"
|
|
)
|
|
|
|
response = await self._request(
|
|
method="GET",
|
|
endpoint=container_id,
|
|
params={"fields": "status_code,status"},
|
|
)
|
|
|
|
container = MediaContainer.model_validate(response)
|
|
logger.debug(f"[Container] status={container.status_code}, elapsed={elapsed:.1f}s")
|
|
|
|
if container.is_finished:
|
|
logger.info(f"[Container] 완료: {container_id}")
|
|
return container
|
|
|
|
if container.is_error:
|
|
raise Exception(f"[ContainerStatus] 컨테이너 처리 실패: {container.status}")
|
|
|
|
await asyncio.sleep(self.container_poll_interval)
|
|
|
|
async def get_account_id(self) -> str:
|
|
"""계정 ID 조회 (접속 테스트용)"""
|
|
if self._account_id:
|
|
return self._account_id
|
|
|
|
async with self._account_id_lock:
|
|
if self._account_id:
|
|
return self._account_id
|
|
|
|
response = await self._request(
|
|
method="GET",
|
|
endpoint="me",
|
|
params={"fields": "id"},
|
|
)
|
|
account_id: str = response["id"]
|
|
self._account_id = account_id
|
|
logger.debug(f"[Account] ID 조회 완료: {account_id}")
|
|
return account_id
|
|
|
|
async def get_media(self, media_id: str) -> Media:
|
|
"""
|
|
미디어 상세 조회
|
|
|
|
Args:
|
|
media_id: 미디어 ID
|
|
|
|
Returns:
|
|
Media: 미디어 상세 정보
|
|
"""
|
|
logger.info(f"[get_media] media_id={media_id}")
|
|
|
|
response = await self._request(
|
|
method="GET",
|
|
endpoint=media_id,
|
|
params={
|
|
"fields": "id,media_type,media_url,thumbnail_url,caption,timestamp,permalink,like_count,comments_count",
|
|
},
|
|
)
|
|
|
|
result = Media.model_validate(response)
|
|
logger.info(f"[get_media] 완료: type={result.media_type}, permalink={result.permalink}")
|
|
return result
|
|
|
|
async def publish_video(
|
|
self,
|
|
video_url: str,
|
|
caption: Optional[str] = None,
|
|
share_to_feed: bool = True,
|
|
) -> Media:
|
|
"""
|
|
비디오/릴스 게시
|
|
|
|
Args:
|
|
video_url: 공개 접근 가능한 비디오 URL (MP4 권장)
|
|
caption: 게시물 캡션
|
|
share_to_feed: 피드에 공유 여부
|
|
|
|
Returns:
|
|
Media: 게시된 미디어 정보
|
|
"""
|
|
logger.info(f"[publish_video] 시작: {video_url[:50]}...")
|
|
account_id = await self.get_account_id()
|
|
|
|
# Step 1: Container 생성
|
|
container_params: dict[str, Any] = {
|
|
"media_type": "REELS",
|
|
"video_url": video_url,
|
|
"share_to_feed": str(share_to_feed).lower(),
|
|
}
|
|
if caption:
|
|
container_params["caption"] = caption
|
|
|
|
container_response = await self._request(
|
|
method="POST",
|
|
endpoint=f"{account_id}/media",
|
|
params=container_params,
|
|
)
|
|
container_id = container_response["id"]
|
|
logger.debug(f"[publish_video] Container 생성: {container_id}")
|
|
|
|
# Step 2: Container 상태 대기 (비디오는 더 오래 걸림)
|
|
await self._wait_for_container(container_id, timeout=self.container_timeout * 2)
|
|
|
|
# Step 3: 게시
|
|
publish_response = await self._request(
|
|
method="POST",
|
|
endpoint=f"{account_id}/media_publish",
|
|
params={"creation_id": container_id},
|
|
)
|
|
media_id = publish_response["id"]
|
|
|
|
result = await self.get_media(media_id)
|
|
logger.info(f"[publish_video] 완료: {result.permalink}")
|
|
return result
|