参考:
https://blog.csdn.net/bocai_xiaodaidai/article/details/103958468
https://blog.csdn.net/Xw_Classmate/article/details/118643368
简介
gRPC 是由 Google 开发的一个高性能、通用的开源 RPC 框架,主要面向移动应用开发且基于 HTTP/2 协议标准而设计,同时支持大多数流行的编程语言。
优点
- 高效的二进制编码机制
- 采用 HTTP/2 协议
- 清晰的接口规范
- 对流的支持
安装模块
pip3 install grpcio-tools==1.47.0
# 自动安装好了以下依赖模块
# Installing collected packages: protobuf, grpcio, grpcio-tools
# Successfully installed grpcio-1.48.0 grpcio-tools-1.47.0 protobuf-3.20.1
文件结构
├── client.py # gRPC 客户端代码
├── data_pb2_grpc.py # 由data.proto生成
├── data_pb2.py # 由data.proto生成
├── data.proto # protobuf数据格式
└── server.py # gRPC 服务端代码
生成protobuf文件
syntax = "proto3";

// Service definition used for RPC transport.
service GRPCTest {
  // Unary call: one request_data in, one response_data out.
  rpc RemoteFunc(request_data) returns (response_data);
}

// Request message.
message request_data {
  string text = 1;
}

// Response message.
message response_data {
  int32 code = 1;     // Status code.
  string result = 2;  // Result text.
}
python3 -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. ./data.proto
data_pb2_grpc.py:
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
import data_pb2 as data__pb2
class GRPCTestStub(object):
    """Client-side stub for the GRPCTest service.

    Service definition used for RPC transport.
    """

    def __init__(self, channel):
        """Constructor.

        Args:
            channel: A grpc.Channel.
        """
        # Bind a unary-unary callable to the method path; requests are
        # serialized as request_data and replies parsed as response_data.
        self.RemoteFunc = channel.unary_unary(
                '/GRPCTest/RemoteFunc',
                request_serializer=data__pb2.request_data.SerializeToString,
                response_deserializer=data__pb2.response_data.FromString,
                )
class GRPCTestServicer(object):
    """Base servicer for the GRPCTest service; subclass it to add behavior.

    Service definition used for RPC transport.
    """

    def RemoteFunc(self, request, context):
        """Placeholder handler that reports the method as unimplemented.

        Sets the UNIMPLEMENTED status code and a details message on
        ``context`` and then raises NotImplementedError.
        """
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details('Method not implemented!')
        raise NotImplementedError('Method not implemented!')
def add_GRPCTestServicer_to_server(servicer, server):
    """Register ``servicer``'s RemoteFunc handler on ``server``.

    Args:
        servicer: Object providing a ``RemoteFunc(request, context)`` method.
        server: Server object exposing ``add_generic_rpc_handlers``.
    """
    # Map each RPC name to a handler that knows how to (de)serialize it.
    rpc_method_handlers = {
            'RemoteFunc': grpc.unary_unary_rpc_method_handler(
                    servicer.RemoteFunc,
                    request_deserializer=data__pb2.request_data.FromString,
                    response_serializer=data__pb2.response_data.SerializeToString,
            ),
    }
    generic_handler = grpc.method_handlers_generic_handler(
            'GRPCTest', rpc_method_handlers)
    server.add_generic_rpc_handlers((generic_handler,))
 # This class is part of an EXPERIMENTAL API.
class GRPCTest(object):
    """Channel-less convenience wrapper for the GRPCTest service.

    Service definition used for RPC transport.
    """

    @staticmethod
    def RemoteFunc(request,
            target,
            options=(),
            channel_credentials=None,
            call_credentials=None,
            insecure=False,
            compression=None,
            wait_for_ready=None,
            timeout=None,
            metadata=None):
        """Invoke '/GRPCTest/RemoteFunc' on ``target`` for a single request.

        All arguments are forwarded to ``grpc.experimental.unary_unary``,
        whose result is returned unchanged.
        """
        return grpc.experimental.unary_unary(request, target, '/GRPCTest/RemoteFunc',
            data__pb2.request_data.SerializeToString,
            data__pb2.response_data.FromString,
            options, channel_credentials,
            insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
data_pb2.py:
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: data.proto
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()


# Serialized FileDescriptorProto for data.proto. It has to be one unbroken
# bytes literal: wrapping it across lines splits escape sequences and leaves
# the string unterminated.
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\ndata.proto\"\x1c\n\x0crequest_data\x12\x0c\n\x04text\x18\x01 \x01(\t\"-\n\rresponse_data\x12\x0c\n\x04\x63ode\x18\x01 \x01(\x05\x12\x0e\n\x06result\x18\x02 \x01(\t27\n\x08GRPCTest\x12+\n\nRemoteFunc\x12\r.request_data\x1a\x0e.response_datab\x06proto3')


# Look up the message descriptors and build the concrete message classes.
_REQUEST_DATA = DESCRIPTOR.message_types_by_name['request_data']
_RESPONSE_DATA = DESCRIPTOR.message_types_by_name['response_data']
request_data = _reflection.GeneratedProtocolMessageType('request_data', (_message.Message,), {
  'DESCRIPTOR' : _REQUEST_DATA,
  '__module__' : 'data_pb2'
  # @@protoc_insertion_point(class_scope:request_data)
  })
_sym_db.RegisterMessage(request_data)

response_data = _reflection.GeneratedProtocolMessageType('response_data', (_message.Message,), {
  'DESCRIPTOR' : _RESPONSE_DATA,
  '__module__' : 'data_pb2'
  # @@protoc_insertion_point(class_scope:response_data)
  })
_sym_db.RegisterMessage(response_data)

_GRPCTEST = DESCRIPTOR.services_by_name['GRPCTest']
if _descriptor._USE_C_DESCRIPTORS == False:

  DESCRIPTOR._options = None
  # Byte offsets of each definition inside the serialized descriptor above.
  _REQUEST_DATA._serialized_start=14
  _REQUEST_DATA._serialized_end=42
  _RESPONSE_DATA._serialized_start=44
  _RESPONSE_DATA._serialized_end=89
  _GRPCTEST._serialized_start=91
  _GRPCTEST._serialized_end=146
# @@protoc_insertion_point(module_scope)
服务端代码
server.py:
import grpc
import time
from concurrent import futures
import data_pb2, data_pb2_grpc
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
_HOST = 'localhost'
_PORT = '8080'
# Subclass the generated servicer and override its handler.
class GRPCTestServicer(data_pb2_grpc.GRPCTestServicer):
    """GRPCTest implementation that upper-cases the incoming text."""

    def RemoteFunc(self, request, context):
        """Build the reply for one RemoteFunc call.

        A non-empty ``request.text`` produces ``code=0`` with the text
        upper-cased; an empty one produces ``code=1`` with an error message.
        """
        payload = request.text
        # Guard clause: reject empty input first.
        if not payload:
            return data_pb2.response_data(result='传入参数为空字符!', code=1)
        return data_pb2.response_data(result=payload.upper(), code=0)
def serve():
    """Start the gRPC server and block until it is interrupted.

    Registers ``GRPCTestServicer`` on a server backed by a four-thread pool,
    opens an insecure (non-TLS) port on ``_HOST:_PORT`` and then waits.
    On Ctrl-C the server is stopped immediately (grace period 0).
    """
    # concurrent.futures supplies the thread pool that executes RPC handlers;
    # max_workers caps the pool's threads, not the number of connections.
    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=4))
    # Attach the servicer subclass that implements the handler.
    data_pb2_grpc.add_GRPCTestServicer_to_server(GRPCTestServicer(), grpc_server)
    grpc_server.add_insecure_port(_HOST + ':' + _PORT)
    grpc_server.start()
    print('启动RPC Server')
    try:
        # start() does not block; wait_for_termination() parks the main
        # thread until the server stops, replacing a manual sleep loop.
        grpc_server.wait_for_termination()
    except KeyboardInterrupt:
        grpc_server.stop(0)


if __name__ == '__main__':
    serve()
客户端代码
client.py:
import grpc
import data_pb2, data_pb2_grpc
_HOST = 'localhost'
_PORT = '8080'
def run():
    """Prompt for a string, send it to the server and print the reply.

    Reads one line from stdin, calls ``RemoteFunc`` over an insecure channel
    to ``_HOST:_PORT`` and prints the result, prefixed according to the
    returned status code (0 means success).
    """
    text = input('输入字符串,字母转为大写:')
    # Open a client channel to the server. Using it as a context manager
    # closes the connection on exit instead of leaking it.
    with grpc.insecure_channel(_HOST + ':' + _PORT) as conn:
        # The stub issues requests over the channel it is bound to.
        client = data_pb2_grpc.GRPCTestStub(channel=conn)
        # The reply is an instance of the response_data message from the proto.
        response = client.RemoteFunc(data_pb2.request_data(text=text))
    if response.code == 0:
        print("返回结果:" + response.result)
    else:
        print("操作错误:" + response.result)


if __name__ == '__main__':
    run()
运行结果
# 启动服务端:
$ python3 server.py
启动RPC Server
# 客户端请求:
$ python3 client.py
输入字符串,字母转为大写:test123
返回结果:TEST123
$ python3 client.py
输入字符串,字母转为大写:
操作错误:传入参数为空字符!
赏
使用支付宝打赏
使用微信打赏
若你觉得我的文章对你有帮助,欢迎点击上方按钮对我打赏