#!usrbinenv python # -- coding utf-8 -- # Copyright 2012-2015 Matt Martz # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the License); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # httpwww.apache.orglicensesLICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an AS IS BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import os import re import sys import math import signal import socket import timeit import platform import threading __version__ = '0.3.4' # Some global variables we use user_agent = None source = None shutdown_event = None scheme = 'http' # Used for bound_interface socket_socket = socket.socket try import xml.etree.cElementTree as ET except ImportError try import xml.etree.ElementTree as ET except ImportError from xml.dom import minidom as DOM ET = None # Begin import game to handle Python 2 and Python 3 try from urllib2 import urlopen, Request, HTTPError, URLError except ImportError from urllib.request import urlopen, Request, HTTPError, URLError try from httplib import HTTPConnection, HTTPSConnection except ImportError e_http_py2 = sys.exc_info() try from http.client import HTTPConnection, HTTPSConnection except ImportError e_http_py3 = sys.exc_info() raise SystemExit('Your python installation is missing required HTTP ' 'client classesnn' 'Python 2 %sn' 'Python 3 %s' % (e_http_py2[1], e_http_py3[1])) try from Queue import Queue except ImportError from queue import Queue try from urlparse import urlparse except ImportError from urllib.parse import urlparse try from urlparse import parse_qs except ImportError try from urllib.parse import parse_qs except ImportError from cgi import parse_qs try from hashlib import md5 except ImportError from md5 import md5 try from argparse import ArgumentParser as ArgParser except ImportError from optparse import OptionParser as ArgParser try import builtins except ImportError def print_(args, kwargs) The new-style print function taken from httpspypi.python.orgpypisix fp = kwargs.pop(file, sys.stdout) if fp is None return def write(data) if not isinstance(data, basestring) data = str(data) fp.write(data) want_unicode = False sep = kwargs.pop(sep, None) if sep is not None if isinstance(sep, unicode) want_unicode = True elif not isinstance(sep, str) raise TypeError(sep must be None or a string) end = kwargs.pop(end, None) if end is not None if isinstance(end, unicode) want_unicode = True elif not isinstance(end, str) raise TypeError(end must be None or a string) if kwargs raise TypeError(invalid keyword arguments to print()) if not want_unicode for arg in args if isinstance(arg, unicode) want_unicode = True break if want_unicode newline = unicode(n) space = unicode( ) else newline = n space = if sep is None sep = space if end is None end = newline for i, arg in enumerate(args) if i write(sep) write(arg) write(end) else print_ = getattr(builtins, 'print') del builtins class SpeedtestCliServerListError(Exception) Internal Exception class used to indicate to move on to the next URL for retrieving speedtest.net server details def bound_socket(args, kwargs) Bind socket to a specified source IP address global source sock = socket_socket(args, kwargs) sock.bind((source, 0)) return sock def distance(origin, destination) Determine distance between 2 sets of [lat,lon] in km lat1, lon1 = origin lat2, lon2 = destination radius = 6371 # km dlat = math.radians(lat2 - lat1) dlon = math.radians(lon2 - lon1) a = (math.sin(dlat 2) math.sin(dlat 2) + math.cos(math.radians(lat1)) math.cos(math.radians(lat2)) math.sin(dlon 2) math.sin(dlon 2)) c = 2 math.atan2(math.sqrt(a), math.sqrt(1 - a)) d = radius c return d def build_user_agent() Build a Mozilla5.0 compatible User-Agent string global user_agent if user_agent return user_agent ua_tuple = ( 'Mozilla5.0', '(%s; U; %s; en-us)' % (platform.system(), platform.architecture()[0]), 'Python%s' % platform.python_version(), '(KHTML, like Gecko)', 'speedtest-cli%s' % __version__ ) user_agent = ' '.join(ua_tuple) return user_agent def build_request(url, data=None, headers={}) Build a urllib2 request object This function automatically adds a User-Agent header to all requests if url[0] == '' schemed_url = '%s%s' % (scheme, url) else schemed_url = url headers['User-Agent'] = user_agent return Request(schemed_url, data=data, headers=headers) def catch_request(request) Helper function to catch common exceptions encountered when establishing a connection with a HTTPHTTPS request try uh = urlopen(request) return uh, False except (HTTPError, URLError, socket.error) e = sys.exc_info()[1] return None, e class FileGetter(threading.Thread) Thread class for retrieving a URL def __init__(self, url, start) self.url = url self.result = None self.starttime = start threading.Thread.__init__(self) def run(self) self.result = [0] try if (timeit.default_timer() - self.starttime) = 10 request = build_request(self.url) f = urlopen(request) while 1 and not shutdown_event.isSet() self.result.append(len(f.read(10240))) if self.result[-1] == 0 break f.close() except IOError pass def downloadSpeed(files, quiet=False) Function to launch FileGetter threads and calculate download speeds start = timeit.default_timer() def producer(q, files) for file in files thread = FileGetter(file, start) thread.start() q.put(thread, True) if not quiet and not shutdown_event.isSet() sys.stdout.write('.') sys.stdout.flush() finished = [] def consumer(q, total_files) while len(finished) total_files thread = q.get(True) while thread.isAlive() thread.join(timeout=0.1) finished.append(sum(thread.result)) del thread q = Queue(6) prod_thread = threading.Thread(target=producer, args=(q, files)) cons_thread = threading.Thread(target=consumer, args=(q, len(files))) start = timeit.default_timer() prod_thread.start() cons_thread.start() while prod_thread.isAlive() prod_thread.join(timeout=0.1) while cons_thread.isAlive() cons_thread.join(timeout=0.1) return (sum(finished) (timeit.default_timer() - start)) class FilePutter(threading.Thread) Thread class for putting a URL def __init__(self, url, start, size) self.url = url chars = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ' data = chars (int(round(int(size) 36.0))) self.data = ('content1=%s' % data[0int(size) - 9]).encode() del data self.result = None self.starttime = start threading.Thread.__init__(self) def run(self) try if ((timeit.default_timer() - self.starttime) = 10 and not shutdown_event.isSet()) request = build_request(self.url, data=self.data) f = urlopen(request) f.read(11) f.close() self.result = len(self.data) else self.result = 0 except IOError self.result = 0 def uploadSpeed(url, sizes, quiet=False) Function to launch FilePutter threads and calculate upload speeds start = timeit.default_timer() def producer(q, sizes) for size in sizes thread = FilePutter(url, start, size) thread.start() q.put(thread, True) if not quiet and not shutdown_event.isSet() sys.stdout.write('.') sys.stdout.flush() finished = [] def consumer(q, total_sizes) while len(finished) total_sizes thread = q.get(True) while thread.isAlive() thread.join(timeout=0.1) finished.append(thread.result) del thread q = Queue(6) prod_thread = threading.Thread(target=producer, args=(q, sizes)) cons_thread = threading.Thread(target=consumer, args=(q, len(sizes))) start = timeit.default_timer() prod_thread.start() cons_thread.start() while prod_thread.isAlive() prod_thread.join(timeout=0.1) while cons_thread.isAlive() cons_thread.join(timeout=0.1) return (sum(finished) (timeit.default_timer() - start)) def getAttributesByTagName(dom, tagName) Retrieve an attribute from an XML document and return it in a consistent format Only used with xml.dom.minidom, which is likely only to be used with python versions older than 2.5 elem = dom.getElementsByTagName(tagName)[0] return dict(list(elem.attributes.items())) def getConfig() Download the speedtest.net configuration and return only the data we are interested in request = build_request('www.speedtest.netspeedtest-config.php') uh, e = catch_request(request) if e print_('Could not retrieve speedtest.net configuration %s' % e) sys.exit(1) configxml = [] while 1 configxml.append(uh.read(10240)) if len(configxml[-1]) == 0 break if int(uh.code) != 200 return None uh.close() try try root = ET.fromstring(''.encode().join(configxml)) config = { 'client' root.find('client').attrib, 'times' root.find('times').attrib, 'download' root.find('download').attrib, 'upload' root.find('upload').attrib} except AttributeError # Python3 branch root = DOM.parseString(''.join(configxml)) config = { 'client' getAttributesByTagName(root, 'client'), 'times' getAttributesByTagName(root, 'times'), 'download' getAttributesByTagName(root, 'download'), 'upload' getAttributesByTagName(root, 'upload')} except SyntaxError print_('Failed to parse speedtest.net configuration') sys.exit(1) del root del configxml return config def closestServers(client, all=False) Determine the 5 closest speedtest.net servers based on geographic distance urls = [ 'www.speedtest.netspeedtest-servers-static.php', 'c.speedtest.netspeedtest-servers-static.php', 'www.speedtest.netspeedtest-servers.php', 'c.speedtest.netspeedtest-servers.php', ] errors = [] servers = {} for url in urls try request = build_request(url) uh, e = catch_request(request) if e errors.append('%s' % e) raise SpeedtestCliServerListError serversxml = [] while 1 serversxml.append(uh.read(10240)) if len(serversxml[-1]) == 0 break if int(uh.code) != 200 uh.close() raise SpeedtestCliServerListError uh.close() try try root = ET.fromstring(''.encode().join(serversxml)) elements = root.getiterator('server') except AttributeError # Python3 branch root = DOM.parseString(''.join(serversxml)) elements = root.getElementsByTagName('server') except SyntaxError raise SpeedtestCliServerListError for server in elements try attrib = server.attrib except AttributeError attrib = dict(list(server.attributes.items())) d = distance([float(client['lat']), float(client['lon'])], [float(attrib.get('lat')), float(attrib.get('lon'))]) attrib['d'] = d if d not in servers servers[d] = [attrib] else servers[d].append(attrib) del root del serversxml del elements except SpeedtestCliServerListError continue # We were able to fetch and parse the list of speedtest.net servers if servers break if not servers print_('Failed to retrieve list of speedtest.net serversnn %s' % 'n'.join(errors)) sys.exit(1) closest = [] for d in sorted(servers.keys()) for s in servers[d] closest.append(s) if len(closest) == 5 and not all break else continue break del servers return closest def getBestServer(servers) Perform a speedtest.net latency request to determine which speedtest.net server has the lowest latency results = {} for server in servers cum = [] url = '%slatency.txt' % os.path.dirname(server['url']) urlparts = urlparse(url) for i in range(0, 3) try if urlparts[0] == 'https' h = HTTPSConnection(urlparts[1]) else h = HTTPConnection(urlparts[1]) headers = {'User-Agent' user_agent} start = timeit.default_timer() h.request(GET, urlparts[2], headers=headers) r = h.getresponse() total = (timeit.default_timer() - start) except (HTTPError, URLError, socket.error) cum.append(3600) continue text = r.read(9) if int(r.status) == 200 and text == 'test=test'.encode() cum.append(total) else cum.append(3600) h.close() avg = round((sum(cum) 6) 1000, 3) results[avg] = server fastest = sorted(results.keys())[0] best = results[fastest] best['latency'] = fastest return best def ctrl_c(signum, frame) Catch Ctrl-C key sequence and set a shutdown_event for our threaded operations global shutdown_event shutdown_event.set() raise SystemExit('nCancelling...') def version() Print the version raise SystemExit(__version__) def speedtest() Run the full speedtest.net test global shutdown_event, source, scheme shutdown_event = threading.Event() signal.signal(signal.SIGINT, ctrl_c) description = ( 'Command line interface for testing internet bandwidth using ' 'speedtest.net.n' '------------------------------------------------------------' '--------------n' 'httpsgithub.comsivelspeedtest-cli') parser = ArgParser(description=description) # Give optparse.OptionParser an `add_argument` method for # compatibility with argparse.ArgumentParser try parser.add_argument = parser.add_option except AttributeError pass parser.add_argument('--bytes', dest='units', action='store_const', const=('byte', 1), default=('bit', 8), help='Display values in bytes instead of bits. Does ' 'not affect the image generated by --share') parser.add_argument('--share', action='store_true', help='Generate and provide a URL to the speedtest.net ' 'share results image') parser.add_argument('--simple', action='store_true', help='Suppress verbose output, only show basic ' 'information') parser.add_argument('--list', action='store_true', help='Display a list of speedtest.net servers ' 'sorted by distance') parser.add_argument('--server', help='Specify a server ID to test against') parser.add_argument('--mini', help='URL of the Speedtest Mini server') parser.add_argument('--source', help='Source IP address to bind to') parser.add_argument('--timeout', default=10, type=int, help='HTTP timeout in seconds. Default 10') parser.add_argument('--secure', action='store_true', help='Use HTTPS instead of HTTP when communicating ' 'with speedtest.net operated servers') parser.add_argument('--version', action='store_true', help='Show the version number and exit') options = parser.parse_args() if isinstance(options, tuple) args = options[0] else args = options del options # Print the version and exit if args.version version() socket.setdefaulttimeout(args.timeout) # Pre-cache the user agent string build_user_agent() # If specified bind to a specific IP address if args.source source = args.source socket.socket = bound_socket if args.secure scheme = 'https' if not args.simple print_('Retrieving speedtest.net configuration...') try config = getConfig() except URLError print_('Cannot retrieve speedtest configuration') sys.exit(1) if not args.simple print_('Retrieving speedtest.net server list...') if args.list or args.server servers = closestServers(config['client'], True) if args.list serverList = [] for server in servers line = ('%(id)4s) %(sponsor)s (%(name)s, %(country)s) ' '[%(d)0.2f km]' % server) serverList.append(line) print_('n'.join(serverList).encode('utf-8', 'ignore')) sys.exit(0) else servers = closestServers(config['client']) if not args.simple print_('Testing from %(isp)s (%(ip)s)...' % config['client']) if args.server try best = getBestServer(filter(lambda x x['id'] == args.server, servers)) except IndexError print_('Invalid server ID') sys.exit(1) elif args.mini name, ext = os.path.splitext(args.mini) if ext url = os.path.dirname(args.mini) else url = args.mini urlparts = urlparse(url) try request = build_request(args.mini) f = urlopen(request) except print_('Invalid Speedtest Mini URL') sys.exit(1) else text = f.read() f.close() extension = re.findall('upload_extension ([^]+)', text.decode()) if not extension for ext in ['php', 'asp', 'aspx', 'jsp'] try request = build_request('%sspeedtestupload.%s' % (args.mini, ext)) f = urlopen(request) except pass else data = f.read().strip() if (f.code == 200 and len(data.splitlines()) == 1 and re.match('size=[0-9]', data)) extension = [ext] break if not urlparts or not extension print_('Please provide the full URL of your Speedtest Mini server') sys.exit(1) servers = [{ 'sponsor' 'Speedtest Mini', 'name' urlparts[1], 'd' 0, 'url' '%sspeedtestupload.%s' % (url.rstrip(''), extension[0]), 'latency' 0, 'id' 0 }] try best = getBestServer(servers) except best = servers[0] else if not args.simple print_('Selecting best server based on latency...') best = getBestServer(servers) if not args.simple print_(('Hosted by %(sponsor)s (%(name)s) [%(d)0.2f km] ' '%(latency)s ms' % best).encode('utf-8', 'ignore')) else print_('Ping %(latency)s ms' % best) sizes = [350, 500, 750, 1000, 1500, 2000, 2500, 3000, 3500, 4000] urls = [] for size in sizes for i in range(0, 4) urls.append('%srandom%sx%s.jpg' % (os.path.dirname(best['url']), size, size)) if not args.simple print_('Testing download speed', end='') dlspeed = downloadSpeed(urls, args.simple) if not args.simple print_() print_('Download %0.2f M%ss' % ((dlspeed 1000 1000) args.units[1], args.units[0])) sizesizes = [int(.25 1000 1000), int(.5 1000 1000)] sizes = [] for size in sizesizes for i in range(0, 25) sizes.append(size) if not args.simple print_('Testing upload speed', end='') ulspeed = uploadSpeed(best['url'], sizes, args.simple) if not args.simple print_() print_('Upload %0.2f M%ss' % ((ulspeed 1000 1000) args.units[1], args.units[0])) if args.share and args.mini print_('Cannot generate a speedtest.net share results image while ' 'testing against a Speedtest Mini server') elif args.share dlspeedk = int(round((dlspeed 1000) 8, 0)) ping = int(round(best['latency'], 0)) ulspeedk = int(round((ulspeed 1000) 8, 0)) # Build the request to send results back to speedtest.net # We use a list instead of a dict because the API expects parameters # in a certain order apiData = [ 'download=%s' % dlspeedk, 'ping=%s' % ping, 'upload=%s' % ulspeedk, 'promo=', 'startmode=%s' % 'pingselect', 'recommendedserverid=%s' % best['id'], 'accuracy=%s' % 1, 'serverid=%s' % best['id'], 'hash=%s' % md5(('%s-%s-%s-%s' % (ping, ulspeedk, dlspeedk, '297aae72')) .encode()).hexdigest()] headers = {'Referer' 'httpc.speedtest.netflashspeedtest.swf'} request = build_request('www.speedtest.netapiapi.php', data='&'.join(apiData).encode(), headers=headers) f, e = catch_request(request) if e print_('Could not submit results to speedtest.net %s' % e) sys.exit(1) response = f.read() code = f.code f.close() if int(code) != 200 print_('Could not submit results to speedtest.net') sys.exit(1) qsargs = parse_qs(response.decode()) resultid = qsargs.get('resultid') if not resultid or len(resultid) != 1 print_('Could not submit results to speedtest.net') sys.exit(1) print_('Share results %swww.speedtest.netresult%s.png' % (scheme, resultid[0])) def main() try speedtest() except KeyboardInterrupt print_('nCancelling...') if __name__ == '__main__' main() # vimts=4sw=4expandtab Pasted from httpsraw.githubusercontent.comsivelspeedtest-climasterspeedtest_cli.py