Flask操作数据库

ppt

霍格沃兹测试开发

安装

  • 官网:https://docs.sqlalchemy.org/en/14/
  • 安装文档:https://docs.sqlalchemy.org/en/14/intro.html#installation
  • 声明 Models:https://docs.sqlalchemy.org/en/14/orm/index.html
  • SQLAlchemy 是 python 中最有名的 ORM 框架
  • 安装命令: pip install SQLAlchemy

sqlite 安装

orm的代码例子

from sqlalchemy import *
from sqlalchemy.orm import declarative_base, sessionmaker, Session

# 先创建基类,使用declarative_base()方法声明一个基类
# 创建表的时候需要继承这个基类对象
Base = declarative_base()
# 创建引擎,通过这个引擎,可以创建数据库连接,比如:mysql,sqlite...
# engine = create_engine("sqlite:///test.db")
## windows 时,有可能会报错,需要加上这个参数
engine = create_engine('sqlite:///test.db', connect_args={'check_same_thread': False})

# 连接的格式  dialect+driver://username:password@host:port/database
# sqlite: sqlite:///test.db
# mysql+pymysql://username:password@host:port/database

# 定义db_session对象,可以实现对数据库的增删改查,以及表的创建和删除
DBSession = sessionmaker(bind=engine)
db_session: Session = DBSession()


# 类--对应--表
class User(Base):
    __tablename__ = "user"
    # 属性 -- 对应 -- 表的列属性
    id = Column(Integer, primary_key=True)
    username = Column(String(80), nullable=False, unique=False)
    password = Column(String(120))

    def __repr__(self):
        return '<User %r>' % self.username


class TestCase(Base):
    __tablename__ = "testcase"
    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    desc = Column(String(100))


if __name__ == '__main__':
    # 创建表,创建存储在元素数据中所有的表
    Base.metadata.create_all(engine)
    # 删除表
    # Base.metadata.drop_all(engine)

case_route

# # 导入 Flask 的模块
from flask import Blueprint, request

# from data.data import cases
from orm_study.orm_connect import db_session, TestCase

case_bp = Blueprint("用例管理", __name__, url_prefix="/testcase")


# 查询用例
@case_bp.route("/", methods=["GET"])
def get_case():
    args = request.args
    _id = args.get("id")
    datas = []
    if _id:
        case = db_session.query(TestCase).filter_by(id=_id).first()
        # for case in my_cases:
        #     if int(_id) == case.get("id"):
        #         datas.append(case)
        print(f"查询到的数据为=====>{case}")
        if case:
            datas.append({
                "id": case.id,
                "name": case.name,
                "desc": case.desc,
            })
    else:
        # datas = my_cases
        cases = db_session.query(TestCase).all()
        datas = [{"id": case.id, "name": case.name, "desc": case.desc} for case in cases]
        # # 具体的实现
        # for case in cases:
        #     dict_a = {}
        #     dict_a["id"] = case.id
        #     dict_a["name"] = case.name
        #     dict_a["desc"] = case.desc
        #     datas.append(dict_a)
    return {"code": 0, "msg": "success", "data": datas}


#
# 新增测试用例
@case_bp.route("/add", methods=["POST"])
def add_case():
    case_data = request.json
    print(f"接收到的参数<===={case_data}")
    # _id = int(case_data.get("id"))
    name = case_data.get("name")
    desc = case_data.get("desc")
    # new_case = {"id": _id, "name": name, "desc": desc}
    instance = TestCase(name=name, desc=desc)
    # 判断重复
    # for case in my_cases:
    #     if case.get("id") == _id or case.get("name") == name:
    #         return {"code": 40002, "msg": "case is exists"}
    # my_cases.append(new_case)
    # print(f"当前的全部用例: {my_cases}")
    # return {"code": 0, "msg": "add success"}
    if db_session.query(TestCase).filter_by(name=name).first():
        return {"code": 40002, "msg": "case is exists"}
    db_session.add(instance)
    db_session.commit()
    _id = instance.id
    db_session.close()
    if _id:
        return {"code": 0, "msg": "add success", "data": {"case_id": _id}}
    else:
        return {"code": 40001, "msg": "add failed"}


#
#
# 删除测试用例
@case_bp.route("/delete", methods=["DELETE"])
def delete_case():
    _id = int(request.args.get("id"))
    exist = db_session.query(TestCase).filter_by(id=_id).first()
    print(f"exist====> {exist}")
    if exist:
        db_session.query(TestCase).filter_by(id=_id).delete()
        db_session.commit()
        db_session.close()
        return {"code": 0, "msg": "delete success"}
    else:
        return {"code": 40003, "msg": "case is not exists"}


# 更新测试用例
@case_bp.route("/update", methods=["PUT"])
def update_case():
    case_data = request.json
    _id = case_data.get("id")
    name = case_data.get("name")
    desc = case_data.get("desc")
    # 如果case不存在,返回修改失败
    if db_session.query(TestCase).filter_by(id=_id).first():
        # 查到了case之后   进行更新操作
        new_case_data = {
            "name": name,
            "desc": desc,
        }
        db_session.query(TestCase).filter_by(id=_id).update(new_case_data)
        db_session.commit()
        db_session.close()
        return {"code": 0, "msg": "update success"}
    return {"code": 40003, "msg": "case is not exists"}

接口的测试用例

import time

import pytest
import requests


class TestCaseServer:

    def setup_class(self):
        self.base_url = "http://127.0.0.1:5000/testcase"

    def add_case(self):
        data = {
            "name": f"用例{time.time()}",
            "desc": f"描述{time.time()}"
        }
        res = requests.post(url=f"{self.base_url}/add", json=data)
        return res.json().get("data", {}).get("case_id")

    def test_add(self):
        data = {
            "name": f"用例{time.time()}",
            "desc": f"描述{time.time()}"
        }
        res = requests.post(url=f"{self.base_url}/add", json=data)
        print(res.json())
        assert res.json().get("code") == 0

    def test_search_all(self):
        res = requests.get(url=self.base_url)
        print(res.json())
        assert res.json().get("code") == 0

    @pytest.mark.parametrize("_id,exp_length", [(3, 1), (1, 0)])
    def test_search_one(self, _id, exp_length):
        res = requests.get(url=self.base_url, params={"id": _id})
        print(res.json())
        assert res.json().get("code") == 0
        length = len(res.json().get("data"))
        assert length == exp_length

    def test_delete(self):
        case_id = self.add_case()
        print(case_id)
        assert case_id
        url = f"{self.base_url}/delete"
        res = requests.delete(url, params={"id": case_id})
        print(res.json())
        assert res.json().get("code") == 0

    def test_update(self):
        case_id = self.add_case()
        print(case_id)
        assert case_id
        url = f"{self.base_url}/update"
        case = requests.get(url=self.base_url, params={"id": case_id}).json().get("data")[0]
        print(case)
        case.update(
            {
                "name": case.get("name") + "_update",
                "desc": case.get("desc") + "_update",
            }
        )
        print(case)
        res = requests.put(url=url, json=case)
        print(res.json())
        assert res.json().get("code") == 0

用户的登录

def user_login():
    """
    登录功能,获取用户提交的数据,进行校验,校验成功则返回登录成功
    :return: 
    """
    # return "登录成功"
    # 获取请求体
    data = request.json
    # 使用请求的数据构建用户的对象
    user = User(**data)  # 等价于 User(username=data.get("username"),password=data.get("password"))
    # 查询用户是否存在,存在则进行后续的步骤
    user_result_data = db_session.query(User).filter_by(username=user.username).first()
    # user_result = User()
    if not user_result_data:
        # 没查到用户
        return {"code": 405, "msg": "user is not register"}
    if not user_result_data.check_hash_password(data.get("password")):
        return {"code": 401, "msg": "user or password is wrong"}
    return {"code": 0, "msg": "user login success"}

平台的model设计

from datetime import datetime

from passlib.handlers.sha2_crypt import sha256_crypt
from sqlalchemy import *
from sqlalchemy.orm import declarative_base, sessionmaker, Session

# 先创建基类,使用declarative_base()方法声明一个基类
# 创建表的时候需要继承这个基类对象
Base = declarative_base()
# 创建引擎,通过这个引擎,可以创建数据库连接,比如:mysql,sqlite...
# engine = create_engine("sqlite:///test.db")
## windows 时,有可能会报错,需要加上这个参数
# engine = create_engine('sqlite:///test.db', connect_args={'check_same_thread': False}, echo=True)
db_host = "127.0.0.1"
db_port = "3306"
db_name = "test_platform"
db_user = "root"
db_psw = "hogwarts"
# 拼接url
db_url = f"mysql+pymysql://{db_user}:{db_psw}@{db_host}:{db_port}/{db_name}"
engine = create_engine(db_url, echo=True)

# 连接的格式  dialect+driver://username:password@host:port/database
# sqlite: sqlite:///test.db
# mysql+pymysql://username:password@host:port/database

# 定义db_session对象,可以实现对数据库的增删改查,以及表的创建和删除
DBSession = sessionmaker(bind=engine)
db_session: Session = DBSession()


# 类--对应--表
class User(Base):
    __tablename__ = "user"
    # 属性 -- 对应 -- 表的列属性
    id = Column(Integer, primary_key=True)
    username = Column(String(80), nullable=False, unique=False)
    password = Column(String(120), nullable=False)
    create_time = Column(DateTime, nullable=True, default=datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    def __init__(self, *args, **kwargs):
        self.username = kwargs.get("username")
        self.password = sha256_crypt.hash(kwargs.get("password"))

    def check_hash_password(self, raw_password):
        return sha256_crypt.verify(raw_password, self.password)

    def __repr__(self):
        return '<User %r>' % self.username

    def as_dict(self):
        return {
            "id": self.id,
            "username": self.username,
            "password": self.password,
            "create_time": str(self.create_time),
        }


class TestCase(Base):
    __tablename__ = "testcase"
    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    desc = Column(String(100))

    def __repr__(self):
        return '<Case %r>' % self.name

    def as_dict(self):
        return {
            "id": self.id,
            "name": self.name,
            "desc": self.desc,
        }


if __name__ == '__main__':
    # 创建表,创建存储在元素数据中所有的表
    Base.metadata.create_all(engine)
    # 删除表
    # Base.metadata.drop_all(engine)

    # 数据的增加操作   create  C
    # 增 - 新增单条数据
    # instance = TestCase(name="用例2", desc="description1")
    # db_session.add(instance)
    # id = instance.id

    # #  批量添加操作
    # add_case1 = TestCase(name="用例3", desc="描述3")
    # add_case2 = TestCase(name="用例4", desc="描述4")
    # ## 方式1  依次添加到session
    # db_session.add(add_case1)
    # db_session.add(add_case2)
    # ## 方式2 一次性添加多条case
    # add_case3 = TestCase(name="用例5", desc="描述5")
    # add_case4 = TestCase(name="用例6", desc="描述6")
    # db_session.add_all([add_case3, add_case4])
    # db_session.commit()
    # db_session.close()
    # 查询操作   R
    # 格式  db_session.query("类").all()/.first()
    # res = db_session.query(TestCase).all()
    # for rs in res:
    #     print(rs)
    # res = db_session.query(TestCase).first()
    # print(res)
    ## 条件查询  db_session.query("类").filter_by(条件).all()/.first()
    # data  = db_session.query(TestCase).filter_by(id=3).first()
    # data  = db_session.query(TestCase).filter_by(id=3).all()
    # print(data)
    # data = db_session.query(TestCase).filter_by(id=3).filter_by(name="用例3").first()
    # print(data)
    # 数据的修改  U
    # 第一种方式,更新某个字段
    # case = db_session.query(TestCase).filter_by(id=1).first()
    # case.desc = "描述信息1111111"
    # # 提交
    # db_session.commit()
    # # 关闭session
    # db_session.close()
    #
    # # 第二种方式,直接调用update方法更新行数据
    # db_session.query(TestCase).filter_by(id=1).update({"name": "用例1111111"})
    # # 提交
    # db_session.commit()
    # # 关闭session
    # db_session.close()
    # 删除  D
    # 方式一:
    # # 查询数据
    # case = db_session.query(TestCase).filter_by(id=1).first()
    # # 删除操作
    # db_session.delete(case)
    # # 提交
    # db_session.commit()
    # # 关闭session
    # db_session.close()
    #
    # # 方式二:
    # # 查询结果直接删除操作
    # db_session.query(TestCase).filter_by(id=2).delete()
    # # 提交
    # db_session.commit()
    # # 关闭session
    # db_session.close()

目前的代码

dgut_backend.zip (174.2 KB)

1 个赞

202041412102甘胜华

from flask import Blueprint, request

from orm_study.orm_connect import User, db_session

user_bp = Blueprint("用户", __name__, url_prefix="/user")

#登录功能,获取用户提交的数据,进行密码校验,校验成功则返回登录成功
@user_bp.route("/login", methods=["post"])
def user_login():
    # 获取请求体
    data = request.json
    # 使用请求的数据构建用户的对象
    user = User(**data)  # 等价于 User(username=data.get("username"),password=data.get("password"))
    # 查询用户是否存在,存在则进行后续的步骤
    user_result_data = db_session.query(User).filter_by(username=user.username).first()

    if not user_result_data:
        # 没查到用户
        return {"code": 405, "msg": "user is not register"}
    if not user_result_data.check_hash_password(data.get("password")):
        return {"code": 401, "msg": "user or password is wrong"}
    return {"code": 0, "msg": "user login success"}

#注册用户,对密码进行加密
@user_bp.route("/register", methods=["post"])
def user_register():
    user_data = request.json
    # username = user_data.get("username")
    # password = user_data.get("password")
    # user = User(name=name, password=password)
    user = User(**user_data)

    if db_session.query(User).filter_by(username=user.username).first():
        return {"code": 400, "msg": "user is exist"}

    db_session.add(user)
    db_session.commit()
    user_id = user.id
    db_session.close()
    if user_id:
        return {"code": 200, "msg": "register success"}
    else:
        return {"code": 400, "msg": "register fail"}


#获取所有用户
@user_bp.route("/", methods=["get"])
def get_all_user():
    users = db_session.query(User).all()
    if users:
        return {
            "code": 200,
            "msg": "success",
            "data": [{
                "id": user.id,
                "username": user.username,
                "password": user.password,
                "create_time": user.create_time
                }
                for user in users
            ]
        }

    return {"code": 404, "msg": "users is not exist", "data": {}}


#删除用户
@user_bp.route("/delete", methods=["delete"])
def delete_user():
    user_id = request.json.get("id")
    user = db_session.query(User).filter_by(id=user_id).first()

    if user:
        db_session.delete(user)
        db_session.commit()
        db_session.close()
        return {"code": 200, "msg": "delete success", "data": user.as_dict()}

    return {"code": 404, "msg": "user is not exist", "data": {}}

注册:
image

登录:
image

查询所有用户:

删除用户:

202041412208 蒋冠岳
user_route.py

from flask import Blueprint, request

from backend.orm_study.orm_connect import User, db_session
from data import users

user_bp = Blueprint("用户管理", __name__, url_prefix="/user")
userList = users


@user_bp.route("/", methods=["GET"])
def user():
    user_list = db_session.query(User).all()
    for user in user_list:
        print(user)
    print(f"userList:{user_list}")
    if user_list:
        return {"code": 0, "msg": "select success"}
    else:
        return {"code": 403, "msg": "select fail"}


@user_bp.route("/login", methods=["POST"])
def user_login():
    data = request.json
    user = User(**data)  # 等价于User(username=data.get("username"), password=data.get("password"))
    user_result_data = db_session.query(User).filter_by(username=user.username).first()
    # user_result = User()
    if not user_result_data:
        return {"code": 405, "msg": "user is not register"}
    if not user_result_data.check_hash_password(data.get("password")):
        return {"code": 401, "msg": "user or password is wrong"}
    return {"code": 0, "msg": "user login success"}


@user_bp.route("/register", methods=["POST"])
def user_register():
    data = request.json
    user = User(**data)
    user_result_data = db_session.query(User).filter_by(username=user.username).first()
    if user_result_data:
        return {"code": 405, "msg": "user is register"}
    else:
        db_session.add(user)
        db_session.commit()
        user_id = user.id
        # db_session.close()
        if user_id:
            return {"code": 0, "msg": "user register success"}

        else:
            return {"code": 0, "msg": "user register fail"}


@user_bp.route("/delete", methods=["DELETE"])
def user_delete():
    _id = request.args.get("id")
    print(f"id={_id}")
    exist = db_session.query(User).filter_by(id=_id).first()
    print(f"exist=====>{exist}")
    if exist:
        db_session.query(User).filter_by(id=_id).delete()
        db_session.commit()
        # db_session.close()
        return {"code": 0, "msg": "delete success"}
    else:
        return {"code": 0, "msg": "delete false"}

test_user_route.py

import requests


class TestUserRoute:

    def setup_class(self):
        self.base_url = "http://127.0.0.1:5000/user"

    def test_all(self):
        res = requests.get(f"{self.base_url}")
        print(res.json())
        assert res.json().get("code") == 0

    def test_login(self):
        data = {
            "username": "又待怎样",
            "password": "123456"
        }
        res = requests.post(f"{self.base_url}/login", json=data)
        print(res.json())
        assert res.json().get("code") == 0

    def test_register(self):
        data = {
            "username": "又待怎样",
            "password": "123456"
        }
        res = requests.post(f"{self.base_url}/register", json=data)
        print(res.json())
        assert res.json().get("code") == 0

    def test_delete(self):
        param = {"id": 1}
        res = requests.delete(f"{self.base_url}/delete", params=param)
        print(res.json())
        assert res.json().get("code") == 0

user_route.py

# # 导入 Flask 的模块
from flask import Blueprint, request
import datetime

# from data.data import cases
from orm_connect import db_session, TestCase, User

user = Blueprint("用户管理",__name__, url_prefix="/user")

@user.route("/register", methods = ["POST"])
def user_register():
    register_data = request.json
    username = register_data.get("username")
    password = register_data.get("password")
    create_time = datetime.datetime.now()
    instance = User(username=username,password=password,create_time=create_time)

    if db_session.query(User).filter_by(username = username).first():
        return {"code": 40002, "msg": "user is exists"}
    db_session.add(instance)
    db_session.commit()
    _id = instance.id
    db_session.close()
    if _id:
        return {"code": 0, "msg": "add success"}
    else:
        return {"code": 40001, "msg": "add failed"}

@user.route("/login", methods = ["POST"])
def user_login():
    """
    登录功能,获取用户提交的数据,进行校验,校验成功则返回登录成功
    :return:
    """
    # return "登录成功"
    # 获取请求体
    data = request.json
    # 使用请求的数据构建用户的对象
    user = User(**data)  # 等价于 User(username=data.get("username"),password=data.get("password"))
    # 查询用户是否存在,存在则进行后续的步骤
    user_result_data = db_session.query(User).filter_by(username=user.username).first()
    # user_result = User()
    if not user_result_data:
        # 没查到用户
        return {"code": 405, "msg": "user is not register"}
    if not user_result_data.check_hash_password(data.get("password")):
        return {"code": 401, "msg": "user or password is wrong"}
    return {"code": 0, "msg": "user login success"}

@user.route("/list", methods = ["GET"])
def user_list():
    users = db_session.query(User).all()
    if users:
        return {"code":0, "msg":"get all message success",
                "data":[{"id":user.id, "username":user.username, "password":user.password, "create_time":user.create_time} for user in users]
                }
    return {"code":200, "msg":"user massage not found"}

@user.route("/delete", methods = ["DELETE"])
def user_delete():
    user_id = request.json.get("id")
    user = db_session.query(User).filter_by(id = user_id).first()
    if user:
        db_session.delete(user)
        db_session.commit()
        db_session.close()
        return {"code":0, "msg":"delete success"}
    return {"code":200, "msg":"user is not exists"}

test_user_route.py

import requests


class TestUserRoute:

    def setup_class(self):
        self.base_url = "http://127.0.0.1:5000/user"

    def test_list(self):
        res = requests.get(f"{self.base_url}/list")
        print(res.json())
        assert res.json().get("code") == 0

    def test_login(self):
        data = {
            "username": "name2",
            "password": "123456"
        }
        res = requests.post(f"{self.base_url}/login", json=data)
        print(res.json())
        assert res.json().get("code") == 0

    def test_register(self):
        data = {
            "username": "asfgfg6",
            "password": "123456"
        }
        res = requests.post(f"{self.base_url}/register", json=data)
        print(res.json())
        assert res.json().get("code") == 0

    def test_delete(self):
        data = {"id": 5}
        res = requests.delete(f"{self.base_url}/delete", json=data)
        print(res.json())
        assert res.json().get("code") == 0

结果:

学号:202041404110 姓名:胡鑫

user_route.py

from flask import Blueprint,request
from orm_study.orm_connect import db_session, User

user_bp = Blueprint("用户管理", __name__, url_prefix="/user")


@user_bp.route("/", methods=['get'])
def user():
    res = db_session.query(User).all()
    users = []
    for rs in res:
        data = {"id": rs.id, "username": rs.username, "password": rs.password}
        users.append(data)
    return {"code": 200, "msg": "success", "data": users}


@user_bp.route("/login", methods=['post'])
def user_login():
    username = request.json.get("username")
    password = request.json.get("password")
    if not db_session.query(User).filter_by(username=username).first():
        return {"code": 401, "msg": "not this user"}
    if db_session.query(User).filter_by(username=username).filter_by(password=password).first():
        return {"code": 200, "msg": "login success"}
    return {"code": 402, "msg": "password is wrong"}


@user_bp.route("/register", methods=['post'])
def user_register():
    username = request.json.get("username")
    password = request.json.get("password")
    if db_session.query(User).filter_by(username=username).first():
        return {"code": 401, "msg": "username is exist"}
    db_session.add(User(username=username, password=password))
    db_session.commit()
    if db_session.query(User).filter_by(username=username).first():
        return {"code": 200, "msg": "register success"}
    return {"code": 501, "msg": "register fail"}


@user_bp.route("/delete", methods=['delete'])
def delete_user():
    _id = request.args.get("id")
    if not db_session.query(User).filter_by(id=_id).first():
        return {"code": 401, "msg": "no this user"}
    db_session.query(User).filter_by(id=_id).delete()
    db_session.commit()
    if db_session.query(User).filter_by(id=_id).first():
        return {"code": 501, "msg": "delete fail"}
    return {"code": 200, "msg": "delete success"}




server.py:

from flask import Flask
from user_route import user_bp

app = Flask(__name__)

# 注册蓝图
app.register_blueprint(user_bp)


if __name__ == '__main__':
    app.run()


test_server.py

import requests


class TestServer:

    def setup_class(self):
        self.base_url = "http://127.0.0.1:5000"


    def test_users_get(self):
        url = f"{self.base_url}/user"
        res = requests.get(url)
        print(res.json())

    def test_user_login(self):
        url = f"{self.base_url}/user/login"
        user_data = {
            "username": "hxin4",
            "password": "123456789"
        }
        res = requests.post(url, json=user_data)
        print(res.json())

    def test_user_register(self):
        url = f"{self.base_url}/user/register"
        user_data = {
            "username": "hxin1",
            "password": "123456"
        }
        res = requests.post(url, json=user_data)
        print(res.json())

    def test_delete_user(self):
        url = f"{self.base_url}/user/delete"
        arg = {"id": 1}
        res = requests.delete(url, params=arg)
        print(res.json())


测试结果:

202041404121 阮达

from flask import Blueprint, request
from study6.orm_mysql import db_session, User

user_bp = Blueprint("用户管理", __name__, url_prefix="/user")


@user_bp.route("/", methods=["GET"])
def user():
    user_list = db_session.query(User).all()
    for user in user_list:
        print(user)
    print(f"userList:{user_list}")
    if user_list:
        return {"code": 0, "msg": "select success"}
    else:
        return {"code": 4003, "msg": "select fail"}


@user_bp.route("/login", methods=["POST"])
def user_login():
    data = request.json
    user = User(**data)  # 等价于User(username=data.get("username"), password=data.get("password"))
    user_result_data = db_session.query(User).filter_by(username=user.username).first()
    # user_result = User()
    if not user_result_data:
        return {"code": 4005, "msg": "user is not register"}
    if not user_result_data.check_hash_password(data.get("password")):
        return {"code": 4001, "msg": "user or password is wrong"}
    return {"code": 0, "msg": "user login success"}


@user_bp.route("/register", methods=["POST"])
def user_register():
    data = request.json
    user = User(**data)
    user_result_data = db_session.query(User).filter_by(username=user.username).first()
    if user_result_data:
        return {"code": 4005, "msg": "user is register"}
    else:
        db_session.add(user)
        db_session.commit()
        user_id = user.id
        # db_session.close()
        if user_id:
            return {"code": 0, "msg": "user register success"}

        else:
            return {"code": 0, "msg": "user register fail"}


@user_bp.route("/delete", methods=["DELETE"])
def user_delete():
    _id = request.args.get("id")
    print(f"id={_id}")
    exist = db_session.query(User).filter_by(id=_id).first()
    print(f"exist=====>{exist}")
    if exist:
        db_session.query(User).filter_by(id=_id).delete()
        db_session.commit()
        # db_session.close()
        return {"code": 0, "msg": "delete success"}
    else:
        return {"code": 0, "msg": "delete false"}

user_route.py

from flask import Blueprint, request

from orm_study.orm_connect import User, db_session

user_bp = Blueprint("用户管理", __name__, url_prefix="/user")


@user_bp.route("/", methods=["GET"])
def user():
    users = db_session.query(User).all()
    if users:
        return {"code": 0, "msg": "user list",
                "data": [{"id": user.id, "username": user.username, "password": user.password, "create_time":
                    user.create_time} for user in users]}
    return {"code": 405, "msg": "user massage not found"}


@user_bp.route("/login", methods=["POST"])
def user_login():
    """
    登录功能,获取用户提交的数据,进行校验,校验成功则返回登录成功
    :return:
    """
    # return "登录成功"
    # 获取请求体
    data = request.json
    # 使用请求的数据构建用户的对象
    user = User(**data)  # 等价于 User(username=data.get("username"),password=data.get("password"))
    # 查询用户是否存在,存在则进行后续的步骤
    user_result_data = db_session.query(User).filter_by(username=user.username).first()
    # user_result = User()
    if not user_result_data:
        # 没查到用户
        return {"code": 405, "msg": "user is not register"}
    if not user_result_data.check_hash_password(data.get("password")):
        return {"code": 401, "msg": "user or password is wrong"}
    return {"code": 0, "msg": "user login success"}


@user_bp.route("/register", methods=["POST"])
def user_register():
    # 获取请求体
    data = request.json
    # 构建用户对象
    user = User(**data)
    print(user.password)

    if user:
        # 查询用户是否存在,存在则进行后续的步骤
        user_result_data = db_session.query(User).filter_by(username=user.username).first()
        if user_result_data:
            return {"code": 40003, "msg": "user is exist"}
        db_session.add(user)
        db_session.commit()
        user_id = user.id
        db_session.close()
        if user_id:
            # 存在id,则证明新增成功了
            return {"code": 0, "msg": f"register success"}
        else:
            return {"code": 40001, "msg": "register fail"}
    return {"code": 40001, "msg": "register fail"}


@user_bp.route("/delete", methods=["DELETE"])
def delete_user():
    # 获取请求体
    data = request.json
    user_id = data.get("id")
    user_result_data = db_session.query(User).filter_by(id=user_id).first()
    if user_result_data:
        db_session.delete(user_result_data)
        db_session.commit()
        db_session.close()
        return {"code": 0, "msg": "delete success"}
    return {"code": 40001, "msg": "delete fail"}

202041404203 段杰东

from flask import Blueprint, request

from sql import User, session

user_bp = Blueprint("用户管理", __name__,url_prefix="/users", template_folder='/user')



@user_bp.route("/")
def user():
    user2 = session.query(User).all()
    users = []
    for rs in user2:
        data = {"id":rs.id, "username": rs.username, "password": rs.password}
        users.append(data)
    return {"code": 200, "msg": "success", "data": users}


@user_bp.route("/login")
def user_login():
    data = request.json
    user = User(**data)  # 等价于 User(username=data.get("username"),password=data.get("password"))
    user_result_data = session.query(User).filter_by(username=user.username).first()
    if not user_result_data:
        # 没查到用户
        return {"code": 405, "msg": "user is not register"}
    if not user_result_data.check_hash_password(data.get("password")):
        return {"code": 401, "msg": "user or password is wrong"}
    return {"code": 0, "msg": "user login success"}


@user_bp.route("/register")
def user_register():
    data = request.json
    user = User(**data)
    user_result = session.query(User).filter_by(username=user.username).first()
    if not user_result:
        username = data.get("username")
        password = data.get("password")
        session.add(User(username=f"{username}", password=f"{password}"))
        session.commit()
        return {"code": 200, "msg": "注册成功"}
    else:
        return {"code": 200, "msg": "已存在该用户名"}


@user_bp.route("/delete")
def delete_user():
    id = request.args.get("id")
    user_result = session.query(User).filter_by(id=id).first()
    if not user_result:
        return {"code": 401, "msg": "没有该用户"}
    session.query(User).filter_by(id=id).delete()
    session.commit()
    if session.query(User).filter_by(id=id).first():
        return {"code": 501, "msg": "delete fail"}
    return {"code": 200, "msg": "delete success"}

image
image
image

202041412221 阮凯林

from flask import Blueprint, request
from orm_study.orm_connect import db_session, User

user_server = Blueprint("User", __name__, url_prefix="/user")


@user_server.route("/login", methods=["post"])
def login():
    request_body = request.json
    login_user = {
        "username": request_body.get("username"),
        "password": request_body.get("password")
    }
    user_list = db_session.query(User).all()
    for user in user_list:
        if login_user.get("username") == user.get("username") and login_user.get("password") == user.get("password"):
            return {"code": 200, "msg": "login success"}
    return {"code": 401, "msg": "account or password error"}


@user_server.route("/register", methods=["post"])
def register():
    request_body = request.json
    register_user = {
        "username": request_body.get("username"),
        "password": request_body.get("password")
    }
    user_list = db_session.query(User).all()
    for user in user_list:
        if register_user.get("username") == user.get("username"):
            return {"code": 401, "msg": "username is exist"}
    db_session.add(User(username=register_user.get("username"), password=register_user.get("password")))
    return {"code": 200, "msg": "register success"}

202041412213 林梓豪
user_route.py

from flask import Blueprint, request

from orm_study.orm_connect1 import User, db_session, TestCase

user_bp = Blueprint("用户管理", __name__, url_prefix="/user")

base_url = "http://127.0.0.1:5000"


@user_bp.route("/", methods=["get"])
def user():
    user_list = db_session.query(User).all()
    for user in user_list:
        print(user)
    print(f"userList:{user_list}")
    if user_list:
        return {"code": 0, "msg": "select success"}
    else:
        return {"code": 403, "msg": "select fail"}

@user_bp.route("/login", methods=["post"])
def user_lgoin():
    data = request.json
    user = User(**data)  # 等价于User(username = data.get("username"),password = data.get("password"))
    # 查询用户是否存在
    user_result_data = db_session.query(User).filter_by(username=user.username).first()
    # user_result = User()
    if not user_result_data:
        return {"code": 405, "msg": "user is not register"}
    if not user_result_data.check_hash_password(data.get("password")):
        return {"code": 401, "msg": "user or password is wrong"}
    return {"code": 0, "msg": "user login success"}


@user_bp.route("/register", methods=["post"])
def user_register():
    data = request.json
    user = User(**data)  # 等价于User(username = data.get("username"),password = data.get("password"))
    # 查询用户是否存在

    if user:
        user_result_data = db_session.query(User).filter_by(username=user.username).first()
        if user_result_data:
            return {"code": 40003, "msg": "user is exist"}
        db_session.add(user)
        db_session.commit()
        user_id = user.id
        db_session.close()
        if user_id:
            return {"code": 0, "msg": "register success"}
        else:
            return {"code": 40001, "msg": "register failed"}

    return {"code": 40001, "msg": "register failed"}


@user_bp.route("/delete",methods=["delete"])
def delete_user():
    _id = int(request.args.get("id"))
    exist = db_session.query(User).filter_by(id=_id).first()
    print(f"exist====> {exist}")
    if exist:
        db_session.query(User).filter_by(id=_id).delete()
        db_session.commit()
        # db_session.close()
        return {"code": 0, "msg": "delete success"}
    else:
        return {"code": 0, "msg": "case is not exists"}

test_user_server

import pytest
import requests


class TestUserServer:

    def setup_class(self):
        self.base_url = "http://127.0.0.1:5000/user"

    def test_login(self):
        url = f"{self.base_url}/login"
        data = {
            "username": "lihua",
            "password": "lh123456"
        }
        res = requests.post(url=url,json=data)
        print(res.json())

    def test_register(self):
        url = f"{self.base_url}/register"
        data = {
            "username": "lihua",
            "password": "lh123456"
        }
        res = requests.post(url=url,json=data)
        print(res.json())

    def test_get_user(self):
        url = self.base_url
        res = requests.get(url)
        print(res.json())
        assert res.json().get("code") == 0

    def test_delete(self):
        param ={"id":1}
        url = f"{self.base_url}/delete"
        res = requests.delete(url, params=param)
        print(res.json())
        assert res.json().get("code") == 0

202041412216 刘尚杰

from flask import Blueprint, request
from orm_study.orm_connect import db_session, User

user_server = Blueprint("User", __name__, url_prefix="/user")


@user_server.route("/login", methods=["post"])
def login():
    request_body = request.json
    login_user = {
        "username": request_body.get("username"),
        "password": request_body.get("password")
    }
    user_list = db_session.query(User).all()
    for user in user_list:
        if login_user.get("username") == user.get("username") and login_user.get("password") == user.get("password"):
            return {"code": 200, "msg": "登录成功"}
    return {"code": 401, "msg": "账号或者密码错误"}


@user_server.route("/register", methods=["post"])
def register():
    request_body = request.json
    register_user = {
        "username": request_body.get("username"),
        "password": request_body.get("password")
    }
    user_list = db_session.query(User).all()
    for user in user_list:
        if register_user.get("username") == user.get("username"):
            return {"code": 401, "msg": "用户名已存在"}
    db_session.add(User(username=register_user.get("username"), password=register_user.get("password")))
    return {"code": 200, "msg": "注册成功"}

姓名:翁开锋 学号:202041412223
user_route.py

from flask import Blueprint, request
from requests import Session
from sqlalchemy.orm import sessionmaker

from orm_study.orm_correct import User, engine

DBSession = sessionmaker(bind=engine)
db_session: Session = DBSession()

user_bp = Blueprint("用户管理", __name__, url_prefix="/user")

# list_login = [
#     {
#         "name": "admin",
#         "password": "12345"
#     }, {
#         "name": "me",
#         "password": "123456"
#     }, {
#         "name": "username",
#         "password": "56789"
#     }]


@user_bp.route("/")
def user():
    user_list = db_sesion.query(User).all()
    for user in user_list:
        print(user)
    return {
        "code": 0,
        "msg": "success",
        "data": user_list
    }


@user_bp.route("/login", methods=["post"])
def user_login():
    # name = request.form.get("name")
    # password = request.form.get("password")
    data = request.json
    user = User(**data)
    print(f"{user.name},{user.password}")
    user_data = db_session.query(User).filter_by(username=user.username).first()
    if not user_data:
        return {"code": 405, "msg": "user is not register"}
    if not user_data.check_hash_password(data.get("password")):
        return {"code": 401, "msg": "user or password is wrong"}
    return {"code": 0, "msg": "user login success"}


@user_bp.route("/register", methods=["post"])
def user_register():
    # name = request.form.get("name")
    # password = request.form.get("password")
    data = request.json
    user = User(**data)
    user_data = db_session.query(User).filter_by(username=user.username).first()
    if user_data:
        return {"code": 405, "msg": "user is register"}
    else:
        db_session.add(user)
        db_session.commit()
        user_id = user.id
        # db_session.close()
        if user_id:
            return {"code": 0, "msg": "user register success"}

        else:
            return {"code": 0, "msg": "user register fail"}


@user_bp.route("/delete", methods=["post"])
def delete_user():
    _id = request.args.get("id")
    print(f"id={_id}")
    exist = db_session.query(User).filter_by(id=_id).first()
    print(f"exist=====>{exist}")
    if exist:
        db_session.query(User).filter_by(id=_id).delete()
        db_session.commit()
        # db_session.close()
        return {"code": 0, "msg": "delete success"}
    else:
        return {"code": 0, "msg": "delete false"}


test_user.py

import requests


class TestUserRoute:

    def setup_class(self):
        self.base_url = "http://127.0.0.1:5000/user"

    def test_all(self):
        res = requests.get(f"{self.base_url}")
        print(res.json())
        assert res.json().get("code") == 0

    def test_login(self):
        data = {
            "username": "admin",
            "password": "123456"
        }
        res = requests.post(f"{self.base_url}/login", json=data)
        print(res.json())
        assert res.json().get("code") == 0

    def test_register(self):
        data = {
            "username": "admin",
            "password": "123456"
        }
        res = requests.post(f"{self.base_url}/register", json=data)
        print(res.json())
        assert res.json().get("code") == 0

    def test_delete(self):
        param = {"id": 1}
        res = requests.delete(f"{self.base_url}/delete", params=param)
        print(res.json())
        assert res.json().get("code") == 0

姓名:胡福生 学号:202041404204
user_route.py

from flask import Blueprint,request
from orm_study.orm_connect import db_session, User

user_bp = Blueprint("用户管理", __name__, url_prefix="/user")


@user_bp.route("/", methods=['GET'])
def user():
    users = db_session.query(User).all()
    if users:
        return {"code": 0, "msg": "select success",
                "data": [{"id": user.id, "username": user.username, "password": user.password,"create_time": user.create_time} for user in users]}
    else:
        return {'code': 401, 'msg': 'fail', 'data': "list display fail"}

@user_bp.route("/login", methods=['POST'])
def user_login():
    """
    登录功能,获取用户提交的数据,进行校验,校验成功则返回登录成功
    :return:
    """
    # return "登录成功"
    # 获取请求体
    data = request.json
    # 使用请求的数据构建用户的对象
    user = User(**data)  # 等价于 User(username=data.get("username"),password=data.get("password"))
    # 查询用户是否存在,存在则进行后续的步骤
    user_result_data = db_session.query(User).filter_by(username=user.username).first()
    # user_result = User()
    if not user_result_data:
        # 没查到用户
        return {"code": 405, "msg": "user is not register"}
    if not user_result_data.check_hash_password(data.get("password")):
        return {"code": 401, "msg": "user or password is wrong"}
    return {"code": 0, "msg": "user login success"}


@user_bp.route("/register", methods=['POST'])
def user_register():
    # 获取请求体
    data = request.json
    # 使用请求的数据构建用户的对象
    user = User(**data)  # 等价于 User(username=data.get("username"),password=data.get("password"))
    # 查询用户是否存在,存在则进行后续的步骤
    user_result_data = db_session.query(User).filter_by(username=user.username).first()
    # user_result = User()
    if not user_result_data:
        # 没查到用户
        db_session.add(user)
        db_session.commit()
        db_session.close()
        return {"code": 0, "msg": "register success"}
    else:
        return {"code": 401, "msg": "register fail",'data': '用户已存在'}

@user_bp.route("/delete", methods=['DELETE'])
def delete_user():
    _id = request.json.get("id")
    user_result_data = db_session.query(User).filter_by(id=_id).first()
    # user_result = User()
    if not user_result_data:
        # 没查到用户
        return {'code': 401, 'msg': 'delete fail', 'data': '用户不存在'}
    else:
        db_session.query(User).filter_by(id=_id).delete()
        db_session.commit()
        db_session.close()
        return {'code': 0, 'msg': 'delete success', 'data': '用户已删除'}

test_user_route.py

import requests

class TestUserRoute:

    def setup_class(self):
        self.base_url = "http://127.0.0.1:5000/user"

    def test_user(self):
        res = requests.get(f"{self.base_url}")
        print(res.json())
        assert res.json().get("code") == 0

    def test_user_login(self):
        data = {
            "username": "hfs",
            "password": "202041404204"
        }
        res = requests.post(f"{self.base_url}/login", json=data)
        print(res.json())
        assert res.json().get("code") == 0

    def test_user_register(self):
        data = {
            "username": "hufusheng",
            "password": "12345678"
        }
        res = requests.post(f"{self.base_url}/register", json=data)
        print(res.json())
        assert res.json().get("code") == 0

    def test_user_delete(self):
        data = {"id": 2}
        res = requests.delete(f"{self.base_url}/delete", json=data)
        print(res.json())
        assert res.json().get("code") == 0

server.py

from flask import Flask
from case_route import case_bp
from user_route import user_bp

app = Flask(__name__)
app.register_blueprint(user_bp)

if __name__ == '__main__':
    app.run(host="0.0.0.0", debug=True)


from flask import Blueprint,request
from orm_study.orm_connect import db_session, User

user_bp = Blueprint("用户管理", __name__, url_prefix="/user")


@user_bp.route("/", methods=['get'])
def user():
    res = db_session.query(User).all()
    users = []
    for rs in res:
        data = {"id": rs.id, "username": rs.username, "password": rs.password}
        users.append(data)
    return {"code": 200, "msg": "success", "data": users}


@user_bp.route("/login", methods=['post'])
def user_login():
    username = request.json.get("username")
    password = request.json.get("password")
    if not db_session.query(User).filter_by(username=username).first():
        return {"code": 401, "msg": "not this user"}
    if db_session.query(User).filter_by(username=username).filter_by(password=password).first():
        return {"code": 200, "msg": "login success"}
    return {"code": 402, "msg": "password is wrong"}


@user_bp.route("/register", methods=['post'])
def user_register():
    username = request.json.get("username")
    password = request.json.get("password")
    if db_session.query(User).filter_by(username=username).first():
        return {"code": 401, "msg": "username is exist"}
    db_session.add(User(username=username, password=password))
    db_session.commit()
    if db_session.query(User).filter_by(username=username).first():
        return {"code": 200, "msg": "register success"}
    return {"code": 501, "msg": "register fail"}


@user_bp.route("/delete", methods=['delete'])
def delete_user():
    _id = request.args.get("id")
    if not db_session.query(User).filter_by(id=_id).first():
        return {"code": 401, "msg": "no this user"}
    db_session.query(User).filter_by(id=_id).delete()
    db_session.commit()
    if db_session.query(User).filter_by(id=_id).first():
        return {"code": 501, "msg": "delete fail"}
    return {"code": 200, "msg": "delete success"}


import requests


class TestUserServer:

    def setup_class(self):
        self.base_url = "http://127.0.0.1:5000/user"

    def test_register(self):
        url = f"{self.base_url}/register"
        data = {
            "username":"lisi",
            "password": "aa123456"
        }
        res = requests.post(url=url,json=data)
        print(res.json())

    def test_login(self):
        url = f"{self.base_url}/login"
        data = {
            "username":"lisi",
            "password": "aa123456"
        }
        res = requests.post(url=url,json=data)
        print(res.json())

202041404101 曹岚

case_route.py


from flask import Blueprint, request

from orm_study.orm_connect import User, db_session

user_bp = Blueprint("用户管理", __name__, url_prefix="/user")


@user_bp.route("/")
def user():
    return "用户列表"


@user_bp.route("/login", methods=["POST"])
def user_login():
    """
    登录功能,获取用户提交的数据,进行校验,校验成功则返回登录成功
    :return:
    """
    # return "登录成功"
    # 获取请求体
    data = request.json
    # 使用请求的数据构建用户的对象
    user = User(**data)  # 等价于 User(username=data.get("username"),password=data.get("password"))
    # 查询用户是否存在,存在则进行后续的步骤
    user_result_data = db_session.query(User).filter_by(username=user.username).first()
    # user_result = User()
    if not user_result_data:
        # 没查到用户
        return {"code": 405, "msg": "user is not register"}
    if not user_result_data.check_hash_password(data.get("password")):
        return {"code": 401, "msg": "user or password is wrong"}
    return {"code": 0, "msg": "user login success"}


@user_bp.route("/register", methods=["POST"])
def user_register():
    # 获取请求体
    data = request.json
    # 构建用户对象
    user = User(**data)
    print(user.password)

    if user:
        # 查询用户是否存在,存在则进行后续的步骤
        user_result_data = db_session.query(User).filter_by(username=user.username).first()
        if user_result_data:
            return {"code": 40003, "msg": "user is exist"}
        db_session.add(user)
        db_session.commit()
        user_id = user.id
        if user_id:
            # 存在id,则证明新增成功了
            return {"code": 0, "msg": f"register success"}
        else:
            return {"code": 40001, "msg": "register fail"}
    return {"code": 40001, "msg": "register fail"}

@user_bp.route("/get", methods = ["GET"])
def user_list():
    users = db_session.query(User).all()
    if users:
        return {"code":200, "msg":"success",
                "data":[{"id":user.id, "username":user.username, "password":user.password, "create_time":user.create_time} for user in users]
                }
    return {"code":401, "msg":"fail"}

@user_bp.route("/delete",methods=["DELETE"])
def delete_user():
    _id = request.args.get("id")
    user_d = db_session.query(User).filter_by(id=_id).first()
    if user_d:
        db_session.query(User).filter_by(id=_id).delete()
        db_session.commit()
        db_session.close()
        return {"code": 200, "msg": "success"}
    return {"code": 401, "msg": "no this user"}




姓名:李升升
学号:202041404115


image

202041412220 区世钧
route:
M(H7XHL2UU1NQXO640G{)}D
~3FT%NK8MXGMZBX~VZ23W
FKFGAEAU3GVPZESKH{CEQ
6Q}HYW~HK%`2O7DZ432){70
H)~Y2XHEIQ1LAQ}Z5$VU5
test
F~RV%C2S5MLM$%I~VGCCFE

202041404214刘均豪

from flask import Blueprint, request

from orm_connect import User, db_session

user_bp = Blueprint(“用户管理”, name, url_prefix=“/user”)

@user_bp.route(“/”)
def user():
return “用户列表”

@user_bp.route(“/login”, methods=[“POST”])
def user_login():
“”"
登录功能,获取用户提交的数据,进行校验,校验成功则返回登录成功
:return:
“”"
# return “登录成功”
# 获取请求体
data = request.json
# 使用请求的数据构建用户的对象
user = User(**data) # 等价于 User(username=data.get(“username”),password=data.get(“password”))
# 查询用户是否存在,存在则进行后续的步骤
user_result_data = db_session.query(User).filter_by(username=user.username).first()
# user_result = User()
if not user_result_data:
# 没查到用户
return {“code”: 405, “msg”: “user is not register”}
if not user_result_data.check_hash_password(data.get(“password”)):
return {“code”: 401, “msg”: “user or password is wrong”}
return {“code”: 0, “msg”: “user login success”}

@user_bp.route(“/register”, methods=[“POST”])
def user_register():
# 获取请求体
data = request.json
# 构建用户对象
user = User(**data)
print(user.password)

if user:
    # 查询用户是否存在,存在则进行后续的步骤
    user_result_data = db_session.query(User).filter_by(username=user.username).first()
    if user_result_data:
        return {"code": 40003, "msg": "user is exist"}
    db_session.add(user)
    db_session.commit()
    user_id = user.id
    if user_id:
        # 存在id,则证明新增成功了
        return {"code": 0, "msg": f"register success"}
    else:
        return {"code": 40001, "msg": "register fail"}
return {"code": 40001, "msg": "register fail"}

#获取所有用户
@user_bp.route(“/”, methods=[“get”])
def get_all_user():
users = db_session.query(User).all()
if users:
return {
“code”: 200,
“msg”: “success”,
“data”: [{
“id”: user.id,
“username”: user.username,
“password”: user.password,
“create_time”: user.create_time
}
for user in users
]
}

return {"code": 404, "msg": "users is not exist", "data": {}}

#删除用户
@user_bp.route(“/delete”, methods=[“delete”])
def delete_user():
user_id = request.json.get(“id”)
user = db_session.query(User).filter_by(id=user_id).first()

if user:
    db_session.delete(user)
    db_session.commit()
    db_session.close()
    return {"code": 200, "msg": "delete success", "data": user.as_dict()}

return {"code": 404, "msg": "user is not exist", "data": {}}

202041404109 胡智
uesr_route.py


from flask import Blueprint

user_bp = Blueprint("用户管理", __name__, url_prefix="/user")


@user_bp.route("/")
def user():
    return "用户列表"


@user_bp.route("/login")
def user_login():
    return "登录成功"


@user_bp.route("/register")
def user_register():
    return "注册成功"


@user_bp.route("/delete")
def delete_user():
    return "删除成功"

server.py

from flask import Flask
from user_route import user_bp

app = Flask(__name__)

# 注册蓝图
app.register_blueprint(user_bp)


if __name__ == '__main__':
    app.run()

test_server.py

import requests


class TestServer:

    def setup_class(self):
        self.base_url = "http://127.0.0.1:5000"


    def test_users_get(self):
        url = f"{self.base_url}/user"
        res = requests.get(url)
        print(res.json())

    def test_user_login(self):
        url = f"{self.base_url}/user/login"
        user_data = {
            "username": "hxin4",
            "password": "123456789"
        }
        res = requests.post(url, json=user_data)
        print(res.json())

    def test_user_register(self):
        url = f"{self.base_url}/user/register"
        user_data = {
            "username": "hxin1",
            "password": "123456"
        }
        res = requests.post(url, json=user_data)
        print(res.json())

    def test_delete_user(self):
        url = f"{self.base_url}/user/delete"
        arg = {"id": 1}
        res = requests.delete(url, params=arg)
        print(res.json())

1