57 lines
1.6 KiB
Python
57 lines
1.6 KiB
Python
import os
|
|
import aiomysql
|
|
from common.utils import get_env
|
|
|
|
_pool: aiomysql.Pool | None = None
|
|
|
|
|
|
async def get_pool() -> aiomysql.Pool:
|
|
global _pool
|
|
if _pool is None:
|
|
_pool = await aiomysql.create_pool(
|
|
host=get_env("MYSQL_HOST"),
|
|
port=int(os.getenv("MYSQL_PORT", "3306")),
|
|
user=get_env("MYSQL_USER"),
|
|
password=get_env("MYSQL_PASSWORD"),
|
|
db=get_env("MYSQL_DB"),
|
|
charset="utf8mb4",
|
|
minsize=0,
|
|
maxsize=30,
|
|
connect_timeout=10,
|
|
)
|
|
return _pool
|
|
|
|
|
|
async def execute(sql: str, args: tuple = ()) -> int:
|
|
pool = await get_pool()
|
|
async with pool.acquire() as conn:
|
|
try:
|
|
async with conn.cursor() as cur:
|
|
await cur.execute(sql, args)
|
|
await conn.commit()
|
|
return cur.lastrowid
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
async def fetchone(sql: str, args: tuple = ()) -> dict | None:
|
|
pool = await get_pool()
|
|
async with pool.acquire() as conn:
|
|
try:
|
|
async with conn.cursor(aiomysql.DictCursor) as cur:
|
|
await cur.execute(sql, args)
|
|
return await cur.fetchone()
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
async def fetchall(sql: str, args: tuple = ()) -> list[dict]:
|
|
pool = await get_pool()
|
|
async with pool.acquire() as conn:
|
|
try:
|
|
async with conn.cursor(aiomysql.DictCursor) as cur:
|
|
await cur.execute(sql, args)
|
|
return await cur.fetchall()
|
|
finally:
|
|
conn.close()
|