o2o-castad-backend/app/comment/services/comment.py

190 lines
5.6 KiB
Python

from collections import defaultdict
from typing import List, Optional
from fastapi import HTTPException
from sqlalchemy import exists, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.comment.models import Comment
from app.comment.schemas.comment_schema import CommentItem, ReplyItem
from app.utils.pagination import PaginatedResponse
from app.video.models import Video
async def _validate_parent(
    session: AsyncSession,
    parent_id: int,
    video_id: int,
) -> None:
    """Check that ``parent_id`` names a comment that may receive a reply.

    The parent must exist and not be flagged ``is_deleted``, must belong to
    ``video_id``, and must itself have no parent (threads are limited to
    two levels).

    Args:
        session: Async database session used for the lookup.
        parent_id: Id of the comment being replied to.
        video_id: Id of the video the new reply is being posted on.

    Raises:
        HTTPException: 400 when any of the conditions above is not met.
    """
    live_parent_stmt = select(Comment).where(
        Comment.id == parent_id,
        Comment.is_deleted == False,  # noqa: E712
    )
    parent = (await session.execute(live_parent_stmt)).scalar_one_or_none()

    # The branches are evaluated top to bottom, so the first rule that fails
    # decides which message the client receives.
    if parent is None:
        failure = "부모 댓글을 찾을 수 없습니다."
    elif parent.video_id != video_id:
        failure = "다른 영상의 댓글에는 대댓글을 달 수 없습니다."
    elif parent.parent_id is not None:
        failure = "대댓글에는 대댓글을 달 수 없습니다. (최대 2-depth)"
    else:
        return

    raise HTTPException(status_code=400, detail=failure)
def _build_comment_items(
    parents: list,
    replies_map: dict,
    current_user_uuid: Optional[str],
) -> List[CommentItem]:
    """Convert comment rows into response items with their replies nested.

    Args:
        parents: Top-level comment rows, in the order they should appear.
        replies_map: Mapping of parent comment id to its reply rows; parents
            missing from the mapping get an empty reply list.
        current_user_uuid: UUID of the requesting user, or ``None`` when
            there is none; used to compute the ``is_mine`` flag.

    Returns:
        One ``CommentItem`` per entry in ``parents``, in the same order.
    """

    def _shared_fields(row) -> dict:
        # Fields that CommentItem and ReplyItem are populated with identically.
        return {
            "id": row.id,
            # A missing/empty nickname falls back to the anonymous label.
            "nickname": row.nickname or "익명",
            # The text of a deleted comment is never exposed.
            "content": None if row.is_deleted else row.content,
            "is_deleted": row.is_deleted,
            # Without a requesting user nothing can be "mine".
            "is_mine": bool(current_user_uuid) and current_user_uuid == row.user_uuid,
            "created_at": row.created_at,
        }

    def _to_item(parent) -> CommentItem:
        children = [
            ReplyItem(**_shared_fields(child))
            for child in replies_map.get(parent.id, [])
        ]
        return CommentItem(**_shared_fields(parent), replies=children)

    return [_to_item(parent) for parent in parents]
async def create_comment(
    session: AsyncSession,
    video_id: int,
    user_uuid: str,
    nickname: str,
    content: str,
    parent_id: Optional[int],
) -> Comment:
    """Persist a new comment (or reply) on a video and return the stored row.

    Args:
        session: Async database session; the insert is committed here.
        video_id: Id of the video being commented on.
        user_uuid: UUID of the author.
        nickname: Display name stored with the comment.
        content: Comment text.
        parent_id: Id of the comment being replied to, or ``None`` for a
            top-level comment.

    Returns:
        The newly created ``Comment``, refreshed from the database after
        the commit.

    Raises:
        HTTPException: 404 when no video with this id has status
            ``"completed"`` and is not flagged deleted; 400 when
            ``parent_id`` fails parent validation.
    """
    # Only completed, non-deleted videos accept comments.
    commentable_video_stmt = select(Video).where(
        Video.id == video_id,
        Video.status == "completed",
        Video.is_deleted == False,  # noqa: E712
    )
    video = (await session.execute(commentable_video_stmt)).scalar_one_or_none()
    if video is None:
        raise HTTPException(status_code=404, detail="영상을 찾을 수 없습니다.")

    # Replies additionally need a valid parent on the same video.
    if parent_id is not None:
        await _validate_parent(session, parent_id, video_id)

    new_comment = Comment(
        video_id=video_id,
        parent_id=parent_id,
        user_uuid=user_uuid,
        nickname=nickname,
        content=content,
    )
    session.add(new_comment)
    await session.commit()
    # Reload the row so the returned object reflects what the database stored.
    await session.refresh(new_comment)
    return new_comment
async def list_comments(
    session: AsyncSession,
    video_id: int,
    page: int,
    page_size: int,
    current_user_uuid: Optional[str],
) -> PaginatedResponse[CommentItem]:
    """Return one page of top-level comments for a video, replies nested.

    A top-level comment is listed when it is not deleted, or when it is
    deleted but still has at least one non-deleted reply. Deleted replies
    are never returned. Parents are ordered newest first, replies oldest
    first.

    Args:
        session: Async database session used for the queries.
        video_id: Id of the video whose comments are listed.
        page: 1-based page number (the offset is ``(page - 1) * page_size``).
        page_size: Maximum number of top-level comments per page.
        current_user_uuid: UUID of the requesting user, or ``None``; used
            to flag the user's own comments.

    Returns:
        A ``PaginatedResponse`` whose ``total`` counts the top-level
        comments matching the filter above (replies are not counted).
    """
    # Function-scope imports, matching how this function already pulled in
    # ``func``; keeps the block self-contained.
    from sqlalchemy import func
    from sqlalchemy.orm import aliased

    offset = (page - 1) * page_size

    # Correlated EXISTS: "does this parent have a non-deleted reply?".
    # The reply side MUST be a separate alias. Comparing the un-aliased
    # ``Comment.parent_id == Comment.id`` tests the outer row against itself,
    # which is never true for a top-level comment (its parent_id is NULL),
    # so deleted parents with live replies would silently disappear.
    reply = aliased(Comment)
    has_live_reply = (
        exists()
        .where(
            reply.parent_id == Comment.id,
            reply.is_deleted == False,  # noqa: E712
        )
        .correlate(Comment)
    )

    # Top-level filter: not deleted, or deleted but still has a live reply.
    parent_where = [
        Comment.video_id == video_id,
        Comment.parent_id.is_(None),
        (Comment.is_deleted == False) | has_live_reply,  # noqa: E712
    ]

    count_q = select(func.count(Comment.id)).where(*parent_where)
    total = (await session.execute(count_q)).scalar() or 0

    parents_q = (
        select(Comment)
        .where(*parent_where)
        # ``id`` breaks ties between identical timestamps so that rows do not
        # shift between pages from one request to the next.
        .order_by(Comment.created_at.desc(), Comment.id.desc())
        .offset(offset)
        .limit(page_size)
    )
    parents = (await session.execute(parents_q)).scalars().all()

    replies_map: dict = defaultdict(list)
    if parents:
        # Fetch the replies of every parent on this page in a single query.
        parent_ids = [c.id for c in parents]
        replies_q = (
            select(Comment)
            .where(
                Comment.parent_id.in_(parent_ids),
                Comment.is_deleted == False,  # noqa: E712
            )
            .order_by(Comment.created_at.asc(), Comment.id.asc())
        )
        replies = (await session.execute(replies_q)).scalars().all()
        for r in replies:
            replies_map[r.parent_id].append(r)

    items = _build_comment_items(list(parents), replies_map, current_user_uuid)
    return PaginatedResponse.create(
        items=items,
        total=total,
        page=page,
        page_size=page_size,
    )
async def delete_comment(
    session: AsyncSession,
    comment_id: int,
    current_user_uuid: str,
) -> None:
    """Soft-delete a comment on behalf of its author.

    The row is kept; only its ``is_deleted`` flag is set and committed.

    Args:
        session: Async database session; the change is committed here.
        comment_id: Id of the comment to delete.
        current_user_uuid: UUID of the user requesting the deletion.

    Raises:
        HTTPException: 404 when no non-deleted comment has this id; 403
            when the comment belongs to a different user.
    """
    live_comment_stmt = select(Comment).where(
        Comment.id == comment_id,
        Comment.is_deleted == False,  # noqa: E712
    )
    target = (await session.execute(live_comment_stmt)).scalar_one_or_none()

    # Existence is checked before ownership, so a missing (or already
    # deleted) comment always reports 404 rather than 403.
    if target is None:
        raise HTTPException(status_code=404, detail="댓글을 찾을 수 없습니다.")

    requested_by_author = target.user_uuid == current_user_uuid
    if not requested_by_author:
        raise HTTPException(status_code=403, detail="삭제 권한이 없습니다.")

    target.is_deleted = True
    await session.commit()