Python实战笔记(二) 网络编程

简介: Python实战笔记(二) 网络编程

前言


Python 提供 socket 模块用于访问网络服务,使得不同主机之间的进程可以相互通信


正文


1、Socket 对象的创建


socket.socket([family[, type[, proto]]])


family:套接字家族,一般取值如下:

socket.AF_INET  :不同主机上的网络通信,使用 IPV4 协议

socket.AF_INET6:不同主机上的网络通信,使用 IPV6 协议

socket.AF_UNIX  :同一主机上的进程通信

type:套接字类型,一般取值如下:

socket.SOCK_STREAM:流套接字,用于 TCP 通信

socket.SOCK_DGRAM  :数据报套接字,用于 UDP 通信

socket.SOCK_RAW      :原始套接字,用于处理 ICMP、IGMP 等特殊的网络报文

proto:协议编号,默认为 0


2、Socket 对象的常用方法


(1)服务端的常用方法


bind(address):绑定地址到套接字,address 表示通信地址,格式取决于使用的套接字家族

listen(backlog):监听连接,backlog 表示在拒绝连接前可以挂起的最大连接数

accept():接受连接,返回 (conn, address),conn 是新的 Socket 对象,用来接收和发送数据


(2)客户端的常用方法


connect(address):连接到指定地址的套接字,如果连接出错,返回 socket.error

connect_ex(address):连接到指定地址的套接字,如果连接出错,返回 错误编码


(3)公共用途常用方法


close():关闭套接字


recv(bufsize[,flag]):接收数据,一般用于 TCP 协议,返回接收到的数据


recvfrom(bufsize[,flag]):接收数据,一般用于 UDP 协议,返回接收到的数据和发送方的地址


send(data[,flag]):发送数据,一般用于 TCP 协议,返回发送的字节数


sendto(data[,flag],address):发送数据,一般用于 UDP 协议,返回发送的字节数


getsockname():返回连接套接字的己方地址


getpeername():返回连接套接字的远程地址


getsockopt(level,optname):获取套接字选项


setsockopt(level,optname,value):设置套接字选项


gettimeout():获取套接字的超时值,如果没有设置超时时间,返回 None


settimeout(timeout):设置套接字的超时值,timeout 表示超时时间,单位为秒


3、例子


(1)客户端传输数据到服务端

# server.py
import socket
import threading
def create_service():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    address = ('127.0.0.1', 8888)
    server.bind(address)
    server.listen(5)
    print('Waiting connection ...')
    while True:
        conn, addr = server.accept()
        thread = threading.Thread(target = handle_request, args = (conn, addr))
        thread.start()
    server.close()
def handle_request(conn, addr):
    print('Accept connection from', addr)
    while True:
        recv_byte = conn.recv(1024)
        recv_data = recv_byte.decode('UTF-8')
        print(recv_data)
        if recv_data == 'quit': break
    conn.close()
if __name__ == '__main__':
    create_service()


# client.py
import socket
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
address = ('127.0.0.1', 8888)
client.connect(address)
print('Connect to', address)
while True:
    send_data = input('Please Enter Something: ')
    send_byte = send_data.encode('UTF-8')
    client.send(send_byte)
    if send_data == 'quit': break
client.close()


(2)客户端从服务端下载文件

# server.py
import socket
import threading
import os
import json
def create_service():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    address = ('127.0.0.1', 8888)
    server.bind(address)
    server.listen(5)
    print('Waiting connection ...')
    while True:
        conn, addr = server.accept()
        thread = threading.Thread(target = handle_request, args = (conn, addr))
        thread.start()
    server.close()
def handle_request(conn, addr):
    print('Accept connection from', addr)
    while True:
        file_name = conn.recv(1024).decode('UTF-8')
        print('Request File:', file_name)
        if not os.path.exists(file_name):
            message = 'Not Exist'
            print(message)
            conn.send(message.encode('UTF-8'))
        else:
            real_name = os.path.split(file_name)[1]
            file_size = os.path.getsize(file_name)
            file_info = json.dumps({
                'real_name': real_name,
                'file_size': file_size
            })
            conn.send(file_info.encode('UTF-8'))
            conn.recv(1024)
            print('Send File ...')
            with open(file_name, 'rb') as file:
                for line in file:
                    conn.send(line)
                print('Send File Successful')
    conn.close()
if __name__ == '__main__':
    create_service()


# client.py
import socket
import os
import json
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
address = ('127.0.0.1', 8888)
client.connect(address)
print('Connect to', address)
while True:
    file_name = input('Please Enter File Name: ')
    client.send(bytes(file_name, encoding='UTF-8'))
    recv_data = client.recv(1024).decode('UTF-8')
    if recv_data == 'Not Exist':
        print(recv_data)
    else:
        file_info = json.loads(recv_data)
        real_name = file_info['real_name']
        file_size = file_info['file_size']
        client.send(bytes('Received', encoding='UTF-8'))
        print('Receive File ...')
        recv_size = 0
        with open(real_name, 'wb') as file:
            while recv_size < file_size:
                chunk = client.recv(1024)
                file.write(chunk)
                recv_size += len(chunk)
            print('Receive File Successful')
client.close()


(3)客户端上传文件到服务端

# server.py
import socket
import threading
import os
import json
def create_service():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    address = ('127.0.0.1', 8888)
    server.bind(address)
    server.listen(5)
    print('Waiting connection ...')
    while True:
        conn, addr = server.accept()
        thread = threading.Thread(target = handle_request, args = (conn, addr))
        thread.start()
    server.close()
def handle_request(conn, addr):
    print('Accept connection from', addr)
    while True:
        recv_data = conn.recv(1024).decode('UTF-8')
        file_info = json.loads(recv_data)
        real_name = file_info['real_name']
        file_size = file_info['file_size']
        conn.send(bytes('Received', encoding='UTF-8'))
        print('Receive File ...')
        recv_size = 0
        with open(real_name, 'wb') as file:
            while recv_size < file_size:
                chunk = conn.recv(1024)
                file.write(chunk)
                recv_size += len(chunk)
            print('Receive File Successful')
    conn.close()
if __name__ == '__main__':
    create_service()
# client.py
import socket
import os
import json
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
address = ('127.0.0.1', 8888)
client.connect(address)
print('Connect to', address)
while True:
    file_name = input('Please Enter File Name: ')
    if not os.path.exists(file_name):
        print('Not Exist')
        continue
    real_name = os.path.split(file_name)[1]
    file_size = os.path.getsize(file_name)
    file_info = json.dumps({
        'real_name': real_name,
        'file_size': file_size
    })
    client.send(file_info.encode('UTF-8'))
    client.recv(1024)
    print('Send File ...')
    with open(file_name, 'rb') as file:
        for line in file:
            client.send(line)
        print('Send File Successful')
client.close()

文章知识点与官方知识档案匹配,可进一步学习相关知识

目录
相关文章
|
3天前
|
存储 设计模式 算法
|
4天前
|
API 数据库 数据安全/隐私保护
Flask框架在Python面试中的应用与实战
【4月更文挑战第18天】Django REST framework (DRF) 是用于构建Web API的强力工具,尤其适合Django应用。本文深入讨论DRF面试常见问题,包括视图、序列化、路由、权限控制、分页过滤排序及错误处理。同时,强调了易错点如序列化器验证、权限认证配置、API版本管理、性能优化和响应格式统一,并提供实战代码示例。了解这些知识点有助于在Python面试中展现优秀的Web服务开发能力。
22 1
|
1天前
|
存储 网络协议 关系型数据库
Python从入门到精通:2.3.2数据库操作与网络编程——学习socket编程,实现简单的TCP/UDP通信
Python从入门到精通:2.3.2数据库操作与网络编程——学习socket编程,实现简单的TCP/UDP通信
|
1天前
|
机器学习/深度学习 算法 Python
Python用RNN神经网络:LSTM、GRU、回归和ARIMA对COVID19新冠疫情人数时间序列预测
Python用RNN神经网络:LSTM、GRU、回归和ARIMA对COVID19新冠疫情人数时间序列预测
42 12
|
2天前
|
人工智能 Python
【AI大模型应用开发】【LangChain系列】实战案例1:用LangChain写Python代码并执行来生成答案
【AI大模型应用开发】【LangChain系列】实战案例1:用LangChain写Python代码并执行来生成答案
6 0
|
2天前
|
机器学习/深度学习 算法 算法框架/工具
数据分享|PYTHON用KERAS的LSTM神经网络进行时间序列预测天然气价格例子
数据分享|PYTHON用KERAS的LSTM神经网络进行时间序列预测天然气价格例子
22 0
|
3天前
|
存储 索引 Python
|
4天前
|
SQL 中间件 API
Flask框架在Python面试中的应用与实战
【4月更文挑战第18天】**Flask是Python的轻量级Web框架,以其简洁API和强大扩展性受欢迎。本文深入探讨了面试中关于Flask的常见问题,包括路由、Jinja2模板、数据库操作、中间件和错误处理。同时,提到了易错点,如路由冲突、模板安全、SQL注入,以及请求上下文管理。通过实例代码展示了如何创建和管理数据库、使用表单以及处理请求。掌握这些知识将有助于在面试中展现Flask技能。**
12 1
Flask框架在Python面试中的应用与实战
|
6天前
|
SQL 关系型数据库 MySQL
Python与MySQL数据库交互:面试实战
【4月更文挑战第16天】本文介绍了Python与MySQL交互的面试重点,包括使用`mysql-connector-python`或`pymysql`连接数据库、执行SQL查询、异常处理、防止SQL注入、事务管理和ORM框架。易错点包括忘记关闭连接、忽视异常处理、硬编码SQL、忽略事务及过度依赖低效查询。通过理解这些问题和提供策略,可提升面试表现。
25 6
|
7天前
|
机器学习/深度学习 Python
Python用LSTM长短期记忆神经网络对不稳定降雨量时间序列进行预测分析
Python用LSTM长短期记忆神经网络对不稳定降雨量时间序列进行预测分析
18 0

热门文章

最新文章