import json
import os
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Any

import aiomysql

from common.utils import get_env

# Lazily created, module-wide connection pool; populated by get_pool().
_pool: aiomysql.Pool | None = None

# Connections older than this many seconds are replaced by the pool instead of
# being reused, which guards against the server having dropped an idle socket.
_POOL_RECYCLE_SECONDS = 3600


async def get_pool() -> aiomysql.Pool:
    """Return the shared aiomysql pool, creating it on first use.

    Host, user, password and database name are read through ``get_env``;
    the port comes from ``MYSQL_PORT`` and defaults to 3306.
    """
    global _pool
    if _pool is None:
        _pool = await aiomysql.create_pool(
            host=get_env("MYSQL_HOST"),
            port=int(os.getenv("MYSQL_PORT", "3306")),
            user=get_env("MYSQL_USER"),
            password=get_env("MYSQL_PASSWORD"),
            db=get_env("MYSQL_DB"),
            charset="utf8mb4",
            minsize=0,
            maxsize=30,
            connect_timeout=10,
            pool_recycle=_POOL_RECYCLE_SECONDS,
        )
    return _pool


@asynccontextmanager
async def _pooled_connection() -> AsyncIterator[aiomysql.Connection]:
    """Yield a connection from the shared pool and release it afterwards.

    On a clean exit the connection goes back to the pool so later calls can
    reuse it. If the body raises (including cancellation) the connection is
    closed before release, so a connection in an unknown state is never
    handed to another caller.
    """
    pool = await get_pool()
    async with pool.acquire() as conn:
        try:
            yield conn
        except BaseException:
            conn.close()
            raise


def _decode_json(value: Any) -> Any:
    """Parse *value* as JSON when it is a string; return anything else as-is."""
    return json.loads(value) if isinstance(value, str) else value


async def execute(sql: str, args: tuple = ()) -> int:
    """Run a single write statement, commit it, and return ``lastrowid``.

    Args:
        sql: Statement with ``%s`` placeholders.
        args: Values bound to the placeholders.

    Returns:
        The cursor's ``lastrowid`` after the statement ran.
    """
    async with _pooled_connection() as conn:
        async with conn.cursor() as cur:
            await cur.execute(sql, args)
            await conn.commit()
            return cur.lastrowid


async def fetchone(sql: str, args: tuple = ()) -> dict | None:
    """Run a query and return its first row as a dict, or ``None`` if empty."""
    async with _pooled_connection() as conn:
        async with conn.cursor(aiomysql.DictCursor) as cur:
            await cur.execute(sql, args)
            row = await cur.fetchone()
        # End the transaction the read may have opened so the connection is
        # not returned to the pool mid-transaction. Nothing is committed,
        # matching the previous behaviour of dropping the connection.
        await conn.rollback()
        return row


async def fetchall(sql: str, args: tuple = ()) -> list[dict]:
    """Run a query and return every row, each as a dict."""
    async with _pooled_connection() as conn:
        async with conn.cursor(aiomysql.DictCursor) as cur:
            await cur.execute(sql, args)
            rows = await cur.fetchall()
        # See fetchone(): leave the connection outside any transaction
        # before it goes back to the pool.
        await conn.rollback()
        return rows


# ── remote_source ─────────────────────────────────────────────────────────────

async def insert_source(hospital_id: str, source_type: str, url: str) -> int:
    """Insert a ``remote_source`` row and return ``execute``'s result."""
    return await execute(
        "INSERT INTO remote_source (hospital_id, source_type, url) VALUES (%s, %s, %s)",
        (hospital_id, source_type, url),
    )


# ── raw_info ──────────────────────────────────────────────────────────────────

async def insert_raw_info(source_id: int, analysis_run_id: str, data_tag: str = "default") -> int:
    """Insert a ``raw_info`` row and return ``execute``'s result."""
    return await execute(
        "INSERT INTO raw_info (source_id, analysis_run_id, data_tag) VALUES (%s, %s, %s)",
        (source_id, analysis_run_id, data_tag),
    )


async def set_raw_info_status(info_id: int, status: str) -> None:
    """Set the ``status`` column of the ``raw_info`` row ``info_id``."""
    await execute("UPDATE raw_info SET status = %s WHERE info_id = %s", (status, info_id))


async def save_raw_info(info_id: int, data: dict) -> None:
    """Store *data* as JSON in ``raw_data`` and mark the row ``'done'``."""
    await execute(
        "UPDATE raw_info SET raw_data = %s, status = 'done' WHERE info_id = %s",
        (json.dumps(data, ensure_ascii=False), info_id),
    )


async def fetch_raw(info_id: int | None) -> dict | None:
    """Return the decoded ``raw_data`` of a ``raw_info`` row.

    Returns ``None`` when *info_id* is ``None``, when no such row exists, or
    when the row's ``raw_data`` is empty.
    """
    if info_id is None:
        return None
    row = await fetchone("SELECT raw_data FROM raw_info WHERE info_id = %s", (info_id,))
    if not row or not row["raw_data"]:
        return None
    return _decode_json(row["raw_data"])


async def is_done(info_id: int | None) -> bool:
    """Report whether the ``raw_info`` row ``info_id`` has status ``'done'``.

    A ``None`` *info_id* counts as done. A row that does not exist is
    reported as not done instead of raising on the missing result.
    """
    if info_id is None:
        return True
    row = await fetchone("SELECT status FROM raw_info WHERE info_id = %s", (info_id,))
    return row is not None and row["status"] == "done"


async def get_analysis_raw_data(analysis_run_id: str) -> dict:
    """Return decoded ``raw_data`` for an analysis run, keyed by source type.

    If several rows share a ``source_type``, the last one returned by the
    query wins.
    """
    rows = await fetchall(
        "SELECT rs.source_type, ri.raw_data"
        " FROM raw_info ri JOIN remote_source rs USING (source_id)"
        " WHERE ri.analysis_run_id = %s",
        (analysis_run_id,),
    )
    return {row["source_type"]: _decode_json(row["raw_data"]) for row in rows}


# ── analysis_runs ─────────────────────────────────────────────────────────────

async def insert_analysis_run(
    analysis_run_id: str,
    hospital_id: str,
    owner_user_id: int,
) -> str:
    """Insert an ``analysis_runs`` row and return *analysis_run_id* unchanged."""
    await execute(
        "INSERT INTO analysis_runs (analysis_run_id, hospital_id, owner_user_id)"
        " VALUES (%s, %s, %s)",
        (analysis_run_id, hospital_id, owner_user_id),
    )
    return analysis_run_id


async def save_analysis_report(analysis_run_id: str, data: dict) -> None:
    """Store *data* as JSON in ``report_data`` of the given analysis run."""
    await execute(
        "UPDATE analysis_runs SET report_data = %s WHERE analysis_run_id = %s",
        (json.dumps(data, ensure_ascii=False), analysis_run_id),
    )


# ── hospital_baseinfo ─────────────────────────────────────────────────────────

async def _insert_hospital_history(hospital_id: str, analysis_run_id: str | None) -> None:
    """Copy the current ``hospital_baseinfo`` row into ``hospital_history``.

    Does nothing when no ``hospital_baseinfo`` row exists for *hospital_id*.
    """
    row = await fetchone(
        "SELECT owner_user_id, hospital_name, hospital_name_en, brn, road_address, site_address, status"
        " FROM hospital_baseinfo WHERE hospital_id = %s",
        (hospital_id,),
    )
    if not row:
        return
    await execute(
        "INSERT INTO hospital_history"
        " (hospital_id, owner_user_id, hospital_name, hospital_name_en, brn, road_address, site_address, status, analysis_run_id)"
        " VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
        (
            hospital_id,
            row["owner_user_id"],
            row["hospital_name"],
            row["hospital_name_en"],
            row["brn"],
            row["road_address"],
            row["site_address"],
            row["status"],
            analysis_run_id,
        ),
    )


async def insert_hospital(
    hospital_id: str,
    name: str,
    name_en: str | None = None,
    road_address: str | None = None,
    site_address: str | None = None,
    owner_user_id: int = 0,
    brn: str = "",
) -> dict:
    """Insert a hospital with status ``'done'`` and record a history entry.

    Returns:
        The result of selecting ``created_at`` for the new row.
    """
    await execute(
        "INSERT INTO hospital_baseinfo (hospital_id, hospital_name, hospital_name_en, road_address, site_address, status, owner_user_id, brn)"
        " VALUES (%s, %s, %s, %s, %s, 'done', %s, %s)",
        (hospital_id, name, name_en, road_address, site_address, owner_user_id, brn),
    )
    await _insert_hospital_history(hospital_id, analysis_run_id=None)
    return await fetchone(
        "SELECT created_at FROM hospital_baseinfo WHERE hospital_id = %s",
        (hospital_id,),
    )


async def update_hospital_info(hospital_id: str, data: dict, analysis_run_id: str | None = None) -> None:
    """Update the basic fields of hospital_baseinfo after clinic scraping.

    Reads ``clinicName``, ``clinicNameEn`` and ``address`` from *data*; a
    missing or ``None`` value leaves the corresponding column unchanged
    (``COALESCE``). The status is set to ``'done'`` and a history entry is
    recorded afterwards.
    """
    await execute(
        "UPDATE hospital_baseinfo"
        " SET status = 'done',"
        " hospital_name = COALESCE(%s, hospital_name),"
        " hospital_name_en = COALESCE(%s, hospital_name_en),"
        " road_address = COALESCE(%s, road_address)"
        " WHERE hospital_id = %s",
        (
            data.get("clinicName"),
            data.get("clinicNameEn"),
            data.get("address"),
            hospital_id,
        ),
    )
    await _insert_hospital_history(hospital_id, analysis_run_id)


# ── file_data ─────────────────────────────────────────────────────────────────

async def insert_file_row(
    analysis_run_id: str,
    file_type: str,
    file_name: str,
    file_url: str,
    size_bytes: int | None = None,
    hospital_id: str | None = None,
) -> int:
    """Insert a ``file_data`` row and return ``execute``'s result."""
    return await execute(
        "INSERT INTO file_data (analysis_run_id, hospital_id, file_type, file_name, file_url, size_bytes)"
        " VALUES (%s, %s, %s, %s, %s, %s)",
        (analysis_run_id, hospital_id, file_type, file_name, file_url, size_bytes),
    )


# ── market_analysis ───────────────────────────────────────────────────────────

async def get_market_analysis(analysis_run_id: str) -> dict:
    """Return decoded ``data`` of the run's ``'done'`` rows, keyed by analysis type."""
    rows = await fetchall(
        "SELECT analysis_type, data FROM market_analysis WHERE analysis_run_id = %s AND status = 'done'",
        (analysis_run_id,),
    )
    return {row["analysis_type"]: _decode_json(row["data"]) for row in rows}