异步编程
独立开发
视频
文字
ppt
协程入门
Goroutines: Go 运行时有专门的调度器管理 goroutine 的执行, 它结合了协程的轻量级特性和线程的能力 (goroutine 会被复用地调度到多个系统线程上并行运行), 既支持协作式也支持抢占式的调度 (Go 1.14 起支持异步抢占)。
/*
https://gobyexample.com/goroutines
*/
package main
import (
	"fmt"
	"time"
)
// f prints three lines of the form "<from> : <n>" for n = 0, 1, 2.
// It is called both synchronously and as a goroutine to contrast the two.
func f(from string) {
	for n := 0; n != 3; n++ {
		fmt.Println(from, ":", n)
	}
}
// main contrasts a synchronous call with two goroutine launches, then sleeps
// so the goroutines get a chance to run before the program exits.
func main() {
	// Synchronous call: returns only after f has printed everything.
	f("direct")

	// Asynchronous call: f runs in a new goroutine.
	go f("goroutine")

	// Asynchronous call of an anonymous function.
	go func(text string) {
		fmt.Println(text)
	}("going")

	// Sleeping is only a demo shortcut for "wait for the goroutines";
	// real code should use a sync.WaitGroup instead.
	time.Sleep(1 * time.Second)
	fmt.Println("done")
}
/*
https://gobyexample.com/waitgroups
*/
package main
import (
	"fmt"
	"sync"
	"time"
)
// 在协程运行的函数
// worker is the function run in each goroutine: it logs its start, sleeps
// for one second to simulate an expensive task, then logs completion.
func worker(id int) {
	fmt.Printf("Worker %d starting\n", id)
	// Sleep to simulate an expensive task.
	time.Sleep(time.Second)
	// id must be a separate argument: inside the format string it leaves %d
	// without an operand (prints "%!d(MISSING)" and fails go vet).
	fmt.Printf("Worker %d done\n", id)
}
// main starts five workers concurrently and blocks until all of them have
// finished, using a sync.WaitGroup.
func main() {
	// wg waits for all launched goroutines to finish.
	var wg sync.WaitGroup
	for i := 1; i <= 5; i++ {
		wg.Add(1)
		// Pass i as an argument so each goroutine gets its own copy. Modules
		// targeting Go < 1.22 share one loop variable across iterations, which
		// would make the workers race on it and see wrong ids.
		go func(id int) {
			defer wg.Done()
			worker(id)
		}(i)
	}
	wg.Wait()
	// Note: this pattern has no direct way to propagate worker errors;
	// consider https://pkg.go.dev/golang.org/x/sync/errgroup#Group.Go instead.
}
/*
sync.WaitGroup和channel结合使用
*/
package main
import (
	"fmt"
	"sync"
	"time"
)
var tasks = []string{"Task1", "Task2", "Task3", "Task4"}

// worker receives task names from taskCh until the channel is closed. For each
// task it logs the start, sleeps two seconds to simulate work, and sends a
// formatted result to resultCh. It calls wg.Done when it returns.
func worker(id int, taskCh <-chan string, resultCh chan<- string, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		task, ok := <-taskCh
		if !ok {
			return
		}
		fmt.Printf("Worker %d started %s\n", id, task)
		time.Sleep(2 * time.Second)
		resultCh <- fmt.Sprintf("Result of %s by Worker %d", task, id)
	}
}
// main fans the package-level tasks out to a fixed number of workers over a
// buffered channel, then prints every result as it arrives.
func main() {
	var wg sync.WaitGroup
	// Both channels are buffered to len(tasks), so neither the send loop
	// below nor the workers' result sends block.
	taskCh := make(chan string, len(tasks))
	resultCh := make(chan string, len(tasks))

	// Launch the worker goroutines.
	const workerCount = 2
	for id := 0; id < workerCount; id++ {
		wg.Add(1)
		go worker(id, taskCh, resultCh, &wg)
	}

	// Enqueue every task, then close taskCh so the workers know no more
	// tasks are coming.
	for _, name := range tasks {
		taskCh <- name
	}
	close(taskCh)

	// Close resultCh once every worker is done so the range below ends.
	go func() {
		wg.Wait()
		close(resultCh)
	}()

	// Collect and print the results.
	for line := range resultCh {
		fmt.Println(line)
	}
}
Coroutines: 依赖于程序内部的任务主动让出控制权(yield), 而不是由操作系统强制调度
协程的切换发生在用户态, 不需要像线程那样由操作系统内核进行上下文切换, 因此协程可以非常轻量, 一个进程可以同时运行成千上万个协程。
import time
def f(_from):
    """Print three numbered lines tagged with ``_from``.

    Before each line it blocks the whole thread for one second with
    ``time.sleep``.
    """
    idx = 0
    while idx < 3:
        time.sleep(1)
        print(_from, ":", idx)
        idx += 1
if __name__ == '__main__':
    # Synchronous call: f blocks until it is done, then we sleep one more second.
    start_stamp = time.strftime('%X')
    print(f"started at {start_stamp}")
    f("direct")
    time.sleep(1)
    end_stamp = time.strftime('%X')
    print(f"finished  at {end_stamp}")
"""
started at 17:34:04
direct : 0
direct : 1
direct : 2
finished  at 17:34:08
"""
import time
import asyncio
async def af(_from):
    """Print three numbered lines tagged with ``_from``.

    Before each line it awaits ``asyncio.sleep(1)``, which suspends only this
    coroutine, not the event loop.
    """
    for idx in (0, 1, 2):
        await asyncio.sleep(1)
        print(_from, ":", idx)
# 定义协程函数: https://docs.python.org/zh-cn/3.10/reference/compound_stmts.html#coroutine-function-definition
# 此时main函数是一个coroutine: https://docs.python.org/zh-cn/3/glossary.html#term-coroutine
async def main():
    """Await ``af("goroutine")`` to completion, then sleep one more second.

    ``await`` suspends this coroutine until the awaitable finishes:
    https://docs.python.org/zh-cn/3/reference/expressions.html#await
    Awaiting a coroutine does not by itself hand control back to the event
    loop. Control is yielded only when the await chain reaches a Future, which
    is what ``asyncio.sleep`` ultimately waits on. Consequently, a task stuck in
    an infinite loop that never awaits anything freezes the whole event loop.
    """
    await af(_from="goroutine")
    # Runs only after the af(...) call above has fully completed.
    pause = asyncio.sleep(1)
    await pause
if __name__ == '__main__':
    # Asynchronous call.
    # asyncio.run sets up the event loop, wraps the main() coroutine in a Task
    # (the loop's unit of scheduling is a Task, not a bare coroutine) and runs
    # it to completion.
    print(f"started at {time.strftime('%X')}")
    entry = main()
    asyncio.run(entry)
    print(f"finished  at {time.strftime('%X')}")
    print("done")
"""
started at 17:34:34
goroutine : 0
goroutine : 1
goroutine : 2
finished  at 17:34:38
done
"""
import time
import asyncio
async def af(_from):
    """Schedule three 1-second sleeps as concurrent tasks, printing a numbered
    line as each is scheduled, then wait for all of them to finish."""
    pending = []
    for idx in range(3):
        # create_task only schedules the coroutine on the event loop. It starts
        # running once this coroutine yields (at the first await below), so the
        # three sleeps overlap instead of running back to back.
        pending.append(asyncio.create_task(asyncio.sleep(1)))
        print(_from, ":", idx)
    for job in pending:
        await job
async def main():
    """Run ``af("goroutine")`` to completion, then sleep one more second."""
    await af("goroutine")
    extra_wait = asyncio.sleep(1)
    await extra_wait
if __name__ == '__main__':
    # Asynchronous call: drive main() on a fresh event loop.
    start_stamp = time.strftime('%X')
    print(f"started at {start_stamp}")
    asyncio.run(main())
    end_stamp = time.strftime('%X')
    print(f"finished  at {end_stamp}")
    print("done")
"""
started at 19:31:25
goroutine : 0
goroutine : 1
goroutine : 2
finished  at 19:31:27
done
"""
asyncio综合案例
aiohttp + redis + mysql
"""
伪代码, io操作全用sleep代替
假想需求: 异步任务处理平台
1. 任务接收与入队:
* 用户通过api发送请求
* web服务器接收到请求后, 将其转化为任务对象,并将其放入到redis队列中。
* 此脚本是平台的工作者协程,不断地从队列中获取任务。
* 获取任务后, 协程开始调用api来处理任务(如数据分析、图像处理等)。
* 处理完成后, 将结果保存到MySQL数据库中
docker启动redis
* 异步redis: https://redis.readthedocs.io/en/latest/examples/asyncio_examples.html
* 异步请求: https://aiohttp.readthedocs.io/
* brpop: https://redis.io/docs/latest/commands/brpop/
"""
import asyncio
import aiohttp
from redis.asyncio import Redis, ConnectionPool
QUEUE_KEY = "schedule:task"


def get_redis_client():
    """Build an asyncio Redis client backed by its own connection pool.

    The pool allows up to 1000 connections and is created with
    ``decode_responses=True``.
    """
    return Redis.from_pool(
        ConnectionPool(max_connections=1000, decode_responses=True)
    )
async def get_mysql_pool():
    """Create the database connection pool (placeholder).

    This is pseudo code: it only sleeps 10 ms to stand in for the I/O and
    returns None. A real implementation could use aiomysql:
    https://github.com/aio-libs/aiomysql
    """
    await asyncio.sleep(0.01)
    return None
async def call_baidu(word):
    """Fetch the Baidu search result page for ``word``.

    Returns the response body text when the status is 200. For any other
    status it raises ``AssertionError`` (same exception type as before, so
    callers still match) whose message includes the status and the body.
    """
    # Pass the query through ``params`` so aiohttp percent-encodes it. Plain
    # string interpolation breaks on spaces, '&', '#', '?' or non-ASCII words.
    # NOTE(review): opening a new ClientSession per call is costly; aiohttp
    # recommends one long-lived session per application.
    async with aiohttp.ClientSession() as session:
        async with session.get("https://www.baidu.com/s", params={"wd": word}) as resp:
            body = await resp.text()
            if resp.status != 200:
                raise AssertionError(f"HTTP {resp.status}: {body}")
            return body
async def process_task(word):
    """Process one queued task by querying the API for ``word``.

    Returns whatever ``call_baidu`` returns.
    """
    # TODO: flesh out the pretend requirement: use asyncio.gather to request
    #  several APIs concurrently, i.e. accept a tuple to get a and b, then
    #  call the add/subtract/multiply/divide endpoints in parallel.
    print("请求接口:", word)
    html = await call_baidu(word)
    return html
async def write_result_to_db(task, pool):
    """Persist the outcome of a finished ``process_task`` task.

    Args:
        task: the already-finished asyncio Task/Future whose result is the page text.
        pool: the DB connection pool from ``get_mysql_pool`` (unused in this pseudo code).

    A cancelled or failed task is reported and skipped instead of calling
    ``task.result()``, which would re-raise. This coroutine is started as a
    fire-and-forget task from a done-callback, so an exception here would
    only surface as "Task exception was never retrieved".
    """
    if task.cancelled():
        print("任务已取消, 跳过写库")
        return
    exc = task.exception()
    if exc is not None:
        print("任务处理失败, 跳过写库:", repr(exc))
        return
    # TODO: parse the html and extract data; CPU-bound parsing can be moved
    #  to another thread (e.g. asyncio.to_thread).
    html = task.result()  # noqa: F841 -- to be parsed, see TODO above
    print("将结果写入数据库")
    await asyncio.sleep(0.2)
async def main():
    """Worker loop: pop task words from the Redis queue and process them concurrently.

    Each popped value is handled by a ``process_task`` task. When that task
    finishes, a follow-up ``write_result_to_db`` task is started for it. The
    loop runs until an ``Exception`` escapes (it is printed), and the Redis
    client is always closed.
    """
    client = get_redis_client()
    print(f"Ping successful: {await client.ping()}")
    pool = await get_mysql_pool()
    # The event loop keeps only weak references to tasks, so an unreferenced
    # fire-and-forget task can be garbage-collected mid-execution. Hold strong
    # references here and drop each one once it completes.
    background_tasks = set()

    def _track(task):
        """Keep ``task`` alive until it is done; return it for chaining."""
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)
        return task

    def _on_processed(done_task):
        """Done-callback: persist the finished task's outcome in a tracked task."""
        _track(asyncio.create_task(write_result_to_db(done_task, pool)))

    try:
        while True:
            # BRPOP returns a (key, value) pair; only the value is needed.
            _, task_id = await client.brpop([QUEUE_KEY])
            task = _track(asyncio.create_task(process_task(task_id)))
            task.add_done_callback(_on_processed)
    except Exception as e:
        print(str(e))
    finally:
        await client.aclose()
if __name__ == "__main__":
    # Start an event loop and run the worker loop until it exits.
    asyncio.run(main())
https://pypi.org/project/pytest-asyncio/
import pytest
from unittest.mock import patch, AsyncMock
from main import process_task
@pytest.mark.asyncio
async def test_proces_task():
    """process_task must forward the word to call_baidu and return its text unchanged."""
    html = """<html><title>python</title></html>"""
    mock_call_baidu = AsyncMock(return_value=html)
    with patch('main.call_baidu', new=mock_call_baidu):
        assert await process_task("Python") == html
    # Pin the delegation as well: the word must reach call_baidu as-is.
    mock_call_baidu.assert_awaited_once_with("Python")
goroutine池
借助第三方库 ants 帮忙管理go协程
package main
import (
	"fmt"
	"github.com/panjf2000/ants/v2"
	"sync"
	"sync/atomic"
	"time"
)
// sum accumulates every value handled by myFunc. It is only modified with
// atomic operations because myFunc runs on many pool goroutines at once.
var sum int32

// myFunc adds its argument, which must hold an int32 (the type assertion
// panics otherwise), to sum and prints it.
func myFunc(i interface{}) {
	delta := i.(int32)
	atomic.AddInt32(&sum, delta)
	fmt.Printf("run with %d\n", delta)
}
// demoFunc simulates a short task: it sleeps for 10ms, then prints a greeting.
func demoFunc() {
	const pause = 10 * time.Millisecond
	time.Sleep(pause)
	fmt.Println("Hello World!")
}
// main demonstrates four ants pool flavours in turn: the package-level default
// pool, a PoolWithFunc, a MultiPool and a MultiPoolWithFunc. One WaitGroup
// tracks every submitted task, and the global sum (filled by myFunc) is
// verified after each function-pool round.
func main() {
	defer ants.Release()

	runTimes := 1000
	var wg sync.WaitGroup

	// must aborts the demo on a pool-construction or submission error. A task
	// that fails to submit never runs, so its wg.Done is never called and the
	// following wg.Wait would block forever. Failing loudly turns that silent
	// hang into a visible error.
	must := func(err error) {
		if err != nil {
			panic(err)
		}
	}

	// Round 1: the common (package-level default) pool.
	demoTask := func() {
		demoFunc()
		wg.Done()
	}
	for i := 0; i < runTimes; i++ {
		wg.Add(1)
		must(ants.Submit(demoTask))
	}
	wg.Wait()
	fmt.Printf("running goroutines: %d\n", ants.Running())
	fmt.Printf("finish all tasks.\n")

	// Round 2: a pool bound to one function with a capacity of 10 goroutines
	// (no extra options are passed).
	p, err := ants.NewPoolWithFunc(10, func(i interface{}) {
		myFunc(i)
		wg.Done()
	})
	must(err)
	defer p.Release()
	// Submit tasks one by one.
	for i := 0; i < runTimes; i++ {
		wg.Add(1)
		must(p.Invoke(int32(i)))
	}
	wg.Wait()
	fmt.Printf("running goroutines: %d\n", p.Running())
	// wg.Wait already orders every myFunc call before this point; the atomic
	// load keeps all accesses to sum uniformly atomic.
	got := atomic.LoadInt32(&sum)
	fmt.Printf("finish all tasks, result is %d\n", got)
	if got != 499500 { // 0 + 1 + ... + 999
		panic("the final result is wrong!!!")
	}

	// Round 3: a MultiPool of 10 pools, each with unlimited capacity (-1).
	// There are two load-balancing strategies: ants.RoundRobin and ants.LeastTasks.
	mp, err := ants.NewMultiPool(10, -1, ants.RoundRobin)
	must(err)
	defer mp.ReleaseTimeout(5 * time.Second) // release error ignored at exit (demo)
	for i := 0; i < runTimes; i++ {
		wg.Add(1)
		must(mp.Submit(demoTask))
	}
	wg.Wait()
	fmt.Printf("running goroutines: %d\n", mp.Running())
	fmt.Printf("finish all tasks.\n")

	// Round 4: a MultiPoolWithFunc of 10 pools, each capped at runTimes/10.
	mpf, err := ants.NewMultiPoolWithFunc(10, runTimes/10, func(i interface{}) {
		myFunc(i)
		wg.Done()
	}, ants.LeastTasks)
	must(err)
	defer mpf.ReleaseTimeout(5 * time.Second) // release error ignored at exit (demo)
	for i := 0; i < runTimes; i++ {
		wg.Add(1)
		must(mpf.Invoke(int32(i)))
	}
	wg.Wait()
	fmt.Printf("running goroutines: %d\n", mpf.Running())
	// sum keeps accumulating across rounds, so it is now twice round 2's total.
	got = atomic.LoadInt32(&sum)
	fmt.Printf("finish all tasks, result is %d\n", got)
	if got != 499500*2 {
		panic("the final result is wrong!!!")
	}
}
有点像 Python 中基于线程池 (concurrent.futures.ThreadPoolExecutor) 的异步模式: 用数量固定、可复用的工作者去执行大量提交进来的任务。