测试平台
课堂 ppt
PPT地址
训练营目标
- 测试平台的价值体系
- 测试平台的学习路线
- 测试平台实战练习
问题
- 为什么要做测试平台?为什么我们要学习平台开发的技术?怎么样去做测试平台才能达到价值最大化(个人、公司)
- 平台开发技术要学到什么程度?一定要非常深入吗?
- 平台开发需要掌握什么技术栈,如何学习可以提升投入产出比?
测试平台-功能设计
功能点 | 描述 | 案例 |
---|---|---|
项目管理 | 对于项目的分类,通常以业务线为维度 | 商城、飞书、企业微信 |
测试类型 | 手工测试、自动化测试 | pytest、JUnit、HttpRunner、Postman |
测试用例 | 测试用例的信息 | 标题、步骤、预期结果、实际结果 |
测试套件 | 用例的集合 | 通常是用例从不同维度组合,冒烟测试用例集,某业务线用例集 |
测试装置 | 用例执行前后的准备工作,处理工作 | 用例依赖环境的安装,比如 python requirements |
测试 Runner | 测试用例执行器调度器 | 执行环境的管理:性能测试环境、兼容性环境 |
测试报告 | 测试结果的体现 | allure 报告、手工报告 |
技术-前端
- 界面展示
- 良好的用户体验
- 数据展示
- 相关技术: VUE、React 等
技术-后端
- 把控业务逻辑
- 数据库交互
- 相关技术:Flask、Django、SpringBoot
新建一个空项目。并设置好虚拟环境,安装依赖
pip install flask
pip install sqlalchemy
pip install Flask-Cors==3.0.10
pip install flask-jwt-extended
pip install pymysql
pip install passlib
pip install jenkinsapi
启动模块server.py
"""
__author__ = '霍格沃兹测试开发学社'
__desc__ = '更多测试开发技术探讨,请访问:https://ceshiren.com/t/topic/15860'
"""
from flask import Flask
from flask_cors import CORS
from flask_jwt_extended import JWTManager
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session, declarative_base
# 定义 app
app = Flask(__name__)
# 解决跨域
CORS(app, supports_credentials=True)
# 注册 jwt
jwt = JWTManager(app)
# 配置服务端密钥
app.config["JWT_SECRET_KEY"] = "hogwarts_user_AHKFJJD5"
# 开启数据库跟踪模式
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
# SQLAlchemy 设置
Base = declarative_base()
# 定义数据库
db_user = "root"
db_pass = 123456
db_host = "42.192.73.147"
db_port = "3307"
db_name = "tpf"
# 数据库类型+数据库引擎( pip install pymysql)
db_url = f'mysql+pymysql://{db_user}:{db_pass}@{db_host}:{db_port}/{db_name}?charset=utf8mb4'
# 创建引擎,连接到数据库
engine = create_engine(db_url, echo=True)
# 创建session对象
# DBSession = sessionmaker(bind=engine)
# db_session: Session = DBSession()
# 解决session的复用问题 不然会报使用的时候前一个session没有回滚
DBSession = scoped_session(sessionmaker(bind=engine))
@app.before_request
def before_request():
# 在每个请求前执行的代码
# 在请求开始的时候实例化DBsession
DBSession()
@app.teardown_request
def teardown_request(exception=None):
# 在每个请求后执行的代码
if exception:
DBSession.rollback()
# 请求结束之后remove掉DBsession
DBSession.remove()
def register_router():
# 如果出现循环导入,把导包语句放在方法内执行。并且调用此函数
from controller.user_controller import user_router
from controller.testcase_controller import testcase_router
from controller.plan_controller import plan_router
from controller.record_controller import record_router
# 注册蓝图
app.register_blueprint(user_router)
app.register_blueprint(testcase_router)
app.register_blueprint(plan_router)
app.register_blueprint(record_router)
if __name__ == '__main__':
register_router()
app.run(debug=True, port=5000)
实战1
- 根据要求,完成后端架构优化
Model层创建-构建记录表
from datetime import datetime
from passlib.handlers.sha2_crypt import sha256_crypt
from sqlalchemy import Column, Integer, String, DateTime
from server import Base
# 创建用户表
class UserModel(Base):
__tablename__ = "user"
# 用户 ID, 用户的唯 一标识
id = Column(Integer, primary_key=True)
# 用户名, 限定 80个字符 ,不为空,并且唯一
username = Column(String(80), nullable=False, unique=True)
# 密码
password = Column(String(500), nullable=False)
# 创建时间,不需要手动传入,在写入记录的时候自动生成
create_time = Column(DateTime, nullable=True, default=datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
def __init__(self, *args, **kwargs):
# 密码进行自动加密
self.username = kwargs.get('username')
self.password = sha256_crypt.hash(kwargs.get('password'))
def check_hash_password(self, raw_password):
'''
校验密码
:param raw_password: 传入的密码
:return: 校验结果 True or False
'''
return sha256_crypt.verify(raw_password, self.password)
def as_dict(self):
return {
"id": self.id,
"username": self.username,
"password": self.password,
"create_time": str(self.create_time)
}
Model层创建-测试用例表
from sqlalchemy import Column, Integer, String, DateTime
from server import Base
# 创建用例表
class TestcaseModel(Base):
# 表名
__tablename__ = "testcase"
# 用例id,主键,唯一
id = Column(Integer, primary_key=True)
# 用例标题,不为空,并且唯一
name = Column(String(80), nullable=False, unique=True)
# 用例步骤
step = Column(String(120))
# 用例的自动化方法
method = Column(String(120))
# 备注
remark = Column(String(120))
def __repr__(self):
# 数据库的 魔法方法 直观展示数据
'''[<User "xxxx">,<User "yyyy">]'''
return '<Testcase %r>' % self.name
def as_dict(self):
return {
"id": self.id,
"name": self.name,
"step": self.step,
"method": self.method,
"remark": self.remark,
}
Model层创建-测试计划表
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.orm import relationship
from model.testcase_plan_rel import testcase_plan_rel
from server import Base
# 创建测试计划表
class PlanModel(Base):
__tablename__ = "plan"
# 测试计划 ID
id = Column(Integer, primary_key=True)
# 测试计划名称
name = Column(String(80), nullable=False, unique=True)
# 测试用例列表
testcases = relationship("TestcaseModel", secondary=testcase_plan_rel, backref='plan')
def __repr__(self):
# 数据库的 魔法方法 直观展示数据
'''[<User "xxxx">,<User "yyyy">]'''
return '<Plan %r>' % self.name
def as_dict(self):
return {
"id": self.id,
"name": self.name,
"testcases": [testcase.as_dict() for testcase in self.testcases],
}
Model层创建-多对多(测试用例与测试计划)
from sqlalchemy import Table, Column, Integer, ForeignKey
from server import Base
# 中间表
# 中间表表名
# 测试用例的外键
# 计划的外键
testcase_plan_rel = Table(
"testcase_plan_rel", # 表名
Base.metadata, # 表继承的类
# 参数一: 表名_id, 参数二:整型,参数3: 外键字符串('表名.id', 参数4: 是否为主键)
Column('testcase_id', Integer, ForeignKey('testcase.id', ondelete='CASCADE'), primary_key=True),
Column('plan_id', Integer, ForeignKey('plan.id', ondelete='CASCADE'), primary_key=True)
)
Model层创建-构建记录表
from datetime import datetime
from sqlalchemy import Column, Integer, ForeignKey, String, DateTime
from sqlalchemy.orm import relationship
from model.plan_model import PlanModel
from server import Base
# 创建构建记录表
class RecordModel(Base):
__tablename__ = "record"
# 在新系统下,使用带有显式注释的Mypy插件而在注释中不使用Mapped的SQLAlchemy应用程序会出现错误,
# 因为在使用relationship()等结构时,这些注释会被标记为错误。
# 将__allow_unmapped__添加到显式类型的ORM模型一节说明了如何暂时禁止为使用显式注释的遗留ORM模型引发这些错误。
__allow_unmapped__ = True
# 执行记录 ID
id = Column(Integer, primary_key=True)
# 测试计划 ID
plan_id = Column(Integer, ForeignKey('plan.id', ondelete='CASCADE'))
# 测试报告
report = Column(String(80), nullable=False, unique=True)
# 执行时间
create_time = Column(DateTime, nullable=True, default=datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
# 参数1: 关联的另外一个业务表类名, 参数2: 反射别名
plan: PlanModel = relationship("PlanModel", backref="record")
def __repr__(self):
# 数据库的 魔法方法 直观展示数据
'''[<User "xxxx">,<User "yyyy">]'''
return '<Record %r>' % self.id
def as_dict(self):
return {
"id": self.id,
"plan_id": self.plan_id,
"report": self.report,
"create_time": str(self.create_time)
}
Model层创建-初始化数据库
from server import Base, engine
from model.testcase_model import TestcaseModel
from model.plan_model import PlanModel
from model.record_model import RecordModel
from model.user_model import UserModel
if __name__ == '__main__':
# 删除所有数据
# Base.metadata.drop_all(bind=engine)
# 创建表,需要传入创建连接的对象
Base.metadata.create_all(bind=engine)
实战2
完成用户、测试计划、测试用例、构建记录的表创建
Dao层创建-用户
from model.user_model import UserModel
from server import DBSession as db_session
# Dao 负责和数据库的交互
class UserDao:
def get(self, user_id) -> UserModel:
'''
根据 ID 查询用户
'''
return db_session.query(UserModel).filter_by(id=user_id).first()
def get_by_name(self, user_name) -> UserModel:
'''
根据姓名查询用户
'''
return db_session.query(UserModel).filter_by(username=user_name).first()
def create(self, user_model: UserModel):
'''
创建用户
'''
db_session.add(user_model)
db_session.commit()
return user_model.id
Dao层创建-测试用例
from model.testcase_model import TestcaseModel
from server import DBSession as db_session
# Dao 负责和数据库的交互
class TestcaseDao:
def get(self, testcase_id: int) -> TestcaseModel:
"""
添加用例
:param testcase_id: 用例id
:return: TestcaseModel
"""
return db_session.query(TestcaseModel).filter_by(id=testcase_id).first()
def get_by_name(self, testcase_name: str) -> TestcaseModel:
"""
根据测试用例名称查询
"""
return db_session.query(TestcaseModel).filter_by(name=testcase_name).first()
def list(self):
"""
获取用例列表
:return:
"""
return db_session.query(TestcaseModel).all()
def create(self, testcase_model: TestcaseModel) -> int:
"""
创建用例
:param testcase_model: testcase对象
:return:
"""
db_session.add(testcase_model)
db_session.commit()
return testcase_model.id
def delete(self, testcase_id: int) -> int:
"""
删除用例
:param testcase_id: 用例id
:return:
"""
db_session.query(TestcaseModel).filter_by(id=testcase_id).delete()
db_session.commit()
return testcase_id
def update(self, testcase_model: TestcaseModel) -> int:
"""
更新用例
:param testcase_model: testcase对象
:param testcase_id: 用例id
:return:
"""
db_session.query(TestcaseModel).filter_by(id=testcase_model.id).update(testcase_model.as_dict())
db_session.commit()
return testcase_model.id
Dao层创建-测试计划
from model.plan_model import PlanModel
from server import DBSession as db_session
# Dao 负责和数据库的交互
class PlanDao:
def get(self, plan_id) -> PlanModel:
# 根据id返回数据
return db_session.query(PlanModel).filter_by(id=plan_id).first()
def get_by_name(self, name) -> PlanModel:
# 根据name返回数据
return db_session.query(PlanModel).filter_by(name=name).first()
def list(self):
# 返回所有数据
return db_session.query(PlanModel).all()
def create(self, plan_do: PlanModel):
# 新增数据
db_session.add(plan_do)
db_session.commit()
return plan_do.id
def delete(self, plan_id):
# 删除操作
db_session.query(PlanModel).filter_by(id=plan_id).delete()
db_session.commit()
return plan_id
Dao层创建-构建记录
from model.record_model import RecordModel
from server import DBSession as db_session
# Dao 负责和数据库的交互
class RecordDao:
def list_by_plan_id(self, plan_id):
# 根据id返回数据
return db_session.query(RecordModel).filter_by(plan_id=plan_id).all()
def list(self):
# 返回所有数据
return db_session.query(RecordModel).all()
def create(self, build_do: RecordModel):
# 新增数据
db_session.add(build_do)
db_session.commit()
return build_do.id
实战3
完成用户、测试计划、测试用例、构建记录的Dao层代码
Server层创建-用户
from datetime import timedelta
from flask_jwt_extended import create_access_token
from dao.user_dao import UserDao
from model.user_model import UserModel
from server import app, jwt
user_dao = UserDao()
class UserService:
def get(self, user_id) -> UserModel:
'''
通过 ID 查询用户
'''
return user_dao.get(user_id)
def get_by_name(self, user_name):
'''
通过姓名查询用户
'''
return user_dao.get_by_name(user_name)
def create(self, user_model: UserModel) -> int:
'''
创建用户
'''
# 新增前先查询用户是否存在
user = user_dao.get_by_name(user_model.username)
if not user:
# 没有重名,创建用户
user_dao.create(user_model)
return user_model.id
def create_access_token(self, user_model):
'''
根据用户信息生成 token
'''
# 使用 jwt 生成 token
with app.app_context():
# token = create_access_token(identity=user_model.username, expires_delta=timedelta(days=1))
# identity 是生成 token 的依据,需要 json 格式的可序列化数据
# expires_delta 可以配置 token 的过期时间
token = create_access_token(
identity=user_model.as_dict(),
expires_delta=timedelta(days=1)
)
return token
# 配置回调函数中验证数据条件
@jwt.user_lookup_loader
def user_lookup_callback(self, _jwt_header, jwt_data):
# 因为 sub 字段下是请求 body 的内容,所以从 sub 中获取用户名用来查库
# 获取 username
username = jwt_data["sub"]["username"]
# 返回通过 username 查询用户的结果
return self.get_by_name(username)
Server层创建-测试用例
from typing import List
from dao.testcase_dao import TestcaseDao
from model.testcase_model import TestcaseModel
testcase_dao = TestcaseDao()
class TestcaseService:
def create(self, testcase_model: TestcaseModel) -> int:
"""
创建用例
"""
result = testcase_dao.get_by_name(testcase_model.name)
if not result:
return testcase_dao.create(testcase_model)
def update(self, testcase_model: TestcaseModel) -> int:
"""
更新用例
"""
if testcase_dao.get_by_name(testcase_model.name):
testcase_dao.update(testcase_model)
return testcase_model.id
def delete(self, testcase_id: int) -> int:
"""
删除用例
"""
if self.get(testcase_id):
return testcase_dao.delete(testcase_id)
def list(self) -> List[TestcaseModel]:
"""
获取全部用例
"""
return testcase_dao.list()
def get(self, testcase_id: int) -> TestcaseModel:
"""
获取某个测试用例
"""
return testcase_dao.get(testcase_id)
def get_by_name(self, testcase_name) -> TestcaseModel:
"""
根据名称获取某个测试用例
"""
return testcase_dao.get_by_name(testcase_name=testcase_name)
Server层创建-测试计划
from dao.plan_dao import PlanDao
from model.plan_model import PlanModel
from service.testcase_service import TestcaseService
plan_dao = PlanDao()
testcase_service = TestcaseService()
class PlanService:
def get(self, plan_id):
'''
通过 ID 查询测试计划
'''
return plan_dao.get(plan_id)
def get_by_name(self, plan_name):
'''
通过名称查询测试计划
'''
return plan_dao.get_by_name(plan_name)
def list(self):
'''
返回所有测试计划
'''
return plan_dao.list()
def create(self, plan_model: PlanModel, testcase_id_list):
'''
创建测试计划
'''
# 创建之前先通过名称查询计划是否已经存在
plan = self.get_by_name(plan_model.name)
print(f"名称 {plan_model.name} 的查询结果为 {plan}")
# 不存在则新增
if not plan:
# 根据测试用例 ID 查询获取测试用例对象列表
testcase_list = [testcase_service.get(testcase_id) for testcase_id in testcase_id_list]
# 构建测试计划对象
plan_model.testcases = testcase_list
# 创建测试计划
return plan_dao.create(plan_model)
# 存在则返回 False
return False
def delete(self, plan_id):
# 删除操作
# 删除之前先查询数据是否存在,存在则进行删除,不存在则返回false
plan = self.get(plan_id)
if not plan:
return False
else:
return plan_dao.delete(plan_id)
测试用例调度
from jenkinsapi.jenkins import Jenkins
class JenkinsUtils:
# Jenkins 服务
BASE_URL = "http://42.192.73.147:7080/"
# Jenkins 服务对应的用户名
USERNAME = "admin"
# Jenkins 服务对应的token
PASSWORD = "1146976718c30d0b1c4f571ef074904060"
JOB = "tpf"
@classmethod
def invoke(cls, invoke_params):
"""
执行构建任务
:return:
"""
# 获取 Jenkins 对象
jenkins_hogwarts = Jenkins(cls.BASE_URL, cls.USERNAME, cls.PASSWORD)
# 获取 Jenkins 的 job 对象
job = jenkins_hogwarts.get_job(cls.JOB)
# 构建 hogwarts job,传入的值必须是字典,key 对应 jenkins 设置的参数名
# job.invoke(build_params={
# "step": "xx.py::TestXX::test_xx",
# "methods": "pytest"
# })
# job.invoke(build_params=invoke_params)
job.invoke()
# 获取job 最后一次完成构建的编号
last_build_number = job.get_last_buildnumber() + 1
# 执行方式为:pytest 用例名称 指定报告生成地址
# 生成报告格式为
# http://42.192.73.147:7080/job/tpf/22/allure/
# 获取本次构建的报告地址
report = f"{cls.BASE_URL}job/{cls.JOB}/{last_build_number}/allure/"
return report
Server层创建-构建记录
from dao.record_dao import RecordDao
from model.record_model import RecordModel
from service.plan_service import PlanService
from utils.jenkins_utils import JenkinsUtils
record_dao = RecordDao()
plan_service = PlanService()
class RecordService:
def list_by_plan(self, plan_id):
'''
根据测试计划 ID 获取对应的测试记录
'''
return record_dao.list_by_plan_id(plan_id)
def list(self):
'''
获取所有执行记录
'''
return record_dao.list()
def create(self, plan_id):
'''
新增测试记录
'''
# 新增之前先查询要执行的测试计划是否存在
plan = plan_service.get(plan_id)
# 不存在则返回 False
if not plan:
return False
# 存在则创建测试记录
else:
# 执行命令格式
# pytest test_demo1 test_demo2 test_demo3
# 获取测试记录中包含的测试用例的执行方式
method_list = [testcase.method for testcase in plan.testcases]
# 为测试执行方式列表去重
methods = set(method_list)
print(f"去重后的 method 列表为 {methods}")
# 获取测试记录中包含的测试用例 step 中包含的 nodeid
# test_demo1 ==> test_add_params.py
nodeid_list = [testcase.step for testcase in plan.testcases]
nodeid = " ".join(nodeid_list)
print(f"获取到的用例 id 列表为 {nodeid}")
# 执行测试用例
invoke_params = {
"methods": 'pytest',
"steps": nodeid
}
report = JenkinsUtils.invoke(invoke_params)
# 构造测试记录对象
record_model = RecordModel(plan_id=plan_id, report=report)
# 新增测试记录
return record_dao.create(record_model)
实战4
完成用户、测试计划、测试用例、构建记录的Service层代码
单元测试用例
- test_user.py
from model.user_model import UserModel
from service.user_service import UserService
user1 = UserModel(username='admin1', password='admin1')
user_service = UserService()
def test_register():
id = user_service.create(user1)
assert id
print(id)
def test_token():
token = user_service.create_access_token(user1)
assert token
print(token)
- test_testcase.py
from model.testcase_model import TestcaseModel
from service.testcase_service import TestcaseService
testcase1 = TestcaseModel(name='name1', step='step1', method='method1', remark='1')
testcase2 = TestcaseModel(name='name2', step='step2', method='method2', remark='2')
testcase3 = TestcaseModel(id=2, name='name2', step='step22', method='method22', remark='22')
testcase_service = TestcaseService()
def test_add():
id = testcase_service.create(testcase2)
assert id
print(id)
def test_get():
testcase = testcase_service.get_by_name('name1').as_dict()
testcases = testcase_service.list()
print(testcase)
print([testcase.as_dict() for testcase in testcases])
def test_update():
testcase = testcase_service.update(testcase3)
print(testcase)
def test_delete():
testcase = testcase_service.delete(2)
print(testcase)
- test_plan.py
from model.plan_model import PlanModel
from service.plan_service import PlanService
plan1 = PlanModel(name='name1')
plan2 = PlanModel(name='name2')
plan_service = PlanService()
def test_add():
id = plan_service.create(plan1, [1, 3])
assert id
print(id)
def test_get():
plan = plan_service.get_by_name('name1').as_dict()
plans = plan_service.list()
print(plan)
print([plan.as_dict() for plan in plans])
def test_delete():
plan = plan_service.delete(1)
print(plan)
- test_record.py
from service.record_service import RecordService
record_service = RecordService()
def test_add():
id = record_service.create(2)
assert id
print(id.as_dict())
Controller层创建-用户
from flask import Blueprint, request
from model.user_model import UserModel
from service.user_service import UserService
user_router = Blueprint(name="user", import_name=__name__)
user_service = UserService()
@user_router.route("/user/register", methods=["POST"])
def user_register():
"""
用户的注册
{
"username":xxx,
"password":xxx,
}
:return:
"""
# 获取请求体
data = request.json
# 构建用户对象
user = UserModel(**data)
print(user.password)
if user:
user_id = user_service.create(user)
if user_id:
# 存在id,则证明新增成功了
return {"code": 0, "msg": f"register success"}
else:
return {"code": 40001, "msg": "register fail"}
return {"code": 40001, "msg": "register fail"}
@user_router.route("/user/login", methods=["POST"])
def user_login():
"""
用户的登录
{
"username":xxx,
"password":xxx,
}
:return:
"""
# 获取请求体
data = request.json
# 构建用户对象
user = UserModel(**data)
# 通过用户名查找用户是否存在
user_result = user_service.get_by_name(user.username)
print(user_result)
print(data.get("password"))
# 如果用户不存在,说明用户还未注册
if not user_result:
return {"code": 40013, "msg": "user is not register"}
# 如果密码不匹配,说明密码错误
if not user_result.check_hash_password(data.get("password")):
return {"code": 40014, "msg": "password is wrong"}
# 用户存在,且密码匹配,则生成 token
access_token = user_service.create_access_token(user_result)
print(access_token)
if access_token:
# 存在access_token,则证明登录成功了
return {"code": 0, "msg": "login success", "data": {"token": access_token}}
else:
return {"code": 40021, "msg": "login fail"}
Controller层创建-测试用例
from flask import Blueprint, request
from flask_jwt_extended import jwt_required
from model.testcase_model import TestcaseModel
from service.testcase_service import TestcaseService
# 声明蓝图
testcase_router = Blueprint(name="testcase", import_name=__name__)
testcase_service = TestcaseService()
@testcase_router.route("/testcase/get")
@jwt_required()
def testcase_get():
"""
测试用例的查找
{id: 1}
:return:
"""
# 获取请求参数
data = request.args
case_id = data.get("id")
# 如果有id则进行数据查找
if case_id:
testcase = testcase_service.get(int(case_id))
# 如果查询到结果
if testcase:
datas = [testcase.as_dict()]
return {"code": 0, "msg": "get data success", "data": datas}
else:
# 如果没有数据,则返回数据已存在
return {"code": 40004, "msg": "data is not exists"}
else:
# 如果没有id,则返回全部数据
datas = [testcase.as_dict() for testcase in testcase_service.list()]
return {"code": 0, "msg": "get data success", "data": datas}
@testcase_router.route("/testcase/post", methods=["POST"])
@jwt_required()
def testcase_post():
"""
测试用例的新增
{
"name":xxx,
"step":xxx,
"method":xxx,
"remark":xxx,
}
:return:
"""
# 获取请求体
data = request.json
# 构造测试用例对象
testcase = TestcaseModel(**data)
# 新增用例
case_id = testcase_service.create(testcase)
if case_id:
# 存在测试用例id, 则证明用例新增成功了
return {"code": 0, "msg": "add testcase success", "data": {"testcase_id": case_id}}
else:
return {"code": 40001, "msg": "testcase is exists"}
@testcase_router.route("/testcase/put", methods=["POST"])
@jwt_required()
def testcase_put():
"""
测试用例的修改
{
"id":xxx,
"name":xxx,
"step":xxx,
"method":xxx,
"remark":xxx,
}
:return:
"""
# 获取请求体
data = request.json
# 构造测试用例对象
testcase = TestcaseModel(**data)
# 修改测试用例
case_id = testcase_service.update(testcase)
if case_id:
# 存在测试用例id, 则证明用例新增成功了
return {"code": 0, "msg": "update testcase success", "data": {"testcase_id": case_id}}
else:
return {"code": 40001, "msg": "update testcas fail"}
@testcase_router.route("/testcase/delete", methods=["POST"])
@jwt_required()
def testcase_delete():
"""
测试用例的删除
{"id": 1}
:return:
"""
# 获取请求体
data = request.json
delete_case_id = data.get("id")
if delete_case_id:
case_id = testcase_service.delete(delete_case_id)
if case_id:
# 存在测试用例id,则证明用例修改成功了
return {"code": 0, "msg": f"delete testcase success", "data": {"testcase_id": case_id}}
else:
return {"code": 40001, "msg": "delete case fail"}
Controller层创建-测试计划
from flask import Blueprint, request
from flask_jwt_extended import jwt_required
from model.plan_model import PlanModel
from service.plan_service import PlanService
# 声明蓝图
plan_router = Blueprint(name="plan", import_name=__name__)
plan_service = PlanService()
@plan_router.route("/plan/get")
@jwt_required()
def plan_get():
"""
计划的查找
{id: 1}
:return:
"""
# 获取请求参数
id = request.args.get("id")
# 如果请求参数中存在 id
if id:
# 根据 id 查询测试计划
data = plan_service.get(id)
# 如果测试计划存在
if data:
# 将查询结果返回
datas = [data.as_dict()]
return {"code": 0, "msg": "get plan success", "data": datas}
else:
# 如果测试计划不存在,返回提示信息
return {"code": 40004, "msg": "plan is not exists"}
else:
# 如果参数中不包含 id,则返回全部测试计划
datas = [p.as_dict() for p in plan_service.list()]
return {"code": 0, "msg": "get plans success", "data": datas}
@plan_router.route("/plan/post", methods=["POST"])
@jwt_required()
def plan_post():
"""
计划的新增
{
"name":xxx,
"testcase_ids":[1,2,3,4],
}
:return:
"""
data = request.json
# data -> {name=1, testcase_ids=[2,3,4,5,6]}
testcase_id_list = data.pop("testcase_ids")
plan = PlanModel(**data)
# 新增
plan_id = plan_service.create(plan, testcase_id_list)
if plan_id:
# 存在id,则证明新增成功了
return {"code": 0, "msg": f"add plan success", "data": {"plan_id": plan_id}}
else:
return {"code": 40001, "msg": "plan is exists"}
@plan_router.route("/plan/delete", methods=["POST"])
@jwt_required()
def plan_delete():
"""
测试用例的删除
{"id": 1}
:return:
"""
# 获取请求体
data = request.json
# 删除
plan_id = plan_service.delete(data.get("id"))
if plan_id:
# 存在测试用例id,则证明用例修改成功了
return {"code": 0, "msg": f"plan delete success", "data": {"plan_id": plan_id}}
else:
return {"code": 40001, "msg": "delete plan fail"}
Controller层创建-构建记录
from flask import Blueprint, request
from flask_jwt_extended import jwt_required
from model.record_model import RecordModel
from service.record_service import RecordService
# 声明蓝图
record_router = Blueprint(name="record", import_name=__name__)
record_service = RecordService()
@record_router.route("/record/get")
@jwt_required()
def record_get():
"""
记录的查找
{id: 1}
:return:
"""
# 获取请求参数
plan_id = request.args.get("plan_id")
if plan_id:
# 如有有id则进行数据查找
data = record_service.list_by_plan(plan_id)
if data:
# 如果查到数据,则返回给前端
datas = [_.as_dict() for _ in data]
return {"code": 0, "msg": "get record success", "data": datas}
else:
# 如果没有数据,则返回数据已存在
return {"code": 40004, "msg": "record is not exists"}
else:
# 如果没有id,则返回全部数据
datas = [build.as_dict() for build in record_service.list()]
return {"code": 0, "msg": "get records success", "data": datas}
@record_router.route("/record/post", methods=["POST"])
@jwt_required()
def record_post():
"""
记录的新增
{
"plan_id":xxx,
}
:return:
"""
data = request.json
# 新增
record_id = record_service.create(data.get("plan_id")).id
if record_id:
# 存在id,则证明新增成功了
return {"code": 0, "msg": f"record add success", "data": {"record_id": record_id}}
else:
return {"code": 40001, "msg": "record is exists"}
实战5
完成用户、测试计划、测试用例、构建记录的Controller层代码
总结
- 测试平台架构
- 模型定义
- 分层开发
- 用户鉴权