数据库数据发生改变了,
flask_sqlalchemy通过flask查询接口获取到的数据还是老的数据?查询之前我加了db.session.commit()也不行
import json
import datetime
from urllib import parse
from flask import Flask, request
from flask_restful import Resource, Api
from flask_sqlalchemy import SQLAlchemy
from flask_cors import CORS
from common.make_bon import run_case_one
# from common.make_bon import set_get_ckv
from common.dataencoder import DateEncoder
# todo:
# 获取请求参数
# 获取请求体
# 实例化flask对象
testccaseweb = Flask(__name__)
# 解决跨域问题
CORS(testccaseweb, supports_credentials=True)
testccaseweb = Flask(__name__)
username = '‘
password = ''
ip = ''
port = '3306'
database = ''
pwd = parse.quote_plus(password)
testccaseweb.config['SQLALCHEMY_DATABASE_URI'] = f'mysql+pymysql://{username}:{pwd}@{ip}:{port}/{database}'
# 解决warning问题
testccaseweb.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
testccaseweb.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True
class UnlockedAlchemy(SQLAlchemy):
def apply_driver_hacks(self, app, info, options):
if "isolation_level" not in options:
options["isolation_level"] = "READ COMMITTED"
return super(UnlockedAlchemy, self).apply_driver_hacks(app, info, options)
db = UnlockedAlchemy(testccaseweb)
api = Api(testccaseweb)
class FastTestService(Resource):
"""
测试用例服务
"""
# 方法名对应app.route中的methods
# 查询接口
# @app.route('/testcase', methods = ['get'])
def get(self):
db.session.commit()
case_id = request.args.get("id")
if case_id:
case_data = Testcase.query.filter_by(id=case_id).first()
testccaseweb.logger.info(case_data)
# data = [case_data.as_dict()]
data = {
"id": case_data.id,
"project": case_data.project,
"module": case_data.module,
"businessid": case_data.businessid,
"title": case_data.title,
"directory": case_data.directory,
"scene": case_data.scene,
"setup": case_data.setup,
"teardown": case_data.teardown,
"steps": case_data.steps,
"expect": case_data.expect,
"remarks": case_data.id,
"label": case_data.id,
"level": case_data.id,
"type": case_data.id,
"concurrent": case_data.id,
"other": case_data.id,
"extend": case_data.id,
"CreateTime": json.dumps(case_data.CreateTime, cls=DateEncoder)}
else:
db.session.commit()
case_data = Testcase.query.all()
# data = [i.as_dict() for i in case_data]
data = [{
"id": i.id,
"project": i.project,
"module": i.module,
"businessid": i.businessid,
"title": i.title,
"directory": i.directory,
"scene": i.scene,
"setup": i.setup,
"teardown": i.teardown,
"steps": i.steps,
"expect": i.expect,
"remarks": i.id,
"label": i.id,
"level": i.id,
"type": i.id,
"concurrent": i.id,
"other": i.id,
"extend": i.id,
"CreateTime": json.dumps(i.CreateTime, cls=DateEncoder)
}for i in case_data]
testccaseweb.logger.info(data)
return data