白白学习笔记【mitmproxy】:封装mitmproxy,实现数据存储和数据还原

前言:因为mitmdump是通过终端命令启动的独立服务,它监听到的接口数据无法被外部模块/外部项目直接使用,所以需要为这些数据建立一条数据通道(先把数据存到本地,再按需还原),方便外部使用

直接上代码

  • 代码文件
    interface.zip (4.2 KB)

  • 环境要求
    python3.8
    mitmproxy7.0.2

  • 目录结构

interface
 ├── __init__.py                 # mitmproxy调用层,便于外部模块调用mitm封装方法
 ├── interface_data              # 存放mitmproxy 监听到的接口数据
 ├── interface_mock              # mitmproxy maplocal数据源,maplocal时从该目录下的文件读取数据
 ├── interface_enum.py           # mitmproxy配置文件,管理需要监听的接口
 ├── interface_monitor.py        # mitmproxy执行文件,负责数据监听、数据修改
 └── interface_method.py         # mitmproxy执行文件,负责打开关闭mitmproxy、清除数据、还原数据
  • __init__.py
"""
@Time   :2021/10/27 7:15 下午
@Author :li
@decs   :__init__.py
"""
from interface_method import Interface


def run(scipt='InterfaceMonitor', port='8080'):
    """
    Start the mitmdump listener.

    :param scipt: listener name(s), forwarded unchanged to ``Interface.run``
    :param port: port the proxy should listen on
    :return: None
    """
    proxy = Interface()
    proxy.run(scipt, port)


def stop():
    """
    Stop the mitmdump listener.

    :return: None
    """
    proxy = Interface()
    proxy.stop()


def get(interfacename):
    """
    Fetch the recorded data of one interface.

    :param interfacename: enum member naming the interface
    :return: whatever ``Interface.get`` returns for that interface
    """
    proxy = Interface()
    recorded = proxy.get(interfacename)
    return recorded


def clean_local():
    """
    Clear the locally cached interface data.

    :return: None
    """
    proxy = Interface()
    proxy.clean_local()


if __name__ == '__main__':
    # Executing this file directly starts the listener with the default arguments.
    run()
  • interface_enum.py
"""
@Time   :2021/10/27 6:03 下午
@Author :li
@decs   :Interface_enum
"""
from enum import Enum

# The interfaces to monitor are managed here; each one is identified by its URL path.
class InterfacePath(Enum):
    """URL paths of the interfaces to monitor (one member per interface)."""
    TEST1 = '/test/get'  # debug interface 1
    TEST2 = '/test/post'  # debug interface 2

  • interface_method.py
"""
@Time   :2021/10/27 7:16 下午
@Author :li
@decs   :接口公共方法
"""
import os
import yaml
import threading
import shutil
import requests
from mitmproxy.http import Request, Response, Headers


FILEPATH = os.path.dirname(os.path.abspath(__file__))


class InterfaceData:
    """
    Recorded request/response of one interface, rebuilt from local YAML files.

    On construction, ``request.yaml`` and ``response.yaml`` located under
    ``FILEPATH + '/interface_data' + <interface path>`` are loaded and turned
    back into mitmproxy ``Request`` / ``Response`` objects.  An attribute is
    ``None`` when its file does not exist (or is empty).
    """
    def __init__(self, interfacepath):
        """
        :param interfacepath: enum member whose ``value`` is the interface URL path
        """
        self.interfacepath = interfacepath.value
        self.request = self._get_request()
        self.response = self._get_response()

    def _get_request(self):
        """
        Rebuild the recorded request.

        :return: a ``Request`` built from ``request.yaml``, or None if no data is stored
        """
        data = self._load_yaml('request')
        if data is None:
            return None
        # Arguments are passed positionally, in the order the YAML fields are listed here.
        return Request(
            data['host'],
            data['port'],
            data['method'].encode(),
            data['scheme'].encode(),
            data['authority'].encode(),
            data['path'].encode(),
            data['http_version'].encode(),
            self._to_headers(data['headers']),
            self._to_bytes(data['content']),
            self._to_headers(data['trailers']),
            data['timestamp_start'],
            data['timestamp_end'],
        )

    def _get_response(self):
        """
        Rebuild the recorded response.

        :return: a ``Response`` built from ``response.yaml``, or None if no data is stored
        """
        data = self._load_yaml('response')
        if data is None:
            return None
        return Response(
            data['http_version'].encode(),
            data['status_code'],
            data['reason'].encode(),
            self._to_headers(data['headers']),
            self._to_bytes(data['content']),
            self._to_headers(data['trailers']),
            data['timestamp_start'],
            data['timestamp_end'],
        )

    def _load_yaml(self, kind):
        """
        Load ``<kind>.yaml`` of this interface.

        :param kind: 'request' or 'response'
        :return: the parsed YAML content, or None when the file is missing or empty
        """
        yamlpath = FILEPATH + '/interface_data' + self.interfacepath + '/' + kind + '.yaml'
        if not os.path.exists(yamlpath):
            return None
        print('获取本地数据,数据源:{}'.format(yamlpath))
        # The context manager closes the handle even if parsing raises.
        with open(yamlpath, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f)

    def _to_headers(self, raw):
        """
        Convert a stored header mapping back into ``Headers``.

        :param raw: dict of header name -> list of values, or '' when nothing was stored
        :return: ``Headers`` instance, or None for '' / None
        """
        if raw == '' or raw is None:
            return None
        return Headers(self._dict_to_list(raw))

    @staticmethod
    def _to_bytes(text):
        """
        Encode stored body text.

        :param text: body text, or None
        :return: encoded bytes, or None when no text was stored
        """
        # NOTE(review): a null body is passed through as None instead of crashing on
        # .encode(); presumably this happens when no body text was available — confirm.
        return None if text is None else text.encode()

    def _dict_to_list(self, data: dict):
        """
        Flatten ``{name: [value, ...]}`` into a list of ``(name, value)`` byte pairs.

        :param data: mapping of header name to the list of its values
        :return: list of ``(bytes, bytes)`` tuples, one per value
        """
        return [
            (key.encode(), value.encode())
            for key, values in data.items()
            for value in values
        ]


class Interface:
    """
    Singleton that starts/stops mitmdump and gives access to the recorded data.

    Every ``Interface()`` call returns the same object, so the port and addon
    list stored by ``run`` are still available when ``stop`` is called later.
    """
    _port = None
    _scipt = None
    _thread = None
    # One lock shared by all callers; a lock created inside __new__ would be a
    # new object on every call and therefore could never exclude anybody.
    _instance_lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        """Return the single shared instance, creating it on first use."""
        if not hasattr(cls, '_instance'):
            with Interface._instance_lock:
                # Re-check under the lock: another thread may have created it meanwhile.
                if not hasattr(cls, '_instance'):
                    Interface._instance = super().__new__(cls)
        return Interface._instance

    def run(self, scipt, port):
        """
        Start mitmdump in a background thread.

        :param scipt: comma-separated addon class name(s) passed to interface_monitor.py
        :param port: port the proxy listens on
        :return: None
        """
        print('打开mitmdump服务')
        self._scipt = scipt
        self._port = port
        self._thread = threading.Thread(target=self._proxy)
        self._thread.start()

    def stop(self):
        """
        Ask the running mitmdump to shut down.

        Sends a request for the shutdown URL through the proxy on the port
        stored by ``run``.

        :return: None
        """
        print('关闭mitmdump服务')
        proxies = {
            'http': 'http://127.0.0.1:{}'.format(self._port)
        }
        requests.get('http://shutdown.mitmdump.com/', proxies=proxies)

    def get(self, interfacename):
        """
        Load the recorded data of one interface.

        :param interfacename: enum member whose ``value`` is the interface path
        :return: ``InterfaceData`` for that interface
        """
        print('获取接口数据,接口路径为:{}'.format(interfacename.value))
        return InterfaceData(interfacename)

    def clean_local(self):
        """
        Remove all recorded data and recreate an empty ``interface_data`` directory.

        :return: None
        """
        print('清除本地缓存数据')
        dirname = FILEPATH + '/interface_data'
        if os.path.exists(dirname):
            shutil.rmtree(dirname)
        os.mkdir(dirname)

    def _build_command(self):
        """
        Build the mitmdump argument list.

        :return: list of command-line arguments, addon names as the last item
        """
        # Collapse "A, B" into "A,B" so the addon list always travels as ONE
        # argument (interface_monitor.py reads it from a fixed argv position).
        addons = ','.join(
            name.strip() for name in str(self._scipt).split(',') if name.strip()
        )
        return ['mitmdump', '-q', '-p', str(self._port), '-s', 'interface_monitor.py', addons]

    def _proxy(self):
        """
        Run mitmdump from this package's directory; blocks until it exits.

        :return: None
        """
        import subprocess

        command = self._build_command()
        print('cd {}'.format(FILEPATH))
        print(' '.join(command))
        try:
            # An argument list plus cwd avoids shell quoting problems (for
            # example a FILEPATH containing spaces) that a shell string has.
            subprocess.run(command, cwd=FILEPATH)
        except FileNotFoundError:
            print('mitmdump executable not found on PATH')


if __name__ == '__main__':
    # Nothing happens when this file is executed directly.
    pass

  • interface_monitor.py
"""
@Time   :2021/10/27 7:16 下午
@Author :li
@decs   :mitmdump监听
"""
import os
import sys
import yaml
from mitmproxy import http, ctx
from interface_enum import InterfacePath


FILEPATH = os.path.dirname(os.path.abspath(__file__))
INTERFACEENUM = InterfacePath


class InterfaceMonitor:
    """
    mitmproxy addon that records the traffic of the monitored interfaces.

    For every flow whose path (query string removed) is a value of
    ``INTERFACEENUM``, the request and the response are dumped to
    ``request.yaml`` / ``response.yaml`` under
    ``FILEPATH + '/interface_data' + <path>``.
    """
    def request(self, flow: http.HTTPFlow):
        """
        Store the request of a monitored interface.

        :param flow: the flow whose request is inspected
        :return: None
        """
        path = self._monitored_path(flow)
        if path is None:
            return
        print('监听到接口请求:{}'.format(path))
        req = flow.request
        di = {
            'host': req.host,
            'port': req.port,
            'method': req.method,
            'scheme': req.scheme,
            'authority': req.authority,
            'path': req.path,
            'http_version': req.http_version,
            'headers': self.MultiDict_to_Dict(req.headers),
            'content': req.text,
            'trailers': self.MultiDict_to_Dict(req.trailers),
            'timestamp_start': req.timestamp_start,
            'timestamp_end': req.timestamp_end
        }
        self.data_into_yaml(di, path, 'request')

    def response(self, flow: http.HTTPFlow):
        """
        Store the response of a monitored interface.

        :param flow: the flow whose response is inspected
        :return: None
        """
        path = self._monitored_path(flow)
        if path is None:
            return
        print('监听到接口响应:{}'.format(path))
        resp = flow.response
        di = {
            'http_version': resp.http_version,
            'status_code': resp.status_code,
            'reason': resp.reason,
            'headers': self.MultiDict_to_Dict(resp.headers),
            'content': resp.text,
            'trailers': self.MultiDict_to_Dict(resp.trailers),
            'timestamp_start': resp.timestamp_start,
            'timestamp_end': resp.timestamp_end
        }
        self.data_into_yaml(di, path, 'response')

    def _monitored_path(self, flow):
        """
        Return the request path without query string if it is monitored.

        :param flow: the flow to inspect
        :return: the path, or None when it is not a value of ``INTERFACEENUM``
        """
        path = flow.request.path.split('?')[0]
        if path in INTERFACEENUM._value2member_map_:
            return path
        return None

    def MultiDict_to_Dict(self, data):
        """
        Convert a multi-value mapping into a plain dict of lists.

        :param data: object offering ``keys()`` and ``get_all(key)``, or None
        :return: ``{key: [values...]}``, or '' when ``data`` is None
        """
        if data is None:
            return ''
        return {key: data.get_all(key) for key in data.keys()}

    def data_into_yaml(self, data, interfacepath, types):
        """
        Write ``data`` as YAML to ``<interface dir>/<types>.yaml``.

        :param data: dict to serialise
        :param interfacepath: interface URL path, used as sub-directory name
        :param types: file stem, 'request' or 'response'
        :return: None
        """
        dirpath = FILEPATH + '/interface_data' + interfacepath
        # exist_ok avoids failing when the directory appears between a check and the creation.
        os.makedirs(dirpath, exist_ok=True)

        yamlpath = os.path.join(dirpath, types + '.yaml')

        print('存入接口数据到{}'.format(yamlpath))
        # utf-8 matches the encoding the data is read back with; the context
        # manager flushes and closes the file.
        with open(yamlpath, 'w', encoding='utf-8') as f:
            f.write(yaml.safe_dump(data))


class Shutdown:
    """
    Addon that shuts mitmproxy down when the dedicated shutdown URL is requested.
    """
    def request(self, flow: http.HTTPFlow):
        """
        Answer the shutdown URL with a locally built 200 response.

        :param flow: the flow whose request URL is checked
        :return: None
        """
        # Any other URL is left untouched.
        if flow.request.pretty_url != "http://shutdown.mitmdump.com/":
            return
        body = 'mitmdump shutdown'.encode()
        reply_headers = {"Content-Type": "text/html"}
        flow.response = http.Response.make(200, body, reply_headers)

    def response(self, flow: http.HTTPFlow):
        """
        Shut the proxy down once the shutdown URL has a response.

        :param flow: the flow whose request URL is checked
        :return: None
        """
        if flow.request.pretty_url != "http://shutdown.mitmdump.com/":
            return
        print('mitmdump shutdown')
        ctx.master.shutdown()


class ResponseMock:
    """
    mitmproxy addon that replaces the response body of selected interfaces with
    the content of a file from the ``interface_mock`` directory.
    """
    # Interface -> mock file name (relative to interface_mock/).
    # NOTE(review): TEST1 -> 'test.txt' is a placeholder. The members referenced
    # before (RULE_LIST, TEST) do not exist on InterfacePath, so every response
    # raised AttributeError — confirm the intended interface and mock file.
    _mock_files = {
        InterfacePath.TEST1: 'test.txt',
    }

    def response(self, flow: http.HTTPFlow):
        """
        Overwrite the response text when the request path has a mock file.

        :param flow: the flow whose response may be replaced
        :return: None
        """
        path = flow.request.path.split('?')[0]
        # None for paths that are not enum values; None is never a mapping key.
        interfacename = INTERFACEENUM._value2member_map_.get(path)
        if interfacename in self._mock_files:
            print('ResponseMock: {}'.format(path))
            flow.response.text = self.maploacl(interfacename)

    def maploacl(self, interface):
        """
        Read the mock file registered for ``interface``.

        :param interface: enum member present in the mock-file mapping
        :return: file content as text
        :raises KeyError: when no mock file is registered for ``interface``
        """
        mockpath = FILEPATH + '/interface_mock/' + self._mock_files[interface]
        # NOTE(review): mock files are assumed to be utf-8, like the recorded data — confirm.
        with open(mockpath, 'r', encoding='utf-8') as f:
            return f.read()


def set_addons():
    """
    Instantiate the addons named on the command line, plus ``Shutdown``.

    The names are taken from ``sys.argv[6:]``, i.e. everything after the script
    path in ``mitmdump -q -p <port> -s interface_monitor.py <names>``.  All of
    these arguments are joined before splitting on commas, so ``A,B``,
    ``A, B`` (one argument) and ``A,`` ``B`` (split by the shell) all give the
    same result.

    :return: list of addon instances, ``Shutdown`` last
    :raises KeyError: when a name does not match a module-level object
    """
    requested = ','.join(sys.argv[6:]).split(',')
    # Strip surrounding blanks and drop empty entries left by stray commas.
    li = [name.strip() for name in requested if name.strip()]
    # Shutdown is always loaded; skip it if the caller already listed it.
    if 'Shutdown' not in li:
        li.append('Shutdown')
    print('mitmdump addons 列表:' + str(li))
    return [globals()[name]() for name in li]


# Built once, when this script is loaded; presumably picked up by mitmproxy through
# its module-level `addons` list convention — see the mitmproxy addon docs.
addons = set_addons()

  • 简单介绍一下使用方式
"""
@Time   :1:08 下午
@Author :li
@decs   :demo
"""
import time
import interface
from interface.interface_enum import InterfacePath


# 1. Clear the locally cached interface data
interface.clean_local()

# 2. Start mitmproxy: by default the InterfaceMonitor class is loaded and the
#    proxy listens on port 8080; both can be overridden, e.g.
#    interface.run(scipt='ResponseMock, InterfaceMonitor', port='8888')
interface.run()

# 3. Call the interfaces: requests sent through the proxy while the program
#    sleeps are written to local storage
time.sleep(20)

# 4. Stop mitmproxy
interface.stop()

# 5. Load the recorded data once; request/response are rebuilt mitmproxy
#    Request/Response objects
data = interface.get(InterfacePath.TEST1)

# request
requestdata = data.request
print(requestdata.url)
print(requestdata.query)
print(requestdata.json())  # requires the recorded request body to be JSON
print(requestdata.headers)

# response
responsedata = data.response
# A mitmproxy Response has no `url` attribute (the URL belongs to the request),
# so the status code is printed here instead.
print(responsedata.status_code)
print(responsedata.json())
print(responsedata.headers)

1 个赞