python使用locust的client发布送文件请求

数据源:

JSONArray array = object.getJSONArray("files");
        array.stream()
                .filter(e -> array != null)
                .filter(e -> !array.isEmpty())
                .forEach(obj -> {
                    JSONObject json = (JSONObject) obj;
                    String[] names = StringUtils.split(json.get("name").toString(), "/");
                    // 取出content字符转成流循环构建进入 multiPart中
                    requestSpecification.multiPart("files", names[names.length - 1], new ByteArrayInputStream(ReadJsonUtils.objectToJson(json.get("content")).getBytes(Charset.defaultCharset())));
                });
        return requestSpecification
                .post(url
                .then().statusCode(200).extract().response();

我现在想使用locuts中的client进行 http请求 我应该怎么传入以上的数据 以下为python代码

       """
        模拟发送数据
        """
        form_data = {
            "roleList": [],
            "resourceList": [],
            "resourceDataList": [],
            "roleResourceMappingList": [],
            "reportIdList": [],
            "appId": app_id,
            "env": "",
            "logicPageResourceDtoList": {},
            "logicAuthFlag": False,
            "callLogicValidations": {},
            "callbackLogicsName": [],
            "frontends": [],
            "replicas": 1,
        }
        with open('resources/deployment.json', 'r', encoding='utf-8') as deployment_file:
            deployment_str = json.load(deployment_file)
        shared_keys = set(form_data.keys()) & set(deployment_str.keys())
        form_data.update({key: deployment_str[key] for key in shared_keys})
 
        files = deployment_str["files"]
        file_list = []
        for file_data in files:
            aa = {"files", io.BytesIO(file_data.get("content").encode('utf-8'))}
            file_list.append(aa)
 
        headers = {
            "Cookie": self.parent.cookie,
            "Content-Type": "multipart/form-data",
        }
 
        response = self.client.post(url="/api/v1/app/deployments", data=form_data, files=file_list, headers=headers)
        if response.status_code == 200:
            response_body = response.json()
            print(f"response_body: {response_body}")
            code = response_body.get('code')
            if code == 200:
                print(f"请求成功")
            else:
                request_id = response.headers['requestId']
                print(f"请求失败: {request_id}")
        else:
            print(f"接口调用失败: {response.status_code}")

你的请求方法,请求url 请求头,传递的formdata数据都是什么?
以下使用request完成的:

传递formdata 使用data参数传递

  • r=requests.请求方法(请求url,data=formdata数据,headers=headers)
  • r.status_code #响应状态码
  • r.json() #响应数据
    写的简单的逻辑你看下

上面的参数在 form_data = {
“roleList”: ,
“resourceList”: ,
“resourceDataList”: ,
“roleResourceMappingList”: ,
“reportIdList”: ,
“appId”: app_id,
“env”: “”,
“logicPageResourceDtoList”: {},
“logicAuthFlag”: False,
“callLogicValidations”: {},
“callbackLogicsName”: ,
“frontends”: ,
“replicas”: 1,
}中 底下的二进制文件不知道怎么传

class TestDeploymentLocust(TaskSet):

    # @task(1)  # 任务的权重为1,如果有多个任务,可以将权重值定义成不同的值,
    def deployment(self):
        try:
            app_id = self.user.queue_data.get()
        except queue.Empty:
            print("没有可用的app_id")
            exit(1)

        if app_id is None:
            print("获取到的appId为空")
            exit(1)

        print(f"appId: {app_id}")
        """
        模拟发送数据
        """
        form_data = {
            "roleList": [],
            "resourceList": [],
            "resourceDataList": [],
            "roleResourceMappingList": [],
            "reportIdList": [],
            "appId": app_id,
            "env": "",
            "logicPageResourceDtoList": {},
            "logicAuthFlag": False,
            "callLogicValidations": {},
            "callbackLogicsName": [],
            "frontends": [],
            "replicas": 1,
        }
        with open('resources/deployment.json', 'r', encoding='utf-8') as deployment_file:
            deployment_str = json.load(deployment_file)
        shared_keys = set(form_data.keys()) & set(deployment_str.keys())
        form_data.update({key: deployment_str[key] for key in shared_keys})

        files = deployment_str["files"]
        file_list = []
        for file_data in files:
            aa = {"files", io.BytesIO(file_data.get("content").encode('utf-8'))}
            file_list.append(aa)

        headers = {
            "Cookie": self.parent.cookie,
            "Content-Type": "multipart/form-data",
        }

        response = self.client.post(url="/api/v1/app/deployments", data=form_data, files=file_list, headers=headers)
        if response.status_code == 200:
            response_body = response.json()
            print(f"response_body: {response_body}")
            code = response_body.get('code')
            if code == 200:
                print(f"请求成功")
            else:
                request_id = response.headers['requestId']
                print(f"请求失败: {request_id}")
        else:
            print(f"接口调用失败: {response.status_code}")
class WebsiteUser(HttpUser):
    tasks = [TestDeploymentLocust]
    host = "http://autocode.lcap.hatest.163yun.com"
    wait_time = between(3, 10)
    app_ids = read_app_ids("resources/appId.json")
    queue_data = queue.Queue()
    # 将所有元素依次放入队列中
    for app_id in app_ids:
        queue_data.put(app_id)
    cookie = login()