sns scheduler 추가 .
commit
4de0ccdf87
|
|
@ -0,0 +1,15 @@
|
|||
# 백엔드 내부 API (Docker 네트워크 내부 URL)
|
||||
BACKEND_INTERNAL_URL=http://localhost:8000
|
||||
|
||||
# 내부 인증 키 (백엔드 .env의 INTERNAL_SECRET_KEY와 동일하게 설정)
|
||||
INTERNAL_SECRET_KEY=ado2-internal-backend-server-secret-key
|
||||
|
||||
# MySQL 설정 (백엔드와 동일한 DB)
|
||||
MYSQL_HOST=localhost
|
||||
MYSQL_PORT=3306
|
||||
MYSQL_USER=root
|
||||
MYSQL_PASSWORD=1234
|
||||
MYSQL_DB=castad_test1
|
||||
|
||||
# 체크 주기 (분)
|
||||
CHECK_INTERVAL_MINUTES=1
|
||||
|
|
@ -0,0 +1,15 @@
|
|||
# 백엔드 내부 API (Docker 네트워크 내부 URL)
|
||||
BACKEND_INTERNAL_URL=http://castad-app:8000
|
||||
|
||||
# 내부 인증 키 (백엔드 .env의 INTERNAL_SECRET_KEY와 동일하게 설정)
|
||||
INTERNAL_SECRET_KEY=change-me-internal-secret-key
|
||||
|
||||
# MySQL 설정 (백엔드와 동일한 DB)
|
||||
MYSQL_HOST=mysql
|
||||
MYSQL_PORT=3306
|
||||
MYSQL_USER=root
|
||||
MYSQL_PASSWORD=1234
|
||||
MYSQL_DB=mydb
|
||||
|
||||
# 체크 주기 (분)
|
||||
CHECK_INTERVAL_MINUTES=1
|
||||
|
|
@ -0,0 +1,10 @@
|
|||
FROM python:3.13-slim
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
COPY pyproject.toml .
|
||||
RUN pip install uv && uv pip install --system -e .
|
||||
|
||||
COPY . .
|
||||
|
||||
CMD ["python", "main.py"]
|
||||
|
|
@ -0,0 +1,57 @@
|
|||
# o2o-ado2-scheduler
|
||||
|
||||
SNS 예약 업로드 스케줄러. 주기적으로 DB를 조회하여 예약 시간이 된 업로드 작업을 백엔드 내부 API로 트리거합니다.
|
||||
|
||||
---
|
||||
|
||||
## 실행
|
||||
|
||||
### 로컬
|
||||
|
||||
```bash
|
||||
pip install uv
|
||||
uv pip install -e .
|
||||
python main.py
|
||||
```
|
||||
|
||||
### Docker 단독 실행
|
||||
|
||||
```bash
|
||||
docker build -t o2o-ado2-scheduler .
|
||||
docker run --env-file .env o2o-ado2-scheduler
|
||||
```
|
||||
|
||||
### Docker Compose (백엔드와 같은 네트워크)
|
||||
|
||||
```yaml
|
||||
scheduler:
|
||||
build: .
|
||||
env_file: .env
|
||||
networks:
|
||||
- backend-network
|
||||
restart: unless-stopped
|
||||
```
|
||||
|
||||
> Docker Compose로 실행할 경우 `BACKEND_INTERNAL_URL`은 컨테이너 서비스명으로 설정합니다.
|
||||
> 예: `BACKEND_INTERNAL_URL=http://castad-app:8000`
|
||||
|
||||
---
|
||||
|
||||
## 환경변수 (.env)
|
||||
|
||||
`.env.example`을 복사하여 사용합니다.
|
||||
|
||||
```bash
|
||||
cp .env.example .env
|
||||
```
|
||||
|
||||
| 변수 | 설명 | 예시 |
|
||||
|------|------|------|
|
||||
| `BACKEND_INTERNAL_URL` | 백엔드 서버 URL | `http://localhost:8000` (로컬) / `http://castad-app:8000` (Docker) |
|
||||
| `INTERNAL_SECRET_KEY` | 내부 API 인증 키 (백엔드와 동일하게 설정) | `your-secret-key` |
|
||||
| `MYSQL_HOST` | MySQL 호스트 | `localhost` (로컬) / `mysql` (Docker) |
|
||||
| `MYSQL_PORT` | MySQL 포트 | `3306` |
|
||||
| `MYSQL_USER` | MySQL 유저 | `root` |
|
||||
| `MYSQL_PASSWORD` | MySQL 비밀번호 | `1234` |
|
||||
| `MYSQL_DB` | MySQL 데이터베이스명 | `castad_test1` |
|
||||
| `CHECK_INTERVAL_MINUTES` | 예약 업로드 체크 주기 (분) | `10` |
|
||||
Binary file not shown.
Binary file not shown.
|
|
@ -0,0 +1,45 @@
|
|||
from pydantic import Field
|
||||
from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||
from pathlib import Path
|
||||
|
||||
PROJECT_DIR = Path(__file__).resolve().parent
|
||||
|
||||
_base_config = SettingsConfigDict(
|
||||
env_file=PROJECT_DIR / ".env",
|
||||
env_ignore_empty=True,
|
||||
extra="ignore",
|
||||
)
|
||||
|
||||
|
||||
class SchedulerSettings(BaseSettings):
|
||||
# 백엔드 내부 API 설정
|
||||
BACKEND_INTERNAL_URL: str = Field(
|
||||
default="http://castad-app:8000",
|
||||
description="백엔드 서버 내부 URL (Docker 네트워크)",
|
||||
)
|
||||
INTERNAL_SECRET_KEY: str = Field(
|
||||
default="change-me-internal-secret-key",
|
||||
description="내부 API 인증 키 (백엔드와 동일해야 함)",
|
||||
)
|
||||
|
||||
# MySQL 설정
|
||||
MYSQL_HOST: str = Field(default="mysql")
|
||||
MYSQL_PORT: int = Field(default=3306)
|
||||
MYSQL_USER: str = Field(default="root")
|
||||
MYSQL_PASSWORD: str = Field(default="")
|
||||
MYSQL_DB: str = Field(default="castad_test1")
|
||||
|
||||
# 스케줄러 설정
|
||||
CHECK_INTERVAL_MINUTES: int = Field(
|
||||
default=10,
|
||||
description="예약 업로드 체크 주기 (분)",
|
||||
)
|
||||
|
||||
model_config = _base_config
|
||||
|
||||
@property
|
||||
def MYSQL_URL(self) -> str:
|
||||
return f"mysql+aiomysql://{self.MYSQL_USER}:{self.MYSQL_PASSWORD}@{self.MYSQL_HOST}:{self.MYSQL_PORT}/{self.MYSQL_DB}"
|
||||
|
||||
|
||||
settings = SchedulerSettings()
|
||||
|
|
@ -0,0 +1,19 @@
|
|||
"""
|
||||
DB 연결 (공유 엔진 및 세션 팩토리)
|
||||
"""
|
||||
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
||||
|
||||
from config import settings
|
||||
|
||||
engine = create_async_engine(
|
||||
url=settings.MYSQL_URL,
|
||||
pool_pre_ping=True,
|
||||
pool_recycle=280,
|
||||
)
|
||||
|
||||
SessionLocal = async_sessionmaker(
|
||||
bind=engine,
|
||||
class_=AsyncSession,
|
||||
expire_on_commit=False,
|
||||
)
|
||||
|
|
@ -0,0 +1,15 @@
|
|||
"""
|
||||
등록된 잡 목록
|
||||
|
||||
새로운 플랫폼 잡 추가 시 이 목록에 인스턴스를 추가합니다.
|
||||
예: from jobs.instagram import InstagramUploadJob
|
||||
from jobs.tiktok import TikTokUploadJob
|
||||
"""
|
||||
|
||||
from jobs.sns_upload import SnsUploadJob
|
||||
|
||||
JOBS = [
|
||||
SnsUploadJob(),
|
||||
# InstagramUploadJob(),
|
||||
# TikTokUploadJob(),
|
||||
]
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
|
@ -0,0 +1,20 @@
|
|||
"""
|
||||
스케줄 잡 추상 베이스 클래스
|
||||
|
||||
새로운 플랫폼(Instagram, TikTok 등) 추가 시 이 클래스를 상속합니다.
|
||||
"""
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
|
||||
class BaseJob(ABC):
|
||||
# 스케줄러 로그 및 job ID에 사용되는 이름
|
||||
name: str
|
||||
|
||||
# 체크 주기 (분) — None이면 config의 CHECK_INTERVAL_MINUTES 사용
|
||||
interval_minutes: int | None = None
|
||||
|
||||
@abstractmethod
|
||||
async def run(self) -> None:
|
||||
"""주기적으로 실행될 잡 로직"""
|
||||
...
|
||||
|
|
@ -0,0 +1,81 @@
|
|||
"""
|
||||
SNS 예약 업로드 잡
|
||||
|
||||
scheduled_at이 현재 시간 이전이고 status가 pending인 업로드 작업을
|
||||
백엔드 내부 API로 트리거합니다.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
import httpx
|
||||
from sqlalchemy import text
|
||||
|
||||
from config import settings
|
||||
from db import SessionLocal
|
||||
from jobs.base import BaseJob
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SnsUploadJob(BaseJob):
|
||||
name = "SNS 예약 업로드 체크"
|
||||
|
||||
async def run(self) -> None:
|
||||
logger.info("[SNS_UPLOAD] 예약 업로드 체크 시작")
|
||||
|
||||
try:
|
||||
upload_ids = await self._fetch_pending_uploads()
|
||||
except Exception as e:
|
||||
logger.error(f"[SNS_UPLOAD] DB 조회 오류: {e}")
|
||||
return
|
||||
|
||||
if not upload_ids:
|
||||
logger.info("[SNS_UPLOAD] 실행할 예약 업로드 없음")
|
||||
return
|
||||
|
||||
logger.info(f"[SNS_UPLOAD] 예약 업로드 {len(upload_ids)}건 발견: {upload_ids}")
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
tasks = [self._trigger_upload(uid, client) for uid in upload_ids]
|
||||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
success = sum(1 for r in results if r is True)
|
||||
logger.info(f"[SNS_UPLOAD] 완료 - 성공: {success}/{len(upload_ids)}")
|
||||
|
||||
async def _fetch_pending_uploads(self) -> list[int]:
|
||||
# DB의 다른 datetime 컬럼과 동일하게 Seoul time naive로 비교
|
||||
now = datetime.now(ZoneInfo("Asia/Seoul")).replace(tzinfo=None).strftime("%Y-%m-%d %H:%M:%S")
|
||||
query = text("""
|
||||
SELECT id FROM social_upload
|
||||
WHERE status = 'pending'
|
||||
AND scheduled_at IS NOT NULL
|
||||
AND scheduled_at <= :now
|
||||
""")
|
||||
async with SessionLocal() as session:
|
||||
result = await session.execute(query, {"now": now})
|
||||
rows = result.fetchall()
|
||||
return [row[0] for row in rows]
|
||||
|
||||
async def _trigger_upload(self, upload_id: int, client: httpx.AsyncClient) -> bool:
|
||||
url = f"{settings.BACKEND_INTERNAL_URL}/internal/social/upload/{upload_id}"
|
||||
try:
|
||||
response = await client.post(
|
||||
url,
|
||||
headers={"X-Internal-Secret": settings.INTERNAL_SECRET_KEY},
|
||||
timeout=10.0,
|
||||
)
|
||||
if response.status_code == 200:
|
||||
logger.info(f"[SNS_UPLOAD] 업로드 트리거 성공 - upload_id: {upload_id}")
|
||||
return True
|
||||
else:
|
||||
logger.error(
|
||||
f"[SNS_UPLOAD] 업로드 트리거 실패 - upload_id: {upload_id}, "
|
||||
f"status: {response.status_code}, body: {response.text}"
|
||||
)
|
||||
return False
|
||||
except httpx.RequestError as e:
|
||||
logger.error(f"[SNS_UPLOAD] 업로드 트리거 요청 오류 - upload_id: {upload_id}, error: {e}")
|
||||
return False
|
||||
|
|
@ -0,0 +1,53 @@
|
|||
"""
|
||||
스케줄러 엔트리포인트
|
||||
|
||||
APScheduler를 사용하여 jobs/ 에 등록된 잡들을 주기적으로 실행합니다.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||
|
||||
from config import settings
|
||||
from jobs import JOBS
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="[%(asctime)s] [%(levelname)s] %(name)s - %(message)s",
|
||||
datefmt="%Y-%m-%d %H:%M:%S",
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def main() -> None:
|
||||
scheduler = AsyncIOScheduler()
|
||||
|
||||
for job in JOBS:
|
||||
interval = job.interval_minutes or settings.CHECK_INTERVAL_MINUTES
|
||||
scheduler.add_job(
|
||||
job.run,
|
||||
trigger="interval",
|
||||
minutes=interval,
|
||||
id=job.name,
|
||||
name=job.name,
|
||||
max_instances=1,
|
||||
)
|
||||
logger.info(f"[SCHEDULER] 잡 등록 - '{job.name}' (주기: {interval}분)")
|
||||
|
||||
scheduler.start()
|
||||
logger.info(f"[SCHEDULER] 시작 - 백엔드: {settings.BACKEND_INTERNAL_URL}")
|
||||
|
||||
# 시작 시 모든 잡 즉시 1회 실행
|
||||
await asyncio.gather(*[job.run() for job in JOBS])
|
||||
|
||||
try:
|
||||
await asyncio.Event().wait()
|
||||
except (KeyboardInterrupt, SystemExit):
|
||||
logger.info("[SCHEDULER] 종료 중...")
|
||||
scheduler.shutdown()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
|
|
@ -0,0 +1,14 @@
|
|||
[project]
|
||||
name = "o2o-ado2-scheduler"
|
||||
version = "0.1.0"
|
||||
description = "SNS 업로드 스케줄러"
|
||||
requires-python = ">=3.13"
|
||||
dependencies = [
|
||||
"apscheduler>=3.10.4",
|
||||
"httpx>=0.27.0",
|
||||
"sqlalchemy>=2.0.0",
|
||||
"aiomysql>=0.2.0",
|
||||
"pydantic>=2.0.0",
|
||||
"pydantic-settings>=2.0.0",
|
||||
"python-dotenv>=1.0.0",
|
||||
]
|
||||
Loading…
Reference in New Issue