为啥delete接口调用成功了,却删除不了数据库对应数据,
而不用delete方法就能成功删除数据库数据
把你完整的代码上传到gitee上 贴出来我看看
mysqldemo.py
# -*- coding: utf-8 -*-
from flask_sqlalchemy import SQLAlchemy
from flask import Flask
import datetime
from mysql import connector
# 实例化flask对象
app = Flask(__name__)
username = 'root'
pwd = '123456'
ip = '127.0.0.1'
port = '3306'
database = 'strategy_test'
app.config['SQLALCHEMY_DATABASE_URI'] = f'mysql+mysqlconnector://{username}:{pwd}@{ip}:{port}/{database}'
# 解决warning问题
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
db = SQLAlchemy(app)
class Testcase(db.Model):
"""
一个类表示一个表
"""
# 设定表名
__tablename__ = 'testcaselist'
# 每个类属性,表示这个表的表头数据
ID = db.Column(db.Integer, primary_key=True)
CaseID = db.Column(db.String(500), unique=True)
CaseName = db.Column(db.String(500))
Model = db.Column(db.String(500))
UiAppID = db.Column(db.String(500))
SetUp = db.Column(db.String(500))
TearDown = db.Column(db.String(500))
OperatingStep = db.Column(db.String(500))
ExpectResult = db.Column(db.String(500))
Tcase = db.Column(db.String(500))
All = db.Column(db.String(500))
GitUrl = db.Column(db.String(500))
Command = db.Column(db.String(500))
Remark = db.Column(db.String(500))
CreateTime = db.Column(db.DateTime, default = datetime.datetime.now)
def __repr__(self):
return '<User %r>' % self.CaseName
def as_dict(self):
data = {"id": self.ID,
"CaseID": self.CaseID,
"CaseName": self.CaseName,
"Model": self.Model,
"uiAppID": self.UiAppID,
"SetUp": self.SetUp,
"TearDown": self.TearDown,
"OperatingStep": self.OperatingStep,
"ExpectResult": self.ExpectResult,
"Tcase": self.Tcase,
"All": self.All,
"GitUrl": self.GitUrl,
"Command": self.Command,
"Remark": self.Remark}
return data
if __name__ == '__main__':
# 创建
# db.create_all()
# 增删改查
# 增加数据信息
# user1 = Testcase(ID=12,
# CaseID="case012",
# CaseName="baimingdan12",
# Model="model",
# UiAppID="300001",
# SetUp="getckv1",
# TearDown="setckv1",
# OperatingStep="{'FileName':'req.txt','uiappid':'300001'}",
# ExpectResult="{'Comm.int32_uiappid':'300001','Comm.int32_icm':'0'}",
# Tcase="tcase",
# All = "all",
# GitUrl = "www.baidu.com",
# Command="python3 main.py /testcase/300001/300001.xsl",
# Remark="remark12")
# user2 = Testcase(ID=2,
# CaseID="case002",
# CaseName="白名单2",
# Model = "媒体类",
# UiAppID = "300001",
# SetUp="getckv2",
# TearDown="setckv2",
# OperatingStep="{'FileName':'req.txt','uiappid':'300002'}",
# ExpectResult="{'Comm.int32_uiappid':'300002','Comm.int32_icm':'0'}",
# Tcase="tcase",
# All = "all",
# GitUrl="www.baidu.com",
# Command="python3 main.py /testcase/300002/300002.xsl",
# Remark="备注2")
# db.session.add(user1)
# db.session.add(user2)
# case = Testcase.query.filter_by(ID=20).delete()
case = Testcase.query.filter_by(ID=5).update({"ID":"5","Remark":"9999999"})
db.session.commit()
# case = Testcase.query.filter_by(ID=1).first()
# print(case)
dataapi.py
import json
from flask import Flask, request
from flask_restful import Resource, Api
from flask_sqlalchemy import SQLAlchemy
from mysqldemo import Testcase
from flask_cors import CORS
from dataencoder import DateEncoder
# todo:
# 获取请求参数
# 获取请求体
# 实例化flask对象
app = Flask(__name__)
# 解决跨域问题
CORS(app, supports_credentials=True)
username = 'root'
pwd = '123456'
ip = '127.0.0.1'
port = '3306'
database = 'strategy_test'
app.config['SQLALCHEMY_DATABASE_URI'] = f'mysql+pymysql://{username}:{pwd}@{ip}:{port}/{database}?charset=utf8'
# 解决warning问题
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
db = SQLAlchemy(app)
api = Api(app)
class FastTestService(Resource):
"""
测试用例服务
"""
# 方法名对应app.route中的methods
# 查询接口
# @app.route('/testcase', methods = ['get'])
def get(self):
case_id = request.args.get("ID")
if case_id:
case_data = Testcase.query.filter_by(ID=case_id).first()
app.logger.info(case_data)
# data = [case_data.as_dict()]
data = {
"ID": case_data.ID,
"CreateTime": json.dumps(case_data.CreateTime, cls=DateEncoder),
"CaseID": case_data.CaseID,
"CaseName": case_data.CaseName,
"Model": case_data.Model,
"UiAppID": case_data.UiAppID,
"SetUp": case_data.SetUp,
"TearDown": case_data.TearDown,
"OperatingStep": case_data.OperatingStep,
"ExpectResult": case_data.ExpectResult,
"Tcase": case_data.Tcase,
"All": case_data.All,
"GitUrl": case_data.GitUrl,
"Command": case_data.Command,
"Remark": case_data.Remark}
else:
case_data = Testcase.query.all()
# data = [i.as_dict() for i in case_data]
data = [{
"ID": i.ID,
"CreateTime": json.dumps(i.CreateTime, cls=DateEncoder),
"CaseID": i.CaseID,
"CaseName": i.CaseName,
"Model": i.Model,
"UiAppID": i.UiAppID,
"SetUp": i.SetUp,
"TearDown": i.TearDown,
"OperatingStep": i.OperatingStep,
"ExpectResult": i.ExpectResult,
"Tcase": i.Tcase,
"GitUrl": i.GitUrl,
"Command": i.Command,
"Remark": i.Remark
}for i in case_data]
app.logger.info(data)
return data
# 添加接口
# @app.route('/testcase', methods = ['post'])
def post(self):
case_data = request.json
app.logger.info(case_data)
testcase = Testcase(**case_data)
app.logger.info(testcase)
db.session.add(testcase)
db.session.commit()
return {'error': 0, 'msg': 'postsuccess'}
# 修改接口
# @app.route('/testcase', methods = ['put'])
def put(self):
case_id = request.json.get('ID')
app.logger.info(case_id)
app.logger.info(request.json)
case = Testcase.query.filter_by(ID=case_id).update(request.json)
db.session.commit()
app.logger.info(case)
return {'error': 0, 'msg': {'id': case_id}}
# 删除接口
# @app.route('/testcase', methods = ['delete'])
def delete(self):
case_id = request.args.get("ID")
app.logger.info(case_id)
if not case_id:
return {'error': 101, 'msg': 'delete case_id can not be null'}
Testcase.query.filter_by(ID=case_id).delete()
db.session.commit()
return {'error': 0, 'msg': {'id': case_id}}
class DebugTestCase(Resource):
"""
调试测试用例
"""
def get(self):
case_id = request.args.get("ID")
if case_id:
case_data = Testcase.query.filter_by(ID=case_id).first()
app.logger.info(case_data)
# data = [case_data.as_dict()]
data = {
"ID": case_data.ID,
"CreateTime": json.dumps(case_data.CreateTime, cls=DateEncoder),
"CaseID": case_data.CaseID,
"CaseName": case_data.CaseName,
"Model": case_data.Model,
"UiAppID": case_data.UiAppID,
"SetUp": case_data.SetUp,
"TearDown": case_data.TearDown,
"OperatingStep": case_data.OperatingStep,
"ExpectResult": case_data.ExpectResult,
"Tcase": case_data.Tcase,
"All": case_data.All,
"GitUrl": case_data.GitUrl,
"Command": case_data.Command,
"Remark": case_data.Remark}
return data
if __name__ == '__main__':
api.add_resource(FastTestService, '/testcaselist')
api.add_resource(DebugTestCase, '/debugtestcase')
# debug=True的时候,是热加载
app.run(debug = True, port = 8000)
test_dataapi.py
import pytest
import requests
class TestCase:
"""
测试用例接口模块的测试代码
"""
def setup_class(self):
self.base_url = 'http://127.0.0.1:8000/testcaselist'
def test_get(self):
r = requests.get(self.base_url, params={"ID": 1})
print(r.json())
assert r.status_code == 200
def test_post(self):
data = {"ID": 13,
"CaseID": "case013",
"CaseName": "13",
"CreateTime": "2022-04-21 11:43:19",
"Model": "13",
"UiAppID": "13",
"SetUp": "getckv4",
"TearDown": "setckv4",
"OperatingStep": "{'FileName':'req3.txt','uiappid':'300003'}",
"ExpectResult": "{'Comm.int32_uiappid':'300003','Comm.int32_icm':'0'}",
"Tcase": "tcase",
"GitUrl": "www.baidu.com",
"Command": "python3 main.py /testcase/300003/300003.xsl",
"Remark": "13",}
r = requests.post(self.base_url, json=data)
assert r.status_code == 200
def test_put(self):
data2 = {"ID":"5",
"Remark": "133333"}
r = requests.put(self.base_url, json=data2)
print(r.json())
assert r.status_code == 200
def test_delete(self):
r = requests.delete(self.base_url, params={"ID": 3})
assert r.status_code == 200
if __name__ == '__main__':
pytest.main(['test_dataapi.py'])