运维开发网

用Python编写简单的gRPC服务的详细过程

运维开发网 https://www.qedev.com 2021-07-05 10:09 出处:网络 作者: 西京刀客
gRPC 是可以在任何环境中运行的现代开源高性能 RPC 框架。它可以通过可插拔的支持来有效地连接数据中心内和跨数据中心的服务,以实现负载平衡,跟踪,运行状况检查和身份验证编程客栈。它也适用于分布式计算的最后一

gRPC 是可以在任何环境中运行的现代开源高性能 RPC 框架。它可以通过可插拔的支持来有效地连接数据中心内和跨数据中心的服务,以实现负载平衡,跟踪,运行状况检查和身份验证编程客栈。它也适用于分布式计算的最后一英里,以将设备,移动应用程序和浏览器连接到后端服务。

用python编写简单的gRPC服务

grpc官网python参考:https://www.grpc.io/docs/languages/python/quickstart/

http://grpc.github.io/grpc/python/grpc.html

  • Python 3.5 or higher
  • pip version 9.0.1 or higher

安装gRPC相关的库

grpcio-tools主要用根据我们的protocol buffer定义来生成Python代码,官方解释是Protobuf code generator for gRPC。

#apt install python3-pip
pip install grpcio
pip install protobuf
pip install grpcio_tools

编写proto文件

proto是一个协议文件,客户端和服务器的通信接口正是通过proto文件协定的,可以根据不同语言生成对应语言的代码文件。

heartbeat.proto文件:

syntax = "proto3";

message HeartbeatRequest{
	string Host      = 1;
	int32  Mem       = 2;
	int32  Disk      = 3;
	int32  Cpu       = 4;
	int64  Timestamp = 5;
	int64  Seq       = 6;

}

message HeartbeatResponse{
	int32  ErrCode   = 1;
	string ErrMsg    = 2;
}

heartbeat_service.proto

syntax = "proto3";

import "heartbeat.proto";

// HeartBeatService
service HeartBeatService{
    rpc HeartBeat(HeartbeatRequest) returns(HeartbeatResponse编程客栈){}
}

核心 就是一个 用于生成需要用到数据类型的文件;一个就是用于生成相关调用方法的类。 一个定义数据类型,一个用于定义方法。

通过proto生成.py文件

proto文件需要通过protoc生成对应的.py文件。protoc的下载地址 。下载解压之后,将解压目录添加到path的环境变量中。

pip install grpcio
install grpcio-tools
#pip  install --upghttp://www.cppcns.comrade protobuf

注意:【下面命令是在proto文件所在的目录执行的,-I 用来指定proto的目录是 . 】

python -m grpc_tools.protoc -I=. --python_out=.. heartbeat.proto
python -m grpc_tools.protoc -I=. --grpc_python_out=.. heartbeat_service.proto
  • -I 指定proto所在目录
  • -m 指定通过protoc生成py文件
  • –python_out生成py文件的输出路径
  • heartbeat.proto、heartbeat_service.proto为 输入的proto文件

用Python编写简单的gRPC服务的详细过程

  • 生成的文件名中 xxx_pb2.py 就是我们刚才创建数据结构文件,里面有定义函数参数和返回数据结构; xxx_pb2_grpc.py 就是我们定义的函数,定义了我们客服端rpc将来要调用方法。

编译客户端和服务端代码

服务端

#!/usr/bin/env python
# coding=utf-8
import sys
from concurrent import futures
import time

import gr编程客栈pc
from google.protobuf.json_format import MessageToJson

import heartbeat_service_pb2_grpc
import heartbeat_pb2
from lib.core.log import LOGGER


class HeartBeatSrv(heartbeat_service_pb2_grpc.HeartBeatServiceServicer):
    def HeartBeat(self, msg, context):
        try:
            # LOGGER.info(MessageToJson(msg, preserving_proto_field_name=True))
            body = MessageToJson(msg, preserving_proto_field_name=True)
            LOGGER.info("Get Heartbeat Request: %s", body)

            response = heartbeat_pb2.HeartbeatResponse()
            response.ErrCode = 0000
            response.ErrMsg = "success"

            return response
        except Exception as e:
            print("exception in heartbeat")
            LOGGER.error("RPC Service exception: %s", e)
            response = heartbeat_pb2.HeartbeatResponse()
            response.ErrCode = 500
            response.ErrMsg = "rpc error: %s" % e
            return response


def server(host, rpc_port):
    # 这里通过thread pool来并发处理server的任务
    # 定义服务器并设置最大连接数,concurrent.futures是一个并发库,类似于线程池的概念
    grpc_server = grpc.server(future编程客栈s.ThreadPoolExecutor(max_workers=100))
    # 不使用SSL
    grpc_server.add_insecure_port('[::]' + ':' + str(rpc_port))
    # This method is only safe to call before the server is started.
    #  绑定处理器HeartBeatSrv(自己实现了处理函数)
    heartbeat_service_pb2_grpc.add_HeartBeatServiceServicer_to_server(HeartBeatSrv(), grpc_server)
    # 该方法只能调用一次, start() 不会阻塞
    # 启动服务器
    grpc_server.start()
    LOGGER.info("server start...")
    while 1:
        time.sleep(10)
    #grpc_server.wait_for_termination()


def main():
    try:
        LOGGER.info("begin start server")

        rpc_port = 8090
        host = "::"
        server(host, rpc_port)

    except Exception as e:
        LOGGER.error("server start error: %s", e)
        time.sleep(5)


if __name__ == '__main__':
    LOGGER.info(sys.path)
    main()

客户端

from time import sleep

import grpc

import heartbeat_pb2
import heartbeat_service_pb2_grpc
from lib.core.log import LOGGER


def run(seq):
    option = [('grpc.keepalive_timeout_ms', 10000)]
    #
    with grpc.insecure_channel(target='127.0.0.1:8090', options=option) as channel:
        # 客户端实例
        stub = heartbeat_service_pb2_grpc.HeartBeatServiceStub(channel)
        # stub调用服务端方法
        response = stub.HeartBeat(heartbeat_pb2.HeartbeatRequest(Host='hello grpc', Seq=seq), timeout=10)
        LOGGER.info("response ErrCode:%s", response.ErrCode)


if __name__ == '__main__':

    for i in range(1, 10000):
        LOGGER.info("i: %s", i)
        sleep(3)
        run(i)

参考

使用Python实现gRPC通信

参考URL: https://zhuanlan.zhihu.com/p/363810793

python grpc搭构服务

https://www.jianshu.com/p/10d9ca034567

python grpc 服务端和客户端调用demo

参考URL: https://blog.csdn.net/qq_42363032/article/details/115282405

到此这篇关于用Python编写简单的gRPC服务的文章就介绍到这了,更多相关Python gRPC服务内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

0

精彩评论

暂无评论...
验证码 换一张
取 消