gevent拾遗

 

  在前文已经介绍过了gevent的调度流程,本文介绍gevent一些重要的模块,包括Timeout,EventAsynResult, Semphore, socket patch,这些模块都涉及当前协程与hub的切换。本文分析的gevent版本为1.2

Timeout

  这个类在gevent.timeout模块,其作用是超时后在当前协程抛出异常,这样执行流程也强制回到了当前协程。看一个简单的例子:

 1 SLEEP = 6
 2 TIMEOUT = 5
 3 
 4 timeout = TimeoutTIMEOUT)
 5 timeout.start)
 6 
 7 def wait):
 8     gevent.sleepSLEEP)
 9     print'log in wait')
10 
11 begin = time.time)
12 try:
13     gevent.spawnwait).join)
14 except Timeout:
15     print'after %s catch Timeout Exception' % time.time) - begin))
16 finally:    
17     timeout.cancel)

  输出为:after 5.00100016594 catch Timeout Exception。可以看出,在5s之后在main协程抛出了Timeout异常(继承自BaseException)。Timeout的实现很简单,核心在start函数:

 1     def startself):
 2         """Schedule the timeout."""
 3         assert not self.pending, '%r is already started; to restart it, cancel it first' % self
 4         if self.seconds is None:  # "fake" timeout never expires)
 5             return
 6 
 7         if self.exception is None or self.exception is False or isinstanceself.exception, string_types):
 8             # timeout that raises self
 9             self.timer.startgetcurrent).throw, self)
10         else:  # regular timeout with user-provided exception
11             self.timer.startgetcurrent).throw, self.exception)

  从源码可以看到,在超时之后调用了getcurrent).throw),throw方法会切换协程,并抛出异常(在上面的代码中默认抛出Timeout异常)。使用Timeout有两点需要注意:

  第一:一定要记得在finally调用cancel,否则如果协程先于TIMEOUT时间恢复,之后还会抛出异常,例如下面的代码:

 1 import gevent
 2 from gevent import Timeout
 3 
 4 SLEEP = 4
 5 TIMEOUT = 5
 6 
 7 timeout = TimeoutTIMEOUT)
 8 timeout.start)
 9 
10 def wait):
11     gevent.sleepSLEEP)
12     print'log in wait')
13 
14 begin = time.time)
15 try:
16     gevent.spawnwait).join)
17 except Timeout:
18     print'after %s catch Timeout Exception'  % time.time) - begin))
19 # finally:    
20 #     timeout.cancel)
21 
22 gevent.sleep2)
23 print 'program will finish'

协程先于超时恢复

  上述的代码运行会抛出Timeout异常,在这个例子中,协程先于超时恢复(SLEEP < TIMEOUT),且没有在finally中调用Timeout.cancel。最后的两行保证程序不要过早结束退出,那么在hub调度的时候会重新抛出异常。

  由于Timeout实现了with协议__enter__和__exit__方法),更好的写法是将TImeout写在with语句中,如下面的代码:

 1 import gevent
 2 from gevent import Timeout
 3 
 4 SLEEP = 4
 5 TIMEOUT = 5
 6 
 7 
 8 def wait):
 9     gevent.sleepSLEEP)
10     print'log in wait')
11 
12 with TimeoutTIMEOUT):
13     begin = time.time)
14     try:
15         gevent.spawnwait).join)
16     except Timeout:
17         print'after %s catch Timeout Exception'  % time.time) - begin))
18 
19 gevent.sleep2)
20 print 'program will finish'

Timeout with

  第二:Timeout只是切换到当前协程,并不会取消已经注册的协程(上面通过spawn发起的协程),我们改改代码:

 1 import gevent
 2 from gevent import Timeout
 3 
 4 SLEEP = 6
 5 TIMEOUT = 5
 6 
 7 timeout = TimeoutTIMEOUT)
 8 timeout.start)
 9 
10 def wait):
11     gevent.sleepSLEEP)
12     print'log in wait')
13 
14 begin = time.time)
15 try:
16     gevent.spawnwait).join)
17 except Timeout:
18     print'after %s catch Timeout Exception'  % time.time) - begin))
19 finally:    
20     timeout.cancel)
21 
22 gevent.sleep2)
23 print 'program will finish'
24 # output:
25 # after 5.00100016594 catch Timeout Exception
26 # log in wait
27 # program will finish

Timeout不影响发起的协程

  从输出可以看到,即使因为超时切回了main greenlet,但spawn发起的协程并不受影响。如果希望超时取消之前发起的协程,那么可以在捕获到异常之后调用 Greenlet.kill

   第三:gevent对可能导致当前协程挂起的函数都提供了timeout参数,用于在指定时间到达之后恢复被挂起的协程。在函数内部会捕获Timeout异常,并不会抛出。例如:

 1 SLEEP = 6
 2 TIMEOUT = 5
 3 
 4 
 5 def wait):
 6     gevent.sleepSLEEP)
 7     print'log in wait')
 8 
 9 begin = time.time)
10 try:
11     gevent.spawnwait).joinTIMEOUT)
12 except Timeout:
13     print'after %s catch Timeout Exception' % time.time) - begin))
14 
15 print 'program will exit', time.time) - begin

函数的timeout参数

Event & AsyncResult:

  Event用来在Greenlet之间同步,tutorial上的例子简单明了:

 1 import gevent
 2 from gevent.event import Event
 3 
 4 '''
 5 Illustrates the use of events
 6 '''
 7 
 8 
 9 evt = Event)
10 
11 def setter):
12     '''After 3 seconds, wake all threads waiting on the value of evt'''
13     print'A: Hey wait for me, I have to do something')
14     gevent.sleep3)
15     print"Ok, I'm done")
16     evt.set)
17 
18 
19 def waiter):
20     '''After 3 seconds the get call will unblock'''
21     print"I'll wait for you")
22     evt.wait)  # blocking
23     print"It's about time")
24 
25 def main):
26     gevent.joinall[
27         gevent.spawnsetter),
28         gevent.spawnwaiter),
29         gevent.spawnwaiter),
30 
31     ])
32 
33 if __name__ == '__main__': main)

Event Example

  Event主要的两个方法是set和wait:wait等待事件发生,如果事件未发生那么挂起该协程;set通知事件发生,然后hub会唤醒所有wait在该事件的协程。从输出可知, 一次event触发可以唤醒所有在该event上等待的协程。AsyncResult同Event类似,只不过可以在协程唤醒的时候传值(有点类似generator的next send的区别)。接下来大致看看Event的set和wait方法。

  Event.wait的核心代码在gevent.event._AbstractLinkable._wait_core,其中_AbstractLinkable是Event的基类。_wait_core源码如下:

 1     def _wait_coreself, timeout, catch=Timeout):
 2         # The core of the wait implementation, handling
 3         # switching and linking. If *catch* is set to ),
 4         # a timeout that elapses will be allowed to be raised.
 5         # Returns a true value if the wait succeeded without timing out.
 6         switch = getcurrent).switch
 7         self.rawlinkswitch)
 8         try:
 9             timer = Timeout._start_new_or_dummytimeout)
10             try:
11                 try:
12                     result = self.hub.switch)
13                     if result is not self: # pragma: no cover
14                         raise InvalidSwitchError'Invalid switch into Event.wait): %r' % result, ))
15                     return True
16                 except catch as ex:
17                     if ex is not timer:
18                         raise
19                     # test_set_and_clear and test_timeout in test_threading
20                     # rely on the exact return values, not just truthish-ness
21                     return False
22             finally:
23                 timer.cancel)
24         finally:
25             self.unlinkswitch)

  首先是将当前协程的switch加入到Event的callback列表,然后切换到hub。

  接下来是set函数:

1     def setself):
2         self._flag = True # make event ready
3         self._check_and_notify)
1     def _check_and_notifyself):
2         # If this object is ready to be notified, begin the process.
3         if self.ready):
4             if self._links and not self._notifier:
5                 self._notifier = self.hub.loop.run_callbackself._notify_links)

  _check_and_notify函数通知hub调用_notify_links, 在这个函数中将调用Event的callback列表(记录的是之前各个协程的switch函数),这样就恢复了所有wait的协程。

Semaphore & Lock

  Semaphore是gevent提供的信号量,实例化为Semaphorevalue), value代表了可以并发的量。当value为1,就变成了互斥锁(Lock)。Semaphore提供了两个函数,acquire(P操作)和release(V操作)。当acquire操作导致资源数量将为0之后,就会在当前协程wait,源代码如下(gevent._semaphore.Semaphore.acquire):

 1     def acquireself, blocking=True, timeout=None):
 2         
 3         if self.counter > 0:
 4             self.counter -= 1
 5             return True
 6 
 7         if not blocking:
 8             return False
 9 
10         timeout = self._do_waittimeout)
11         if timeout is not None:
12             # Our timer expired.
13             return False
14 
15         # Neither our timer no another one expired, so we blocked until
16         # awoke. Therefore, the counter is ours
17         self.counter -= 1
18         assert self.counter >= 0
19         return True

  逻辑比较简单,如果counter数量大于0,那么表示可并发。否则进入wait,_do_wait的实现与Event.wait十分类似,都是记录当前协程的switch,并切换到hub。当资源足够切换回到当前协程,此时counter一定是大于0的。由于协程的并发并不等同于线程的并发,在任意时刻,一个线程内只可能有一个协程在调度,所以上面对counter的操作也不用加锁

Monkey-Patch

  对于python这种动态语言,在运行时替换模块、类、实例的属性都是非常容易的。我们以patch_socket为例:

>>> import socket
>>> printsocket.socket)
<class ‘gevent._socket2.socket’>
>>> from gevent import monkey
>>> monkey.patch_socket)
>>> printsocket.socket)
<class ‘gevent._socket2.socket’>
>>>

  可见在patch前后,同一个名字socket)所指向的对象是不一样的。在python2.x环境下,patch后的socket源码在gevent._socket2.py,如果是python3.x,那么对应的源码在gevent._socket3.py.。至于为什么patch之后就让原生的socket操作可以在协程之间协作,看两个函数socket.__init__ 和 socket.recv就明白了。

  __init__函数(gevent._socket2.socket.__init__):

 1     def __init__self, family=AF_INET, type=SOCK_STREAM, proto=0, _sock=None):
 2         if _sock is None:
 3             self._sock = _realsocketfamily, type, proto) # 原生的socket
 4             self.timeout = _socket.getdefaulttimeout)
 5         else:
 6             if hasattr_sock, '_sock'):
 7                 self._sock = _sock._sock
 8                 self.timeout = getattr_sock, 'timeout', False)
 9                 if self.timeout is False:
10                     self.timeout = _socket.getdefaulttimeout)
11             else:
12                 self._sock = _sock
13                 self.timeout = _socket.getdefaulttimeout)
14             if PYPY:
15                 self._sock._reuse)
16         self._sock.setblocking0) #设置成非阻塞
17         fileno = self._sock.fileno)
18         self.hub = get_hub)    # hub
19         io = self.hub.loop.io
20         self._read_event = iofileno, 1) # 监听事件
21         self._write_event = iofileno, 2)

  从init函数可以看到,patch后的socket还是会维护原生的socket对象,并且将原生的socket设置成非阻塞(line16),当一个socket是非阻塞时,如果读写数据没有准备好,那么会抛出EWOULDBLOCKEAGIN异常。最后两行注册socket的可读和可写事件。再来看看recv函数(gevent._socket2.socket.recv):

 1     def recvself, *args):
 2         sock = self._sock  # keeping the reference so that fd is not closed during waiting
 3         while True:
 4             try:
 5                 return sock.recv*args) # 如果数据准备好了,直接返回
 6             except error as ex:
 7                 if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:
 8                     raise
 9                 # QQQ without clearing exc_info test__refcount.test_clean_exit fails
10                 sys.exc_clear)
11             self._waitself._read_event) # 等待数据可读的watcher

   如果在while循环中读到了数据,那么直接返回。但实际很大概率数据并没有准备好,对于非阻塞socket,抛出EWOULDBLOCK异常(line7)。在第11行,调用wait,注册当前协程switch,并切换到hub,当read_event触发时,表示socket可读,这个时候就会切回当前协程,进入下一次while循环。

references:

http://sdiehl.github.io/gevent-tutorial/

http://www.cnblogs.com/xybaby/p/6370799.html

Published by

风君子

独自遨游何稽首 揭天掀地慰生平

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注