import asyncio
from unittest.mock import MagicMock, AsyncMock, patch

import pytest

from app.core.redis.redis_manager import RedisManager
from app.shared.progress import Process
from app.services.auth_service import AuthService
from app.services.user_service import UserService


class TestAsyncServices:
    """Tests for asynchronous services and for basic asyncio behaviour."""

    @pytest.mark.asyncio
    async def test_redis_manager_operations(self):
        """Exercise RedisManager set/get/exists/delete with ``redis.asyncio.Redis`` patched."""
        with patch('redis.asyncio.Redis') as mock_redis:
            # Mock setup: Redis.from_url() hands back an async mock client.
            mock_instance = AsyncMock()
            mock_redis.from_url.return_value = mock_instance

            # Create the RedisManager under test.
            redis_manager = RedisManager()

            # set: the key, value and 3600 must be forwarded positionally.
            mock_instance.set.return_value = True
            result = await redis_manager.set("test_key", "test_value", 3600)
            assert result
            mock_instance.set.assert_called_once_with("test_key", "test_value", 3600)

            # get: the mocked client returns bytes, while the manager is
            # expected to hand back the equivalent str.
            mock_instance.get.return_value = b"test_value"
            result = await redis_manager.get("test_key")
            assert result == "test_value"

            # exists: the mocked client replies with the integer 1; a
            # truthiness check accepts either that integer or a real bool.
            mock_instance.exists.return_value = 1
            result = await redis_manager.exists("test_key")
            assert result

            # delete: same integer-reply convention as exists.
            mock_instance.delete.return_value = 1
            result = await redis_manager.delete("test_key")
            assert result

    @pytest.mark.asyncio
    async def test_progress_tracking(self):
        """Check that Process writes an initial status and reads a stored one back."""
        # Mocked Redis manager: get() yields a JSON document with two flags.
        mock_redis = AsyncMock()
        mock_redis.set = AsyncMock(return_value=True)
        mock_redis.get = AsyncMock(return_value='{"metadata": true, "lyrics": false}')

        # Create the Process instance on top of the mock.
        process = Process(mock_redis)

        # Initialising a task must store something through set().
        await process.init_task_status("task_123")
        mock_redis.set.assert_called()

        # Reading the status back must expose the flags from the JSON payload.
        status = await process.get_task_status("task_123")
        assert status["metadata"]
        assert not status["lyrics"]

    @pytest.mark.asyncio
    async def test_concurrent_requests(self):
        """Run five mocked requests through ``asyncio.gather`` and check the results."""
        # Mocked service whose coroutine echoes its argument with a prefix.
        mock_service = AsyncMock()
        mock_service.process_request = AsyncMock(side_effect=lambda x: f"processed_{x}")

        # Create all coroutines first, then await them together.
        pending = [mock_service.process_request(f"request_{i}") for i in range(5)]
        results = await asyncio.gather(*pending)

        # gather() preserves argument order, so results line up with the inputs.
        assert results == [f"processed_request_{i}" for i in range(5)]
        assert mock_service.process_request.call_count == 5

    @pytest.mark.asyncio
    async def test_error_handling_in_async_context(self):
        """Check that an exception raised by an awaited mock propagates to the caller."""
        mock_service = AsyncMock()
        mock_service.risky_operation = AsyncMock(side_effect=ValueError("테스트 에러"))

        with pytest.raises(ValueError, match="테스트 에러"):
            await mock_service.risky_operation()

    @pytest.mark.asyncio
    async def test_async_context_manager(self):
        """Check that ``async with`` calls ``__aenter__`` and ``__aexit__``."""

        class AsyncResource:
            """Minimal async context manager that tracks a connected flag."""

            def __init__(self):
                self.connected = False

            async def __aenter__(self):
                self.connected = True
                return self

            async def __aexit__(self, exc_type, exc_val, exc_tb):
                self.connected = False

        async with AsyncResource() as resource:
            assert resource.connected is True

        # Leaving the block must have run __aexit__.
        assert resource.connected is False

    @pytest.mark.asyncio
    async def test_timeout_handling(self):
        """Check that ``asyncio.wait_for`` raises when the awaited call is too slow."""

        async def slow_operation():
            await asyncio.sleep(2)
            return "completed"

        # Must fail with a timeout: the 0.1 s limit is far below the 2 s sleep.
        with pytest.raises(asyncio.TimeoutError):
            await asyncio.wait_for(slow_operation(), timeout=0.1)

    @pytest.mark.asyncio
    async def test_async_generator(self):
        """Check that an async generator yields its values in order."""

        async def data_generator():
            for i in range(3):
                await asyncio.sleep(0.01)  # simulate asynchronous work
                yield i

        results = [value async for value in data_generator()]

        assert results == [0, 1, 2]