异步编程

独立开发

视频

【python】asyncio的理解与入门,搞不明白协程?看这个视频就够了。

文字

知乎: https://zhuanlan.zhihu.com/p/720011200

ppt

playground

协程入门

Goroutines: 有专门的调度器管理Goroutines的执行,结合了协程的轻量级特性和线程的能力,既支持协作式也支持抢占式的调度。

/*
https://gobyexample.com/goroutines
*/
package main

import (
	"fmt"
	"time"
)

// f prints three lines of the form "<from> : <n>" for n = 0, 1, 2, so the
// output shows which caller (or goroutine) produced each line.
func f(from string) {
	const rounds = 3
	for n := 0; n != rounds; n++ {
		fmt.Println(from, ":", n)
	}
}

// main contrasts a plain function call with two ways of starting a goroutine.
func main() {
	// Synchronous call: f finishes before the next statement runs.
	f("direct")

	// Asynchronous call: f runs in its own goroutine while main continues.
	go f("goroutine")

	// A function literal can be started as a goroutine as well.
	announce := func(msg string) {
		fmt.Println(msg)
	}
	go announce("going")

	// Sleeping gives the goroutines time to finish in this demo;
	// production code should wait with sync.WaitGroup instead.
	const grace = time.Second
	time.Sleep(grace)
	fmt.Println("done")
}

asyncio综合案例

aiohttp + redis + mysql

"""
Pseudo-code: the MySQL I/O is replaced by asyncio.sleep (the Redis and HTTP
calls are real).

Hypothetical requirement: an asynchronous task-processing platform.

1. Task intake and enqueueing:

* A user sends a request through the API.
* The web server receives the request, turns it into a task object and pushes
  it onto a Redis queue.

2. Task processing and storage:

* This script is the platform's worker coroutine; it keeps pulling tasks from
  the queue.
* After fetching a task, the coroutine calls an API to process it (for example
  data analysis or image processing).
* When processing is finished, the result is saved to a MySQL database.

Redis can be started with Docker.

* Async Redis: https://redis.readthedocs.io/en/latest/examples/asyncio_examples.html
* Async HTTP requests: https://aiohttp.readthedocs.io/
* brpop: https://redis.io/docs/latest/commands/brpop/
"""

import asyncio

import aiohttp
from redis.asyncio import Redis, ConnectionPool

# Redis list key that the worker blocks on (BRPOP) to receive task ids.
QUEUE_KEY = "schedule:task"


def get_redis_client():
    """Return an async Redis client bound to a freshly created connection pool.

    The pool is created with ``max_connections=1000`` and
    ``decode_responses=True``; no other connection settings are passed.
    """
    return Redis.from_pool(
        ConnectionPool(max_connections=1000, decode_responses=True)
    )


async def get_mysql_pool():
    """Create the database connection pool (placeholder).

    A real implementation would build the pool with aiomysql
    (https://github.com/aio-libs/aiomysql). This stub only simulates the
    connection delay and returns ``None``.
    """
    simulated_latency = 0.01
    await asyncio.sleep(simulated_latency)
    return None


async def call_baidu(word):
    """Fetch the Baidu search result page for ``word``.

    Args:
        word: the search keyword; it is converted to ``str`` before being sent.

    Returns:
        The response body as text when the server answers with HTTP 200.

    Raises:
        AssertionError: if the status is not 200; the message is the response body.
    """
    async with aiohttp.ClientSession() as session:
        # Pass the keyword via ``params`` so aiohttp percent-encodes it. Putting
        # it straight into the URL breaks the query when the keyword contains
        # characters such as "&", "#", "+" or spaces.
        async with session.get(
            "https://www.baidu.com/s", params={"wd": str(word)}
        ) as resp:
            body = await resp.text()
            if resp.status != 200:
                raise AssertionError(body)
            return body


async def process_task(word):
    """Process one queued task by requesting the search page for ``word``.

    Prints the keyword being requested and returns whatever ``call_baidu``
    returns.
    """
    # TODO: flesh out the pseudo-requirement: use asyncio.gather here to call
    #  several APIs concurrently; accept a tuple argument carrying a and b,
    #  then request the add/subtract/multiply/divide APIs in parallel.
    print("请求接口:", word)
    page = await call_baidu(word)
    return page


async def write_result_to_db(task, pool):
    """Persist the outcome of a finished processing task (simulated with sleep).

    Args:
        task: a completed ``asyncio.Task``/``Future`` whose result is the
            payload to store.
        pool: the database connection pool; this placeholder does not use it.

    Returns:
        None. A task that was cancelled or ended with an exception is reported
        and skipped instead of re-raising, because this coroutine is run as a
        fire-and-forget task and nobody would observe the exception.
    """
    if task.cancelled():
        print("任务已取消, 跳过写入")
        return
    error = task.exception()
    if error is not None:
        print(f"任务处理失败, 跳过写入: {error!r}")
        return

    # TODO: parse the HTML and extract the data;
    #  CPU-bound work can be handed off to another thread.
    result = task.result()  # payload to parse and store once the TODO is done
    print("将结果写入数据库")
    await asyncio.sleep(0.2)


async def main():
    """Worker loop: pop task ids from Redis, process them, store the results.

    Each id popped from ``QUEUE_KEY`` is processed in its own task; when that
    task finishes, a second task passes it to ``write_result_to_db``. An
    exception raised in the loop is printed and ends the worker. The Redis
    client is always closed on exit.
    """
    client = get_redis_client()

    # The event loop keeps only weak references to tasks, so an unreferenced
    # task can be garbage-collected before it finishes. Hold each task here
    # until it is done.
    in_flight = set()

    def _track(task):
        in_flight.add(task)
        task.add_done_callback(in_flight.discard)
        return task

    try:
        print(f"Ping successful: {await client.ping()}")
        pool = await get_mysql_pool()

        def _store_result(finished):
            _track(asyncio.create_task(write_result_to_db(finished, pool)))

        try:
            while True:
                _, task_id = await client.brpop([QUEUE_KEY])
                worker = _track(asyncio.create_task(process_task(task_id)))
                worker.add_done_callback(_store_result)
        except Exception as e:
            print(str(e))
    finally:
        # Also reached when ping() or pool creation fails, so the client
        # connection is never leaked.
        await client.aclose()


# Start the worker loop only when this file is executed as a script.
if __name__ == '__main__':
    asyncio.run(main())

goroutine池

借助第三方库 ants 帮忙管理go协程

package main

import (
	"fmt"
	"github.com/panjf2000/ants/v2"
	"sync"
	"sync/atomic"
	"time"
)

// sum is the running total that pool tasks add to; it is updated atomically.
var sum int32

// myFunc is the task body for the *WithFunc pools. It expects i to hold an
// int32 (the type assertion panics otherwise), adds it to sum atomically and
// prints the value it handled.
func myFunc(i interface{}) {
	delta := i.(int32)
	atomic.AddInt32(&sum, delta)
	fmt.Printf("run with %d\n", delta)
}

// demoFunc stands in for a short unit of work: it pauses for 10ms and then
// prints a greeting.
func demoFunc() {
	const pause = 10 * time.Millisecond
	time.Sleep(pause)
	fmt.Println("Hello World!")
}

// main runs runTimes tasks through four kinds of ants pools in turn: the
// package-level common pool, a function-bound pool, a MultiPool and a
// function-bound MultiPool. It waits for each batch and checks the
// accumulated sum after the two function-bound batches.
func main() {
	defer ants.Release()

	runTimes := 1000

	var wg sync.WaitGroup

	// enqueue reserves a WaitGroup slot for one task and hands the task to a
	// pool via submit. A rejected task never runs, so its slot is released
	// here; otherwise wg.Wait would block forever.
	enqueue := func(submit func() error) {
		wg.Add(1)
		if err := submit(); err != nil {
			wg.Done()
			fmt.Printf("submit task failed: %v\n", err)
		}
	}

	// Use the common pool.
	syncCalculateSum := func() {
		// Deferred so the slot is released even if the task panics.
		defer wg.Done()
		demoFunc()
	}
	for i := 0; i < runTimes; i++ {
		enqueue(func() error { return ants.Submit(syncCalculateSum) })
	}
	wg.Wait()
	fmt.Printf("running goroutines: %d\n", ants.Running())
	fmt.Printf("finish all tasks.\n")

	// addToSum is the task body shared by both function-bound pools.
	addToSum := func(i interface{}) {
		defer wg.Done()
		myFunc(i)
	}

	// Use the pool with a function and set the capacity of the goroutine
	// pool to 10; no options are passed, so other settings keep the library
	// defaults.
	p, err := ants.NewPoolWithFunc(10, addToSum)
	if err != nil {
		panic(err)
	}
	defer p.Release()
	// Submit tasks one by one.
	for i := 0; i < runTimes; i++ {
		n := int32(i)
		enqueue(func() error { return p.Invoke(n) })
	}
	wg.Wait()
	fmt.Printf("running goroutines: %d\n", p.Running())
	total := atomic.LoadInt32(&sum)
	fmt.Printf("finish all tasks, result is %d\n", total)
	// 0 + 1 + ... + 999 = 499500.
	if total != 499500 {
		panic("the final result is wrong!!!")
	}

	// Use the MultiPool and set the capacity of the 10 goroutine pools to unlimited.
	// If you use -1 as the pool size parameter, the size will be unlimited.
	// There are two load-balancing algorithms for pools: ants.RoundRobin and ants.LeastTasks.
	mp, err := ants.NewMultiPool(10, -1, ants.RoundRobin)
	if err != nil {
		panic(err)
	}
	defer func() {
		if err := mp.ReleaseTimeout(5 * time.Second); err != nil {
			fmt.Printf("release multi-pool: %v\n", err)
		}
	}()
	for i := 0; i < runTimes; i++ {
		enqueue(func() error { return mp.Submit(syncCalculateSum) })
	}
	wg.Wait()
	fmt.Printf("running goroutines: %d\n", mp.Running())
	fmt.Printf("finish all tasks.\n")

	// Use the MultiPoolFunc and set the capacity of 10 goroutine pools to (runTimes/10).
	mpf, err := ants.NewMultiPoolWithFunc(10, runTimes/10, addToSum, ants.LeastTasks)
	if err != nil {
		panic(err)
	}
	defer func() {
		if err := mpf.ReleaseTimeout(5 * time.Second); err != nil {
			fmt.Printf("release multi-pool with func: %v\n", err)
		}
	}()
	for i := 0; i < runTimes; i++ {
		n := int32(i)
		enqueue(func() error { return mpf.Invoke(n) })
	}
	wg.Wait()
	fmt.Printf("running goroutines: %d\n", mpf.Running())
	total = atomic.LoadInt32(&sum)
	fmt.Printf("finish all tasks, result is %d\n", total)
	// The second function-bound batch adds another 499500.
	if total != 499500*2 {
		panic("the final result is wrong!!!")
	}
}

ants 这种"把任务提交给固定数量的工作者去执行"的用法, 有点像 Python 中基于线程池(如 concurrent.futures.ThreadPoolExecutor)的异步模式。