# O2Sound_ver2_final/backend/tests/test_async_services.py
#
# 137 lines
# 4.9 KiB
# Python

import pytest
import asyncio
from unittest.mock import MagicMock, AsyncMock, patch
from app.core.redis.redis_manager import RedisManager
from app.shared.progress import Process
from app.services.auth_service import AuthService
from app.services.user_service import UserService
class TestAsyncServices:
    """Tests for the asynchronous services and for basic asyncio behavior."""

    @pytest.mark.asyncio
    async def test_redis_manager_operations(self):
        """Run RedisManager set/get/exists/delete against a mocked client."""
        with patch('redis.asyncio.Redis') as mock_redis:
            # Stand-in client handed back by the patched ``Redis.from_url``.
            # NOTE(review): assumes RedisManager obtains its client through
            # ``redis.asyncio.Redis.from_url`` -- confirm against RedisManager.
            mock_instance = AsyncMock()
            mock_redis.from_url.return_value = mock_instance

            redis_manager = RedisManager()

            # set: the call must be forwarded with key, value and the third
            # positional argument unchanged.
            mock_instance.set.return_value = True
            result = await redis_manager.set("test_key", "test_value", 3600)
            assert result
            mock_instance.set.assert_called_once_with("test_key", "test_value", 3600)

            # get: the client returns bytes; the manager is expected to
            # hand back the equivalent str.
            mock_instance.get.return_value = b"test_value"
            result = await redis_manager.get("test_key")
            assert result == "test_value"

            # exists: the client reports a count of 1; the result must be truthy.
            mock_instance.exists.return_value = 1
            result = await redis_manager.exists("test_key")
            assert result

            # delete: the client reports a count of 1; the result must be truthy.
            mock_instance.delete.return_value = 1
            result = await redis_manager.delete("test_key")
            assert result

    @pytest.mark.asyncio
    async def test_progress_tracking(self):
        """Initialize a task's status through Process, then read it back."""
        # Mocked Redis manager: writes succeed, reads return a JSON status.
        mock_redis = AsyncMock()
        mock_redis.set = AsyncMock(return_value=True)
        mock_redis.get = AsyncMock(return_value='{"metadata": true, "lyrics": false}')

        process = Process(mock_redis)

        # Setting the initial state must write to the store.
        await process.init_task_status("task_123")
        mock_redis.set.assert_called()

        # Reading the state back.
        # NOTE(review): presumably Process decodes the stored JSON into a
        # dict of flags -- verify against Process.get_task_status.
        status = await process.get_task_status("task_123")
        assert status["metadata"]
        assert not status["lyrics"]

    @pytest.mark.asyncio
    async def test_concurrent_requests(self):
        """Run several mocked requests concurrently and check every result."""
        request_count = 5

        mock_service = AsyncMock()
        mock_service.process_request = AsyncMock(side_effect=lambda x: f"processed_{x}")

        # Calling the AsyncMock only creates the coroutines; they are
        # awaited together by gather() below.
        pending = [
            mock_service.process_request(f"request_{i}")
            for i in range(request_count)
        ]
        results = await asyncio.gather(*pending)

        # gather() returns results in the order the awaitables were passed,
        # so one list comparison checks both the length and every value.
        assert results == [f"processed_request_{i}" for i in range(request_count)]
        assert mock_service.process_request.call_count == request_count

    @pytest.mark.asyncio
    async def test_error_handling_in_async_context(self):
        """An exception raised inside an awaited call propagates to the caller."""
        mock_service = AsyncMock()
        mock_service.risky_operation = AsyncMock(side_effect=ValueError("테스트 에러"))

        with pytest.raises(ValueError, match="테스트 에러"):
            await mock_service.risky_operation()

    @pytest.mark.asyncio
    async def test_async_context_manager(self):
        """``async with`` runs __aenter__ on entry and __aexit__ on exit."""

        class AsyncResource:
            """Minimal async context manager that tracks a connected flag."""

            def __init__(self):
                self.connected = False

            async def __aenter__(self):
                self.connected = True
                return self

            async def __aexit__(self, exc_type, exc_val, exc_tb):
                self.connected = False

        async with AsyncResource() as resource:
            assert resource.connected

        # Leaving the block must have reset the flag.
        assert not resource.connected

    @pytest.mark.asyncio
    async def test_timeout_handling(self):
        """``asyncio.wait_for`` raises TimeoutError when the deadline passes."""

        async def slow_operation():
            await asyncio.sleep(2)
            return "completed"

        # The 0.1 s timeout is far shorter than the 2 s sleep, so this must fail.
        with pytest.raises(asyncio.TimeoutError):
            await asyncio.wait_for(slow_operation(), timeout=0.1)

    @pytest.mark.asyncio
    async def test_async_generator(self):
        """Values yielded by an async generator arrive in order."""

        async def data_generator():
            for i in range(3):
                await asyncio.sleep(0.01)  # simulate asynchronous work
                yield i

        results = [value async for value in data_generator()]

        assert results == [0, 1, 2]