作业
'''
完成学生系统的多任务web服务器
以面向对象方式实现,使用字典表示学生信息,不需要封装学生类
实现增删改查接口,返回json格式数据
请求方式为:
添加:http://127.0.01:8888/add?sid=s09&name=lucy&age=32&gender=male
修改:http://127.0.01:8888/change?sid=s09&name=lucys&age=31&gender=male
删除:http://127.0.01:8888/query?sid=s09&name=lucy
查询:http://127.0.01:8888/del?sid=s09
数据需要使用文件 db.txt 进行持久化存储,并保证数据的有效性
'''
import json
import os
import socket
import threading
import templates.app
from botocore import args
import templates
# 启动服务器函数
def startServer():
# 创建 socket 对象
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 设置复用端口
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
# 获取本机名称与ip地址,绑定ip和端口
hostname = socket.gethostname()
ip = socket.gethostbyname(hostname)
server.bind((ip, 8888))
# 启动服务监听,最大128等待数
server.listen(128)
# 循环接收客户端请求
while True:
# 接收客户端的连接,返回客户端的socket对象和IP_port
client, ip_port = server.accept()
# 创建一个子线程处理客户端请求,主线程再去接收其他客户端的请求
t = threading.Thread(target=handleClientRequest, args=(client,))
# 设置守护线程
t.daemon = True
# 启动子线程处理客户端线程
t.start()
# 处理客户端请求
def handleClientRequest(client):
# 读取客户端的请求内容
read_data = client.recv(4096).decode("utf-8")
# 判断与客户端的连接是否断开
if len(read_data) == 0:
client.close()
return
# 解析客户端请求数据
request = parseRequest(read_data)
# 根据用户请求去做相应的处理,不同的请求使用不用的函数进行处理,这个处理函数称为接口,找接口的过程,称为路由
response = router(request)
# 服务器将响应数据返回给客户端
client.send(response)
# 服务器为客户端提供一次服务完成,关闭连接
client.close()
# 解析报文函数
def parseRequest(read_data):
# 用来保存数据的字典
request = {
"method": "",
"path": "",
"values": {}
}
# 先获取到第一行请求行数据
read_data = read_data.split()
# 保存请求方法
request['method'] = read_data[0]
# 处理请求路径和参数
path = read_data[1]
if '?' in path:
tmp = path.split("?")
# 保存路径
path = tmp[0]
# 提取参数
params = tmp[1].split("&")
for s in params:
# 分解析查询参数字符串
k, v = s.split("=")
request["values"][k] = v
# 保存请求路径
request["path"] = path
# 返回解析结果
return request
# 路由函数
def router(request):
# 取出客户端请求路径
path = request.get("path")
# 取出客户端请求数据
data = request.get("values")
print(f'data={data}')
students = Start(data)
# 使用 i f 实现一个简单的路由
print(f'走哪个呢=-=-=-=-={path}')
# 拼装完整的响应报文
respones = "HTTP/1.1 200 OK\r\n"
contenttype = "Content-Type: text/html; charset=UTF-8\r\n"
response_body = ''
if path == '/index' or path == '/' or path == '/indexs.html':
response_body = students.index()
elif path == '/add' or path == '/add.html':
response_body = students.add()
elif path == '/change' or path == '/change.html':
response_body = students.change()
elif path == '/query' or path == '/query.html':
response_body = students.query()
elif path == '/del' or path == '/del.html':
response_body = students.sel_user()
elif path == '/favicon.ico':
response_body = students.image_ico()
contenttype = 'image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8' # 图片类型
respones += "Content-Type: text/html; charset=UTF-8\r\n"
respones += contenttype
respones += "\r\n"
# 因为使用的是TCP字节流传输数据,所以要对响应数据进行转换类型
if type(response_body) == str:
response_body = response_body.encode('utf-8')
respones = respones.encode('utf-8')
respones += response_body
print(f'response_body={respones}')
return respones
class File:
# 文件写入数据函数
def __init__(self, db):
self.db = db
def add_file(self):
file = open("db.txt", "w", encoding="UTF-8")
file.write(self.db)
file.close()
return True
# 读取文件函数
def read_file():
file_path = "db.txt"
# 判断文件是否存在
if not os.path.exists(file_path):
file = open(file_path, "w", encoding="UTF-8")
file.close()
print('文件新创建成功')
file = open("db.txt", "r", encoding="UTF-8")
content = file.read()
file.close()
if content != "":
content = json.loads(content)
elif content == "" or content is None:
content = []
return content
# 以下均为接口函数
class Start:
def __init__(self, user):
self.user = []
self.student = user
print(self.student)
def index(self):
with open("templates/indexs.html", "r", encoding="utf-8") as file:
return file.read()
def add(self):
self.user = File.read_file()
global stu
if self.student != {}:
if self.user:
for i in range(len(self.user)):
if self.student["sid"] == self.user[i]["sid"]:
stu = True
break
else:
stu = False
else:
stu = None
if stu == True:
return "<h1>学员编号已存在</h1>"
else:
self.user.append(self.student)
files = File(json.dumps(self.user))
files.add_file()
with open("templates/add.html", "r", encoding="utf-8") as file:
return file.read()
else:
with open("templates/add.html", "r", encoding="utf-8") as file:
return file.read()
def change(self):
self.user = File.read_file()
global stu
if self.student != {}:
if self.user:
for i in range(len(self.user)):
if self.student["sid"] == self.user[i]["sid"]:
stu = True
num =i
break
else:
stu = False
else:
stu = None
if stu == True:
self.user[i] = self.student
files = File(json.dumps(self.user))
files.add_file()
with open("templates/add.html", "r", encoding="utf-8") as file:
return file.read()
else:
return "<h1>学员编号不存在</h1>"
else:
with open("templates/change.html", "r", encoding="utf-8") as file:
return file.read()
def query(self):
self.user = File.read_file()
global stu
global num
num_list = []
if self.student["sid"] != "":
# 请求时有sid
if self.user: # 查询文件的数据不为空
for i in range(len(self.user)):
if self.student["sid"] == self.user[i]["sid"]:
stu = True
num =i
break
else:
stuid = False
else:
stu = None
if stu:
self.user.pop(self.user[num])
files = File(json.dumps(self.user))
files.add_file()
with open("templates/add.html", "r", encoding="utf-8") as file:
return file.read()
else:
return "学员编号不存在"
elif self.student["name"] != "":
# 请求时没有sid有name
if self.user: # 查询文件的数据不为空
for i in range(len(self.user)):
if self.student["name"] == self.user[i]["name"]:
num_list.append(i)
else: # 查询文件为空
stu = None
if len(num_list) >0:
for i in range(len(numlist)):
self.user.remove(self.user[len(num_list[i])])
files = FileWork(json.dumps(self.user))
files.add_file()
with open("templates/query.html", "r", encoding="utf-8") as file:
return file.read()
else:
return "学员姓名不存在"
elif self.student["name"] == "" and self.student["sid"] == "":
# 请求时没有参数
return "删除时没有提供必要参数"
def sel_user(self):
global num, stu
num_list = []
self.user = File.read_file()
if self.student["sid"] != "":
if self.user:
for i in range(len(self.user)):
if self.student["sid"] == self.user[i]["sid"]:
num = i
stu = True
break
else:
stu = False
else:
stu = None
if stu:
return f"查询的学生信息,编号:{self.user[num]['sid']},姓名:{self.user[num]['name']},年龄:{self.user[num]['age']},性别:{self.user[num]['gender']}"
elif stu is None or stu == False:
return "学生信息不存在"
elif self.student["name"] != "":
if self.user:
for i in range(len(self.user)):
if self.student["name"] == self.user[i]["name"]:
num_list.append(i)
else:
stu = None
if stu is None or num_list == []:
return "学生信息不存在"
else:
for i in range(len(num_list)):
return f"学生信息:编号:{self.user[num_list[i]]['sid']},姓名L{self.user[num_list[i]]['name']},年龄:{self.user[num_list[i]]['age']},性别:{self.user[num_list[i]]['geender']}"
elif self.student["sid"] == "" and self.student["name"] == "":
for i in range(len(self.user)):
return f"学生信息:编号:{self.user[i]['sid']},姓名L{self.user[i]['name']},年龄:{self.user[i]['age']},性别:{self.user[i]['geender']}"
def image_ico(self):
path = 'images/hogwarts.png'
with open(path, "rb") as file:
return file.read()
# 程序入口
if __name__ == '__main__':
startServer()