gRPC
警告
使用gRPC有一个比较麻烦的地方: 国内打不开google的protobuf在线文档。
- github地址:
- 安装grpc:
pip install grpcio
- 安装grpc辅助工具:
pip install grpcio-tools
- Docs:
- 官方样例:
https://github.com/grpc/grpc/blob/master/examples/protos/route_guide.proto
- etcd集群部署:
- awesome:
- django集成grpc:
- 知乎:
使用gRPC的项目:
参考资料
导入的问题
-I
参数可以控制从哪里导入pb2文件
python -m grpc_tools.protoc -I api=./api --python_out=. --grpc_python_out=. ./api/gbackend.proto
./api/gbackend_pb2_grpc.py
...
from api import gbackend_pb2 as api_dot_gbackend__pb2
...
django gRPC的接口测试
在后台启动一个实时的gRPC服务器
class LiveServerThread(threading.Thread):
"""Thread for running a live http server while the tests are running."""
def __init__(self, port=50052):
self.port = port
self.is_ready = threading.Event()
self.error = None
super().__init__()
def run(self):
try:
self.httpd = self._create_server()
self.is_ready.set()
self.httpd.start()
self.httpd.wait_for_termination()
except Exception as e:
self.error = e
self.is_ready.set()
def _create_server(self):
import grpc
from concurrent import futures
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
bae_pb2_grpc.add_MockExecServicer_to_server(BAE(), server)
address = '[::]:{0}'.format(self.port)
server.add_insecure_port(address)
return server
def terminate(self):
if hasattr(self, 'httpd'):
# Stop the WSGI server
self.httpd.shutdown()
self.httpd.server_close()
self.join()
class GRPCTestCase(SimpleTestCase):
port = 50052
server_thread_class = LiveServerThread
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.server_thread = cls._create_server_thread()
cls.server_thread.daemon = True
cls.server_thread.start()
cls.server_thread.is_ready.wait()
if cls.server_thread.error:
# Clean up behind ourselves, since tearDownClass won't get called in
# case of errors.
cls._tearDownClassInternal()
raise cls.server_thread.error
@classmethod
def _create_server_thread(cls):
return cls.server_thread_class(
port=cls.port,
)
@classmethod
def _tearDownClassInternal(cls):
# There may not be a 'server_thread' attribute if setUpClass() for some
# reasons has raised an exception.
if hasattr(cls, 'server_thread'):
# Terminate the live server's thread
cls.server_thread.terminate()
super().tearDownClass()
class TestBaeServer(GRPCTestCase):
def test_exec(self):
target = 'localhost:{0}'.format('50052')
with grpc.insecure_channel(target) as channel:
stub = bae_pb2_grpc.MockExecStub(channel)
source_code = "print(123)"
response = stub.Exec(bae_pb2.Arg(source_code=source_code))
resp = json.loads(response.resp)
self.assertEqual(resp["output"], "123\n")
self.assertEqual(resp["error"], "")
代码其实参考了LiveServerTestCase的实现。
快速开始
生成client和server代码
python -m grpc_tools.protoc -I../../protos --python_out=. --grpc_python_out=. ../../protos/route_guide.proto
../../protos是.proto文件目录相对路径
–python_out=. 生成python文件导出路径
–grpc_python_out=. 生成grpc python文件导出路径
../../protos/route_guide.proto是相对路径+proto文件名
如果在当前目录
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. rcs.proto
生成的代码文件为route_guide_pb2.py和route_guide_pb2_grpc.py,包含:
用于route_guide.proto中定义的消息的类
在route_guide.proto中定义的服务的类
RouteGuideStub,客户端可以使用它来调用RouteGuide rpc
RouteGuideServicer,它定义了RouteGuide服务实现的接口
在route_guide.proto中定义的服务函数add_RouteGuideServicer_to_server,它将一个RouteGuideServicer添加到grpc.Server
编写server代码
创建和运行server分为两个工作项:
通过执行服务的实际“工作”的函数实现从服务定义生成的服务接口。
运行gRPC服务器侦听来自client的请求并传输响应。
You can find the example RouteGuide server in examples/python/route_guide/route_guide_server.py
实现RouteGuide
# RouteGuideServicer provides an implementation of the methods of the RouteGuide service.
class RouteGuideServicer(route_guide_pb2_grpc.RouteGuideServicer):
"""RouteGuideServicer实现了所有RouteGuide服务方法"""
def GetFeature(self, request, context):
"""简单的RPC
让我们先看一看最简单的类型GetFeature,它从客户机获取一个点,并在特性中从其数据库返回相应的特性信息。
该方法被传递一个RPC和grpc的route_guide_pb2.Point请求。ServicerContext对象,该对象提供特定于rpc的信息,如超时限制。它返回一个route_guide_pb2.Feature响应。
"""
feature = get_feature(self.db, request)
if feature is None:
return route_guide_pb2.Feature(name="", location=request)
else:
return feature
def ListFeatures(self, request, context):
"""响应流的RPC
ListFeatures是一个响应流RPC,它向客户端发送多个Feature。
这里的请求消息是一个route_guide_pb2.Rectangle,客户端希望在其中查找Feature。该方法不是返回单个响应,而是产生零或多个响应。
"""
left = min(request.lo.longitude, request.hi.longitude)
right = max(request.lo.longitude, request.hi.longitude)
top = max(request.lo.latitude, request.hi.latitude)
bottom = min(request.lo.latitude, request.hi.latitude)
for feature in self.db:
if (feature.location.longitude >= left and
feature.location.longitude <= right and
feature.location.latitude >= bottom and
feature.location.latitude <= top):
yield feature
def RecordRoute(self, request_iterator, context):
"""请求流的RPC
请求流方法RecordRoute使用请求值的迭代器,并返回单个响应值。
"""
point_count = 0
feature_count = 0
distance = 0.0
prev_point = None
start_time = time.time()
for point in request_iterator:
point_count += 1
if get_feature(self.db, point):
feature_count += 1
if prev_point:
distance += get_distance(prev_point, point)
prev_point = point
elapsed_time = time.time() - start_time
return route_guide_pb2.RouteSummary(point_count=point_count,
feature_count=feature_count,
distance=int(distance),
elapsed_time=int(elapsed_time))
def RouteChat(self, request_iterator, context):
"""双向流的RPC
该方法的语义是请求流方法和响应流方法语义的结合。传递给它的是一个请求值的迭代器,它本身也是一个响应值的迭代器。
"""
prev_notes = []
for new_note in request_iterator:
for prev_note in prev_notes:
if prev_note.location == new_note.location:
yield prev_note
prev_notes.append(new_note)
def serve():
"""启动服务
server start()方法是非阻塞的。将实例化一个新线程来处理请求。调用server.start()的线程通常在此期间没有任何其他工作要做。
在这种情况下,您可以调用server.wait_for_termination()来干净地阻塞调用线程,直到服务器终止。
"""
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
route_guide_pb2_grpc.add_RouteGuideServicer_to_server(
RouteGuideServicer(), server)
server.add_insecure_port('[::]:50051')
server.start()
server.wait_for_termination()
编写client代码
You can see the complete example client code in examples/python/route_guide/route_guide_client.py
# 创建存根
# 我们将从.proto生成的route_guide_pb2_grpc模块的RouteGuideStub类实例化
channel = grpc.insecure_channel('localhost:50051')
stub = route_guide_pb2_grpc.RouteGuideStub(channel)
# 调用服务端函数
# 对于返回单个响应(“response-unary”方法)的RPC方法,gRPC Python同时支持同步(阻塞)和异步(非阻塞)控制流语义。
# 对于响应流RPC方法,调用立即返回响应值的迭代器。调用迭代器的next()方法块,直到迭代器产生的响应可用为止。
# 简单的RPC
# 对简单RPC GetFeature的同步调用几乎与调用本地方法一样简单。
# RPC调用等待服务器响应,并将返回响应或引发异常:
feature = stub.GetFeature(point)
# 对GetFeature的异步调用与此类似,但类似于在线程池中异步调用本地方法:
feature_future = stub.GetFeature.future(point)
feature = feature_future.result()
# 相应流的RPC
# 调用响应流列表特性类似于处理序列类型:
for feature in stub.ListFeatures(rectangle):
...
# 请求流的RPC
# 调用请求流RecordRoute类似于向本地方法传递迭代器。
# 就像上面简单的RPC也返回单个响应一样,它可以被同步或异步调用:
route_summary = stub.RecordRoute(point_iterator)
route_summary_future = stub.RecordRoute.future(point_iterator)
route_summary = route_summary_future.result()
# 双向流的RPC
# 调用双向流RouteChat(就像在服务端那样)具有请求流和响应流语义的组合:
for received_route_note in stub.RouteChat(sent_route_note_iterator):
...
调试
server端报错时默认只提供错误内容,应该用try catch获取更多的错误信息(报错具体文件和位置),