
Python and gRPC - Mastering Code Generation for Efficient Communication



In today's fast-paced world, building efficient and responsive distributed systems is paramount. Among the many tools at a developer's disposal, gRPC stands out as a go-to choice for creating high-performance communication channels. And when combined with Python's intuitive syntax and wide range of libraries, you're set for success.

In this comprehensive guide, we'll demystify gRPC, set up a Python project from scratch, and delve deep into the functionalities of Python's gRPC toolkit. By the end, you'll be equipped to craft a fully operational gRPC service in Python, ready to be integrated into any modern system. Whether you're new to gRPC, Python, or both, this guide is tailored to provide a structured path to mastery.

Understanding gRPC: A Framework for Modern, Efficient Communication

gRPC, standing for gRPC Remote Procedure Call, is an open-source framework initiated by Google. It capitalizes on the advanced features of HTTP/2, such as multiplexing and header compression, combined with Protocol Buffers, to ensure efficient, high-performance inter-service communication.

Core Features:

  • HTTP/2 Driven: Utilizing the strengths of HTTP/2, gRPC achieves reduced latency, efficient connection usage, and improved data transport.
  • Performance-Oriented: Specifically designed for high throughput with low latency.
  • Adaptable Streaming: Supports diverse streaming types, encompassing client, server, and bidirectional.
  • Type Safeguard: With Protocol Buffers at its core for both interface definition and data serialization, gRPC guarantees consistent, strongly-typed interactions.
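The streaming modes listed above map onto plain Python constructs in the generated code: a server-streaming handler is simply a generator, and a client-streaming handler receives an iterator of requests. The following sketch illustrates the handler shapes with hypothetical stand-in message classes (in a real project these come from the protoc-generated modules, and no network is involved here):

```python
from dataclasses import dataclass

# Stand-in message types; in a real project these come from the
# protoc-generated *_pb2 module.
@dataclass
class HelloRequest:
    name: str

@dataclass
class HelloReply:
    message: str

class GreeterSketch:
    """Illustrates how the streaming modes look in handler signatures."""

    def SayHello(self, request, context):
        # Unary: one request in, one reply out.
        return HelloReply(message=f"Hello, {request.name}")

    def SayHelloStream(self, request, context):
        # Server streaming: one request in, an iterator of replies out.
        for i in range(3):
            yield HelloReply(message=f"Hello #{i}, {request.name}")

    def SayHelloToEveryone(self, request_iterator, context):
        # Client streaming: an iterator of requests in, one reply out.
        names = [r.name for r in request_iterator]
        return HelloReply(message=f"Hello, {', '.join(names)}")

sketch = GreeterSketch()
replies = list(sketch.SayHelloStream(HelloRequest(name="Alice"), context=None))
print([r.message for r in replies])
```

Bidirectional streaming combines both shapes: the handler receives an iterator and yields replies.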

Protobuf Crash Course and protoc

Protocol Buffers, or protobuf for short, is a data serialization format. It's also used as an Interface Definition Language (IDL) for describing the service and message types.

Example Proto File

syntax = "proto3";

service HelloWorld {
  rpc SayHello (HelloRequest) returns (HelloReply);
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}
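Under the hood, the serialization protoc generates for these messages follows the proto3 wire format: each field is encoded as a key byte, `key = (field_number << 3) | wire_type`, followed by its payload. As an illustration (not the generated code itself), here is a minimal hand-rolled encoder for `HelloRequest`, assuming the name is shorter than 128 bytes so its length fits in a single varint byte:

```python
def encode_hello_request(name: str) -> bytes:
    # Proto3 wire format: strings use wire type 2 (length-delimited),
    # encoded as: key byte, length as a varint, then the raw bytes.
    data = name.encode("utf-8")
    assert len(data) < 128  # keep the length a single varint byte for this sketch
    key = (1 << 3) | 2      # field number 1, wire type 2 -> 0x0A
    return bytes([key, len(data)]) + data

encoded = encode_hello_request("Alice")
print(encoded.hex())  # 0a05416c696365
```

This compact binary layout (a couple of bytes of framing per field) is a large part of why protobuf outperforms text formats like JSON on the wire.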

Protoc Compiler

After defining your .proto files, you use the Protocol Buffer compiler (protoc) to generate data classes in various languages. For the gRPC service stubs in Python, the compiler is invoked through the grpc_tools package, which bundles protoc together with the gRPC plugin:

python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. hello_world.proto

Project Setup

Requirements

  • Python 3.7 or higher

  • gRPC and gRPC-tools Python packages

Installation

pip install grpcio grpcio-tools

Directory Structure

my_grpc_project/
|-- protos/
| |-- hello_world.proto
|-- server/
| |-- server.py
|-- client/
| |-- client.py

Compiling protobuf files to Python

After we have set up our protobuf files, we need to compile them with protoc. For that we will use the protoc binary shipped with the grpc_tools package, which includes the gRPC plugin, to generate our service boilerplate code:

python -m grpc_tools.protoc -Iprotos/ --python_out=. --pyi_out=. --grpc_python_out=. protos/hello_world.proto

Generated Modules in Python

After running protoc, you'll get three generated files:

  • hello_world_pb2.py: Contains the message classes
  • hello_world_pb2.pyi: Contains type hints for the message classes
  • hello_world_pb2_grpc.py: Contains the server and client classes

Type Hinting with pyi Files

Type hints are not embedded in the generated hello_world_pb2.py module itself. The --pyi_out flag used above asks protoc to emit them as separate .pyi stub files, and you can also write such files by hand for modules that lack them. Either way, they provide a better developer experience with tools like linters and IDEs that consume these hints.

Creating a pyi File for the Generated Modules

Create a file named hello_world_pb2.pyi with the same module name as your generated hello_world_pb2.py file and add your type hints there. Do the same for hello_world_pb2_grpc.pyi.

Example hello_world_pb2.pyi:

from typing import Any

class HelloRequest:
    name: str
    def __init__(self, name: str = ...) -> None: ...

class HelloReply:
    message: str
    def __init__(self, message: str = ...) -> None: ...

Using the .pyi File

Once the .pyi file is in the same directory as the generated .py file, your IDE should automatically pick up the type hints.

gRPC Server Setup Logic

Imports

from concurrent import futures
import grpc
import hello_world_pb2
import hello_world_pb2_grpc

Breaking down the imports:

  • futures, from Python's concurrent standard library package, provides the ThreadPoolExecutor our gRPC server will use to handle requests.
  • grpc is the official gRPC library for Python, exposing the framework's APIs.
  • hello_world_pb2 and hello_world_pb2_grpc are the modules generated by protoc. If your service implementation lives in a separate file, you would also import it here (in this example the service implementation is defined inline with the server setup).

Service implementation

HelloWorldService Class

In any gRPC application, defining the server-side logic is crucial. In our Python example, this is done in the HelloWorldService class. This class is essentially the heart of our gRPC server, where we implement all the methods declared in the .proto file. Let's dissect this class to understand its structure and functionality.

Class Inheritance

class HelloWorldService(hello_world_pb2_grpc.HelloWorldServicer):

Our HelloWorldService class inherits from hello_world_pb2_grpc.HelloWorldServicer, which is an auto-generated class by the gRPC framework after compiling the .proto file. By inheriting from this class, HelloWorldService promises to implement all the methods that the HelloWorldServicer declares. In our example, it's the SayHello method.

Method Implementation: SayHello

def SayHello(self, request, context):
    return hello_world_pb2.HelloReply(message=f"Hello, {request.name}")

Here's a breakdown of the SayHello method:

  • Parameters:

    • self: The instance of the class itself. Standard in Python object-oriented programming.
    • request: This is the incoming request from the client. It will be an instance of the HelloRequest message defined in your .proto file.
    • context: Provides RPC-related information like timeout or cancellation.
  • Method Body:

    • return hello_world_pb2.HelloReply(message=f"Hello, {request.name}"): The method processes the request, which is expected to contain a field named name, and sends back a HelloReply message containing the response string.

What’s Happening Behind the Scenes?

When a client calls the SayHello RPC method, the gRPC server handling the client's request knows to direct that call to the SayHello method in our HelloWorldService class. The request parameter is automatically populated with the client’s message, which should be of the HelloRequest type as defined in the .proto file. The SayHello method then processes this message and returns an appropriate HelloReply message.

Here’s how the SayHello method would work in practice:

  1. The client sends a HelloRequest message with a name field (e.g., name="Alice").

  2. The server receives this request and invokes the SayHello method.

  3. The SayHello method processes the name ("Alice") and prepares a HelloReply message (e.g., "Hello, Alice").

  4. This HelloReply message is sent back to the client.
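The routing in the steps above can be mimicked without any network: the server resolves the RPC name (taken from the HTTP/2 request path, e.g. /HelloWorld/SayHello) to a servicer method and invokes it with the decoded request. The following is a simplified sketch, using hypothetical stand-in message classes and a placeholder context, not gRPC's actual dispatch machinery:

```python
from dataclasses import dataclass
from types import SimpleNamespace

# Stand-ins for the protoc-generated message classes.
@dataclass
class HelloRequest:
    name: str

@dataclass
class HelloReply:
    message: str

class HelloWorldService:
    def SayHello(self, request, context):
        return HelloReply(message=f"Hello, {request.name}")

def dispatch(servicer, method_name, request):
    # Conceptually what the server does: look up the handler by RPC name
    # and call it with the decoded request plus a context object.
    handler = getattr(servicer, method_name)
    context = SimpleNamespace()  # placeholder for grpc.ServicerContext
    return handler(request, context)

reply = dispatch(HelloWorldService(), "SayHello", HelloRequest(name="Alice"))
print(reply.message)  # Hello, Alice
```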

Full Class Code for Reference:

from concurrent import futures
import grpc
import hello_world_pb2
import hello_world_pb2_grpc

class HelloWorldService(hello_world_pb2_grpc.HelloWorldServicer):

    def SayHello(self, request, context):
        return hello_world_pb2.HelloReply(message=f"Hello, {request.name}")

Server setup

After we have understood the role of the HelloWorldService class, the next cornerstone in building a gRPC server in Python is setting up the server itself. This is orchestrated by the serve() function. The function combines multiple aspects, such as thread pool management, servicer addition, port binding, server starting, and waiting for server termination. Let's dissect each of these elements.

ThreadPool

server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

The first step inside the serve() function involves creating a new gRPC server instance and associating it with a ThreadPool. A ThreadPool is a pool of threads that can execute tasks asynchronously. The ThreadPoolExecutor from Python’s concurrent.futures library is used here with a maximum of 10 worker threads. This allows the server to handle multiple requests concurrently, enhancing performance.
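The executor itself can be exercised in isolation to see the concurrency it provides. This short sketch (with a hypothetical handler function standing in for an RPC body) uses the same ThreadPoolExecutor type that grpc.server() is given:

```python
from concurrent import futures

def handle_request(name: str) -> str:
    # Stand-in for the body of an RPC handler.
    return f"Hello, {name}"

# The same executor type grpc.server() uses to run handlers concurrently;
# up to 10 "requests" can be in flight at once.
with futures.ThreadPoolExecutor(max_workers=10) as pool:
    results = list(pool.map(handle_request, ["Alice", "Bob", "Carol"]))

print(results)
```

Note that max_workers caps concurrency: an eleventh simultaneous request would wait in the executor's queue until a worker thread frees up.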

Adding the Servicer

hello_world_pb2_grpc.add_HelloWorldServicer_to_server(HelloWorldService(), server)

Once we have our server instance, we need to add the service logic to it. This is where our previously defined HelloWorldService class comes into play. The add_HelloWorldServicer_to_server function attaches the HelloWorldService to our server instance. This essentially means that the server now knows that for any incoming RPC requests that match the methods defined in HelloWorldService, the corresponding methods in that class should be invoked.

Adding the Port


server.add_insecure_port('[::]:50051')

After adding the servicer, the next step is to specify the port where the server should listen for incoming requests. In this example, the server is set to listen on all available network interfaces ([::]) at port 50051. The add_insecure_port method is used, which means that the communication will not be encrypted (ideal for local testing but not for production).

Starting the Server

server.start()

With everything set up, the server can be started using the start() method. This fires up the server, and it begins listening for incoming RPC calls.

Waiting for Termination

server.wait_for_termination()

The final step in our serve() function is to keep the server running. The wait_for_termination() method ensures that the server will not shut down immediately and will wait for an explicit termination call. This is useful for keeping the server alive and responsive to incoming RPCs indefinitely, until manually terminated.
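Conceptually, this is a blocking wait on a termination signal: the main thread parks until stop() is called from elsewhere (another thread, a signal handler, and so on). The toy class below illustrates that pattern with threading.Event; it is an analogy, not gRPC's actual implementation:

```python
import threading

class MiniServer:
    """Toy object illustrating the blocking-wait pattern behind
    wait_for_termination(); not gRPC's real implementation."""

    def __init__(self):
        self._stopped = threading.Event()

    def stop(self):
        self._stopped.set()

    def wait_for_termination(self, timeout=None):
        # Block the calling thread until stop() is invoked elsewhere.
        return self._stopped.wait(timeout)

server = MiniServer()
# Simulate an external shutdown request arriving shortly after start.
threading.Timer(0.05, server.stop).start()
terminated = server.wait_for_termination(timeout=5)
print(terminated)  # True
```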

Complete serve() Function for Reference

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    hello_world_pb2_grpc.add_HelloWorldServicer_to_server(HelloWorldService(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    server.wait_for_termination()

By now, you should have a solid understanding of how the serve() function configures and starts the gRPC server. Each step plays a crucial role in making your server functional and accessible. Armed with this knowledge, you can extend this basic server to accommodate more advanced use-cases and functionalities.

Here's how this setup looks all together:

from concurrent import futures
import grpc
import hello_world_pb2
import hello_world_pb2_grpc

class HelloWorldService(hello_world_pb2_grpc.HelloWorldServicer):

    def SayHello(self, request, context):
        return hello_world_pb2.HelloReply(message=f"Hello, {request.name}")

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    hello_world_pb2_grpc.add_HelloWorldServicer_to_server(HelloWorldService(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    server.wait_for_termination()

if __name__ == '__main__':
    serve()


Service Setup Logic: Async and Sync

gRPC supports both asynchronous and synchronous service implementations. Here's a brief introduction:

Synchronous Service

In a synchronous server like the example above, each RPC is handled by a regular (blocking) method, executed on a thread from the server's pool.

Asynchronous Service

You can use Python's asyncio to create asynchronous methods. These methods should be coroutines and use asynchronous I/O operations. Note that an async servicer must be attached to an asyncio server created with grpc.aio.server() rather than grpc.server().

import asyncio

class HelloWorldService(hello_world_pb2_grpc.HelloWorldServicer):

    async def SayHello(self, request, context):
        await asyncio.sleep(1)
        return hello_world_pb2.HelloReply(message=f"Hello, {request.name}")
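The coroutine handler itself can be exercised without a running server. The sketch below uses hypothetical stand-in message classes (the real ones come from the generated modules) and drives the handler with asyncio.run; the sleep stands in for non-blocking I/O such as a database query or a downstream RPC:

```python
import asyncio
from dataclasses import dataclass

# Stand-ins for the protoc-generated message classes.
@dataclass
class HelloRequest:
    name: str

@dataclass
class HelloReply:
    message: str

class AsyncHelloWorldService:
    async def SayHello(self, request, context):
        # Simulate non-blocking I/O (a DB call, another RPC, ...).
        await asyncio.sleep(0.01)
        return HelloReply(message=f"Hello, {request.name}")

async def main():
    service = AsyncHelloWorldService()
    reply = await service.SayHello(HelloRequest(name="Alice"), context=None)
    return reply.message

result = asyncio.run(main())
print(result)  # Hello, Alice
```

While one coroutine is awaiting, the event loop is free to make progress on other in-flight RPCs, which is the whole point of the async service style.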

Client Module Example

Creating a client in gRPC involves creating a channel and a stub, and then calling methods on the stub as if it were a local object.

client.py

Here is a simple example for a Python gRPC client for our HelloWorld service.

import grpc
import hello_world_pb2
import hello_world_pb2_grpc

def run():
    # Create a channel to the server
    channel = grpc.insecure_channel('localhost:50051')

    # Create a stub
    stub = hello_world_pb2_grpc.HelloWorldStub(channel)

    # Create a HelloRequest message
    hello_request = hello_world_pb2.HelloRequest(name='World')

    # Make the call
    response = stub.SayHello(hello_request)

    print(f"Client received: {response.message}")

if __name__ == '__main__':
    run()

How to Run

To run the client and server, first start the server:

python server/server.py

Then, run the client:

python client/client.py

You should see the output "Client received: Hello, World" on the client terminal, confirming the successful execution of the gRPC call.

Generated project with Sylk CLI

To optimize the gRPC project development process, we introduce Sylk CLI—an open-source, component-based Command Line Interface specifically engineered for building, maintaining, and compiling Protocol Buffers (protobuf) files for gRPC-based APIs.

Architecture Initialization and Schema Compilation

Utilizing Sylk CLI, you can initialize the entire project architecture in a matter of seconds. As you continue to design and compile your schema files, Sylk CLI incrementally populates your project with the necessary boilerplate code. This not only alleviates the repetitive chore of updating endpoints manually but also automates the process of generating new files, ensuring consistency throughout your project.

Getting Started with Sylk CLI

For an in-depth understanding, you can refer to our comprehensive Quick Start Guide. Alternatively, the following minimalistic tutorial should suffice for a quick start:

First, install Sylk CLI:

pip install sylk

Then, after a short download, you can use its commands in your terminal:

sylk --help

After the CLI tool is set up, you can proceed to create a new project from scratch, utilize a pre-configured template, or even migrate your existing schemas into the Sylk format using the sylk weave command.

Start from scratch:

sylk init my-project
cd my-project
sylk generate package my_package.v1
sylk generate message my_package.v1.my_message
sylk generate service my_package.v1.my_service
sylk generate rpc my_package.v1.my_service.get_some
sylk build

Integrate into an existing schema:

sylk init my-existing-project --path .
sylk weave --dry-run
sylk build

--dry-run

For the weave command to take effect, drop the --dry-run flag. You can also run it with the --build flag to build the protobuf files and compile them sequentially after the weave process has finished migrating your schema.

Upon executing the sylk build command, the corresponding project directories and files—including the compiled Protocol Buffers in your chosen languages—are automatically generated for both server and client-side modules.

Code Generation with Sylk CLI

Here's a sample of the auto-generated server-side code using Sylk:

"""sylk.build Generated Server Code"""
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
from concurrent import futures
import time
import grpc
from generated.services.protos.sylklabs.scheduler.v1 import scheduler_pb2_grpc as scheduler_v1_pb2_grpc
from generated.services.protos.sylklabs.scheduler.v1 import scheduler_worker_pb2_grpc as scheduler_worker_v1_pb2_grpc
from generated.services.SchedulerService.v1.SchedulerService import SchedulerService as SchedulerService_v1
from generated.services.SchedulerWorkerService.v1.SchedulerWorkerService import SchedulerWorkerService as SchedulerWorkerService_v1

def serve(host="0.0.0.0:44880"):
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
scheduler_v1_pb2_grpc.add_SchedulerServiceServicer_to_server(SchedulerService_v1(),server)
scheduler_worker_v1_pb2_grpc.add_SchedulerWorkerServiceServicer_to_server(SchedulerWorkerService_v1(),server)
server.add_insecure_port(host)
server.start()
print("[*] Started sylk.build server at -> %s" % (host))

try:
while True:
time.sleep(_ONE_DAY_IN_SECONDS)

except KeyboardInterrupt:
server.stop(0)

if __name__ == "__main__":
serve()

Service Implementation:

"""sylk.build service implemantation for -> SchedulerService"""
import grpc
from google.protobuf.timestamp_pb2 import Timestamp
from typing import Iterator
from generated.services.protos.sylklabs.scheduler.v1 import scheduler_pb2_grpc, scheduler_pb2

class SchedulerService(scheduler_pb2_grpc.SchedulerServiceServicer):
def Execute(self, request: scheduler_pb2.ExecuteRequest, context: grpc.ServicerContext) -> scheduler_pb2.ExecuteResponse:
# response = scheduler_pb2.ExecuteResponse(task_id=None,state=None)
# return response
super().Execute(request, context)

def Schedule(self, request: scheduler_pb2.ScheduleRequest, context: grpc.ServicerContext) -> scheduler_pb2.ScheduleResponse:
# response = scheduler_pb2.ScheduleResponse(scheduled_task_id=None)
# return response
super().Schedule(request, context)

Similarly, the client-side code is generated as follows:

from typing import Tuple, Iterator, Any
import grpc
import os
import sys
# Adding protos module path if needed
script_dir = os.path.dirname(os.path.abspath(__file__))
proto_module = os.path.join(script_dir, "protos")
if proto_module not in sys.path:
    # Insert the protos modules path at the beginning of sys.path (to give it higher priority)
    sys.path.insert(0, proto_module)

from functools import partial
import logging
from generated.clients.python.protos.protot.core import task_pb2 as task_, task_pb2_grpc as task__grpc
from generated.clients.python.protos.protot.core import configs_pb2 as configs_, configs_pb2_grpc as configs__grpc
from generated.clients.python.protos.protot.metrics.v1 import metrics_pb2 as metrics_v1, metrics_pb2_grpc as metrics_v1_grpc
from generated.clients.python.protos.protot.scheduler.v1 import scheduler_worker_pb2 as scheduler_worker_v1, scheduler_worker_pb2_grpc as scheduler_worker_v1_grpc
from generated.clients.python.protos.protot.scheduler.v1 import scheduler_pb2 as scheduler_v1, scheduler_pb2_grpc as scheduler_v1_grpc
from google.protobuf import struct_pb2
from google.protobuf import timestamp_pb2
from google.protobuf import any_pb2
from google.protobuf import wrappers_pb2
from google.protobuf import field_mask_pb2
from google.protobuf import empty_pb2
from google.protobuf import duration_pb2

# For available channel options in python visit https://github.com/grpc/grpc/blob/v1.46.x/include/grpc/impl/codegen/grpc_types.h
_CHANNEL_OPTIONS = (
    ("grpc.keepalive_permit_without_calls", 1),
    ("grpc.keepalive_time_ms", 120000),
    ("grpc.keepalive_timeout_ms", 20000),
    ("grpc.http2.min_time_between_pings_ms", 120000),
    ("grpc.http2.max_pings_without_data", 1),
)
# Global metadata
_METADATA = (('sylk-version','0.4.1'),)
# Global auth key that will be verified by sylk client
_GLOBAL_AUTH_KEY = None
# Generated thanks to [sylk.build](https://www.sylk.build)
class SchedulerService_v1:
    """
    service class generated by sylk.build
    File: protot.scheduler.v1.SchedulerService
    Service: SchedulerService
    Version: v1
    """

    def __init__(self, channel: grpc.ChannelCredentials = None, client_opt={}):
        logging.root.setLevel(client_opt.get('log_level', 'ERROR'))
        if channel is None:
            self.channel = grpc.insecure_channel('{0}:{1}'.format(client_opt.get('host', 'localhost'), client_opt.get('port', 44880)), _CHANNEL_OPTIONS)
            try:
                grpc.channel_ready_future(self.channel).result(timeout=client_opt.get('timeout', 10))
            except grpc.FutureTimeoutError:
                logging.error('Timedout: server seems to be offline. verify your connection configs.')
                sys.exit(1)
        else:
            self.channel = channel
        self.SchedulerService_v1_stub = scheduler_v1_grpc.SchedulerServiceStub(self.channel)

    def Execute_WithCall(self, request: scheduler_v1.ExecuteRequest, metadata: Tuple[Tuple[str, str]] = _METADATA) -> Tuple[scheduler_v1.ExecuteResponse, Any]:
        """sylk - Returns: RPC output and a call object"""
        return self.SchedulerService_v1_stub.Execute.with_call(request, metadata=metadata)

    def Execute(self, request: scheduler_v1.ExecuteRequest, metadata: Tuple[Tuple[str, str]] = _METADATA) -> scheduler_v1.ExecuteResponse:
        """sylk - """
        return self.SchedulerService_v1_stub.Execute(request, metadata=metadata)

    def Schedule_WithCall(self, request: scheduler_v1.ScheduleRequest, metadata: Tuple[Tuple[str, str]] = _METADATA) -> Tuple[scheduler_v1.ScheduleResponse, Any]:
        """sylk - Returns: RPC output and a call object"""
        return self.SchedulerService_v1_stub.Schedule.with_call(request, metadata=metadata)

    def Schedule(self, request: scheduler_v1.ScheduleRequest, metadata: Tuple[Tuple[str, str]] = _METADATA) -> scheduler_v1.ScheduleResponse:
        """sylk - """
        return self.SchedulerService_v1_stub.Schedule(request, metadata=metadata)

class SchedulerWorkerService_v1:
    """
    service class generated by sylk.build
    File: protot.scheduler.v1.SchedulerWorkerService
    Service: SchedulerWorkerService
    Version: v1
    """

    def __init__(self, channel: grpc.ChannelCredentials = None, client_opt={}):
        logging.root.setLevel(client_opt.get('log_level', 'ERROR'))
        if channel is None:
            self.channel = grpc.insecure_channel('{0}:{1}'.format(client_opt.get('host', 'localhost'), client_opt.get('port', 44880)), _CHANNEL_OPTIONS)
            try:
                grpc.channel_ready_future(self.channel).result(timeout=client_opt.get('timeout', 10))
            except grpc.FutureTimeoutError:
                logging.error('Timedout: server seems to be offline. verify your connection configs.')
                sys.exit(1)
        else:
            self.channel = channel
        self.SchedulerWorkerService_v1_stub = scheduler_worker_v1_grpc.SchedulerWorkerServiceStub(self.channel)

    def Communicate_WithCall(self, request: Iterator[scheduler_worker_v1.WorkerMessage], metadata: Tuple[Tuple[str, str]] = _METADATA) -> Tuple[Iterator[scheduler_worker_v1.SchedulerMessage], Any]:
        """sylk - The Communicate RPC method sets up a bidirectional stream between
        the scheduler and a worker node. Returns: RPC output and a call object"""
        return self.SchedulerWorkerService_v1_stub.Communicate.with_call(request, metadata=metadata)

    def Communicate(self, request: Iterator[scheduler_worker_v1.WorkerMessage], metadata: Tuple[Tuple[str, str]] = _METADATA) -> Iterator[scheduler_worker_v1.SchedulerMessage]:
        """sylk - The Communicate RPC method sets up a bidirectional stream between
        the scheduler and a worker node."""
        return self.SchedulerWorkerService_v1_stub.Communicate(request, metadata=metadata)

Conclusion

The modern world of distributed systems demands efficiency, and that's exactly what the combination of Python and gRPC brings to the table. As we have demonstrated, gRPC with its HTTP/2-driven, performance-oriented, and strongly-typed nature is a formidable tool for building efficient service-to-service communication channels. When you pair this with Python's simplicity and extensive library ecosystem, you've got a recipe for developing robust and maintainable distributed systems.

In this guide, we took a deep dive into setting up a gRPC server in Python, touching upon essential topics like Protocol Buffers, code generation using protoc, type hinting, and the nuts and bolts of server and service implementation. For those seeking to optimize their development workflow, we also introduced Sylk CLI—a tool designed to automate the repetitive aspects of gRPC project setup.

Whether you're a beginner or an experienced developer, gRPC offers a range of functionalities that cater to various use-cases. We hope this guide serves as a comprehensive resource for your journey towards mastering gRPC in Python. With the principles and practices laid out here, you are now well-equipped to implement gRPC in your projects, enabling efficient, scalable, and versatile communication between services. Happy coding!
