import random from typing import List from fastapi import Request, status from fastapi.exceptions import HTTPException from sqlalchemy import Connection, text from sqlalchemy.exc import SQLAlchemyError from app.lyric.schemas.lyrics_schema import ( AttributeData, PromptTemplateData, SongFormData, SongSampleData, StoreData, ) from app.utils.chatgpt_prompt import chatgpt_api from app.utils.logger import get_logger # 로거 설정 logger = get_logger("lyric") async def get_store_info(conn: Connection) -> List[StoreData]: try: query = """SELECT * FROM store_default_info;""" result = await conn.execute(text(query)) all_store_info = [ StoreData( id=row[0], store_info=row[1], store_name=row[2], store_category=row[3], store_region=row[4], store_address=row[5], store_phone_number=row[6], created_at=row[7], ) for row in result ] result.close() return all_store_info except SQLAlchemyError as e: logger.error(f"Database error in get_store_info: {e}") raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="요청하신 서비스가 잠시 내부적으로 문제가 발생하였습니다.", ) except Exception as e: logger.error(f"Unexpected error in get_store_info: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="알수없는 이유로 서비스 오류가 발생하였습니다", ) async def get_attribute(conn: Connection) -> List[AttributeData]: try: query = """SELECT * FROM attribute;""" result = await conn.execute(text(query)) all_attribute = [ AttributeData( id=row[0], attr_category=row[1], attr_value=row[2], created_at=row[3], ) for row in result ] result.close() return all_attribute except SQLAlchemyError as e: logger.error(f"Database error in get_attribute: {e}") raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="요청하신 서비스가 잠시 내부적으로 문제가 발생하였습니다.", ) except Exception as e: logger.error(f"Unexpected error in get_attribute: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="알수없는 이유로 서비스 오류가 발생하였습니다", ) async def get_attribute(conn: Connection) -> List[AttributeData]: try: query = """SELECT * FROM attribute;""" result = await conn.execute(text(query)) all_attribute = [ AttributeData( id=row[0], attr_category=row[1], attr_value=row[2], created_at=row[3], ) for row in result ] result.close() return all_attribute except SQLAlchemyError as e: logger.error(f"Database error in get_attribute: {e}") raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="요청하신 서비스가 잠시 내부적으로 문제가 발생하였습니다.", ) except Exception as e: logger.error(f"Unexpected error in get_attribute: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="알수없는 이유로 서비스 오류가 발생하였습니다", ) async def get_sample_song(conn: Connection) -> List[SongSampleData]: try: query = """SELECT * FROM song_sample;""" result = await conn.execute(text(query)) all_sample_song = [ SongSampleData( id=row[0], ai=row[1], ai_model=row[2], genre=row[3], sample_song=row[4], ) for row in result ] result.close() return all_sample_song except SQLAlchemyError as e: logger.error(f"Database error in get_sample_song: {e}") raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="요청하신 서비스가 잠시 내부적으로 문제가 발생하였습니다.", ) except Exception as e: logger.error(f"Unexpected error in get_sample_song: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="알수없는 이유로 서비스 오류가 발생하였습니다", ) async def get_prompt_template(conn: Connection) -> List[PromptTemplateData]: try: query = """SELECT * FROM prompt_template;""" result = await conn.execute(text(query)) all_prompt_template = [ PromptTemplateData( id=row[0], description=row[1], prompt=row[2], ) for row in result ] result.close() return all_prompt_template except SQLAlchemyError as e: logger.error(f"Database error in get_prompt_template: {e}") raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="요청하신 서비스가 잠시 내부적으로 문제가 발생하였습니다.", ) except Exception as e: logger.error(f"Unexpected error in get_prompt_template: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="알수없는 이유로 서비스 오류가 발생하였습니다", ) async def get_song_result(conn: Connection) -> List[PromptTemplateData]: try: query = """SELECT * FROM prompt_template;""" result = await conn.execute(text(query)) all_prompt_template = [ PromptTemplateData( id=row[0], description=row[1], prompt=row[2], ) for row in result ] result.close() return all_prompt_template except SQLAlchemyError as e: logger.error(f"Database error in get_song_result: {e}") raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="요청하신 서비스가 잠시 내부적으로 문제가 발생하였습니다.", ) except Exception as e: logger.error(f"Unexpected error in get_song_result: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="알수없는 이유로 서비스 오류가 발생하였습니다", ) async def make_song_result(request: Request, conn: Connection): try: # 1. Form 데이터 파싱 form_data = await SongFormData.from_form(request) logger.info(f"{'=' * 60}") logger.info(f"Store ID: {form_data.store_id}") logger.info(f"Lyrics IDs: {form_data.lyrics_ids}") logger.info(f"Prompt IDs: {form_data.prompts}") logger.info(f"{'=' * 60}") # 2. Store 정보 조회 store_query = """ SELECT * FROM store_default_info WHERE id=:id; """ store_result = await conn.execute(text(store_query), {"id": form_data.store_id}) all_store_info = [ StoreData( id=row[0], store_info=row[1], store_name=row[2], store_category=row[3], store_region=row[4], store_address=row[5], store_phone_number=row[6], created_at=row[7], ) for row in store_result ] if not all_store_info: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Store not found: {form_data.store_id}", ) store_info = all_store_info[0] logger.info(f"Store: {store_info.store_name}") # 3. 속성 조회 -- 단계별 선택 프로세서시 구현 필요 없음 # 4. Sample Song 조회 및 결합 combined_sample_song = None if form_data.lyrics_ids: logger.info(f"[샘플 가사 조회] - {len(form_data.lyrics_ids)}개") lyrics_query = """ SELECT sample_song FROM song_sample WHERE id IN :ids ORDER BY created_at; """ lyrics_result = await conn.execute( text(lyrics_query), {"ids": tuple(form_data.lyrics_ids)} ) sample_songs = [ row.sample_song for row in lyrics_result.fetchall() if row.sample_song ] if sample_songs: combined_sample_song = "\n\n".join( [f"[샘플 {i + 1}]\n{song}" for i, song in enumerate(sample_songs)] ) logger.info(f"{len(sample_songs)}개의 샘플 가사 조회 완료") else: logger.info("샘플 가사가 비어있습니다") else: logger.info("선택된 lyrics가 없습니다") # 5. 템플릿 가져오기 if not form_data.prompts: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="프롬프트 ID가 필요합니다", ) logger.info("템플릿 가져오기") prompts_query = """ SELECT * FROM prompt_template WHERE id=:id; """ # ✅ 수정: store_query → prompts_query prompts_result = await conn.execute( text(prompts_query), {"id": form_data.prompts} ) prompts_info = [ PromptTemplateData( id=row[0], description=row[1], prompt=row[2], ) for row in prompts_result ] if not prompts_info: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Prompt not found: {form_data.prompts}", ) prompt = prompts_info[0] logger.debug(f"Prompt Template: {prompt.prompt}") # ✅ 6. 프롬프트 조합 updated_prompt = prompt.prompt.replace("###", form_data.attributes_str).format( name=store_info.store_name or "", address=store_info.store_address or "", category=store_info.store_category or "", description=store_info.store_info or "", ) updated_prompt += f""" 다음은 참고해야 하는 샘플 가사 정보입니다. 샘플 가사를 참고하여 작곡을 해주세요. {combined_sample_song} """ logger.debug(f"[업데이트된 프롬프트]\n{updated_prompt}") # 7. 모델에게 요청 generated_lyrics = await chatgpt_api.generate(prompt=updated_prompt) # 글자 수 계산 total_chars_with_space = len(generated_lyrics) total_chars_without_space = len( generated_lyrics.replace(" ", "") .replace("\n", "") .replace("\r", "") .replace("\t", "") ) # final_lyrics 생성 final_lyrics = f"""속성 {form_data.attributes_str} 전체 글자 수 (공백 포함): {total_chars_with_space}자 전체 글자 수 (공백 제외): {total_chars_without_space}자\r\n\r\n{generated_lyrics}""" logger.debug("=" * 40) logger.debug(f"[translate:form_data.attributes_str:] {form_data.attributes_str}") logger.debug(f"[translate:total_chars_with_space:] {total_chars_with_space}") logger.debug(f"[translate:total_chars_without_space:] {total_chars_without_space}") logger.debug(f"[translate:final_lyrics:]\n{final_lyrics}") logger.debug("=" * 40) # 8. DB 저장 insert_query = """ INSERT INTO song_results_all ( store_info, store_name, store_category, store_address, store_phone_number, description, prompt, attr_category, attr_value, ai, ai_model, genre, sample_song, result_song, created_at ) VALUES ( :store_info, :store_name, :store_category, :store_address, :store_phone_number, :description, :prompt, :attr_category, :attr_value, :ai, :ai_model, :genre, :sample_song, :result_song, NOW() ); """ # ✅ attr_category, attr_value 추가 insert_params = { "store_info": store_info.store_info or "", "store_name": store_info.store_name, "store_category": store_info.store_category or "", "store_address": store_info.store_address or "", "store_phone_number": store_info.store_phone_number or "", "description": store_info.store_info or "", "prompt": form_data.prompts, "attr_category": ", ".join(form_data.attributes.keys()) if form_data.attributes else "", "attr_value": ", ".join(form_data.attributes.values()) if form_data.attributes else "", "ai": "ChatGPT", "ai_model": form_data.llm_model, "genre": "후크송", "sample_song": combined_sample_song or "없음", "result_song": final_lyrics, } await conn.execute(text(insert_query), insert_params) await conn.commit() logger.info("결과 저장 완료") logger.info("전체 결과 조회 중...") # 9. 생성 결과 가져오기 (created_at 역순) select_query = """ SELECT * FROM song_results_all ORDER BY created_at DESC; """ all_results = await conn.execute(text(select_query)) results_list = [ { "id": row.id, "store_info": row.store_info, "store_name": row.store_name, "store_category": row.store_category, "store_address": row.store_address, "store_phone_number": row.store_phone_number, "description": row.description, "prompt": row.prompt, "attr_category": row.attr_category, "attr_value": row.attr_value, "ai": row.ai, "ai_model": row.ai_model, "genre": row.genre, "sample_song": row.sample_song, "result_song": row.result_song, "created_at": row.created_at.isoformat() if row.created_at else None, } for row in all_results.fetchall() ] logger.info(f"전체 {len(results_list)}개의 결과 조회 완료") return results_list except HTTPException: raise except SQLAlchemyError as e: logger.error(f"Database Error: {e}", exc_info=True) raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="데이터베이스 연결에 문제가 발생했습니다.", ) except Exception as e: logger.error(f"Unexpected Error: {e}", exc_info=True) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="서비스 처리 중 오류가 발생했습니다.", ) async def get_song_result(conn: Connection): # 반환 타입 수정 try: select_query = """ SELECT * FROM song_results_all ORDER BY created_at DESC; """ all_results = await conn.execute(text(select_query)) results_list = [ { "id": row.id, "store_info": row.store_info, "store_name": row.store_name, "store_category": row.store_category, "store_address": row.store_address, "store_phone_number": row.store_phone_number, "description": row.description, "prompt": row.prompt, "attr_category": row.attr_category, "attr_value": row.attr_value, "ai": row.ai, "ai_model": row.ai_model, "season": row.season, "num_of_people": row.num_of_people, "people_category": row.people_category, "genre": row.genre, "sample_song": row.sample_song, "result_song": row.result_song, "created_at": row.created_at.isoformat() if row.created_at else None, } for row in all_results.fetchall() ] logger.info(f"전체 {len(results_list)}개의 결과 조회 완료") return results_list except HTTPException: # HTTPException은 그대로 raise raise except SQLAlchemyError as e: logger.error(f"Database Error: {e}", exc_info=True) raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="데이터베이스 연결에 문제가 발생했습니다.", ) except Exception as e: logger.error(f"Unexpected Error: {e}", exc_info=True) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="서비스 처리 중 오류가 발생했습니다.", ) async def make_automation(request: Request, conn: Connection): try: # 1. Form 데이터 파싱 form_data = await SongFormData.from_form(request) logger.info(f"{'=' * 60}") logger.info(f"Store ID: {form_data.store_id}") logger.info(f"{'=' * 60}") # 2. Store 정보 조회 store_query = """ SELECT * FROM store_default_info WHERE id=:id; """ store_result = await conn.execute(text(store_query), {"id": form_data.store_id}) all_store_info = [ StoreData( id=row[0], store_info=row[1], store_name=row[2], store_category=row[3], store_region=row[4], store_address=row[5], store_phone_number=row[6], created_at=row[7], ) for row in store_result ] if not all_store_info: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Store not found: {form_data.store_id}", ) store_info = all_store_info[0] logger.info(f"Store: {store_info.store_name}") # 3. 속성 조회 -- 단계별 선택 프로세서시 구현 필요 없음 attribute_query = """ SELECT * FROM attribute; """ attribute_results = await conn.execute(text(attribute_query)) # 결과 가져오기 attribute_rows = attribute_results.fetchall() formatted_attributes = "" selected_categories = [] selected_values = [] if attribute_rows: attribute_list = [ AttributeData( id=row[0], attr_category=row[1], attr_value=row[2], created_at=row[3], ) for row in attribute_rows ] # ✅ 각 category에서 하나의 value만 랜덤 선택 formatted_pairs = [] for attr in attribute_list: # 쉼표로 분리 및 공백 제거 values = [v.strip() for v in attr.attr_value.split(",") if v.strip()] if values: # 랜덤하게 하나만 선택 selected_value = random.choice(values) formatted_pairs.append(f"{attr.attr_category} : {selected_value}") # ✅ 선택된 category와 value 저장 selected_categories.append(attr.attr_category) selected_values.append(selected_value) # 최종 문자열 생성 formatted_attributes = "\n".join(formatted_pairs) logger.debug(f"[포맷팅된 문자열 속성 정보]\n{formatted_attributes}") else: logger.info("속성 데이터가 없습니다") formatted_attributes = "" # 4. 템플릿 가져오기 logger.info("템플릿 가져오기 (ID=1)") prompts_query = """ SELECT * FROM prompt_template WHERE id=1; """ prompts_result = await conn.execute(text(prompts_query)) row = prompts_result.fetchone() if not row: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Prompt ID 1 not found", ) prompt = PromptTemplateData( id=row[0], description=row[1], prompt=row[2], ) logger.debug(f"Prompt Template: {prompt.prompt}") # 5. 템플릿 조합 updated_prompt = prompt.prompt.replace("###", formatted_attributes).format( name=store_info.store_name or "", address=store_info.store_address or "", category=store_info.store_category or "", description=store_info.store_info or "", ) logger.debug("=" * 80) logger.debug("업데이트된 프롬프트") logger.debug("=" * 80) logger.debug(updated_prompt) logger.debug("=" * 80) # 4. Sample Song 조회 및 결합 combined_sample_song = None if form_data.lyrics_ids: logger.info(f"[샘플 가사 조회] - {len(form_data.lyrics_ids)}개") lyrics_query = """ SELECT sample_song FROM song_sample WHERE id IN :ids ORDER BY created_at; """ lyrics_result = await conn.execute( text(lyrics_query), {"ids": tuple(form_data.lyrics_ids)} ) sample_songs = [ row.sample_song for row in lyrics_result.fetchall() if row.sample_song ] if sample_songs: combined_sample_song = "\n\n".join( [f"[샘플 {i + 1}]\n{song}" for i, song in enumerate(sample_songs)] ) logger.info(f"{len(sample_songs)}개의 샘플 가사 조회 완료") else: logger.info("샘플 가사가 비어있습니다") else: logger.info("선택된 lyrics가 없습니다") # 1. song_sample 테이블의 모든 ID 조회 logger.info("[샘플 가사 랜덤 선택]") all_ids_query = """ SELECT id FROM song_sample; """ ids_result = await conn.execute(text(all_ids_query)) all_ids = [row.id for row in ids_result.fetchall()] logger.info(f"전체 샘플 가사 개수: {len(all_ids)}개") # 2. 랜덤하게 3개 선택 (또는 전체 개수가 3개 미만이면 전체) combined_sample_song = None if all_ids: # 3개 또는 전체 개수 중 작은 값 선택 sample_count = min(3, len(all_ids)) selected_ids = random.sample(all_ids, sample_count) logger.info(f"랜덤 선택된 ID: {selected_ids}") # 3. 선택된 ID로 샘플 가사 조회 lyrics_query = """ SELECT sample_song FROM song_sample WHERE id IN :ids ORDER BY created_at; """ lyrics_result = await conn.execute( text(lyrics_query), {"ids": tuple(selected_ids)} ) sample_songs = [ row.sample_song for row in lyrics_result.fetchall() if row.sample_song ] # 4. combined_sample_song 생성 if sample_songs: combined_sample_song = "\n\n".join( [f"[샘플 {i + 1}]\n{song}" for i, song in enumerate(sample_songs)] ) logger.info(f"{len(sample_songs)}개의 샘플 가사 조회 완료") else: logger.info("샘플 가사가 비어있습니다") else: logger.info("song_sample 테이블에 데이터가 없습니다") # 5. 프롬프트에 샘플 가사 추가 if combined_sample_song: updated_prompt += f""" 다음은 참고해야 하는 샘플 가사 정보입니다. 샘플 가사를 참고하여 작곡을 해주세요. {combined_sample_song} """ logger.info("샘플 가사 정보가 프롬프트에 추가되었습니다") else: logger.info("샘플 가사가 없어 기본 프롬프트만 사용합니다") logger.info(f"[최종 프롬프트 길이: {len(updated_prompt)} 자]") # 7. 모델에게 요청 generated_lyrics = await chatgpt_api.generate(prompt=updated_prompt) # 글자 수 계산 total_chars_with_space = len(generated_lyrics) total_chars_without_space = len( generated_lyrics.replace(" ", "") .replace("\n", "") .replace("\r", "") .replace("\t", "") ) # final_lyrics 생성 final_lyrics = f"""속성 {formatted_attributes} 전체 글자 수 (공백 포함): {total_chars_with_space}자 전체 글자 수 (공백 제외): {total_chars_without_space}자\r\n\r\n{generated_lyrics}""" # 8. DB 저장 insert_query = """ INSERT INTO song_results_all ( store_info, store_name, store_category, store_address, store_phone_number, description, prompt, attr_category, attr_value, ai, ai_model, genre, sample_song, result_song, created_at ) VALUES ( :store_info, :store_name, :store_category, :store_address, :store_phone_number, :description, :prompt, :attr_category, :attr_value, :ai, :ai_model, :genre, :sample_song, :result_song, NOW() ); """ logger.debug("[insert_params 선택된 속성 확인]") logger.debug(f"Categories: {selected_categories}") logger.debug(f"Values: {selected_values}") # attr_category, attr_value insert_params = { "store_info": store_info.store_info or "", "store_name": store_info.store_name, "store_category": store_info.store_category or "", "store_address": store_info.store_address or "", "store_phone_number": store_info.store_phone_number or "", "description": store_info.store_info or "", "prompt": prompt.id, # 랜덤 선택된 category와 value 사용 "attr_category": ", ".join(selected_categories) if selected_categories else "", "attr_value": ", ".join(selected_values) if selected_values else "", "ai": "ChatGPT", "ai_model": "gpt-5-mini", "genre": "후크송", "sample_song": combined_sample_song or "없음", "result_song": final_lyrics, } await conn.execute(text(insert_query), insert_params) await conn.commit() logger.info("결과 저장 완료") logger.info("전체 결과 조회 중...") # 9. 생성 결과 가져오기 (created_at 역순) select_query = """ SELECT * FROM song_results_all ORDER BY created_at DESC; """ all_results = await conn.execute(text(select_query)) results_list = [ { "id": row.id, "store_info": row.store_info, "store_name": row.store_name, "store_category": row.store_category, "store_address": row.store_address, "store_phone_number": row.store_phone_number, "description": row.description, "prompt": row.prompt, "attr_category": row.attr_category, "attr_value": row.attr_value, "ai": row.ai, "ai_model": row.ai_model, "genre": row.genre, "sample_song": row.sample_song, "result_song": row.result_song, "created_at": row.created_at.isoformat() if row.created_at else None, } for row in all_results.fetchall() ] logger.info(f"전체 {len(results_list)}개의 결과 조회 완료") return results_list except HTTPException: raise except SQLAlchemyError as e: logger.error(f"Database Error: {e}", exc_info=True) raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="데이터베이스 연결에 문제가 발생했습니다.", ) except Exception as e: logger.error(f"Unexpected Error: {e}", exc_info=True) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="서비스 처리 중 오류가 발생했습니다.", )