import gymnasium as gym
from gymnasium import spaces
import numpy as np

from negotiation_agent.spaces import (
    NegotiationSpaces,
    State,
    PriceZone,
    AcceptanceRate,
    Scenario,
)


class NegotiationEnv(gym.Env):
    """Custom negotiation Environment based on the 'Q-Table design report'.

    The agent negotiates a purchase price downward; the (very simplified)
    seller lowers the price a little each step regardless of the action.
    """

    def __init__(self, scenario=0, target_price=100, threshold_price=120):
        """Initialize the environment.

        Args:
            scenario: integer index of the negotiation scenario
                (converted to the `Scenario` enum).
            target_price: price the agent ideally wants to reach.
            threshold_price: maximum price the agent can accept.
        """
        super().__init__()
        self.spaces = NegotiationSpaces()
        self.observation_space = self.spaces.observation_space
        self.action_space = self.spaces.action_space
        self.initial_scenario = Scenario(scenario)
        self.target_price = target_price
        self.threshold_price = threshold_price
        # Set properly in reset(); None until the first reset.
        self.current_price = None
        self.initial_price = None
        self.state = None

    def _get_state(self):
        """Compute the State array from the current negotiation status."""
        # Classify the current price against the target/threshold bounds.
        if self.current_price <= self.target_price:
            price_zone = PriceZone.BELOW_TARGET
        elif self.target_price < self.current_price <= self.threshold_price:
            price_zone = PriceZone.BETWEEN_TARGET_AND_THRESHOLD
        else:
            price_zone = PriceZone.ABOVE_THRESHOLD

        # Fraction of the initial price the seller has conceded so far.
        acceptance_rate_val = (
            self.initial_price - self.current_price
        ) / self.initial_price
        if acceptance_rate_val < 0.1:
            acceptance_rate_level = AcceptanceRate.LOW
        elif 0.1 <= acceptance_rate_val < 0.25:
            acceptance_rate_level = AcceptanceRate.MEDIUM
        else:
            acceptance_rate_level = AcceptanceRate.HIGH

        state = State(
            scenario=self.initial_scenario,
            price_zone=price_zone,
            acceptance_rate=acceptance_rate_level,
        )
        return np.array(state.to_array())

    def reset(self, seed=None, options=None):
        """Reset the environment to its initial state.

        Returns:
            tuple: (initial observation array, empty info dict).
        """
        # Seeds self.np_random, which step() uses for the price simulation.
        super().reset(seed=seed)
        # Start the asking price 20% above the acceptable threshold.
        self.initial_price = self.threshold_price * 1.2
        self.current_price = self.initial_price
        self.state = self._get_state()
        return self.state, {}

    def step(self, action):
        """Perform one step; return (obs, reward, terminated, truncated, info).

        Very simple seller-response simulation: the price drops by a random
        2-8% each step regardless of the chosen action.
        """
        # BUG FIX: draw from the env's seeded RNG (self.np_random) rather
        # than the global np.random, so reset(seed=...) makes episodes
        # reproducible as the Gymnasium API intends.
        price_drop = self.np_random.uniform(0.02, 0.08) * self.current_price
        self.current_price -= price_drop

        terminated = self.current_price < self.target_price * 0.95
        # Compute the next state once and reuse it for both the reward and
        # the returned observation (the original recomputed it twice).
        self.state = self._get_state()
        reward = self._calculate_reward(self.state, terminated)
        return self.state, reward, terminated, False, {}

    def _calculate_reward(self, state, terminated):
        """Report-based reward function (simplified version).

        Args:
            state: state array [scenario_idx, price_zone_idx, acceptance_idx].
            terminated: whether the episode just ended.

        Returns:
            float: weighted mix of a price-quality term and a completion bonus.
        """
        scenario_idx, price_zone_idx, _ = state
        # Per-scenario and per-price-zone weights from the design report.
        s_n_map = {0: 1.0, 1: 0.75, 2: 0.5, 3: 0.25}
        pz_n_map = {0: 0.1, 1: 0.5, 2: 1.0}
        # W balances the price-ratio term against the termination bonus.
        W = (s_n_map[scenario_idx] + pz_n_map[price_zone_idx]) / 2
        # max(..., 1) guards against division by a tiny/non-positive price.
        reward = W * (self.target_price / max(self.current_price, 1)) + (1 - W) * (
            1 if terminated else 0
        )
        return reward