From 3b4c154fb222dd4f500dc2250b4725c2accd4cfb Mon Sep 17 00:00:00 2001 From: jaehwang Date: Mon, 1 Jun 2026 15:31:33 +0900 Subject: [PATCH] db migration done --- SQL/db_create.sql | 249 ++++++++-------------- SQL/db_create_2.sql | 134 ------------ app/api/analysis.py | 44 ++-- app/api/clinics.py | 12 +- app/api/plan.py | 12 +- app/api/report.py | 12 +- app/common/db.py | 274 ------------------------- app/common/db/__init__.py | 14 ++ app/common/db/base.py | 56 +++++ app/common/db/file_data.py | 39 ++++ app/common/db/hospital.py | 78 +++++++ app/common/db/market.py | 31 +++ app/common/db/run.py | 64 ++++++ app/common/db/source.py | 85 ++++++++ app/common/db2.py | 236 --------------------- app/integrations/apify.py | 1 + app/models/clinic.py | 2 - app/models/status.py | 13 ++ app/services/analysis.py | 84 +++----- app/services/collect.py | 120 ++++++----- app/services/{file.py => file_data.py} | 29 +-- app/services/market.py | 48 ++--- app/services/pipeline.py | 34 +-- 23 files changed, 622 insertions(+), 1049 deletions(-) delete mode 100644 SQL/db_create_2.sql delete mode 100644 app/common/db.py create mode 100644 app/common/db/__init__.py create mode 100644 app/common/db/base.py create mode 100644 app/common/db/file_data.py create mode 100644 app/common/db/hospital.py create mode 100644 app/common/db/market.py create mode 100644 app/common/db/run.py create mode 100644 app/common/db/source.py delete mode 100644 app/common/db2.py rename app/services/{file.py => file_data.py} (80%) diff --git a/SQL/db_create.sql b/SQL/db_create.sql index c4b6f02..19a835e 100644 --- a/SQL/db_create.sql +++ b/SQL/db_create.sql @@ -1,158 +1,87 @@ --- 테이블 순서는 관계를 고려하여 한 번에 실행해도 에러가 발생하지 않게 정렬되었습니다. - --- instagram_data Table Create SQL --- 테이블 생성 SQL - instagram_data -CREATE TABLE instagram_data -( - `id` INT NOT NULL AUTO_INCREMENT, - `hospital_id` CHAR(36) NOT NULL, - `url` VARCHAR(500) NOT NULL, - `status` VARCHAR(20) NOT NULL DEFAULT 'start', - `raw_data` JSON NULL, - `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (id) -); - --- Index 설정 SQL - instagram_data(hospital_id) -CREATE INDEX IX_instagram_data_1 - ON instagram_data(hospital_id); - - --- facebook_data Table Create SQL --- 테이블 생성 SQL - facebook_data -CREATE TABLE facebook_data -( - `id` INT NOT NULL AUTO_INCREMENT, - `hospital_id` CHAR(36) NOT NULL, - `url` VARCHAR(500) NOT NULL, - `status` VARCHAR(20) NOT NULL DEFAULT 'start', - `raw_data` JSON NULL, - `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (id) -); - --- Index 설정 SQL - facebook_data(hospital_id) -CREATE INDEX IX_facebook_data_1 - ON facebook_data(hospital_id); - - --- naver_blog_data Table Create SQL --- 테이블 생성 SQL - naver_blog_data -CREATE TABLE naver_blog_data -( - `id` INT NOT NULL AUTO_INCREMENT, - `hospital_id` CHAR(36) NOT NULL, - `url` VARCHAR(500) NOT NULL, - `status` VARCHAR(20) NOT NULL DEFAULT 'start', - `raw_data` JSON NULL, - `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (id) -); - --- Index 설정 SQL - naver_blog_data(hospital_id) -CREATE INDEX IX_naver_blog_data_1 - ON naver_blog_data(hospital_id); - - --- hospital_baseinfo Table Create SQL --- 테이블 생성 SQL - hospital_baseinfo -CREATE TABLE hospital_baseinfo -( - `hospital_id` CHAR(36) NOT NULL, - `owner_user_id` INT NOT NULL, - `hospital_name` VARCHAR(50) NOT NULL, - `hospital_name_en` VARCHAR(50) NULL, - `brn` VARCHAR(50) NOT NULL, - `road_address` VARCHAR(100) NULL, - `site_address` VARCHAR(100) NULL, - `url` VARCHAR(500) NULL, - `status` VARCHAR(20) NOT NULL DEFAULT 'start', - `raw_data` JSON NULL, - `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - `updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - PRIMARY KEY (hospital_id) -); - --- Index 설정 SQL - hospital_baseinfo(owner_user_id) -CREATE INDEX IX_hospital_baseinfo_1 - ON hospital_baseinfo(owner_user_id); - - --- user_info Table Create SQL --- 테이블 생성 SQL - user_info +-- user_info CREATE TABLE user_info ( - `user_id` INT NOT NULL AUTO_INCREMENT, - `username` VARCHAR(50) NOT NULL, - `password` VARCHAR(50) NOT NULL, - `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - `updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + `user_id` INT NOT NULL AUTO_INCREMENT, + `username` VARCHAR(50) NOT NULL, + `password` VARCHAR(50) NOT NULL, + `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + `updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (user_id) ); --- youtube_data Table Create SQL -CREATE TABLE youtube_data + +-- hospital_baseinfo +CREATE TABLE hospital_baseinfo ( - `id` INT NOT NULL AUTO_INCREMENT, - `hospital_id` CHAR(36) NOT NULL, - `url` VARCHAR(500) NOT NULL, - `status` VARCHAR(20) NOT NULL DEFAULT 'start', - `raw_data` JSON NULL, - `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (id) + `hospital_id` CHAR(36) NOT NULL, + `owner_user_id` INT NOT NULL, + `hospital_name` VARCHAR(50) NOT NULL, + `hospital_name_en` VARCHAR(50) NULL, + `brn` VARCHAR(50) NOT NULL, + `road_address` VARCHAR(100) NULL, + `site_address` VARCHAR(100) NULL, + `status` VARCHAR(20) NOT NULL DEFAULT 'start', + `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + `updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (hospital_id) ); --- Index 설정 SQL - youtube_data(hospital_id) -CREATE INDEX IX_youtube_data_1 - ON youtube_data(hospital_id); +CREATE INDEX IX_hospital_baseinfo_1 ON hospital_baseinfo (owner_user_id); --- gangnam_unni_data Table Create SQL -CREATE TABLE gangnam_unni_data +-- remote_source: 병원별 채널 소스 정보 (instagram/facebook/naver_blog/youtube/gangnam_unni 등) +CREATE TABLE remote_source ( - `id` INT NOT NULL AUTO_INCREMENT, - `hospital_id` CHAR(36) NOT NULL, - `url` VARCHAR(500) NOT NULL, - `status` VARCHAR(20) NOT NULL DEFAULT 'start', - `raw_data` JSON NULL, - `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (id) + `source_id` INT NOT NULL AUTO_INCREMENT, + `hospital_id` CHAR(36) NOT NULL, + `source_type` VARCHAR(50) NOT NULL, + `language` CHAR(2) NULL, + `url` VARCHAR(500) NOT NULL, + `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (source_id) ); --- Index 설정 SQL - gangnam_unni_data(hospital_id) -CREATE INDEX IX_gangnam_unni_data_1 - ON gangnam_unni_data(hospital_id); +CREATE INDEX IX_remote_source_1 ON remote_source (hospital_id); +CREATE INDEX IX_remote_source_2 ON remote_source (hospital_id, source_type); --- analysis_runs Table Create SQL +-- analysis_runs CREATE TABLE analysis_runs ( - `analysis_run_id` CHAR(36) NOT NULL, - `hospital_id` CHAR(36) NOT NULL, - `owner_user_id` INT NOT NULL DEFAULT 0, - `status` VARCHAR(50) NOT NULL DEFAULT 'discovering', - `instagram_data_id` INT NULL, - `facebook_data_id` INT NULL, - `naver_blog_data_id` INT NULL, - `youtube_data_id` INT NULL, - `gangnam_unni_data_id` INT NULL, - `report_data` JSON NULL, - `plan_data` JSON NULL, - `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - `updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + `analysis_run_id` CHAR(36) NOT NULL, + `hospital_id` CHAR(36) NOT NULL, + `owner_user_id` INT NOT NULL DEFAULT 0, + `status` VARCHAR(50) NOT NULL DEFAULT 'discovering', + `report_data` JSON NULL, + `plan_data` JSON NULL, + `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + `updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (analysis_run_id) ); --- Index 설정 SQL - analysis_runs(hospital_id) -CREATE INDEX IX_analysis_runs_1 - ON analysis_runs(hospital_id); - --- Index 설정 SQL - analysis_runs(owner_user_id) -CREATE INDEX IX_analysis_runs_2 - ON analysis_runs(owner_user_id); +CREATE INDEX IX_analysis_runs_1 ON analysis_runs (hospital_id); +CREATE INDEX IX_analysis_runs_2 ON analysis_runs (owner_user_id); --- file_data Table Create SQL +-- raw_info: 분석 실행별 수집 원시 데이터 +CREATE TABLE raw_info +( + `info_id` INT NOT NULL AUTO_INCREMENT, + `source_id` INT NOT NULL, + `analysis_run_id` CHAR(36) NOT NULL, + `data_tag` VARCHAR(50) NOT NULL DEFAULT 'default', + `status` VARCHAR(20) NOT NULL DEFAULT 'start', + `raw_data` JSON NULL, + `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + `updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (info_id) +); + +CREATE INDEX IX_raw_info_1 ON raw_info (analysis_run_id); +CREATE INDEX IX_raw_info_2 ON raw_info (source_id); + + +-- file_data CREATE TABLE file_data ( `id` INT NOT NULL AUTO_INCREMENT, @@ -169,48 +98,38 @@ CREATE TABLE file_data ); --- hospital_history Table Create SQL +-- hospital_history CREATE TABLE hospital_history ( - `id` INT NOT NULL AUTO_INCREMENT, - `hospital_id` CHAR(36) NOT NULL, - `owner_user_id` INT NOT NULL, - `hospital_name` VARCHAR(50) NOT NULL, - `hospital_name_en` VARCHAR(50) NULL, - `brn` VARCHAR(50) NOT NULL, - `road_address` VARCHAR(100) NULL, - `site_address` VARCHAR(100) NULL, - `url` VARCHAR(500) NULL, - `status` VARCHAR(20) NOT NULL, - `raw_data` JSON NULL, - `analysis_run_id` CHAR(36) NULL, - `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + `id` INT NOT NULL AUTO_INCREMENT, + `hospital_id` CHAR(36) NOT NULL, + `owner_user_id` INT NOT NULL, + `hospital_name` VARCHAR(50) NOT NULL, + `hospital_name_en` VARCHAR(50) NULL, + `brn` VARCHAR(50) NOT NULL, + `road_address` VARCHAR(100) NULL, + `site_address` VARCHAR(100) NULL, + `status` VARCHAR(20) NOT NULL, + `analysis_run_id` CHAR(36) NULL, + `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (id) ); --- Index 설정 SQL - hospital_history(hospital_id) -CREATE INDEX IX_hospital_history_1 - ON hospital_history(hospital_id); - --- Index 설정 SQL - hospital_history(analysis_run_id) -CREATE INDEX IX_hospital_history_2 - ON hospital_history(analysis_run_id); +CREATE INDEX IX_hospital_history_1 ON hospital_history (hospital_id); +CREATE INDEX IX_hospital_history_2 ON hospital_history (analysis_run_id); --- market_analysis Table Create SQL +-- market_analysis CREATE TABLE market_analysis ( - `id` INT NOT NULL AUTO_INCREMENT, - `analysis_run_id` CHAR(36) NOT NULL, - `analysis_type` VARCHAR(50) NOT NULL, - `status` VARCHAR(20) NOT NULL DEFAULT 'start', - `data` JSON NULL, - `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + `id` INT NOT NULL AUTO_INCREMENT, + `analysis_run_id` CHAR(36) NOT NULL, + `analysis_type` VARCHAR(50) NOT NULL, + `status` VARCHAR(20) NOT NULL DEFAULT 'start', + `data` JSON NULL, + `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (id), UNIQUE KEY UQ_market_analysis (analysis_run_id, analysis_type) ); --- Index 설정 SQL - market_analysis(analysis_run_id) -CREATE INDEX IX_market_analysis_1 - ON market_analysis(analysis_run_id); - +CREATE INDEX IX_market_analysis_1 ON market_analysis (analysis_run_id); diff --git a/SQL/db_create_2.sql b/SQL/db_create_2.sql deleted file mode 100644 index d1d5c52..0000000 --- a/SQL/db_create_2.sql +++ /dev/null @@ -1,134 +0,0 @@ --- user_info -CREATE TABLE user_info -( - `user_id` INT NOT NULL AUTO_INCREMENT, - `username` VARCHAR(50) NOT NULL, - `password` VARCHAR(50) NOT NULL, - `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - `updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - PRIMARY KEY (user_id) -); - - --- hospital_baseinfo -CREATE TABLE hospital_baseinfo -( - `hospital_id` CHAR(36) NOT NULL, - `owner_user_id` INT NOT NULL, - `hospital_name` VARCHAR(50) NOT NULL, - `hospital_name_en` VARCHAR(50) NULL, - `brn` VARCHAR(50) NOT NULL, - `road_address` VARCHAR(100) NULL, - `site_address` VARCHAR(100) NULL, - `status` VARCHAR(20) NOT NULL DEFAULT 'start', - `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - `updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - PRIMARY KEY (hospital_id) -); - -CREATE INDEX IX_hospital_baseinfo_1 ON hospital_baseinfo (owner_user_id); - - --- remote_source: 병원별 채널 소스 정보 (instagram/facebook/naver_blog/youtube/gangnam_unni 등) -CREATE TABLE remote_source -( - `source_id` INT NOT NULL AUTO_INCREMENT, - `hospital_id` CHAR(36) NOT NULL, - `source_type` VARCHAR(50) NOT NULL, - `url` VARCHAR(500) NOT NULL, - `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (source_id) -); - -CREATE INDEX IX_remote_source_1 ON remote_source (hospital_id); -CREATE INDEX IX_remote_source_2 ON remote_source (hospital_id, source_type); - - --- analysis_runs -CREATE TABLE analysis_runs -( - `analysis_run_id` CHAR(36) NOT NULL, - `hospital_id` CHAR(36) NOT NULL, - `owner_user_id` INT NOT NULL DEFAULT 0, - `status` VARCHAR(50) NOT NULL DEFAULT 'discovering', - `report_data` JSON NULL, - `plan_data` JSON NULL, - `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - `updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - PRIMARY KEY (analysis_run_id) -); - -CREATE INDEX IX_analysis_runs_1 ON analysis_runs (hospital_id); -CREATE INDEX IX_analysis_runs_2 ON analysis_runs (owner_user_id); - - --- raw_info: 분석 실행별 수집 원시 데이터 -CREATE TABLE raw_info -( - `info_id` INT NOT NULL AUTO_INCREMENT, - `source_id` INT NOT NULL, - `analysis_run_id` CHAR(36) NOT NULL, - `data_tag` VARCHAR(50) NOT NULL DEFAULT 'default', - `status` VARCHAR(20) NOT NULL DEFAULT 'start', - `raw_data` JSON NULL, - `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - `updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - PRIMARY KEY (info_id) -); - -CREATE INDEX IX_raw_info_1 ON raw_info (analysis_run_id); -CREATE INDEX IX_raw_info_2 ON raw_info (source_id); - - --- file_data -CREATE TABLE file_data -( - `id` INT NOT NULL AUTO_INCREMENT, - `analysis_run_id` CHAR(36) NOT NULL, - `hospital_id` CHAR(36) NULL, - `file_type` ENUM('image','video','audio','document','file') NOT NULL DEFAULT 'file', - `file_name` VARCHAR(255) NOT NULL, - `file_url` VARCHAR(2048) NOT NULL, - `size_bytes` BIGINT NULL, - `is_deleted` BOOLEAN NOT NULL DEFAULT FALSE, - `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (id), - INDEX IX_file_data_1 (analysis_run_id, is_deleted) -); - - --- hospital_history -CREATE TABLE hospital_history -( - `id` INT NOT NULL AUTO_INCREMENT, - `hospital_id` CHAR(36) NOT NULL, - `owner_user_id` INT NOT NULL, - `hospital_name` VARCHAR(50) NOT NULL, - `hospital_name_en` VARCHAR(50) NULL, - `brn` VARCHAR(50) NOT NULL, - `road_address` VARCHAR(100) NULL, - `site_address` VARCHAR(100) NULL, - `status` VARCHAR(20) NOT NULL, - `analysis_run_id` CHAR(36) NULL, - `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (id) -); - -CREATE INDEX IX_hospital_history_1 ON hospital_history (hospital_id); -CREATE INDEX IX_hospital_history_2 ON hospital_history (analysis_run_id); - - --- market_analysis -CREATE TABLE market_analysis -( - `id` INT NOT NULL AUTO_INCREMENT, - `analysis_run_id` CHAR(36) NOT NULL, - `analysis_type` VARCHAR(50) NOT NULL, - `status` VARCHAR(20) NOT NULL DEFAULT 'start', - `data` JSON NULL, - `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (id), - UNIQUE KEY UQ_market_analysis (analysis_run_id, analysis_type) -); - -CREATE INDEX IX_market_analysis_1 ON market_analysis (analysis_run_id); diff --git a/app/api/analysis.py b/app/api/analysis.py index 7e2691c..e5d30cc 100644 --- a/app/api/analysis.py +++ b/app/api/analysis.py @@ -2,12 +2,14 @@ import logging import uuid6 from fastapi import APIRouter, BackgroundTasks, Depends, File, Form, HTTPException, UploadFile, status from common.deps import verify_api_key -from common.db import fetchone, insert_instagram_row, insert_facebook_row, insert_naver_blog_row, insert_youtube_row, insert_gangnam_unni_row, insert_analysis_run +from common.db.hospital import select_hospital +from common.db.source import select_source_mainpage, insert_source, insert_raw_info +from common.db.run import insert_run, select_run_status from models.analysis import AnalysisCreate, AnalysisStartResponse, AnalysisStatusResponse from models.file import FileListItem, FileType, FileUploadResponse -from models.status import AnalysisStatus +from models.status import AnalysisStatus, SourceType from services.pipeline import run_pipeline -from services.file import get_analysis_files_response, handle_analysis_file_upload, soft_delete_analysis_file +from services.file_data import get_analysis_files_response, handle_analysis_file_upload, soft_delete_analysis_file router = APIRouter(prefix="/api/analysis", tags=["analysis"], dependencies=[Depends(verify_api_key)]) logger = logging.getLogger(__name__) @@ -20,23 +22,27 @@ async def start_analysis(body: AnalysisCreate, background_tasks: BackgroundTasks hospital_id = body.clinic_id # 사실 hospital과 owner_user_id 비교 후 검증이 필요한 거지만 일단 PoC 니까. 나중에 바꿉니다. - hospital = await fetchone( - "SELECT owner_user_id, url FROM hospital_baseinfo WHERE hospital_id = %s", - (hospital_id,), - ) + hospital = await select_hospital(hospital_id) if not hospital: raise HTTPException(status_code=409, detail="Clinic not found") - ig_id = await insert_instagram_row(hospital_id, body.channels.instagram) if body.channels.instagram else None - fb_id = await insert_facebook_row(hospital_id, body.channels.facebook) if body.channels.facebook else None - nb_id = await insert_naver_blog_row(hospital_id, body.channels.naver_blog) if body.channels.naver_blog else None - yt_id = await insert_youtube_row(hospital_id, body.channels.youtube) if body.channels.youtube else None - gu_id = await insert_gangnam_unni_row(hospital_id, body.channels.gangnam_unni) if body.channels.gangnam_unni else None + analysis_run_id = await insert_run(analysis_run_id, hospital_id, hospital["owner_user_id"]) - analysis_run_id = await insert_analysis_run( - analysis_run_id, hospital_id, hospital["owner_user_id"], - ig_id, fb_id, nb_id, yt_id, gu_id, - ) + mainpage = await select_source_mainpage(hospital_id) + if mainpage: + await insert_raw_info(mainpage["source_id"], analysis_run_id, data_tag=SourceType.MAINPAGE) + + channels = [ + (SourceType.INSTAGRAM, body.channels.instagram), + (SourceType.FACEBOOK, body.channels.facebook), + (SourceType.NAVER_BLOG, body.channels.naver_blog), + (SourceType.YOUTUBE, body.channels.youtube), + (SourceType.GANGNAM_UNNI, body.channels.gangnam_unni), + ] + for source_type, url in channels: + if url: + source_id = await insert_source(hospital_id, source_type, url) + await insert_raw_info(source_id, analysis_run_id, data_tag=source_type) background_tasks.add_task(run_pipeline, analysis_run_id) @@ -75,12 +81,12 @@ async def delete_analysis_run_file(run_id: str, file_id: int) -> None: @router.get("/{run_id}/status", response_model=AnalysisStatusResponse) async def get_analysis_status(run_id: str): logger.info("GET /api/analysis/%s/status", run_id) - row = await fetchone("SELECT status FROM analysis_runs WHERE analysis_run_id = %s", (run_id,)) - if not row: + run_status = await select_run_status(run_id) + if run_status is None: raise HTTPException(status_code=404, detail="Run not found") return AnalysisStatusResponse( analysis_run_id=run_id, - status=AnalysisStatus(row["status"]), + status=AnalysisStatus(run_status), progress=50.0, current_step="", channel_errors={}, diff --git a/app/api/clinics.py b/app/api/clinics.py index 917e7eb..272e587 100644 --- a/app/api/clinics.py +++ b/app/api/clinics.py @@ -2,7 +2,8 @@ import logging import uuid6 from fastapi import APIRouter, Depends, HTTPException, status from common.deps import verify_api_key -from common.db import insert_hospital, fetchone +from common.db.hospital import select_hospital, insert_hospital +from common.db.source import insert_source from common.utils import get_env from integrations.firecrawl import FirecrawlClient from models.clinic import ClinicCreate, ClinicCreateResponse, ClinicResponse, ClinicHistoryResponse, RunSummary @@ -30,9 +31,8 @@ async def create_clinic(body: ClinicCreate): name=info["clinicName"], name_en=info.get("clinicNameEn"), road_address=info.get("address"), - url=body.url, - raw_data=info, ) + await insert_source(hospital_id, "mainpage", body.url) return ClinicCreateResponse( id=hospital_id, url=body.url, @@ -44,11 +44,7 @@ async def create_clinic(body: ClinicCreate): @router.get("/{hospital_id}", response_model=ClinicResponse) async def get_clinic(hospital_id: str): logger.info("GET /api/clinics/%s", hospital_id) - row = await fetchone( - "SELECT hospital_id, hospital_name, hospital_name_en, road_address, url, status, raw_data, created_at, updated_at" - " FROM hospital_baseinfo WHERE hospital_id = %s", - (hospital_id,), - ) + row = await select_hospital(hospital_id) if not row: raise HTTPException(status_code=404, detail="Clinic not found") return ClinicResponse(**{**row, "created_at": str(row["created_at"]), "updated_at": str(row["updated_at"])}) diff --git a/app/api/plan.py b/app/api/plan.py index ba59702..2b3464a 100644 --- a/app/api/plan.py +++ b/app/api/plan.py @@ -1,7 +1,7 @@ import json import logging from fastapi import APIRouter, Depends, HTTPException, Response -from common.db import fetchone +from common.db.run import select_run_with_clinic from common.deps import verify_api_key from integrations.llm.schemas.plan import PlanOutput from models.plan import PlanApiResponse @@ -13,13 +13,7 @@ logger = logging.getLogger(__name__) @router.get("/{run_id}", response_model=PlanApiResponse, response_model_by_alias=True) async def get_plan(run_id: str): logger.info("GET /api/plan/%s", run_id) - row = await fetchone( - "SELECT ar.plan_data, ar.created_at, h.hospital_name, h.hospital_name_en, h.url" - " FROM analysis_runs ar" - " JOIN hospital_baseinfo h ON ar.hospital_id = h.hospital_id" - " WHERE ar.analysis_run_id = %s", - (run_id,), - ) + row = await select_run_with_clinic(run_id) if row is None: raise HTTPException(status_code=404, detail="Run not found") if row["plan_data"] is None: @@ -31,6 +25,6 @@ async def get_plan(run_id: str): clinic_name=row["hospital_name"], clinic_name_en=row["hospital_name_en"], created_at=str(row["created_at"]), - target_url=row["url"], + target_url=row["target_url"], **plan.model_dump(), ) diff --git a/app/api/report.py b/app/api/report.py index 15f93a6..bf7cb61 100644 --- a/app/api/report.py +++ b/app/api/report.py @@ -1,7 +1,7 @@ import json import logging from fastapi import APIRouter, Depends, HTTPException, Response -from common.db import fetchone +from common.db.run import select_run_with_clinic from common.deps import verify_api_key from integrations.llm.schemas.report import ReportOutput from models.report import MarketingReportResponse @@ -13,13 +13,7 @@ logger = logging.getLogger(__name__) @router.get("/{run_id}", response_model=MarketingReportResponse, response_model_by_alias=True) async def get_report(run_id: str): logger.info("GET /api/report/%s", run_id) - row = await fetchone( - "SELECT ar.report_data, ar.created_at, h.hospital_name, h.hospital_name_en, h.url" - " FROM analysis_runs ar" - " JOIN hospital_baseinfo h ON ar.hospital_id = h.hospital_id" - " WHERE ar.analysis_run_id = %s", - (run_id,), - ) + row = await select_run_with_clinic(run_id) if row is None: raise HTTPException(status_code=404, detail="Run not found") if row["report_data"] is None: @@ -31,6 +25,6 @@ async def get_report(run_id: str): clinic_name=row["hospital_name"], clinic_name_en=row["hospital_name_en"], created_at=str(row["created_at"]), - target_url=row["url"], + target_url=row["target_url"], **llm_output.model_dump(exclude={"id", "created_at", "target_url"}), ) diff --git a/app/common/db.py b/app/common/db.py deleted file mode 100644 index 9012579..0000000 --- a/app/common/db.py +++ /dev/null @@ -1,274 +0,0 @@ -import json -import os -import aiomysql -from common.utils import get_env - -_pool: aiomysql.Pool | None = None - - -async def get_pool() -> aiomysql.Pool: - global _pool - if _pool is None: - _pool = await aiomysql.create_pool( - host=get_env("MYSQL_HOST"), - port=int(os.getenv("MYSQL_PORT", "3306")), - user=get_env("MYSQL_USER"), - password=get_env("MYSQL_PASSWORD"), - db=get_env("MYSQL_DB"), - charset="utf8mb4", - minsize=0, - maxsize=30, - connect_timeout=10, - ) - return _pool - - -# 쓰기 (INSERT/UPDATE/DELETE) -async def execute(sql: str, args: tuple = ()) -> int: - pool = await get_pool() - async with pool.acquire() as conn: - try: - async with conn.cursor() as cur: - await cur.execute(sql, args) - await conn.commit() - return cur.lastrowid - finally: - conn.close() - - -# 읽기 (SELECT) -async def fetchone(sql: str, args: tuple = ()) -> dict | None: - pool = await get_pool() - async with pool.acquire() as conn: - try: - async with conn.cursor(aiomysql.DictCursor) as cur: - await cur.execute(sql, args) - return await cur.fetchone() - finally: - conn.close() - - -async def fetchall(sql: str, args: tuple = ()) -> list[dict]: - pool = await get_pool() - async with pool.acquire() as conn: - try: - async with conn.cursor(aiomysql.DictCursor) as cur: - await cur.execute(sql, args) - return await cur.fetchall() - finally: - conn.close() - -async def insert_instagram_row(hospital_id: str, url: str) -> int: - return await execute("INSERT INTO instagram_data (hospital_id, url) VALUES (%s, %s)", (hospital_id, url)) - - -async def insert_facebook_row(hospital_id: str, url: str) -> int: - return await execute("INSERT INTO facebook_data (hospital_id, url) VALUES (%s, %s)", (hospital_id, url)) - - -async def insert_naver_blog_row(hospital_id: str, url: str) -> int: - return await execute("INSERT INTO naver_blog_data (hospital_id, url) VALUES (%s, %s)", (hospital_id, url)) - - -async def insert_youtube_row(hospital_id: str, url: str) -> int: - return await execute("INSERT INTO youtube_data (hospital_id, url) VALUES (%s, %s)", (hospital_id, url)) - - -async def insert_gangnam_unni_row(hospital_id: str, url: str) -> int: - return await execute("INSERT INTO gangnam_unni_data (hospital_id, url) VALUES (%s, %s)", (hospital_id, url)) - - -async def insert_file_row( - analysis_run_id: str, - file_type: str, - file_name: str, - file_url: str, - size_bytes: int | None = None, - hospital_id: str | None = None, -) -> int: - return await execute( - "INSERT INTO file_data (analysis_run_id, hospital_id, file_type, file_name, file_url, size_bytes)" - " VALUES (%s, %s, %s, %s, %s, %s)", - (analysis_run_id, hospital_id, file_type, file_name, file_url, size_bytes), - ) - - -async def insert_analysis_run( - analysis_run_id: str, - hospital_id: str, - owner_user_id: int, - instagram_data_id: int | None, - facebook_data_id: int | None, - naver_blog_data_id: int | None, - youtube_data_id: int | None, - gangnam_unni_data_id: int | None, -) -> str: - await execute( - "INSERT INTO analysis_runs" - " (analysis_run_id, hospital_id, owner_user_id, instagram_data_id, facebook_data_id, naver_blog_data_id, youtube_data_id, gangnam_unni_data_id)" - " VALUES (%s, %s, %s, %s, %s, %s, %s, %s)", - (analysis_run_id, hospital_id, owner_user_id, instagram_data_id, facebook_data_id, naver_blog_data_id, youtube_data_id, gangnam_unni_data_id), - ) - return analysis_run_id - - - -async def save_analysis_report(analysis_run_id: str, data: dict) -> None: - await execute( - "UPDATE analysis_runs SET report_data = %s WHERE analysis_run_id = %s", - (json.dumps(data, ensure_ascii=False), analysis_run_id), - ) - - -async def is_done(table: str, row_id: int | None) -> bool: - if row_id is None: - return True - r = await fetchone(f"SELECT status FROM {table} WHERE id = %s", (row_id,)) - return r["status"] == "done" - - -async def fetch_raw(table: str, row_id: int | None) -> dict | None: - if row_id is None: - return None - row = await fetchone(f"SELECT raw_data FROM {table} WHERE id = %s", (row_id,)) - if not row or not row["raw_data"]: - return None - return json.loads(row["raw_data"]) if isinstance(row["raw_data"], str) else row["raw_data"] - - -async def get_analysis_raw_data(analysis_run_id: str) -> dict: - run = await fetchone( - "SELECT instagram_data_id, facebook_data_id, naver_blog_data_id, youtube_data_id, gangnam_unni_data_id" - " FROM analysis_runs WHERE analysis_run_id = %s", - (analysis_run_id,), - ) - return { - "instagram": await fetch_raw("instagram_data", run["instagram_data_id"]), - "facebook": await fetch_raw("facebook_data", run["facebook_data_id"]), - "naver_blog": await fetch_raw("naver_blog_data", run["naver_blog_data_id"]), - "youtube": await fetch_raw("youtube_data", run["youtube_data_id"]), - "gangnam_unni": await fetch_raw("gangnam_unni_data", run["gangnam_unni_data_id"]), - } - - -async def set_instagram_status(row_id: int, status: str) -> None: - await execute("UPDATE instagram_data SET status = %s WHERE id = %s", (status, row_id)) - - -async def set_facebook_status(row_id: int, status: str) -> None: - await execute("UPDATE facebook_data SET status = %s WHERE id = %s", (status, row_id)) - - -async def set_naver_blog_status(row_id: int, status: str) -> None: - await execute("UPDATE naver_blog_data SET status = %s WHERE id = %s", (status, row_id)) - - -async def set_youtube_status(row_id: int, status: str) -> None: - await execute("UPDATE youtube_data SET status = %s WHERE id = %s", (status, row_id)) - - -async def set_gangnam_unni_status(row_id: int, status: str) -> None: - await execute("UPDATE gangnam_unni_data SET status = %s WHERE id = %s", (status, row_id)) - - -async def save_instagram_raw_data(row_id: int, data: dict) -> None: - await execute("UPDATE instagram_data SET raw_data = %s, status = 'done' WHERE id = %s", (json.dumps(data, ensure_ascii=False), row_id)) - - -async def save_facebook_raw_data(row_id: int, data: dict) -> None: - await execute("UPDATE facebook_data SET raw_data = %s, status = 'done' WHERE id = %s", (json.dumps(data, ensure_ascii=False), row_id)) - - -async def save_naver_blog_raw_data(row_id: int, data: dict) -> None: - await execute("UPDATE naver_blog_data SET raw_data = %s, status = 'done' WHERE id = %s", (json.dumps(data, ensure_ascii=False), row_id)) - - -async def save_youtube_raw_data(row_id: int, data: dict) -> None: - await execute("UPDATE youtube_data SET raw_data = %s, status = 'done' WHERE id = %s", (json.dumps(data, ensure_ascii=False), row_id)) - - -async def save_gangnam_unni_raw_data(row_id: int, data: dict) -> None: - await execute("UPDATE gangnam_unni_data SET raw_data = %s, status = 'done' WHERE id = %s", (json.dumps(data, ensure_ascii=False), row_id)) - - -async def _insert_hospital_history(hospital_id: str, analysis_run_id: str | None) -> None: - row = await fetchone( - "SELECT owner_user_id, hospital_name, hospital_name_en, brn, road_address, site_address, url, status, raw_data" - " FROM hospital_baseinfo WHERE hospital_id = %s", - (hospital_id,), - ) - if not row: - return - await execute( - "INSERT INTO hospital_history" - " (hospital_id, owner_user_id, hospital_name, hospital_name_en, brn, road_address, site_address, url, status, raw_data, analysis_run_id)" - " VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)", - ( - hospital_id, - row["owner_user_id"], - row["hospital_name"], - row["hospital_name_en"], - row["brn"], - row["road_address"], - row["site_address"], - row["url"], - row["status"], - row["raw_data"] if isinstance(row["raw_data"], str) else json.dumps(row["raw_data"], ensure_ascii=False) if row["raw_data"] else None, - analysis_run_id, - ), - ) - - -async def insert_hospital( - hospital_id: str, - name: str, - name_en: str | None = None, - road_address: str | None = None, - site_address: str | None = None, - url: str | None = None, - raw_data: dict | None = None, - owner_user_id: int = 0, - brn: str = "", -) -> dict: - await execute( - "INSERT INTO hospital_baseinfo (hospital_id, hospital_name, hospital_name_en, road_address, site_address, url, raw_data, status, owner_user_id, brn)" - " VALUES (%s, %s, %s, %s, %s, %s, %s, 'done', %s, %s)", - (hospital_id, name, name_en, road_address, site_address, url, - json.dumps(raw_data, ensure_ascii=False) if raw_data else None, - owner_user_id, brn), - ) - await _insert_hospital_history(hospital_id, analysis_run_id=None) - return await fetchone( - "SELECT created_at FROM hospital_baseinfo WHERE hospital_id = %s", - (hospital_id,), - ) - - -async def save_hospital_raw_data(hospital_id: str, data: dict, analysis_run_id: str | None = None) -> None: - await execute( - "UPDATE hospital_baseinfo" - " SET raw_data = %s, status = 'done'," - " hospital_name = COALESCE(%s, hospital_name)," - " hospital_name_en = COALESCE(%s, hospital_name_en)," - " road_address = COALESCE(%s, road_address)" - " WHERE hospital_id = %s", - ( - json.dumps(data, ensure_ascii=False), - data.get("clinicName"), - data.get("clinicNameEn"), - data.get("address"), - hospital_id, - ), - ) - await _insert_hospital_history(hospital_id, analysis_run_id) - - -async def get_market_analysis(analysis_run_id: str) -> dict: - rows = await fetchall( - "SELECT analysis_type, data FROM market_analysis WHERE analysis_run_id = %s AND status = 'done'", - (analysis_run_id,), - ) - return { - row["analysis_type"]: json.loads(row["data"]) if isinstance(row["data"], str) else row["data"] - for row in rows - } diff --git a/app/common/db/__init__.py b/app/common/db/__init__.py new file mode 100644 index 0000000..e7be4bd --- /dev/null +++ b/app/common/db/__init__.py @@ -0,0 +1,14 @@ +from common.db.base import execute, fetchone, fetchall +from common.db.hospital import select_hospital, update_hospital_status, insert_hospital, update_hospital +from common.db.source import ( + insert_source, select_source_mainpage, + insert_raw_info, update_raw_info_status, update_raw_info, + select_raw_info_data, + select_run_sources, select_run_raw_data, select_run_mainpage_url, +) +from common.db.run import ( + insert_run, select_run, select_run_status, update_run_status, + update_run_report, update_run_plan, select_run_with_clinic, +) +from common.db.market import upsert_market_status, upsert_market_result, select_market +from common.db.file_data import insert_file, select_run_files, select_file, delete_file diff --git a/app/common/db/base.py b/app/common/db/base.py new file mode 100644 index 0000000..c047ff6 --- /dev/null +++ b/app/common/db/base.py @@ -0,0 +1,56 @@ +import os +import aiomysql +from common.utils import get_env + +_pool: aiomysql.Pool | None = None + + +async def get_pool() -> aiomysql.Pool: + global _pool + if _pool is None: + _pool = await aiomysql.create_pool( + host=get_env("MYSQL_HOST"), + port=int(os.getenv("MYSQL_PORT", "3306")), + user=get_env("MYSQL_USER"), + password=get_env("MYSQL_PASSWORD"), + db=get_env("MYSQL_DB"), + charset="utf8mb4", + minsize=0, + maxsize=30, + connect_timeout=10, + ) + return _pool + + +async def execute(sql: str, args: tuple = ()) -> int: + pool = await get_pool() + async with pool.acquire() as conn: + try: + async with conn.cursor() as cur: + await cur.execute(sql, args) + await conn.commit() + return cur.lastrowid + finally: + conn.close() + + +async def fetchone(sql: str, args: tuple = ()) -> dict | None: + pool = await get_pool() + async with pool.acquire() as conn: + try: + async with conn.cursor(aiomysql.DictCursor) as cur: + await cur.execute(sql, args) + return await cur.fetchone() + finally: + conn.close() + + +async def fetchall(sql: str, args: tuple = ()) -> list[dict]: + pool = await get_pool() + async with pool.acquire() as conn: + try: + async with conn.cursor(aiomysql.DictCursor) as cur: + await cur.execute(sql, args) + return await cur.fetchall() + finally: + conn.close() diff --git a/app/common/db/file_data.py b/app/common/db/file_data.py new file mode 100644 index 0000000..ac3e9bc --- /dev/null +++ b/app/common/db/file_data.py @@ -0,0 +1,39 @@ +from common.db.base import execute, fetchone, fetchall + + +async def insert_file( + analysis_run_id: str, + file_type: str, + file_name: str, + file_url: str, + size_bytes: int | None = None, + hospital_id: str | None = None, +) -> int: + return await execute( + "INSERT INTO file_data (analysis_run_id, hospital_id, file_type, file_name, file_url, size_bytes)" + " VALUES (%s, %s, %s, %s, %s, %s)", + (analysis_run_id, hospital_id, file_type, file_name, file_url, size_bytes), + ) + + +async def select_run_files(analysis_run_id: str) -> list[dict]: + return await fetchall( + "SELECT id, file_type, file_name, file_url, size_bytes, created_at" + " FROM file_data WHERE analysis_run_id = %s AND is_deleted = FALSE" + " ORDER BY created_at DESC", + (analysis_run_id,), + ) + + +async def select_file(file_id: int, analysis_run_id: str) -> dict | None: + return await fetchone( + "SELECT id FROM file_data WHERE id = %s AND analysis_run_id = %s", + (file_id, analysis_run_id), + ) + + +async def delete_file(file_id: int) -> None: + await execute( + "UPDATE file_data SET is_deleted = TRUE WHERE id = %s AND is_deleted = FALSE", + (file_id,), + ) diff --git a/app/common/db/hospital.py b/app/common/db/hospital.py new file mode 100644 index 0000000..6e4cd88 --- /dev/null +++ b/app/common/db/hospital.py @@ -0,0 +1,78 @@ +from common.db.base import execute, fetchone + + +async def select_hospital(hospital_id: str) -> dict | None: + return await fetchone( + "SELECT hospital_id, owner_user_id, hospital_name, hospital_name_en," + " brn, road_address, site_address, status, created_at, updated_at" + " FROM hospital_baseinfo WHERE hospital_id = %s", + (hospital_id,), + ) + + +async def update_hospital_status(hospital_id: str, status: str) -> None: + await execute( + "UPDATE hospital_baseinfo SET status = %s WHERE hospital_id = %s", + (status, hospital_id), + ) + + +async def _insert_hospital_history(hospital_id: str, analysis_run_id: str | None) -> None: + row = await fetchone( + "SELECT owner_user_id, hospital_name, hospital_name_en, brn, road_address, site_address, status" + " FROM hospital_baseinfo WHERE hospital_id = %s", + (hospital_id,), + ) + if not row: + return + await execute( + "INSERT INTO hospital_history" + " (hospital_id, owner_user_id, hospital_name, hospital_name_en, brn, road_address, site_address, status, analysis_run_id)" + " VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)", + ( + hospital_id, + row["owner_user_id"], + row["hospital_name"], + row["hospital_name_en"], + row["brn"], + row["road_address"], + row["site_address"], + row["status"], + analysis_run_id, + ), + ) + + +async def insert_hospital( + hospital_id: str, + name: str, + name_en: str | None = None, + road_address: str | None = None, + site_address: str | None = None, + owner_user_id: int = 0, + brn: str = "", +) -> dict: + await execute( + "INSERT INTO hospital_baseinfo" + " (hospital_id, hospital_name, hospital_name_en, road_address, site_address, status, owner_user_id, brn)" + " VALUES (%s, %s, %s, %s, %s, 'done', %s, %s)", + (hospital_id, name, name_en, road_address, site_address, owner_user_id, brn), + ) + await _insert_hospital_history(hospital_id, analysis_run_id=None) + return await fetchone( + "SELECT created_at FROM hospital_baseinfo WHERE hospital_id = %s", + (hospital_id,), + ) + + +async def update_hospital(hospital_id: str, data: dict, analysis_run_id: str | None = None) -> None: + await execute( + "UPDATE hospital_baseinfo" + " SET status = 'done'," + " hospital_name = COALESCE(%s, hospital_name)," + " hospital_name_en = COALESCE(%s, hospital_name_en)," + " road_address = COALESCE(%s, road_address)" + " WHERE hospital_id = %s", + (data.get("clinicName"), data.get("clinicNameEn"), data.get("address"), hospital_id), + ) + await _insert_hospital_history(hospital_id, analysis_run_id) diff --git a/app/common/db/market.py b/app/common/db/market.py new file mode 100644 index 0000000..ca4093b --- /dev/null +++ b/app/common/db/market.py @@ -0,0 +1,31 @@ +import json +from common.db.base import execute, fetchall + + +async def upsert_market_status(analysis_run_id: str, analysis_type: str, status: str) -> None: + await execute( + "INSERT INTO market_analysis (analysis_run_id, analysis_type, status)" + " VALUES (%s, %s, %s)" + " ON DUPLICATE KEY UPDATE status = VALUES(status)", + (analysis_run_id, analysis_type, status), + ) + + +async def upsert_market_result(analysis_run_id: str, analysis_type: str, data: dict) -> None: + await execute( + "INSERT INTO market_analysis (analysis_run_id, analysis_type, status, data)" + " VALUES (%s, %s, 'done', %s)" + " ON DUPLICATE KEY UPDATE status = 'done', data = VALUES(data)", + (analysis_run_id, analysis_type, json.dumps(data, ensure_ascii=False)), + ) + + +async def select_market(analysis_run_id: str) -> dict: + rows = await fetchall( + "SELECT analysis_type, data FROM market_analysis WHERE analysis_run_id = %s AND status = 'done'", + (analysis_run_id,), + ) + return { + row["analysis_type"]: json.loads(row["data"]) if isinstance(row["data"], str) else row["data"] + for row in rows + } diff --git a/app/common/db/run.py b/app/common/db/run.py new file mode 100644 index 0000000..cbf188c --- /dev/null +++ b/app/common/db/run.py @@ -0,0 +1,64 @@ +import json +from common.db.base import execute, fetchone + + +async def insert_run( + analysis_run_id: str, + hospital_id: str, + owner_user_id: int, +) -> str: + await execute( + "INSERT INTO analysis_runs (analysis_run_id, hospital_id, owner_user_id) VALUES (%s, %s, %s)", + (analysis_run_id, hospital_id, owner_user_id), + ) + return analysis_run_id + + +async def select_run(analysis_run_id: str) -> dict | None: + return await fetchone( + "SELECT analysis_run_id, hospital_id, owner_user_id, status, created_at, updated_at" + " FROM analysis_runs WHERE analysis_run_id = %s", + (analysis_run_id,), + ) + + +async def select_run_status(analysis_run_id: str) -> str | None: + row = await fetchone( + "SELECT status FROM analysis_runs WHERE analysis_run_id = %s", + (analysis_run_id,), + ) + return row["status"] if row else None + + +async def update_run_status(analysis_run_id: str, status: str) -> None: + await execute( + "UPDATE analysis_runs SET status = %s WHERE analysis_run_id = %s", + (status, analysis_run_id), + ) + + +async def update_run_report(analysis_run_id: str, data: dict) -> None: + await execute( + "UPDATE analysis_runs SET report_data = %s WHERE analysis_run_id = %s", + (json.dumps(data, ensure_ascii=False), analysis_run_id), + ) + + +async def update_run_plan(analysis_run_id: str, data: dict) -> None: + await execute( + "UPDATE analysis_runs SET plan_data = %s WHERE analysis_run_id = %s", + (json.dumps(data, ensure_ascii=False), analysis_run_id), + ) + + +async def select_run_with_clinic(analysis_run_id: str) -> dict | None: + return await fetchone( + "SELECT ar.report_data, ar.plan_data, ar.created_at," + " h.hospital_name, h.hospital_name_en," + " rs.url AS target_url" + " FROM analysis_runs ar" + " JOIN hospital_baseinfo h ON ar.hospital_id = h.hospital_id" + " LEFT JOIN remote_source rs ON rs.hospital_id = h.hospital_id AND rs.source_type = 'mainpage'" + " WHERE ar.analysis_run_id = %s", + (analysis_run_id,), + ) diff --git a/app/common/db/source.py b/app/common/db/source.py new file mode 100644 index 0000000..0f8c5c4 --- /dev/null +++ b/app/common/db/source.py @@ -0,0 +1,85 @@ +import json +from common.db.base import execute, fetchone, fetchall +from models.status import SourceType + + +async def insert_source( + hospital_id: str, + source_type: SourceType, + url: str, + language: str | None = None, +) -> int: + return await execute( + "INSERT INTO remote_source (hospital_id, source_type, language, url) VALUES (%s, %s, %s, %s)", + (hospital_id, source_type, language, url), + ) + + +async def select_source_mainpage(hospital_id: str) -> dict | None: + return await fetchone( + "SELECT source_id FROM remote_source WHERE hospital_id = %s AND source_type = 'mainpage'", + (hospital_id,), + ) + + +async def insert_raw_info( + source_id: int, + analysis_run_id: str, + data_tag: SourceType, +) -> int: + return await execute( + "INSERT INTO raw_info (source_id, analysis_run_id, data_tag) VALUES (%s, %s, %s)", + (source_id, analysis_run_id, data_tag), + ) + + +async def update_raw_info_status(info_id: int, status: str) -> None: + await execute("UPDATE raw_info SET status = %s WHERE info_id = %s", (status, info_id)) + + +async def update_raw_info(info_id: int, data: dict) -> None: + await execute( + "UPDATE raw_info SET raw_data = %s, status = 'done' WHERE info_id = %s", + (json.dumps(data, ensure_ascii=False), info_id), + ) + + +async def select_raw_info_data(info_id: int | None) -> dict | None: + if info_id is None: + return None + row = await fetchone("SELECT raw_data FROM raw_info WHERE info_id = %s", (info_id,)) + if not row or not row["raw_data"]: + return None + return json.loads(row["raw_data"]) if isinstance(row["raw_data"], str) else row["raw_data"] + + +async def select_run_sources(analysis_run_id: str) -> list[dict]: + return await fetchall( + "SELECT ri.info_id, rs.source_type, rs.url" + " FROM raw_info ri JOIN remote_source rs USING (source_id)" + " WHERE ri.analysis_run_id = %s", + (analysis_run_id,), + ) + + +async def select_run_raw_data(analysis_run_id: str) -> dict: + rows = await fetchall( + "SELECT rs.source_type, ri.raw_data" + " FROM raw_info ri JOIN remote_source rs USING (source_id)" + " WHERE ri.analysis_run_id = %s", + (analysis_run_id,), + ) + result: dict = {} + for row in rows: + raw = row["raw_data"] + result[row["source_type"]] = json.loads(raw) if isinstance(raw, str) else raw + return result + + +async def select_run_mainpage_url(analysis_run_id: str) -> str: + row = await fetchone( + "SELECT rs.url FROM raw_info ri JOIN remote_source rs USING (source_id)" + " WHERE ri.analysis_run_id = %s AND rs.source_type = 'mainpage'", + (analysis_run_id,), + ) + return (row or {}).get("url") or "" diff --git a/app/common/db2.py b/app/common/db2.py deleted file mode 100644 index 9a317a0..0000000 --- a/app/common/db2.py +++ /dev/null @@ -1,236 +0,0 @@ -import json -import os -import aiomysql -from common.utils import get_env - -_pool: aiomysql.Pool | None = None - - -async def get_pool() -> aiomysql.Pool: - global _pool - if _pool is None: - _pool = await aiomysql.create_pool( - host=get_env("MYSQL_HOST"), - port=int(os.getenv("MYSQL_PORT", "3306")), - user=get_env("MYSQL_USER"), - password=get_env("MYSQL_PASSWORD"), - db=get_env("MYSQL_DB"), - charset="utf8mb4", - minsize=0, - maxsize=30, - connect_timeout=10, - ) - return _pool - - -async def execute(sql: str, args: tuple = ()) -> int: - pool = await get_pool() - async with pool.acquire() as conn: - try: - async with conn.cursor() as cur: - await cur.execute(sql, args) - await conn.commit() - return cur.lastrowid - finally: - conn.close() - - -async def fetchone(sql: str, args: tuple = ()) -> dict | None: - pool = await get_pool() - async with pool.acquire() as conn: - try: - async with conn.cursor(aiomysql.DictCursor) as cur: - await cur.execute(sql, args) - return await cur.fetchone() - finally: - conn.close() - - -async def fetchall(sql: str, args: tuple = ()) -> list[dict]: - pool = await get_pool() - async with pool.acquire() as conn: - try: - async with conn.cursor(aiomysql.DictCursor) as cur: - await cur.execute(sql, args) - return await cur.fetchall() - finally: - conn.close() - - -# ── remote_source ───────────────────────────────────────────────────────────── - -async def insert_source(hospital_id: str, source_type: str, url: str) -> int: - return await execute( - "INSERT INTO remote_source (hospital_id, source_type, url) VALUES (%s, %s, %s)", - (hospital_id, source_type, url), - ) - - -# ── raw_info ────────────────────────────────────────────────────────────────── - -async def insert_raw_info(source_id: int, analysis_run_id: str, data_tag: str = "default") -> int: - return await execute( - "INSERT INTO raw_info (source_id, analysis_run_id, data_tag) VALUES (%s, %s, %s)", - (source_id, analysis_run_id, data_tag), - ) - - -async def set_raw_info_status(info_id: int, status: str) -> None: - await execute("UPDATE raw_info SET status = %s WHERE info_id = %s", (status, info_id)) - - -async def save_raw_info(info_id: int, data: dict) -> None: - await execute( - "UPDATE raw_info SET raw_data = %s, status = 'done' WHERE info_id = %s", - (json.dumps(data, ensure_ascii=False), info_id), - ) - - -async def fetch_raw(info_id: int | None) -> dict | None: - if info_id is None: - return None - row = await fetchone("SELECT raw_data FROM raw_info WHERE info_id = %s", (info_id,)) - if not row or not row["raw_data"]: - return None - return json.loads(row["raw_data"]) if isinstance(row["raw_data"], str) else row["raw_data"] - - -async def is_done(info_id: int | None) -> bool: - if info_id is None: - return True - r = await fetchone("SELECT status FROM raw_info WHERE info_id = %s", (info_id,)) - return r["status"] == "done" - - -async def get_analysis_raw_data(analysis_run_id: str) -> dict: - rows = await fetchall( - "SELECT rs.source_type, ri.raw_data" - " FROM raw_info ri JOIN remote_source rs USING (source_id)" - " WHERE ri.analysis_run_id = %s", - (analysis_run_id,), - ) - result: dict = {} - for row in rows: - raw = row["raw_data"] - result[row["source_type"]] = json.loads(raw) if isinstance(raw, str) else raw - return result - - -# ── analysis_runs ───────────────────────────────────────────────────────────── - -async def insert_analysis_run( - analysis_run_id: str, - hospital_id: str, - owner_user_id: int, -) -> str: - await execute( - "INSERT INTO analysis_runs (analysis_run_id, hospital_id, owner_user_id)" - " VALUES (%s, %s, %s)", - (analysis_run_id, hospital_id, owner_user_id), - ) - return analysis_run_id - - -async def save_analysis_report(analysis_run_id: str, data: dict) -> None: - await execute( - "UPDATE analysis_runs SET report_data = %s WHERE analysis_run_id = %s", - (json.dumps(data, ensure_ascii=False), analysis_run_id), - ) - - -# ── hospital_baseinfo ───────────────────────────────────────────────────────── - -async def _insert_hospital_history(hospital_id: str, analysis_run_id: str | None) -> None: - row = await fetchone( - "SELECT owner_user_id, hospital_name, hospital_name_en, brn, road_address, site_address, status" - " FROM hospital_baseinfo WHERE hospital_id = %s", - (hospital_id,), - ) - if not row: - return - await execute( - "INSERT INTO hospital_history" - " (hospital_id, owner_user_id, hospital_name, hospital_name_en, brn, road_address, site_address, status, analysis_run_id)" - " VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)", - ( - hospital_id, - row["owner_user_id"], - row["hospital_name"], - row["hospital_name_en"], - row["brn"], - row["road_address"], - row["site_address"], - row["status"], - analysis_run_id, - ), - ) - - -async def insert_hospital( - hospital_id: str, - name: str, - name_en: str | None = None, - road_address: str | None = None, - site_address: str | None = None, - owner_user_id: int = 0, - brn: str = "", -) -> dict: - await execute( - "INSERT INTO hospital_baseinfo (hospital_id, hospital_name, hospital_name_en, road_address, site_address, status, owner_user_id, brn)" - " VALUES (%s, %s, %s, %s, %s, 'done', %s, %s)", - (hospital_id, name, name_en, road_address, site_address, owner_user_id, brn), - ) - await _insert_hospital_history(hospital_id, analysis_run_id=None) - return await fetchone( - "SELECT created_at FROM hospital_baseinfo WHERE hospital_id = %s", - (hospital_id,), - ) - - -async def update_hospital_info(hospital_id: str, data: dict, analysis_run_id: str | None = None) -> None: - """clinic 스크래핑 후 hospital_baseinfo의 기본 필드 업데이트.""" - await execute( - "UPDATE hospital_baseinfo" - " SET status = 'done'," - " hospital_name = COALESCE(%s, hospital_name)," - " hospital_name_en = COALESCE(%s, hospital_name_en)," - " road_address = COALESCE(%s, road_address)" - " WHERE hospital_id = %s", - ( - data.get("clinicName"), - data.get("clinicNameEn"), - data.get("address"), - hospital_id, - ), - ) - await _insert_hospital_history(hospital_id, analysis_run_id) - - -# ── file_data ───────────────────────────────────────────────────────────────── - -async def insert_file_row( - analysis_run_id: str, - file_type: str, - file_name: str, - file_url: str, - size_bytes: int | None = None, - hospital_id: str | None = None, -) -> int: - return await execute( - "INSERT INTO file_data (analysis_run_id, hospital_id, file_type, file_name, file_url, size_bytes)" - " VALUES (%s, %s, %s, %s, %s, %s)", - (analysis_run_id, hospital_id, file_type, file_name, file_url, size_bytes), - ) - - -# ── market_analysis ─────────────────────────────────────────────────────────── - -async def get_market_analysis(analysis_run_id: str) -> dict: - rows = await fetchall( - "SELECT analysis_type, data FROM market_analysis WHERE analysis_run_id = %s AND status = 'done'", - (analysis_run_id,), - ) - return { - row["analysis_type"]: json.loads(row["data"]) if isinstance(row["data"], str) else row["data"] - for row in rows - } diff --git a/app/integrations/apify.py b/app/integrations/apify.py index 9914fa6..4028a50 100644 --- a/app/integrations/apify.py +++ b/app/integrations/apify.py @@ -35,6 +35,7 @@ class ApifyClient: async def fetch_instagram_profile(self, url: str) -> dict | None: username = urlparse(url).path.strip("/").split("/")[0] if "://" in url else url.lstrip("@") + print(username) items = await self._run_actor("apify~instagram-profile-scraper", {"usernames": [username], "resultsLimit": 12}) return items[0] if items else None diff --git a/app/models/clinic.py b/app/models/clinic.py index 473b4a2..9191a93 100644 --- a/app/models/clinic.py +++ b/app/models/clinic.py @@ -10,9 +10,7 @@ class ClinicResponse(BaseModel): hospital_name: str hospital_name_en: str | None road_address: str | None - url: str | None status: str - raw_data: dict | None created_at: str updated_at: str diff --git a/app/models/status.py b/app/models/status.py index 34bac8b..95f4afd 100644 --- a/app/models/status.py +++ b/app/models/status.py @@ -36,9 +36,22 @@ class DataSource(StrEnum): SCRAPE = "scrape" +class SourceType(StrEnum): + MAINPAGE = "mainpage" + INSTAGRAM = "instagram" + FACEBOOK = "facebook" + NAVER_BLOG = "naver_blog" + YOUTUBE = "youtube" + TIKTOK = "tiktok" + GANGNAM_UNNI = "gangnam_unni" + KAKAOTALK = "kakaotalk" + NAVER_CAFE = "naver_cafe" + + class Language(StrEnum): KR = "KR" EN = "EN" + WW = "WW" class VideoType(StrEnum): diff --git a/app/services/analysis.py b/app/services/analysis.py index 63fc23e..7fef7ba 100644 --- a/app/services/analysis.py +++ b/app/services/analysis.py @@ -3,7 +3,10 @@ import logging import os import re from datetime import datetime -from common.db import fetchone, execute, fetch_raw, get_analysis_raw_data, save_analysis_report, get_market_analysis +from urllib.parse import urlparse +from common.db.run import select_run, update_run_report, update_run_plan +from common.db.source import select_run_raw_data, select_run_mainpage_url +from common.db.market import select_market from integrations.llm.llm_service import LLMService from integrations.llm.prompt import report_prompt, plan_prompt, youtube_diagnosis_prompt from integrations.llm.schemas.report import ReportOutput, ClinicSnapshot, YouTubeAudit @@ -14,18 +17,9 @@ logger = logging.getLogger(__name__) async def generate_report(analysis_run_id: str) -> ReportOutput: - run = await fetchone( - "SELECT hospital_id FROM analysis_runs WHERE analysis_run_id = %s", - (analysis_run_id,), - ) - clinic_row = await fetchone( - "SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s", - (run["hospital_id"],), - ) - raw_data = clinic_row["raw_data"] if clinic_row else None - clinic = json.loads(raw_data) if isinstance(raw_data, str) else (raw_data or {}) - raw = await get_analysis_raw_data(analysis_run_id) - market = await get_market_analysis(analysis_run_id) + raw = await select_run_raw_data(analysis_run_id) + clinic = raw.get("mainpage") or {} + market = await select_market(analysis_run_id) def _json(v) -> str | None: return json.dumps(v, ensure_ascii=False) if v else None @@ -43,27 +37,21 @@ async def generate_report(analysis_run_id: str) -> ReportOutput: "market_trend": _json(market.get("trend")), "market_target_audience": _json(market.get("target_audience")), **{ - channel: _json(data) - for channel, data in raw.items() + source_type: _json(data) + for source_type, data in raw.items() + if source_type != "mainpage" }, } return await LLMService(provider="perplexity").generate(report_prompt, input_data) async def generate_plan(analysis_run_id: str) -> PlanOutput: - run = await fetchone( - "SELECT hospital_id, report_data FROM analysis_runs WHERE analysis_run_id = %s", - (analysis_run_id,), - ) - clinic_row = await fetchone( - "SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s", - (run["hospital_id"],), - ) - raw_data = clinic_row["raw_data"] if clinic_row else None - clinic = json.loads(raw_data) if isinstance(raw_data, str) else (raw_data or {}) + run = await select_run(analysis_run_id) + raw = await select_run_raw_data(analysis_run_id) + clinic = raw.get("mainpage") or {} report_data = run["report_data"] report = json.loads(report_data) if isinstance(report_data, str) else report_data - market = await get_market_analysis(analysis_run_id) + market = await select_market(analysis_run_id) def _json(v) -> str | None: return json.dumps(v, ensure_ascii=False) if v else None @@ -93,7 +81,8 @@ def _build_clinic_snapshot(gangnam_unni: dict, hospital: dict) -> dict: if gangnam_unni.get("name"): snapshot["name"] = gangnam_unni["name"] if hospital.get("clinicNameEn"): snapshot["name_en"] = hospital["clinicNameEn"] if hospital.get("phone"): snapshot["phone"] = hospital["phone"] - if hospital.get("domain"): snapshot["domain"] = hospital["domain"] + domain = hospital.get("domain") or urlparse(hospital.get("sourceUrl") or "").netloc + if domain: snapshot["domain"] = domain if gangnam_unni.get("rating"): snapshot["overall_rating"] = gangnam_unni["rating"] if gangnam_unni.get("totalReviews"): snapshot["total_reviews"] = gangnam_unni["totalReviews"] if gangnam_unni.get("address"): snapshot["location"] = gangnam_unni["address"] @@ -221,28 +210,17 @@ async def _build_youtube_audit(youtube: dict) -> dict: async def _build_overrides(analysis_run_id: str) -> dict: - run = await fetchone( - "SELECT hospital_id, instagram_data_id, facebook_data_id," - " naver_blog_data_id, youtube_data_id, gangnam_unni_data_id" - " FROM analysis_runs WHERE analysis_run_id = %s", - (analysis_run_id,), - ) - if not run: + raw = await select_run_raw_data(analysis_run_id) + if not raw: return {} - hospital_row = await fetchone( - "SELECT raw_data, url FROM hospital_baseinfo WHERE hospital_id = %s", - (run["hospital_id"],), - ) - hospital = json.loads(hospital_row["raw_data"]) if hospital_row and isinstance(hospital_row.get("raw_data"), str) else (hospital_row or {}).get("raw_data") or {} - hospital["domain"] = (hospital_row or {}).get("url") or "" - instagram = await fetch_raw("instagram_data", run["instagram_data_id"]) or {} - facebook = await fetch_raw("facebook_data", run["facebook_data_id"]) or {} - naver_blog = await fetch_raw("naver_blog_data", run["naver_blog_data_id"]) or {} - youtube = await fetch_raw("youtube_data", run["youtube_data_id"]) or {} - gangnam_unni = await fetch_raw("gangnam_unni_data", run["gangnam_unni_data_id"]) or {} + mainpage = raw.get("mainpage", {}) or {} + instagram = raw.get("instagram", {}) or {} + facebook = raw.get("facebook", {}) or {} + youtube = raw.get("youtube", {}) or {} + gangnam_unni = raw.get("gangnam_unni", {}) or {} - snapshot: dict = _build_clinic_snapshot(gangnam_unni, hospital) + snapshot: dict = _build_clinic_snapshot(gangnam_unni, mainpage) yt_patch: dict = await _build_youtube_audit(youtube) # ── instagram ───────────────────────────────────────────────────────────── @@ -299,12 +277,7 @@ _MOCK_REPORT_PATH = os.path.join(os.path.dirname(__file__), "../mock/report_view async def _is_mock(analysis_run_id: str) -> bool: - row = await fetchone( - "SELECT h.url FROM analysis_runs ar JOIN hospital_baseinfo h USING (hospital_id)" - " WHERE ar.analysis_run_id = %s", - (analysis_run_id,), - ) - url = (row or {}).get("url") or "" + url = await select_run_mainpage_url(analysis_run_id) return any(domain in url for domain in _MOCK_DOMAINS) @@ -330,7 +303,7 @@ async def run_report_task(analysis_run_id: str) -> None: else: result = await generate_report(analysis_run_id) result = _patch_report(result, await _build_overrides(analysis_run_id)) - await save_analysis_report(analysis_run_id, result.model_dump()) + await update_run_report(analysis_run_id, result.model_dump()) logger.info("[report] done run=%s", analysis_run_id) @@ -341,8 +314,5 @@ async def run_plan_task(analysis_run_id: str) -> None: result = _load_mock_plan() else: result = await generate_plan(analysis_run_id) - await execute( - "UPDATE analysis_runs SET plan_data = %s WHERE analysis_run_id = %s", - (json.dumps(result.model_dump(), ensure_ascii=False), analysis_run_id), - ) + await update_run_plan(analysis_run_id, result.model_dump()) logger.info("[plan] done run=%s", analysis_run_id) diff --git a/app/services/collect.py b/app/services/collect.py index 2752c12..d0f9ab6 100644 --- a/app/services/collect.py +++ b/app/services/collect.py @@ -1,96 +1,110 @@ import asyncio import logging -from common.db import fetchone -from common.db import ( - set_instagram_status, save_instagram_raw_data, - set_facebook_status, save_facebook_raw_data, - set_naver_blog_status, save_naver_blog_raw_data, - set_youtube_status, save_youtube_raw_data, - set_gangnam_unni_status, save_gangnam_unni_raw_data, - execute, save_hospital_raw_data, -) +from common.db.hospital import update_hospital_status, update_hospital +from common.db.source import select_run_sources, update_raw_info_status, update_raw_info from common.utils import get_env from integrations.apify import ApifyClient from integrations.naver import NaverClient from integrations.youtube import YouTubeClient from integrations.firecrawl import FirecrawlClient +from models.status import SourceType logger = logging.getLogger(__name__) -async def collect_instagram(analysis_run_id: str, row_id: int, url: str) -> None: +async def collect_instagram(analysis_run_id: str, info_id: int, url: str) -> None: logger.info("[instagram] start run=%s url=%s", analysis_run_id, url) - await set_instagram_status(row_id, "processing") + await update_raw_info_status(info_id, "processing") data = await ApifyClient(get_env("APIFY_API_TOKEN")).get_instagram_profile(url) - await save_instagram_raw_data(row_id, data) + if data is None: + await update_raw_info_status(info_id, "failed") + logger.warning("[instagram] failed run=%s", analysis_run_id) + return + await update_raw_info(info_id, data) logger.info("[instagram] done run=%s", analysis_run_id) -async def collect_facebook(analysis_run_id: str, row_id: int, url: str) -> None: +async def collect_facebook(analysis_run_id: str, info_id: int, url: str) -> None: logger.info("[facebook] start run=%s url=%s", analysis_run_id, url) - await set_facebook_status(row_id, "processing") + await update_raw_info_status(info_id, "processing") data = await ApifyClient(get_env("APIFY_API_TOKEN")).get_facebook_page(url) - await save_facebook_raw_data(row_id, data) + if data is None: + await update_raw_info_status(info_id, "failed") + logger.warning("[facebook] failed run=%s", analysis_run_id) + return + await update_raw_info(info_id, data) logger.info("[facebook] done run=%s", analysis_run_id) -async def collect_naver_blog(analysis_run_id: str, row_id: int, url: str) -> None: +async def collect_naver_blog(analysis_run_id: str, info_id: int, url: str) -> None: logger.info("[naver_blog] start run=%s url=%s", analysis_run_id, url) - await set_naver_blog_status(row_id, "processing") + await update_raw_info_status(info_id, "processing") data = await NaverClient(get_env("NAVER_CLIENT_ID"), get_env("NAVER_CLIENT_SECRET")).get_blog_rss(url) - await save_naver_blog_raw_data(row_id, data) + if data is None: + await update_raw_info_status(info_id, "failed") + logger.warning("[naver_blog] failed run=%s", analysis_run_id) + return + await update_raw_info(info_id, data) logger.info("[naver_blog] done run=%s", analysis_run_id) -async def collect_youtube(analysis_run_id: str, row_id: int, url: str) -> None: +async def collect_youtube(analysis_run_id: str, info_id: int, url: str) -> None: logger.info("[youtube] start run=%s url=%s", analysis_run_id, url) - await set_youtube_status(row_id, "processing") + await update_raw_info_status(info_id, "processing") data = await YouTubeClient(get_env("YOUTUBE_API_KEY")).get_channel(url) - await save_youtube_raw_data(row_id, data) + if data is None: + await update_raw_info_status(info_id, "failed") + logger.warning("[youtube] failed run=%s", analysis_run_id) + return + await update_raw_info(info_id, data) logger.info("[youtube] done run=%s", analysis_run_id) -async def collect_gangnam_unni(analysis_run_id: str, row_id: int, url: str) -> None: +async def collect_gangnam_unni(analysis_run_id: str, info_id: int, url: str) -> None: logger.info("[gangnam_unni] start run=%s url=%s", analysis_run_id, url) - await set_gangnam_unni_status(row_id, "processing") + await update_raw_info_status(info_id, "processing") data = await FirecrawlClient(get_env("FIRECRAWL_API_KEY")).get_gangnam_unni(url) - await save_gangnam_unni_raw_data(row_id, data) + if data is None: + await update_raw_info_status(info_id, "failed") + logger.warning("[gangnam_unni] failed run=%s", analysis_run_id) + return + await update_raw_info(info_id, data) logger.info("[gangnam_unni] done run=%s", analysis_run_id) -async def collect_clinic_info(analysis_run_id: str, hospital_id: str, url: str) -> None: - logger.info("[clinic] start run=%s url=%s", analysis_run_id, url) - await execute("UPDATE hospital_baseinfo SET status = 'processing' WHERE hospital_id = %s", (hospital_id,)) +async def collect_mainpage(analysis_run_id: str, info_id: int, hospital_id: str, url: str) -> None: + logger.info("[mainpage] start run=%s url=%s", analysis_run_id, url) + await update_raw_info_status(info_id, "processing") + await update_hospital_status(hospital_id, "processing") data = await FirecrawlClient(get_env("FIRECRAWL_API_KEY")).fetch_clinic_info(url) - await save_hospital_raw_data(hospital_id, data, analysis_run_id=analysis_run_id) - logger.info("[clinic] done run=%s", analysis_run_id) + if data is None: + await update_raw_info_status(info_id, "failed") + logger.warning("[mainpage] failed run=%s", analysis_run_id) + return + await update_raw_info(info_id, data) + await update_hospital(hospital_id, data, analysis_run_id=analysis_run_id) + logger.info("[mainpage] done run=%s", analysis_run_id) -async def collect_all( - analysis_run_id: str, - hospital_id: str, - instagram_id: int | None = None, - facebook_id: int | None = None, - naver_blog_id: int | None = None, - youtube_id: int | None = None, - gangnam_unni_id: int | None = None, -) -> None: - async def _url(table: str, row_id: int) -> str: - row = await fetchone(f"SELECT url FROM {table} WHERE id = %s", (row_id,)) - return row["url"] if row else "" +async def collect_all(analysis_run_id: str, hospital_id: str) -> None: + rows = await select_run_sources(analysis_run_id) - hospital = await fetchone("SELECT url FROM hospital_baseinfo WHERE hospital_id = %s", (hospital_id,)) - tasks = [collect_clinic_info(analysis_run_id, hospital_id, hospital["url"])] + _collectors = { + SourceType.INSTAGRAM: collect_instagram, + SourceType.FACEBOOK: collect_facebook, + SourceType.NAVER_BLOG: collect_naver_blog, + SourceType.YOUTUBE: collect_youtube, + SourceType.GANGNAM_UNNI: collect_gangnam_unni, + } - if instagram_id: - tasks.append(collect_instagram(analysis_run_id, instagram_id, await _url("instagram_data", instagram_id))) - if facebook_id: - tasks.append(collect_facebook(analysis_run_id, facebook_id, await _url("facebook_data", facebook_id))) - if naver_blog_id: - tasks.append(collect_naver_blog(analysis_run_id, naver_blog_id, await _url("naver_blog_data", naver_blog_id))) - if youtube_id: - tasks.append(collect_youtube(analysis_run_id, youtube_id, await _url("youtube_data", youtube_id))) - if gangnam_unni_id: - tasks.append(collect_gangnam_unni(analysis_run_id, gangnam_unni_id, await _url("gangnam_unni_data", gangnam_unni_id))) + tasks = [] + for row in rows: + info_id = row["info_id"] + source_type = row["source_type"] + url = row["url"] + if source_type == SourceType.MAINPAGE: + tasks.append(collect_mainpage(analysis_run_id, info_id, hospital_id, url)) + elif source_type in _collectors: + tasks.append(_collectors[source_type](analysis_run_id, info_id, url)) await asyncio.gather(*tasks, return_exceptions=True) diff --git a/app/services/file.py b/app/services/file_data.py similarity index 80% rename from app/services/file.py rename to app/services/file_data.py index 339cc99..8e168c7 100644 --- a/app/services/file.py +++ b/app/services/file_data.py @@ -2,7 +2,8 @@ import logging from fastapi import HTTPException, UploadFile -from common.db import execute, fetchall, fetchone, insert_file_row +from common.db.run import select_run +from common.db.file_data import insert_file, select_run_files, select_file, delete_file from integrations.azure_blob import AzureBlobUploader from models.file import FileListItem, FileType, FileUploadResponse @@ -31,10 +32,7 @@ async def upload_analysis_file( content_type: str | None = None, ) -> tuple[int, str]: """analysis_run에 딸린 파일 업로드. Blob 업로드 + file_data row 생성. (file_id, url) 반환.""" - run = await fetchone( - "SELECT hospital_id FROM analysis_runs WHERE analysis_run_id = %s", - (analysis_run_id,), - ) + run = await select_run(analysis_run_id) if not run: raise HTTPException(status_code=404, detail="analysis_run not found") hospital_id = run["hospital_id"] @@ -47,7 +45,7 @@ async def upload_analysis_file( content_type=content_type, ) - file_id = await insert_file_row( + file_id = await insert_file( analysis_run_id=analysis_run_id, hospital_id=hospital_id, file_type=file_type, @@ -61,12 +59,7 @@ async def upload_analysis_file( async def list_analysis_files(analysis_run_id: str) -> list[dict]: """analysis_run에 딸린 (삭제 안 된) 파일 목록.""" - return await fetchall( - "SELECT id, file_type, file_name, file_url, size_bytes, created_at FROM file_data" - " WHERE analysis_run_id = %s AND is_deleted = FALSE" - " ORDER BY created_at DESC", - (analysis_run_id,), - ) + return await select_run_files(analysis_run_id) async def handle_analysis_file_upload( @@ -102,7 +95,7 @@ async def handle_analysis_file_upload( async def get_analysis_files_response(analysis_run_id: str) -> list[FileListItem]: """run 존재 확인 + 응답 모델 생성.""" - if not await fetchone("SELECT 1 FROM analysis_runs WHERE analysis_run_id = %s", (analysis_run_id,)): + if not await select_run(analysis_run_id): raise HTTPException(status_code=404, detail="analysis_run not found") rows = await list_analysis_files(analysis_run_id) return [FileListItem(**{**r, "created_at": str(r["created_at"])}) for r in rows] @@ -110,14 +103,8 @@ async def get_analysis_files_response(analysis_run_id: str) -> list[FileListItem async def soft_delete_analysis_file(analysis_run_id: str, file_id: int) -> None: """analysis_run에 딸린 파일을 소프트 삭제. 멱등성 보장.""" - row = await fetchone( - "SELECT id FROM file_data WHERE id = %s AND analysis_run_id = %s", - (file_id, analysis_run_id), - ) + row = await select_file(file_id, analysis_run_id) if not row: raise HTTPException(status_code=404, detail="file not found") - await execute( - "UPDATE file_data SET is_deleted = TRUE WHERE id = %s AND is_deleted = FALSE", - (file_id,), - ) + await delete_file(file_id) logger.info("soft-deleted analysis file run=%s file_id=%s", analysis_run_id, file_id) diff --git a/app/services/market.py b/app/services/market.py index 8cfa193..421e901 100644 --- a/app/services/market.py +++ b/app/services/market.py @@ -1,7 +1,9 @@ import asyncio -import json import logging -from common.db import fetchone, execute +from common.db.run import select_run +from common.db.hospital import select_hospital +from common.db.market import upsert_market_status, upsert_market_result +from common.db.source import select_run_raw_data from integrations.llm.llm_service import LLMService from integrations.llm.prompt import ( market_competitors_prompt, @@ -18,49 +20,27 @@ _TYPES = ["competitors", "keywords", "trend", "target_audience"] async def _save(analysis_run_id: str, analysis_type: str, result, exc: Exception | None) -> None: if exc: logger.warning("[market] %s failed run=%s: %s", analysis_type, analysis_run_id, exc) - await execute( - "INSERT INTO market_analysis (analysis_run_id, analysis_type, status)" - " VALUES (%s, %s, 'failed')" - " ON DUPLICATE KEY UPDATE status = 'failed'", - (analysis_run_id, analysis_type), - ) + await upsert_market_status(analysis_run_id, analysis_type, "failed") else: - await execute( - "INSERT INTO market_analysis (analysis_run_id, analysis_type, status, data)" - " VALUES (%s, %s, 'done', %s)" - " ON DUPLICATE KEY UPDATE status = 'done', data = VALUES(data)", - (analysis_run_id, analysis_type, json.dumps(result.model_dump(), ensure_ascii=False)), - ) + await upsert_market_result(analysis_run_id, analysis_type, result.model_dump()) async def run_market_analysis(analysis_run_id: str) -> None: logger.info("[market] start run=%s", analysis_run_id) - run = await fetchone( - "SELECT hospital_id FROM analysis_runs WHERE analysis_run_id = %s", - (analysis_run_id,), - ) - clinic = await fetchone( - "SELECT hospital_name, road_address, raw_data FROM hospital_baseinfo WHERE hospital_id = %s", - (run["hospital_id"],), - ) + run = await select_run(analysis_run_id) + clinic = await select_hospital(run["hospital_id"]) + raw = await select_run_raw_data(analysis_run_id) + mainpage = raw.get("mainpage") or {} - raw_data = clinic["raw_data"] - clinic_data = json.loads(raw_data) if isinstance(raw_data, str) else (raw_data or {}) - - clinic_name = clinic["hospital_name"] or "" - address = clinic["road_address"] or "" - services = clinic_data.get("services", []) + clinic_name = (clinic or {}).get("hospital_name") or "" + address = (clinic or {}).get("road_address") or "" + services = mainpage.get("services", []) services_str = ", ".join(services[:3]) primary_service = services[0] if services else "" for analysis_type in _TYPES: - await execute( - "INSERT INTO market_analysis (analysis_run_id, analysis_type, status)" - " VALUES (%s, %s, 'processing')" - " ON DUPLICATE KEY UPDATE status = 'processing'", - (analysis_run_id, analysis_type), - ) + await upsert_market_status(analysis_run_id, analysis_type, "processing") llm = LLMService(provider="perplexity") results = await asyncio.gather( diff --git a/app/services/pipeline.py b/app/services/pipeline.py index 35b722f..c441723 100644 --- a/app/services/pipeline.py +++ b/app/services/pipeline.py @@ -1,5 +1,5 @@ import logging -from common.db import fetchone, execute +from common.db.run import select_run, update_run_status from models.status import AnalysisStatus from services.collect import collect_all from services.market import run_market_analysis @@ -12,41 +12,19 @@ async def run_pipeline(analysis_run_id: str) -> None: logger.info("[pipeline] start run=%s", analysis_run_id) # ── 1. Collect ────────────────────────────────────────────────────────── - run = await fetchone( - "SELECT hospital_id, instagram_data_id, facebook_data_id," - " naver_blog_data_id, youtube_data_id, gangnam_unni_data_id" - " FROM analysis_runs WHERE analysis_run_id = %s", - (analysis_run_id,), - ) - await collect_all( - analysis_run_id, - hospital_id=run["hospital_id"], - instagram_id=run["instagram_data_id"], - facebook_id=run["facebook_data_id"], - naver_blog_id=run["naver_blog_data_id"], - youtube_id=run["youtube_data_id"], - gangnam_unni_id=run["gangnam_unni_data_id"], - ) + run = await select_run(analysis_run_id) + await collect_all(analysis_run_id, hospital_id=run["hospital_id"]) # ── 2. Market ──────────────────────────────────────────────────────────── - await execute( - "UPDATE analysis_runs SET status = %s WHERE analysis_run_id = %s", - (AnalysisStatus.ANALYZING, analysis_run_id), - ) + await update_run_status(analysis_run_id, AnalysisStatus.ANALYZING) await run_market_analysis(analysis_run_id) # ── 3. Report ──────────────────────────────────────────────────────────── await run_report_task(analysis_run_id) # ── 4. Plan ────────────────────────────────────────────────────────────── - await execute( - "UPDATE analysis_runs SET status = %s WHERE analysis_run_id = %s", - (AnalysisStatus.PLANNING, analysis_run_id), - ) + await update_run_status(analysis_run_id, AnalysisStatus.PLANNING) await run_plan_task(analysis_run_id) - await execute( - "UPDATE analysis_runs SET status = %s WHERE analysis_run_id = %s", - (AnalysisStatus.COMPLETED, analysis_run_id), - ) + await update_run_status(analysis_run_id, AnalysisStatus.COMPLETED) logger.info("[pipeline] done run=%s", analysis_run_id)