diff --git a/SQL/db_create.sql b/SQL/db_create.sql index c4b6f02..19a835e 100644 --- a/SQL/db_create.sql +++ b/SQL/db_create.sql @@ -1,158 +1,87 @@ --- 테이블 순서는 관계를 고려하여 한 번에 실행해도 에러가 발생하지 않게 정렬되었습니다. - --- instagram_data Table Create SQL --- 테이블 생성 SQL - instagram_data -CREATE TABLE instagram_data -( - `id` INT NOT NULL AUTO_INCREMENT, - `hospital_id` CHAR(36) NOT NULL, - `url` VARCHAR(500) NOT NULL, - `status` VARCHAR(20) NOT NULL DEFAULT 'start', - `raw_data` JSON NULL, - `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (id) -); - --- Index 설정 SQL - instagram_data(hospital_id) -CREATE INDEX IX_instagram_data_1 - ON instagram_data(hospital_id); - - --- facebook_data Table Create SQL --- 테이블 생성 SQL - facebook_data -CREATE TABLE facebook_data -( - `id` INT NOT NULL AUTO_INCREMENT, - `hospital_id` CHAR(36) NOT NULL, - `url` VARCHAR(500) NOT NULL, - `status` VARCHAR(20) NOT NULL DEFAULT 'start', - `raw_data` JSON NULL, - `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (id) -); - --- Index 설정 SQL - facebook_data(hospital_id) -CREATE INDEX IX_facebook_data_1 - ON facebook_data(hospital_id); - - --- naver_blog_data Table Create SQL --- 테이블 생성 SQL - naver_blog_data -CREATE TABLE naver_blog_data -( - `id` INT NOT NULL AUTO_INCREMENT, - `hospital_id` CHAR(36) NOT NULL, - `url` VARCHAR(500) NOT NULL, - `status` VARCHAR(20) NOT NULL DEFAULT 'start', - `raw_data` JSON NULL, - `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (id) -); - --- Index 설정 SQL - naver_blog_data(hospital_id) -CREATE INDEX IX_naver_blog_data_1 - ON naver_blog_data(hospital_id); - - --- hospital_baseinfo Table Create SQL --- 테이블 생성 SQL - hospital_baseinfo -CREATE TABLE hospital_baseinfo -( - `hospital_id` CHAR(36) NOT NULL, - `owner_user_id` INT NOT NULL, - `hospital_name` VARCHAR(50) NOT NULL, - `hospital_name_en` VARCHAR(50) NULL, - `brn` VARCHAR(50) NOT NULL, - `road_address` VARCHAR(100) NULL, - `site_address` VARCHAR(100) NULL, - `url` VARCHAR(500) NULL, - `status` VARCHAR(20) NOT NULL DEFAULT 'start', - `raw_data` JSON NULL, - `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - `updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - PRIMARY KEY (hospital_id) -); - --- Index 설정 SQL - hospital_baseinfo(owner_user_id) -CREATE INDEX IX_hospital_baseinfo_1 - ON hospital_baseinfo(owner_user_id); - - --- user_info Table Create SQL --- 테이블 생성 SQL - user_info +-- user_info CREATE TABLE user_info ( - `user_id` INT NOT NULL AUTO_INCREMENT, - `username` VARCHAR(50) NOT NULL, - `password` VARCHAR(50) NOT NULL, - `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - `updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + `user_id` INT NOT NULL AUTO_INCREMENT, + `username` VARCHAR(50) NOT NULL, + `password` VARCHAR(50) NOT NULL, + `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + `updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (user_id) ); --- youtube_data Table Create SQL -CREATE TABLE youtube_data + +-- hospital_baseinfo +CREATE TABLE hospital_baseinfo ( - `id` INT NOT NULL AUTO_INCREMENT, - `hospital_id` CHAR(36) NOT NULL, - `url` VARCHAR(500) NOT NULL, - `status` VARCHAR(20) NOT NULL DEFAULT 'start', - `raw_data` JSON NULL, - `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (id) + `hospital_id` CHAR(36) NOT NULL, + `owner_user_id` INT NOT NULL, + `hospital_name` VARCHAR(50) NOT NULL, + `hospital_name_en` VARCHAR(50) NULL, + `brn` VARCHAR(50) NOT NULL, + `road_address` VARCHAR(100) NULL, + `site_address` VARCHAR(100) NULL, + `status` VARCHAR(20) NOT NULL DEFAULT 'start', + `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + `updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (hospital_id) ); --- Index 설정 SQL - youtube_data(hospital_id) -CREATE INDEX IX_youtube_data_1 - ON youtube_data(hospital_id); +CREATE INDEX IX_hospital_baseinfo_1 ON hospital_baseinfo (owner_user_id); --- gangnam_unni_data Table Create SQL -CREATE TABLE gangnam_unni_data +-- remote_source: 병원별 채널 소스 정보 (instagram/facebook/naver_blog/youtube/gangnam_unni 등) +CREATE TABLE remote_source ( - `id` INT NOT NULL AUTO_INCREMENT, - `hospital_id` CHAR(36) NOT NULL, - `url` VARCHAR(500) NOT NULL, - `status` VARCHAR(20) NOT NULL DEFAULT 'start', - `raw_data` JSON NULL, - `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (id) + `source_id` INT NOT NULL AUTO_INCREMENT, + `hospital_id` CHAR(36) NOT NULL, + `source_type` VARCHAR(50) NOT NULL, + `language` CHAR(2) NULL, + `url` VARCHAR(500) NOT NULL, + `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (source_id) ); --- Index 설정 SQL - gangnam_unni_data(hospital_id) -CREATE INDEX IX_gangnam_unni_data_1 - ON gangnam_unni_data(hospital_id); +CREATE INDEX IX_remote_source_1 ON remote_source (hospital_id); +CREATE INDEX IX_remote_source_2 ON remote_source (hospital_id, source_type); --- analysis_runs Table Create SQL +-- analysis_runs CREATE TABLE analysis_runs ( - `analysis_run_id` CHAR(36) NOT NULL, - `hospital_id` CHAR(36) NOT NULL, - `owner_user_id` INT NOT NULL DEFAULT 0, - `status` VARCHAR(50) NOT NULL DEFAULT 'discovering', - `instagram_data_id` INT NULL, - `facebook_data_id` INT NULL, - `naver_blog_data_id` INT NULL, - `youtube_data_id` INT NULL, - `gangnam_unni_data_id` INT NULL, - `report_data` JSON NULL, - `plan_data` JSON NULL, - `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - `updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + `analysis_run_id` CHAR(36) NOT NULL, + `hospital_id` CHAR(36) NOT NULL, + `owner_user_id` INT NOT NULL DEFAULT 0, + `status` VARCHAR(50) NOT NULL DEFAULT 'discovering', + `report_data` JSON NULL, + `plan_data` JSON NULL, + `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + `updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (analysis_run_id) ); --- Index 설정 SQL - analysis_runs(hospital_id) -CREATE INDEX IX_analysis_runs_1 - ON analysis_runs(hospital_id); - --- Index 설정 SQL - analysis_runs(owner_user_id) -CREATE INDEX IX_analysis_runs_2 - ON analysis_runs(owner_user_id); +CREATE INDEX IX_analysis_runs_1 ON analysis_runs (hospital_id); +CREATE INDEX IX_analysis_runs_2 ON analysis_runs (owner_user_id); --- file_data Table Create SQL +-- raw_info: 분석 실행별 수집 원시 데이터 +CREATE TABLE raw_info +( + `info_id` INT NOT NULL AUTO_INCREMENT, + `source_id` INT NOT NULL, + `analysis_run_id` CHAR(36) NOT NULL, + `data_tag` VARCHAR(50) NOT NULL DEFAULT 'default', + `status` VARCHAR(20) NOT NULL DEFAULT 'start', + `raw_data` JSON NULL, + `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + `updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (info_id) +); + +CREATE INDEX IX_raw_info_1 ON raw_info (analysis_run_id); +CREATE INDEX IX_raw_info_2 ON raw_info (source_id); + + +-- file_data CREATE TABLE file_data ( `id` INT NOT NULL AUTO_INCREMENT, @@ -169,48 +98,38 @@ CREATE TABLE file_data ); --- hospital_history Table Create SQL +-- hospital_history CREATE TABLE hospital_history ( - `id` INT NOT NULL AUTO_INCREMENT, - `hospital_id` CHAR(36) NOT NULL, - `owner_user_id` INT NOT NULL, - `hospital_name` VARCHAR(50) NOT NULL, - `hospital_name_en` VARCHAR(50) NULL, - `brn` VARCHAR(50) NOT NULL, - `road_address` VARCHAR(100) NULL, - `site_address` VARCHAR(100) NULL, - `url` VARCHAR(500) NULL, - `status` VARCHAR(20) NOT NULL, - `raw_data` JSON NULL, - `analysis_run_id` CHAR(36) NULL, - `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + `id` INT NOT NULL AUTO_INCREMENT, + `hospital_id` CHAR(36) NOT NULL, + `owner_user_id` INT NOT NULL, + `hospital_name` VARCHAR(50) NOT NULL, + `hospital_name_en` VARCHAR(50) NULL, + `brn` VARCHAR(50) NOT NULL, + `road_address` VARCHAR(100) NULL, + `site_address` VARCHAR(100) NULL, + `status` VARCHAR(20) NOT NULL, + `analysis_run_id` CHAR(36) NULL, + `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (id) ); --- Index 설정 SQL - hospital_history(hospital_id) -CREATE INDEX IX_hospital_history_1 - ON hospital_history(hospital_id); - --- Index 설정 SQL - hospital_history(analysis_run_id) -CREATE INDEX IX_hospital_history_2 - ON hospital_history(analysis_run_id); +CREATE INDEX IX_hospital_history_1 ON hospital_history (hospital_id); +CREATE INDEX IX_hospital_history_2 ON hospital_history (analysis_run_id); --- market_analysis Table Create SQL +-- market_analysis CREATE TABLE market_analysis ( - `id` INT NOT NULL AUTO_INCREMENT, - `analysis_run_id` CHAR(36) NOT NULL, - `analysis_type` VARCHAR(50) NOT NULL, - `status` VARCHAR(20) NOT NULL DEFAULT 'start', - `data` JSON NULL, - `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + `id` INT NOT NULL AUTO_INCREMENT, + `analysis_run_id` CHAR(36) NOT NULL, + `analysis_type` VARCHAR(50) NOT NULL, + `status` VARCHAR(20) NOT NULL DEFAULT 'start', + `data` JSON NULL, + `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (id), UNIQUE KEY UQ_market_analysis (analysis_run_id, analysis_type) ); --- Index 설정 SQL - market_analysis(analysis_run_id) -CREATE INDEX IX_market_analysis_1 - ON market_analysis(analysis_run_id); - +CREATE INDEX IX_market_analysis_1 ON market_analysis (analysis_run_id); diff --git a/app/api/analysis.py b/app/api/analysis.py index 2abbe32..8a77af1 100644 --- a/app/api/analysis.py +++ b/app/api/analysis.py @@ -2,21 +2,22 @@ import logging import uuid6 from fastapi import APIRouter, BackgroundTasks, Depends, File, Form, HTTPException, UploadFile, status from common.deps import verify_api_key -from common.db import fetchone, insert_instagram_row, insert_facebook_row, insert_naver_blog_row, insert_youtube_row, insert_gangnam_unni_row, insert_analysis_run +from common.db.hospital import select_hospital +from common.db.source import select_source_mainpage, insert_source, insert_raw_info +from common.db.run import insert_run, select_run_status +from common.utils import _normalize_homepage, _with_scheme from models.analysis import AnalysisCreate, AnalysisStartResponse, AnalysisStatusResponse from models.file import FileListItem, FileType, FileUploadResponse -from models.status import AnalysisStatus +from models.status import AnalysisStatus, SourceType from services.pipeline import run_pipeline -from services.file import get_analysis_files_response, handle_analysis_file_upload, soft_delete_analysis_file +from services.file_data import get_analysis_files_response, handle_analysis_file_upload, soft_delete_analysis_file from mock_urls import MOCK_CLINICS -from common.utils import _normalize_homepage, _with_scheme router = APIRouter(prefix="/api/analysis", tags=["analysis"], dependencies=[Depends(verify_api_key)]) logger = logging.getLogger(__name__) -# 추후 DB에 클리닉별로 매핑할 채널들 — 지금은 mock_urls에서 homepage 매칭으로 보충. -# 메인 채널(IG/FB/YT/네이버블로그/강남언니) + 부가 채널(틱톡/영문 IG·FB/카카오/네이버카페) 모두 포함. -# 클라가 일부만 보내거나 빈 값이면 mock에서 동일 hospital을 찾아 채워줌. + +# 클라가 일부만 보내거나 빈 값이면 mock_urls 의 동일 homepage 매칭으로 채워줌 (메인 + 부가 채널 동일 규칙). def _channels_from_mockurls(homepage_url: str) -> dict: target = _normalize_homepage(homepage_url) if not target: @@ -47,44 +48,51 @@ async def start_analysis(body: AnalysisCreate, background_tasks: BackgroundTasks analysis_run_id = str(uuid6.uuid7()) hospital_id = body.clinic_id - # 사실 hospital과 owner_user_id 비교 후 검증이 필요한 거지만 일단 PoC 니까. 나중에 바꿉니다. - hospital = await fetchone( - "SELECT owner_user_id, url FROM hospital_baseinfo WHERE hospital_id = %s", - (hospital_id,), - ) + # 사실 hospital 과 owner_user_id 비교 후 검증이 필요한 거지만 일단 PoC 니까. 나중에 바꿉니다. + hospital = await select_hospital(hospital_id) if not hospital: raise HTTPException(status_code=409, detail="Clinic not found") - # 클라가 안 보낸 채널은 mock_urls에서 homepage 매칭으로 보충 (main + extra 동일 규칙) - mock = _channels_from_mockurls(hospital["url"]) + analysis_run_id = await insert_run(analysis_run_id, hospital_id, hospital["owner_user_id"]) - # 사용자가 'gangnamunni.com/...' 같이 scheme/www 없이 줘도 _with_scheme이 https://www. 보강. - ig_url = _with_scheme(body.channels.instagram) or mock.get("instagram") - fb_url = _with_scheme(body.channels.facebook) or mock.get("facebook") - nb_url = _with_scheme(body.channels.naver_blog) or mock.get("naver_blog") - yt_url = _with_scheme(body.channels.youtube) or mock.get("youtube") - gu_url = _with_scheme(body.channels.gangnam_unni) or mock.get("gangnam_unni") + mainpage = await select_source_mainpage(hospital_id) + if mainpage: + await insert_raw_info(mainpage["source_id"], analysis_run_id, data_tag=SourceType.MAINPAGE) + # branding (HTML/CSS + Vision 로고 매칭) — mainpage 와 같은 homepage URL 을 source 로 사용. + branding_id = await insert_source(hospital_id, SourceType.BRANDING, mainpage["url"]) + await insert_raw_info(branding_id, analysis_run_id, data_tag=SourceType.BRANDING) - ig_id = await insert_instagram_row(hospital_id, ig_url) if ig_url else None - fb_id = await insert_facebook_row(hospital_id, fb_url) if fb_url else None - nb_id = await insert_naver_blog_row(hospital_id, nb_url) if nb_url else None - yt_id = await insert_youtube_row(hospital_id, yt_url) if yt_url else None - gu_id = await insert_gangnam_unni_row(hospital_id, gu_url) if gu_url else None + # 클라가 안 보낸 채널은 mock_urls 에서 homepage 매칭으로 보충 (main + extra 동일 규칙). + mock = _channels_from_mockurls((mainpage or {}).get("url") or "") - analysis_run_id = await insert_analysis_run( - analysis_run_id, hospital_id, hospital["owner_user_id"], - ig_id, fb_id, nb_id, yt_id, gu_id, - ) + # 메인 5채널 (KR). _with_scheme 으로 'gangnamunni.com/...' 같이 scheme/www 없이 와도 보강. + main_channels = [ + (SourceType.INSTAGRAM, _with_scheme(body.channels.instagram) or mock.get("instagram")), + (SourceType.FACEBOOK, _with_scheme(body.channels.facebook) or mock.get("facebook")), + (SourceType.NAVER_BLOG, _with_scheme(body.channels.naver_blog) or mock.get("naver_blog")), + (SourceType.YOUTUBE, _with_scheme(body.channels.youtube) or mock.get("youtube")), + (SourceType.GANGNAM_UNNI, _with_scheme(body.channels.gangnam_unni) or mock.get("gangnam_unni")), + ] + for source_type, url in main_channels: + if url: + source_id = await insert_source(hospital_id, source_type, url) + await insert_raw_info(source_id, analysis_run_id, data_tag=source_type) + + # 부가 채널 — instagram_en/facebook_en 은 동일 source_type 에 language='EN' 으로 구분, 나머지는 자체 source_type. + extra_channels = [ + (SourceType.INSTAGRAM, "EN", _with_scheme(body.channels.instagram_en) or mock.get("instagram_en")), + (SourceType.FACEBOOK, "EN", _with_scheme(body.channels.facebook_en) or mock.get("facebook_en")), + (SourceType.TIKTOK, None, _with_scheme(body.channels.tiktok) or mock.get("tiktok")), + (SourceType.KAKAOTALK, None, _with_scheme(body.channels.kakao_talk) or mock.get("kakao_talk")), + (SourceType.NAVER_CAFE, None, _with_scheme(body.channels.naver_cafe) or mock.get("naver_cafe")), + ] + for source_type, language, url in extra_channels: + if url: + source_id = await insert_source(hospital_id, source_type, url, language=language) + await insert_raw_info(source_id, analysis_run_id, data_tag=source_type) - extra_channels = { - "tiktok": body.channels.tiktok or mock.get("tiktok"), - "instagram_en": body.channels.instagram_en or mock.get("instagram_en"), - "facebook_en": body.channels.facebook_en or mock.get("facebook_en"), - "kakao_talk": body.channels.kakao_talk or mock.get("kakao_talk"), - "naver_cafe": body.channels.naver_cafe or mock.get("naver_cafe"), - } logger.info("[analysis] main+extra channels resolved (mock_matched=%s)", bool(mock)) - background_tasks.add_task(run_pipeline, analysis_run_id, extra_channels) + background_tasks.add_task(run_pipeline, analysis_run_id) return AnalysisStartResponse( analysis_run_id=analysis_run_id, @@ -121,12 +129,12 @@ async def delete_analysis_run_file(run_id: str, file_id: int) -> None: @router.get("/{run_id}/status", response_model=AnalysisStatusResponse) async def get_analysis_status(run_id: str): logger.info("GET /api/analysis/%s/status", run_id) - row = await fetchone("SELECT status FROM analysis_runs WHERE analysis_run_id = %s", (run_id,)) - if not row: + run_status = await select_run_status(run_id) + if run_status is None: raise HTTPException(status_code=404, detail="Run not found") return AnalysisStatusResponse( analysis_run_id=run_id, - status=AnalysisStatus(row["status"]), + status=AnalysisStatus(run_status), progress=50.0, current_step="", channel_errors={}, diff --git a/app/api/clinics.py b/app/api/clinics.py index 917e7eb..272e587 100644 --- a/app/api/clinics.py +++ b/app/api/clinics.py @@ -2,7 +2,8 @@ import logging import uuid6 from fastapi import APIRouter, Depends, HTTPException, status from common.deps import verify_api_key -from common.db import insert_hospital, fetchone +from common.db.hospital import select_hospital, insert_hospital +from common.db.source import insert_source from common.utils import get_env from integrations.firecrawl import FirecrawlClient from models.clinic import ClinicCreate, ClinicCreateResponse, ClinicResponse, ClinicHistoryResponse, RunSummary @@ -30,9 +31,8 @@ async def create_clinic(body: ClinicCreate): name=info["clinicName"], name_en=info.get("clinicNameEn"), road_address=info.get("address"), - url=body.url, - raw_data=info, ) + await insert_source(hospital_id, "mainpage", body.url) return ClinicCreateResponse( id=hospital_id, url=body.url, @@ -44,11 +44,7 @@ async def create_clinic(body: ClinicCreate): @router.get("/{hospital_id}", response_model=ClinicResponse) async def get_clinic(hospital_id: str): logger.info("GET /api/clinics/%s", hospital_id) - row = await fetchone( - "SELECT hospital_id, hospital_name, hospital_name_en, road_address, url, status, raw_data, created_at, updated_at" - " FROM hospital_baseinfo WHERE hospital_id = %s", - (hospital_id,), - ) + row = await select_hospital(hospital_id) if not row: raise HTTPException(status_code=404, detail="Clinic not found") return ClinicResponse(**{**row, "created_at": str(row["created_at"]), "updated_at": str(row["updated_at"])}) diff --git a/app/api/plan.py b/app/api/plan.py index 41065f6..697354c 100644 --- a/app/api/plan.py +++ b/app/api/plan.py @@ -1,11 +1,13 @@ import json import logging from fastapi import APIRouter, Depends, HTTPException, Response -from common.db import fetchone, fetch_raw +from common.db.run import select_run_with_clinic +from common.db.source import select_run_source_raw from common.deps import verify_api_key from common.utils import _with_scheme from integrations.llm.schemas.plan import PlanOutput from models.plan import PlanApiResponse +from models.status import SourceType router = APIRouter(prefix="/api/plan", tags=["plan"], dependencies=[Depends(verify_api_key)]) logger = logging.getLogger(__name__) @@ -14,26 +16,21 @@ logger = logging.getLogger(__name__) @router.get("/{run_id}", response_model=PlanApiResponse, response_model_by_alias=True) async def get_plan(run_id: str): logger.info("GET /api/plan/%s", run_id) - row = await fetchone( - "SELECT ar.plan_data, ar.created_at, ar.gangnam_unni_data_id, h.hospital_name, h.hospital_name_en, h.url" - " FROM analysis_runs ar" - " JOIN hospital_baseinfo h ON ar.hospital_id = h.hospital_id" - " WHERE ar.analysis_run_id = %s", - (run_id,), - ) + row = await select_run_with_clinic(run_id) if row is None: raise HTTPException(status_code=404, detail="Run not found") if row["plan_data"] is None: return Response(status_code=204) data = json.loads(row["plan_data"]) if isinstance(row["plan_data"], str) else row["plan_data"] plan = PlanOutput(**data) - gangnam_unni = await fetch_raw("gangnam_unni_data", row["gangnam_unni_data_id"]) or {} - clinic_name = gangnam_unni.get("name") or row["hospital_name"] + # 강남언니에서 긁어온 이름이 있으면 우선 (hospital_baseinfo 의 정식 이름보다 강남언니가 더 광고용 표기). + gu = await select_run_source_raw(run_id, SourceType.GANGNAM_UNNI) or {} + clinic_name = gu.get("name") or row["hospital_name"] return PlanApiResponse( id=run_id, clinic_name=clinic_name, clinic_name_en=row["hospital_name_en"], created_at=str(row["created_at"]), - target_url=_with_scheme(row["url"]), + target_url=_with_scheme(row["target_url"]), **plan.model_dump(), ) diff --git a/app/api/report.py b/app/api/report.py index f39d8ce..ac139f5 100644 --- a/app/api/report.py +++ b/app/api/report.py @@ -1,7 +1,7 @@ import json import logging from fastapi import APIRouter, Depends, HTTPException, Response -from common.db import fetchone +from common.db.run import select_run_with_clinic from common.deps import verify_api_key from common.utils import _with_scheme from integrations.llm.schemas.report import ReportOutput @@ -14,13 +14,7 @@ logger = logging.getLogger(__name__) @router.get("/{run_id}", response_model=MarketingReportResponse, response_model_by_alias=True) async def get_report(run_id: str): logger.info("GET /api/report/%s", run_id) - row = await fetchone( - "SELECT ar.report_data, ar.created_at, h.hospital_name, h.hospital_name_en, h.url" - " FROM analysis_runs ar" - " JOIN hospital_baseinfo h ON ar.hospital_id = h.hospital_id" - " WHERE ar.analysis_run_id = %s", - (run_id,), - ) + row = await select_run_with_clinic(run_id) if row is None: raise HTTPException(status_code=404, detail="Run not found") if row["report_data"] is None: @@ -32,6 +26,6 @@ async def get_report(run_id: str): clinic_name=row["hospital_name"], clinic_name_en=row["hospital_name_en"], created_at=str(row["created_at"]), - target_url=_with_scheme(row["url"]), + target_url=_with_scheme(row["target_url"]), **llm_output.model_dump(exclude={"id", "created_at", "target_url"}), ) diff --git a/app/common/db.py b/app/common/db.py deleted file mode 100644 index 608d6fb..0000000 --- a/app/common/db.py +++ /dev/null @@ -1,287 +0,0 @@ -import json -import os -import aiomysql -from common.utils import get_env - -_pool: aiomysql.Pool | None = None - - -async def get_pool() -> aiomysql.Pool: - global _pool - if _pool is None: - _pool = await aiomysql.create_pool( - host=get_env("MYSQL_HOST"), - port=int(os.getenv("MYSQL_PORT", "3306")), - user=get_env("MYSQL_USER"), - password=get_env("MYSQL_PASSWORD"), - db=get_env("MYSQL_DB"), - charset="utf8mb4", - minsize=0, - maxsize=30, - connect_timeout=10, - ) - return _pool - - -# 쓰기 (INSERT/UPDATE/DELETE) -async def execute(sql: str, args: tuple = ()) -> int: - pool = await get_pool() - async with pool.acquire() as conn: - try: - async with conn.cursor() as cur: - await cur.execute(sql, args) - await conn.commit() - return cur.lastrowid - finally: - conn.close() - - -# 읽기 (SELECT) -async def fetchone(sql: str, args: tuple = ()) -> dict | None: - pool = await get_pool() - async with pool.acquire() as conn: - try: - async with conn.cursor(aiomysql.DictCursor) as cur: - await cur.execute(sql, args) - return await cur.fetchone() - finally: - conn.close() - - -async def fetchall(sql: str, args: tuple = ()) -> list[dict]: - pool = await get_pool() - async with pool.acquire() as conn: - try: - async with conn.cursor(aiomysql.DictCursor) as cur: - await cur.execute(sql, args) - return await cur.fetchall() - finally: - conn.close() - -async def insert_instagram_row(hospital_id: str, url: str) -> int: - return await execute("INSERT INTO instagram_data (hospital_id, url) VALUES (%s, %s)", (hospital_id, url)) - - -async def insert_facebook_row(hospital_id: str, url: str) -> int: - return await execute("INSERT INTO facebook_data (hospital_id, url) VALUES (%s, %s)", (hospital_id, url)) - - -async def insert_naver_blog_row(hospital_id: str, url: str) -> int: - return await execute("INSERT INTO naver_blog_data (hospital_id, url) VALUES (%s, %s)", (hospital_id, url)) - - -async def insert_youtube_row(hospital_id: str, url: str) -> int: - return await execute("INSERT INTO youtube_data (hospital_id, url) VALUES (%s, %s)", (hospital_id, url)) - - -async def insert_gangnam_unni_row(hospital_id: str, url: str) -> int: - return await execute("INSERT INTO gangnam_unni_data (hospital_id, url) VALUES (%s, %s)", (hospital_id, url)) - - -async def insert_file_row( - analysis_run_id: str, - file_type: str, - file_name: str, - file_url: str, - size_bytes: int | None = None, - hospital_id: str | None = None, -) -> int: - return await execute( - "INSERT INTO file_data (analysis_run_id, hospital_id, file_type, file_name, file_url, size_bytes)" - " VALUES (%s, %s, %s, %s, %s, %s)", - (analysis_run_id, hospital_id, file_type, file_name, file_url, size_bytes), - ) - - -async def insert_analysis_run( - analysis_run_id: str, - hospital_id: str, - owner_user_id: int, - instagram_data_id: int | None, - facebook_data_id: int | None, - naver_blog_data_id: int | None, - youtube_data_id: int | None, - gangnam_unni_data_id: int | None, -) -> str: - await execute( - "INSERT INTO analysis_runs" - " (analysis_run_id, hospital_id, owner_user_id, instagram_data_id, facebook_data_id, naver_blog_data_id, youtube_data_id, gangnam_unni_data_id)" - " VALUES (%s, %s, %s, %s, %s, %s, %s, %s)", - (analysis_run_id, hospital_id, owner_user_id, instagram_data_id, facebook_data_id, naver_blog_data_id, youtube_data_id, gangnam_unni_data_id), - ) - return analysis_run_id - - - -async def save_analysis_report(analysis_run_id: str, data: dict) -> None: - await execute( - "UPDATE analysis_runs SET report_data = %s WHERE analysis_run_id = %s", - (json.dumps(data, ensure_ascii=False), analysis_run_id), - ) - - -async def is_done(table: str, row_id: int | None) -> bool: - if row_id is None: - return True - r = await fetchone(f"SELECT status FROM {table} WHERE id = %s", (row_id,)) - return r["status"] == "done" - - -async def fetch_raw(table: str, row_id: int | None) -> dict | None: - if row_id is None: - return None - row = await fetchone(f"SELECT raw_data FROM {table} WHERE id = %s", (row_id,)) - if not row or not row["raw_data"]: - return None - return json.loads(row["raw_data"]) if isinstance(row["raw_data"], str) else row["raw_data"] - - -async def get_analysis_raw_data(analysis_run_id: str) -> dict: - run = await fetchone( - "SELECT instagram_data_id, facebook_data_id, naver_blog_data_id, youtube_data_id, gangnam_unni_data_id" - " FROM analysis_runs WHERE analysis_run_id = %s", - (analysis_run_id,), - ) - return { - "instagram": await fetch_raw("instagram_data", run["instagram_data_id"]), - "facebook": await fetch_raw("facebook_data", run["facebook_data_id"]), - "naver_blog": await fetch_raw("naver_blog_data", run["naver_blog_data_id"]), - "youtube": await fetch_raw("youtube_data", run["youtube_data_id"]), - "gangnam_unni": await fetch_raw("gangnam_unni_data", run["gangnam_unni_data_id"]), - } - - -async def set_instagram_status(row_id: int, status: str) -> None: - await execute("UPDATE instagram_data SET status = %s WHERE id = %s", (status, row_id)) - - -async def set_facebook_status(row_id: int, status: str) -> None: - await execute("UPDATE facebook_data SET status = %s WHERE id = %s", (status, row_id)) - - -async def set_naver_blog_status(row_id: int, status: str) -> None: - await execute("UPDATE naver_blog_data SET status = %s WHERE id = %s", (status, row_id)) - - -async def set_youtube_status(row_id: int, status: str) -> None: - await execute("UPDATE youtube_data SET status = %s WHERE id = %s", (status, row_id)) - - -async def set_gangnam_unni_status(row_id: int, status: str) -> None: - await execute("UPDATE gangnam_unni_data SET status = %s WHERE id = %s", (status, row_id)) - - -async def save_instagram_raw_data(row_id: int, data: dict) -> None: - await execute("UPDATE instagram_data SET raw_data = %s, status = 'done' WHERE id = %s", (json.dumps(data, ensure_ascii=False), row_id)) - - -async def save_facebook_raw_data(row_id: int, data: dict) -> None: - await execute("UPDATE facebook_data SET raw_data = %s, status = 'done' WHERE id = %s", (json.dumps(data, ensure_ascii=False), row_id)) - - -async def save_naver_blog_raw_data(row_id: int, data: dict) -> None: - await execute("UPDATE naver_blog_data SET raw_data = %s, status = 'done' WHERE id = %s", (json.dumps(data, ensure_ascii=False), row_id)) - - -async def save_youtube_raw_data(row_id: int, data: dict) -> None: - await execute("UPDATE youtube_data SET raw_data = %s, status = 'done' WHERE id = %s", (json.dumps(data, ensure_ascii=False), row_id)) - - -async def save_gangnam_unni_raw_data(row_id: int, data: dict) -> None: - await execute("UPDATE gangnam_unni_data SET raw_data = %s, status = 'done' WHERE id = %s", (json.dumps(data, ensure_ascii=False), row_id)) - - -async def _insert_hospital_history(hospital_id: str, analysis_run_id: str | None) -> None: - row = await fetchone( - "SELECT owner_user_id, hospital_name, hospital_name_en, brn, road_address, site_address, url, status, raw_data" - " FROM hospital_baseinfo WHERE hospital_id = %s", - (hospital_id,), - ) - if not row: - return - await execute( - "INSERT INTO hospital_history" - " (hospital_id, owner_user_id, hospital_name, hospital_name_en, brn, road_address, site_address, url, status, raw_data, analysis_run_id)" - " VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)", - ( - hospital_id, - row["owner_user_id"], - row["hospital_name"], - row["hospital_name_en"], - row["brn"], - row["road_address"], - row["site_address"], - row["url"], - row["status"], - row["raw_data"] if isinstance(row["raw_data"], str) else json.dumps(row["raw_data"], ensure_ascii=False) if row["raw_data"] else None, - analysis_run_id, - ), - ) - - -async def insert_hospital( - hospital_id: str, - name: str, - name_en: str | None = None, - road_address: str | None = None, - site_address: str | None = None, - url: str | None = None, - raw_data: dict | None = None, - owner_user_id: int = 0, - brn: str = "", -) -> dict: - await execute( - "INSERT INTO hospital_baseinfo (hospital_id, hospital_name, hospital_name_en, road_address, site_address, url, raw_data, status, owner_user_id, brn)" - " VALUES (%s, %s, %s, %s, %s, %s, %s, 'done', %s, %s)", - (hospital_id, name, name_en, road_address, site_address, url, - json.dumps(raw_data, ensure_ascii=False) if raw_data else None, - owner_user_id, brn), - ) - await _insert_hospital_history(hospital_id, analysis_run_id=None) - return await fetchone( - "SELECT created_at FROM hospital_baseinfo WHERE hospital_id = %s", - (hospital_id,), - ) - - -async def save_hospital_raw_data(hospital_id: str, data: dict, analysis_run_id: str | None = None) -> None: - await execute( - "UPDATE hospital_baseinfo" - " SET raw_data = %s, status = 'done'," - " hospital_name = COALESCE(%s, hospital_name)," - " hospital_name_en = COALESCE(%s, hospital_name_en)," - " road_address = COALESCE(%s, road_address)" - " WHERE hospital_id = %s", - ( - json.dumps(data, ensure_ascii=False), - data.get("clinicName"), - data.get("clinicNameEn"), - data.get("address"), - hospital_id, - ), - ) - await _insert_hospital_history(hospital_id, analysis_run_id) - - -async def merge_hospital_raw_data(hospital_id: str, patch: dict) -> None: - """hospital_baseinfo.raw_data를 읽어 patch를 top-level 병합 후 저장 (read-modify-write). - 부가 수집 단계들이 순차로 raw_data에 키를 덧붙일 때 사용.""" - row = await fetchone("SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s", (hospital_id,)) - raw = row["raw_data"] if row else None - raw_data = json.loads(raw) if isinstance(raw, str) else (raw or {}) - raw_data.update(patch) - await execute( - "UPDATE hospital_baseinfo SET raw_data = %s WHERE hospital_id = %s", - (json.dumps(raw_data, ensure_ascii=False), hospital_id), - ) - - -async def get_market_analysis(analysis_run_id: str) -> dict: - rows = await fetchall( - "SELECT analysis_type, data FROM market_analysis WHERE analysis_run_id = %s AND status = 'done'", - (analysis_run_id,), - ) - return { - row["analysis_type"]: json.loads(row["data"]) if isinstance(row["data"], str) else row["data"] - for row in rows - } diff --git a/app/common/db/__init__.py b/app/common/db/__init__.py new file mode 100644 index 0000000..7ac14e3 --- /dev/null +++ b/app/common/db/__init__.py @@ -0,0 +1,15 @@ +from common.db.base import execute, fetchone, fetchall +from common.db.hospital import select_hospital, update_hospital_status, insert_hospital, update_hospital +from common.db.source import ( + insert_source, select_source_mainpage, select_source_by_type, + insert_raw_info, update_raw_info_status, update_raw_info, update_raw_info_merge, + select_raw_info_data, + select_run_sources, select_run_raw_data, select_run_source_raw, + select_run_mainpage_url, +) +from common.db.run import ( + insert_run, select_run, select_run_status, update_run_status, + update_run_report, update_run_plan, select_run_with_clinic, +) +from common.db.market import upsert_market_status, upsert_market_result, select_market +from common.db.file_data import insert_file, select_run_files, select_file, delete_file diff --git a/app/common/db/base.py b/app/common/db/base.py new file mode 100644 index 0000000..c047ff6 --- /dev/null +++ b/app/common/db/base.py @@ -0,0 +1,56 @@ +import os +import aiomysql +from common.utils import get_env + +_pool: aiomysql.Pool | None = None + + +async def get_pool() -> aiomysql.Pool: + global _pool + if _pool is None: + _pool = await aiomysql.create_pool( + host=get_env("MYSQL_HOST"), + port=int(os.getenv("MYSQL_PORT", "3306")), + user=get_env("MYSQL_USER"), + password=get_env("MYSQL_PASSWORD"), + db=get_env("MYSQL_DB"), + charset="utf8mb4", + minsize=0, + maxsize=30, + connect_timeout=10, + ) + return _pool + + +async def execute(sql: str, args: tuple = ()) -> int: + pool = await get_pool() + async with pool.acquire() as conn: + try: + async with conn.cursor() as cur: + await cur.execute(sql, args) + await conn.commit() + return cur.lastrowid + finally: + conn.close() + + +async def fetchone(sql: str, args: tuple = ()) -> dict | None: + pool = await get_pool() + async with pool.acquire() as conn: + try: + async with conn.cursor(aiomysql.DictCursor) as cur: + await cur.execute(sql, args) + return await cur.fetchone() + finally: + conn.close() + + +async def fetchall(sql: str, args: tuple = ()) -> list[dict]: + pool = await get_pool() + async with pool.acquire() as conn: + try: + async with conn.cursor(aiomysql.DictCursor) as cur: + await cur.execute(sql, args) + return await cur.fetchall() + finally: + conn.close() diff --git a/app/common/db/file_data.py b/app/common/db/file_data.py new file mode 100644 index 0000000..ac3e9bc --- /dev/null +++ b/app/common/db/file_data.py @@ -0,0 +1,39 @@ +from common.db.base import execute, fetchone, fetchall + + +async def insert_file( + analysis_run_id: str, + file_type: str, + file_name: str, + file_url: str, + size_bytes: int | None = None, + hospital_id: str | None = None, +) -> int: + return await execute( + "INSERT INTO file_data (analysis_run_id, hospital_id, file_type, file_name, file_url, size_bytes)" + " VALUES (%s, %s, %s, %s, %s, %s)", + (analysis_run_id, hospital_id, file_type, file_name, file_url, size_bytes), + ) + + +async def select_run_files(analysis_run_id: str) -> list[dict]: + return await fetchall( + "SELECT id, file_type, file_name, file_url, size_bytes, created_at" + " FROM file_data WHERE analysis_run_id = %s AND is_deleted = FALSE" + " ORDER BY created_at DESC", + (analysis_run_id,), + ) + + +async def select_file(file_id: int, analysis_run_id: str) -> dict | None: + return await fetchone( + "SELECT id FROM file_data WHERE id = %s AND analysis_run_id = %s", + (file_id, analysis_run_id), + ) + + +async def delete_file(file_id: int) -> None: + await execute( + "UPDATE file_data SET is_deleted = TRUE WHERE id = %s AND is_deleted = FALSE", + (file_id,), + ) diff --git a/app/common/db/hospital.py b/app/common/db/hospital.py new file mode 100644 index 0000000..6e4cd88 --- /dev/null +++ b/app/common/db/hospital.py @@ -0,0 +1,78 @@ +from common.db.base import execute, fetchone + + +async def select_hospital(hospital_id: str) -> dict | None: + return await fetchone( + "SELECT hospital_id, owner_user_id, hospital_name, hospital_name_en," + " brn, road_address, site_address, status, created_at, updated_at" + " FROM hospital_baseinfo WHERE hospital_id = %s", + (hospital_id,), + ) + + +async def update_hospital_status(hospital_id: str, status: str) -> None: + await execute( + "UPDATE hospital_baseinfo SET status = %s WHERE hospital_id = %s", + (status, hospital_id), + ) + + +async def _insert_hospital_history(hospital_id: str, analysis_run_id: str | None) -> None: + row = await fetchone( + "SELECT owner_user_id, hospital_name, hospital_name_en, brn, road_address, site_address, status" + " FROM hospital_baseinfo WHERE hospital_id = %s", + (hospital_id,), + ) + if not row: + return + await execute( + "INSERT INTO hospital_history" + " (hospital_id, owner_user_id, hospital_name, hospital_name_en, brn, road_address, site_address, status, analysis_run_id)" + " VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)", + ( + hospital_id, + row["owner_user_id"], + row["hospital_name"], + row["hospital_name_en"], + row["brn"], + row["road_address"], + row["site_address"], + row["status"], + analysis_run_id, + ), + ) + + +async def insert_hospital( + hospital_id: str, + name: str, + name_en: str | None = None, + road_address: str | None = None, + site_address: str | None = None, + owner_user_id: int = 0, + brn: str = "", +) -> dict: + await execute( + "INSERT INTO hospital_baseinfo" + " (hospital_id, hospital_name, hospital_name_en, road_address, site_address, status, owner_user_id, brn)" + " VALUES (%s, %s, %s, %s, %s, 'done', %s, %s)", + (hospital_id, name, name_en, road_address, site_address, owner_user_id, brn), + ) + await _insert_hospital_history(hospital_id, analysis_run_id=None) + return await fetchone( + "SELECT created_at FROM hospital_baseinfo WHERE hospital_id = %s", + (hospital_id,), + ) + + +async def update_hospital(hospital_id: str, data: dict, analysis_run_id: str | None = None) -> None: + await execute( + "UPDATE hospital_baseinfo" + " SET status = 'done'," + " hospital_name = COALESCE(%s, hospital_name)," + " hospital_name_en = COALESCE(%s, hospital_name_en)," + " road_address = COALESCE(%s, road_address)" + " WHERE hospital_id = %s", + (data.get("clinicName"), data.get("clinicNameEn"), data.get("address"), hospital_id), + ) + await _insert_hospital_history(hospital_id, analysis_run_id) diff --git a/app/common/db/market.py b/app/common/db/market.py new file mode 100644 index 0000000..ca4093b --- /dev/null +++ b/app/common/db/market.py @@ -0,0 +1,31 @@ +import json +from common.db.base import execute, fetchall + + +async def upsert_market_status(analysis_run_id: str, analysis_type: str, status: str) -> None: + await execute( + "INSERT INTO market_analysis (analysis_run_id, analysis_type, status)" + " VALUES (%s, %s, %s)" + " ON DUPLICATE KEY UPDATE status = VALUES(status)", + (analysis_run_id, analysis_type, status), + ) + + +async def upsert_market_result(analysis_run_id: str, analysis_type: str, data: dict) -> None: + await execute( + "INSERT INTO market_analysis (analysis_run_id, analysis_type, status, data)" + " VALUES (%s, %s, 'done', %s)" + " ON DUPLICATE KEY UPDATE status = 'done', data = VALUES(data)", + (analysis_run_id, analysis_type, json.dumps(data, ensure_ascii=False)), + ) + + +async def select_market(analysis_run_id: str) -> dict: + rows = await fetchall( + "SELECT analysis_type, data FROM market_analysis WHERE analysis_run_id = %s AND status = 'done'", + (analysis_run_id,), + ) + return { + row["analysis_type"]: json.loads(row["data"]) if isinstance(row["data"], str) else row["data"] + for row in rows + } diff --git a/app/common/db/run.py b/app/common/db/run.py new file mode 100644 index 0000000..cbf188c --- /dev/null +++ b/app/common/db/run.py @@ -0,0 +1,64 @@ +import json +from common.db.base import execute, fetchone + + +async def insert_run( + analysis_run_id: str, + hospital_id: str, + owner_user_id: int, +) -> str: + await execute( + "INSERT INTO analysis_runs (analysis_run_id, hospital_id, owner_user_id) VALUES (%s, %s, %s)", + (analysis_run_id, hospital_id, owner_user_id), + ) + return analysis_run_id + + +async def select_run(analysis_run_id: str) -> dict | None: + return await fetchone( + "SELECT analysis_run_id, hospital_id, owner_user_id, status, created_at, updated_at" + " FROM analysis_runs WHERE analysis_run_id = %s", + (analysis_run_id,), + ) + + +async def select_run_status(analysis_run_id: str) -> str | None: + row = await fetchone( + "SELECT status FROM analysis_runs WHERE analysis_run_id = %s", + (analysis_run_id,), + ) + return row["status"] if row else None + + +async def update_run_status(analysis_run_id: str, status: str) -> None: + await execute( + "UPDATE analysis_runs SET status = %s WHERE analysis_run_id = %s", + (status, analysis_run_id), + ) + + +async def update_run_report(analysis_run_id: str, data: dict) -> None: + await execute( + "UPDATE analysis_runs SET report_data = %s WHERE analysis_run_id = %s", + (json.dumps(data, ensure_ascii=False), analysis_run_id), + ) + + +async def update_run_plan(analysis_run_id: str, data: dict) -> None: + await execute( + "UPDATE analysis_runs SET plan_data = %s WHERE analysis_run_id = %s", + (json.dumps(data, ensure_ascii=False), analysis_run_id), + ) + + +async def select_run_with_clinic(analysis_run_id: str) -> dict | None: + return await fetchone( + "SELECT ar.report_data, ar.plan_data, ar.created_at," + " h.hospital_name, h.hospital_name_en," + " rs.url AS target_url" + " FROM analysis_runs ar" + " JOIN hospital_baseinfo h ON ar.hospital_id = h.hospital_id" + " LEFT JOIN remote_source rs ON rs.hospital_id = h.hospital_id AND rs.source_type = 'mainpage'" + " WHERE ar.analysis_run_id = %s", + (analysis_run_id,), + ) diff --git a/app/common/db/source.py b/app/common/db/source.py new file mode 100644 index 0000000..1977791 --- /dev/null +++ b/app/common/db/source.py @@ -0,0 +1,134 @@ +import json +from common.db.base import execute, fetchone, fetchall +from models.status import SourceType + + +async def insert_source( + hospital_id: str, + source_type: SourceType, + url: str, + language: str | None = None, +) -> int: + return await execute( + "INSERT INTO remote_source (hospital_id, source_type, language, url) VALUES (%s, %s, %s, %s)", + (hospital_id, source_type, language, url), + ) + + +async def select_source_mainpage(hospital_id: str) -> dict | None: + return await fetchone( + "SELECT source_id, url FROM remote_source WHERE hospital_id = %s AND source_type = 'mainpage'", + (hospital_id,), + ) + + +async def insert_raw_info( + source_id: int, + analysis_run_id: str, + data_tag: SourceType, +) -> int: + return await execute( + "INSERT INTO raw_info (source_id, analysis_run_id, data_tag) VALUES (%s, %s, %s)", + (source_id, analysis_run_id, data_tag), + ) + + +async def update_raw_info_status(info_id: int, status: str) -> None: + await execute("UPDATE raw_info SET status = %s WHERE info_id = %s", (status, info_id)) + + +async def update_raw_info(info_id: int, data: dict) -> None: + await execute( + "UPDATE raw_info SET raw_data = %s, status = 'done' WHERE info_id = %s", + (json.dumps(data, ensure_ascii=False), info_id), + ) + + +async def select_raw_info_data(info_id: int | None) -> dict | None: + if info_id is None: + return None + row = await fetchone("SELECT raw_data FROM raw_info WHERE info_id = %s", (info_id,)) + if not row or not row["raw_data"]: + return None + return json.loads(row["raw_data"]) if isinstance(row["raw_data"], str) else row["raw_data"] + + +async def select_run_sources(analysis_run_id: str) -> list[dict]: + return await fetchall( + "SELECT ri.info_id, rs.source_type, rs.url" + " FROM raw_info ri JOIN remote_source rs USING (source_id)" + " WHERE ri.analysis_run_id = %s", + (analysis_run_id,), + ) + + +async def select_run_raw_data(analysis_run_id: str) -> dict: + # language='EN' 인 row 는 dict key 를 "_en" 으로 합성 (KR/EN 동시 수집 시 키 충돌 방지). + rows = await fetchall( + "SELECT rs.source_type, rs.language, ri.raw_data" + " FROM raw_info ri JOIN remote_source rs USING (source_id)" + " WHERE ri.analysis_run_id = %s", + (analysis_run_id,), + ) + result: dict = {} + for row in rows: + raw = row["raw_data"] + key = row["source_type"] + if (row.get("language") or "").upper() == "EN": + key = f"{key}_en" + result[key] = json.loads(raw) if isinstance(raw, str) else raw + return result + + +async def select_run_source_raw( + analysis_run_id: str, source_type: str, language: str | None = None, +) -> dict | None: + sql = ( + "SELECT ri.raw_data FROM raw_info ri JOIN remote_source rs USING (source_id)" + " WHERE ri.analysis_run_id = %s AND rs.source_type = %s" + ) + args: tuple = (analysis_run_id, source_type) + if language: + sql += " AND rs.language = %s" + args = (*args, language) + sql += " LIMIT 1" + row = await fetchone(sql, args) + if not row or not row["raw_data"]: + return None + return json.loads(row["raw_data"]) if isinstance(row["raw_data"], str) else row["raw_data"] + + +async def update_raw_info_merge(info_id: int, patch: dict) -> None: + """raw_info.raw_data 를 read-modify-write 로 top-level 머지. + 한 source 가 단계별로 (예: branding 의 brandAssets → channelLogos) 키를 덧붙일 때 사용.""" + row = await fetchone("SELECT raw_data FROM raw_info WHERE info_id = %s", (info_id,)) + if not row: + return + raw = row["raw_data"] + data = json.loads(raw) if isinstance(raw, str) else (raw or {}) + data.update(patch) + await execute( + "UPDATE raw_info SET raw_data = %s, status = 'done' WHERE info_id = %s", + (json.dumps(data, ensure_ascii=False), info_id), + ) + + +async def select_source_by_type( + hospital_id: str, source_type: str, language: str | None = None, +) -> dict | None: + sql = "SELECT source_id, url FROM remote_source WHERE hospital_id = %s AND source_type = %s" + args: tuple = (hospital_id, source_type) + if language: + sql += " AND language = %s" + args = (*args, language) + sql += " LIMIT 1" + return await fetchone(sql, args) + + +async def select_run_mainpage_url(analysis_run_id: str) -> str: + row = await fetchone( + "SELECT rs.url FROM raw_info ri JOIN remote_source rs USING (source_id)" + " WHERE ri.analysis_run_id = %s AND rs.source_type = 'mainpage'", + (analysis_run_id,), + ) + return (row or {}).get("url") or "" diff --git a/app/models/clinic.py b/app/models/clinic.py index 473b4a2..9191a93 100644 --- a/app/models/clinic.py +++ b/app/models/clinic.py @@ -10,9 +10,7 @@ class ClinicResponse(BaseModel): hospital_name: str hospital_name_en: str | None road_address: str | None - url: str | None status: str - raw_data: dict | None created_at: str updated_at: str diff --git a/app/models/status.py b/app/models/status.py index 34bac8b..de7015e 100644 --- a/app/models/status.py +++ b/app/models/status.py @@ -36,9 +36,24 @@ class DataSource(StrEnum): SCRAPE = "scrape" +class SourceType(StrEnum): + MAINPAGE = "mainpage" + INSTAGRAM = "instagram" + FACEBOOK = "facebook" + NAVER_BLOG = "naver_blog" + YOUTUBE = "youtube" + TIKTOK = "tiktok" + GANGNAM_UNNI = "gangnam_unni" + KAKAOTALK = "kakaotalk" + NAVER_CAFE = "naver_cafe" + # 부가 수집/분석 (HTML/CSS 재크롤 + Vision 로고 매칭) — 한 raw_info entry 에 brandAssets/channelLogos 같이 보관. + BRANDING = "branding" + + class Language(StrEnum): KR = "KR" EN = "EN" + WW = "WW" class VideoType(StrEnum): diff --git a/app/services/analysis.py b/app/services/analysis.py index 2b3e597..8672c50 100644 --- a/app/services/analysis.py +++ b/app/services/analysis.py @@ -2,7 +2,10 @@ import json import logging import re from datetime import datetime -from common.db import fetchone, execute, fetch_raw, get_analysis_raw_data, save_analysis_report, get_market_analysis +from urllib.parse import urlparse +from common.db.run import select_run, update_run_report, update_run_plan +from common.db.source import select_run_raw_data +from common.db.market import select_market from integrations.llm.llm_service import LLMService from integrations.llm.prompt import report_prompt, plan_prompt, youtube_diagnosis_prompt from integrations.llm.schemas.report import ReportOutput, ClinicSnapshot, YouTubeAudit @@ -10,24 +13,15 @@ from services.instagram_audit import build_instagram_accounts from services.facebook_audit import build_facebook_pages from services.kpi_dashboard import build_kpi_dashboard from integrations.llm.schemas.plan import PlanOutput -from models.status import AnalysisStatus logger = logging.getLogger(__name__) async def generate_report(analysis_run_id: str) -> ReportOutput: - run = await fetchone( - "SELECT hospital_id FROM analysis_runs WHERE analysis_run_id = %s", - (analysis_run_id,), - ) - clinic_row = await fetchone( - "SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s", - (run["hospital_id"],), - ) - raw_data = clinic_row["raw_data"] if clinic_row else None - clinic = json.loads(raw_data) if isinstance(raw_data, str) else (raw_data or {}) - raw = await get_analysis_raw_data(analysis_run_id) - market = await get_market_analysis(analysis_run_id) + raw = await select_run_raw_data(analysis_run_id) + clinic = raw.get("mainpage") or {} + branding = raw.get("branding") or {} + market = await select_market(analysis_run_id) def _json(v) -> str | None: return json.dumps(v, ensure_ascii=False) if v else None @@ -44,37 +38,38 @@ async def generate_report(analysis_run_id: str) -> ReportOutput: "market_keywords": _json(market.get("keywords")), "market_trend": _json(market.get("trend")), "market_target_audience": _json(market.get("target_audience")), + # firecrawl 이 mainpage 에서 뽑은 branding 메타(logoUrl/ogImage/faviconUrl) + Vision/CSS 산출물 "branding": _json(clinic.get("branding")), - "brand_assets": _json(clinic.get("brandAssets")), - "tiktok": _json(clinic.get("tiktok")), - "instagram_en": _json(clinic.get("instagramEn")), - "facebook_en": _json(clinic.get("facebookEn")), - "kakao_talk": _json(clinic.get("kakaoTalk")), - "naver_cafe": _json(clinic.get("naverCafe")), - "channel_logos": _json(clinic.get("channelLogos")), + "brand_assets": _json(branding.get("brandAssets")), + "channel_logos": _json(branding.get("channelLogos")), + # 부가 채널 (raw_info entry) — raw dict 의 한국식 key 그대로 + "tiktok": _json(raw.get("tiktok")), + "instagram_en": _json(raw.get("instagram_en")), + "facebook_en": _json(raw.get("facebook_en")), + "kakao_talk": _json(raw.get("kakaotalk")), + "naver_cafe": _json(raw.get("naver_cafe")), + # 메인 5채널은 raw dict 그대로 펼쳐서 prompt placeholder 와 매칭 **{ - channel: _json(data) - for channel, data in raw.items() + source_type: _json(data) + for source_type, data in raw.items() + if source_type not in { + "mainpage", "branding", + "tiktok", "instagram_en", "facebook_en", "kakaotalk", "naver_cafe", + } }, } return await LLMService(provider="perplexity").generate(report_prompt, input_data) + async def generate_plan(analysis_run_id: str) -> PlanOutput: - run = await fetchone( - "SELECT hospital_id, report_data FROM analysis_runs WHERE analysis_run_id = %s", - (analysis_run_id,), - ) - clinic_row = await fetchone( - "SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s", - (run["hospital_id"],), - ) - raw_data = clinic_row["raw_data"] if clinic_row else None - clinic = json.loads(raw_data) if isinstance(raw_data, str) else (raw_data or {}) + run = await select_run(analysis_run_id) + raw = await select_run_raw_data(analysis_run_id) + clinic = raw.get("mainpage") or {} + branding = raw.get("branding") or {} report_data = run["report_data"] report = json.loads(report_data) if isinstance(report_data, str) else report_data - market = await get_market_analysis(analysis_run_id) - raw = await get_analysis_raw_data(analysis_run_id) + market = await select_market(analysis_run_id) def _json(v) -> str | None: return json.dumps(v, ensure_ascii=False) if v else None @@ -92,27 +87,28 @@ async def generate_plan(analysis_run_id: str) -> PlanOutput: "market_keywords": _json(market.get("keywords")), "market_trend": _json(market.get("trend")), "market_target_audience": _json(market.get("target_audience")), - "tiktok": _json(clinic.get("tiktok")), - "instagram_en": _json(clinic.get("instagramEn")), - "facebook_en": _json(clinic.get("facebookEn")), + "tiktok": _json(raw.get("tiktok")), + "instagram_en": _json(raw.get("instagram_en")), + "facebook_en": _json(raw.get("facebook_en")), "naver_blog": _json(_naver_blog_summary(raw.get("naver_blog"))), - "naver_cafe": _json(clinic.get("naverCafe")), - "kakao_talk": _json(clinic.get("kakaoTalk")), - "channel_logos": _json(clinic.get("channelLogos")), - "brand_assets": _json(clinic.get("brandAssets")), + "naver_cafe": _json(raw.get("naver_cafe")), + "kakao_talk": _json(raw.get("kakaotalk")), + "channel_logos": _json(branding.get("channelLogos")), + "brand_assets": _json(branding.get("brandAssets")), } return await LLMService(provider="perplexity").generate(plan_prompt, input_data) -def _build_clinic_snapshot(gangnam_unni: dict, hospital: dict) -> dict: +def _build_clinic_snapshot(gangnam_unni: dict, mainpage: dict, brand_assets: dict) -> dict: snapshot: dict = {} doctors = gangnam_unni.get("doctors", []) lead = max(doctors, key=lambda d: d.get("reviews", 0)) if doctors else None if gangnam_unni.get("name"): snapshot["name"] = gangnam_unni["name"] - if hospital.get("clinicNameEn"): snapshot["name_en"] = hospital["clinicNameEn"] - if hospital.get("phone"): snapshot["phone"] = hospital["phone"] - if hospital.get("domain"): snapshot["domain"] = hospital["domain"] + if mainpage.get("clinicNameEn"): snapshot["name_en"] = mainpage["clinicNameEn"] + if mainpage.get("phone"): snapshot["phone"] = mainpage["phone"] + domain = mainpage.get("domain") or urlparse(mainpage.get("sourceUrl") or "").netloc + if domain: snapshot["domain"] = domain if gangnam_unni.get("rating"): snapshot["overall_rating"] = gangnam_unni["rating"] if gangnam_unni.get("totalReviews"): snapshot["total_reviews"] = gangnam_unni["totalReviews"] if gangnam_unni.get("address"): snapshot["location"] = gangnam_unni["address"] @@ -125,16 +121,15 @@ def _build_clinic_snapshot(gangnam_unni: dict, hospital: dict) -> dict: "rating": lead.get("rating"), "review_count": lead.get("reviews"), } - # brand_assets에서 logo_images / brand_colors 강제 주입. LLM이 프롬프트 가드 무시하고 null로 두는 케이스 차단. - ba = hospital.get("brandAssets") or {} - if ba.get("logo_images"): snapshot["logo_images"] = ba["logo_images"] - if ba.get("brand_colors"): snapshot["brand_colors"] = ba["brand_colors"] + # branding.brandAssets 에서 logo_images / brand_colors 강제 주입. LLM 이 프롬프트 가드 무시하고 null 로 두는 케이스 차단. + if brand_assets.get("logo_images"): snapshot["logo_images"] = brand_assets["logo_images"] + if brand_assets.get("brand_colors"): snapshot["brand_colors"] = brand_assets["brand_colors"] return ClinicSnapshot.model_validate(snapshot).model_dump() def _naver_blog_summary(blog: dict | None) -> dict | None: """plan 카드 한 장에 들어가는 건 전체 포스트 수와 최근 활동 시점뿐. 그 외(본문·링크·제목)는 - 던져봐야 토큰만 늘고 LLM이 무관 정보로 hallucinate 함.""" + 던져봐야 토큰만 늘고 LLM 이 무관 정보로 hallucinate 함.""" if not blog: return None posts = blog.get("posts") or [] @@ -256,40 +251,42 @@ async def _build_youtube_audit(youtube: dict) -> dict: async def _build_overrides(analysis_run_id: str) -> dict: - run = await fetchone( - "SELECT hospital_id, instagram_data_id, facebook_data_id," - " naver_blog_data_id, youtube_data_id, gangnam_unni_data_id" - " FROM analysis_runs WHERE analysis_run_id = %s", - (analysis_run_id,), - ) - if not run: + raw = await select_run_raw_data(analysis_run_id) + if not raw: return {} - hospital_row = await fetchone( - "SELECT raw_data, url FROM hospital_baseinfo WHERE hospital_id = %s", - (run["hospital_id"],), - ) - hospital = json.loads(hospital_row["raw_data"]) if hospital_row and isinstance(hospital_row.get("raw_data"), str) else (hospital_row or {}).get("raw_data") or {} - hospital["domain"] = (hospital_row or {}).get("url") or "" - instagram = await fetch_raw("instagram_data", run["instagram_data_id"]) or {} - facebook = await fetch_raw("facebook_data", run["facebook_data_id"]) or {} - naver_blog = await fetch_raw("naver_blog_data", run["naver_blog_data_id"]) or {} - youtube = await fetch_raw("youtube_data", run["youtube_data_id"]) or {} - gangnam_unni = await fetch_raw("gangnam_unni_data", run["gangnam_unni_data_id"]) or {} + mainpage = raw.get("mainpage", {}) or {} + branding = raw.get("branding", {}) or {} + instagram = raw.get("instagram", {}) or {} + facebook = raw.get("facebook", {}) or {} + youtube = raw.get("youtube", {}) or {} + gangnam_unni = raw.get("gangnam_unni", {}) or {} + naver_blog = raw.get("naver_blog", {}) or {} + instagram_en = raw.get("instagram_en", {}) or {} + facebook_en = raw.get("facebook_en", {}) or {} + tiktok = raw.get("tiktok", {}) or {} + naver_cafe = raw.get("naver_cafe", {}) or {} + brand_assets = branding.get("brandAssets") or {} + channel_logos = branding.get("channelLogos") or {} - snapshot: dict = _build_clinic_snapshot(gangnam_unni, hospital) + snapshot: dict = _build_clinic_snapshot(gangnam_unni, mainpage, brand_assets) yt_patch: dict = await _build_youtube_audit(youtube) # ── instagram (KR·EN 계정을 코드에서 구성 → LLM 출력 무시하고 교체) ────────────── - ig_patch = build_instagram_accounts( - instagram, hospital.get("instagramEn") or {}, hospital.get("channelLogos") or {}, - ) + ig_patch = build_instagram_accounts(instagram, instagram_en, channel_logos) - # ── facebook (KR=facebook_data, EN=hospital.facebookEn 둘 다 코드 산출, [KR, EN] 순서) ── - fb_pages = build_facebook_pages(facebook, hospital.get("facebookEn") or {}) + # ── facebook (KR=raw.facebook, EN=raw.facebook_en 둘 다 코드 산출, [KR, EN] 순서) ── + fb_pages = build_facebook_pages(facebook, facebook_en) # ── KPI dashboard: 7개 mockup 라이프사이클 공식으로 코드가 결정. LLM 출력은 무시. ────── - kpi = build_kpi_dashboard(instagram, facebook, youtube, gangnam_unni, hospital, naver_blog) + # build_kpi_dashboard 의 hospital 인자에 부가 채널 dict 모아서 넘김 (instagramEn/facebookEn/tiktok/naverCafe 키 기대). + kpi_extras = { + "instagramEn": instagram_en, + "facebookEn": facebook_en, + "tiktok": tiktok, + "naverCafe": naver_cafe, + } + kpi = build_kpi_dashboard(instagram, facebook, youtube, gangnam_unni, kpi_extras, naver_blog) overrides: dict = {} if snapshot: @@ -317,12 +314,13 @@ def _deep_merge(base: dict, overrides: dict) -> dict: base[k] = v return base + def _patch_report(result: ReportOutput, overrides: dict) -> ReportOutput: merged = _deep_merge(result.model_dump(), overrides) - # 인스타 계정은 프롬프트에서 LLM이 []로 두게 했고, 코드가 수집 데이터로 채운다 (데이터 없으면 빈 리스트) + # 인스타 계정은 프롬프트에서 LLM 이 [] 로 두게 했고, 코드가 수집 데이터로 채운다 (데이터 없으면 빈 리스트) merged.setdefault("instagram_audit", {})["accounts"] = (overrides.get("instagram_audit") or {}).get("accounts") or [] - # 페북 페이지(KR+EN): _page_patch가 부분 필드만 만들어 그대로 박으면 검증 실패(label/logo 등 누락). - # LLM이 만든 첫 페이지(보통 KR)를 템플릿으로 복사한 뒤 코드 patch로 인덱스별 덮어쓰기 → + # 페북 페이지(KR+EN): _page_patch 가 부분 필드만 만들어 그대로 박으면 검증 실패(label/logo 등 누락). + # LLM 이 만든 첫 페이지(보통 KR)를 템플릿으로 복사한 뒤 코드 patch 로 인덱스별 덮어쓰기 → # 필수 필드는 LLM 디폴트 받고, 수집 수치는 코드 값. EN 누락 버그 회피. fb_pages = (overrides.get("facebook_audit") or {}).get("pages") or [] if fb_pages: @@ -343,13 +341,13 @@ async def run_report_task(analysis_run_id: str) -> None: logger.info("[report] start run=%s", analysis_run_id) result = await generate_report(analysis_run_id) result = _patch_report(result, await _build_overrides(analysis_run_id)) - await save_analysis_report(analysis_run_id, result.model_dump()) + await update_run_report(analysis_run_id, result.model_dump()) logger.info("[report] done run=%s", analysis_run_id) def _patch_plan(result: PlanOutput, logo_desc: str) -> PlanOutput: """brand_guide.channel_branding[].profile_photo 는 LLM 안 맡기고 코드가 박는다 - (모든 채널 동일값 = brand_assets.logo_description). LLM이 fallback 문구 hallucinate 방지.""" + (모든 채널 동일값 = brand_assets.logo_description). LLM 이 fallback 문구 hallucinate 방지.""" p = result.model_dump() for ch in (p.get("brand_guide") or {}).get("channel_branding") or []: ch["profile_photo"] = logo_desc @@ -359,15 +357,10 @@ def _patch_plan(result: PlanOutput, logo_desc: str) -> PlanOutput: async def run_plan_task(analysis_run_id: str) -> None: logger.info("[plan] start run=%s", analysis_run_id) result = await generate_plan(analysis_run_id) - # profile_photo 는 brand_assets.logo_description 으로 코드가 박음 (LLM "(가이드 미보유)" 같은 hallucination 차단) - run = await fetchone("SELECT hospital_id FROM analysis_runs WHERE analysis_run_id = %s", (analysis_run_id,)) - if run: - hr = await fetchone("SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s", (run["hospital_id"],)) - h = json.loads(hr["raw_data"]) if hr and isinstance(hr.get("raw_data"), str) else (hr or {}).get("raw_data") or {} - logo_desc = ((h.get("brandAssets") or {}).get("logo_description")) or "" - result = _patch_plan(result, logo_desc) - await execute( - "UPDATE analysis_runs SET plan_data = %s WHERE analysis_run_id = %s", - (json.dumps(result.model_dump(), ensure_ascii=False), analysis_run_id), - ) + # profile_photo 는 brand_assets.logo_description 으로 코드가 박음 (LLM "(가이드 미보유)" 같은 hallucination 차단). + raw = await select_run_raw_data(analysis_run_id) + branding = raw.get("branding") or {} + logo_desc = ((branding.get("brandAssets") or {}).get("logo_description")) or "" + result = _patch_plan(result, logo_desc) + await update_run_plan(analysis_run_id, result.model_dump()) logger.info("[plan] done run=%s", analysis_run_id) diff --git a/app/services/collect.py b/app/services/collect.py index 9887257..8490201 100644 --- a/app/services/collect.py +++ b/app/services/collect.py @@ -1,120 +1,160 @@ import asyncio import logging -from common.db import ( - fetchone, - set_instagram_status, save_instagram_raw_data, - set_facebook_status, save_facebook_raw_data, - set_naver_blog_status, save_naver_blog_raw_data, - set_youtube_status, save_youtube_raw_data, - set_gangnam_unni_status, save_gangnam_unni_raw_data, - execute, save_hospital_raw_data, -) +from common.db.hospital import update_hospital_status, update_hospital +from common.db.source import select_run_sources, update_raw_info_status, update_raw_info from common.utils import get_env, _run_optional_step from integrations.apify import ApifyClient from integrations.naver import NaverClient from integrations.youtube import YouTubeClient from integrations.firecrawl import FirecrawlClient -from services.collect_extras import collect_brand_assets, collect_extra_channels, collect_channel_logos +from models.status import SourceType +from services.collect_extras import collect_brand_assets, collect_channel_logos from services.facebook_audit import transform_for_storage as transform_facebook logger = logging.getLogger(__name__) -async def collect_instagram(analysis_run_id: str, row_id: int, url: str) -> None: +async def collect_instagram(analysis_run_id: str, info_id: int, url: str) -> None: logger.info("[instagram] start run=%s url=%s", analysis_run_id, url) - await set_instagram_status(row_id, "processing") + await update_raw_info_status(info_id, "processing") data = await ApifyClient(get_env("APIFY_API_TOKEN")).get_instagram_profile(url) - await save_instagram_raw_data(row_id, data) + if data is None: + await update_raw_info_status(info_id, "failed") + logger.warning("[instagram] failed run=%s", analysis_run_id) + return + await update_raw_info(info_id, data) logger.info("[instagram] done run=%s", analysis_run_id) -async def collect_facebook(analysis_run_id: str, row_id: int, url: str) -> None: +async def collect_facebook(analysis_run_id: str, info_id: int, url: str) -> None: logger.info("[facebook] start run=%s url=%s", analysis_run_id, url) - await set_facebook_status(row_id, "processing") + await update_raw_info_status(info_id, "processing") data = await ApifyClient(get_env("APIFY_API_TOKEN")).get_facebook_page(url) + if data is None: + await update_raw_info_status(info_id, "failed") + logger.warning("[facebook] failed run=%s", analysis_run_id) + return data = transform_facebook(data) - await save_facebook_raw_data(row_id, data) + await update_raw_info(info_id, data) logger.info("[facebook] done run=%s", analysis_run_id) -async def collect_naver_blog(analysis_run_id: str, row_id: int, url: str) -> None: +async def collect_naver_blog(analysis_run_id: str, info_id: int, url: str) -> None: logger.info("[naver_blog] start run=%s url=%s", analysis_run_id, url) - await set_naver_blog_status(row_id, "processing") + await update_raw_info_status(info_id, "processing") data = await NaverClient(get_env("NAVER_CLIENT_ID"), get_env("NAVER_CLIENT_SECRET")).get_blog_rss(url) - await save_naver_blog_raw_data(row_id, data) + if data is None: + await update_raw_info_status(info_id, "failed") + logger.warning("[naver_blog] failed run=%s", analysis_run_id) + return + await update_raw_info(info_id, data) logger.info("[naver_blog] done run=%s", analysis_run_id) -async def collect_youtube(analysis_run_id: str, row_id: int, url: str) -> None: +async def collect_youtube(analysis_run_id: str, info_id: int, url: str) -> None: logger.info("[youtube] start run=%s url=%s", analysis_run_id, url) - await set_youtube_status(row_id, "processing") + await update_raw_info_status(info_id, "processing") data = await YouTubeClient(get_env("YOUTUBE_API_KEY")).get_channel(url) - await save_youtube_raw_data(row_id, data) + if data is None: + await update_raw_info_status(info_id, "failed") + logger.warning("[youtube] failed run=%s", analysis_run_id) + return + await update_raw_info(info_id, data) logger.info("[youtube] done run=%s", analysis_run_id) -async def collect_gangnam_unni(analysis_run_id: str, row_id: int, url: str) -> None: +async def collect_gangnam_unni(analysis_run_id: str, info_id: int, url: str) -> None: logger.info("[gangnam_unni] start run=%s url=%s", analysis_run_id, url) - await set_gangnam_unni_status(row_id, "processing") + await update_raw_info_status(info_id, "processing") data = await FirecrawlClient(get_env("FIRECRAWL_API_KEY")).get_gangnam_unni(url) - await save_gangnam_unni_raw_data(row_id, data) + if data is None: + await update_raw_info_status(info_id, "failed") + logger.warning("[gangnam_unni] failed run=%s", analysis_run_id) + return + await update_raw_info(info_id, data) logger.info("[gangnam_unni] done run=%s", analysis_run_id) -async def collect_clinic_info(analysis_run_id: str, hospital_id: str, url: str) -> None: - logger.info("[clinic] start run=%s url=%s", analysis_run_id, url) - await execute("UPDATE hospital_baseinfo SET status = 'processing' WHERE hospital_id = %s", (hospital_id,)) +async def collect_mainpage(analysis_run_id: str, info_id: int, hospital_id: str, url: str) -> None: + logger.info("[mainpage] start run=%s url=%s", analysis_run_id, url) + await update_raw_info_status(info_id, "processing") + await update_hospital_status(hospital_id, "processing") data = await FirecrawlClient(get_env("FIRECRAWL_API_KEY")).fetch_clinic_info(url) - await save_hospital_raw_data(hospital_id, data, analysis_run_id=analysis_run_id) - logger.info("[clinic] done run=%s", analysis_run_id) + if data is None: + await update_raw_info_status(info_id, "failed") + logger.warning("[mainpage] failed run=%s", analysis_run_id) + return + # 홈페이지 URL 자체도 raw_data 에 박아둬야 brand_assets / 분석 단계에서 mainpage URL 재조회 없이 사용 가능. + data = {**data, "sourceUrl": url} + await update_raw_info(info_id, data) + await update_hospital(hospital_id, data, analysis_run_id=analysis_run_id) + logger.info("[mainpage] done run=%s", analysis_run_id) -async def collect_all( - analysis_run_id: str, - hospital_id: str, - instagram_id: int | None = None, - facebook_id: int | None = None, - naver_blog_id: int | None = None, - youtube_id: int | None = None, - gangnam_unni_id: int | None = None, - tiktok_url: str | None = None, - instagram_en_url: str | None = None, - facebook_en_url: str | None = None, - kakao_talk_url: str | None = None, - naver_cafe_url: str | None = None, -) -> None: - async def _url(table: str, row_id: int) -> str: - row = await fetchone(f"SELECT url FROM {table} WHERE id = %s", (row_id,)) - return row["url"] if row else "" +async def collect_tiktok(analysis_run_id: str, info_id: int, url: str) -> None: + logger.info("[tiktok] start run=%s url=%s", analysis_run_id, url) + await update_raw_info_status(info_id, "processing") + data = await ApifyClient(get_env("APIFY_API_TOKEN")).get_tiktok_profile(url) + if data is None: + await update_raw_info_status(info_id, "failed") + logger.warning("[tiktok] failed run=%s", analysis_run_id) + return + await update_raw_info(info_id, data) + logger.info("[tiktok] done run=%s", analysis_run_id) - hospital = await fetchone("SELECT url FROM hospital_baseinfo WHERE hospital_id = %s", (hospital_id,)) - tasks = [collect_clinic_info(analysis_run_id, hospital_id, hospital["url"])] - if instagram_id: - tasks.append(collect_instagram(analysis_run_id, instagram_id, await _url("instagram_data", instagram_id))) - if facebook_id: - tasks.append(collect_facebook(analysis_run_id, facebook_id, await _url("facebook_data", facebook_id))) - if naver_blog_id: - tasks.append(collect_naver_blog(analysis_run_id, naver_blog_id, await _url("naver_blog_data", naver_blog_id))) - if youtube_id: - tasks.append(collect_youtube(analysis_run_id, youtube_id, await _url("youtube_data", youtube_id))) - if gangnam_unni_id: - tasks.append(collect_gangnam_unni(analysis_run_id, gangnam_unni_id, await _url("gangnam_unni_data", gangnam_unni_id))) +async def collect_naver_cafe(analysis_run_id: str, info_id: int, url: str) -> None: + """카페는 로그인 필요라 본문 못 봄. URL 활성·cafeId·이름 언급수만 신호로 수집.""" + logger.info("[naver_cafe] start run=%s url=%s", analysis_run_id, url) + await update_raw_info_status(info_id, "processing") + data = await NaverClient(get_env("NAVER_CLIENT_ID"), get_env("NAVER_CLIENT_SECRET")).get_cafe_info(url) + if data is None: + await update_raw_info_status(info_id, "failed") + logger.warning("[naver_cafe] failed run=%s", analysis_run_id) + return + await update_raw_info(info_id, data) + logger.info("[naver_cafe] done run=%s", analysis_run_id) + + +async def collect_kakaotalk(analysis_run_id: str, info_id: int, url: str) -> None: + """카카오톡은 수집 X — URL 보관만. LLM이 채널 존재 신호로만 사용.""" + logger.info("[kakaotalk] url-only run=%s url=%s", analysis_run_id, url) + await update_raw_info(info_id, {"url": url}) + + +async def collect_all(analysis_run_id: str, hospital_id: str) -> None: + rows = await select_run_sources(analysis_run_id) + + # source_type → collector. KR/EN 구분은 collector 입장에서 동일, language 컬럼만 다름. + _collectors = { + SourceType.INSTAGRAM: collect_instagram, + SourceType.FACEBOOK: collect_facebook, + SourceType.NAVER_BLOG: collect_naver_blog, + SourceType.YOUTUBE: collect_youtube, + SourceType.GANGNAM_UNNI: collect_gangnam_unni, + SourceType.TIKTOK: collect_tiktok, + SourceType.NAVER_CAFE: collect_naver_cafe, + SourceType.KAKAOTALK: collect_kakaotalk, + } + + tasks = [] + branding_info_id: int | None = None + for row in rows: + info_id = row["info_id"] + source_type = row["source_type"] + url = row["url"] + if source_type == SourceType.BRANDING: + branding_info_id = info_id # mainpage·채널 수집 끝난 뒤 2단계에서 사용 + continue + if source_type == SourceType.MAINPAGE: + tasks.append(collect_mainpage(analysis_run_id, info_id, hospital_id, url)) + elif source_type in _collectors: + tasks.append(_collectors[source_type](analysis_run_id, info_id, url)) await asyncio.gather(*tasks, return_exceptions=True) - # 아래 3단계는 모두 hospital raw_data를 read-modify-write 하므로 race 방지 위해 순차. - # brand_assets : clinic_info가 채운 branding.logoUrl로 공식 로고/hex 추출 - # extra_channels: 틱톡/인스타EN/페북EN 수집 - # channel_logos : 공식 로고(brand_assets)+채널 profileImage(extra_channels) 채워진 뒤 Vision 비교 - # 부가 기능이라 실패해도 리포트는 나와야 하므로 _run_optional_step으로 각각 격리. - await _run_optional_step(collect_brand_assets(analysis_run_id, hospital_id), "brand_assets") - await _run_optional_step( - collect_extra_channels( - analysis_run_id, hospital_id, - tiktok_url=tiktok_url, instagram_en_url=instagram_en_url, facebook_en_url=facebook_en_url, - kakao_talk_url=kakao_talk_url, naver_cafe_url=naver_cafe_url, - ), - "extra_channels", - ) - await _run_optional_step(collect_channel_logos(analysis_run_id, hospital_id), "channel_logos") + # 2단계: branding (brandAssets → channelLogos 한 raw_info 안에 머지). mainpage·채널 raw_data 의존이라 순차. + # 부가 기능이라 실패해도 리포트는 나와야 하므로 _run_optional_step 으로 격리. + if branding_info_id is not None: + await _run_optional_step(collect_brand_assets(analysis_run_id, branding_info_id), "brand_assets") + await _run_optional_step(collect_channel_logos(analysis_run_id, branding_info_id), "channel_logos") diff --git a/app/services/collect_extras.py b/app/services/collect_extras.py index de885cf..3cec4d6 100644 --- a/app/services/collect_extras.py +++ b/app/services/collect_extras.py @@ -1,37 +1,25 @@ -import asyncio -import json import logging import os from urllib.parse import urlparse -from common.db import fetchone, fetch_raw, merge_hospital_raw_data -from common.utils import get_env -from integrations.apify import ApifyClient +from common.db.source import select_run_raw_data, update_raw_info_merge from integrations.vision import VisionClient -from integrations.naver import NaverClient from integrations.color_extractor import extract_brand_assets_from_site -from services.facebook_audit import transform_for_storage as transform_facebook logger = logging.getLogger(__name__) -async def collect_brand_assets(analysis_run_id: str, hospital_id: str) -> None: - """홈페이지에서 로고 URL + brand hex 색상을 뽑아 raw_data["brandAssets"]에 저장. +async def collect_brand_assets(analysis_run_id: str, info_id: int) -> None: + """홈페이지에서 로고 URL + brand hex 색상 추출 → branding raw_info["brandAssets"] 머지. - 로고 URL/hex: HTML·CSS 정규식 (color_extractor) — Vision 의존 X, 사이트 전체 컬러 시스템이 더 정확. - 로고 정성 묘사(심볼/워드마크/톤): Gemini Vision (GEMINI_API_KEY 없으면 색상만 저장하고 skip). """ - logger.info("[brand_assets] start run=%s", analysis_run_id) - row = await fetchone( - "SELECT raw_data, url FROM hospital_baseinfo WHERE hospital_id = %s", - (hospital_id,), - ) - if not row: - return - raw = row["raw_data"] - raw_data = json.loads(raw) if isinstance(raw, str) else (raw or {}) - branding = raw_data.get("branding") or {} - homepage_url = row["url"] + logger.info("[brand_assets] start run=%s info=%s", analysis_run_id, info_id) + raw = await select_run_raw_data(analysis_run_id) + mainpage = raw.get("mainpage") or {} + homepage_url = mainpage.get("sourceUrl") or "" + branding = mainpage.get("branding") or {} - # 0~1. 사이트 1회 fetch로 logo URL + brand hex 동시 추출 (img/background-image/CSS .logo, Vision 의존 X) + # 0~1. 사이트 1회 fetch 로 logo URL + brand hex 동시 추출 (img/background-image/CSS .logo, Vision 의존 X) site = await extract_brand_assets_from_site(homepage_url) if homepage_url else {} html_logo_url = site.get("logo_url") css_colors = site.get("colors") or {} @@ -43,7 +31,7 @@ async def collect_brand_assets(analysis_run_id: str, hospital_id: str) -> None: # 2. 로고/대표 이미지 후보 (logo → og:image → favicon 순) logo_url = html_logo_url or branding.get("logoUrl") og_image = branding.get("ogImage") - favicon = branding.get("faviconUrl") + favicon = branding.get("faviconUrl") candidates: list[tuple[str, str]] = [] if logo_url: candidates.append(("logo", logo_url)) if og_image: candidates.append(("og", og_image)) @@ -57,8 +45,8 @@ async def collect_brand_assets(analysis_run_id: str, hospital_id: str) -> None: logger.info("[brand_assets] skip — no logo/og/favicon candidates and no CSS colors") return - # 3. Vision은 로고 정성 묘사만 (hex는 CSS 추출이 더 정확). 키 없으면 색상만 저장. - # SVG는 vision 내부에서 resvg로 PNG 래스터화 후 Gemini에 던지므로 분기 불필요. + # 3. Vision 은 로고 정성 묘사만 (hex 는 CSS 추출이 더 정확). 키 없으면 색상만 저장. + # SVG 는 vision 내부에서 resvg 로 PNG 래스터화 후 Gemini 에 던지므로 분기 불필요. result: dict = {} used_kind: str | None = None api_key = os.getenv("GEMINI_API_KEY") @@ -69,123 +57,64 @@ async def collect_brand_assets(analysis_run_id: str, hospital_id: str) -> None: if result: used_kind = kind break - # favicon으로만 분석된 경우 진짜 로고가 아니므로 logo URL은 박지 않음 (묘사는 OK) + # favicon 으로만 분석된 경우 진짜 로고가 아니므로 logo URL 은 박지 않음 (묘사는 OK) if result and used_kind == "favicon" and result.get("logo_images"): result["logo_images"] = {"circle": None, "horizontal": None, "korean": None} elif not api_key: logger.info("[brand_assets] GEMINI_API_KEY not set — 색상만 저장, Vision 묘사 skip") - # 4. CSS에서 추출한 brand_colors/palette를 Vision보다 우선 사용 + # 4. CSS 에서 추출한 brand_colors/palette 를 Vision 보다 우선 사용 if css_colors: - if css_colors.get("brand_colors"): - result["brand_colors"] = css_colors["brand_colors"] - if css_colors.get("color_palette"): - result["color_palette"] = css_colors["color_palette"] + if css_colors.get("brand_colors"): result["brand_colors"] = css_colors["brand_colors"] + if css_colors.get("color_palette"): result["color_palette"] = css_colors["color_palette"] result["color_source"] = "html+css" elif result: result["color_source"] = "vision" if result: result["logo_source"] = used_kind or "none" - await merge_hospital_raw_data(hospital_id, {"brandAssets": result}) + await update_raw_info_merge(info_id, {"brandAssets": result}) logger.info("[brand_assets] done keys=%s", list(result.keys()) if result else None) -async def collect_extra_channels( - analysis_run_id: str, - hospital_id: str, - tiktok_url: str | None = None, - instagram_en_url: str | None = None, - facebook_en_url: str | None = None, - kakao_talk_url: str | None = None, - naver_cafe_url: str | None = None, -) -> None: - """틱톡 / 인스타 EN / 페북 EN / 네이버 카페 수집 + 카카오톡 URL만 보관 → - 모두 hospital raw_data에 저장. 인스타EN·페북EN은 기존 Apify 수집기 재사용, 틱톡은 신규 액터. - 네이버 카페는 로그인 필요라 본문 못 보지만 URL 활성·cafeId·이름 언급수만 신호로 수집. - 카카오톡은 URL만 (LLM이 채널 존재 신호로만 사용).""" - apify = ApifyClient(get_env("APIFY_API_TOKEN")) - jobs: dict = {} - if instagram_en_url: - jobs["instagramEn"] = apify.get_instagram_profile(instagram_en_url) - if facebook_en_url: - jobs["facebookEn"] = apify.get_facebook_page(facebook_en_url) - if tiktok_url: - jobs["tiktok"] = apify.get_tiktok_profile(tiktok_url) - if naver_cafe_url: - nc = NaverClient(get_env("NAVER_CLIENT_ID"), get_env("NAVER_CLIENT_SECRET")) - jobs["naverCafe"] = nc.get_cafe_info(naver_cafe_url) - - results: dict = {} - if jobs: - logger.info("[extra_channels] start run=%s channels=%s", analysis_run_id, list(jobs)) - done = await asyncio.gather(*jobs.values(), return_exceptions=True) - for key, res in zip(jobs.keys(), done): - if isinstance(res, Exception): - logger.warning("[extra_channels] %s 수집 실패: %s", key, res) - elif res: - if key == "facebookEn": - res = transform_facebook(res) - results[key] = res - - # URL-only 채널 (수집 X, 존재 여부만) - if kakao_talk_url: - results["kakaoTalk"] = {"url": kakao_talk_url} - - if not results: - logger.info("[extra_channels] 수집 결과 없음 run=%s", analysis_run_id) - return - - await merge_hospital_raw_data(hospital_id, results) - logger.info("[extra_channels] done run=%s keys=%s", analysis_run_id, list(results)) - - -async def collect_channel_logos(analysis_run_id: str, hospital_id: str) -> None: - """채널별 프로필 이미지(로고)를 모아 Gemini Vision으로 설명 + 공식 로고 일치 여부 평가. - → hospital raw_data["channelLogos"]에 저장. GEMINI_API_KEY 없으면 skip. - brand_assets(공식 로고)·extra_channels(틱톡/EN profileImage) 다음에 실행돼야 함.""" +async def collect_channel_logos(analysis_run_id: str, info_id: int) -> None: + """채널별 프로필 이미지(로고)를 모아 Gemini Vision 으로 설명 + 공식 로고 일치 여부 평가. + → branding raw_info["channelLogos"] 머지. GEMINI_API_KEY 없으면 skip. + brand_assets(공식 로고) · 채널 raw_info(profileImage) 가 채워진 뒤 실행돼야 함.""" api_key = os.getenv("GEMINI_API_KEY") if not api_key: logger.info("[channel_logos] skip — GEMINI_API_KEY 없음") return - hrow = await fetchone("SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s", (hospital_id,)) - raw = hrow["raw_data"] if hrow else None - raw_data = json.loads(raw) if isinstance(raw, str) else (raw or {}) - official = ((raw_data.get("brandAssets") or {}).get("logo_images") or {}).get("horizontal") + raw = await select_run_raw_data(analysis_run_id) + branding = raw.get("branding") or {} + official = ((branding.get("brandAssets") or {}).get("logo_images") or {}).get("horizontal") - run = await fetchone( - "SELECT instagram_data_id, facebook_data_id, youtube_data_id" - " FROM analysis_runs WHERE analysis_run_id = %s", - (analysis_run_id,), - ) + # KR 메인 채널 + EN/TikTok 부가 채널 profileImage 수집 (raw_info dict 키: instagram, instagram_en, ...) + _label = { + "instagram": "Instagram", + "facebook": "Facebook", + "youtube": "YouTube", + "instagram_en": "Instagram EN", + "facebook_en": "Facebook EN", + "tiktok": "TikTok", + } logos: list[dict] = [] - # 전용 테이블 채널 (KR) - for ch, table, col in [ - ("Instagram", "instagram_data", "instagram_data_id"), - ("Facebook", "facebook_data", "facebook_data_id"), - ("YouTube", "youtube_data", "youtube_data_id"), - ]: - rid = (run or {}).get(col) - if rid: - d = await fetch_raw(table, rid) or {} - if d.get("profileImage"): - logos.append({"channel": ch, "url": d["profileImage"]}) - # 추가 채널 (hospital raw_data) - for ch, key in [("Instagram EN", "instagramEn"), ("Facebook EN", "facebookEn"), ("TikTok", "tiktok")]: - img = (raw_data.get(key) or {}).get("profileImage") + for key, label in _label.items(): + img = (raw.get(key) or {}).get("profileImage") if img: - logos.append({"channel": ch, "url": img}) + logos.append({"channel": label, "url": img}) if not logos: logger.info("[channel_logos] skip — 채널 프로필 이미지 없음") return - logger.info("[channel_logos] start run=%s channels=%s official=%s", analysis_run_id, - [l["channel"] for l in logos], bool(official)) + logger.info("[channel_logos] start run=%s channels=%s official=%s", + analysis_run_id, [l["channel"] for l in logos], bool(official)) result = await VisionClient(api_key).describe_channel_logos(official, logos) if result: - # Vision이 못 본 채널도 url은 채워둠 (프론트에서 이미지 표시용) + # Vision 이 못 본 채널도 url 은 채워둠 (프론트에서 이미지 표시용) result["logos"] = logos - await merge_hospital_raw_data(hospital_id, {"channelLogos": result}) - logger.info("[channel_logos] done run=%s keys=%s", analysis_run_id, list(result.keys()) if result else None) + await update_raw_info_merge(info_id, {"channelLogos": result}) + logger.info("[channel_logos] done run=%s keys=%s", + analysis_run_id, list(result.keys()) if result else None) diff --git a/app/services/file.py b/app/services/file_data.py similarity index 80% rename from app/services/file.py rename to app/services/file_data.py index 339cc99..8e168c7 100644 --- a/app/services/file.py +++ b/app/services/file_data.py @@ -2,7 +2,8 @@ import logging from fastapi import HTTPException, UploadFile -from common.db import execute, fetchall, fetchone, insert_file_row +from common.db.run import select_run +from common.db.file_data import insert_file, select_run_files, select_file, delete_file from integrations.azure_blob import AzureBlobUploader from models.file import FileListItem, FileType, FileUploadResponse @@ -31,10 +32,7 @@ async def upload_analysis_file( content_type: str | None = None, ) -> tuple[int, str]: """analysis_run에 딸린 파일 업로드. Blob 업로드 + file_data row 생성. (file_id, url) 반환.""" - run = await fetchone( - "SELECT hospital_id FROM analysis_runs WHERE analysis_run_id = %s", - (analysis_run_id,), - ) + run = await select_run(analysis_run_id) if not run: raise HTTPException(status_code=404, detail="analysis_run not found") hospital_id = run["hospital_id"] @@ -47,7 +45,7 @@ async def upload_analysis_file( content_type=content_type, ) - file_id = await insert_file_row( + file_id = await insert_file( analysis_run_id=analysis_run_id, hospital_id=hospital_id, file_type=file_type, @@ -61,12 +59,7 @@ async def upload_analysis_file( async def list_analysis_files(analysis_run_id: str) -> list[dict]: """analysis_run에 딸린 (삭제 안 된) 파일 목록.""" - return await fetchall( - "SELECT id, file_type, file_name, file_url, size_bytes, created_at FROM file_data" - " WHERE analysis_run_id = %s AND is_deleted = FALSE" - " ORDER BY created_at DESC", - (analysis_run_id,), - ) + return await select_run_files(analysis_run_id) async def handle_analysis_file_upload( @@ -102,7 +95,7 @@ async def handle_analysis_file_upload( async def get_analysis_files_response(analysis_run_id: str) -> list[FileListItem]: """run 존재 확인 + 응답 모델 생성.""" - if not await fetchone("SELECT 1 FROM analysis_runs WHERE analysis_run_id = %s", (analysis_run_id,)): + if not await select_run(analysis_run_id): raise HTTPException(status_code=404, detail="analysis_run not found") rows = await list_analysis_files(analysis_run_id) return [FileListItem(**{**r, "created_at": str(r["created_at"])}) for r in rows] @@ -110,14 +103,8 @@ async def get_analysis_files_response(analysis_run_id: str) -> list[FileListItem async def soft_delete_analysis_file(analysis_run_id: str, file_id: int) -> None: """analysis_run에 딸린 파일을 소프트 삭제. 멱등성 보장.""" - row = await fetchone( - "SELECT id FROM file_data WHERE id = %s AND analysis_run_id = %s", - (file_id, analysis_run_id), - ) + row = await select_file(file_id, analysis_run_id) if not row: raise HTTPException(status_code=404, detail="file not found") - await execute( - "UPDATE file_data SET is_deleted = TRUE WHERE id = %s AND is_deleted = FALSE", - (file_id,), - ) + await delete_file(file_id) logger.info("soft-deleted analysis file run=%s file_id=%s", analysis_run_id, file_id) diff --git a/app/services/market.py b/app/services/market.py index 8cfa193..421e901 100644 --- a/app/services/market.py +++ b/app/services/market.py @@ -1,7 +1,9 @@ import asyncio -import json import logging -from common.db import fetchone, execute +from common.db.run import select_run +from common.db.hospital import select_hospital +from common.db.market import upsert_market_status, upsert_market_result +from common.db.source import select_run_raw_data from integrations.llm.llm_service import LLMService from integrations.llm.prompt import ( market_competitors_prompt, @@ -18,49 +20,27 @@ _TYPES = ["competitors", "keywords", "trend", "target_audience"] async def _save(analysis_run_id: str, analysis_type: str, result, exc: Exception | None) -> None: if exc: logger.warning("[market] %s failed run=%s: %s", analysis_type, analysis_run_id, exc) - await execute( - "INSERT INTO market_analysis (analysis_run_id, analysis_type, status)" - " VALUES (%s, %s, 'failed')" - " ON DUPLICATE KEY UPDATE status = 'failed'", - (analysis_run_id, analysis_type), - ) + await upsert_market_status(analysis_run_id, analysis_type, "failed") else: - await execute( - "INSERT INTO market_analysis (analysis_run_id, analysis_type, status, data)" - " VALUES (%s, %s, 'done', %s)" - " ON DUPLICATE KEY UPDATE status = 'done', data = VALUES(data)", - (analysis_run_id, analysis_type, json.dumps(result.model_dump(), ensure_ascii=False)), - ) + await upsert_market_result(analysis_run_id, analysis_type, result.model_dump()) async def run_market_analysis(analysis_run_id: str) -> None: logger.info("[market] start run=%s", analysis_run_id) - run = await fetchone( - "SELECT hospital_id FROM analysis_runs WHERE analysis_run_id = %s", - (analysis_run_id,), - ) - clinic = await fetchone( - "SELECT hospital_name, road_address, raw_data FROM hospital_baseinfo WHERE hospital_id = %s", - (run["hospital_id"],), - ) + run = await select_run(analysis_run_id) + clinic = await select_hospital(run["hospital_id"]) + raw = await select_run_raw_data(analysis_run_id) + mainpage = raw.get("mainpage") or {} - raw_data = clinic["raw_data"] - clinic_data = json.loads(raw_data) if isinstance(raw_data, str) else (raw_data or {}) - - clinic_name = clinic["hospital_name"] or "" - address = clinic["road_address"] or "" - services = clinic_data.get("services", []) + clinic_name = (clinic or {}).get("hospital_name") or "" + address = (clinic or {}).get("road_address") or "" + services = mainpage.get("services", []) services_str = ", ".join(services[:3]) primary_service = services[0] if services else "" for analysis_type in _TYPES: - await execute( - "INSERT INTO market_analysis (analysis_run_id, analysis_type, status)" - " VALUES (%s, %s, 'processing')" - " ON DUPLICATE KEY UPDATE status = 'processing'", - (analysis_run_id, analysis_type), - ) + await upsert_market_status(analysis_run_id, analysis_type, "processing") llm = LLMService(provider="perplexity") results = await asyncio.gather( diff --git a/app/services/pipeline.py b/app/services/pipeline.py index f77b328..c441723 100644 --- a/app/services/pipeline.py +++ b/app/services/pipeline.py @@ -1,5 +1,5 @@ import logging -from common.db import fetchone, execute +from common.db.run import select_run, update_run_status from models.status import AnalysisStatus from services.collect import collect_all from services.market import run_market_analysis @@ -8,51 +8,23 @@ from services.analysis import run_report_task, run_plan_task logger = logging.getLogger(__name__) -async def run_pipeline(analysis_run_id: str, extra_channels: dict | None = None) -> None: +async def run_pipeline(analysis_run_id: str) -> None: logger.info("[pipeline] start run=%s", analysis_run_id) - extra_channels = extra_channels or {} # ── 1. Collect ────────────────────────────────────────────────────────── - run = await fetchone( - "SELECT hospital_id, instagram_data_id, facebook_data_id," - " naver_blog_data_id, youtube_data_id, gangnam_unni_data_id" - " FROM analysis_runs WHERE analysis_run_id = %s", - (analysis_run_id,), - ) - await collect_all( - analysis_run_id, - hospital_id=run["hospital_id"], - instagram_id=run["instagram_data_id"], - facebook_id=run["facebook_data_id"], - naver_blog_id=run["naver_blog_data_id"], - youtube_id=run["youtube_data_id"], - gangnam_unni_id=run["gangnam_unni_data_id"], - tiktok_url=extra_channels.get("tiktok"), - instagram_en_url=extra_channels.get("instagram_en"), - facebook_en_url=extra_channels.get("facebook_en"), - kakao_talk_url=extra_channels.get("kakao_talk"), - naver_cafe_url=extra_channels.get("naver_cafe"), - ) + run = await select_run(analysis_run_id) + await collect_all(analysis_run_id, hospital_id=run["hospital_id"]) # ── 2. Market ──────────────────────────────────────────────────────────── - await execute( - "UPDATE analysis_runs SET status = %s WHERE analysis_run_id = %s", - (AnalysisStatus.ANALYZING, analysis_run_id), - ) + await update_run_status(analysis_run_id, AnalysisStatus.ANALYZING) await run_market_analysis(analysis_run_id) # ── 3. Report ──────────────────────────────────────────────────────────── await run_report_task(analysis_run_id) # ── 4. Plan ────────────────────────────────────────────────────────────── - await execute( - "UPDATE analysis_runs SET status = %s WHERE analysis_run_id = %s", - (AnalysisStatus.PLANNING, analysis_run_id), - ) + await update_run_status(analysis_run_id, AnalysisStatus.PLANNING) await run_plan_task(analysis_run_id) - await execute( - "UPDATE analysis_runs SET status = %s WHERE analysis_run_id = %s", - (AnalysisStatus.COMPLETED, analysis_run_id), - ) + await update_run_status(analysis_run_id, AnalysisStatus.COMPLETED) logger.info("[pipeline] done run=%s", analysis_run_id)