diff --git a/app/integrations/apify.py b/app/integrations/apify.py
index 49353a6..d579da5 100644
--- a/app/integrations/apify.py
+++ b/app/integrations/apify.py
@@ -9,6 +9,10 @@ APIFY_BASE = "https://api.apify.com/v2"
 IG_PROFILE_ACTOR = "coderx~instagram-profile-scraper-bio-posts"
 IG_HIGHLIGHTS_ACTOR = "igview-owner~instagram-highlights-scraper"
 
+# Facebook: the pages and posts actors are both called directly.
+FB_PAGES_ACTOR = "apify~facebook-pages-scraper"
+FB_POSTS_ACTOR = "apify~facebook-posts-scraper"
+
 
 def _ig_username(url: str) -> str:
     return urlparse(url).path.strip("/").split("/")[0] if "://" in url else url.lstrip("@")
@@ -19,7 +23,7 @@ class ApifyClient:
         self.token = token
         self.wait_for_finish = wait_for_finish
 
-    async def _run_actor(self, actor_id: str, input_data: dict) -> list[dict]:
+    async def _run_actor(self, actor_id: str, input_data: dict, limit: int = 20) -> list[dict]:
         resp = await http_request(
             HTTPMethod.POST,
             url=f"{APIFY_BASE}/acts/{actor_id}/runs",
@@ -35,7 +39,7 @@ class ApifyClient:
         items_resp = await http_request(
             HTTPMethod.GET,
             url=f"{APIFY_BASE}/datasets/{dataset_id}/items",
-            params={"token": self.token, "limit": 20},
+            params={"token": self.token, "limit": limit},
             label=f"apify-dataset-{dataset_id}",
         )
         if not items_resp or not items_resp.is_success:
@@ -116,26 +120,58 @@ class ApifyClient:
     # }
 
     async def fetch_facebook_page(self, page_url: str) -> dict | None:
-        items = await self._run_actor("apify~facebook-pages-scraper", {"startUrls": [{"url": page_url}]})
+        items = await self._run_actor(FB_PAGES_ACTOR, {"startUrls": [{"url": page_url}]})
         return items[0] if items else None
 
+    async def fetch_facebook_posts(self, page_url: str, limit: int = 20) -> list[dict]:
+        """Run the posts actor for page_url, requesting at most `limit` dataset items."""
+        return await self._run_actor(
+            FB_POSTS_ACTOR, {"startUrls": [{"url": page_url}], "resultsLimit": limit}, limit=limit,
+        )
+
     async def get_facebook_page(self, page_url: str) -> dict | None:
-        page = await self.fetch_facebook_page(page_url)
-        if not page:
+        """Fetch the page and its recent posts concurrently and map them to one dict.
+
+        Returns None when the page fetch failed or returned nothing; a posts failure
+        only yields an empty "latestPosts" list.
+        """
+        # NOTE(review): relies on asyncio being imported by this module; this diff adds no
+        # import for it - confirm.
+        page, posts = await asyncio.gather(
+            self.fetch_facebook_page(page_url),
+            self.fetch_facebook_posts(page_url),
+            return_exceptions=True,
+        )
+        # Type checks rather than isinstance(..., Exception): with return_exceptions=True a
+        # cancelled child comes back as CancelledError, which is not an Exception subclass.
+        if not isinstance(page, dict) or not page:
             return None
+        if not isinstance(posts, list):
+            posts = []
         return {
             "pageName": page.get("title") or page.get("name"),
             "profileImage": page.get("profilePictureUrl") or page.get("profilePhoto") or page.get("profilePic"),
             "pageUrl": page.get("pageUrl", page_url),
             "followers": page.get("followers", 0),
-            "likes": page.get("likes", 0),
+            "following": page.get("followings", 0),
+            "reviews": page.get("ratingCount", 0),
             "categories": page.get("categories", []),
-            "email": page.get("email"),
-            "phone": page.get("phone"),
-            "website": page.get("website"),
-            "address": page.get("address"),
+            "website": page.get("website") or page.get("websites"),
             "intro": page.get("intro"),
-            "rating": page.get("rating"),
+            "latestPosts": [
+                {
+                    "text": (p.get("text") or "")[:160],
+                    # `or 0`: a key that is present with a null value would otherwise put
+                    # None into the numeric fields.
+                    "likes": p.get("likes") or 0,
+                    "reactions": p.get("topReactionsCount") or 0,
+                    "shares": p.get("shares") or 0,
+                    "views": p.get("viewsCount") or 0,
+                    "isVideo": p.get("isVideo", False),
+                    "timestamp": p.get("time") or p.get("timestamp"),
+                }
+                for p in posts if isinstance(p, dict)
+            ],
         }
 
     async def fetch_tiktok_profile(self, url: str) -> list[dict]:
diff --git a/app/integrations/llm/temp-prompt/report_prompt.txt b/app/integrations/llm/temp-prompt/report_prompt.txt
index ecdcf3b..08bfc63 100644
--- a/app/integrations/llm/temp-prompt/report_prompt.txt
+++ b/app/integrations/llm/temp-prompt/report_prompt.txt
@@ -75,6 +75,7 @@
 - clinic_snapshot 의 overall_rating/total_reviews/staff_count/location/certifications/lead_doctor 는 강남언니({gangnam_unni}) 데이터의 값을 그대로 사용.
 - **instagram_audit.accounts 는 반드시 빈 배열 []로 두세요.** 계정 정보는 시스템이 수집 데이터로 직접 채우니 LLM은 만들지 말고, instagram_audit.diagnosis(진단)만 작성하세요.
- facebook_audit.pages: KR 페북({facebook})·영문 페북({facebook_en}) 데이터가 있으면 **각각 별도 페이지**로 넣고, url/page_name/followers 등은 그 데이터 그대로. language/label 동일 규칙. +- facebook_audit.pages[].top_content_type 은 해당 페이지 latestPosts의 **캡션·미디어를 읽고** 주로 올리는 콘텐츠를 의미 기반으로 짧게 묘사하세요 (예: "Before/After 사진 + 환자 여정 Reels", "이벤트·프로모션 카드뉴스", "다국어 시술 소개"). 단순 "동영상/이미지 위주"가 아니라 **무슨 주제**인지 쓰세요. (recent_post_age·post_frequency·engagement 수치는 시스템이 덮어쓰니 대략 적어도 됩니다.) - 위 수치·URL·이름은 제공된 데이터에서 그대로 쓰고 절대 지어내지 마세요. ## 기타 채널 현황 (other_channels) 작성 지침 diff --git a/app/services/analysis.py b/app/services/analysis.py index d43e580..baf8eeb 100644 --- a/app/services/analysis.py +++ b/app/services/analysis.py @@ -5,6 +5,7 @@ from integrations.llm.llm_service import LLMService from integrations.llm.prompt import report_prompt, plan_prompt from integrations.llm.schemas.report import ReportOutput from services.instagram_audit import build_instagram_accounts +from services.facebook_audit import build_facebook_pages from integrations.llm.schemas.plan import PlanOutput from models.status import AnalysisStatus @@ -140,15 +141,8 @@ async def _build_overrides(analysis_run_id: str) -> dict: instagram, hospital.get("instagramEn") or {}, hospital.get("channelLogos") or {}, ) - # ── facebook ────────────────────────────────────────────────────────────── - fb_patch: dict = {} - if facebook.get("pageUrl"): fb_patch["url"] = facebook["pageUrl"] - if facebook.get("pageUrl"): fb_patch["link"] = facebook["pageUrl"] - if facebook.get("pageName"): fb_patch["page_name"] = facebook["pageName"] - if facebook.get("followers"): fb_patch["followers"] = facebook["followers"] - if facebook.get("intro"): fb_patch["bio"] = facebook["intro"] - if facebook.get("categories"): fb_patch["category"] = ", ".join(facebook["categories"]) - if facebook.get("website"): fb_patch["linked_domain"] = facebook["website"] + # ── facebook (KR=facebook_data, EN=hospital.facebookEn 둘 다 코드 산출, [KR, EN] 순서) ── + fb_pages = build_facebook_pages(facebook, 
hospital.get("facebookEn") or {}) # ── youtube ─────────────────────────────────────────────────────────────── yt_patch: dict = {} @@ -176,8 +170,8 @@ async def _build_overrides(analysis_run_id: str) -> dict: overrides["clinic_snapshot"] = snapshot if ig_patch: overrides["instagram_audit"] = {"accounts": ig_patch} - if fb_patch: - overrides["facebook_audit"] = {"pages": [fb_patch]} + if fb_pages: + overrides["facebook_audit"] = {"pages": fb_pages} if yt_patch: overrides["youtube_audit"] = yt_patch return overrides diff --git a/app/services/collect.py b/app/services/collect.py index 6a68aee..91eb075 100644 --- a/app/services/collect.py +++ b/app/services/collect.py @@ -15,6 +15,7 @@ from integrations.naver import NaverClient from integrations.youtube import YouTubeClient from integrations.firecrawl import FirecrawlClient from services.enrichment import collect_brand_assets, collect_extra_channels, collect_channel_logos +from services.facebook_audit import transform_for_storage as transform_facebook logger = logging.getLogger(__name__) @@ -31,6 +32,7 @@ async def collect_facebook(analysis_run_id: str, row_id: int, url: str) -> None: logger.info("[facebook] start run=%s url=%s", analysis_run_id, url) await set_facebook_status(row_id, "processing") data = await ApifyClient(get_env("APIFY_API_TOKEN")).get_facebook_page(url) + data = transform_facebook(data) await save_facebook_raw_data(row_id, data) logger.info("[facebook] done run=%s", analysis_run_id) diff --git a/app/services/enrichment.py b/app/services/enrichment.py index f0af7c0..6598c31 100644 --- a/app/services/enrichment.py +++ b/app/services/enrichment.py @@ -8,6 +8,7 @@ from common.utils import get_env from integrations.apify import ApifyClient from integrations.vision import VisionClient from integrations.color_extractor import extract_brand_assets_from_site +from services.facebook_audit import transform_for_storage as transform_facebook logger = logging.getLogger(__name__) @@ -115,6 +116,8 @@ async def 
collect_extra_channels(
         if isinstance(res, Exception):
             logger.warning("[extra_channels] %s 수집 실패: %s", key, res)
         elif res:
+            if key == "facebookEn":
+                res = transform_facebook(res)
             results[key] = res
     if not results:
         logger.info("[extra_channels] 수집 결과 없음 run=%s", analysis_run_id)
diff --git a/app/services/facebook_audit.py b/app/services/facebook_audit.py
new file mode 100644
index 0000000..890437e
--- /dev/null
+++ b/app/services/facebook_audit.py
@@ -0,0 +1,187 @@
+"""Build the Facebook audit pages (KR and EN) from collected data.
+
+Numeric indicators (latest post age, posting frequency, engagement) are computed
+deterministically at collection time and stored with the page (transform_for_storage).
+The content topic (top_content_type) needs the captions to be understood, so the LLM
+fills it in, as instructed by the report prompt.
+"""
+
+from datetime import datetime, timezone
+
+# Epoch values at or above this cannot be plausible seconds (year 5138 or later), so
+# they are interpreted as milliseconds.
+_EPOCH_MS_THRESHOLD = 1e11
+
+
+def _parse_ts(v) -> datetime | None:
+    """Parse a post timestamp into a timezone-aware datetime, or None if unparseable.
+
+    Accepts epoch numbers (seconds, or milliseconds when implausibly large) and
+    ISO-8601 strings; "Z" is accepted as UTC and naive strings are taken as UTC.
+    """
+    if isinstance(v, bool):  # bool is an int subclass but never a timestamp
+        return None
+    if isinstance(v, (int, float)):
+        seconds = v / 1000 if abs(v) >= _EPOCH_MS_THRESHOLD else v
+        try:
+            return datetime.fromtimestamp(seconds, tz=timezone.utc)
+        except (OverflowError, OSError, ValueError):
+            return None
+    if isinstance(v, str):
+        try:
+            parsed = datetime.fromisoformat(v.replace("Z", "+00:00"))
+        except ValueError:
+            return None
+        # Naive and aware datetimes can neither be sorted together nor subtracted
+        # from an aware "now", so pin naive values to UTC.
+        return parsed if parsed.tzinfo else parsed.replace(tzinfo=timezone.utc)
+    return None
+
+
+def _humanize_age(days: int) -> str:
+    """Render an age in days as a short Korean relative label; negatives count as today."""
+    days = max(days, 0)
+    if days == 0:
+        return "오늘"
+    if days < 7:
+        return f"{days}일 전"
+    if days < 30:
+        return f"{days // 7}주 전"
+    if days < 365:
+        return f"{days // 30}개월 전"
+    return f"{days // 365}년 전"
+
+
+def _frequency_label(avg_gap_days: float) -> str:
+    """Map the average gap between posts (in days) to a Korean frequency label."""
+    if avg_gap_days <= 1.5:
+        return "거의 매일"
+    if avg_gap_days <= 10:
+        return f"주 {7 / avg_gap_days:.1f}회"
+    if avg_gap_days <= 45:
+        return f"월 {30 / avg_gap_days:.1f}회"
+    return "비정기 (분기 이상 간격)"
+
+
+def _count(post: dict, key: str) -> int | float:
+    """Return post[key] as a number; missing, null or non-numeric values count as 0."""
+    value = post.get(key)
+    if isinstance(value, bool) or not isinstance(value, (int, float)):
+        return 0
+    return value
+
+
+def _engagement_text(posts: list[dict]) -> str:
+    """Summarise per-post likes/reactions/shares/video views as min~max ranges.
+
+    Metrics that are 0 for every post are left out. A fixed "few comments" note is
+    appended because, per the original author's note, the posts actor returns no
+    comment counts.
+    """
+
+    def _rng(vals: list, label: str, unit: str) -> str | None:
+        if not vals:  # min()/max() raise on an empty sequence
+            return None
+        lo, hi = min(vals), max(vals)
+        if hi == 0:
+            return None
+        return f"{label} {lo}{unit}" if lo == hi else f"{label} {lo}~{hi}{unit}"
+
+    parts = [
+        _rng([_count(p, "likes") for p in posts], "좋아요", "개"),
+        _rng([_count(p, "reactions") for p in posts], "반응", "개"),
+        _rng([_count(p, "shares") for p in posts], "공유", "개"),
+        _rng([_count(p, "views") for p in posts if p.get("isVideo")], "영상 조회", "회"),
+    ]
+    parts = [x for x in parts if x]
+    if not parts:
+        return "게시물당 참여 거의 없음"
+    return "게시물당 " + " · ".join(parts) + " · 댓글 거의 없음"
+
+
+def transform_for_storage(fb: dict | None) -> dict | None:
+    """Convert the collected page dict into the shape persisted in the DB.
+
+    - Computes recent_post_age, post_frequency and engagement once, at collection time.
+    - Reduces each post to its caption and media type; raw counts and timestamps are
+      dropped because the indicators are not recomputed later.
+    - Non-dict input is returned unchanged. Input that already carries "engagement"
+      was transformed before and is returned unchanged too: a second pass would find
+      no raw fields and overwrite the captions and engagement with empty values.
+    """
+    if not isinstance(fb, dict) or "engagement" in fb:
+        return fb
+    raw_posts = fb.get("latestPosts")
+    posts = [p for p in raw_posts if isinstance(p, dict)] if isinstance(raw_posts, list) else []
+    out = {k: v for k, v in fb.items() if k != "latestPosts"}
+    if not posts:
+        out["latestPosts"] = []
+        return out
+
+    parsed = (_parse_ts(p.get("timestamp")) for p in posts)
+    dts = sorted((d for d in parsed if d), reverse=True)
+    if dts:
+        out["recent_post_age"] = _humanize_age((datetime.now(timezone.utc) - dts[0]).days)
+    if len(dts) > 1:
+        # `or 1`: posts that all fall within one day would otherwise give a zero gap.
+        avg_gap = ((dts[0] - dts[-1]).days or 1) / (len(dts) - 1)
+        out["post_frequency"] = _frequency_label(avg_gap)
+    out["engagement"] = _engagement_text(posts)
+    out["latestPosts"] = [
+        {
+            "caption": (p.get("text") or "")[:160],
+            "type": "video" if p.get("isVideo") else "image",
+        }
+        for p in posts
+    ]
+    return out
+
+
+def _page_patch(fb: dict) -> dict:
+    """Map a stored Facebook page to FacebookPage schema fields.
+
+    Indicators computed at collection time are copied as-is. Empty or missing values
+    are skipped, so a page without data yields an empty patch.
+    """
+    if not isinstance(fb, dict):
+        return {}
+    p: dict = {}
+    if fb.get("pageUrl"):
+        p["url"] = p["link"] = fb["pageUrl"]
+    if fb.get("pageName"):
+        p["page_name"] = fb["pageName"]
+    if fb.get("followers"):
+        p["followers"] = fb["followers"]
+    if fb.get("intro"):
+        p["bio"] = fb["intro"]
+    categories = fb.get("categories")
+    if categories:
+        # A bare string must not be joined character by character.
+        p["category"] = (
+            categories if isinstance(categories, str) else ", ".join(map(str, categories))
+        )
+    website = fb.get("website")
+    if isinstance(website, (list, tuple)):
+        # Presumably a list when the value came from the page's "websites" field
+        # (verify against the collector); keep the first non-empty entry.
+        website = next((w for w in website if w), None)
+    if website:
+        p["linked_domain"] = website
+    if fb.get("reviews") is not None:
+        p["reviews"] = fb["reviews"]
+    if fb.get("following") is not None:
+        p["following"] = fb["following"]
+    for key in ("recent_post_age", "post_frequency", "engagement"):
+        if fb.get(key):
+            p[key] = fb[key]
+    return p
+
+
+def build_facebook_pages(facebook: dict, facebook_en: dict) -> list[dict]:
+    """Build the list of page patches in [KR, EN] order.
+
+    The report prompt produces pages in the same order, so it is kept here. Empty
+    patches are dropped: a channel without data gets no page from the LLM either,
+    which keeps the indices aligned.
+    """
+    return [pp for pp in (_page_patch(facebook), _page_patch(facebook_en)) if pp]