aio2o-fastapi-sample/api-server/app/module/mysql_utils.py

127 lines
4.1 KiB
Python

import asyncio
import os
import pydantic
from dotenv import load_dotenv
from mysql.connector.aio import connect
from mysql.connector.errors import IntegrityError
from fastapi import HTTPException, status
# Load variables from a .env file into the process environment when this
# module is imported, so the os.getenv("MYSQL_*") lookups in get_cnx see them.
load_dotenv(verbose=True)
async def get_cnx():
    """Open and return a new MySQL connection configured from the environment.

    The host, user, password and database are read with ``os.getenv`` from
    ``MYSQL_HOST``, ``MYSQL_USER``, ``MYSQL_PASSWORD`` and ``MYSQL_DATABASE``.
    A fresh connection is created on every call; closing it is left to the
    caller.
    """
    settings = {
        "host": os.getenv("MYSQL_HOST"),
        "user": os.getenv("MYSQL_USER"),
        "password": os.getenv("MYSQL_PASSWORD"),
        "database": os.getenv("MYSQL_DATABASE"),
    }
    return await connect(**settings)
async def create_user(user_name: str, phone_number: str = None):
    """Insert a new row into ``user_table`` and return its generated id.

    Args:
        user_name: Value stored in the ``user_name`` column.
        phone_number: Value stored in the ``phone_number`` column; may be
            ``None``.

    Returns:
        The cursor's ``lastrowid`` after the INSERT.

    Raises:
        HTTPException: 409 Conflict when the INSERT raises ``IntegrityError``.
    """
    insert_sql = (
        "\n"
        "INSERT INTO user_table (user_name, phone_number)\n"
        "VALUES (%(user_name)s, %(phone_number)s)\n"
    )
    params = {
        "user_name": user_name,
        "phone_number": phone_number,
    }
    async with await get_cnx() as cnx, await cnx.cursor() as cur:
        try:
            await cur.execute(insert_sql, params)
        except IntegrityError as err:
            await cnx.rollback()
            # Chain the driver error so the root cause stays in tracebacks.
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail="user already exist",
            ) from err
        # Read the generated id before committing, as the original did.
        user_id = cur.lastrowid
        await cnx.commit()
        return user_id
async def get_user_info_from_id(user_id: int):
    """Fetch the ``user_name`` and ``phone_number`` stored for a user id.

    Args:
        user_id: Value matched against the ``user_id`` column of
            ``user_table``.

    Returns:
        The row returned by the cursor's ``fetchone()`` for the
        ``user_name, phone_number`` SELECT.

    Raises:
        HTTPException: 404 Not Found when ``fetchone()`` yields no row.
    """
    select_sql = (
        "\n"
        "SELECT user_name, phone_number FROM user_table\n"
        "WHERE user_id=%(user_id)s\n"
    )
    async with await get_cnx() as cnx, await cnx.cursor() as cur:
        await cur.execute(select_sql, {"user_id": user_id})
        row = await cur.fetchone()
        await cnx.commit()
        if row:
            return row
        raise HTTPException(status.HTTP_404_NOT_FOUND)
async def get_user_id_from_name(user_name: str):
    """Look up the ``user_id`` of the row whose ``user_name`` matches.

    Args:
        user_name: Value matched against the ``user_name`` column of
            ``user_table``.

    Returns:
        The ``user_id`` value of the first matching row.

    Raises:
        HTTPException: 404 Not Found when ``fetchone()`` yields no row.
    """
    select_sql = (
        "\n"
        "SELECT user_id FROM user_table\n"
        "WHERE user_name=%(user_name)s\n"
    )
    async with await get_cnx() as cnx, await cnx.cursor() as cur:
        await cur.execute(select_sql, {"user_name": user_name})
        row = await cur.fetchone()
        await cnx.commit()
        if not row:
            raise HTTPException(status.HTTP_404_NOT_FOUND)
        # The cursor is opened without dictionary=True, so rows are tuples;
        # indexing by column name would raise TypeError. user_id is the only
        # selected column, hence position 0.
        return row[0]
async def update_user_phone_from_id(user_id: int, phone_number: str = None):
    """Set the ``phone_number`` of the ``user_table`` row with the given id.

    A ``SELECT count(*)`` runs first so that a missing user can be reported
    separately from an UPDATE that changes nothing.

    Args:
        user_id: Value matched against the ``user_id`` column.
        phone_number: New value for the ``phone_number`` column; may be
            ``None``.

    Returns:
        A dict with the ``user_id`` and ``phone_number`` that were written.

    Raises:
        HTTPException: 404 Not Found when the count query reports zero rows;
            204 No Content when the UPDATE reports ``rowcount == 0``
            (presumably the stored value already equals the new one --
            TODO confirm).
    """
    exists_sql = (
        "\n"
        "SELECT count(*)\n"
        "FROM user_table\n"
        "WHERE user_id = %(user_id)s\n"
    )
    update_sql = (
        "\n"
        "UPDATE user_table\n"
        "SET phone_number = %(phone_number)s\n"
        "WHERE user_id = %(user_id)s\n"
    )
    payload = {
        "user_id": user_id,
        "phone_number": phone_number,
    }
    async with await get_cnx() as cnx, await cnx.cursor() as cur:
        await cur.execute(exists_sql, {"user_id": user_id})
        count_row = await cur.fetchone()
        user_missing = count_row[0] == 0
        if user_missing:
            await cnx.rollback()
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)

        await cur.execute(update_sql, payload)
        nothing_changed = cur.rowcount == 0
        if nothing_changed:
            await cnx.rollback()
            raise HTTPException(status_code=status.HTTP_204_NO_CONTENT)

        await cnx.commit()
        return payload
async def delete_user_from_id(user_id: int):
    """Delete the ``user_table`` row whose ``user_id`` matches.

    No existence check is made: the DELETE is executed and committed whether
    or not a row matched.

    Args:
        user_id: Value matched against the ``user_id`` column.

    Returns:
        None.

    Raises:
        HTTPException: 409 Conflict when the DELETE raises ``IntegrityError``.
    """
    delete_sql = (
        "\n"
        "DELETE\n"
        "FROM user_table\n"
        "WHERE user_id = %(user_id)s\n"
    )
    async with await get_cnx() as cnx, await cnx.cursor() as cur:
        try:
            await cur.execute(delete_sql, {"user_id": user_id})
        except IntegrityError as err:
            await cnx.rollback()
            # An IntegrityError on DELETE cannot mean a duplicate user; most
            # likely another table still references this row (assumption --
            # confirm against the schema). Report that instead of the
            # misleading "user already exist" text.
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail="user cannot be deleted: still referenced by other records",
            ) from err
        await cnx.commit()