[雪峰磁针石博客]python网络作业:使用python的socket库实现ICMP协议的ping

简介:

ICMP ping是您遇到过的最常见的网络扫描类型。 打开命令行提示符或终端并输入ping www.google.com非常容易。

为什么要在python中实现?

  • 很多名牌大学喜欢考试用python的socket库实现ICMP协议的ping
  • 个别环境没有ping

直接上代码:

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# 技术支持:https://www.jianshu.com/u/69f40328d4f0 
# 技术支持 https://china-testing.github.io/
# https://github.com/china-testing/python-api-tesing/blob/master/practices/ping.py
# 讨论钉钉免费群21745728 qq群144081101 567351477
# CreateDate: 2018-11-22

import os 
import argparse 
import socket
import struct
import select
import time


ICMP_ECHO_REQUEST = 8 # Platform specific
DEFAULT_TIMEOUT = 2
DEFAULT_COUNT = 4 


class Pinger(object):
    """ Pings to a host -- the Pythonic way"""

    def __init__(self, target_host, count=DEFAULT_COUNT, timeout=DEFAULT_TIMEOUT):
        self.target_host = target_host
        self.count = count
        self.timeout = timeout


    def do_checksum(self, source_string):
        """  Verify the packet integritity """
        sum = 0
        max_count = (len(source_string)/2)*2
        count = 0
        while count < max_count:

            val = source_string[count + 1]*256 + source_string[count]                   
            sum = sum + val
            sum = sum & 0xffffffff 
            count = count + 2

        if max_count<len(source_string):
            sum = sum + ord(source_string[len(source_string) - 1])
            sum = sum & 0xffffffff 

        sum = (sum >> 16)  +  (sum & 0xffff)
        sum = sum + (sum >> 16)
        answer = ~sum
        answer = answer & 0xffff
        answer = answer >> 8 | (answer << 8 & 0xff00)
        return answer

    def receive_pong(self, sock, ID, timeout):
        """
        Receive ping from the socket.
        """
        time_remaining = timeout
        while True:
            start_time = time.time()
            readable = select.select([sock], [], [], time_remaining)
            time_spent = (time.time() - start_time)
            if readable[0] == []: # Timeout
                return

            time_received = time.time()
            recv_packet, addr = sock.recvfrom(1024)
            icmp_header = recv_packet[20:28]
            type, code, checksum, packet_ID, sequence = struct.unpack(
       "bbHHh", icmp_header
   )
            if packet_ID == ID:
                bytes_In_double = struct.calcsize("d")
                time_sent = struct.unpack("d", recv_packet[28:28 + bytes_In_double])[0]
                return time_received - time_sent

            time_remaining = time_remaining - time_spent
            if time_remaining <= 0:
                return


    def send_ping(self, sock,  ID):
        """
        Send ping to the target host
        """
        target_addr  =  socket.gethostbyname(self.target_host)

        my_checksum = 0

        # Create a dummy heder with a 0 checksum.
        header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, my_checksum, ID, 1)
        bytes_In_double = struct.calcsize("d")
        data = (192 - bytes_In_double) * "Q"
        data = struct.pack("d", time.time()) + bytes(data.encode('utf-8'))

        # Get the checksum on the data and the dummy header.
        my_checksum = self.do_checksum(header + data)
        header = struct.pack(
      "bbHHh", ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), ID, 1
  )
        packet = header + data
        sock.sendto(packet, (target_addr, 1))


    def ping_once(self):
        """
        Returns the delay (in seconds) or none on timeout.
        """
        icmp = socket.getprotobyname("icmp")
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
        except socket.error as e:
            if e.errno == 1:
                # Not superuser, so operation not permitted
                e.msg +=  "ICMP messages can only be sent from root user processes"
                raise socket.error(e.msg)
        except Exception as e:
            print ("Exception: %s" %(e))

        my_ID = os.getpid() & 0xFFFF

        self.send_ping(sock, my_ID)
        delay = self.receive_pong(sock, my_ID, self.timeout)
        sock.close()
        return delay


    def ping(self):
        """
        Run the ping process
        """
        for i in range(self.count):
            print ("Ping to %s..." % self.target_host,)
            try:
                delay  =  self.ping_once()
            except socket.gaierror as e:
                print ("Ping failed. (socket error: '%s')" % e[1])
                break

            if delay  ==  None:
                print ("Ping failed. (timeout within %ssec.)" % self.timeout)
            else:
                delay  =  delay * 1000
                print ("Get pong in %0.4fms" % delay)



if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Python ping')
    parser.add_argument('host', action="store", help=u'主机名')
    given_args = parser.parse_args()  
    target_host = given_args.host
    pinger = Pinger(target_host=target_host)
    pinger.ping()

参考资料

执行

注意要有root或管理员权限:

# python3 ping.py china-testing.github.io
Ping to china-testing.github.io...
Get pong in 160.7175ms
Ping to china-testing.github.io...
Get pong in 160.8465ms
Ping to china-testing.github.io...
Get pong in 12.0983ms
Ping to china-testing.github.io...
Get pong in 161.3324ms
# python3 ping.py www.so.com
Ping to www.so.com...
Get pong in 29.0303ms
Ping to www.so.com...
Get pong in 28.8599ms
Ping to www.so.com...
Get pong in 28.9860ms
Ping to www.so.com...
Get pong in 29.0167ms
相关文章
|
2月前
|
运维 监控 数据可视化
Python 网络请求架构——统一 SOCKS5 接入与配置管理
通过统一接入端点与标准化认证,集中管理配置、连接策略及监控,实现跨技术栈的一致性网络出口,提升系统稳定性、可维护性与可观测性。
|
3月前
|
存储 人工智能 测试技术
如何使用LangChain的Python库结合DeepSeek进行多轮次对话?
本文介绍如何使用LangChain结合DeepSeek实现多轮对话,测开人员可借此自动生成测试用例,提升自动化测试效率。
453 125
如何使用LangChain的Python库结合DeepSeek进行多轮次对话?
|
3月前
|
监控 数据可视化 数据挖掘
Python Rich库使用指南:打造更美观的命令行应用
Rich库是Python的终端美化利器,支持彩色文本、智能表格、动态进度条和语法高亮,大幅提升命令行应用的可视化效果与用户体验。
216 0
|
2月前
|
数据可视化 关系型数据库 MySQL
【可视化大屏】全流程讲解用python的pyecharts库实现拖拽可视化大屏的背后原理,简单粗暴!
本文详解基于Python的电影TOP250数据可视化大屏开发全流程,涵盖爬虫、数据存储、分析及可视化。使用requests+BeautifulSoup爬取数据,pandas存入MySQL,pyecharts实现柱状图、饼图、词云图、散点图等多种图表,并通过Page组件拖拽布局组合成大屏,支持多种主题切换,附完整源码与视频讲解。
223 4
【可视化大屏】全流程讲解用python的pyecharts库实现拖拽可视化大屏的背后原理,简单粗暴!
|
2月前
|
传感器 运维 前端开发
Python离群值检测实战:使用distfit库实现基于分布拟合的异常检测
本文解析异常(anomaly)与新颖性(novelty)检测的本质差异,结合distfit库演示基于概率密度拟合的单变量无监督异常检测方法,涵盖全局、上下文与集体离群值识别,助力构建高可解释性模型。
306 10
Python离群值检测实战:使用distfit库实现基于分布拟合的异常检测
|
2月前
|
机器学习/深度学习 大数据 关系型数据库
基于python大数据的青少年网络使用情况分析及预测系统
本研究基于Python大数据技术,构建青少年网络行为分析系统,旨在破解现有防沉迷模式下用户画像模糊、预警滞后等难题。通过整合多平台亿级数据,运用机器学习实现精准行为预测与实时干预,推动数字治理向“数据驱动”转型,为家庭、学校及政府提供科学决策支持,助力青少年健康上网。
|
3月前
|
JavaScript Java 大数据
基于python的网络课程在线学习交流系统
本研究聚焦网络课程在线学习交流系统,从社会、技术、教育三方面探讨其发展背景与意义。系统借助Java、Spring Boot、MySQL、Vue等技术实现,融合云计算、大数据与人工智能,推动教育公平与教学模式创新,具有重要理论价值与实践意义。
|
Linux 网络安全 Android开发
高级 ping 命令及技巧
Ping命令是一种常用的网络诊断工具,用于测试网络连接的可达性和延迟等信息
1416 57
|
Linux iOS开发 网络架构
如何使用 Ping 命令监测网络丢包情况?
如何使用 Ping 命令监测网络丢包情况?
10238 48

推荐镜像

更多