第七部分:模块
""" 作者:川川 时间:2021/7/27 """ '''用文本编辑器在当前目录下创建 fibo.py 文件,输入以下内容''' #这部分单独创建文件!在这展示fibo内容只是便于好看 # def fib(n): # write Fibonacci series up to n # a, b = 0, 1 # while a < n: # print(a, end=' ') # a, b = b, a+b # print() # # def fib2(n): # return Fibonacci series up to n # result = [] # a, b = 0, 1 # while a < n: # result.append(a) # a, b = b, a+b # return result '''这项操作不直接把 fibo 函数定义的名称导入到当前符号表,只导入模块名 fibo 。要使用模块名访问函数''' # import fibo # print(fibo.fib(1000)) '''如果经常使用某个函数,可以把它赋值给局部变量''' # a = fibo.fib # print(a(400)) '''6.1. 模块详解''' '''import 语句有一个变体,可以直接把模块里的名称导入到另一个模块的符号表''' # from fibo import fib, fib2 # print(fib(500)) '''还有一种变体可以导入模块内定义的所有名称 不建议从模块或包内导入 *, 因为,这项操作经常让代码变得难以理解。''' # from fibo import * # print(fib(500)) '''模块名后使用 as 时,直接把 as 后的名称与导入模块绑定。意思就是改变名称使用,''' # import fibo as fib # print(fib.fib(500)) '''from 中也可以使用这种方式''' # from fibo import fib as fibonacci # print(fibonacci(500))
第八部分:深层输入输出
""" 作者:川川 时间:2021/7/27 """ '''在字符串开头的引号/三引号前添加 f 或 F 。在这种字符串中,可以在 { 和 } 字符之间输入引用的变量''' # year = 2021 # event = 'Referendum' # a=f'Results of the {year} {event}' # print(a) '''str.format() 该方法也用 { 和 } 标记替换变量的位置a 这种方法支持详细的格式化指令''' # yes_votes = 42_572_654 # no_votes = 43_132_495 # percentage = yes_votes / (yes_votes + no_votes) # a='{:-5} YES votes {:1.1%}'.format(yes_votes, percentage)#调整{}内部感受下 # print(a) '''只想快速显示变量进行调试,可以用 repr() 或 str() 函数把值转化为字符串。''' # s = 'Hello, world.' # print(str(s))#str() 函数返回供人阅读的值 # print(repr(s))#repr() 则生成适于解释器读取的值 # print(str(1/7)) # hellos = repr('hello') # print(hellos) '''7.1.1. 格式化字符串字面值''' '''格式化字符串字面值 (简称为 f-字符串)在字符串前加前缀 f 或 F,通过 {expression} 表达式,把 Python 表达式的值添加到字符串内''' '''下例将 pi 舍入到小数点后三位''' # import math # print(f'The value of pi is approximately {math.pi:.3f}.') '''在 ':' 后传递整数,为该字段设置最小字符宽度,常用于列对齐''' # table = {'Sjoerd': 4127, 'Jack': 4098, 'Dcab': 7678} # for name, phone in table.items(): # print(f'{name:10} ==> {phone:10d}') '''7.1.2. 字符串 format() 方法''' # print('We are the {} who say "{}!"'.format('knights', 'Ni')) '''花括号及之内的字符(称为格式字段)被替换为传递给 str.format() 方法的对象。花括号中的数字表示传递给 str.format() 方法的对象所在的位置。''' # print('{0} and {1}'.format('spam', 'eggs')) # print('{1} and {0}'.format('spam', 'eggs')) '''使用关键字参数名引用值。''' # print('This {food} is {adjective}.'.format(food='spam', adjective='absolutely horrible')) '''位置参数和关键字参数可以任意组合''' # print('The story of {0}, {1}, and {other}.'.format('Bill', 'Manfred', # other='Georg')) '''用方括号 '[]' 访问键来完成''' # table = {'Sjoerd': 4127, 'Jack': 4098, 'Dcab': 8637678} # print('Jack: {0[Jack]:d}; Sjoerd: {0[Sjoerd]:d}; ''Dcab: {0[Dcab]:d}'.format(table)) '''也可以用 '**' 符号,把 table 当作传递的关键字参数。''' # print('Jack: {Jack:d}; Sjoerd: {Sjoerd:d}; Dcab: {Dcab:d}'.format(**table)) '''生成一组整齐的列,包含给定整数及其平方与立方''' # for x in range(1, 11): # print('{0:2d} {1:3d} {2:4d}'.format(x, x * x, x * x * x)) '''7.1.3. 手动格式化字符串''' # for x in range(1, 11): # print(repr(x).rjust(2), repr(x * x).rjust(3), end=' ') # print(repr(x * x * x).rjust(4)) '''7.1.4. 旧式字符串格式化方法''' # import math # print('The value of pi is approximately %5.3f.' % math.pi) '''7.2. 读写文件¶''' '''最常用的参数有两个: open(filename, mode)''' # f = open('workfile', 'w') ''' 第一个实参是文件名字符串第二个实参是包含描述文件使用方式字符的字符串。 mode 的值包括 'r' ,表示文件只能读取;'w' 表示只能写入(现有同名文件会被覆盖); 'a' 表示打开文件并追加内容,任何写入的数据会自动添加到文件末尾。'r+' 表示打开文件进行读写。 mode 实参是可选的,省略时的默认值为 'r'。 ''' # with open('workfile') as f: # read_data = f.read() # print(read_data) # f.close()#如果没有使用 with 关键字,则应调用 f.close() 关闭文件,即可释放文件占用的系统资源。 # with open('workfile') as f: # a=f.read() # print(a) # f.close() '''f.readline() 从文件中读取单行数据''' # with open('workfile') as f: # a=f.readline() # b=f.readline() # c=f.readline() # print(a,b,c) # for i in f: # print(i) # f.close() '''从文件中读取多行时,可以用循环遍历整个文件对象''' # with open('workfile') as f: # for line in f: # print(line, end='') # f.close() '''f.write(string) 把 string 的内容写入文件,并返回写入的字符数。''' # with open('workfile','w') as f: # f.write('This is a test\n') # f.close() '''写入其他类型的对象前,要先把它们转化为字符串(文本模式)或字节对象(二进制模式)''' # with open('workfile','a') as f: # value = ('the answer', 42) # s = str(value) # f.write(s) # f.close() # f = open('workfile', 'rb+') # f.write(b'0123456789abcdef') # print(f.read()) # print(f.seek(5)) # print(f.read(1)) '''7.2.2. 使用 json 保存结构化数据''' # import json # a=json.dumps([1, 'simple', 'list']) # print(a) '''dumps() 函数还有一个变体, dump() ,它只将对象序列化为 text file ''' #如果 f 是 text file 对象 # json.dump(x, f) #要再次解码对象,如果 f 是已打开、供读取的 text file 对象 # x = json.load(f)
第九部分:异常和错误
""" 作者:川川 时间:2021/7/27 """ '''处理异常搭配:try except''' '''如果没有异常发生,则跳过 except 子句 并完成 try 语句的执行。''' # while True: # try: # x = int(input("Please enter a number: ")) # break # except ValueError: # print("Oops! That was no valid number. Try again...") '''一个 try 语句可能有多个 except 子句,以指定不同异常的处理程序。''' # class B(Exception): # pass # # class C(B): # pass # # class D(C): # pass # # for cls in [B, C, D]: # try: # raise cls() # except D: # print("D") # except C: # print("C") # except B: # print("B") '''最后的 except 子句可以省略异常名,以用作通配符''' # try: # f = open('workfile') # s = f.readline() # i = int(s.strip()) # except OSError as err: # print("OS error: {0}".format(err)) # except ValueError: # print("Could not convert data to an integer.") # except: # print("Unexpected error:", sys.exc_info()[0]) # raise '''try ... except 语句有一个可选的 else 子句,在使用时必须放在所有的 except 子句后面''' # import sys # for arg in sys.argv[1:]: # try: # f = open(arg, 'r') # except OSError: # print('cannot open', arg) # else: # print(arg, 'has', len(f.readlines()), 'lines') # f.close() '''处理 try 子句中调用(即使是间接地)的函数内部发生的异常。''' # def this_fails(): # x = 1 / 0 # try: # this_fails() # except ZeroDivisionError as err: # print('Handling run-time error:', err) '''raise 语句支持强制触发指定的异常''' # try: # raise NameError('HiThere') # except NameError: # print('An exception flew by!') # raise '''异常链''' # def func(): # raise IOError # try: # func() # except IOError as exc: # raise RuntimeError('Failed to open database') from exc '''异常链在 except 或 finally 子句触发异常时自动生成''' # try: # open('database.sqlite') # except IOError: # raise RuntimeError from None '''定义清理操作''' # try: # raise KeyboardInterrupt # finally: # print('Goodbye, world!') '''一个更为复杂的例子''' def divide(x, y): try: result = x / y except ZeroDivisionError: print("division by zero!") else: print("result is", result) finally: print("executing finally clause") print(divide(2,1))
第十部分:类
""" 作者:川川 时间:2021/7/27 """ # class Complex: # def __init__(self, realpart, imagpart): # self.r = realpart # self.i = imagpart # x = Complex(3.0, -4.5) # print(x.r,x.i) '''实例对象¶''' # x.counter = 1 # while x.counter < 10: # x.counter = x.counter * 2 # print(x.counter) # del x.counter '''9.3.4. 方法对象''' ''''通常,方法在绑定后立即被调用''' # class MyClass: # """A simple example class""" # i = 12345 # # def f(self): # return 'hello world' # x=MyClass() # xf = x.f # while True: # print(xf()) '''9.3.5. 类和实例变量''' # class Dog: # kind = 'canine' # def __init__(self, name): # self.name = name # d = Dog('Fido') # e = Dog('Buddy') # print(d.name) # print(e.name) # print(d.kind) ''''正确的类设计应该使用实例变量:''' # class Dog: # def __init__(self, name): # self.name = name # self.tricks = [] # creates a new empty list for each dog # # def add_trick(self, trick): # self.tricks.append(trick) # d = Dog('Fido') # e = Dog('Buddy') # d.add_trick('roll over') # e.add_trick('play dead') # e.add_trick('play dead') # # print(d.tricks) # print(e.tricks) ''''迭代器''' # for element in [1, 2, 3]: # print(element) # for element in (1, 2, 3): # print(element) # for key in {'one':1, 'two':2}: # print(key) # for char in "123": # print(char) # for line in open("workfile"): # print(line, end='') '''生成器''' # def reverse(data): # for index in range(len(data)-1, -1, -1): # yield data[index] # # for char in reverse('golf'): # print(char) '''生成器表达式''' # a=sum(i*i for i in range(10)) # print(a) # xvec = [10, 20, 30] # yvec = [7, 5, 3] # c=sum(x*y for x,y in zip(xvec, yvec)) # print(c)
第十一部分:标准库简介
""" 作者:川川 时间:2021/7/27 """ '''操作系统接口(个人感觉没啥用)''' # import os # print(os.getcwd())#打印出当前文件位置 # os.chdir('/server/accesslogs')#改变运行位置 # os.system('mkdir today') #运行这个再系统shell '''文件通配符''' '''glob 模块提供了一个在目录中使用通配符搜索创建文件列表的函数:''' # import glob # print(glob.glob('*.py')) '''命令行参数''' # import sys # print(sys.argv)#打印本文件位置 ''' 字符串模式匹配''' # import re # b=re.findall(r'\bf[a-z]*', 'which foot or hand fell fastest') # print(b) # a='tea for too'.replace('too', 'two') # print(a) '''数学''' # import math # a=math.cos(math.pi / 4) # print(a) # import random # b=random.choice(['apple', 'pear', 'banana']) # print(b) '''互联网访问''' # from urllib.request import urlopen # with urlopen('http://tycho.usno.navy.mil/cgi-bin/timer.pl') as response: # for line in response: # line = line.decode('utf-8') # Decoding the binary data to text. # if 'EST' in line or 'EDT' in line: # print(line) # smtplib 用于发送邮件 # import smtplib # server = smtplib.SMTP('localhost') # server.sendmail('soothsayer@example.org', 'jcaesar@example.org') # server.quit() ''' 日期和时间''' # from datetime import date # now = date.today() # print(now) # a=now.strftime("%m-%d-%y. %d %b %Y is a %A on the %d day of %B.") # print(a) # birthday = date(2000, 9, 20) # age = now - birthday # c=age.days # print(c) '''数据压缩''' # import zlib # s = b'witch which has which witches wrist watch' # print(len(s)) # t = zlib.compress(s) # print(len(t)) # print( zlib.decompress(t)) # print(zlib.crc32(s)) '''性能测量''' # from timeit import Timer # a= Timer('t=a; a=b; b=t', 'a=1; b=2').timeit() # print(a)
第十二部分:协程
""" 作者:川川 时间:2021/7/27 """ # import asyncio # async def main(): # print('Hello ...') # await asyncio.sleep(1) # print('... World!') # asyncio.run(main()) '''等待 1 秒后打印 "hello",然后 再次 等待 2 秒后打印 "world"''' # import asyncio # import time # # async def say_after(delay, what): # await asyncio.sleep(delay) # print(what) # async def main(): # print(f"started at {time.strftime('%X')}") # await say_after(1, 'hello') # await say_after(2, 'world') # print(f"finished at {time.strftime('%X')}") # asyncio.run(main()) '''asyncio.create_task() 函数用来并发运行作为 asyncio 任务 的多个协程。''' # async def main(): # task1 = asyncio.create_task( # say_after(1, 'hello')) # task2 = asyncio.create_task( # say_after(2, 'world')) # print(f"started at {time.strftime('%X')}") # # Wait until both tasks are completed (should take # # around 2 seconds.) # await task1 # await task2 # # print(f"finished at {time.strftime('%X')}") # asyncio.run(main()) '''Python 协程属于 可等待 对象,因此可以在其他协程中被等待''' # import asyncio # # async def nested(): # return 42 # # async def main(): # # Nothing happens if we just call "nested()". # # A coroutine object is created but not awaited, # # so it *won't run at all*. # # Let's do it differently now and await it: # print(await nested()) # will print "42". # # asyncio.run(main()) ''' 协程函数: 定义形式为 async def 的函数; 协程对象: 调用 协程函数 所返回的对象。 ''' '''当一个协程通过 asyncio.create_task() 等函数被封装为一个 任务,该协程会被自动调度执行''' # import asyncio # # async def nested(): # return 42 # # async def main(): # # Schedule nested() to run soon concurrently # # with "main()". # task = asyncio.create_task(nested()) # # # "task" can now be used to cancel "nested()", or # # can simply be awaited to wait until it is complete: # await task # asyncio.run(main()) '''运行 asyncio 程序''' '''asyncio.run(coro, *, debug=False)¶''' # import asyncio # async def main(): # await asyncio.sleep(1) # print('hello') # # asyncio.run(main()) '''创建任务''' '''asyncio.create_task(coro, *, name=None)¶''' # import asyncio # async def coro(): # return 2021 # task = asyncio.create_task(coro())#python3.7+ # This works in all Python versions but is less readable # task = asyncio.ensure_future(coro())#before python3.7 '''休眠''' ''' asyncio.sleep(delay, result=None, *, loop=None)¶''' '''以下协程示例运行 5 秒,每秒显示一次当前日期''' # import asyncio # import datetime # # async def display_date(): # loop = asyncio.get_running_loop() # end_time = loop.time() + 5.0 # while True: # print(datetime.datetime.now()) # if (loop.time() + 1.0) >= end_time: # break # await asyncio.sleep(1) # # asyncio.run(display_date()) '''并发运行任务''' ''' asyncio.gather(*aws, loop=None, return_exceptions=False)¶''' # import asyncio # # async def factorial(name, number): # f = 1 # for i in range(2, number + 1): # print(f"Task {name}: Compute factorial({i})...") # await asyncio.sleep(1) # f *= i # print(f"Task {name}: factorial({number}) = {f}") # # async def main(): # # Schedule three calls *concurrently*: # await asyncio.gather( # factorial("A", 2), # factorial("B", 3), # factorial("C", 4), # ) # # asyncio.run(main()) '''屏蔽取消操作''' '''asyncio.shield(aw, *, loop=None)保护一个 可等待对象 防止其被 取消''' # res = await shield(something())#demo '''如果希望完全忽略取消操作 (不推荐) 则 shield() 函数需要配合一个 try/except 代码段''' # try: # res = await shield(something()) # except CancelledError: # res = None '''超时''' '''asyncio.wait_for(aw, timeout, *, loop=None)¶''' # import asyncio # async def eternity(): # # Sleep for one hour # await asyncio.sleep(3600) # print('yay!') # # async def main(): # # Wait for at most 1 second # try: # await asyncio.wait_for(eternity(), timeout=1.0) # except asyncio.TimeoutError: # print('timeout!') # # asyncio.run(main()) '''简单等待''' '''syncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)''' # 用法: # import asyncio # done, pending = await asyncio.wait(aws) # async def foo(): # return 42 # # task = asyncio.create_task(foo()) # done, pending = await asyncio.wait({task}) # # if task in done: # asyncio.run(task) # # Everything will work as expected now. '''在线程中运行''' '''asyncio.to_thread(func, /, *args, **kwargs)在不同的线程中异步地运行函数 func。''' '''这个协程函数主要是用于执行在其他情况下会阻塞事件循环的 IO 密集型函数/方法''' # import asyncio,time # def blocking_io(): # print(f"start blocking_io at {time.strftime('%X')}") # # Note that time.sleep() can be replaced with any blocking # # IO-bound operation, such as file operations. # time.sleep(1) # print(f"blocking_io complete at {time.strftime('%X')}") # # async def main(): # print(f"started main at {time.strftime('%X')}") # # await asyncio.gather( # asyncio.to_thread(blocking_io), # asyncio.sleep(1)) # print(f"finished main at {time.strftime('%X')}") # asyncio.run(main()) ''':要取消一个正在运行的 Task 对象可使用 cancel() 方法。调用此方法将使该 Task 对象抛出一个 CancelledError 异常给打包的协程''' '''以下示例演示了协程是如何侦听取消请求的''' # import asyncio # async def cancel_me(): # print('cancel_me(): before sleep') # # try: # # Wait for 1 hour # await asyncio.sleep(3600) # except asyncio.CancelledError: # print('cancel_me(): cancel sleep') # raise # finally: # print('cancel_me(): after sleep') # # async def main(): # # Create a "cancel_me" Task # task = asyncio.create_task(cancel_me()) # # # Wait for 1 second # await asyncio.sleep(1) # # task.cancel() # try: # await task # except asyncio.CancelledError: # print("main(): cancel_me is cancelled now") # # asyncio.run(main()) '''基于生成器的协程''' '''@asyncio.coroutine 用来标记基于生成器的协程的装饰器。 此装饰器使得旧式的基于生成器的协程能与 async/await 代码相兼容 ''' # import asyncio # @asyncio.coroutine # def old_style_coroutine(): # yield from asyncio.sleep(1) # # async def main(): # await old_style_coroutine() '''队列能被用于多个的并发任务的工作量分配:''' import asyncio import random import time async def worker(name, queue): while True: # Get a "work item" out of the queue. sleep_for = await queue.get() # Sleep for the "sleep_for" seconds. await asyncio.sleep(sleep_for) # Notify the queue that the "work item" has been processed. queue.task_done() print(f'{name} has slept for {sleep_for:.2f} seconds') async def main(): # Create a queue that we will use to store our "workload". queue = asyncio.Queue() # Generate random timings and put them into the queue. total_sleep_time = 0 for _ in range(20): sleep_for = random.uniform(0.05, 1.0) total_sleep_time += sleep_for queue.put_nowait(sleep_for) # Create three worker tasks to process the queue concurrently. tasks = [] for i in range(3): task = asyncio.create_task(worker(f'worker-{i}', queue)) tasks.append(task) # Wait until the queue is fully processed. started_at = time.monotonic() await queue.join() total_slept_for = time.monotonic() - started_at # Cancel our worker tasks. for task in tasks: task.cancel() # Wait until all worker tasks are cancelled. await asyncio.gather(*tasks, return_exceptions=True) print('====') print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds') print(f'total expected sleep time: {total_sleep_time:.2f} seconds') asyncio.run(main())
如果你有一定的编程基础,那么你一定会通过本文章查漏补缺,同时领略python基础。由于内容实在过多,本篇文章偏语法介绍,并没有举很多例子,如有不好之处多对见谅。