# o2o-castad-backend/app/video/services/video.py
#
# 853 lines
# 29 KiB
# Python

import random
from typing import List
from fastapi import Request, status
from fastapi.exceptions import HTTPException
from sqlalchemy import Connection, text
from sqlalchemy.exc import SQLAlchemyError
from app.lyrics.schemas.lyrics_schema import (
AttributeData,
PromptTemplateData,
SongFormData,
SongSampleData,
StoreData,
)
from app.utils.chatgpt_prompt import chatgpt_api
async def get_store_info(conn: Connection) -> List[StoreData]:
    """Return every row of ``store_default_info`` as ``StoreData`` objects.

    Columns are read by position (0-7), so the mapping depends on the
    table's column order.

    Args:
        conn: Database connection. ``execute`` is awaited, so an async
            connection is expected despite the ``Connection`` annotation.

    Returns:
        One ``StoreData`` per row, in the order the database returned them.

    Raises:
        HTTPException: 503 when SQLAlchemy reports an error, 500 for any
            other failure.
    """
    # Keyword names for StoreData, listed in positional column order.
    columns = (
        "id",
        "store_info",
        "store_name",
        "store_category",
        "store_region",
        "store_address",
        "store_phone_number",
        "created_at",
    )
    try:
        cursor = await conn.execute(text("""SELECT * FROM store_default_info;"""))
        stores = [
            StoreData(**{name: record[pos] for pos, name in enumerate(columns)})
            for record in cursor
        ]
        cursor.close()
        return stores
    except SQLAlchemyError as db_error:
        print(db_error)
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="요청하신 서비스가 잠시 내부적으로 문제가 발생하였습니다.",
        )
    except Exception as error:
        print(error)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="알수없는 이유로 서비스 오류가 발생하였습니다",
        )
async def get_attribute(conn: Connection) -> List[AttributeData]:
    """Return every row of the ``attribute`` table as ``AttributeData`` objects.

    Columns are read by position: id, attr_category, attr_value, created_at.

    Args:
        conn: Database connection. ``execute`` is awaited, so an async
            connection is expected despite the ``Connection`` annotation.

    Returns:
        One ``AttributeData`` per row, in the order the database returned
        them.

    Raises:
        HTTPException: 503 when SQLAlchemy reports an error, 500 for any
            other failure.
    """
    try:
        query = """SELECT * FROM attribute;"""
        result = await conn.execute(text(query))
        all_attribute = [
            AttributeData(
                id=row[0],
                attr_category=row[1],
                attr_value=row[2],
                created_at=row[3],
            )
            for row in result
        ]
        result.close()
        return all_attribute
    except SQLAlchemyError as e:
        print(e)
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="요청하신 서비스가 잠시 내부적으로 문제가 발생하였습니다.",
        )
    except Exception as e:
        print(e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="알수없는 이유로 서비스 오류가 발생하였습니다",
        )
async def get_sample_song(conn: Connection) -> List[SongSampleData]:
    """Return every row of ``song_sample`` as ``SongSampleData`` objects.

    Columns are read by position: id, ai, ai_model, genre, sample_song.

    Args:
        conn: Database connection. ``execute`` is awaited, so an async
            connection is expected despite the ``Connection`` annotation.

    Returns:
        One ``SongSampleData`` per row, in the order the database returned
        them.

    Raises:
        HTTPException: 503 when SQLAlchemy reports an error, 500 for any
            other failure.
    """
    try:
        cursor = await conn.execute(text("""SELECT * FROM song_sample;"""))
        samples = []
        for record in cursor:
            sample = SongSampleData(
                id=record[0],
                ai=record[1],
                ai_model=record[2],
                genre=record[3],
                sample_song=record[4],
            )
            samples.append(sample)
        cursor.close()
        return samples
    except SQLAlchemyError as db_error:
        print(db_error)
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="요청하신 서비스가 잠시 내부적으로 문제가 발생하였습니다.",
        )
    except Exception as error:
        print(error)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="알수없는 이유로 서비스 오류가 발생하였습니다",
        )
async def get_prompt_template(conn: Connection) -> List[PromptTemplateData]:
    """Return every row of ``prompt_template`` as ``PromptTemplateData`` objects.

    Columns are read by position: id, description, prompt.

    Args:
        conn: Database connection. ``execute`` is awaited, so an async
            connection is expected despite the ``Connection`` annotation.

    Returns:
        One ``PromptTemplateData`` per row, in the order the database
        returned them.

    Raises:
        HTTPException: 503 when SQLAlchemy reports an error, 500 for any
            other failure.
    """
    statement = text("""SELECT * FROM prompt_template;""")
    try:
        cursor = await conn.execute(statement)
        templates = [
            PromptTemplateData(
                id=record[0],
                description=record[1],
                prompt=record[2],
            )
            for record in cursor
        ]
        cursor.close()
    except SQLAlchemyError as db_error:
        print(db_error)
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="요청하신 서비스가 잠시 내부적으로 문제가 발생하였습니다.",
        )
    except Exception as error:
        print(error)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="알수없는 이유로 서비스 오류가 발생하였습니다",
        )
    else:
        return templates
async def get_song_result(conn: Connection) -> List[PromptTemplateData]:
    """Return every row of ``prompt_template`` as ``PromptTemplateData`` objects.

    NOTE(review): despite its name, this function reads ``prompt_template``,
    not song results. The module defines ``get_song_result`` a second time
    further down (reading ``song_results_all``); that later definition
    rebinds the name at import time, so this version is not reachable by
    name afterwards. It looks like a leftover copy of
    ``get_prompt_template`` -- confirm and remove.

    Args:
        conn: Database connection. ``execute`` is awaited, so an async
            connection is expected despite the ``Connection`` annotation.

    Returns:
        One ``PromptTemplateData`` per row, in the order the database
        returned them.

    Raises:
        HTTPException: 503 when SQLAlchemy reports an error, 500 for any
            other failure.
    """
    try:
        cursor = await conn.execute(text("""SELECT * FROM prompt_template;"""))
        templates = []
        for record in cursor:
            templates.append(
                PromptTemplateData(
                    id=record[0],
                    description=record[1],
                    prompt=record[2],
                )
            )
        cursor.close()
        return templates
    except SQLAlchemyError as db_error:
        print(db_error)
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="요청하신 서비스가 잠시 내부적으로 문제가 발생하였습니다.",
        )
    except Exception as error:
        print(error)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="알수없는 이유로 서비스 오류가 발생하였습니다",
        )
async def make_song_result(request: Request, conn: Connection):
    """Generate lyrics for a store from submitted form data and persist them.

    Steps: parse the form, load the store row, optionally load the selected
    sample lyrics, load the prompt template, build the final prompt, call
    ``chatgpt_api.generate``, insert the result into ``song_results_all``,
    commit, and return every stored result, newest first.

    Args:
        request: Incoming request; parsed with ``SongFormData.from_form``.
        conn: Database connection. ``execute`` and ``commit`` are awaited,
            so an async connection is expected despite the ``Connection``
            annotation.

    Returns:
        A list of dicts, one per ``song_results_all`` row ordered by
        ``created_at`` descending. ``created_at`` is an ISO-8601 string, or
        ``None`` when the column is empty.

    Raises:
        HTTPException: 404 when the store or the prompt template is not
            found, 400 when no prompt id was submitted, 503 on SQLAlchemy
            errors, 500 on any other failure.
    """
    # Function-scope imports, matching the file's existing local-import style.
    import traceback

    from sqlalchemy import bindparam

    try:
        # 1. Parse the form data.
        form_data = await SongFormData.from_form(request)
        print(f"\n{'=' * 60}")
        print(f"Store ID: {form_data.store_id}")
        print(f"Lyrics IDs: {form_data.lyrics_ids}")
        print(f"Prompt IDs: {form_data.prompts}")
        print(f"{'=' * 60}\n")

        # 2. Load the store row.
        store_result = await conn.execute(
            text("SELECT * FROM store_default_info WHERE id=:id;"),
            {"id": form_data.store_id},
        )
        store_row = store_result.fetchone()
        if not store_row:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Store not found: {form_data.store_id}",
            )
        store_info = StoreData(
            id=store_row[0],
            store_info=store_row[1],
            store_name=store_row[2],
            store_category=store_row[3],
            store_region=store_row[4],
            store_address=store_row[5],
            store_phone_number=store_row[6],
            created_at=store_row[7],
        )
        print(f"Store: {store_info.store_name}")

        # 3. Attribute lookup is not needed here: attributes arrive with the
        #    form (step-by-step selection flow).

        # 4. Load the selected sample lyrics and combine them.
        combined_sample_song = None
        if form_data.lyrics_ids:
            print(f"\n[샘플 가사 조회] - {len(form_data.lyrics_ids)}")
            # An expanding bind parameter renders one placeholder per id, so
            # the IN clause does not rely on driver-specific tuple handling.
            lyrics_stmt = text(
                """
                SELECT sample_song FROM song_sample
                WHERE id IN :ids
                ORDER BY created_at;
                """
            ).bindparams(bindparam("ids", expanding=True))
            lyrics_result = await conn.execute(
                lyrics_stmt, {"ids": list(form_data.lyrics_ids)}
            )
            sample_songs = [
                row.sample_song for row in lyrics_result.fetchall() if row.sample_song
            ]
            if sample_songs:
                combined_sample_song = "\n\n".join(
                    f"[샘플 {i + 1}]\n{song}" for i, song in enumerate(sample_songs)
                )
                print(f"{len(sample_songs)}개의 샘플 가사 조회 완료")
            else:
                print("샘플 가사가 비어있습니다")
        else:
            print("선택된 lyrics가 없습니다")

        # 5. Load the prompt template.
        if not form_data.prompts:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="프롬프트 ID가 필요합니다",
            )
        print("템플릿 가져오기")
        prompts_result = await conn.execute(
            text("SELECT * FROM prompt_template WHERE id=:id;"),
            {"id": form_data.prompts},
        )
        prompt_row = prompts_result.fetchone()
        if not prompt_row:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Prompt not found: {form_data.prompts}",
            )
        prompt = PromptTemplateData(
            id=prompt_row[0],
            description=prompt_row[1],
            prompt=prompt_row[2],
        )
        print(f"Prompt Template: {prompt.prompt}")

        # 6. Assemble the prompt. The template is formatted piece by piece
        #    around the "###" marker and the attribute text is joined in
        #    afterwards, so braces in the form-supplied attributes are never
        #    interpreted as str.format fields.
        store_fields = {
            "name": store_info.store_name or "",
            "address": store_info.store_address or "",
            "category": store_info.store_category or "",
            "description": store_info.store_info or "",
        }
        updated_prompt = form_data.attributes_str.join(
            piece.format(**store_fields) for piece in prompt.prompt.split("###")
        )
        # Only mention sample lyrics when some were found; otherwise the
        # literal text "None" would be sent to the model.
        if combined_sample_song:
            updated_prompt += (
                "\n다음은 참고해야 하는 샘플 가사 정보입니다.\n"
                "샘플 가사를 참고하여 작곡을 해주세요.\n"
                f"{combined_sample_song}\n"
            )
        print(f"\n[업데이트된 프롬프트]\n{updated_prompt}\n")

        # 7. Ask the model.
        generated_lyrics = await chatgpt_api.generate(prompt=updated_prompt)

        # Character counts, with and without space/newline/CR/tab.
        total_chars_with_space = len(generated_lyrics)
        total_chars_without_space = len(
            generated_lyrics.translate(str.maketrans("", "", " \n\r\t"))
        )

        # Stored text: attribute summary, the two counts, then the lyrics.
        final_lyrics = (
            f"속성 {form_data.attributes_str}\n"
            f"전체 글자 수 (공백 포함): {total_chars_with_space}\n"
            f"전체 글자 수 (공백 제외): {total_chars_without_space}\r\n\r\n"
            f"{generated_lyrics}"
        )
        print("=" * 40)
        print("[translate:form_data.attributes_str:] ", form_data.attributes_str)
        print("[translate:total_chars_with_space:] ", total_chars_with_space)
        print("[translate:total_chars_without_space:] ", total_chars_without_space)
        print("[translate:final_lyrics:]")
        print(final_lyrics)
        print("=" * 40)

        # 8. Persist the result.
        insert_query = """
            INSERT INTO song_results_all (
                store_info, store_name, store_category, store_address, store_phone_number,
                description, prompt, attr_category, attr_value,
                ai, ai_model, genre,
                sample_song, result_song, created_at
            ) VALUES (
                :store_info, :store_name, :store_category, :store_address, :store_phone_number,
                :description, :prompt, :attr_category, :attr_value,
                :ai, :ai_model, :genre,
                :sample_song, :result_song, NOW()
            );
        """
        attributes = form_data.attributes
        insert_params = {
            "store_info": store_info.store_info or "",
            "store_name": store_info.store_name,
            "store_category": store_info.store_category or "",
            "store_address": store_info.store_address or "",
            "store_phone_number": store_info.store_phone_number or "",
            "description": store_info.store_info or "",
            "prompt": form_data.prompts,
            "attr_category": ", ".join(attributes.keys()) if attributes else "",
            "attr_value": ", ".join(attributes.values()) if attributes else "",
            "ai": "ChatGPT",
            "ai_model": form_data.llm_model,
            "genre": "후크송",
            "sample_song": combined_sample_song or "없음",
            "result_song": final_lyrics,
        }
        await conn.execute(text(insert_query), insert_params)
        await conn.commit()
        print("결과 저장 완료")
        print("\n전체 결과 조회 중...")

        # 9. Return all stored results, newest first.
        select_query = """
            SELECT * FROM song_results_all
            ORDER BY created_at DESC;
        """
        all_results = await conn.execute(text(select_query))
        results_list = [
            {
                "id": row.id,
                "store_info": row.store_info,
                "store_name": row.store_name,
                "store_category": row.store_category,
                "store_address": row.store_address,
                "store_phone_number": row.store_phone_number,
                "description": row.description,
                "prompt": row.prompt,
                "attr_category": row.attr_category,
                "attr_value": row.attr_value,
                "ai": row.ai,
                "ai_model": row.ai_model,
                "genre": row.genre,
                "sample_song": row.sample_song,
                "result_song": row.result_song,
                "created_at": row.created_at.isoformat() if row.created_at else None,
            }
            for row in all_results.fetchall()
        ]
        print(f"전체 {len(results_list)}개의 결과 조회 완료\n")
        return results_list
    except HTTPException:
        # Deliberate 4xx responses raised above pass through unchanged.
        raise
    except SQLAlchemyError as e:
        print(f"Database Error: {e}")
        traceback.print_exc()
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="데이터베이스 연결에 문제가 발생했습니다.",
        )
    except Exception as e:
        print(f"Unexpected Error: {e}")
        traceback.print_exc()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="서비스 처리 중 오류가 발생했습니다.",
        )
async def get_song_result(conn: Connection):
    """Return every row of ``song_results_all``, newest first, as dicts.

    Args:
        conn: Database connection. ``execute`` is awaited, so an async
            connection is expected despite the ``Connection`` annotation.

    Returns:
        A list of dicts ordered by ``created_at`` descending. Each dict
        carries the columns named in ``plain_columns`` below plus
        ``created_at`` as an ISO-8601 string (``None`` when the column is
        empty).

    Raises:
        HTTPException: 503 on SQLAlchemy errors, 500 on any other failure.
    """
    import traceback

    # Columns copied into each dict as-is, in output key order.
    plain_columns = (
        "id",
        "store_info",
        "store_name",
        "store_category",
        "store_address",
        "store_phone_number",
        "description",
        "prompt",
        "attr_category",
        "attr_value",
        "ai",
        "ai_model",
        "season",
        "num_of_people",
        "people_category",
        "genre",
        "sample_song",
        "result_song",
    )
    try:
        statement = text(
            "\n"
            "SELECT * FROM song_results_all\n"
            "ORDER BY created_at DESC;\n"
        )
        cursor = await conn.execute(statement)
        results_list = []
        for record in cursor.fetchall():
            entry = {column: getattr(record, column) for column in plain_columns}
            if record.created_at:
                entry["created_at"] = record.created_at.isoformat()
            else:
                entry["created_at"] = None
            results_list.append(entry)
        print(f"전체 {len(results_list)}개의 결과 조회 완료\n")
        return results_list
    except HTTPException:
        # HTTPException is re-raised unchanged.
        raise
    except SQLAlchemyError as db_error:
        print(f"Database Error: {db_error}")
        traceback.print_exc()
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="데이터베이스 연결에 문제가 발생했습니다.",
        )
    except Exception as error:
        print(f"Unexpected Error: {error}")
        traceback.print_exc()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="서비스 처리 중 오류가 발생했습니다.",
        )
async def make_automation(request: Request, conn: Connection):
    """Generate lyrics for a store with randomly chosen attributes and samples.

    Steps: parse the form (only ``store_id`` is used), load the store row,
    pick one random value per attribute category, load prompt template
    id 1, pick up to three random sample lyrics, build the prompt, call
    ``chatgpt_api.generate``, insert the result into ``song_results_all``,
    commit, and return every stored result, newest first.

    Args:
        request: Incoming request; parsed with ``SongFormData.from_form``.
        conn: Database connection. ``execute`` and ``commit`` are awaited,
            so an async connection is expected despite the ``Connection``
            annotation.

    Returns:
        A list of dicts, one per ``song_results_all`` row ordered by
        ``created_at`` descending. ``created_at`` is an ISO-8601 string, or
        ``None`` when the column is empty.

    Raises:
        HTTPException: 404 when the store or prompt template 1 is not
            found, 503 on SQLAlchemy errors, 500 on any other failure.
    """
    # Function-scope imports, matching the file's existing local-import style.
    import traceback

    from sqlalchemy import bindparam

    try:
        # 1. Parse the form data.
        form_data = await SongFormData.from_form(request)
        print(f"\n{'=' * 60}")
        print(f"Store ID: {form_data.store_id}")
        print(f"{'=' * 60}\n")

        # 2. Load the store row.
        store_result = await conn.execute(
            text("SELECT * FROM store_default_info WHERE id=:id;"),
            {"id": form_data.store_id},
        )
        store_row = store_result.fetchone()
        if not store_row:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Store not found: {form_data.store_id}",
            )
        store_info = StoreData(
            id=store_row[0],
            store_info=store_row[1],
            store_name=store_row[2],
            store_category=store_row[3],
            store_region=store_row[4],
            store_address=store_row[5],
            store_phone_number=store_row[6],
            created_at=store_row[7],
        )
        print(f"Store: {store_info.store_name}")

        # 3. Load all attributes and pick one random value per category.
        attribute_results = await conn.execute(text("SELECT * FROM attribute;"))
        attribute_rows = attribute_results.fetchall()
        formatted_attributes = ""
        selected_categories = []
        selected_values = []
        if attribute_rows:
            attribute_list = [
                AttributeData(
                    id=row[0],
                    attr_category=row[1],
                    attr_value=row[2],
                    created_at=row[3],
                )
                for row in attribute_rows
            ]
            formatted_pairs = []
            for attr in attribute_list:
                # attr_value is a comma-separated list; a NULL value is
                # treated as "no candidates" instead of crashing on .split.
                values = [
                    v.strip() for v in (attr.attr_value or "").split(",") if v.strip()
                ]
                if values:
                    selected_value = random.choice(values)
                    formatted_pairs.append(f"{attr.attr_category} : {selected_value}")
                    # Remember the choice so it can be stored with the result.
                    selected_categories.append(attr.attr_category)
                    selected_values.append(selected_value)
            formatted_attributes = "\n".join(formatted_pairs)
            print(f"\n[포맷팅된 문자열 속성 정보]\n{formatted_attributes}\n")
        else:
            print("속성 데이터가 없습니다")

        # 4. Load the prompt template (always id 1).
        print("템플릿 가져오기 (ID=1)")
        prompts_result = await conn.execute(
            text("SELECT * FROM prompt_template WHERE id=1;")
        )
        row = prompts_result.fetchone()
        if not row:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Prompt ID 1 not found",
            )
        prompt = PromptTemplateData(
            id=row[0],
            description=row[1],
            prompt=row[2],
        )
        print(f"Prompt Template: {prompt.prompt}")

        # 5. Assemble the prompt. The template is formatted piece by piece
        #    around the "###" marker and the attribute text is joined in
        #    afterwards, so braces in attribute values are never interpreted
        #    as str.format fields.
        store_fields = {
            "name": store_info.store_name or "",
            "address": store_info.store_address or "",
            "category": store_info.store_category or "",
            "description": store_info.store_info or "",
        }
        updated_prompt = formatted_attributes.join(
            piece.format(**store_fields) for piece in prompt.prompt.split("###")
        )
        print("\n" + "=" * 80)
        print("업데이트된 프롬프트")
        print("=" * 80)
        print(updated_prompt)
        print("=" * 80 + "\n")

        # 6. Pick sample lyrics at random. A former lookup by
        #    form_data.lyrics_ids was dropped: its result was always
        #    overwritten by this random selection.
        print("\n[샘플 가사 랜덤 선택]")
        ids_result = await conn.execute(text("SELECT id FROM song_sample;"))
        all_ids = [id_row.id for id_row in ids_result.fetchall()]
        print(f"전체 샘플 가사 개수: {len(all_ids)}")

        combined_sample_song = None
        if all_ids:
            # Take three samples, or all of them when fewer than three exist.
            sample_count = min(3, len(all_ids))
            selected_ids = random.sample(all_ids, sample_count)
            print(f"랜덤 선택된 ID: {selected_ids}")
            # An expanding bind parameter renders one placeholder per id, so
            # the IN clause does not rely on driver-specific tuple handling.
            lyrics_stmt = text(
                """
                SELECT sample_song FROM song_sample
                WHERE id IN :ids
                ORDER BY created_at;
                """
            ).bindparams(bindparam("ids", expanding=True))
            lyrics_result = await conn.execute(lyrics_stmt, {"ids": selected_ids})
            sample_songs = [
                song_row.sample_song
                for song_row in lyrics_result.fetchall()
                if song_row.sample_song
            ]
            if sample_songs:
                combined_sample_song = "\n\n".join(
                    f"[샘플 {i + 1}]\n{song}" for i, song in enumerate(sample_songs)
                )
                print(f"{len(sample_songs)}개의 샘플 가사 조회 완료")
            else:
                print("샘플 가사가 비어있습니다")
        else:
            print("song_sample 테이블에 데이터가 없습니다")

        # 7. Append the sample lyrics to the prompt when there are any.
        if combined_sample_song:
            updated_prompt += (
                "\n다음은 참고해야 하는 샘플 가사 정보입니다.\n"
                "샘플 가사를 참고하여 작곡을 해주세요.\n"
                f"{combined_sample_song}\n"
            )
            print("샘플 가사 정보가 프롬프트에 추가되었습니다")
        else:
            print("샘플 가사가 없어 기본 프롬프트만 사용합니다")
        print(f"\n[최종 프롬프트 길이: {len(updated_prompt)} 자]\n")

        # 8. Ask the model.
        generated_lyrics = await chatgpt_api.generate(prompt=updated_prompt)

        # Character counts, with and without space/newline/CR/tab.
        total_chars_with_space = len(generated_lyrics)
        total_chars_without_space = len(
            generated_lyrics.translate(str.maketrans("", "", " \n\r\t"))
        )

        # Stored text: attribute summary, the two counts, then the lyrics.
        final_lyrics = (
            f"속성 {formatted_attributes}\n"
            f"전체 글자 수 (공백 포함): {total_chars_with_space}\n"
            f"전체 글자 수 (공백 제외): {total_chars_without_space}\r\n\r\n"
            f"{generated_lyrics}"
        )

        # 9. Persist the result.
        insert_query = """
            INSERT INTO song_results_all (
                store_info, store_name, store_category, store_address, store_phone_number,
                description, prompt, attr_category, attr_value,
                ai, ai_model, genre,
                sample_song, result_song, created_at
            ) VALUES (
                :store_info, :store_name, :store_category, :store_address, :store_phone_number,
                :description, :prompt, :attr_category, :attr_value,
                :ai, :ai_model, :genre,
                :sample_song, :result_song, NOW()
            );
        """
        print("\n[insert_params 선택된 속성 확인]")
        print(f"Categories: {selected_categories}")
        print(f"Values: {selected_values}")
        print()
        insert_params = {
            "store_info": store_info.store_info or "",
            "store_name": store_info.store_name,
            "store_category": store_info.store_category or "",
            "store_address": store_info.store_address or "",
            "store_phone_number": store_info.store_phone_number or "",
            "description": store_info.store_info or "",
            "prompt": prompt.id,
            # Randomly selected categories/values; joining an empty list
            # already yields "".
            "attr_category": ", ".join(selected_categories),
            "attr_value": ", ".join(selected_values),
            "ai": "ChatGPT",
            "ai_model": "gpt-5-mini",
            "genre": "후크송",
            "sample_song": combined_sample_song or "없음",
            "result_song": final_lyrics,
        }
        await conn.execute(text(insert_query), insert_params)
        await conn.commit()
        print("결과 저장 완료")
        print("\n전체 결과 조회 중...")

        # 10. Return all stored results, newest first.
        select_query = """
            SELECT * FROM song_results_all
            ORDER BY created_at DESC;
        """
        all_results = await conn.execute(text(select_query))
        results_list = [
            {
                "id": res.id,
                "store_info": res.store_info,
                "store_name": res.store_name,
                "store_category": res.store_category,
                "store_address": res.store_address,
                "store_phone_number": res.store_phone_number,
                "description": res.description,
                "prompt": res.prompt,
                "attr_category": res.attr_category,
                "attr_value": res.attr_value,
                "ai": res.ai,
                "ai_model": res.ai_model,
                "genre": res.genre,
                "sample_song": res.sample_song,
                "result_song": res.result_song,
                "created_at": res.created_at.isoformat() if res.created_at else None,
            }
            for res in all_results.fetchall()
        ]
        print(f"전체 {len(results_list)}개의 결과 조회 완료\n")
        return results_list
    except HTTPException:
        # Deliberate 404 responses raised above pass through unchanged.
        raise
    except SQLAlchemyError as e:
        print(f"Database Error: {e}")
        traceback.print_exc()
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="데이터베이스 연결에 문제가 발생했습니다.",
        )
    except Exception as e:
        print(f"Unexpected Error: {e}")
        traceback.print_exc()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="서비스 처리 중 오류가 발생했습니다.",
        )