Can a Python engineer help me debug this network speed-test code? It raises an error - Q&A - Alibaba Cloud Developer Community


爱吃鱼的程序员 2020-06-10 15:03:30 1069
Could a Python engineer help me debug the speed-test code below? It raises an error when I run it.
 
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2012-2015 Matt Martz
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import os
import re
import sys
import math
import signal
import socket
import timeit
import platform
import threading

__version__ = '0.3.4'

# Some global variables we use
user_agent = None
source = None
shutdown_event = None
scheme = 'http'


# Used for bound_interface
socket_socket = socket.socket

try:
    import xml.etree.cElementTree as ET
except ImportError:
    try:
        import xml.etree.ElementTree as ET
    except ImportError:
        from xml.dom import minidom as DOM
        ET = None

# Begin import game to handle Python 2 and Python 3
try:
    from urllib2 import urlopen, Request, HTTPError, URLError
except ImportError:
    from urllib.request import urlopen, Request, HTTPError, URLError

try:
    from httplib import HTTPConnection, HTTPSConnection
except ImportError:
    e_http_py2 = sys.exc_info()
    try:
        from http.client import HTTPConnection, HTTPSConnection
    except ImportError:
        e_http_py3 = sys.exc_info()
        raise SystemExit('Your python installation is missing required HTTP '
                         'client classes:\n\n'
                         'Python 2: %s\n'
                         'Python 3: %s' % (e_http_py2[1], e_http_py3[1]))

try:
    from Queue import Queue
except ImportError:
    from queue import Queue

try:
    from urlparse import urlparse
except ImportError:
    from urllib.parse import urlparse

try:
    from urlparse import parse_qs
except ImportError:
    try:
        from urllib.parse import parse_qs
    except ImportError:
        from cgi import parse_qs

try:
    from hashlib import md5
except ImportError:
    from md5 import md5

try:
    from argparse import ArgumentParser as ArgParser
except ImportError:
    from optparse import OptionParser as ArgParser
try:
    import builtins
except ImportError:
    def print_(*args, **kwargs):
        """The new-style print function taken from
        https://pypi.python.org/pypi/six/

        """
        fp = kwargs.pop("file", sys.stdout)
        if fp is None:
            return

        def write(data):
            if not isinstance(data, basestring):
                data = str(data)
            fp.write(data)

        want_unicode = False
        sep = kwargs.pop("sep", None)
        if sep is not None:
            if isinstance(sep, unicode):
                want_unicode = True
            elif not isinstance(sep, str):
                raise TypeError("sep must be None or a string")
        end = kwargs.pop("end", None)
        if end is not None:
            if isinstance(end, unicode):
                want_unicode = True
            elif not isinstance(end, str):
                raise TypeError("end must be None or a string")
        if kwargs:
            raise TypeError("invalid keyword arguments to print()")
        if not want_unicode:
            for arg in args:
                if isinstance(arg, unicode):
                    want_unicode = True
                    break
        if want_unicode:
            newline = unicode("\n")
            space = unicode(" ")
        else:
            newline = "\n"
            space = " "
        if sep is None:
            sep = space
        if end is None:
            end = newline
        for i, arg in enumerate(args):
            if i:
                write(sep)
            write(arg)
        write(end)
else:
    print_ = getattr(builtins, 'print')
    del builtins


class SpeedtestCliServerListError(Exception):
    """Internal Exception class used to indicate to move on to the next
    URL for retrieving speedtest.net server details

    """


def bound_socket(*args, **kwargs):
    """Bind socket to a specified source IP address"""

    global source
    sock = socket_socket(*args, **kwargs)
    sock.bind((source, 0))
    return sock


def distance(origin, destination):
    """Determine distance between 2 sets of [lat,lon] in km"""

    lat1, lon1 = origin
    lat2, lon2 = destination
    radius = 6371  # km

    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    a = (math.sin(dlat / 2) * math.sin(dlat / 2) +
         math.cos(math.radians(lat1)) *
         math.cos(math.radians(lat2)) * math.sin(dlon / 2) *
         math.sin(dlon / 2))
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    d = radius * c

    return d


def build_user_agent():
    """Build a Mozilla/5.0 compatible User-Agent string"""

    global user_agent
    if user_agent:
        return user_agent

    ua_tuple = (
        'Mozilla/5.0',
        '(%s; U; %s; en-us)' % (platform.system(), platform.architecture()[0]),
        'Python/%s' % platform.python_version(),
        '(KHTML, like Gecko)',
        'speedtest-cli/%s' % __version__
    )
    user_agent = ' '.join(ua_tuple)
    return user_agent


def build_request(url, data=None, headers={}):
    """Build a urllib2 request object

    This function automatically adds a User-Agent header to all requests

    """

    # URLs that start with ':' (e.g. '://host/path') get the global scheme
    if url[0] == ':':
        schemed_url = '%s%s' % (scheme, url)
    else:
        schemed_url = url

    headers['User-Agent'] = user_agent
    return Request(schemed_url, data=data, headers=headers)


def catch_request(request):
    """Helper function to catch common exceptions encountered when
    establishing a connection with a HTTP/HTTPS request

    """

    try:
        uh = urlopen(request)
        return uh, False
    except (HTTPError, URLError, socket.error):
        e = sys.exc_info()[1]
        return None, e


class FileGetter(threading.Thread):
    """Thread class for retrieving a URL"""

    def __init__(self, url, start):
        self.url = url
        self.result = None
        self.starttime = start
        threading.Thread.__init__(self)

    def run(self):
        self.result = [0]
        try:
            if (timeit.default_timer() - self.starttime) <= 10:
                request = build_request(self.url)
                f = urlopen(request)
                while 1 and not shutdown_event.isSet():
                    self.result.append(len(f.read(10240)))
                    if self.result[-1] == 0:
                        break
                f.close()
        except IOError:
            pass


def downloadSpeed(files, quiet=False):
    """Function to launch FileGetter threads and calculate download speeds"""

    start = timeit.default_timer()

    def producer(q, files):
        for file in files:
            thread = FileGetter(file, start)
            thread.start()
            q.put(thread, True)
            if not quiet and not shutdown_event.isSet():
                sys.stdout.write('.')
                sys.stdout.flush()

    finished = []

    def consumer(q, total_files):
        while len(finished) < total_files:
            thread = q.get(True)
            # is_alive() replaces isAlive(), which was removed in Python 3.9
            while thread.is_alive():
                thread.join(timeout=0.1)
            finished.append(sum(thread.result))
            del thread

    q = Queue(6)
    prod_thread = threading.Thread(target=producer, args=(q, files))
    cons_thread = threading.Thread(target=consumer, args=(q, len(files)))
    start = timeit.default_timer()
    prod_thread.start()
    cons_thread.start()
    while prod_thread.is_alive():
        prod_thread.join(timeout=0.1)
    while cons_thread.is_alive():
        cons_thread.join(timeout=0.1)
    # Bytes received divided by elapsed seconds
    return (sum(finished) / (timeit.default_timer() - start))


class FilePutter(threading.Thread):
    """Thread class for putting a URL"""

    def __init__(self, url, start, size):
        self.url = url
        chars = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ'
        data = chars * (int(round(int(size) / 36.0)))
        self.data = ('content1=%s' % data[0:int(size) - 9]).encode()
        del data
        self.result = None
        self.starttime = start
        threading.Thread.__init__(self)

    def run(self):
        try:
            if ((timeit.default_timer() - self.starttime) <= 10 and
                    not shutdown_event.isSet()):
                request = build_request(self.url, data=self.data)
                f = urlopen(request)
                f.read(11)
                f.close()
                self.result = len(self.data)
            else:
                self.result = 0
        except IOError:
            self.result = 0


def uploadSpeed(url, sizes, quiet=False):
    """Function to launch FilePutter threads and calculate upload speeds"""

    start = timeit.default_timer()

    def producer(q, sizes):
        for size in sizes:
            thread = FilePutter(url, start, size)
            thread.start()
            q.put(thread, True)
            if not quiet and not shutdown_event.isSet():
                sys.stdout.write('.')
                sys.stdout.flush()

    finished = []

    def consumer(q, total_sizes):
        while len(finished) < total_sizes:
            thread = q.get(True)
            # is_alive() replaces isAlive(), which was removed in Python 3.9
            while thread.is_alive():
                thread.join(timeout=0.1)
            finished.append(thread.result)
            del thread

    q = Queue(6)
    prod_thread = threading.Thread(target=producer, args=(q, sizes))
    cons_thread = threading.Thread(target=consumer, args=(q, len(sizes)))
    start = timeit.default_timer()
    prod_thread.start()
    cons_thread.start()
    while prod_thread.is_alive():
        prod_thread.join(timeout=0.1)
    while cons_thread.is_alive():
        cons_thread.join(timeout=0.1)
    # Bytes sent divided by elapsed seconds
    return (sum(finished) / (timeit.default_timer() - start))


def getAttributesByTagName(dom, tagName):
    """Retrieve an attribute from an XML document and return it in a
    consistent format

    Only used with xml.dom.minidom, which is likely only to be used
    with python versions older than 2.5
    """
    elem = dom.getElementsByTagName(tagName)[0]
    return dict(list(elem.attributes.items()))


def getConfig():
    """Download the speedtest.net configuration and return only the data
    we are interested in
    """

    request = build_request('://www.speedtest.net/speedtest-config.php')
    uh, e = catch_request(request)
    if e:
        print_('Could not retrieve speedtest.net configuration: %s' % e)
        sys.exit(1)
    configxml = []
    while 1:
        configxml.append(uh.read(10240))
        if len(configxml[-1]) == 0:
            break
    if int(uh.code) != 200:
        return None
    uh.close()
    try:
        try:
            root = ET.fromstring(''.encode().join(configxml))
            config = {
                'client': root.find('client').attrib,
                'times': root.find('times').attrib,
                'download': root.find('download').attrib,
                'upload': root.find('upload').attrib}
        except AttributeError:  # Python3 branch
            root = DOM.parseString(''.join(configxml))
            config = {
                'client': getAttributesByTagName(root, 'client'),
                'times': getAttributesByTagName(root, 'times'),
                'download': getAttributesByTagName(root, 'download'),
                'upload': getAttributesByTagName(root, 'upload')}
    except SyntaxError:
        print_('Failed to parse speedtest.net configuration')
        sys.exit(1)
    del root
    del configxml
    return config


def closestServers(client, all=False):
    """Determine the 5 closest speedtest.net servers based on geographic
    distance
    """

    urls = [
        '://www.speedtest.net/speedtest-servers-static.php',
        '://c.speedtest.net/speedtest-servers-static.php',
        '://www.speedtest.net/speedtest-servers.php',
        '://c.speedtest.net/speedtest-servers.php',
    ]
    errors = []
    servers = {}
    for url in urls:
        try:
            request = build_request(url)
            uh, e = catch_request(request)
            if e:
                errors.append('%s' % e)
                raise SpeedtestCliServerListError
            serversxml = []
            while 1:
                serversxml.append(uh.read(10240))
                if len(serversxml[-1]) == 0:
                    break
            if int(uh.code) != 200:
                uh.close()
                raise SpeedtestCliServerListError
            uh.close()
            try:
                try:
                    root = ET.fromstring(''.encode().join(serversxml))
                    # iter() replaces getiterator(), removed in Python 3.9
                    elements = root.iter('server')
                except AttributeError:  # Python3 branch
                    root = DOM.parseString(''.join(serversxml))
                    elements = root.getElementsByTagName('server')
            except SyntaxError:
                raise SpeedtestCliServerListError
            for server in elements:
                try:
                    attrib = server.attrib
                except AttributeError:
                    attrib = dict(list(server.attributes.items()))
                d = distance([float(client['lat']),
                              float(client['lon'])],
                             [float(attrib.get('lat')),
                              float(attrib.get('lon'))])
                attrib['d'] = d
                if d not in servers:
                    servers[d] = [attrib]
                else:
                    servers[d].append(attrib)
            del root
            del serversxml
            del elements
        except SpeedtestCliServerListError:
            continue

        # We were able to fetch and parse the list of speedtest.net servers
        if servers:
            break

    if not servers:
        print_('Failed to retrieve list of speedtest.net servers:\n\n %s' %
               '\n'.join(errors))
        sys.exit(1)

    closest = []
    for d in sorted(servers.keys()):
        for s in servers[d]:
            closest.append(s)
            if len(closest) == 5 and not all:
                break
        else:
            continue
        break

    del servers
    return closest


def getBestServer(servers):
    """Perform a speedtest.net latency request to determine which
    speedtest.net server has the lowest latency
    """

    results = {}
    for server in servers:
        cum = []
        url = '%s/latency.txt' % os.path.dirname(server['url'])
        urlparts = urlparse(url)
        for i in range(0, 3):
            try:
                if urlparts[0] == 'https':
                    h = HTTPSConnection(urlparts[1])
                else:
                    h = HTTPConnection(urlparts[1])
                headers = {'User-Agent': user_agent}
                start = timeit.default_timer()
                h.request("GET", urlparts[2], headers=headers)
                r = h.getresponse()
                total = (timeit.default_timer() - start)
            except (HTTPError, URLError, socket.error):
                cum.append(3600)
                continue
            text = r.read(9)
            if int(r.status) == 200 and text == 'test=test'.encode():
                cum.append(total)
            else:
                cum.append(3600)
            h.close()
        avg = round((sum(cum) / 6) * 1000, 3)
        results[avg] = server
    fastest = sorted(results.keys())[0]
    best = results[fastest]
    best['latency'] = fastest

    return best


def ctrl_c(signum, frame):
    """Catch Ctrl-C key sequence and set a shutdown_event for our threaded
    operations
    """

    global shutdown_event
    shutdown_event.set()
    raise SystemExit('\nCancelling...')


def version():
    """Print the version"""

    raise SystemExit(__version__)


def speedtest():
    """Run the full speedtest.net test"""

    global shutdown_event, source, scheme
    shutdown_event = threading.Event()

    signal.signal(signal.SIGINT, ctrl_c)

    description = (
        'Command line interface for testing internet bandwidth using '
        'speedtest.net.\n'
        '------------------------------------------------------------'
        '--------------\n'
        'https://github.com/sivel/speedtest-cli')

    parser = ArgParser(description=description)
    # Give optparse.OptionParser an `add_argument` method for
    # compatibility with argparse.ArgumentParser
    try:
        parser.add_argument = parser.add_option
    except AttributeError:
        pass
    parser.add_argument('--bytes', dest='units', action='store_const',
                        const=('byte', 1), default=('bit', 8),
                        help='Display values in bytes instead of bits. Does '
                             'not affect the image generated by --share')
    parser.add_argument('--share', action='store_true',
                        help='Generate and provide a URL to the speedtest.net '
                             'share results image')
    parser.add_argument('--simple', action='store_true',
                        help='Suppress verbose output, only show basic '
                             'information')
    parser.add_argument('--list', action='store_true',
                        help='Display a list of speedtest.net servers '
                             'sorted by distance')
    parser.add_argument('--server', help='Specify a server ID to test against')
    parser.add_argument('--mini', help='URL of the Speedtest Mini server')
    parser.add_argument('--source', help='Source IP address to bind to')
    parser.add_argument('--timeout', default=10, type=int,
                        help='HTTP timeout in seconds. Default 10')
    parser.add_argument('--secure', action='store_true',
                        help='Use HTTPS instead of HTTP when communicating '
                             'with speedtest.net operated servers')
    parser.add_argument('--version', action='store_true',
                        help='Show the version number and exit')

    options = parser.parse_args()
    if isinstance(options, tuple):
        args = options[0]
    else:
        args = options
    del options

    # Print the version and exit
    if args.version:
        version()

    socket.setdefaulttimeout(args.timeout)

    # Pre-cache the user agent string
    build_user_agent()

    # If specified bind to a specific IP address
    if args.source:
        source = args.source
        socket.socket = bound_socket

    if args.secure:
        scheme = 'https'

    if not args.simple:
        print_('Retrieving speedtest.net configuration...')
    try:
        config = getConfig()
    except URLError:
        print_('Cannot retrieve speedtest configuration')
        sys.exit(1)

    if not args.simple:
        print_('Retrieving speedtest.net server list...')
    if args.list or args.server:
        servers = closestServers(config['client'], True)
        if args.list:
            serverList = []
            for server in servers:
                line = ('%(id)4s) %(sponsor)s (%(name)s, %(country)s) '
                        '[%(d)0.2f km]' % server)
                serverList.append(line)
            print_('\n'.join(serverList).encode('utf-8', 'ignore'))
            sys.exit(0)
    else:
        servers = closestServers(config['client'])

    if not args.simple:
        print_('Testing from %(isp)s (%(ip)s)...' % config['client'])

    if args.server:
        try:
            best = getBestServer(filter(lambda x: x['id'] == args.server,
                                        servers))
        except IndexError:
            print_('Invalid server ID')
            sys.exit(1)
    elif args.mini:
        name, ext = os.path.splitext(args.mini)
        if ext:
            url = os.path.dirname(args.mini)
        else:
            url = args.mini
        urlparts = urlparse(url)
        try:
            request = build_request(args.mini)
            f = urlopen(request)
        except:
            print_('Invalid Speedtest Mini URL')
            sys.exit(1)
        else:
            text = f.read()
            f.close()
        extension = re.findall('upload_extension: "([^"]+)"', text.decode())
        if not extension:
            for ext in ['php', 'asp', 'aspx', 'jsp']:
                try:
                    request = build_request('%s/speedtest/upload.%s' %
                                            (args.mini, ext))
                    f = urlopen(request)
                except:
                    pass
                else:
                    data = f.read().strip()
                    if (f.code == 200 and
                            len(data.splitlines()) == 1 and
                            re.match('size=[0-9]', data)):
                        extension = [ext]
                        break
        if not urlparts or not extension:
            print_('Please provide the full URL of your Speedtest Mini server')
            sys.exit(1)
        servers = [{
            'sponsor': 'Speedtest Mini',
            'name': urlparts[1],
            'd': 0,
            'url': '%s/speedtest/upload.%s' % (url.rstrip('/'), extension[0]),
            'latency': 0,
            'id': 0
        }]
        try:
            best = getBestServer(servers)
        except:
            best = servers[0]
    else:
        if not args.simple:
            print_('Selecting best server based on latency...')
        best = getBestServer(servers)

    if not args.simple:
        print_(('Hosted by %(sponsor)s (%(name)s) [%(d)0.2f km]: '
               '%(latency)s ms' % best).encode('utf-8', 'ignore'))
    else:
        print_('Ping: %(latency)s ms' % best)

    sizes = [350, 500, 750, 1000, 1500, 2000, 2500, 3000, 3500, 4000]
    urls = []
    for size in sizes:
        for i in range(0, 4):
            urls.append('%s/random%sx%s.jpg' %
                        (os.path.dirname(best['url']), size, size))
    if not args.simple:
        print_('Testing download speed', end='')
    dlspeed = downloadSpeed(urls, args.simple)
    if not args.simple:
        print_()
    print_('Download: %0.2f M%s/s' %
           ((dlspeed / 1000 / 1000) * args.units[1], args.units[0]))

    sizesizes = [int(.25 * 1000 * 1000), int(.5 * 1000 * 1000)]
    sizes = []
    for size in sizesizes:
        for i in range(0, 25):
            sizes.append(size)
    if not args.simple:
        print_('Testing upload speed', end='')
    ulspeed = uploadSpeed(best['url'], sizes, args.simple)
    if not args.simple:
        print_()
    print_('Upload: %0.2f M%s/s' %
           ((ulspeed / 1000 / 1000) * args.units[1], args.units[0]))

    if args.share and args.mini:
        print_('Cannot generate a speedtest.net share results image while '
               'testing against a Speedtest Mini server')
    elif args.share:
        dlspeedk = int(round((dlspeed / 1000) * 8, 0))
        ping = int(round(best['latency'], 0))
        ulspeedk = int(round((ulspeed / 1000) * 8, 0))

        # Build the request to send results back to speedtest.net
        # We use a list instead of a dict because the API expects parameters
        # in a certain order
        apiData = [
            'download=%s' % dlspeedk,
            'ping=%s' % ping,
            'upload=%s' % ulspeedk,
            'promo=',
            'startmode=%s' % 'pingselect',
            'recommendedserverid=%s' % best['id'],
            'accuracy=%s' % 1,
            'serverid=%s' % best['id'],
            'hash=%s' % md5(('%s-%s-%s-%s' %
                             (ping, ulspeedk, dlspeedk, '297aae72'))
                            .encode()).hexdigest()]

        headers = {'Referer': 'http://c.speedtest.net/flash/speedtest.swf'}
        request = build_request('://www.speedtest.net/api/api.php',
                                data='&'.join(apiData).encode(),
                                headers=headers)
        f, e = catch_request(request)
        if e:
            print_('Could not submit results to speedtest.net: %s' % e)
            sys.exit(1)
        response = f.read()
        code = f.code
        f.close()

        if int(code) != 200:
            print_('Could not submit results to speedtest.net')
            sys.exit(1)

        qsargs = parse_qs(response.decode())
        resultid = qsargs.get('resultid')
        if not resultid or len(resultid) != 1:
            print_('Could not submit results to speedtest.net')
            sys.exit(1)

        print_('Share results: %s://www.speedtest.net/result/%s.png' %
               (scheme, resultid[0]))


def main():
    try:
        speedtest()
    except KeyboardInterrupt:
        print_('\nCancelling...')


if __name__ == '__main__':
    main()

# vim:ts=4:sw=4:expandtab

Pasted from https://raw.githubusercontent.com/sivel/speedtest-cli/master/speedtest_cli.py
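
Copying a script out of a web page can silently drop characters such as `:`, `/`, or quotes, and Python then refuses to run the file at all. One way to rule that out before looking for deeper problems is to ask Python to parse the saved text first. The following is only a minimal sketch: the helper name `first_syntax_error` and the two sample snippets are made up for illustration and are not part of speedtest-cli.

```python
import ast


def first_syntax_error(source_text):
    """Return (line_number, message) for the first syntax error, or None."""
    try:
        ast.parse(source_text)
    except SyntaxError as exc:
        return exc.lineno, exc.msg
    return None


# Hypothetical samples: the same try/except with and without its colons.
broken = "try\n    import json\nexcept ImportError\n    json = None\n"
fixed = "try:\n    import json\nexcept ImportError:\n    json = None\n"

print(first_syntax_error(broken))  # a (line, message) tuple
print(first_syntax_error(fixed))   # None
```

To check a saved copy of the script, the same helper could be given the file's contents, for example `first_syntax_error(open('speedtest_cli.py').read())`, assuming that is the name the file was saved under. A `None` result only means the file parses; runtime errors would still need the actual traceback.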


