python进程、线程和协程

admin 2022年1月6日01:34:32评论45 views字数 31207阅读104分1秒阅读模式

0x1进程

进程(process)是正在运行的程序的实例,但一个程序可能会产生多个进程。
比如,打开 Chrome 浏览器程序,它可能会产生多个进程,主程序需要一个进程,一个网页标签需要一个进程,一个插件也需要一个进程,等等。

进程时一个具有一定功能的程序在一个数据集上的一次动态执行过程。
进程由程序,数据集合和进程控制块三部分组成。

  • 程序用于描述进程要完成的功能,是控制进程执行的指令集;
  • 数据集合是程序在执行时需要的数据和工作区;
  • 程序控制块(PCB)包含程序的描述信息和控制信息,是进程存在的唯一标志。

0x1.1fork()

在介绍 Python 的进程编程之前,让我们先看看 Unix/Linux 中的 fork 函数。在 Unix/Linux 系统中,fork 函数被用于创建进程。这个函数很特殊,对于普通的函数,调用它一次,返回一次,但是调用 fork 一次,它返回两次。事实上,fork 函数创建了新的进程,我们把它称为子进程,子进程几乎是当前进程(即父进程)的一个拷贝:它会复制父进程的代码段,堆栈段和数据段。

对于父进程,fork 函数返回了子进程的进程号 pid,对于子进程,fork 函数则返回 0,这也是 fork 函数返回两次的原因,根据返回值,我们可以判断进程是父进程还是子进程。

下面我们看一段 C 代码,它展示了 fork 的基本使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include <unistd.h>
#include <stdio.h>

int main(int argc, char const *argv[])
{
int pid;
pid = fork(); // 使用 fork 函数

if (pid < 0) {
printf("Fail to create process\n");
}
else if (pid == 0) {
printf("I am child process (%d) and my parent is (%d)\n", getpid(), getppid());
}
else {
printf("I (%d) just created a child process (%d)\n", getpid(), pid);
}
return 0;
}

其中,getpid 用于获取当前进程号,getppid 用于获取父进程号。

事实上,Python 的 os 模块包含了普遍的操作系统功能,该模块也提供了 fork 函数,把上面的代码改成用 Python 来实现,如下:

1
2
3
4
5
6
7
8
9
10
import os

pid = os.fork()

if pid < 0:
print 'Fail to create process'
elif pid == 0:
print 'I am child process (%s) and my parent is (%s).' % (os.getpid(), os.getppid())
else:
print 'I (%s) just created a child process (%s).' % (os.getpid(), pid)

运行上面的代码,产生如下输出:

1
2
I (86645) just created a child process (86646).
I am child process (86646) and my parent is (86645).

需要注意的是,虽然子进程复制了父进程的代码段和数据段等,但是一旦子进程开始运行,子进程和父进程就是相互独立的,它们之间不再共享任何数据。

0x1.2多进程

Python 提供了一个 multiprocessing 模块,利用它,我们可以来编写跨平台的多进程程序,但需要注意的是 multiprocessing 在 Windows 和 Linux 平台的不一致性:一样的代码在 Windows 和 Linux 下运行的结果可能不同。因为 Windows 的进程模型和 Linux 不一样,Windows 下没有 fork。

我们先来看一个简单的例子,该例子演示了在主进程中启动一个子进程,并等待其结束,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import os
from multiprocessing import Process

# 子进程要执行的代码
def child_proc(name):
print 'Run child process %s (%s)...' % (name, os.getpid())

if __name__ == '__main__':
print 'Parent process %s.' % os.getpid()
p = Process(target=child_proc, args=('test',))
print 'Process will start.'
p.start()
p.join()
print 'Process end.'

在上面的代码中,我们从 multiprocessing 模块引入了 Process,Process 是一个用于创建进程对象的类,其中,target 指定了进程要执行的函数,args 指定了参数。在创建了进程实例 p 之后,我们调用 start 方法开始执行该子进程,接着,我们又调用了 join 方法,该方法用于阻塞子进程以外的所有进程(这里指父进程),当子进程执行完毕后,父进程才会继续执行,它通常用于进程间的同步。

可以看到,用上面这种方式来创建进程比直接使用 fork 更简单易懂。现在,让我们看下输出结果:

1
2
3
4
Parent process 7170.
Process will start.
Run child process test (10075)...
Process end.

0x1.3multiprocessing 与平台有关

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import random
import os
from multiprocessing import Process

num = random.randint(0, 100)

def show_num():
print("pid:{}, num is {}".format(os.getpid(), num))

if __name__ == "__main__":
print("pid:{}, num is {}".format(os.getpid(), num))
p = Process(target=show_num)
p.start()
p.join()

在 Windows 下运行以上代码,输出的结果如下(你得到不一样的结果也是对的):

1
2
pid:6504, num is 25
pid:6880, num is 6

我们发现,num 的值是不一样的!

在 Linux 下运行以上代码,可以看到 num 的值是一样的:

1
2
pid:11747, num is 13
pid:11748, num is 13

0x1.4进程池Pool

在上面,我们只是创建了一个进程,如果要创建多个进程呢?Python 提供了进程池的方式,让我们批量创建子进程,让我们看一个简单的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import os, time
from multiprocessing import Pool

def foo(x):
print 'Run task %s (pid:%s)...' % (x, os.getpid())
time.sleep(2)
print 'Task %s result is: %s' % (x, x * x)

if __name__ == '__main__':
print 'Parent process %s.' % os.getpid()
p = Pool(4) # 设置进程数
for i in range(5):
p.apply_async(foo, args=(i,)) # 设置每个进程要执行的函数和参数
print 'Waiting for all subprocesses done...'
p.close()
p.join()
print 'All subprocesses done.'

在上面的代码中,Pool 用于生成进程池,对 Pool 对象调用 apply_async 方法可以使每个进程异步执行任务,也就说不用等上一个任务执行完才执行下一个任务,close 方法用于关闭进程池,确保没有新的进程加入,join 方法会等待所有子进程执行完毕。

看看执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
Parent process 7170.
Run task 1 (pid:10320)...
Run task 0 (pid:10319)...
Run task 3 (pid:10322)...
Run task 2 (pid:10321)...
Waiting for all subprocesses done...
Task 1 result is: 1
Task 0 result is: 0
Run task 4 (pid:10320)...
Task 3 result is: 9
Task 2 result is: 4
Task 4 result is: 16
All subprocesses done.

0x1.5进程间通信

每个进程各自有不同的用户地址空间,任何一个进程的全局变量在另一个进程中都看不到,所以进程之间要交换数据必须通过内核,在内核中开辟一块缓冲区,进程A把数据从用户空间拷到内核缓冲区,进程B再从内核缓冲区把数据读走,内核提供的这种机制称为进程间通信。假如创建了多个进程,那么进程间的通信是必不可少的。

Python提供了多种进程通信的方式,其中以Queue和Pipe用得最多。下面分别介绍这两种模式

0x1.5.1Queue

Queue是一种多进程安全的队列。实现多进程间的通信有两种方法:

  • put() 用于向队列中加入数据。有两个属性:blocked和timeout。blocked为true时(默认为True)且timeout为正值时,如果当队列已满会阻塞timeout时间,在这个时间内如果队列有空位会加入,如果超过时间仍然没有空位会抛出Queue.Full异常。
  • get() 用于从队列中获取一个数据并将其从队列中删除。有两个属性:blocked和timeout。blocked为true(默认为True)且timeout为正值时,如果当前队列为空会阻塞timeout时间,在这个时间内如果队列有新数据会获取,如果超过时间仍然没有新数据会抛出Queue.Empty异常。

下面以队列(Queue)为例,在父进程中创建两个子进程,一个往队列写数据,一个从对列读数据,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import time
import os
from multiprocessing import Pool,Queue,Process

def write_task(q):
try:
n=1
while n<5:
print "write,%d"%n
q.put(n)
time.sleep(1)
n+=1
except BaseException:
print "write_task error"
finally:
print "write_task end"

def read_task(q):
try:
n=1
while n<5:
print "read,%d"%q.get()
n+=1
except BaseException:
print "read_task error"
finally:
print "read_task end"

if __name__=="__main__":
q=Queue()
pw=Process(target=write_task,args=(q,))
pr=Process(target=read_task,args=(q,))
pw.start()
pr.start()
pw.join()
pr.join()
print "DONE"

执行结果如下:

1
2
3
4
5
6
7
8
9
10
11
write, 1
read, 1
write, 2
read, 2
write, 3
read, 3
write, 4
read, 4
write_task end
read_task end
DONE

0x1.5.2Pipe

Pipe与Queue不同之处在于Pipe是用于两个进程之间的通信。就像进程位于一根水管的两端。让我们看看Pipe官方文档的描述:

Returns a pair (conn1, conn2) of Connection objects representing the ends of a pipe.

Piep返回conn1和conn2代表水管的两端。Pipe还有一个参数duplex(adj. 二倍的,双重的 n. 双工;占两层楼的公寓套房),默认为True。当duplex为True时,开启双工模式,此时水管的两边都可以进行收发。当duplex为False,那么conn1只负责接受信息,conn2只负责发送信息。
conn通过send()和recv()来发送和接受信息。值得注意的是,如果管道中没有信息可接受,recv()会一直阻塞直到管道关闭(任意一端进程接结束则管道关闭)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from multiprocessing import Process,Pipe
import os

def put_data(p,nums):
print('现在的进程编号为:%s,这个一个send进程' % os.getpid())
for num in nums:
p.send(num)
print('%s已经放入管道中啦!' % num)

def get_data(p):
print('现在的进程编号为:%s,这个一个recv进程' % os.getpid())
while True:
print('已经从管道中获取%s并从中删除' % p.recv())

if __name__ == '__main__':
p = Pipe(duplex=False)
# 此时Pipe[1]即是Pipe返回的conn2
p1 = Process(target=put_data,args=(p[1],['1','2','3'],))
# 此时Pipe[0]即是Pipe返回的conn1
p3 = Process(target=get_data,args=(p[0],))
p1.start()
p3.start()
p1.join()
p3.terminate()

让我们看一下输出结果

1
2
3
4
5
6
7
8
现在的进程编号为:9868,这个一个recv进程
现在的进程编号为:9072,这个一个send进程
1已经放入管道中啦!
已经从管道中获取1,并从中删除
2已经放入管道中啦!
已经从管道中获取2并从中删除
3已经放入管道中啦!
已经从管道中获取3并从中删除

0x1.5.3Array

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from multiprocessing import Process
from multiprocessing import Array

def func(i, ar):
ar[i] = i
for item in ar:
print(item)
print("------")
if __name__=='__main__':
ar = Array('i', 5)
for i in range(5):
p = Process(target=func, args=(i, ar,))
p.start()
p.join()

Array的局限性在于受制于数组的特性,即需要指定数据类型且长度固定

1
2
3
4
5
6
7
# 数据类型对照表
'c': ctypes.c_char, 'u': ctypes.c_wchar,
'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
'h': ctypes.c_short, 'H': ctypes.c_ushort,
'i': ctypes.c_int, 'I': ctypes.c_uint,
'l': ctypes.c_long, 'L': ctypes.c_ulong,
'f': ctypes.c_float, 'd': ctypes.c_double

0x1.5.4manage.dict()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#coding:utf-8

from multiprocessing import Process, Manager

# 每个子进程执行的函数
# 参数中,传递了一个用于多进程之间数据共享的特殊字典
def func(i, d):
d[i] = i + 100
print(d.values())

if __name__=='__main__':
# 在主进程中创建特殊字典
m = Manager()
d = m.dict()

for i in range(5):
# 让子进程去修改主进程的特殊字典
p = Process(target=func, args=(i, d))
p.start()
p.join()


output:
[101]
[100, 101]
[100, 101, 102]
[100, 101, 102, 103]
[100, 101, 102, 103, 104]

0x2线程

线程(thread)是进程(process)中的一个实体,一个进程至少包含一个线程。比如,对于视频播放器,显示视频用一个线程,播放音频用另一个线程。如果我们把进程看成一个容器,则线程是此容器的工作单位。

进程和线程的区别主要有:

  • 进程之间是相互独立的,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,但互不影响;而同一个进程的多个线程是内存共享的,所有变量都由所有线程共享;
  • 由于进程间是独立的,因此一个进程的崩溃不会影响到其他进程;而线程是包含在进程之内的,线程的崩溃就会引发进程的崩溃,继而导致同一进程内的其他线程也奔溃;

线程的生命周期由run方法决定,当run方法结束时线程死亡。可以通过继承Thread,重写run方法改变Thread的功能,最后还是通过start()方法开线程。

1
2
3
4
5
6
7
8
9
from threading import Thread

class MyThread(Thread):
def run(self):
print('i am sorry')

if __name__ == '__main__':
t = MyThread()
t.start()

通过args参数以一个元组的方式给线程中的函数传参。

1
2
3
4
5
6
7
8
from threading import Thread

def sorry(name):
print('i am sorry',name)

if __name__ == '__main__':
t = Thread(target=sorry,args=('mike',))
t.start()

0x2.1threding模块简介

threding模块的类

对象 描述
Thread 表示一个执行线程的对象
Lock 锁对象
RLock 可重入锁对象,使单一线程可以(再次)获得已持有的锁(递归锁)
Condition 条件变量对象,使得一个线程等待另外一个线程满足特定的条件,比如改变状态或者某个数据值
Event 条件变量的通用版本,任意数量的线程等待某个事件的发生,在该事件发生后所有的线程都将被激活
Semaphore 为线程间的有限资源提供一个计数器,如果没有可用资源时会被阻塞
BoundedSemaphore 于Semaphore相似,不过它不允许超过初始值
Timer 于Thread类似,不过它要在运行前等待一定时间
Barrier 创建一个障碍,必须达到指定数量的线程后才可以继续

0x2.2Thread类

0x2.2.1使用方法

Thread是线程类,有两种使用方法,直接传入要运行的方法或从Thread继承并覆盖run()
方法一:将要执行的方法作为参数传给Thread的构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
#coding:utf-8
import threading
import time

def action(arg):
time.sleep(1)
print 'the arg is:%s\r' %arg

for i in xrange(4):
t =threading.Thread(target=action,args=(i,))
t.start()

print 'main thread end!'

方法二:从Thread继承,并重写run()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#coding:utf-8
import threading
import time
class MyThread(threading.Thread):
def __init__(self,arg):
super(MyThread, self).__init__()#注意:一定要显式的调用父类的初始化函数。
self.arg=arg
def run(self):#定义每个线程要运行的函数
time.sleep(1)
print 'the arg is:%s\r' % self.arg

for i in xrange(4):
t =MyThread(i)
t.start()

print 'main thread end!'

构造方法:
Thread(group=None, target=None, name=None, args=(), kwargs={})

  • group: 线程组,目前还没有实现,库引用中提示必须是None;
  • target: 要执行的方法;
  • name: 线程名;
  • args/kwargs: 要传入方法的参数。

实例方法:

  • isAlive(): 返回线程是否在运行。正在运行指启动后、终止前。
  • get/setName(name): 获取/设置线程名。
  • start(): 线程准备就绪,等待CPU调度
  •  is/setDaemon(bool): 获取/设置是后台线程(默认前台线程(False))。(在start之前设置)
    如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,主线程和后台线程均停止。
    如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
  • start(): 启动线程。
  • join([timeout]): 阻塞当前上下文环境的线程,直到调用此方法的线程终止或到达指定的timeout(可选参数)。

0x2.2.2setDeamon与join

使用例子一(未设置setDeamon):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# coding:utf-8
import threading
import time

def action(arg):
time.sleep(1)
print 'sub thread start! the thread name is:%s\r' % threading.currentThread().getName()
print 'the arg is:%s\r' %arg
time.sleep(1)

for i in xrange(4):
t =threading.Thread(target=action,args=(i,))
t.start()

print 'main_thread end!'



output:
main_thread end!
sub thread start!the thread name is:Thread-3

the arg is:2

sub thread start!the thread name is:Thread-2
sub thread start!the thread name is:Thread-1

the arg is:1

the arg is:0
sub thread start!the thread name is:Thread-4
the arg is:3
[Finished in 2.2s]

验证了serDeamon(False)(默认)前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,主线程停止。

使用例子二(setDeamon=True)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# coding:utf-8
import threading
import time

def action(arg):
time.sleep(1)
print 'sub thread start!the thread name is:%s\r' % threading.currentThread().getName()
print 'the arg is:%s\r' %arg
time.sleep(1)

for i in xrange(4):
t =threading.Thread(target=action,args=(i,))
t.setDaemon(True)#设置线程为后台线程
t.start()

print 'main_thread end!'

output:
main_thread end!

验证了serDeamon(True)后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,主、后台线程均停止。

使用例子三(设置join)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#coding:utf-8
import threading
import time

def action(arg):
time.sleep(1)
print 'sub thread start!the thread name is:%s ' % threading.currentThread().getName()
print 'the arg is:%s ' %arg
time.sleep(1)

thread_list = [] #线程存放列表
for i in xrange(4):
t =threading.Thread(target=action,args=(i,))
t.setDaemon(True)
thread_list.append(t)

for t in thread_list:
t.start()

for t in thread_list:
t.join()



output:
sub thread start!the thread name is:Thread-2 sub thread start!the thread name is:Thread-1

the arg is:1 the arg is:0

sub thread start!the thread name is:Thread-4 sub thread start!the thread name is:Thread-3
the arg is:2

the arg is:3
[Finished in 2.2s]
设置join之后,主线程等待子线程全部执行完成后或者子线程超时后,主线程才结束

验证了 join()阻塞当前上下文环境的线程,直到调用此方法的线程终止或到达指定的timeout,即使设置了setDeamon(True)主线程依然要等待子线程结束。

使用例子四(join不妥当的用法,使多线程编程顺序执行)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#coding:utf-8
import threading
import time

def action(arg):
time.sleep(1)
print 'sub thread start!the thread name is:%s ' % threading.currentThread().getName()
print 'the arg is:%s ' %arg
time.sleep(1)


for i in xrange(4):
t =threading.Thread(target=action,args=(i,))
t.setDaemon(True)
t.start()
t.join()

print 'main_thread end!'


ouput:
sub thread start!the thread name is:Thread-1
the arg is:0
sub thread start!the thread name is:Thread-2
the arg is:1
sub thread start!the thread name is:Thread-3
the arg is:2
sub thread start!the thread name is:Thread-4
the arg is:3
main_thread end!
[Finished in 8.2s]
可以看出此时,程序只能顺序执行,每个线程都被上一个线程的join阻塞,使得“多线程”失去了多线程意义。

0x2.3Lock、Rlock类

  由于线程之间随机调度:某线程可能在执行n条后,CPU接着执行其他线程。为了多个线程同时操作一个内存中的资源时不产生混乱,我们使用锁。

  • Lock(指令锁)是可用的最低级的同步指令。Lock处于锁定状态时,不被特定的线程拥有。Lock包含两种状态——锁定和非锁定,以及两个基本的方法。

    可以认为Lock有一个锁定池,当线程请求锁定时,将线程至于池中,直到获得锁定后出池。池中的线程处于状态图中的同步阻塞状态。

  • RLock(可重入锁)是一个可以被同一个线程请求多次的同步指令。RLock使用了“拥有的线程”和“递归等级”的概念,处于锁定状态时,RLock被某个线程拥有。拥有RLock的线程可以再次调用acquire(),释放锁时需要调用release()相同次数。

    可以认为RLock包含一个锁定池和一个初始值为0的计数器,每次成功调用 acquire()/release(),计数器将+1/-1,为0时锁处于未锁定状态。

简言之:Lock属于全局,Rlock属于线程。

构造方法:
Lock(),Rlock(),推荐使用Rlock()

实例方法:

  • acquire([timeout]): 尝试获得锁定。使线程进入同步阻塞状态。
  • release(): 释放锁。使用前线程必须已获得锁定,否则将抛出异常。

由于同一个进程之间的线程是内存共享的,所以当多个线程对同一个变量进行修改的时候,就会得到意想不到的结果。

让我们先看一个简单的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from threading import Thread, current_thread

num = 0

def calc():
global num
print 'thread %s is running...' % current_thread().name
for _ in xrange(10000):
num += 1
print 'thread %s ended.' % current_thread().name

if __name__ == '__main__':
print 'thread %s is running...' % current_thread().name

threads = []
for i in range(5):
threads.append(Thread(target=calc))
threads[i].start()
for i in range(5):
threads[i].join()

print 'global num: %d' % num
print 'thread %s ended.' % current_thread().name

在上面的代码中,我们创建了 5 个线程,每个线程对全局变量 num 进行 10000 次的 加 1 操作,这里之所以要循环 10000 次,是为了延长单个线程的执行时间,使线程执行时能出现中断切换的情况。现在问题来了,当这 5 个线程执行完毕时,全局变量的值是多少呢?是 50000 吗?

让我们看下执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
thread MainThread is running...
thread Thread-34 is running...
thread Thread-34 ended.
thread Thread-35 is running...
thread Thread-36 is running...
thread Thread-37 is running...
thread Thread-38 is running...
thread Thread-35 ended.
thread Thread-38 ended.
thread Thread-36 ended.
thread Thread-37 ended.
global num: 30668
thread MainThread ended.

我们发现 num 的值是 30668,事实上,num 的值是不确定的,你再运行一遍,会发现结果变了。

原因是因为 num += 1 不是一个原子操作,也就是说它在执行时被分成若干步:

计算 num + 1,存入临时变量 tmp 中;
将 tmp 的值赋给 num.
由于线程是交替运行的,线程在执行时可能中断,就会导致其他线程读到一个脏值。

为了保证计算的准确性,我们就需要给 num += 1 这个操作加上锁。当某个线程开始执行这个操作时,由于该线程获得了锁,因此其他线程不能同时执行该操作,只能等待,直到锁被释放,这样就可以避免修改的冲突。创建一个锁可以通过 threading.Lock() 来实现,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from threading import Thread, current_thread, Lock

num = 0
lock = Lock()

def calc():
global num
print 'thread %s is running...' % current_thread().name
for _ in xrange(10000):
lock.acquire() # 获取锁
num += 1
lock.release() # 释放锁
print 'thread %s ended.' % current_thread().name

if __name__ == '__main__':
print 'thread %s is running...' % current_thread().name

threads = []
for i in range(5):
threads.append(Thread(target=calc))
threads[i].start()
for i in range(5):
threads[i].join()

print 'global num: %d' % num
print 'thread %s ended.' % current_thread().name

让我们看下执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
thread MainThread is running...
thread Thread-44 is running...
thread Thread-45 is running...
thread Thread-46 is running...
thread Thread-47 is running...
thread Thread-48 is running...
thread Thread-45 ended.
thread Thread-47 ended.
thread Thread-48 ended.
thread Thread-46 ended.
thread Thread-44 ended.
global num: 50000
thread MainThread ended.

Lock对比Rlock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#coding:utf-8

import threading
lock = threading.Lock() #Lock对象
lock.acquire()
lock.acquire() #产生了死锁。
lock.release()
lock.release()
print lock.acquire()


import threading
rLock = threading.RLock() #RLock对象
rLock.acquire()
rLock.acquire() #在同一线程内,程序不会堵塞。
rLock.release()
rLock.release()

0x2.4Condition类

  Condition(条件变量)通常与一个锁关联。需要在多个Contidion中共享一个锁时,可以传递一个Lock/RLock实例给构造方法,否则它将自己生成一个RLock实例。

  可以认为,除了Lock带有的锁定池外,Condition还包含一个等待池,池中的线程处于等待阻塞状态,直到另一个线程调用notify()/notifyAll()通知;得到通知后线程进入锁定池等待锁定。

构造方法:
Condition([lock/rlock])

实例方法:

  • acquire([timeout])/release(): 调用关联的锁的相应方法。
  • wait([timeout]): 调用这个方法将使线程进入Condition的等待池等待通知,并释放锁。使用前线程必须已获得锁定,否则将抛出异常。
  • notify(): 调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁定(进入锁定池);其他线程仍然在等待池中。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
  • notifyAll(): 调用这个方法将通知等待池中所有的线程,这些线程都将进入锁定池尝试获得锁定。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。

例子1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#coding:utf-8

# encoding: UTF-8
import threading
import time

# 商品
product = None
# 条件变量
con = threading.Condition()


# 生产者方法
def produce():
global product

if con.acquire():
while True:
if product is None:
print 'produce...'
product = 'anything'

# 通知消费者,商品已经生产
con.notify()

# 等待通知
con.wait()
time.sleep(2)


# 消费者方法
def consume():
global product

if con.acquire():
while True:
if product is not None:
print 'consume...'
product = None

# 通知生产者,商品已经没了
con.notify()

# 等待通知
con.wait()
time.sleep(2)


t1 = threading.Thread(target=produce)
t2 = threading.Thread(target=consume)
t2.start()
t1.start()


output:
produce...
consume...
produce...
consume...
produce...
consume...
produce...
consume...
produce...
consume...
produce...
consume...
produce...
consume...
produce...
consume...
produce...
consume...
produce...
consume...
produce...
consume...
produce...
consume...
produce...
consume...
produce...
consume...
produce...

程序不断循环运行下去。重复生产消费过程。

例子2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#coding:utf-8


import threading
import time

condition = threading.Condition()
products = 0

class Producer(threading.Thread):
def run(self):
global products
while True:
if condition.acquire():
if products < 10:
products += 1;
print "Producer(%s):deliver one, now products:%s" %(self.name, products)
condition.notify()#不释放锁定,因此需要下面一句
condition.release()
else:
print "Producer(%s):already 10, stop deliver, now products:%s" %(self.name, products)
condition.wait();#自动释放锁定
time.sleep(2)

class Consumer(threading.Thread):
def run(self):
global products
while True:
if condition.acquire():
if products > 1:
products -= 1
print "Consumer(%s):consume one, now products:%s" %(self.name, products)
condition.notify()
condition.release()
else:
print "Consumer(%s):only 1, stop consume, products:%s" %(self.name, products)
condition.wait();
time.sleep(2)

if __name__ == "__main__":
for p in range(0, 2):
p = Producer()
p.start()

for c in range(0, 3):
c = Consumer()
c.start()

0x2.5Event类

 Event(事件)是最简单的线程通信机制之一:一个线程通知事件,其他线程等待事件。Event内置了一个初始为False的标志,当调用set()时设为True,调用clear()时重置为 False。wait()将阻塞线程至等待阻塞状态。

  Event其实就是一个简化版的 Condition。Event没有锁,无法使线程进入同步阻塞状态。

构造方法:
Event()

实例方法:

  • isSet(): 当内置标志为True时返回True。
  • set(): 将标志设为True,并通知所有处于等待阻塞状态的线程恢复运行状态。
  • clear(): 将标志设为False。
  • wait([timeout]): 如果标志为True将立即返回,否则阻塞线程至等待阻塞状态,等待其他线程调用set()。

例子1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# encoding: UTF-8
import threading
import time

event = threading.Event()
def func():
# 等待事件,进入等待阻塞状态
print '%s wait for event...' % threading.currentThread().getName()
event.wait()
# 收到事件后进入运行状态
print '%s recv event.' % threading.currentThread().getName()
t1 = threading.Thread(target=func)
t2 = threading.Thread(target=func)
t1.start()
t2.start()

time.sleep(2)

# 发送事件通知
print 'MainThread set event.'
event.set()

0x2.6 timer类

Timer(定时器)是Thread的派生类,用于在指定时间后调用一个方法。
构造方法:
Timer(interval, function, args=[], kwargs={})

  • interval: 指定的时间
  • function: 要执行的方法
  • args/kwargs: 方法的参数

实例方法:
Timer从Thread派生,没有增加实例方法。

1
2
3
4
5
6
7
# encoding: UTF-8
import threading
def func():
print 'hello timer!'
timer = threading.Timer(5, func)
timer.start()
线程延迟5秒后执行。

0x2.7local类

local是一个小写字母开头的类,用于管理 thread-local(线程局部的)数据。对于同一个local,线程无法访问其他线程设置的属性;线程设置的属性不会被其他线程设置的同名属性替换。

  可以把local看成是一个“线程-属性字典”的字典,local封装了从自身使用线程作为 key检索对应的属性字典、再使用属性名作为key检索属性值的细节。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# encoding: UTF-8
import threading

local = threading.local()
local.tname = 'main'
def func():
local.tname = 'notmain'
print local.tname
t1 = threading.Thread(target=func)
t1.start()
t1.join()
print local.tname

output:
notmain
main
[Finished in 0.2s]

具体可看 http://funhacks.net/explore-python/Process-Thread-Coroutine/threadlocal.html

0x2.8GIL 锁

Python 的线程虽然是真正的线程,但解释器执行代码时,有一个 GIL 锁(Global Interpreter Lock),任何 Python 线程执行前,必须先获得 GIL 锁。每执行 100 条字节码,解释器就自动释放 GIL 锁,让别的线程有机会执行。这个 GIL 全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在 Python 中只能交替执行,即使 100 个线程跑在 100 核 CPU 上,也只能用到 1 个核。

GIL 是 Python 解释器设计的历史遗留问题,通常我们用的解释器是官方实现的 CPython,要真正利用多核,除非重写一个不带 GIL 的解释器。所以,在 Python 如果一定要通过多线程利用多核,那只能通过 C 扩展来实现。

因而,多线程的并发在 Python 中就是一个美丽的梦,如果想真正实现多核任务,还是通过多进程来实现吧。

0x3 协程

具体可看此篇文章:https://thief.one/2017/02/20/Python%E5%8D%8F%E7%A8%8B/

0x3.1协程的理解

概念

协程,又称微线程,纤程,英文名Coroutine。协程的作用,是在执行函数A时,可以随时中断,去执行函数B,然后中断继续执行函数A(可以自由切换)。但这一过程并不是函数调用(没有调用语句),这一整个过程看似像多线程,然而协程只有一个线程执行。

优势

  • 执行效率极高,因为子程序切换(函数)不是线程切换,由程序自身控制,没有切换线程的开销。所以与多线程相比,线程的数量越多,协程性能的优势越明显。
  • 不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在控制共享资源时也不需要加锁,因此执行效率高很多。
      

说明:协程可以处理IO密集型程序的效率问题,但是处理CPU密集型不是它的长处,如要充分发挥CPU利用率可以结合多进程+协程。

0x3.2Python2.x协程

python2.x协程应用:

  • yield
  • gevent

0x3.2.1 Gevent

gevent是第三方库,通过greenlet实现协程,其基本思想:
当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。

Gevent使用说明

  • monkey可以使一些阻塞的模块变得不阻塞,机制:遇到IO操作则自动切换,手动切换可以用gevent.sleep(0)(将爬虫代码换成这个,效果一样可以达到切换上下文)
  • gevent.spawn() 方法会创建一个新的greenlet协程对象,并运行它。
  • gevent.joinall() 方法会等待所有传入的greenlet协程运行结束后再退出,这个方法可以接受一个”timeout”参数来设置超时时间,单位是秒。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#! -*- coding:utf-8 -*-
import gevent
from gevent import monkey;monkey.patch_all()
import urllib2
def get_body(i):
print "start",i
urllib2.urlopen("http://cn.bing.com")
print "end",i
tasks=[gevent.spawn(get_body,i) for i in range(3)]
gevent.joinall(tasks)

output:
start 0
start 1
start 2
end 0
end 2
end 1
[Finished in 11.7s]

说明:从结果上来看,执行get_body的顺序应该先是输出”start”,然后执行到urllib2时碰到IO堵塞,则会自动切换运行下一个程序(继续执行get_body输出start),直到urllib2返回结果,再执行end。也就是说,程序没有等待urllib2请求网站返回结果,而是直接先跳过了,等待执行完毕再回来获取返回值。值得一提的是,在此过程中,只有一个线程在执行,因此这与多线程的概念是不一样的。
换成多线程的代码看看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#! -*- coding:utf-8 -*-
import threading
import urllib2
def get_body(i):
print "start",i
urllib2.urlopen("http://cn.bing.com")
print "end",i
for i in range(3):
t=threading.Thread(target=get_body,args=(i,))
t.start()

output:
start 0
start 1
start 2
end 2
end 0
end 1
[Finished in 11.7s]

说明:从结果来看,多线程与协程的效果一样,都是达到了IO阻塞时切换的功能。不同的是,多线程切换的是线程(线程间切换),协程切换的是上下文(可以理解为执行的函数)。而切换线程的开销明显是要大于切换上下文的开销,因此当线程越多,协程的效率就越比多线程的高。(猜想多进程的切换开销应该是最大的)

0x3.3Python3.x协程

详情可看此篇;https://thief.one/2018/06/21/1/

0x3.3.1协程函数(异步函数)

0x3.3.1.1创建协程函数

先来看下普通函数:

1
2
3
4
5
6
7
8
9
10
def test1():
print("1")
print("2")
def test2():
print("3")
print("4")
a = test1()
b = test2()
print(a,type(a))
print(b,type(b))

运行以上代码得到结果:

1
2
3
4
5
6
1
2
3
4
None <class 'NoneType'>
None <class 'NoneType'>

说明:程序顺序执行了test1、test2函数,在调用函数的时候就自动进入了函数体,并执行了函数的内容。

然后使用async关键词将普通函数变成协程函数,即异步函数:

1
2
3
4
5
6
7
8
async def test1():
print("1")
print("2")
async def test2():
print("3")
print("4")
print(test1())
print(test2())

运行以上代码得到结果:

1
2
3
4
5
6
<coroutine object test1 at 0x109f4c620>
asyncio_python3_test.py:16: RuntimeWarning: coroutine 'test1' was never awaited
print(test1())
<coroutine object test2 at 0x109f4c620>
asyncio_python3_test.py:17: RuntimeWarning: coroutine 'test2' was never awaited
print(test2())

说明:忽略结果中的告警,可以看到调用函数test1、test2的时候,并没有进入函数体且执行函数内容,而是返回了一个coroutine(协程对象)。

除了函数外,类的方法也可以使用async关键词将其变成协程方法:

1
2
3
class test:
async def run(self):
print("1")

0x3.3.1.2执行协程函数

   前面我们成功创建了协程函数,并且在调用函数的时候返回了一个协程对象,那么怎么进入函数体并执行函数内容呢?类似于生成器,可以使用send方法执行函数,修改下前面的代码:

1
2
3
4
5
6
7
8
9
10
async def test1():
print("1")
print("2")
async def test2():
print("3")
print("4")
a = test1()
b = test2()
a.send(None)
b.send(None)

运行以上代码得到以下结果:

1
2
3
4
5
6
7
1
2
Traceback (most recent call last):
File "asyncio_python3_test.py", line 19, in <module>
a.send(None)
StopIteration
sys:1: RuntimeWarning: coroutine 'test2' was never awaited

   说明:程序先执行了test1协程函数,当test1执行完时报了StopIteration异常,这是协程函数执行完饭回的一个异常,我们可以用try except捕捉,来用判断协程函数是否执行完毕。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
async def test1():
print("1")
print("2")
async def test2():
print("3")
print("4")
a = test1()
b = test2()
try:
a.send(None) # 可以通过调用 send 方法,执行协程函数
except StopIteration as e:
print(e.value)
# 协程函数执行结束时会抛出一个StopIteration 异常,标志着协程函数执行结束,返回值在value中
pass
try:
b.send(None) # 可以通过调用 send 方法,执行协程函数
except StopIteration:
print(e.value)
# 协程函数执行结束时会抛出一个StopIteration 异常,标志着协程函数执行结束,返回值在value中
pass

运行以上代码得到以下结果:

1
2
3
4
1
2
3
4

   说明:程序先执行了test1函数,等到test1函数执行完后再执行test2函数。从执行过程上来看目前协程函数与普通函数没有区别,并没有实现异步函数,那么如何交叉运行协程函数呢?

0x3.3.1.3交叉执行协程函数(await)

   通过以上例子,我们发现定义协程函数可以使用async关键词,执行函数可以使用send方法,那么如何实现在两个协程函数间来回切换执行呢?这里需要使用await关键词,修改一下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio
async def test1():
print("1")
await asyncio.sleep(1) # asyncio.sleep(1)返回的也是一个协程对象
print("2")
async def test2():
print("3")
print("4")
a = test1()
b = test2()
try:
a.send(None) # 可以通过调用 send 方法,执行协程函数
except StopIteration:
# 协程函数执行结束时会抛出一个StopIteration 异常,标志着协程函数执行结束
pass
try:
b.send(None) # 可以通过调用 send 方法,执行协程函数
except StopIteration:
pass

运行以上函数得到以下结果:

1
2
3
1
3
4

   说明:程序先执行test1协程函数,在执行到await时,test1函数停止了执行(阻塞);接着开始执行test2协程函数,直到test2执行完毕。从结果中,我们可以看到,直到程序运行完毕,test1函数也没有执行完(没有执行print(“2”)),那么如何使test1函数执行完毕呢?可以使用asyncio自带的方法循环执行协程函数。

0x3.3.1.4 await与阻塞

   使用async可以定义协程对象,使用await可以针对耗时的操作进行挂起,就像生成器里的yield一样,函数让出控制权。协程遇到await,事件循环将会挂起该协程,执行别的协程,直到其他的协程也挂起或者执行完毕,再进行下一个协程的执行,协程的目的也是让一些耗时的操作异步化。

注意点:await后面跟的必须是一个Awaitable对象,或者实现了相应协议的对象,查看Awaitable抽象类的代码,表明了只要一个类实现了await方法,那么通过它构造出来的实例就是一个Awaitable,并且Coroutine类也继承了Awaitable。

0x3.3.1.5自动循环执行协程函数

   通过前面介绍我们知道执行协程函数需要使用send方法,但一旦协程函数执行过程中切换到其他函数了,那么这个函数就不在被继续运行了,并且使用sned方法不是很高效。那么如何在执行整个程序过程中,自动得执行所有的协程函数呢,就如同多线程、多进程那样,隐式得执行而不是显示的通过send方法去执行函数。

0x3.3.1.5.1事件循环方法

前面提到的问题就需要用到事件循环方法去解决,即asyncio.get_event_loop方法,修改以上代码如下:

1
2
3
4
5
6
7
8
9
10
import asyncio
async def test1():
print("1")
await test2()
print("2")
async def test2():
print("3")
print("4")
loop = asyncio.get_event_loop()
loop.run_until_complete(test1())

运行以上代码得到以下结果:

1
2
3
4
1
3
4
2

说明:asyncio.get_event_loop方法可以创建一个事件循环,然后使用run_until_complete将协程注册到事件循环,并启动事件循环。

0x3.3.1.5.2task任务

由于协程对象不能直接运行,在注册事件循环的时候,其实是run_until_complete方法将协程包装成为了一个任务(task)对象。所谓task对象是Future类的子类,保存了协程运行后的状态,用于未来获取协程的结果。我们也可以手动将协程对象定义成task,修改以上代码如下:

1
2
3
4
5
6
7
8
9
10
11
import asyncio
async def test1():
print("1")
await test2()
print("2")
async def test2():
print("3")
print("4")
loop = asyncio.get_event_loop()
task = loop.create_task(test1())
loop.run_until_complete(task)

   说明:前面说到task对象保存了协程运行的状态,并且可以获取协程函数运行的返回值,那么具体该如何获取呢?这里可以分两种方式,一种需要绑定回调函数,另外一种则直接在运行完task任务后输出。值得一提的是,如果使用send方法执行函数,则返回值可以通过捕捉StopIteration异常,利用StopIteration.value获取。

0x3.3.1.5.3直接输出task结果

当协程函数运行结束后,我们需要得到其返回值,第一种方式就是等到task状态为finish时,调用task的result方法获取返回值。

1
2
3
4
5
6
7
8
9
10
11
12
13
import asyncio
async def test1():
print("1")
await test2()
print("2")
return "stop"
async def test2():
print("3")
print("4")
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(test1())
loop.run_until_complete(task)
print(task.result())

运行以上代码得到以下结果:

1
2
3
4
5
1
3
4
2
stop
0x3.3.1.5.4回调函数

   获取返回值的第二种方法是可以通过绑定回调函数,在task执行完毕的时候可以获取执行的结果,回调的最后一个参数是future对象,通过该对象可以获取协程返回值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio
async def test1():
print("1")
await test2()
print("2")
return "stop"
async def test2():
print("3")
print("4")
def callback(future):
print('Callback:',future.result()) # 通过future对象的result方法可以获取协程函数的返回值
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(test1()) # 创建task,test1()是一个协程对象
task.add_done_callback(callback) # 绑定回调函数
loop.run_until_complete(task)

运行以上代码得到以下结果:

1
2
3
4
5
1
3
4
2
Callback: stop

如果回调函数需要接受多个参数,可以通过偏函数导入,修改代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import asyncio
import functools
async def test1():
print("1")
await test2()
print("2")
return "stop"
async def test2():
print("3")
print("4")
def callback(param1,param2,future):
print(param1,param2)
print('Callback:',future.result())
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(test1())
task.add_done_callback(functools.partial(callback,"param1","param2"))
loop.run_until_complete(task)

说明:回调函数中的future对象就是创建的task对象。

0x3.3.1.5.5future对象

   future对象有几个状态:Pending、Running、Done、Cancelled。创建future的时候,task为pending,事件循环调用执行的时候当然就是running,调用完毕自然就是done,如果需要停止事件循环,就需要先把task取消,可以使用asyncio.Task获取事件循环的task。

0x3.3.1.5.6协程停止

   前面介绍了使用事件循环执行协程函数,那么怎么停止执行呢?在停止执行协程前,需要先取消task,然后再停止loop事件循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio
async def test1():
print("1")
await asyncio.sleep(3)
print("2")
return "stop"
tasks = [
asyncio.ensure_future(test1()),
asyncio.ensure_future(test1()),
asyncio.ensure_future(test1()),
]
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
for task in asyncio.Task.all_tasks():
task.cancel()
loop.stop()
loop.run_forever()
finally:
loop.close()

运行以上代码,按ctrl+c可以结束执行。

0x3.3.2本文中用到的一些概念及方法

event_loop事件循环:程序开启一个无限的循环,当把一些函数注册到事件循环上时,满足事件发生条件即调用相应的函数。

  • coroutine协程对象:指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象,协程对象需要注册到事件循环,由事件循环调用。
  • task任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。
  • future:代表将来执行或没有执行的任务的结果,它和task上没有本质的区别
  • async/await关键字:python3.5用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。

0x3.3.3并发与并行

   并发通常指有多个任务需要同时进行,并行则是同一时刻有多个任务执行。用多线程、多进程、协程来说,协程实现并发,多线程与多进程实现并行。

0x3.3.3.1asyncio协程如何实现并发

   asyncio想要实现并发,就需要多个协程来完成任务,每当有任务阻塞的时候就await,然后其他协程继续工作,这需要创建多个协程的列表,然后将这些协程注册到事件循环中。这里指的多个协程,可以是多个协程函数,也可以是一个协程函数的多个协程对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import asyncio
async def test1():
print("1")
await asyncio.sleep(1)
print("2")
return "stop"
a = test1()
b = test1()
c = test1()
tasks = [
asyncio.ensure_future(a),
asyncio.ensure_future(b),
asyncio.ensure_future(c),
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks)) # 注意asyncio.wait方法
for task in tasks:
print("task result is ",task.result())

运行以上代码得到以下结果:

1
2
3
4
5
6
7
8
9
1
1
1
2
2
2
task result is stop
task result is stop
task result is stop

说明:代码先是定义了三个协程对象,然后通过asyncio.ensure_future方法创建了三个task,并且将所有的task加入到了task列表,最终使用loop.run_until_complete将task列表添加到事件循环中。

0x3.3.4协程爬虫

   前面介绍了如何使用async与await创建协程函数,使用asyncio.get_event_loop创建事件循环并执行协程函数。例子很好地展示了协程并发的高效,但在实际应用场景中该如何开发协程程序?比如说异步爬虫。我尝试用requests模块、urllib模块写异步爬虫,但实际操作发现并不支持asyncio异步,因此可以使用aiohttp模块编写异步爬虫。

0x3.3.4.1aiohttp实现

1
2
3
4
5
6
7
8
9
10
11
import asyncio
import aiohttp
async def run(url):
print("start spider ",url)
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
print(resp.url)
url_list = ["https://thief.one","https://home.nmask.cn","https://movie.nmask.cn","https://tool.nmask.cn"]
tasks = [asyncio.ensure_future(run(url)) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

运行以上代码得到以下结果:

1
2
3
4
5
6
7
8
start spider  https://thief.one
start spider https://home.nmask.cn
start spider https://movie.nmask.cn
start spider https://tool.nmask.cn
https://movie.nmask.cn
https://home.nmask.cn
https://tool.nmask.cn
https://thief.one

说明:aiohttp基于asyncio实现,既可以用来写webserver,也可以当爬虫使用。

0x3.3.4.2requests实现

   由于requests模块阻塞了客户代码与asycio事件循环的唯一线程,因此在执行调用时,整个应用程序都会冻结,但如果一定要用requests模块,可以使用事件循环对象的run_in_executor方法,通过run_in_executor方法来新建一个线程来执行耗时函数,因此可以这样修改代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
import asyncio
import requests
async def run(url):
print("start ",url)
loop = asyncio.get_event_loop()
response = await loop.run_in_executor(None, requests.get, url)
print(response.url)

url_list = ["https://thief.one","https://home.nmask.cn","https://movie.nmask.cn","https://tool.nmask.cn"]
tasks = [asyncio.ensure_future(run(url)) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

如果要给requests带上参数,可以使用functools:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio
import requests
import functools
async def run(url):
print("start ",url)
loop = asyncio.get_event_loop()
try:
response = await loop.run_in_executor(None,functools.partial(requests.get,url=url,params="",timeout=1))
except Exception as e:
print(e)
else:
print(response.url)
url_list = ["https://thief.one","https://home.nmask.cn","https://movie.nmask.cn","https://tool.nmask.cn"]
tasks = [asyncio.ensure_future(run(url)) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

0x3.3.5asyncio中使用阻塞函数

   如同前面介绍如何在asyncio中使用requests模块一样,如果想在asyncio中使用其他阻塞函数,该怎么实现呢?虽然目前有异步函数支持asyncio,但实际问题是大部分IO模块还不支持asyncio。

阻塞函数在asyncio中使用的问题
   阻塞函数(例如io读写,requests网络请求)阻塞了客户代码与asycio事件循环的唯一线程,因此在执行调用时,整个应用程序都会冻结。

解决方案
   这个问题的解决方法是使用事件循环对象的run_in_executor方法。asyncio的事件循环在背后维护着一个ThreadPoolExecutor对象,我们可以调用run_in_executor方法,把可调用对象发给它执行,即可以通过run_in_executor方法来新建一个线程来执行耗时函数。

run_in_executor方法

1
AbstractEventLoop.run_in_executor(executor, func, *args)
  • executor 参数应该是一个 Executor 实例。如果为 None,则使用默认 executor。
  • func 就是要执行的函数
  • args 就是传递给 func 的参数
    实际例子(使用time.sleep()):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import asyncio
import time
async def run(url):
print("start ",url)
loop = asyncio.get_event_loop()
try:
await loop.run_in_executor(None,time.sleep,1)
except Exception as e:
print(e)
print("stop ",url)
url_list = ["https://thief.one","https://home.nmask.cn","https://movie.nmask.cn","https://tool.nmask.cn"]
tasks = [asyncio.ensure_future(run(url)) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

运行以上代码得到以下函数:

1
2
3
4
5
6
7
8
start  https://thief.one
start https://home.nmask.cn
start https://movie.nmask.cn
start https://tool.nmask.cn
stop https://thief.one
stop https://movie.nmask.cn
stop https://home.nmask.cn
stop https://tool.nmask.cn

说明:有了run_in_executor方法,我们就可以使用之前熟悉的模块创建协程并发了,而不需要使用特定的模块进行IO异步开发。

参考文章:
进程线程协程:http://funhacks.net/explore-python/Process-Thread-Coroutine/process.html
Python之路:(十五)进程、线程和协程:http://www.liangxiansen.cn/2016/08/08/python-thread-process-gevent/
Python中多进程之间的数据共享:https://docs.lvrui.io/2016/07/24/Python%E4%B8%AD%E5%A4%9A%E8%BF%9B%E7%A8%8B%E4%B9%8B%E9%97%B4%E7%9A%84%E6%95%B0%E6%8D%AE%E5%85%B1%E4%BA%AB/
多线程总结:https://www.cnblogs.com/tkqasn/p/5700281.html

Python协程:https://thief.one/2017/02/20/Python%E5%8D%8F%E7%A8%8B/
Python3.5协程学习研究:https://thief.one/2018/06/21/1/

FROM :blog.cfyqy.com | Author:cfyqy

  • 左青龙
  • 微信扫一扫
  • weinxin
  • 右白虎
  • 微信扫一扫
  • weinxin
admin
  • 本文由 发表于 2022年1月6日01:34:32
  • 转载请保留本文链接(CN-SEC中文网:感谢原作者辛苦付出):
                   python进程、线程和协程http://cn-sec.com/archives/721872.html

发表评论

匿名网友 填写信息