Python Tornado入门教程 Tornado 协程

2024-02-25 开发教程 Python Tornado入门教程 匿名 3

协程

协程是在 Tornado 中编写异步代码的推荐方式。协程使用 Python的​await​或​yield​关键字来暂停和恢复执行,而不是一连串的回调(在gevent等框架中看到的协作轻量级线程有时也称为协程,但在 Tornado 中,所有协程都使用显式上下文切换并被称为异步函数)
协程几乎和同步代码一样简单,但没有线程的开销。它们还通过减少可能发生上下文切换的位置数量,使并发更容易推理

例子:

async def fetch_coroutine(url):
http_client = AsyncHTTPClient()
response = await http_client.fetch(url)
return response.body

原生协程与装饰协程

Python 3.5 引入了​async​和​await​关键字(使用这些关键字的函数也称为“本机协程”)。为了与旧版本的 Python 兼容,您可以使用装饰器使用“装饰”或“基于产量”的协程tornado.gen.coroutine
尽可能推荐使用原生协程。仅在需要与旧版本的 Python 兼容时才使用修饰的协程。Tornado 文档中的示例通常会使用原生形式。
两种形式之间的转换通常很简单:

# Decorated:                    # Native:
# Normal function declaration
# with decorator # "async def" keywords
@gen.coroutine
def a(): async def a():
# "yield" all async funcs # "await" all async funcs
b = yield c() b = await c()
# "return" and "yield"
# cannot be mixed in
# Python 2, so raise a
# special exception. # Return normally
raise gen.Return(b) return b

下面概述了两种形式的协程之间的其他差异:

原生协程:

  • 通常更快。
  • 可以使用​async for​和​async with​语句使某些模式更简单。
  • 除非您知道 ​yield​和​await​他们,否则根本不要运行。装饰协程一旦被调用就可以开始“在后台”运行。请注意,对于这两种协程,使用​await​或​yield​是很重要的,这样任何异常都有可能发生。

装饰协程:

  • 包有额外的集成 ​concurrent.futures​,允许 ​executor.submit​直接产生结果。对于本机协程,请​IOLoop.run_in_executor​改用
  • 通过产生一个列表或字典来支持一些等待多个对象的简写。用于​tornado.gen.multi​在本机协程中执行此操作
  • 可以通过转换函数注册表支持与其他包的集成,包括 Twisted。要在本机协程中访问此功能,请使用 tornado.gen.convert_yielded
  • 总是返回一个Future对象。本机协程返回一个不是Future. 在 Tornado 中,两者大多可以互换。

如何运作

本节解释装饰协程的操作。原生协程在概念上相似,但由于与 Python 运行时的额外集成而稍微复杂一些

包含的函数​yield​是生成器。所有生成器都是异步的;当被调用时,它们返回一个生成器对象,而不是运行到完成。装饰器​@gen.coroutine​通过​yield​表达式与生成器通信,并通过返回一个Future

这是协程装饰器内部循环的简化版本:

# Simplified inner loop of tornado.gen.Runner
def run(self):
# send(x) makes the current yield return x.
# It returns when the next yield is reached
future = self.gen.send(self.next)
def callback(f):
self.next = f.result()
self.run()
future.add_done_callback(callback)

装饰器Future从生成器接收 a,等待(不阻塞)Future完成,然后“解包”并将结果作为表达式Future 的结果发送回生成器 。​yield​大多数异步代码从不直接接触类,除非立即将Future异步函数返回的值传递给​yield​表达式。

如何调用协程

协程不会以正常方式引发异常:它们引发的任何异常都将被困在等待对象中,直到它被产生。这意味着以正确的方式调用协程很重要,否则您可能会遇到未被注意到的错误:

async def divide(x, y):
return x / y
def bad_call():
# This should raise a ZeroDivisionError, but it won't because
# the coroutine is called incorrectly.
divide(1, 0)

在几乎所有情况下,任何调用协程的函数都必须是协程本身,并且在调用中使用​await​或者​yield​关键字。当您覆盖类中定义的方法时,请查阅文档以查看是否允许使用协程(文档应说明该方法“可能是协程”或“可能返回一个Future”):

async def good_call():
# await will unwrap the object returned by divide() and raise
# the exception.
await divide(1, 0)

有时你可能想“触发并忘记”一个协程而不等待它的结果。在这种情况下,建议使用IOLoop.spawn_callback,这使得IOLoop负责呼叫。如果失败,IOLoop将记录堆栈跟踪:

# The IOLoop will catch the exception and print a stack trace in
# the logs. Note that this doesn't look like a normal call, since
# we pass the function object to be called by the IOLoop.
IOLoop.current().spawn_callback(divide, 1, 0)

对于使用IOLoop.spawn_callback的函数,建议以这种方式使用​@gen.coroutine​,但对于使用​async def​的函数,则需要以这种方式使用(否则,协程运行程序将无法启动)。

最后,在程序的顶层,如果 IOLoop 尚未运行,您可以启动IOLoop,运行协程,然后IOLoop使用IOLoop.run_sync方法停止。这通常用于启动​main​面向批处理的程序的功能:

# run_sync() doesn't take arguments, so we must wrap the
# call in a lambda.
IOLoop.current().run_sync(lambda: divide(1, 0))

协程模式

调用阻塞函数

从协程调用阻塞函数的最简单方法是使用IOLoop.run_in_executor,它的返回值 ​Futures​与协程兼容:

async def call_blocking():
await IOLoop.current().run_in_executor(None, blocking_func, args)

并行性

multi函数接受值为列表和字典,并等待所有这些​Futures​:

from tornado.gen import multi
async def parallel_fetch(url1, url2):
resp1, resp2 = await multi([http_client.fetch(url1),
http_client.fetch(url2)])
async def parallel_fetch_many(urls):
responses = await multi ([http_client.fetch(url) for url in urls])
# responses is a list of HTTPResponses in the same order
async def parallel_fetch_dict(urls):
responses = await multi({url: http_client.fetch(url)
for url in urls})
# responses is a dict {url: HTTPResponse}

在装饰协程中,可以​yield​直接生成使用list 或dict:

@gen.coroutine
def parallel_fetch_decorated(url1, url2):
resp1, resp2 = yield [http_client.fetch(url1),
http_client.fetch(url2)]

交错

有时,保存一个Future而不是立即放弃它是有用的,这样你就可以在等待之前开始另一个操作。

from tornado.gen import convert_yielded
async def get(self):
# convert_yielded() starts the native coroutine in the background.
# This is equivalent to asyncio.ensure_future() (both work in Tornado).
fetch_future = convert_yielded(self.fetch_next_chunk())
while True:
chunk = yield fetch_future
if chunk is None: break
self.write(chunk)
fetch_future = convert_yielded(self.fetch_next_chunk())
yield self.flush()

这对装饰协程来说更容易一些,因为它们在调用时立即启动:

@gen.coroutine
def get(self):
fetch_future = self.fetch_next_chunk()
while True:
chunk = yield fetch_future
if chunk is None: break
self.write(chunk)
fetch_future = self.fetch_next_chunk()
yield self.flush()

循环

在本地协同程序中,可以使用​async for​。在较旧版本的Python中,使用协同路由进行循环是很棘手的,因为在​for​或​while​循环的每次迭代中都无法找到​yield​并捕获结果。相反,您需要将循环条件与访问结果分开,如本例中的Motor:

import motor
db = motor.MotorClient().test
@gen.coroutine
def loop_example(collection):
cursor = db.collection.find()
while (yield cursor.fetch_next):
doc = cursor.next_object()

在后台运行

PeriodicCallback通常不与coroutines一起使用。相反,协同程序可以包含​while True​:循环并使用tornado.gen.sleep:

async def minute_loop():
while True:
await do_something()
await gen.sleep(60)
# Coroutines that loop forever are generally started with
# spawn_callback().
IOLoop.current().spawn_callback(minute_loop)

有时可能需要一个更复杂的循环。例如,前一个循环每​60+N​秒运行一次,其中​N​是​do_something()​的运行时间。要准确地每60秒运行一次,请使用上面的交错模式:

async def minute_loop2():
while True:
nxt = gen.sleep(60) # Start the clock.
await do_something() # Run while the clock is ticking.
await nxt # Wait for the timer to run out.