85 lines
3.1 KiB
Python
85 lines
3.1 KiB
Python
import asyncio
|
|
import json
|
|
import logging
|
|
from common.db import fetchone, execute
|
|
from integrations.llm.llm_service import LLMService
|
|
from integrations.llm.prompt import (
|
|
market_competitors_prompt,
|
|
market_keywords_prompt,
|
|
market_trend_prompt,
|
|
market_target_audience_prompt,
|
|
)
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
# Analysis types written to market_analysis.analysis_type. Order matters:
# run_market_analysis pairs these names positionally (via zip) with the
# results of its LLM calls, so the two sequences must stay aligned.
_TYPES = ["competitors", "keywords", "trend", "target_audience"]
|
|
|
|
|
|
async def _save(analysis_run_id: str, analysis_type: str, result, exc: Exception | None) -> None:
    """Persist the outcome of a single market-analysis step.

    Inserts a ``market_analysis`` row for the given run and type or, on a
    duplicate key, updates the existing one. The row is marked ``'failed'``
    when *exc* is given; otherwise it is marked ``'done'`` and ``data`` is set
    to the JSON-serialised ``result.model_dump()``.

    Args:
        analysis_run_id: Identifier of the analysis run the row belongs to.
        analysis_type: Name of the analysis step being recorded.
        result: Object exposing ``model_dump()``; only read when *exc* is None.
        exc: The exception the step ended with, or None on success.
    """
    # Compare against None explicitly: "no error" is signalled by None, and an
    # exception instance is not guaranteed to be truthy (a subclass may define
    # __bool__ or __len__).
    if exc is not None:
        logger.warning("[market] %s failed run=%s: %s", analysis_type, analysis_run_id, exc)
        await execute(
            "INSERT INTO market_analysis (analysis_run_id, analysis_type, status)"
            " VALUES (%s, %s, 'failed')"
            " ON DUPLICATE KEY UPDATE status = 'failed'",
            (analysis_run_id, analysis_type),
        )
        return

    # ensure_ascii=False keeps non-ASCII text readable in the stored JSON.
    payload = json.dumps(result.model_dump(), ensure_ascii=False)
    await execute(
        "INSERT INTO market_analysis (analysis_run_id, analysis_type, status, data)"
        " VALUES (%s, %s, 'done', %s)"
        " ON DUPLICATE KEY UPDATE status = 'done', data = VALUES(data)",
        (analysis_run_id, analysis_type, payload),
    )
|
|
|
|
|
|
async def run_market_analysis(analysis_run_id: str) -> None:
    """Run every market analysis for one analysis run and store the outcomes.

    Steps:
      1. Look up the run's ``hospital_id`` and the matching clinic row.
      2. Mark each type in ``_TYPES`` as ``'processing'`` in ``market_analysis``.
      3. Issue the four LLM prompts concurrently (provider ``"perplexity"``).
      4. Save each outcome through ``_save`` as either done or failed.

    A failure of an individual LLM call does not abort the others; it is
    recorded as ``'failed'`` for that analysis type.

    Args:
        analysis_run_id: Identifier of the analysis run to process.

    Raises:
        ValueError: If the analysis run or its clinic row cannot be found.
    """
    logger.info("[market] start run=%s", analysis_run_id)

    run = await fetchone(
        "SELECT hospital_id FROM analysis_runs WHERE analysis_run_id = %s",
        (analysis_run_id,),
    )
    # Fail with a clear message instead of an opaque TypeError from
    # subscripting a missing row.
    if not run:
        raise ValueError(f"analysis run not found: {analysis_run_id}")

    clinic = await fetchone(
        "SELECT hospital_name, road_address, raw_data FROM hospital_baseinfo WHERE hospital_id = %s",
        (run["hospital_id"],),
    )
    if not clinic:
        raise ValueError(
            f"clinic not found for run={analysis_run_id} hospital_id={run['hospital_id']}"
        )

    # raw_data may arrive as JSON text (str or bytes) or as an already-decoded
    # object; a NULL column falls back to an empty dict.
    raw_data = clinic["raw_data"]
    if isinstance(raw_data, (str, bytes, bytearray)):
        clinic_data = json.loads(raw_data)
    else:
        clinic_data = raw_data or {}

    clinic_name = clinic["hospital_name"] or ""
    address = clinic["road_address"] or ""

    # "services" can be absent, null, or (if the payload is malformed) not a
    # list at all; treat all of those as "no services".
    services = clinic_data.get("services") if isinstance(clinic_data, dict) else None
    if not isinstance(services, list):
        services = []
    services_str = ", ".join(services[:3])
    primary_service = services[0] if services else ""

    # Flag every analysis type as in progress before the LLM calls start.
    for analysis_type in _TYPES:
        await execute(
            "INSERT INTO market_analysis (analysis_run_id, analysis_type, status)"
            " VALUES (%s, %s, 'processing')"
            " ON DUPLICATE KEY UPDATE status = 'processing'",
            (analysis_run_id, analysis_type),
        )

    llm = LLMService(provider="perplexity")
    # The call order below must match _TYPES; results are paired by position.
    outcomes = await asyncio.gather(
        llm.generate(market_competitors_prompt, {"address": address, "services": services_str}),
        llm.generate(market_keywords_prompt, {"services": services_str}),
        llm.generate(market_trend_prompt, {"service": primary_service}),
        llm.generate(market_target_audience_prompt, {"clinic_name": clinic_name}),
        return_exceptions=True,
    )

    # NOTE (translated from the original author's comment): this is not an
    # ideal approach - outcomes are matched to _TYPES purely by position.
    # strict=True turns any drift between the two sequences into an error
    # instead of silently dropping or mislabelling a result.
    #
    # gather(return_exceptions=True) can also hand back asyncio.CancelledError,
    # which derives from BaseException rather than Exception, so classify
    # failures with BaseException to avoid saving an error object as a result.
    await asyncio.gather(*(
        _save(
            analysis_run_id,
            analysis_type,
            result=None if isinstance(outcome, BaseException) else outcome,
            exc=outcome if isinstance(outcome, BaseException) else None,
        )
        for analysis_type, outcome in zip(_TYPES, outcomes, strict=True)
    ))

    logger.info("[market] done run=%s", analysis_run_id)
|