Compare commits

...

8 Commits

Author SHA1 Message Date
Mina Choi af61713697 refactor(branding): collect/report 단계 분리 + Vision logo hex 추가
- integrations/color_extractor → integrations/site_fetcher (HTTP) + services/brand_parser (파싱) 분리
- integrations/vision → integrations/llm/gemini_vision 이동
- services/collect_extras → services/collect.collect_brand_basics (collect) + services/branding (report) 분리
- Vision prompt 에 logo_colors_hex 5개 강제 + 길이 fallback (4·6개 들어와도 5개로 정규화)
- branding 단계: HTML parser canonical logo URL 을 Vision 에 1순위 전달
  → firecrawl 가 잘못된 이미지 (마케팅 배너 등) 를 logo 로 잡는 케이스 회피
- select_run 에서 큰 JSON 컬럼 (report_data/plan_data) 빼서 meta only
  → generate_plan 만 select_run_report_data 별도 조회. 4군데 호출자는 가벼워짐

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-02 11:08:38 +09:00
Mina Choi b844951ad8 refactor(branding): logo URL 을 raw_info.logo_url 컬럼으로 분리
- collect_brand_assets: Vision 결과의 logo_images 를 JSON 에서 제거하고
  진짜 로고(logo/og 매칭) 인 경우만 raw_info.logo_url 컬럼에 저장.
  favicon-only 매칭은 컬럼 저장 X (옛 logic 동일).
- analysis._build_overrides: select_branding_logo_url 로 컬럼 읽어
  ClinicSnapshot.logo_images 를 horizontal=logo_url 로 재구성.
- branding raw_data 가 "사실 데이터(URL/hex)" vs "Vision 분석 텍스트(묘사)"
  섞이던 문제 일부 해소 — URL 은 컬럼, 텍스트만 JSON 에 잔존.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-01 17:01:19 +09:00
Mina Choi 009d95377a Merge 'b6a0134 db 스키마 변경' on top of db-migration
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-01 16:57:12 +09:00
Mina Choi c23e620fb4 Merge branch 'db-migration': remote_source + raw_info 통합 스키마
- common/db.py 단일 파일 → common/db/ 패키지로 분리 (hospital/source/run/market/file_data)
- 모든 채널 데이터를 raw_info 단일 테이블로 통일 (hospital_baseinfo.raw_data / 채널별 *_data 테이블 제거)
- 부가 채널(tiktok/instagram_en/facebook_en/kakaotalk/naver_cafe)도 remote_source+raw_info 로 일원화
  - EN 채널은 같은 source_type + language='EN' 으로 구분, select_run_raw_data 가 합성키로 반환
- SourceType.BRANDING 추가 — brand_assets/channel_logos 결과를 하나의 raw_info entry 에 머지
- collect.collect_all: main wave gather → branding 2단계 순차 실행
- mock_urls 매칭 + _with_scheme 보정 유지

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-01 16:57:08 +09:00
jaehwang b6a0134ba7 db 스키마 변경 2026-06-01 16:51:30 +09:00
jaehwang 3b4c154fb2 db migration done 2026-06-01 15:31:33 +09:00
jaehwang c9c5ee9177 Merge branch 'main' into db-migration 2026-05-29 16:31:47 +09:00
jaehwang ab215395c6 db ready 2026-05-28 13:13:30 +09:00
26 changed files with 1217 additions and 1248 deletions

View File

@ -1,85 +1,4 @@
-- 테이블 순서는 관계를 고려하여 한 번에 실행해도 에러가 발생하지 않게 정렬되었습니다. -- user_info
-- instagram_data Table Create SQL
-- 테이블 생성 SQL - instagram_data
CREATE TABLE instagram_data
(
`id` INT NOT NULL AUTO_INCREMENT,
`hospital_id` CHAR(36) NOT NULL,
`url` VARCHAR(500) NOT NULL,
`status` VARCHAR(20) NOT NULL DEFAULT 'start',
`raw_data` JSON NULL,
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id)
);
-- Index 설정 SQL - instagram_data(hospital_id)
CREATE INDEX IX_instagram_data_1
ON instagram_data(hospital_id);
-- facebook_data Table Create SQL
-- 테이블 생성 SQL - facebook_data
CREATE TABLE facebook_data
(
`id` INT NOT NULL AUTO_INCREMENT,
`hospital_id` CHAR(36) NOT NULL,
`url` VARCHAR(500) NOT NULL,
`status` VARCHAR(20) NOT NULL DEFAULT 'start',
`raw_data` JSON NULL,
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id)
);
-- Index 설정 SQL - facebook_data(hospital_id)
CREATE INDEX IX_facebook_data_1
ON facebook_data(hospital_id);
-- naver_blog_data Table Create SQL
-- 테이블 생성 SQL - naver_blog_data
CREATE TABLE naver_blog_data
(
`id` INT NOT NULL AUTO_INCREMENT,
`hospital_id` CHAR(36) NOT NULL,
`url` VARCHAR(500) NOT NULL,
`status` VARCHAR(20) NOT NULL DEFAULT 'start',
`raw_data` JSON NULL,
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id)
);
-- Index 설정 SQL - naver_blog_data(hospital_id)
CREATE INDEX IX_naver_blog_data_1
ON naver_blog_data(hospital_id);
-- hospital_baseinfo Table Create SQL
-- 테이블 생성 SQL - hospital_baseinfo
CREATE TABLE hospital_baseinfo
(
`hospital_id` CHAR(36) NOT NULL,
`owner_user_id` INT NOT NULL,
`hospital_name` VARCHAR(50) NOT NULL,
`hospital_name_en` VARCHAR(50) NULL,
`brn` VARCHAR(50) NOT NULL,
`road_address` VARCHAR(100) NULL,
`site_address` VARCHAR(100) NULL,
`url` VARCHAR(500) NULL,
`status` VARCHAR(20) NOT NULL DEFAULT 'start',
`raw_data` JSON NULL,
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (hospital_id)
);
-- Index 설정 SQL - hospital_baseinfo(owner_user_id)
CREATE INDEX IX_hospital_baseinfo_1
ON hospital_baseinfo(owner_user_id);
-- user_info Table Create SQL
-- 테이블 생성 SQL - user_info
CREATE TABLE user_info CREATE TABLE user_info
( (
`user_id` INT NOT NULL AUTO_INCREMENT, `user_id` INT NOT NULL AUTO_INCREMENT,
@ -90,52 +9,49 @@ CREATE TABLE user_info
PRIMARY KEY (user_id) PRIMARY KEY (user_id)
); );
-- youtube_data Table Create SQL
CREATE TABLE youtube_data -- hospital_baseinfo
CREATE TABLE hospital_baseinfo
( (
`id` INT NOT NULL AUTO_INCREMENT,
`hospital_id` CHAR(36) NOT NULL, `hospital_id` CHAR(36) NOT NULL,
`url` VARCHAR(500) NOT NULL, `owner_user_id` INT NOT NULL,
`hospital_name` VARCHAR(50) NOT NULL,
`hospital_name_en` VARCHAR(50) NULL,
`brn` VARCHAR(50) NOT NULL,
`road_address` VARCHAR(100) NULL,
`site_address` VARCHAR(100) NULL,
`status` VARCHAR(20) NOT NULL DEFAULT 'start', `status` VARCHAR(20) NOT NULL DEFAULT 'start',
`raw_data` JSON NULL,
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id) `updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (hospital_id)
); );
-- Index 설정 SQL - youtube_data(hospital_id) CREATE INDEX IX_hospital_baseinfo_1 ON hospital_baseinfo (owner_user_id);
CREATE INDEX IX_youtube_data_1
ON youtube_data(hospital_id);
-- gangnam_unni_data Table Create SQL -- remote_source: 병원별 채널 소스 정보 (instagram/facebook/naver_blog/youtube/gangnam_unni 등)
CREATE TABLE gangnam_unni_data CREATE TABLE remote_source
( (
`id` INT NOT NULL AUTO_INCREMENT, `source_id` INT NOT NULL AUTO_INCREMENT,
`hospital_id` CHAR(36) NOT NULL, `hospital_id` CHAR(36) NOT NULL,
`source_type` VARCHAR(50) NOT NULL,
`language` CHAR(2) NULL,
`url` VARCHAR(500) NOT NULL, `url` VARCHAR(500) NOT NULL,
`status` VARCHAR(20) NOT NULL DEFAULT 'start',
`raw_data` JSON NULL,
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id) PRIMARY KEY (source_id)
); );
-- Index 설정 SQL - gangnam_unni_data(hospital_id) CREATE INDEX IX_remote_source_1 ON remote_source (hospital_id);
CREATE INDEX IX_gangnam_unni_data_1 CREATE INDEX IX_remote_source_2 ON remote_source (hospital_id, source_type);
ON gangnam_unni_data(hospital_id);
-- analysis_runs Table Create SQL -- analysis_runs
CREATE TABLE analysis_runs CREATE TABLE analysis_runs
( (
`analysis_run_id` CHAR(36) NOT NULL, `analysis_run_id` CHAR(36) NOT NULL,
`hospital_id` CHAR(36) NOT NULL, `hospital_id` CHAR(36) NOT NULL,
`owner_user_id` INT NOT NULL DEFAULT 0, `owner_user_id` INT NOT NULL DEFAULT 0,
`status` VARCHAR(50) NOT NULL DEFAULT 'discovering', `status` VARCHAR(50) NOT NULL DEFAULT 'discovering',
`instagram_data_id` INT NULL,
`facebook_data_id` INT NULL,
`naver_blog_data_id` INT NULL,
`youtube_data_id` INT NULL,
`gangnam_unni_data_id` INT NULL,
`report_data` JSON NULL, `report_data` JSON NULL,
`plan_data` JSON NULL, `plan_data` JSON NULL,
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
@ -143,16 +59,30 @@ CREATE TABLE analysis_runs
PRIMARY KEY (analysis_run_id) PRIMARY KEY (analysis_run_id)
); );
-- Index 설정 SQL - analysis_runs(hospital_id) CREATE INDEX IX_analysis_runs_1 ON analysis_runs (hospital_id);
CREATE INDEX IX_analysis_runs_1 CREATE INDEX IX_analysis_runs_2 ON analysis_runs (owner_user_id);
ON analysis_runs(hospital_id);
-- Index 설정 SQL - analysis_runs(owner_user_id)
CREATE INDEX IX_analysis_runs_2
ON analysis_runs(owner_user_id);
-- file_data Table Create SQL -- raw_info: 분석 실행별 수집 원시 데이터
CREATE TABLE raw_info
(
`info_id` INT NOT NULL AUTO_INCREMENT,
`source_id` INT NOT NULL,
`analysis_run_id` CHAR(36) NOT NULL,
`data_tag` VARCHAR(50) NOT NULL DEFAULT 'default',
`status` VARCHAR(20) NOT NULL DEFAULT 'start',
`raw_data` JSON NULL,
`logo_url` VARCHAR(500) NULL,
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (info_id)
);
CREATE INDEX IX_raw_info_1 ON raw_info (analysis_run_id);
CREATE INDEX IX_raw_info_2 ON raw_info (source_id);
-- file_data
CREATE TABLE file_data CREATE TABLE file_data
( (
`id` INT NOT NULL AUTO_INCREMENT, `id` INT NOT NULL AUTO_INCREMENT,
@ -169,7 +99,7 @@ CREATE TABLE file_data
); );
-- hospital_history Table Create SQL -- hospital_history
CREATE TABLE hospital_history CREATE TABLE hospital_history
( (
`id` INT NOT NULL AUTO_INCREMENT, `id` INT NOT NULL AUTO_INCREMENT,
@ -180,24 +110,17 @@ CREATE TABLE hospital_history
`brn` VARCHAR(50) NOT NULL, `brn` VARCHAR(50) NOT NULL,
`road_address` VARCHAR(100) NULL, `road_address` VARCHAR(100) NULL,
`site_address` VARCHAR(100) NULL, `site_address` VARCHAR(100) NULL,
`url` VARCHAR(500) NULL,
`status` VARCHAR(20) NOT NULL, `status` VARCHAR(20) NOT NULL,
`raw_data` JSON NULL,
`analysis_run_id` CHAR(36) NULL, `analysis_run_id` CHAR(36) NULL,
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id) PRIMARY KEY (id)
); );
-- Index 설정 SQL - hospital_history(hospital_id) CREATE INDEX IX_hospital_history_1 ON hospital_history (hospital_id);
CREATE INDEX IX_hospital_history_1 CREATE INDEX IX_hospital_history_2 ON hospital_history (analysis_run_id);
ON hospital_history(hospital_id);
-- Index 설정 SQL - hospital_history(analysis_run_id)
CREATE INDEX IX_hospital_history_2
ON hospital_history(analysis_run_id);
-- market_analysis Table Create SQL -- market_analysis
CREATE TABLE market_analysis CREATE TABLE market_analysis
( (
`id` INT NOT NULL AUTO_INCREMENT, `id` INT NOT NULL AUTO_INCREMENT,
@ -210,7 +133,4 @@ CREATE TABLE market_analysis
UNIQUE KEY UQ_market_analysis (analysis_run_id, analysis_type) UNIQUE KEY UQ_market_analysis (analysis_run_id, analysis_type)
); );
-- Index 설정 SQL - market_analysis(analysis_run_id) CREATE INDEX IX_market_analysis_1 ON market_analysis (analysis_run_id);
CREATE INDEX IX_market_analysis_1
ON market_analysis(analysis_run_id);

View File

@ -2,21 +2,22 @@ import logging
import uuid6 import uuid6
from fastapi import APIRouter, BackgroundTasks, Depends, File, Form, HTTPException, UploadFile, status from fastapi import APIRouter, BackgroundTasks, Depends, File, Form, HTTPException, UploadFile, status
from common.deps import verify_api_key from common.deps import verify_api_key
from common.db import fetchone, insert_instagram_row, insert_facebook_row, insert_naver_blog_row, insert_youtube_row, insert_gangnam_unni_row, insert_analysis_run from common.db.hospital import select_hospital
from common.db.source import select_source_mainpage, insert_source, insert_raw_info
from common.db.run import insert_run, select_run_status
from common.utils import _normalize_homepage, _with_scheme
from models.analysis import AnalysisCreate, AnalysisStartResponse, AnalysisStatusResponse from models.analysis import AnalysisCreate, AnalysisStartResponse, AnalysisStatusResponse
from models.file import FileListItem, FileType, FileUploadResponse from models.file import FileListItem, FileType, FileUploadResponse
from models.status import AnalysisStatus from models.status import AnalysisStatus, SourceType
from services.pipeline import run_pipeline from services.pipeline import run_pipeline
from services.file import get_analysis_files_response, handle_analysis_file_upload, soft_delete_analysis_file from services.file_data import get_analysis_files_response, handle_analysis_file_upload, soft_delete_analysis_file
from mock_urls import MOCK_CLINICS from mock_urls import MOCK_CLINICS
from common.utils import _normalize_homepage, _with_scheme
router = APIRouter(prefix="/api/analysis", tags=["analysis"], dependencies=[Depends(verify_api_key)]) router = APIRouter(prefix="/api/analysis", tags=["analysis"], dependencies=[Depends(verify_api_key)])
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# 추후 DB에 클리닉별로 매핑할 채널들 — 지금은 mock_urls에서 homepage 매칭으로 보충.
# 메인 채널(IG/FB/YT/네이버블로그/강남언니) + 부가 채널(틱톡/영문 IG·FB/카카오/네이버카페) 모두 포함. # 클라가 일부만 보내거나 빈 값이면 mock_urls 의 동일 homepage 매칭으로 채워줌 (메인 + 부가 채널 동일 규칙).
# 클라가 일부만 보내거나 빈 값이면 mock에서 동일 hospital을 찾아 채워줌.
def _channels_from_mockurls(homepage_url: str) -> dict: def _channels_from_mockurls(homepage_url: str) -> dict:
target = _normalize_homepage(homepage_url) target = _normalize_homepage(homepage_url)
if not target: if not target:
@ -47,44 +48,51 @@ async def start_analysis(body: AnalysisCreate, background_tasks: BackgroundTasks
analysis_run_id = str(uuid6.uuid7()) analysis_run_id = str(uuid6.uuid7())
hospital_id = body.clinic_id hospital_id = body.clinic_id
# 사실 hospital과 owner_user_id 비교 후 검증이 필요한 거지만 일단 PoC 니까. 나중에 바꿉니다. # 사실 hospital 과 owner_user_id 비교 후 검증이 필요한 거지만 일단 PoC 니까. 나중에 바꿉니다.
hospital = await fetchone( hospital = await select_hospital(hospital_id)
"SELECT owner_user_id, url FROM hospital_baseinfo WHERE hospital_id = %s",
(hospital_id,),
)
if not hospital: if not hospital:
raise HTTPException(status_code=409, detail="Clinic not found") raise HTTPException(status_code=409, detail="Clinic not found")
# 클라가 안 보낸 채널은 mock_urls에서 homepage 매칭으로 보충 (main + extra 동일 규칙) analysis_run_id = await insert_run(analysis_run_id, hospital_id, hospital["owner_user_id"])
mock = _channels_from_mockurls(hospital["url"])
# 사용자가 'gangnamunni.com/...' 같이 scheme/www 없이 줘도 _with_scheme이 https://www. 보강. mainpage = await select_source_mainpage(hospital_id)
ig_url = _with_scheme(body.channels.instagram) or mock.get("instagram") if mainpage:
fb_url = _with_scheme(body.channels.facebook) or mock.get("facebook") await insert_raw_info(mainpage["source_id"], analysis_run_id, data_tag=SourceType.MAINPAGE)
nb_url = _with_scheme(body.channels.naver_blog) or mock.get("naver_blog") # branding (HTML/CSS + Vision 로고 매칭) — mainpage 와 같은 homepage URL 을 source 로 사용.
yt_url = _with_scheme(body.channels.youtube) or mock.get("youtube") branding_id = await insert_source(hospital_id, SourceType.BRANDING, mainpage["url"])
gu_url = _with_scheme(body.channels.gangnam_unni) or mock.get("gangnam_unni") await insert_raw_info(branding_id, analysis_run_id, data_tag=SourceType.BRANDING)
ig_id = await insert_instagram_row(hospital_id, ig_url) if ig_url else None # 클라가 안 보낸 채널은 mock_urls 에서 homepage 매칭으로 보충 (main + extra 동일 규칙).
fb_id = await insert_facebook_row(hospital_id, fb_url) if fb_url else None mock = _channels_from_mockurls((mainpage or {}).get("url") or "")
nb_id = await insert_naver_blog_row(hospital_id, nb_url) if nb_url else None
yt_id = await insert_youtube_row(hospital_id, yt_url) if yt_url else None
gu_id = await insert_gangnam_unni_row(hospital_id, gu_url) if gu_url else None
analysis_run_id = await insert_analysis_run( # 메인 5채널 (KR). _with_scheme 으로 'gangnamunni.com/...' 같이 scheme/www 없이 와도 보강.
analysis_run_id, hospital_id, hospital["owner_user_id"], main_channels = [
ig_id, fb_id, nb_id, yt_id, gu_id, (SourceType.INSTAGRAM, _with_scheme(body.channels.instagram) or mock.get("instagram")),
) (SourceType.FACEBOOK, _with_scheme(body.channels.facebook) or mock.get("facebook")),
(SourceType.NAVER_BLOG, _with_scheme(body.channels.naver_blog) or mock.get("naver_blog")),
(SourceType.YOUTUBE, _with_scheme(body.channels.youtube) or mock.get("youtube")),
(SourceType.GANGNAM_UNNI, _with_scheme(body.channels.gangnam_unni) or mock.get("gangnam_unni")),
]
for source_type, url in main_channels:
if url:
source_id = await insert_source(hospital_id, source_type, url)
await insert_raw_info(source_id, analysis_run_id, data_tag=source_type)
# 부가 채널 — instagram_en/facebook_en 은 동일 source_type 에 language='EN' 으로 구분, 나머지는 자체 source_type.
extra_channels = [
(SourceType.INSTAGRAM, "EN", _with_scheme(body.channels.instagram_en) or mock.get("instagram_en")),
(SourceType.FACEBOOK, "EN", _with_scheme(body.channels.facebook_en) or mock.get("facebook_en")),
(SourceType.TIKTOK, None, _with_scheme(body.channels.tiktok) or mock.get("tiktok")),
(SourceType.KAKAOTALK, None, _with_scheme(body.channels.kakao_talk) or mock.get("kakao_talk")),
(SourceType.NAVER_CAFE, None, _with_scheme(body.channels.naver_cafe) or mock.get("naver_cafe")),
]
for source_type, language, url in extra_channels:
if url:
source_id = await insert_source(hospital_id, source_type, url, language=language)
await insert_raw_info(source_id, analysis_run_id, data_tag=source_type)
extra_channels = {
"tiktok": body.channels.tiktok or mock.get("tiktok"),
"instagram_en": body.channels.instagram_en or mock.get("instagram_en"),
"facebook_en": body.channels.facebook_en or mock.get("facebook_en"),
"kakao_talk": body.channels.kakao_talk or mock.get("kakao_talk"),
"naver_cafe": body.channels.naver_cafe or mock.get("naver_cafe"),
}
logger.info("[analysis] main+extra channels resolved (mock_matched=%s)", bool(mock)) logger.info("[analysis] main+extra channels resolved (mock_matched=%s)", bool(mock))
background_tasks.add_task(run_pipeline, analysis_run_id, extra_channels) background_tasks.add_task(run_pipeline, analysis_run_id)
return AnalysisStartResponse( return AnalysisStartResponse(
analysis_run_id=analysis_run_id, analysis_run_id=analysis_run_id,
@ -121,12 +129,12 @@ async def delete_analysis_run_file(run_id: str, file_id: int) -> None:
@router.get("/{run_id}/status", response_model=AnalysisStatusResponse) @router.get("/{run_id}/status", response_model=AnalysisStatusResponse)
async def get_analysis_status(run_id: str): async def get_analysis_status(run_id: str):
logger.info("GET /api/analysis/%s/status", run_id) logger.info("GET /api/analysis/%s/status", run_id)
row = await fetchone("SELECT status FROM analysis_runs WHERE analysis_run_id = %s", (run_id,)) run_status = await select_run_status(run_id)
if not row: if run_status is None:
raise HTTPException(status_code=404, detail="Run not found") raise HTTPException(status_code=404, detail="Run not found")
return AnalysisStatusResponse( return AnalysisStatusResponse(
analysis_run_id=run_id, analysis_run_id=run_id,
status=AnalysisStatus(row["status"]), status=AnalysisStatus(run_status),
progress=50.0, progress=50.0,
current_step="", current_step="",
channel_errors={}, channel_errors={},

View File

@ -2,7 +2,8 @@ import logging
import uuid6 import uuid6
from fastapi import APIRouter, Depends, HTTPException, status from fastapi import APIRouter, Depends, HTTPException, status
from common.deps import verify_api_key from common.deps import verify_api_key
from common.db import insert_hospital, fetchone from common.db.hospital import select_hospital, insert_hospital
from common.db.source import insert_source
from common.utils import get_env from common.utils import get_env
from integrations.firecrawl import FirecrawlClient from integrations.firecrawl import FirecrawlClient
from models.clinic import ClinicCreate, ClinicCreateResponse, ClinicResponse, ClinicHistoryResponse, RunSummary from models.clinic import ClinicCreate, ClinicCreateResponse, ClinicResponse, ClinicHistoryResponse, RunSummary
@ -30,9 +31,8 @@ async def create_clinic(body: ClinicCreate):
name=info["clinicName"], name=info["clinicName"],
name_en=info.get("clinicNameEn"), name_en=info.get("clinicNameEn"),
road_address=info.get("address"), road_address=info.get("address"),
url=body.url,
raw_data=info,
) )
await insert_source(hospital_id, "mainpage", body.url)
return ClinicCreateResponse( return ClinicCreateResponse(
id=hospital_id, id=hospital_id,
url=body.url, url=body.url,
@ -44,11 +44,7 @@ async def create_clinic(body: ClinicCreate):
@router.get("/{hospital_id}", response_model=ClinicResponse) @router.get("/{hospital_id}", response_model=ClinicResponse)
async def get_clinic(hospital_id: str): async def get_clinic(hospital_id: str):
logger.info("GET /api/clinics/%s", hospital_id) logger.info("GET /api/clinics/%s", hospital_id)
row = await fetchone( row = await select_hospital(hospital_id)
"SELECT hospital_id, hospital_name, hospital_name_en, road_address, url, status, raw_data, created_at, updated_at"
" FROM hospital_baseinfo WHERE hospital_id = %s",
(hospital_id,),
)
if not row: if not row:
raise HTTPException(status_code=404, detail="Clinic not found") raise HTTPException(status_code=404, detail="Clinic not found")
return ClinicResponse(**{**row, "created_at": str(row["created_at"]), "updated_at": str(row["updated_at"])}) return ClinicResponse(**{**row, "created_at": str(row["created_at"]), "updated_at": str(row["updated_at"])})

View File

@ -1,11 +1,13 @@
import json import json
import logging import logging
from fastapi import APIRouter, Depends, HTTPException, Response from fastapi import APIRouter, Depends, HTTPException, Response
from common.db import fetchone, fetch_raw from common.db.run import select_run_with_clinic
from common.db.source import select_run_source_raw
from common.deps import verify_api_key from common.deps import verify_api_key
from common.utils import _with_scheme from common.utils import _with_scheme
from integrations.llm.schemas.plan import PlanOutput from integrations.llm.schemas.plan import PlanOutput
from models.plan import PlanApiResponse from models.plan import PlanApiResponse
from models.status import SourceType
router = APIRouter(prefix="/api/plan", tags=["plan"], dependencies=[Depends(verify_api_key)]) router = APIRouter(prefix="/api/plan", tags=["plan"], dependencies=[Depends(verify_api_key)])
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -14,26 +16,21 @@ logger = logging.getLogger(__name__)
@router.get("/{run_id}", response_model=PlanApiResponse, response_model_by_alias=True) @router.get("/{run_id}", response_model=PlanApiResponse, response_model_by_alias=True)
async def get_plan(run_id: str): async def get_plan(run_id: str):
logger.info("GET /api/plan/%s", run_id) logger.info("GET /api/plan/%s", run_id)
row = await fetchone( row = await select_run_with_clinic(run_id)
"SELECT ar.plan_data, ar.created_at, ar.gangnam_unni_data_id, h.hospital_name, h.hospital_name_en, h.url"
" FROM analysis_runs ar"
" JOIN hospital_baseinfo h ON ar.hospital_id = h.hospital_id"
" WHERE ar.analysis_run_id = %s",
(run_id,),
)
if row is None: if row is None:
raise HTTPException(status_code=404, detail="Run not found") raise HTTPException(status_code=404, detail="Run not found")
if row["plan_data"] is None: if row["plan_data"] is None:
return Response(status_code=204) return Response(status_code=204)
data = json.loads(row["plan_data"]) if isinstance(row["plan_data"], str) else row["plan_data"] data = json.loads(row["plan_data"]) if isinstance(row["plan_data"], str) else row["plan_data"]
plan = PlanOutput(**data) plan = PlanOutput(**data)
gangnam_unni = await fetch_raw("gangnam_unni_data", row["gangnam_unni_data_id"]) or {} # 강남언니에서 긁어온 이름이 있으면 우선 (hospital_baseinfo 의 정식 이름보다 강남언니가 더 광고용 표기).
clinic_name = gangnam_unni.get("name") or row["hospital_name"] gu = await select_run_source_raw(run_id, SourceType.GANGNAM_UNNI) or {}
clinic_name = gu.get("name") or row["hospital_name"]
return PlanApiResponse( return PlanApiResponse(
id=run_id, id=run_id,
clinic_name=clinic_name, clinic_name=clinic_name,
clinic_name_en=row["hospital_name_en"], clinic_name_en=row["hospital_name_en"],
created_at=str(row["created_at"]), created_at=str(row["created_at"]),
target_url=_with_scheme(row["url"]), target_url=_with_scheme(row["target_url"]),
**plan.model_dump(), **plan.model_dump(),
) )

View File

@ -1,7 +1,7 @@
import json import json
import logging import logging
from fastapi import APIRouter, Depends, HTTPException, Response from fastapi import APIRouter, Depends, HTTPException, Response
from common.db import fetchone from common.db.run import select_run_with_clinic
from common.deps import verify_api_key from common.deps import verify_api_key
from common.utils import _with_scheme from common.utils import _with_scheme
from integrations.llm.schemas.report import ReportOutput from integrations.llm.schemas.report import ReportOutput
@ -14,13 +14,7 @@ logger = logging.getLogger(__name__)
@router.get("/{run_id}", response_model=MarketingReportResponse, response_model_by_alias=True) @router.get("/{run_id}", response_model=MarketingReportResponse, response_model_by_alias=True)
async def get_report(run_id: str): async def get_report(run_id: str):
logger.info("GET /api/report/%s", run_id) logger.info("GET /api/report/%s", run_id)
row = await fetchone( row = await select_run_with_clinic(run_id)
"SELECT ar.report_data, ar.created_at, h.hospital_name, h.hospital_name_en, h.url"
" FROM analysis_runs ar"
" JOIN hospital_baseinfo h ON ar.hospital_id = h.hospital_id"
" WHERE ar.analysis_run_id = %s",
(run_id,),
)
if row is None: if row is None:
raise HTTPException(status_code=404, detail="Run not found") raise HTTPException(status_code=404, detail="Run not found")
if row["report_data"] is None: if row["report_data"] is None:
@ -32,6 +26,6 @@ async def get_report(run_id: str):
clinic_name=row["hospital_name"], clinic_name=row["hospital_name"],
clinic_name_en=row["hospital_name_en"], clinic_name_en=row["hospital_name_en"],
created_at=str(row["created_at"]), created_at=str(row["created_at"]),
target_url=_with_scheme(row["url"]), target_url=_with_scheme(row["target_url"]),
**llm_output.model_dump(exclude={"id", "created_at", "target_url"}), **llm_output.model_dump(exclude={"id", "created_at", "target_url"}),
) )

View File

@ -1,287 +0,0 @@
import json
import os
import aiomysql
from common.utils import get_env
_pool: aiomysql.Pool | None = None
async def get_pool() -> aiomysql.Pool:
global _pool
if _pool is None:
_pool = await aiomysql.create_pool(
host=get_env("MYSQL_HOST"),
port=int(os.getenv("MYSQL_PORT", "3306")),
user=get_env("MYSQL_USER"),
password=get_env("MYSQL_PASSWORD"),
db=get_env("MYSQL_DB"),
charset="utf8mb4",
minsize=0,
maxsize=30,
connect_timeout=10,
)
return _pool
# 쓰기 (INSERT/UPDATE/DELETE)
async def execute(sql: str, args: tuple = ()) -> int:
pool = await get_pool()
async with pool.acquire() as conn:
try:
async with conn.cursor() as cur:
await cur.execute(sql, args)
await conn.commit()
return cur.lastrowid
finally:
conn.close()
# 읽기 (SELECT)
async def fetchone(sql: str, args: tuple = ()) -> dict | None:
pool = await get_pool()
async with pool.acquire() as conn:
try:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(sql, args)
return await cur.fetchone()
finally:
conn.close()
async def fetchall(sql: str, args: tuple = ()) -> list[dict]:
pool = await get_pool()
async with pool.acquire() as conn:
try:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(sql, args)
return await cur.fetchall()
finally:
conn.close()
async def insert_instagram_row(hospital_id: str, url: str) -> int:
return await execute("INSERT INTO instagram_data (hospital_id, url) VALUES (%s, %s)", (hospital_id, url))
async def insert_facebook_row(hospital_id: str, url: str) -> int:
return await execute("INSERT INTO facebook_data (hospital_id, url) VALUES (%s, %s)", (hospital_id, url))
async def insert_naver_blog_row(hospital_id: str, url: str) -> int:
return await execute("INSERT INTO naver_blog_data (hospital_id, url) VALUES (%s, %s)", (hospital_id, url))
async def insert_youtube_row(hospital_id: str, url: str) -> int:
return await execute("INSERT INTO youtube_data (hospital_id, url) VALUES (%s, %s)", (hospital_id, url))
async def insert_gangnam_unni_row(hospital_id: str, url: str) -> int:
return await execute("INSERT INTO gangnam_unni_data (hospital_id, url) VALUES (%s, %s)", (hospital_id, url))
async def insert_file_row(
analysis_run_id: str,
file_type: str,
file_name: str,
file_url: str,
size_bytes: int | None = None,
hospital_id: str | None = None,
) -> int:
return await execute(
"INSERT INTO file_data (analysis_run_id, hospital_id, file_type, file_name, file_url, size_bytes)"
" VALUES (%s, %s, %s, %s, %s, %s)",
(analysis_run_id, hospital_id, file_type, file_name, file_url, size_bytes),
)
async def insert_analysis_run(
analysis_run_id: str,
hospital_id: str,
owner_user_id: int,
instagram_data_id: int | None,
facebook_data_id: int | None,
naver_blog_data_id: int | None,
youtube_data_id: int | None,
gangnam_unni_data_id: int | None,
) -> str:
await execute(
"INSERT INTO analysis_runs"
" (analysis_run_id, hospital_id, owner_user_id, instagram_data_id, facebook_data_id, naver_blog_data_id, youtube_data_id, gangnam_unni_data_id)"
" VALUES (%s, %s, %s, %s, %s, %s, %s, %s)",
(analysis_run_id, hospital_id, owner_user_id, instagram_data_id, facebook_data_id, naver_blog_data_id, youtube_data_id, gangnam_unni_data_id),
)
return analysis_run_id
async def save_analysis_report(analysis_run_id: str, data: dict) -> None:
await execute(
"UPDATE analysis_runs SET report_data = %s WHERE analysis_run_id = %s",
(json.dumps(data, ensure_ascii=False), analysis_run_id),
)
async def is_done(table: str, row_id: int | None) -> bool:
if row_id is None:
return True
r = await fetchone(f"SELECT status FROM {table} WHERE id = %s", (row_id,))
return r["status"] == "done"
async def fetch_raw(table: str, row_id: int | None) -> dict | None:
if row_id is None:
return None
row = await fetchone(f"SELECT raw_data FROM {table} WHERE id = %s", (row_id,))
if not row or not row["raw_data"]:
return None
return json.loads(row["raw_data"]) if isinstance(row["raw_data"], str) else row["raw_data"]
async def get_analysis_raw_data(analysis_run_id: str) -> dict:
run = await fetchone(
"SELECT instagram_data_id, facebook_data_id, naver_blog_data_id, youtube_data_id, gangnam_unni_data_id"
" FROM analysis_runs WHERE analysis_run_id = %s",
(analysis_run_id,),
)
return {
"instagram": await fetch_raw("instagram_data", run["instagram_data_id"]),
"facebook": await fetch_raw("facebook_data", run["facebook_data_id"]),
"naver_blog": await fetch_raw("naver_blog_data", run["naver_blog_data_id"]),
"youtube": await fetch_raw("youtube_data", run["youtube_data_id"]),
"gangnam_unni": await fetch_raw("gangnam_unni_data", run["gangnam_unni_data_id"]),
}
async def set_instagram_status(row_id: int, status: str) -> None:
await execute("UPDATE instagram_data SET status = %s WHERE id = %s", (status, row_id))
async def set_facebook_status(row_id: int, status: str) -> None:
await execute("UPDATE facebook_data SET status = %s WHERE id = %s", (status, row_id))
async def set_naver_blog_status(row_id: int, status: str) -> None:
await execute("UPDATE naver_blog_data SET status = %s WHERE id = %s", (status, row_id))
async def set_youtube_status(row_id: int, status: str) -> None:
await execute("UPDATE youtube_data SET status = %s WHERE id = %s", (status, row_id))
async def set_gangnam_unni_status(row_id: int, status: str) -> None:
await execute("UPDATE gangnam_unni_data SET status = %s WHERE id = %s", (status, row_id))
async def save_instagram_raw_data(row_id: int, data: dict) -> None:
await execute("UPDATE instagram_data SET raw_data = %s, status = 'done' WHERE id = %s", (json.dumps(data, ensure_ascii=False), row_id))
async def save_facebook_raw_data(row_id: int, data: dict) -> None:
await execute("UPDATE facebook_data SET raw_data = %s, status = 'done' WHERE id = %s", (json.dumps(data, ensure_ascii=False), row_id))
async def save_naver_blog_raw_data(row_id: int, data: dict) -> None:
await execute("UPDATE naver_blog_data SET raw_data = %s, status = 'done' WHERE id = %s", (json.dumps(data, ensure_ascii=False), row_id))
async def save_youtube_raw_data(row_id: int, data: dict) -> None:
await execute("UPDATE youtube_data SET raw_data = %s, status = 'done' WHERE id = %s", (json.dumps(data, ensure_ascii=False), row_id))
async def save_gangnam_unni_raw_data(row_id: int, data: dict) -> None:
await execute("UPDATE gangnam_unni_data SET raw_data = %s, status = 'done' WHERE id = %s", (json.dumps(data, ensure_ascii=False), row_id))
async def _insert_hospital_history(hospital_id: str, analysis_run_id: str | None) -> None:
row = await fetchone(
"SELECT owner_user_id, hospital_name, hospital_name_en, brn, road_address, site_address, url, status, raw_data"
" FROM hospital_baseinfo WHERE hospital_id = %s",
(hospital_id,),
)
if not row:
return
await execute(
"INSERT INTO hospital_history"
" (hospital_id, owner_user_id, hospital_name, hospital_name_en, brn, road_address, site_address, url, status, raw_data, analysis_run_id)"
" VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
(
hospital_id,
row["owner_user_id"],
row["hospital_name"],
row["hospital_name_en"],
row["brn"],
row["road_address"],
row["site_address"],
row["url"],
row["status"],
row["raw_data"] if isinstance(row["raw_data"], str) else json.dumps(row["raw_data"], ensure_ascii=False) if row["raw_data"] else None,
analysis_run_id,
),
)
async def insert_hospital(
hospital_id: str,
name: str,
name_en: str | None = None,
road_address: str | None = None,
site_address: str | None = None,
url: str | None = None,
raw_data: dict | None = None,
owner_user_id: int = 0,
brn: str = "",
) -> dict:
await execute(
"INSERT INTO hospital_baseinfo (hospital_id, hospital_name, hospital_name_en, road_address, site_address, url, raw_data, status, owner_user_id, brn)"
" VALUES (%s, %s, %s, %s, %s, %s, %s, 'done', %s, %s)",
(hospital_id, name, name_en, road_address, site_address, url,
json.dumps(raw_data, ensure_ascii=False) if raw_data else None,
owner_user_id, brn),
)
await _insert_hospital_history(hospital_id, analysis_run_id=None)
return await fetchone(
"SELECT created_at FROM hospital_baseinfo WHERE hospital_id = %s",
(hospital_id,),
)
async def save_hospital_raw_data(hospital_id: str, data: dict, analysis_run_id: str | None = None) -> None:
await execute(
"UPDATE hospital_baseinfo"
" SET raw_data = %s, status = 'done',"
" hospital_name = COALESCE(%s, hospital_name),"
" hospital_name_en = COALESCE(%s, hospital_name_en),"
" road_address = COALESCE(%s, road_address)"
" WHERE hospital_id = %s",
(
json.dumps(data, ensure_ascii=False),
data.get("clinicName"),
data.get("clinicNameEn"),
data.get("address"),
hospital_id,
),
)
await _insert_hospital_history(hospital_id, analysis_run_id)
async def merge_hospital_raw_data(hospital_id: str, patch: dict) -> None:
"""hospital_baseinfo.raw_data를 읽어 patch를 top-level 병합 후 저장 (read-modify-write).
부가 수집 단계들이 순차로 raw_data에 키를 덧붙일 사용."""
row = await fetchone("SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s", (hospital_id,))
raw = row["raw_data"] if row else None
raw_data = json.loads(raw) if isinstance(raw, str) else (raw or {})
raw_data.update(patch)
await execute(
"UPDATE hospital_baseinfo SET raw_data = %s WHERE hospital_id = %s",
(json.dumps(raw_data, ensure_ascii=False), hospital_id),
)
async def get_market_analysis(analysis_run_id: str) -> dict:
rows = await fetchall(
"SELECT analysis_type, data FROM market_analysis WHERE analysis_run_id = %s AND status = 'done'",
(analysis_run_id,),
)
return {
row["analysis_type"]: json.loads(row["data"]) if isinstance(row["data"], str) else row["data"]
for row in rows
}

16
app/common/db/__init__.py Normal file
View File

@ -0,0 +1,16 @@
from common.db.base import execute, fetchone, fetchall
from common.db.hospital import select_hospital, update_hospital_status, insert_hospital, update_hospital
from common.db.source import (
insert_source, select_source_mainpage, select_source_by_type,
insert_raw_info, update_raw_info_status, update_raw_info, update_raw_info_merge,
update_raw_info_logo_url, select_branding_logo_url, select_branding_info_id,
select_raw_info_data,
select_run_sources, select_run_raw_data, select_run_source_raw,
select_run_mainpage_url,
)
from common.db.run import (
insert_run, select_run, select_run_status, update_run_status,
update_run_report, update_run_plan, select_run_with_clinic, select_run_report_data,
)
from common.db.market import upsert_market_status, upsert_market_result, select_market
from common.db.file_data import insert_file, select_run_files, select_file, delete_file

56
app/common/db/base.py Normal file
View File

@ -0,0 +1,56 @@
import os
import aiomysql
from common.utils import get_env
_pool: aiomysql.Pool | None = None
async def get_pool() -> aiomysql.Pool:
global _pool
if _pool is None:
_pool = await aiomysql.create_pool(
host=get_env("MYSQL_HOST"),
port=int(os.getenv("MYSQL_PORT", "3306")),
user=get_env("MYSQL_USER"),
password=get_env("MYSQL_PASSWORD"),
db=get_env("MYSQL_DB"),
charset="utf8mb4",
minsize=0,
maxsize=30,
connect_timeout=10,
)
return _pool
async def execute(sql: str, args: tuple = ()) -> int:
pool = await get_pool()
async with pool.acquire() as conn:
try:
async with conn.cursor() as cur:
await cur.execute(sql, args)
await conn.commit()
return cur.lastrowid
finally:
conn.close()
async def fetchone(sql: str, args: tuple = ()) -> dict | None:
pool = await get_pool()
async with pool.acquire() as conn:
try:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(sql, args)
return await cur.fetchone()
finally:
conn.close()
async def fetchall(sql: str, args: tuple = ()) -> list[dict]:
pool = await get_pool()
async with pool.acquire() as conn:
try:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(sql, args)
return await cur.fetchall()
finally:
conn.close()

View File

@ -0,0 +1,39 @@
from common.db.base import execute, fetchone, fetchall
async def insert_file(
analysis_run_id: str,
file_type: str,
file_name: str,
file_url: str,
size_bytes: int | None = None,
hospital_id: str | None = None,
) -> int:
return await execute(
"INSERT INTO file_data (analysis_run_id, hospital_id, file_type, file_name, file_url, size_bytes)"
" VALUES (%s, %s, %s, %s, %s, %s)",
(analysis_run_id, hospital_id, file_type, file_name, file_url, size_bytes),
)
async def select_run_files(analysis_run_id: str) -> list[dict]:
return await fetchall(
"SELECT id, file_type, file_name, file_url, size_bytes, created_at"
" FROM file_data WHERE analysis_run_id = %s AND is_deleted = FALSE"
" ORDER BY created_at DESC",
(analysis_run_id,),
)
async def select_file(file_id: int, analysis_run_id: str) -> dict | None:
return await fetchone(
"SELECT id FROM file_data WHERE id = %s AND analysis_run_id = %s",
(file_id, analysis_run_id),
)
async def delete_file(file_id: int) -> None:
await execute(
"UPDATE file_data SET is_deleted = TRUE WHERE id = %s AND is_deleted = FALSE",
(file_id,),
)

78
app/common/db/hospital.py Normal file
View File

@ -0,0 +1,78 @@
from common.db.base import execute, fetchone
async def select_hospital(hospital_id: str) -> dict | None:
return await fetchone(
"SELECT hospital_id, owner_user_id, hospital_name, hospital_name_en,"
" brn, road_address, site_address, status, created_at, updated_at"
" FROM hospital_baseinfo WHERE hospital_id = %s",
(hospital_id,),
)
async def update_hospital_status(hospital_id: str, status: str) -> None:
await execute(
"UPDATE hospital_baseinfo SET status = %s WHERE hospital_id = %s",
(status, hospital_id),
)
async def _insert_hospital_history(hospital_id: str, analysis_run_id: str | None) -> None:
row = await fetchone(
"SELECT owner_user_id, hospital_name, hospital_name_en, brn, road_address, site_address, status"
" FROM hospital_baseinfo WHERE hospital_id = %s",
(hospital_id,),
)
if not row:
return
await execute(
"INSERT INTO hospital_history"
" (hospital_id, owner_user_id, hospital_name, hospital_name_en, brn, road_address, site_address, status, analysis_run_id)"
" VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
(
hospital_id,
row["owner_user_id"],
row["hospital_name"],
row["hospital_name_en"],
row["brn"],
row["road_address"],
row["site_address"],
row["status"],
analysis_run_id,
),
)
async def insert_hospital(
hospital_id: str,
name: str,
name_en: str | None = None,
road_address: str | None = None,
site_address: str | None = None,
owner_user_id: int = 0,
brn: str = "",
) -> dict:
await execute(
"INSERT INTO hospital_baseinfo"
" (hospital_id, hospital_name, hospital_name_en, road_address, site_address, status, owner_user_id, brn)"
" VALUES (%s, %s, %s, %s, %s, 'done', %s, %s)",
(hospital_id, name, name_en, road_address, site_address, owner_user_id, brn),
)
await _insert_hospital_history(hospital_id, analysis_run_id=None)
return await fetchone(
"SELECT created_at FROM hospital_baseinfo WHERE hospital_id = %s",
(hospital_id,),
)
async def update_hospital(hospital_id: str, data: dict, analysis_run_id: str | None = None) -> None:
await execute(
"UPDATE hospital_baseinfo"
" SET status = 'done',"
" hospital_name = COALESCE(%s, hospital_name),"
" hospital_name_en = COALESCE(%s, hospital_name_en),"
" road_address = COALESCE(%s, road_address)"
" WHERE hospital_id = %s",
(data.get("clinicName"), data.get("clinicNameEn"), data.get("address"), hospital_id),
)
await _insert_hospital_history(hospital_id, analysis_run_id)

31
app/common/db/market.py Normal file
View File

@ -0,0 +1,31 @@
import json
from common.db.base import execute, fetchall
async def upsert_market_status(analysis_run_id: str, analysis_type: str, status: str) -> None:
await execute(
"INSERT INTO market_analysis (analysis_run_id, analysis_type, status)"
" VALUES (%s, %s, %s)"
" ON DUPLICATE KEY UPDATE status = VALUES(status)",
(analysis_run_id, analysis_type, status),
)
async def upsert_market_result(analysis_run_id: str, analysis_type: str, data: dict) -> None:
await execute(
"INSERT INTO market_analysis (analysis_run_id, analysis_type, status, data)"
" VALUES (%s, %s, 'done', %s)"
" ON DUPLICATE KEY UPDATE status = 'done', data = VALUES(data)",
(analysis_run_id, analysis_type, json.dumps(data, ensure_ascii=False)),
)
async def select_market(analysis_run_id: str) -> dict:
rows = await fetchall(
"SELECT analysis_type, data FROM market_analysis WHERE analysis_run_id = %s AND status = 'done'",
(analysis_run_id,),
)
return {
row["analysis_type"]: json.loads(row["data"]) if isinstance(row["data"], str) else row["data"]
for row in rows
}

76
app/common/db/run.py Normal file
View File

@ -0,0 +1,76 @@
import json
from common.db.base import execute, fetchone
async def insert_run(
analysis_run_id: str,
hospital_id: str,
owner_user_id: int,
) -> str:
await execute(
"INSERT INTO analysis_runs (analysis_run_id, hospital_id, owner_user_id) VALUES (%s, %s, %s)",
(analysis_run_id, hospital_id, owner_user_id),
)
return analysis_run_id
async def select_run(analysis_run_id: str) -> dict | None:
return await fetchone(
"SELECT analysis_run_id, hospital_id, owner_user_id, status, created_at, updated_at"
" FROM analysis_runs WHERE analysis_run_id = %s",
(analysis_run_id,),
)
async def select_run_report_data(analysis_run_id: str) -> dict | None:
"""report 결과가 필요할 때만 호출. raw JSON 파싱해서 dict 반환."""
import json
row = await fetchone(
"SELECT report_data FROM analysis_runs WHERE analysis_run_id = %s",
(analysis_run_id,),
)
if not row or not row["report_data"]:
return None
return json.loads(row["report_data"]) if isinstance(row["report_data"], str) else row["report_data"]
async def select_run_status(analysis_run_id: str) -> str | None:
row = await fetchone(
"SELECT status FROM analysis_runs WHERE analysis_run_id = %s",
(analysis_run_id,),
)
return row["status"] if row else None
async def update_run_status(analysis_run_id: str, status: str) -> None:
await execute(
"UPDATE analysis_runs SET status = %s WHERE analysis_run_id = %s",
(status, analysis_run_id),
)
async def update_run_report(analysis_run_id: str, data: dict) -> None:
await execute(
"UPDATE analysis_runs SET report_data = %s WHERE analysis_run_id = %s",
(json.dumps(data, ensure_ascii=False), analysis_run_id),
)
async def update_run_plan(analysis_run_id: str, data: dict) -> None:
await execute(
"UPDATE analysis_runs SET plan_data = %s WHERE analysis_run_id = %s",
(json.dumps(data, ensure_ascii=False), analysis_run_id),
)
async def select_run_with_clinic(analysis_run_id: str) -> dict | None:
return await fetchone(
"SELECT ar.report_data, ar.plan_data, ar.created_at,"
" h.hospital_name, h.hospital_name_en,"
" rs.url AS target_url"
" FROM analysis_runs ar"
" JOIN hospital_baseinfo h ON ar.hospital_id = h.hospital_id"
" LEFT JOIN remote_source rs ON rs.hospital_id = h.hospital_id AND rs.source_type = 'mainpage'"
" WHERE ar.analysis_run_id = %s",
(analysis_run_id,),
)

160
app/common/db/source.py Normal file
View File

@ -0,0 +1,160 @@
import json
from common.db.base import execute, fetchone, fetchall
from models.status import SourceType
async def insert_source(
hospital_id: str,
source_type: SourceType,
url: str,
language: str | None = None,
) -> int:
return await execute(
"INSERT INTO remote_source (hospital_id, source_type, language, url) VALUES (%s, %s, %s, %s)",
(hospital_id, source_type, language, url),
)
async def select_source_mainpage(hospital_id: str) -> dict | None:
return await fetchone(
"SELECT source_id, url FROM remote_source WHERE hospital_id = %s AND source_type = 'mainpage'",
(hospital_id,),
)
async def insert_raw_info(
source_id: int,
analysis_run_id: str,
data_tag: SourceType,
) -> int:
return await execute(
"INSERT INTO raw_info (source_id, analysis_run_id, data_tag) VALUES (%s, %s, %s)",
(source_id, analysis_run_id, data_tag),
)
async def update_raw_info_status(info_id: int, status: str) -> None:
await execute("UPDATE raw_info SET status = %s WHERE info_id = %s", (status, info_id))
async def update_raw_info(info_id: int, data: dict) -> None:
await execute(
"UPDATE raw_info SET raw_data = %s, status = 'done' WHERE info_id = %s",
(json.dumps(data, ensure_ascii=False), info_id),
)
async def select_raw_info_data(info_id: int | None) -> dict | None:
if info_id is None:
return None
row = await fetchone("SELECT raw_data FROM raw_info WHERE info_id = %s", (info_id,))
if not row or not row["raw_data"]:
return None
return json.loads(row["raw_data"]) if isinstance(row["raw_data"], str) else row["raw_data"]
async def select_run_sources(analysis_run_id: str) -> list[dict]:
return await fetchall(
"SELECT ri.info_id, rs.source_type, rs.url"
" FROM raw_info ri JOIN remote_source rs USING (source_id)"
" WHERE ri.analysis_run_id = %s",
(analysis_run_id,),
)
async def select_run_raw_data(analysis_run_id: str) -> dict:
# language='EN' 인 row 는 dict key 를 "<source_type>_en" 으로 합성 (KR/EN 동시 수집 시 키 충돌 방지).
rows = await fetchall(
"SELECT rs.source_type, rs.language, ri.raw_data"
" FROM raw_info ri JOIN remote_source rs USING (source_id)"
" WHERE ri.analysis_run_id = %s",
(analysis_run_id,),
)
result: dict = {}
for row in rows:
raw = row["raw_data"]
key = row["source_type"]
if (row.get("language") or "").upper() == "EN":
key = f"{key}_en"
result[key] = json.loads(raw) if isinstance(raw, str) else raw
return result
async def select_run_source_raw(
analysis_run_id: str, source_type: str, language: str | None = None,
) -> dict | None:
sql = (
"SELECT ri.raw_data FROM raw_info ri JOIN remote_source rs USING (source_id)"
" WHERE ri.analysis_run_id = %s AND rs.source_type = %s"
)
args: tuple = (analysis_run_id, source_type)
if language:
sql += " AND rs.language = %s"
args = (*args, language)
sql += " LIMIT 1"
row = await fetchone(sql, args)
if not row or not row["raw_data"]:
return None
return json.loads(row["raw_data"]) if isinstance(row["raw_data"], str) else row["raw_data"]
async def update_raw_info_logo_url(info_id: int, logo_url: str) -> None:
"""raw_info.logo_url 컬럼에 로고 URL 저장 (JSON raw_data 와 분리해 컬럼 인덱스/조회 용이)."""
await execute(
"UPDATE raw_info SET logo_url = %s WHERE info_id = %s",
(logo_url, info_id),
)
async def select_branding_info_id(analysis_run_id: str) -> int | None:
row = await fetchone(
"SELECT ri.info_id FROM raw_info ri JOIN remote_source rs USING (source_id)"
" WHERE ri.analysis_run_id = %s AND rs.source_type = 'branding' LIMIT 1",
(analysis_run_id,),
)
return (row or {}).get("info_id")
async def select_branding_logo_url(analysis_run_id: str) -> str | None:
row = await fetchone(
"SELECT ri.logo_url FROM raw_info ri JOIN remote_source rs USING (source_id)"
" WHERE ri.analysis_run_id = %s AND rs.source_type = 'branding' LIMIT 1",
(analysis_run_id,),
)
return (row or {}).get("logo_url")
async def update_raw_info_merge(info_id: int, patch: dict) -> None:
"""raw_info.raw_data 를 read-modify-write 로 top-level 머지.
source 단계별로 (: branding brandAssets channelLogos) 키를 덧붙일 사용."""
row = await fetchone("SELECT raw_data FROM raw_info WHERE info_id = %s", (info_id,))
if not row:
return
raw = row["raw_data"]
data = json.loads(raw) if isinstance(raw, str) else (raw or {})
data.update(patch)
await execute(
"UPDATE raw_info SET raw_data = %s, status = 'done' WHERE info_id = %s",
(json.dumps(data, ensure_ascii=False), info_id),
)
async def select_source_by_type(
hospital_id: str, source_type: str, language: str | None = None,
) -> dict | None:
sql = "SELECT source_id, url FROM remote_source WHERE hospital_id = %s AND source_type = %s"
args: tuple = (hospital_id, source_type)
if language:
sql += " AND language = %s"
args = (*args, language)
sql += " LIMIT 1"
return await fetchone(sql, args)
async def select_run_mainpage_url(analysis_run_id: str) -> str:
row = await fetchone(
"SELECT rs.url FROM raw_info ri JOIN remote_source rs USING (source_id)"
" WHERE ri.analysis_run_id = %s AND rs.source_type = 'mainpage'",
(analysis_run_id,),
)
return (row or {}).get("url") or ""

View File

@ -1,275 +0,0 @@
"""홈페이지 HTML/CSS에서 hex 색상 직접 추출 + 빈도 기반 brand palette 산출.
Vision LLM에 의존하지 않고 페이지의 실제 CSS 값을 정규식으로 잡음.
로고만 분석하는 Vision보다 사이트 전체 컬러 시스템 (primary/secondary/background/text) 정확히 추출.
"""
import logging
import re
import ssl
from collections import Counter
from urllib.parse import urljoin, urlparse
import httpx
logger = logging.getLogger(__name__)
def _make_ssl_context() -> ssl.SSLContext:
"""오래된 한국 의료 사이트들이 SSL DH_KEY_TOO_SMALL / cipher 약함 등으로 차단되는 문제 우회.
보안 등급 1 낮춤 + cert 검증 유지."""
ctx = ssl.create_default_context()
try:
ctx.set_ciphers("DEFAULT@SECLEVEL=1")
except ssl.SSLError:
pass
return ctx
async def _fetch_html(url: str, timeout: float = 20.0) -> tuple[int, str]:
"""SSL/검증 단계별 fallback으로 HTML 받기. 그랜드/톡스앤필 같은 oldsite 대응."""
headers = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"}
# 1차: 표준 검증
try:
async with httpx.AsyncClient(timeout=timeout, follow_redirects=True, headers=headers) as c:
r = await c.get(url)
return r.status_code, r.text
except (httpx.ConnectError, httpx.ReadError, ssl.SSLError) as e:
logger.info("[fetch] %s standard SSL failed: %s — fallback to weak cipher", url, e)
# 2차: 약한 cipher 허용
try:
async with httpx.AsyncClient(timeout=timeout, follow_redirects=True, headers=headers, verify=_make_ssl_context()) as c:
r = await c.get(url)
return r.status_code, r.text
except (httpx.ConnectError, httpx.ReadError, ssl.SSLError) as e:
logger.info("[fetch] %s weak cipher failed: %s — fallback to verify=False", url, e)
# 3차: SSL 검증 끔 (host mismatch 등)
try:
async with httpx.AsyncClient(timeout=timeout, follow_redirects=True, headers=headers, verify=False) as c:
r = await c.get(url)
return r.status_code, r.text
except Exception as e:
logger.warning("[fetch] %s all fallbacks failed: %s", url, e)
return 0, ""
LOGO_IMG_PATTERNS = [
# 1) <img class="...logo..." src="...">
re.compile(r'<img[^>]*\bclass=["\'][^"\']*\blogo\b[^"\']*["\'][^>]*\bsrc=["\']([^"\']+)["\']', re.IGNORECASE),
# 2) <img src="..." class="...logo...">
re.compile(r'<img[^>]*\bsrc=["\']([^"\']+)["\'][^>]*\bclass=["\'][^"\']*\blogo\b[^"\']*["\']', re.IGNORECASE),
# 3) <img id="...logo..." src="...">
re.compile(r'<img[^>]*\bid=["\'][^"\']*\blogo\b[^"\']*["\'][^>]*\bsrc=["\']([^"\']+)["\']', re.IGNORECASE),
# 4) <img alt="...logo..." src="...">
re.compile(r'<img[^>]*\balt=["\'][^"\']*\blogo\b[^"\']*["\'][^>]*\bsrc=["\']([^"\']+)["\']', re.IGNORECASE),
# 5) <a/h1 class="logo"><...nested...><img src="...">
re.compile(r'<(?:a|h[1-6]|div|span)[^>]*\b(?:class|id)=["\'][^"\']*\blogo\b[^"\']*["\'][^>]*>(?:[^<]|<(?!img))*<img[^>]*\bsrc=["\']([^"\']+)["\']', re.IGNORECASE | re.DOTALL),
# 6) inline background-image: <a/div class="logo" style="background-image: url(...)">
re.compile(r'<(?:a|div|span|h[1-6])[^>]*\b(?:class|id)=["\'][^"\']*\blogo\b[^"\']*["\'][^>]*\bstyle=["\'][^"\']*background(?:-image)?\s*:\s*url\(\s*["\']?([^"\')\s]+)', re.IGNORECASE),
# 7) inline background-image: <a/div style="background-image: url(...)" class="logo"> (속성 순서 반대)
re.compile(r'<(?:a|div|span|h[1-6])[^>]*\bstyle=["\'][^"\']*background(?:-image)?\s*:\s*url\(\s*["\']?([^"\')\s]+)[^"\']*["\'][^>]*\b(?:class|id)=["\'][^"\']*\blogo\b', re.IGNORECASE),
# 8) src 자체에 "logo" 포함 (header_logo.png, brand-logo.svg 등)
re.compile(r'<img[^>]*\bsrc=["\']([^"\']*\blogo\b[^"\']*\.(?:png|svg|jpe?g|webp)[^"\']*)["\']', re.IGNORECASE),
# 9) <header>...<img src="..."> (헤더 영역 첫 img)
re.compile(r'<header\b[^>]*>(?:[^<]|<(?!img))*<img[^>]*\bsrc=["\']([^"\']+\.(?:png|svg|jpe?g|webp)[^"\']*)["\']', re.IGNORECASE | re.DOTALL),
# 10) <nav>...<img src="..."> (nav 영역 첫 img)
re.compile(r'<nav\b[^>]*>(?:[^<]|<(?!img))*<img[^>]*\bsrc=["\']([^"\']+\.(?:png|svg|jpe?g|webp)[^"\']*)["\']', re.IGNORECASE | re.DOTALL),
# 11) Open Graph image (대표 이미지) - 최후 fallback
re.compile(r'<meta[^>]*\bproperty=["\']og:image["\'][^>]*\bcontent=["\']([^"\']+)["\']', re.IGNORECASE),
re.compile(r'<meta[^>]*\bcontent=["\']([^"\']+)["\'][^>]*\bproperty=["\']og:image["\']', re.IGNORECASE),
]
# CSS 파일에서 .logo { background-image: url(...) } 추출용
LOGO_CSS_PATTERN = re.compile(
r'\.[\w-]*\blogo\b[\w-]*\s*(?:,\s*\.[\w-]+\s*)*\{[^}]*background(?:-image)?\s*:\s*url\(\s*["\']?([^"\')\s]+)',
re.IGNORECASE | re.DOTALL,
)
def find_logo_url_in_html(html: str, base_url: str, css_texts: list[str] | None = None) -> str | None:
"""HTML에서 logo URL 찾기. 우선순위:
1) 패턴 1~8 (class/id/alt/src에 'logo' 명시된 img 가장 specific)
2) 외부 CSS의 .logo background-image (class-based, specific)
3) 패턴 9~10 (<header>/<nav> img 가장 generic, 잘못 잡힐 위험 )
"""
def _is_noise(src: str) -> bool:
"""logo로 잘못 잡힐 가능성 높은 URL 패턴 — lang/flag/icon/arrow/spacer 등."""
if not src or src.startswith("data:"):
return True
if re.search(r"(blank|spacer|pixel|transparent|1x1)\b", src, re.IGNORECASE):
return True
# 헤더 첫 img가 lang flag / 검색 아이콘 / 네비 화살표인 경우 (JK plastic 한국어 깃발이 잡히던 케이스)
if re.search(r"(lang[-_]?(kor|eng|chn|jpn|rus|jp|en|ko|cn|ar|in)|flag|country|icon-|btn-|arrow|prev|next|search)\b", src, re.IGNORECASE):
return True
return False
# 1) class/id/alt/src/inline-bg/src-with-logo 패턴 (1~8)
for pat in LOGO_IMG_PATTERNS[:8]:
for m in pat.finditer(html):
src = m.group(1)
if _is_noise(src):
continue
return urljoin(base_url, src)
# 2) 외부 CSS의 .logo { background-image } — class-based 이므로 generic 패턴보다 우선
for css in (css_texts or []):
m = LOGO_CSS_PATTERN.search(css)
if m:
src = m.group(1)
if not _is_noise(src):
return urljoin(base_url, src)
# 3) header/nav 첫 img — 가장 generic, lang flag 등 noise 필터 강화 적용
for pat in LOGO_IMG_PATTERNS[8:]:
for m in pat.finditer(html):
src = m.group(1)
if _is_noise(src):
continue
return urljoin(base_url, src)
return None
HEX6 = re.compile(r"#([0-9a-fA-F]{6})\b")
HEX3 = re.compile(r"#([0-9a-fA-F]{3})\b(?![0-9a-fA-F])")
RGB = re.compile(r"rgba?\(\s*(\d{1,3})\s*,\s*(\d{1,3})\s*,\s*(\d{1,3})\s*(?:,\s*[\d.]+\s*)?\)")
CSS_VAR_HEX = re.compile(r"--[\w-]+\s*:\s*(#[0-9a-fA-F]{3,8})", re.IGNORECASE)
CSS_LINK = re.compile(r'<link[^>]+rel=["\']stylesheet["\'][^>]+href=["\']([^"\']+)["\']', re.IGNORECASE)
STYLE_BLOCK = re.compile(r"<style[^>]*>(.*?)</style>", re.IGNORECASE | re.DOTALL)
# 무채색·아주 흔한 노이즈 컬러 (이런 건 brand color로 잡지 않음)
NOISE = {
"#ffffff", "#000000", "#fff", "#000",
"#333", "#222", "#111", "#444", "#555", "#666", "#777", "#888", "#999",
"#aaa", "#bbb", "#ccc", "#ddd", "#eee", "#f0f0f0", "#f5f5f5", "#fafafa",
}
def _normalize(hex_str: str) -> str:
h = hex_str.lstrip("#").lower()
if len(h) == 3:
h = "".join(c * 2 for c in h)
if len(h) == 8:
h = h[:6]
return f"#{h}"
def _rgb_to_hex(r: int, g: int, b: int) -> str:
return f"#{r:02x}{g:02x}{b:02x}"
def _hex_to_rgb(h: str) -> tuple[int, int, int]:
h = h.lstrip("#")
return int(h[0:2], 16), int(h[2:4], 16), int(h[4:6], 16)
def _distance(a: str, b: str) -> float:
ar, ag, ab = _hex_to_rgb(a)
br, bg, bb = _hex_to_rgb(b)
return ((ar - br) ** 2 + (ag - bg) ** 2 + (ab - bb) ** 2) ** 0.5
def _is_grayscale(h: str, tol: int = 12) -> bool:
r, g, b = _hex_to_rgb(h)
return max(r, g, b) - min(r, g, b) < tol
def _extract_hex(text: str) -> list[str]:
"""텍스트에서 모든 hex 색상 추출 (정규화)."""
out: list[str] = []
out.extend(_normalize(m.group(0)) for m in HEX6.finditer(text))
out.extend(_normalize(m.group(0)) for m in HEX3.finditer(text))
for m in RGB.finditer(text):
r, g, b = int(m.group(1)), int(m.group(2)), int(m.group(3))
if 0 <= r <= 255 and 0 <= g <= 255 and 0 <= b <= 255:
out.append(_rgb_to_hex(r, g, b))
return out
def _cluster(colors: Counter, threshold: float = 25.0) -> list[tuple[str, int]]:
"""비슷한 색은 묶음. 가장 빈도 높은 색을 대표로."""
ranked = colors.most_common()
clusters: list[tuple[str, int]] = []
for color, count in ranked:
merged = False
for i, (rep, rep_count) in enumerate(clusters):
if _distance(color, rep) < threshold:
clusters[i] = (rep, rep_count + count)
merged = True
break
if not merged:
clusters.append((color, count))
return clusters
async def _fetch_html_and_css(homepage_url: str, max_css_files: int = 8) -> tuple[str, list[str]]:
"""홈페이지 HTML + 외부 CSS(Top N)를 한 번에 fetch. 로고/색상 추출이 사이트를 중복으로 긁지 않도록 공유.
_fetch_html이 SSL 약함/host mismatch까지 fallback 처리. 실패 ("", [])."""
status, html = await _fetch_html(homepage_url)
if status != 200 or not html:
logger.warning("[color_extractor] homepage fetch failed status=%s url=%s", status, homepage_url)
return "", []
css_texts: list[str] = []
for css_href in CSS_LINK.findall(html)[:max_css_files]:
cstatus, ctext = await _fetch_html(urljoin(homepage_url, css_href), timeout=15.0)
if cstatus == 200 and ctext:
css_texts.append(ctext)
return html, css_texts
def _colors_from_text(html: str, css_texts: list[str], source_url: str = "") -> dict:
"""이미 받아온 HTML + CSS 텍스트에서 hex 빈도 분석 → primary/accent/text + palette. (fetch 없음, 순수 계산)"""
# 1. HTML 내 <style> 블록 + 통째(inline style="color:#...") + 외부 CSS
all_text_chunks: list[str] = list(STYLE_BLOCK.findall(html))
all_text_chunks.append(html)
all_text_chunks.extend(css_texts)
# 2. 모든 hex 추출 (NOISE 제외)
counter: Counter = Counter()
for text in all_text_chunks:
for color in _extract_hex(text):
if color in NOISE:
continue
counter[color] += 1
if not counter:
logger.info("[color_extractor] no colors extracted from %s", source_url)
return {}
# 3. 비슷한 색 클러스터링
clustered = _cluster(counter)
# 4. primary = 빈도 높은 채도 있는 색 / accent = 두번째 채도 있는 색 / text = 빈도 높은 무채색
chromatic = [c for c, _ in clustered if not _is_grayscale(c)]
grayscale = [c for c, _ in clustered if _is_grayscale(c)]
palette_top = clustered[:8]
palette = [{"name": f"색상 {i+1}", "hex": h, "usage": f"빈도 {n}"} for i, (h, n) in enumerate(palette_top)]
return {
"brand_colors": {
"primary": chromatic[0] if chromatic else None,
"accent": chromatic[1] if len(chromatic) > 1 else None,
"text": grayscale[0] if grayscale else None,
},
"color_palette": palette,
"extracted_from": "html+css",
}
async def extract_brand_colors_from_site(homepage_url: str, max_css_files: int = 8) -> dict:
"""홈페이지 HTML + 외부 CSS fetch → hex 색상 빈도 분석 → primary/accent/text + palette 5종."""
html, css_texts = await _fetch_html_and_css(homepage_url, max_css_files)
if not html:
return {}
return _colors_from_text(html, css_texts, homepage_url)
async def extract_brand_assets_from_site(homepage_url: str, max_css_files: int = 8) -> dict:
"""사이트를 한 번만 fetch해서 로고 URL과 brand 색상을 함께 추출.
반환: {"logo_url": str | None, "colors": {brand_colors, color_palette, ...} | {}}"""
html, css_texts = await _fetch_html_and_css(homepage_url, max_css_files)
if not html:
return {"logo_url": None, "colors": {}}
return {
"logo_url": find_logo_url_in_html(html, homepage_url, css_texts=css_texts),
"colors": _colors_from_text(html, css_texts, homepage_url),
}

View File

@ -218,9 +218,10 @@ class VisionClient:
' "has_symbol": "심볼/아이콘이 있으면 true, 글자만 있으면 false (boolean)",\n' ' "has_symbol": "심볼/아이콘이 있으면 true, 글자만 있으면 false (boolean)",\n'
' "logo_symbol": "심볼이 묘사하는 대상 (예: \'잎사귀\', \'추상 곡선\'). 없으면 빈 문자열",\n' ' "logo_symbol": "심볼이 묘사하는 대상 (예: \'잎사귀\', \'추상 곡선\'). 없으면 빈 문자열",\n'
' "logo_text": "로고에 보이는 워드마크 텍스트 그대로 (한글/영문). 없으면 빈 문자열",\n' ' "logo_text": "로고에 보이는 워드마크 텍스트 그대로 (한글/영문). 없으면 빈 문자열",\n'
' "logo_colors_desc": "로고에 쓰인 색감을 사람이 부르는 이름으로 서술 (예: \'딥네이비 + 골드\'). 정확한 hex는 출력하지 말 것"\n' ' "logo_colors_desc": "로고에 쓰인 색감을 사람이 부르는 이름으로 서술 (예: \'딥네이비 + 골드\')",\n'
' "logo_colors_hex": ["로고에서 시각적으로 두드러진 색 정확히 5개의 hex 근사값 배열. 예: [\'#1A2B3C\', \'#D4A017\', \'#FFFFFF\', \'#9E5C2A\', \'#1F1F1F\']. 강한 색이 5개 안 되면 음영/명도 차이로 5개 채울 것. 빈 배열 금지."]\n'
"}\n" "}\n"
"주의: 색상 hex 값이나 logo URL 같은 필드는 출력하지 마세요 (별도 추출 로직이 처리).\n" "주의: logo_colors_hex 는 시각 추정이라 정확도 떨어질 수 있음. CSS 추출이 우선이고 이건 fallback/보완 용.\n"
"모든 설명/텍스트 값은 반드시 한국어로 작성하세요 (영어 금지)." "모든 설명/텍스트 값은 반드시 한국어로 작성하세요 (영어 금지)."
) )
result = await self._ask(urls, prompt) result = await self._ask(urls, prompt)
@ -228,6 +229,14 @@ class VisionClient:
return {} return {}
# logo_images는 우리가 직접 채움 (Vision은 묘사만) # logo_images는 우리가 직접 채움 (Vision은 묘사만)
result["logo_images"] = {"circle": None, "horizontal": logo_url, "korean": None} result["logo_images"] = {"circle": None, "horizontal": logo_url, "korean": None}
# logo_colors_hex 5개 강제 정규화 — LLM 이 4개나 6개 줄 수도 있어서 길이 fallback.
hex_list = [h for h in (result.get("logo_colors_hex") or []) if isinstance(h, str) and h.startswith("#")]
if hex_list:
while len(hex_list) < 5:
hex_list.append(hex_list[-1]) # 마지막 색 복제로 패딩
result["logo_colors_hex"] = hex_list[:5]
else:
result["logo_colors_hex"] = []
return result return result
async def describe_channel_logos( async def describe_channel_logos(

View File

@ -0,0 +1,66 @@
"""홈페이지 HTML + 외부 CSS 를 가져오는 fetch 전용 모듈.
오래된 한국 의료 사이트들이 SSL DH_KEY_TOO_SMALL / cipher 약함 / host mismatch 등으로
표준 fetch 차단되는 케이스가 많아 단계별 SSL fallback 으로 받는다.
파싱·도메인 로직은 들어가지 않음 순수 HTTP 응답 본문 반환.
"""
import logging
import re
import ssl
from urllib.parse import urljoin
import httpx
logger = logging.getLogger(__name__)
CSS_LINK = re.compile(
r'<link[^>]+rel=["\']stylesheet["\'][^>]+href=["\']([^"\']+)["\']',
re.IGNORECASE,
)
def _make_ssl_context() -> ssl.SSLContext:
"""보안 등급 1로 낮춤 + cert 검증 유지 (옛 한국 의료 사이트 cipher 약함 회피)."""
ctx = ssl.create_default_context()
try:
ctx.set_ciphers("DEFAULT@SECLEVEL=1")
except ssl.SSLError:
pass
return ctx
async def fetch_html(url: str, timeout: float = 20.0) -> tuple[int, str]:
"""SSL 검증 단계별 fallback 으로 HTML 본문 받기. 실패 시 (0, "")."""
headers = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"}
try:
async with httpx.AsyncClient(timeout=timeout, follow_redirects=True, headers=headers) as c:
r = await c.get(url)
return r.status_code, r.text
except (httpx.ConnectError, httpx.ReadError, ssl.SSLError) as e:
logger.info("[fetch] %s standard SSL failed: %s — fallback to weak cipher", url, e)
try:
async with httpx.AsyncClient(timeout=timeout, follow_redirects=True, headers=headers, verify=_make_ssl_context()) as c:
r = await c.get(url)
return r.status_code, r.text
except (httpx.ConnectError, httpx.ReadError, ssl.SSLError) as e:
logger.info("[fetch] %s weak cipher failed: %s — fallback to verify=False", url, e)
try:
async with httpx.AsyncClient(timeout=timeout, follow_redirects=True, headers=headers, verify=False) as c:
r = await c.get(url)
return r.status_code, r.text
except Exception as e:
logger.warning("[fetch] %s all fallbacks failed: %s", url, e)
return 0, ""
async def fetch_html_and_css(homepage_url: str, max_css_files: int = 8) -> tuple[str, list[str]]:
"""홈페이지 HTML + 외부 CSS(Top N) 한 번에 fetch. 실패 시 ("", [])."""
status, html = await fetch_html(homepage_url)
if status != 200 or not html:
logger.warning("[fetch] homepage fetch failed status=%s url=%s", status, homepage_url)
return "", []
css_texts: list[str] = []
for css_href in CSS_LINK.findall(html)[:max_css_files]:
cstatus, ctext = await fetch_html(urljoin(homepage_url, css_href), timeout=15.0)
if cstatus == 200 and ctext:
css_texts.append(ctext)
return html, css_texts

View File

@ -10,9 +10,7 @@ class ClinicResponse(BaseModel):
hospital_name: str hospital_name: str
hospital_name_en: str | None hospital_name_en: str | None
road_address: str | None road_address: str | None
url: str | None
status: str status: str
raw_data: dict | None
created_at: str created_at: str
updated_at: str updated_at: str

View File

@ -36,9 +36,24 @@ class DataSource(StrEnum):
SCRAPE = "scrape" SCRAPE = "scrape"
class SourceType(StrEnum):
MAINPAGE = "mainpage"
INSTAGRAM = "instagram"
FACEBOOK = "facebook"
NAVER_BLOG = "naver_blog"
YOUTUBE = "youtube"
TIKTOK = "tiktok"
GANGNAM_UNNI = "gangnam_unni"
KAKAOTALK = "kakaotalk"
NAVER_CAFE = "naver_cafe"
# 부가 수집/분석 (HTML/CSS 재크롤 + Vision 로고 매칭) — 한 raw_info entry 에 brandAssets/channelLogos 같이 보관.
BRANDING = "branding"
class Language(StrEnum): class Language(StrEnum):
KR = "KR" KR = "KR"
EN = "EN" EN = "EN"
WW = "WW"
class VideoType(StrEnum): class VideoType(StrEnum):

View File

@ -2,32 +2,27 @@ import json
import logging import logging
import re import re
from datetime import datetime from datetime import datetime
from common.db import fetchone, execute, fetch_raw, get_analysis_raw_data, save_analysis_report, get_market_analysis from urllib.parse import urlparse
from common.db.run import update_run_report, update_run_plan, select_run_report_data
from common.db.source import select_run_raw_data, select_branding_logo_url
from common.db.market import select_market
from integrations.llm.llm_service import LLMService from integrations.llm.llm_service import LLMService
from integrations.llm.prompt import report_prompt, plan_prompt, youtube_diagnosis_prompt from integrations.llm.prompt import report_prompt, plan_prompt, youtube_diagnosis_prompt
from integrations.llm.schemas.report import ReportOutput, ClinicSnapshot, YouTubeAudit from integrations.llm.schemas.report import ReportOutput, ClinicSnapshot, YouTubeAudit
from services.branding import analyze_branding
from services.instagram_audit import build_instagram_accounts from services.instagram_audit import build_instagram_accounts
from services.facebook_audit import build_facebook_pages from services.facebook_audit import build_facebook_pages
from services.kpi_dashboard import build_kpi_dashboard from services.kpi_dashboard import build_kpi_dashboard
from integrations.llm.schemas.plan import PlanOutput from integrations.llm.schemas.plan import PlanOutput
from models.status import AnalysisStatus
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
async def generate_report(analysis_run_id: str) -> ReportOutput: async def generate_report(analysis_run_id: str) -> ReportOutput:
run = await fetchone( raw = await select_run_raw_data(analysis_run_id)
"SELECT hospital_id FROM analysis_runs WHERE analysis_run_id = %s", clinic = raw.get("mainpage") or {}
(analysis_run_id,), branding = raw.get("branding") or {}
) market = await select_market(analysis_run_id)
clinic_row = await fetchone(
"SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s",
(run["hospital_id"],),
)
raw_data = clinic_row["raw_data"] if clinic_row else None
clinic = json.loads(raw_data) if isinstance(raw_data, str) else (raw_data or {})
raw = await get_analysis_raw_data(analysis_run_id)
market = await get_market_analysis(analysis_run_id)
def _json(v) -> str | None: def _json(v) -> str | None:
return json.dumps(v, ensure_ascii=False) if v else None return json.dumps(v, ensure_ascii=False) if v else None
@ -44,37 +39,36 @@ async def generate_report(analysis_run_id: str) -> ReportOutput:
"market_keywords": _json(market.get("keywords")), "market_keywords": _json(market.get("keywords")),
"market_trend": _json(market.get("trend")), "market_trend": _json(market.get("trend")),
"market_target_audience": _json(market.get("target_audience")), "market_target_audience": _json(market.get("target_audience")),
# firecrawl 이 mainpage 에서 뽑은 branding 메타(logoUrl/ogImage/faviconUrl) + Vision/CSS 산출물
"branding": _json(clinic.get("branding")), "branding": _json(clinic.get("branding")),
"brand_assets": _json(clinic.get("brandAssets")), "brand_assets": _json(branding.get("brandAssets")),
"tiktok": _json(clinic.get("tiktok")), "channel_logos": _json(branding.get("channelLogos")),
"instagram_en": _json(clinic.get("instagramEn")), # 부가 채널 (raw_info entry) — raw dict 의 한국식 key 그대로
"facebook_en": _json(clinic.get("facebookEn")), "tiktok": _json(raw.get("tiktok")),
"kakao_talk": _json(clinic.get("kakaoTalk")), "instagram_en": _json(raw.get("instagram_en")),
"naver_cafe": _json(clinic.get("naverCafe")), "facebook_en": _json(raw.get("facebook_en")),
"channel_logos": _json(clinic.get("channelLogos")), "kakao_talk": _json(raw.get("kakaotalk")),
"naver_cafe": _json(raw.get("naver_cafe")),
# 메인 5채널은 raw dict 그대로 펼쳐서 prompt placeholder 와 매칭
**{ **{
channel: _json(data) source_type: _json(data)
for channel, data in raw.items() for source_type, data in raw.items()
if source_type not in {
"mainpage", "branding",
"tiktok", "instagram_en", "facebook_en", "kakaotalk", "naver_cafe",
}
}, },
} }
return await LLMService(provider="perplexity").generate(report_prompt, input_data) return await LLMService(provider="perplexity").generate(report_prompt, input_data)
async def generate_plan(analysis_run_id: str) -> PlanOutput: async def generate_plan(analysis_run_id: str) -> PlanOutput:
run = await fetchone( raw = await select_run_raw_data(analysis_run_id)
"SELECT hospital_id, report_data FROM analysis_runs WHERE analysis_run_id = %s", clinic = raw.get("mainpage") or {}
(analysis_run_id,), branding = raw.get("branding") or {}
) report = await select_run_report_data(analysis_run_id)
clinic_row = await fetchone( market = await select_market(analysis_run_id)
"SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s",
(run["hospital_id"],),
)
raw_data = clinic_row["raw_data"] if clinic_row else None
clinic = json.loads(raw_data) if isinstance(raw_data, str) else (raw_data or {})
report_data = run["report_data"]
report = json.loads(report_data) if isinstance(report_data, str) else report_data
market = await get_market_analysis(analysis_run_id)
raw = await get_analysis_raw_data(analysis_run_id)
def _json(v) -> str | None: def _json(v) -> str | None:
return json.dumps(v, ensure_ascii=False) if v else None return json.dumps(v, ensure_ascii=False) if v else None
@ -92,27 +86,28 @@ async def generate_plan(analysis_run_id: str) -> PlanOutput:
"market_keywords": _json(market.get("keywords")), "market_keywords": _json(market.get("keywords")),
"market_trend": _json(market.get("trend")), "market_trend": _json(market.get("trend")),
"market_target_audience": _json(market.get("target_audience")), "market_target_audience": _json(market.get("target_audience")),
"tiktok": _json(clinic.get("tiktok")), "tiktok": _json(raw.get("tiktok")),
"instagram_en": _json(clinic.get("instagramEn")), "instagram_en": _json(raw.get("instagram_en")),
"facebook_en": _json(clinic.get("facebookEn")), "facebook_en": _json(raw.get("facebook_en")),
"naver_blog": _json(_naver_blog_summary(raw.get("naver_blog"))), "naver_blog": _json(_naver_blog_summary(raw.get("naver_blog"))),
"naver_cafe": _json(clinic.get("naverCafe")), "naver_cafe": _json(raw.get("naver_cafe")),
"kakao_talk": _json(clinic.get("kakaoTalk")), "kakao_talk": _json(raw.get("kakaotalk")),
"channel_logos": _json(clinic.get("channelLogos")), "channel_logos": _json(branding.get("channelLogos")),
"brand_assets": _json(clinic.get("brandAssets")), "brand_assets": _json(branding.get("brandAssets")),
} }
return await LLMService(provider="perplexity").generate(plan_prompt, input_data) return await LLMService(provider="perplexity").generate(plan_prompt, input_data)
def _build_clinic_snapshot(gangnam_unni: dict, hospital: dict) -> dict: def _build_clinic_snapshot(gangnam_unni: dict, mainpage: dict, brand_assets: dict, logo_url: str | None) -> dict:
snapshot: dict = {} snapshot: dict = {}
doctors = gangnam_unni.get("doctors", []) doctors = gangnam_unni.get("doctors", [])
lead = max(doctors, key=lambda d: d.get("reviews", 0)) if doctors else None lead = max(doctors, key=lambda d: d.get("reviews", 0)) if doctors else None
if gangnam_unni.get("name"): snapshot["name"] = gangnam_unni["name"] if gangnam_unni.get("name"): snapshot["name"] = gangnam_unni["name"]
if hospital.get("clinicNameEn"): snapshot["name_en"] = hospital["clinicNameEn"] if mainpage.get("clinicNameEn"): snapshot["name_en"] = mainpage["clinicNameEn"]
if hospital.get("phone"): snapshot["phone"] = hospital["phone"] if mainpage.get("phone"): snapshot["phone"] = mainpage["phone"]
if hospital.get("domain"): snapshot["domain"] = hospital["domain"] domain = mainpage.get("domain") or urlparse(mainpage.get("sourceUrl") or "").netloc
if domain: snapshot["domain"] = domain
if gangnam_unni.get("rating"): snapshot["overall_rating"] = gangnam_unni["rating"] if gangnam_unni.get("rating"): snapshot["overall_rating"] = gangnam_unni["rating"]
if gangnam_unni.get("totalReviews"): snapshot["total_reviews"] = gangnam_unni["totalReviews"] if gangnam_unni.get("totalReviews"): snapshot["total_reviews"] = gangnam_unni["totalReviews"]
if gangnam_unni.get("address"): snapshot["location"] = gangnam_unni["address"] if gangnam_unni.get("address"): snapshot["location"] = gangnam_unni["address"]
@ -125,16 +120,16 @@ def _build_clinic_snapshot(gangnam_unni: dict, hospital: dict) -> dict:
"rating": lead.get("rating"), "rating": lead.get("rating"),
"review_count": lead.get("reviews"), "review_count": lead.get("reviews"),
} }
# brand_assets에서 logo_images / brand_colors 강제 주입. LLM이 프롬프트 가드 무시하고 null로 두는 케이스 차단. # logo URL 은 raw_info.logo_url 컬럼에서, brand_colors 는 JSON 에서 강제 주입. LLM 의 null 처리 차단.
ba = hospital.get("brandAssets") or {} if logo_url:
if ba.get("logo_images"): snapshot["logo_images"] = ba["logo_images"] snapshot["logo_images"] = {"circle": None, "horizontal": logo_url, "korean": None}
if ba.get("brand_colors"): snapshot["brand_colors"] = ba["brand_colors"] if brand_assets.get("brand_colors"): snapshot["brand_colors"] = brand_assets["brand_colors"]
return ClinicSnapshot.model_validate(snapshot).model_dump() return ClinicSnapshot.model_validate(snapshot).model_dump()
def _naver_blog_summary(blog: dict | None) -> dict | None: def _naver_blog_summary(blog: dict | None) -> dict | None:
"""plan 카드 한 장에 들어가는 건 전체 포스트 수와 최근 활동 시점뿐. 그 외(본문·링크·제목)는 """plan 카드 한 장에 들어가는 건 전체 포스트 수와 최근 활동 시점뿐. 그 외(본문·링크·제목)는
던져봐야 토큰만 늘고 LLM무관 정보로 hallucinate .""" 던져봐야 토큰만 늘고 LLM 무관 정보로 hallucinate ."""
if not blog: if not blog:
return None return None
posts = blog.get("posts") or [] posts = blog.get("posts") or []
@ -256,40 +251,43 @@ async def _build_youtube_audit(youtube: dict) -> dict:
async def _build_overrides(analysis_run_id: str) -> dict: async def _build_overrides(analysis_run_id: str) -> dict:
run = await fetchone( raw = await select_run_raw_data(analysis_run_id)
"SELECT hospital_id, instagram_data_id, facebook_data_id," if not raw:
" naver_blog_data_id, youtube_data_id, gangnam_unni_data_id"
" FROM analysis_runs WHERE analysis_run_id = %s",
(analysis_run_id,),
)
if not run:
return {} return {}
hospital_row = await fetchone( mainpage = raw.get("mainpage", {}) or {}
"SELECT raw_data, url FROM hospital_baseinfo WHERE hospital_id = %s", branding = raw.get("branding", {}) or {}
(run["hospital_id"],), instagram = raw.get("instagram", {}) or {}
) facebook = raw.get("facebook", {}) or {}
hospital = json.loads(hospital_row["raw_data"]) if hospital_row and isinstance(hospital_row.get("raw_data"), str) else (hospital_row or {}).get("raw_data") or {} youtube = raw.get("youtube", {}) or {}
hospital["domain"] = (hospital_row or {}).get("url") or "" gangnam_unni = raw.get("gangnam_unni", {}) or {}
instagram = await fetch_raw("instagram_data", run["instagram_data_id"]) or {} naver_blog = raw.get("naver_blog", {}) or {}
facebook = await fetch_raw("facebook_data", run["facebook_data_id"]) or {} instagram_en = raw.get("instagram_en", {}) or {}
naver_blog = await fetch_raw("naver_blog_data", run["naver_blog_data_id"]) or {} facebook_en = raw.get("facebook_en", {}) or {}
youtube = await fetch_raw("youtube_data", run["youtube_data_id"]) or {} tiktok = raw.get("tiktok", {}) or {}
gangnam_unni = await fetch_raw("gangnam_unni_data", run["gangnam_unni_data_id"]) or {} naver_cafe = raw.get("naver_cafe", {}) or {}
brand_assets = branding.get("brandAssets") or {}
channel_logos = branding.get("channelLogos") or {}
logo_url = await select_branding_logo_url(analysis_run_id)
snapshot: dict = _build_clinic_snapshot(gangnam_unni, hospital) snapshot: dict = _build_clinic_snapshot(gangnam_unni, mainpage, brand_assets, logo_url)
yt_patch: dict = await _build_youtube_audit(youtube) yt_patch: dict = await _build_youtube_audit(youtube)
# ── instagram (KR·EN 계정을 코드에서 구성 → LLM 출력 무시하고 교체) ────────────── # ── instagram (KR·EN 계정을 코드에서 구성 → LLM 출력 무시하고 교체) ──────────────
ig_patch = build_instagram_accounts( ig_patch = build_instagram_accounts(instagram, instagram_en, channel_logos)
instagram, hospital.get("instagramEn") or {}, hospital.get("channelLogos") or {},
)
# ── facebook (KR=facebook_data, EN=hospital.facebookEn 둘 다 코드 산출, [KR, EN] 순서) ── # ── facebook (KR=raw.facebook, EN=raw.facebook_en 둘 다 코드 산출, [KR, EN] 순서) ──
fb_pages = build_facebook_pages(facebook, hospital.get("facebookEn") or {}) fb_pages = build_facebook_pages(facebook, facebook_en)
# ── KPI dashboard: 7개 mockup 라이프사이클 공식으로 코드가 결정. LLM 출력은 무시. ────── # ── KPI dashboard: 7개 mockup 라이프사이클 공식으로 코드가 결정. LLM 출력은 무시. ──────
kpi = build_kpi_dashboard(instagram, facebook, youtube, gangnam_unni, hospital, naver_blog) # build_kpi_dashboard 의 hospital 인자에 부가 채널 dict 모아서 넘김 (instagramEn/facebookEn/tiktok/naverCafe 키 기대).
kpi_extras = {
"instagramEn": instagram_en,
"facebookEn": facebook_en,
"tiktok": tiktok,
"naverCafe": naver_cafe,
}
kpi = build_kpi_dashboard(instagram, facebook, youtube, gangnam_unni, kpi_extras, naver_blog)
overrides: dict = {} overrides: dict = {}
if snapshot: if snapshot:
@ -317,12 +315,13 @@ def _deep_merge(base: dict, overrides: dict) -> dict:
base[k] = v base[k] = v
return base return base
def _patch_report(result: ReportOutput, overrides: dict) -> ReportOutput: def _patch_report(result: ReportOutput, overrides: dict) -> ReportOutput:
merged = _deep_merge(result.model_dump(), overrides) merged = _deep_merge(result.model_dump(), overrides)
# 인스타 계정은 프롬프트에서 LLM이 []로 두게 했고, 코드가 수집 데이터로 채운다 (데이터 없으면 빈 리스트) # 인스타 계정은 프롬프트에서 LLM 이 [] 로 두게 했고, 코드가 수집 데이터로 채운다 (데이터 없으면 빈 리스트)
merged.setdefault("instagram_audit", {})["accounts"] = (overrides.get("instagram_audit") or {}).get("accounts") or [] merged.setdefault("instagram_audit", {})["accounts"] = (overrides.get("instagram_audit") or {}).get("accounts") or []
# 페북 페이지(KR+EN): _page_patch가 부분 필드만 만들어 그대로 박으면 검증 실패(label/logo 등 누락). # 페북 페이지(KR+EN): _page_patch 가 부분 필드만 만들어 그대로 박으면 검증 실패(label/logo 등 누락).
# LLM이 만든 첫 페이지(보통 KR)를 템플릿으로 복사한 뒤 코드 patch로 인덱스별 덮어쓰기 → # LLM 이 만든 첫 페이지(보통 KR)를 템플릿으로 복사한 뒤 코드 patch 로 인덱스별 덮어쓰기 →
# 필수 필드는 LLM 디폴트 받고, 수집 수치는 코드 값. EN 누락 버그 회피. # 필수 필드는 LLM 디폴트 받고, 수집 수치는 코드 값. EN 누락 버그 회피.
fb_pages = (overrides.get("facebook_audit") or {}).get("pages") or [] fb_pages = (overrides.get("facebook_audit") or {}).get("pages") or []
if fb_pages: if fb_pages:
@ -341,15 +340,16 @@ def _patch_report(result: ReportOutput, overrides: dict) -> ReportOutput:
async def run_report_task(analysis_run_id: str) -> None: async def run_report_task(analysis_run_id: str) -> None:
logger.info("[report] start run=%s", analysis_run_id) logger.info("[report] start run=%s", analysis_run_id)
await analyze_branding(analysis_run_id)
result = await generate_report(analysis_run_id) result = await generate_report(analysis_run_id)
result = _patch_report(result, await _build_overrides(analysis_run_id)) result = _patch_report(result, await _build_overrides(analysis_run_id))
await save_analysis_report(analysis_run_id, result.model_dump()) await update_run_report(analysis_run_id, result.model_dump())
logger.info("[report] done run=%s", analysis_run_id) logger.info("[report] done run=%s", analysis_run_id)
def _patch_plan(result: PlanOutput, logo_desc: str) -> PlanOutput: def _patch_plan(result: PlanOutput, logo_desc: str) -> PlanOutput:
"""brand_guide.channel_branding[].profile_photo 는 LLM 안 맡기고 코드가 박는다 """brand_guide.channel_branding[].profile_photo 는 LLM 안 맡기고 코드가 박는다
(모든 채널 동일값 = brand_assets.logo_description). LLMfallback 문구 hallucinate 방지.""" (모든 채널 동일값 = brand_assets.logo_description). LLM fallback 문구 hallucinate 방지."""
p = result.model_dump() p = result.model_dump()
for ch in (p.get("brand_guide") or {}).get("channel_branding") or []: for ch in (p.get("brand_guide") or {}).get("channel_branding") or []:
ch["profile_photo"] = logo_desc ch["profile_photo"] = logo_desc
@ -359,15 +359,10 @@ def _patch_plan(result: PlanOutput, logo_desc: str) -> PlanOutput:
async def run_plan_task(analysis_run_id: str) -> None: async def run_plan_task(analysis_run_id: str) -> None:
logger.info("[plan] start run=%s", analysis_run_id) logger.info("[plan] start run=%s", analysis_run_id)
result = await generate_plan(analysis_run_id) result = await generate_plan(analysis_run_id)
# profile_photo 는 brand_assets.logo_description 으로 코드가 박음 (LLM "(가이드 미보유)" 같은 hallucination 차단) # profile_photo 는 brand_assets.logo_description 으로 코드가 박음 (LLM "(가이드 미보유)" 같은 hallucination 차단).
run = await fetchone("SELECT hospital_id FROM analysis_runs WHERE analysis_run_id = %s", (analysis_run_id,)) raw = await select_run_raw_data(analysis_run_id)
if run: branding = raw.get("branding") or {}
hr = await fetchone("SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s", (run["hospital_id"],)) logo_desc = ((branding.get("brandAssets") or {}).get("logo_description")) or ""
h = json.loads(hr["raw_data"]) if hr and isinstance(hr.get("raw_data"), str) else (hr or {}).get("raw_data") or {}
logo_desc = ((h.get("brandAssets") or {}).get("logo_description")) or ""
result = _patch_plan(result, logo_desc) result = _patch_plan(result, logo_desc)
await execute( await update_run_plan(analysis_run_id, result.model_dump())
"UPDATE analysis_runs SET plan_data = %s WHERE analysis_run_id = %s",
(json.dumps(result.model_dump(), ensure_ascii=False), analysis_run_id),
)
logger.info("[plan] done run=%s", analysis_run_id) logger.info("[plan] done run=%s", analysis_run_id)

View File

@ -0,0 +1,172 @@
"""collect 단계 - HTML/CSS 텍스트에서 brand 로고 URL + 색상 추출"""
import logging
import re
from collections import Counter
from urllib.parse import urljoin
logger = logging.getLogger(__name__)
# ── 로고 URL 추출 ─────────────────────────────────────────────────────────────
LOGO_IMG_PATTERNS = [
re.compile(r'<img[^>]*\bclass=["\'][^"\']*\blogo\b[^"\']*["\'][^>]*\bsrc=["\']([^"\']+)["\']', re.IGNORECASE),
re.compile(r'<img[^>]*\bsrc=["\']([^"\']+)["\'][^>]*\bclass=["\'][^"\']*\blogo\b[^"\']*["\']', re.IGNORECASE),
re.compile(r'<img[^>]*\bid=["\'][^"\']*\blogo\b[^"\']*["\'][^>]*\bsrc=["\']([^"\']+)["\']', re.IGNORECASE),
re.compile(r'<img[^>]*\balt=["\'][^"\']*\blogo\b[^"\']*["\'][^>]*\bsrc=["\']([^"\']+)["\']', re.IGNORECASE),
re.compile(r'<(?:a|h[1-6]|div|span)[^>]*\b(?:class|id)=["\'][^"\']*\blogo\b[^"\']*["\'][^>]*>(?:[^<]|<(?!img))*<img[^>]*\bsrc=["\']([^"\']+)["\']', re.IGNORECASE | re.DOTALL),
re.compile(r'<(?:a|div|span|h[1-6])[^>]*\b(?:class|id)=["\'][^"\']*\blogo\b[^"\']*["\'][^>]*\bstyle=["\'][^"\']*background(?:-image)?\s*:\s*url\(\s*["\']?([^"\')\s]+)', re.IGNORECASE),
re.compile(r'<(?:a|div|span|h[1-6])[^>]*\bstyle=["\'][^"\']*background(?:-image)?\s*:\s*url\(\s*["\']?([^"\')\s]+)[^"\']*["\'][^>]*\b(?:class|id)=["\'][^"\']*\blogo\b', re.IGNORECASE),
re.compile(r'<img[^>]*\bsrc=["\']([^"\']*\blogo\b[^"\']*\.(?:png|svg|jpe?g|webp)[^"\']*)["\']', re.IGNORECASE),
re.compile(r'<header\b[^>]*>(?:[^<]|<(?!img))*<img[^>]*\bsrc=["\']([^"\']+\.(?:png|svg|jpe?g|webp)[^"\']*)["\']', re.IGNORECASE | re.DOTALL),
re.compile(r'<nav\b[^>]*>(?:[^<]|<(?!img))*<img[^>]*\bsrc=["\']([^"\']+\.(?:png|svg|jpe?g|webp)[^"\']*)["\']', re.IGNORECASE | re.DOTALL),
re.compile(r'<meta[^>]*\bproperty=["\']og:image["\'][^>]*\bcontent=["\']([^"\']+)["\']', re.IGNORECASE),
re.compile(r'<meta[^>]*\bcontent=["\']([^"\']+)["\'][^>]*\bproperty=["\']og:image["\']', re.IGNORECASE),
]
LOGO_CSS_PATTERN = re.compile(
r'\.[\w-]*\blogo\b[\w-]*\s*(?:,\s*\.[\w-]+\s*)*\{[^}]*background(?:-image)?\s*:\s*url\(\s*["\']?([^"\')\s]+)',
re.IGNORECASE | re.DOTALL,
)
def find_logo_url_in_html(html: str, base_url: str, css_texts: list[str] | None = None) -> str | None:
"""HTML 에서 logo URL 찾기. 우선순위: 1) class/id/alt 명시 img 2) 외부 CSS .logo bg 3) header/nav 첫 img."""
def _is_noise(src: str) -> bool:
if not src or src.startswith("data:"):
return True
if re.search(r"(blank|spacer|pixel|transparent|1x1)\b", src, re.IGNORECASE):
return True
if re.search(r"(lang[-_]?(kor|eng|chn|jpn|rus|jp|en|ko|cn|ar|in)|flag|country|icon-|btn-|arrow|prev|next|search)\b", src, re.IGNORECASE):
return True
return False
for pat in LOGO_IMG_PATTERNS[:8]:
for m in pat.finditer(html):
src = m.group(1)
if _is_noise(src):
continue
return urljoin(base_url, src)
for css in (css_texts or []):
m = LOGO_CSS_PATTERN.search(css)
if m:
src = m.group(1)
if not _is_noise(src):
return urljoin(base_url, src)
for pat in LOGO_IMG_PATTERNS[8:]:
for m in pat.finditer(html):
src = m.group(1)
if _is_noise(src):
continue
return urljoin(base_url, src)
return None
# ── 색상 추출 ────────────────────────────────────────────────────────────────
HEX6 = re.compile(r"#([0-9a-fA-F]{6})\b")
HEX3 = re.compile(r"#([0-9a-fA-F]{3})\b(?![0-9a-fA-F])")
RGB = re.compile(r"rgba?\(\s*(\d{1,3})\s*,\s*(\d{1,3})\s*,\s*(\d{1,3})\s*(?:,\s*[\d.]+\s*)?\)")
STYLE_BLOCK = re.compile(r"<style[^>]*>(.*?)</style>", re.IGNORECASE | re.DOTALL)
NOISE = {
"#ffffff", "#000000", "#fff", "#000",
"#333", "#222", "#111", "#444", "#555", "#666", "#777", "#888", "#999",
"#aaa", "#bbb", "#ccc", "#ddd", "#eee", "#f0f0f0", "#f5f5f5", "#fafafa",
}
def _normalize(hex_str: str) -> str:
h = hex_str.lstrip("#").lower()
if len(h) == 3:
h = "".join(c * 2 for c in h)
if len(h) == 8:
h = h[:6]
return f"#{h}"
def _rgb_to_hex(r: int, g: int, b: int) -> str:
return f"#{r:02x}{g:02x}{b:02x}"
def _hex_to_rgb(h: str) -> tuple[int, int, int]:
h = h.lstrip("#")
return int(h[0:2], 16), int(h[2:4], 16), int(h[4:6], 16)
def _distance(a: str, b: str) -> float:
ar, ag, ab = _hex_to_rgb(a)
br, bg, bb = _hex_to_rgb(b)
return ((ar - br) ** 2 + (ag - bg) ** 2 + (ab - bb) ** 2) ** 0.5
def _is_grayscale(h: str, tol: int = 12) -> bool:
r, g, b = _hex_to_rgb(h)
return max(r, g, b) - min(r, g, b) < tol
def _extract_hex(text: str) -> list[str]:
out: list[str] = []
out.extend(_normalize(m.group(0)) for m in HEX6.finditer(text))
out.extend(_normalize(m.group(0)) for m in HEX3.finditer(text))
for m in RGB.finditer(text):
r, g, b = int(m.group(1)), int(m.group(2)), int(m.group(3))
if 0 <= r <= 255 and 0 <= g <= 255 and 0 <= b <= 255:
out.append(_rgb_to_hex(r, g, b))
return out
def _cluster(colors: Counter, threshold: float = 25.0) -> list[tuple[str, int]]:
ranked = colors.most_common()
clusters: list[tuple[str, int]] = []
for color, count in ranked:
merged = False
for i, (rep, rep_count) in enumerate(clusters):
if _distance(color, rep) < threshold:
clusters[i] = (rep, rep_count + count)
merged = True
break
if not merged:
clusters.append((color, count))
return clusters
def extract_brand_colors_from_text(html: str, css_texts: list[str], source_url: str = "") -> dict:
"""HTML + CSS 텍스트에서 hex 빈도 분석 → primary/accent/text + palette. (fetch 없음)"""
all_text_chunks: list[str] = list(STYLE_BLOCK.findall(html))
all_text_chunks.append(html)
all_text_chunks.extend(css_texts)
counter: Counter = Counter()
for text in all_text_chunks:
for color in _extract_hex(text):
if color in NOISE:
continue
counter[color] += 1
if not counter:
logger.info("[brand_parser] no colors extracted from %s", source_url)
return {}
clustered = _cluster(counter)
chromatic = [c for c, _ in clustered if not _is_grayscale(c)]
grayscale = [c for c, _ in clustered if _is_grayscale(c)]
palette_top = clustered[:8]
palette = [{"name": f"색상 {i+1}", "hex": h, "usage": f"빈도 {n}"} for i, (h, n) in enumerate(palette_top)]
return {
"brand_colors": {
"primary": chromatic[0] if chromatic else None,
"accent": chromatic[1] if len(chromatic) > 1 else None,
"text": grayscale[0] if grayscale else None,
},
"color_palette": palette,
"extracted_from": "html+css",
}

89
app/services/branding.py Normal file
View File

@ -0,0 +1,89 @@
"""report 단계 - Gemini Vision 으로 로고 묘사 + 채널 로고 매칭."""
import logging
import os
from urllib.parse import urlparse
from common.db.source import (
select_run_raw_data, update_raw_info_merge,
select_branding_info_id, select_branding_logo_url,
)
from common.utils import _run_optional_step
from integrations.llm.gemini_vision import VisionClient
logger = logging.getLogger(__name__)
async def _describe_logo(analysis_run_id: str, info_id: int, vc: VisionClient) -> None:
"""공식 로고 정성 묘사. branding raw_info["brandAssets"] 머지.
호출 우선순위: raw_info.logo_url 컬럼 (HTML parser canonical) firecrawl 메타 fallback."""
raw = await select_run_raw_data(analysis_run_id)
mainpage = raw.get("mainpage") or {}
homepage_url = mainpage.get("sourceUrl") or ""
branding_meta = mainpage.get("branding") or {}
column_logo = await select_branding_logo_url(analysis_run_id)
candidates = [u for u in [
column_logo,
branding_meta.get("logoUrl"),
branding_meta.get("ogImage"),
branding_meta.get("faviconUrl"),
] if u]
if homepage_url:
parsed = urlparse(homepage_url)
if parsed.scheme and parsed.netloc:
candidates.append(f"{parsed.scheme}://{parsed.netloc}/favicon.ico")
if not candidates:
logger.info("[brand_logo] skip — no candidates")
return
logger.info("[brand_logo] start run=%s candidates=%d", analysis_run_id, len(candidates))
result: dict = {}
for cand in candidates:
result = await vc.analyze_brand_assets(logo_url=cand, homepage_url=homepage_url)
if result:
break
result.pop("logo_images", None) # logo_images 는 컬럼으로 옮겼으니 JSON 에서 제거
if result:
await update_raw_info_merge(info_id, {"brandAssets": result})
logger.info("[brand_logo] done keys=%s", list(result.keys()) if result else None)
async def _describe_channel_logos(analysis_run_id: str, info_id: int, vc: VisionClient) -> None:
"""채널 프로필 로고를 공식 로고와 비교. branding raw_info["channelLogos"] 머지."""
raw = await select_run_raw_data(analysis_run_id)
official = await select_branding_logo_url(analysis_run_id)
_label = {
"instagram": "Instagram",
"facebook": "Facebook",
"youtube": "YouTube",
"instagram_en": "Instagram EN",
"facebook_en": "Facebook EN",
"tiktok": "TikTok",
}
logos = [{"channel": label, "url": img}
for key, label in _label.items()
if (img := (raw.get(key) or {}).get("profileImage"))]
if not logos:
logger.info("[channel_logos] skip — no channel profileImages")
return
logger.info("[channel_logos] start run=%s channels=%s official=%s",
analysis_run_id, [l["channel"] for l in logos], bool(official))
result = await vc.describe_channel_logos(official, logos)
if result:
result["logos"] = logos # Vision 못 본 채널도 url 은 프론트 표시용으로 보관
await update_raw_info_merge(info_id, {"channelLogos": result})
logger.info("[channel_logos] done keys=%s", list(result.keys()) if result else None)
async def analyze_branding(analysis_run_id: str) -> None:
"""report build 직전 호출 — 로고 묘사 + 채널 로고 매칭 (Gemini). 둘 다 격리."""
api_key = os.getenv("GEMINI_API_KEY")
if not api_key:
logger.info("[branding] skip — GEMINI_API_KEY 없음")
return
branding_info_id = await select_branding_info_id(analysis_run_id)
if branding_info_id is None:
logger.info("[branding] skip — branding source 없음 run=%s", analysis_run_id)
return
vc = VisionClient(api_key)
logger.info("[branding] start run=%s", analysis_run_id)
await _run_optional_step(_describe_logo(analysis_run_id, branding_info_id, vc), "brand_logo")
await _run_optional_step(_describe_channel_logos(analysis_run_id, branding_info_id, vc), "channel_logos")
logger.info("[branding] done run=%s", analysis_run_id)

View File

@ -1,120 +1,188 @@
import asyncio import asyncio
import logging import logging
from common.db import ( from common.db.hospital import update_hospital_status, update_hospital
fetchone, from common.db.source import select_run_sources, update_raw_info_status, update_raw_info
set_instagram_status, save_instagram_raw_data,
set_facebook_status, save_facebook_raw_data,
set_naver_blog_status, save_naver_blog_raw_data,
set_youtube_status, save_youtube_raw_data,
set_gangnam_unni_status, save_gangnam_unni_raw_data,
execute, save_hospital_raw_data,
)
from common.utils import get_env, _run_optional_step from common.utils import get_env, _run_optional_step
from integrations.apify import ApifyClient from integrations.apify import ApifyClient
from integrations.naver import NaverClient from integrations.naver import NaverClient
from integrations.youtube import YouTubeClient from integrations.youtube import YouTubeClient
from integrations.firecrawl import FirecrawlClient from integrations.firecrawl import FirecrawlClient
from services.collect_extras import collect_brand_assets, collect_extra_channels, collect_channel_logos from models.status import SourceType
from integrations.site_fetcher import fetch_html_and_css
from services.brand_parser import find_logo_url_in_html, extract_brand_colors_from_text
from common.db.source import update_raw_info_merge, update_raw_info_logo_url, select_run_raw_data
from services.facebook_audit import transform_for_storage as transform_facebook from services.facebook_audit import transform_for_storage as transform_facebook
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
async def collect_instagram(analysis_run_id: str, row_id: int, url: str) -> None: async def collect_instagram(analysis_run_id: str, info_id: int, url: str) -> None:
logger.info("[instagram] start run=%s url=%s", analysis_run_id, url) logger.info("[instagram] start run=%s url=%s", analysis_run_id, url)
await set_instagram_status(row_id, "processing") await update_raw_info_status(info_id, "processing")
data = await ApifyClient(get_env("APIFY_API_TOKEN")).get_instagram_profile(url) data = await ApifyClient(get_env("APIFY_API_TOKEN")).get_instagram_profile(url)
await save_instagram_raw_data(row_id, data) if data is None:
await update_raw_info_status(info_id, "failed")
logger.warning("[instagram] failed run=%s", analysis_run_id)
return
await update_raw_info(info_id, data)
logger.info("[instagram] done run=%s", analysis_run_id) logger.info("[instagram] done run=%s", analysis_run_id)
async def collect_facebook(analysis_run_id: str, row_id: int, url: str) -> None: async def collect_facebook(analysis_run_id: str, info_id: int, url: str) -> None:
logger.info("[facebook] start run=%s url=%s", analysis_run_id, url) logger.info("[facebook] start run=%s url=%s", analysis_run_id, url)
await set_facebook_status(row_id, "processing") await update_raw_info_status(info_id, "processing")
data = await ApifyClient(get_env("APIFY_API_TOKEN")).get_facebook_page(url) data = await ApifyClient(get_env("APIFY_API_TOKEN")).get_facebook_page(url)
if data is None:
await update_raw_info_status(info_id, "failed")
logger.warning("[facebook] failed run=%s", analysis_run_id)
return
data = transform_facebook(data) data = transform_facebook(data)
await save_facebook_raw_data(row_id, data) await update_raw_info(info_id, data)
logger.info("[facebook] done run=%s", analysis_run_id) logger.info("[facebook] done run=%s", analysis_run_id)
async def collect_naver_blog(analysis_run_id: str, row_id: int, url: str) -> None: async def collect_naver_blog(analysis_run_id: str, info_id: int, url: str) -> None:
logger.info("[naver_blog] start run=%s url=%s", analysis_run_id, url) logger.info("[naver_blog] start run=%s url=%s", analysis_run_id, url)
await set_naver_blog_status(row_id, "processing") await update_raw_info_status(info_id, "processing")
data = await NaverClient(get_env("NAVER_CLIENT_ID"), get_env("NAVER_CLIENT_SECRET")).get_blog_rss(url) data = await NaverClient(get_env("NAVER_CLIENT_ID"), get_env("NAVER_CLIENT_SECRET")).get_blog_rss(url)
await save_naver_blog_raw_data(row_id, data) if data is None:
await update_raw_info_status(info_id, "failed")
logger.warning("[naver_blog] failed run=%s", analysis_run_id)
return
await update_raw_info(info_id, data)
logger.info("[naver_blog] done run=%s", analysis_run_id) logger.info("[naver_blog] done run=%s", analysis_run_id)
async def collect_youtube(analysis_run_id: str, row_id: int, url: str) -> None: async def collect_youtube(analysis_run_id: str, info_id: int, url: str) -> None:
logger.info("[youtube] start run=%s url=%s", analysis_run_id, url) logger.info("[youtube] start run=%s url=%s", analysis_run_id, url)
await set_youtube_status(row_id, "processing") await update_raw_info_status(info_id, "processing")
data = await YouTubeClient(get_env("YOUTUBE_API_KEY")).get_channel(url) data = await YouTubeClient(get_env("YOUTUBE_API_KEY")).get_channel(url)
await save_youtube_raw_data(row_id, data) if data is None:
await update_raw_info_status(info_id, "failed")
logger.warning("[youtube] failed run=%s", analysis_run_id)
return
await update_raw_info(info_id, data)
logger.info("[youtube] done run=%s", analysis_run_id) logger.info("[youtube] done run=%s", analysis_run_id)
async def collect_gangnam_unni(analysis_run_id: str, row_id: int, url: str) -> None: async def collect_gangnam_unni(analysis_run_id: str, info_id: int, url: str) -> None:
logger.info("[gangnam_unni] start run=%s url=%s", analysis_run_id, url) logger.info("[gangnam_unni] start run=%s url=%s", analysis_run_id, url)
await set_gangnam_unni_status(row_id, "processing") await update_raw_info_status(info_id, "processing")
data = await FirecrawlClient(get_env("FIRECRAWL_API_KEY")).get_gangnam_unni(url) data = await FirecrawlClient(get_env("FIRECRAWL_API_KEY")).get_gangnam_unni(url)
await save_gangnam_unni_raw_data(row_id, data) if data is None:
await update_raw_info_status(info_id, "failed")
logger.warning("[gangnam_unni] failed run=%s", analysis_run_id)
return
await update_raw_info(info_id, data)
logger.info("[gangnam_unni] done run=%s", analysis_run_id) logger.info("[gangnam_unni] done run=%s", analysis_run_id)
async def collect_clinic_info(analysis_run_id: str, hospital_id: str, url: str) -> None: async def collect_mainpage(analysis_run_id: str, info_id: int, hospital_id: str, url: str) -> None:
logger.info("[clinic] start run=%s url=%s", analysis_run_id, url) logger.info("[mainpage] start run=%s url=%s", analysis_run_id, url)
await execute("UPDATE hospital_baseinfo SET status = 'processing' WHERE hospital_id = %s", (hospital_id,)) await update_raw_info_status(info_id, "processing")
await update_hospital_status(hospital_id, "processing")
data = await FirecrawlClient(get_env("FIRECRAWL_API_KEY")).fetch_clinic_info(url) data = await FirecrawlClient(get_env("FIRECRAWL_API_KEY")).fetch_clinic_info(url)
await save_hospital_raw_data(hospital_id, data, analysis_run_id=analysis_run_id) if data is None:
logger.info("[clinic] done run=%s", analysis_run_id) await update_raw_info_status(info_id, "failed")
logger.warning("[mainpage] failed run=%s", analysis_run_id)
return
# 홈페이지 URL 자체도 raw_data 에 박아둬야 brand_assets / 분석 단계에서 mainpage URL 재조회 없이 사용 가능.
data = {**data, "sourceUrl": url}
await update_raw_info(info_id, data)
await update_hospital(hospital_id, data, analysis_run_id=analysis_run_id)
logger.info("[mainpage] done run=%s", analysis_run_id)
async def collect_all( async def collect_tiktok(analysis_run_id: str, info_id: int, url: str) -> None:
analysis_run_id: str, logger.info("[tiktok] start run=%s url=%s", analysis_run_id, url)
hospital_id: str, await update_raw_info_status(info_id, "processing")
instagram_id: int | None = None, data = await ApifyClient(get_env("APIFY_API_TOKEN")).get_tiktok_profile(url)
facebook_id: int | None = None, if data is None:
naver_blog_id: int | None = None, await update_raw_info_status(info_id, "failed")
youtube_id: int | None = None, logger.warning("[tiktok] failed run=%s", analysis_run_id)
gangnam_unni_id: int | None = None, return
tiktok_url: str | None = None, await update_raw_info(info_id, data)
instagram_en_url: str | None = None, logger.info("[tiktok] done run=%s", analysis_run_id)
facebook_en_url: str | None = None,
kakao_talk_url: str | None = None,
naver_cafe_url: str | None = None,
) -> None:
async def _url(table: str, row_id: int) -> str:
row = await fetchone(f"SELECT url FROM {table} WHERE id = %s", (row_id,))
return row["url"] if row else ""
hospital = await fetchone("SELECT url FROM hospital_baseinfo WHERE hospital_id = %s", (hospital_id,))
tasks = [collect_clinic_info(analysis_run_id, hospital_id, hospital["url"])]
if instagram_id: async def collect_naver_cafe(analysis_run_id: str, info_id: int, url: str) -> None:
tasks.append(collect_instagram(analysis_run_id, instagram_id, await _url("instagram_data", instagram_id))) """카페는 로그인 필요라 본문 못 봄. URL 활성·cafeId·이름 언급수만 신호로 수집."""
if facebook_id: logger.info("[naver_cafe] start run=%s url=%s", analysis_run_id, url)
tasks.append(collect_facebook(analysis_run_id, facebook_id, await _url("facebook_data", facebook_id))) await update_raw_info_status(info_id, "processing")
if naver_blog_id: data = await NaverClient(get_env("NAVER_CLIENT_ID"), get_env("NAVER_CLIENT_SECRET")).get_cafe_info(url)
tasks.append(collect_naver_blog(analysis_run_id, naver_blog_id, await _url("naver_blog_data", naver_blog_id))) if data is None:
if youtube_id: await update_raw_info_status(info_id, "failed")
tasks.append(collect_youtube(analysis_run_id, youtube_id, await _url("youtube_data", youtube_id))) logger.warning("[naver_cafe] failed run=%s", analysis_run_id)
if gangnam_unni_id: return
tasks.append(collect_gangnam_unni(analysis_run_id, gangnam_unni_id, await _url("gangnam_unni_data", gangnam_unni_id))) await update_raw_info(info_id, data)
logger.info("[naver_cafe] done run=%s", analysis_run_id)
async def collect_kakaotalk(analysis_run_id: str, info_id: int, url: str) -> None:
"""카카오톡은 수집 X — URL 보관만. LLM이 채널 존재 신호로만 사용."""
logger.info("[kakaotalk] url-only run=%s url=%s", analysis_run_id, url)
await update_raw_info(info_id, {"url": url})
async def collect_brand_basics(analysis_run_id: str, info_id: int) -> None:
"""branding 단계 collect — HTML/CSS 한 번 fetch → logo URL(컬럼) + brand 색상(JSON).
mainpage 수집 결과 의존이라 main wave gather 끝난 호출."""
logger.info("[brand_basics] start run=%s info=%s", analysis_run_id, info_id)
raw = await select_run_raw_data(analysis_run_id)
mainpage = raw.get("mainpage") or {}
homepage_url = mainpage.get("sourceUrl") or ""
branding_meta = mainpage.get("branding") or {}
html, css_texts = await fetch_html_and_css(homepage_url) if homepage_url else ("", [])
html_logo_url = find_logo_url_in_html(html, homepage_url, css_texts) if html else None
css_colors = extract_brand_colors_from_text(html, css_texts, homepage_url) if html else {}
logo_url = html_logo_url or branding_meta.get("logoUrl") or branding_meta.get("ogImage")
if logo_url:
await update_raw_info_logo_url(info_id, logo_url)
payload: dict = {}
if css_colors:
if css_colors.get("brand_colors"): payload["brand_colors"] = css_colors["brand_colors"]
if css_colors.get("color_palette"): payload["color_palette"] = css_colors["color_palette"]
payload["color_source"] = "html+css"
if payload:
await update_raw_info_merge(info_id, {"brandAssets": payload})
logger.info("[brand_basics] done logo_url=%s colors=%s", bool(logo_url), bool(payload))
async def collect_all(analysis_run_id: str, hospital_id: str) -> None:
rows = await select_run_sources(analysis_run_id)
# source_type → collector. KR/EN 구분은 collector 입장에서 동일, language 컬럼만 다름.
_collectors = {
SourceType.INSTAGRAM: collect_instagram,
SourceType.FACEBOOK: collect_facebook,
SourceType.NAVER_BLOG: collect_naver_blog,
SourceType.YOUTUBE: collect_youtube,
SourceType.GANGNAM_UNNI: collect_gangnam_unni,
SourceType.TIKTOK: collect_tiktok,
SourceType.NAVER_CAFE: collect_naver_cafe,
SourceType.KAKAOTALK: collect_kakaotalk,
}
tasks = []
branding_info_id: int | None = None
for row in rows:
info_id = row["info_id"]
source_type = row["source_type"]
url = row["url"]
if source_type == SourceType.BRANDING:
branding_info_id = info_id # mainpage·채널 수집 끝난 뒤 2단계에서 사용
continue
if source_type == SourceType.MAINPAGE:
tasks.append(collect_mainpage(analysis_run_id, info_id, hospital_id, url))
elif source_type in _collectors:
tasks.append(_collectors[source_type](analysis_run_id, info_id, url))
await asyncio.gather(*tasks, return_exceptions=True) await asyncio.gather(*tasks, return_exceptions=True)
# 아래 3단계는 모두 hospital raw_data를 read-modify-write 하므로 race 방지 위해 순차. # 2단계: branding (brandAssets → channelLogos 한 raw_info 안에 머지). mainpage·채널 raw_data 의존이라 순차.
# brand_assets : clinic_info가 채운 branding.logoUrl로 공식 로고/hex 추출 # 부가 기능이라 실패해도 리포트는 나와야 하므로 _run_optional_step 으로 격리.
# extra_channels: 틱톡/인스타EN/페북EN 수집 if branding_info_id is not None:
# channel_logos : 공식 로고(brand_assets)+채널 profileImage(extra_channels) 채워진 뒤 Vision 비교 await _run_optional_step(collect_brand_basics(analysis_run_id, branding_info_id), "brand_basics")
# 부가 기능이라 실패해도 리포트는 나와야 하므로 _run_optional_step으로 각각 격리.
await _run_optional_step(collect_brand_assets(analysis_run_id, hospital_id), "brand_assets")
await _run_optional_step(
collect_extra_channels(
analysis_run_id, hospital_id,
tiktok_url=tiktok_url, instagram_en_url=instagram_en_url, facebook_en_url=facebook_en_url,
kakao_talk_url=kakao_talk_url, naver_cafe_url=naver_cafe_url,
),
"extra_channels",
)
await _run_optional_step(collect_channel_logos(analysis_run_id, hospital_id), "channel_logos")

View File

@ -1,191 +0,0 @@
import asyncio
import json
import logging
import os
from urllib.parse import urlparse
from common.db import fetchone, fetch_raw, merge_hospital_raw_data
from common.utils import get_env
from integrations.apify import ApifyClient
from integrations.vision import VisionClient
from integrations.naver import NaverClient
from integrations.color_extractor import extract_brand_assets_from_site
from services.facebook_audit import transform_for_storage as transform_facebook
logger = logging.getLogger(__name__)
async def collect_brand_assets(analysis_run_id: str, hospital_id: str) -> None:
"""홈페이지에서 로고 URL + brand hex 색상을 뽑아 raw_data["brandAssets"]에 저장.
- 로고 URL/hex: HTML·CSS 정규식 (color_extractor) Vision 의존 X, 사이트 전체 컬러 시스템이 정확.
- 로고 정성 묘사(심볼/워드마크/): Gemini Vision (GEMINI_API_KEY 없으면 색상만 저장하고 skip).
"""
logger.info("[brand_assets] start run=%s", analysis_run_id)
row = await fetchone(
"SELECT raw_data, url FROM hospital_baseinfo WHERE hospital_id = %s",
(hospital_id,),
)
if not row:
return
raw = row["raw_data"]
raw_data = json.loads(raw) if isinstance(raw, str) else (raw or {})
branding = raw_data.get("branding") or {}
homepage_url = row["url"]
# 0~1. 사이트 1회 fetch로 logo URL + brand hex 동시 추출 (img/background-image/CSS .logo, Vision 의존 X)
site = await extract_brand_assets_from_site(homepage_url) if homepage_url else {}
html_logo_url = site.get("logo_url")
css_colors = site.get("colors") or {}
if html_logo_url:
logger.info("[brand_assets] HTML logo found: %s", html_logo_url)
if css_colors:
logger.info("[brand_assets] css colors: %s", css_colors.get("brand_colors"))
# 2. 로고/대표 이미지 후보 (logo → og:image → favicon 순)
logo_url = html_logo_url or branding.get("logoUrl")
og_image = branding.get("ogImage")
favicon = branding.get("faviconUrl")
candidates: list[tuple[str, str]] = []
if logo_url: candidates.append(("logo", logo_url))
if og_image: candidates.append(("og", og_image))
if favicon: candidates.append(("favicon", favicon))
if homepage_url:
parsed = urlparse(homepage_url)
if parsed.scheme and parsed.netloc:
candidates.append(("favicon", f"{parsed.scheme}://{parsed.netloc}/favicon.ico"))
if not candidates and not css_colors:
logger.info("[brand_assets] skip — no logo/og/favicon candidates and no CSS colors")
return
# 3. Vision은 로고 정성 묘사만 (hex는 CSS 추출이 더 정확). 키 없으면 색상만 저장.
# SVG는 vision 내부에서 resvg로 PNG 래스터화 후 Gemini에 던지므로 분기 불필요.
result: dict = {}
used_kind: str | None = None
api_key = os.getenv("GEMINI_API_KEY")
if api_key and candidates:
vc = VisionClient(api_key)
for kind, cand in candidates:
result = await vc.analyze_brand_assets(logo_url=cand, homepage_url=homepage_url)
if result:
used_kind = kind
break
# favicon으로만 분석된 경우 진짜 로고가 아니므로 logo URL은 박지 않음 (묘사는 OK)
if result and used_kind == "favicon" and result.get("logo_images"):
result["logo_images"] = {"circle": None, "horizontal": None, "korean": None}
elif not api_key:
logger.info("[brand_assets] GEMINI_API_KEY not set — 색상만 저장, Vision 묘사 skip")
# 4. CSS에서 추출한 brand_colors/palette를 Vision보다 우선 사용
if css_colors:
if css_colors.get("brand_colors"):
result["brand_colors"] = css_colors["brand_colors"]
if css_colors.get("color_palette"):
result["color_palette"] = css_colors["color_palette"]
result["color_source"] = "html+css"
elif result:
result["color_source"] = "vision"
if result:
result["logo_source"] = used_kind or "none"
await merge_hospital_raw_data(hospital_id, {"brandAssets": result})
logger.info("[brand_assets] done keys=%s", list(result.keys()) if result else None)
async def collect_extra_channels(
analysis_run_id: str,
hospital_id: str,
tiktok_url: str | None = None,
instagram_en_url: str | None = None,
facebook_en_url: str | None = None,
kakao_talk_url: str | None = None,
naver_cafe_url: str | None = None,
) -> None:
"""틱톡 / 인스타 EN / 페북 EN / 네이버 카페 수집 + 카카오톡 URL만 보관 →
모두 hospital raw_data에 저장. 인스타EN·페북EN은 기존 Apify 수집기 재사용, 틱톡은 신규 액터.
네이버 카페는 로그인 필요라 본문 보지만 URL 활성·cafeId·이름 언급수만 신호로 수집.
카카오톡은 URL만 (LLM이 채널 존재 신호로만 사용)."""
apify = ApifyClient(get_env("APIFY_API_TOKEN"))
jobs: dict = {}
if instagram_en_url:
jobs["instagramEn"] = apify.get_instagram_profile(instagram_en_url)
if facebook_en_url:
jobs["facebookEn"] = apify.get_facebook_page(facebook_en_url)
if tiktok_url:
jobs["tiktok"] = apify.get_tiktok_profile(tiktok_url)
if naver_cafe_url:
nc = NaverClient(get_env("NAVER_CLIENT_ID"), get_env("NAVER_CLIENT_SECRET"))
jobs["naverCafe"] = nc.get_cafe_info(naver_cafe_url)
results: dict = {}
if jobs:
logger.info("[extra_channels] start run=%s channels=%s", analysis_run_id, list(jobs))
done = await asyncio.gather(*jobs.values(), return_exceptions=True)
for key, res in zip(jobs.keys(), done):
if isinstance(res, Exception):
logger.warning("[extra_channels] %s 수집 실패: %s", key, res)
elif res:
if key == "facebookEn":
res = transform_facebook(res)
results[key] = res
# URL-only 채널 (수집 X, 존재 여부만)
if kakao_talk_url:
results["kakaoTalk"] = {"url": kakao_talk_url}
if not results:
logger.info("[extra_channels] 수집 결과 없음 run=%s", analysis_run_id)
return
await merge_hospital_raw_data(hospital_id, results)
logger.info("[extra_channels] done run=%s keys=%s", analysis_run_id, list(results))
async def collect_channel_logos(analysis_run_id: str, hospital_id: str) -> None:
"""채널별 프로필 이미지(로고)를 모아 Gemini Vision으로 설명 + 공식 로고 일치 여부 평가.
hospital raw_data["channelLogos"] 저장. GEMINI_API_KEY 없으면 skip.
brand_assets(공식 로고)·extra_channels(틱톡/EN profileImage) 다음에 실행돼야 ."""
api_key = os.getenv("GEMINI_API_KEY")
if not api_key:
logger.info("[channel_logos] skip — GEMINI_API_KEY 없음")
return
hrow = await fetchone("SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s", (hospital_id,))
raw = hrow["raw_data"] if hrow else None
raw_data = json.loads(raw) if isinstance(raw, str) else (raw or {})
official = ((raw_data.get("brandAssets") or {}).get("logo_images") or {}).get("horizontal")
run = await fetchone(
"SELECT instagram_data_id, facebook_data_id, youtube_data_id"
" FROM analysis_runs WHERE analysis_run_id = %s",
(analysis_run_id,),
)
logos: list[dict] = []
# 전용 테이블 채널 (KR)
for ch, table, col in [
("Instagram", "instagram_data", "instagram_data_id"),
("Facebook", "facebook_data", "facebook_data_id"),
("YouTube", "youtube_data", "youtube_data_id"),
]:
rid = (run or {}).get(col)
if rid:
d = await fetch_raw(table, rid) or {}
if d.get("profileImage"):
logos.append({"channel": ch, "url": d["profileImage"]})
# 추가 채널 (hospital raw_data)
for ch, key in [("Instagram EN", "instagramEn"), ("Facebook EN", "facebookEn"), ("TikTok", "tiktok")]:
img = (raw_data.get(key) or {}).get("profileImage")
if img:
logos.append({"channel": ch, "url": img})
if not logos:
logger.info("[channel_logos] skip — 채널 프로필 이미지 없음")
return
logger.info("[channel_logos] start run=%s channels=%s official=%s", analysis_run_id,
[l["channel"] for l in logos], bool(official))
result = await VisionClient(api_key).describe_channel_logos(official, logos)
if result:
# Vision이 못 본 채널도 url은 채워둠 (프론트에서 이미지 표시용)
result["logos"] = logos
await merge_hospital_raw_data(hospital_id, {"channelLogos": result})
logger.info("[channel_logos] done run=%s keys=%s", analysis_run_id, list(result.keys()) if result else None)

View File

@ -2,7 +2,8 @@ import logging
from fastapi import HTTPException, UploadFile from fastapi import HTTPException, UploadFile
from common.db import execute, fetchall, fetchone, insert_file_row from common.db.run import select_run
from common.db.file_data import insert_file, select_run_files, select_file, delete_file
from integrations.azure_blob import AzureBlobUploader from integrations.azure_blob import AzureBlobUploader
from models.file import FileListItem, FileType, FileUploadResponse from models.file import FileListItem, FileType, FileUploadResponse
@ -31,10 +32,7 @@ async def upload_analysis_file(
content_type: str | None = None, content_type: str | None = None,
) -> tuple[int, str]: ) -> tuple[int, str]:
"""analysis_run에 딸린 파일 업로드. Blob 업로드 + file_data row 생성. (file_id, url) 반환.""" """analysis_run에 딸린 파일 업로드. Blob 업로드 + file_data row 생성. (file_id, url) 반환."""
run = await fetchone( run = await select_run(analysis_run_id)
"SELECT hospital_id FROM analysis_runs WHERE analysis_run_id = %s",
(analysis_run_id,),
)
if not run: if not run:
raise HTTPException(status_code=404, detail="analysis_run not found") raise HTTPException(status_code=404, detail="analysis_run not found")
hospital_id = run["hospital_id"] hospital_id = run["hospital_id"]
@ -47,7 +45,7 @@ async def upload_analysis_file(
content_type=content_type, content_type=content_type,
) )
file_id = await insert_file_row( file_id = await insert_file(
analysis_run_id=analysis_run_id, analysis_run_id=analysis_run_id,
hospital_id=hospital_id, hospital_id=hospital_id,
file_type=file_type, file_type=file_type,
@ -61,12 +59,7 @@ async def upload_analysis_file(
async def list_analysis_files(analysis_run_id: str) -> list[dict]: async def list_analysis_files(analysis_run_id: str) -> list[dict]:
"""analysis_run에 딸린 (삭제 안 된) 파일 목록.""" """analysis_run에 딸린 (삭제 안 된) 파일 목록."""
return await fetchall( return await select_run_files(analysis_run_id)
"SELECT id, file_type, file_name, file_url, size_bytes, created_at FROM file_data"
" WHERE analysis_run_id = %s AND is_deleted = FALSE"
" ORDER BY created_at DESC",
(analysis_run_id,),
)
async def handle_analysis_file_upload( async def handle_analysis_file_upload(
@ -102,7 +95,7 @@ async def handle_analysis_file_upload(
async def get_analysis_files_response(analysis_run_id: str) -> list[FileListItem]: async def get_analysis_files_response(analysis_run_id: str) -> list[FileListItem]:
"""run 존재 확인 + 응답 모델 생성.""" """run 존재 확인 + 응답 모델 생성."""
if not await fetchone("SELECT 1 FROM analysis_runs WHERE analysis_run_id = %s", (analysis_run_id,)): if not await select_run(analysis_run_id):
raise HTTPException(status_code=404, detail="analysis_run not found") raise HTTPException(status_code=404, detail="analysis_run not found")
rows = await list_analysis_files(analysis_run_id) rows = await list_analysis_files(analysis_run_id)
return [FileListItem(**{**r, "created_at": str(r["created_at"])}) for r in rows] return [FileListItem(**{**r, "created_at": str(r["created_at"])}) for r in rows]
@ -110,14 +103,8 @@ async def get_analysis_files_response(analysis_run_id: str) -> list[FileListItem
async def soft_delete_analysis_file(analysis_run_id: str, file_id: int) -> None: async def soft_delete_analysis_file(analysis_run_id: str, file_id: int) -> None:
"""analysis_run에 딸린 파일을 소프트 삭제. 멱등성 보장.""" """analysis_run에 딸린 파일을 소프트 삭제. 멱등성 보장."""
row = await fetchone( row = await select_file(file_id, analysis_run_id)
"SELECT id FROM file_data WHERE id = %s AND analysis_run_id = %s",
(file_id, analysis_run_id),
)
if not row: if not row:
raise HTTPException(status_code=404, detail="file not found") raise HTTPException(status_code=404, detail="file not found")
await execute( await delete_file(file_id)
"UPDATE file_data SET is_deleted = TRUE WHERE id = %s AND is_deleted = FALSE",
(file_id,),
)
logger.info("soft-deleted analysis file run=%s file_id=%s", analysis_run_id, file_id) logger.info("soft-deleted analysis file run=%s file_id=%s", analysis_run_id, file_id)

View File

@ -1,7 +1,9 @@
import asyncio import asyncio
import json
import logging import logging
from common.db import fetchone, execute from common.db.run import select_run
from common.db.hospital import select_hospital
from common.db.market import upsert_market_status, upsert_market_result
from common.db.source import select_run_raw_data
from integrations.llm.llm_service import LLMService from integrations.llm.llm_service import LLMService
from integrations.llm.prompt import ( from integrations.llm.prompt import (
market_competitors_prompt, market_competitors_prompt,
@ -18,49 +20,27 @@ _TYPES = ["competitors", "keywords", "trend", "target_audience"]
async def _save(analysis_run_id: str, analysis_type: str, result, exc: Exception | None) -> None: async def _save(analysis_run_id: str, analysis_type: str, result, exc: Exception | None) -> None:
if exc: if exc:
logger.warning("[market] %s failed run=%s: %s", analysis_type, analysis_run_id, exc) logger.warning("[market] %s failed run=%s: %s", analysis_type, analysis_run_id, exc)
await execute( await upsert_market_status(analysis_run_id, analysis_type, "failed")
"INSERT INTO market_analysis (analysis_run_id, analysis_type, status)"
" VALUES (%s, %s, 'failed')"
" ON DUPLICATE KEY UPDATE status = 'failed'",
(analysis_run_id, analysis_type),
)
else: else:
await execute( await upsert_market_result(analysis_run_id, analysis_type, result.model_dump())
"INSERT INTO market_analysis (analysis_run_id, analysis_type, status, data)"
" VALUES (%s, %s, 'done', %s)"
" ON DUPLICATE KEY UPDATE status = 'done', data = VALUES(data)",
(analysis_run_id, analysis_type, json.dumps(result.model_dump(), ensure_ascii=False)),
)
async def run_market_analysis(analysis_run_id: str) -> None: async def run_market_analysis(analysis_run_id: str) -> None:
logger.info("[market] start run=%s", analysis_run_id) logger.info("[market] start run=%s", analysis_run_id)
run = await fetchone( run = await select_run(analysis_run_id)
"SELECT hospital_id FROM analysis_runs WHERE analysis_run_id = %s", clinic = await select_hospital(run["hospital_id"])
(analysis_run_id,), raw = await select_run_raw_data(analysis_run_id)
) mainpage = raw.get("mainpage") or {}
clinic = await fetchone(
"SELECT hospital_name, road_address, raw_data FROM hospital_baseinfo WHERE hospital_id = %s",
(run["hospital_id"],),
)
raw_data = clinic["raw_data"] clinic_name = (clinic or {}).get("hospital_name") or ""
clinic_data = json.loads(raw_data) if isinstance(raw_data, str) else (raw_data or {}) address = (clinic or {}).get("road_address") or ""
services = mainpage.get("services", [])
clinic_name = clinic["hospital_name"] or ""
address = clinic["road_address"] or ""
services = clinic_data.get("services", [])
services_str = ", ".join(services[:3]) services_str = ", ".join(services[:3])
primary_service = services[0] if services else "" primary_service = services[0] if services else ""
for analysis_type in _TYPES: for analysis_type in _TYPES:
await execute( await upsert_market_status(analysis_run_id, analysis_type, "processing")
"INSERT INTO market_analysis (analysis_run_id, analysis_type, status)"
" VALUES (%s, %s, 'processing')"
" ON DUPLICATE KEY UPDATE status = 'processing'",
(analysis_run_id, analysis_type),
)
llm = LLMService(provider="perplexity") llm = LLMService(provider="perplexity")
results = await asyncio.gather( results = await asyncio.gather(

View File

@ -1,5 +1,5 @@
import logging import logging
from common.db import fetchone, execute from common.db.run import select_run, update_run_status
from models.status import AnalysisStatus from models.status import AnalysisStatus
from services.collect import collect_all from services.collect import collect_all
from services.market import run_market_analysis from services.market import run_market_analysis
@ -8,51 +8,23 @@ from services.analysis import run_report_task, run_plan_task
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
async def run_pipeline(analysis_run_id: str, extra_channels: dict | None = None) -> None: async def run_pipeline(analysis_run_id: str) -> None:
logger.info("[pipeline] start run=%s", analysis_run_id) logger.info("[pipeline] start run=%s", analysis_run_id)
extra_channels = extra_channels or {}
# ── 1. Collect ────────────────────────────────────────────────────────── # ── 1. Collect ──────────────────────────────────────────────────────────
run = await fetchone( run = await select_run(analysis_run_id)
"SELECT hospital_id, instagram_data_id, facebook_data_id," await collect_all(analysis_run_id, hospital_id=run["hospital_id"])
" naver_blog_data_id, youtube_data_id, gangnam_unni_data_id"
" FROM analysis_runs WHERE analysis_run_id = %s",
(analysis_run_id,),
)
await collect_all(
analysis_run_id,
hospital_id=run["hospital_id"],
instagram_id=run["instagram_data_id"],
facebook_id=run["facebook_data_id"],
naver_blog_id=run["naver_blog_data_id"],
youtube_id=run["youtube_data_id"],
gangnam_unni_id=run["gangnam_unni_data_id"],
tiktok_url=extra_channels.get("tiktok"),
instagram_en_url=extra_channels.get("instagram_en"),
facebook_en_url=extra_channels.get("facebook_en"),
kakao_talk_url=extra_channels.get("kakao_talk"),
naver_cafe_url=extra_channels.get("naver_cafe"),
)
# ── 2. Market ──────────────────────────────────────────────────────────── # ── 2. Market ────────────────────────────────────────────────────────────
await execute( await update_run_status(analysis_run_id, AnalysisStatus.ANALYZING)
"UPDATE analysis_runs SET status = %s WHERE analysis_run_id = %s",
(AnalysisStatus.ANALYZING, analysis_run_id),
)
await run_market_analysis(analysis_run_id) await run_market_analysis(analysis_run_id)
# ── 3. Report ──────────────────────────────────────────────────────────── # ── 3. Report ────────────────────────────────────────────────────────────
await run_report_task(analysis_run_id) await run_report_task(analysis_run_id)
# ── 4. Plan ────────────────────────────────────────────────────────────── # ── 4. Plan ──────────────────────────────────────────────────────────────
await execute( await update_run_status(analysis_run_id, AnalysisStatus.PLANNING)
"UPDATE analysis_runs SET status = %s WHERE analysis_run_id = %s",
(AnalysisStatus.PLANNING, analysis_run_id),
)
await run_plan_task(analysis_run_id) await run_plan_task(analysis_run_id)
await execute( await update_run_status(analysis_run_id, AnalysisStatus.COMPLETED)
"UPDATE analysis_runs SET status = %s WHERE analysis_run_id = %s",
(AnalysisStatus.COMPLETED, analysis_run_id),
)
logger.info("[pipeline] done run=%s", analysis_run_id) logger.info("[pipeline] done run=%s", analysis_run_id)