gRPC

警告

使用gRPC有一个比较麻烦的地方: 国内打不开google的protobuf在线文档。

github地址:

https://github.com/grpc/grpc/tree/master/src/python/grpcio

安装grpc:

pip install grpcio

安装grpc辅助工具:

pip install grpcio-tools

Docs:

https://www.grpc.io/docs/languages/python/quickstart/

官方样例:

https://github.com/grpc/grpc/blob/master/examples/protos/route_guide.proto

etcd集群部署:

https://etcd.io/docs/v3.5/op-guide/clustering/

awesome:

https://github.com/grpc-ecosystem/awesome-grpc

django集成grpc:

https://github.com/gluk-w/django-grpc

知乎:

https://zhuanlan.zhihu.com/p/590207541?

使用gRPC的项目:

导入的问题

-I 参数可以控制从哪里导入pb2文件

python -m grpc_tools.protoc -I api=./api --python_out=. --grpc_python_out=. ./api/gbackend.proto

./api/gbackend_pb2_grpc.py

...
from api import gbackend_pb2 as api_dot_gbackend__pb2
...

django gRPC的接口测试

在后台启动一个实时的gRPC服务器

class LiveServerThread(threading.Thread):
    """Thread for running a live http server while the tests are running."""

    def __init__(self, port=50052):
        self.port = port
        self.is_ready = threading.Event()
        self.error = None
        super().__init__()

    def run(self):
        try:
            self.httpd = self._create_server()
            self.is_ready.set()
            self.httpd.start()
            self.httpd.wait_for_termination()
        except Exception as e:
            self.error = e
            self.is_ready.set()

    def _create_server(self):
        import grpc
        from concurrent import futures
        server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
        bae_pb2_grpc.add_MockExecServicer_to_server(BAE(), server)
        address = '[::]:{0}'.format(self.port)
        server.add_insecure_port(address)
        return server

    def terminate(self):
        if hasattr(self, 'httpd'):
            # Stop the WSGI server
            self.httpd.shutdown()
            self.httpd.server_close()
        self.join()


class GRPCTestCase(SimpleTestCase):
    port = 50052
    server_thread_class = LiveServerThread

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        cls.server_thread = cls._create_server_thread()
        cls.server_thread.daemon = True
        cls.server_thread.start()
        cls.server_thread.is_ready.wait()
        if cls.server_thread.error:
            # Clean up behind ourselves, since tearDownClass won't get called in
            # case of errors.
            cls._tearDownClassInternal()
            raise cls.server_thread.error

    @classmethod
    def _create_server_thread(cls):
        return cls.server_thread_class(
            port=cls.port,
        )

    @classmethod
    def _tearDownClassInternal(cls):
        # There may not be a 'server_thread' attribute if setUpClass() for some
        # reasons has raised an exception.
        if hasattr(cls, 'server_thread'):
            # Terminate the live server's thread
            cls.server_thread.terminate()

            super().tearDownClass()


class TestBaeServer(GRPCTestCase):
    def test_exec(self):
        target = 'localhost:{0}'.format('50052')
        with grpc.insecure_channel(target) as channel:
            stub = bae_pb2_grpc.MockExecStub(channel)
            source_code = "print(123)"
            response = stub.Exec(bae_pb2.Arg(source_code=source_code))
            resp = json.loads(response.resp)
            self.assertEqual(resp["output"], "123\n")
            self.assertEqual(resp["error"], "")

代码其实参考了LiveServerTestCase的实现。

快速开始

生成client和server代码

python -m grpc_tools.protoc -I../../protos --python_out=. --grpc_python_out=. ../../protos/route_guide.proto
  • ../../protos是.proto文件目录相对路径

  • –python_out=. 生成python文件导出路径

  • –grpc_python_out=. 生成grpc python文件导出路径

  • ../../protos/route_guide.proto是相对路径+proto文件名

如果在当前目录

python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. rcs.proto

生成的代码文件为route_guide_pb2.py和route_guide_pb2_grpc.py,包含:

  • 用于route_guide.proto中定义的消息的类

  • 在route_guide.proto中定义的服务的类

  • RouteGuideStub,客户端可以使用它来调用RouteGuide rpc

  • RouteGuideServicer,它定义了RouteGuide服务实现的接口

  • 在route_guide.proto中定义的服务函数add_RouteGuideServicer_to_server,它将一个RouteGuideServicer添加到grpc.Server

编写server代码

创建和运行server分为两个工作项:

  • 通过执行服务的实际“工作”的函数实现从服务定义生成的服务接口。

  • 运行gRPC服务器侦听来自client的请求并传输响应。

You can find the example RouteGuide server in examples/python/route_guide/route_guide_server.py

实现RouteGuide

# RouteGuideServicer provides an implementation of the methods of the RouteGuide service.
class RouteGuideServicer(route_guide_pb2_grpc.RouteGuideServicer):
    """RouteGuideServicer实现了所有RouteGuide服务方法"""

    def GetFeature(self, request, context):
        """简单的RPC

        让我们先看一看最简单的类型GetFeature,它从客户机获取一个点,并在特性中从其数据库返回相应的特性信息。
        该方法被传递一个RPC和grpc的route_guide_pb2.Point请求。ServicerContext对象,该对象提供特定于rpc的信息,如超时限制。它返回一个route_guide_pb2.Feature响应。
        """
        feature = get_feature(self.db, request)
        if feature is None:
            return route_guide_pb2.Feature(name="", location=request)
        else:
            return feature

    def ListFeatures(self, request, context):
        """响应流的RPC

        ListFeatures是一个响应流RPC,它向客户端发送多个Feature。
        这里的请求消息是一个route_guide_pb2.Rectangle,客户端希望在其中查找Feature。该方法不是返回单个响应,而是产生零或多个响应。
        """
        left = min(request.lo.longitude, request.hi.longitude)
        right = max(request.lo.longitude, request.hi.longitude)
        top = max(request.lo.latitude, request.hi.latitude)
        bottom = min(request.lo.latitude, request.hi.latitude)
        for feature in self.db:
            if (feature.location.longitude >= left and
                feature.location.longitude <= right and
                feature.location.latitude >= bottom and
                feature.location.latitude <= top):
            yield feature

    def RecordRoute(self, request_iterator, context):
        """请求流的RPC

        请求流方法RecordRoute使用请求值的迭代器,并返回单个响应值。
        """
        point_count = 0
        feature_count = 0
        distance = 0.0
        prev_point = None

        start_time = time.time()
        for point in request_iterator:
            point_count += 1
            if get_feature(self.db, point):
                feature_count += 1
            if prev_point:
                distance += get_distance(prev_point, point)
            prev_point = point

        elapsed_time = time.time() - start_time
        return route_guide_pb2.RouteSummary(point_count=point_count,
                                            feature_count=feature_count,
                                            distance=int(distance),
                                            elapsed_time=int(elapsed_time))

    def RouteChat(self, request_iterator, context):
        """双向流的RPC

        该方法的语义是请求流方法和响应流方法语义的结合。传递给它的是一个请求值的迭代器,它本身也是一个响应值的迭代器。
        """
        prev_notes = []
        for new_note in request_iterator:
            for prev_note in prev_notes:
            if prev_note.location == new_note.location:
                yield prev_note
            prev_notes.append(new_note)

def serve():
    """启动服务

    server start()方法是非阻塞的。将实例化一个新线程来处理请求。调用server.start()的线程通常在此期间没有任何其他工作要做。
    在这种情况下,您可以调用server.wait_for_termination()来干净地阻塞调用线程,直到服务器终止。
    """
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    route_guide_pb2_grpc.add_RouteGuideServicer_to_server(
        RouteGuideServicer(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    server.wait_for_termination()

编写client代码

完整代码

You can see the complete example client code in examples/python/route_guide/route_guide_client.py

# 创建存根
# 我们将从.proto生成的route_guide_pb2_grpc模块的RouteGuideStub类实例化
channel = grpc.insecure_channel('localhost:50051')
stub = route_guide_pb2_grpc.RouteGuideStub(channel)

# 调用服务端函数
# 对于返回单个响应(“response-unary”方法)的RPC方法,gRPC Python同时支持同步(阻塞)和异步(非阻塞)控制流语义。
# 对于响应流RPC方法,调用立即返回响应值的迭代器。调用迭代器的next()方法块,直到迭代器产生的响应可用为止。

# 简单的RPC
# 对简单RPC GetFeature的同步调用几乎与调用本地方法一样简单。
# RPC调用等待服务器响应,并将返回响应或引发异常:
feature = stub.GetFeature(point)

# 对GetFeature的异步调用与此类似,但类似于在线程池中异步调用本地方法:
feature_future = stub.GetFeature.future(point)
feature = feature_future.result()

# 相应流的RPC
# 调用响应流列表特性类似于处理序列类型:
for feature in stub.ListFeatures(rectangle):
    ...

# 请求流的RPC
# 调用请求流RecordRoute类似于向本地方法传递迭代器。
# 就像上面简单的RPC也返回单个响应一样,它可以被同步或异步调用:
route_summary = stub.RecordRoute(point_iterator)
route_summary_future = stub.RecordRoute.future(point_iterator)
route_summary = route_summary_future.result()

# 双向流的RPC
# 调用双向流RouteChat(就像在服务端那样)具有请求流和响应流语义的组合:
for received_route_note in stub.RouteChat(sent_route_note_iterator):
    ...

调试

server端报错时默认只提供错误内容,应该用try catch获取更多的错误信息(报错具体文件和位置),