""" Instagram Graph API 클라이언트 모듈 Instagram Graph API와의 통신을 담당하는 비동기 클라이언트입니다. """ import asyncio import logging import time from typing import Any, Optional import httpx from .config import InstagramSettings, settings from .exceptions import ( AuthenticationError, ContainerStatusError, ContainerTimeoutError, InstagramAPIError, MediaPublishError, RateLimitError, create_exception_from_error, ) from .models import ( Account, Comment, CommentList, ErrorResponse, InsightResponse, Media, MediaContainer, MediaList, TokenDebugResponse, TokenInfo, ) # 로거 설정 logger = logging.getLogger(__name__) class InstagramGraphClient: """ Instagram Graph API 비동기 클라이언트 Instagram Graph API와의 모든 통신을 처리합니다. 비동기 컨텍스트 매니저로 사용해야 합니다. Example: ```python async with InstagramGraphClient(access_token="...") as client: account = await client.get_account() print(account.username) ``` Attributes: access_token: Instagram 액세스 토큰 app_id: Facebook 앱 ID (토큰 검증 시 필요) app_secret: Facebook 앱 시크릿 (토큰 교환 시 필요) settings: Instagram API 설정 """ def __init__( self, access_token: Optional[str] = None, app_id: Optional[str] = None, app_secret: Optional[str] = None, custom_settings: Optional[InstagramSettings] = None, ): """ 클라이언트 초기화 Args: access_token: Instagram 액세스 토큰 (없으면 설정에서 로드) app_id: Facebook 앱 ID (없으면 설정에서 로드) app_secret: Facebook 앱 시크릿 (없으면 설정에서 로드) custom_settings: 커스텀 설정 (테스트용) """ self.settings = custom_settings or settings self.access_token = access_token or self.settings.access_token self.app_id = app_id or self.settings.app_id self.app_secret = app_secret or self.settings.app_secret self._client: Optional[httpx.AsyncClient] = None self._account_id: Optional[str] = None if not self.access_token: raise ValueError( "access_token이 필요합니다. " "파라미터로 전달하거나 INSTAGRAM_ACCESS_TOKEN 환경변수를 설정하세요." ) async def __aenter__(self) -> "InstagramGraphClient": """비동기 컨텍스트 매니저 진입""" self._client = httpx.AsyncClient( timeout=httpx.Timeout(self.settings.timeout), follow_redirects=True, ) logger.debug("[InstagramGraphClient] HTTP 클라이언트 초기화 완료") return self async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: """비동기 컨텍스트 매니저 종료""" if self._client: await self._client.aclose() self._client = None logger.debug("[InstagramGraphClient] HTTP 클라이언트 종료") # ========================================================================== # 내부 메서드 # ========================================================================== def _get_client(self) -> httpx.AsyncClient: """HTTP 클라이언트 반환""" if self._client is None: raise RuntimeError( "InstagramGraphClient는 비동기 컨텍스트 매니저로 사용해야 합니다. " "예: async with InstagramGraphClient(...) as client:" ) return self._client def _mask_sensitive_params(self, params: dict[str, Any]) -> dict[str, Any]: """ 로깅용 파라미터에서 민감 정보 마스킹 Args: params: 원본 파라미터 Returns: 마스킹된 파라미터 복사본 """ SENSITIVE_KEYS = {"access_token", "client_secret", "input_token"} masked = params.copy() for key in SENSITIVE_KEYS: if key in masked and masked[key]: value = str(masked[key]) if len(value) > 14: masked[key] = f"{value[:10]}...{value[-4:]}" else: masked[key] = "***" return masked async def _request( self, method: str, url: str, params: Optional[dict[str, Any]] = None, data: Optional[dict[str, Any]] = None, add_access_token: bool = True, ) -> dict[str, Any]: """ 공통 HTTP 요청 처리 - Rate Limit 시 지수 백오프 재시도 - 에러 응답 → 커스텀 예외 변환 - 요청/응답 로깅 Args: method: HTTP 메서드 (GET, POST 등) url: 요청 URL params: 쿼리 파라미터 data: POST 데이터 add_access_token: 액세스 토큰 자동 추가 여부 Returns: API 응답 JSON Raises: InstagramAPIError: API 에러 발생 시 """ client = self._get_client() params = params or {} # 액세스 토큰 추가 if add_access_token and "access_token" not in params: params["access_token"] = self.access_token # 재시도 로직 last_exception: Optional[Exception] = None for attempt in range(self.settings.max_retries + 1): try: logger.debug( f"[1/3] API 요청 시작: {method} {url} " f"(attempt {attempt + 1}/{self.settings.max_retries + 1})" ) response = await client.request( method=method, url=url, params=params, data=data, ) logger.debug( f"[2/3] API 응답 수신: status={response.status_code}" ) # Rate Limit 체크 if response.status_code == 429: retry_after = int(response.headers.get("Retry-After", 60)) if attempt < self.settings.max_retries: wait_time = min( self.settings.retry_base_delay * (2**attempt), self.settings.retry_max_delay, ) wait_time = max(wait_time, retry_after) logger.warning( f"Rate limit 초과. {wait_time}초 후 재시도..." ) await asyncio.sleep(wait_time) continue raise RateLimitError( message="Rate limit 초과", retry_after=retry_after, ) # 서버 에러 재시도 if response.status_code >= 500: if attempt < self.settings.max_retries: wait_time = self.settings.retry_base_delay * (2**attempt) logger.warning( f"서버 에러 {response.status_code}. {wait_time}초 후 재시도..." ) await asyncio.sleep(wait_time) continue # JSON 파싱 (안전 처리) try: response_data = response.json() except ValueError as e: logger.error( f"[_request] JSON 파싱 실패: status={response.status_code}, " f"body={response.text[:200]}" ) raise InstagramAPIError( f"API 응답 파싱 실패: {e}", code=response.status_code, ) from e # API 에러 체크 if "error" in response_data: error_response = ErrorResponse.model_validate(response_data) error = error_response.error logger.error( f"[3/3] API 에러: code={error.code}, message={error.message}" ) raise create_exception_from_error( message=error.message, code=error.code, subcode=error.error_subcode, fbtrace_id=error.fbtrace_id, ) logger.debug(f"[3/3] API 요청 완료") return response_data except httpx.HTTPError as e: last_exception = e if attempt < self.settings.max_retries: wait_time = self.settings.retry_base_delay * (2**attempt) logger.warning( f"HTTP 에러: {e}. {wait_time}초 후 재시도..." ) await asyncio.sleep(wait_time) continue raise InstagramAPIError(f"HTTP 요청 실패: {e}") from e # 모든 재시도 실패 raise InstagramAPIError( f"최대 재시도 횟수 초과: {last_exception}" ) async def _wait_for_container( self, container_id: str, timeout: Optional[float] = None, poll_interval: Optional[float] = None, ) -> MediaContainer: """ 컨테이너 상태가 FINISHED가 될 때까지 대기 Args: container_id: 컨테이너 ID timeout: 타임아웃 (초) poll_interval: 폴링 간격 (초) Returns: 완료된 MediaContainer Raises: ContainerStatusError: 컨테이너가 ERROR 상태가 된 경우 ContainerTimeoutError: 타임아웃 초과 """ timeout = timeout or self.settings.container_timeout poll_interval = poll_interval or self.settings.container_poll_interval start_time = time.monotonic() # 정확한 시간 측정 logger.debug( f"[컨테이너 대기] container_id={container_id}, " f"timeout={timeout}s, poll_interval={poll_interval}s" ) while True: elapsed = time.monotonic() - start_time if elapsed >= timeout: break url = self.settings.get_instagram_url(container_id) response = await self._request( method="GET", url=url, params={"fields": "status_code,status"}, ) container = MediaContainer.model_validate(response) logger.debug( f"[컨테이너 상태] status_code={container.status_code}, " f"elapsed={elapsed:.1f}s" ) if container.is_finished: logger.info(f"[컨테이너 완료] container_id={container_id}") return container if container.is_error: raise ContainerStatusError( f"컨테이너 처리 실패: {container.status}" ) await asyncio.sleep(poll_interval) raise ContainerTimeoutError( f"컨테이너 처리 타임아웃 ({timeout}초 초과): {container_id}" ) # ========================================================================== # 인증 API # ========================================================================== async def debug_token(self) -> TokenDebugResponse: """ 현재 토큰 정보 조회 (유효성 검증) 토큰의 유효성, 만료 시간, 권한 등을 확인합니다. Returns: TokenDebugResponse: 토큰 디버그 정보 Raises: AuthenticationError: 토큰이 유효하지 않은 경우 ValueError: app_id 또는 app_secret이 설정되지 않은 경우 Example: ```python async with InstagramGraphClient(...) as client: token_info = await client.debug_token() if token_info.data.is_valid: print(f"토큰 유효, 만료: {token_info.data.expires_at_datetime}") ``` """ if not self.app_id or not self.app_secret: raise ValueError( "토큰 검증에는 app_id와 app_secret이 필요합니다." ) logger.info("[debug_token] 토큰 검증 시작") url = self.settings.get_facebook_url("debug_token") response = await self._request( method="GET", url=url, params={ "input_token": self.access_token, "access_token": self.settings.app_access_token, }, add_access_token=False, ) result = TokenDebugResponse.model_validate(response) logger.info( f"[debug_token] 완료: is_valid={result.data.is_valid}, " f"expires_at={result.data.expires_at_datetime}" ) return result async def exchange_long_lived_token(self) -> TokenInfo: """ 단기 토큰을 장기 토큰(60일)으로 교환 Returns: TokenInfo: 새로운 장기 토큰 정보 Raises: AuthenticationError: 토큰 교환 실패 ValueError: app_secret이 설정되지 않은 경우 Example: ```python async with InstagramGraphClient(access_token="SHORT_LIVED_TOKEN") as client: new_token = await client.exchange_long_lived_token() print(f"새 토큰: {new_token.access_token}") print(f"만료: {new_token.expires_in}초 후") ``` """ if not self.app_secret: raise ValueError("토큰 교환에는 app_secret이 필요합니다.") logger.info("[exchange_long_lived_token] 토큰 교환 시작") url = self.settings.get_instagram_url("access_token") response = await self._request( method="GET", url=url, params={ "grant_type": "ig_exchange_token", "client_secret": self.app_secret, "access_token": self.access_token, }, add_access_token=False, ) result = TokenInfo.model_validate(response) logger.info( f"[exchange_long_lived_token] 완료: expires_in={result.expires_in}초" ) return result async def refresh_token(self) -> TokenInfo: """ 장기 토큰 갱신 만료 24시간 전부터 갱신 가능합니다. Returns: TokenInfo: 갱신된 토큰 정보 Raises: AuthenticationError: 토큰 갱신 실패 Example: ```python async with InstagramGraphClient(access_token="LONG_LIVED_TOKEN") as client: refreshed = await client.refresh_token() print(f"갱신된 토큰: {refreshed.access_token}") ``` """ logger.info("[refresh_token] 토큰 갱신 시작") url = self.settings.get_instagram_url("refresh_access_token") response = await self._request( method="GET", url=url, params={ "grant_type": "ig_refresh_token", "access_token": self.access_token, }, add_access_token=False, ) result = TokenInfo.model_validate(response) logger.info(f"[refresh_token] 완료: expires_in={result.expires_in}초") return result # ========================================================================== # 계정 API # ========================================================================== async def get_account(self) -> Account: """ 현재 계정 정보 조회 Returns: Account: 계정 정보 Example: ```python async with InstagramGraphClient(...) as client: account = await client.get_account() print(f"@{account.username}: {account.followers_count} followers") ``` """ logger.info("[get_account] 계정 정보 조회 시작") url = self.settings.get_instagram_url("me") response = await self._request( method="GET", url=url, params={ "fields": ( "id,username,name,account_type,profile_picture_url," "followers_count,follows_count,media_count,biography,website" ), }, ) result = Account.model_validate(response) self._account_id = result.id logger.info( f"[get_account] 완료: @{result.username}, " f"followers={result.followers_count}" ) return result async def get_account_id(self) -> str: """ 현재 계정 ID만 조회 Returns: str: 계정 ID Note: 캐시된 ID가 있으면 API 호출 없이 반환합니다. """ if self._account_id: return self._account_id logger.info("[get_account_id] 계정 ID 조회 시작") url = self.settings.get_instagram_url("me") response = await self._request( method="GET", url=url, params={"fields": "id"}, ) self._account_id = response["id"] logger.info(f"[get_account_id] 완료: {self._account_id}") return self._account_id # ========================================================================== # 미디어 API # ========================================================================== async def get_media_list( self, limit: int = 25, after: Optional[str] = None, ) -> MediaList: """ 미디어 목록 조회 (페이지네이션 지원) Args: limit: 조회할 미디어 수 (최대 100) after: 페이지네이션 커서 Returns: MediaList: 미디어 목록 Example: ```python async with InstagramGraphClient(...) as client: media_list = await client.get_media_list(limit=10) for media in media_list.data: print(f"{media.media_type}: {media.caption[:50]}") ``` """ logger.info(f"[get_media_list] 미디어 목록 조회: limit={limit}") account_id = await self.get_account_id() url = self.settings.get_instagram_url(f"{account_id}/media") params: dict[str, Any] = { "fields": ( "id,media_type,media_url,thumbnail_url,caption," "timestamp,permalink,like_count,comments_count" ), "limit": min(limit, 100), } if after: params["after"] = after response = await self._request(method="GET", url=url, params=params) result = MediaList.model_validate(response) logger.info(f"[get_media_list] 완료: {len(result.data)}개 조회") return result async def get_media(self, media_id: str) -> Media: """ 미디어 상세 조회 Args: media_id: 미디어 ID Returns: Media: 미디어 상세 정보 Example: ```python async with InstagramGraphClient(...) as client: media = await client.get_media("17880000000000000") print(f"좋아요: {media.like_count}, 댓글: {media.comments_count}") ``` """ logger.info(f"[get_media] 미디어 상세 조회: media_id={media_id}") url = self.settings.get_instagram_url(media_id) response = await self._request( method="GET", url=url, params={ "fields": ( "id,media_type,media_url,thumbnail_url,caption," "timestamp,permalink,like_count,comments_count," "children{id,media_type,media_url}" ), }, ) result = Media.model_validate(response) logger.info( f"[get_media] 완료: type={result.media_type}, " f"likes={result.like_count}" ) return result async def publish_image( self, image_url: str, caption: Optional[str] = None, ) -> Media: """ 이미지 게시 Container 생성 → 상태 확인 → 게시의 3단계 프로세스를 수행합니다. Args: image_url: 공개 접근 가능한 이미지 URL caption: 게시물 캡션 Returns: Media: 게시된 미디어 정보 Raises: MediaPublishError: 게시 실패 ContainerTimeoutError: 컨테이너 처리 타임아웃 Example: ```python async with InstagramGraphClient(...) as client: media = await client.publish_image( image_url="https://example.com/image.jpg", caption="My awesome photo! #photography" ) print(f"게시 완료: {media.permalink}") ``` """ logger.info(f"[publish_image] 이미지 게시 시작: {image_url[:50]}...") account_id = await self.get_account_id() # Step 1: Container 생성 logger.debug("[publish_image] Step 1: Container 생성") container_url = self.settings.get_instagram_url(f"{account_id}/media") container_params: dict[str, Any] = {"image_url": image_url} if caption: container_params["caption"] = caption container_response = await self._request( method="POST", url=container_url, params=container_params, ) container_id = container_response["id"] logger.debug(f"[publish_image] Container 생성 완료: {container_id}") # Step 2: Container 상태 대기 logger.debug("[publish_image] Step 2: Container 상태 대기") await self._wait_for_container(container_id) # Step 3: 게시 logger.debug("[publish_image] Step 3: 게시") publish_url = self.settings.get_instagram_url(f"{account_id}/media_publish") publish_response = await self._request( method="POST", url=publish_url, params={"creation_id": container_id}, ) media_id = publish_response["id"] # 게시된 미디어 정보 조회 result = await self.get_media(media_id) logger.info(f"[publish_image] 게시 완료: {result.permalink}") return result async def publish_video( self, video_url: str, caption: Optional[str] = None, share_to_feed: bool = True, ) -> Media: """ 비디오/릴스 게시 Args: video_url: 공개 접근 가능한 비디오 URL caption: 게시물 캡션 share_to_feed: 피드에 공유 여부 Returns: Media: 게시된 미디어 정보 Raises: MediaPublishError: 게시 실패 ContainerTimeoutError: 컨테이너 처리 타임아웃 Example: ```python async with InstagramGraphClient(...) as client: media = await client.publish_video( video_url="https://example.com/video.mp4", caption="Check out this video! #video" ) print(f"게시 완료: {media.permalink}") ``` """ logger.info(f"[publish_video] 비디오 게시 시작: {video_url[:50]}...") account_id = await self.get_account_id() # Step 1: Container 생성 logger.debug("[publish_video] Step 1: Container 생성") container_url = self.settings.get_instagram_url(f"{account_id}/media") container_params: dict[str, Any] = { "media_type": "REELS", "video_url": video_url, "share_to_feed": str(share_to_feed).lower(), } if caption: container_params["caption"] = caption container_response = await self._request( method="POST", url=container_url, params=container_params, ) container_id = container_response["id"] logger.debug(f"[publish_video] Container 생성 완료: {container_id}") # Step 2: Container 상태 대기 (비디오는 더 오래 걸릴 수 있음) logger.debug("[publish_video] Step 2: Container 상태 대기") await self._wait_for_container( container_id, timeout=self.settings.container_timeout * 2, # 비디오는 2배 시간 ) # Step 3: 게시 logger.debug("[publish_video] Step 3: 게시") publish_url = self.settings.get_instagram_url(f"{account_id}/media_publish") publish_response = await self._request( method="POST", url=publish_url, params={"creation_id": container_id}, ) media_id = publish_response["id"] # 게시된 미디어 정보 조회 result = await self.get_media(media_id) logger.info(f"[publish_video] 게시 완료: {result.permalink}") return result async def publish_carousel( self, media_urls: list[str], caption: Optional[str] = None, ) -> Media: """ 캐러셀(멀티 이미지) 게시 Args: media_urls: 이미지 URL 목록 (2-10개) caption: 게시물 캡션 Returns: Media: 게시된 미디어 정보 Raises: ValueError: 이미지 수가 2-10개가 아닌 경우 MediaPublishError: 게시 실패 Example: ```python async with InstagramGraphClient(...) as client: media = await client.publish_carousel( media_urls=[ "https://example.com/image1.jpg", "https://example.com/image2.jpg", ], caption="My carousel post!" ) ``` """ if len(media_urls) < 2 or len(media_urls) > 10: raise ValueError("캐러셀은 2-10개의 이미지가 필요합니다.") logger.info( f"[publish_carousel] 캐러셀 게시 시작: {len(media_urls)}개 이미지" ) account_id = await self.get_account_id() # Step 1: 각 이미지의 Container 병렬 생성 logger.debug( f"[publish_carousel] Step 1: {len(media_urls)}개 Container 병렬 생성" ) async def create_item_container(url: str, index: int) -> str: """개별 이미지 컨테이너 생성""" container_url = self.settings.get_instagram_url(f"{account_id}/media") response = await self._request( method="POST", url=container_url, params={ "image_url": url, "is_carousel_item": "true", }, ) logger.debug(f"[publish_carousel] 이미지 {index + 1} Container 생성 완료") return response["id"] # 병렬로 모든 컨테이너 생성 children_ids = await asyncio.gather( *[create_item_container(url, i) for i, url in enumerate(media_urls)] ) logger.debug(f"[publish_carousel] 모든 Container 생성 완료: {len(children_ids)}개") # Step 2: 캐러셀 Container 생성 logger.debug("[publish_carousel] Step 2: 캐러셀 Container 생성") carousel_url = self.settings.get_instagram_url(f"{account_id}/media") carousel_params: dict[str, Any] = { "media_type": "CAROUSEL", "children": ",".join(children_ids), } if caption: carousel_params["caption"] = caption carousel_response = await self._request( method="POST", url=carousel_url, params=carousel_params, ) carousel_id = carousel_response["id"] # Step 3: Container 상태 대기 logger.debug("[publish_carousel] Step 3: Container 상태 대기") await self._wait_for_container(carousel_id) # Step 4: 게시 logger.debug("[publish_carousel] Step 4: 게시") publish_url = self.settings.get_instagram_url(f"{account_id}/media_publish") publish_response = await self._request( method="POST", url=publish_url, params={"creation_id": carousel_id}, ) media_id = publish_response["id"] # 게시된 미디어 정보 조회 result = await self.get_media(media_id) logger.info(f"[publish_carousel] 게시 완료: {result.permalink}") return result # ========================================================================== # 인사이트 API # ========================================================================== async def get_account_insights( self, metrics: list[str], period: str = "day", ) -> InsightResponse: """ 계정 인사이트 조회 Args: metrics: 조회할 메트릭 목록 - impressions: 노출 수 - reach: 도달 수 - profile_views: 프로필 조회 수 - accounts_engaged: 참여 계정 수 - total_interactions: 총 상호작용 수 period: 기간 (day, week, days_28) Returns: InsightResponse: 인사이트 데이터 Example: ```python async with InstagramGraphClient(...) as client: insights = await client.get_account_insights( metrics=["impressions", "reach"], period="day" ) for insight in insights.data: print(f"{insight.name}: {insight.latest_value}") ``` """ logger.info( f"[get_account_insights] 계정 인사이트 조회: " f"metrics={metrics}, period={period}" ) account_id = await self.get_account_id() url = self.settings.get_instagram_url(f"{account_id}/insights") response = await self._request( method="GET", url=url, params={ "metric": ",".join(metrics), "period": period, "metric_type": "total_value", }, ) result = InsightResponse.model_validate(response) logger.info(f"[get_account_insights] 완료: {len(result.data)}개 메트릭") return result async def get_media_insights( self, media_id: str, metrics: Optional[list[str]] = None, ) -> InsightResponse: """ 미디어 인사이트 조회 Args: media_id: 미디어 ID metrics: 조회할 메트릭 (기본: impressions, reach, engagement, saved) Returns: InsightResponse: 인사이트 데이터 Example: ```python async with InstagramGraphClient(...) as client: insights = await client.get_media_insights("17880000000000000") reach = insights.get_metric("reach") print(f"도달: {reach.latest_value}") ``` """ if metrics is None: metrics = ["impressions", "reach", "engagement", "saved"] logger.info( f"[get_media_insights] 미디어 인사이트 조회: " f"media_id={media_id}, metrics={metrics}" ) url = self.settings.get_instagram_url(f"{media_id}/insights") response = await self._request( method="GET", url=url, params={"metric": ",".join(metrics)}, ) result = InsightResponse.model_validate(response) logger.info(f"[get_media_insights] 완료: {len(result.data)}개 메트릭") return result # ========================================================================== # 댓글 API # ========================================================================== async def get_comments( self, media_id: str, limit: int = 50, ) -> CommentList: """ 미디어의 댓글 목록 조회 Args: media_id: 미디어 ID limit: 조회할 댓글 수 (최대 50) Returns: CommentList: 댓글 목록 Example: ```python async with InstagramGraphClient(...) as client: comments = await client.get_comments("17880000000000000") for comment in comments.data: print(f"@{comment.username}: {comment.text}") ``` """ logger.info( f"[get_comments] 댓글 조회: media_id={media_id}, limit={limit}" ) url = self.settings.get_instagram_url(f"{media_id}/comments") response = await self._request( method="GET", url=url, params={ "fields": ( "id,text,username,timestamp,like_count," "replies{id,text,username,timestamp,like_count}" ), "limit": min(limit, 50), }, ) result = CommentList.model_validate(response) logger.info(f"[get_comments] 완료: {len(result.data)}개 댓글") return result async def reply_comment( self, comment_id: str, message: str, ) -> Comment: """ 댓글에 답글 작성 Args: comment_id: 댓글 ID message: 답글 내용 Returns: Comment: 작성된 답글 정보 Example: ```python async with InstagramGraphClient(...) as client: reply = await client.reply_comment( comment_id="17890000000000000", message="Thanks for your comment!" ) print(f"답글 작성 완료: {reply.id}") ``` """ logger.info( f"[reply_comment] 답글 작성: comment_id={comment_id}" ) url = self.settings.get_instagram_url(f"{comment_id}/replies") response = await self._request( method="POST", url=url, params={"message": message}, ) # 답글 ID만 반환되므로, 추가 정보 조회 reply_id = response["id"] reply_url = self.settings.get_instagram_url(reply_id) reply_response = await self._request( method="GET", url=reply_url, params={"fields": "id,text,username,timestamp,like_count"}, ) result = Comment.model_validate(reply_response) logger.info(f"[reply_comment] 완료: reply_id={result.id}") return result