ppt
安装
- 官网:
https://docs.sqlalchemy.org/en/14/
- 安装文档:
https://docs.sqlalchemy.org/en/14/intro.html#installation
- 声明 Models:
https://docs.sqlalchemy.org/en/14/orm/index.html
- SQLAlchemy 是 python 中最有名的 ORM 框架
- 安装命令:
pip install SQLAlchemy
sqlite 安装
- sqlite 下载地址:SQLite Download Page
- sqlite 安装教程:Window 10下 Sqlite安装教程
- sqlite 在线工具:GitHub - sqlitebrowser/sqlitebrowser: Official home of the DB Browser for SQLite (DB4S) project. Previously known as "SQLite Database Browser" and "Database Browser for SQLite". Website at:
orm的代码例子
from sqlalchemy import *
from sqlalchemy.orm import declarative_base, sessionmaker, Session
# 先创建基类,使用declarative_base()方法声明一个基类
# 创建表的时候需要继承这个基类对象
Base = declarative_base()
# 创建引擎,通过这个引擎,可以创建数据库连接,比如:mysql,sqlite...
# engine = create_engine("sqlite:///test.db")
## windows 时,有可能会报错,需要加上这个参数
engine = create_engine('sqlite:///test.db', connect_args={'check_same_thread': False})
# 连接的格式 dialect+driver://username:password@host:port/database
# sqlite: sqlite:///test.db
# mysql+pymysql://username:password@host:port/database
# 定义db_session对象,可以实现对数据库的增删改查,以及表的创建和删除
DBSession = sessionmaker(bind=engine)
db_session: Session = DBSession()
# 类--对应--表
class User(Base):
__tablename__ = "user"
# 属性 -- 对应 -- 表的列属性
id = Column(Integer, primary_key=True)
username = Column(String(80), nullable=False, unique=False)
password = Column(String(120))
def __repr__(self):
return '<User %r>' % self.username
class TestCase(Base):
__tablename__ = "testcase"
id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=False)
desc = Column(String(100))
if __name__ == '__main__':
# 创建表,创建存储在元素数据中所有的表
Base.metadata.create_all(engine)
# 删除表
# Base.metadata.drop_all(engine)
case_route
# # 导入 Flask 的模块
from flask import Blueprint, request
# from data.data import cases
from orm_study.orm_connect import db_session, TestCase
case_bp = Blueprint("用例管理", __name__, url_prefix="/testcase")
# 查询用例
@case_bp.route("/", methods=["GET"])
def get_case():
args = request.args
_id = args.get("id")
datas = []
if _id:
case = db_session.query(TestCase).filter_by(id=_id).first()
# for case in my_cases:
# if int(_id) == case.get("id"):
# datas.append(case)
print(f"查询到的数据为=====>{case}")
if case:
datas.append({
"id": case.id,
"name": case.name,
"desc": case.desc,
})
else:
# datas = my_cases
cases = db_session.query(TestCase).all()
datas = [{"id": case.id, "name": case.name, "desc": case.desc} for case in cases]
# # 具体的实现
# for case in cases:
# dict_a = {}
# dict_a["id"] = case.id
# dict_a["name"] = case.name
# dict_a["desc"] = case.desc
# datas.append(dict_a)
return {"code": 0, "msg": "success", "data": datas}
#
# 新增测试用例
@case_bp.route("/add", methods=["POST"])
def add_case():
case_data = request.json
print(f"接收到的参数<===={case_data}")
# _id = int(case_data.get("id"))
name = case_data.get("name")
desc = case_data.get("desc")
# new_case = {"id": _id, "name": name, "desc": desc}
instance = TestCase(name=name, desc=desc)
# 判断重复
# for case in my_cases:
# if case.get("id") == _id or case.get("name") == name:
# return {"code": 40002, "msg": "case is exists"}
# my_cases.append(new_case)
# print(f"当前的全部用例: {my_cases}")
# return {"code": 0, "msg": "add success"}
if db_session.query(TestCase).filter_by(name=name).first():
return {"code": 40002, "msg": "case is exists"}
db_session.add(instance)
db_session.commit()
_id = instance.id
db_session.close()
if _id:
return {"code": 0, "msg": "add success", "data": {"case_id": _id}}
else:
return {"code": 40001, "msg": "add failed"}
#
#
# 删除测试用例
@case_bp.route("/delete", methods=["DELETE"])
def delete_case():
_id = int(request.args.get("id"))
exist = db_session.query(TestCase).filter_by(id=_id).first()
print(f"exist====> {exist}")
if exist:
db_session.query(TestCase).filter_by(id=_id).delete()
db_session.commit()
db_session.close()
return {"code": 0, "msg": "delete success"}
else:
return {"code": 40003, "msg": "case is not exists"}
# 更新测试用例
@case_bp.route("/update", methods=["PUT"])
def update_case():
case_data = request.json
_id = case_data.get("id")
name = case_data.get("name")
desc = case_data.get("desc")
# 如果case不存在,返回修改失败
if db_session.query(TestCase).filter_by(id=_id).first():
# 查到了case之后 进行更新操作
new_case_data = {
"name": name,
"desc": desc,
}
db_session.query(TestCase).filter_by(id=_id).update(new_case_data)
db_session.commit()
db_session.close()
return {"code": 0, "msg": "update success"}
return {"code": 40003, "msg": "case is not exists"}
接口的测试用例
import time
import pytest
import requests
class TestCaseServer:
def setup_class(self):
self.base_url = "http://127.0.0.1:5000/testcase"
def add_case(self):
data = {
"name": f"用例{time.time()}",
"desc": f"描述{time.time()}"
}
res = requests.post(url=f"{self.base_url}/add", json=data)
return res.json().get("data", {}).get("case_id")
def test_add(self):
data = {
"name": f"用例{time.time()}",
"desc": f"描述{time.time()}"
}
res = requests.post(url=f"{self.base_url}/add", json=data)
print(res.json())
assert res.json().get("code") == 0
def test_search_all(self):
res = requests.get(url=self.base_url)
print(res.json())
assert res.json().get("code") == 0
@pytest.mark.parametrize("_id,exp_length", [(3, 1), (1, 0)])
def test_search_one(self, _id, exp_length):
res = requests.get(url=self.base_url, params={"id": _id})
print(res.json())
assert res.json().get("code") == 0
length = len(res.json().get("data"))
assert length == exp_length
def test_delete(self):
case_id = self.add_case()
print(case_id)
assert case_id
url = f"{self.base_url}/delete"
res = requests.delete(url, params={"id": case_id})
print(res.json())
assert res.json().get("code") == 0
def test_update(self):
case_id = self.add_case()
print(case_id)
assert case_id
url = f"{self.base_url}/update"
case = requests.get(url=self.base_url, params={"id": case_id}).json().get("data")[0]
print(case)
case.update(
{
"name": case.get("name") + "_update",
"desc": case.get("desc") + "_update",
}
)
print(case)
res = requests.put(url=url, json=case)
print(res.json())
assert res.json().get("code") == 0
用户的登录
def user_login():
"""
登录功能,获取用户提交的数据,进行校验,校验成功则返回登录成功
:return:
"""
# return "登录成功"
# 获取请求体
data = request.json
# 使用请求的数据构建用户的对象
user = User(**data) # 等价于 User(username=data.get("username"),password=data.get("password"))
# 查询用户是否存在,存在则进行后续的步骤
user_result_data = db_session.query(User).filter_by(username=user.username).first()
# user_result = User()
if not user_result_data:
# 没查到用户
return {"code": 405, "msg": "user is not register"}
if not user_result_data.check_hash_password(data.get("password")):
return {"code": 401, "msg": "user or password is wrong"}
return {"code": 0, "msg": "user login success"}
平台的model设计
from datetime import datetime
from passlib.handlers.sha2_crypt import sha256_crypt
from sqlalchemy import *
from sqlalchemy.orm import declarative_base, sessionmaker, Session
# 先创建基类,使用declarative_base()方法声明一个基类
# 创建表的时候需要继承这个基类对象
Base = declarative_base()
# 创建引擎,通过这个引擎,可以创建数据库连接,比如:mysql,sqlite...
# engine = create_engine("sqlite:///test.db")
## windows 时,有可能会报错,需要加上这个参数
# engine = create_engine('sqlite:///test.db', connect_args={'check_same_thread': False}, echo=True)
db_host = "127.0.0.1"
db_port = "3306"
db_name = "test_platform"
db_user = "root"
db_psw = "hogwarts"
# 拼接url
db_url = f"mysql+pymysql://{db_user}:{db_psw}@{db_host}:{db_port}/{db_name}"
engine = create_engine(db_url, echo=True)
# 连接的格式 dialect+driver://username:password@host:port/database
# sqlite: sqlite:///test.db
# mysql+pymysql://username:password@host:port/database
# 定义db_session对象,可以实现对数据库的增删改查,以及表的创建和删除
DBSession = sessionmaker(bind=engine)
db_session: Session = DBSession()
# 类--对应--表
class User(Base):
__tablename__ = "user"
# 属性 -- 对应 -- 表的列属性
id = Column(Integer, primary_key=True)
username = Column(String(80), nullable=False, unique=False)
password = Column(String(120), nullable=False)
create_time = Column(DateTime, nullable=True, default=datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
def __init__(self, *args, **kwargs):
self.username = kwargs.get("username")
self.password = sha256_crypt.hash(kwargs.get("password"))
def check_hash_password(self, raw_password):
return sha256_crypt.verify(raw_password, self.password)
def __repr__(self):
return '<User %r>' % self.username
def as_dict(self):
return {
"id": self.id,
"username": self.username,
"password": self.password,
"create_time": str(self.create_time),
}
class TestCase(Base):
__tablename__ = "testcase"
id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=False)
desc = Column(String(100))
def __repr__(self):
return '<Case %r>' % self.name
def as_dict(self):
return {
"id": self.id,
"name": self.name,
"desc": self.desc,
}
if __name__ == '__main__':
# 创建表,创建存储在元素数据中所有的表
Base.metadata.create_all(engine)
# 删除表
# Base.metadata.drop_all(engine)
# 数据的增加操作 create C
# 增 - 新增单条数据
# instance = TestCase(name="用例2", desc="description1")
# db_session.add(instance)
# id = instance.id
# # 批量添加操作
# add_case1 = TestCase(name="用例3", desc="描述3")
# add_case2 = TestCase(name="用例4", desc="描述4")
# ## 方式1 依次添加到session
# db_session.add(add_case1)
# db_session.add(add_case2)
# ## 方式2 一次性添加多条case
# add_case3 = TestCase(name="用例5", desc="描述5")
# add_case4 = TestCase(name="用例6", desc="描述6")
# db_session.add_all([add_case3, add_case4])
# db_session.commit()
# db_session.close()
# 查询操作 R
# 格式 db_session.query("类").all()/.first()
# res = db_session.query(TestCase).all()
# for rs in res:
# print(rs)
# res = db_session.query(TestCase).first()
# print(res)
## 条件查询 db_session.query("类").filter_by(条件).all()/.first()
# data = db_session.query(TestCase).filter_by(id=3).first()
# data = db_session.query(TestCase).filter_by(id=3).all()
# print(data)
# data = db_session.query(TestCase).filter_by(id=3).filter_by(name="用例3").first()
# print(data)
# 数据的修改 U
# 第一种方式,更新某个字段
# case = db_session.query(TestCase).filter_by(id=1).first()
# case.desc = "描述信息1111111"
# # 提交
# db_session.commit()
# # 关闭session
# db_session.close()
#
# # 第二种方式,直接调用update方法更新行数据
# db_session.query(TestCase).filter_by(id=1).update({"name": "用例1111111"})
# # 提交
# db_session.commit()
# # 关闭session
# db_session.close()
# 删除 D
# 方式一:
# # 查询数据
# case = db_session.query(TestCase).filter_by(id=1).first()
# # 删除操作
# db_session.delete(case)
# # 提交
# db_session.commit()
# # 关闭session
# db_session.close()
#
# # 方式二:
# # 查询结果直接删除操作
# db_session.query(TestCase).filter_by(id=2).delete()
# # 提交
# db_session.commit()
# # 关闭session
# db_session.close()
目前的代码
dgut_backend.zip (174.2 KB)