Python 测开28期 - WL - 学习笔记 - python Lv4 作业

作业

'''
完成学生系统的多任务web服务器
以面向对象方式实现,使用字典表示学生信息,不需要封装学生类
实现增删改查接口,返回json格式数据
请求方式为:
添加:http://127.0.01:8888/add?sid=s09&name=lucy&age=32&gender=male
修改:http://127.0.01:8888/change?sid=s09&name=lucys&age=31&gender=male
删除:http://127.0.01:8888/query?sid=s09&name=lucy
查询:http://127.0.01:8888/del?sid=s09
数据需要使用文件 db.txt 进行持久化存储,并保证数据的有效性
'''
import json
import os
import socket
import threading
import templates.app

from botocore import args

import templates


# 启动服务器函数
def startServer():
    # 创建 socket 对象
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # 设置复用端口
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
    # 获取本机名称与ip地址,绑定ip和端口
    hostname = socket.gethostname()
    ip = socket.gethostbyname(hostname)
    server.bind((ip, 8888))
    # 启动服务监听,最大128等待数
    server.listen(128)
    # 循环接收客户端请求
    while True:
        # 接收客户端的连接,返回客户端的socket对象和IP_port
        client, ip_port = server.accept()
        # 创建一个子线程处理客户端请求,主线程再去接收其他客户端的请求
        t = threading.Thread(target=handleClientRequest, args=(client,))
        # 设置守护线程
        t.daemon = True
        # 启动子线程处理客户端线程
        t.start()


# 处理客户端请求
def handleClientRequest(client):
    # 读取客户端的请求内容
    read_data = client.recv(4096).decode("utf-8")
    # 判断与客户端的连接是否断开
    if len(read_data) == 0:
        client.close()
        return
    # 解析客户端请求数据
    request = parseRequest(read_data)
    # 根据用户请求去做相应的处理,不同的请求使用不用的函数进行处理,这个处理函数称为接口,找接口的过程,称为路由
    response = router(request)
    # 服务器将响应数据返回给客户端
    client.send(response)
    # 服务器为客户端提供一次服务完成,关闭连接
    client.close()


# 解析报文函数
def parseRequest(read_data):
    # 用来保存数据的字典
    request = {
        "method": "",
        "path": "",
        "values": {}
    }
    # 先获取到第一行请求行数据
    read_data = read_data.split()
    # 保存请求方法
    request['method'] = read_data[0]
    # 处理请求路径和参数
    path = read_data[1]
    if '?' in path:
        tmp = path.split("?")
        # 保存路径
        path = tmp[0]
        # 提取参数
        params = tmp[1].split("&")
        for s in params:
            # 分解析查询参数字符串
            k, v = s.split("=")
            request["values"][k] = v
    # 保存请求路径
    request["path"] = path
    # 返回解析结果
    return request


# 路由函数
def router(request):
    # 取出客户端请求路径
    path = request.get("path")
    # 取出客户端请求数据
    data = request.get("values")
    print(f'data={data}')
    students = Start(data)
    # 使用 i f 实现一个简单的路由
    print(f'走哪个呢=-=-=-=-={path}')
    # 拼装完整的响应报文
    respones = "HTTP/1.1 200 OK\r\n"
    contenttype = "Content-Type: text/html; charset=UTF-8\r\n"
    response_body = ''
    if path == '/index' or path == '/' or path == '/indexs.html':
        response_body = students.index()
    elif path == '/add' or path == '/add.html':
        response_body = students.add()
    elif path == '/change' or path == '/change.html':
        response_body = students.change()
    elif path == '/query' or path == '/query.html':
        response_body = students.query()
    elif path == '/del' or path == '/del.html':
        response_body = students.sel_user()
    elif path == '/favicon.ico':
        response_body = students.image_ico()
        contenttype = 'image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8'  # 图片类型
    respones += "Content-Type: text/html; charset=UTF-8\r\n"
    respones += contenttype
    respones += "\r\n"
    # 因为使用的是TCP字节流传输数据,所以要对响应数据进行转换类型
    if type(response_body) == str:
        response_body = response_body.encode('utf-8')
    respones = respones.encode('utf-8')
    respones += response_body
    print(f'response_body={respones}')
    return respones


class File:
    # 文件写入数据函数
    def __init__(self, db):
        self.db = db

    def add_file(self):
        file = open("db.txt", "w", encoding="UTF-8")
        file.write(self.db)
        file.close()
        return True

    # 读取文件函数
    def read_file():
        file_path = "db.txt"
        # 判断文件是否存在
        if not os.path.exists(file_path):
            file = open(file_path, "w", encoding="UTF-8")
            file.close()
            print('文件新创建成功')
        file = open("db.txt", "r", encoding="UTF-8")
        content = file.read()
        file.close()
        if content != "":
            content = json.loads(content)
        elif content == "" or content is None:
            content = []
        return content


# 以下均为接口函数
class Start:
    def __init__(self, user):
        self.user = []
        self.student = user
        print(self.student)

    def index(self):
        with open("templates/indexs.html", "r", encoding="utf-8") as file:
            return file.read()

    def add(self):
        self.user = File.read_file()
        global stu
        if self.student != {}:
            if self.user:
                for i in range(len(self.user)):
                    if self.student["sid"] == self.user[i]["sid"]:
                        stu = True
                        break
                    else:
                        stu = False
            else:
                stu = None
            if stu == True:
                return "<h1>学员编号已存在</h1>"
            else:
                self.user.append(self.student)
                files = File(json.dumps(self.user))
                files.add_file()
                with open("templates/add.html", "r", encoding="utf-8") as file:
                    return file.read()
        else:
            with open("templates/add.html", "r", encoding="utf-8") as file:
                return file.read()

    def change(self):
        self.user = File.read_file()
        global stu
        if self.student != {}:
            if self.user:
                for i in range(len(self.user)):
                    if self.student["sid"] == self.user[i]["sid"]:
                        stu = True
                        num =i
                        break
                    else:
                        stu = False
            else:
                stu = None
            if stu == True:
                self.user[i] = self.student
                files = File(json.dumps(self.user))
                files.add_file()
                with open("templates/add.html", "r", encoding="utf-8") as file:
                    return file.read()
            else:
                return "<h1>学员编号不存在</h1>"
        else:
            with open("templates/change.html", "r", encoding="utf-8") as file:
                return file.read()
    def query(self):
        self.user = File.read_file()
        global stu
        global num
        num_list = []
        if self.student["sid"] != "":
            # 请求时有sid
            if self.user: # 查询文件的数据不为空
                for i in range(len(self.user)):
                    if self.student["sid"] == self.user[i]["sid"]:
                        stu = True
                        num =i
                        break
                    else:
                        stuid = False
            else:
                stu = None
            if stu:
                self.user.pop(self.user[num])
                files = File(json.dumps(self.user))
                files.add_file()
                with open("templates/add.html", "r", encoding="utf-8") as file:
                    return file.read()
            else:
                return "学员编号不存在"
        elif self.student["name"] != "":
            # 请求时没有sid有name
            if self.user: # 查询文件的数据不为空
                for i in range(len(self.user)):
                    if self.student["name"] == self.user[i]["name"]:
                        num_list.append(i)
            else: # 查询文件为空
                stu = None
            if len(num_list) >0:
                for i in range(len(numlist)):
                    self.user.remove(self.user[len(num_list[i])])
                    files = FileWork(json.dumps(self.user))
                    files.add_file()
                    with open("templates/query.html", "r", encoding="utf-8") as file:
                        return file.read()
            else:
                return "学员姓名不存在"
        elif self.student["name"] == "" and self.student["sid"] == "":
            # 请求时没有参数
            return "删除时没有提供必要参数"

    def sel_user(self):
        global num, stu
        num_list = []
        self.user = File.read_file()
        if self.student["sid"] != "":
            if self.user:
                for i in range(len(self.user)):
                    if self.student["sid"] == self.user[i]["sid"]:
                        num = i
                        stu = True
                        break
                    else:
                        stu = False
            else:
                stu = None
            if stu:
                return f"查询的学生信息,编号:{self.user[num]['sid']},姓名:{self.user[num]['name']},年龄:{self.user[num]['age']},性别:{self.user[num]['gender']}"
            elif stu is None or stu == False:
                return "学生信息不存在"
        elif self.student["name"] != "":
            if self.user:
                for i in range(len(self.user)):
                    if self.student["name"] == self.user[i]["name"]:
                        num_list.append(i)
            else:
                stu = None
            if stu is None or num_list == []:
                return "学生信息不存在"
            else:
                for i in range(len(num_list)):
                    return f"学生信息:编号:{self.user[num_list[i]]['sid']},姓名L{self.user[num_list[i]]['name']},年龄:{self.user[num_list[i]]['age']},性别:{self.user[num_list[i]]['geender']}"
        elif self.student["sid"] == "" and self.student["name"] == "":
            for i in range(len(self.user)):
                return f"学生信息:编号:{self.user[i]['sid']},姓名L{self.user[i]['name']},年龄:{self.user[i]['age']},性别:{self.user[i]['geender']}"


    def image_ico(self):
        path = 'images/hogwarts.png'
        with open(path, "rb") as file:
            return file.read()

# 程序入口
if __name__ == '__main__':
    startServer()