db migration done
parent
c9c5ee9177
commit
3b4c154fb2
|
|
@ -1,158 +1,87 @@
|
||||||
-- 테이블 순서는 관계를 고려하여 한 번에 실행해도 에러가 발생하지 않게 정렬되었습니다.
|
-- user_info
|
||||||
|
|
||||||
-- instagram_data Table Create SQL
|
|
||||||
-- 테이블 생성 SQL - instagram_data
|
|
||||||
CREATE TABLE instagram_data
|
|
||||||
(
|
|
||||||
`id` INT NOT NULL AUTO_INCREMENT,
|
|
||||||
`hospital_id` CHAR(36) NOT NULL,
|
|
||||||
`url` VARCHAR(500) NOT NULL,
|
|
||||||
`status` VARCHAR(20) NOT NULL DEFAULT 'start',
|
|
||||||
`raw_data` JSON NULL,
|
|
||||||
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
|
||||||
PRIMARY KEY (id)
|
|
||||||
);
|
|
||||||
|
|
||||||
-- Index 설정 SQL - instagram_data(hospital_id)
|
|
||||||
CREATE INDEX IX_instagram_data_1
|
|
||||||
ON instagram_data(hospital_id);
|
|
||||||
|
|
||||||
|
|
||||||
-- facebook_data Table Create SQL
|
|
||||||
-- 테이블 생성 SQL - facebook_data
|
|
||||||
CREATE TABLE facebook_data
|
|
||||||
(
|
|
||||||
`id` INT NOT NULL AUTO_INCREMENT,
|
|
||||||
`hospital_id` CHAR(36) NOT NULL,
|
|
||||||
`url` VARCHAR(500) NOT NULL,
|
|
||||||
`status` VARCHAR(20) NOT NULL DEFAULT 'start',
|
|
||||||
`raw_data` JSON NULL,
|
|
||||||
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
|
||||||
PRIMARY KEY (id)
|
|
||||||
);
|
|
||||||
|
|
||||||
-- Index 설정 SQL - facebook_data(hospital_id)
|
|
||||||
CREATE INDEX IX_facebook_data_1
|
|
||||||
ON facebook_data(hospital_id);
|
|
||||||
|
|
||||||
|
|
||||||
-- naver_blog_data Table Create SQL
|
|
||||||
-- 테이블 생성 SQL - naver_blog_data
|
|
||||||
CREATE TABLE naver_blog_data
|
|
||||||
(
|
|
||||||
`id` INT NOT NULL AUTO_INCREMENT,
|
|
||||||
`hospital_id` CHAR(36) NOT NULL,
|
|
||||||
`url` VARCHAR(500) NOT NULL,
|
|
||||||
`status` VARCHAR(20) NOT NULL DEFAULT 'start',
|
|
||||||
`raw_data` JSON NULL,
|
|
||||||
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
|
||||||
PRIMARY KEY (id)
|
|
||||||
);
|
|
||||||
|
|
||||||
-- Index 설정 SQL - naver_blog_data(hospital_id)
|
|
||||||
CREATE INDEX IX_naver_blog_data_1
|
|
||||||
ON naver_blog_data(hospital_id);
|
|
||||||
|
|
||||||
|
|
||||||
-- hospital_baseinfo Table Create SQL
|
|
||||||
-- 테이블 생성 SQL - hospital_baseinfo
|
|
||||||
CREATE TABLE hospital_baseinfo
|
|
||||||
(
|
|
||||||
`hospital_id` CHAR(36) NOT NULL,
|
|
||||||
`owner_user_id` INT NOT NULL,
|
|
||||||
`hospital_name` VARCHAR(50) NOT NULL,
|
|
||||||
`hospital_name_en` VARCHAR(50) NULL,
|
|
||||||
`brn` VARCHAR(50) NOT NULL,
|
|
||||||
`road_address` VARCHAR(100) NULL,
|
|
||||||
`site_address` VARCHAR(100) NULL,
|
|
||||||
`url` VARCHAR(500) NULL,
|
|
||||||
`status` VARCHAR(20) NOT NULL DEFAULT 'start',
|
|
||||||
`raw_data` JSON NULL,
|
|
||||||
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
|
||||||
`updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
|
|
||||||
PRIMARY KEY (hospital_id)
|
|
||||||
);
|
|
||||||
|
|
||||||
-- Index 설정 SQL - hospital_baseinfo(owner_user_id)
|
|
||||||
CREATE INDEX IX_hospital_baseinfo_1
|
|
||||||
ON hospital_baseinfo(owner_user_id);
|
|
||||||
|
|
||||||
|
|
||||||
-- user_info Table Create SQL
|
|
||||||
-- 테이블 생성 SQL - user_info
|
|
||||||
CREATE TABLE user_info
|
CREATE TABLE user_info
|
||||||
(
|
(
|
||||||
`user_id` INT NOT NULL AUTO_INCREMENT,
|
`user_id` INT NOT NULL AUTO_INCREMENT,
|
||||||
`username` VARCHAR(50) NOT NULL,
|
`username` VARCHAR(50) NOT NULL,
|
||||||
`password` VARCHAR(50) NOT NULL,
|
`password` VARCHAR(50) NOT NULL,
|
||||||
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||||
`updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
|
`updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
|
||||||
PRIMARY KEY (user_id)
|
PRIMARY KEY (user_id)
|
||||||
);
|
);
|
||||||
|
|
||||||
-- youtube_data Table Create SQL
|
|
||||||
CREATE TABLE youtube_data
|
-- hospital_baseinfo
|
||||||
|
CREATE TABLE hospital_baseinfo
|
||||||
(
|
(
|
||||||
`id` INT NOT NULL AUTO_INCREMENT,
|
`hospital_id` CHAR(36) NOT NULL,
|
||||||
`hospital_id` CHAR(36) NOT NULL,
|
`owner_user_id` INT NOT NULL,
|
||||||
`url` VARCHAR(500) NOT NULL,
|
`hospital_name` VARCHAR(50) NOT NULL,
|
||||||
`status` VARCHAR(20) NOT NULL DEFAULT 'start',
|
`hospital_name_en` VARCHAR(50) NULL,
|
||||||
`raw_data` JSON NULL,
|
`brn` VARCHAR(50) NOT NULL,
|
||||||
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
`road_address` VARCHAR(100) NULL,
|
||||||
PRIMARY KEY (id)
|
`site_address` VARCHAR(100) NULL,
|
||||||
|
`status` VARCHAR(20) NOT NULL DEFAULT 'start',
|
||||||
|
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||||
|
`updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
|
||||||
|
PRIMARY KEY (hospital_id)
|
||||||
);
|
);
|
||||||
|
|
||||||
-- Index 설정 SQL - youtube_data(hospital_id)
|
CREATE INDEX IX_hospital_baseinfo_1 ON hospital_baseinfo (owner_user_id);
|
||||||
CREATE INDEX IX_youtube_data_1
|
|
||||||
ON youtube_data(hospital_id);
|
|
||||||
|
|
||||||
|
|
||||||
-- gangnam_unni_data Table Create SQL
|
-- remote_source: 병원별 채널 소스 정보 (instagram/facebook/naver_blog/youtube/gangnam_unni 등)
|
||||||
CREATE TABLE gangnam_unni_data
|
CREATE TABLE remote_source
|
||||||
(
|
(
|
||||||
`id` INT NOT NULL AUTO_INCREMENT,
|
`source_id` INT NOT NULL AUTO_INCREMENT,
|
||||||
`hospital_id` CHAR(36) NOT NULL,
|
`hospital_id` CHAR(36) NOT NULL,
|
||||||
`url` VARCHAR(500) NOT NULL,
|
`source_type` VARCHAR(50) NOT NULL,
|
||||||
`status` VARCHAR(20) NOT NULL DEFAULT 'start',
|
`language` CHAR(2) NULL,
|
||||||
`raw_data` JSON NULL,
|
`url` VARCHAR(500) NOT NULL,
|
||||||
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||||
PRIMARY KEY (id)
|
PRIMARY KEY (source_id)
|
||||||
);
|
);
|
||||||
|
|
||||||
-- Index 설정 SQL - gangnam_unni_data(hospital_id)
|
CREATE INDEX IX_remote_source_1 ON remote_source (hospital_id);
|
||||||
CREATE INDEX IX_gangnam_unni_data_1
|
CREATE INDEX IX_remote_source_2 ON remote_source (hospital_id, source_type);
|
||||||
ON gangnam_unni_data(hospital_id);
|
|
||||||
|
|
||||||
|
|
||||||
-- analysis_runs Table Create SQL
|
-- analysis_runs
|
||||||
CREATE TABLE analysis_runs
|
CREATE TABLE analysis_runs
|
||||||
(
|
(
|
||||||
`analysis_run_id` CHAR(36) NOT NULL,
|
`analysis_run_id` CHAR(36) NOT NULL,
|
||||||
`hospital_id` CHAR(36) NOT NULL,
|
`hospital_id` CHAR(36) NOT NULL,
|
||||||
`owner_user_id` INT NOT NULL DEFAULT 0,
|
`owner_user_id` INT NOT NULL DEFAULT 0,
|
||||||
`status` VARCHAR(50) NOT NULL DEFAULT 'discovering',
|
`status` VARCHAR(50) NOT NULL DEFAULT 'discovering',
|
||||||
`instagram_data_id` INT NULL,
|
`report_data` JSON NULL,
|
||||||
`facebook_data_id` INT NULL,
|
`plan_data` JSON NULL,
|
||||||
`naver_blog_data_id` INT NULL,
|
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||||
`youtube_data_id` INT NULL,
|
`updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
|
||||||
`gangnam_unni_data_id` INT NULL,
|
|
||||||
`report_data` JSON NULL,
|
|
||||||
`plan_data` JSON NULL,
|
|
||||||
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
|
||||||
`updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
|
|
||||||
PRIMARY KEY (analysis_run_id)
|
PRIMARY KEY (analysis_run_id)
|
||||||
);
|
);
|
||||||
|
|
||||||
-- Index 설정 SQL - analysis_runs(hospital_id)
|
CREATE INDEX IX_analysis_runs_1 ON analysis_runs (hospital_id);
|
||||||
CREATE INDEX IX_analysis_runs_1
|
CREATE INDEX IX_analysis_runs_2 ON analysis_runs (owner_user_id);
|
||||||
ON analysis_runs(hospital_id);
|
|
||||||
|
|
||||||
-- Index 설정 SQL - analysis_runs(owner_user_id)
|
|
||||||
CREATE INDEX IX_analysis_runs_2
|
|
||||||
ON analysis_runs(owner_user_id);
|
|
||||||
|
|
||||||
|
|
||||||
-- file_data Table Create SQL
|
-- raw_info: 분석 실행별 수집 원시 데이터
|
||||||
|
CREATE TABLE raw_info
|
||||||
|
(
|
||||||
|
`info_id` INT NOT NULL AUTO_INCREMENT,
|
||||||
|
`source_id` INT NOT NULL,
|
||||||
|
`analysis_run_id` CHAR(36) NOT NULL,
|
||||||
|
`data_tag` VARCHAR(50) NOT NULL DEFAULT 'default',
|
||||||
|
`status` VARCHAR(20) NOT NULL DEFAULT 'start',
|
||||||
|
`raw_data` JSON NULL,
|
||||||
|
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||||
|
`updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
|
||||||
|
PRIMARY KEY (info_id)
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE INDEX IX_raw_info_1 ON raw_info (analysis_run_id);
|
||||||
|
CREATE INDEX IX_raw_info_2 ON raw_info (source_id);
|
||||||
|
|
||||||
|
|
||||||
|
-- file_data
|
||||||
CREATE TABLE file_data
|
CREATE TABLE file_data
|
||||||
(
|
(
|
||||||
`id` INT NOT NULL AUTO_INCREMENT,
|
`id` INT NOT NULL AUTO_INCREMENT,
|
||||||
|
|
@ -169,48 +98,38 @@ CREATE TABLE file_data
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
||||||
-- hospital_history Table Create SQL
|
-- hospital_history
|
||||||
CREATE TABLE hospital_history
|
CREATE TABLE hospital_history
|
||||||
(
|
(
|
||||||
`id` INT NOT NULL AUTO_INCREMENT,
|
`id` INT NOT NULL AUTO_INCREMENT,
|
||||||
`hospital_id` CHAR(36) NOT NULL,
|
`hospital_id` CHAR(36) NOT NULL,
|
||||||
`owner_user_id` INT NOT NULL,
|
`owner_user_id` INT NOT NULL,
|
||||||
`hospital_name` VARCHAR(50) NOT NULL,
|
`hospital_name` VARCHAR(50) NOT NULL,
|
||||||
`hospital_name_en` VARCHAR(50) NULL,
|
`hospital_name_en` VARCHAR(50) NULL,
|
||||||
`brn` VARCHAR(50) NOT NULL,
|
`brn` VARCHAR(50) NOT NULL,
|
||||||
`road_address` VARCHAR(100) NULL,
|
`road_address` VARCHAR(100) NULL,
|
||||||
`site_address` VARCHAR(100) NULL,
|
`site_address` VARCHAR(100) NULL,
|
||||||
`url` VARCHAR(500) NULL,
|
`status` VARCHAR(20) NOT NULL,
|
||||||
`status` VARCHAR(20) NOT NULL,
|
`analysis_run_id` CHAR(36) NULL,
|
||||||
`raw_data` JSON NULL,
|
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||||
`analysis_run_id` CHAR(36) NULL,
|
|
||||||
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
|
||||||
PRIMARY KEY (id)
|
PRIMARY KEY (id)
|
||||||
);
|
);
|
||||||
|
|
||||||
-- Index 설정 SQL - hospital_history(hospital_id)
|
CREATE INDEX IX_hospital_history_1 ON hospital_history (hospital_id);
|
||||||
CREATE INDEX IX_hospital_history_1
|
CREATE INDEX IX_hospital_history_2 ON hospital_history (analysis_run_id);
|
||||||
ON hospital_history(hospital_id);
|
|
||||||
|
|
||||||
-- Index 설정 SQL - hospital_history(analysis_run_id)
|
|
||||||
CREATE INDEX IX_hospital_history_2
|
|
||||||
ON hospital_history(analysis_run_id);
|
|
||||||
|
|
||||||
|
|
||||||
-- market_analysis Table Create SQL
|
-- market_analysis
|
||||||
CREATE TABLE market_analysis
|
CREATE TABLE market_analysis
|
||||||
(
|
(
|
||||||
`id` INT NOT NULL AUTO_INCREMENT,
|
`id` INT NOT NULL AUTO_INCREMENT,
|
||||||
`analysis_run_id` CHAR(36) NOT NULL,
|
`analysis_run_id` CHAR(36) NOT NULL,
|
||||||
`analysis_type` VARCHAR(50) NOT NULL,
|
`analysis_type` VARCHAR(50) NOT NULL,
|
||||||
`status` VARCHAR(20) NOT NULL DEFAULT 'start',
|
`status` VARCHAR(20) NOT NULL DEFAULT 'start',
|
||||||
`data` JSON NULL,
|
`data` JSON NULL,
|
||||||
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||||
PRIMARY KEY (id),
|
PRIMARY KEY (id),
|
||||||
UNIQUE KEY UQ_market_analysis (analysis_run_id, analysis_type)
|
UNIQUE KEY UQ_market_analysis (analysis_run_id, analysis_type)
|
||||||
);
|
);
|
||||||
|
|
||||||
-- Index 설정 SQL - market_analysis(analysis_run_id)
|
CREATE INDEX IX_market_analysis_1 ON market_analysis (analysis_run_id);
|
||||||
CREATE INDEX IX_market_analysis_1
|
|
||||||
ON market_analysis(analysis_run_id);
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,134 +0,0 @@
|
||||||
-- user_info
|
|
||||||
CREATE TABLE user_info
|
|
||||||
(
|
|
||||||
`user_id` INT NOT NULL AUTO_INCREMENT,
|
|
||||||
`username` VARCHAR(50) NOT NULL,
|
|
||||||
`password` VARCHAR(50) NOT NULL,
|
|
||||||
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
|
||||||
`updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
|
|
||||||
PRIMARY KEY (user_id)
|
|
||||||
);
|
|
||||||
|
|
||||||
|
|
||||||
-- hospital_baseinfo
|
|
||||||
CREATE TABLE hospital_baseinfo
|
|
||||||
(
|
|
||||||
`hospital_id` CHAR(36) NOT NULL,
|
|
||||||
`owner_user_id` INT NOT NULL,
|
|
||||||
`hospital_name` VARCHAR(50) NOT NULL,
|
|
||||||
`hospital_name_en` VARCHAR(50) NULL,
|
|
||||||
`brn` VARCHAR(50) NOT NULL,
|
|
||||||
`road_address` VARCHAR(100) NULL,
|
|
||||||
`site_address` VARCHAR(100) NULL,
|
|
||||||
`status` VARCHAR(20) NOT NULL DEFAULT 'start',
|
|
||||||
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
|
||||||
`updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
|
|
||||||
PRIMARY KEY (hospital_id)
|
|
||||||
);
|
|
||||||
|
|
||||||
CREATE INDEX IX_hospital_baseinfo_1 ON hospital_baseinfo (owner_user_id);
|
|
||||||
|
|
||||||
|
|
||||||
-- remote_source: 병원별 채널 소스 정보 (instagram/facebook/naver_blog/youtube/gangnam_unni 등)
|
|
||||||
CREATE TABLE remote_source
|
|
||||||
(
|
|
||||||
`source_id` INT NOT NULL AUTO_INCREMENT,
|
|
||||||
`hospital_id` CHAR(36) NOT NULL,
|
|
||||||
`source_type` VARCHAR(50) NOT NULL,
|
|
||||||
`url` VARCHAR(500) NOT NULL,
|
|
||||||
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
|
||||||
PRIMARY KEY (source_id)
|
|
||||||
);
|
|
||||||
|
|
||||||
CREATE INDEX IX_remote_source_1 ON remote_source (hospital_id);
|
|
||||||
CREATE INDEX IX_remote_source_2 ON remote_source (hospital_id, source_type);
|
|
||||||
|
|
||||||
|
|
||||||
-- analysis_runs
|
|
||||||
CREATE TABLE analysis_runs
|
|
||||||
(
|
|
||||||
`analysis_run_id` CHAR(36) NOT NULL,
|
|
||||||
`hospital_id` CHAR(36) NOT NULL,
|
|
||||||
`owner_user_id` INT NOT NULL DEFAULT 0,
|
|
||||||
`status` VARCHAR(50) NOT NULL DEFAULT 'discovering',
|
|
||||||
`report_data` JSON NULL,
|
|
||||||
`plan_data` JSON NULL,
|
|
||||||
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
|
||||||
`updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
|
|
||||||
PRIMARY KEY (analysis_run_id)
|
|
||||||
);
|
|
||||||
|
|
||||||
CREATE INDEX IX_analysis_runs_1 ON analysis_runs (hospital_id);
|
|
||||||
CREATE INDEX IX_analysis_runs_2 ON analysis_runs (owner_user_id);
|
|
||||||
|
|
||||||
|
|
||||||
-- raw_info: 분석 실행별 수집 원시 데이터
|
|
||||||
CREATE TABLE raw_info
|
|
||||||
(
|
|
||||||
`info_id` INT NOT NULL AUTO_INCREMENT,
|
|
||||||
`source_id` INT NOT NULL,
|
|
||||||
`analysis_run_id` CHAR(36) NOT NULL,
|
|
||||||
`data_tag` VARCHAR(50) NOT NULL DEFAULT 'default',
|
|
||||||
`status` VARCHAR(20) NOT NULL DEFAULT 'start',
|
|
||||||
`raw_data` JSON NULL,
|
|
||||||
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
|
||||||
`updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
|
|
||||||
PRIMARY KEY (info_id)
|
|
||||||
);
|
|
||||||
|
|
||||||
CREATE INDEX IX_raw_info_1 ON raw_info (analysis_run_id);
|
|
||||||
CREATE INDEX IX_raw_info_2 ON raw_info (source_id);
|
|
||||||
|
|
||||||
|
|
||||||
-- file_data
|
|
||||||
CREATE TABLE file_data
|
|
||||||
(
|
|
||||||
`id` INT NOT NULL AUTO_INCREMENT,
|
|
||||||
`analysis_run_id` CHAR(36) NOT NULL,
|
|
||||||
`hospital_id` CHAR(36) NULL,
|
|
||||||
`file_type` ENUM('image','video','audio','document','file') NOT NULL DEFAULT 'file',
|
|
||||||
`file_name` VARCHAR(255) NOT NULL,
|
|
||||||
`file_url` VARCHAR(2048) NOT NULL,
|
|
||||||
`size_bytes` BIGINT NULL,
|
|
||||||
`is_deleted` BOOLEAN NOT NULL DEFAULT FALSE,
|
|
||||||
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
|
||||||
PRIMARY KEY (id),
|
|
||||||
INDEX IX_file_data_1 (analysis_run_id, is_deleted)
|
|
||||||
);
|
|
||||||
|
|
||||||
|
|
||||||
-- hospital_history
|
|
||||||
CREATE TABLE hospital_history
|
|
||||||
(
|
|
||||||
`id` INT NOT NULL AUTO_INCREMENT,
|
|
||||||
`hospital_id` CHAR(36) NOT NULL,
|
|
||||||
`owner_user_id` INT NOT NULL,
|
|
||||||
`hospital_name` VARCHAR(50) NOT NULL,
|
|
||||||
`hospital_name_en` VARCHAR(50) NULL,
|
|
||||||
`brn` VARCHAR(50) NOT NULL,
|
|
||||||
`road_address` VARCHAR(100) NULL,
|
|
||||||
`site_address` VARCHAR(100) NULL,
|
|
||||||
`status` VARCHAR(20) NOT NULL,
|
|
||||||
`analysis_run_id` CHAR(36) NULL,
|
|
||||||
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
|
||||||
PRIMARY KEY (id)
|
|
||||||
);
|
|
||||||
|
|
||||||
CREATE INDEX IX_hospital_history_1 ON hospital_history (hospital_id);
|
|
||||||
CREATE INDEX IX_hospital_history_2 ON hospital_history (analysis_run_id);
|
|
||||||
|
|
||||||
|
|
||||||
-- market_analysis
|
|
||||||
CREATE TABLE market_analysis
|
|
||||||
(
|
|
||||||
`id` INT NOT NULL AUTO_INCREMENT,
|
|
||||||
`analysis_run_id` CHAR(36) NOT NULL,
|
|
||||||
`analysis_type` VARCHAR(50) NOT NULL,
|
|
||||||
`status` VARCHAR(20) NOT NULL DEFAULT 'start',
|
|
||||||
`data` JSON NULL,
|
|
||||||
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
|
||||||
PRIMARY KEY (id),
|
|
||||||
UNIQUE KEY UQ_market_analysis (analysis_run_id, analysis_type)
|
|
||||||
);
|
|
||||||
|
|
||||||
CREATE INDEX IX_market_analysis_1 ON market_analysis (analysis_run_id);
|
|
||||||
|
|
@ -2,12 +2,14 @@ import logging
|
||||||
import uuid6
|
import uuid6
|
||||||
from fastapi import APIRouter, BackgroundTasks, Depends, File, Form, HTTPException, UploadFile, status
|
from fastapi import APIRouter, BackgroundTasks, Depends, File, Form, HTTPException, UploadFile, status
|
||||||
from common.deps import verify_api_key
|
from common.deps import verify_api_key
|
||||||
from common.db import fetchone, insert_instagram_row, insert_facebook_row, insert_naver_blog_row, insert_youtube_row, insert_gangnam_unni_row, insert_analysis_run
|
from common.db.hospital import select_hospital
|
||||||
|
from common.db.source import select_source_mainpage, insert_source, insert_raw_info
|
||||||
|
from common.db.run import insert_run, select_run_status
|
||||||
from models.analysis import AnalysisCreate, AnalysisStartResponse, AnalysisStatusResponse
|
from models.analysis import AnalysisCreate, AnalysisStartResponse, AnalysisStatusResponse
|
||||||
from models.file import FileListItem, FileType, FileUploadResponse
|
from models.file import FileListItem, FileType, FileUploadResponse
|
||||||
from models.status import AnalysisStatus
|
from models.status import AnalysisStatus, SourceType
|
||||||
from services.pipeline import run_pipeline
|
from services.pipeline import run_pipeline
|
||||||
from services.file import get_analysis_files_response, handle_analysis_file_upload, soft_delete_analysis_file
|
from services.file_data import get_analysis_files_response, handle_analysis_file_upload, soft_delete_analysis_file
|
||||||
|
|
||||||
router = APIRouter(prefix="/api/analysis", tags=["analysis"], dependencies=[Depends(verify_api_key)])
|
router = APIRouter(prefix="/api/analysis", tags=["analysis"], dependencies=[Depends(verify_api_key)])
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
@ -20,23 +22,27 @@ async def start_analysis(body: AnalysisCreate, background_tasks: BackgroundTasks
|
||||||
hospital_id = body.clinic_id
|
hospital_id = body.clinic_id
|
||||||
|
|
||||||
# 사실 hospital과 owner_user_id 비교 후 검증이 필요한 거지만 일단 PoC 니까. 나중에 바꿉니다.
|
# 사실 hospital과 owner_user_id 비교 후 검증이 필요한 거지만 일단 PoC 니까. 나중에 바꿉니다.
|
||||||
hospital = await fetchone(
|
hospital = await select_hospital(hospital_id)
|
||||||
"SELECT owner_user_id, url FROM hospital_baseinfo WHERE hospital_id = %s",
|
|
||||||
(hospital_id,),
|
|
||||||
)
|
|
||||||
if not hospital:
|
if not hospital:
|
||||||
raise HTTPException(status_code=409, detail="Clinic not found")
|
raise HTTPException(status_code=409, detail="Clinic not found")
|
||||||
|
|
||||||
ig_id = await insert_instagram_row(hospital_id, body.channels.instagram) if body.channels.instagram else None
|
analysis_run_id = await insert_run(analysis_run_id, hospital_id, hospital["owner_user_id"])
|
||||||
fb_id = await insert_facebook_row(hospital_id, body.channels.facebook) if body.channels.facebook else None
|
|
||||||
nb_id = await insert_naver_blog_row(hospital_id, body.channels.naver_blog) if body.channels.naver_blog else None
|
|
||||||
yt_id = await insert_youtube_row(hospital_id, body.channels.youtube) if body.channels.youtube else None
|
|
||||||
gu_id = await insert_gangnam_unni_row(hospital_id, body.channels.gangnam_unni) if body.channels.gangnam_unni else None
|
|
||||||
|
|
||||||
analysis_run_id = await insert_analysis_run(
|
mainpage = await select_source_mainpage(hospital_id)
|
||||||
analysis_run_id, hospital_id, hospital["owner_user_id"],
|
if mainpage:
|
||||||
ig_id, fb_id, nb_id, yt_id, gu_id,
|
await insert_raw_info(mainpage["source_id"], analysis_run_id, data_tag=SourceType.MAINPAGE)
|
||||||
)
|
|
||||||
|
channels = [
|
||||||
|
(SourceType.INSTAGRAM, body.channels.instagram),
|
||||||
|
(SourceType.FACEBOOK, body.channels.facebook),
|
||||||
|
(SourceType.NAVER_BLOG, body.channels.naver_blog),
|
||||||
|
(SourceType.YOUTUBE, body.channels.youtube),
|
||||||
|
(SourceType.GANGNAM_UNNI, body.channels.gangnam_unni),
|
||||||
|
]
|
||||||
|
for source_type, url in channels:
|
||||||
|
if url:
|
||||||
|
source_id = await insert_source(hospital_id, source_type, url)
|
||||||
|
await insert_raw_info(source_id, analysis_run_id, data_tag=source_type)
|
||||||
|
|
||||||
background_tasks.add_task(run_pipeline, analysis_run_id)
|
background_tasks.add_task(run_pipeline, analysis_run_id)
|
||||||
|
|
||||||
|
|
@ -75,12 +81,12 @@ async def delete_analysis_run_file(run_id: str, file_id: int) -> None:
|
||||||
@router.get("/{run_id}/status", response_model=AnalysisStatusResponse)
|
@router.get("/{run_id}/status", response_model=AnalysisStatusResponse)
|
||||||
async def get_analysis_status(run_id: str):
|
async def get_analysis_status(run_id: str):
|
||||||
logger.info("GET /api/analysis/%s/status", run_id)
|
logger.info("GET /api/analysis/%s/status", run_id)
|
||||||
row = await fetchone("SELECT status FROM analysis_runs WHERE analysis_run_id = %s", (run_id,))
|
run_status = await select_run_status(run_id)
|
||||||
if not row:
|
if run_status is None:
|
||||||
raise HTTPException(status_code=404, detail="Run not found")
|
raise HTTPException(status_code=404, detail="Run not found")
|
||||||
return AnalysisStatusResponse(
|
return AnalysisStatusResponse(
|
||||||
analysis_run_id=run_id,
|
analysis_run_id=run_id,
|
||||||
status=AnalysisStatus(row["status"]),
|
status=AnalysisStatus(run_status),
|
||||||
progress=50.0,
|
progress=50.0,
|
||||||
current_step="",
|
current_step="",
|
||||||
channel_errors={},
|
channel_errors={},
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,8 @@ import logging
|
||||||
import uuid6
|
import uuid6
|
||||||
from fastapi import APIRouter, Depends, HTTPException, status
|
from fastapi import APIRouter, Depends, HTTPException, status
|
||||||
from common.deps import verify_api_key
|
from common.deps import verify_api_key
|
||||||
from common.db import insert_hospital, fetchone
|
from common.db.hospital import select_hospital, insert_hospital
|
||||||
|
from common.db.source import insert_source
|
||||||
from common.utils import get_env
|
from common.utils import get_env
|
||||||
from integrations.firecrawl import FirecrawlClient
|
from integrations.firecrawl import FirecrawlClient
|
||||||
from models.clinic import ClinicCreate, ClinicCreateResponse, ClinicResponse, ClinicHistoryResponse, RunSummary
|
from models.clinic import ClinicCreate, ClinicCreateResponse, ClinicResponse, ClinicHistoryResponse, RunSummary
|
||||||
|
|
@ -30,9 +31,8 @@ async def create_clinic(body: ClinicCreate):
|
||||||
name=info["clinicName"],
|
name=info["clinicName"],
|
||||||
name_en=info.get("clinicNameEn"),
|
name_en=info.get("clinicNameEn"),
|
||||||
road_address=info.get("address"),
|
road_address=info.get("address"),
|
||||||
url=body.url,
|
|
||||||
raw_data=info,
|
|
||||||
)
|
)
|
||||||
|
await insert_source(hospital_id, "mainpage", body.url)
|
||||||
return ClinicCreateResponse(
|
return ClinicCreateResponse(
|
||||||
id=hospital_id,
|
id=hospital_id,
|
||||||
url=body.url,
|
url=body.url,
|
||||||
|
|
@ -44,11 +44,7 @@ async def create_clinic(body: ClinicCreate):
|
||||||
@router.get("/{hospital_id}", response_model=ClinicResponse)
|
@router.get("/{hospital_id}", response_model=ClinicResponse)
|
||||||
async def get_clinic(hospital_id: str):
|
async def get_clinic(hospital_id: str):
|
||||||
logger.info("GET /api/clinics/%s", hospital_id)
|
logger.info("GET /api/clinics/%s", hospital_id)
|
||||||
row = await fetchone(
|
row = await select_hospital(hospital_id)
|
||||||
"SELECT hospital_id, hospital_name, hospital_name_en, road_address, url, status, raw_data, created_at, updated_at"
|
|
||||||
" FROM hospital_baseinfo WHERE hospital_id = %s",
|
|
||||||
(hospital_id,),
|
|
||||||
)
|
|
||||||
if not row:
|
if not row:
|
||||||
raise HTTPException(status_code=404, detail="Clinic not found")
|
raise HTTPException(status_code=404, detail="Clinic not found")
|
||||||
return ClinicResponse(**{**row, "created_at": str(row["created_at"]), "updated_at": str(row["updated_at"])})
|
return ClinicResponse(**{**row, "created_at": str(row["created_at"]), "updated_at": str(row["updated_at"])})
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
from fastapi import APIRouter, Depends, HTTPException, Response
|
from fastapi import APIRouter, Depends, HTTPException, Response
|
||||||
from common.db import fetchone
|
from common.db.run import select_run_with_clinic
|
||||||
from common.deps import verify_api_key
|
from common.deps import verify_api_key
|
||||||
from integrations.llm.schemas.plan import PlanOutput
|
from integrations.llm.schemas.plan import PlanOutput
|
||||||
from models.plan import PlanApiResponse
|
from models.plan import PlanApiResponse
|
||||||
|
|
@ -13,13 +13,7 @@ logger = logging.getLogger(__name__)
|
||||||
@router.get("/{run_id}", response_model=PlanApiResponse, response_model_by_alias=True)
|
@router.get("/{run_id}", response_model=PlanApiResponse, response_model_by_alias=True)
|
||||||
async def get_plan(run_id: str):
|
async def get_plan(run_id: str):
|
||||||
logger.info("GET /api/plan/%s", run_id)
|
logger.info("GET /api/plan/%s", run_id)
|
||||||
row = await fetchone(
|
row = await select_run_with_clinic(run_id)
|
||||||
"SELECT ar.plan_data, ar.created_at, h.hospital_name, h.hospital_name_en, h.url"
|
|
||||||
" FROM analysis_runs ar"
|
|
||||||
" JOIN hospital_baseinfo h ON ar.hospital_id = h.hospital_id"
|
|
||||||
" WHERE ar.analysis_run_id = %s",
|
|
||||||
(run_id,),
|
|
||||||
)
|
|
||||||
if row is None:
|
if row is None:
|
||||||
raise HTTPException(status_code=404, detail="Run not found")
|
raise HTTPException(status_code=404, detail="Run not found")
|
||||||
if row["plan_data"] is None:
|
if row["plan_data"] is None:
|
||||||
|
|
@ -31,6 +25,6 @@ async def get_plan(run_id: str):
|
||||||
clinic_name=row["hospital_name"],
|
clinic_name=row["hospital_name"],
|
||||||
clinic_name_en=row["hospital_name_en"],
|
clinic_name_en=row["hospital_name_en"],
|
||||||
created_at=str(row["created_at"]),
|
created_at=str(row["created_at"]),
|
||||||
target_url=row["url"],
|
target_url=row["target_url"],
|
||||||
**plan.model_dump(),
|
**plan.model_dump(),
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,7 @@
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
from fastapi import APIRouter, Depends, HTTPException, Response
|
from fastapi import APIRouter, Depends, HTTPException, Response
|
||||||
from common.db import fetchone
|
from common.db.run import select_run_with_clinic
|
||||||
from common.deps import verify_api_key
|
from common.deps import verify_api_key
|
||||||
from integrations.llm.schemas.report import ReportOutput
|
from integrations.llm.schemas.report import ReportOutput
|
||||||
from models.report import MarketingReportResponse
|
from models.report import MarketingReportResponse
|
||||||
|
|
@ -13,13 +13,7 @@ logger = logging.getLogger(__name__)
|
||||||
@router.get("/{run_id}", response_model=MarketingReportResponse, response_model_by_alias=True)
|
@router.get("/{run_id}", response_model=MarketingReportResponse, response_model_by_alias=True)
|
||||||
async def get_report(run_id: str):
|
async def get_report(run_id: str):
|
||||||
logger.info("GET /api/report/%s", run_id)
|
logger.info("GET /api/report/%s", run_id)
|
||||||
row = await fetchone(
|
row = await select_run_with_clinic(run_id)
|
||||||
"SELECT ar.report_data, ar.created_at, h.hospital_name, h.hospital_name_en, h.url"
|
|
||||||
" FROM analysis_runs ar"
|
|
||||||
" JOIN hospital_baseinfo h ON ar.hospital_id = h.hospital_id"
|
|
||||||
" WHERE ar.analysis_run_id = %s",
|
|
||||||
(run_id,),
|
|
||||||
)
|
|
||||||
if row is None:
|
if row is None:
|
||||||
raise HTTPException(status_code=404, detail="Run not found")
|
raise HTTPException(status_code=404, detail="Run not found")
|
||||||
if row["report_data"] is None:
|
if row["report_data"] is None:
|
||||||
|
|
@ -31,6 +25,6 @@ async def get_report(run_id: str):
|
||||||
clinic_name=row["hospital_name"],
|
clinic_name=row["hospital_name"],
|
||||||
clinic_name_en=row["hospital_name_en"],
|
clinic_name_en=row["hospital_name_en"],
|
||||||
created_at=str(row["created_at"]),
|
created_at=str(row["created_at"]),
|
||||||
target_url=row["url"],
|
target_url=row["target_url"],
|
||||||
**llm_output.model_dump(exclude={"id", "created_at", "target_url"}),
|
**llm_output.model_dump(exclude={"id", "created_at", "target_url"}),
|
||||||
)
|
)
|
||||||
|
|
|
||||||
274
app/common/db.py
274
app/common/db.py
|
|
@ -1,274 +0,0 @@
|
||||||
import json
|
|
||||||
import os
|
|
||||||
import aiomysql
|
|
||||||
from common.utils import get_env
|
|
||||||
|
|
||||||
_pool: aiomysql.Pool | None = None
|
|
||||||
|
|
||||||
|
|
||||||
async def get_pool() -> aiomysql.Pool:
|
|
||||||
global _pool
|
|
||||||
if _pool is None:
|
|
||||||
_pool = await aiomysql.create_pool(
|
|
||||||
host=get_env("MYSQL_HOST"),
|
|
||||||
port=int(os.getenv("MYSQL_PORT", "3306")),
|
|
||||||
user=get_env("MYSQL_USER"),
|
|
||||||
password=get_env("MYSQL_PASSWORD"),
|
|
||||||
db=get_env("MYSQL_DB"),
|
|
||||||
charset="utf8mb4",
|
|
||||||
minsize=0,
|
|
||||||
maxsize=30,
|
|
||||||
connect_timeout=10,
|
|
||||||
)
|
|
||||||
return _pool
|
|
||||||
|
|
||||||
|
|
||||||
# 쓰기 (INSERT/UPDATE/DELETE)
|
|
||||||
async def execute(sql: str, args: tuple = ()) -> int:
|
|
||||||
pool = await get_pool()
|
|
||||||
async with pool.acquire() as conn:
|
|
||||||
try:
|
|
||||||
async with conn.cursor() as cur:
|
|
||||||
await cur.execute(sql, args)
|
|
||||||
await conn.commit()
|
|
||||||
return cur.lastrowid
|
|
||||||
finally:
|
|
||||||
conn.close()
|
|
||||||
|
|
||||||
|
|
||||||
# 읽기 (SELECT)
|
|
||||||
async def fetchone(sql: str, args: tuple = ()) -> dict | None:
|
|
||||||
pool = await get_pool()
|
|
||||||
async with pool.acquire() as conn:
|
|
||||||
try:
|
|
||||||
async with conn.cursor(aiomysql.DictCursor) as cur:
|
|
||||||
await cur.execute(sql, args)
|
|
||||||
return await cur.fetchone()
|
|
||||||
finally:
|
|
||||||
conn.close()
|
|
||||||
|
|
||||||
|
|
||||||
async def fetchall(sql: str, args: tuple = ()) -> list[dict]:
|
|
||||||
pool = await get_pool()
|
|
||||||
async with pool.acquire() as conn:
|
|
||||||
try:
|
|
||||||
async with conn.cursor(aiomysql.DictCursor) as cur:
|
|
||||||
await cur.execute(sql, args)
|
|
||||||
return await cur.fetchall()
|
|
||||||
finally:
|
|
||||||
conn.close()
|
|
||||||
|
|
||||||
async def insert_instagram_row(hospital_id: str, url: str) -> int:
|
|
||||||
return await execute("INSERT INTO instagram_data (hospital_id, url) VALUES (%s, %s)", (hospital_id, url))
|
|
||||||
|
|
||||||
|
|
||||||
async def insert_facebook_row(hospital_id: str, url: str) -> int:
|
|
||||||
return await execute("INSERT INTO facebook_data (hospital_id, url) VALUES (%s, %s)", (hospital_id, url))
|
|
||||||
|
|
||||||
|
|
||||||
async def insert_naver_blog_row(hospital_id: str, url: str) -> int:
|
|
||||||
return await execute("INSERT INTO naver_blog_data (hospital_id, url) VALUES (%s, %s)", (hospital_id, url))
|
|
||||||
|
|
||||||
|
|
||||||
async def insert_youtube_row(hospital_id: str, url: str) -> int:
|
|
||||||
return await execute("INSERT INTO youtube_data (hospital_id, url) VALUES (%s, %s)", (hospital_id, url))
|
|
||||||
|
|
||||||
|
|
||||||
async def insert_gangnam_unni_row(hospital_id: str, url: str) -> int:
|
|
||||||
return await execute("INSERT INTO gangnam_unni_data (hospital_id, url) VALUES (%s, %s)", (hospital_id, url))
|
|
||||||
|
|
||||||
|
|
||||||
async def insert_file_row(
|
|
||||||
analysis_run_id: str,
|
|
||||||
file_type: str,
|
|
||||||
file_name: str,
|
|
||||||
file_url: str,
|
|
||||||
size_bytes: int | None = None,
|
|
||||||
hospital_id: str | None = None,
|
|
||||||
) -> int:
|
|
||||||
return await execute(
|
|
||||||
"INSERT INTO file_data (analysis_run_id, hospital_id, file_type, file_name, file_url, size_bytes)"
|
|
||||||
" VALUES (%s, %s, %s, %s, %s, %s)",
|
|
||||||
(analysis_run_id, hospital_id, file_type, file_name, file_url, size_bytes),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def insert_analysis_run(
|
|
||||||
analysis_run_id: str,
|
|
||||||
hospital_id: str,
|
|
||||||
owner_user_id: int,
|
|
||||||
instagram_data_id: int | None,
|
|
||||||
facebook_data_id: int | None,
|
|
||||||
naver_blog_data_id: int | None,
|
|
||||||
youtube_data_id: int | None,
|
|
||||||
gangnam_unni_data_id: int | None,
|
|
||||||
) -> str:
|
|
||||||
await execute(
|
|
||||||
"INSERT INTO analysis_runs"
|
|
||||||
" (analysis_run_id, hospital_id, owner_user_id, instagram_data_id, facebook_data_id, naver_blog_data_id, youtube_data_id, gangnam_unni_data_id)"
|
|
||||||
" VALUES (%s, %s, %s, %s, %s, %s, %s, %s)",
|
|
||||||
(analysis_run_id, hospital_id, owner_user_id, instagram_data_id, facebook_data_id, naver_blog_data_id, youtube_data_id, gangnam_unni_data_id),
|
|
||||||
)
|
|
||||||
return analysis_run_id
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
async def save_analysis_report(analysis_run_id: str, data: dict) -> None:
|
|
||||||
await execute(
|
|
||||||
"UPDATE analysis_runs SET report_data = %s WHERE analysis_run_id = %s",
|
|
||||||
(json.dumps(data, ensure_ascii=False), analysis_run_id),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def is_done(table: str, row_id: int | None) -> bool:
|
|
||||||
if row_id is None:
|
|
||||||
return True
|
|
||||||
r = await fetchone(f"SELECT status FROM {table} WHERE id = %s", (row_id,))
|
|
||||||
return r["status"] == "done"
|
|
||||||
|
|
||||||
|
|
||||||
async def fetch_raw(table: str, row_id: int | None) -> dict | None:
|
|
||||||
if row_id is None:
|
|
||||||
return None
|
|
||||||
row = await fetchone(f"SELECT raw_data FROM {table} WHERE id = %s", (row_id,))
|
|
||||||
if not row or not row["raw_data"]:
|
|
||||||
return None
|
|
||||||
return json.loads(row["raw_data"]) if isinstance(row["raw_data"], str) else row["raw_data"]
|
|
||||||
|
|
||||||
|
|
||||||
async def get_analysis_raw_data(analysis_run_id: str) -> dict:
|
|
||||||
run = await fetchone(
|
|
||||||
"SELECT instagram_data_id, facebook_data_id, naver_blog_data_id, youtube_data_id, gangnam_unni_data_id"
|
|
||||||
" FROM analysis_runs WHERE analysis_run_id = %s",
|
|
||||||
(analysis_run_id,),
|
|
||||||
)
|
|
||||||
return {
|
|
||||||
"instagram": await fetch_raw("instagram_data", run["instagram_data_id"]),
|
|
||||||
"facebook": await fetch_raw("facebook_data", run["facebook_data_id"]),
|
|
||||||
"naver_blog": await fetch_raw("naver_blog_data", run["naver_blog_data_id"]),
|
|
||||||
"youtube": await fetch_raw("youtube_data", run["youtube_data_id"]),
|
|
||||||
"gangnam_unni": await fetch_raw("gangnam_unni_data", run["gangnam_unni_data_id"]),
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
async def set_instagram_status(row_id: int, status: str) -> None:
|
|
||||||
await execute("UPDATE instagram_data SET status = %s WHERE id = %s", (status, row_id))
|
|
||||||
|
|
||||||
|
|
||||||
async def set_facebook_status(row_id: int, status: str) -> None:
|
|
||||||
await execute("UPDATE facebook_data SET status = %s WHERE id = %s", (status, row_id))
|
|
||||||
|
|
||||||
|
|
||||||
async def set_naver_blog_status(row_id: int, status: str) -> None:
|
|
||||||
await execute("UPDATE naver_blog_data SET status = %s WHERE id = %s", (status, row_id))
|
|
||||||
|
|
||||||
|
|
||||||
async def set_youtube_status(row_id: int, status: str) -> None:
|
|
||||||
await execute("UPDATE youtube_data SET status = %s WHERE id = %s", (status, row_id))
|
|
||||||
|
|
||||||
|
|
||||||
async def set_gangnam_unni_status(row_id: int, status: str) -> None:
|
|
||||||
await execute("UPDATE gangnam_unni_data SET status = %s WHERE id = %s", (status, row_id))
|
|
||||||
|
|
||||||
|
|
||||||
async def save_instagram_raw_data(row_id: int, data: dict) -> None:
|
|
||||||
await execute("UPDATE instagram_data SET raw_data = %s, status = 'done' WHERE id = %s", (json.dumps(data, ensure_ascii=False), row_id))
|
|
||||||
|
|
||||||
|
|
||||||
async def save_facebook_raw_data(row_id: int, data: dict) -> None:
|
|
||||||
await execute("UPDATE facebook_data SET raw_data = %s, status = 'done' WHERE id = %s", (json.dumps(data, ensure_ascii=False), row_id))
|
|
||||||
|
|
||||||
|
|
||||||
async def save_naver_blog_raw_data(row_id: int, data: dict) -> None:
|
|
||||||
await execute("UPDATE naver_blog_data SET raw_data = %s, status = 'done' WHERE id = %s", (json.dumps(data, ensure_ascii=False), row_id))
|
|
||||||
|
|
||||||
|
|
||||||
async def save_youtube_raw_data(row_id: int, data: dict) -> None:
|
|
||||||
await execute("UPDATE youtube_data SET raw_data = %s, status = 'done' WHERE id = %s", (json.dumps(data, ensure_ascii=False), row_id))
|
|
||||||
|
|
||||||
|
|
||||||
async def save_gangnam_unni_raw_data(row_id: int, data: dict) -> None:
|
|
||||||
await execute("UPDATE gangnam_unni_data SET raw_data = %s, status = 'done' WHERE id = %s", (json.dumps(data, ensure_ascii=False), row_id))
|
|
||||||
|
|
||||||
|
|
||||||
async def _insert_hospital_history(hospital_id: str, analysis_run_id: str | None) -> None:
|
|
||||||
row = await fetchone(
|
|
||||||
"SELECT owner_user_id, hospital_name, hospital_name_en, brn, road_address, site_address, url, status, raw_data"
|
|
||||||
" FROM hospital_baseinfo WHERE hospital_id = %s",
|
|
||||||
(hospital_id,),
|
|
||||||
)
|
|
||||||
if not row:
|
|
||||||
return
|
|
||||||
await execute(
|
|
||||||
"INSERT INTO hospital_history"
|
|
||||||
" (hospital_id, owner_user_id, hospital_name, hospital_name_en, brn, road_address, site_address, url, status, raw_data, analysis_run_id)"
|
|
||||||
" VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
|
|
||||||
(
|
|
||||||
hospital_id,
|
|
||||||
row["owner_user_id"],
|
|
||||||
row["hospital_name"],
|
|
||||||
row["hospital_name_en"],
|
|
||||||
row["brn"],
|
|
||||||
row["road_address"],
|
|
||||||
row["site_address"],
|
|
||||||
row["url"],
|
|
||||||
row["status"],
|
|
||||||
row["raw_data"] if isinstance(row["raw_data"], str) else json.dumps(row["raw_data"], ensure_ascii=False) if row["raw_data"] else None,
|
|
||||||
analysis_run_id,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def insert_hospital(
|
|
||||||
hospital_id: str,
|
|
||||||
name: str,
|
|
||||||
name_en: str | None = None,
|
|
||||||
road_address: str | None = None,
|
|
||||||
site_address: str | None = None,
|
|
||||||
url: str | None = None,
|
|
||||||
raw_data: dict | None = None,
|
|
||||||
owner_user_id: int = 0,
|
|
||||||
brn: str = "",
|
|
||||||
) -> dict:
|
|
||||||
await execute(
|
|
||||||
"INSERT INTO hospital_baseinfo (hospital_id, hospital_name, hospital_name_en, road_address, site_address, url, raw_data, status, owner_user_id, brn)"
|
|
||||||
" VALUES (%s, %s, %s, %s, %s, %s, %s, 'done', %s, %s)",
|
|
||||||
(hospital_id, name, name_en, road_address, site_address, url,
|
|
||||||
json.dumps(raw_data, ensure_ascii=False) if raw_data else None,
|
|
||||||
owner_user_id, brn),
|
|
||||||
)
|
|
||||||
await _insert_hospital_history(hospital_id, analysis_run_id=None)
|
|
||||||
return await fetchone(
|
|
||||||
"SELECT created_at FROM hospital_baseinfo WHERE hospital_id = %s",
|
|
||||||
(hospital_id,),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def save_hospital_raw_data(hospital_id: str, data: dict, analysis_run_id: str | None = None) -> None:
|
|
||||||
await execute(
|
|
||||||
"UPDATE hospital_baseinfo"
|
|
||||||
" SET raw_data = %s, status = 'done',"
|
|
||||||
" hospital_name = COALESCE(%s, hospital_name),"
|
|
||||||
" hospital_name_en = COALESCE(%s, hospital_name_en),"
|
|
||||||
" road_address = COALESCE(%s, road_address)"
|
|
||||||
" WHERE hospital_id = %s",
|
|
||||||
(
|
|
||||||
json.dumps(data, ensure_ascii=False),
|
|
||||||
data.get("clinicName"),
|
|
||||||
data.get("clinicNameEn"),
|
|
||||||
data.get("address"),
|
|
||||||
hospital_id,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
await _insert_hospital_history(hospital_id, analysis_run_id)
|
|
||||||
|
|
||||||
|
|
||||||
async def get_market_analysis(analysis_run_id: str) -> dict:
|
|
||||||
rows = await fetchall(
|
|
||||||
"SELECT analysis_type, data FROM market_analysis WHERE analysis_run_id = %s AND status = 'done'",
|
|
||||||
(analysis_run_id,),
|
|
||||||
)
|
|
||||||
return {
|
|
||||||
row["analysis_type"]: json.loads(row["data"]) if isinstance(row["data"], str) else row["data"]
|
|
||||||
for row in rows
|
|
||||||
}
|
|
||||||
|
|
@ -0,0 +1,14 @@
|
||||||
|
from common.db.base import execute, fetchone, fetchall
|
||||||
|
from common.db.hospital import select_hospital, update_hospital_status, insert_hospital, update_hospital
|
||||||
|
from common.db.source import (
|
||||||
|
insert_source, select_source_mainpage,
|
||||||
|
insert_raw_info, update_raw_info_status, update_raw_info,
|
||||||
|
select_raw_info_data,
|
||||||
|
select_run_sources, select_run_raw_data, select_run_mainpage_url,
|
||||||
|
)
|
||||||
|
from common.db.run import (
|
||||||
|
insert_run, select_run, select_run_status, update_run_status,
|
||||||
|
update_run_report, update_run_plan, select_run_with_clinic,
|
||||||
|
)
|
||||||
|
from common.db.market import upsert_market_status, upsert_market_result, select_market
|
||||||
|
from common.db.file_data import insert_file, select_run_files, select_file, delete_file
|
||||||
|
|
@ -0,0 +1,56 @@
|
||||||
|
import os
|
||||||
|
import aiomysql
|
||||||
|
from common.utils import get_env
|
||||||
|
|
||||||
|
_pool: aiomysql.Pool | None = None
|
||||||
|
|
||||||
|
|
||||||
|
async def get_pool() -> aiomysql.Pool:
|
||||||
|
global _pool
|
||||||
|
if _pool is None:
|
||||||
|
_pool = await aiomysql.create_pool(
|
||||||
|
host=get_env("MYSQL_HOST"),
|
||||||
|
port=int(os.getenv("MYSQL_PORT", "3306")),
|
||||||
|
user=get_env("MYSQL_USER"),
|
||||||
|
password=get_env("MYSQL_PASSWORD"),
|
||||||
|
db=get_env("MYSQL_DB"),
|
||||||
|
charset="utf8mb4",
|
||||||
|
minsize=0,
|
||||||
|
maxsize=30,
|
||||||
|
connect_timeout=10,
|
||||||
|
)
|
||||||
|
return _pool
|
||||||
|
|
||||||
|
|
||||||
|
async def execute(sql: str, args: tuple = ()) -> int:
|
||||||
|
pool = await get_pool()
|
||||||
|
async with pool.acquire() as conn:
|
||||||
|
try:
|
||||||
|
async with conn.cursor() as cur:
|
||||||
|
await cur.execute(sql, args)
|
||||||
|
await conn.commit()
|
||||||
|
return cur.lastrowid
|
||||||
|
finally:
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
async def fetchone(sql: str, args: tuple = ()) -> dict | None:
|
||||||
|
pool = await get_pool()
|
||||||
|
async with pool.acquire() as conn:
|
||||||
|
try:
|
||||||
|
async with conn.cursor(aiomysql.DictCursor) as cur:
|
||||||
|
await cur.execute(sql, args)
|
||||||
|
return await cur.fetchone()
|
||||||
|
finally:
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
async def fetchall(sql: str, args: tuple = ()) -> list[dict]:
|
||||||
|
pool = await get_pool()
|
||||||
|
async with pool.acquire() as conn:
|
||||||
|
try:
|
||||||
|
async with conn.cursor(aiomysql.DictCursor) as cur:
|
||||||
|
await cur.execute(sql, args)
|
||||||
|
return await cur.fetchall()
|
||||||
|
finally:
|
||||||
|
conn.close()
|
||||||
|
|
@ -0,0 +1,39 @@
|
||||||
|
from common.db.base import execute, fetchone, fetchall
|
||||||
|
|
||||||
|
|
||||||
|
async def insert_file(
|
||||||
|
analysis_run_id: str,
|
||||||
|
file_type: str,
|
||||||
|
file_name: str,
|
||||||
|
file_url: str,
|
||||||
|
size_bytes: int | None = None,
|
||||||
|
hospital_id: str | None = None,
|
||||||
|
) -> int:
|
||||||
|
return await execute(
|
||||||
|
"INSERT INTO file_data (analysis_run_id, hospital_id, file_type, file_name, file_url, size_bytes)"
|
||||||
|
" VALUES (%s, %s, %s, %s, %s, %s)",
|
||||||
|
(analysis_run_id, hospital_id, file_type, file_name, file_url, size_bytes),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def select_run_files(analysis_run_id: str) -> list[dict]:
|
||||||
|
return await fetchall(
|
||||||
|
"SELECT id, file_type, file_name, file_url, size_bytes, created_at"
|
||||||
|
" FROM file_data WHERE analysis_run_id = %s AND is_deleted = FALSE"
|
||||||
|
" ORDER BY created_at DESC",
|
||||||
|
(analysis_run_id,),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def select_file(file_id: int, analysis_run_id: str) -> dict | None:
|
||||||
|
return await fetchone(
|
||||||
|
"SELECT id FROM file_data WHERE id = %s AND analysis_run_id = %s",
|
||||||
|
(file_id, analysis_run_id),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def delete_file(file_id: int) -> None:
|
||||||
|
await execute(
|
||||||
|
"UPDATE file_data SET is_deleted = TRUE WHERE id = %s AND is_deleted = FALSE",
|
||||||
|
(file_id,),
|
||||||
|
)
|
||||||
|
|
@ -0,0 +1,78 @@
|
||||||
|
from common.db.base import execute, fetchone
|
||||||
|
|
||||||
|
|
||||||
|
async def select_hospital(hospital_id: str) -> dict | None:
|
||||||
|
return await fetchone(
|
||||||
|
"SELECT hospital_id, owner_user_id, hospital_name, hospital_name_en,"
|
||||||
|
" brn, road_address, site_address, status, created_at, updated_at"
|
||||||
|
" FROM hospital_baseinfo WHERE hospital_id = %s",
|
||||||
|
(hospital_id,),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def update_hospital_status(hospital_id: str, status: str) -> None:
|
||||||
|
await execute(
|
||||||
|
"UPDATE hospital_baseinfo SET status = %s WHERE hospital_id = %s",
|
||||||
|
(status, hospital_id),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def _insert_hospital_history(hospital_id: str, analysis_run_id: str | None) -> None:
|
||||||
|
row = await fetchone(
|
||||||
|
"SELECT owner_user_id, hospital_name, hospital_name_en, brn, road_address, site_address, status"
|
||||||
|
" FROM hospital_baseinfo WHERE hospital_id = %s",
|
||||||
|
(hospital_id,),
|
||||||
|
)
|
||||||
|
if not row:
|
||||||
|
return
|
||||||
|
await execute(
|
||||||
|
"INSERT INTO hospital_history"
|
||||||
|
" (hospital_id, owner_user_id, hospital_name, hospital_name_en, brn, road_address, site_address, status, analysis_run_id)"
|
||||||
|
" VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
|
||||||
|
(
|
||||||
|
hospital_id,
|
||||||
|
row["owner_user_id"],
|
||||||
|
row["hospital_name"],
|
||||||
|
row["hospital_name_en"],
|
||||||
|
row["brn"],
|
||||||
|
row["road_address"],
|
||||||
|
row["site_address"],
|
||||||
|
row["status"],
|
||||||
|
analysis_run_id,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def insert_hospital(
|
||||||
|
hospital_id: str,
|
||||||
|
name: str,
|
||||||
|
name_en: str | None = None,
|
||||||
|
road_address: str | None = None,
|
||||||
|
site_address: str | None = None,
|
||||||
|
owner_user_id: int = 0,
|
||||||
|
brn: str = "",
|
||||||
|
) -> dict:
|
||||||
|
await execute(
|
||||||
|
"INSERT INTO hospital_baseinfo"
|
||||||
|
" (hospital_id, hospital_name, hospital_name_en, road_address, site_address, status, owner_user_id, brn)"
|
||||||
|
" VALUES (%s, %s, %s, %s, %s, 'done', %s, %s)",
|
||||||
|
(hospital_id, name, name_en, road_address, site_address, owner_user_id, brn),
|
||||||
|
)
|
||||||
|
await _insert_hospital_history(hospital_id, analysis_run_id=None)
|
||||||
|
return await fetchone(
|
||||||
|
"SELECT created_at FROM hospital_baseinfo WHERE hospital_id = %s",
|
||||||
|
(hospital_id,),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def update_hospital(hospital_id: str, data: dict, analysis_run_id: str | None = None) -> None:
|
||||||
|
await execute(
|
||||||
|
"UPDATE hospital_baseinfo"
|
||||||
|
" SET status = 'done',"
|
||||||
|
" hospital_name = COALESCE(%s, hospital_name),"
|
||||||
|
" hospital_name_en = COALESCE(%s, hospital_name_en),"
|
||||||
|
" road_address = COALESCE(%s, road_address)"
|
||||||
|
" WHERE hospital_id = %s",
|
||||||
|
(data.get("clinicName"), data.get("clinicNameEn"), data.get("address"), hospital_id),
|
||||||
|
)
|
||||||
|
await _insert_hospital_history(hospital_id, analysis_run_id)
|
||||||
|
|
@ -0,0 +1,31 @@
|
||||||
|
import json
|
||||||
|
from common.db.base import execute, fetchall
|
||||||
|
|
||||||
|
|
||||||
|
async def upsert_market_status(analysis_run_id: str, analysis_type: str, status: str) -> None:
|
||||||
|
await execute(
|
||||||
|
"INSERT INTO market_analysis (analysis_run_id, analysis_type, status)"
|
||||||
|
" VALUES (%s, %s, %s)"
|
||||||
|
" ON DUPLICATE KEY UPDATE status = VALUES(status)",
|
||||||
|
(analysis_run_id, analysis_type, status),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def upsert_market_result(analysis_run_id: str, analysis_type: str, data: dict) -> None:
|
||||||
|
await execute(
|
||||||
|
"INSERT INTO market_analysis (analysis_run_id, analysis_type, status, data)"
|
||||||
|
" VALUES (%s, %s, 'done', %s)"
|
||||||
|
" ON DUPLICATE KEY UPDATE status = 'done', data = VALUES(data)",
|
||||||
|
(analysis_run_id, analysis_type, json.dumps(data, ensure_ascii=False)),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def select_market(analysis_run_id: str) -> dict:
|
||||||
|
rows = await fetchall(
|
||||||
|
"SELECT analysis_type, data FROM market_analysis WHERE analysis_run_id = %s AND status = 'done'",
|
||||||
|
(analysis_run_id,),
|
||||||
|
)
|
||||||
|
return {
|
||||||
|
row["analysis_type"]: json.loads(row["data"]) if isinstance(row["data"], str) else row["data"]
|
||||||
|
for row in rows
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,64 @@
|
||||||
|
import json
|
||||||
|
from common.db.base import execute, fetchone
|
||||||
|
|
||||||
|
|
||||||
|
async def insert_run(
|
||||||
|
analysis_run_id: str,
|
||||||
|
hospital_id: str,
|
||||||
|
owner_user_id: int,
|
||||||
|
) -> str:
|
||||||
|
await execute(
|
||||||
|
"INSERT INTO analysis_runs (analysis_run_id, hospital_id, owner_user_id) VALUES (%s, %s, %s)",
|
||||||
|
(analysis_run_id, hospital_id, owner_user_id),
|
||||||
|
)
|
||||||
|
return analysis_run_id
|
||||||
|
|
||||||
|
|
||||||
|
async def select_run(analysis_run_id: str) -> dict | None:
|
||||||
|
return await fetchone(
|
||||||
|
"SELECT analysis_run_id, hospital_id, owner_user_id, status, created_at, updated_at"
|
||||||
|
" FROM analysis_runs WHERE analysis_run_id = %s",
|
||||||
|
(analysis_run_id,),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def select_run_status(analysis_run_id: str) -> str | None:
|
||||||
|
row = await fetchone(
|
||||||
|
"SELECT status FROM analysis_runs WHERE analysis_run_id = %s",
|
||||||
|
(analysis_run_id,),
|
||||||
|
)
|
||||||
|
return row["status"] if row else None
|
||||||
|
|
||||||
|
|
||||||
|
async def update_run_status(analysis_run_id: str, status: str) -> None:
|
||||||
|
await execute(
|
||||||
|
"UPDATE analysis_runs SET status = %s WHERE analysis_run_id = %s",
|
||||||
|
(status, analysis_run_id),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def update_run_report(analysis_run_id: str, data: dict) -> None:
|
||||||
|
await execute(
|
||||||
|
"UPDATE analysis_runs SET report_data = %s WHERE analysis_run_id = %s",
|
||||||
|
(json.dumps(data, ensure_ascii=False), analysis_run_id),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def update_run_plan(analysis_run_id: str, data: dict) -> None:
|
||||||
|
await execute(
|
||||||
|
"UPDATE analysis_runs SET plan_data = %s WHERE analysis_run_id = %s",
|
||||||
|
(json.dumps(data, ensure_ascii=False), analysis_run_id),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def select_run_with_clinic(analysis_run_id: str) -> dict | None:
|
||||||
|
return await fetchone(
|
||||||
|
"SELECT ar.report_data, ar.plan_data, ar.created_at,"
|
||||||
|
" h.hospital_name, h.hospital_name_en,"
|
||||||
|
" rs.url AS target_url"
|
||||||
|
" FROM analysis_runs ar"
|
||||||
|
" JOIN hospital_baseinfo h ON ar.hospital_id = h.hospital_id"
|
||||||
|
" LEFT JOIN remote_source rs ON rs.hospital_id = h.hospital_id AND rs.source_type = 'mainpage'"
|
||||||
|
" WHERE ar.analysis_run_id = %s",
|
||||||
|
(analysis_run_id,),
|
||||||
|
)
|
||||||
|
|
@ -0,0 +1,85 @@
|
||||||
|
import json
|
||||||
|
from common.db.base import execute, fetchone, fetchall
|
||||||
|
from models.status import SourceType
|
||||||
|
|
||||||
|
|
||||||
|
async def insert_source(
|
||||||
|
hospital_id: str,
|
||||||
|
source_type: SourceType,
|
||||||
|
url: str,
|
||||||
|
language: str | None = None,
|
||||||
|
) -> int:
|
||||||
|
return await execute(
|
||||||
|
"INSERT INTO remote_source (hospital_id, source_type, language, url) VALUES (%s, %s, %s, %s)",
|
||||||
|
(hospital_id, source_type, language, url),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def select_source_mainpage(hospital_id: str) -> dict | None:
|
||||||
|
return await fetchone(
|
||||||
|
"SELECT source_id FROM remote_source WHERE hospital_id = %s AND source_type = 'mainpage'",
|
||||||
|
(hospital_id,),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def insert_raw_info(
|
||||||
|
source_id: int,
|
||||||
|
analysis_run_id: str,
|
||||||
|
data_tag: SourceType,
|
||||||
|
) -> int:
|
||||||
|
return await execute(
|
||||||
|
"INSERT INTO raw_info (source_id, analysis_run_id, data_tag) VALUES (%s, %s, %s)",
|
||||||
|
(source_id, analysis_run_id, data_tag),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def update_raw_info_status(info_id: int, status: str) -> None:
|
||||||
|
await execute("UPDATE raw_info SET status = %s WHERE info_id = %s", (status, info_id))
|
||||||
|
|
||||||
|
|
||||||
|
async def update_raw_info(info_id: int, data: dict) -> None:
|
||||||
|
await execute(
|
||||||
|
"UPDATE raw_info SET raw_data = %s, status = 'done' WHERE info_id = %s",
|
||||||
|
(json.dumps(data, ensure_ascii=False), info_id),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def select_raw_info_data(info_id: int | None) -> dict | None:
|
||||||
|
if info_id is None:
|
||||||
|
return None
|
||||||
|
row = await fetchone("SELECT raw_data FROM raw_info WHERE info_id = %s", (info_id,))
|
||||||
|
if not row or not row["raw_data"]:
|
||||||
|
return None
|
||||||
|
return json.loads(row["raw_data"]) if isinstance(row["raw_data"], str) else row["raw_data"]
|
||||||
|
|
||||||
|
|
||||||
|
async def select_run_sources(analysis_run_id: str) -> list[dict]:
|
||||||
|
return await fetchall(
|
||||||
|
"SELECT ri.info_id, rs.source_type, rs.url"
|
||||||
|
" FROM raw_info ri JOIN remote_source rs USING (source_id)"
|
||||||
|
" WHERE ri.analysis_run_id = %s",
|
||||||
|
(analysis_run_id,),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def select_run_raw_data(analysis_run_id: str) -> dict:
|
||||||
|
rows = await fetchall(
|
||||||
|
"SELECT rs.source_type, ri.raw_data"
|
||||||
|
" FROM raw_info ri JOIN remote_source rs USING (source_id)"
|
||||||
|
" WHERE ri.analysis_run_id = %s",
|
||||||
|
(analysis_run_id,),
|
||||||
|
)
|
||||||
|
result: dict = {}
|
||||||
|
for row in rows:
|
||||||
|
raw = row["raw_data"]
|
||||||
|
result[row["source_type"]] = json.loads(raw) if isinstance(raw, str) else raw
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
async def select_run_mainpage_url(analysis_run_id: str) -> str:
|
||||||
|
row = await fetchone(
|
||||||
|
"SELECT rs.url FROM raw_info ri JOIN remote_source rs USING (source_id)"
|
||||||
|
" WHERE ri.analysis_run_id = %s AND rs.source_type = 'mainpage'",
|
||||||
|
(analysis_run_id,),
|
||||||
|
)
|
||||||
|
return (row or {}).get("url") or ""
|
||||||
|
|
@ -1,236 +0,0 @@
|
||||||
import json
|
|
||||||
import os
|
|
||||||
import aiomysql
|
|
||||||
from common.utils import get_env
|
|
||||||
|
|
||||||
_pool: aiomysql.Pool | None = None
|
|
||||||
|
|
||||||
|
|
||||||
async def get_pool() -> aiomysql.Pool:
|
|
||||||
global _pool
|
|
||||||
if _pool is None:
|
|
||||||
_pool = await aiomysql.create_pool(
|
|
||||||
host=get_env("MYSQL_HOST"),
|
|
||||||
port=int(os.getenv("MYSQL_PORT", "3306")),
|
|
||||||
user=get_env("MYSQL_USER"),
|
|
||||||
password=get_env("MYSQL_PASSWORD"),
|
|
||||||
db=get_env("MYSQL_DB"),
|
|
||||||
charset="utf8mb4",
|
|
||||||
minsize=0,
|
|
||||||
maxsize=30,
|
|
||||||
connect_timeout=10,
|
|
||||||
)
|
|
||||||
return _pool
|
|
||||||
|
|
||||||
|
|
||||||
async def execute(sql: str, args: tuple = ()) -> int:
|
|
||||||
pool = await get_pool()
|
|
||||||
async with pool.acquire() as conn:
|
|
||||||
try:
|
|
||||||
async with conn.cursor() as cur:
|
|
||||||
await cur.execute(sql, args)
|
|
||||||
await conn.commit()
|
|
||||||
return cur.lastrowid
|
|
||||||
finally:
|
|
||||||
conn.close()
|
|
||||||
|
|
||||||
|
|
||||||
async def fetchone(sql: str, args: tuple = ()) -> dict | None:
|
|
||||||
pool = await get_pool()
|
|
||||||
async with pool.acquire() as conn:
|
|
||||||
try:
|
|
||||||
async with conn.cursor(aiomysql.DictCursor) as cur:
|
|
||||||
await cur.execute(sql, args)
|
|
||||||
return await cur.fetchone()
|
|
||||||
finally:
|
|
||||||
conn.close()
|
|
||||||
|
|
||||||
|
|
||||||
async def fetchall(sql: str, args: tuple = ()) -> list[dict]:
|
|
||||||
pool = await get_pool()
|
|
||||||
async with pool.acquire() as conn:
|
|
||||||
try:
|
|
||||||
async with conn.cursor(aiomysql.DictCursor) as cur:
|
|
||||||
await cur.execute(sql, args)
|
|
||||||
return await cur.fetchall()
|
|
||||||
finally:
|
|
||||||
conn.close()
|
|
||||||
|
|
||||||
|
|
||||||
# ── remote_source ─────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
async def insert_source(hospital_id: str, source_type: str, url: str) -> int:
|
|
||||||
return await execute(
|
|
||||||
"INSERT INTO remote_source (hospital_id, source_type, url) VALUES (%s, %s, %s)",
|
|
||||||
(hospital_id, source_type, url),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# ── raw_info ──────────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
async def insert_raw_info(source_id: int, analysis_run_id: str, data_tag: str = "default") -> int:
|
|
||||||
return await execute(
|
|
||||||
"INSERT INTO raw_info (source_id, analysis_run_id, data_tag) VALUES (%s, %s, %s)",
|
|
||||||
(source_id, analysis_run_id, data_tag),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def set_raw_info_status(info_id: int, status: str) -> None:
|
|
||||||
await execute("UPDATE raw_info SET status = %s WHERE info_id = %s", (status, info_id))
|
|
||||||
|
|
||||||
|
|
||||||
async def save_raw_info(info_id: int, data: dict) -> None:
|
|
||||||
await execute(
|
|
||||||
"UPDATE raw_info SET raw_data = %s, status = 'done' WHERE info_id = %s",
|
|
||||||
(json.dumps(data, ensure_ascii=False), info_id),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def fetch_raw(info_id: int | None) -> dict | None:
|
|
||||||
if info_id is None:
|
|
||||||
return None
|
|
||||||
row = await fetchone("SELECT raw_data FROM raw_info WHERE info_id = %s", (info_id,))
|
|
||||||
if not row or not row["raw_data"]:
|
|
||||||
return None
|
|
||||||
return json.loads(row["raw_data"]) if isinstance(row["raw_data"], str) else row["raw_data"]
|
|
||||||
|
|
||||||
|
|
||||||
async def is_done(info_id: int | None) -> bool:
|
|
||||||
if info_id is None:
|
|
||||||
return True
|
|
||||||
r = await fetchone("SELECT status FROM raw_info WHERE info_id = %s", (info_id,))
|
|
||||||
return r["status"] == "done"
|
|
||||||
|
|
||||||
|
|
||||||
async def get_analysis_raw_data(analysis_run_id: str) -> dict:
|
|
||||||
rows = await fetchall(
|
|
||||||
"SELECT rs.source_type, ri.raw_data"
|
|
||||||
" FROM raw_info ri JOIN remote_source rs USING (source_id)"
|
|
||||||
" WHERE ri.analysis_run_id = %s",
|
|
||||||
(analysis_run_id,),
|
|
||||||
)
|
|
||||||
result: dict = {}
|
|
||||||
for row in rows:
|
|
||||||
raw = row["raw_data"]
|
|
||||||
result[row["source_type"]] = json.loads(raw) if isinstance(raw, str) else raw
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
# ── analysis_runs ─────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
async def insert_analysis_run(
|
|
||||||
analysis_run_id: str,
|
|
||||||
hospital_id: str,
|
|
||||||
owner_user_id: int,
|
|
||||||
) -> str:
|
|
||||||
await execute(
|
|
||||||
"INSERT INTO analysis_runs (analysis_run_id, hospital_id, owner_user_id)"
|
|
||||||
" VALUES (%s, %s, %s)",
|
|
||||||
(analysis_run_id, hospital_id, owner_user_id),
|
|
||||||
)
|
|
||||||
return analysis_run_id
|
|
||||||
|
|
||||||
|
|
||||||
async def save_analysis_report(analysis_run_id: str, data: dict) -> None:
|
|
||||||
await execute(
|
|
||||||
"UPDATE analysis_runs SET report_data = %s WHERE analysis_run_id = %s",
|
|
||||||
(json.dumps(data, ensure_ascii=False), analysis_run_id),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# ── hospital_baseinfo ─────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
async def _insert_hospital_history(hospital_id: str, analysis_run_id: str | None) -> None:
|
|
||||||
row = await fetchone(
|
|
||||||
"SELECT owner_user_id, hospital_name, hospital_name_en, brn, road_address, site_address, status"
|
|
||||||
" FROM hospital_baseinfo WHERE hospital_id = %s",
|
|
||||||
(hospital_id,),
|
|
||||||
)
|
|
||||||
if not row:
|
|
||||||
return
|
|
||||||
await execute(
|
|
||||||
"INSERT INTO hospital_history"
|
|
||||||
" (hospital_id, owner_user_id, hospital_name, hospital_name_en, brn, road_address, site_address, status, analysis_run_id)"
|
|
||||||
" VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
|
|
||||||
(
|
|
||||||
hospital_id,
|
|
||||||
row["owner_user_id"],
|
|
||||||
row["hospital_name"],
|
|
||||||
row["hospital_name_en"],
|
|
||||||
row["brn"],
|
|
||||||
row["road_address"],
|
|
||||||
row["site_address"],
|
|
||||||
row["status"],
|
|
||||||
analysis_run_id,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def insert_hospital(
|
|
||||||
hospital_id: str,
|
|
||||||
name: str,
|
|
||||||
name_en: str | None = None,
|
|
||||||
road_address: str | None = None,
|
|
||||||
site_address: str | None = None,
|
|
||||||
owner_user_id: int = 0,
|
|
||||||
brn: str = "",
|
|
||||||
) -> dict:
|
|
||||||
await execute(
|
|
||||||
"INSERT INTO hospital_baseinfo (hospital_id, hospital_name, hospital_name_en, road_address, site_address, status, owner_user_id, brn)"
|
|
||||||
" VALUES (%s, %s, %s, %s, %s, 'done', %s, %s)",
|
|
||||||
(hospital_id, name, name_en, road_address, site_address, owner_user_id, brn),
|
|
||||||
)
|
|
||||||
await _insert_hospital_history(hospital_id, analysis_run_id=None)
|
|
||||||
return await fetchone(
|
|
||||||
"SELECT created_at FROM hospital_baseinfo WHERE hospital_id = %s",
|
|
||||||
(hospital_id,),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def update_hospital_info(hospital_id: str, data: dict, analysis_run_id: str | None = None) -> None:
|
|
||||||
"""clinic 스크래핑 후 hospital_baseinfo의 기본 필드 업데이트."""
|
|
||||||
await execute(
|
|
||||||
"UPDATE hospital_baseinfo"
|
|
||||||
" SET status = 'done',"
|
|
||||||
" hospital_name = COALESCE(%s, hospital_name),"
|
|
||||||
" hospital_name_en = COALESCE(%s, hospital_name_en),"
|
|
||||||
" road_address = COALESCE(%s, road_address)"
|
|
||||||
" WHERE hospital_id = %s",
|
|
||||||
(
|
|
||||||
data.get("clinicName"),
|
|
||||||
data.get("clinicNameEn"),
|
|
||||||
data.get("address"),
|
|
||||||
hospital_id,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
await _insert_hospital_history(hospital_id, analysis_run_id)
|
|
||||||
|
|
||||||
|
|
||||||
# ── file_data ─────────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
async def insert_file_row(
|
|
||||||
analysis_run_id: str,
|
|
||||||
file_type: str,
|
|
||||||
file_name: str,
|
|
||||||
file_url: str,
|
|
||||||
size_bytes: int | None = None,
|
|
||||||
hospital_id: str | None = None,
|
|
||||||
) -> int:
|
|
||||||
return await execute(
|
|
||||||
"INSERT INTO file_data (analysis_run_id, hospital_id, file_type, file_name, file_url, size_bytes)"
|
|
||||||
" VALUES (%s, %s, %s, %s, %s, %s)",
|
|
||||||
(analysis_run_id, hospital_id, file_type, file_name, file_url, size_bytes),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# ── market_analysis ───────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
async def get_market_analysis(analysis_run_id: str) -> dict:
|
|
||||||
rows = await fetchall(
|
|
||||||
"SELECT analysis_type, data FROM market_analysis WHERE analysis_run_id = %s AND status = 'done'",
|
|
||||||
(analysis_run_id,),
|
|
||||||
)
|
|
||||||
return {
|
|
||||||
row["analysis_type"]: json.loads(row["data"]) if isinstance(row["data"], str) else row["data"]
|
|
||||||
for row in rows
|
|
||||||
}
|
|
||||||
|
|
@ -35,6 +35,7 @@ class ApifyClient:
|
||||||
|
|
||||||
async def fetch_instagram_profile(self, url: str) -> dict | None:
|
async def fetch_instagram_profile(self, url: str) -> dict | None:
|
||||||
username = urlparse(url).path.strip("/").split("/")[0] if "://" in url else url.lstrip("@")
|
username = urlparse(url).path.strip("/").split("/")[0] if "://" in url else url.lstrip("@")
|
||||||
|
print(username)
|
||||||
items = await self._run_actor("apify~instagram-profile-scraper", {"usernames": [username], "resultsLimit": 12})
|
items = await self._run_actor("apify~instagram-profile-scraper", {"usernames": [username], "resultsLimit": 12})
|
||||||
return items[0] if items else None
|
return items[0] if items else None
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -10,9 +10,7 @@ class ClinicResponse(BaseModel):
|
||||||
hospital_name: str
|
hospital_name: str
|
||||||
hospital_name_en: str | None
|
hospital_name_en: str | None
|
||||||
road_address: str | None
|
road_address: str | None
|
||||||
url: str | None
|
|
||||||
status: str
|
status: str
|
||||||
raw_data: dict | None
|
|
||||||
created_at: str
|
created_at: str
|
||||||
updated_at: str
|
updated_at: str
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -36,9 +36,22 @@ class DataSource(StrEnum):
|
||||||
SCRAPE = "scrape"
|
SCRAPE = "scrape"
|
||||||
|
|
||||||
|
|
||||||
|
class SourceType(StrEnum):
|
||||||
|
MAINPAGE = "mainpage"
|
||||||
|
INSTAGRAM = "instagram"
|
||||||
|
FACEBOOK = "facebook"
|
||||||
|
NAVER_BLOG = "naver_blog"
|
||||||
|
YOUTUBE = "youtube"
|
||||||
|
TIKTOK = "tiktok"
|
||||||
|
GANGNAM_UNNI = "gangnam_unni"
|
||||||
|
KAKAOTALK = "kakaotalk"
|
||||||
|
NAVER_CAFE = "naver_cafe"
|
||||||
|
|
||||||
|
|
||||||
class Language(StrEnum):
|
class Language(StrEnum):
|
||||||
KR = "KR"
|
KR = "KR"
|
||||||
EN = "EN"
|
EN = "EN"
|
||||||
|
WW = "WW"
|
||||||
|
|
||||||
|
|
||||||
class VideoType(StrEnum):
|
class VideoType(StrEnum):
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,10 @@ import logging
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from common.db import fetchone, execute, fetch_raw, get_analysis_raw_data, save_analysis_report, get_market_analysis
|
from urllib.parse import urlparse
|
||||||
|
from common.db.run import select_run, update_run_report, update_run_plan
|
||||||
|
from common.db.source import select_run_raw_data, select_run_mainpage_url
|
||||||
|
from common.db.market import select_market
|
||||||
from integrations.llm.llm_service import LLMService
|
from integrations.llm.llm_service import LLMService
|
||||||
from integrations.llm.prompt import report_prompt, plan_prompt, youtube_diagnosis_prompt
|
from integrations.llm.prompt import report_prompt, plan_prompt, youtube_diagnosis_prompt
|
||||||
from integrations.llm.schemas.report import ReportOutput, ClinicSnapshot, YouTubeAudit
|
from integrations.llm.schemas.report import ReportOutput, ClinicSnapshot, YouTubeAudit
|
||||||
|
|
@ -14,18 +17,9 @@ logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
async def generate_report(analysis_run_id: str) -> ReportOutput:
|
async def generate_report(analysis_run_id: str) -> ReportOutput:
|
||||||
run = await fetchone(
|
raw = await select_run_raw_data(analysis_run_id)
|
||||||
"SELECT hospital_id FROM analysis_runs WHERE analysis_run_id = %s",
|
clinic = raw.get("mainpage") or {}
|
||||||
(analysis_run_id,),
|
market = await select_market(analysis_run_id)
|
||||||
)
|
|
||||||
clinic_row = await fetchone(
|
|
||||||
"SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s",
|
|
||||||
(run["hospital_id"],),
|
|
||||||
)
|
|
||||||
raw_data = clinic_row["raw_data"] if clinic_row else None
|
|
||||||
clinic = json.loads(raw_data) if isinstance(raw_data, str) else (raw_data or {})
|
|
||||||
raw = await get_analysis_raw_data(analysis_run_id)
|
|
||||||
market = await get_market_analysis(analysis_run_id)
|
|
||||||
|
|
||||||
def _json(v) -> str | None:
|
def _json(v) -> str | None:
|
||||||
return json.dumps(v, ensure_ascii=False) if v else None
|
return json.dumps(v, ensure_ascii=False) if v else None
|
||||||
|
|
@ -43,27 +37,21 @@ async def generate_report(analysis_run_id: str) -> ReportOutput:
|
||||||
"market_trend": _json(market.get("trend")),
|
"market_trend": _json(market.get("trend")),
|
||||||
"market_target_audience": _json(market.get("target_audience")),
|
"market_target_audience": _json(market.get("target_audience")),
|
||||||
**{
|
**{
|
||||||
channel: _json(data)
|
source_type: _json(data)
|
||||||
for channel, data in raw.items()
|
for source_type, data in raw.items()
|
||||||
|
if source_type != "mainpage"
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
return await LLMService(provider="perplexity").generate(report_prompt, input_data)
|
return await LLMService(provider="perplexity").generate(report_prompt, input_data)
|
||||||
|
|
||||||
async def generate_plan(analysis_run_id: str) -> PlanOutput:
|
async def generate_plan(analysis_run_id: str) -> PlanOutput:
|
||||||
run = await fetchone(
|
run = await select_run(analysis_run_id)
|
||||||
"SELECT hospital_id, report_data FROM analysis_runs WHERE analysis_run_id = %s",
|
raw = await select_run_raw_data(analysis_run_id)
|
||||||
(analysis_run_id,),
|
clinic = raw.get("mainpage") or {}
|
||||||
)
|
|
||||||
clinic_row = await fetchone(
|
|
||||||
"SELECT raw_data FROM hospital_baseinfo WHERE hospital_id = %s",
|
|
||||||
(run["hospital_id"],),
|
|
||||||
)
|
|
||||||
raw_data = clinic_row["raw_data"] if clinic_row else None
|
|
||||||
clinic = json.loads(raw_data) if isinstance(raw_data, str) else (raw_data or {})
|
|
||||||
report_data = run["report_data"]
|
report_data = run["report_data"]
|
||||||
report = json.loads(report_data) if isinstance(report_data, str) else report_data
|
report = json.loads(report_data) if isinstance(report_data, str) else report_data
|
||||||
market = await get_market_analysis(analysis_run_id)
|
market = await select_market(analysis_run_id)
|
||||||
|
|
||||||
def _json(v) -> str | None:
|
def _json(v) -> str | None:
|
||||||
return json.dumps(v, ensure_ascii=False) if v else None
|
return json.dumps(v, ensure_ascii=False) if v else None
|
||||||
|
|
@ -93,7 +81,8 @@ def _build_clinic_snapshot(gangnam_unni: dict, hospital: dict) -> dict:
|
||||||
if gangnam_unni.get("name"): snapshot["name"] = gangnam_unni["name"]
|
if gangnam_unni.get("name"): snapshot["name"] = gangnam_unni["name"]
|
||||||
if hospital.get("clinicNameEn"): snapshot["name_en"] = hospital["clinicNameEn"]
|
if hospital.get("clinicNameEn"): snapshot["name_en"] = hospital["clinicNameEn"]
|
||||||
if hospital.get("phone"): snapshot["phone"] = hospital["phone"]
|
if hospital.get("phone"): snapshot["phone"] = hospital["phone"]
|
||||||
if hospital.get("domain"): snapshot["domain"] = hospital["domain"]
|
domain = hospital.get("domain") or urlparse(hospital.get("sourceUrl") or "").netloc
|
||||||
|
if domain: snapshot["domain"] = domain
|
||||||
if gangnam_unni.get("rating"): snapshot["overall_rating"] = gangnam_unni["rating"]
|
if gangnam_unni.get("rating"): snapshot["overall_rating"] = gangnam_unni["rating"]
|
||||||
if gangnam_unni.get("totalReviews"): snapshot["total_reviews"] = gangnam_unni["totalReviews"]
|
if gangnam_unni.get("totalReviews"): snapshot["total_reviews"] = gangnam_unni["totalReviews"]
|
||||||
if gangnam_unni.get("address"): snapshot["location"] = gangnam_unni["address"]
|
if gangnam_unni.get("address"): snapshot["location"] = gangnam_unni["address"]
|
||||||
|
|
@ -221,28 +210,17 @@ async def _build_youtube_audit(youtube: dict) -> dict:
|
||||||
|
|
||||||
|
|
||||||
async def _build_overrides(analysis_run_id: str) -> dict:
|
async def _build_overrides(analysis_run_id: str) -> dict:
|
||||||
run = await fetchone(
|
raw = await select_run_raw_data(analysis_run_id)
|
||||||
"SELECT hospital_id, instagram_data_id, facebook_data_id,"
|
if not raw:
|
||||||
" naver_blog_data_id, youtube_data_id, gangnam_unni_data_id"
|
|
||||||
" FROM analysis_runs WHERE analysis_run_id = %s",
|
|
||||||
(analysis_run_id,),
|
|
||||||
)
|
|
||||||
if not run:
|
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
hospital_row = await fetchone(
|
mainpage = raw.get("mainpage", {}) or {}
|
||||||
"SELECT raw_data, url FROM hospital_baseinfo WHERE hospital_id = %s",
|
instagram = raw.get("instagram", {}) or {}
|
||||||
(run["hospital_id"],),
|
facebook = raw.get("facebook", {}) or {}
|
||||||
)
|
youtube = raw.get("youtube", {}) or {}
|
||||||
hospital = json.loads(hospital_row["raw_data"]) if hospital_row and isinstance(hospital_row.get("raw_data"), str) else (hospital_row or {}).get("raw_data") or {}
|
gangnam_unni = raw.get("gangnam_unni", {}) or {}
|
||||||
hospital["domain"] = (hospital_row or {}).get("url") or ""
|
|
||||||
instagram = await fetch_raw("instagram_data", run["instagram_data_id"]) or {}
|
|
||||||
facebook = await fetch_raw("facebook_data", run["facebook_data_id"]) or {}
|
|
||||||
naver_blog = await fetch_raw("naver_blog_data", run["naver_blog_data_id"]) or {}
|
|
||||||
youtube = await fetch_raw("youtube_data", run["youtube_data_id"]) or {}
|
|
||||||
gangnam_unni = await fetch_raw("gangnam_unni_data", run["gangnam_unni_data_id"]) or {}
|
|
||||||
|
|
||||||
snapshot: dict = _build_clinic_snapshot(gangnam_unni, hospital)
|
snapshot: dict = _build_clinic_snapshot(gangnam_unni, mainpage)
|
||||||
yt_patch: dict = await _build_youtube_audit(youtube)
|
yt_patch: dict = await _build_youtube_audit(youtube)
|
||||||
|
|
||||||
# ── instagram ─────────────────────────────────────────────────────────────
|
# ── instagram ─────────────────────────────────────────────────────────────
|
||||||
|
|
@ -299,12 +277,7 @@ _MOCK_REPORT_PATH = os.path.join(os.path.dirname(__file__), "../mock/report_view
|
||||||
|
|
||||||
|
|
||||||
async def _is_mock(analysis_run_id: str) -> bool:
|
async def _is_mock(analysis_run_id: str) -> bool:
|
||||||
row = await fetchone(
|
url = await select_run_mainpage_url(analysis_run_id)
|
||||||
"SELECT h.url FROM analysis_runs ar JOIN hospital_baseinfo h USING (hospital_id)"
|
|
||||||
" WHERE ar.analysis_run_id = %s",
|
|
||||||
(analysis_run_id,),
|
|
||||||
)
|
|
||||||
url = (row or {}).get("url") or ""
|
|
||||||
return any(domain in url for domain in _MOCK_DOMAINS)
|
return any(domain in url for domain in _MOCK_DOMAINS)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -330,7 +303,7 @@ async def run_report_task(analysis_run_id: str) -> None:
|
||||||
else:
|
else:
|
||||||
result = await generate_report(analysis_run_id)
|
result = await generate_report(analysis_run_id)
|
||||||
result = _patch_report(result, await _build_overrides(analysis_run_id))
|
result = _patch_report(result, await _build_overrides(analysis_run_id))
|
||||||
await save_analysis_report(analysis_run_id, result.model_dump())
|
await update_run_report(analysis_run_id, result.model_dump())
|
||||||
logger.info("[report] done run=%s", analysis_run_id)
|
logger.info("[report] done run=%s", analysis_run_id)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -341,8 +314,5 @@ async def run_plan_task(analysis_run_id: str) -> None:
|
||||||
result = _load_mock_plan()
|
result = _load_mock_plan()
|
||||||
else:
|
else:
|
||||||
result = await generate_plan(analysis_run_id)
|
result = await generate_plan(analysis_run_id)
|
||||||
await execute(
|
await update_run_plan(analysis_run_id, result.model_dump())
|
||||||
"UPDATE analysis_runs SET plan_data = %s WHERE analysis_run_id = %s",
|
|
||||||
(json.dumps(result.model_dump(), ensure_ascii=False), analysis_run_id),
|
|
||||||
)
|
|
||||||
logger.info("[plan] done run=%s", analysis_run_id)
|
logger.info("[plan] done run=%s", analysis_run_id)
|
||||||
|
|
|
||||||
|
|
@ -1,96 +1,110 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
from common.db import fetchone
|
from common.db.hospital import update_hospital_status, update_hospital
|
||||||
from common.db import (
|
from common.db.source import select_run_sources, update_raw_info_status, update_raw_info
|
||||||
set_instagram_status, save_instagram_raw_data,
|
|
||||||
set_facebook_status, save_facebook_raw_data,
|
|
||||||
set_naver_blog_status, save_naver_blog_raw_data,
|
|
||||||
set_youtube_status, save_youtube_raw_data,
|
|
||||||
set_gangnam_unni_status, save_gangnam_unni_raw_data,
|
|
||||||
execute, save_hospital_raw_data,
|
|
||||||
)
|
|
||||||
from common.utils import get_env
|
from common.utils import get_env
|
||||||
from integrations.apify import ApifyClient
|
from integrations.apify import ApifyClient
|
||||||
from integrations.naver import NaverClient
|
from integrations.naver import NaverClient
|
||||||
from integrations.youtube import YouTubeClient
|
from integrations.youtube import YouTubeClient
|
||||||
from integrations.firecrawl import FirecrawlClient
|
from integrations.firecrawl import FirecrawlClient
|
||||||
|
from models.status import SourceType
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
async def collect_instagram(analysis_run_id: str, row_id: int, url: str) -> None:
|
async def collect_instagram(analysis_run_id: str, info_id: int, url: str) -> None:
|
||||||
logger.info("[instagram] start run=%s url=%s", analysis_run_id, url)
|
logger.info("[instagram] start run=%s url=%s", analysis_run_id, url)
|
||||||
await set_instagram_status(row_id, "processing")
|
await update_raw_info_status(info_id, "processing")
|
||||||
data = await ApifyClient(get_env("APIFY_API_TOKEN")).get_instagram_profile(url)
|
data = await ApifyClient(get_env("APIFY_API_TOKEN")).get_instagram_profile(url)
|
||||||
await save_instagram_raw_data(row_id, data)
|
if data is None:
|
||||||
|
await update_raw_info_status(info_id, "failed")
|
||||||
|
logger.warning("[instagram] failed run=%s", analysis_run_id)
|
||||||
|
return
|
||||||
|
await update_raw_info(info_id, data)
|
||||||
logger.info("[instagram] done run=%s", analysis_run_id)
|
logger.info("[instagram] done run=%s", analysis_run_id)
|
||||||
|
|
||||||
|
|
||||||
async def collect_facebook(analysis_run_id: str, row_id: int, url: str) -> None:
|
async def collect_facebook(analysis_run_id: str, info_id: int, url: str) -> None:
|
||||||
logger.info("[facebook] start run=%s url=%s", analysis_run_id, url)
|
logger.info("[facebook] start run=%s url=%s", analysis_run_id, url)
|
||||||
await set_facebook_status(row_id, "processing")
|
await update_raw_info_status(info_id, "processing")
|
||||||
data = await ApifyClient(get_env("APIFY_API_TOKEN")).get_facebook_page(url)
|
data = await ApifyClient(get_env("APIFY_API_TOKEN")).get_facebook_page(url)
|
||||||
await save_facebook_raw_data(row_id, data)
|
if data is None:
|
||||||
|
await update_raw_info_status(info_id, "failed")
|
||||||
|
logger.warning("[facebook] failed run=%s", analysis_run_id)
|
||||||
|
return
|
||||||
|
await update_raw_info(info_id, data)
|
||||||
logger.info("[facebook] done run=%s", analysis_run_id)
|
logger.info("[facebook] done run=%s", analysis_run_id)
|
||||||
|
|
||||||
|
|
||||||
async def collect_naver_blog(analysis_run_id: str, row_id: int, url: str) -> None:
|
async def collect_naver_blog(analysis_run_id: str, info_id: int, url: str) -> None:
|
||||||
logger.info("[naver_blog] start run=%s url=%s", analysis_run_id, url)
|
logger.info("[naver_blog] start run=%s url=%s", analysis_run_id, url)
|
||||||
await set_naver_blog_status(row_id, "processing")
|
await update_raw_info_status(info_id, "processing")
|
||||||
data = await NaverClient(get_env("NAVER_CLIENT_ID"), get_env("NAVER_CLIENT_SECRET")).get_blog_rss(url)
|
data = await NaverClient(get_env("NAVER_CLIENT_ID"), get_env("NAVER_CLIENT_SECRET")).get_blog_rss(url)
|
||||||
await save_naver_blog_raw_data(row_id, data)
|
if data is None:
|
||||||
|
await update_raw_info_status(info_id, "failed")
|
||||||
|
logger.warning("[naver_blog] failed run=%s", analysis_run_id)
|
||||||
|
return
|
||||||
|
await update_raw_info(info_id, data)
|
||||||
logger.info("[naver_blog] done run=%s", analysis_run_id)
|
logger.info("[naver_blog] done run=%s", analysis_run_id)
|
||||||
|
|
||||||
|
|
||||||
async def collect_youtube(analysis_run_id: str, row_id: int, url: str) -> None:
|
async def collect_youtube(analysis_run_id: str, info_id: int, url: str) -> None:
|
||||||
logger.info("[youtube] start run=%s url=%s", analysis_run_id, url)
|
logger.info("[youtube] start run=%s url=%s", analysis_run_id, url)
|
||||||
await set_youtube_status(row_id, "processing")
|
await update_raw_info_status(info_id, "processing")
|
||||||
data = await YouTubeClient(get_env("YOUTUBE_API_KEY")).get_channel(url)
|
data = await YouTubeClient(get_env("YOUTUBE_API_KEY")).get_channel(url)
|
||||||
await save_youtube_raw_data(row_id, data)
|
if data is None:
|
||||||
|
await update_raw_info_status(info_id, "failed")
|
||||||
|
logger.warning("[youtube] failed run=%s", analysis_run_id)
|
||||||
|
return
|
||||||
|
await update_raw_info(info_id, data)
|
||||||
logger.info("[youtube] done run=%s", analysis_run_id)
|
logger.info("[youtube] done run=%s", analysis_run_id)
|
||||||
|
|
||||||
|
|
||||||
async def collect_gangnam_unni(analysis_run_id: str, row_id: int, url: str) -> None:
|
async def collect_gangnam_unni(analysis_run_id: str, info_id: int, url: str) -> None:
|
||||||
logger.info("[gangnam_unni] start run=%s url=%s", analysis_run_id, url)
|
logger.info("[gangnam_unni] start run=%s url=%s", analysis_run_id, url)
|
||||||
await set_gangnam_unni_status(row_id, "processing")
|
await update_raw_info_status(info_id, "processing")
|
||||||
data = await FirecrawlClient(get_env("FIRECRAWL_API_KEY")).get_gangnam_unni(url)
|
data = await FirecrawlClient(get_env("FIRECRAWL_API_KEY")).get_gangnam_unni(url)
|
||||||
await save_gangnam_unni_raw_data(row_id, data)
|
if data is None:
|
||||||
|
await update_raw_info_status(info_id, "failed")
|
||||||
|
logger.warning("[gangnam_unni] failed run=%s", analysis_run_id)
|
||||||
|
return
|
||||||
|
await update_raw_info(info_id, data)
|
||||||
logger.info("[gangnam_unni] done run=%s", analysis_run_id)
|
logger.info("[gangnam_unni] done run=%s", analysis_run_id)
|
||||||
|
|
||||||
|
|
||||||
async def collect_clinic_info(analysis_run_id: str, hospital_id: str, url: str) -> None:
|
async def collect_mainpage(analysis_run_id: str, info_id: int, hospital_id: str, url: str) -> None:
|
||||||
logger.info("[clinic] start run=%s url=%s", analysis_run_id, url)
|
logger.info("[mainpage] start run=%s url=%s", analysis_run_id, url)
|
||||||
await execute("UPDATE hospital_baseinfo SET status = 'processing' WHERE hospital_id = %s", (hospital_id,))
|
await update_raw_info_status(info_id, "processing")
|
||||||
|
await update_hospital_status(hospital_id, "processing")
|
||||||
data = await FirecrawlClient(get_env("FIRECRAWL_API_KEY")).fetch_clinic_info(url)
|
data = await FirecrawlClient(get_env("FIRECRAWL_API_KEY")).fetch_clinic_info(url)
|
||||||
await save_hospital_raw_data(hospital_id, data, analysis_run_id=analysis_run_id)
|
if data is None:
|
||||||
logger.info("[clinic] done run=%s", analysis_run_id)
|
await update_raw_info_status(info_id, "failed")
|
||||||
|
logger.warning("[mainpage] failed run=%s", analysis_run_id)
|
||||||
|
return
|
||||||
|
await update_raw_info(info_id, data)
|
||||||
|
await update_hospital(hospital_id, data, analysis_run_id=analysis_run_id)
|
||||||
|
logger.info("[mainpage] done run=%s", analysis_run_id)
|
||||||
|
|
||||||
|
|
||||||
async def collect_all(
|
async def collect_all(analysis_run_id: str, hospital_id: str) -> None:
|
||||||
analysis_run_id: str,
|
rows = await select_run_sources(analysis_run_id)
|
||||||
hospital_id: str,
|
|
||||||
instagram_id: int | None = None,
|
|
||||||
facebook_id: int | None = None,
|
|
||||||
naver_blog_id: int | None = None,
|
|
||||||
youtube_id: int | None = None,
|
|
||||||
gangnam_unni_id: int | None = None,
|
|
||||||
) -> None:
|
|
||||||
async def _url(table: str, row_id: int) -> str:
|
|
||||||
row = await fetchone(f"SELECT url FROM {table} WHERE id = %s", (row_id,))
|
|
||||||
return row["url"] if row else ""
|
|
||||||
|
|
||||||
hospital = await fetchone("SELECT url FROM hospital_baseinfo WHERE hospital_id = %s", (hospital_id,))
|
_collectors = {
|
||||||
tasks = [collect_clinic_info(analysis_run_id, hospital_id, hospital["url"])]
|
SourceType.INSTAGRAM: collect_instagram,
|
||||||
|
SourceType.FACEBOOK: collect_facebook,
|
||||||
|
SourceType.NAVER_BLOG: collect_naver_blog,
|
||||||
|
SourceType.YOUTUBE: collect_youtube,
|
||||||
|
SourceType.GANGNAM_UNNI: collect_gangnam_unni,
|
||||||
|
}
|
||||||
|
|
||||||
if instagram_id:
|
tasks = []
|
||||||
tasks.append(collect_instagram(analysis_run_id, instagram_id, await _url("instagram_data", instagram_id)))
|
for row in rows:
|
||||||
if facebook_id:
|
info_id = row["info_id"]
|
||||||
tasks.append(collect_facebook(analysis_run_id, facebook_id, await _url("facebook_data", facebook_id)))
|
source_type = row["source_type"]
|
||||||
if naver_blog_id:
|
url = row["url"]
|
||||||
tasks.append(collect_naver_blog(analysis_run_id, naver_blog_id, await _url("naver_blog_data", naver_blog_id)))
|
if source_type == SourceType.MAINPAGE:
|
||||||
if youtube_id:
|
tasks.append(collect_mainpage(analysis_run_id, info_id, hospital_id, url))
|
||||||
tasks.append(collect_youtube(analysis_run_id, youtube_id, await _url("youtube_data", youtube_id)))
|
elif source_type in _collectors:
|
||||||
if gangnam_unni_id:
|
tasks.append(_collectors[source_type](analysis_run_id, info_id, url))
|
||||||
tasks.append(collect_gangnam_unni(analysis_run_id, gangnam_unni_id, await _url("gangnam_unni_data", gangnam_unni_id)))
|
|
||||||
|
|
||||||
await asyncio.gather(*tasks, return_exceptions=True)
|
await asyncio.gather(*tasks, return_exceptions=True)
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,8 @@ import logging
|
||||||
|
|
||||||
from fastapi import HTTPException, UploadFile
|
from fastapi import HTTPException, UploadFile
|
||||||
|
|
||||||
from common.db import execute, fetchall, fetchone, insert_file_row
|
from common.db.run import select_run
|
||||||
|
from common.db.file_data import insert_file, select_run_files, select_file, delete_file
|
||||||
from integrations.azure_blob import AzureBlobUploader
|
from integrations.azure_blob import AzureBlobUploader
|
||||||
from models.file import FileListItem, FileType, FileUploadResponse
|
from models.file import FileListItem, FileType, FileUploadResponse
|
||||||
|
|
||||||
|
|
@ -31,10 +32,7 @@ async def upload_analysis_file(
|
||||||
content_type: str | None = None,
|
content_type: str | None = None,
|
||||||
) -> tuple[int, str]:
|
) -> tuple[int, str]:
|
||||||
"""analysis_run에 딸린 파일 업로드. Blob 업로드 + file_data row 생성. (file_id, url) 반환."""
|
"""analysis_run에 딸린 파일 업로드. Blob 업로드 + file_data row 생성. (file_id, url) 반환."""
|
||||||
run = await fetchone(
|
run = await select_run(analysis_run_id)
|
||||||
"SELECT hospital_id FROM analysis_runs WHERE analysis_run_id = %s",
|
|
||||||
(analysis_run_id,),
|
|
||||||
)
|
|
||||||
if not run:
|
if not run:
|
||||||
raise HTTPException(status_code=404, detail="analysis_run not found")
|
raise HTTPException(status_code=404, detail="analysis_run not found")
|
||||||
hospital_id = run["hospital_id"]
|
hospital_id = run["hospital_id"]
|
||||||
|
|
@ -47,7 +45,7 @@ async def upload_analysis_file(
|
||||||
content_type=content_type,
|
content_type=content_type,
|
||||||
)
|
)
|
||||||
|
|
||||||
file_id = await insert_file_row(
|
file_id = await insert_file(
|
||||||
analysis_run_id=analysis_run_id,
|
analysis_run_id=analysis_run_id,
|
||||||
hospital_id=hospital_id,
|
hospital_id=hospital_id,
|
||||||
file_type=file_type,
|
file_type=file_type,
|
||||||
|
|
@ -61,12 +59,7 @@ async def upload_analysis_file(
|
||||||
|
|
||||||
async def list_analysis_files(analysis_run_id: str) -> list[dict]:
|
async def list_analysis_files(analysis_run_id: str) -> list[dict]:
|
||||||
"""analysis_run에 딸린 (삭제 안 된) 파일 목록."""
|
"""analysis_run에 딸린 (삭제 안 된) 파일 목록."""
|
||||||
return await fetchall(
|
return await select_run_files(analysis_run_id)
|
||||||
"SELECT id, file_type, file_name, file_url, size_bytes, created_at FROM file_data"
|
|
||||||
" WHERE analysis_run_id = %s AND is_deleted = FALSE"
|
|
||||||
" ORDER BY created_at DESC",
|
|
||||||
(analysis_run_id,),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def handle_analysis_file_upload(
|
async def handle_analysis_file_upload(
|
||||||
|
|
@ -102,7 +95,7 @@ async def handle_analysis_file_upload(
|
||||||
|
|
||||||
async def get_analysis_files_response(analysis_run_id: str) -> list[FileListItem]:
|
async def get_analysis_files_response(analysis_run_id: str) -> list[FileListItem]:
|
||||||
"""run 존재 확인 + 응답 모델 생성."""
|
"""run 존재 확인 + 응답 모델 생성."""
|
||||||
if not await fetchone("SELECT 1 FROM analysis_runs WHERE analysis_run_id = %s", (analysis_run_id,)):
|
if not await select_run(analysis_run_id):
|
||||||
raise HTTPException(status_code=404, detail="analysis_run not found")
|
raise HTTPException(status_code=404, detail="analysis_run not found")
|
||||||
rows = await list_analysis_files(analysis_run_id)
|
rows = await list_analysis_files(analysis_run_id)
|
||||||
return [FileListItem(**{**r, "created_at": str(r["created_at"])}) for r in rows]
|
return [FileListItem(**{**r, "created_at": str(r["created_at"])}) for r in rows]
|
||||||
|
|
@ -110,14 +103,8 @@ async def get_analysis_files_response(analysis_run_id: str) -> list[FileListItem
|
||||||
|
|
||||||
async def soft_delete_analysis_file(analysis_run_id: str, file_id: int) -> None:
|
async def soft_delete_analysis_file(analysis_run_id: str, file_id: int) -> None:
|
||||||
"""analysis_run에 딸린 파일을 소프트 삭제. 멱등성 보장."""
|
"""analysis_run에 딸린 파일을 소프트 삭제. 멱등성 보장."""
|
||||||
row = await fetchone(
|
row = await select_file(file_id, analysis_run_id)
|
||||||
"SELECT id FROM file_data WHERE id = %s AND analysis_run_id = %s",
|
|
||||||
(file_id, analysis_run_id),
|
|
||||||
)
|
|
||||||
if not row:
|
if not row:
|
||||||
raise HTTPException(status_code=404, detail="file not found")
|
raise HTTPException(status_code=404, detail="file not found")
|
||||||
await execute(
|
await delete_file(file_id)
|
||||||
"UPDATE file_data SET is_deleted = TRUE WHERE id = %s AND is_deleted = FALSE",
|
|
||||||
(file_id,),
|
|
||||||
)
|
|
||||||
logger.info("soft-deleted analysis file run=%s file_id=%s", analysis_run_id, file_id)
|
logger.info("soft-deleted analysis file run=%s file_id=%s", analysis_run_id, file_id)
|
||||||
|
|
@ -1,7 +1,9 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import json
|
|
||||||
import logging
|
import logging
|
||||||
from common.db import fetchone, execute
|
from common.db.run import select_run
|
||||||
|
from common.db.hospital import select_hospital
|
||||||
|
from common.db.market import upsert_market_status, upsert_market_result
|
||||||
|
from common.db.source import select_run_raw_data
|
||||||
from integrations.llm.llm_service import LLMService
|
from integrations.llm.llm_service import LLMService
|
||||||
from integrations.llm.prompt import (
|
from integrations.llm.prompt import (
|
||||||
market_competitors_prompt,
|
market_competitors_prompt,
|
||||||
|
|
@ -18,49 +20,27 @@ _TYPES = ["competitors", "keywords", "trend", "target_audience"]
|
||||||
async def _save(analysis_run_id: str, analysis_type: str, result, exc: Exception | None) -> None:
|
async def _save(analysis_run_id: str, analysis_type: str, result, exc: Exception | None) -> None:
|
||||||
if exc:
|
if exc:
|
||||||
logger.warning("[market] %s failed run=%s: %s", analysis_type, analysis_run_id, exc)
|
logger.warning("[market] %s failed run=%s: %s", analysis_type, analysis_run_id, exc)
|
||||||
await execute(
|
await upsert_market_status(analysis_run_id, analysis_type, "failed")
|
||||||
"INSERT INTO market_analysis (analysis_run_id, analysis_type, status)"
|
|
||||||
" VALUES (%s, %s, 'failed')"
|
|
||||||
" ON DUPLICATE KEY UPDATE status = 'failed'",
|
|
||||||
(analysis_run_id, analysis_type),
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
await execute(
|
await upsert_market_result(analysis_run_id, analysis_type, result.model_dump())
|
||||||
"INSERT INTO market_analysis (analysis_run_id, analysis_type, status, data)"
|
|
||||||
" VALUES (%s, %s, 'done', %s)"
|
|
||||||
" ON DUPLICATE KEY UPDATE status = 'done', data = VALUES(data)",
|
|
||||||
(analysis_run_id, analysis_type, json.dumps(result.model_dump(), ensure_ascii=False)),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def run_market_analysis(analysis_run_id: str) -> None:
|
async def run_market_analysis(analysis_run_id: str) -> None:
|
||||||
logger.info("[market] start run=%s", analysis_run_id)
|
logger.info("[market] start run=%s", analysis_run_id)
|
||||||
|
|
||||||
run = await fetchone(
|
run = await select_run(analysis_run_id)
|
||||||
"SELECT hospital_id FROM analysis_runs WHERE analysis_run_id = %s",
|
clinic = await select_hospital(run["hospital_id"])
|
||||||
(analysis_run_id,),
|
raw = await select_run_raw_data(analysis_run_id)
|
||||||
)
|
mainpage = raw.get("mainpage") or {}
|
||||||
clinic = await fetchone(
|
|
||||||
"SELECT hospital_name, road_address, raw_data FROM hospital_baseinfo WHERE hospital_id = %s",
|
|
||||||
(run["hospital_id"],),
|
|
||||||
)
|
|
||||||
|
|
||||||
raw_data = clinic["raw_data"]
|
clinic_name = (clinic or {}).get("hospital_name") or ""
|
||||||
clinic_data = json.loads(raw_data) if isinstance(raw_data, str) else (raw_data or {})
|
address = (clinic or {}).get("road_address") or ""
|
||||||
|
services = mainpage.get("services", [])
|
||||||
clinic_name = clinic["hospital_name"] or ""
|
|
||||||
address = clinic["road_address"] or ""
|
|
||||||
services = clinic_data.get("services", [])
|
|
||||||
services_str = ", ".join(services[:3])
|
services_str = ", ".join(services[:3])
|
||||||
primary_service = services[0] if services else ""
|
primary_service = services[0] if services else ""
|
||||||
|
|
||||||
for analysis_type in _TYPES:
|
for analysis_type in _TYPES:
|
||||||
await execute(
|
await upsert_market_status(analysis_run_id, analysis_type, "processing")
|
||||||
"INSERT INTO market_analysis (analysis_run_id, analysis_type, status)"
|
|
||||||
" VALUES (%s, %s, 'processing')"
|
|
||||||
" ON DUPLICATE KEY UPDATE status = 'processing'",
|
|
||||||
(analysis_run_id, analysis_type),
|
|
||||||
)
|
|
||||||
|
|
||||||
llm = LLMService(provider="perplexity")
|
llm = LLMService(provider="perplexity")
|
||||||
results = await asyncio.gather(
|
results = await asyncio.gather(
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
import logging
|
import logging
|
||||||
from common.db import fetchone, execute
|
from common.db.run import select_run, update_run_status
|
||||||
from models.status import AnalysisStatus
|
from models.status import AnalysisStatus
|
||||||
from services.collect import collect_all
|
from services.collect import collect_all
|
||||||
from services.market import run_market_analysis
|
from services.market import run_market_analysis
|
||||||
|
|
@ -12,41 +12,19 @@ async def run_pipeline(analysis_run_id: str) -> None:
|
||||||
logger.info("[pipeline] start run=%s", analysis_run_id)
|
logger.info("[pipeline] start run=%s", analysis_run_id)
|
||||||
|
|
||||||
# ── 1. Collect ──────────────────────────────────────────────────────────
|
# ── 1. Collect ──────────────────────────────────────────────────────────
|
||||||
run = await fetchone(
|
run = await select_run(analysis_run_id)
|
||||||
"SELECT hospital_id, instagram_data_id, facebook_data_id,"
|
await collect_all(analysis_run_id, hospital_id=run["hospital_id"])
|
||||||
" naver_blog_data_id, youtube_data_id, gangnam_unni_data_id"
|
|
||||||
" FROM analysis_runs WHERE analysis_run_id = %s",
|
|
||||||
(analysis_run_id,),
|
|
||||||
)
|
|
||||||
await collect_all(
|
|
||||||
analysis_run_id,
|
|
||||||
hospital_id=run["hospital_id"],
|
|
||||||
instagram_id=run["instagram_data_id"],
|
|
||||||
facebook_id=run["facebook_data_id"],
|
|
||||||
naver_blog_id=run["naver_blog_data_id"],
|
|
||||||
youtube_id=run["youtube_data_id"],
|
|
||||||
gangnam_unni_id=run["gangnam_unni_data_id"],
|
|
||||||
)
|
|
||||||
|
|
||||||
# ── 2. Market ────────────────────────────────────────────────────────────
|
# ── 2. Market ────────────────────────────────────────────────────────────
|
||||||
await execute(
|
await update_run_status(analysis_run_id, AnalysisStatus.ANALYZING)
|
||||||
"UPDATE analysis_runs SET status = %s WHERE analysis_run_id = %s",
|
|
||||||
(AnalysisStatus.ANALYZING, analysis_run_id),
|
|
||||||
)
|
|
||||||
await run_market_analysis(analysis_run_id)
|
await run_market_analysis(analysis_run_id)
|
||||||
|
|
||||||
# ── 3. Report ────────────────────────────────────────────────────────────
|
# ── 3. Report ────────────────────────────────────────────────────────────
|
||||||
await run_report_task(analysis_run_id)
|
await run_report_task(analysis_run_id)
|
||||||
|
|
||||||
# ── 4. Plan ──────────────────────────────────────────────────────────────
|
# ── 4. Plan ──────────────────────────────────────────────────────────────
|
||||||
await execute(
|
await update_run_status(analysis_run_id, AnalysisStatus.PLANNING)
|
||||||
"UPDATE analysis_runs SET status = %s WHERE analysis_run_id = %s",
|
|
||||||
(AnalysisStatus.PLANNING, analysis_run_id),
|
|
||||||
)
|
|
||||||
await run_plan_task(analysis_run_id)
|
await run_plan_task(analysis_run_id)
|
||||||
|
|
||||||
await execute(
|
await update_run_status(analysis_run_id, AnalysisStatus.COMPLETED)
|
||||||
"UPDATE analysis_runs SET status = %s WHERE analysis_run_id = %s",
|
|
||||||
(AnalysisStatus.COMPLETED, analysis_run_id),
|
|
||||||
)
|
|
||||||
logger.info("[pipeline] done run=%s", analysis_run_id)
|
logger.info("[pipeline] done run=%s", analysis_run_id)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue