*********************************** gRPC *********************************** .. warning:: 使用gRPC有一个比较麻烦的地方: 国内打不开google的protobuf在线文档。 :github地址: https://github.com/grpc/grpc/tree/master/src/python/grpcio :安装grpc: ``pip install grpcio`` :安装grpc辅助工具: ``pip install grpcio-tools`` :Docs: https://www.grpc.io/docs/languages/python/quickstart/ :官方样例: https://github.com/grpc/grpc/blob/master/examples/protos/route_guide.proto :etcd集群部署: https://etcd.io/docs/v3.5/op-guide/clustering/ :awesome: https://github.com/grpc-ecosystem/awesome-grpc :django集成grpc: https://github.com/gluk-w/django-grpc :知乎: https://zhuanlan.zhihu.com/p/590207541? 使用gRPC的项目: * `core `_ .. toctree:: :maxdepth: 2 :caption: 参考资料 grpc服务注册和发现 grpc使用etcd做服务注册与发现 全方位对比Zookeeper、Eureka、Nacos、Consul和Etcd etcd集群安装和单机安装 资源服务器和授权服务的交互 你不知道的gRPC反向代理 gRPC跨进程使用引发的问题 GRPC的理解 导入的问题 ========================================================= * https://github.com/grpc/grpc/issues/29390 * https://github.com/protocolbuffers/protobuf/issues/1491 ``-I`` 参数可以控制从哪里导入pb2文件 .. code-block:: text python -m grpc_tools.protoc -I api=./api --python_out=. --grpc_python_out=. ./api/gbackend.proto ``./api/gbackend_pb2_grpc.py`` :: ... from api import gbackend_pb2 as api_dot_gbackend__pb2 ... django gRPC的接口测试 ========================================================== 在后台启动一个实时的gRPC服务器 .. code-block:: python class LiveServerThread(threading.Thread): """Thread for running a live http server while the tests are running.""" def __init__(self, port=50052): self.port = port self.is_ready = threading.Event() self.error = None super().__init__() def run(self): try: self.httpd = self._create_server() self.is_ready.set() self.httpd.start() self.httpd.wait_for_termination() except Exception as e: self.error = e self.is_ready.set() def _create_server(self): import grpc from concurrent import futures server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) bae_pb2_grpc.add_MockExecServicer_to_server(BAE(), server) address = '[::]:{0}'.format(self.port) server.add_insecure_port(address) return server def terminate(self): if hasattr(self, 'httpd'): # Stop the WSGI server self.httpd.shutdown() self.httpd.server_close() self.join() class GRPCTestCase(SimpleTestCase): port = 50052 server_thread_class = LiveServerThread @classmethod def setUpClass(cls): super().setUpClass() cls.server_thread = cls._create_server_thread() cls.server_thread.daemon = True cls.server_thread.start() cls.server_thread.is_ready.wait() if cls.server_thread.error: # Clean up behind ourselves, since tearDownClass won't get called in # case of errors. cls._tearDownClassInternal() raise cls.server_thread.error @classmethod def _create_server_thread(cls): return cls.server_thread_class( port=cls.port, ) @classmethod def _tearDownClassInternal(cls): # There may not be a 'server_thread' attribute if setUpClass() for some # reasons has raised an exception. if hasattr(cls, 'server_thread'): # Terminate the live server's thread cls.server_thread.terminate() super().tearDownClass() class TestBaeServer(GRPCTestCase): def test_exec(self): target = 'localhost:{0}'.format('50052') with grpc.insecure_channel(target) as channel: stub = bae_pb2_grpc.MockExecStub(channel) source_code = "print(123)" response = stub.Exec(bae_pb2.Arg(source_code=source_code)) resp = json.loads(response.resp) self.assertEqual(resp["output"], "123\n") self.assertEqual(resp["error"], "") 代码其实参考了LiveServerTestCase的实现。 快速开始 ==================================== 生成client和server代码 -------------------------------------- .. code-block:: text python -m grpc_tools.protoc -I../../protos --python_out=. --grpc_python_out=. ../../protos/route_guide.proto * ../../protos是.proto文件目录相对路径 * --python_out=. 生成python文件导出路径 * --grpc_python_out=. 生成grpc python文件导出路径 * ../../protos/route_guide.proto是相对路径+proto文件名 如果在当前目录 .. code-block:: text python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. rcs.proto 生成的代码文件为route_guide_pb2.py和route_guide_pb2_grpc.py,包含: * 用于route_guide.proto中定义的消息的类 * 在route_guide.proto中定义的服务的类 * RouteGuideStub,客户端可以使用它来调用RouteGuide rpc * RouteGuideServicer,它定义了RouteGuide服务实现的接口 * 在route_guide.proto中定义的服务函数add_RouteGuideServicer_to_server,它将一个RouteGuideServicer添加到grpc.Server 编写server代码 ------------------------------------- 创建和运行server分为两个工作项: * 通过执行服务的实际“工作”的函数实现从服务定义生成的服务接口。 * 运行gRPC服务器侦听来自client的请求并传输响应。 You can find the example RouteGuide server in examples/python/route_guide/route_guide_server.py **实现RouteGuide** .. code-block:: python # RouteGuideServicer provides an implementation of the methods of the RouteGuide service. class RouteGuideServicer(route_guide_pb2_grpc.RouteGuideServicer): """RouteGuideServicer实现了所有RouteGuide服务方法""" def GetFeature(self, request, context): """简单的RPC 让我们先看一看最简单的类型GetFeature,它从客户机获取一个点,并在特性中从其数据库返回相应的特性信息。 该方法被传递一个RPC和grpc的route_guide_pb2.Point请求。ServicerContext对象,该对象提供特定于rpc的信息,如超时限制。它返回一个route_guide_pb2.Feature响应。 """ feature = get_feature(self.db, request) if feature is None: return route_guide_pb2.Feature(name="", location=request) else: return feature def ListFeatures(self, request, context): """响应流的RPC ListFeatures是一个响应流RPC,它向客户端发送多个Feature。 这里的请求消息是一个route_guide_pb2.Rectangle,客户端希望在其中查找Feature。该方法不是返回单个响应,而是产生零或多个响应。 """ left = min(request.lo.longitude, request.hi.longitude) right = max(request.lo.longitude, request.hi.longitude) top = max(request.lo.latitude, request.hi.latitude) bottom = min(request.lo.latitude, request.hi.latitude) for feature in self.db: if (feature.location.longitude >= left and feature.location.longitude <= right and feature.location.latitude >= bottom and feature.location.latitude <= top): yield feature def RecordRoute(self, request_iterator, context): """请求流的RPC 请求流方法RecordRoute使用请求值的迭代器,并返回单个响应值。 """ point_count = 0 feature_count = 0 distance = 0.0 prev_point = None start_time = time.time() for point in request_iterator: point_count += 1 if get_feature(self.db, point): feature_count += 1 if prev_point: distance += get_distance(prev_point, point) prev_point = point elapsed_time = time.time() - start_time return route_guide_pb2.RouteSummary(point_count=point_count, feature_count=feature_count, distance=int(distance), elapsed_time=int(elapsed_time)) def RouteChat(self, request_iterator, context): """双向流的RPC 该方法的语义是请求流方法和响应流方法语义的结合。传递给它的是一个请求值的迭代器,它本身也是一个响应值的迭代器。 """ prev_notes = [] for new_note in request_iterator: for prev_note in prev_notes: if prev_note.location == new_note.location: yield prev_note prev_notes.append(new_note) def serve(): """启动服务 server start()方法是非阻塞的。将实例化一个新线程来处理请求。调用server.start()的线程通常在此期间没有任何其他工作要做。 在这种情况下,您可以调用server.wait_for_termination()来干净地阻塞调用线程,直到服务器终止。 """ server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) route_guide_pb2_grpc.add_RouteGuideServicer_to_server( RouteGuideServicer(), server) server.add_insecure_port('[::]:50051') server.start() server.wait_for_termination() 编写client代码 -------------------------------- `完整代码 `_ You can see the complete example client code in examples/python/route_guide/route_guide_client.py .. code-block:: python # 创建存根 # 我们将从.proto生成的route_guide_pb2_grpc模块的RouteGuideStub类实例化 channel = grpc.insecure_channel('localhost:50051') stub = route_guide_pb2_grpc.RouteGuideStub(channel) # 调用服务端函数 # 对于返回单个响应(“response-unary”方法)的RPC方法,gRPC Python同时支持同步(阻塞)和异步(非阻塞)控制流语义。 # 对于响应流RPC方法,调用立即返回响应值的迭代器。调用迭代器的next()方法块,直到迭代器产生的响应可用为止。 # 简单的RPC # 对简单RPC GetFeature的同步调用几乎与调用本地方法一样简单。 # RPC调用等待服务器响应,并将返回响应或引发异常: feature = stub.GetFeature(point) # 对GetFeature的异步调用与此类似,但类似于在线程池中异步调用本地方法: feature_future = stub.GetFeature.future(point) feature = feature_future.result() # 相应流的RPC # 调用响应流列表特性类似于处理序列类型: for feature in stub.ListFeatures(rectangle): ... # 请求流的RPC # 调用请求流RecordRoute类似于向本地方法传递迭代器。 # 就像上面简单的RPC也返回单个响应一样,它可以被同步或异步调用: route_summary = stub.RecordRoute(point_iterator) route_summary_future = stub.RecordRoute.future(point_iterator) route_summary = route_summary_future.result() # 双向流的RPC # 调用双向流RouteChat(就像在服务端那样)具有请求流和响应流语义的组合: for received_route_note in stub.RouteChat(sent_route_note_iterator): ... 调试 ------------------------------- server端报错时默认只提供错误内容,应该用try catch获取更多的错误信息(报错具体文件和位置), .. include:: protobuf