# o2o-castad-scheduler/jobs/sns_upload.py
#
# 94 lines
# 3.4 KiB
# Python

"""
SNS 예약 업로드 잡
scheduled_at이 현재 시간 이전이고 status가 pending인 업로드 작업을
백엔드 내부 API로 트리거합니다.
"""
import asyncio
import logging
from datetime import datetime, timezone
import httpx
from sqlalchemy import text
from config import settings, TIMEZONE
from db import SessionLocal
from jobs.base import BaseJob
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class SnsUploadJob(BaseJob):
    """Job that triggers scheduled SNS uploads whose time has come.

    Each run selects ``social_upload`` rows with ``status = 'pending'`` and a
    non-null ``scheduled_at`` at or before the current Seoul time, then sends
    one POST per row to the backend internal upload endpoint.

    NOTE(review): nothing in this job marks a row as "in progress"; presumably
    the backend endpoint updates ``status`` itself. Confirm, otherwise
    overlapping runs could trigger the same upload twice.
    """

    name = "SNS 예약 업로드 체크"

    async def run(self) -> None:
        """Find due uploads and trigger all of them concurrently.

        Never raises for DB or per-upload failures: a DB failure ends the run
        early, and per-upload failures are logged and counted as not
        successful.
        """
        logger.info("[SNS_UPLOAD] 예약 업로드 체크 시작")

        try:
            upload_ids = await self._fetch_pending_uploads()
        except Exception as e:
            # Job boundary: swallow the error so the scheduler keeps running,
            # but keep the traceback in the log for diagnosis.
            logger.exception("[SNS_UPLOAD] DB 조회 오류: %s", e)
            return

        if not upload_ids:
            logger.info("[SNS_UPLOAD] 실행할 예약 업로드 없음")
            return

        logger.info(f"[SNS_UPLOAD] 예약 업로드 {len(upload_ids)}건 발견: {upload_ids}")

        async with httpx.AsyncClient() as client:
            tasks = [self._trigger_upload(uid, client) for uid in upload_ids]
            # return_exceptions=True keeps one failing upload from aborting
            # the others; exceptions come back as items of `results`.
            results = await asyncio.gather(*tasks, return_exceptions=True)

        # gather() returns results in the same order as the awaitables, so
        # zip() pairs each upload id with its own outcome. Exceptions that
        # _trigger_upload did not handle would otherwise vanish silently.
        for uid, result in zip(upload_ids, results):
            if isinstance(result, BaseException):
                logger.error(
                    f"[SNS_UPLOAD] 업로드 트리거 예외 - upload_id: {uid}, error: {result!r}",
                    exc_info=result,
                )

        success = sum(1 for r in results if r is True)
        logger.info(f"[SNS_UPLOAD] 완료 - 성공: {success}/{len(upload_ids)}")

    async def _fetch_pending_uploads(self) -> list[int]:
        """Return ids of pending uploads whose ``scheduled_at`` is not in the future.

        Returns:
            The ``id`` column of every matching ``social_upload`` row.

        Raises:
            Whatever the DB session raises; the caller handles it.
        """
        # Compare as naive Seoul time, the same way as the other datetime
        # columns in the DB.
        seoul_aware = datetime.now(TIMEZONE)
        now = seoul_aware.replace(tzinfo=None).strftime("%Y-%m-%d %H:%M:%S")

        # The extra clock reads and formatting exist only for this diagnostic
        # line, so skip them entirely unless DEBUG logging is enabled.
        if logger.isEnabledFor(logging.DEBUG):
            utc_now = datetime.now(tz=timezone.utc)
            server_now = datetime.now()
            logger.debug(
                f"[SNS_UPLOAD][TIME_DEBUG] "
                f"서버 UTC: {utc_now.strftime('%Y-%m-%d %H:%M:%S')}, "
                f"서버 로컬: {server_now.strftime('%Y-%m-%d %H:%M:%S')}, "
                f"Seoul aware: {seoul_aware.isoformat()}, "
                f"TIMEZONE={TIMEZONE} (key={TIMEZONE.key}), "
                f"now(쿼리용): {now}"
            )

        query = text("""
            SELECT id FROM social_upload
            WHERE status = 'pending'
              AND scheduled_at IS NOT NULL
              AND scheduled_at <= :now
        """)

        async with SessionLocal() as session:
            result = await session.execute(query, {"now": now})
            rows = result.fetchall()

        return [row[0] for row in rows]

    async def _trigger_upload(self, upload_id: int, client: httpx.AsyncClient) -> bool:
        """POST to the backend internal endpoint for a single upload.

        Args:
            upload_id: Id of the ``social_upload`` row to trigger.
            client: Shared HTTP client used to send the request.

        Returns:
            True only when the backend answers with HTTP 200; False for any
            other status code or for an ``httpx.RequestError``.
        """
        url = f"{settings.BACKEND_INTERNAL_URL}/internal/social/upload/{upload_id}"
        try:
            response = await client.post(
                url,
                headers={"X-Internal-Secret": settings.INTERNAL_SECRET_KEY},
                timeout=10.0,
            )
        except httpx.RequestError as e:
            # repr() keeps the exception type visible; some request errors
            # (e.g. timeouts) can stringify to an empty message.
            logger.error(f"[SNS_UPLOAD] 업로드 트리거 요청 오류 - upload_id: {upload_id}, error: {e!r}")
            return False

        if response.status_code == 200:
            logger.info(f"[SNS_UPLOAD] 업로드 트리거 성공 - upload_id: {upload_id}")
            return True

        logger.error(
            f"[SNS_UPLOAD] 업로드 트리거 실패 - upload_id: {upload_id}, "
            f"status: {response.status_code}, body: {response.text}"
        )
        return False