Python gRPC: Protocol Buffers and Streaming RPCs
gRPC is Google's high-performance RPC framework built on HTTP/2 and Protocol Buffers. It delivers 5–10x lower latency and 3–8x smaller message sizes compared to REST/JSON, making it the standard for internal microservice communication. This guide covers defining services in .proto files, generating Python code, implementing all four RPC types, async servers, and production patterns like authentication and health checks.
Table of Contents
Setup and Proto File
pip install grpcio grpcio-tools grpcio-reflection grpcio-health-checking
// user_service.proto
syntax = "proto3";
package userservice;
service UserService {
// Unary — single request, single response
rpc GetUser (GetUserRequest) returns (User);
rpc CreateUser (CreateUserRequest) returns (User);
// Server streaming — single request, stream of responses
rpc ListUsers (ListUsersRequest) returns (stream User);
// Client streaming — stream of requests, single response
rpc BatchCreateUsers (stream CreateUserRequest) returns (BatchResult);
// Bidirectional streaming
rpc Chat (stream ChatMessage) returns (stream ChatMessage);
}
message User {
string id = 1;
string username = 2;
string email = 3;
int64 created_at = 4;
repeated string roles = 5;
}
message GetUserRequest { string id = 1; }
message CreateUserRequest {
string username = 1;
string email = 2;
string password = 3;
}
message ListUsersRequest {
int32 page_size = 1;
string page_token = 2;
}
message BatchResult { int32 created = 1; repeated string errors = 2; }
message ChatMessage { string user_id = 1; string text = 2; int64 timestamp = 3; }
Code Generation
# Generate Python stubs from .proto file
python -m grpc_tools.protoc \
-I./protos \
--python_out=./generated \
--grpc_python_out=./generated \
./protos/user_service.proto
# This creates:
# generated/user_service_pb2.py — message classes
# generated/user_service_pb2_grpc.py — service stubs and base classes
Unary RPC
Unary RPC is the simplest pattern: one request, one response — like a function call over the network. Implement it by overriding the method in your servicer class.
import grpc
from concurrent import futures
from generated import user_service_pb2 as pb2
from generated import user_service_pb2_grpc as pb2_grpc
import uuid
import time
# In-memory store for demo
_users = {}
class UserServicer(pb2_grpc.UserServiceServicer):
def GetUser(self, request, context):
user_id = request.id
if user_id not in _users:
context.set_code(grpc.StatusCode.NOT_FOUND)
context.set_details(f"User {user_id} not found")
return pb2.User()
u = _users[user_id]
return pb2.User(
id=u["id"],
username=u["username"],
email=u["email"],
created_at=int(u["created_at"]),
roles=u.get("roles", ["user"]),
)
def CreateUser(self, request, context):
user_id = str(uuid.uuid4())
_users[user_id] = {
"id": user_id,
"username": request.username,
"email": request.email,
"created_at": time.time(),
"roles": ["user"],
}
return pb2.User(id=user_id, username=request.username, email=request.email)
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
pb2_grpc.add_UserServiceServicer_to_server(UserServicer(), server)
server.add_insecure_port("[::]:50051")
server.start()
print("Server started on port 50051")
server.wait_for_termination()
if __name__ == "__main__":
serve()
Server and Client Streaming
class UserServicer(pb2_grpc.UserServiceServicer):
def ListUsers(self, request, context):
"""Server streaming — yields users one at a time."""
page_size = request.page_size or 10
count = 0
for user_id, u in _users.items():
if count >= page_size:
break
yield pb2.User(
id=u["id"],
username=u["username"],
email=u["email"],
created_at=int(u["created_at"]),
)
count += 1
def BatchCreateUsers(self, request_iterator, context):
"""Client streaming — receives stream of CreateUserRequests."""
created = 0
errors = []
for req in request_iterator:
try:
user_id = str(uuid.uuid4())
_users[user_id] = {
"id": user_id,
"username": req.username,
"email": req.email,
"created_at": time.time(),
}
created += 1
except Exception as e:
errors.append(str(e))
return pb2.BatchResult(created=created, errors=errors)
# Client usage
def client_example():
with grpc.insecure_channel("localhost:50051") as channel:
stub = pb2_grpc.UserServiceStub(channel)
# Server streaming — iterate the response
for user in stub.ListUsers(pb2.ListUsersRequest(page_size=5)):
print(f" {user.id}: {user.username}")
# Client streaming — send a generator
def user_generator():
for username in ["alice", "bob", "charlie"]:
yield pb2.CreateUserRequest(
username=username,
email=f"{username}@example.com",
password="secret",
)
result = stub.BatchCreateUsers(user_generator())
print(f"Created {result.created} users, {len(result.errors)} errors")
Bidirectional Streaming
import time as _time
class UserServicer(pb2_grpc.UserServiceServicer):
def Chat(self, request_iterator, context):
"""Bidirectional streaming — read messages and yield replies."""
for msg in request_iterator:
# Echo with server-side timestamp
reply = pb2.ChatMessage(
user_id="server",
text=f"Echo: {msg.text}",
timestamp=int(_time.time() * 1000),
)
yield reply
# Client bidirectional streaming
def chat_client():
def generate_messages():
for text in ["Hello", "How are you?", "Goodbye"]:
yield pb2.ChatMessage(
user_id="client_1",
text=text,
timestamp=int(_time.time() * 1000),
)
_time.sleep(0.5)
with grpc.insecure_channel("localhost:50051") as channel:
stub = pb2_grpc.UserServiceStub(channel)
responses = stub.Chat(generate_messages())
for resp in responses:
print(f"Server: {resp.text}")
Async gRPC Server
The grpc.aio module provides a fully async gRPC server that integrates with asyncio, enabling high-concurrency async handlers without thread pools.
import asyncio
import grpc
from grpc import aio
from generated import user_service_pb2 as pb2
from generated import user_service_pb2_grpc as pb2_grpc
class AsyncUserServicer(pb2_grpc.UserServiceServicer):
async def GetUser(self, request, context):
user = await db.find_user(request.id)
if not user:
await context.abort(grpc.StatusCode.NOT_FOUND, f"User {request.id} not found")
return pb2.User(id=user.id, username=user.username, email=user.email)
async def ListUsers(self, request, context):
async for user in db.stream_users(page_size=request.page_size or 10):
yield pb2.User(id=user.id, username=user.username, email=user.email)
async def serve():
server = aio.server()
pb2_grpc.add_UserServiceServicer_to_server(AsyncUserServicer(), server)
server.add_insecure_port("[::]:50051")
await server.start()
print("Async gRPC server on :50051")
await server.wait_for_termination()
if __name__ == "__main__":
asyncio.run(serve())
Interceptors for Auth and Logging
import grpc
from grpc import ServerInterceptor
class AuthInterceptor(grpc.ServerInterceptor):
def __init__(self, valid_tokens):
self.valid_tokens = valid_tokens
def intercept_service(self, continuation, handler_call_details):
metadata = dict(handler_call_details.invocation_metadata)
token = metadata.get("authorization", "").removeprefix("Bearer ")
if token not in self.valid_tokens:
def abort(request, context):
context.abort(grpc.StatusCode.UNAUTHENTICATED, "Invalid token")
return grpc.unary_unary_rpc_method_handler(abort)
return continuation(handler_call_details)
# Apply interceptor to server
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=10),
interceptors=[AuthInterceptor(valid_tokens={"secret-token-123"})],
)
# Client: send metadata with each call
with grpc.insecure_channel("localhost:50051") as channel:
stub = pb2_grpc.UserServiceStub(channel)
metadata = [("authorization", "Bearer secret-token-123")]
user = stub.GetUser(pb2.GetUserRequest(id="123"), metadata=metadata)
Frequently Asked Questions
- When should I use gRPC instead of REST?
- Use gRPC for internal microservice-to-microservice communication where you control both client and server, need high throughput or low latency, or want strong typing across language boundaries. Use REST for public APIs, browser clients, or when HTTP caching matters.
- Can browsers call gRPC services?
- Not natively — browsers cannot use HTTP/2 trailers which gRPC requires. Use gRPC-Web with Envoy as a proxy, or expose a REST/GraphQL gateway alongside your gRPC services for browser clients.
- How do I test gRPC services?
- Use
grpcurl(the curl equivalent for gRPC) or Postman (which supports gRPC). For unit tests, usegrpc.experimental.channel_ready_futurewith an in-process channel, or mock the servicer directly without a network.