Compare commits

..

3 Commits

Author SHA1 Message Date
jaehwang 3b4c154fb2 db migration done 2026-06-01 15:31:33 +09:00
jaehwang c9c5ee9177 Merge branch 'main' into db-migration 2026-05-29 16:31:47 +09:00
jaehwang eed57729d9 clinic_overview , youtube analysis 정리 2026-05-29 16:19:06 +09:00
28 changed files with 819 additions and 1095 deletions

View File

@ -1,85 +1,4 @@
-- 테이블 순서는 관계를 고려하여 한 번에 실행해도 에러가 발생하지 않게 정렬되었습니다. -- user_info
-- instagram_data Table Create SQL
-- 테이블 생성 SQL - instagram_data
CREATE TABLE instagram_data
(
`id` INT NOT NULL AUTO_INCREMENT,
`hospital_id` CHAR(36) NOT NULL,
`url` VARCHAR(500) NOT NULL,
`status` VARCHAR(20) NOT NULL DEFAULT 'start',
`raw_data` JSON NULL,
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id)
);
-- Index 설정 SQL - instagram_data(hospital_id)
CREATE INDEX IX_instagram_data_1
ON instagram_data(hospital_id);
-- facebook_data Table Create SQL
-- 테이블 생성 SQL - facebook_data
CREATE TABLE facebook_data
(
`id` INT NOT NULL AUTO_INCREMENT,
`hospital_id` CHAR(36) NOT NULL,
`url` VARCHAR(500) NOT NULL,
`status` VARCHAR(20) NOT NULL DEFAULT 'start',
`raw_data` JSON NULL,
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id)
);
-- Index 설정 SQL - facebook_data(hospital_id)
CREATE INDEX IX_facebook_data_1
ON facebook_data(hospital_id);
-- naver_blog_data Table Create SQL
-- 테이블 생성 SQL - naver_blog_data
CREATE TABLE naver_blog_data
(
`id` INT NOT NULL AUTO_INCREMENT,
`hospital_id` CHAR(36) NOT NULL,
`url` VARCHAR(500) NOT NULL,
`status` VARCHAR(20) NOT NULL DEFAULT 'start',
`raw_data` JSON NULL,
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id)
);
-- Index 설정 SQL - naver_blog_data(hospital_id)
CREATE INDEX IX_naver_blog_data_1
ON naver_blog_data(hospital_id);
-- hospital_baseinfo Table Create SQL
-- 테이블 생성 SQL - hospital_baseinfo
CREATE TABLE hospital_baseinfo
(
`hospital_id` CHAR(36) NOT NULL,
`owner_user_id` INT NOT NULL,
`hospital_name` VARCHAR(50) NOT NULL,
`hospital_name_en` VARCHAR(50) NULL,
`brn` VARCHAR(50) NOT NULL,
`road_address` VARCHAR(100) NULL,
`site_address` VARCHAR(100) NULL,
`url` VARCHAR(500) NULL,
`status` VARCHAR(20) NOT NULL DEFAULT 'start',
`raw_data` JSON NULL,
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (hospital_id)
);
-- Index 설정 SQL - hospital_baseinfo(owner_user_id)
CREATE INDEX IX_hospital_baseinfo_1
ON hospital_baseinfo(owner_user_id);
-- user_info Table Create SQL
-- 테이블 생성 SQL - user_info
CREATE TABLE user_info CREATE TABLE user_info
( (
`user_id` INT NOT NULL AUTO_INCREMENT, `user_id` INT NOT NULL AUTO_INCREMENT,
@ -90,52 +9,49 @@ CREATE TABLE user_info
PRIMARY KEY (user_id) PRIMARY KEY (user_id)
); );
-- youtube_data Table Create SQL
CREATE TABLE youtube_data -- hospital_baseinfo
CREATE TABLE hospital_baseinfo
( (
`id` INT NOT NULL AUTO_INCREMENT,
`hospital_id` CHAR(36) NOT NULL, `hospital_id` CHAR(36) NOT NULL,
`url` VARCHAR(500) NOT NULL, `owner_user_id` INT NOT NULL,
`hospital_name` VARCHAR(50) NOT NULL,
`hospital_name_en` VARCHAR(50) NULL,
`brn` VARCHAR(50) NOT NULL,
`road_address` VARCHAR(100) NULL,
`site_address` VARCHAR(100) NULL,
`status` VARCHAR(20) NOT NULL DEFAULT 'start', `status` VARCHAR(20) NOT NULL DEFAULT 'start',
`raw_data` JSON NULL,
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id) `updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (hospital_id)
); );
-- Index 설정 SQL - youtube_data(hospital_id) CREATE INDEX IX_hospital_baseinfo_1 ON hospital_baseinfo (owner_user_id);
CREATE INDEX IX_youtube_data_1
ON youtube_data(hospital_id);
-- gangnam_unni_data Table Create SQL -- remote_source: 병원별 채널 소스 정보 (instagram/facebook/naver_blog/youtube/gangnam_unni 등)
CREATE TABLE gangnam_unni_data CREATE TABLE remote_source
( (
`id` INT NOT NULL AUTO_INCREMENT, `source_id` INT NOT NULL AUTO_INCREMENT,
`hospital_id` CHAR(36) NOT NULL, `hospital_id` CHAR(36) NOT NULL,
`source_type` VARCHAR(50) NOT NULL,
`language` CHAR(2) NULL,
`url` VARCHAR(500) NOT NULL, `url` VARCHAR(500) NOT NULL,
`status` VARCHAR(20) NOT NULL DEFAULT 'start',
`raw_data` JSON NULL,
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id) PRIMARY KEY (source_id)
); );
-- Index 설정 SQL - gangnam_unni_data(hospital_id) CREATE INDEX IX_remote_source_1 ON remote_source (hospital_id);
CREATE INDEX IX_gangnam_unni_data_1 CREATE INDEX IX_remote_source_2 ON remote_source (hospital_id, source_type);
ON gangnam_unni_data(hospital_id);
-- analysis_runs Table Create SQL -- analysis_runs
CREATE TABLE analysis_runs CREATE TABLE analysis_runs
( (
`analysis_run_id` CHAR(36) NOT NULL, `analysis_run_id` CHAR(36) NOT NULL,
`hospital_id` CHAR(36) NOT NULL, `hospital_id` CHAR(36) NOT NULL,
`owner_user_id` INT NOT NULL DEFAULT 0, `owner_user_id` INT NOT NULL DEFAULT 0,
`status` VARCHAR(50) NOT NULL DEFAULT 'discovering', `status` VARCHAR(50) NOT NULL DEFAULT 'discovering',
`instagram_data_id` INT NULL,
`facebook_data_id` INT NULL,
`naver_blog_data_id` INT NULL,
`youtube_data_id` INT NULL,
`gangnam_unni_data_id` INT NULL,
`report_data` JSON NULL, `report_data` JSON NULL,
`plan_data` JSON NULL, `plan_data` JSON NULL,
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
@ -143,16 +59,29 @@ CREATE TABLE analysis_runs
PRIMARY KEY (analysis_run_id) PRIMARY KEY (analysis_run_id)
); );
-- Index 설정 SQL - analysis_runs(hospital_id) CREATE INDEX IX_analysis_runs_1 ON analysis_runs (hospital_id);
CREATE INDEX IX_analysis_runs_1 CREATE INDEX IX_analysis_runs_2 ON analysis_runs (owner_user_id);
ON analysis_runs(hospital_id);
-- Index 설정 SQL - analysis_runs(owner_user_id)
CREATE INDEX IX_analysis_runs_2
ON analysis_runs(owner_user_id);
-- file_data Table Create SQL -- raw_info: 분석 실행별 수집 원시 데이터
CREATE TABLE raw_info
(
`info_id` INT NOT NULL AUTO_INCREMENT,
`source_id` INT NOT NULL,
`analysis_run_id` CHAR(36) NOT NULL,
`data_tag` VARCHAR(50) NOT NULL DEFAULT 'default',
`status` VARCHAR(20) NOT NULL DEFAULT 'start',
`raw_data` JSON NULL,
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (info_id)
);
CREATE INDEX IX_raw_info_1 ON raw_info (analysis_run_id);
CREATE INDEX IX_raw_info_2 ON raw_info (source_id);
-- file_data
CREATE TABLE file_data CREATE TABLE file_data
( (
`id` INT NOT NULL AUTO_INCREMENT, `id` INT NOT NULL AUTO_INCREMENT,
@ -169,7 +98,7 @@ CREATE TABLE file_data
); );
-- hospital_history Table Create SQL -- hospital_history
CREATE TABLE hospital_history CREATE TABLE hospital_history
( (
`id` INT NOT NULL AUTO_INCREMENT, `id` INT NOT NULL AUTO_INCREMENT,
@ -180,24 +109,17 @@ CREATE TABLE hospital_history
`brn` VARCHAR(50) NOT NULL, `brn` VARCHAR(50) NOT NULL,
`road_address` VARCHAR(100) NULL, `road_address` VARCHAR(100) NULL,
`site_address` VARCHAR(100) NULL, `site_address` VARCHAR(100) NULL,
`url` VARCHAR(500) NULL,
`status` VARCHAR(20) NOT NULL, `status` VARCHAR(20) NOT NULL,
`raw_data` JSON NULL,
`analysis_run_id` CHAR(36) NULL, `analysis_run_id` CHAR(36) NULL,
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id) PRIMARY KEY (id)
); );
-- Index 설정 SQL - hospital_history(hospital_id) CREATE INDEX IX_hospital_history_1 ON hospital_history (hospital_id);
CREATE INDEX IX_hospital_history_1 CREATE INDEX IX_hospital_history_2 ON hospital_history (analysis_run_id);
ON hospital_history(hospital_id);
-- Index 설정 SQL - hospital_history(analysis_run_id)
CREATE INDEX IX_hospital_history_2
ON hospital_history(analysis_run_id);
-- market_analysis Table Create SQL -- market_analysis
CREATE TABLE market_analysis CREATE TABLE market_analysis
( (
`id` INT NOT NULL AUTO_INCREMENT, `id` INT NOT NULL AUTO_INCREMENT,
@ -210,7 +132,4 @@ CREATE TABLE market_analysis
UNIQUE KEY UQ_market_analysis (analysis_run_id, analysis_type) UNIQUE KEY UQ_market_analysis (analysis_run_id, analysis_type)
); );
-- Index 설정 SQL - market_analysis(analysis_run_id) CREATE INDEX IX_market_analysis_1 ON market_analysis (analysis_run_id);
CREATE INDEX IX_market_analysis_1
ON market_analysis(analysis_run_id);

View File

@ -1,134 +0,0 @@
-- user_info
CREATE TABLE user_info
(
`user_id` INT NOT NULL AUTO_INCREMENT,
`username` VARCHAR(50) NOT NULL,
`password` VARCHAR(50) NOT NULL,
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (user_id)
);
-- hospital_baseinfo
CREATE TABLE hospital_baseinfo
(
`hospital_id` CHAR(36) NOT NULL,
`owner_user_id` INT NOT NULL,
`hospital_name` VARCHAR(50) NOT NULL,
`hospital_name_en` VARCHAR(50) NULL,
`brn` VARCHAR(50) NOT NULL,
`road_address` VARCHAR(100) NULL,
`site_address` VARCHAR(100) NULL,
`status` VARCHAR(20) NOT NULL DEFAULT 'start',
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (hospital_id)
);
CREATE INDEX IX_hospital_baseinfo_1 ON hospital_baseinfo (owner_user_id);
-- remote_source: 병원별 채널 소스 정보 (instagram/facebook/naver_blog/youtube/gangnam_unni 등)
CREATE TABLE remote_source
(
`source_id` INT NOT NULL AUTO_INCREMENT,
`hospital_id` CHAR(36) NOT NULL,
`source_type` VARCHAR(50) NOT NULL,
`url` VARCHAR(500) NOT NULL,
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (source_id)
);
CREATE INDEX IX_remote_source_1 ON remote_source (hospital_id);
CREATE INDEX IX_remote_source_2 ON remote_source (hospital_id, source_type);
-- analysis_runs
CREATE TABLE analysis_runs
(
`analysis_run_id` CHAR(36) NOT NULL,
`hospital_id` CHAR(36) NOT NULL,
`owner_user_id` INT NOT NULL DEFAULT 0,
`status` VARCHAR(50) NOT NULL DEFAULT 'discovering',
`report_data` JSON NULL,
`plan_data` JSON NULL,
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (analysis_run_id)
);
CREATE INDEX IX_analysis_runs_1 ON analysis_runs (hospital_id);
CREATE INDEX IX_analysis_runs_2 ON analysis_runs (owner_user_id);
-- raw_info: 분석 실행별 수집 원시 데이터
CREATE TABLE raw_info
(
`info_id` INT NOT NULL AUTO_INCREMENT,
`source_id` INT NOT NULL,
`analysis_run_id` CHAR(36) NOT NULL,
`data_tag` VARCHAR(50) NOT NULL DEFAULT 'default',
`status` VARCHAR(20) NOT NULL DEFAULT 'start',
`raw_data` JSON NULL,
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (info_id)
);
CREATE INDEX IX_raw_info_1 ON raw_info (analysis_run_id);
CREATE INDEX IX_raw_info_2 ON raw_info (source_id);
-- file_data
CREATE TABLE file_data
(
`id` INT NOT NULL AUTO_INCREMENT,
`analysis_run_id` CHAR(36) NOT NULL,
`hospital_id` CHAR(36) NULL,
`file_type` ENUM('image','video','audio','document','file') NOT NULL DEFAULT 'file',
`file_name` VARCHAR(255) NOT NULL,
`file_url` VARCHAR(2048) NOT NULL,
`size_bytes` BIGINT NULL,
`is_deleted` BOOLEAN NOT NULL DEFAULT FALSE,
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id),
INDEX IX_file_data_1 (analysis_run_id, is_deleted)
);
-- hospital_history
CREATE TABLE hospital_history
(
`id` INT NOT NULL AUTO_INCREMENT,
`hospital_id` CHAR(36) NOT NULL,
`owner_user_id` INT NOT NULL,
`hospital_name` VARCHAR(50) NOT NULL,
`hospital_name_en` VARCHAR(50) NULL,
`brn` VARCHAR(50) NOT NULL,
`road_address` VARCHAR(100) NULL,
`site_address` VARCHAR(100) NULL,
`status` VARCHAR(20) NOT NULL,
`analysis_run_id` CHAR(36) NULL,
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id)
);
CREATE INDEX IX_hospital_history_1 ON hospital_history (hospital_id);
CREATE INDEX IX_hospital_history_2 ON hospital_history (analysis_run_id);
-- market_analysis
CREATE TABLE market_analysis
(
`id` INT NOT NULL AUTO_INCREMENT,
`analysis_run_id` CHAR(36) NOT NULL,
`analysis_type` VARCHAR(50) NOT NULL,
`status` VARCHAR(20) NOT NULL DEFAULT 'start',
`data` JSON NULL,
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id),
UNIQUE KEY UQ_market_analysis (analysis_run_id, analysis_type)
);
CREATE INDEX IX_market_analysis_1 ON market_analysis (analysis_run_id);

View File

@ -2,12 +2,14 @@ import logging
import uuid6 import uuid6
from fastapi import APIRouter, BackgroundTasks, Depends, File, Form, HTTPException, UploadFile, status from fastapi import APIRouter, BackgroundTasks, Depends, File, Form, HTTPException, UploadFile, status
from common.deps import verify_api_key from common.deps import verify_api_key
from common.db import fetchone, insert_instagram_row, insert_facebook_row, insert_naver_blog_row, insert_youtube_row, insert_gangnam_unni_row, insert_analysis_run from common.db.hospital import select_hospital
from common.db.source import select_source_mainpage, insert_source, insert_raw_info
from common.db.run import insert_run, select_run_status
from models.analysis import AnalysisCreate, AnalysisStartResponse, AnalysisStatusResponse from models.analysis import AnalysisCreate, AnalysisStartResponse, AnalysisStatusResponse
from models.file import FileListItem, FileType, FileUploadResponse from models.file import FileListItem, FileType, FileUploadResponse
from models.status import AnalysisStatus from models.status import AnalysisStatus, SourceType
from services.pipeline import run_pipeline from services.pipeline import run_pipeline
from services.file import get_analysis_files_response, handle_analysis_file_upload, soft_delete_analysis_file from services.file_data import get_analysis_files_response, handle_analysis_file_upload, soft_delete_analysis_file
router = APIRouter(prefix="/api/analysis", tags=["analysis"], dependencies=[Depends(verify_api_key)]) router = APIRouter(prefix="/api/analysis", tags=["analysis"], dependencies=[Depends(verify_api_key)])
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -20,23 +22,27 @@ async def start_analysis(body: AnalysisCreate, background_tasks: BackgroundTasks
hospital_id = body.clinic_id hospital_id = body.clinic_id
# 사실 hospital과 owner_user_id 비교 후 검증이 필요한 거지만 일단 PoC 니까. 나중에 바꿉니다. # 사실 hospital과 owner_user_id 비교 후 검증이 필요한 거지만 일단 PoC 니까. 나중에 바꿉니다.
hospital = await fetchone( hospital = await select_hospital(hospital_id)
"SELECT owner_user_id, url FROM hospital_baseinfo WHERE hospital_id = %s",
(hospital_id,),
)
if not hospital: if not hospital:
raise HTTPException(status_code=409, detail="Clinic not found") raise HTTPException(status_code=409, detail="Clinic not found")
ig_id = await insert_instagram_row(hospital_id, body.channels.instagram) if body.channels.instagram else None analysis_run_id = await insert_run(analysis_run_id, hospital_id, hospital["owner_user_id"])
fb_id = await insert_facebook_row(hospital_id, body.channels.facebook) if body.channels.facebook else None
nb_id = await insert_naver_blog_row(hospital_id, body.channels.naver_blog) if body.channels.naver_blog else None
yt_id = await insert_youtube_row(hospital_id, body.channels.youtube) if body.channels.youtube else None
gu_id = await insert_gangnam_unni_row(hospital_id, body.channels.gangnam_unni) if body.channels.gangnam_unni else None
analysis_run_id = await insert_analysis_run( mainpage = await select_source_mainpage(hospital_id)
analysis_run_id, hospital_id, hospital["owner_user_id"], if mainpage:
ig_id, fb_id, nb_id, yt_id, gu_id, await insert_raw_info(mainpage["source_id"], analysis_run_id, data_tag=SourceType.MAINPAGE)
)
channels = [
(SourceType.INSTAGRAM, body.channels.instagram),
(SourceType.FACEBOOK, body.channels.facebook),
(SourceType.NAVER_BLOG, body.channels.naver_blog),
(SourceType.YOUTUBE, body.channels.youtube),
(SourceType.GANGNAM_UNNI, body.channels.gangnam_unni),
]
for source_type, url in channels:
if url:
source_id = await insert_source(hospital_id, source_type, url)
await insert_raw_info(source_id, analysis_run_id, data_tag=source_type)
background_tasks.add_task(run_pipeline, analysis_run_id) background_tasks.add_task(run_pipeline, analysis_run_id)
@ -75,12 +81,12 @@ async def delete_analysis_run_file(run_id: str, file_id: int) -> None:
@router.get("/{run_id}/status", response_model=AnalysisStatusResponse) @router.get("/{run_id}/status", response_model=AnalysisStatusResponse)
async def get_analysis_status(run_id: str): async def get_analysis_status(run_id: str):
logger.info("GET /api/analysis/%s/status", run_id) logger.info("GET /api/analysis/%s/status", run_id)
row = await fetchone("SELECT status FROM analysis_runs WHERE analysis_run_id = %s", (run_id,)) run_status = await select_run_status(run_id)
if not row: if run_status is None:
raise HTTPException(status_code=404, detail="Run not found") raise HTTPException(status_code=404, detail="Run not found")
return AnalysisStatusResponse( return AnalysisStatusResponse(
analysis_run_id=run_id, analysis_run_id=run_id,
status=AnalysisStatus(row["status"]), status=AnalysisStatus(run_status),
progress=50.0, progress=50.0,
current_step="", current_step="",
channel_errors={}, channel_errors={},

View File

@ -2,7 +2,8 @@ import logging
import uuid6 import uuid6
from fastapi import APIRouter, Depends, HTTPException, status from fastapi import APIRouter, Depends, HTTPException, status
from common.deps import verify_api_key from common.deps import verify_api_key
from common.db import insert_hospital, fetchone from common.db.hospital import select_hospital, insert_hospital
from common.db.source import insert_source
from common.utils import get_env from common.utils import get_env
from integrations.firecrawl import FirecrawlClient from integrations.firecrawl import FirecrawlClient
from models.clinic import ClinicCreate, ClinicCreateResponse, ClinicResponse, ClinicHistoryResponse, RunSummary from models.clinic import ClinicCreate, ClinicCreateResponse, ClinicResponse, ClinicHistoryResponse, RunSummary
@ -30,9 +31,8 @@ async def create_clinic(body: ClinicCreate):
name=info["clinicName"], name=info["clinicName"],
name_en=info.get("clinicNameEn"), name_en=info.get("clinicNameEn"),
road_address=info.get("address"), road_address=info.get("address"),
url=body.url,
raw_data=info,
) )
await insert_source(hospital_id, "mainpage", body.url)
return ClinicCreateResponse( return ClinicCreateResponse(
id=hospital_id, id=hospital_id,
url=body.url, url=body.url,
@ -44,11 +44,7 @@ async def create_clinic(body: ClinicCreate):
@router.get("/{hospital_id}", response_model=ClinicResponse) @router.get("/{hospital_id}", response_model=ClinicResponse)
async def get_clinic(hospital_id: str): async def get_clinic(hospital_id: str):
logger.info("GET /api/clinics/%s", hospital_id) logger.info("GET /api/clinics/%s", hospital_id)
row = await fetchone( row = await select_hospital(hospital_id)
"SELECT hospital_id, hospital_name, hospital_name_en, road_address, url, status, raw_data, created_at, updated_at"
" FROM hospital_baseinfo WHERE hospital_id = %s",
(hospital_id,),
)
if not row: if not row:
raise HTTPException(status_code=404, detail="Clinic not found") raise HTTPException(status_code=404, detail="Clinic not found")
return ClinicResponse(**{**row, "created_at": str(row["created_at"]), "updated_at": str(row["updated_at"])}) return ClinicResponse(**{**row, "created_at": str(row["created_at"]), "updated_at": str(row["updated_at"])})

View File

@ -1,7 +1,7 @@
import json import json
import logging import logging
from fastapi import APIRouter, Depends, HTTPException, Response from fastapi import APIRouter, Depends, HTTPException, Response
from common.db import fetchone from common.db.run import select_run_with_clinic
from common.deps import verify_api_key from common.deps import verify_api_key
from integrations.llm.schemas.plan import PlanOutput from integrations.llm.schemas.plan import PlanOutput
from models.plan import PlanApiResponse from models.plan import PlanApiResponse
@ -13,13 +13,7 @@ logger = logging.getLogger(__name__)
@router.get("/{run_id}", response_model=PlanApiResponse, response_model_by_alias=True) @router.get("/{run_id}", response_model=PlanApiResponse, response_model_by_alias=True)
async def get_plan(run_id: str): async def get_plan(run_id: str):
logger.info("GET /api/plan/%s", run_id) logger.info("GET /api/plan/%s", run_id)
row = await fetchone( row = await select_run_with_clinic(run_id)
"SELECT ar.plan_data, ar.created_at, h.hospital_name, h.hospital_name_en, h.url"
" FROM analysis_runs ar"
" JOIN hospital_baseinfo h ON ar.hospital_id = h.hospital_id"
" WHERE ar.analysis_run_id = %s",
(run_id,),
)
if row is None: if row is None:
raise HTTPException(status_code=404, detail="Run not found") raise HTTPException(status_code=404, detail="Run not found")
if row["plan_data"] is None: if row["plan_data"] is None:
@ -31,6 +25,6 @@ async def get_plan(run_id: str):
clinic_name=row["hospital_name"], clinic_name=row["hospital_name"],
clinic_name_en=row["hospital_name_en"], clinic_name_en=row["hospital_name_en"],
created_at=str(row["created_at"]), created_at=str(row["created_at"]),
target_url=row["url"], target_url=row["target_url"],
**plan.model_dump(), **plan.model_dump(),
) )

View File

@ -1,7 +1,7 @@
import json import json
import logging import logging
from fastapi import APIRouter, Depends, HTTPException, Response from fastapi import APIRouter, Depends, HTTPException, Response
from common.db import fetchone from common.db.run import select_run_with_clinic
from common.deps import verify_api_key from common.deps import verify_api_key
from integrations.llm.schemas.report import ReportOutput from integrations.llm.schemas.report import ReportOutput
from models.report import MarketingReportResponse from models.report import MarketingReportResponse
@ -13,13 +13,7 @@ logger = logging.getLogger(__name__)
@router.get("/{run_id}", response_model=MarketingReportResponse, response_model_by_alias=True) @router.get("/{run_id}", response_model=MarketingReportResponse, response_model_by_alias=True)
async def get_report(run_id: str): async def get_report(run_id: str):
logger.info("GET /api/report/%s", run_id) logger.info("GET /api/report/%s", run_id)
row = await fetchone( row = await select_run_with_clinic(run_id)
"SELECT ar.report_data, ar.created_at, h.hospital_name, h.hospital_name_en, h.url"
" FROM analysis_runs ar"
" JOIN hospital_baseinfo h ON ar.hospital_id = h.hospital_id"
" WHERE ar.analysis_run_id = %s",
(run_id,),
)
if row is None: if row is None:
raise HTTPException(status_code=404, detail="Run not found") raise HTTPException(status_code=404, detail="Run not found")
if row["report_data"] is None: if row["report_data"] is None:
@ -31,6 +25,6 @@ async def get_report(run_id: str):
clinic_name=row["hospital_name"], clinic_name=row["hospital_name"],
clinic_name_en=row["hospital_name_en"], clinic_name_en=row["hospital_name_en"],
created_at=str(row["created_at"]), created_at=str(row["created_at"]),
target_url=row["url"], target_url=row["target_url"],
**llm_output.model_dump(exclude={"id", "created_at", "target_url"}), **llm_output.model_dump(exclude={"id", "created_at", "target_url"}),
) )

View File

@ -1,274 +0,0 @@
import json
import os
import aiomysql
from common.utils import get_env
_pool: aiomysql.Pool | None = None
async def get_pool() -> aiomysql.Pool:
global _pool
if _pool is None:
_pool = await aiomysql.create_pool(
host=get_env("MYSQL_HOST"),
port=int(os.getenv("MYSQL_PORT", "3306")),
user=get_env("MYSQL_USER"),
password=get_env("MYSQL_PASSWORD"),
db=get_env("MYSQL_DB"),
charset="utf8mb4",
minsize=0,
maxsize=30,
connect_timeout=10,
)
return _pool
# 쓰기 (INSERT/UPDATE/DELETE)
async def execute(sql: str, args: tuple = ()) -> int:
pool = await get_pool()
async with pool.acquire() as conn:
try:
async with conn.cursor() as cur:
await cur.execute(sql, args)
await conn.commit()
return cur.lastrowid
finally:
conn.close()
# 읽기 (SELECT)
async def fetchone(sql: str, args: tuple = ()) -> dict | None:
pool = await get_pool()
async with pool.acquire() as conn:
try:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(sql, args)
return await cur.fetchone()
finally:
conn.close()
async def fetchall(sql: str, args: tuple = ()) -> list[dict]:
pool = await get_pool()
async with pool.acquire() as conn:
try:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(sql, args)
return await cur.fetchall()
finally:
conn.close()
async def insert_instagram_row(hospital_id: str, url: str) -> int:
return await execute("INSERT INTO instagram_data (hospital_id, url) VALUES (%s, %s)", (hospital_id, url))
async def insert_facebook_row(hospital_id: str, url: str) -> int:
return await execute("INSERT INTO facebook_data (hospital_id, url) VALUES (%s, %s)", (hospital_id, url))
async def insert_naver_blog_row(hospital_id: str, url: str) -> int:
return await execute("INSERT INTO naver_blog_data (hospital_id, url) VALUES (%s, %s)", (hospital_id, url))
async def insert_youtube_row(hospital_id: str, url: str) -> int:
return await execute("INSERT INTO youtube_data (hospital_id, url) VALUES (%s, %s)", (hospital_id, url))
async def insert_gangnam_unni_row(hospital_id: str, url: str) -> int:
return await execute("INSERT INTO gangnam_unni_data (hospital_id, url) VALUES (%s, %s)", (hospital_id, url))
async def insert_file_row(
analysis_run_id: str,
file_type: str,
file_name: str,
file_url: str,
size_bytes: int | None = None,
hospital_id: str | None = None,
) -> int:
return await execute(
"INSERT INTO file_data (analysis_run_id, hospital_id, file_type, file_name, file_url, size_bytes)"
" VALUES (%s, %s, %s, %s, %s, %s)",
(analysis_run_id, hospital_id, file_type, file_name, file_url, size_bytes),
)
async def insert_analysis_run(
analysis_run_id: str,
hospital_id: str,
owner_user_id: int,
instagram_data_id: int | None,
facebook_data_id: int | None,
naver_blog_data_id: int | None,
youtube_data_id: int | None,
gangnam_unni_data_id: int | None,
) -> str:
await execute(
"INSERT INTO analysis_runs"
" (analysis_run_id, hospital_id, owner_user_id, instagram_data_id, facebook_data_id, naver_blog_data_id, youtube_data_id, gangnam_unni_data_id)"
" VALUES (%s, %s, %s, %s, %s, %s, %s, %s)",
(analysis_run_id, hospital_id, owner_user_id, instagram_data_id, facebook_data_id, naver_blog_data_id, youtube_data_id, gangnam_unni_data_id),
)
return analysis_run_id
async def save_analysis_report(analysis_run_id: str, data: dict) -> None:
await execute(
"UPDATE analysis_runs SET report_data = %s WHERE analysis_run_id = %s",
(json.dumps(data, ensure_ascii=False), analysis_run_id),
)
async def is_done(table: str, row_id: int | None) -> bool:
if row_id is None:
return True
r = await fetchone(f"SELECT status FROM {table} WHERE id = %s", (row_id,))
return r["status"] == "done"
async def fetch_raw(table: str, row_id: int | None) -> dict | None:
if row_id is None:
return None
row = await fetchone(f"SELECT raw_data FROM {table} WHERE id = %s", (row_id,))
if not row or not row["raw_data"]:
return None
return json.loads(row["raw_data"]) if isinstance(row["raw_data"], str) else row["raw_data"]
async def get_analysis_raw_data(analysis_run_id: str) -> dict:
run = await fetchone(
"SELECT instagram_data_id, facebook_data_id, naver_blog_data_id, youtube_data_id, gangnam_unni_data_id"
" FROM analysis_runs WHERE analysis_run_id = %s",
(analysis_run_id,),
)
return {
"instagram": await fetch_raw("instagram_data", run["instagram_data_id"]),
"facebook": await fetch_raw("facebook_data", run["facebook_data_id"]),
"naver_blog": await fetch_raw("naver_blog_data", run["naver_blog_data_id"]),
"youtube": await fetch_raw("youtube_data", run["youtube_data_id"]),
"gangnam_unni": await fetch_raw("gangnam_unni_data", run["gangnam_unni_data_id"]),
}
async def set_instagram_status(row_id: int, status: str) -> None:
await execute("UPDATE instagram_data SET status = %s WHERE id = %s", (status, row_id))
async def set_facebook_status(row_id: int, status: str) -> None:
await execute("UPDATE facebook_data SET status = %s WHERE id = %s", (status, row_id))
async def set_naver_blog_status(row_id: int, status: str) -> None:
await execute("UPDATE naver_blog_data SET status = %s WHERE id = %s", (status, row_id))
async def set_youtube_status(row_id: int, status: str) -> None:
await execute("UPDATE youtube_data SET status = %s WHERE id = %s", (status, row_id))
async def set_gangnam_unni_status(row_id: int, status: str) -> None:
await execute("UPDATE gangnam_unni_data SET status = %s WHERE id = %s", (status, row_id))
async def save_instagram_raw_data(row_id: int, data: dict) -> None:
await execute("UPDATE instagram_data SET raw_data = %s, status = 'done' WHERE id = %s", (json.dumps(data, ensure_ascii=False), row_id))
async def save_facebook_raw_data(row_id: int, data: dict) -> None:
await execute("UPDATE facebook_data SET raw_data = %s, status = 'done' WHERE id = %s", (json.dumps(data, ensure_ascii=False), row_id))
async def save_naver_blog_raw_data(row_id: int, data: dict) -> None:
await execute("UPDATE naver_blog_data SET raw_data = %s, status = 'done' WHERE id = %s", (json.dumps(data, ensure_ascii=False), row_id))
async def save_youtube_raw_data(row_id: int, data: dict) -> None:
await execute("UPDATE youtube_data SET raw_data = %s, status = 'done' WHERE id = %s", (json.dumps(data, ensure_ascii=False), row_id))
async def save_gangnam_unni_raw_data(row_id: int, data: dict) -> None:
await execute("UPDATE gangnam_unni_data SET raw_data = %s, status = 'done' WHERE id = %s", (json.dumps(data, ensure_ascii=False), row_id))
async def _insert_hospital_history(hospital_id: str, analysis_run_id: str | None) -> None:
row = await fetchone(
"SELECT owner_user_id, hospital_name, hospital_name_en, brn, road_address, site_address, url, status, raw_data"
" FROM hospital_baseinfo WHERE hospital_id = %s",
(hospital_id,),
)
if not row:
return
await execute(
"INSERT INTO hospital_history"
" (hospital_id, owner_user_id, hospital_name, hospital_name_en, brn, road_address, site_address, url, status, raw_data, analysis_run_id)"
" VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
(
hospital_id,
row["owner_user_id"],
row["hospital_name"],
row["hospital_name_en"],
row["brn"],
row["road_address"],
row["site_address"],
row["url"],
row["status"],
row["raw_data"] if isinstance(row["raw_data"], str) else json.dumps(row["raw_data"], ensure_ascii=False) if row["raw_data"] else None,
analysis_run_id,
),
)
async def insert_hospital(
hospital_id: str,
name: str,
name_en: str | None = None,
road_address: str | None = None,
site_address: str | None = None,
url: str | None = None,
raw_data: dict | None = None,
owner_user_id: int = 0,
brn: str = "",
) -> dict:
await execute(
"INSERT INTO hospital_baseinfo (hospital_id, hospital_name, hospital_name_en, road_address, site_address, url, raw_data, status, owner_user_id, brn)"
" VALUES (%s, %s, %s, %s, %s, %s, %s, 'done', %s, %s)",
(hospital_id, name, name_en, road_address, site_address, url,
json.dumps(raw_data, ensure_ascii=False) if raw_data else None,
owner_user_id, brn),
)
await _insert_hospital_history(hospital_id, analysis_run_id=None)
return await fetchone(
"SELECT created_at FROM hospital_baseinfo WHERE hospital_id = %s",
(hospital_id,),
)
async def save_hospital_raw_data(hospital_id: str, data: dict, analysis_run_id: str | None = None) -> None:
await execute(
"UPDATE hospital_baseinfo"
" SET raw_data = %s, status = 'done',"
" hospital_name = COALESCE(%s, hospital_name),"
" hospital_name_en = COALESCE(%s, hospital_name_en),"
" road_address = COALESCE(%s, road_address)"
" WHERE hospital_id = %s",
(
json.dumps(data, ensure_ascii=False),
data.get("clinicName"),
data.get("clinicNameEn"),
data.get("address"),
hospital_id,
),
)
await _insert_hospital_history(hospital_id, analysis_run_id)
async def get_market_analysis(analysis_run_id: str) -> dict:
rows = await fetchall(
"SELECT analysis_type, data FROM market_analysis WHERE analysis_run_id = %s AND status = 'done'",
(analysis_run_id,),
)
return {
row["analysis_type"]: json.loads(row["data"]) if isinstance(row["data"], str) else row["data"]
for row in rows
}

14
app/common/db/__init__.py Normal file
View File

@ -0,0 +1,14 @@
from common.db.base import execute, fetchone, fetchall
from common.db.hospital import select_hospital, update_hospital_status, insert_hospital, update_hospital
from common.db.source import (
insert_source, select_source_mainpage,
insert_raw_info, update_raw_info_status, update_raw_info,
select_raw_info_data,
select_run_sources, select_run_raw_data, select_run_mainpage_url,
)
from common.db.run import (
insert_run, select_run, select_run_status, update_run_status,
update_run_report, update_run_plan, select_run_with_clinic,
)
from common.db.market import upsert_market_status, upsert_market_result, select_market
from common.db.file_data import insert_file, select_run_files, select_file, delete_file

56
app/common/db/base.py Normal file
View File

@ -0,0 +1,56 @@
import os
import aiomysql
from common.utils import get_env
_pool: aiomysql.Pool | None = None
async def get_pool() -> aiomysql.Pool:
global _pool
if _pool is None:
_pool = await aiomysql.create_pool(
host=get_env("MYSQL_HOST"),
port=int(os.getenv("MYSQL_PORT", "3306")),
user=get_env("MYSQL_USER"),
password=get_env("MYSQL_PASSWORD"),
db=get_env("MYSQL_DB"),
charset="utf8mb4",
minsize=0,
maxsize=30,
connect_timeout=10,
)
return _pool
async def execute(sql: str, args: tuple = ()) -> int:
pool = await get_pool()
async with pool.acquire() as conn:
try:
async with conn.cursor() as cur:
await cur.execute(sql, args)
await conn.commit()
return cur.lastrowid
finally:
conn.close()
async def fetchone(sql: str, args: tuple = ()) -> dict | None:
pool = await get_pool()
async with pool.acquire() as conn:
try:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(sql, args)
return await cur.fetchone()
finally:
conn.close()
async def fetchall(sql: str, args: tuple = ()) -> list[dict]:
pool = await get_pool()
async with pool.acquire() as conn:
try:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(sql, args)
return await cur.fetchall()
finally:
conn.close()

View File

@ -0,0 +1,39 @@
from common.db.base import execute, fetchone, fetchall
async def insert_file(
analysis_run_id: str,
file_type: str,
file_name: str,
file_url: str,
size_bytes: int | None = None,
hospital_id: str | None = None,
) -> int:
return await execute(
"INSERT INTO file_data (analysis_run_id, hospital_id, file_type, file_name, file_url, size_bytes)"
" VALUES (%s, %s, %s, %s, %s, %s)",
(analysis_run_id, hospital_id, file_type, file_name, file_url, size_bytes),
)
async def select_run_files(analysis_run_id: str) -> list[dict]:
return await fetchall(
"SELECT id, file_type, file_name, file_url, size_bytes, created_at"
" FROM file_data WHERE analysis_run_id = %s AND is_deleted = FALSE"
" ORDER BY created_at DESC",
(analysis_run_id,),
)
async def select_file(file_id: int, analysis_run_id: str) -> dict | None:
return await fetchone(
"SELECT id FROM file_data WHERE id = %s AND analysis_run_id = %s",
(file_id, analysis_run_id),
)
async def delete_file(file_id: int) -> None:
await execute(
"UPDATE file_data SET is_deleted = TRUE WHERE id = %s AND is_deleted = FALSE",
(file_id,),
)

78
app/common/db/hospital.py Normal file
View File

@ -0,0 +1,78 @@
from common.db.base import execute, fetchone
async def select_hospital(hospital_id: str) -> dict | None:
return await fetchone(
"SELECT hospital_id, owner_user_id, hospital_name, hospital_name_en,"
" brn, road_address, site_address, status, created_at, updated_at"
" FROM hospital_baseinfo WHERE hospital_id = %s",
(hospital_id,),
)
async def update_hospital_status(hospital_id: str, status: str) -> None:
await execute(
"UPDATE hospital_baseinfo SET status = %s WHERE hospital_id = %s",
(status, hospital_id),
)
async def _insert_hospital_history(hospital_id: str, analysis_run_id: str | None) -> None:
row = await fetchone(
"SELECT owner_user_id, hospital_name, hospital_name_en, brn, road_address, site_address, status"
" FROM hospital_baseinfo WHERE hospital_id = %s",
(hospital_id,),
)
if not row:
return
await execute(
"INSERT INTO hospital_history"
" (hospital_id, owner_user_id, hospital_name, hospital_name_en, brn, road_address, site_address, status, analysis_run_id)"
" VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
(
hospital_id,
row["owner_user_id"],
row["hospital_name"],
row["hospital_name_en"],
row["brn"],
row["road_address"],
row["site_address"],
row["status"],
analysis_run_id,
),
)
async def insert_hospital(
hospital_id: str,
name: str,
name_en: str | None = None,
road_address: str | None = None,
site_address: str | None = None,
owner_user_id: int = 0,
brn: str = "",
) -> dict:
await execute(
"INSERT INTO hospital_baseinfo"
" (hospital_id, hospital_name, hospital_name_en, road_address, site_address, status, owner_user_id, brn)"
" VALUES (%s, %s, %s, %s, %s, 'done', %s, %s)",
(hospital_id, name, name_en, road_address, site_address, owner_user_id, brn),
)
await _insert_hospital_history(hospital_id, analysis_run_id=None)
return await fetchone(
"SELECT created_at FROM hospital_baseinfo WHERE hospital_id = %s",
(hospital_id,),
)
async def update_hospital(hospital_id: str, data: dict, analysis_run_id: str | None = None) -> None:
await execute(
"UPDATE hospital_baseinfo"
" SET status = 'done',"
" hospital_name = COALESCE(%s, hospital_name),"
" hospital_name_en = COALESCE(%s, hospital_name_en),"
" road_address = COALESCE(%s, road_address)"
" WHERE hospital_id = %s",
(data.get("clinicName"), data.get("clinicNameEn"), data.get("address"), hospital_id),
)
await _insert_hospital_history(hospital_id, analysis_run_id)

31
app/common/db/market.py Normal file
View File

@ -0,0 +1,31 @@
import json
from common.db.base import execute, fetchall
async def upsert_market_status(analysis_run_id: str, analysis_type: str, status: str) -> None:
await execute(
"INSERT INTO market_analysis (analysis_run_id, analysis_type, status)"
" VALUES (%s, %s, %s)"
" ON DUPLICATE KEY UPDATE status = VALUES(status)",
(analysis_run_id, analysis_type, status),
)
async def upsert_market_result(analysis_run_id: str, analysis_type: str, data: dict) -> None:
await execute(
"INSERT INTO market_analysis (analysis_run_id, analysis_type, status, data)"
" VALUES (%s, %s, 'done', %s)"
" ON DUPLICATE KEY UPDATE status = 'done', data = VALUES(data)",
(analysis_run_id, analysis_type, json.dumps(data, ensure_ascii=False)),
)
async def select_market(analysis_run_id: str) -> dict:
rows = await fetchall(
"SELECT analysis_type, data FROM market_analysis WHERE analysis_run_id = %s AND status = 'done'",
(analysis_run_id,),
)
return {
row["analysis_type"]: json.loads(row["data"]) if isinstance(row["data"], str) else row["data"]
for row in rows
}

64
app/common/db/run.py Normal file
View File

@ -0,0 +1,64 @@
import json
from common.db.base import execute, fetchone
async def insert_run(
analysis_run_id: str,
hospital_id: str,
owner_user_id: int,
) -> str:
await execute(
"INSERT INTO analysis_runs (analysis_run_id, hospital_id, owner_user_id) VALUES (%s, %s, %s)",
(analysis_run_id, hospital_id, owner_user_id),
)
return analysis_run_id
async def select_run(analysis_run_id: str) -> dict | None:
return await fetchone(
"SELECT analysis_run_id, hospital_id, owner_user_id, status, created_at, updated_at"
" FROM analysis_runs WHERE analysis_run_id = %s",
(analysis_run_id,),
)
async def select_run_status(analysis_run_id: str) -> str | None:
row = await fetchone(
"SELECT status FROM analysis_runs WHERE analysis_run_id = %s",
(analysis_run_id,),
)
return row["status"] if row else None
async def update_run_status(analysis_run_id: str, status: str) -> None:
await execute(
"UPDATE analysis_runs SET status = %s WHERE analysis_run_id = %s",
(status, analysis_run_id),
)
async def update_run_report(analysis_run_id: str, data: dict) -> None:
await execute(
"UPDATE analysis_runs SET report_data = %s WHERE analysis_run_id = %s",
(json.dumps(data, ensure_ascii=False), analysis_run_id),
)
async def update_run_plan(analysis_run_id: str, data: dict) -> None:
await execute(
"UPDATE analysis_runs SET plan_data = %s WHERE analysis_run_id = %s",
(json.dumps(data, ensure_ascii=False), analysis_run_id),
)
async def select_run_with_clinic(analysis_run_id: str) -> dict | None:
return await fetchone(
"SELECT ar.report_data, ar.plan_data, ar.created_at,"
" h.hospital_name, h.hospital_name_en,"
" rs.url AS target_url"
" FROM analysis_runs ar"
" JOIN hospital_baseinfo h ON ar.hospital_id = h.hospital_id"
" LEFT JOIN remote_source rs ON rs.hospital_id = h.hospital_id AND rs.source_type = 'mainpage'"
" WHERE ar.analysis_run_id = %s",
(analysis_run_id,),
)

85
app/common/db/source.py Normal file
View File

@ -0,0 +1,85 @@
import json
from common.db.base import execute, fetchone, fetchall
from models.status import SourceType
async def insert_source(
hospital_id: str,
source_type: SourceType,
url: str,
language: str | None = None,
) -> int:
return await execute(
"INSERT INTO remote_source (hospital_id, source_type, language, url) VALUES (%s, %s, %s, %s)",
(hospital_id, source_type, language, url),
)
async def select_source_mainpage(hospital_id: str) -> dict | None:
return await fetchone(
"SELECT source_id FROM remote_source WHERE hospital_id = %s AND source_type = 'mainpage'",
(hospital_id,),
)
async def insert_raw_info(
source_id: int,
analysis_run_id: str,
data_tag: SourceType,
) -> int:
return await execute(
"INSERT INTO raw_info (source_id, analysis_run_id, data_tag) VALUES (%s, %s, %s)",
(source_id, analysis_run_id, data_tag),
)
async def update_raw_info_status(info_id: int, status: str) -> None:
await execute("UPDATE raw_info SET status = %s WHERE info_id = %s", (status, info_id))
async def update_raw_info(info_id: int, data: dict) -> None:
await execute(
"UPDATE raw_info SET raw_data = %s, status = 'done' WHERE info_id = %s",
(json.dumps(data, ensure_ascii=False), info_id),
)
async def select_raw_info_data(info_id: int | None) -> dict | None:
if info_id is None:
return None
row = await fetchone("SELECT raw_data FROM raw_info WHERE info_id = %s", (info_id,))
if not row or not row["raw_data"]:
return None
return json.loads(row["raw_data"]) if isinstance(row["raw_data"], str) else row["raw_data"]
async def select_run_sources(analysis_run_id: str) -> list[dict]:
return await fetchall(
"SELECT ri.info_id, rs.source_type, rs.url"
" FROM raw_info ri JOIN remote_source rs USING (source_id)"
" WHERE ri.analysis_run_id = %s",
(analysis_run_id,),
)
async def select_run_raw_data(analysis_run_id: str) -> dict:
rows = await fetchall(
"SELECT rs.source_type, ri.raw_data"
" FROM raw_info ri JOIN remote_source rs USING (source_id)"
" WHERE ri.analysis_run_id = %s",
(analysis_run_id,),
)
result: dict = {}
for row in rows:
raw = row["raw_data"]
result[row["source_type"]] = json.loads(raw) if isinstance(raw, str) else raw
return result
async def select_run_mainpage_url(analysis_run_id: str) -> str:
row = await fetchone(
"SELECT rs.url FROM raw_info ri JOIN remote_source rs USING (source_id)"
" WHERE ri.analysis_run_id = %s AND rs.source_type = 'mainpage'",
(analysis_run_id,),
)
return (row or {}).get("url") or ""

View File

@ -1,236 +0,0 @@
import json
import os
import aiomysql
from common.utils import get_env
_pool: aiomysql.Pool | None = None
async def get_pool() -> aiomysql.Pool:
global _pool
if _pool is None:
_pool = await aiomysql.create_pool(
host=get_env("MYSQL_HOST"),
port=int(os.getenv("MYSQL_PORT", "3306")),
user=get_env("MYSQL_USER"),
password=get_env("MYSQL_PASSWORD"),
db=get_env("MYSQL_DB"),
charset="utf8mb4",
minsize=0,
maxsize=30,
connect_timeout=10,
)
return _pool
async def execute(sql: str, args: tuple = ()) -> int:
pool = await get_pool()
async with pool.acquire() as conn:
try:
async with conn.cursor() as cur:
await cur.execute(sql, args)
await conn.commit()
return cur.lastrowid
finally:
conn.close()
async def fetchone(sql: str, args: tuple = ()) -> dict | None:
pool = await get_pool()
async with pool.acquire() as conn:
try:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(sql, args)
return await cur.fetchone()
finally:
conn.close()
async def fetchall(sql: str, args: tuple = ()) -> list[dict]:
pool = await get_pool()
async with pool.acquire() as conn:
try:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(sql, args)
return await cur.fetchall()
finally:
conn.close()
# ── remote_source ─────────────────────────────────────────────────────────────
async def insert_source(hospital_id: str, source_type: str, url: str) -> int:
return await execute(
"INSERT INTO remote_source (hospital_id, source_type, url) VALUES (%s, %s, %s)",
(hospital_id, source_type, url),
)
# ── raw_info ──────────────────────────────────────────────────────────────────
async def insert_raw_info(source_id: int, analysis_run_id: str, data_tag: str = "default") -> int:
return await execute(
"INSERT INTO raw_info (source_id, analysis_run_id, data_tag) VALUES (%s, %s, %s)",
(source_id, analysis_run_id, data_tag),
)
async def set_raw_info_status(info_id: int, status: str) -> None:
await execute("UPDATE raw_info SET status = %s WHERE info_id = %s", (status, info_id))
async def save_raw_info(info_id: int, data: dict) -> None:
await execute(
"UPDATE raw_info SET raw_data = %s, status = 'done' WHERE info_id = %s",
(json.dumps(data, ensure_ascii=False), info_id),
)
async def fetch_raw(info_id: int | None) -> dict | None:
if info_id is None:
return None
row = await fetchone("SELECT raw_data FROM raw_info WHERE info_id = %s", (info_id,))
if not row or not row["raw_data"]:
return None
return json.loads(row["raw_data"]) if isinstance(row["raw_data"], str) else row["raw_data"]
async def is_done(info_id: int | None) -> bool:
if info_id is None:
return True
r = await fetchone("SELECT status FROM raw_info WHERE info_id = %s", (info_id,))
return r["status"] == "done"
async def get_analysis_raw_data(analysis_run_id: str) -> dict:
rows = await fetchall(
"SELECT rs.source_type, ri.raw_data"
" FROM raw_info ri JOIN remote_source rs USING (source_id)"
" WHERE ri.analysis_run_id = %s",
(analysis_run_id,),
)
result: dict = {}
for row in rows:
raw = row["raw_data"]
result[row["source_type"]] = json.loads(raw) if isinstance(raw, str) else raw
return result
# ── analysis_runs ─────────────────────────────────────────────────────────────
async def insert_analysis_run(
analysis_run_id: str,
hospital_id: str,
owner_user_id: int,
) -> str:
await execute(
"INSERT INTO analysis_runs (analysis_run_id, hospital_id, owner_user_id)"
" VALUES (%s, %s, %s)",
(analysis_run_id, hospital_id, owner_user_id),
)
return analysis_run_id
async def save_analysis_report(analysis_run_id: str, data: dict) -> None:
await execute(
"UPDATE analysis_runs SET report_data = %s WHERE analysis_run_id = %s",
(json.dumps(data, ensure_ascii=False), analysis_run_id),
)
# ── hospital_baseinfo ─────────────────────────────────────────────────────────
async def _insert_hospital_history(hospital_id: str, analysis_run_id: str | None) -> None:
row = await fetchone(
"SELECT owner_user_id, hospital_name, hospital_name_en, brn, road_address, site_address, status"
" FROM hospital_baseinfo WHERE hospital_id = %s",
(hospital_id,),
)
if not row:
return
await execute(
"INSERT INTO hospital_history"
" (hospital_id, owner_user_id, hospital_name, hospital_name_en, brn, road_address, site_address, status, analysis_run_id)"
" VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
(
hospital_id,
row["owner_user_id"],
row["hospital_name"],
row["hospital_name_en"],
row["brn"],
row["road_address"],
row["site_address"],
row["status"],
analysis_run_id,
),
)
async def insert_hospital(
hospital_id: str,
name: str,
name_en: str | None = None,
road_address: str | None = None,
site_address: str | None = None,
owner_user_id: int = 0,
brn: str = "",
) -> dict:
await execute(
"INSERT INTO hospital_baseinfo (hospital_id, hospital_name, hospital_name_en, road_address, site_address, status, owner_user_id, brn)"
" VALUES (%s, %s, %s, %s, %s, 'done', %s, %s)",
(hospital_id, name, name_en, road_address, site_address, owner_user_id, brn),
)
await _insert_hospital_history(hospital_id, analysis_run_id=None)
return await fetchone(
"SELECT created_at FROM hospital_baseinfo WHERE hospital_id = %s",
(hospital_id,),
)
async def update_hospital_info(hospital_id: str, data: dict, analysis_run_id: str | None = None) -> None:
"""clinic 스크래핑 후 hospital_baseinfo의 기본 필드 업데이트."""
await execute(
"UPDATE hospital_baseinfo"
" SET status = 'done',"
" hospital_name = COALESCE(%s, hospital_name),"
" hospital_name_en = COALESCE(%s, hospital_name_en),"
" road_address = COALESCE(%s, road_address)"
" WHERE hospital_id = %s",
(
data.get("clinicName"),
data.get("clinicNameEn"),
data.get("address"),
hospital_id,
),
)
await _insert_hospital_history(hospital_id, analysis_run_id)
# ── file_data ─────────────────────────────────────────────────────────────────
async def insert_file_row(
analysis_run_id: str,
file_type: str,
file_name: str,
file_url: str,
size_bytes: int | None = None,
hospital_id: str | None = None,
) -> int:
return await execute(
"INSERT INTO file_data (analysis_run_id, hospital_id, file_type, file_name, file_url, size_bytes)"
" VALUES (%s, %s, %s, %s, %s, %s)",
(analysis_run_id, hospital_id, file_type, file_name, file_url, size_bytes),
)
# ── market_analysis ───────────────────────────────────────────────────────────
async def get_market_analysis(analysis_run_id: str) -> dict:
rows = await fetchall(
"SELECT analysis_type, data FROM market_analysis WHERE analysis_run_id = %s AND status = 'done'",
(analysis_run_id,),
)
return {
row["analysis_type"]: json.loads(row["data"]) if isinstance(row["data"], str) else row["data"]
for row in rows
}

View File

@ -35,6 +35,7 @@ class ApifyClient:
async def fetch_instagram_profile(self, url: str) -> dict | None: async def fetch_instagram_profile(self, url: str) -> dict | None:
username = urlparse(url).path.strip("/").split("/")[0] if "://" in url else url.lstrip("@") username = urlparse(url).path.strip("/").split("/")[0] if "://" in url else url.lstrip("@")
print(username)
items = await self._run_actor("apify~instagram-profile-scraper", {"usernames": [username], "resultsLimit": 12}) items = await self._run_actor("apify~instagram-profile-scraper", {"usernames": [username], "resultsLimit": 12})
return items[0] if items else None return items[0] if items else None

View File

@ -1,7 +1,7 @@
import os import os
from pydantic import BaseModel from pydantic import BaseModel
from common.utils import get_env from common.utils import get_env
from integrations.llm.schemas.report import ReportInput, ReportOutput from integrations.llm.schemas.report import ReportInput, ReportOutput, YouTubeDiagnosisInput, YouTubeDiagnosisOutput
from integrations.llm.schemas.plan import PlanInput, PlanOutput from integrations.llm.schemas.plan import PlanInput, PlanOutput
from integrations.llm.schemas.market import ( from integrations.llm.schemas.market import (
MarketCompetitorsInput, MarketCompetitorsOutput, MarketCompetitorsInput, MarketCompetitorsOutput,
@ -80,3 +80,10 @@ market_target_audience_prompt = Prompt(
input_class=MarketTargetAudienceInput, input_class=MarketTargetAudienceInput,
output_class=MarketTargetAudienceOutput, output_class=MarketTargetAudienceOutput,
) )
youtube_diagnosis_prompt = Prompt(
file_name="youtube_diagnosis_prompt.txt",
prompt_model="REPORT_MODEL",
input_class=YouTubeDiagnosisInput,
output_class=YouTubeDiagnosisOutput,
)

View File

@ -70,18 +70,12 @@ class RegistryData(BaseModel):
class ClinicSnapshot(BaseModel): class ClinicSnapshot(BaseModel):
name: str name: str
name_en: str name_en: str
established: str
years_in_business: int
staff_count: int staff_count: int
lead_doctor: LeadDoctor lead_doctor: LeadDoctor
overall_rating: float overall_rating: float
total_reviews: int total_reviews: int
price_range: PriceRange
certifications: list[str] certifications: list[str]
media_appearances: list[str]
medical_tourism: list[str]
location: str location: str
nearest_station: str
phone: str phone: str
domain: str domain: str
logo_images: LogoImages | None = None logo_images: LogoImages | None = None
@ -137,7 +131,6 @@ class YouTubeAudit(BaseModel):
avg_video_length: str avg_video_length: str
upload_frequency: str upload_frequency: str
channel_created_date: str channel_created_date: str
subscriber_rank: str
channel_description: str channel_description: str
linked_urls: list[LinkedUrl] linked_urls: list[LinkedUrl]
playlists: list[str] playlists: list[str]
@ -345,3 +338,20 @@ class MarketingReport(BaseModel):
ReportOutput = MarketingReport ReportOutput = MarketingReport
# --- YouTubeDiagnosis ---
class YouTubeDiagnosisInput(BaseModel):
channel_name: str | None = None
subscribers: int | None = None
total_videos: int | None = None
total_views: int | None = None
avg_video_length: str | None = None
upload_frequency: str | None = None
top_videos: str | None = None
playlists: str | None = None
class YouTubeDiagnosisOutput(BaseModel):
diagnosis: list[DiagnosisItem]

View File

@ -0,0 +1,24 @@
다음은 성형외과/피부과 유튜브 채널 데이터입니다.
채널명: {channel_name}
구독자 수: {subscribers}
총 영상 수: {total_videos}
총 조회수: {total_views}
평균 영상 길이: {avg_video_length}
업로드 주기: {upload_frequency}
인기 영상 목록: {top_videos}
플레이리스트: {playlists}
위 데이터를 바탕으로 이 채널의 마케팅 문제점과 개선사항을 진단해줘.
각 항목은 category(진단 카테고리), detail(상세 설명), severity(critical/warning/info) 형식의 JSON 배열로 출력해줘.
진단 카테고리들은 다음과 같아. :
구독자 대비 조회수 비율,
최근 롱폼 조회수,
Shorts 조회수,
업로드 빈도,
콘텐츠 톤앤매너,
썸네일 디자인,
최고 성과 Shorts
출처 번호([1], [2] 등)는 굳이 포함하지 마.

View File

@ -79,7 +79,17 @@ class YouTubeClient:
if resp and resp.is_success: if resp and resp.is_success:
videos = resp.json().get("items", [])[:10] videos = resp.json().get("items", [])[:10]
return {"channelId": channel_id, "channel": channel, "videos": videos} playlists: list[dict] = []
resp = await http_request(
HTTPMethod.GET,
url=f"{YT}/playlists",
params={"part": "snippet", "channelId": channel_id, "maxResults": 50, "key": self.api_key},
label="yt-playlists",
)
if resp and resp.is_success:
playlists = resp.json().get("items", [])
return {"channelId": channel_id, "channel": channel, "videos": videos, "playlists": playlists}
async def get_channel(self, url: str) -> dict | None: async def get_channel(self, url: str) -> dict | None:
raw = await self.fetch_channel(url) raw = await self.fetch_channel(url)
@ -109,6 +119,11 @@ class YouTubeClient:
} }
for v in raw["videos"] for v in raw["videos"]
], ],
"playlists": [
p.get("snippet", {}).get("title")
for p in raw["playlists"]
if p.get("snippet", {}).get("title")
],
} }
async def search_channels(self, query: str, max_results: int = 3) -> list[str]: async def search_channels(self, query: str, max_results: int = 3) -> list[str]:

View File

@ -10,9 +10,7 @@ class ClinicResponse(BaseModel):
hospital_name: str hospital_name: str
hospital_name_en: str | None hospital_name_en: str | None
road_address: str | None road_address: str | None
url: str | None
status: str status: str
raw_data: dict | None
created_at: str created_at: str
updated_at: str updated_at: str

View File

@ -68,18 +68,12 @@ class RegistryData(CamelModel):
class ClinicSnapshot(CamelModel): class ClinicSnapshot(CamelModel):
name: str name: str
name_en: str name_en: str
established: str
years_in_business: int
staff_count: int staff_count: int
lead_doctor: LeadDoctor lead_doctor: LeadDoctor
overall_rating: float overall_rating: float
total_reviews: int total_reviews: int
price_range: PriceRange
certifications: list[str] certifications: list[str]
media_appearances: list[str]
medical_tourism: list[str]
location: str location: str
nearest_station: str
phone: str phone: str
domain: str domain: str
logo_images: LogoImages | None = None logo_images: LogoImages | None = None
@ -131,7 +125,6 @@ class YouTubeAudit(CamelModel):
avg_video_length: str avg_video_length: str
upload_frequency: str upload_frequency: str
channel_created_date: str channel_created_date: str
subscriber_rank: str
channel_description: str channel_description: str
linked_urls: list[LinkedUrl] linked_urls: list[LinkedUrl]
playlists: list[str] playlists: list[str]

View File

@ -36,9 +36,22 @@ class DataSource(StrEnum):
SCRAPE = "scrape" SCRAPE = "scrape"
class SourceType(StrEnum):
MAINPAGE = "mainpage"
INSTAGRAM = "instagram"
FACEBOOK = "facebook"
NAVER_BLOG = "naver_blog"
YOUTUBE = "youtube"
TIKTOK = "tiktok"
GANGNAM_UNNI = "gangnam_unni"
KAKAOTALK = "kakaotalk"
NAVER_CAFE = "naver_cafe"
class Language(StrEnum): class Language(StrEnum):
KR = "KR" KR = "KR"
EN = "EN" EN = "EN"
WW = "WW"
class VideoType(StrEnum): class VideoType(StrEnum):

View File

@ -1,10 +1,15 @@
import json import json
import logging import logging
import os import os
from common.db import fetchone, execute, fetch_raw, get_analysis_raw_data, save_analysis_report, get_market_analysis import re
from datetime import datetime
from urllib.parse import urlparse
from common.db.run import select_run, update_run_report, update_run_plan
from common.db.source import select_run_raw_data, select_run_mainpage_url
from common.db.market import select_market
from integrations.llm.llm_service import LLMService from integrations.llm.llm_service import LLMService
from integrations.llm.prompt import report_prompt, plan_prompt from integrations.llm.prompt import report_prompt, plan_prompt, youtube_diagnosis_prompt
from integrations.llm.schemas.report import ReportOutput from integrations.llm.schemas.report import ReportOutput, ClinicSnapshot, YouTubeAudit
from integrations.llm.schemas.plan import PlanOutput from integrations.llm.schemas.plan import PlanOutput
from models.status import AnalysisStatus from models.status import AnalysisStatus
@ -12,18 +17,9 @@ logger = logging.getLogger(__name__)
async def generate_report(analysis_run_id: str) -> ReportOutput: async def generate_report(analysis_run_id: str) -> ReportOutput:
run = await fetchone( raw = await select_run_raw_data(analysis_run_id)
"SELECT hospital_id FROM analysis_runs WHERE analysis_run_id = %s", clinic = raw.get("mainpage") or {}
(analysis_run_id,), market = await select_market(analysis_run_id)
)
clinic_row = await fetchone(
"SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s",
(run["hospital_id"],),
)
raw_data = clinic_row["raw_data"] if clinic_row else None
clinic = json.loads(raw_data) if isinstance(raw_data, str) else (raw_data or {})
raw = await get_analysis_raw_data(analysis_run_id)
market = await get_market_analysis(analysis_run_id)
def _json(v) -> str | None: def _json(v) -> str | None:
return json.dumps(v, ensure_ascii=False) if v else None return json.dumps(v, ensure_ascii=False) if v else None
@ -41,27 +37,21 @@ async def generate_report(analysis_run_id: str) -> ReportOutput:
"market_trend": _json(market.get("trend")), "market_trend": _json(market.get("trend")),
"market_target_audience": _json(market.get("target_audience")), "market_target_audience": _json(market.get("target_audience")),
**{ **{
channel: _json(data) source_type: _json(data)
for channel, data in raw.items() for source_type, data in raw.items()
if source_type != "mainpage"
}, },
} }
return await LLMService(provider="perplexity").generate(report_prompt, input_data) return await LLMService(provider="perplexity").generate(report_prompt, input_data)
async def generate_plan(analysis_run_id: str) -> PlanOutput: async def generate_plan(analysis_run_id: str) -> PlanOutput:
run = await fetchone( run = await select_run(analysis_run_id)
"SELECT hospital_id, report_data FROM analysis_runs WHERE analysis_run_id = %s", raw = await select_run_raw_data(analysis_run_id)
(analysis_run_id,), clinic = raw.get("mainpage") or {}
)
clinic_row = await fetchone(
"SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s",
(run["hospital_id"],),
)
raw_data = clinic_row["raw_data"] if clinic_row else None
clinic = json.loads(raw_data) if isinstance(raw_data, str) else (raw_data or {})
report_data = run["report_data"] report_data = run["report_data"]
report = json.loads(report_data) if isinstance(report_data, str) else report_data report = json.loads(report_data) if isinstance(report_data, str) else report_data
market = await get_market_analysis(analysis_run_id) market = await select_market(analysis_run_id)
def _json(v) -> str | None: def _json(v) -> str | None:
return json.dumps(v, ensure_ascii=False) if v else None return json.dumps(v, ensure_ascii=False) if v else None
@ -84,33 +74,15 @@ async def generate_plan(analysis_run_id: str) -> PlanOutput:
return await LLMService(provider="perplexity").generate(plan_prompt, input_data) return await LLMService(provider="perplexity").generate(plan_prompt, input_data)
async def _build_overrides(analysis_run_id: str) -> dict: def _build_clinic_snapshot(gangnam_unni: dict, hospital: dict) -> dict:
run = await fetchone(
"SELECT hospital_id, instagram_data_id, facebook_data_id,"
" naver_blog_data_id, youtube_data_id, gangnam_unni_data_id"
" FROM analysis_runs WHERE analysis_run_id = %s",
(analysis_run_id,),
)
if not run:
return {}
hospital_row = await fetchone(
"SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s",
(run["hospital_id"],),
)
hospital = json.loads(hospital_row["raw_data"]) if hospital_row and isinstance(hospital_row.get("raw_data"), str) else (hospital_row or {}).get("raw_data") or {}
instagram = await fetch_raw("instagram_data", run["instagram_data_id"]) or {}
facebook = await fetch_raw("facebook_data", run["facebook_data_id"]) or {}
naver_blog = await fetch_raw("naver_blog_data", run["naver_blog_data_id"]) or {}
youtube = await fetch_raw("youtube_data", run["youtube_data_id"]) or {}
gangnam_unni = await fetch_raw("gangnam_unni_data", run["gangnam_unni_data_id"]) or {}
snapshot: dict = {} snapshot: dict = {}
# ── gangnam_unni ──────────────────────────────────────────────────────────
doctors = gangnam_unni.get("doctors", []) doctors = gangnam_unni.get("doctors", [])
lead = max(doctors, key=lambda d: d.get("reviews", 0)) if doctors else None lead = max(doctors, key=lambda d: d.get("reviews", 0)) if doctors else None
if gangnam_unni.get("name"): snapshot["name"] = gangnam_unni["name"] if gangnam_unni.get("name"): snapshot["name"] = gangnam_unni["name"]
if hospital.get("clinicNameEn"): snapshot["name_en"] = hospital["clinicNameEn"]
if hospital.get("phone"): snapshot["phone"] = hospital["phone"]
domain = hospital.get("domain") or urlparse(hospital.get("sourceUrl") or "").netloc
if domain: snapshot["domain"] = domain
if gangnam_unni.get("rating"): snapshot["overall_rating"] = gangnam_unni["rating"] if gangnam_unni.get("rating"): snapshot["overall_rating"] = gangnam_unni["rating"]
if gangnam_unni.get("totalReviews"): snapshot["total_reviews"] = gangnam_unni["totalReviews"] if gangnam_unni.get("totalReviews"): snapshot["total_reviews"] = gangnam_unni["totalReviews"]
if gangnam_unni.get("address"): snapshot["location"] = gangnam_unni["address"] if gangnam_unni.get("address"): snapshot["location"] = gangnam_unni["address"]
@ -123,6 +95,133 @@ async def _build_overrides(analysis_run_id: str) -> dict:
"rating": lead.get("rating"), "rating": lead.get("rating"),
"review_count": lead.get("reviews"), "review_count": lead.get("reviews"),
} }
return ClinicSnapshot.model_validate(snapshot).model_dump()
def _parse_iso_duration_seconds(iso: str) -> int:
m = re.match(r"PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?", iso or "")
if not m:
return 0
h, mins, s = (int(x or 0) for x in m.groups())
return h * 3600 + mins * 60 + s
def _format_seconds(seconds: int) -> str:
m, s = divmod(seconds, 60)
h, m = divmod(m, 60)
return f"{h}시간 {m}" if h else f"{m}{s}"
def _format_clock(seconds: int) -> str:
m, s = divmod(seconds, 60)
h, m = divmod(m, 60)
return f"{h}:{m:02d}:{s:02d}" if h else f"{m}:{s:02d}"
def _calc_avg_video_length(videos: list[dict]) -> str:
durations = [_parse_iso_duration_seconds(v.get("duration", "")) for v in videos]
durations = [d for d in durations if d > 0]
if not durations:
return ""
return _format_seconds(sum(durations) // len(durations))
def _relative_date(date_str: str) -> str:
if not date_str:
return ""
try:
past = datetime.fromisoformat(date_str[:10])
except ValueError:
return ""
days = (datetime.now() - past).days
if days < 1:
return "오늘"
if days < 30:
return f"{days}일 전"
if days < 365:
return f"{days // 30}개월 전"
return f"{days // 365}년 전"
def _calc_upload_frequency(videos: list[dict]) -> str:
dates = sorted(
[v["date"][:10] for v in videos if v.get("date")],
reverse=True,
)
if len(dates) < 2:
return ""
gaps = [
(datetime.fromisoformat(dates[i]) - datetime.fromisoformat(dates[i + 1])).days
for i in range(len(dates) - 1)
]
avg_days = sum(gaps) // len(gaps)
if avg_days <= 7:
return f"{7 // max(avg_days, 1)}"
if avg_days <= 30:
return f"{30 // avg_days}"
return f"{avg_days}일에 1회"
async def _build_youtube_audit(youtube: dict) -> dict:
videos = youtube.get("videos", [])
yt_patch: dict = {
"weekly_view_growth": {"absolute": 0, "percentage": 0.0},
"estimated_monthly_revenue": {"min": 0, "max": 0},
"linked_urls": [],
"avg_video_length": _calc_avg_video_length(videos),
"upload_frequency": _calc_upload_frequency(videos),
}
if youtube.get("channelName"): yt_patch["channel_name"] = youtube["channelName"]
if youtube.get("handle"): yt_patch["handle"] = youtube["handle"]
if youtube.get("subscribers"): yt_patch["subscribers"] = youtube["subscribers"]
if youtube.get("totalVideos"): yt_patch["total_videos"] = youtube["totalVideos"]
if youtube.get("totalViews"): yt_patch["total_views"] = youtube["totalViews"]
if youtube.get("publishedAt"): yt_patch["channel_created_date"] = youtube["publishedAt"][:10]
if youtube.get("description"): yt_patch["channel_description"] = youtube["description"]
if youtube.get("playlists"): yt_patch["playlists"] = youtube["playlists"]
if videos:
yt_patch["top_videos"] = [
{
"title": v["title"],
"views": v["views"],
"duration": _format_clock(_parse_iso_duration_seconds(v.get("duration", ""))),
"type": "Short" if "M" not in v.get("duration", "") else "Long",
"uploaded_ago": _relative_date(v.get("date", "")),
}
for v in videos
]
diagnosis_result = await LLMService(provider="perplexity").generate(
youtube_diagnosis_prompt,
{
"channel_name": yt_patch.get("channel_name"),
"subscribers": yt_patch.get("subscribers"),
"total_videos": yt_patch.get("total_videos"),
"total_views": yt_patch.get("total_views"),
"avg_video_length": yt_patch.get("avg_video_length"),
"upload_frequency": yt_patch.get("upload_frequency"),
"top_videos": json.dumps(yt_patch.get("top_videos", []), ensure_ascii=False),
"playlists": json.dumps(yt_patch.get("playlists", []), ensure_ascii=False),
},
)
yt_patch["diagnosis"] = [item.model_dump() for item in diagnosis_result.diagnosis]
return YouTubeAudit.model_validate(yt_patch).model_dump()
async def _build_overrides(analysis_run_id: str) -> dict:
raw = await select_run_raw_data(analysis_run_id)
if not raw:
return {}
mainpage = raw.get("mainpage", {}) or {}
instagram = raw.get("instagram", {}) or {}
facebook = raw.get("facebook", {}) or {}
youtube = raw.get("youtube", {}) or {}
gangnam_unni = raw.get("gangnam_unni", {}) or {}
snapshot: dict = _build_clinic_snapshot(gangnam_unni, mainpage)
yt_patch: dict = await _build_youtube_audit(youtube)
# ── instagram ───────────────────────────────────────────────────────────── # ── instagram ─────────────────────────────────────────────────────────────
ig_patch: dict = {} ig_patch: dict = {}
@ -143,26 +242,6 @@ async def _build_overrides(analysis_run_id: str) -> dict:
if facebook.get("categories"): fb_patch["category"] = ", ".join(facebook["categories"]) if facebook.get("categories"): fb_patch["category"] = ", ".join(facebook["categories"])
if facebook.get("website"): fb_patch["linked_domain"] = facebook["website"] if facebook.get("website"): fb_patch["linked_domain"] = facebook["website"]
# ── youtube ───────────────────────────────────────────────────────────────
yt_patch: dict = {}
if youtube.get("channelName"): yt_patch["channel_name"] = youtube["channelName"]
if youtube.get("handle"): yt_patch["handle"] = youtube["handle"]
if youtube.get("subscribers"): yt_patch["subscribers"] = youtube["subscribers"]
if youtube.get("totalVideos"): yt_patch["total_videos"] = youtube["totalVideos"]
if youtube.get("totalViews"): yt_patch["total_views"] = youtube["totalViews"]
if youtube.get("publishedAt"): yt_patch["channel_created_date"] = youtube["publishedAt"][:10]
if youtube.get("description"): yt_patch["channel_description"] = youtube["description"]
if youtube.get("videos"):
yt_patch["top_videos"] = [
{
"title": v["title"],
"views": v["views"],
"duration": v.get("duration"),
"type": "Short" if "M" not in v.get("duration", "") else "Long",
"uploaded_ago": v.get("date", "")[:10],
}
for v in youtube["videos"]
]
overrides: dict = {} overrides: dict = {}
if snapshot: if snapshot:
@ -198,12 +277,7 @@ _MOCK_REPORT_PATH = os.path.join(os.path.dirname(__file__), "../mock/report_view
async def _is_mock(analysis_run_id: str) -> bool: async def _is_mock(analysis_run_id: str) -> bool:
row = await fetchone( url = await select_run_mainpage_url(analysis_run_id)
"SELECT h.url FROM analysis_runs ar JOIN hospital_baseinfo h USING (hospital_id)"
" WHERE ar.analysis_run_id = %s",
(analysis_run_id,),
)
url = (row or {}).get("url") or ""
return any(domain in url for domain in _MOCK_DOMAINS) return any(domain in url for domain in _MOCK_DOMAINS)
@ -225,10 +299,11 @@ async def run_report_task(analysis_run_id: str) -> None:
if await _is_mock(analysis_run_id): if await _is_mock(analysis_run_id):
logger.info("[report] mock mode run=%s", analysis_run_id) logger.info("[report] mock mode run=%s", analysis_run_id)
result = _load_mock_report() result = _load_mock_report()
result.youtube_audit.linked_urls = []
else: else:
result = await generate_report(analysis_run_id) result = await generate_report(analysis_run_id)
result = _patch_report(result, await _build_overrides(analysis_run_id)) result = _patch_report(result, await _build_overrides(analysis_run_id))
await save_analysis_report(analysis_run_id, result.model_dump()) await update_run_report(analysis_run_id, result.model_dump())
logger.info("[report] done run=%s", analysis_run_id) logger.info("[report] done run=%s", analysis_run_id)
@ -239,8 +314,5 @@ async def run_plan_task(analysis_run_id: str) -> None:
result = _load_mock_plan() result = _load_mock_plan()
else: else:
result = await generate_plan(analysis_run_id) result = await generate_plan(analysis_run_id)
await execute( await update_run_plan(analysis_run_id, result.model_dump())
"UPDATE analysis_runs SET plan_data = %s WHERE analysis_run_id = %s",
(json.dumps(result.model_dump(), ensure_ascii=False), analysis_run_id),
)
logger.info("[plan] done run=%s", analysis_run_id) logger.info("[plan] done run=%s", analysis_run_id)

View File

@ -1,96 +1,110 @@
import asyncio import asyncio
import logging import logging
from common.db import fetchone from common.db.hospital import update_hospital_status, update_hospital
from common.db import ( from common.db.source import select_run_sources, update_raw_info_status, update_raw_info
set_instagram_status, save_instagram_raw_data,
set_facebook_status, save_facebook_raw_data,
set_naver_blog_status, save_naver_blog_raw_data,
set_youtube_status, save_youtube_raw_data,
set_gangnam_unni_status, save_gangnam_unni_raw_data,
execute, save_hospital_raw_data,
)
from common.utils import get_env from common.utils import get_env
from integrations.apify import ApifyClient from integrations.apify import ApifyClient
from integrations.naver import NaverClient from integrations.naver import NaverClient
from integrations.youtube import YouTubeClient from integrations.youtube import YouTubeClient
from integrations.firecrawl import FirecrawlClient from integrations.firecrawl import FirecrawlClient
from models.status import SourceType
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
async def collect_instagram(analysis_run_id: str, row_id: int, url: str) -> None: async def collect_instagram(analysis_run_id: str, info_id: int, url: str) -> None:
logger.info("[instagram] start run=%s url=%s", analysis_run_id, url) logger.info("[instagram] start run=%s url=%s", analysis_run_id, url)
await set_instagram_status(row_id, "processing") await update_raw_info_status(info_id, "processing")
data = await ApifyClient(get_env("APIFY_API_TOKEN")).get_instagram_profile(url) data = await ApifyClient(get_env("APIFY_API_TOKEN")).get_instagram_profile(url)
await save_instagram_raw_data(row_id, data) if data is None:
await update_raw_info_status(info_id, "failed")
logger.warning("[instagram] failed run=%s", analysis_run_id)
return
await update_raw_info(info_id, data)
logger.info("[instagram] done run=%s", analysis_run_id) logger.info("[instagram] done run=%s", analysis_run_id)
async def collect_facebook(analysis_run_id: str, row_id: int, url: str) -> None: async def collect_facebook(analysis_run_id: str, info_id: int, url: str) -> None:
logger.info("[facebook] start run=%s url=%s", analysis_run_id, url) logger.info("[facebook] start run=%s url=%s", analysis_run_id, url)
await set_facebook_status(row_id, "processing") await update_raw_info_status(info_id, "processing")
data = await ApifyClient(get_env("APIFY_API_TOKEN")).get_facebook_page(url) data = await ApifyClient(get_env("APIFY_API_TOKEN")).get_facebook_page(url)
await save_facebook_raw_data(row_id, data) if data is None:
await update_raw_info_status(info_id, "failed")
logger.warning("[facebook] failed run=%s", analysis_run_id)
return
await update_raw_info(info_id, data)
logger.info("[facebook] done run=%s", analysis_run_id) logger.info("[facebook] done run=%s", analysis_run_id)
async def collect_naver_blog(analysis_run_id: str, row_id: int, url: str) -> None: async def collect_naver_blog(analysis_run_id: str, info_id: int, url: str) -> None:
logger.info("[naver_blog] start run=%s url=%s", analysis_run_id, url) logger.info("[naver_blog] start run=%s url=%s", analysis_run_id, url)
await set_naver_blog_status(row_id, "processing") await update_raw_info_status(info_id, "processing")
data = await NaverClient(get_env("NAVER_CLIENT_ID"), get_env("NAVER_CLIENT_SECRET")).get_blog_rss(url) data = await NaverClient(get_env("NAVER_CLIENT_ID"), get_env("NAVER_CLIENT_SECRET")).get_blog_rss(url)
await save_naver_blog_raw_data(row_id, data) if data is None:
await update_raw_info_status(info_id, "failed")
logger.warning("[naver_blog] failed run=%s", analysis_run_id)
return
await update_raw_info(info_id, data)
logger.info("[naver_blog] done run=%s", analysis_run_id) logger.info("[naver_blog] done run=%s", analysis_run_id)
async def collect_youtube(analysis_run_id: str, row_id: int, url: str) -> None: async def collect_youtube(analysis_run_id: str, info_id: int, url: str) -> None:
logger.info("[youtube] start run=%s url=%s", analysis_run_id, url) logger.info("[youtube] start run=%s url=%s", analysis_run_id, url)
await set_youtube_status(row_id, "processing") await update_raw_info_status(info_id, "processing")
data = await YouTubeClient(get_env("YOUTUBE_API_KEY")).get_channel(url) data = await YouTubeClient(get_env("YOUTUBE_API_KEY")).get_channel(url)
await save_youtube_raw_data(row_id, data) if data is None:
await update_raw_info_status(info_id, "failed")
logger.warning("[youtube] failed run=%s", analysis_run_id)
return
await update_raw_info(info_id, data)
logger.info("[youtube] done run=%s", analysis_run_id) logger.info("[youtube] done run=%s", analysis_run_id)
async def collect_gangnam_unni(analysis_run_id: str, row_id: int, url: str) -> None: async def collect_gangnam_unni(analysis_run_id: str, info_id: int, url: str) -> None:
logger.info("[gangnam_unni] start run=%s url=%s", analysis_run_id, url) logger.info("[gangnam_unni] start run=%s url=%s", analysis_run_id, url)
await set_gangnam_unni_status(row_id, "processing") await update_raw_info_status(info_id, "processing")
data = await FirecrawlClient(get_env("FIRECRAWL_API_KEY")).get_gangnam_unni(url) data = await FirecrawlClient(get_env("FIRECRAWL_API_KEY")).get_gangnam_unni(url)
await save_gangnam_unni_raw_data(row_id, data) if data is None:
await update_raw_info_status(info_id, "failed")
logger.warning("[gangnam_unni] failed run=%s", analysis_run_id)
return
await update_raw_info(info_id, data)
logger.info("[gangnam_unni] done run=%s", analysis_run_id) logger.info("[gangnam_unni] done run=%s", analysis_run_id)
async def collect_clinic_info(analysis_run_id: str, hospital_id: str, url: str) -> None: async def collect_mainpage(analysis_run_id: str, info_id: int, hospital_id: str, url: str) -> None:
logger.info("[clinic] start run=%s url=%s", analysis_run_id, url) logger.info("[mainpage] start run=%s url=%s", analysis_run_id, url)
await execute("UPDATE hospital_baseinfo SET status = 'processing' WHERE hospital_id = %s", (hospital_id,)) await update_raw_info_status(info_id, "processing")
await update_hospital_status(hospital_id, "processing")
data = await FirecrawlClient(get_env("FIRECRAWL_API_KEY")).fetch_clinic_info(url) data = await FirecrawlClient(get_env("FIRECRAWL_API_KEY")).fetch_clinic_info(url)
await save_hospital_raw_data(hospital_id, data, analysis_run_id=analysis_run_id) if data is None:
logger.info("[clinic] done run=%s", analysis_run_id) await update_raw_info_status(info_id, "failed")
logger.warning("[mainpage] failed run=%s", analysis_run_id)
return
await update_raw_info(info_id, data)
await update_hospital(hospital_id, data, analysis_run_id=analysis_run_id)
logger.info("[mainpage] done run=%s", analysis_run_id)
async def collect_all( async def collect_all(analysis_run_id: str, hospital_id: str) -> None:
analysis_run_id: str, rows = await select_run_sources(analysis_run_id)
hospital_id: str,
instagram_id: int | None = None,
facebook_id: int | None = None,
naver_blog_id: int | None = None,
youtube_id: int | None = None,
gangnam_unni_id: int | None = None,
) -> None:
async def _url(table: str, row_id: int) -> str:
row = await fetchone(f"SELECT url FROM {table} WHERE id = %s", (row_id,))
return row["url"] if row else ""
hospital = await fetchone("SELECT url FROM hospital_baseinfo WHERE hospital_id = %s", (hospital_id,)) _collectors = {
tasks = [collect_clinic_info(analysis_run_id, hospital_id, hospital["url"])] SourceType.INSTAGRAM: collect_instagram,
SourceType.FACEBOOK: collect_facebook,
SourceType.NAVER_BLOG: collect_naver_blog,
SourceType.YOUTUBE: collect_youtube,
SourceType.GANGNAM_UNNI: collect_gangnam_unni,
}
if instagram_id: tasks = []
tasks.append(collect_instagram(analysis_run_id, instagram_id, await _url("instagram_data", instagram_id))) for row in rows:
if facebook_id: info_id = row["info_id"]
tasks.append(collect_facebook(analysis_run_id, facebook_id, await _url("facebook_data", facebook_id))) source_type = row["source_type"]
if naver_blog_id: url = row["url"]
tasks.append(collect_naver_blog(analysis_run_id, naver_blog_id, await _url("naver_blog_data", naver_blog_id))) if source_type == SourceType.MAINPAGE:
if youtube_id: tasks.append(collect_mainpage(analysis_run_id, info_id, hospital_id, url))
tasks.append(collect_youtube(analysis_run_id, youtube_id, await _url("youtube_data", youtube_id))) elif source_type in _collectors:
if gangnam_unni_id: tasks.append(_collectors[source_type](analysis_run_id, info_id, url))
tasks.append(collect_gangnam_unni(analysis_run_id, gangnam_unni_id, await _url("gangnam_unni_data", gangnam_unni_id)))
await asyncio.gather(*tasks, return_exceptions=True) await asyncio.gather(*tasks, return_exceptions=True)

View File

@ -2,7 +2,8 @@ import logging
from fastapi import HTTPException, UploadFile from fastapi import HTTPException, UploadFile
from common.db import execute, fetchall, fetchone, insert_file_row from common.db.run import select_run
from common.db.file_data import insert_file, select_run_files, select_file, delete_file
from integrations.azure_blob import AzureBlobUploader from integrations.azure_blob import AzureBlobUploader
from models.file import FileListItem, FileType, FileUploadResponse from models.file import FileListItem, FileType, FileUploadResponse
@ -31,10 +32,7 @@ async def upload_analysis_file(
content_type: str | None = None, content_type: str | None = None,
) -> tuple[int, str]: ) -> tuple[int, str]:
"""analysis_run에 딸린 파일 업로드. Blob 업로드 + file_data row 생성. (file_id, url) 반환.""" """analysis_run에 딸린 파일 업로드. Blob 업로드 + file_data row 생성. (file_id, url) 반환."""
run = await fetchone( run = await select_run(analysis_run_id)
"SELECT hospital_id FROM analysis_runs WHERE analysis_run_id = %s",
(analysis_run_id,),
)
if not run: if not run:
raise HTTPException(status_code=404, detail="analysis_run not found") raise HTTPException(status_code=404, detail="analysis_run not found")
hospital_id = run["hospital_id"] hospital_id = run["hospital_id"]
@ -47,7 +45,7 @@ async def upload_analysis_file(
content_type=content_type, content_type=content_type,
) )
file_id = await insert_file_row( file_id = await insert_file(
analysis_run_id=analysis_run_id, analysis_run_id=analysis_run_id,
hospital_id=hospital_id, hospital_id=hospital_id,
file_type=file_type, file_type=file_type,
@ -61,12 +59,7 @@ async def upload_analysis_file(
async def list_analysis_files(analysis_run_id: str) -> list[dict]: async def list_analysis_files(analysis_run_id: str) -> list[dict]:
"""analysis_run에 딸린 (삭제 안 된) 파일 목록.""" """analysis_run에 딸린 (삭제 안 된) 파일 목록."""
return await fetchall( return await select_run_files(analysis_run_id)
"SELECT id, file_type, file_name, file_url, size_bytes, created_at FROM file_data"
" WHERE analysis_run_id = %s AND is_deleted = FALSE"
" ORDER BY created_at DESC",
(analysis_run_id,),
)
async def handle_analysis_file_upload( async def handle_analysis_file_upload(
@ -102,7 +95,7 @@ async def handle_analysis_file_upload(
async def get_analysis_files_response(analysis_run_id: str) -> list[FileListItem]: async def get_analysis_files_response(analysis_run_id: str) -> list[FileListItem]:
"""run 존재 확인 + 응답 모델 생성.""" """run 존재 확인 + 응답 모델 생성."""
if not await fetchone("SELECT 1 FROM analysis_runs WHERE analysis_run_id = %s", (analysis_run_id,)): if not await select_run(analysis_run_id):
raise HTTPException(status_code=404, detail="analysis_run not found") raise HTTPException(status_code=404, detail="analysis_run not found")
rows = await list_analysis_files(analysis_run_id) rows = await list_analysis_files(analysis_run_id)
return [FileListItem(**{**r, "created_at": str(r["created_at"])}) for r in rows] return [FileListItem(**{**r, "created_at": str(r["created_at"])}) for r in rows]
@ -110,14 +103,8 @@ async def get_analysis_files_response(analysis_run_id: str) -> list[FileListItem
async def soft_delete_analysis_file(analysis_run_id: str, file_id: int) -> None: async def soft_delete_analysis_file(analysis_run_id: str, file_id: int) -> None:
"""analysis_run에 딸린 파일을 소프트 삭제. 멱등성 보장.""" """analysis_run에 딸린 파일을 소프트 삭제. 멱등성 보장."""
row = await fetchone( row = await select_file(file_id, analysis_run_id)
"SELECT id FROM file_data WHERE id = %s AND analysis_run_id = %s",
(file_id, analysis_run_id),
)
if not row: if not row:
raise HTTPException(status_code=404, detail="file not found") raise HTTPException(status_code=404, detail="file not found")
await execute( await delete_file(file_id)
"UPDATE file_data SET is_deleted = TRUE WHERE id = %s AND is_deleted = FALSE",
(file_id,),
)
logger.info("soft-deleted analysis file run=%s file_id=%s", analysis_run_id, file_id) logger.info("soft-deleted analysis file run=%s file_id=%s", analysis_run_id, file_id)

View File

@ -1,7 +1,9 @@
import asyncio import asyncio
import json
import logging import logging
from common.db import fetchone, execute from common.db.run import select_run
from common.db.hospital import select_hospital
from common.db.market import upsert_market_status, upsert_market_result
from common.db.source import select_run_raw_data
from integrations.llm.llm_service import LLMService from integrations.llm.llm_service import LLMService
from integrations.llm.prompt import ( from integrations.llm.prompt import (
market_competitors_prompt, market_competitors_prompt,
@ -18,49 +20,27 @@ _TYPES = ["competitors", "keywords", "trend", "target_audience"]
async def _save(analysis_run_id: str, analysis_type: str, result, exc: Exception | None) -> None: async def _save(analysis_run_id: str, analysis_type: str, result, exc: Exception | None) -> None:
if exc: if exc:
logger.warning("[market] %s failed run=%s: %s", analysis_type, analysis_run_id, exc) logger.warning("[market] %s failed run=%s: %s", analysis_type, analysis_run_id, exc)
await execute( await upsert_market_status(analysis_run_id, analysis_type, "failed")
"INSERT INTO market_analysis (analysis_run_id, analysis_type, status)"
" VALUES (%s, %s, 'failed')"
" ON DUPLICATE KEY UPDATE status = 'failed'",
(analysis_run_id, analysis_type),
)
else: else:
await execute( await upsert_market_result(analysis_run_id, analysis_type, result.model_dump())
"INSERT INTO market_analysis (analysis_run_id, analysis_type, status, data)"
" VALUES (%s, %s, 'done', %s)"
" ON DUPLICATE KEY UPDATE status = 'done', data = VALUES(data)",
(analysis_run_id, analysis_type, json.dumps(result.model_dump(), ensure_ascii=False)),
)
async def run_market_analysis(analysis_run_id: str) -> None: async def run_market_analysis(analysis_run_id: str) -> None:
logger.info("[market] start run=%s", analysis_run_id) logger.info("[market] start run=%s", analysis_run_id)
run = await fetchone( run = await select_run(analysis_run_id)
"SELECT hospital_id FROM analysis_runs WHERE analysis_run_id = %s", clinic = await select_hospital(run["hospital_id"])
(analysis_run_id,), raw = await select_run_raw_data(analysis_run_id)
) mainpage = raw.get("mainpage") or {}
clinic = await fetchone(
"SELECT hospital_name, road_address, raw_data FROM hospital_baseinfo WHERE hospital_id = %s",
(run["hospital_id"],),
)
raw_data = clinic["raw_data"] clinic_name = (clinic or {}).get("hospital_name") or ""
clinic_data = json.loads(raw_data) if isinstance(raw_data, str) else (raw_data or {}) address = (clinic or {}).get("road_address") or ""
services = mainpage.get("services", [])
clinic_name = clinic["hospital_name"] or ""
address = clinic["road_address"] or ""
services = clinic_data.get("services", [])
services_str = ", ".join(services[:3]) services_str = ", ".join(services[:3])
primary_service = services[0] if services else "" primary_service = services[0] if services else ""
for analysis_type in _TYPES: for analysis_type in _TYPES:
await execute( await upsert_market_status(analysis_run_id, analysis_type, "processing")
"INSERT INTO market_analysis (analysis_run_id, analysis_type, status)"
" VALUES (%s, %s, 'processing')"
" ON DUPLICATE KEY UPDATE status = 'processing'",
(analysis_run_id, analysis_type),
)
llm = LLMService(provider="perplexity") llm = LLMService(provider="perplexity")
results = await asyncio.gather( results = await asyncio.gather(

View File

@ -1,5 +1,5 @@
import logging import logging
from common.db import fetchone, execute from common.db.run import select_run, update_run_status
from models.status import AnalysisStatus from models.status import AnalysisStatus
from services.collect import collect_all from services.collect import collect_all
from services.market import run_market_analysis from services.market import run_market_analysis
@ -12,41 +12,19 @@ async def run_pipeline(analysis_run_id: str) -> None:
logger.info("[pipeline] start run=%s", analysis_run_id) logger.info("[pipeline] start run=%s", analysis_run_id)
# ── 1. Collect ────────────────────────────────────────────────────────── # ── 1. Collect ──────────────────────────────────────────────────────────
run = await fetchone( run = await select_run(analysis_run_id)
"SELECT hospital_id, instagram_data_id, facebook_data_id," await collect_all(analysis_run_id, hospital_id=run["hospital_id"])
" naver_blog_data_id, youtube_data_id, gangnam_unni_data_id"
" FROM analysis_runs WHERE analysis_run_id = %s",
(analysis_run_id,),
)
await collect_all(
analysis_run_id,
hospital_id=run["hospital_id"],
instagram_id=run["instagram_data_id"],
facebook_id=run["facebook_data_id"],
naver_blog_id=run["naver_blog_data_id"],
youtube_id=run["youtube_data_id"],
gangnam_unni_id=run["gangnam_unni_data_id"],
)
# ── 2. Market ──────────────────────────────────────────────────────────── # ── 2. Market ────────────────────────────────────────────────────────────
await execute( await update_run_status(analysis_run_id, AnalysisStatus.ANALYZING)
"UPDATE analysis_runs SET status = %s WHERE analysis_run_id = %s",
(AnalysisStatus.ANALYZING, analysis_run_id),
)
await run_market_analysis(analysis_run_id) await run_market_analysis(analysis_run_id)
# ── 3. Report ──────────────────────────────────────────────────────────── # ── 3. Report ────────────────────────────────────────────────────────────
await run_report_task(analysis_run_id) await run_report_task(analysis_run_id)
# ── 4. Plan ────────────────────────────────────────────────────────────── # ── 4. Plan ──────────────────────────────────────────────────────────────
await execute( await update_run_status(analysis_run_id, AnalysisStatus.PLANNING)
"UPDATE analysis_runs SET status = %s WHERE analysis_run_id = %s",
(AnalysisStatus.PLANNING, analysis_run_id),
)
await run_plan_task(analysis_run_id) await run_plan_task(analysis_run_id)
await execute( await update_run_status(analysis_run_id, AnalysisStatus.COMPLETED)
"UPDATE analysis_runs SET status = %s WHERE analysis_run_id = %s",
(AnalysisStatus.COMPLETED, analysis_run_id),
)
logger.info("[pipeline] done run=%s", analysis_run_id) logger.info("[pipeline] done run=%s", analysis_run_id)