姓名:王冠毅 学号:202041412222
orm_connect.py
from mysqlx import Session
from sqlalchemy import *
from sqlalchemy.orm import declarative_base, sessionmaker
Base = declarative_base()
# 创建引擎,通过这个引擎,可以创建数据库连接,比如:mysql,sqlite...
# engine = create_engine("sqlite:///test.db")
# ## windows 时,有可能会报错,需要加上这个参数
engine =create_engine('sqlite:///test.db',connect_args={'check_same_thread': False})
DBSession = sessionmaker(bind=engine)
db_session: Session = DBSession()
class User(Base):
__tablename__ = "user"
# 属性 -- 对应 -- 表的列属性
id = Column(Integer, primary_key=True)
username = Column(String(80), nullable=False, unique=False)
password = Column(String(120))
def __repr__(self):
return '<User %r>' % self.username
class TestCase(Base):
__tablename__ = "testcase"
id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=False)
desc = Column(String(100))
if __name__ == '__main__':
# 创建表,创建存储在元素数据中所有的表
# Base.metadata.create_all(engine)
# 删除表
# Base.metadata.drop_all(engine)
instance = User(username="yuki", password=1234)
db_session.add(instance)
# id = instance.id
res = db_session.query(User).all()
# 遍历数据,得到想要的字段
for rs in res:
print(rs.username, rs.password)
db_session.commit()
db_session.close()
user_route.py
from flask import Blueprint, request
from orm_connect import db_session,User
user_bp = Blueprint("用户管理", __name__, url_prefix="/user")
@user_bp.route("/", methods=["get"])
def user():
users=db_session.query(User).all()
if users:
return {
"code": 200,
"msg": "success",
"data": [{
"id": user.id,
"username": user.username,
"password": user.password,
}
for user in users
]
}
return {"code": 404, "msg": "users is not exist", "data": {}}
@user_bp.route("/login",methods=["get"])
def user_login():
user_data = request.json
data = request.json
# 使用请求的数据构建用户的对象
user = User(**data)
user_result_data = db_session.query(User).filter_by(username=user.username).first()
if not user_result_data:
# 没查到用户
return {"code": 405, "msg": "用户不存在"}
if user_result_data.password!=user.password:
return {"code": 401, "msg": "用户名或密码错误"}
return {"code": 0, "msg": "登录成功"}
@user_bp.route("/register", methods=["post"])
def user_register():
user_data = request.json
user = User(**user_data)
if db_session.query(User).filter_by(username=user.username).first():
return {"code": 400, "msg": "用户名已存在"}
db_session.add(user)
db_session.commit()
user_id = user.id
db_session.close()
if user_id:
return {"code": 200, "msg": "注册成功"}
else:
return {"code": 400, "msg": "注册失败"}
@user_bp.route("/delete", methods=["post"])
def delete_user():
user_id = request.json.get("id")
user = db_session.query(User).filter_by(id=user_id).first()
if user:
db_session.delete(user)
db_session.commit()
db_session.close()
return {"code": 200, "msg": "删除成功", "data": user_id}
return {"code": 404, "msg": "用户不存在", "data": {}}