4.2 版中的新功能。
使用类似于标准库提供给线程的同步原语来协调协程。 这些类与标准库的 asyncio
包中提供的类非常相似。
请注意,这些原语实际上不是线程安全的,并且不能用来代替标准库的线程模块中的那些原语——它们旨在协调单线程应用程序中的 Tornado 协程,而不是保护多线程应用程序中的共享对象。
一个condition
允许一个或多个协程等待直到收到通知。
与标准 threading.Condition
类似,但不需要获取和释放的底层锁。
使用 Condition
,协程可以等待其他协程的通知:
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.locks import Condition
condition = Condition()
async def waiter():
print("I'll wait right here")
await condition.wait()
print("I'm done waiting")
async def notifier():
print("About to notify")
condition.notify()
print("Done notifying")
async def runner():
# Wait for waiter() and notifier() in parallel
await gen.multi([waiter(), notifier()])
IOLoop.current().run_sync(runner)
结果为:
I'll wait right here
About to notify
Done notifying
I'm done waiting
wait
接受一个可选的 timeout
参数,它可以是一个绝对时间戳:
io_loop = IOLoop.current()
# Wait up to 1 second for a notification.
await condition.wait(timeout=io_loop.time() + 1)
datetime.timedelta
表示相对于当前时间的超时:
# Wait up to 1 second.
await condition.wait(timeout=datetime.timedelta(seconds=1))
如果在截止日期之前没有通知,则该方法返回 False。
如果条件被通知,则返回一个 Future
解析 True
,或者在超时后解析为 False
。
唤醒n个waiters
唤醒所有waiters
一个事件会阻塞协程,直到其内部标志设置为 True
。
类似于threading.Event
。
协程可以等待设置事件。 一旦设置,对 yield event.wait()
的调用将不会阻塞,除非事件已被清除:
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.locks import Event
event = Event()
async def waiter():
print("Waiting for event")
await event.wait()
print("Not waiting this time")
await event.wait()
print("Done")
async def setter():
print("About to set the event")
event.set()
async def runner():
await gen.multi([waiter(), setter()])
IOLoop.current().run_sync(runner)
结果如下:
Waiting for event
About to set the event
Not waiting this time
Done
如果内部标志为True
,则返回True
将内部标志设置为 True
。 所有的waiters都被唤醒了。
设置标志后调用 wait
不会阻塞。
将内部标志重置为 False
。
调用 wait
将阻塞,直到调用 set
。
阻塞直到内部标志为True
。
返回一个 awaitable
,它在超时后引发 tornado.util.TimeoutError
。
在阻塞之前可以获取固定次数的锁。
信号量管理一个计数器,表示释放调用的数量减去获取调用的数量,再加上一个初始值。 如果需要,acquire
方法会阻塞,直到它可以返回而不使计数器为负。
信号量限制对共享资源的访问。 一次允许两个worker访问:
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.locks import Semaphore
sem = Semaphore(2)
async def worker(worker_id):
await sem.acquire()
try:
print("Worker %d is working" % worker_id)
await use_some_resource()
finally:
print("Worker %d is done" % worker_id)
sem.release()
async def runner():
# Join all workers.
await gen.multi([worker(i) for i in range(3)])
IOLoop.current().run_sync(runner)
结果如下:
Worker 0 is working
Worker 1 is working
Worker 0 is done
Worker 2 is working
Worker 1 is done
Worker 2 is done
worker 0 和 1 被允许同时运行,但worker 2 等到信号量被worker 0 释放一次。
信号量可以用作异步上下文管理器:
async def worker(worker_id):
async with sem:
print("Worker %d is working" % worker_id)
await use_some_resource()
# Now the semaphore has been released.
print("Worker %d is done" % worker_id)
为了与旧版本的 Python 兼容,acquire
是一个上下文管理器,因此 worker 也可以写成:
@gen.coroutine
def worker(worker_id):
with (yield sem.acquire()):
print("Worker %d is working" % worker_id)
yield use_some_resource()
# Now the semaphore has been released.
print("Worker %d is done" % worker_id)
增加计数器并唤醒一个waiter。
减少计数器。 返回一个可等待的。
如果计数器为零,则阻塞并等待释放。awaitable
在截止日期后引发 TimeoutError
。
防止 release()
被调用太多次的信号量。
如果 release
增加信号量的值超过初始值,它会引发 ValueError
。 信号量主要用于保护容量有限的资源,因此信号量释放次数过多是错误的标志。
增加计数器并唤醒一个waiter。
减少计数器。 返回一个可等待的。
如果计数器为零,则阻塞并等待释放。 awaitable
在截止日期后引发 TimeoutError
。
协程的锁。
锁开始解锁,并立即获取锁。 当它被锁定时,产生acquire
的协程等待直到另一个协程调用release
。
释放未锁定的锁会引发 RuntimeError
。
Lock
可以用作带有 async with
语句的异步上下文管理器:
>>> from tornado import locks
>>> lock = locks.Lock()
>>>
>>> async def f():
... async with lock:
... # Do something holding the lock.
... pass
...
... # Now the lock is released.
为了与旧版本的 Python 兼容,acquire
方法异步返回一个常规上下文管理器:
>>> async def f2():
... with (yield lock.acquire()):
... # Do something holding the lock.
... pass
...
... # Now the lock is released.
尝试锁定。 返回一个awaitable
。
返回一个 awaitable
,它在超时后引发 tornado.util.TimeoutError
。
解锁。
排队等待获取的第一个协程获得锁。
如果未锁定,则引发 RuntimeError
。