aio2o-fastapi-sample/api-server/app/module/mysql_utils.py

240 lines
8.1 KiB
Python

import asyncio
import os
import pydantic
from dotenv import load_dotenv
from mysql.connector.aio import connect
from mysql.connector.errors import IntegrityError
from fastapi import HTTPException, status
load_dotenv(verbose=True)
async def get_cnx():
    """Open a new asynchronous MySQL connection.

    The ``host``, ``user``, ``password`` and ``database`` arguments are read
    from the ``MYSQL_HOST``, ``MYSQL_USER``, ``MYSQL_PASSWORD`` and
    ``MYSQL_DATABASE`` environment variables each time this is called.

    Returns:
        The connection object obtained by awaiting ``connect``.
    """
    env_by_option = (
        ("host", "MYSQL_HOST"),
        ("user", "MYSQL_USER"),
        ("password", "MYSQL_PASSWORD"),
        ("database", "MYSQL_DATABASE"),
    )
    settings = {option: os.getenv(env_name) for option, env_name in env_by_option}
    return await connect(**settings)
async def create_user(user_name:str, phone_number:str=None):
    """Insert a new row into ``user_table`` and return its id.

    Args:
        user_name: Value written to the ``user_name`` column.
        phone_number: Value written to the ``phone_number`` column. ``None``
            is handed to the driver unchanged (presumably stored as NULL).

    Returns:
        The cursor's ``lastrowid`` after the INSERT.

    Raises:
        HTTPException: 409 CONFLICT when the INSERT raises
            ``IntegrityError``; the transaction is rolled back first.
    """
    query = '''
        INSERT INTO user_table (user_name, phone_number)
        VALUES (%(user_name)s, %(phone_number)s)
    '''
    data = {
        "user_name": user_name,
        "phone_number": phone_number,
    }
    async with await get_cnx() as cnx:
        async with await cnx.cursor() as cur:
            try:
                await cur.execute(query, data)
            except IntegrityError as e:
                await cnx.rollback()
                # Chain the driver error so the root cause stays visible in
                # tracebacks and logs.
                raise HTTPException(
                    status_code=status.HTTP_409_CONFLICT,
                    detail="user already exist",
                ) from e
            user_id = cur.lastrowid
            await cnx.commit()
    return user_id
async def get_user_info_from_id(user_id:int):
    """Fetch ``user_name`` and ``phone_number`` for one user.

    Args:
        user_id: Value matched against the ``user_id`` column.

    Returns:
        The row returned by ``fetchone`` for the SELECT.

    Raises:
        HTTPException: 404 NOT FOUND when ``fetchone`` yields no row.
    """
    sql = '''
        SELECT user_name, phone_number FROM user_table
        WHERE user_id=%(user_id)s
    '''
    async with await get_cnx() as cnx, await cnx.cursor() as cur:
        await cur.execute(sql, {"user_id": user_id})
        row = await cur.fetchone()
        await cnx.commit()
        if row:
            return row
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
async def get_user_id_from_name(user_name:str):
    """Look up the ``user_id`` of the user with the given name.

    Args:
        user_name: Value matched against the ``user_name`` column.

    Returns:
        The ``user_id`` column of the first matching row.

    Raises:
        HTTPException: 404 NOT FOUND when no row matches.
    """
    query = '''
        SELECT user_id FROM user_table
        WHERE user_name=%(user_name)s
    '''
    data = {"user_name": user_name}
    async with await get_cnx() as cnx:
        async with await cnx.cursor() as cur:
            await cur.execute(query, data)
            user_info = await cur.fetchone()
            await cnx.commit()
    if not user_info:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
    # The cursor is created without ``dictionary=True``, so the row is a
    # plain tuple. Indexing it with the column name would raise TypeError,
    # so the single selected column is read by position.
    return user_info[0]
async def update_user_phone_from_id(user_id:int, phone_number:str=None):
    """Set the ``phone_number`` of an existing user.

    Args:
        user_id: Value matched against the ``user_id`` column.
        phone_number: New value for the ``phone_number`` column.

    Returns:
        A dict with the ``user_id`` and ``phone_number`` that were applied.

    Raises:
        HTTPException: 404 NOT FOUND when the existence check counts zero
            rows; 204 NO CONTENT when the UPDATE reports zero affected rows.
            The transaction is rolled back before either is raised.
    """
    exists_sql = '''
        SELECT count(*)
        FROM user_table
        WHERE user_id = %(user_id)s
    '''
    update_sql = '''
        UPDATE user_table
        SET phone_number = %(phone_number)s
        WHERE user_id = %(user_id)s
    '''
    async with await get_cnx() as cnx, await cnx.cursor() as cur:
        # Step 1: make sure the target row exists.
        await cur.execute(exists_sql, {"user_id": user_id})
        match_count = (await cur.fetchone())[0]
        if match_count == 0:
            await cnx.rollback()
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)

        # Step 2: apply the change and check that a row was affected.
        applied = {"user_id": user_id, "phone_number": phone_number}
        await cur.execute(update_sql, applied)
        if cur.rowcount == 0:
            await cnx.rollback()
            raise HTTPException(status_code=status.HTTP_204_NO_CONTENT)

        await cnx.commit()
        return applied
async def delete_user_from_id(user_id:int):
    """Delete the user with the given id.

    Args:
        user_id: Value matched against the ``user_id`` column.

    Returns:
        None.

    Raises:
        HTTPException: 404 NOT FOUND when the DELETE removes no row;
            409 CONFLICT when the DELETE raises ``IntegrityError``.
            The transaction is rolled back before either is raised.
    """
    query = '''
        DELETE
        FROM user_table
        WHERE user_id = %(user_id)s
    '''
    data = {"user_id": user_id}
    async with await get_cnx() as cnx:
        async with await cnx.cursor() as cur:
            try:
                await cur.execute(query, data)
            except IntegrityError as e:
                await cnx.rollback()
                # An integrity failure on DELETE cannot mean "already exist";
                # presumably another table still references this user.
                raise HTTPException(
                    status_code=status.HTTP_409_CONFLICT,
                    detail="user is still referenced by other records",
                ) from e
            # The DELETE's own row count tells us whether the user existed,
            # so no separate SELECT (and no check-then-act race) is needed.
            if cur.rowcount == 0:
                await cnx.rollback()
                raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
            await cnx.commit()
    return
async def create_blog(blog_owner:int, blog_title:str, blog_content:str = None):
    """Insert a new row into ``blog_table`` and return its id.

    Args:
        blog_owner: Value written to the ``blog_owner`` column.
        blog_title: Value written to the ``blog_title`` column.
        blog_content: Value written to the ``blog_content`` column. ``None``
            is handed to the driver unchanged (presumably stored as NULL).

    Returns:
        The cursor's ``lastrowid`` after the INSERT.

    Raises:
        HTTPException: 409 CONFLICT when the INSERT raises
            ``IntegrityError``; the transaction is rolled back first.
    """
    query = '''
        INSERT INTO blog_table (blog_title, blog_content, blog_owner)
        VALUES (%(blog_title)s, %(blog_content)s, %(blog_owner)s)
    '''
    data = {
        "blog_owner": blog_owner,
        "blog_title": blog_title,
        "blog_content": blog_content,
    }
    async with await get_cnx() as cnx:
        async with await cnx.cursor() as cur:
            try:
                await cur.execute(query, data)
            except IntegrityError as e:
                await cnx.rollback()
                # NOTE(review): an IntegrityError here may also come from an
                # unknown blog_owner if that column is a foreign key --
                # confirm against the schema before relying on this message.
                raise HTTPException(
                    status_code=status.HTTP_409_CONFLICT,
                    detail="blog already exist",
                ) from e
            blog_id = cur.lastrowid
            await cnx.commit()
    return blog_id
async def get_blog_info_from_id(blog_id:int):
    """Fetch ``blog_owner``, ``blog_title`` and ``blog_content`` for one blog.

    Args:
        blog_id: Value matched against the ``blog_id`` column.

    Returns:
        The row returned by ``fetchone`` for the SELECT.

    Raises:
        HTTPException: 404 NOT FOUND when ``fetchone`` yields no row.
    """
    sql = '''
        SELECT blog_owner, blog_title, blog_content FROM blog_table
        WHERE blog_id=%(blog_id)s
    '''
    async with await get_cnx() as cnx, await cnx.cursor() as cur:
        await cur.execute(sql, {"blog_id": blog_id})
        blog_row = await cur.fetchone()
        await cnx.commit()
        if blog_row:
            return blog_row
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
async def update_blog_content_from_id(blog_id:int, blog_content:str=None):
    """Set the ``blog_content`` of an existing blog.

    Args:
        blog_id: Value matched against the ``blog_id`` column.
        blog_content: New value for the ``blog_content`` column.

    Returns:
        A dict with the ``blog_id`` and ``blog_content`` that were applied.

    Raises:
        HTTPException: 404 NOT FOUND when the existence check counts zero
            rows; 204 NO CONTENT when the UPDATE reports zero affected rows.
            The transaction is rolled back before either is raised.
    """
    exists_sql = '''
        SELECT count(*)
        FROM blog_table
        WHERE blog_id = %(blog_id)s
    '''
    update_sql = '''
        UPDATE blog_table
        SET blog_content = %(blog_content)s
        WHERE blog_id = %(blog_id)s
    '''
    async with await get_cnx() as cnx, await cnx.cursor() as cur:
        # Step 1: make sure the target row exists.
        await cur.execute(exists_sql, {"blog_id": blog_id})
        match_count = (await cur.fetchone())[0]
        if match_count == 0:
            await cnx.rollback()
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)

        # Step 2: apply the change and check that a row was affected.
        applied = {"blog_id": blog_id, "blog_content": blog_content}
        await cur.execute(update_sql, applied)
        if cur.rowcount == 0:
            await cnx.rollback()
            raise HTTPException(status_code=status.HTTP_204_NO_CONTENT)

        await cnx.commit()
        return applied
async def delete_blog_from_id(blog_id:int):
    """Delete the blog with the given id.

    Args:
        blog_id: Value matched against the ``blog_id`` column.

    Returns:
        None.

    Raises:
        HTTPException: 404 NOT FOUND when the DELETE removes no row;
            409 CONFLICT when the DELETE raises ``IntegrityError``.
            The transaction is rolled back before either is raised.
    """
    query = '''
        DELETE
        FROM blog_table
        WHERE blog_id = %(blog_id)s
    '''
    data = {"blog_id": blog_id}
    async with await get_cnx() as cnx:
        async with await cnx.cursor() as cur:
            try:
                await cur.execute(query, data)
            except IntegrityError as e:
                await cnx.rollback()
                # An integrity failure on DELETE cannot mean "already exist";
                # presumably another table still references this blog.
                raise HTTPException(
                    status_code=status.HTTP_409_CONFLICT,
                    detail="blog is still referenced by other records",
                ) from e
            # The DELETE's own row count tells us whether the blog existed,
            # so no separate SELECT (and no check-then-act race) is needed.
            if cur.rowcount == 0:
                await cnx.rollback()
                raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
            await cnx.commit()
    return