不同网络架构下文件下载服务协议设计

Keywords: #技术 #Python #Socket
Table of Contents

实验目的

  1. 熟悉不同网络架构的特点。
  2. 掌握不同网络架构下的文件下载服务协议设计。

实验内容

使用TCP协议设计一个文件下载服务协议,客户端发送要下载的文件路径给服务器,服务器将对应的文件内容送给客户端,客户端将文件存储到本地磁盘。注意,当文件不存在时给出提示。要求,服务的实现分别采用以下三种方法实现:

(1)单线程,迭代服务器(依次服务每一个客户端)

(2)多线程,并发服务器

(3)异步方式 asyncio库

例如: $ python download_server.py [-h] [-p port] host $ python download_client.py [-h] [-p port] host remote_file local_file 完成将服务器的文件remote_file下载到本地,命名为local_file

程序设计思路

网络应用拓扑结构

一对多的网络结构,即一台服务器对应多台客户端的星型拓扑结构

网络拓扑结构

应用层协议设计

客户端连接到服务端,服务端监听到客户端的连接。

客户端 –> 需要下载的文件路径 –> 服务端

客户端 <– 文件是否存在 <– 服务端

不存在则双方退出协议,存在则继续

客户端 –> 确认文件存在消息 –> 服务端

客户端 <– 文件头部信息长度 <– 服务端

客户端 –> 确认头部长度消息 –> 服务端

客户端 <– 文件头部信息 <– 服务端

客户端 –> 确认头部信息消息 –> 服务端

客户端 <– 文件内容数据 <– 服务端

双方退出协议部分

所选用的Python库介绍

import socket # 套接字

import sys # 系统操作

import zipfile # 压缩文件

import struct # 字符串打包解包

import json # json序列化

from tqdm import tqdm # 进度条

import threading # 多线程

import asyncio # 异步IO

import aiofiles # 异步文件IO

import selectors # selectors模块单线程异步轮询

程序源代码

服务器端源码

单线程服务端:

import socket
import sys
import os
import zipfile  # 压缩文件
import json  # json序列化
import struct  # 字符串打包解包
 
 
def handle_client(client_socket, client_address):
    flag = False  # 是否为文件夹
    # 接收客户端发送的文件路径
    file_path = client_socket.recv(1024).decode()
    # 判断文件或者文件夹是否存在
    if os.path.exists(file_path):
        print(f"[+] 发送 {file_path}{client_address}")
        # 发送文件存在的确认信息
        client_socket.send("file_exist".encode())
        # 接收客户端的确认信息
        rev = client_socket.recv(1024).decode()
        if rev != 'start_send_file':
            print(f"[-] {client_address} 取消发送 {file_path}")
            return
        # 如果是文件夹,就压缩打包成zip文件
        if os.path.isdir(file_path):
            zip_file_path = file_path + '.zip'
            zip_file = zipfile.ZipFile(zip_file_path, 'w', zipfile.ZIP_DEFLATED)
            for root, dirs, files in os.walk(file_path):
                for file in files:
                    zip_file.write(os.path.join(root, file))
            zip_file.close()
            flag = True  # 是否为文件夹
            file_path = zip_file_path
        # 发送文件头部信息{文件名,文件大小,文件类型}
        file_name = os.path.basename(file_path)
        file_size = os.path.getsize(file_path)
        file_type = os.path.splitext(file_path)[1]
        file_header = {
            'file_name': file_name,
            'file_size': file_size,
            'file_type': file_type
        }
        # json序列化
        file_header_json = json.dumps(file_header)
        file_header_bytes = file_header_json.encode()
        # 将文件头部信息的长度打包成固定长度的字符串
        header_size = struct.pack('i', len(file_header_bytes))
        # 先发送文件头部信息的长度
        client_socket.send(header_size)
        # 接收客户端的确认信息
        rev = client_socket.recv(1024)
        if rev.decode() != 'header_size_ok':  # 如果客户端确认信息不是header_size_ok
            print(f"[-] {client_address} 取消发送 {file_path}")
            return
        # 再发送文件头部信息
        client_socket.send(file_header_bytes)
        # 接收客户端的确认信息
        rev = client_socket.recv(1024)
        if rev.decode() == 'header_ok':  # 如果客户端确认信息不是header_ok
            # 发送文件内容
            print(f'[*] 正在发送文件 {file_name} 内容...')
            send_file(client_socket, file_path, file_size)
        # 删除文件夹压缩包
        if flag:
            os.remove(file_path)
    else:
        client_socket.send("FILE NOT FOUND".encode())
        print(f"[-] {file_path} 文件不存在")
 
 
def send_file(client_socket, file_path, file_size):
    try:
        with open(file_path, 'rb') as file:
            # 显示tqdm进度条
            for _ in range(file_size // 1024):
                file_data = file.read(1024)
                client_socket.send(file_data)
    except Exception as e:
        print(f"[-] {file_path} 文件发送失败\n")
    else:
        print(f"[+] {file_path} 文件发送成功\n")
 
 
 
def main():
    global client_socket
    if sys.argv[1] == '-h':
        help_message = '''
[+] 用法:
    python server.py [-h] [-P port]
 
[+] 参数说明:
    -h: 查看帮助信息
    -P: 本地服务器监听端口
    '''
        print(help_message)
        return
    elif len(sys.argv) != 3:
        print('[-] 参数错误!输入-h查看帮助信息')
        return
    elif sys.argv[1] != '-P':
        print('[-] 参数错误!输入-h查看帮助信息')
        return
    host = '127.0.0.1'
    port = sys.argv[2]
    try:
        port = int(port)
    except Exception as e:
        print('[-] 端口号必须为整数!')
        print(e)
        return
 
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind((host, port))
    server.listen(5)
 
    print(f"[*] 服务器正在监听:{host}:{port}")
 
    while True:
        try:
            # 接收客户端连接
            client_socket, client_address = server.accept()
            print(f"\n[+] 接收到客户端连接:{client_address}")
            # 处理客户端请求
            handle_client(client_socket, client_address)
        except KeyboardInterrupt:
            print("\n[-] 服务器已停止运行")
            break
    # 关闭客户端连接
    client_socket.close()
 
if __name__ == "__main__":
    main()

多线程服务端:

import socket
import sys
import os
import zipfile  # 压缩文件
import json  # json序列化
import struct  # 字符串打包解包
import threading
 
 
def handle_client(client_socket, client_address):
    flag = False  # 是否为文件夹
    # 接收客户端发送的文件路径
    file_path = client_socket.recv(1024).decode()
    # 判断文件或者文件夹是否存在
    if os.path.exists(file_path):
        print(f"[+] 发送 {file_path}{client_address}")
        # 发送文件存在的确认信息
        client_socket.send("file_exist".encode())
        # 接收客户端的确认信息
        rev = client_socket.recv(1024).decode()
        if rev != 'start_send_file':
            print(f"[-] {client_address} 取消发送 {file_path}")
            return
        # 如果是文件夹,就压缩打包成zip文件
        if os.path.isdir(file_path):
            zip_file_path = file_path + '.zip'
            zip_file = zipfile.ZipFile(zip_file_path, 'w', zipfile.ZIP_DEFLATED)
            for root, dirs, files in os.walk(file_path):
                for file in files:
                    zip_file.write(os.path.join(root, file))
            zip_file.close()
            flag = True  # 是否为文件夹
            file_path = zip_file_path
        # 发送文件头部信息{文件名,文件大小,文件类型}
        file_name = os.path.basename(file_path)
        file_size = os.path.getsize(file_path)
        file_type = os.path.splitext(file_path)[1]
        file_header = {
            'file_name': file_name,
            'file_size': file_size,
            'file_type': file_type
        }
        # json序列化
        file_header_json = json.dumps(file_header)
        file_header_bytes = file_header_json.encode()
        # 将文件头部信息的长度打包成固定长度的字符串
        header_size = struct.pack('i', len(file_header_bytes))
        # 先发送文件头部信息的长度
        client_socket.send(header_size)
        # 接收客户端的确认信息
        rev = client_socket.recv(1024)
        if rev.decode() != 'header_size_ok':  # 如果客户端确认信息不是header_size_ok
            print(f"[-] {client_address} 取消发送 {file_path}")
            return
        # 再发送文件头部信息
        client_socket.send(file_header_bytes)
        # 接收客户端的确认信息
        rev = client_socket.recv(1024)
        if rev.decode() == 'header_ok':  # 如果客户端确认信息不是header_ok
            # 发送文件内容
            print(f'[*] 正在发送文件 {file_name} 内容...')
            send_file(client_socket, file_path, file_size)
        # 删除文件夹压缩包
        if flag:
            os.remove(file_path)
    else:
        client_socket.send("FILE NOT FOUND".encode())
        print(f"[-] {file_path} 文件不存在")
 
 
def send_file(client_socket, file_path, file_size):
    try:
        with open(file_path, 'rb') as file:
            # 显示tqdm进度条
            for _ in range(file_size // 1024):
                file_data = file.read(1024)
                client_socket.send(file_data)
    except Exception as e:
        print(f"[-] {file_path} 文件发送失败")
    else:
        print(f"[+] {file_path} 文件发送成功")
 
 
 
def main():
    global client_socket
    if sys.argv[1] == '-h':
        help_message = '''
[+] 用法:
    python server.py [-h] [-P port]
 
[+] 参数说明:
    -h: 查看帮助信息
    -P: 本地服务器监听端口
    '''
        print(help_message)
        return
    elif len(sys.argv) != 3:
        print('[-] 参数错误!输入-h查看帮助信息')
        return
    elif sys.argv[1] != '-P':
        print('[-] 参数错误!输入-h查看帮助信息')
        return
    host = '127.0.0.1'
    port = sys.argv[2]
    try:
        port = int(port)
    except Exception as e:
        print('[-] 端口号必须为整数!')
        print(e)
        return
 
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind((host, port))
    server.listen(5)
 
    print(f"[*] 服务器正在监听:{host}:{port}")
 
    while True:
        try:
            # 接收客户端连接
            client_socket, client_address = server.accept()
            print(f"[+] 接收到客户端连接:{client_address}")
            # 多线程处理客户端请求
            # handle_client(client_socket, client_address)
            client_thread = threading.Thread(target=handle_client, args=(client_socket, client_address))
            client_thread.start()
        except KeyboardInterrupt:
            print("\n[-] 服务器已停止运行")
            break
    # 关闭客户端连接
    client_socket.close()
 
if __name__ == "__main__":
    main()

asyncio异步服务端:

import sys
import os
import zipfile  # 压缩文件
import json  # json序列化
import struct  # 字符串打包解包
import asyncio  # 异步IO socket
import aiofiles  # 异步文件IO
 
 
async def handle_client(reader, writer):
    # 获取客户端信息
    client_address = writer.get_extra_info('peername')
    print(f"\n[+] 接收到客户端连接:{client_address}")
    flag = False  # 是否为文件夹
    # 接收客户端发送的文件路径
    file_path = await asyncio.wait_for(reader.read(1024), None)
    file_path = file_path.decode()
    # 判断文件或者文件夹是否存在
    if os.path.exists(file_path):
        print(f"[+] 发送 {file_path}{client_address}")
        # 发送文件存在的确认信息
        writer.write("file_exist".encode())
        await writer.drain()  # 清空套接字 刷新缓冲区
        # 接收客户端的确认信息
        rev = await asyncio.wait_for(reader.read(1024), None)
        if rev.decode() != 'start_send_file':
            print(f"[-] {client_address} 取消发送 {file_path}")
            return
        # 如果是文件夹,就压缩打包成zip文件
        if os.path.isdir(file_path):
            file_path = await zip_file(file_path)
        # 发送文件头部信息{文件名,文件大小,文件类型}
        file_name = os.path.basename(file_path)
        file_size = os.path.getsize(file_path)
        file_type = os.path.splitext(file_path)[1]
        file_header = {
            'file_name': file_name,
            'file_size': file_size,
            'file_type': file_type
        }
        # json序列化
        file_header_json = json.dumps(file_header)
        file_header_bytes = file_header_json.encode()
        # 将文件头部信息的长度打包成固定长度的字符串
        header_size = struct.pack('i', len(file_header_bytes))
        # 先发送文件头部信息的长度
        writer.write(header_size)
        await writer.drain()  # 清空套接字 刷新缓冲区
        # 接收客户端的确认信息
        rev = await asyncio.wait_for(reader.read(1024), None)
        if rev.decode() != 'header_size_ok':  # 如果客户端确认信息不是header_size_ok
            print(f"[-] {client_address} 取消发送 {file_path}")
            return
        # 再发送文件头部信息
        writer.write(file_header_bytes)
        await writer.drain()  # 清空套接字 刷新缓冲区
        # 接收客户端的确认信息
        rev = await asyncio.wait_for(reader.read(1024), None)
        if rev.decode() == 'header_ok':  # 如果客户端确认信息不是header_ok
            # 发送文件内容
            print(f'[*] 正在发送文件 {file_name} 内容...')
            # send_file(client_socket, file_path, file_size)
            # 异步发送文件内容
            await send_file(writer, file_path, file_size)
        # 删除文件夹压缩包
        if flag:
            os.remove(file_path)
    else:
        client_socket.send("FILE NOT FOUND".encode())
        print(f"[-] {file_path} 文件不存在")
    await writer.drain()  # 清理 刷新缓冲区
    writer.close()  # 关闭连接
 
 
async def zip_file(file_path):
    zip_file_path = file_path + '.zip'
    zip_file = zipfile.ZipFile(zip_file_path, 'w', zipfile.ZIP_DEFLATED)
    for root, dirs, files in os.walk(file_path):
        for file in files:
            zip_file.write(os.path.join(root, file))
    zip_file.close()
    flag = True  # 是否为文件夹
    return zip_file_path
 
 
async def send_file(writer, file_path, file_size):
    try:
        async with aiofiles.open(file_path, 'rb') as file:
            # 显示tqdm进度条
            for _ in range(file_size // 1024):
                file_data = await file.read(1024)
                writer.write(file_data)
    except Exception as e:
        print(f"[-] {file_path} 文件发送失败\n")
    else:
        print(f"[+] {file_path} 文件发送成功\n")
 
 
async def main():
    global client_socket
    if sys.argv[1] == '-h':
        help_message = '''
[+] 用法:
    python server.py [-h] [-P port]
 
[+] 参数说明:
    -h: 查看帮助信息
    -P: 本地服务器监听端口
    '''
        print(help_message)
        return
    elif len(sys.argv) != 3:
        print('[-] 参数错误!输入-h查看帮助信息')
        return
    elif sys.argv[1] != '-P':
        print('[-] 参数错误!输入-h查看帮助信息')
        return
    host = '127.0.0.1'
    port = sys.argv[2]
    try:
        port = int(port)
    except Exception as e:
        print('[-] 端口号必须为整数!')
        print(e)
        return
 
    print(f"[*] 服务器正在监听:{host}:{port}")
    # 生成一个asyncio服务器
    server = await asyncio.start_server(handle_client, host, port)
    # 启动服务器 循环接收客户端连接
    async with server:
        await server.serve_forever()
    # 关闭客户端连接
    client_socket.close()
 
 
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n[-] 服务器已停止运行")

客户端源码

import socket
import sys
import zipfile  # 压缩文件
import struct  # 字符串打包解包
import json  # json序列化
from tqdm import tqdm  # 进度条
 
def main():
    if sys.argv[1] == '-h':
        help_message = '''
[+] 用法:
    python client.py [-h] [-H host] [-P port] [-r remote_file_path] [-l local_file_path]
    
[+] 参数说明:
    -h: 查看帮助信息
    -H: 远程服务器地址
    -P: 远程服务器端口
    -r: 远程文件路径
    -l: 本地文件路径
    '''
        print(help_message)
        return
    elif len(sys.argv) != 9:
        print('[-] 参数错误!输入-h查看帮助信息')
        return
    elif sys.argv[1] != '-H' or sys.argv[3] != '-P' or sys.argv[5] != '-r' or sys.argv[7] != '-l':
        print('[-] 参数错误!输入-h查看帮助信息')
        return
 
    host = sys.argv[2]
    port = sys.argv[4]
    remote_file_path = sys.argv[6]
    local_file_path = sys.argv[8]
    try:
        port = int(port)
    except Exception as e:
        print('[-] 端口号必须为整数!')
        print(e)
        return
    try:
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        client.connect((host, port))
    except Exception as e:
        print('[-] 连接服务器失败!')
        print(e)
        return
 
    # 发送要下载的文件路径
    client.send(remote_file_path.encode())
    # 接收文件是否存在
    rev = client.recv(1024).decode()
    if rev == 'FILE NOT FOUND':
        print(f'[-] {remote_file_path} 文件不存在')
        client.close()
        return
    # 发送确认信息
    client.send('start_send_file'.encode())
    # 接收文件头部长度
    print('[*] 正在接收文件头部信息...')
    rev = client.recv(10)
    file_header_size = struct.unpack('i', rev)[0]
    # 发送确认信息
    client.send('header_size_ok'.encode())
    # 接收文件头部信息
    file_header_bytes = client.recv(file_header_size)
    file_header_json = file_header_bytes.decode()
    file_header = json.loads(file_header_json)
    # 发送确认信息
    client.send('header_ok'.encode())
    print(f'[+] 文件名:{file_header["file_name"]} | 文件大小:{file_header["file_size"]} B')
    # 接收文件内容
    recv_file(client, local_file_path, file_header['file_size'])
 
 
def recv_file(client_socket, local_file_path, file_size):
    try:
        with open(local_file_path, 'wb') as file:
            # 显示tqdm进度条
            for _ in tqdm(range(file_size // 1024), desc="接收中", unit="KB", unit_scale=True):
                file_data = client_socket.recv(1024)
                file.write(file_data)
    except Exception as e:
        print(f"[-] {local_file_path} 文件接收失败")
    else:
        print(f"[+] {local_file_path} 文件接收成功")
 
 
if __name__ == "__main__":
    main()

程序测试方法及测试结果记录

  1. 测试方法

对于单线程的服务端,采用下载单个文件,检测下载过程中,客户端和服务端是否正常,并且能够处理异常情况

对于多线程、异步的服务端,采用同时下载两个或者多个文件的形式,检测下载过程众怒给,客户端和服务端是否正常,并且能够处理多种的异常情况

  1. 测试流程

单线程:

client端使用说明

server端使用说明

client端接收logo.png文件

server端显示的日志信息

多线程:

客户端和服务端使用说明,同单线程的情况

client端接收file.zip大文件的过程

另一client端同时接收logo.png文件成功

server端显示的日志信息

 client端接收大文件file.zip成功

全部client端接收文件成功后server端的日志信息

Asyncio异步:

客户端和服务端使用说明,同单线程的情况

client端接收file.zip文件过程

另一client端同时接收logo.png

server端显示的日志信息

实验分析总结及心得

实验过程中参考的文献:

Python使用Asyncio开发TCP服务器简单案例-CSDN博客

asyncio实现异步socket server - 掘金 (juejin.cn)

异步网络模型 - 老鸟python (birdpython.com)

python异步编程–回调模型(selectors模块) - -零 - 博客园 (cnblogs.com)

总结一下: 多线程的原理就是,在执行一个任务,当这个任务处于阻塞或者IO操作时,可以开启另外一个新的线程去执行另外有一个工作,提高执行效率。但是缺点就是整个线程的阻塞判断和线程切换都是程序自动判断,人为不能干预。

由此引出了协程的概念,协程就是在切换任务的时候,是我们在编程可以指定的。单线程异步就用了这样的思想,asyncio是单进程并发库,不是多线程,也不是多进程,单纯是在一个进程里面异步(切来切去运行),切换的地方用await标记,能够切换的函数用async标记,从而避免程序自动判断切换。

而对于select模型和poll模型的异步思想,是单线程轮询策略,核心就是在循环中,不断的select从任务列表取出可以读和写的fd对象,然后对其进行不同的处理,就是把所有的接收Socket数据和发送Socket数据都存在各自的列表队列里,之后循环每个队列中的每个对象,处理对应的事件操作。poll提供了register、modify、unregister、poll等方法,运行逻辑就是,如果是需要监测的事件,就将其register到poll对象中,然后通过poll方法不断取出就绪的fd,根据fd的事件类型(POLLIN, POLLOUT)对其进行不同的处理。

总的来说,单线程、多线程、asyncio实现的异步都很简单,很快就能写出来,但是select和poll模型就相对来说比较困难,本身的实现格式比较复杂,同时我设计的协议也并不简单,所以利用select模型实现异步还是比较困难的,最后虽然通过简化协议流程实现了这个服务端代码但是实现的效果还是不理想,异步处理的单个事件还是太大了,不知道怎么细化,还是有很大的改进空间的。