Python 实现Ping命令状态检测

简介: ping 是一种因特网包探索器,用于测试网络连接量的程序,Ping是工作在TCP/IP网络体系结构中应用层的一个服务命令,主要是向特定的目的主机发送 ICMP 请求报文,测试目的站是否可达及了解其有关状态,实现Ping方法的这段代码原始版本来源于网络,后经排版封装后实现了一些功能,放在这里收藏之用。

ping 是一种因特网包探索器,用于测试网络连接量的程序,Ping是工作在TCP/IP网络体系结构中应用层的一个服务命令,主要是向特定的目的主机发送 ICMP 请求报文,测试目的站是否可达及了解其有关状态,实现Ping方法的这段代码原始版本来源于网络,后经排版封装后实现了一些功能,放在这里收藏之用。

第一步封装MyPing类,在pycharm下面创建一个MyPing.py文件,详细代码备注如下。

import time, struct
import socket, select

class MyPing():
    # 发送原始套接字
    def raw_socket(self, dst_addr, imcp_packet):
        rawsocket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.getprotobyname("icmp"))
        send_request_ping_time = time.time()
        rawsocket.sendto(imcp_packet, (dst_addr, 80))
        return send_request_ping_time, rawsocket

    # 计算校验和
    def chesksum(self, data):
        n = len(data)
        m = n % 2
        sum = 0
        for i in range(0, n - m, 2):
            sum += (data[i]) + ((data[i + 1]) << 8)
            sum = (sum >> 16) + (sum & 0xffff)
        if m:
            sum += (data[-1])
            sum = (sum >> 16) + (sum & 0xffff)
        answer = ~sum & 0xffff
        answer = answer >> 8 | (answer << 8 & 0xff00)
        return answer

    # 通过域名获取主机地址
    def get_host_address(self,host):
        dst_addr = socket.gethostbyname(host)
        return dst_addr

    # 接受到数据包
    def request_ping(self, data_type, data_code, data_checksum, data_ID, data_Sequence, payload_body):
        #  把字节打包成二进制数据
        imcp_packet = struct.pack('>BBHHH32s', data_type, data_code, data_checksum, data_ID, data_Sequence,payload_body)
        # 获取校验和
        icmp_chesksum = self.chesksum(imcp_packet)
        #  把校验和传入,再次打包
        imcp_packet = struct.pack('>BBHHH32s', data_type, data_code, icmp_chesksum, data_ID, data_Sequence,payload_body)
        return imcp_packet

    # 相应数据包,解包执行
    def reply_ping(self, send_request_ping_time, rawsocket, data_Sequence, timeout=3):
        while True:
            # 实例化select对象(非阻塞),可读,可写为空,异常为空,超时时间
            what_ready = select.select([rawsocket], [], [], timeout)
            # 等待时间 wait_for_time = (time.time() - started_select)
            wait_for_time = (time.time() - send_request_ping_time)
            # 没有返回可读的内容,判断超时
            if what_ready[0] == []:
                return -1
            # 记录接收时间
            time_received = time.time()
            # 设置接收的包的字节为1024
            received_packet, addr = rawsocket.recvfrom(1024)
            # 获取接收包的icmp头
            icmpHeader = received_packet[20:28]
            # 反转编码
            type, code, r_checksum, packet_id, sequence = struct.unpack(">BBHHH", icmpHeader)
            if type == 0 and sequence == data_Sequence:
                return time_received - send_request_ping_time
            # 数据包的超时时间判断
            timeout = timeout - wait_for_time
            if timeout <= 0:
                return -1

    # 向特定地址发送一条ping命令
    def send_ping(self, address):
        data_type = 8
        data_code = 0
        data_checksum = 0
        data_ID = 0
        data_Sequence = 1
        payload_body = b'abcdefghijklmnopqrstuvwabcdefghi'

        # 请求ping数据包的二进制转换
        icmp_packet = self.request_ping(data_type, data_code, data_checksum, data_ID, data_Sequence, payload_body)
        # 连接套接字,并将数据发送到套接字
        send_request_ping_time, rawsocket = self.raw_socket(address, icmp_packet)
        # 数据包传输时间
        times = self.reply_ping(send_request_ping_time, rawsocket, data_Sequence)
        if times > 0:
            return_time = int(times * 1000)
            return return_time
        else:
            return -1

实现模仿Windows中的ping命令,代码如下:

from MyPing import *

if __name__ == '__main__':
    # 使用Ping方法
    host = "www.lyshark.com"
    ping = MyPing()

    sumtime, shorttime, longtime, avgtime = 0, 1000, 0, 0
    # 8回射请求 11超时 0回射应答
    data_type = 8
    data_code = 0
    # 检验和
    data_checksum = 0
    # ID
    data_ID = 0
    # 序号
    data_Sequence = 1
    # 可选的内容
    payload_body = b'abcdefghijklmnopqrstuvwabcdefghi'
    dst_addr = socket.gethostbyname(host)
    print("正在 Ping {0} [{1}] 具有 32 字节的数据:".format(host, dst_addr))
    # 发送3次
    for i in range(0, 3):
        # 请求ping数据包的二进制转换
        icmp_packet = ping.request_ping(data_type, data_code, data_checksum, data_ID, data_Sequence + i, payload_body)
        # 连接套接字,并将数据发送到套接字
        send_request_ping_time, rawsocket = ping.raw_socket(dst_addr, icmp_packet)
        # 数据包传输时间
        times = ping.reply_ping(send_request_ping_time, rawsocket, data_Sequence + i)
        if times > 0:
            print("来自 {0} 的回复: 字节=32 时间={1}ms".format(dst_addr, int(times * 1000)))
            return_time = int(times * 1000)
            sumtime += return_time
            if return_time > longtime:
                longtime = return_time
            if return_time < shorttime:
                shorttime = return_time
            time.sleep(0.7)
        else:
            print("请求超时")

运行效果如下:

image.png

发送一条ping探测命令,send_ping()主要用于发送一个Ping包,后期我们可以实现一个主机存活探测器,主要调用代码如下:

from MyPing import *

if __name__ == "__main__":
    # 使用Ping方法
    ping = MyPing()

    address = ping.get_host_address("www.lyshark.com")
    print("域名地址: {}".format(address))

    # 对单台主机Ping
    ref = ping.send_ping(address)
    if( ref != -1):
        print("已经连通, 抖动值: {}".format(ref))

输出测试效果如下:

image.png

如上我们就可以实现对特定主机执行Ping检测了,接下来我们开始编写一个能够计算出特定范围内主机数的CalculationIP()方法,并通过开线程实现一个批量主机存活探测器.

from MyPing import *

def CalculationIP(Addr_Count):
    ret = []
    try:
        IP_Start = str(Addr_Count.split("/")[0]).split(".")
        IP_Heads = str(IP_Start[0] + "." + IP_Start[1] + "." + IP_Start[2] +".")
        IP_Start_Range = int(Addr_Count.split(".")[3].split("/")[0])
        IP_End_Range = int(Addr_Count.split("/")[1])
        for item in range(IP_Start_Range,IP_End_Range+1):
            ret.append(IP_Heads+str(item))
        return ret
    except Exception:
        return 0

if __name__ == "__main__":
    ret = CalculationIP("192.168.1.1/254")

    for each in ret:
        ping = MyPing()
        ref = ping.send_ping(each)
        print("地址: {} ---> 抖动值: {}".format(each,ref))

输出效果如下:

image.png


参考文献:https://blog.csdn.net/Small_Teenager/article/details/122123299

目录
相关文章
|
3月前
|
缓存 监控 Python
在Python中,如何检测和处理内存泄漏?
【2月更文挑战第7天】【2月更文挑战第18篇】在Python中,如何检测和处理内存泄漏?
|
2月前
|
网络协议 Shell Linux
【Shell 命令集合 网络通讯 】⭐⭐⭐Linux 测试与目标主机之间的网络连接ping 命令 使用指南
【Shell 命令集合 网络通讯 】⭐⭐⭐Linux 测试与目标主机之间的网络连接ping 命令 使用指南
43 1
|
3月前
|
监控 安全 自动驾驶
基于python的室内老人实时摔倒智能监测系统-跌倒检测系统(康复训练检测+代码)
基于python的室内老人实时摔倒智能监测系统-跌倒检测系统(康复训练检测+代码)
87 1
|
3月前
|
数据可视化 数据挖掘 调度
【Python数据挖掘】优化电能能源策略:基于非侵入式负荷检测与分解的智能解决方案
【Python数据挖掘】优化电能能源策略:基于非侵入式负荷检测与分解的智能解决方案
37 0
|
4月前
|
Python
Python 递归检测文件夹下的文件
Python 递归检测文件夹下的文件
23 0
|
3天前
|
机器学习/深度学习 算法 数据可视化
Python用KNN(K-近邻)回归、分类、异常值检测预测房价、最优K值选取、误差评估可视化
Python用KNN(K-近邻)回归、分类、异常值检测预测房价、最优K值选取、误差评估可视化
|
4天前
|
弹性计算 运维 Shell
设置Python 支持自动命令补齐功能
【4月更文挑战第29天】
6 0
|
4天前
|
弹性计算 运维 Shell
设置 Python 支持自动命令补齐功能
【4月更文挑战第29天】
5 1
|
5天前
|
数据采集 关系型数据库 BI
Python路面平整度检测车辆数据——速度修正
Python路面平整度检测车辆数据——速度修正
12 0
|
5天前
|
安全 网络协议 Linux
【专栏】一文教你玩转 Linux 的 ping 命令,从此成为 Linux 网络高手
【4月更文挑战第28天】本文详细介绍了Linux系统中ping命令的使用,包括其基本语法、输出信息、常用参数及高级用法。通过ping,用户可测试网络连通性、诊断故障及评估性能。此外,文章还讨论了ping在不同协议、模拟网络环境及与其他命令结合使用时的场景。注意防火墙和网络环境可能影响ping结果,理解错误信息有助于网络问题排查。熟练掌握ping命令,能助你成为Linux网络专家。不断学习和实践,提升网络技能,为构建稳定网络环境贡献力量。