Async Programming
Getting Started with Coroutines
Goroutines: a dedicated scheduler manages their execution, combining the lightweight nature of coroutines with the capabilities of threads; the scheduler supports both cooperative and preemptive scheduling.
/*
https://gobyexample.com/goroutines
*/
package main

import (
	"fmt"
	"time"
)

func f(from string) {
	for i := 0; i < 3; i++ {
		fmt.Println(from, ":", i)
	}
}

func main() {
	// Synchronous call
	f("direct")
	// Asynchronous call in a new goroutine
	go f("goroutine")
	// Asynchronously call an anonymous function
	go func(msg string) {
		fmt.Println(msg)
	}("going")
	// Sleep to give the goroutines time to finish; production code should use a WaitGroup
	time.Sleep(time.Second)
	fmt.Println("done")
}
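The claim above that the Go scheduler is also preemptive can be checked with a small sketch of my own (assumes Go 1.14+, where the runtime can preempt a goroutine stuck in a tight loop; before 1.14 this program would hang):

package main

import (
	"fmt"
	"runtime"
	"time"
)

func main() {
	runtime.GOMAXPROCS(1) // a single OS thread, so only preemption can interleave the goroutines
	go func() {
		for {
			// busy loop: never yields voluntarily (no function calls, no channel ops)
		}
	}()
	time.Sleep(100 * time.Millisecond)
	// On Go 1.14+ the runtime preempts the busy goroutine, so this line is reached
	fmt.Println("main still runs: the busy goroutine was preempted")
}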
/*
https://gobyexample.com/waitgroups
*/
package main

import (
	"fmt"
	"sync"
	"time"
)

// The function each goroutine will run
func worker(id int) {
	fmt.Printf("Worker %d starting\n", id)
	// Sleep to simulate an expensive task
	time.Sleep(time.Second)
	fmt.Printf("Worker %d done\n", id)
}

func main() {
	// WaitGroup waits for all launched goroutines to finish
	var wg sync.WaitGroup
	for i := 1; i <= 5; i++ {
		wg.Add(1)
		// Capturing the loop variable is safe on Go 1.22+; on older versions, pass i as an argument
		go func() {
			defer wg.Done()
			worker(i)
		}()
	}
	wg.Wait()
	// Note: this approach has no straightforward way to propagate errors from workers.
	// For that, consider errgroup (see the sketch after this example): https://pkg.go.dev/golang.org/x/sync/errgroup#Group.Go
}
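To make the errgroup suggestion concrete, here is a minimal sketch of my own (assumes golang.org/x/sync/errgroup): Group.Go runs each worker, and Wait blocks like WaitGroup.Wait but also returns the first non-nil error.

package main

import (
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func workerErr(id int) error {
	fmt.Printf("Worker %d starting\n", id)
	time.Sleep(time.Second)
	if id == 3 {
		return fmt.Errorf("worker %d failed", id) // simulate a failure
	}
	fmt.Printf("Worker %d done\n", id)
	return nil
}

func main() {
	var g errgroup.Group
	for i := 1; i <= 5; i++ {
		i := i // not needed on Go 1.22+, kept for older versions
		g.Go(func() error {
			return workerErr(i)
		})
	}
	// Wait blocks until all goroutines return, then yields the first error (if any)
	if err := g.Wait(); err != nil {
		fmt.Println("error:", err)
	}
}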
/*
Using sync.WaitGroup together with channels
*/
package main

import (
	"fmt"
	"sync"
	"time"
)

var tasks = []string{"Task1", "Task2", "Task3", "Task4"}

func worker(id int, taskCh <-chan string, resultCh chan<- string, wg *sync.WaitGroup) {
	defer wg.Done()
	for task := range taskCh {
		fmt.Printf("Worker %d started %s\n", id, task)
		time.Sleep(time.Second * 2)
		resultCh <- fmt.Sprintf("Result of %s by Worker %d", task, id)
	}
}

func main() {
	var wg sync.WaitGroup
	taskCh := make(chan string, len(tasks))   // buffered channel for dispatching tasks
	resultCh := make(chan string, len(tasks)) // buffered channel for collecting results
	// Start a few worker goroutines
	numWorkers := 2
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go worker(i, taskCh, resultCh, &wg)
	}
	// Send the tasks to taskCh
	for _, t := range tasks {
		taskCh <- t
	}
	close(taskCh) // close the channel to tell workers there are no more tasks
	// Wait for all workers to finish, then close resultCh so the range below terminates
	go func() {
		wg.Wait()
		close(resultCh)
	}()
	// Collect and print the results
	for res := range resultCh {
		fmt.Println(res)
	}
}
Coroutines: rely on tasks inside the program voluntarily yielding control (yield), rather than being forcibly scheduled by the operating system.
Because there is no thread-style context switching, coroutines are extremely lightweight: a single process can run tens of thousands of them at once.
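A toy sketch of my own (plain generators, no asyncio) to make "voluntarily yielding control" concrete: each yield hands control back to a round-robin scheduler, which decides which task resumes next.

# Minimal cooperative-scheduling sketch using plain generators (illustrative only)
def task(name, steps):
    for i in range(steps):
        print(name, ":", i)
        yield  # voluntarily hand control back to the scheduler

def run(tasks):
    # A toy round-robin scheduler: resume each task in turn until it finishes
    while tasks:
        for t in list(tasks):
            try:
                next(t)
            except StopIteration:
                tasks.remove(t)

run([task("a", 3), task("b", 2)])
# a : 0
# b : 0
# a : 1
# b : 1
# a : 2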
import time

def f(_from):
    for i in range(3):
        time.sleep(1)
        print(_from, ":", i)

if __name__ == '__main__':
    # Synchronous call
    print(f"started at {time.strftime('%X')}")
    f("direct")
    time.sleep(1)
    print(f"finished at {time.strftime('%X')}")

"""
started at 17:34:04
direct : 0
direct : 1
direct : 2
finished at 17:34:08
"""
import time
import asyncio

async def af(_from):
    for i in range(3):
        await asyncio.sleep(1)
        print(_from, ":", i)

# Defining a coroutine function: https://docs.python.org/zh-cn/3.10/reference/compound_stmts.html#coroutine-function-definition
# main is itself a coroutine: https://docs.python.org/zh-cn/3/glossary.html#term-coroutine
async def main():
    # Suspend the coroutine to wait on an awaitable: https://docs.python.org/zh-cn/3/reference/expressions.html#await
    # Note: if any task in the event loop spins in a dead loop, the whole event loop stalls,
    # because nothing voluntarily yields control back to the loop
    await af("goroutine")
    # This line runs only after the await above has completed.
    # Awaiting a coroutine directly does not hand control back to the event loop.
    # asyncio.sleep ultimately awaits a Future (a special low-level awaitable representing
    # the eventual result of an async operation), which means this coroutine stays
    # suspended until that Future is resolved elsewhere:
    await asyncio.sleep(1)

if __name__ == '__main__':
    # Asynchronous call
    print(f"started at {time.strftime('%X')}")
    # 1. Wrap the main() coroutine in a task on an event loop
    # 2. run() sets up the event loop
    # 3. The loop's smallest unit of execution is the task, not the coroutine
    asyncio.run(main())
    print(f"finished at {time.strftime('%X')}")
    print("done")

"""
started at 17:34:34
goroutine : 0
goroutine : 1
goroutine : 2
finished at 17:34:38
done
"""
import time
import asyncio

async def af(_from):
    tasks = []
    for i in range(3):
        # Create a task, the unit asyncio uses to schedule coroutines "concurrently".
        # Registering it with the event loop lets the sleep start right away in a separate task.
        task = asyncio.create_task(asyncio.sleep(1))
        tasks.append(task)
        print(_from, ":", i)
    for task in tasks:
        await task

async def main():
    await af("goroutine")
    await asyncio.sleep(1)

if __name__ == '__main__':
    # Asynchronous call
    print(f"started at {time.strftime('%X')}")
    asyncio.run(main())
    print(f"finished at {time.strftime('%X')}")
    print("done")

"""
started at 19:31:25
goroutine : 0
goroutine : 1
goroutine : 2
finished at 19:31:27
done
"""
A fuller asyncio example
aiohttp + redis + mysql
"""
伪代码, io操作全用sleep代替
假想需求: 异步任务处理平台
1. 任务接收与入队:
* 用户通过api发送请求
* web服务器接收到请求后, 将其转化为任务对象,并将其放入到redis队列中。
* 此脚本是平台的工作者协程,不断地从队列中获取任务。
* 获取任务后, 协程开始调用api来处理任务(如数据分析、图像处理等)。
* 处理完成后, 将结果保存到MySQL数据库中
docker启动redis
* 异步redis: https://redis.readthedocs.io/en/latest/examples/asyncio_examples.html
* 异步请求: https://aiohttp.readthedocs.io/
* brpop: https://redis.io/docs/latest/commands/brpop/
"""
import asyncio
import aiohttp
from redis.asyncio import Redis, ConnectionPool
QUEUE_KEY = "schedule:task"
def get_redis_client():
pool = ConnectionPool(max_connections=1000, decode_responses=True)
return Redis.from_pool(pool)
async def get_mysql_pool():
"""创建数据库的连接池"""
# 用aiomysql: https://github.com/aio-libs/aiomysql
await asyncio.sleep(0.01)
async def call_baidu(word):
async with aiohttp.ClientSession() as session:
async with session.get(f"https://www.baidu.com/s?wd={word}") as resp:
if resp.status == 200:
return await resp.text()
else:
raise AssertionError(await resp.text())
async def process_task(word):
# todo: 完善伪需求, 这里开asyncio.gather并发请求多个接口
# 改为接收元组参数取得a和b
# 然后并发请求加减乘除四个接口
print("请求接口:", word)
return await call_baidu(word)
async def write_result_to_db(task, pool):
# todo: 解析html, 提取数据
# cpu计算可以放到其它线程去处理
result = task.result()
print("将结果写入数据库")
await asyncio.sleep(0.2)
async def main():
client = get_redis_client()
print(f"Ping successful: {await client.ping()}")
pool = await get_mysql_pool()
try:
while True:
_, task_id = await client.brpop([QUEUE_KEY])
task = asyncio.create_task(process_task(task_id))
task.add_done_callback(lambda t: asyncio.create_task(write_result_to_db(t, pool)))
except Exception as e:
print(str(e))
finally:
await client.aclose()
if __name__ == '__main__':
asyncio.run(main())
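The requirement above says the web server pushes task objects onto the Redis queue, but the producer side is not shown. A hypothetical sketch of that producer (BRPOP pops from the right, so LPUSH on the left gives first-in-first-out delivery):

# Hypothetical producer-side sketch: the web server would enqueue tasks like this
import asyncio
from redis.asyncio import Redis

QUEUE_KEY = "schedule:task"

async def enqueue(words):
    client = Redis(decode_responses=True)
    try:
        for word in words:
            # LPUSH + BRPOP together give first-in-first-out task delivery
            await client.lpush(QUEUE_KEY, word)
    finally:
        await client.aclose()

if __name__ == '__main__':
    asyncio.run(enqueue(["python", "golang", "asyncio"]))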
https://pypi.org/project/pytest-asyncio/
import pytest
from unittest.mock import patch, AsyncMock
from main import process_task

@pytest.mark.asyncio
async def test_process_task():
    mock_call_baidu = AsyncMock()
    mock_call_baidu.return_value = """<html><title>python</title></html>"""
    with patch('main.call_baidu', new=mock_call_baidu):
        assert await process_task("Python") == mock_call_baidu.return_value
Goroutine pools
Use the third-party library ants to manage goroutines for you.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/panjf2000/ants/v2"
)

var sum int32

func myFunc(i interface{}) {
	n := i.(int32)
	atomic.AddInt32(&sum, n)
	fmt.Printf("run with %d\n", n)
}

func demoFunc() {
	time.Sleep(10 * time.Millisecond)
	fmt.Println("Hello World!")
}

func main() {
	defer ants.Release()

	runTimes := 1000

	// Use the common pool.
	var wg sync.WaitGroup
	syncCalculateSum := func() {
		demoFunc()
		wg.Done()
	}
	for i := 0; i < runTimes; i++ {
		wg.Add(1)
		_ = ants.Submit(syncCalculateSum)
	}
	wg.Wait()
	fmt.Printf("running goroutines: %d\n", ants.Running())
	fmt.Printf("finish all tasks.\n")

	// Use a pool bound to a function,
	// with a capacity of 10 goroutines.
	p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
		myFunc(i)
		wg.Done()
	})
	defer p.Release()
	// Submit tasks one by one.
	for i := 0; i < runTimes; i++ {
		wg.Add(1)
		_ = p.Invoke(int32(i))
	}
	wg.Wait()
	fmt.Printf("running goroutines: %d\n", p.Running())
	fmt.Printf("finish all tasks, result is %d\n", sum)
	if sum != 499500 {
		panic("the final result is wrong!!!")
	}

	// Use the MultiPool and set the capacity of each of the 10 goroutine pools to unlimited.
	// If you use -1 as the pool size parameter, the size will be unlimited.
	// There are two load-balancing algorithms for pools: ants.RoundRobin and ants.LeastTasks.
	mp, _ := ants.NewMultiPool(10, -1, ants.RoundRobin)
	defer mp.ReleaseTimeout(5 * time.Second)
	for i := 0; i < runTimes; i++ {
		wg.Add(1)
		_ = mp.Submit(syncCalculateSum)
	}
	wg.Wait()
	fmt.Printf("running goroutines: %d\n", mp.Running())
	fmt.Printf("finish all tasks.\n")

	// Use the MultiPoolWithFunc and set the capacity of each of the 10 goroutine pools to (runTimes/10).
	mpf, _ := ants.NewMultiPoolWithFunc(10, runTimes/10, func(i interface{}) {
		myFunc(i)
		wg.Done()
	}, ants.LeastTasks)
	defer mpf.ReleaseTimeout(5 * time.Second)
	for i := 0; i < runTimes; i++ {
		wg.Add(1)
		_ = mpf.Invoke(int32(i))
	}
	wg.Wait()
	fmt.Printf("running goroutines: %d\n", mpf.Running())
	fmt.Printf("finish all tasks, result is %d\n", sum)
	if sum != 499500*2 {
		panic("the final result is wrong!!!")
	}
}
This is a bit like Python's thread-pool-based concurrency model.
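For comparison, a minimal sketch of that thread-pool pattern on the Python side with concurrent.futures (my analogy, not part of ants):

# Rough Python analogue of a worker pool: submit tasks to a fixed pool of threads
from concurrent.futures import ThreadPoolExecutor

def my_func(i):
    return i

with ThreadPoolExecutor(max_workers=10) as pool:
    futures = [pool.submit(my_func, i) for i in range(1000)]
    total = sum(f.result() for f in futures)

print(total)  # 499500, the same result the ants example above checks for
assert total == 499500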