首先通过Python瞥见GRPC(第2部分)
#python #microservices #grpc #restapi

1.上一篇文章

First glimpse into gRPC through Python (Part 1)

2.实施和示例

在上一篇文章中引入了GRPC概念和开发流程后,我们将展示如何在Python中实施GRPC。

Example Code link(要运行以下示例,请在链接中浏览README.md

2.1。 4种服务方法

GRPC中有4种服务方法,它们之间的区别是应用蒸汽或不在请求或响应中。

2.1.1。简单的RPC

这种类型就像HTTP请求和响应一样。客户端将单个请求发送到服务器和服务器回复单个响应。

原始文件

rpc doSimple ( Request ) returns ( Response ) {}

服务器

def doSimple(self, request, context):
    output = f"Hello { request.input }!"
    return Response( output=f"Hello { request.input }!" )

客户端

def call_doSimple( self ):
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = SampleServiceStub( channel )
        request = Request( input=self.faker.name() )
        logger.info( f"doSimple client sent: { request.input }" )
        response = stub.doSimple( request )
        logger.info( f"doSimple client received: { response.output }" )

命令行输出

[ 2022-07-15 09:43:27,530 ][ INFO ][ call_doSimple ] doSimple client sent: Lindsay Ross
[ 2022-07-15 09:43:27,531 ][ INFO ][ call_doSimple ] doSimple client received: Hello Lindsay Ross!

2.1.2。响应流式RPC

对于这种类型,流在响应中发生。客户端向服务器发送一个请求,服务器返回多个响应的流。

原始文件

rpc doResponseStreaming( Request ) returns ( stream Response ) {}

服务器

def doResponseStreaming(self, request, context):
    faker = Faker()
    name_list = [ *[ faker.name() for i in range( 3 ) ], request.input ]

    for name in name_list:
        time.sleep( 0.5 )
        yield Response( output=name )

客户端

def call_doResponseStreaming( self ):
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = SampleServiceStub( channel )
        request = Request( input=self.faker.name() )
        logger.info( f"doResponseStreaming client sent: { request.input }" )
        response_generator = stub.doResponseStreaming( request )
        for response in response_generator:
            logger.info( f"doResponseStreaming client received: { response.output }" )

命令行输出

[ 2022-07-15 10:14:27,347 ][ INFO ][ call_doResponseStreaming ] doResponseStreaming client sent: Veronica Good
[ 2022-07-15 10:14:27,971 ][ INFO ][ call_doResponseStreaming ] doResponseStreaming client received: Richard Torres
[ 2022-07-15 10:14:28,472 ][ INFO ][ call_doResponseStreaming ] doResponseStreaming client received: Monica Russo
[ 2022-07-15 10:14:28,985 ][ INFO ][ call_doResponseStreaming ] doResponseStreaming client received: Sean Lane
[ 2022-07-15 10:14:29,498 ][ INFO ][ call_doResponseStreaming ] doResponseStreaming client received: Veronica Good

2.1.3。请求流RPC

类似于上述类型,但是流中的流中发生在请求中。客户端将多个请求的流传输发送到服务器,服务器返回单个响应。

原始文件

rpc doRequestStreaming ( stream Request ) returns ( Response ) {}

服务器

def doRequestStreaming(self, request_iterator, context):
    result_list = []
    for request in request_iterator:
        result_list.append( request.input.upper() )

    return Response( output=", ".join( result_list ) )

客户端

def call_doRequestStreaming( self ):

    def get_fake_name_generator():
        faker = Faker()
        for _ in range( 10 ):
            time.sleep( 0.5 )
            name = faker.name()
            logger.info( f"doRequestStreaming client sent: { name }." )
            yield Request( input=name )

    with grpc.insecure_channel('localhost:50051') as channel:
        stub = SampleServiceStub( channel )
        request = get_fake_name_generator()
        response = stub.doRequestStreaming( request )
        logger.info( f"doRequestStreaming client received: { response.output }" )

命令行输出

[ 2022-07-15 10:21:08,058 ][ INFO ][ get_fake_name_generator ] doRequestStreaming client sent: Courtney Hammond.
[ 2022-07-15 10:21:08,562 ][ INFO ][ get_fake_name_generator ] doRequestStreaming client sent: James Petersen.
[ 2022-07-15 10:21:09,070 ][ INFO ][ get_fake_name_generator ] doRequestStreaming client sent: Tom Anderson.
[ 2022-07-15 10:21:09,073 ][ INFO ][ call_doRequestStreaming ] doRequestStreaming client received: COURTNEY HAMMOND, JAMES PETERSEN, TOM ANDERSON

2.1.4。双向流RPC

您可能已经猜到了名称的含义,流中流都会在请求和响应中发生。

原始文件

rpc doBidirectional ( stream Request ) returns ( stream Response ) {}

服务器

def doBidirectional(self, request_iterator, context):
    for request in request_iterator:
        yield Response( output=request.input + " is excellent!" )

客户端

def call_doBidirectional( self ):

    def get_fake_name_generator():
        faker = Faker()
        for _ in range( 3 ):
            time.sleep( 0.5 )
            name = faker.name()
            logger.info( f"doRequestStreaming client sent: { name }." )
            yield Request( input=name )

    with grpc.insecure_channel('localhost:50051') as channel:
        stub = SampleServiceStub( channel )
        request = get_fake_name_generator()
        response_generator = stub.doBidirectional( request )
        for response in response_generator:
            logger.info( f"doBidirectional client received: { response.output }" )

命令行输出

[ 2022-07-15 10:41:11,994 ][ INFO ][ get_fake_name_generator ] doRequestStreaming client sent: Sherry Hanson.
[ 2022-07-15 10:41:11,996 ][ INFO ][ call_doBidirectional ] doBidirectional client received: Sherry Hanson is excellent!
[ 2022-07-15 10:41:12,497 ][ INFO ][ get_fake_name_generator ] doRequestStreaming client sent: Danielle Jones.
[ 2022-07-15 10:41:12,499 ][ INFO ][ call_doBidirectional ] doBidirectional client received: Danielle Jones is excellent!
[ 2022-07-15 10:41:12,999 ][ INFO ][ get_fake_name_generator ] doRequestStreaming client sent: Alexis Goodwin.
[ 2022-07-15 10:41:13,001 ][ INFO ][ call_doBidirectional ] doBidirectional client received: Alexis Goodwin is excellent!

2.2。特殊数据类型和默认值

grpc可以处理某些子数据类型,例如日期时间,列表或地图,但需要注意以避免错误。

2.2.1。约会时间

grpc使用时间戳来处理日期时间,并使用时区。

在原始文件中

google.protobuf.Timestamp date = 1;

您需要在输入日期中包含时区才能正确地将数据传输到服务器端。

以下是输出发送日期没有时区从客户端到服务器端:

[ 2022-07-15 11:04:52,633 ][ INFO ][ call_doSpecialDataType ] doSpecialDataType client sent: request.date=2022-07-15 11:04:52
[ 2022-07-15 11:04:52,654 ][ INFO ][ doSpecialDataType ] doSpecialDataType Server received: request.date=2022-07-15 20:04:52

我们可以看到客户端和服务器之间有9个小时的差异,因为GRPC将没有时区的日期时间视为UTC时间。当服务器收到的日期输入时,它会自动添加9小时,因为我们在UTC+09:00区域中。

2.2.2。列表

GRPC可以通过在原始文件中的字段前面添加repeated来处理列表数据类型。

repeated string names = 1;

2.2.3。地图

GRPC可以通过将映射定义为类型并指定proto文件中的键和值类型来处理地图数据类型。

map<string, string> name2phoneNumMap = 3;

2.2.4。默认值

根据此link,它提到

如果标量消息字段设置为默认值,则该值不会在电线上序列化。

我们将在以下示例中解释含义:

原始文件

repeated CardInfo cardInfos = 4;

message CardInfo {
    string name = 1;
    int32 numberOfCreditCard = 2;
}

我们有Cardinfo的列表,每个卡信息包含一个名为numberOfCreditCard的整数字段。在响应中,我们将Cardinfo列表的最后一个Cardinfo的numberOfCreditCard设置为0。

服务器

cardInfos = request.cardInfos
cardInfos.append( CardInfo( name="Boris Lee", numberOfCreditCard=0 ) )

命令行输出

[ 2022-07-15 11:21:39,200 ][ INFO ][ call_doSpecialDataType ] doSpecialDataType client received:  response.cardInfos= [name: "Katherine Soto"
numberOfCreditCard: 1
, name: "Kerry Powell"
numberOfCreditCard: 1
, name: "Mrs. Christina Hicks DDS"
numberOfCreditCard: 1
, name: "Boris Lee" <- No "numberOfCreditCard"
]

我们可以看到,鲍里斯·李(Boris Lee)做 具有数字。由于0被视为默认值,因此不会在电线上序列化并转移回客户。

要解决这个问题,我们需要在原始文件中添加optional

optional int32 numberOfCreditCard = 2;

生成代码并再次运行程序,您可以看到出现零。

, name: "Boris Lee"
numberOfCreditCard: 0
]

3.单位测试

我们可以使用Python本机unit test framework为GRPC编写单元测试。这是至关重要的,因为您无需在代码中发生更改即可一次又一次地打开/关闭GRPC服务器以手动测试您的代码。

3.1。创建测试服务器

首先,您需要创建一个测试服务器,以在setUp方法中接收GRPC调用,以便每当调用测试方法时,它将首先设置测试服务器。

class SampleServiceTest(unittest.TestCase):

    def setUp(self):
        logger.info( f"=== Method: { self._testMethodName } =======" )
        servicers = {
            sample_service_pb2.DESCRIPTOR.services_by_name['SampleService']: SampleService()
        }

        self.test_server = grpc_testing.server_from_dictionary(
            servicers, grpc_testing.strict_real_time())

3.2。创建测试方法

接下来,您需要为要测试的每个GRPC方法创建至少一个测试方法。

def test_doSimple(self):
    faker = Faker()
    request = sample_service_pb2.Request( input=faker.name() )

    doSimple_method = self.test_server.invoke_unary_unary(
        method_descriptor=(sample_service_pb2.DESCRIPTOR
            .services_by_name['SampleService']
            .methods_by_name['doSimple']),
        invocation_metadata={},
        request=request, timeout=1)

    response, _, code, _ = doSimple_method.termination()
    self.assertEqual( code, grpc.StatusCode.OK )
    self.assertEqual( response.output, f"Hello { request.input }!" )

3.3。运行测试

最后,在主线程中运行测试

if __name__ == '__main__':
    unittest.main()

有关详细信息,请在sample_service_test.py

中查看

4.我写了这一系列博客的原因

我写了这个博客,因为我发现当我第一次了解GRPC时,很难以快速的方式掌握开发方法。 GRPC网站中有很多内置示例,但对于初学者来说太复杂了。

起初,我最想知道的是:

1. How can I make a new method?
2. How can I call the method in a microservice through gRPC?
3. How can I handle the input and output?
4. How can I do the unit test?

互联网中有资源,但是它们要么散布在周围,要么使用很多单词来解释而没有图表。结果,我决定以一种简单且实用的方式撰写此博客以介绍GRPC(使用Python),以便新来者可以轻松地拾取该概念,然后立即开始开发。