페북 수집·지표·저장 파이프라인 정리
수집: - pages + posts 두 actor 병렬 호출 (facebook-pages-scraper, facebook-posts-scraper) - 저장 필드 슬림화: 페이지 메타에서 likes/rating/email/phone/address 제거 (followers/reviews와 중복이거나 클리닉 raw_data에 이미 있음) - 게시물 저장은 캡션 160자 + likes/reactions/shares/views/isVideo/timestamp만 지표 계산 위치 이동: 리포트 시점 → 수집 시점: - recent_post_age / post_frequency / engagement 를 transform_for_storage에서 결정적으로 산출해 DB에 박음 (재계산 불필요) - 저장된 게시물은 LLM용 캡션·타입 2필드만 — 추가 슬림 단계 제거 리팩토링: - services/facebook_audit.py 신설 (instagram_audit 패턴) — _build_overrides의 인라인 클로저(_fb_page_patch)와 analysis.py의 _fb_post_metrics 분리 - collect.py / enrichment.py 가 transform_for_storage를 호출하도록 엔게이지먼트 표기: - 범위(min~max)로 표시, 전부 0인 지표는 제외 - 댓글은 actor 미제공이라 "댓글 거의 없음" 고정 부가 콘텐츠 유형: - top_content_type 은 캡션 본문 주제 추론이 필요해 LLM에 위임 - report_prompt.txt 에 facebook_audit.pages[].top_content_type 작성 지침 추가 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>facebook-data
parent
4f756cf001
commit
652265cd19
|
|
@ -9,6 +9,10 @@ APIFY_BASE = "https://api.apify.com/v2"
|
|||
IG_PROFILE_ACTOR = "coderx~instagram-profile-scraper-bio-posts"
|
||||
IG_HIGHLIGHTS_ACTOR = "igview-owner~instagram-highlights-scraper"
|
||||
|
||||
# Facebook: pages + posts 두 actor 직접 호출.
|
||||
FB_PAGES_ACTOR = "apify~facebook-pages-scraper"
|
||||
FB_POSTS_ACTOR = "apify~facebook-posts-scraper"
|
||||
|
||||
|
||||
def _ig_username(url: str) -> str:
|
||||
return urlparse(url).path.strip("/").split("/")[0] if "://" in url else url.lstrip("@")
|
||||
|
|
@ -19,7 +23,7 @@ class ApifyClient:
|
|||
self.token = token
|
||||
self.wait_for_finish = wait_for_finish
|
||||
|
||||
async def _run_actor(self, actor_id: str, input_data: dict) -> list[dict]:
|
||||
async def _run_actor(self, actor_id: str, input_data: dict, limit: int = 20) -> list[dict]:
|
||||
resp = await http_request(
|
||||
HTTPMethod.POST,
|
||||
url=f"{APIFY_BASE}/acts/{actor_id}/runs",
|
||||
|
|
@ -35,7 +39,7 @@ class ApifyClient:
|
|||
items_resp = await http_request(
|
||||
HTTPMethod.GET,
|
||||
url=f"{APIFY_BASE}/datasets/{dataset_id}/items",
|
||||
params={"token": self.token, "limit": 20},
|
||||
params={"token": self.token, "limit": limit},
|
||||
label=f"apify-dataset-{dataset_id}",
|
||||
)
|
||||
if not items_resp or not items_resp.is_success:
|
||||
|
|
@ -116,26 +120,47 @@ class ApifyClient:
|
|||
# }
|
||||
|
||||
async def fetch_facebook_page(self, page_url: str) -> dict | None:
|
||||
items = await self._run_actor("apify~facebook-pages-scraper", {"startUrls": [{"url": page_url}]})
|
||||
items = await self._run_actor(FB_PAGES_ACTOR, {"startUrls": [{"url": page_url}]})
|
||||
return items[0] if items else None
|
||||
|
||||
async def fetch_facebook_posts(self, page_url: str, limit: int = 20) -> list[dict]:
|
||||
return await self._run_actor(
|
||||
FB_POSTS_ACTOR, {"startUrls": [{"url": page_url}], "resultsLimit": limit}, limit=limit,
|
||||
)
|
||||
|
||||
async def get_facebook_page(self, page_url: str) -> dict | None:
|
||||
page = await self.fetch_facebook_page(page_url)
|
||||
if not page:
|
||||
# pages·posts 두 task 병렬 호출 (posts 실패해도 page만 있으면 진행)
|
||||
page, posts = await asyncio.gather(
|
||||
self.fetch_facebook_page(page_url),
|
||||
self.fetch_facebook_posts(page_url),
|
||||
return_exceptions=True,
|
||||
)
|
||||
if isinstance(page, Exception) or not page:
|
||||
return None
|
||||
if isinstance(posts, Exception):
|
||||
posts = []
|
||||
return {
|
||||
"pageName": page.get("title") or page.get("name"),
|
||||
"profileImage": page.get("profilePictureUrl") or page.get("profilePhoto") or page.get("profilePic"),
|
||||
"pageUrl": page.get("pageUrl", page_url),
|
||||
"followers": page.get("followers", 0),
|
||||
"likes": page.get("likes", 0),
|
||||
"following": page.get("followings", 0),
|
||||
"reviews": page.get("ratingCount", 0),
|
||||
"categories": page.get("categories", []),
|
||||
"email": page.get("email"),
|
||||
"phone": page.get("phone"),
|
||||
"website": page.get("website"),
|
||||
"address": page.get("address"),
|
||||
"website": page.get("website") or page.get("websites"),
|
||||
"intro": page.get("intro"),
|
||||
"rating": page.get("rating"),
|
||||
"latestPosts": [
|
||||
{
|
||||
"text": (p.get("text") or "")[:160],
|
||||
"likes": p.get("likes", 0),
|
||||
"reactions": p.get("topReactionsCount", 0),
|
||||
"shares": p.get("shares", 0),
|
||||
"views": p.get("viewsCount") or 0,
|
||||
"isVideo": p.get("isVideo", False),
|
||||
"timestamp": p.get("time") or p.get("timestamp"),
|
||||
}
|
||||
for p in (posts or []) if isinstance(p, dict)
|
||||
],
|
||||
}
|
||||
|
||||
async def fetch_tiktok_profile(self, url: str) -> list[dict]:
|
||||
|
|
|
|||
|
|
@ -75,6 +75,7 @@
|
|||
- clinic_snapshot 의 overall_rating/total_reviews/staff_count/location/certifications/lead_doctor 는 강남언니({gangnam_unni}) 데이터의 값을 그대로 사용.
|
||||
- **instagram_audit.accounts 는 반드시 빈 배열 []로 두세요.** 계정 정보는 시스템이 수집 데이터로 직접 채우니 LLM은 만들지 말고, instagram_audit.diagnosis(진단)만 작성하세요.
|
||||
- facebook_audit.pages: KR 페북({facebook})·영문 페북({facebook_en}) 데이터가 있으면 **각각 별도 페이지**로 넣고, url/page_name/followers 등은 그 데이터 그대로. language/label 동일 규칙.
|
||||
- facebook_audit.pages[].top_content_type 은 해당 페이지 latestPosts의 **캡션·미디어를 읽고** 주로 올리는 콘텐츠를 의미 기반으로 짧게 묘사하세요 (예: "Before/After 사진 + 환자 여정 Reels", "이벤트·프로모션 카드뉴스", "다국어 시술 소개"). 단순 "동영상/이미지 위주"가 아니라 **무슨 주제**인지 쓰세요. (recent_post_age·post_frequency·engagement 수치는 시스템이 덮어쓰니 대략 적어도 됩니다.)
|
||||
- 위 수치·URL·이름은 제공된 데이터에서 그대로 쓰고 절대 지어내지 마세요.
|
||||
|
||||
## 기타 채널 현황 (other_channels) 작성 지침
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ from integrations.llm.llm_service import LLMService
|
|||
from integrations.llm.prompt import report_prompt, plan_prompt
|
||||
from integrations.llm.schemas.report import ReportOutput
|
||||
from services.instagram_audit import build_instagram_accounts
|
||||
from services.facebook_audit import build_facebook_pages
|
||||
from integrations.llm.schemas.plan import PlanOutput
|
||||
from models.status import AnalysisStatus
|
||||
|
||||
|
|
@ -140,15 +141,8 @@ async def _build_overrides(analysis_run_id: str) -> dict:
|
|||
instagram, hospital.get("instagramEn") or {}, hospital.get("channelLogos") or {},
|
||||
)
|
||||
|
||||
# ── facebook ──────────────────────────────────────────────────────────────
|
||||
fb_patch: dict = {}
|
||||
if facebook.get("pageUrl"): fb_patch["url"] = facebook["pageUrl"]
|
||||
if facebook.get("pageUrl"): fb_patch["link"] = facebook["pageUrl"]
|
||||
if facebook.get("pageName"): fb_patch["page_name"] = facebook["pageName"]
|
||||
if facebook.get("followers"): fb_patch["followers"] = facebook["followers"]
|
||||
if facebook.get("intro"): fb_patch["bio"] = facebook["intro"]
|
||||
if facebook.get("categories"): fb_patch["category"] = ", ".join(facebook["categories"])
|
||||
if facebook.get("website"): fb_patch["linked_domain"] = facebook["website"]
|
||||
# ── facebook (KR=facebook_data, EN=hospital.facebookEn 둘 다 코드 산출, [KR, EN] 순서) ──
|
||||
fb_pages = build_facebook_pages(facebook, hospital.get("facebookEn") or {})
|
||||
|
||||
# ── youtube ───────────────────────────────────────────────────────────────
|
||||
yt_patch: dict = {}
|
||||
|
|
@ -176,8 +170,8 @@ async def _build_overrides(analysis_run_id: str) -> dict:
|
|||
overrides["clinic_snapshot"] = snapshot
|
||||
if ig_patch:
|
||||
overrides["instagram_audit"] = {"accounts": ig_patch}
|
||||
if fb_patch:
|
||||
overrides["facebook_audit"] = {"pages": [fb_patch]}
|
||||
if fb_pages:
|
||||
overrides["facebook_audit"] = {"pages": fb_pages}
|
||||
if yt_patch:
|
||||
overrides["youtube_audit"] = yt_patch
|
||||
return overrides
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ from integrations.naver import NaverClient
|
|||
from integrations.youtube import YouTubeClient
|
||||
from integrations.firecrawl import FirecrawlClient
|
||||
from services.enrichment import collect_brand_assets, collect_extra_channels, collect_channel_logos
|
||||
from services.facebook_audit import transform_for_storage as transform_facebook
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
|
@ -31,6 +32,7 @@ async def collect_facebook(analysis_run_id: str, row_id: int, url: str) -> None:
|
|||
logger.info("[facebook] start run=%s url=%s", analysis_run_id, url)
|
||||
await set_facebook_status(row_id, "processing")
|
||||
data = await ApifyClient(get_env("APIFY_API_TOKEN")).get_facebook_page(url)
|
||||
data = transform_facebook(data)
|
||||
await save_facebook_raw_data(row_id, data)
|
||||
logger.info("[facebook] done run=%s", analysis_run_id)
|
||||
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ from common.utils import get_env
|
|||
from integrations.apify import ApifyClient
|
||||
from integrations.vision import VisionClient
|
||||
from integrations.color_extractor import extract_brand_assets_from_site
|
||||
from services.facebook_audit import transform_for_storage as transform_facebook
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
|
@ -115,6 +116,8 @@ async def collect_extra_channels(
|
|||
if isinstance(res, Exception):
|
||||
logger.warning("[extra_channels] %s 수집 실패: %s", key, res)
|
||||
elif res:
|
||||
if key == "facebookEn":
|
||||
res = transform_facebook(res)
|
||||
results[key] = res
|
||||
if not results:
|
||||
logger.info("[extra_channels] 수집 결과 없음 run=%s", analysis_run_id)
|
||||
|
|
|
|||
|
|
@ -0,0 +1,105 @@
|
|||
"""Facebook audit 페이지(KR·EN)를 수집 데이터로 구성.
|
||||
수치 지표(최근 게시일·게시 빈도·참여율)는 **수집 시점에** 결정적으로 산출해 DB에 박는다 (transform_for_storage).
|
||||
콘텐츠 주제(top_content_type)는 캡션 본문 이해가 필요해 LLM이 채운다 (리포트 프롬프트 지시)."""
|
||||
|
||||
from datetime import datetime, timezone
|
||||
|
||||
|
||||
def _parse_ts(v) -> datetime | None:
|
||||
if isinstance(v, (int, float)):
|
||||
return datetime.fromtimestamp(v, tz=timezone.utc)
|
||||
if isinstance(v, str):
|
||||
try:
|
||||
return datetime.fromisoformat(v.replace("Z", "+00:00"))
|
||||
except ValueError:
|
||||
return None
|
||||
return None
|
||||
|
||||
|
||||
def _humanize_age(days: int) -> str:
|
||||
days = max(days, 0)
|
||||
if days == 0: return "오늘"
|
||||
if days < 7: return f"{days}일 전"
|
||||
if days < 30: return f"{days // 7}주 전"
|
||||
if days < 365: return f"{days // 30}개월 전"
|
||||
return f"{days // 365}년 전"
|
||||
|
||||
|
||||
def _frequency_label(avg_gap_days: float) -> str:
|
||||
"""게시물 사이 평균 간격(일) → 빈도 라벨."""
|
||||
if avg_gap_days <= 1.5: return "거의 매일"
|
||||
if avg_gap_days <= 10: return f"주 {7 / avg_gap_days:.1f}회"
|
||||
if avg_gap_days <= 45: return f"월 {30 / avg_gap_days:.1f}회"
|
||||
return "비정기 (분기 이상 간격)"
|
||||
|
||||
|
||||
def _engagement_text(posts: list[dict]) -> str:
|
||||
"""게시물당 좋아요/반응/공유/조회를 min~max 범위로. 전부 0인 지표는 제외.
|
||||
댓글은 posts actor가 안 줘서 '댓글 거의 없음' 고정 부가 (FB 페이지는 댓글 희박이 일반적)."""
|
||||
def _rng(vals: list[int], label: str, unit: str) -> str | None:
|
||||
lo, hi = min(vals), max(vals)
|
||||
if hi == 0:
|
||||
return None
|
||||
return f"{label} {lo}{unit}" if lo == hi else f"{label} {lo}~{hi}{unit}"
|
||||
|
||||
parts = [
|
||||
_rng([p.get("likes", 0) for p in posts], "좋아요", "개"),
|
||||
_rng([p.get("reactions", 0) for p in posts], "반응", "개"),
|
||||
_rng([p.get("shares", 0) for p in posts], "공유", "개"),
|
||||
]
|
||||
vid_views = [p.get("views", 0) for p in posts if p.get("isVideo")]
|
||||
if vid_views:
|
||||
parts.append(_rng(vid_views, "영상 조회", "회"))
|
||||
parts = [x for x in parts if x]
|
||||
if not parts:
|
||||
return "게시물당 참여 거의 없음"
|
||||
return "게시물당 " + " · ".join(parts) + " · 댓글 거의 없음"
|
||||
|
||||
|
||||
def transform_for_storage(fb: dict | None) -> dict | None:
|
||||
"""apify 원본 → DB에 저장할 최종 형태.
|
||||
- 수치 지표(recent_post_age·post_frequency·engagement)를 그 자리에서 계산해 박음.
|
||||
- 게시물은 캡션·타입만 남김 (raw 숫자/timestamp는 어차피 재계산 안 하므로 버림).
|
||||
수집 시점에 한 번 계산 → 리포트 생성 때는 그대로 갖다 박기만 함."""
|
||||
if not isinstance(fb, dict):
|
||||
return fb
|
||||
posts = fb.get("latestPosts") or []
|
||||
out = {k: v for k, v in fb.items() if k != "latestPosts"}
|
||||
if posts:
|
||||
dts = sorted((d for d in (_parse_ts(p.get("timestamp")) for p in posts) if d), reverse=True)
|
||||
if dts:
|
||||
out["recent_post_age"] = _humanize_age((datetime.now(timezone.utc) - dts[0]).days)
|
||||
if len(dts) > 1:
|
||||
avg_gap = ((dts[0] - dts[-1]).days or 1) / (len(dts) - 1)
|
||||
out["post_frequency"] = _frequency_label(avg_gap)
|
||||
out["engagement"] = _engagement_text(posts)
|
||||
out["latestPosts"] = [
|
||||
{"caption": (p.get("text") or "")[:160],
|
||||
"type": "video" if p.get("isVideo") else "image"}
|
||||
for p in posts
|
||||
]
|
||||
else:
|
||||
out["latestPosts"] = []
|
||||
return out
|
||||
|
||||
|
||||
def _page_patch(fb: dict) -> dict:
|
||||
"""저장된 페북 페이지 → FacebookPage 스키마 필드 패치. 수치 지표는 수집 시점에 박혀있어 그대로 복사."""
|
||||
p: dict = {}
|
||||
if fb.get("pageUrl"): p["url"] = p["link"] = fb["pageUrl"]
|
||||
if fb.get("pageName"): p["page_name"] = fb["pageName"]
|
||||
if fb.get("followers"): p["followers"] = fb["followers"]
|
||||
if fb.get("intro"): p["bio"] = fb["intro"]
|
||||
if fb.get("categories"): p["category"] = ", ".join(fb["categories"])
|
||||
if fb.get("website"): p["linked_domain"] = fb["website"]
|
||||
if fb.get("reviews") is not None: p["reviews"] = fb["reviews"]
|
||||
if fb.get("following") is not None: p["following"] = fb["following"]
|
||||
for key in ("recent_post_age", "post_frequency", "engagement"):
|
||||
if fb.get(key): p[key] = fb[key]
|
||||
return p
|
||||
|
||||
|
||||
def build_facebook_pages(facebook: dict, facebook_en: dict) -> list[dict]:
|
||||
"""KR·EN 페북 페이지 패치 리스트 구성. 프롬프트가 pages를 [KR, EN] 순서로 만들므로 동일 순서 유지.
|
||||
빈 패치는 제외 (해당 채널 데이터 없음 → LLM도 페이지 안 만듦 → 인덱스 정렬 유지)."""
|
||||
return [pp for pp in (_page_patch(facebook), _page_patch(facebook_en)) if pp]
|
||||
Loading…
Reference in New Issue