170 lines
6.1 KiB
Python
170 lines
6.1 KiB
Python
from openai import OpenAI
|
|
from dotenv import load_dotenv
|
|
import os
|
|
import base64
|
|
from pathlib import Path
|
|
from PIL import Image
|
|
import requests
|
|
from io import BytesIO
|
|
import json
|
|
import re
|
|
|
|
# .env 로드
|
|
load_dotenv()
|
|
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
|
|
|
|
# GPT-4o mini 단가 (USD per 1M tokens)
|
|
INPUT_COST_PER_TOKEN = 0.15 / 1_000_000
|
|
OUTPUT_COST_PER_TOKEN = 0.60 / 1_000_000
|
|
BASE_IMAGE_TOKENS_512 = 2833 # 512x512 기준 추정
|
|
USD_TO_KRW = 1400 # 환율
|
|
|
|
#def parse_gpt_json_response(response_text: str):
|
|
# """
|
|
# GPT 모델이 반환한 코드 블록 형태의 JSON 응답을 실제 Python dict로 변환
|
|
# 코드 알고리즘이 문제일 경우 추후 고도화 -> 문자열 첫 {를 찾은 후 문자열 뒤집고 첫 } 찾은 후 크롭하면 됨.
|
|
# """
|
|
# if not response_text:
|
|
# return None
|
|
#
|
|
# # 1. ```json ... ``` 또는 ``` ... ``` 코드 블록 제거
|
|
# cleaned = re.sub(r"^```json\s*|\s*```$", "", response_text.strip(), flags=re.DOTALL)
|
|
#
|
|
# # 2. JSON 문자열 파싱 시도
|
|
# try:
|
|
# data = json.loads(cleaned)
|
|
# return data
|
|
# except json.JSONDecodeError:
|
|
# # 코드 블록이 없거나 포맷이 살짝 틀린 경우를 대비
|
|
# try:
|
|
# cleaned_alt = re.sub(r"^```|\s*```$", "", response_text.strip(), flags=re.DOTALL)
|
|
# data = json.loads(cleaned_alt)
|
|
# return data
|
|
# except Exception as e:
|
|
# print("JSON 파싱 실패:", e)
|
|
# print("원본 텍스트:\n", response_text)
|
|
# return None
|
|
|
|
import json
|
|
|
|
def parse_gpt_json_response(response_text: str):
|
|
"""
|
|
GPT 응답에서 첫 번째 '{'부터 마지막 '}'까지 추출하여 JSON으로 파싱.
|
|
GPT가 코드블록이나 안내문을 포함하더라도 안정적으로 동작.
|
|
"""
|
|
if not response_text:
|
|
return None
|
|
|
|
# 문자열에서 JSON 본문 추출
|
|
start = response_text.find('{')
|
|
end = response_text.rfind('}')
|
|
|
|
if start == -1 or end == -1 or start >= end:
|
|
print("JSON 형태를 찾을 수 없습니다.")
|
|
return None
|
|
|
|
json_str = response_text[start:end + 1].strip()
|
|
|
|
# JSON 파싱 시도
|
|
try:
|
|
return json.loads(json_str)
|
|
except json.JSONDecodeError as e:
|
|
print("JSON 파싱 실패:", e)
|
|
print("추출된 문자열:")
|
|
print(json_str)
|
|
return None
|
|
|
|
def estimate_image_tokens(width: int, height: int) -> int:
|
|
"""이미지 해상도 기반 토큰 추정"""
|
|
tiles_w = (width + 511) // 512
|
|
tiles_h = (height + 511) // 512
|
|
total_tiles = tiles_w * tiles_h
|
|
return total_tiles * BASE_IMAGE_TOKENS_512
|
|
|
|
def describe_input(text=None, image_path=None, audio_path=None, model="gpt-4o-mini"):
|
|
content = []
|
|
image_tokens_estimated = 0
|
|
w = h = 0
|
|
|
|
if text:
|
|
content.append({"type": "text", "text": text})
|
|
|
|
if image_path:
|
|
if image_path.startswith("http://") or image_path.startswith("https://"):
|
|
# URL 이미지 → 크기 가져오기
|
|
resp = requests.get(image_path, timeout=10)
|
|
img = Image.open(BytesIO(resp.content))
|
|
w, h = img.size
|
|
image_tokens_estimated = estimate_image_tokens(w, h)
|
|
content.append({
|
|
"type": "image_url",
|
|
"image_url": {"url": image_path}
|
|
})
|
|
else:
|
|
# 로컬 이미지
|
|
img_file = Path(image_path)
|
|
if not img_file.exists():
|
|
raise FileNotFoundError(f"이미지 파일을 찾을 수 없습니다: {image_path}")
|
|
img = Image.open(img_file)
|
|
w, h = img.size
|
|
image_tokens_estimated = estimate_image_tokens(w, h)
|
|
image_data = base64.b64encode(img_file.read_bytes()).decode("utf-8")
|
|
content.append({
|
|
"type": "image_url",
|
|
"image_url": {"url": f"data:image/jpeg;base64,{image_data}"}
|
|
})
|
|
|
|
response = client.chat.completions.create(
|
|
model=model,
|
|
messages=[{"role": "user", "content": content}]
|
|
)
|
|
|
|
result_text = response.choices[0].message.content
|
|
cleaned_result = parse_gpt_json_response(result_text)
|
|
usage = response.usage
|
|
|
|
# 비용 계산
|
|
prompt_cost = usage.prompt_tokens * INPUT_COST_PER_TOKEN
|
|
completion_cost = usage.completion_tokens * OUTPUT_COST_PER_TOKEN
|
|
image_cost = image_tokens_estimated * INPUT_COST_PER_TOKEN
|
|
total_cost_usd = prompt_cost + completion_cost + image_cost
|
|
total_cost_krw = total_cost_usd * USD_TO_KRW
|
|
|
|
print("\n--- 입력 Prompt ---")
|
|
print(text)
|
|
print("\n--- GPT 응답 ---")
|
|
print(cleaned_result)
|
|
print("\n--- 토큰 사용량 ---")
|
|
print(f"입력 토큰(prompt_tokens): {usage.prompt_tokens}")
|
|
print(f"출력 토큰(completion_tokens): {usage.completion_tokens}")
|
|
print(f"총합(total_tokens): {usage.total_tokens}")
|
|
if image_path:
|
|
print(f"이미지 크기: {w}x{h}px")
|
|
print(f"예상 이미지 토큰: {image_tokens_estimated}")
|
|
|
|
print("\n--- 예상 비용 ---")
|
|
print(f"입력 비용: ${prompt_cost:.6f}")
|
|
print(f"출력 비용: ${completion_cost:.6f}")
|
|
if image_path:
|
|
print(f"이미지 예상 비용: ${image_cost:.6f}")
|
|
print(f"총합 예상 비용: ${total_cost_usd:.6f} (≈ {total_cost_krw:,.2f} 원)")
|
|
|
|
return result_text, usage, total_cost_usd, total_cost_krw
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
test_data_url = "./image_ex/test3.jpeg"
|
|
name = "대성부대고기 용인본점"
|
|
cathegory = "음식점"
|
|
|
|
#describe_input(
|
|
# text="이 이미지를 설명해줘.",
|
|
# image_path=test_data_url
|
|
#)
|
|
|
|
describe_input(
|
|
text=f"이 이미지 퀄리티를, 해상도 upscale했을 때 {cathegory} - {name}의 광고에 사용 가능한 적합한 이미지인지 설명해줘, 적합하지 않은 데이터 사용시 고소당할 수 있음으로 매우 엄격해야함. 다음 항목에 대한 점수를 상, 중, 하로, json형식으로 제공해줘 : 구도, 노출, 색감, 시각적 노이즈, 내용 적절성. 왜 그렇게 생각하는지에 대한 항목별 설명을 먼저 작성. 답변은" + "{} 안의 json형식으로만 작성",
|
|
image_path=test_data_url
|
|
) |