为什么需要线程池
目前的大多数网络服务器,包括Web服务器、Email服务器以及数据库服务器等都具有一个共同点,就是单位时间内必须处理数目巨大的连接请求,但处理时间却相对较短。
传统多线程方案中我们采用的服务器模型则是一旦接受到请求之后,即创建一个新的线程,由该线程执行任务。任务执行完毕后,线程退出,这就是是“即时创建, 即时销毁”的策略。尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于不停的创建线程,销毁线程的状态。
我们将传统方案中的线程执行过程分为三个过程:T1、T2、T3:
T1:线程创建时间
T2:线程执行时间,包括线程的同步等时间
T3:线程销毁时间
那么我们可以看出,线程本身的开销所占的比例为(T1+T3) / (T1+T2+T3)。如果线程执行的时间很短的话,这比开销可能占到20%-50%左右。如果任务执行时间很频繁的话,这笔开销将是不可忽略的。
除此之外,线程池能够减少创建的线程个数。通常线程池所允许的并发线程是有上界的,如果同时需要并发的线程数超过上界,那么一部分线程将会等待。而传统方案中,如果同时请求数目为2000,那么最坏情况下,系统可能需要产生2000个线程。尽管这不是一个很大的数目,但是也有部分机器可能达不到这种要求。
因此线程池的出现正是着眼于减少线程池本身带来的开销。线程池采用预创建的技术,在应用程序启动之后,将立即创建一定数量的线程(N1),放入空闲队列 中。这些线程都是处于阻塞(Suspended)状态,不消耗CPU,但占用较小的内存空间。当任务到来后,缓冲池选择一个空闲线程,把任务传入此线程中运行。当N1个线程都在处理任务后,缓冲池自动创建一定数量的新线程,用于处理更多的任务。在任务执行完毕后线程也不退出,而是继续保持在池中等待下一次的任务。当系统比较空闲时,大部分线程都一直处于暂停状态,线程池自动销毁一部分线程,回收系统资源。
基于这种预创建技术,线程池将线程创建和销毁本身所带来的开销分摊到了各个具体的任务上,执行次数越多,每个任务所分担到的线程本身开销则越小,不过我们另外可能需要考虑进去线程之间同步所带来的开销。
Python自带的ThreadPoolExecutor
我用的python3.6,使用的是线程池来自concurrent.futures。下面我先简单介绍一下线程池的实现,ThreadPoolExecutor的详细使用可以参看我的github相应模块:github内容
1 | #python 3.6 |
以上是ThreadPoolExecutor的API,接下来我们来看看ThreadPoolExecutor内部是如何实现的,以下内容来自ThreadPoolExecutor源码。
首先看看init(),我们首先声明了线程池的几个基本属性:
- self._max_workers:最大线程数
- self._work_queue:等待队列
- self._threads:线程集合
- self._shutdown, self._shutdown_lock:实现手动关闭线程池
- self._thread_name_prefix:个性化定制
这也给了我们一个线程池的雏形,我们在自制的时候可以参考。
1 | class ThreadPoolExecutor(_base.Executor): |
接下来看最核心的submit函数的源码,终于可以愉快的添加注释啦。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20def submit(self, fn, *args, **kwargs):
# 保证在submit的过程中不会被粗暴的关闭以及打断
with self._shutdown_lock:
# 如果已经被关闭,就不能再加啦
if self._shutdown:
raise RuntimeError('cannot schedule new futures after shutdown')
# 会为每个提交函数创建一个Future对象,这个很重要,
# 通过这个Future类可以查询的线程的运行状态以及返回值,这是单纯的threading.Thread做不到的
f = _base.Future()
# 这个_WorkIthem的作用类似于threading.Thread的run(),因为在线程池里所有的线程都是以Future类的形式存在的,
# 所以要通过_WorkItem来为每个Future绑定一个run()的函数
w = _WorkItem(f, fn, args, kwargs)
# 放入等待队列
self._work_queue.put(w)
# 线程池真正的运行函数
self._adjust_thread_count()
# 立即返回future对象
return f
接下来看看self._adjust_thread_count()。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20def _adjust_thread_count(self):
# When the executor gets lost, the weakref callback will wake up
# the worker threads.
# 这里定义了弱引用的callback函数。
def weakref_cb(_, q=self._work_queue):
q.put(None)
# TODO(bquinlan): Should avoid creating new threads if there are more
# idle threads than items in the work queue.
num_threads = len(self._threads)
if num_threads < self._max_workers:
thread_name = '%s_%d' % (self._thread_name_prefix or self,
num_threads)
# 这里的target和args的应用可以说是非常专业了,下方会特别讲
t = threading.Thread(name=thread_name, target=_worker,
args=(weakref.ref(self, weakref_cb),
self._work_queue))
t.daemon = True
t.start()
self._threads.add(t)
_threads_queues[t] = self._work_queue
其实,这个线程池的逻辑是很简单但高效的,我被卡住的地方是一些很细节的东西,比如self._thread只有add,那么是怎么删掉跑完的线程啊,以及注释提到的weakref。
但是在我查了weakref的作用之后,我觉得这两点都跟python的垃圾回收机制有关。我们可以看到weakref.ref(self, weakref_cb),它是对我们的线程池实例(self)作了弱引用。
如果一个对象有一个常规引用,它是不会被垃圾回收机制回收的,如果一个对象只剩下一个弱引用,那么他可能被垃圾回收机制回收,而weakref_cb是他被删除时的回调函数。这也说明这段代码在
完成线程池的基本逻辑之外还兼顾了垃圾回收机制,不愧是源码,跪拜。当然关于一个Thread对象怎么从self._thread中删除,我也很好奇,可能也是跟垃圾回收有关。但需要强调的是,就算线程数是满的
也不会导致刚submit的函数被永远耽误,因为在submit中他们已经被put进入了self._work_queue这个线程间通信用的queue了,也就说它是可以被每个活跃的线程看见,并且run的。
自制的线程池
这是一个基础款的线程池,实现的基本功能有限制等待队列的数量以及限制线程的数量,由于逻辑简单,就直接上代码啦。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55# python3.6
import threading
import queue
import os
work_queue = queue.Queue()
class WorkerPool:
def __init__(self, max_worker_num = None):
if max_worker_num:
if max_worker_num > 0 and isinstance(max_worker_num, int):
self._worker_num = max_worker_num
else:
raise ValueError('the maximum number of the pool must be a positive integer')
else:
self._worker_num = (os.cpu_count() or 1) * 5
# self._work_queue = queue.Queue() # 这里不会给任务队列设置最大值,如果需要加上maximum=
self._thread_exists = set()
def submit(self, fn, args, kwargs):
if args is None:
args = ()
if kwargs is None:
kwargs = {}
para = (fn, args, kwargs)
work_queue.put(para)
self._check_pool()
def _check_pool(self):
if len(self._thread_exists) < self._worker_num:
worker = _Worker()
worker.start()
self._thread_exists.add(worker)
# 等待queue里的任务执行完毕后再继续执行程序
def wait_complete(self):
for worker in self._thread_exists:
if worker.isAlive():
worker.join()
class _Worker(threading.Thread):
def run(self):
while True:
# 给get加上一个timeout,这样当queue在timeout时间后仍为空
# 就会抛出queue.Empty的error,用try捕获后,break,就某种意义上的关闭了线程池
# 不然就会一直循环等待新的任务
try:
fn, args, kwargs = work_queue.get(block=True)
fn(*args, **kwargs)
# 线程每从任务中取出一个任务,都要执行task_done(),
# 这样我们才知道线程完成了一件一件的任务,在对线程用join阻塞的时候,
# 线程就可以判断是不是都task_done了,都done了就可以结束阻塞
work_queue.task_done()
except queue.Empty:
break