227 lines
8.6 KiB
Python
227 lines
8.6 KiB
Python
import json
|
|
import os
|
|
import aiomysql
|
|
from common.utils import get_env
|
|
|
|
# Module-level cache for the connection pool; stays None until get_pool()
# creates the pool on its first call.
_pool: aiomysql.Pool | None = None
|
|
|
|
|
|
async def get_pool() -> aiomysql.Pool:
    """Return the shared aiomysql pool, creating it on the first call.

    Host, user, password and database name are read through ``get_env``
    (MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DB); the port comes from
    ``os.getenv("MYSQL_PORT")`` and falls back to 3306. The pool is created
    with autocommit enabled and the ``utf8mb4`` charset, then cached in the
    module-level ``_pool`` so later calls reuse it.
    """
    global _pool
    if _pool is not None:
        return _pool

    settings = {
        "host": get_env("MYSQL_HOST"),
        "port": int(os.getenv("MYSQL_PORT", "3306")),
        "user": get_env("MYSQL_USER"),
        "password": get_env("MYSQL_PASSWORD"),
        "db": get_env("MYSQL_DB"),
        "autocommit": True,
        "charset": "utf8mb4",
    }
    _pool = await aiomysql.create_pool(**settings)
    return _pool
|
|
|
|
|
|
async def execute(sql: str, args: tuple = ()) -> int:
    """Run a single statement on a pooled connection.

    Args:
        sql: Statement text using ``%s`` placeholders.
        args: Values bound to the placeholders.

    Returns:
        The cursor's ``lastrowid`` after the statement has run.
    """
    db_pool = await get_pool()
    async with db_pool.acquire() as connection, connection.cursor() as cursor:
        await cursor.execute(sql, args)
        return cursor.lastrowid
|
|
|
|
|
|
async def fetchone(sql: str, args: tuple = ()) -> dict | None:
    """Run a query and return the first row of its result.

    The query runs on a ``DictCursor``; the return value is whatever that
    cursor's ``fetchone()`` produces.

    Args:
        sql: Query text using ``%s`` placeholders.
        args: Values bound to the placeholders.
    """
    db_pool = await get_pool()
    async with db_pool.acquire() as connection, connection.cursor(aiomysql.DictCursor) as cursor:
        await cursor.execute(sql, args)
        first_row = await cursor.fetchone()
        return first_row
|
|
|
|
async def insert_instagram_row(hospital_id: str, url: str) -> int:
    """Insert a (hospital_id, url) row into ``instagram_data``.

    Returns the value produced by ``execute`` for the INSERT.
    """
    sql = "INSERT INTO instagram_data (hospital_id, url) VALUES (%s, %s)"
    params = (hospital_id, url)
    return await execute(sql, params)
|
|
|
|
|
|
async def insert_facebook_row(hospital_id: str, url: str) -> int:
    """Insert a (hospital_id, url) row into ``facebook_data``.

    Returns the value produced by ``execute`` for the INSERT.
    """
    sql = "INSERT INTO facebook_data (hospital_id, url) VALUES (%s, %s)"
    params = (hospital_id, url)
    return await execute(sql, params)
|
|
|
|
|
|
async def insert_naver_blog_row(hospital_id: str, url: str) -> int:
    """Insert a (hospital_id, url) row into ``naver_blog_data``.

    Returns the value produced by ``execute`` for the INSERT.
    """
    sql = "INSERT INTO naver_blog_data (hospital_id, url) VALUES (%s, %s)"
    params = (hospital_id, url)
    return await execute(sql, params)
|
|
|
|
|
|
async def insert_youtube_row(hospital_id: str, url: str) -> int:
    """Insert a (hospital_id, url) row into ``youtube_data``.

    Returns the value produced by ``execute`` for the INSERT.
    """
    sql = "INSERT INTO youtube_data (hospital_id, url) VALUES (%s, %s)"
    params = (hospital_id, url)
    return await execute(sql, params)
|
|
|
|
|
|
async def insert_gangnam_unni_row(hospital_id: str, url: str) -> int:
    """Insert a (hospital_id, url) row into ``gangnam_unni_data``.

    Returns the value produced by ``execute`` for the INSERT.
    """
    sql = "INSERT INTO gangnam_unni_data (hospital_id, url) VALUES (%s, %s)"
    params = (hospital_id, url)
    return await execute(sql, params)
|
|
|
|
|
|
async def insert_analysis_run(
    analysis_run_id: str,
    hospital_id: str,
    owner_user_id: int,
    instagram_data_id: int | None,
    facebook_data_id: int | None,
    naver_blog_data_id: int | None,
    youtube_data_id: int | None,
    gangnam_unni_data_id: int | None,
) -> str:
    """Insert one row into ``analysis_runs`` linking a run to its data rows.

    Each ``*_data_id`` argument is stored as given and may be None.

    Returns:
        ``analysis_run_id``, unchanged, once the INSERT has been issued.
    """
    sql = (
        "INSERT INTO analysis_runs"
        " (analysis_run_id, hospital_id, owner_user_id, instagram_data_id, facebook_data_id, naver_blog_data_id, youtube_data_id, gangnam_unni_data_id)"
        " VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"
    )
    params = (
        analysis_run_id,
        hospital_id,
        owner_user_id,
        instagram_data_id,
        facebook_data_id,
        naver_blog_data_id,
        youtube_data_id,
        gangnam_unni_data_id,
    )
    await execute(sql, params)
    return analysis_run_id
|
|
|
|
|
|
|
|
async def save_analysis_report(analysis_run_id: str, data: dict) -> None:
    """Store ``data`` as JSON in ``analysis_runs.report_data`` for the given run.

    The dict is serialized with ``ensure_ascii=False`` so non-ASCII text is
    written as-is rather than as ``\\uXXXX`` escapes.
    """
    report_json = json.dumps(data, ensure_ascii=False)
    sql = "UPDATE analysis_runs SET report_data = %s WHERE analysis_run_id = %s"
    await execute(sql, (report_json, analysis_run_id))
|
|
|
|
|
|
async def is_done(table: str, row_id: int | None) -> bool:
|
|
if row_id is None:
|
|
return True
|
|
r = await fetchone(f"SELECT status FROM {table} WHERE id = %s", (row_id,))
|
|
return r["status"] == "done"
|
|
|
|
|
|
async def _fetch_raw(table: str, row_id: int | None) -> dict | None:
    """Load the ``raw_data`` column of one row, decoded from JSON if needed.

    Args:
        table: Table name, interpolated directly into the SQL text; must be a
            trusted identifier.
        row_id: Value matched against the table's ``id`` column, or None.

    Returns:
        None when ``row_id`` is None, no row is found, or ``raw_data`` is
        empty/falsy. Otherwise the stored value: parsed with ``json.loads``
        when it arrives as a string, returned unchanged when it does not.
    """
    if row_id is None:
        return None

    record = await fetchone(f"SELECT raw_data FROM {table} WHERE id = %s", (row_id,))
    raw = record["raw_data"] if record else None
    if not raw:
        return None
    if isinstance(raw, str):
        return json.loads(raw)
    return raw
|
|
|
|
|
|
async def get_analysis_raw_data(analysis_run_id: str) -> dict:
    """Collect the raw data of every source attached to an analysis run.

    Looks up the run's per-source data ids in ``analysis_runs`` and loads each
    one through ``_fetch_raw``, one query after another.

    Args:
        analysis_run_id: Identifier of the run in ``analysis_runs``.

    Returns:
        A dict with the keys ``instagram``, ``facebook``, ``naver_blog``,
        ``youtube`` and ``gangnam_unni``; each value is what ``_fetch_raw``
        returns for that source (None when the source has no data).

    Raises:
        LookupError: If no ``analysis_runs`` row exists for ``analysis_run_id``.
    """
    run = await fetchone(
        "SELECT instagram_data_id, facebook_data_id, naver_blog_data_id, youtube_data_id, gangnam_unni_data_id"
        " FROM analysis_runs WHERE analysis_run_id = %s",
        (analysis_run_id,),
    )
    if run is None:
        # Without this guard the subscripts below fail with an opaque
        # "'NoneType' object is not subscriptable" TypeError.
        raise LookupError(f"analysis run not found: {analysis_run_id!r}")

    # (result key, source table, id column in analysis_runs)
    sources = (
        ("instagram", "instagram_data", "instagram_data_id"),
        ("facebook", "facebook_data", "facebook_data_id"),
        ("naver_blog", "naver_blog_data", "naver_blog_data_id"),
        ("youtube", "youtube_data", "youtube_data_id"),
        ("gangnam_unni", "gangnam_unni_data", "gangnam_unni_data_id"),
    )
    return {key: await _fetch_raw(table, run[id_column]) for key, table, id_column in sources}
|
|
|
|
|
|
async def set_instagram_status(row_id: int, status: str) -> None:
    """Set the ``status`` column of the ``instagram_data`` row with id ``row_id``."""
    sql = "UPDATE instagram_data SET status = %s WHERE id = %s"
    await execute(sql, (status, row_id))
|
|
|
|
|
|
async def set_facebook_status(row_id: int, status: str) -> None:
    """Set the ``status`` column of the ``facebook_data`` row with id ``row_id``."""
    sql = "UPDATE facebook_data SET status = %s WHERE id = %s"
    await execute(sql, (status, row_id))
|
|
|
|
|
|
async def set_naver_blog_status(row_id: int, status: str) -> None:
    """Set the ``status`` column of the ``naver_blog_data`` row with id ``row_id``."""
    sql = "UPDATE naver_blog_data SET status = %s WHERE id = %s"
    await execute(sql, (status, row_id))
|
|
|
|
|
|
async def set_youtube_status(row_id: int, status: str) -> None:
    """Set the ``status`` column of the ``youtube_data`` row with id ``row_id``."""
    sql = "UPDATE youtube_data SET status = %s WHERE id = %s"
    await execute(sql, (status, row_id))
|
|
|
|
|
|
async def set_gangnam_unni_status(row_id: int, status: str) -> None:
    """Set the ``status`` column of the ``gangnam_unni_data`` row with id ``row_id``."""
    sql = "UPDATE gangnam_unni_data SET status = %s WHERE id = %s"
    await execute(sql, (status, row_id))
|
|
|
|
|
|
async def save_instagram_raw_data(row_id: int, data: dict) -> None:
    """Store ``data`` as JSON in ``instagram_data.raw_data`` and mark the row ``done``."""
    payload = json.dumps(data, ensure_ascii=False)
    sql = "UPDATE instagram_data SET raw_data = %s, status = 'done' WHERE id = %s"
    await execute(sql, (payload, row_id))
|
|
|
|
|
|
async def save_facebook_raw_data(row_id: int, data: dict) -> None:
    """Store ``data`` as JSON in ``facebook_data.raw_data`` and mark the row ``done``."""
    payload = json.dumps(data, ensure_ascii=False)
    sql = "UPDATE facebook_data SET raw_data = %s, status = 'done' WHERE id = %s"
    await execute(sql, (payload, row_id))
|
|
|
|
|
|
async def save_naver_blog_raw_data(row_id: int, data: dict) -> None:
    """Store ``data`` as JSON in ``naver_blog_data.raw_data`` and mark the row ``done``."""
    payload = json.dumps(data, ensure_ascii=False)
    sql = "UPDATE naver_blog_data SET raw_data = %s, status = 'done' WHERE id = %s"
    await execute(sql, (payload, row_id))
|
|
|
|
|
|
async def save_youtube_raw_data(row_id: int, data: dict) -> None:
    """Store ``data`` as JSON in ``youtube_data.raw_data`` and mark the row ``done``."""
    payload = json.dumps(data, ensure_ascii=False)
    sql = "UPDATE youtube_data SET raw_data = %s, status = 'done' WHERE id = %s"
    await execute(sql, (payload, row_id))
|
|
|
|
|
|
async def save_gangnam_unni_raw_data(row_id: int, data: dict) -> None:
    """Store ``data`` as JSON in ``gangnam_unni_data.raw_data`` and mark the row ``done``."""
    payload = json.dumps(data, ensure_ascii=False)
    sql = "UPDATE gangnam_unni_data SET raw_data = %s, status = 'done' WHERE id = %s"
    await execute(sql, (payload, row_id))
|
|
|
|
|
|
async def _insert_hospital_history(hospital_id: str, analysis_run_id: str | None) -> None:
    """Copy the current ``hospital_baseinfo`` row into ``hospital_history``.

    Does nothing when no ``hospital_baseinfo`` row exists for ``hospital_id``.

    Args:
        hospital_id: Key of the hospital whose current state is copied.
        analysis_run_id: Stored alongside the copy; may be None.
    """
    snapshot = await fetchone(
        "SELECT owner_user_id, hospital_name, hospital_name_en, brn, road_address, site_address, url, status, raw_data"
        " FROM hospital_baseinfo WHERE hospital_id = %s",
        (hospital_id,),
    )
    if not snapshot:
        return

    # Columns copied verbatim, in the order the INSERT lists them.
    copied_columns = (
        "owner_user_id",
        "hospital_name",
        "hospital_name_en",
        "brn",
        "road_address",
        "site_address",
        "url",
        "status",
    )
    copied_values = tuple(snapshot[column] for column in copied_columns)

    # raw_data is passed through when it is already a string, re-serialized
    # when it is a non-empty non-string value, and stored as NULL otherwise.
    raw = snapshot["raw_data"]
    if isinstance(raw, str):
        raw_json = raw
    elif raw:
        raw_json = json.dumps(raw, ensure_ascii=False)
    else:
        raw_json = None

    sql = (
        "INSERT INTO hospital_history"
        " (hospital_id, owner_user_id, hospital_name, hospital_name_en, brn, road_address, site_address, url, status, raw_data, analysis_run_id)"
        " VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
    )
    await execute(sql, (hospital_id, *copied_values, raw_json, analysis_run_id))
|
|
|
|
|
|
async def insert_hospital(
    hospital_id: str,
    name: str,
    name_en: str | None = None,
    road_address: str | None = None,
    site_address: str | None = None,
    url: str | None = None,
    raw_data: dict | None = None,
    owner_user_id: int = 0,
    brn: str = "",
) -> dict:
    """Insert a ``hospital_baseinfo`` row with status ``done`` and record history.

    ``raw_data`` is stored as JSON when it is non-empty and as NULL otherwise.
    After the insert, the new row is copied into ``hospital_history`` with no
    analysis run attached.

    Returns:
        The result of selecting ``created_at`` for ``hospital_id`` from
        ``hospital_baseinfo``.
    """
    if raw_data:
        raw_json = json.dumps(raw_data, ensure_ascii=False)
    else:
        raw_json = None

    insert_sql = (
        "INSERT INTO hospital_baseinfo (hospital_id, hospital_name, hospital_name_en, road_address, site_address, url, raw_data, status, owner_user_id, brn)"
        " VALUES (%s, %s, %s, %s, %s, %s, %s, 'done', %s, %s)"
    )
    insert_params = (
        hospital_id,
        name,
        name_en,
        road_address,
        site_address,
        url,
        raw_json,
        owner_user_id,
        brn,
    )
    await execute(insert_sql, insert_params)

    await _insert_hospital_history(hospital_id, analysis_run_id=None)

    created = await fetchone(
        "SELECT created_at FROM hospital_baseinfo WHERE hospital_id = %s",
        (hospital_id,),
    )
    return created
|
|
|
|
|
|
async def save_hospital_raw_data(hospital_id: str, data: dict, analysis_run_id: str | None = None) -> None:
    """Store a hospital's raw data, refresh its name/address, and record history.

    ``data`` is written as JSON to ``hospital_baseinfo.raw_data`` and the row
    is marked ``done``. The values under ``clinicName``, ``clinicNameEn`` and
    ``address`` replace ``hospital_name``, ``hospital_name_en`` and
    ``road_address``; through COALESCE, a missing or None value leaves the
    existing column untouched. The updated row is then copied into
    ``hospital_history`` together with ``analysis_run_id``.
    """
    raw_json = json.dumps(data, ensure_ascii=False)
    clinic_name = data.get("clinicName")
    clinic_name_en = data.get("clinicNameEn")
    address = data.get("address")

    sql = (
        "UPDATE hospital_baseinfo"
        " SET raw_data = %s, status = 'done',"
        " hospital_name = COALESCE(%s, hospital_name),"
        " hospital_name_en = COALESCE(%s, hospital_name_en),"
        " road_address = COALESCE(%s, road_address)"
        " WHERE hospital_id = %s"
    )
    await execute(sql, (raw_json, clinic_name, clinic_name_en, address, hospital_id))
    await _insert_hospital_history(hospital_id, analysis_run_id)
|