시장 조사 llm 추가 및 파이프라인 정리, db 커넥션 풀 문제 처리

upload
jaehwang 2026-05-19 15:22:34 +09:00
parent 42e09ae2d1
commit cda518c027
13 changed files with 328 additions and 65 deletions

View File

@ -179,3 +179,21 @@ CREATE INDEX IX_hospital_history_1
CREATE INDEX IX_hospital_history_2
ON hospital_history(analysis_run_id);
-- market_analysis table creation SQL
-- Stores one row per (analysis_run_id, analysis_type); the pair is enforced
-- unique by UQ_market_analysis so writers can upsert with
-- INSERT ... ON DUPLICATE KEY UPDATE.
CREATE TABLE market_analysis
(
`id` INT NOT NULL AUTO_INCREMENT,
`analysis_run_id` CHAR(36) NOT NULL,
`analysis_type` VARCHAR(50) NOT NULL,
-- Defaults to 'start'; the service code in this change writes
-- 'processing', 'done' and 'failed'.
`status` VARCHAR(20) NOT NULL DEFAULT 'start',
-- JSON payload of the analysis result; NULL until a result is stored.
`data` JSON NULL,
`created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id),
UNIQUE KEY UQ_market_analysis (analysis_run_id, analysis_type)
);
-- Index creation SQL - market_analysis(analysis_run_id)
-- NOTE(review): analysis_run_id is already the leftmost column of
-- UQ_market_analysis, so this separate index looks redundant - confirm
-- before keeping or dropping it.
CREATE INDEX IX_market_analysis_1
ON market_analysis(analysis_run_id);

View File

@ -5,7 +5,7 @@ from common.deps import verify_api_key
from common.db import fetchone, insert_instagram_row, insert_facebook_row, insert_naver_blog_row, insert_youtube_row, insert_gangnam_unni_row, insert_analysis_run
from models.analysis import AnalysisCreate, AnalysisStartResponse, AnalysisStatusResponse
from models.status import AnalysisStatus
from services.collect import collect_instagram, collect_facebook, collect_naver_blog, collect_youtube, collect_gangnam_unni, collect_clinic_info
from services.pipeline import run_pipeline
router = APIRouter(prefix="/api/analysis", tags=["analysis"], dependencies=[Depends(verify_api_key)])
logger = logging.getLogger(__name__)
@ -24,7 +24,6 @@ async def start_analysis(body: AnalysisCreate, background_tasks: BackgroundTasks
)
if not hospital:
raise HTTPException(status_code=409, detail="Clinic not found")
owner_user_id = hospital["owner_user_id"] if hospital else 0
ig_id = await insert_instagram_row(hospital_id, body.channels.instagram) if body.channels.instagram else None
fb_id = await insert_facebook_row(hospital_id, body.channels.facebook) if body.channels.facebook else None
@ -32,20 +31,12 @@ async def start_analysis(body: AnalysisCreate, background_tasks: BackgroundTasks
yt_id = await insert_youtube_row(hospital_id, body.channels.youtube) if body.channels.youtube else None
gu_id = await insert_gangnam_unni_row(hospital_id, body.channels.gangnam_unni) if body.channels.gangnam_unni else None
analysis_run_id = await insert_analysis_run(analysis_run_id, hospital_id, owner_user_id, ig_id, fb_id, nb_id, yt_id, gu_id)
analysis_run_id = await insert_analysis_run(
analysis_run_id, hospital_id, hospital["owner_user_id"],
ig_id, fb_id, nb_id, yt_id, gu_id,
)
background_tasks.add_task(collect_clinic_info, analysis_run_id, hospital_id, hospital["url"])
if ig_id:
background_tasks.add_task(collect_instagram, analysis_run_id, ig_id, body.channels.instagram)
if fb_id:
background_tasks.add_task(collect_facebook, analysis_run_id, fb_id, body.channels.facebook)
if nb_id:
background_tasks.add_task(collect_naver_blog, analysis_run_id, nb_id, body.channels.naver_blog)
if yt_id:
background_tasks.add_task(collect_youtube, analysis_run_id, yt_id, body.channels.youtube)
if gu_id:
background_tasks.add_task(collect_gangnam_unni, analysis_run_id, gu_id, body.channels.gangnam_unni)
background_tasks.add_task(run_pipeline, analysis_run_id)
return AnalysisStartResponse(
analysis_run_id=analysis_run_id,

View File

@ -26,22 +26,38 @@ async def get_pool() -> aiomysql.Pool:
def _log_pool_stats(pool: "aiomysql.Pool") -> None:
    """Report the pool's current occupancy at DEBUG level.

    Replaces an unconditional ``print`` on every query so the output can be
    switched on and off through normal logging configuration.
    """
    # Imported locally so this helper does not rely on the module's import block.
    import logging

    logging.getLogger(__name__).debug(
        "[Pool] size=%s free=%s used=%s max=%s",
        pool.size,
        pool.freesize,
        pool.size - pool.freesize,
        pool.maxsize,
    )


# Write path (INSERT/UPDATE/DELETE)
async def execute(sql: str, args: tuple = ()) -> int:
    """Run a single write statement and commit it.

    Args:
        sql: Statement text using ``%s`` placeholders.
        args: Values bound to the placeholders.

    Returns:
        The cursor's ``lastrowid`` after the statement has been committed.
    """
    pool = await get_pool()
    _log_pool_stats(pool)
    # Leaving the ``acquire()`` context hands the connection back to the pool.
    # It is deliberately NOT closed here: closing it would throw the
    # connection away after a single statement and defeat pooling.
    async with pool.acquire() as conn:
        # Transparently re-establish a connection the server has dropped.
        await conn.ping(reconnect=True)
        async with conn.cursor() as cur:
            await cur.execute(sql, args)
            await conn.commit()
            return cur.lastrowid


# Read path (SELECT)
async def fetchone(sql: str, args: tuple = ()) -> dict | None:
    """Run a query and return its first row.

    Args:
        sql: Query text using ``%s`` placeholders.
        args: Values bound to the placeholders.

    Returns:
        The first row as produced by ``aiomysql.DictCursor``, or ``None``
        when the query yields no rows.
    """
    pool = await get_pool()
    _log_pool_stats(pool)
    # See execute(): the pool context manager releases the connection, so no
    # explicit close is performed.
    async with pool.acquire() as conn:
        await conn.ping(reconnect=True)
        async with conn.cursor(aiomysql.DictCursor) as cur:
            await cur.execute(sql, args)
            return await cur.fetchone()
async def insert_instagram_row(hospital_id: str, url: str) -> int:
    """Insert an ``instagram_data`` row for the hospital and return what ``execute`` returns."""
    statement = "INSERT INTO instagram_data (hospital_id, url) VALUES (%s, %s)"
    params = (hospital_id, url)
    return await execute(statement, params)

View File

@ -3,6 +3,12 @@ from pydantic import BaseModel
from common.utils import get_env
from integrations.llm.schemas.report import ReportInput, ReportOutput
from integrations.llm.schemas.plan import PlanInput, PlanOutput
from integrations.llm.schemas.market import (
MarketCompetitorsInput, MarketCompetitorsOutput,
MarketKeywordsInput, MarketKeywordsOutput,
MarketTrendInput, MarketTrendOutput,
MarketTargetAudienceInput, MarketTargetAudienceOutput,
)
_PROMPT_DIR = os.path.join(os.path.dirname(__file__), "temp-prompt")
@ -46,3 +52,31 @@ plan_prompt = Prompt(
input_class=PlanInput,
output_class=PlanOutput,
)
# Market-research prompts. Each definition pairs a template file with the
# schema classes for its input variables and its structured output.
# NOTE(review): "MARKET_MODEL" is presumably a configuration/env key resolved
# by Prompt (as with the other prompts in this module) - confirm.

# Competitor analysis around the clinic.
market_competitors_prompt = Prompt(
file_name="market_competitors_prompt.txt",
prompt_model="MARKET_MODEL",
input_class=MarketCompetitorsInput,
output_class=MarketCompetitorsOutput,
)
# Search-keyword research for the clinic's services.
market_keywords_prompt = Prompt(
file_name="market_keywords_prompt.txt",
prompt_model="MARKET_MODEL",
input_class=MarketKeywordsInput,
output_class=MarketKeywordsOutput,
)
# Market trend overview for a single service.
market_trend_prompt = Prompt(
file_name="market_trend_prompt.txt",
prompt_model="MARKET_MODEL",
input_class=MarketTrendInput,
output_class=MarketTrendOutput,
)
# Target-audience segmentation for the clinic.
market_target_audience_prompt = Prompt(
file_name="market_target_audience_prompt.txt",
prompt_model="MARKET_MODEL",
input_class=MarketTargetAudienceInput,
output_class=MarketTargetAudienceOutput,
)

View File

@ -0,0 +1,61 @@
from pydantic import BaseModel
# Input variables for the competitor-analysis prompt.
class MarketCompetitorsInput(BaseModel):
# Clinic address; the market service in this change passes the clinic's road address.
address: str
# Services text; the market service passes up to three service names joined by ", ".
services: str
# Input variables for the keyword-research prompt.
class MarketKeywordsInput(BaseModel):
# Services text; the market service passes up to three service names joined by ", ".
services: str
# Input variables for the market-trend prompt.
class MarketTrendInput(BaseModel):
# A single service name; the market service passes the clinic's first listed service.
service: str
# Input variables for the target-audience prompt.
class MarketTargetAudienceInput(BaseModel):
# Clinic name; the market service passes hospital_baseinfo.hospital_name.
clinic_name: str
# --- Output ---
# One competing clinic, as an element of MarketCompetitorsOutput.competitors.
class Competitor(BaseModel):
# Competitor clinic name.
name: str
# Procedures the competitor offers.
procedures: list[str]
# Free-text reputation summary.
reputation: str
# Marketing channels the competitor uses.
marketing_channels: list[str]
# Structured output of the competitor-analysis prompt.
class MarketCompetitorsOutput(BaseModel):
# Analysed competitors; the schema itself does not constrain the count.
competitors: list[Competitor]
# One search keyword, as an element of MarketKeywordsOutput.keywords.
class Keyword(BaseModel):
# The keyword text.
keyword: str
# Monthly search volume; typed as a string, not a number.
monthly_volume: str
# Competition level for the keyword, as free text.
competition: str
# Structured output of the keyword-research prompt.
class MarketKeywordsOutput(BaseModel):
# Keywords with volume and competition details.
keywords: list[Keyword]
# Long-tail keyword suggestions as plain strings.
long_tail_keywords: list[str]
# Structured output of the market-trend prompt.
class MarketTrendOutput(BaseModel):
# Market size, as free text.
market_size: str
# Growth rate, as free text.
growth_rate: str
# Notable market trends.
key_trends: list[str]
# Effectiveness notes per marketing channel.
channel_effectiveness: list[str]
# One audience segment, as an element of MarketTargetAudienceOutput.segments.
class AudienceSegment(BaseModel):
# Age bracket of the segment, as free text.
age_group: str
# Gender of the segment, as free text.
gender: str
# Procedures this segment is interested in.
interested_procedures: list[str]
# Channels this segment uses to look for information.
info_channels: list[str]
# Factors that drive this segment's decisions.
decision_factors: list[str]
# Structured output of the target-audience prompt.
class MarketTargetAudienceOutput(BaseModel):
# Audience segments identified for the clinic.
segments: list[AudienceSegment]

View File

@ -0,0 +1,2 @@
{address} 근처 {services} 전문 성형외과/피부과 경쟁 병원 5곳을 분석해줘.
각 병원의 이름, 주요 시술, 온라인 평판, 마케팅 채널을 JSON 형식으로 제공해줘.

View File

@ -0,0 +1,2 @@
한국 {services} 관련 검색 키워드 트렌드.
네이버와 구글에서 월간 검색량이 높은 키워드 20개, 경쟁 강도, 추천 롱테일 키워드를 JSON 형식으로 제공해줘.

View File

@ -0,0 +1,2 @@
{clinic_name}의 잠재 고객 분석.
연령대별, 성별, 관심 시술, 정보 탐색 채널, 의사결정 요인을 JSON 형식으로 제공해줘.

View File

@ -0,0 +1,2 @@
한국 {service} 시장 트렌드 2025-2026.
시장 규모, 성장률, 주요 트렌드, 마케팅 채널별 효과를 JSON 형식으로 제공해줘.

View File

@ -1,7 +1,6 @@
import asyncio
import json
import logging
from common.db import fetchone, execute, is_done, get_analysis_raw_data, save_analysis_report
from common.db import fetchone, execute, get_analysis_raw_data, save_analysis_report
from integrations.llm.llm_service import LLMService
from integrations.llm.prompt import report_prompt, plan_prompt
from integrations.llm.schemas.report import ReportOutput
@ -69,43 +68,18 @@ async def generate_plan(analysis_run_id: str) -> PlanOutput:
return await LLMService(provider="perplexity").generate(plan_prompt, input_data)
async def run_plan_task(analysis_run_id: str) -> None:
logger.info("[plan] start run=%s", analysis_run_id)
result = await generate_plan(analysis_run_id)
await execute(
"UPDATE analysis_runs SET plan_data = %s, status = %s WHERE analysis_run_id = %s",
(json.dumps(result.model_dump(), ensure_ascii=False), AnalysisStatus.COMPLETED, analysis_run_id),
)
logger.info("[plan] done run=%s", analysis_run_id)
async def run_report_task(analysis_run_id: str) -> None:
logger.info("[report] start run=%s", analysis_run_id)
result = await generate_report(analysis_run_id)
await save_analysis_report(analysis_run_id, result.model_dump())
await execute("UPDATE analysis_runs SET status = %s WHERE analysis_run_id = %s", (AnalysisStatus.PLANNING, analysis_run_id))
logger.info("[report] done run=%s", analysis_run_id)
asyncio.create_task(run_plan_task(analysis_run_id))
async def check_and_advance_analysis(analysis_run_id: str) -> None:
run = await fetchone(
"SELECT hospital_id, instagram_data_id, facebook_data_id, naver_blog_data_id, youtube_data_id, gangnam_unni_data_id"
" FROM analysis_runs WHERE analysis_run_id = %s",
(analysis_run_id,),
async def run_plan_task(analysis_run_id: str) -> None:
logger.info("[plan] start run=%s", analysis_run_id)
result = await generate_plan(analysis_run_id)
await execute(
"UPDATE analysis_runs SET plan_data = %s WHERE analysis_run_id = %s",
(json.dumps(result.model_dump(), ensure_ascii=False), analysis_run_id),
)
clinic_row = await fetchone(
"SELECT status FROM hospital_baseinfo WHERE hospital_id = %s",
(run["hospital_id"],),
)
results = [
clinic_row["status"] == "done" if clinic_row else False,
await is_done("instagram_data", run["instagram_data_id"]),
await is_done("facebook_data", run["facebook_data_id"]),
await is_done("naver_blog_data", run["naver_blog_data_id"]),
await is_done("youtube_data", run["youtube_data_id"]),
await is_done("gangnam_unni_data", run["gangnam_unni_data_id"]),
]
if all(results):
await execute("UPDATE analysis_runs SET status = %s WHERE analysis_run_id = %s", (AnalysisStatus.ANALYZING, analysis_run_id))
asyncio.create_task(run_report_task(analysis_run_id))
logger.info("[plan] done run=%s", analysis_run_id)

View File

@ -1,4 +1,6 @@
import asyncio
import logging
from common.db import fetchone
from common.db import (
set_instagram_status, save_instagram_raw_data,
set_facebook_status, save_facebook_raw_data,
@ -8,7 +10,6 @@ from common.db import (
execute, save_hospital_raw_data,
)
from common.utils import get_env
from services.analysis import check_and_advance_analysis
from integrations.apify import ApifyClient
from integrations.naver import NaverClient
from integrations.youtube import YouTubeClient
@ -23,7 +24,6 @@ async def collect_instagram(analysis_run_id: str, row_id: int, url: str) -> None
data = await ApifyClient(get_env("APIFY_API_TOKEN")).get_instagram_profile(url)
await save_instagram_raw_data(row_id, data)
logger.info("[instagram] done run=%s", analysis_run_id)
await check_and_advance_analysis(analysis_run_id)
async def collect_facebook(analysis_run_id: str, row_id: int, url: str) -> None:
@ -32,7 +32,6 @@ async def collect_facebook(analysis_run_id: str, row_id: int, url: str) -> None:
data = await ApifyClient(get_env("APIFY_API_TOKEN")).get_facebook_page(url)
await save_facebook_raw_data(row_id, data)
logger.info("[facebook] done run=%s", analysis_run_id)
await check_and_advance_analysis(analysis_run_id)
async def collect_naver_blog(analysis_run_id: str, row_id: int, url: str) -> None:
@ -41,7 +40,6 @@ async def collect_naver_blog(analysis_run_id: str, row_id: int, url: str) -> Non
data = await NaverClient(get_env("NAVER_CLIENT_ID"), get_env("NAVER_CLIENT_SECRET")).get_blog_rss(url)
await save_naver_blog_raw_data(row_id, data)
logger.info("[naver_blog] done run=%s", analysis_run_id)
await check_and_advance_analysis(analysis_run_id)
async def collect_youtube(analysis_run_id: str, row_id: int, url: str) -> None:
@ -50,7 +48,6 @@ async def collect_youtube(analysis_run_id: str, row_id: int, url: str) -> None:
data = await YouTubeClient(get_env("YOUTUBE_API_KEY")).get_channel(url)
await save_youtube_raw_data(row_id, data)
logger.info("[youtube] done run=%s", analysis_run_id)
await check_and_advance_analysis(analysis_run_id)
async def collect_gangnam_unni(analysis_run_id: str, row_id: int, url: str) -> None:
@ -59,7 +56,6 @@ async def collect_gangnam_unni(analysis_run_id: str, row_id: int, url: str) -> N
data = await FirecrawlClient(get_env("FIRECRAWL_API_KEY")).get_gangnam_unni(url)
await save_gangnam_unni_raw_data(row_id, data)
logger.info("[gangnam_unni] done run=%s", analysis_run_id)
await check_and_advance_analysis(analysis_run_id)
async def collect_clinic_info(analysis_run_id: str, hospital_id: str, url: str) -> None:
@ -68,4 +64,33 @@ async def collect_clinic_info(analysis_run_id: str, hospital_id: str, url: str)
data = await FirecrawlClient(get_env("FIRECRAWL_API_KEY")).fetch_clinic_info(url)
await save_hospital_raw_data(hospital_id, data, analysis_run_id=analysis_run_id)
logger.info("[clinic] done run=%s", analysis_run_id)
await check_and_advance_analysis(analysis_run_id)
async def collect_all(
    analysis_run_id: str,
    hospital_id: str,
    instagram_id: int | None = None,
    facebook_id: int | None = None,
    naver_blog_id: int | None = None,
    youtube_id: int | None = None,
    gangnam_unni_id: int | None = None,
) -> None:
    """Run every applicable collector for one analysis run concurrently.

    Clinic info is collected whenever the hospital row exists; each channel is
    collected only when its row id is given. Collection is best-effort: a
    failing collector never aborts the others, but its failure is logged.

    Args:
        analysis_run_id: Run the collected data belongs to.
        hospital_id: Key of the ``hospital_baseinfo`` row holding the clinic URL.
        instagram_id: ``instagram_data`` row id, or ``None`` to skip.
        facebook_id: ``facebook_data`` row id, or ``None`` to skip.
        naver_blog_id: ``naver_blog_data`` row id, or ``None`` to skip.
        youtube_id: ``youtube_data`` row id, or ``None`` to skip.
        gangnam_unni_id: ``gangnam_unni_data`` row id, or ``None`` to skip.
    """

    async def _url(table: str, row_id: int) -> str:
        # `table` only ever comes from the literals below, never from user input.
        row = await fetchone(f"SELECT url FROM {table} WHERE id = %s", (row_id,))
        return row["url"] if row else ""

    # (label, collector, positional args). Coroutines are created only once
    # all lookups have succeeded, so a failing lookup cannot leave
    # never-awaited coroutines behind.
    jobs = []

    hospital = await fetchone("SELECT url FROM hospital_baseinfo WHERE hospital_id = %s", (hospital_id,))
    if hospital:
        jobs.append(("clinic", collect_clinic_info, (analysis_run_id, hospital_id, hospital["url"])))
    else:
        logger.error(
            "[collect] hospital %s not found, skipping clinic info run=%s",
            hospital_id, analysis_run_id,
        )

    channels = (
        ("instagram", "instagram_data", instagram_id, collect_instagram),
        ("facebook", "facebook_data", facebook_id, collect_facebook),
        ("naver_blog", "naver_blog_data", naver_blog_id, collect_naver_blog),
        ("youtube", "youtube_data", youtube_id, collect_youtube),
        ("gangnam_unni", "gangnam_unni_data", gangnam_unni_id, collect_gangnam_unni),
    )
    for label, table, row_id, collector in channels:
        if not row_id:
            continue
        jobs.append((label, collector, (analysis_run_id, row_id, await _url(table, row_id))))

    outcomes = await asyncio.gather(
        *(collector(*args) for _, collector, args in jobs),
        return_exceptions=True,
    )
    # return_exceptions=True hands failures back as values; surface them
    # instead of dropping them silently.
    for (label, _, _), outcome in zip(jobs, outcomes):
        if isinstance(outcome, BaseException):
            logger.warning("[%s] collection failed run=%s: %r", label, analysis_run_id, outcome)

84
app/services/market.py Normal file
View File

@ -0,0 +1,84 @@
import asyncio
import json
import logging
from common.db import fetchone, execute
from integrations.llm.llm_service import LLMService
from integrations.llm.prompt import (
market_competitors_prompt,
market_keywords_prompt,
market_trend_prompt,
market_target_audience_prompt,
)
logger = logging.getLogger(__name__)
# Market analysis types produced for every run; each one gets its own
# market_analysis row (stored in the analysis_type column).
_TYPES = ["competitors", "keywords", "trend", "target_audience"]
async def _save(analysis_run_id: str, analysis_type: str, result, exc: Exception | None) -> None:
    """Upsert the outcome of one market analysis into ``market_analysis``.

    With no exception the row is stored as ``'done'`` together with the
    JSON-serialised ``result.model_dump()``; otherwise the failure is logged
    and the row is marked ``'failed'``.
    """
    if not exc:
        payload = json.dumps(result.model_dump(), ensure_ascii=False)
        done_sql = (
            "INSERT INTO market_analysis (analysis_run_id, analysis_type, status, data)"
            " VALUES (%s, %s, 'done', %s)"
            " ON DUPLICATE KEY UPDATE status = 'done', data = VALUES(data)"
        )
        await execute(done_sql, (analysis_run_id, analysis_type, payload))
        return

    logger.warning("[market] %s failed run=%s: %s", analysis_type, analysis_run_id, exc)
    failed_sql = (
        "INSERT INTO market_analysis (analysis_run_id, analysis_type, status)"
        " VALUES (%s, %s, 'failed')"
        " ON DUPLICATE KEY UPDATE status = 'failed'"
    )
    await execute(failed_sql, (analysis_run_id, analysis_type))
async def _mark_all(analysis_run_id: str, status: str) -> None:
    """Upsert one ``market_analysis`` row per analysis type with ``status``."""
    for analysis_type in _TYPES:
        await execute(
            "INSERT INTO market_analysis (analysis_run_id, analysis_type, status)"
            " VALUES (%s, %s, %s)"
            " ON DUPLICATE KEY UPDATE status = VALUES(status)",
            (analysis_run_id, analysis_type, status),
        )


def _service_names(raw_data) -> list[str]:
    """Extract the clinic's service names from ``hospital_baseinfo.raw_data``.

    Accepts a JSON string or an already-decoded value and always returns a
    list of non-empty strings, whatever shape the stored data has.
    """
    if isinstance(raw_data, (str, bytes, bytearray)):
        try:
            raw_data = json.loads(raw_data)
        except ValueError:
            logger.warning("[market] clinic raw_data is not valid JSON")
            return []
    if not isinstance(raw_data, dict):
        return []
    services = raw_data.get("services")
    if isinstance(services, str):
        # A bare string is one service; slicing/joining it would split it
        # into single characters.
        services = [services]
    if not isinstance(services, list):
        return []
    return [str(item) for item in services if item]


async def run_market_analysis(analysis_run_id: str) -> None:
    """Run the four market analyses for a run and persist each outcome.

    Loads the clinic behind the run, marks every analysis type as
    ``'processing'``, issues the LLM requests concurrently and stores each
    result (or failure) through ``_save``. A failing request only affects its
    own row.

    Args:
        analysis_run_id: Run whose clinic is analysed.
    """
    logger.info("[market] start run=%s", analysis_run_id)

    run = await fetchone(
        "SELECT hospital_id FROM analysis_runs WHERE analysis_run_id = %s",
        (analysis_run_id,),
    )
    if not run:
        logger.error("[market] analysis run not found run=%s", analysis_run_id)
        return

    clinic = await fetchone(
        "SELECT hospital_name, road_address, raw_data FROM hospital_baseinfo WHERE hospital_id = %s",
        (run["hospital_id"],),
    )
    if not clinic:
        logger.error("[market] hospital %s not found run=%s", run["hospital_id"], analysis_run_id)
        # Record the outcome so the rows do not look as if analysis never ran.
        await _mark_all(analysis_run_id, "failed")
        return

    clinic_name = clinic["hospital_name"] or ""
    address = clinic["road_address"] or ""
    services = _service_names(clinic["raw_data"])
    services_str = ", ".join(services[:3])
    primary_service = services[0] if services else ""

    # Keyed by analysis type so results are matched by name, not by the
    # position of a call inside gather().
    requests = {
        "competitors": (market_competitors_prompt, {"address": address, "services": services_str}),
        "keywords": (market_keywords_prompt, {"services": services_str}),
        "trend": (market_trend_prompt, {"service": primary_service}),
        "target_audience": (market_target_audience_prompt, {"clinic_name": clinic_name}),
    }

    await _mark_all(analysis_run_id, "processing")

    llm = LLMService(provider="perplexity")
    results = await asyncio.gather(
        *(llm.generate(*requests[analysis_type]) for analysis_type in _TYPES),
        return_exceptions=True,
    )

    # Each gathered value is either a result or the exception that replaced
    # it; split it into the (result, exc) pair that _save expects.
    await asyncio.gather(*[
        _save(
            analysis_run_id,
            analysis_type,
            result=None if isinstance(outcome, BaseException) else outcome,
            exc=outcome if isinstance(outcome, BaseException) else None,
        )
        for analysis_type, outcome in zip(_TYPES, results)
    ])

    logger.info("[market] done run=%s", analysis_run_id)

52
app/services/pipeline.py Normal file
View File

@ -0,0 +1,52 @@
import logging
from common.db import fetchone, execute
from models.status import AnalysisStatus
from services.collect import collect_all
from services.market import run_market_analysis
from services.analysis import run_report_task, run_plan_task
logger = logging.getLogger(__name__)
async def _set_status(analysis_run_id: str, status) -> None:
    """Write ``status`` to the run's ``analysis_runs`` row."""
    await execute(
        "UPDATE analysis_runs SET status = %s WHERE analysis_run_id = %s",
        (status, analysis_run_id),
    )


async def run_pipeline(analysis_run_id: str) -> None:
    """Run the full analysis pipeline for one run, stage by stage.

    Stages run strictly in order: collect -> market -> report -> plan. The
    run's status is moved to ANALYZING before the market stage, PLANNING
    before the plan stage and COMPLETED once everything has finished.

    Args:
        analysis_run_id: Run to process.

    Raises:
        Exception: Whatever a stage raises is logged with the run id and
            re-raised unchanged.
    """
    logger.info("[pipeline] start run=%s", analysis_run_id)

    try:
        # -- 1. Collect ------------------------------------------------------
        run = await fetchone(
            "SELECT hospital_id, instagram_data_id, facebook_data_id,"
            " naver_blog_data_id, youtube_data_id, gangnam_unni_data_id"
            " FROM analysis_runs WHERE analysis_run_id = %s",
            (analysis_run_id,),
        )
        if not run:
            # Nothing to process; avoid an opaque TypeError on run[...] below.
            logger.error("[pipeline] analysis run not found run=%s", analysis_run_id)
            return

        await collect_all(
            analysis_run_id,
            hospital_id=run["hospital_id"],
            instagram_id=run["instagram_data_id"],
            facebook_id=run["facebook_data_id"],
            naver_blog_id=run["naver_blog_data_id"],
            youtube_id=run["youtube_data_id"],
            gangnam_unni_id=run["gangnam_unni_data_id"],
        )

        # -- 2. Market -------------------------------------------------------
        await _set_status(analysis_run_id, AnalysisStatus.ANALYZING)
        await run_market_analysis(analysis_run_id)

        # -- 3. Report -------------------------------------------------------
        await run_report_task(analysis_run_id)

        # -- 4. Plan ---------------------------------------------------------
        await _set_status(analysis_run_id, AnalysisStatus.PLANNING)
        await run_plan_task(analysis_run_id)

        await _set_status(analysis_run_id, AnalysisStatus.COMPLETED)
    except Exception:
        # This coroutine is the top of a background task, so make sure the
        # failure is recorded together with the run id before it propagates.
        # NOTE(review): the run keeps its last status on failure; writing a
        # failure status needs an AnalysisStatus member not visible here.
        logger.exception("[pipeline] failed run=%s", analysis_run_id)
        raise

    logger.info("[pipeline] done run=%s", analysis_run_id)