python day Twelve
一、线程池(补充)
1.上下文管理
2.终止线程池操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
例
1.
线程和队列
import
queue
import
threading
class
ThreadPool(
object
):
def
__init__(
self
, max_num
=
20
):
#max_num:固定队列参数20
self
.queue
=
queue.Queue(max_num)
for
i
in
range
(max_num):
#往队列里边循环加线程
self
.queue.put(threading.Thread)
def
get_thread(
self
):
#获取线程
return
self
.queue.get()
def
add_thread(
self
):
#增加线程,用于不断增加线程
self
.queue.put(threading.Thread)
pool
=
ThreadPool(
10
)
#定义线程池10个
def
func(arg, p):
print
(arg)
import
time
time.sleep(
2
)
#等待2s
p.add_thread()
#上边执行完一个任务后,线程自动关闭,再手动增加一个线程到队列。
for
i
in
range
(
30
):
#有30个任务需要执行
thread
=
pool.get_thread()
#从队列里获取一个线程
t
=
thread(target
=
func, args
=
(i, pool))
t.start()
#启动线程
例
2.
上下文管理和终止线程操作
import
queue
import
threading
import
contextlib
import
time
StopEvent
=
object
()
class
ThreadPool(
object
):
def
__init__(
self
, max_num, max_task_num
=
None
):
if
max_task_num:
self
.q
=
queue.Queue(max_task_num)
else
:
self
.q
=
queue.Queue()
self
.max_num
=
max_num
self
.cancel
=
False
self
.terminal
=
False
self
.generate_list
=
[]
self
.free_list
=
[]
def
run(
self
, func, args, callback
=
None
):
"""
线程池执行一个任务
:param func: 任务函数
:param args: 任务函数所需参数
:param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
:return: 如果线程池已经终止,则返回True否则None
"""
if
self
.cancel:
return
if
len
(
self
.free_list)
=
=
0
and
len
(
self
.generate_list) <
self
.max_num:
self
.generate_thread()
w
=
(func, args, callback,)
self
.q.put(w)
def
generate_thread(
self
):
#创建一个线程
t
=
threading.Thread(target
=
self
.call)
t.start()
def
call(
self
):
#循环去获取任务函数并执行任务函数
current_thread
=
threading.currentThread()
self
.generate_list.append(current_thread)
event
=
self
.q.get()
while
event !
=
StopEvent:
func, arguments, callback
=
event
try
:
result
=
func(
*
arguments)
success
=
True
except
Exception as e:
success
=
False
result
=
None
if
callback
is
not
None
:
try
:
callback(success, result)
except
Exception as e:
pass
with
self
.worker_state(
self
.free_list, current_thread):
#上下文管理,内部运行一个装饰器,具体内容看例3
if
self
.terminal:
event
=
StopEvent
else
:
event
=
self
.q.get()
else
:
self
.generate_list.remove(current_thread)
def
close(
self
):
#执行完所有的任务后,所有线程停止
self
.cancel
=
True
full_size
=
len
(
self
.generate_list)
while
full_size:
self
.q.put(StopEvent)
full_size
-
=
1
def
terminate(
self
):
#无论是否还有任务,终止线程
self
.terminal
=
True
while
self
.generate_list:
self
.q.put(StopEvent)
self
.q.queue.clear()
@contextlib.contextmanager
def
worker_state(
self
, state_list, worker_thread):
#用于记录线程中正在等待的线程数
state_list.append(worker_thread)
try
:
yield
finally
:
state_list.remove(worker_thread)
# How to use
pool
=
ThreadPool(
5
)
def
callback(status, result):
# status, execute action status
# result, execute action return value
pass
def
action(i):
print
(i)
for
i
in
range
(
30
):
ret
=
pool.run(action, (i,), callback)
time.sleep(
5
)
print
(
len
(pool.generate_list),
len
(pool.free_list))
print
(
len
(pool.generate_list),
len
(pool.free_list))
# pool.close() #等待任务结束之后,终止线程;终止线程池操作方法1.
pool.terminate()
#无论是否还有任务,终止线程;终止线程池操作方法2.
python
3.0
官方上下文管理详见:https:
/
/
docs.python.org
/
3
/
library
/
contextlib.html
|
二、redis 发布订阅
三、rabbitMQ
四、mysql
五、python pymysql模块
六、python ORM框架:SQLAchemy
七、python Paramiko模块
八、基于Paramiko模块来看堡垒机如何实现
本文转自506554897 51CTO博客,原文链接:http://blog.51cto.com/506554897/1846557,如需转载请自行联系原作者