o2o-infinith-backend/app/common/db/base.py

57 lines
1.6 KiB
Python

import os
import aiomysql
from common.utils import get_env
_pool: aiomysql.Pool | None = None
async def get_pool() -> aiomysql.Pool:
global _pool
if _pool is None:
_pool = await aiomysql.create_pool(
host=get_env("MYSQL_HOST"),
port=int(os.getenv("MYSQL_PORT", "3306")),
user=get_env("MYSQL_USER"),
password=get_env("MYSQL_PASSWORD"),
db=get_env("MYSQL_DB"),
charset="utf8mb4",
minsize=0,
maxsize=30,
connect_timeout=10,
)
return _pool
async def execute(sql: str, args: tuple = ()) -> int:
pool = await get_pool()
async with pool.acquire() as conn:
try:
async with conn.cursor() as cur:
await cur.execute(sql, args)
await conn.commit()
return cur.lastrowid
finally:
conn.close()
async def fetchone(sql: str, args: tuple = ()) -> dict | None:
pool = await get_pool()
async with pool.acquire() as conn:
try:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(sql, args)
return await cur.fetchone()
finally:
conn.close()
async def fetchall(sql: str, args: tuple = ()) -> list[dict]:
pool = await get_pool()
async with pool.acquire() as conn:
try:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(sql, args)
return await cur.fetchall()
finally:
conn.close()