99 lines
2.5 KiB
Python
99 lines
2.5 KiB
Python
"""
|
|
Common Utility Functions
|
|
|
|
공통으로 사용되는 유틸리티 함수들을 정의합니다.
|
|
|
|
사용 예시:
|
|
from app.utils.common import generate_task_id, generate_ksuid
|
|
|
|
# task_id 생성
|
|
task_id = await generate_task_id(session=session, table_name=Project)
|
|
|
|
# ksuid 생성
|
|
ksuid = await generate_ksuid(session=session, table_name=User)
|
|
|
|
Note:
|
|
페이지네이션 기능은 app.utils.pagination 모듈을 사용하세요:
|
|
from app.utils.pagination import PaginatedResponse, get_paginated
|
|
"""
|
|
|
|
from typing import Any, Optional, Type
|
|
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from svix_ksuid import Ksuid
|
|
|
|
|
|
async def generate_task_id(
|
|
session: Optional[AsyncSession] = None,
|
|
table_name: Optional[Type[Any]] = None,
|
|
) -> str:
|
|
"""고유한 task_id를 생성합니다.
|
|
|
|
Args:
|
|
session: SQLAlchemy AsyncSession (optional)
|
|
table_name: task_id 컬럼이 있는 SQLAlchemy 테이블 클래스 (optional)
|
|
|
|
Returns:
|
|
str: 생성된 ksuid 문자열
|
|
|
|
Usage:
|
|
# 단순 ksuid 생성
|
|
task_id = await generate_task_id()
|
|
|
|
# 테이블에서 중복 검사 후 생성
|
|
task_id = await generate_task_id(session=session, table_name=Project)
|
|
"""
|
|
task_id = str(Ksuid())
|
|
|
|
if session is None or table_name is None:
|
|
return task_id
|
|
|
|
while True:
|
|
result = await session.execute(
|
|
select(table_name).where(table_name.task_id == task_id)
|
|
)
|
|
existing = result.scalar_one_or_none()
|
|
|
|
if existing is None:
|
|
return task_id
|
|
|
|
task_id = str(Ksuid())
|
|
|
|
|
|
async def generate_ksuid(
|
|
session: Optional[AsyncSession] = None,
|
|
table_name: Optional[Type[Any]] = None,
|
|
) -> str:
|
|
"""고유한 ksuid를 생성합니다.
|
|
|
|
Args:
|
|
session: SQLAlchemy AsyncSession (optional)
|
|
table_name: user_ksuid 컬럼이 있는 SQLAlchemy 테이블 클래스 (optional)
|
|
|
|
Returns:
|
|
str: 생성된 ksuid 문자열
|
|
|
|
Usage:
|
|
# 단순 ksuid 생성
|
|
ksuid = await generate_ksuid()
|
|
|
|
# 테이블에서 중복 검사 후 생성
|
|
ksuid = await generate_ksuid(session=session, table_name=User)
|
|
"""
|
|
ksuid = str(Ksuid())
|
|
|
|
if session is None or table_name is None:
|
|
return ksuid
|
|
|
|
while True:
|
|
result = await session.execute(
|
|
select(table_name).where(table_name.user_ksuid == ksuid)
|
|
)
|
|
existing = result.scalar_one_or_none()
|
|
|
|
if existing is None:
|
|
return ksuid
|
|
|
|
ksuid = str(Ksuid())
|