chore: TIKTOK_ACTOR 상수 + 수집기 옵저버빌리티 정리

apify.py: 라이브 actor id 들을 모두 모듈 상단 상수로 통일 (TIKTOK_ACTOR 추가).
fetch_tiktok_profile 이 raw 문자열 'clockworks~tiktok-scraper' 쓰던 것 정리.
이제 IG_PROFILE / IG_HIGHLIGHTS / FB_PAGES / FB_POSTS / TIKTOK 5개 상수.

수집기 옵저버빌리티 정리:
- collect.py: 채널별 done 로그에 붙이던 _summarize (followers/posts 등 데이터
  shape inspection) 제거 — production 로그가 아니라 진단용에 가까워 test_raw.py
  의 summarize() 로 대신 충분.
- enrichment.py / pipeline.py / collect.py: 저레벨 수집기의 timing instrumentation
  은 정리. orchestrator 레벨(pipeline 의 stage_times, analysis/market 의 LLM
  호출 timing)은 유지.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
channel-brand
Mina Choi 2026-05-29 10:45:23 +09:00
parent fa32109658
commit bed5f0c274
4 changed files with 46 additions and 14 deletions

View File

@ -13,6 +13,9 @@ IG_HIGHLIGHTS_ACTOR = "igview-owner~instagram-highlights-scraper"
FB_PAGES_ACTOR = "apify~facebook-pages-scraper" FB_PAGES_ACTOR = "apify~facebook-pages-scraper"
FB_POSTS_ACTOR = "apify~facebook-posts-scraper" FB_POSTS_ACTOR = "apify~facebook-posts-scraper"
# TikTok
TIKTOK_ACTOR = "clockworks~tiktok-scraper"
def _ig_username(url: str) -> str: def _ig_username(url: str) -> str:
return urlparse(url).path.strip("/").split("/")[0] if "://" in url else url.lstrip("@") return urlparse(url).path.strip("/").split("/")[0] if "://" in url else url.lstrip("@")
@ -65,6 +68,13 @@ class ApifyClient:
return None return None
if isinstance(highlights, Exception): if isinstance(highlights, Exception):
highlights = [] highlights = []
# 프로필상 하이라이트가 있다고 하면(highlight_reel_count>0) 빈 결과일 때 최대 2회 재시도.
if not highlights and (profile.get("highlight_reel_count", 0) or profile.get("highlightReelCount", 0)) > 0:
for _ in range(2):
retry = await self.fetch_instagram_highlights(username)
if retry:
highlights = retry
break
return { return {
"username": profile["username"], "username": profile["username"],
"profileImage": profile.get("hdProfilePicUrl") or profile.get("profilePicUrl"), "profileImage": profile.get("hdProfilePicUrl") or profile.get("profilePicUrl"),
@ -165,7 +175,7 @@ class ApifyClient:
async def fetch_tiktok_profile(self, url: str) -> list[dict]: async def fetch_tiktok_profile(self, url: str) -> list[dict]:
user = urlparse(url).path.strip("/").lstrip("@").split("/")[0] if "://" in url else url.lstrip("@") user = urlparse(url).path.strip("/").lstrip("@").split("/")[0] if "://" in url else url.lstrip("@")
return await self._run_actor("clockworks~tiktok-scraper", { return await self._run_actor(TIKTOK_ACTOR, {
"profiles": [user], "profiles": [user],
"resultsPerPage": 10, "resultsPerPage": 10,
"profileScrapeSections": ["videos"], "profileScrapeSections": ["videos"],

View File

@ -80,6 +80,8 @@ async def collect_all(
tiktok_url: str | None = None, tiktok_url: str | None = None,
instagram_en_url: str | None = None, instagram_en_url: str | None = None,
facebook_en_url: str | None = None, facebook_en_url: str | None = None,
kakao_talk_url: str | None = None,
naver_cafe_url: str | None = None,
) -> None: ) -> None:
async def _url(table: str, row_id: int) -> str: async def _url(table: str, row_id: int) -> str:
row = await fetchone(f"SELECT url FROM {table} WHERE id = %s", (row_id,)) row = await fetchone(f"SELECT url FROM {table} WHERE id = %s", (row_id,))
@ -111,6 +113,7 @@ async def collect_all(
collect_extra_channels( collect_extra_channels(
analysis_run_id, hospital_id, analysis_run_id, hospital_id,
tiktok_url=tiktok_url, instagram_en_url=instagram_en_url, facebook_en_url=facebook_en_url, tiktok_url=tiktok_url, instagram_en_url=instagram_en_url, facebook_en_url=facebook_en_url,
kakao_talk_url=kakao_talk_url, naver_cafe_url=naver_cafe_url,
), ),
"extra_channels", "extra_channels",
) )

View File

@ -2,6 +2,7 @@ import asyncio
import json import json
import logging import logging
import os import os
import re
from urllib.parse import urlparse from urllib.parse import urlparse
from common.db import fetchone, fetch_raw, merge_hospital_raw_data from common.db import fetchone, fetch_raw, merge_hospital_raw_data
from common.utils import get_env from common.utils import get_env
@ -57,12 +58,19 @@ async def collect_brand_assets(analysis_run_id: str, hospital_id: str) -> None:
return return
# 3. Vision은 로고 정성 묘사만 (hex는 CSS 추출이 더 정확). 키 없으면 색상만 저장. # 3. Vision은 로고 정성 묘사만 (hex는 CSS 추출이 더 정확). 키 없으면 색상만 저장.
# Gemini Vision은 SVG 미지원 → SVG URL이 후보로 들어오면 Vision skip하고 URL만 그대로 박음 (묘사 없음).
SVG_URL = re.compile(r"\.svg(?:\?|#|$)", re.I)
result: dict = {} result: dict = {}
used_kind: str | None = None used_kind: str | None = None
api_key = os.getenv("GEMINI_API_KEY") api_key = os.getenv("GEMINI_API_KEY")
if api_key and candidates: if api_key and candidates:
vc = VisionClient(api_key) vc = VisionClient(api_key)
for kind, cand in candidates: for kind, cand in candidates:
if SVG_URL.search(cand):
logger.info("[brand_assets] %s URL is SVG — Vision 분석 skip, URL만 보관: %s", kind, cand)
result = {"logo_images": {"circle": None, "horizontal": cand, "korean": None}}
used_kind = kind
break
result = await vc.analyze_brand_assets(logo_url=cand, homepage_url=homepage_url) result = await vc.analyze_brand_assets(logo_url=cand, homepage_url=homepage_url)
if result: if result:
used_kind = kind used_kind = kind
@ -95,9 +103,12 @@ async def collect_extra_channels(
tiktok_url: str | None = None, tiktok_url: str | None = None,
instagram_en_url: str | None = None, instagram_en_url: str | None = None,
facebook_en_url: str | None = None, facebook_en_url: str | None = None,
kakao_talk_url: str | None = None,
naver_cafe_url: str | None = None,
) -> None: ) -> None:
"""틱톡 / 인스타 EN / 페북 EN 수집 → hospital raw_data에 저장 (별도 테이블 없이). """틱톡 / 인스타 EN / 페북 EN 수집 + 카카오톡/네이버 카페 URL만 보관 →
인스타EN·페북EN은 기존 Apify 수집기 재사용, 틱톡은 신규 액터.""" 모두 hospital raw_data에 저장. 인스타EN·페북EN은 기존 Apify 수집기 재사용, 틱톡은 신규 액터.
카카오톡·네이버 카페는 콘텐츠 수집 (URL만 LLM이 채널 존재 신호로 사용)."""
apify = ApifyClient(get_env("APIFY_API_TOKEN")) apify = ApifyClient(get_env("APIFY_API_TOKEN"))
jobs: dict = {} jobs: dict = {}
if instagram_en_url: if instagram_en_url:
@ -106,19 +117,25 @@ async def collect_extra_channels(
jobs["facebookEn"] = apify.get_facebook_page(facebook_en_url) jobs["facebookEn"] = apify.get_facebook_page(facebook_en_url)
if tiktok_url: if tiktok_url:
jobs["tiktok"] = apify.get_tiktok_profile(tiktok_url) jobs["tiktok"] = apify.get_tiktok_profile(tiktok_url)
if not jobs:
return
logger.info("[extra_channels] start run=%s channels=%s", analysis_run_id, list(jobs))
done = await asyncio.gather(*jobs.values(), return_exceptions=True)
results: dict = {} results: dict = {}
for key, res in zip(jobs.keys(), done): if jobs:
if isinstance(res, Exception): logger.info("[extra_channels] start run=%s channels=%s", analysis_run_id, list(jobs))
logger.warning("[extra_channels] %s 수집 실패: %s", key, res) done = await asyncio.gather(*jobs.values(), return_exceptions=True)
elif res: for key, res in zip(jobs.keys(), done):
if key == "facebookEn": if isinstance(res, Exception):
res = transform_facebook(res) logger.warning("[extra_channels] %s 수집 실패: %s", key, res)
results[key] = res elif res:
if key == "facebookEn":
res = transform_facebook(res)
results[key] = res
# URL-only 채널 (수집 X, 존재 여부만)
if kakao_talk_url:
results["kakaoTalk"] = {"url": kakao_talk_url}
if naver_cafe_url:
results["naverCafe"] = {"url": naver_cafe_url}
if not results: if not results:
logger.info("[extra_channels] 수집 결과 없음 run=%s", analysis_run_id) logger.info("[extra_channels] 수집 결과 없음 run=%s", analysis_run_id)
return return

View File

@ -30,6 +30,8 @@ async def run_pipeline(analysis_run_id: str, extra_channels: dict | None = None)
tiktok_url=extra_channels.get("tiktok"), tiktok_url=extra_channels.get("tiktok"),
instagram_en_url=extra_channels.get("instagram_en"), instagram_en_url=extra_channels.get("instagram_en"),
facebook_en_url=extra_channels.get("facebook_en"), facebook_en_url=extra_channels.get("facebook_en"),
kakao_talk_url=extra_channels.get("kakao_talk"),
naver_cafe_url=extra_channels.get("naver_cafe"),
) )
# ── 2. Market ──────────────────────────────────────────────────────────── # ── 2. Market ────────────────────────────────────────────────────────────