KT_Q_Table/interactive_negotiation.py

146 lines
6.0 KiB
Python

import random
import numpy as np
from negotiation_agent.environment import NegotiationEnv
from negotiation_agent.spaces import State, PriceZone, AcceptanceRate, Scenario
from agents.offline_agent import QLearningAgent
from usecases.initialize_env_usecase import initialize_environment_usecase
def convert_action_to_response(action_idx, proposed_price):
"""에이전트의 행동을 상황에 맞는 응답 텍스트로 변환"""
action_responses = {
0: [
"강한 수락 (Action 0: STRONG_ACCEPT): 제안을 매우 흡족하게 수락하겠습니다."
],
1: ["중간 수락 (Action 1: MEDIUM_ACCEPT): 제안을 수락하겠습니다."],
2: ["약한 수락 (Action 2: WEAK_ACCEPT): 고민 끝에 제안을 수락하겠습니다."],
3: [
f"강한 거절 (Action 3: STRONG_REJECT): {proposed_price}은(는) 너무 높은 가격입니다. 대폭 낮춰주셔야 합니다."
],
4: [
f"중간 거절 (Action 4: MEDIUM_REJECT): {proposed_price}은(는) 높습니다. 더 낮은 가격을 제안해주세요."
],
5: [
f"약한 거절 (Action 5: WEAK_REJECT): {proposed_price}은(는) 조금 높습니다. 더 조정이 필요합니다."
],
6: ["강한 가격 제안 (Action 6: STRONG_PROPOSE): 대폭 낮은 가격을 제안합니다."],
7: ["중간 가격 제안 (Action 7: MEDIUM_PROPOSE): 조정된 가격을 제안합니다."],
8: ["약한 가격 제안 (Action 8: WEAK_PROPOSE): 소폭 조정된 가격을 제안합니다."],
}
response = random.choice(
action_responses.get(
action_idx, [f"Action {action_idx}: 가격 조정이 필요합니다."]
)
)
return f"{proposed_price}에 대한 응답 - {response}"
def run_interactive_negotiation():
"""대화형 협상 시뮬레이션 실행"""
# 환경 및 에이전트 초기화
env = initialize_environment_usecase()
agent_params = {
"learning_rate": 0.001,
"discount_factor": 0.99,
"epsilon": 0.0, # 평가 모드에서는 탐험하지 않음
}
# MultiDiscrete 공간의 크기 계산
state_size = np.prod(env.observation_space.nvec) # 상태 공간의 전체 크기
action_size = (
env.action_space.n
if hasattr(env.action_space, "n")
else np.prod(env.action_space.nvec)
) # 행동 공간의 전체 크기
agent = QLearningAgent(agent_params, state_size, action_size)
# Q-table 로드
agent.load_q_table("saved_models/q_table.npy")
while True:
# 새로운 에피소드 시작
state = env.reset()
target_price = env.target_price
threshold_price = env.threshold_price
episode_done = False
print("\n=== 새로운 협상 시작 ===")
print(f"목표 가격: {target_price}")
print(f"임계 가격: {threshold_price}")
print("\n협상을 시작합니다. 가격을 제안해주세요.")
while not episode_done:
# 사용자 입력 받기
try:
user_price = float(input("\n당신의 제안 가격을 입력하세요: "))
# 목표가격 이하로 제안이 들어오면 즉시 수락 및 종료
if user_price <= target_price:
print("\n=== 협상 성공! ===")
print(
f"제안된 가격 ({user_price})이 목표가격 ({target_price}) 이하입니다."
)
print("에이전트: 즉시 수락 (특별 행동: 즉시 수락)")
print("\n시뮬레이션을 종료합니다.")
return # 전체 시뮬레이션 종료
except ValueError:
print("올바른 가격을 입력해주세요")
continue
# 현재 가격 업데이트 및 상태 계산
env.current_price = user_price
next_state = env._get_state()
# 상태 인덱스 계산
try:
state_idx = np.ravel_multi_index(next_state, env.observation_space.nvec)
except ValueError as e:
print(f"\n디버그 정보:")
print(f"현재 상태 벡터: {next_state}")
print(f"상태 공간 크기: {env.observation_space.nvec}")
print(f"에러: {e}")
state_idx = 0
# 현재 상태의 Q값들과 액션 마스크 출력
print(f"\n디버그 정보:")
print(f"현재 상태 벡터: {next_state}")
print(f"계산된 상태 인덱스: {state_idx}")
q_values = agent.q_table[state_idx]
# 액션 마스크 가져오기
action_mask = agent.episode_policy.get_action_mask()
print("\n현재 상태의 Q값들과 선택 가능 여부:")
print("(O: 선택 가능, X: 이미 사용됨)")
for action_idx, (q_value, mask) in enumerate(zip(q_values, action_mask)):
available = "O" if mask == 1 else "X"
print(f"Action {action_idx}: {q_value:.4f} [{available}]")
# 에이전트의 응답 생성 (epsilon=0이므로 항상 최대 Q값의 행동 선택)
agent_action = agent.get_action(state_idx)
masked_q_values = q_values * action_mask
max_q = np.max(masked_q_values)
print(f"\n선택된 행동: {agent_action} (Q값: {q_values[agent_action]:.4f})")
if np.allclose(masked_q_values[action_mask > 0], 0, atol=1e-10):
print("(순차적 선택: 모든 유효한 Q값이 0에 가까움)")
# 에이전트의 응답 출력
response = convert_action_to_response(agent_action, user_price)
print(f"\n에이전트의 응답: {response}")
state = next_state
# 다시 시작 여부 확인
if (
not input("\n새로운 협상을 시작하시겠습니까? (y/n): ")
.lower()
.startswith("y")
):
break
if __name__ == "__main__":
run_interactive_negotiation()