import random import numpy as np from negotiation_agent.environment import NegotiationEnv from negotiation_agent.spaces import State, PriceZone, AcceptanceRate, Scenario from agents.offline_agent import QLearningAgent from usecases.initialize_env_usecase import initialize_environment_usecase def convert_action_to_response(action_idx, proposed_price): """에이전트의 행동을 상황에 맞는 응답 텍스트로 변환""" action_responses = { 0: [ "강한 수락 (Action 0: STRONG_ACCEPT): 제안을 매우 흡족하게 수락하겠습니다." ], 1: ["중간 수락 (Action 1: MEDIUM_ACCEPT): 제안을 수락하겠습니다."], 2: ["약한 수락 (Action 2: WEAK_ACCEPT): 고민 끝에 제안을 수락하겠습니다."], 3: [ f"강한 거절 (Action 3: STRONG_REJECT): {proposed_price}은(는) 너무 높은 가격입니다. 대폭 낮춰주셔야 합니다." ], 4: [ f"중간 거절 (Action 4: MEDIUM_REJECT): {proposed_price}은(는) 높습니다. 더 낮은 가격을 제안해주세요." ], 5: [ f"약한 거절 (Action 5: WEAK_REJECT): {proposed_price}은(는) 조금 높습니다. 더 조정이 필요합니다." ], 6: ["강한 가격 제안 (Action 6: STRONG_PROPOSE): 대폭 낮은 가격을 제안합니다."], 7: ["중간 가격 제안 (Action 7: MEDIUM_PROPOSE): 조정된 가격을 제안합니다."], 8: ["약한 가격 제안 (Action 8: WEAK_PROPOSE): 소폭 조정된 가격을 제안합니다."], } response = random.choice( action_responses.get( action_idx, [f"Action {action_idx}: 가격 조정이 필요합니다."] ) ) return f"{proposed_price}에 대한 응답 - {response}" def run_interactive_negotiation(): """대화형 협상 시뮬레이션 실행""" # 환경 및 에이전트 초기화 env = initialize_environment_usecase() agent_params = { "learning_rate": 0.001, "discount_factor": 0.99, "epsilon": 0.0, # 평가 모드에서는 탐험하지 않음 } # MultiDiscrete 공간의 크기 계산 state_size = np.prod(env.observation_space.nvec) # 상태 공간의 전체 크기 action_size = ( env.action_space.n if hasattr(env.action_space, "n") else np.prod(env.action_space.nvec) ) # 행동 공간의 전체 크기 agent = QLearningAgent(agent_params, state_size, action_size) # Q-table 로드 agent.load_q_table("saved_models/q_table.npy") while True: # 새로운 에피소드 시작 state = env.reset() target_price = env.target_price threshold_price = env.threshold_price episode_done = False print("\n=== 새로운 협상 시작 ===") print(f"목표 가격: {target_price}") print(f"임계 가격: {threshold_price}") print("\n협상을 시작합니다. 가격을 제안해주세요.") while not episode_done: # 사용자 입력 받기 try: user_price = float(input("\n당신의 제안 가격을 입력하세요: ")) # 목표가격 이하로 제안이 들어오면 즉시 수락 및 종료 if user_price <= target_price: print("\n=== 협상 성공! ===") print( f"제안된 가격 ({user_price})이 목표가격 ({target_price}) 이하입니다." ) print("에이전트: 즉시 수락 (특별 행동: 즉시 수락)") print("\n시뮬레이션을 종료합니다.") return # 전체 시뮬레이션 종료 except ValueError: print("올바른 가격을 입력해주세요") continue # 현재 가격 업데이트 및 상태 계산 env.current_price = user_price next_state = env._get_state() # 상태 인덱스 계산 try: state_idx = np.ravel_multi_index(next_state, env.observation_space.nvec) except ValueError as e: print(f"\n디버그 정보:") print(f"현재 상태 벡터: {next_state}") print(f"상태 공간 크기: {env.observation_space.nvec}") print(f"에러: {e}") state_idx = 0 # 현재 상태의 Q값들과 액션 마스크 출력 print(f"\n디버그 정보:") print(f"현재 상태 벡터: {next_state}") print(f"계산된 상태 인덱스: {state_idx}") q_values = agent.q_table[state_idx] # 액션 마스크 가져오기 action_mask = agent.episode_policy.get_action_mask() print("\n현재 상태의 Q값들과 선택 가능 여부:") print("(O: 선택 가능, X: 이미 사용됨)") for action_idx, (q_value, mask) in enumerate(zip(q_values, action_mask)): available = "O" if mask == 1 else "X" print(f"Action {action_idx}: {q_value:.4f} [{available}]") # 에이전트의 응답 생성 (epsilon=0이므로 항상 최대 Q값의 행동 선택) agent_action = agent.get_action(state_idx) masked_q_values = q_values * action_mask max_q = np.max(masked_q_values) print(f"\n선택된 행동: {agent_action} (Q값: {q_values[agent_action]:.4f})") if np.allclose(masked_q_values[action_mask > 0], 0, atol=1e-10): print("(순차적 선택: 모든 유효한 Q값이 0에 가까움)") # 에이전트의 응답 출력 response = convert_action_to_response(agent_action, user_price) print(f"\n에이전트의 응답: {response}") state = next_state # 다시 시작 여부 확인 if ( not input("\n새로운 협상을 시작하시겠습니까? (y/n): ") .lower() .startswith("y") ): break if __name__ == "__main__": run_interactive_negotiation()