298 lines
9.8 KiB
Python
298 lines
9.8 KiB
Python
import asyncio
|
|
import os
|
|
import pydantic
|
|
from dotenv import load_dotenv
|
|
from mysql.connector.aio import connect
|
|
from mysql.connector.errors import IntegrityError
|
|
from fastapi import HTTPException, status
|
|
load_dotenv(verbose=True)
|
|
|
|
async def get_cnx():
    """Open a new async MySQL connection configured from the environment.

    Reads ``MYSQL_HOST``, ``MYSQL_PORT``, ``MYSQL_USER``, ``MYSQL_PASSWORD``
    and ``MYSQL_DATABASE`` via ``os.getenv``.

    Returns:
        The connection object produced by awaiting ``connect(...)``.
    """
    # int(None) raises TypeError and int("") raises ValueError, so an unset
    # or empty MYSQL_PORT falls back to MySQL's standard port instead of
    # crashing before the connection attempt.
    port = int(os.getenv("MYSQL_PORT") or 3306)
    return await connect(
        host=os.getenv("MYSQL_HOST"),
        port=port,
        user=os.getenv("MYSQL_USER"),
        password=os.getenv("MYSQL_PASSWORD"),
        database=os.getenv("MYSQL_DATABASE"),
    )
|
|
|
|
async def create_user(user_name: str, phone_number: str = None):
    """Insert a new row into ``user_table`` and return its generated id.

    Args:
        user_name: Value stored in the ``user_name`` column.
        phone_number: Value stored in the ``phone_number`` column; ``None``
            is handed to the driver unchanged.

    Returns:
        ``cur.lastrowid`` as read after the INSERT.

    Raises:
        HTTPException: 409 Conflict when the INSERT raises
            ``IntegrityError``.
    """
    async with await get_cnx() as cnx:
        async with await cnx.cursor() as cur:
            query = '''
                INSERT INTO user_table (user_name, phone_number)
                VALUES (%(user_name)s, %(phone_number)s)
            '''
            data = {
                "user_name": user_name,
                "phone_number": phone_number,
            }
            try:
                await cur.execute(query, data)
            except IntegrityError as e:
                await cnx.rollback()
                # Chain the driver error so the original cause stays
                # visible in tracebacks and logs.
                raise HTTPException(
                    status_code=status.HTTP_409_CONFLICT,
                    detail="user already exist",
                ) from e

            user_id = cur.lastrowid
            await cnx.commit()
            return user_id
|
|
|
|
async def get_user_info_from_id(user_id: int):
    """Fetch the ``user_name`` and ``phone_number`` of a single user.

    Args:
        user_id: Value matched against the ``user_id`` column.

    Returns:
        The row returned by ``fetchone()`` for the SELECT, with the columns
        in the order ``user_name``, ``phone_number``.

    Raises:
        HTTPException: 404 Not Found when the SELECT yields no row.
    """
    select_sql = '''
        SELECT user_name, phone_number FROM user_table
        WHERE user_id=%(user_id)s
    '''
    async with await get_cnx() as conn:
        async with await conn.cursor() as cursor:
            await cursor.execute(select_sql, {"user_id": user_id})
            row = await cursor.fetchone()
            await conn.commit()
            if not row:
                raise HTTPException(status.HTTP_404_NOT_FOUND)
            return row
|
|
|
|
async def get_user_id_from_name(user_name: str):
    """Look up the ``user_id`` of the user with the given name.

    Args:
        user_name: Value matched against the ``user_name`` column.

    Returns:
        The ``user_id`` value of the matching row.

    Raises:
        HTTPException: 404 Not Found when the SELECT yields no row.
    """
    async with await get_cnx() as cnx:
        async with await cnx.cursor() as cur:
            query = '''
                SELECT user_id FROM user_table
                WHERE user_name=%(user_name)s
            '''
            data = {"user_name": user_name}
            await cur.execute(query, data)
            user_info = await cur.fetchone()
            await cnx.commit()
            if not user_info:
                raise HTTPException(status.HTTP_404_NOT_FOUND)
            # The cursor is created without dictionary mode, so the row is a
            # tuple; indexing it with "user_id" would raise TypeError. The
            # SELECT has a single column, so user_id is at position 0.
            return user_info[0]
|
|
|
|
async def update_user_phone_from_id(user_id: int, phone_number: str = None):
    """Set the ``phone_number`` of an existing user.

    Args:
        user_id: Value matched against the ``user_id`` column.
        phone_number: New value for the ``phone_number`` column.

    Returns:
        The parameter dict sent with the UPDATE, i.e.
        ``{"user_id": ..., "phone_number": ...}``.

    Raises:
        HTTPException: 404 Not Found when the count query reports no row
            with that id; 204 No Content when the UPDATE reports a
            ``rowcount`` of 0.
    """
    exists_sql = '''
        SELECT count(*)
        FROM user_table
        WHERE user_id = %(user_id)s
    '''
    update_sql = '''
        UPDATE user_table
        SET phone_number = %(phone_number)s
        WHERE user_id = %(user_id)s
    '''
    params = {"user_id": user_id, "phone_number": phone_number}

    async with await get_cnx() as conn:
        async with await conn.cursor() as cursor:
            # Existence check first so a missing id maps to 404.
            await cursor.execute(exists_sql, {'user_id': user_id})
            match_count = (await cursor.fetchone())[0]
            if match_count == 0:
                await conn.rollback()
                raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)

            await cursor.execute(update_sql, params)
            # NOTE(review): the row exists at this point, so rowcount == 0
            # presumably means the stored value already matched; confirm.
            if cursor.rowcount == 0:
                await conn.rollback()
                raise HTTPException(status_code=status.HTTP_204_NO_CONTENT)

            await conn.commit()
            return params
|
|
|
|
|
|
async def delete_user_from_id(user_id: int):
    """Delete the user with the given id from ``user_table``.

    Args:
        user_id: Value matched against the ``user_id`` column.

    Returns:
        None.

    Raises:
        HTTPException: 404 Not Found when the count query reports no row
            with that id.
    """
    exists_sql = '''
        SELECT count(*)
        FROM user_table
        WHERE user_id = %(user_id)s
    '''
    delete_sql = '''
        DELETE
        FROM user_table
        WHERE user_id = %(user_id)s
    '''
    async with await get_cnx() as conn:
        async with await conn.cursor() as cursor:
            # Existence check first so a missing id maps to 404.
            await cursor.execute(exists_sql, {'user_id': user_id})
            match_count = (await cursor.fetchone())[0]
            if match_count == 0:
                await conn.rollback()
                raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)

            await cursor.execute(delete_sql, {"user_id": user_id})
            await conn.commit()
|
|
|
|
async def create_blog(blog_owner: int, blog_title: str, blog_content: str = None):
    """Insert a new row into ``blog_table`` and return its generated id.

    Args:
        blog_owner: Value stored in the ``blog_owner`` column.
        blog_title: Value stored in the ``blog_title`` column.
        blog_content: Value stored in the ``blog_content`` column; ``None``
            is handed to the driver unchanged.

    Returns:
        ``cur.lastrowid`` as read after the INSERT.

    Raises:
        HTTPException: 409 Conflict when the INSERT raises
            ``IntegrityError``.
    """
    async with await get_cnx() as cnx:
        async with await cnx.cursor() as cur:
            query = '''
                INSERT INTO blog_table (blog_title, blog_content, blog_owner)
                VALUES (%(blog_title)s, %(blog_content)s, %(blog_owner)s)
            '''
            data = {
                "blog_owner": blog_owner,
                "blog_title": blog_title,
                "blog_content": blog_content,
            }
            try:
                await cur.execute(query, data)
            except IntegrityError as e:
                await cnx.rollback()
                # Chain the driver error so the original cause stays
                # visible in tracebacks and logs.
                raise HTTPException(
                    status_code=status.HTTP_409_CONFLICT,
                    detail="blog already exist",
                ) from e

            blog_id = cur.lastrowid
            await cnx.commit()
            return blog_id
|
|
|
|
async def get_blog_info_from_id(blog_id: int):
    """Fetch the owner, title and content of a single blog post.

    Args:
        blog_id: Value matched against the ``blog_id`` column.

    Returns:
        The row returned by ``fetchone()`` for the SELECT, with the columns
        in the order ``blog_owner``, ``blog_title``, ``blog_content``.

    Raises:
        HTTPException: 404 Not Found when the SELECT yields no row.
    """
    select_sql = '''
        SELECT blog_owner, blog_title, blog_content FROM blog_table
        WHERE blog_id=%(blog_id)s
    '''
    async with await get_cnx() as conn:
        async with await conn.cursor() as cursor:
            await cursor.execute(select_sql, {"blog_id": blog_id})
            blog_row = await cursor.fetchone()
            await conn.commit()
            if not blog_row:
                raise HTTPException(status.HTTP_404_NOT_FOUND)
            return blog_row
|
|
|
|
async def update_blog_content_from_id(blog_id: int, blog_content: str = None):
    """Set the ``blog_content`` of an existing blog post.

    Args:
        blog_id: Value matched against the ``blog_id`` column.
        blog_content: New value for the ``blog_content`` column.

    Returns:
        The parameter dict sent with the UPDATE, i.e.
        ``{"blog_id": ..., "blog_content": ...}``.

    Raises:
        HTTPException: 404 Not Found when the count query reports no row
            with that id; 204 No Content when the UPDATE reports a
            ``rowcount`` of 0.
    """
    exists_sql = '''
        SELECT count(*)
        FROM blog_table
        WHERE blog_id = %(blog_id)s
    '''
    update_sql = '''
        UPDATE blog_table
        SET blog_content = %(blog_content)s
        WHERE blog_id = %(blog_id)s
    '''
    params = {"blog_id": blog_id, "blog_content": blog_content}

    async with await get_cnx() as conn:
        async with await conn.cursor() as cursor:
            # Existence check first so a missing id maps to 404.
            await cursor.execute(exists_sql, {'blog_id': blog_id})
            match_count = (await cursor.fetchone())[0]
            if match_count == 0:
                await conn.rollback()
                raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)

            await cursor.execute(update_sql, params)
            # NOTE(review): the row exists at this point, so rowcount == 0
            # presumably means the stored value already matched; confirm.
            if cursor.rowcount == 0:
                await conn.rollback()
                raise HTTPException(status_code=status.HTTP_204_NO_CONTENT)

            await conn.commit()
            return params
|
|
|
|
|
|
async def delete_blog_from_id(blog_id: int):
    """Delete the blog post with the given id from ``blog_table``.

    Args:
        blog_id: Value matched against the ``blog_id`` column.

    Returns:
        None.

    Raises:
        HTTPException: 404 Not Found when the count query reports no row
            with that id.
    """
    exists_sql = '''
        SELECT count(*)
        FROM blog_table
        WHERE blog_id = %(blog_id)s
    '''
    delete_sql = '''
        DELETE
        FROM blog_table
        WHERE blog_id = %(blog_id)s
    '''
    async with await get_cnx() as conn:
        async with await conn.cursor() as cursor:
            # Existence check first so a missing id maps to 404.
            await cursor.execute(exists_sql, {'blog_id': blog_id})
            match_count = (await cursor.fetchone())[0]
            if match_count == 0:
                await conn.rollback()
                raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)

            await cursor.execute(delete_sql, {"blog_id": blog_id})
            await conn.commit()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def truncate_user_table():
    """Run ``TRUNCATE TABLE user_table`` and commit.

    Returns:
        None.
    """
    truncate_sql = '''
        TRUNCATE TABLE user_table
    '''
    async with await get_cnx() as conn:
        async with await conn.cursor() as cursor:
            await cursor.execute(truncate_sql)
            await conn.commit()
|
|
|
|
async def get_table_list():
    """Return the first column of every row produced by ``SHOW TABLES``.

    Returns:
        A list built from element 0 of each fetched row.
    """
    show_sql = '''
        SHOW TABLES
    '''
    async with await get_cnx() as conn:
        async with await conn.cursor() as cursor:
            await cursor.execute(show_sql)
            rows = await cursor.fetchall()
            table_names = []
            for row in rows:
                table_names.append(row[0])
            await conn.commit()
            return table_names
|
|
|
|
|
|
async def create_user_table():
    """Create ``user_table`` and the unique index on ``user_name``.

    The CREATE TABLE and CREATE UNIQUE INDEX statements are sent in two
    separate ``execute`` calls. Sending both in one string depends on the
    driver's multi-statement support, which differs between connector
    versions; one statement per call avoids that dependency.

    Returns:
        None.
    """
    create_table_query = '''
        -- 테이블 생성 SQL - user_table
        CREATE TABLE user_table
        (
            `user_id`       INT            NOT NULL    AUTO_INCREMENT,
            `user_name`     VARCHAR(20)    NOT NULL,
            `phone_number`  VARCHAR(20)    NULL,
            `created_at`    TIMESTAMP      NOT NULL    DEFAULT CURRENT_TIMESTAMP,
            `updated_at`    TIMESTAMP      NOT NULL    DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
            PRIMARY KEY (user_id)
        );
    '''
    create_index_query = '''
        -- Unique Index 설정 SQL - user_table(user_name)
        CREATE UNIQUE INDEX UQ_user_table_1
            ON user_table(user_name);
    '''
    async with await get_cnx() as cnx:
        async with await cnx.cursor() as cur:
            # Order matters: the index can only be created once the table
            # exists.
            await cur.execute(create_table_query)
            await cur.execute(create_index_query)
            await cnx.commit()
    return
|
|
|
|
async def create_blog_table():
    """Create ``blog_table`` with ``blog_id`` as its primary key, then commit.

    Returns:
        None.
    """
    ddl = '''
        -- 테이블 생성 SQL - blog_table
        CREATE TABLE blog_table
        (
            `blog_id`       INT            NOT NULL    AUTO_INCREMENT,
            `blog_title`    VARCHAR(50)    NOT NULL,
            `blog_content`  TEXT           NULL,
            `blog_owner`    INT            NOT NULL,
            `created_at`    TIMESTAMP      NOT NULL    DEFAULT CURRENT_TIMESTAMP,
            `updated_at`    TIMESTAMP      NOT NULL    DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
            PRIMARY KEY (blog_id)
        );
    '''
    async with await get_cnx() as conn:
        async with await conn.cursor() as cursor:
            await cursor.execute(ddl)
            await conn.commit()