import asyncio import os import pydantic from dotenv import load_dotenv from mysql.connector.aio import connect from mysql.connector.errors import IntegrityError from fastapi import HTTPException, status load_dotenv(verbose=True) async def get_cnx(): cnx = await connect( host = os.getenv("MYSQL_HOST"), user = os.getenv("MYSQL_USER"), password = os.getenv("MYSQL_PASSWORD"), database = os.getenv("MYSQL_DATABASE"), ) return cnx async def create_user(user_name:str, phone_number:str=None): async with await get_cnx() as cnx: async with await cnx.cursor() as cur: query = ''' INSERT INTO user_table (user_name, phone_number) VALUES (%(user_name)s, %(phone_number)s) ''' data = { "user_name" : user_name, "phone_number" : phone_number } try: await cur.execute(query, data) except IntegrityError as e: await cnx.rollback() raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="user already exist") user_id = cur.lastrowid await cnx.commit() return user_id async def get_user_info_from_id(user_id:int): async with await get_cnx() as cnx: async with await cnx.cursor() as cur: query = ''' SELECT user_name, phone_number FROM user_table WHERE user_id=%(user_id)s ''' data = {"user_id" : user_id} await cur.execute(query, data) user_info = await cur.fetchone() await cnx.commit() if not user_info: raise HTTPException(status.HTTP_404_NOT_FOUND) return user_info async def get_user_id_from_name(user_name:str): async with await get_cnx() as cnx: async with await cnx.cursor() as cur: query = ''' SELECT user_id FROM user_table WHERE user_name=%(user_name)s ''' data = {"user_name" : user_name} await cur.execute(query, data) user_info = await cur.fetchone() await cnx.commit() if not user_info: raise HTTPException(status.HTTP_404_NOT_FOUND) return user_info["user_id"] async def update_user_phone_from_id(user_id:int, phone_number:str=None): async with await get_cnx() as cnx: async with await cnx.cursor() as cur: count_query = ''' SELECT count(*) FROM user_table WHERE user_id = %(user_id)s ''' await cur.execute(count_query, {'user_id' : user_id}) found_rows = await cur.fetchone() if found_rows[0] == 0: await cnx.rollback() raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) query = ''' UPDATE user_table SET phone_number = %(phone_number)s WHERE user_id = %(user_id)s ''' data = { "user_id" : user_id, "phone_number" : phone_number } await cur.execute(query, data) if cur.rowcount == 0: await cnx.rollback() raise HTTPException(status_code=status.HTTP_204_NO_CONTENT) await cnx.commit() return data async def delete_user_from_id(user_id:int): async with await get_cnx() as cnx: async with await cnx.cursor() as cur: count_query = ''' SELECT count(*) FROM user_table WHERE user_id = %(user_id)s ''' await cur.execute(count_query, {'user_id' : user_id}) found_rows = await cur.fetchone() if found_rows[0] == 0: await cnx.rollback() raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) query = ''' DELETE FROM user_table WHERE user_id = %(user_id)s ''' data = { "user_id" : user_id } await cur.execute(query, data) await cnx.commit() return async def create_blog(blog_owner:int, blog_title:str, blog_content:str = None): async with await get_cnx() as cnx: async with await cnx.cursor() as cur: query = ''' INSERT INTO blog_table (blog_title, blog_content, blog_owner) VALUES (%(blog_title)s, %(blog_content)s, %(blog_owner)s) ''' data = { "blog_owner" : blog_owner, "blog_title" : blog_title, "blog_content" : blog_content } try: await cur.execute(query, data) except IntegrityError as e: await cnx.rollback() raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="blog already exist") blog_id = cur.lastrowid await cnx.commit() return blog_id async def get_blog_info_from_id(blog_id:int): async with await get_cnx() as cnx: async with await cnx.cursor() as cur: query = ''' SELECT blog_owner, blog_title, blog_content FROM blog_table WHERE blog_id=%(blog_id)s ''' data = {"blog_id" : blog_id} await cur.execute(query, data) user_info = await cur.fetchone() await cnx.commit() if not user_info: raise HTTPException(status.HTTP_404_NOT_FOUND) return user_info async def update_blog_content_from_id(blog_id:int, blog_content:str=None): async with await get_cnx() as cnx: async with await cnx.cursor() as cur: count_query = ''' SELECT count(*) FROM blog_table WHERE blog_id = %(blog_id)s ''' await cur.execute(count_query, {'blog_id' : blog_id}) found_rows = await cur.fetchone() if found_rows[0] == 0: await cnx.rollback() raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) query = ''' UPDATE blog_table SET blog_content = %(blog_content)s WHERE blog_id = %(blog_id)s ''' data = { "blog_id" : blog_id, "blog_content" : blog_content } await cur.execute(query, data) if cur.rowcount == 0: await cnx.rollback() raise HTTPException(status_code=status.HTTP_204_NO_CONTENT) await cnx.commit() return data async def delete_blog_from_id(blog_id:int): async with await get_cnx() as cnx: async with await cnx.cursor() as cur: count_query = ''' SELECT count(*) FROM blog_table WHERE blog_id = %(blog_id)s ''' await cur.execute(count_query, {'blog_id' : blog_id}) found_rows = await cur.fetchone() if found_rows[0] == 0: await cnx.rollback() raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) query = ''' DELETE FROM blog_table WHERE blog_id = %(blog_id)s ''' data = { "blog_id" : blog_id } await cur.execute(query, data) await cnx.commit() return