diff --git a/SQL/db_create_2.sql b/SQL/db_create_2.sql
new file mode 100644
index 0000000..d1d5c52
--- /dev/null
+++ b/SQL/db_create_2.sql
@@ -0,0 +1,135 @@
+-- user_info
+CREATE TABLE user_info
+(
+    `user_id` INT NOT NULL AUTO_INCREMENT,
+    `username` VARCHAR(50) NOT NULL,
+    -- 255 chars so a password hash fits (bcrypt output alone is 60); NOTE(review): confirm hashes, not plaintext, are stored.
+    `password` VARCHAR(255) NOT NULL,
+    `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    `updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+    PRIMARY KEY (user_id)
+);
+
+
+-- hospital_baseinfo
+CREATE TABLE hospital_baseinfo
+(
+    `hospital_id` CHAR(36) NOT NULL,
+    `owner_user_id` INT NOT NULL,
+    `hospital_name` VARCHAR(50) NOT NULL,
+    `hospital_name_en` VARCHAR(50) NULL,
+    `brn` VARCHAR(50) NOT NULL,
+    `road_address` VARCHAR(100) NULL,
+    `site_address` VARCHAR(100) NULL,
+    `status` VARCHAR(20) NOT NULL DEFAULT 'start',
+    `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    `updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+    PRIMARY KEY (hospital_id)
+);
+
+CREATE INDEX IX_hospital_baseinfo_1 ON hospital_baseinfo (owner_user_id);
+
+
+-- remote_source: per-hospital channel source info (instagram/facebook/naver_blog/youtube/gangnam_unni, etc.)
+CREATE TABLE remote_source
+(
+    `source_id` INT NOT NULL AUTO_INCREMENT,
+    `hospital_id` CHAR(36) NOT NULL,
+    `source_type` VARCHAR(50) NOT NULL,
+    `url` VARCHAR(500) NOT NULL,
+    `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    PRIMARY KEY (source_id)
+);
+
+-- Lookups by hospital_id alone use the leftmost prefix of this composite index, so no separate single-column index is needed.
+CREATE INDEX IX_remote_source_2 ON remote_source (hospital_id, source_type);
+
+
+-- analysis_runs
+CREATE TABLE analysis_runs
+(
+    `analysis_run_id` CHAR(36) NOT NULL,
+    `hospital_id` CHAR(36) NOT NULL,
+    `owner_user_id` INT NOT NULL DEFAULT 0,
+    `status` VARCHAR(50) NOT NULL DEFAULT 'discovering',
+    `report_data` JSON NULL,
+    `plan_data` JSON NULL,
+    `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    `updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+    PRIMARY KEY (analysis_run_id)
+);
+
+CREATE INDEX IX_analysis_runs_1 ON analysis_runs (hospital_id);
+CREATE INDEX IX_analysis_runs_2 ON analysis_runs (owner_user_id);
+
+
+-- raw_info: raw data collected per analysis run
+CREATE TABLE raw_info
+(
+    `info_id` INT NOT NULL AUTO_INCREMENT,
+    `source_id` INT NOT NULL,
+    `analysis_run_id` CHAR(36) NOT NULL,
+    `data_tag` VARCHAR(50) NOT NULL DEFAULT 'default',
+    `status` VARCHAR(20) NOT NULL DEFAULT 'start',
+    `raw_data` JSON NULL,
+    `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    `updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+    PRIMARY KEY (info_id)
+);
+
+CREATE INDEX IX_raw_info_1 ON raw_info (analysis_run_id);
+CREATE INDEX IX_raw_info_2 ON raw_info (source_id);
+
+
+-- file_data
+CREATE TABLE file_data
+(
+    `id` INT NOT NULL AUTO_INCREMENT,
+    `analysis_run_id` CHAR(36) NOT NULL,
+    `hospital_id` CHAR(36) NULL,
+    `file_type` ENUM('image','video','audio','document','file') NOT NULL DEFAULT 'file',
+    `file_name` VARCHAR(255) NOT NULL,
+    `file_url` VARCHAR(2048) NOT NULL,
+    `size_bytes` BIGINT NULL,
+    `is_deleted` BOOLEAN NOT NULL DEFAULT FALSE,
+    `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    PRIMARY KEY (id),
+    INDEX IX_file_data_1 (analysis_run_id, is_deleted)
+);
+
+
+-- hospital_history: same descriptive columns as hospital_baseinfo, plus analysis_run_id
+CREATE TABLE hospital_history
+(
+    `id` INT NOT NULL AUTO_INCREMENT,
+    `hospital_id` CHAR(36) NOT NULL,
+    `owner_user_id` INT NOT NULL,
+    `hospital_name` VARCHAR(50) NOT NULL,
+    `hospital_name_en` VARCHAR(50) NULL,
+    `brn` VARCHAR(50) NOT NULL,
+    `road_address` VARCHAR(100) NULL,
+    `site_address` VARCHAR(100) NULL,
+    `status` VARCHAR(20) NOT NULL,
+    `analysis_run_id` CHAR(36) NULL,
+    `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    PRIMARY KEY (id)
+);
+
+CREATE INDEX IX_hospital_history_1 ON hospital_history (hospital_id);
+CREATE INDEX IX_hospital_history_2 ON hospital_history (analysis_run_id);
+
+
+-- market_analysis
+CREATE TABLE market_analysis
+(
+    `id` INT NOT NULL AUTO_INCREMENT,
+    `analysis_run_id` CHAR(36) NOT NULL,
+    `analysis_type` VARCHAR(50) NOT NULL,
+    `status` VARCHAR(20) NOT NULL DEFAULT 'start',
+    `data` JSON NULL,
+    `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    PRIMARY KEY (id),
+    UNIQUE KEY UQ_market_analysis (analysis_run_id, analysis_type)
+);
+
+-- Lookups by analysis_run_id alone use the leftmost prefix of UQ_market_analysis, so no separate index is needed.
diff --git a/app/common/db2.py b/app/common/db2.py
new file mode 100644
index 0000000..9a317a0
--- /dev/null
+++ b/app/common/db2.py
@@ -0,0 +1,282 @@
+"""Async MySQL access helpers built on an aiomysql connection pool."""
+import asyncio
+import json
+import os
+
+import aiomysql
+from common.utils import get_env
+
+_pool: aiomysql.Pool | None = None
+# Serializes first-time pool creation so concurrent callers share one pool.
+_pool_lock = asyncio.Lock()
+
+
+async def get_pool() -> aiomysql.Pool:
+    """Return the shared connection pool, creating it on first use.
+
+    Connection settings come from MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD and
+    MYSQL_DB (via get_env) and MYSQL_PORT (default 3306).
+    """
+    global _pool
+    if _pool is None:
+        async with _pool_lock:
+            # Re-check under the lock: another task may have created the
+            # pool while this one was waiting.
+            if _pool is None:
+                _pool = await aiomysql.create_pool(
+                    host=get_env("MYSQL_HOST"),
+                    port=int(os.getenv("MYSQL_PORT", "3306")),
+                    user=get_env("MYSQL_USER"),
+                    password=get_env("MYSQL_PASSWORD"),
+                    db=get_env("MYSQL_DB"),
+                    charset="utf8mb4",
+                    minsize=0,
+                    maxsize=30,
+                    connect_timeout=10,
+                    # The helpers in this module each run a single statement,
+                    # so autocommit keeps per-statement semantics and lets
+                    # connections return to the pool outside a transaction.
+                    autocommit=True,
+                    # Recycle pooled connections after an hour so stale
+                    # ones are not handed out.
+                    pool_recycle=3600,
+                )
+    return _pool
+
+
+def _decode_json(value):
+    """Return *value* parsed as JSON if it is a str, otherwise unchanged."""
+    return json.loads(value) if isinstance(value, str) else value
+
+
+async def execute(sql: str, args: tuple = ()) -> int:
+    """Run one statement, commit, and return the cursor's lastrowid."""
+    pool = await get_pool()
+    # Leaving the acquire() block releases the connection; closing it here
+    # would force a new connection for every query.
+    async with pool.acquire() as conn:
+        async with conn.cursor() as cur:
+            await cur.execute(sql, args)
+            await conn.commit()
+            return cur.lastrowid
+
+
+async def fetchone(sql: str, args: tuple = ()) -> dict | None:
+    """Run a query and return the first row as a dict, or None if no row."""
+    pool = await get_pool()
+    async with pool.acquire() as conn:
+        async with conn.cursor(aiomysql.DictCursor) as cur:
+            await cur.execute(sql, args)
+            return await cur.fetchone()
+
+
+async def fetchall(sql: str, args: tuple = ()) -> list[dict]:
+    """Run a query and return every row as a dict."""
+    pool = await get_pool()
+    async with pool.acquire() as conn:
+        async with conn.cursor(aiomysql.DictCursor) as cur:
+            await cur.execute(sql, args)
+            return await cur.fetchall()
+
+
+# ── remote_source ─────────────────────────────────────────────────────────────
+
+async def insert_source(hospital_id: str, source_type: str, url: str) -> int:
+    """Insert a remote_source row and return its source_id."""
+    return await execute(
+        "INSERT INTO remote_source (hospital_id, source_type, url) VALUES (%s, %s, %s)",
+        (hospital_id, source_type, url),
+    )
+
+
+# ── raw_info ──────────────────────────────────────────────────────────────────
+
+async def insert_raw_info(source_id: int, analysis_run_id: str, data_tag: str = "default") -> int:
+    """Insert a raw_info row for a source/run pair and return its info_id."""
+    return await execute(
+        "INSERT INTO raw_info (source_id, analysis_run_id, data_tag) VALUES (%s, %s, %s)",
+        (source_id, analysis_run_id, data_tag),
+    )
+
+
+async def set_raw_info_status(info_id: int, status: str) -> None:
+    """Set the status column of one raw_info row."""
+    await execute("UPDATE raw_info SET status = %s WHERE info_id = %s", (status, info_id))
+
+
+async def save_raw_info(info_id: int, data: dict) -> None:
+    """Store *data* as JSON in raw_data and mark the row 'done'."""
+    await execute(
+        "UPDATE raw_info SET raw_data = %s, status = 'done' WHERE info_id = %s",
+        (json.dumps(data, ensure_ascii=False), info_id),
+    )
+
+
+async def fetch_raw(info_id: int | None) -> dict | None:
+    """Return the decoded raw_data for *info_id*, or None if absent or empty."""
+    if info_id is None:
+        return None
+    row = await fetchone("SELECT raw_data FROM raw_info WHERE info_id = %s", (info_id,))
+    if not row or not row["raw_data"]:
+        return None
+    return _decode_json(row["raw_data"])
+
+
+async def is_done(info_id: int | None) -> bool:
+    """Return True when *info_id* is None or its row has status 'done'.
+
+    A row that does not exist is reported as not done.
+    """
+    if info_id is None:
+        return True
+    row = await fetchone("SELECT status FROM raw_info WHERE info_id = %s", (info_id,))
+    # fetchone yields None for an unknown info_id; subscripting it would
+    # raise TypeError.
+    return row is not None and row["status"] == "done"
+
+
+async def get_analysis_raw_data(analysis_run_id: str) -> dict:
+    """Return {source_type: decoded raw_data} for one analysis run.
+
+    If several rows share a source_type, the last row fetched wins.
+    """
+    rows = await fetchall(
+        "SELECT rs.source_type, ri.raw_data"
+        " FROM raw_info ri JOIN remote_source rs USING (source_id)"
+        " WHERE ri.analysis_run_id = %s",
+        (analysis_run_id,),
+    )
+    return {row["source_type"]: _decode_json(row["raw_data"]) for row in rows}
+
+
+# ── analysis_runs ─────────────────────────────────────────────────────────────
+
+async def insert_analysis_run(
+    analysis_run_id: str,
+    hospital_id: str,
+    owner_user_id: int,
+) -> str:
+    """Insert an analysis_runs row and return *analysis_run_id*."""
+    await execute(
+        "INSERT INTO analysis_runs (analysis_run_id, hospital_id, owner_user_id)"
+        " VALUES (%s, %s, %s)",
+        (analysis_run_id, hospital_id, owner_user_id),
+    )
+    return analysis_run_id
+
+
+async def save_analysis_report(analysis_run_id: str, data: dict) -> None:
+    """Store *data* as JSON in the run's report_data column."""
+    await execute(
+        "UPDATE analysis_runs SET report_data = %s WHERE analysis_run_id = %s",
+        (json.dumps(data, ensure_ascii=False), analysis_run_id),
+    )
+
+
+# ── hospital_baseinfo ─────────────────────────────────────────────────────────
+
+async def _insert_hospital_history(hospital_id: str, analysis_run_id: str | None) -> None:
+    """Copy the hospital's current baseinfo row into hospital_history.
+
+    Does nothing when the hospital does not exist.
+    """
+    row = await fetchone(
+        "SELECT owner_user_id, hospital_name, hospital_name_en, brn, road_address, site_address, status"
+        " FROM hospital_baseinfo WHERE hospital_id = %s",
+        (hospital_id,),
+    )
+    if not row:
+        return
+    await execute(
+        "INSERT INTO hospital_history"
+        " (hospital_id, owner_user_id, hospital_name, hospital_name_en, brn, road_address, site_address, status, analysis_run_id)"
+        " VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
+        (
+            hospital_id,
+            row["owner_user_id"],
+            row["hospital_name"],
+            row["hospital_name_en"],
+            row["brn"],
+            row["road_address"],
+            row["site_address"],
+            row["status"],
+            analysis_run_id,
+        ),
+    )
+
+
+async def insert_hospital(
+    hospital_id: str,
+    name: str,
+    name_en: str | None = None,
+    road_address: str | None = None,
+    site_address: str | None = None,
+    owner_user_id: int = 0,
+    brn: str = "",
+) -> dict:
+    """Insert a hospital with status 'done' and record a history row.
+
+    Returns the new row's created_at as a one-key dict.
+    """
+    await execute(
+        "INSERT INTO hospital_baseinfo (hospital_id, hospital_name, hospital_name_en, road_address, site_address, status, owner_user_id, brn)"
+        " VALUES (%s, %s, %s, %s, %s, 'done', %s, %s)",
+        (hospital_id, name, name_en, road_address, site_address, owner_user_id, brn),
+    )
+    await _insert_hospital_history(hospital_id, analysis_run_id=None)
+    return await fetchone(
+        "SELECT created_at FROM hospital_baseinfo WHERE hospital_id = %s",
+        (hospital_id,),
+    )
+
+
+async def update_hospital_info(hospital_id: str, data: dict, analysis_run_id: str | None = None) -> None:
+    """Update hospital_baseinfo's basic fields after clinic scraping.
+
+    Keys missing from *data* (or None) leave the stored value unchanged via
+    COALESCE; status is always set to 'done'. A history row is then recorded.
+    """
+    await execute(
+        "UPDATE hospital_baseinfo"
+        " SET status = 'done',"
+        " hospital_name = COALESCE(%s, hospital_name),"
+        " hospital_name_en = COALESCE(%s, hospital_name_en),"
+        " road_address = COALESCE(%s, road_address)"
+        " WHERE hospital_id = %s",
+        (
+            data.get("clinicName"),
+            data.get("clinicNameEn"),
+            data.get("address"),
+            hospital_id,
+        ),
+    )
+    await _insert_hospital_history(hospital_id, analysis_run_id)
+
+
+# ── file_data ─────────────────────────────────────────────────────────────────
+
+async def insert_file_row(
+    analysis_run_id: str,
+    file_type: str,
+    file_name: str,
+    file_url: str,
+    size_bytes: int | None = None,
+    hospital_id: str | None = None,
+) -> int:
+    """Insert a file_data row and return its id."""
+    return await execute(
+        "INSERT INTO file_data (analysis_run_id, hospital_id, file_type, file_name, file_url, size_bytes)"
+        " VALUES (%s, %s, %s, %s, %s, %s)",
+        (analysis_run_id, hospital_id, file_type, file_name, file_url, size_bytes),
+    )
+
+
+# ── market_analysis ───────────────────────────────────────────────────────────
+
+async def get_market_analysis(analysis_run_id: str) -> dict:
+    """Return {analysis_type: decoded data} for the run's 'done' rows."""
+    rows = await fetchall(
+        "SELECT analysis_type, data FROM market_analysis WHERE analysis_run_id = %s AND status = 'done'",
+        (analysis_run_id,),
+    )
+    return {row["analysis_type"]: _decode_json(row["data"]) for row in rows}