avatar

刘刚刚的blog

采菊东篱下,悠然见南山🦥

  • 首页
  • 大模型应用
  • 常用软件/工具
  • Halo
  • 关于
Home Async
文章

Async

Posted 2025-02-28 Updated 2025-02- 28
By Administrator
23~29 min read

协程可以使用更少的资源实现“并发”的效果。

在开发过程中要注意协程只有在遇到await才会被切换,因此在web开发中如果出现会阻塞的代码(消耗时间的同步代码或者cpu密集型代码块)需要将其添加到其他进程或者线程,才能避免阻塞主其他的请求。

ps:与go相比,GIL锁的存在,在单线程中启动python时,相当于只有一套的GMP。

普通的异步调用

import asyncio
import time
​
async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)
​
async def main():
    print(f"started at {time.strftime('%X')}")
​
    await say_after(1, 'hello')
    await say_after(2, 'world')
​
    print(f"finished at {time.strftime('%X')}")
​
asyncio.run(main())

create_task:异步并发调用

async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))
​
    task2 = asyncio.create_task(
        say_after(2, 'world'))
​
    print(f"started at {time.strftime('%X')}")
​
    # wait until both tasks are completed (should take
    # around 2 seconds.)
    await task1
    await task2
​
    print(f"finished at {time.strftime('%X')}")

TaskGroup:异步任务组

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(
            say_after(1, 'hello'))
​
        task2 = tg.create_task(
            say_after(2, 'world'))
​
        print(f"started at {time.strftime('%X')}")
​
    # The await is implicit when the context manager exits.
​
    print(f"finished at {time.strftime('%X')}")

批量迭代创建任务:

注意:需要添加到集合,否则会导致任务执行期间被垃圾回收

background_tasks = set()
​
for i in range(10):
    task = asyncio.create_task(some_coro(param=i))
​
    # Add task to the set. This creates a strong reference.
    background_tasks.add(task)
​
    # To prevent keeping references to finished tasks forever,
    # make each task remove its own reference from the set after
    # completion:
    task.add_done_callback(background_tasks.discard)

任务组的使用及终止

通过在任务组中的任务抛出异常可以终止剩余任务的执行

import asyncio
from asyncio import TaskGroup
​
class TerminateTaskGroup(Exception):
    """Exception raised to terminate a task group."""
​
async def force_terminate_task_group():
    """Used to force termination of a task group."""
    raise TerminateTaskGroup()
​
async def job(task_id, sleep_time):
    print(f'Task {task_id}: start')
    await asyncio.sleep(sleep_time)
    print(f'Task {task_id}: done')
​
async def main():
    try:
        async with TaskGroup() as group:
            # spawn some tasks
            group.create_task(job(1, 0.5))
            group.create_task(job(2, 1.5))
            # sleep for 1 second
            await asyncio.sleep(1)
            # add an exception-raising task to force the group to terminate
            group.create_task(force_terminate_task_group())
    except* TerminateTaskGroup:
        pass
​
asyncio.run(main())

gather:并发的创建和运行任务

异常不会导致其他任务停止

import asyncio
​
async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: compute factorial({number}), currently i={i}...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")
    return f
​
async def main():
    # schedule three calls *concurrently*:
    L = await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )
    print(L)
​
asyncio.run(main())

wait_for:控制协程的运行时间

async def main():
    async with asyncio.timeout(10):
        await long_running_task()
        
# 其他
time_at
time_for
​
async def eternity():
    # Sleep for one hour
    await asyncio.sleep(3600)
    print('yay!')
​
async def main():
    # Wait for at most 1 second
    try:
        await asyncio.wait_for(eternity(), timeout=1.0)
    except TimeoutError:
        print('timeout!')
​
asyncio.run(main())

在其他thread中运行同步函数

def blocking_io():
    print(f"start blocking_io at {time.strftime('%X')}")
    # Note that time.sleep() can be replaced with any blocking
    # IO-bound operation, such as file operations.
    time.sleep(1)
    print(f"blocking_io complete at {time.strftime('%X')}")
​
async def main():
    print(f"started main at {time.strftime('%X')}")
​
    await asyncio.gather(
        asyncio.to_thread(blocking_io),
        asyncio.sleep(1))
​
    print(f"finished main at {time.strftime('%X')}")
​
​
asyncio.run(main())

与run_coroutine_threadsafe的区别

在非异步代码中需要运行协程,使用run_coroutine_threadsafe

import asyncio
import threading
​
# 定义一个协程
async def my_coroutine():
 await asyncio.sleep(1)
 return "Done"
​
# 在另一个线程中运行事件循环
def start_event_loop(loop):
 asyncio.set_event_loop(loop)
 loop.run_forever()
​
# 主线程中提交协程到事件循环
def submit_coroutine(loop):
 future = asyncio.run_coroutine_threadsafe(my_coroutine(), loop)
 print(future.result())  # 等待并获取结果
​
# 创建新的事件循环
loop = asyncio.new_event_loop()
​
# 启动事件循环线程
thread = threading.Thread(target=start_event_loop, args=(loop,))
thread.start()
​
# 在主线程中提交协程
submit_coroutine(loop)
​
# 关闭事件循环
loop.call_soon_threadsafe(loop.stop)
thread.join()

其他

​
# 通过将参数设置为0,可以在长时间的阻塞任务中,使得其他任务可以执行
asyncio.sleep
​
# 提高协程效率
# 只有在程序阻塞时才会切换,减少loop导致的性能开销
asyncio.create_eager_task_factory(custom_task_constructor)
​
# 防止协程被取消
asyncio.shield(aw)
​
# 当执行成功时,可以返回执行成功的时间
done, pending = await asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)
​
# 遍历可等待的对象,获得
asyncio.as_completed

在以上函数外,python还提供了一系列的获取协程状态的函数。

问题:

  1. fastapi中,如果在async中调用了一个阻塞的函数,会阻塞其他请求吗

    会,可以将阻塞的函书放到其他线程或者进程(CPU密集)中

    # 放到进程中
    import concurrent.futures
    import math
    ​
    PRIMES = [
        112272535095293,
        112582705942171,
        112272535095293,
        115280095190773,
        115797848077099,
        1099726899285419]
    ​
    def is_prime(n):
        if n < 2:
            return False
        if n == 2:
            return True
        if n % 2 == 0:
            return False
    ​
        sqrt_n = int(math.floor(math.sqrt(n)))
        for i in range(3, sqrt_n + 1, 2):
            if n % i == 0:
                return False
        return True
    ​
    def main():
        with concurrent.futures.ProcessPoolExecutor() as executor:
            for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
                print('%d is prime: %s' % (number, prime))
    ​
    if __name__ == '__main__':
        main()
    # 放到线程中
    import asyncio
    
    def blocking_function():
        # 模拟阻塞操作
        import time
        time.sleep(5)
        return "Done"
    
    async def main():
        result = await asyncio.to_thread(blocking_function)
        print(result)
    
    asyncio.run(main())

License:  CC BY 4.0
Share

Further Reading

OLDER

大模型应用开发相关包/工具

NEWER

大模型返回中json_schema与json_mode的区别

Recently Updated

  • 文本切分-语义分割(Semantic Chunking)
  • dify 并发配置优化
  • Typing
  • 大模型返回中json_schema与json_mode的区别
  • Async

Trending Tags

Halo 运维 postgresql 设计模式 linux就该这么学 nas rag odoo python 文本切分

Contents

©2025 刘刚刚的blog. Some rights reserved.

Using the Halo theme Chirpy