[雪峰磁针石博客]python网络作业:使用python的socket库实现ICMP协议的ping

简介:

ICMP ping是您遇到过的最常见的网络扫描类型。 打开命令行提示符或终端并输入ping www.google.com非常容易。

为什么要在python中实现?

  • 很多名牌大学喜欢考试用python的socket库实现ICMP协议的ping
  • 个别环境没有ping

直接上代码:

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# 技术支持:https://www.jianshu.com/u/69f40328d4f0 
# 技术支持 https://china-testing.github.io/
# https://github.com/china-testing/python-api-tesing/blob/master/practices/ping.py
# 讨论钉钉免费群21745728 qq群144081101 567351477
# CreateDate: 2018-11-22

import os 
import argparse 
import socket
import struct
import select
import time


ICMP_ECHO_REQUEST = 8 # Platform specific
DEFAULT_TIMEOUT = 2
DEFAULT_COUNT = 4 


class Pinger(object):
    """ Pings to a host -- the Pythonic way"""

    def __init__(self, target_host, count=DEFAULT_COUNT, timeout=DEFAULT_TIMEOUT):
        self.target_host = target_host
        self.count = count
        self.timeout = timeout


    def do_checksum(self, source_string):
        """  Verify the packet integritity """
        sum = 0
        max_count = (len(source_string)/2)*2
        count = 0
        while count < max_count:

            val = source_string[count + 1]*256 + source_string[count]                   
            sum = sum + val
            sum = sum & 0xffffffff 
            count = count + 2

        if max_count<len(source_string):
            sum = sum + ord(source_string[len(source_string) - 1])
            sum = sum & 0xffffffff 

        sum = (sum >> 16)  +  (sum & 0xffff)
        sum = sum + (sum >> 16)
        answer = ~sum
        answer = answer & 0xffff
        answer = answer >> 8 | (answer << 8 & 0xff00)
        return answer

    def receive_pong(self, sock, ID, timeout):
        """
        Receive ping from the socket.
        """
        time_remaining = timeout
        while True:
            start_time = time.time()
            readable = select.select([sock], [], [], time_remaining)
            time_spent = (time.time() - start_time)
            if readable[0] == []: # Timeout
                return

            time_received = time.time()
            recv_packet, addr = sock.recvfrom(1024)
            icmp_header = recv_packet[20:28]
            type, code, checksum, packet_ID, sequence = struct.unpack(
       "bbHHh", icmp_header
   )
            if packet_ID == ID:
                bytes_In_double = struct.calcsize("d")
                time_sent = struct.unpack("d", recv_packet[28:28 + bytes_In_double])[0]
                return time_received - time_sent

            time_remaining = time_remaining - time_spent
            if time_remaining <= 0:
                return


    def send_ping(self, sock,  ID):
        """
        Send ping to the target host
        """
        target_addr  =  socket.gethostbyname(self.target_host)

        my_checksum = 0

        # Create a dummy heder with a 0 checksum.
        header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, my_checksum, ID, 1)
        bytes_In_double = struct.calcsize("d")
        data = (192 - bytes_In_double) * "Q"
        data = struct.pack("d", time.time()) + bytes(data.encode('utf-8'))

        # Get the checksum on the data and the dummy header.
        my_checksum = self.do_checksum(header + data)
        header = struct.pack(
      "bbHHh", ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), ID, 1
  )
        packet = header + data
        sock.sendto(packet, (target_addr, 1))


    def ping_once(self):
        """
        Returns the delay (in seconds) or none on timeout.
        """
        icmp = socket.getprotobyname("icmp")
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
        except socket.error as e:
            if e.errno == 1:
                # Not superuser, so operation not permitted
                e.msg +=  "ICMP messages can only be sent from root user processes"
                raise socket.error(e.msg)
        except Exception as e:
            print ("Exception: %s" %(e))

        my_ID = os.getpid() & 0xFFFF

        self.send_ping(sock, my_ID)
        delay = self.receive_pong(sock, my_ID, self.timeout)
        sock.close()
        return delay


    def ping(self):
        """
        Run the ping process
        """
        for i in range(self.count):
            print ("Ping to %s..." % self.target_host,)
            try:
                delay  =  self.ping_once()
            except socket.gaierror as e:
                print ("Ping failed. (socket error: '%s')" % e[1])
                break

            if delay  ==  None:
                print ("Ping failed. (timeout within %ssec.)" % self.timeout)
            else:
                delay  =  delay * 1000
                print ("Get pong in %0.4fms" % delay)



if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Python ping')
    parser.add_argument('host', action="store", help=u'主机名')
    given_args = parser.parse_args()  
    target_host = given_args.host
    pinger = Pinger(target_host=target_host)
    pinger.ping()

参考资料

执行

注意要有root或管理员权限:

# python3 ping.py china-testing.github.io
Ping to china-testing.github.io...
Get pong in 160.7175ms
Ping to china-testing.github.io...
Get pong in 160.8465ms
Ping to china-testing.github.io...
Get pong in 12.0983ms
Ping to china-testing.github.io...
Get pong in 161.3324ms
# python3 ping.py www.so.com
Ping to www.so.com...
Get pong in 29.0303ms
Ping to www.so.com...
Get pong in 28.8599ms
Ping to www.so.com...
Get pong in 28.9860ms
Ping to www.so.com...
Get pong in 29.0167ms
相关文章
|
9月前
|
C++
基于Reactor模型的高性能网络库之地址篇
这段代码定义了一个 InetAddress 类,是 C++ 网络编程中用于封装 IPv4 地址和端口的常见做法。该类的主要作用是方便地表示和操作一个网络地址(IP + 端口)
387 58
|
6月前
|
运维 监控 数据可视化
Python 网络请求架构——统一 SOCKS5 接入与配置管理
通过统一接入端点与标准化认证,集中管理配置、连接策略及监控,实现跨技术栈的一致性网络出口,提升系统稳定性、可维护性与可观测性。
|
9月前
|
机器学习/深度学习 算法 量子技术
GQNN框架:让Python开发者轻松构建量子神经网络
为降低量子神经网络的研发门槛并提升其实用性,本文介绍一个名为GQNN(Generalized Quantum Neural Network)的Python开发框架。
238 4
GQNN框架:让Python开发者轻松构建量子神经网络
|
9月前
|
网络协议 算法 Java
基于Reactor模型的高性能网络库之Tcpserver组件-上层调度器
TcpServer 是一个用于管理 TCP 连接的类,包含成员变量如事件循环(EventLoop)、连接池(ConnectionMap)和回调函数等。其主要功能包括监听新连接、设置线程池、启动服务器及处理连接事件。通过 Acceptor 接收新连接,并使用轮询算法将连接分配给子事件循环(subloop)进行读写操作。调用链从 start() 开始,经由线程池启动和 Acceptor 监听,最终由 TcpConnection 管理具体连接的事件处理。
299 2
|
9月前
基于Reactor模型的高性能网络库之Tcpconnection组件
TcpConnection 由 subLoop 管理 connfd,负责处理具体连接。它封装了连接套接字,通过 Channel 监听可读、可写、关闭、错误等
256 1
|
9月前
|
负载均衡 算法 安全
基于Reactor模式的高性能网络库之线程池组件设计篇
EventLoopThreadPool 是 Reactor 模式中实现“一个主线程 + 多个工作线程”的关键组件,用于高效管理多个 EventLoop 并在多核 CPU 上分担高并发 I/O 压力。通过封装 Thread 类和 EventLoopThread,实现线程创建、管理和事件循环的调度,形成线程池结构。每个 EventLoopThread 管理一个子线程与对应的 EventLoop(subloop),主线程(base loop)通过负载均衡算法将任务派发至各 subloop,从而提升系统性能与并发处理能力。
494 3
|
9月前
基于Reactor模式的高性能网络库github地址
https://github.com/zyi30/reactor-net.git
216 0
|
6月前
|
机器学习/深度学习 大数据 关系型数据库
基于python大数据的青少年网络使用情况分析及预测系统
本研究基于Python大数据技术,构建青少年网络行为分析系统,旨在破解现有防沉迷模式下用户画像模糊、预警滞后等难题。通过整合多平台亿级数据,运用机器学习实现精准行为预测与实时干预,推动数字治理向“数据驱动”转型,为家庭、学校及政府提供科学决策支持,助力青少年健康上网。
|
7月前
|
JavaScript Java 大数据
基于python的网络课程在线学习交流系统
本研究聚焦网络课程在线学习交流系统,从社会、技术、教育三方面探讨其发展背景与意义。系统借助Java、Spring Boot、MySQL、Vue等技术实现,融合云计算、大数据与人工智能,推动教育公平与教学模式创新,具有重要理论价值与实践意义。
|
9月前
基于Reactor模型的高性能网络库之Poller(EpollPoller)组件
封装底层 I/O 多路复用机制(如 epoll)的抽象类 Poller,提供统一接口支持多种实现。Poller 是一个抽象基类,定义了 Channel 管理、事件收集等核心功能,并与 EventLoop 绑定。其子类 EPollPoller 实现了基于 epoll 的具体操作,包括事件等待、Channel 更新和删除等。通过工厂方法可创建默认的 Poller 实例,实现多态调用。
423 60

推荐镜像

更多