94 lines
3.4 KiB
Python
94 lines
3.4 KiB
Python
"""
|
|
SNS 예약 업로드 잡
|
|
|
|
scheduled_at이 현재 시간 이전이고 status가 pending인 업로드 작업을
|
|
백엔드 내부 API로 트리거합니다.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
|
|
import httpx
|
|
from sqlalchemy import text
|
|
|
|
from config import settings, TIMEZONE
|
|
from db import SessionLocal
|
|
from jobs.base import BaseJob
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class SnsUploadJob(BaseJob):
|
|
name = "SNS 예약 업로드 체크"
|
|
|
|
async def run(self) -> None:
|
|
logger.info("[SNS_UPLOAD] 예약 업로드 체크 시작")
|
|
|
|
try:
|
|
upload_ids = await self._fetch_pending_uploads()
|
|
except Exception as e:
|
|
logger.error(f"[SNS_UPLOAD] DB 조회 오류: {e}")
|
|
return
|
|
|
|
if not upload_ids:
|
|
logger.info("[SNS_UPLOAD] 실행할 예약 업로드 없음")
|
|
return
|
|
|
|
logger.info(f"[SNS_UPLOAD] 예약 업로드 {len(upload_ids)}건 발견: {upload_ids}")
|
|
|
|
async with httpx.AsyncClient() as client:
|
|
tasks = [self._trigger_upload(uid, client) for uid in upload_ids]
|
|
results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
success = sum(1 for r in results if r is True)
|
|
logger.info(f"[SNS_UPLOAD] 완료 - 성공: {success}/{len(upload_ids)}")
|
|
|
|
async def _fetch_pending_uploads(self) -> list[int]:
|
|
# DB의 다른 datetime 컬럼과 동일하게 Seoul time naive로 비교
|
|
utc_now = datetime.now(tz=timezone.utc)
|
|
server_now = datetime.now()
|
|
seoul_aware = datetime.now(TIMEZONE)
|
|
now = seoul_aware.replace(tzinfo=None).strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
logger.debug(
|
|
f"[SNS_UPLOAD][TIME_DEBUG] "
|
|
f"서버 UTC: {utc_now.strftime('%Y-%m-%d %H:%M:%S')}, "
|
|
f"서버 로컬: {server_now.strftime('%Y-%m-%d %H:%M:%S')}, "
|
|
f"Seoul aware: {seoul_aware.isoformat()}, "
|
|
f"TIMEZONE={TIMEZONE} (key={TIMEZONE.key}), "
|
|
f"now(쿼리용): {now}"
|
|
)
|
|
|
|
query = text("""
|
|
SELECT id FROM social_upload
|
|
WHERE status = 'pending'
|
|
AND scheduled_at IS NOT NULL
|
|
AND scheduled_at <= :now
|
|
""")
|
|
async with SessionLocal() as session:
|
|
result = await session.execute(query, {"now": now})
|
|
rows = result.fetchall()
|
|
return [row[0] for row in rows]
|
|
|
|
async def _trigger_upload(self, upload_id: int, client: httpx.AsyncClient) -> bool:
|
|
url = f"{settings.BACKEND_INTERNAL_URL}/internal/social/upload/{upload_id}"
|
|
try:
|
|
response = await client.post(
|
|
url,
|
|
headers={"X-Internal-Secret": settings.INTERNAL_SECRET_KEY},
|
|
timeout=10.0,
|
|
)
|
|
if response.status_code == 200:
|
|
logger.info(f"[SNS_UPLOAD] 업로드 트리거 성공 - upload_id: {upload_id}")
|
|
return True
|
|
else:
|
|
logger.error(
|
|
f"[SNS_UPLOAD] 업로드 트리거 실패 - upload_id: {upload_id}, "
|
|
f"status: {response.status_code}, body: {response.text}"
|
|
)
|
|
return False
|
|
except httpx.RequestError as e:
|
|
logger.error(f"[SNS_UPLOAD] 업로드 트리거 요청 오류 - upload_id: {upload_id}, error: {e}")
|
|
return False
|