Python gRPC: Protocol Buffers and Streaming RPCs
gRPC is Google's open-source RPC framework that uses Protocol Buffers (protobuf) for schema definition and binary serialization. It's 5–10x faster than REST+JSON for service-to-service communication, supports four RPC types (unary, server streaming, client streaming, bidirectional streaming), and provides a strongly-typed contract shared between services in any language. This guide covers writing .proto files, generating Python code, implementing all four RPC types, and adding interceptors for logging and authentication.
Table of Contents
Protocol Buffer Definitions
Protocol Buffers (.proto files) define the service contract: the messages (data types) and the RPC methods. Protobuf uses a compact binary encoding that is roughly 3–10x smaller and 5–10x faster to parse than JSON. Fields are identified by integer tags, enabling backward and forward compatibility — adding new optional fields doesn't break existing clients or servers.
// user_service.proto
//
// Service contract for the user service: message types plus one RPC of each
// gRPC kind (unary, server streaming, client streaming, bidirectional).
syntax = "proto3";

package techoral.users.v1;

// NOTE: protobuf defines no `python_package` file option, and protoc rejects
// unknown options. The Python module location is determined by the .proto
// path relative to -I and by the --python_out directory, not by this file.

// Enum types
// Role assigned to a user. The zero value is the proto3 default ("not set").
enum UserRole {
  USER_ROLE_UNSPECIFIED = 0;
  USER_ROLE_ADMIN = 1;
  USER_ROLE_EDITOR = 2;
  USER_ROLE_VIEWER = 3;
}

// Message definitions
message User {
  string id = 1;
  string name = 2;
  string email = 3;
  UserRole role = 4;
  int64 created_at = 5;  // Unix timestamp
  optional string bio = 6;  // `optional` gives explicit presence tracking
}

message GetUserRequest { string user_id = 1; }

message GetUserResponse { User user = 1; }

message ListUsersRequest {
  int32 limit = 1;
  int32 offset = 2;
  string filter_role = 3;  // role name to filter by; empty means no filter
}

message ListUsersResponse {
  repeated User users = 1;
  int32 total_count = 2;
}

message CreateUserRequest {
  string name = 1;
  string email = 2;
  UserRole role = 3;
}

// Streaming messages
message EventStreamRequest { string user_id = 1; }

message UserEvent {
  string event_type = 1;
  string user_id = 2;
  bytes payload = 3;
  int64 timestamp = 4;
}

// Service definition
service UserService {
  // Unary: single request → single response
  rpc GetUser(GetUserRequest) returns (GetUserResponse);
  rpc CreateUser(CreateUserRequest) returns (User);

  // Server streaming: single request → multiple responses
  rpc ListUsers(ListUsersRequest) returns (stream User);

  // Client streaming: multiple requests → single response
  rpc BulkCreateUsers(stream CreateUserRequest) returns (ListUsersResponse);

  // Bidirectional streaming: multiple requests ↔ multiple responses
  rpc WatchUserEvents(stream EventStreamRequest) returns (stream UserEvent);
}
Code Generation Setup
gRPC generates Python stubs from .proto files. The grpcio-tools package includes the protoc compiler. Generated files come in pairs: *_pb2.py (message classes) and *_pb2_grpc.py (service stubs and server base classes).
# grpcio-health-checking provides the grpc_health package and PyJWT provides
# the jwt module; both are imported by later examples in this guide.
pip install grpcio grpcio-tools grpcio-reflection grpcio-health-checking PyJWT
# protoc does not create missing output directories
mkdir -p ./generated
# Generate Python code from proto
python -m grpc_tools.protoc \
    -I ./proto \
    --python_out=./generated \
    --grpc_python_out=./generated \
    --pyi_out=./generated \
    ./proto/user_service.proto
# This generates:
# generated/user_service_pb2.py — message classes
# generated/user_service_pb2_grpc.py — service stubs
# generated/user_service_pb2.pyi — type stubs for IDE support
# Project structure
"""
grpc_service/
├── proto/
│ └── user_service.proto
├── generated/
│ ├── user_service_pb2.py
│ ├── user_service_pb2_grpc.py
│ └── user_service_pb2.pyi
├── server.py
├── client.py
└── interceptors.py
"""
Unary RPC: Request-Response
Unary RPC is the simplest pattern — one request, one response, just like a regular function call. The server implements the service base class generated by protoc. For the synchronous server, each RPC method receives the request and a ServicerContext for sending metadata, setting status codes, and controlling the response lifecycle.
import grpc
from concurrent import futures
import time
from generated import user_service_pb2 as pb2
from generated import user_service_pb2_grpc as pb2_grpc
# In-memory store for demo, keyed by user id (a string).
# Each record's keys match fields of the User message so it can be expanded
# with pb2.User(**record); "role" holds the numeric UserRole value
# (1 = USER_ROLE_ADMIN, 2 = USER_ROLE_EDITOR in user_service.proto).
USERS = {
"1": {"id": "1", "name": "Alice", "email": "alice@techoral.com", "role": 1},
"2": {"id": "2", "name": "Bob", "email": "bob@techoral.com", "role": 2},
}
class UserServicer(pb2_grpc.UserServiceServicer):
    """Unary RPC handlers for UserService, backed by the in-memory USERS dict."""

    def GetUser(self, request: pb2.GetUserRequest, context: grpc.ServicerContext) -> pb2.GetUserResponse:
        """Look up one user by id.

        When the id is unknown, the call's status is set to NOT_FOUND and an
        empty response is returned.
        """
        record = USERS.get(request.user_id)
        if record:
            # created_at is not stored; it is stamped with the current time.
            found = pb2.User(**record, created_at=int(time.time()))
            return pb2.GetUserResponse(user=found)
        context.set_code(grpc.StatusCode.NOT_FOUND)
        context.set_details(f"User {request.user_id} not found")
        return pb2.GetUserResponse()

    def CreateUser(self, request: pb2.CreateUserRequest, context: grpc.ServicerContext) -> pb2.User:
        """Store a new user under the next sequential id and return it."""
        new_id = str(len(USERS) + 1)
        record = dict(
            id=new_id,
            name=request.name,
            email=request.email,
            role=request.role,
        )
        USERS[new_id] = record
        return pb2.User(**record, created_at=int(time.time()))
def serve():
    """Start an insecure UserService server on port 50051 and block until it terminates."""
    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    grpc_server = grpc.server(worker_pool)
    pb2_grpc.add_UserServiceServicer_to_server(UserServicer(), grpc_server)
    grpc_server.add_insecure_port("[::]:50051")
    grpc_server.start()
    print("gRPC server running on :50051")
    grpc_server.wait_for_termination()
# Client usage
def create_client() -> pb2_grpc.UserServiceStub:
    """Return a UserService stub bound to a new insecure channel to localhost:50051."""
    return pb2_grpc.UserServiceStub(grpc.insecure_channel("localhost:50051"))
# Module-level demo call: create_client() targets localhost:50051, so a server
# must be listening there for this to succeed.
stub = create_client()
response = stub.GetUser(pb2.GetUserRequest(user_id="1"))
print(response.user.name) # Alice
Streaming RPCs
gRPC's streaming capabilities enable real-time data flows. Server streaming lets the server push multiple responses for a single request (useful for streaming logs and live data feeds). Client streaming is the mirror image: the client sends a sequence of messages and the server replies once after consuming them (useful for bulk uploads). Bidirectional streaming enables full-duplex communication — both sides can send and receive independently, making it ideal for chat applications and real-time collaboration.
import time
from typing import Iterator
class UserServicer(pb2_grpc.UserServiceServicer):
    """Streaming RPC handlers for UserService, backed by the in-memory USERS dict.

    These methods are shown separately from GetUser/CreateUser. In a single
    module they belong on the same UserServicer class; otherwise this
    definition replaces the earlier one.
    """

    @staticmethod
    def _parse_role_filter(filter_role: str) -> int:
        """Convert a role name to its numeric UserRole value.

        Accepts the full enum name ("USER_ROLE_ADMIN") or the short form
        ("admin"), case-insensitively.

        Raises:
            ValueError: if the name is not a member of the UserRole enum.
        """
        name = filter_role.strip().upper()
        if not name.startswith("USER_ROLE_"):
            name = f"USER_ROLE_{name}"
        return pb2.UserRole.Value(name)

    # Server streaming: one request → multiple responses
    def ListUsers(
        self,
        request: pb2.ListUsersRequest,
        context: grpc.ServicerContext,
    ) -> Iterator[pb2.User]:
        """Stream users one by one — efficient for large result sets.

        An empty ``filter_role`` disables filtering. A ``limit`` of 0 (the
        proto3 default, i.e. not set) means "no limit"; a negative ``offset``
        is treated as 0. An unknown role name aborts the call with
        INVALID_ARGUMENT.
        """
        all_users = list(USERS.values())
        # Apply role filter. Records store the numeric role under "role".
        if request.filter_role:
            try:
                wanted_role = self._parse_role_filter(request.filter_role)
            except ValueError:
                # abort() raises, so execution does not continue past it.
                context.abort(
                    grpc.StatusCode.INVALID_ARGUMENT,
                    f"Unknown role filter: {request.filter_role!r}",
                )
            all_users = [u for u in all_users if u.get("role", 0) == wanted_role]
        # Paginate
        start = max(request.offset, 0)
        stop = start + request.limit if request.limit > 0 else None
        for user_data in all_users[start:stop]:
            if not context.is_active():
                # Client went away: stop instead of working through the rest.
                return
            yield pb2.User(**user_data, created_at=int(time.time()))
            time.sleep(0.01)  # simulate DB fetch per user

    # Client streaming: multiple requests → one response
    def BulkCreateUsers(
        self,
        request_iterator: Iterator[pb2.CreateUserRequest],
        context: grpc.ServicerContext,
    ) -> pb2.ListUsersResponse:
        """Accept a stream of users and return the batch result."""
        created = []
        for req in request_iterator:
            # USERS grows with every insert, so its length alone yields the
            # next sequential id (same scheme as CreateUser).
            user_id = str(len(USERS) + 1)
            record = {
                "id": user_id,
                "name": req.name,
                "email": req.email,
                "role": req.role,
            }
            USERS[user_id] = record
            created.append(pb2.User(**record, created_at=int(time.time())))
        return pb2.ListUsersResponse(users=created, total_count=len(created))

    # Bidirectional streaming
    def WatchUserEvents(
        self,
        request_iterator: Iterator[pb2.EventStreamRequest],
        context: grpc.ServicerContext,
    ) -> Iterator[pb2.UserEvent]:
        """Real-time event feed — client sends subscriptions, server sends events.

        Each incoming request is answered with three simulated events for the
        requested user id.
        """
        for req in request_iterator:
            # Stream back simulated events for each subscribed user
            for event_type in ("login", "profile_updated", "logout"):
                yield pb2.UserEvent(
                    event_type=event_type,
                    user_id=req.user_id,
                    timestamp=int(time.time()),
                )
# Client-side streaming usage
def stream_users_example():
    """Client-side demo: consume ListUsers, then send five users via BulkCreateUsers."""
    client = create_client()

    # Server streaming
    listing = pb2.ListUsersRequest(limit=10, offset=0)
    for received in client.ListUsers(listing):
        print(f"Received: {received.name}")

    # Client streaming
    batch = []
    for i in range(5):
        batch.append(
            pb2.CreateUserRequest(name=f"User{i}", email=f"user{i}@techoral.com", role=2)
        )
    summary = client.BulkCreateUsers(iter(batch))
    print(f"Created {summary.total_count} users")
Interceptors: Middleware for gRPC
gRPC interceptors are the equivalent of HTTP middleware — they wrap RPC calls for cross-cutting concerns like authentication, logging, rate limiting, and metrics. A server interceptor receives a continuation and the call details (method name and invocation metadata); it calls the continuation to obtain the RPC method handler and can return it unchanged or wrap it to inspect the request, check authorization before the handler runs, or catch errors uniformly.
import grpc
import time
import logging
class LoggingInterceptor(grpc.ServerInterceptor):
    """Log the outcome and latency of unary-unary RPCs.

    Handlers of other kinds (streaming) are passed through unchanged.
    """

    def intercept_service(self, continuation, handler_call_details):
        """Wrap the next handler so each unary-unary call is timed and logged.

        Args:
            continuation: Callable returning the next RpcMethodHandler in the chain.
            handler_call_details: Call description; ``.method`` is the full RPC name.

        Returns:
            The wrapped handler for unary-unary methods; otherwise whatever
            the continuation returned (possibly None).
        """
        # Resolve the handler once. It is a handler *object*, not a callable:
        # the actual behavior lives on its unary_unary attribute.
        handler = continuation(handler_call_details)
        if handler is None or not handler.unary_unary:
            return handler

        method = handler_call_details.method
        behavior = handler.unary_unary

        def logging_wrapper(request, context):
            start = time.perf_counter()
            try:
                response = behavior(request, context)
            except Exception as e:
                logging.error("gRPC %s FAILED: %s", method, e)
                raise
            elapsed = (time.perf_counter() - start) * 1000
            logging.info("gRPC %s OK [%.1fms]", method, elapsed)
            return response

        # Return a wrapper handler
        return grpc.unary_unary_rpc_method_handler(
            logging_wrapper,
            request_deserializer=handler.request_deserializer,
            response_serializer=handler.response_serializer,
        )
class AuthInterceptor(grpc.ServerInterceptor):
    """Require a valid JWT bearer token on every RPC except PUBLIC_METHODS.

    All four handler kinds (unary and streaming) are protected.
    """

    PUBLIC_METHODS = {"/techoral.users.v1.UserService/GetUser"}
    # NOTE(review): hard-coded demo signing key. Load the real key from secure
    # configuration before deploying.
    JWT_SECRET = "secret"

    def intercept_service(self, continuation, handler_call_details):
        """Wrap the next handler with a token check unless the method is public.

        Args:
            continuation: Callable returning the next RpcMethodHandler in the chain.
            handler_call_details: Call description; ``.method`` is the full RPC name.

        Returns:
            The original handler for public or unknown methods, otherwise a
            handler of the same kind that authenticates before delegating.
        """
        handler = continuation(handler_call_details)
        if handler is None or handler_call_details.method in self.PUBLIC_METHODS:
            return handler

        # Pick the behavior that is set and the factory that rebuilds a
        # handler of the same kind, so streaming RPCs are protected as well.
        if handler.unary_unary:
            behavior = handler.unary_unary
            make_handler = grpc.unary_unary_rpc_method_handler
        elif handler.unary_stream:
            behavior = handler.unary_stream
            make_handler = grpc.unary_stream_rpc_method_handler
        elif handler.stream_unary:
            behavior = handler.stream_unary
            make_handler = grpc.stream_unary_rpc_method_handler
        elif handler.stream_stream:
            behavior = handler.stream_stream
            make_handler = grpc.stream_stream_rpc_method_handler
        else:
            return handler

        def auth_wrapper(request_or_iterator, context):
            token = self._extract_token(context)
            if not token or not self._verify_token(token):
                # abort() raises, terminating the RPC with this status.
                context.abort(grpc.StatusCode.UNAUTHENTICATED, "Invalid or missing token")
            return behavior(request_or_iterator, context)

        return make_handler(
            auth_wrapper,
            request_deserializer=handler.request_deserializer,
            response_serializer=handler.response_serializer,
        )

    @staticmethod
    def _extract_token(context) -> str:
        """Return the token from the ``authorization`` metadata entry, or ""."""
        metadata = dict(context.invocation_metadata())
        value = metadata.get("authorization", "")
        # Strip an optional "Bearer " scheme prefix; the scheme name is
        # matched case-insensitively and only at the start of the value.
        prefix = "bearer "
        if value[:len(prefix)].lower() == prefix:
            value = value[len(prefix):]
        return value.strip()

    def _verify_token(self, token: str) -> bool:
        """Return True if the token decodes as an HS256 JWT signed with JWT_SECRET."""
        try:
            import jwt
            jwt.decode(token, self.JWT_SECRET, algorithms=["HS256"])
            return True
        except Exception:
            # Fail closed: any problem verifying the token means "not authenticated".
            return False
# Apply interceptors
def serve_with_interceptors():
    """Run UserService on insecure port 50051 with the auth and logging interceptors installed."""
    pool = futures.ThreadPoolExecutor(max_workers=10)
    # Interceptors are passed in the order: auth, then logging.
    chain = [AuthInterceptor(), LoggingInterceptor()]
    grpc_server = grpc.server(pool, interceptors=chain)
    pb2_grpc.add_UserServiceServicer_to_server(UserServicer(), grpc_server)
    grpc_server.add_insecure_port("[::]:50051")
    grpc_server.start()
    grpc_server.wait_for_termination()
Error Handling and Status Codes
gRPC has a rich set of canonical status codes that map well to HTTP: NOT_FOUND, INVALID_ARGUMENT, UNAUTHENTICATED, PERMISSION_DENIED, RESOURCE_EXHAUSTED (rate limit), INTERNAL, UNAVAILABLE, DEADLINE_EXCEEDED. Clients should handle these codes specifically to implement retry logic and user-friendly error messages.
import grpc
# Server: abort with status code
def GetUser(self, request, context):
    """Return the requested user, aborting the RPC on a missing or unknown id.

    Aborts with INVALID_ARGUMENT when ``user_id`` is empty and with NOT_FOUND
    when no such user is stored.
    """
    requested_id = request.user_id
    if not requested_id:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, "user_id is required")
    record = USERS.get(requested_id)
    if not record:
        context.abort(grpc.StatusCode.NOT_FOUND, f"User {requested_id} not found")
    found = pb2.User(**record)
    return pb2.GetUserResponse(user=found)
# Client: handle errors
import grpc
from grpc import RpcError
def get_user_safe(user_id: str) -> dict | None:
    """Fetch a user and translate gRPC status codes into Python exceptions.

    Args:
        user_id: Id of the user to fetch.

    Returns:
        A dict with the user's ``id`` and ``name``, or None if the server
        answers NOT_FOUND.

    Raises:
        PermissionError: the server answered UNAUTHENTICATED.
        TimeoutError: the 5-second deadline was exceeded.
        RuntimeError: any other gRPC status.
    """
    stub = create_client()
    try:
        response = stub.GetUser(
            pb2.GetUserRequest(user_id=user_id),
            timeout=5.0,  # seconds
            metadata=[("authorization", "bearer my-token")],
        )
    except RpcError as e:
        code = e.code()
        if code == grpc.StatusCode.NOT_FOUND:
            return None
        # Chain explicitly so the original RpcError is kept as __cause__.
        if code == grpc.StatusCode.UNAUTHENTICATED:
            raise PermissionError("Not authenticated") from e
        if code == grpc.StatusCode.DEADLINE_EXCEEDED:
            raise TimeoutError("gRPC call timed out") from e
        raise RuntimeError(f"gRPC error {code}: {e.details()}") from e
    return {"id": response.user.id, "name": response.user.name}
# Retry with exponential backoff
import time
from functools import wraps
def grpc_retry(max_retries=3, retryable_codes=None, base_delay=0.1):
    """Decorator factory that retries a gRPC call with exponential backoff.

    Args:
        max_retries: Total number of attempts. Values below 1 are treated as
            1, so the wrapped function is always called at least once.
        retryable_codes: Status codes worth retrying. When None, defaults to
            UNAVAILABLE and DEADLINE_EXCEEDED. An explicit empty collection
            disables retries.
        base_delay: Seconds to sleep before the first retry; the delay
            doubles on each subsequent retry.

    Returns:
        A decorator. The decorated function re-raises the RpcError when its
        code is not retryable or the attempts are exhausted.
    """
    if retryable_codes is None:
        retryable_codes = {
            grpc.StatusCode.UNAVAILABLE,
            grpc.StatusCode.DEADLINE_EXCEEDED,
        }
    attempts = max(1, max_retries)

    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(attempts):
                try:
                    return fn(*args, **kwargs)
                except RpcError as e:
                    if e.code() not in retryable_codes or attempt == attempts - 1:
                        raise
                    time.sleep(base_delay * (2 ** attempt))
        return wrapper
    return decorator
@grpc_retry(max_retries=3)
def reliable_get_user(user_id: str):
    """Call GetUser with a 5-second deadline, wrapped by grpc_retry(max_retries=3)."""
    client = create_client()
    lookup = pb2.GetUserRequest(user_id=user_id)
    return client.GetUser(lookup, timeout=5.0)
TLS, Reflection, and Production
Production gRPC services need TLS for encryption, server reflection for tooling like grpcurl and Postman, health checks for load balancers, and connection pooling on the client side. These are all standard patterns in the gRPC ecosystem.
import grpc
from grpc_reflection.v1alpha import reflection
from grpc_health.v1 import health, health_pb2, health_pb2_grpc
# TLS server
def serve_with_tls():
    """Run UserService over TLS on port 443 with health checking and reflection.

    Reads the private key and certificate chain from ``server.key`` and
    ``server.crt`` in the working directory, then blocks until the server
    terminates.
    """
    with open("server.key", "rb") as f:
        private_key = f.read()
    with open("server.crt", "rb") as f:
        certificate_chain = f.read()
    credentials = grpc.ssl_server_credentials([(private_key, certificate_chain)])
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    pb2_grpc.add_UserServiceServicer_to_server(UserServicer(), server)

    user_service_name = pb2.DESCRIPTOR.services_by_name["UserService"].full_name

    # Add health check
    health_servicer = health.HealthServicer()
    health_pb2_grpc.add_HealthServicer_to_server(health_servicer, server)
    serving = health_pb2.HealthCheckResponse.SERVING
    # The empty name reports overall server health; individual services are
    # looked up by their fully-qualified name.
    health_servicer.set("", serving)
    health_servicer.set(user_service_name, serving)
    # Short name kept for clients that already query it.
    health_servicer.set("UserService", serving)

    # Add server reflection (enables grpcurl, Postman)
    SERVICE_NAMES = (
        user_service_name,
        health.SERVICE_NAME,
        reflection.SERVICE_NAME,
    )
    reflection.enable_server_reflection(SERVICE_NAMES, server)

    server.add_secure_port("[::]:443", credentials)
    server.start()
    server.wait_for_termination()
# TLS client
def create_tls_client():
    """Return a UserService stub on a TLS channel to api.techoral.com:443, trusting ca.crt."""
    with open("ca.crt", "rb") as ca_file:
        root_certificates = ca_file.read()
    tls_credentials = grpc.ssl_channel_credentials(root_certificates)
    secure_channel = grpc.secure_channel("api.techoral.com:443", tls_credentials)
    return pb2_grpc.UserServiceStub(secure_channel)
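# Test with grpcurl. These commands need server reflection enabled on the target server;
# -plaintext is for an insecure port such as the :50051 dev server (for the TLS server above, omit it and pass -cacert ca.crt):
# grpcurl -plaintext localhost:50051 list
# grpcurl -plaintext -d '{"user_id":"1"}' localhost:50051 techoral.users.v1.UserService/GetUser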