# q_table_demo/app/services/demo_service.py
"""
Q-Table 협상 전략 데모 메인 서비스
"""
import random
import time
from typing import Dict, List, Optional, Any, Tuple
import pandas as pd
import numpy as np
from app.models.schemas import (
CardType, ScenarioType, PriceZoneType,
ExperienceData, EpisodeGenerationRequest,
RewardCalculationRequest, RewardCalculationResponse,
ActionRecommendationRequest, ActionRecommendationResponse,
SystemStatus
)
from app.services.negotiation_env import NegotiationEnvironment
from app.services.qtable_learner import QTableLearner, ExperienceBuffer
from app.services.fqi_cql import FQICQLLearner
class DemoService:
    """Main service for the Q-Table negotiation strategy demo."""

    def __init__(self):
        # Initialize the environment
        self.env = NegotiationEnvironment()
        # State and action spaces
        self.states = self.env.get_all_states()
        self.actions = self.env.get_all_actions()
        # Learning engines
        self.experience_buffer = ExperienceBuffer(max_size=10000)
        self.q_learner = QTableLearner(
            states=self.states,
            actions=self.actions,
            learning_rate=0.1,
            discount_factor=0.9,
            epsilon=0.1
        )
        self.fqi_cql_learner = FQICQLLearner(
            states=self.states,
            actions=self.actions,
            alpha=1.0,
            gamma=0.95
        )
        # Statistics
        self.episode_count = 0
        self.start_time = time.time()
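
    # Roles of the hyperparameters above (standard RL meanings; the default
    # values are this demo's own choices, not prescribed anywhere):
    #   learning_rate   - step size of each tabular Q update
    #   discount_factor - gamma, weight of future vs. immediate reward
    #   epsilon         - probability of taking a random exploratory action
    #   alpha (FQI+CQL) - strength of the conservative penalty on actions
    #                     not supported by the logged data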

    def calculate_reward(self, request: RewardCalculationRequest) -> RewardCalculationResponse:
        """Calculate the reward and return a step-by-step formula breakdown."""
        reward, weight = self.env.calculate_reward(
            scenario=request.scenario,
            price_zone=request.price_zone,
            anchor_price=request.anchor_price,
            proposed_price=request.proposed_price,
            is_end=request.is_end
        )
        # Scenario and price-zone weights
        scenario_weight = self.env.scenario_weights[request.scenario]
        price_zone_weight = self.env.price_zone_weights[request.price_zone]
        # Price ratio (guard against division by zero)
        price_ratio = request.anchor_price / request.proposed_price if request.proposed_price > 0 else float('inf')
        # Formula breakdown
        formula_breakdown = (
            f"R(s,a) = W × (A/P) + (1-W) × End\n"
            f"W = (S_n + PZ_n) / 2 = ({scenario_weight} + {price_zone_weight}) / 2 = {weight:.3f}\n"
            f"A/P = {request.anchor_price}/{request.proposed_price} = {price_ratio:.3f}\n"
            f"End = {1 if request.is_end else 0}\n"
            f"R(s,a) = {weight:.3f} × {price_ratio:.3f} + {1-weight:.3f} × {1 if request.is_end else 0} = {reward:.3f}"
        )
        return RewardCalculationResponse(
            reward=reward,
            weight=weight,
            scenario_weight=scenario_weight,
            price_zone_weight=price_zone_weight,
            price_ratio=price_ratio,
            formula_breakdown=formula_breakdown
        )
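
    # Worked example of the reward formula above (illustrative numbers, not
    # taken from the actual weight tables in NegotiationEnvironment):
    # with S_n = 0.8, PZ_n = 0.6, anchor A = 100, proposed P = 90, End = 1:
    #   W      = (0.8 + 0.6) / 2           = 0.700
    #   A/P    = 100 / 90                  ≈ 1.111
    #   R(s,a) = 0.700 × 1.111 + 0.300 × 1 ≈ 1.078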

    def generate_episodes(self, request: EpisodeGenerationRequest) -> Dict[str, Any]:
        """Generate episodes and collect the resulting experiences."""
        new_experiences = 0
        episode_results = []
        for episode in range(request.num_episodes):
            episode_result = self._generate_single_episode(
                max_steps=request.max_steps,
                anchor_price=request.anchor_price,
                exploration_rate=request.exploration_rate,
                episode_id=self.episode_count + episode
            )
            episode_results.append(episode_result)
            new_experiences += episode_result['steps']
        self.episode_count += request.num_episodes
        return {
            "episodes_generated": request.num_episodes,
            "new_experiences": new_experiences,
            "episode_results": episode_results,
            "total_episodes": self.episode_count
        }

    def _generate_single_episode(
        self,
        max_steps: int,
        anchor_price: float,
        exploration_rate: float,
        episode_id: int
    ) -> Dict[str, Any]:
        """Generate a single episode."""
        # Initial state: card C0, scenario S0, price zone P0
        current_state = "C0S0P0"
        scenario = random.choice(list(ScenarioType))
        episode_reward = 0.0
        steps = 0
        success = False
        proposed_price = anchor_price  # fallback final price if the loop never runs
        for step in range(max_steps):
            # Select an action (epsilon-greedy)
            if random.random() < exploration_rate:
                action = random.choice(self.actions)
                is_exploration = True
            else:
                action = self.q_learner.get_optimal_action(current_state)
                is_exploration = False
            # Opponent's response from the environment
            proposed_price = self.env.simulate_opponent_response(
                current_card=action,
                scenario=scenario,
                anchor_price=anchor_price,
                step=step
            )
            # Determine the price zone
            price_zone = self.env.get_price_zone(proposed_price, anchor_price)
            # Next state
            next_state = f"{action.value}{scenario.value}{price_zone.value}"
            # Check termination: success or step budget exhausted
            is_successful = self.env.is_negotiation_successful(proposed_price, anchor_price)
            is_done = is_successful or (step >= max_steps - 1)
            if is_successful:
                success = True
            # Calculate the reward
            reward, weight = self.env.calculate_reward(
                scenario=scenario,
                price_zone=price_zone,
                anchor_price=anchor_price,
                proposed_price=proposed_price,
                is_end=is_done
            )
            # Store the experience
            metadata = {
                'episode': episode_id,
                'step': step,
                'scenario': scenario.value,
                'proposed_price': proposed_price,
                'weight': weight,
                'is_exploration': is_exploration,
                'anchor_price': anchor_price
            }
            self.experience_buffer.add_experience(
                state=current_state,
                action=action,
                reward=reward,
                next_state=next_state,
                done=is_done,
                metadata=metadata
            )
            episode_reward += reward
            steps += 1
            current_state = next_state
            if is_done:
                break
        return {
            'episode_id': episode_id,
            'steps': steps,
            'total_reward': episode_reward,
            'success': success,
            'final_price': proposed_price,
            'scenario': scenario.value
        }
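
    # State encoding used above, inferred from the f-string and the initial
    # state "C0S0P0": the state string concatenates the card, scenario, and
    # price-zone enum values. E.g. (hypothetical enum members) CardType "C2"
    # + ScenarioType "S1" + PriceZoneType "P3" -> state "C2S1P3".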

    def update_q_learning(self, learning_rate: float, discount_factor: float, batch_size: int) -> Dict[str, Any]:
        """Run a Q-Learning batch update."""
        # Set hyperparameters
        self.q_learner.set_hyperparameters(
            learning_rate=learning_rate,
            discount_factor=discount_factor
        )
        # Fetch experience data
        experiences = self.experience_buffer.get_experiences()
        if not experiences:
            return {"message": "No experience data available", "updates": 0}
        # Sample a batch
        if len(experiences) > batch_size:
            batch = self.experience_buffer.sample_batch(batch_size)
        else:
            batch = experiences
        # Batch update
        result = self.q_learner.batch_update(batch)
        return {
            "message": "Q-Learning update completed",
            "batch_size": len(batch),
            "updates": result["updates"],
            "avg_td_error": result["avg_td_error"],
            "total_updates": self.q_learner.update_count
        }
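
    # A minimal sketch of the tabular update that QTableLearner.batch_update
    # presumably applies per transition. This is the standard Q-Learning rule;
    # the learner's actual implementation may differ in details.
    @staticmethod
    def _q_update_sketch(
        q_sa: float, max_q_next: float, reward: float,
        done: bool, lr: float, gamma: float
    ) -> Tuple[float, float]:
        """Return (updated Q(s,a), TD error) for a single transition:
        Q(s,a) <- Q(s,a) + lr * (r + gamma * max_a' Q(s',a') - Q(s,a))."""
        target = reward if done else reward + gamma * max_q_next
        td_error = target - q_sa
        return q_sa + lr * td_error, td_error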

    def run_fqi_cql(self, alpha: float, gamma: float, batch_size: int, num_iterations: int) -> Dict[str, Any]:
        """Run FQI+CQL training."""
        # Set hyperparameters
        self.fqi_cql_learner.set_hyperparameters(
            alpha=alpha,
            gamma=gamma
        )
        # Fetch experience data
        experiences = self.experience_buffer.get_experiences()
        if not experiences:
            return {"message": "No experience data available", "iterations": 0}
        # Sample a batch
        if len(experiences) > batch_size:
            batch = self.experience_buffer.sample_batch(batch_size)
        else:
            batch = experiences
        # FQI+CQL training
        result = self.fqi_cql_learner.train_multiple_iterations(
            experience_batch=batch,
            num_iterations=num_iterations
        )
        # Policy comparison
        policy_comparison = self.fqi_cql_learner.compare_with_behavior_policy(batch)
        return {
            "message": "FQI+CQL training completed",
            "training_result": result,
            "policy_comparison": policy_comparison,
            "batch_size": len(batch)
        }
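
    # A minimal sketch of the conservative penalty that FQICQLLearner
    # presumably adds to its FQI regression loss. This is the standard CQL
    # form; the project's learner may implement it differently.
    @staticmethod
    def _cql_penalty_sketch(
        q_values_at_state: Dict[str, float], data_action: str, alpha: float
    ) -> float:
        """Return alpha * (logsumexp_a Q(s,a) - Q(s,a_data)), which pushes
        Q-values of actions unseen in the batch down relative to the
        logged action."""
        q_vals = np.array(list(q_values_at_state.values()))
        # Numerically stable log-sum-exp over the action dimension
        logsumexp = np.log(np.sum(np.exp(q_vals - q_vals.max()))) + q_vals.max()
        return float(alpha * (logsumexp - q_values_at_state[data_action]))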

    def get_action_recommendation(self, request: ActionRecommendationRequest) -> ActionRecommendationResponse:
        """Recommend an action for the given state."""
        # Fetch the Q-values for the state
        q_values = self.q_learner.get_state_q_values(request.current_state)
        # Select an action
        if request.use_epsilon_greedy:
            # Temporarily swap in the requested epsilon, then restore it
            # (the original code also made a redundant first call with the
            # learner's default epsilon; a single guarded call suffices)
            original_epsilon = self.q_learner.epsilon
            self.q_learner.epsilon = request.epsilon
            try:
                action, is_exploration = self.q_learner.select_action(
                    state=request.current_state,
                    use_epsilon_greedy=True
                )
            finally:
                self.q_learner.epsilon = original_epsilon
        else:
            action = self.q_learner.get_optimal_action(request.current_state)
            is_exploration = False
        # Compute confidence from the spread of the Q-values
        if q_values and len(q_values) > 1:
            q_vals = list(q_values.values())
            max_q = max(q_vals)
            q_range = max_q - min(q_vals)
            confidence = max_q / (q_range + 1e-8) if q_range > 0 else 1.0
            confidence = float(np.clip(confidence, 0.0, 1.0))
        else:
            confidence = 0.0
        return ActionRecommendationResponse(
            recommended_action=action,
            q_values=q_values,
            confidence=confidence,
            exploration=is_exploration
        )
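
    # Worked example of the confidence heuristic above (illustrative numbers):
    # q_values = {"C0": 0.4, "C1": -0.6} gives max_q = 0.4, q_range = 1.0,
    # confidence ≈ 0.4; q_values = {"C0": 0.9, "C1": 0.3} gives 0.9 / 0.6 = 1.5,
    # clipped to 1.0. Unseen states with no Q-values score 0.0.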

    def get_system_status(self) -> SystemStatus:
        """Query the current system status."""
        exp_df = self.experience_buffer.get_dataframe()
        if not exp_df.empty:
            avg_reward = exp_df['reward'].mean()
            # 'done' marks terminal transitions (success or step limit),
            # so this ratio is an upper bound on the true success rate
            success_count = exp_df['done'].sum()
            success_rate = success_count / len(exp_df)
            unique_states = exp_df['state'].nunique()
        else:
            avg_reward = 0.0
            success_rate = 0.0
            unique_states = 0
        return SystemStatus(
            total_experiences=self.experience_buffer.size(),
            q_table_updates=self.q_learner.update_count,
            unique_states=unique_states,
            average_reward=avg_reward,
            success_rate=success_rate,
            last_update=time.time()
        )

    def get_q_table(self) -> Dict[str, Any]:
        """Return the Q-Table data."""
        q_table_df = self.q_learner.get_q_table_copy()
        stats = self.q_learner.get_learning_statistics()
        return {
            "q_table": q_table_df.to_dict(),
            "statistics": stats,
            "update_count": self.q_learner.update_count,
            "hyperparameters": {
                "learning_rate": self.q_learner.learning_rate,
                "discount_factor": self.q_learner.discount_factor,
                "epsilon": self.q_learner.epsilon
            }
        }

    def get_fqi_cql_results(self) -> Dict[str, Any]:
        """Return the FQI+CQL results."""
        q_network_df = self.fqi_cql_learner.get_q_network_copy()
        stats = self.fqi_cql_learner.get_training_statistics()
        return {
            "q_network": q_network_df.to_dict(),
            "statistics": stats,
            "batch_count": self.fqi_cql_learner.batch_count,
            "hyperparameters": {
                "alpha": self.fqi_cql_learner.alpha,
                "gamma": self.fqi_cql_learner.gamma,
                "learning_rate": self.fqi_cql_learner.learning_rate
            }
        }

    def get_experience_data(self) -> Dict[str, Any]:
        """Return the experience data."""
        exp_df = self.experience_buffer.get_dataframe()
        if not exp_df.empty:
            # Basic statistics
            stats = {
                "total_count": len(exp_df),
                "avg_reward": exp_df['reward'].mean(),
                "reward_std": exp_df['reward'].std(),
                "success_rate": exp_df['done'].sum() / len(exp_df),
                "unique_states": exp_df['state'].nunique(),
                "unique_actions": exp_df['action'].nunique()
            }
            # Most recent records
            recent_data = exp_df.tail(20).to_dict('records')
        else:
            stats = {
                "total_count": 0,
                "avg_reward": 0.0,
                "reward_std": 0.0,
                "success_rate": 0.0,
                "unique_states": 0,
                "unique_actions": 0
            }
            recent_data = []
        return {
            "statistics": stats,
            "recent_data": recent_data,
            "buffer_size": self.experience_buffer.size(),
            "max_size": self.experience_buffer.max_size
        }

    def reset_all(self):
        """Reset all learning state."""
        self.experience_buffer.clear()
        self.q_learner.reset()
        self.fqi_cql_learner.reset()
        self.episode_count = 0
        self.start_time = time.time()

    def compare_policies(self, state: str) -> Dict[str, Any]:
        """Compare the Q-Learning and FQI+CQL policies for a state."""
        # Q-Learning policy
        q_learning_action = self.q_learner.get_optimal_action(state)
        q_learning_values = self.q_learner.get_state_q_values(state)
        # FQI+CQL policy
        fqi_cql_action = self.fqi_cql_learner.get_optimal_action(state)
        fqi_cql_values = self.fqi_cql_learner.get_state_q_values(state)
        # Whether the two policies agree on the greedy action
        policy_agreement = (q_learning_action == fqi_cql_action)
        # Per-action absolute Q-value differences
        q_value_differences = {}
        for action_name in q_learning_values:
            diff = abs(q_learning_values[action_name] - fqi_cql_values.get(action_name, 0.0))
            q_value_differences[action_name] = diff
        return {
            "state": state,
            "q_learning": {
                "action": q_learning_action.value,
                "q_values": q_learning_values
            },
            "fqi_cql": {
                "action": fqi_cql_action.value,
                "q_values": fqi_cql_values
            },
            "policy_agreement": policy_agreement,
            "q_value_differences": q_value_differences,
            "max_difference": max(q_value_differences.values()) if q_value_differences else 0.0
        }


# Global service instance
demo_service = DemoService()
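

# A minimal usage sketch of the service's end-to-end flow. Hedged: the
# request schemas' constructor fields are inferred from how they are read
# above, not confirmed against app/models/schemas.py.
if __name__ == "__main__":
    # 1. Generate episodes to fill the experience buffer
    gen_result = demo_service.generate_episodes(EpisodeGenerationRequest(
        num_episodes=10, max_steps=8, anchor_price=100.0, exploration_rate=0.3
    ))
    print(f"collected {gen_result['new_experiences']} experiences")
    # 2. Run a Q-Learning batch update on the buffered experiences
    update_result = demo_service.update_q_learning(
        learning_rate=0.1, discount_factor=0.9, batch_size=64
    )
    print(update_result["message"])
    # 3. Ask for a greedy recommendation in the initial state
    recommendation = demo_service.get_action_recommendation(
        ActionRecommendationRequest(
            current_state="C0S0P0", use_epsilon_greedy=False, epsilon=0.0
        )
    )
    print(f"recommended action: {recommendation.recommended_action}")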