flask_sqlalchemy中delete方法调用成功,但是数据库数据并没有被删除?

image
为啥delete接口调用成功了,却删除不了数据库对应数据,
image
而不用delete方法就能成功删除数据库数据

把你完整的代码上传到gitee上 贴出来我看看

mysqldemo.py

# -*- coding: utf-8 -*-
from flask_sqlalchemy import SQLAlchemy
from flask import Flask
import datetime
from mysql import connector

# 实例化flask对象
app = Flask(__name__)
username = 'root'
pwd = '123456'
ip = '127.0.0.1'
port = '3306'
database = 'strategy_test'
app.config['SQLALCHEMY_DATABASE_URI'] = f'mysql+mysqlconnector://{username}:{pwd}@{ip}:{port}/{database}'
# 解决warning问题
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
db = SQLAlchemy(app)


class Testcase(db.Model):
    """
    一个类表示一个表
    """
    # 设定表名
    __tablename__ = 'testcaselist'
    # 每个类属性,表示这个表的表头数据
    ID = db.Column(db.Integer, primary_key=True)
    CaseID = db.Column(db.String(500), unique=True)
    CaseName = db.Column(db.String(500))
    Model = db.Column(db.String(500))
    UiAppID = db.Column(db.String(500))
    SetUp = db.Column(db.String(500))
    TearDown = db.Column(db.String(500))
    OperatingStep = db.Column(db.String(500))
    ExpectResult = db.Column(db.String(500))
    Tcase = db.Column(db.String(500))
    All = db.Column(db.String(500))
    GitUrl = db.Column(db.String(500))
    Command = db.Column(db.String(500))
    Remark = db.Column(db.String(500))
    CreateTime = db.Column(db.DateTime, default = datetime.datetime.now)

    def __repr__(self):
        return '<User %r>' % self.CaseName
    def as_dict(self):
        data = {"id": self.ID,
                "CaseID": self.CaseID,
                "CaseName": self.CaseName,
                "Model": self.Model,
                "uiAppID": self.UiAppID,
                "SetUp": self.SetUp,
                "TearDown": self.TearDown,
                "OperatingStep": self.OperatingStep,
                "ExpectResult": self.ExpectResult,
                "Tcase": self.Tcase,
                "All": self.All,
                "GitUrl": self.GitUrl,
                "Command": self.Command,
                "Remark": self.Remark}
        return data

if __name__ == '__main__':
    # 创建
    # db.create_all()
    # 增删改查
    # 增加数据信息
    # user1 = Testcase(ID=12,
    #              CaseID="case012",
    #              CaseName="baimingdan12",
    #              Model="model",
    #              UiAppID="300001",
    #              SetUp="getckv1",
    #              TearDown="setckv1",
    #              OperatingStep="{'FileName':'req.txt','uiappid':'300001'}",
    #              ExpectResult="{'Comm.int32_uiappid':'300001','Comm.int32_icm':'0'}",
    #              Tcase="tcase",
    #                 All = "all",
    #              GitUrl = "www.baidu.com",
    #              Command="python3 main.py /testcase/300001/300001.xsl",
    #              Remark="remark12")
    # user2 = Testcase(ID=2,
    #              CaseID="case002",
    #              CaseName="白名单2",
    #              Model = "媒体类",
    #              UiAppID = "300001",
    #              SetUp="getckv2",
    #              TearDown="setckv2",
    #              OperatingStep="{'FileName':'req.txt','uiappid':'300002'}",
    #              ExpectResult="{'Comm.int32_uiappid':'300002','Comm.int32_icm':'0'}",
    #              Tcase="tcase",
    #              All = "all",
    #              GitUrl="www.baidu.com",
    #              Command="python3 main.py /testcase/300002/300002.xsl",
    #              Remark="备注2")

    # db.session.add(user1)
    # db.session.add(user2)
    # case = Testcase.query.filter_by(ID=20).delete()
    case = Testcase.query.filter_by(ID=5).update({"ID":"5","Remark":"9999999"})
    db.session.commit()
   # case = Testcase.query.filter_by(ID=1).first()
   # print(case)

dataapi.py

import json
from flask import Flask, request
from flask_restful import Resource, Api
from flask_sqlalchemy import SQLAlchemy
from mysqldemo import Testcase
from flask_cors import CORS
from dataencoder import DateEncoder

# todo:
# 获取请求参数
# 获取请求体

# 实例化flask对象
app = Flask(__name__)
# 解决跨域问题
CORS(app, supports_credentials=True)
username = 'root'
pwd = '123456'
ip = '127.0.0.1'
port = '3306'
database = 'strategy_test'
app.config['SQLALCHEMY_DATABASE_URI'] = f'mysql+pymysql://{username}:{pwd}@{ip}:{port}/{database}?charset=utf8'
# 解决warning问题
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
db = SQLAlchemy(app)
api = Api(app)


class FastTestService(Resource):
    """
    测试用例服务
    """
    # 方法名对应app.route中的methods
    # 查询接口
    # @app.route('/testcase', methods = ['get'])
    def get(self):
        case_id = request.args.get("ID")
        if case_id:
            case_data = Testcase.query.filter_by(ID=case_id).first()
            app.logger.info(case_data)
            # data = [case_data.as_dict()]
            data = {
                "ID": case_data.ID,
                "CreateTime": json.dumps(case_data.CreateTime, cls=DateEncoder),
                "CaseID": case_data.CaseID,
                "CaseName": case_data.CaseName,
                "Model": case_data.Model,
                "UiAppID": case_data.UiAppID,
                "SetUp": case_data.SetUp,
                "TearDown": case_data.TearDown,
                "OperatingStep": case_data.OperatingStep,
                "ExpectResult": case_data.ExpectResult,
                "Tcase": case_data.Tcase,
                "All": case_data.All,
                "GitUrl": case_data.GitUrl,
                "Command": case_data.Command,
                "Remark": case_data.Remark}
        else:
            case_data = Testcase.query.all()
            # data = [i.as_dict() for i in case_data]
            data = [{
                "ID": i.ID,
                "CreateTime": json.dumps(i.CreateTime, cls=DateEncoder),
                 "CaseID": i.CaseID,
                 "CaseName": i.CaseName,
                "Model": i.Model,
                "UiAppID": i.UiAppID,
                 "SetUp": i.SetUp,
                 "TearDown": i.TearDown,
                 "OperatingStep": i.OperatingStep,
                 "ExpectResult": i.ExpectResult,
                 "Tcase": i.Tcase,
                 "GitUrl": i.GitUrl,
                 "Command": i.Command,
                 "Remark": i.Remark
            }for i in case_data]
            app.logger.info(data)
        return data

    # 添加接口
    # @app.route('/testcase', methods = ['post'])
    def post(self):
        case_data = request.json
        app.logger.info(case_data)
        testcase = Testcase(**case_data)
        app.logger.info(testcase)
        db.session.add(testcase)
        db.session.commit()
        return {'error': 0, 'msg': 'postsuccess'}

    # 修改接口
    # @app.route('/testcase', methods = ['put'])
    def put(self):
        case_id = request.json.get('ID')
        app.logger.info(case_id)
        app.logger.info(request.json)
        case = Testcase.query.filter_by(ID=case_id).update(request.json)
        db.session.commit()
        app.logger.info(case)
        return {'error': 0, 'msg': {'id': case_id}}

    # 删除接口
    # @app.route('/testcase', methods = ['delete'])
    def delete(self):
        case_id = request.args.get("ID")
        app.logger.info(case_id)
        if not case_id:
            return {'error': 101, 'msg': 'delete case_id can not be null'}
        Testcase.query.filter_by(ID=case_id).delete()
        db.session.commit()
        return {'error': 0, 'msg': {'id': case_id}}



class DebugTestCase(Resource):
    """
    调试测试用例
    """
    def get(self):
        case_id = request.args.get("ID")
        if case_id:
            case_data = Testcase.query.filter_by(ID=case_id).first()
            app.logger.info(case_data)
            # data = [case_data.as_dict()]
            data = {
                "ID": case_data.ID,
                "CreateTime": json.dumps(case_data.CreateTime, cls=DateEncoder),
                "CaseID": case_data.CaseID,
                "CaseName": case_data.CaseName,
                "Model": case_data.Model,
                "UiAppID": case_data.UiAppID,
                "SetUp": case_data.SetUp,
                "TearDown": case_data.TearDown,
                "OperatingStep": case_data.OperatingStep,
                "ExpectResult": case_data.ExpectResult,
                "Tcase": case_data.Tcase,
                "All": case_data.All,
                "GitUrl": case_data.GitUrl,
                "Command": case_data.Command,
                "Remark": case_data.Remark}
        return data


if __name__ == '__main__':
    api.add_resource(FastTestService, '/testcaselist')
    api.add_resource(DebugTestCase, '/debugtestcase')
    # debug=True的时候,是热加载
    app.run(debug = True, port = 8000)

test_dataapi.py

import pytest
import requests

class TestCase:
    """
    测试用例接口模块的测试代码
    """
    def setup_class(self):
        self.base_url = 'http://127.0.0.1:8000/testcaselist'


    def test_get(self):
        r = requests.get(self.base_url, params={"ID": 1})
        print(r.json())
        assert r.status_code == 200

    def test_post(self):
        data = {"ID": 13,
                "CaseID": "case013",
                "CaseName": "13",
                "CreateTime": "2022-04-21 11:43:19",
                "Model": "13",
                "UiAppID": "13",
                "SetUp": "getckv4",
                "TearDown": "setckv4",
                "OperatingStep": "{'FileName':'req3.txt','uiappid':'300003'}",
                "ExpectResult": "{'Comm.int32_uiappid':'300003','Comm.int32_icm':'0'}",
                "Tcase": "tcase",
                "GitUrl": "www.baidu.com",
                "Command": "python3 main.py /testcase/300003/300003.xsl",
                "Remark": "13",}
        r = requests.post(self.base_url, json=data)
        assert r.status_code == 200

    def test_put(self):
        data2 = {"ID":"5",
            "Remark": "133333"}
        r = requests.put(self.base_url, json=data2)
        print(r.json())
        assert r.status_code == 200

    def test_delete(self):
        r = requests.delete(self.base_url, params={"ID": 3})
        assert r.status_code == 200
if __name__ == '__main__':
    pytest.main(['test_dataapi.py'])

数据库不要分开声明。 你就先把mysqldemo.py 里面 的数据对象放到
和接口的一个文件内,挪到这里

关闭