WebServer:
"""
__author__ = '霍格沃兹测试开发学社'
__desc__ = '更多测试开发技术探讨,请访问:https://ceshiren.com/t/topic/15860'
"""
# 导包
import socket
import threading
# 启动服务器的函数
def startServer():
# 创建一个socket 对象
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 设置复用端口
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
# 绑定IP与端口, 参数是一个元组,IP是字符串,端口是数字
server.bind(("", 8888))
# 启动服务器的监听
server.listen(256)
print("MyWebServer String On 127.0.0.1:8888")
# 使用死循环接受客户端的请求
while True:
# 接收客户端的连接,返回客户端的socker对象和IP_port
client, ip_port = server.accept()
print(f"客户端 {ip_port[0]} 使用 { ip_port[1] } 端口连接成功")
# 创建一个子线程去处理客户端的请求,主线程再去接受其它客户端的请求
t = threading.Thread(target=handleClientRequest, args=(client,))
# 设置守护线程
t.daemon = True
# 启动子线程去处理客户端请求
t.start()
# 用来处理用户请求的函数
def handleClientRequest(client):
# 读取客户端的请求内容
recv_data = client.recv(4096).decode("utf-8")
# 判断与客户端的连接是否断开
if len(recv_data) == 0:
client.close()
return
# 解析客户端请求数据
request = parseRequest(recv_data)
# 根据用户请求去做相应的处理,不同的请求使用不同的函数进行处理,这个处理函数称为接口,找接口的过程,称为路由
response = router(request)
# 服务器将响应数据返回给客户端
client.send(response)
# 服务器为客户端 提供一次服务完成,关闭连接
client.close()
# 用来解析请求报文的函数
def parseRequest(recv_data):
# 用来保存数据的字典
request = {
"method": "",
"path": "",
"values": {}
}
# 先获取到第一行请求行数据
recv_data = recv_data.split()
# 保存请求方法
request["method"] = recv_data[0]
# 处理请求路径 和参数
path = recv_data[1]
if "?" in path:
tmp = path.split("?")
# 保存路径
path = tmp[0]
# 提取参数
params = tmp[1].split("&")
for s in params:
# 分解析查询参数字符串
k, v = s.split("=")
request["values"][k] = v
# 保存请求路径
request["path"] = path
# 返回解析结果
return request
# GET /login ? nusername=socker&passwd=123123123 HTTP/1.1
# 路由函数
def router(request):
# 取出客户端 的请求路径
path = request.get("path")
# 使用 if 实现一个简单的路由
response_body = ""
if path == "/index":
response_body = index()
elif path == "/list":
response_body = list()
elif path == "/news":
response_body = news()
elif path == "/dandan":
response_body = dandan()
elif path == "/query":
response_body = query()
else:
response_body = images(path)
# 拼装完整的响应报文
response = "HTTP/1.1 200 OK\r\n"
# image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8
response += "Content-Type: text/html;charset=utf-8\r\n"
response += "Server: MyWebServer V1.0\r\n"
response += "\r\n"
# 因为使用的是TCP字节流传输数据,所以要对响应数据进行转换类型
response = response.encode("utf-8")
response += response_body
print(response)
return response
# 下面这些函数称为接口函数,用来对不同的请求路径 接收相应的数据信息
# /index
def index():
with open("templates/index.html", "rb") as file:
return file.read()
# /list
def list():
with open("templates/list.html", "rb") as file:
return file.read()
# /news
def news():
with open("templates/news.html", "rb") as file:
return file.read()
# /dandan
def dandan():
with open("templates/index_back.html", "rb") as file:
return file.read()
# /images/img16.jpg
# 用来处理处理的请求接口
def images(path):
path = "./templates" + path # ./templages/images/img16.jpg
with open(path, "rb") as file:
return file.read()
# 返回一个json格式的数据接口 ( JSON js对象简谱)
import json
def query():
# python vs js
# 列表 vs 数组 []
# 字典 vs 对象 {}
# json字符串 中出现的所有字符串,必须使用 双引号包裹
data = [
{"name": "Tom", "age":23,"gender": "male"},
{"name": "Tom", "age":23,"gender": "male"},
{"name": "Tom", "age":23,"gender": "male"},
{"name": "Tom", "age":23,"gender": "male"}
]
return json.dumps(data).encode()
# 程序入口
if __name__ == '__main__':
startServer()
Pytest 框架
"""
__author__ = '霍格沃兹测试开发学社'
__desc__ = '更多测试开发技术探讨,请访问:https://ceshiren.com/t/topic/15860'
"""
import WebServer
def test_case1():
assert WebServer.list() == b'asdfasdf asdf asdf '