Python gRPC: Protocol Buffers and Streaming RPCs

gRPC is Google's open-source RPC framework that uses Protocol Buffers (protobuf) for schema definition and binary serialization. It's 5–10x faster than REST+JSON for service-to-service communication, supports four RPC types (unary, server streaming, client streaming, bidirectional streaming), and provides a strongly-typed contract shared between services in any language. This guide covers writing .proto files, generating Python code, implementing all four RPC types, and adding interceptors for logging and authentication.

Protocol Buffer Definitions

Protocol Buffers (.proto files) define the service contract: the messages (data types) and the RPC methods. Protobuf uses a compact binary encoding that is roughly 3–10x smaller and 5–10x faster to parse than JSON. Fields are identified by integer tags, enabling backward and forward compatibility — adding new optional fields doesn't break existing clients or servers.

// user_service.proto
syntax = "proto3";

package techoral.users.v1;

// NOTE: protobuf has no `python_package` file option (unlike `java_package`
// or `go_package`), and protoc rejects unknown options. The Python module
// path comes from where the .proto file sits relative to the -I include root
// and from the --python_out directory, so nothing needs to be declared here.

// Enum types
// Role attached to a user. Value 0 is the proto3 default, so a message that
// never sets its role field reads back as USER_ROLE_UNSPECIFIED.
enum UserRole {
  USER_ROLE_UNSPECIFIED = 0;
  USER_ROLE_ADMIN = 1;
  USER_ROLE_EDITOR = 2;
  USER_ROLE_VIEWER = 3;
}

// Message definitions
// A user record as exchanged by UserService.
message User {
  string id = 1;
  string name = 2;
  string email = 3;
  UserRole role = 4;
  int64 created_at = 5;  // Unix timestamp
  optional string bio = 6;  // `optional` gives explicit presence: unset is distinguishable from ""
}

// Request/response pair for UserService.GetUser: look a user up by id.
message GetUserRequest { string user_id = 1; }
message GetUserResponse { User user = 1; }

// Request for UserService.ListUsers: a page window plus a role filter.
message ListUsersRequest {
  int32 limit = 1;  // NOTE(review): reads as 0 when unset (proto3 default) — confirm how the server treats 0
  int32 offset = 2;
  string filter_role = 3;  // NOTE(review): a string rather than UserRole; expected format is not defined here — confirm with the server
}
// A batch of users together with a count; returned by UserService.BulkCreateUsers.
message ListUsersResponse {
  repeated User users = 1;
  int32 total_count = 2;
}

// Caller-supplied fields for a new user; sent once to CreateUser or as a
// stream to BulkCreateUsers. It carries no id or created_at field.
message CreateUserRequest {
  string name = 1;
  string email = 2;
  UserRole role = 3;
}

// Streaming messages
// Sent by the client on the WatchUserEvents stream; names the user to watch.
message EventStreamRequest { string user_id = 1; }
// Sent by the server on the WatchUserEvents stream.
message UserEvent {
  string event_type = 1;
  string user_id = 2;
  bytes payload = 3;  // opaque bytes; NOTE(review): encoding is not defined in this file
  int64 timestamp = 4;  // NOTE(review): presumably a Unix timestamp like User.created_at — confirm
}

// Service definition
// One method per gRPC call shape. The `stream` keyword on the request and/or
// response type is what selects the shape.
service UserService {
  // Unary: single request → single response
  rpc GetUser(GetUserRequest) returns (GetUserResponse);
  // Returns the created User directly rather than a wrapper message.
  rpc CreateUser(CreateUserRequest) returns (User);

  // Server streaming: single request → multiple responses
  // Each streamed message is one User; ListUsersResponse is not used here.
  rpc ListUsers(ListUsersRequest) returns (stream User);

  // Client streaming: multiple requests → single response
  rpc BulkCreateUsers(stream CreateUserRequest) returns (ListUsersResponse);

  // Bidirectional streaming: multiple requests ↔ multiple responses
  rpc WatchUserEvents(stream EventStreamRequest) returns (stream UserEvent);
}

Code Generation Setup

gRPC generates Python stubs from .proto files. The grpcio-tools package includes the protoc compiler. Generated files come in pairs: *_pb2.py (message classes) and *_pb2_grpc.py (service stubs and server base classes).

# grpcio-health-checking provides grpc_health and PyJWT provides jwt; both are
# imported by the server examples later in this guide.
pip install grpcio grpcio-tools grpcio-reflection grpcio-health-checking PyJWT

# protoc does not create the output directory; it must already exist.
mkdir -p ./generated

# Generate Python code from proto
# NOTE: the generated user_service_pb2_grpc.py does `import user_service_pb2`
# as a top-level module. For `from generated import ...` to work, ./generated
# must also be on the import path (e.g. PYTHONPATH=./generated).
python -m grpc_tools.protoc \
  -I ./proto \
  --python_out=./generated \
  --grpc_python_out=./generated \
  --pyi_out=./generated \
  ./proto/user_service.proto

# This generates:
# generated/user_service_pb2.py       — message classes
# generated/user_service_pb2_grpc.py  — service stubs
# generated/user_service_pb2.pyi      — type stubs for IDE support
# Project structure
"""
grpc_service/
├── proto/
│   └── user_service.proto
├── generated/
│   ├── user_service_pb2.py
│   ├── user_service_pb2_grpc.py
│   └── user_service_pb2.pyi
├── server.py
├── client.py
└── interceptors.py
"""

Unary RPC: Request-Response

Unary RPC is the simplest pattern — one request, one response, just like a regular function call. The server implements the service base class generated by protoc. For the synchronous server, each RPC method receives the request and a ServicerContext for sending metadata, setting status codes, and controlling the response lifecycle.

import grpc
from concurrent import futures
import time
from generated import user_service_pb2 as pb2
from generated import user_service_pb2_grpc as pb2_grpc

# Demo-only user table held in memory and keyed by user id. Each record's
# "role" is the numeric value of the UserRole enum from the .proto file.
USERS = {
    record["id"]: record
    for record in (
        dict(id="1", name="Alice", email="alice@techoral.com", role=1),
        dict(id="2", name="Bob", email="bob@techoral.com", role=2),
    )
}

class UserServicer(pb2_grpc.UserServiceServicer):
    """Unary handlers of UserService, backed by the in-memory USERS table."""

    def GetUser(self, request: pb2.GetUserRequest, context: grpc.ServicerContext) -> pb2.GetUserResponse:
        """Return the user whose id is ``request.user_id``.

        When the id is unknown, the call's status is set to NOT_FOUND and an
        empty response is returned. ``created_at`` is stamped with the current
        time on every call because the table does not store it.
        """
        record = USERS.get(request.user_id)
        if record:
            found = pb2.User(**record, created_at=int(time.time()))
            return pb2.GetUserResponse(user=found)

        context.set_code(grpc.StatusCode.NOT_FOUND)
        context.set_details(f"User {request.user_id} not found")
        return pb2.GetUserResponse()

    def CreateUser(self, request: pb2.CreateUserRequest, context: grpc.ServicerContext) -> pb2.User:
        """Store a new user built from the request and return it.

        The new id is the current table size plus one, rendered as a string.
        """
        new_id = str(len(USERS) + 1)
        record = dict(
            id=new_id,
            name=request.name,
            email=request.email,
            role=request.role,
        )
        USERS[new_id] = record
        return pb2.User(**record, created_at=int(time.time()))

def serve():
    """Start an insecure gRPC server on port 50051 and block until it terminates.

    Requests are handled on a pool of 10 worker threads.
    """
    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    grpc_server = grpc.server(worker_pool)
    pb2_grpc.add_UserServiceServicer_to_server(UserServicer(), grpc_server)
    grpc_server.add_insecure_port("[::]:50051")
    grpc_server.start()
    print("gRPC server running on :50051")
    grpc_server.wait_for_termination()

# Client usage
def create_client() -> pb2_grpc.UserServiceStub:
    """Return a UserService stub bound to a new insecure channel to localhost:50051."""
    return pb2_grpc.UserServiceStub(grpc.insecure_channel("localhost:50051"))

# Demo call: fetch user "1" through the stub returned by create_client(),
# which dials localhost:50051.
# NOTE: these statements sit at module level, so they run on import.
stub = create_client()
response = stub.GetUser(pb2.GetUserRequest(user_id="1"))
print(response.user.name)  # Alice

Streaming RPCs

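gRPC's streaming capabilities enable real-time data flows. Server streaming lets the server push multiple responses for a single request (useful for streaming logs, live data feeds). Client streaming is the mirror image: the client sends a sequence of requests and the server replies once after consuming them (useful for bulk uploads, as in BulkCreateUsers below). Bidirectional streaming enables full-duplex communication — both sides can send and receive independently, making it ideal for chat applications and real-time collaboration.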

import time
from typing import Iterator

class UserServicer(pb2_grpc.UserServiceServicer):
    """Streaming handlers of UserService, backed by the in-memory USERS table."""

    @staticmethod
    def _role_name(user_data: dict) -> str | None:
        """Return the UserRole enum name for a stored record, or None if its role number is unknown."""
        try:
            # Records stored without a role fall back to 0 (USER_ROLE_UNSPECIFIED).
            return pb2.UserRole.Name(user_data.get("role", 0))
        except (ValueError, TypeError):
            return None

    # Server streaming: one request → multiple responses
    def ListUsers(
        self,
        request: pb2.ListUsersRequest,
        context: grpc.ServicerContext
    ) -> Iterator[pb2.User]:
        """Stream the requested page of users, one message per user.

        ``request.filter_role`` is compared with each user's UserRole enum name
        (for example ``"USER_ROLE_ADMIN"``); an empty filter keeps every user.
        Stored records only carry the numeric ``role``, so the name is derived
        from the enum rather than read from the record.

        ``request.limit`` of 0 or less means "no limit", because 0 is what
        proto3 delivers when the client leaves the field unset. A negative
        ``request.offset`` is treated as 0.

        Streaming stops as soon as the call is no longer active.
        """
        all_users = list(USERS.values())
        # Apply role filter
        if request.filter_role:
            all_users = [
                u for u in all_users
                if self._role_name(u) == request.filter_role
            ]
        # Paginate
        start = max(request.offset, 0)
        stop = start + request.limit if request.limit > 0 else None
        for user_data in all_users[start:stop]:
            if not context.is_active():
                # Client went away or cancelled: stop producing instead of
                # iterating over the rest of the page for nothing.
                return
            yield pb2.User(**user_data, created_at=int(time.time()))
            time.sleep(0.01)  # simulate DB fetch per user

    # Client streaming: multiple requests → one response
    def BulkCreateUsers(
        self,
        request_iterator: Iterator[pb2.CreateUserRequest],
        context: grpc.ServicerContext
    ) -> pb2.ListUsersResponse:
        """Create one user per streamed request and return the whole batch.

        Each record is stored with the same keys as the rest of the table
        (including ``role``). Ids are the table size plus one at the moment
        each user is created.
        """
        created = []
        for req in request_iterator:
            # USERS already grows with every stored record, so its size alone
            # gives the next free id.
            user_id = str(len(USERS) + 1)
            user_data = {
                "id": user_id,
                "name": req.name,
                "email": req.email,
                "role": req.role,
            }
            USERS[user_id] = user_data
            created.append(pb2.User(**user_data, created_at=int(time.time())))
        return pb2.ListUsersResponse(users=created, total_count=len(created))

    # Bidirectional streaming
    def WatchUserEvents(
        self,
        request_iterator: Iterator[pb2.EventStreamRequest],
        context: grpc.ServicerContext
    ) -> Iterator[pb2.UserEvent]:
        """Answer every subscription request with three simulated events.

        For each incoming request the server emits "login", "profile_updated"
        and "logout" events for that request's ``user_id``, in that order.
        """
        for req in request_iterator:
            # Stream back simulated events for each subscribed user
            for event_type in ("login", "profile_updated", "logout"):
                yield pb2.UserEvent(
                    event_type=event_type,
                    user_id=req.user_id,
                    timestamp=int(time.time()),
                )

# Client-side streaming usage
def stream_users_example():
    """Exercise the server-streaming and client-streaming RPCs from a client.

    Prints each user received from ListUsers, then sends five
    CreateUserRequest messages to BulkCreateUsers and prints how many users
    the server reports as created.
    """
    client = create_client()

    # Server streaming: iterate over the responses as they arrive.
    listing = client.ListUsers(pb2.ListUsersRequest(limit=10, offset=0))
    for user in listing:
        print(f"Received: {user.name}")

    # Client streaming: hand the stub an iterator of request messages.
    batch = []
    for i in range(5):
        batch.append(
            pb2.CreateUserRequest(name=f"User{i}", email=f"user{i}@techoral.com", role=2)
        )
    result = client.BulkCreateUsers(iter(batch))
    print(f"Created {result.total_count} users")

Interceptors: Middleware for gRPC

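gRPC interceptors are the equivalent of HTTP middleware — they wrap RPC calls for cross-cutting concerns like authentication, logging, rate limiting, and metrics. A server interceptor's intercept_service method receives a continuation (which returns the next handler in the chain) and the call details (method name and invocation metadata). It can return that handler unchanged, wrap it to inspect the request or catch errors uniformly, or substitute a handler that rejects the call before the real one runs.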

import grpc
import time
import logging

class LoggingInterceptor(grpc.ServerInterceptor):
    """Server interceptor that logs the outcome and latency of unary-unary RPCs.

    Streaming methods and unknown methods (no handler) pass through unlogged.
    """

    def intercept_service(self, continuation, handler_call_details):
        """Return the next handler, wrapped with logging when it is unary-unary.

        Args:
            continuation: callable that yields the next handler in the chain.
            handler_call_details: call details; only ``method`` is read here.

        Returns:
            The wrapped handler for unary-unary methods, otherwise whatever
            ``continuation`` returned (possibly None).
        """
        # Resolve the downstream handler exactly once. The wrapper below reuses
        # it rather than calling continuation() again on every request, which
        # would re-run the rest of the interceptor chain per call.
        handler = continuation(handler_call_details)
        if not (handler and handler.unary_unary):
            return handler

        method = handler_call_details.method
        behavior = handler.unary_unary

        def logging_wrapper(request, context):
            start = time.perf_counter()
            try:
                response = behavior(request, context)
            except Exception as e:
                # context.abort() also surfaces here, because it raises.
                logging.error("gRPC %s FAILED: %s", method, e)
                raise
            elapsed = (time.perf_counter() - start) * 1000
            # A handler that only set a status code without raising is still
            # logged as OK.
            logging.info("gRPC %s OK [%.1fms]", method, elapsed)
            return response

        # Return a wrapper handler
        return grpc.unary_unary_rpc_method_handler(
            logging_wrapper,
            request_deserializer=handler.request_deserializer,
            response_serializer=handler.response_serializer,
        )

class AuthInterceptor(grpc.ServerInterceptor):
    """Server interceptor that rejects calls without a valid HS256 bearer JWT.

    Methods listed in ``PUBLIC_METHODS`` skip the check. Every other method,
    unary or streaming, is answered with UNAUTHENTICATED unless the call's
    ``authorization`` metadata carries a token that verifies.
    """

    PUBLIC_METHODS = {"/techoral.users.v1.UserService/GetUser"}

    def __init__(self, secret: str = "secret"):
        """Store the HMAC key used to verify tokens.

        Args:
            secret: HS256 signing key. The default matches the original demo
                value so ``AuthInterceptor()`` keeps working.
                NOTE(review): a hard-coded key is not suitable for production;
                pass the real key in from configuration.
        """
        self._secret = secret

    def intercept_service(self, continuation, handler_call_details):
        """Return the real handler for authorized calls, a rejecting one otherwise.

        The decision uses ``handler_call_details.invocation_metadata``, which
        is available before any request message is read. That lets the same
        check cover all four RPC shapes instead of only unary-unary.
        """
        handler = continuation(handler_call_details)
        if handler is None or handler_call_details.method in self.PUBLIC_METHODS:
            return handler

        token = self._bearer_token(handler_call_details.invocation_metadata)
        if token and self._verify_token(token):
            return handler
        return self._rejecting_handler(handler)

    @staticmethod
    def _bearer_token(invocation_metadata) -> str:
        """Extract the token from the ``authorization`` metadata entry.

        The ``Bearer`` scheme prefix is matched case-insensitively and only at
        the start of the value. A value without that prefix is returned as-is,
        so bare tokens are still accepted.
        """
        value = dict(invocation_metadata or ()).get("authorization", "").strip()
        prefix = "bearer "
        if value[:len(prefix)].lower() == prefix:
            value = value[len(prefix):].strip()
        return value

    @staticmethod
    def _rejecting_handler(handler):
        """Build a handler of the same RPC shape as *handler* that aborts the call."""

        def deny(request_or_iterator, context):
            context.abort(grpc.StatusCode.UNAUTHENTICATED, "Invalid or missing token")

        if handler.request_streaming and handler.response_streaming:
            factory = grpc.stream_stream_rpc_method_handler
        elif handler.request_streaming:
            factory = grpc.stream_unary_rpc_method_handler
        elif handler.response_streaming:
            factory = grpc.unary_stream_rpc_method_handler
        else:
            factory = grpc.unary_unary_rpc_method_handler
        return factory(
            deny,
            request_deserializer=handler.request_deserializer,
            response_serializer=handler.response_serializer,
        )

    def _verify_token(self, token: str) -> bool:
        """Return True when *token* decodes as an HS256 JWT signed with the stored key."""
        # Imported outside the try block so a missing PyJWT install surfaces as
        # an ImportError instead of looking like "every token is invalid".
        import jwt

        try:
            jwt.decode(token, self._secret, algorithms=["HS256"])
        except jwt.InvalidTokenError:
            return False
        return True

# Apply interceptors
def serve_with_interceptors():
    """Start the insecure server on port 50051 with auth and logging interceptors.

    The interceptor list is passed to ``grpc.server`` with AuthInterceptor
    first and LoggingInterceptor second. Blocks until the server terminates.
    """
    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    chain = [AuthInterceptor(), LoggingInterceptor()]
    grpc_server = grpc.server(worker_pool, interceptors=chain)
    pb2_grpc.add_UserServiceServicer_to_server(UserServicer(), grpc_server)
    grpc_server.add_insecure_port("[::]:50051")
    grpc_server.start()
    grpc_server.wait_for_termination()

Error Handling and Status Codes

gRPC has a rich set of canonical status codes that map well to HTTP: NOT_FOUND, INVALID_ARGUMENT, UNAUTHENTICATED, PERMISSION_DENIED, RESOURCE_EXHAUSTED (rate limit), INTERNAL, UNAVAILABLE, DEADLINE_EXCEEDED. Clients should handle these codes specifically to implement retry logic and user-friendly error messages.

import grpc

# Server: abort with status code
def GetUser(self, request, context):
    """Return the requested user, aborting the call on bad input.

    Aborts with INVALID_ARGUMENT when ``request.user_id`` is empty and with
    NOT_FOUND when no such user is stored.
    """
    wanted_id = request.user_id
    if not wanted_id:
        context.abort(grpc.StatusCode.INVALID_ARGUMENT, "user_id is required")

    record = USERS.get(wanted_id)
    if not record:
        context.abort(grpc.StatusCode.NOT_FOUND, f"User {wanted_id} not found")

    found = pb2.User(**record)
    return pb2.GetUserResponse(user=found)

# Client: handle errors
import grpc
from grpc import RpcError

def get_user_safe(user_id: str) -> dict | None:
    """Fetch a user and translate gRPC failures into Python exceptions.

    Returns:
        ``{"id": ..., "name": ...}`` on success, or None when the server
        answers NOT_FOUND.

    Raises:
        PermissionError: the server answered UNAUTHENTICATED.
        TimeoutError: the 5-second deadline was exceeded.
        RuntimeError: any other gRPC status.
    """
    client = create_client()
    try:
        reply = client.GetUser(
            pb2.GetUserRequest(user_id=user_id),
            timeout=5.0,  # seconds
            metadata=[("authorization", "bearer my-token")],
        )
        found = reply.user
        return {"id": found.id, "name": found.name}
    except RpcError as err:
        status = err.code()
        if status == grpc.StatusCode.NOT_FOUND:
            return None
        if status == grpc.StatusCode.UNAUTHENTICATED:
            raise PermissionError("Not authenticated")
        if status == grpc.StatusCode.DEADLINE_EXCEEDED:
            raise TimeoutError("gRPC call timed out")
        raise RuntimeError(f"gRPC error {status}: {err.details()}")

# Retry with exponential backoff
import time
from functools import wraps

def grpc_retry(max_retries=3, retryable_codes=None, base_delay=0.1):
    """Build a decorator that retries a gRPC call with exponential backoff.

    Args:
        max_retries: total number of attempts, including the first one. Must
            be at least 1.
        retryable_codes: status codes worth retrying. ``None`` selects
            UNAVAILABLE and DEADLINE_EXCEEDED. An explicitly empty collection
            is honored and disables retrying.
        base_delay: sleep in seconds before the second attempt; it doubles
            after every further failure.

    Returns:
        A decorator. The wrapped function returns the first successful result
        and re-raises the RpcError when its code is not retryable or when the
        last attempt fails.

    Raises:
        ValueError: ``max_retries`` is below 1. Previously such a value made
            the wrapper return None without ever calling the function.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    # `is None` rather than `or`, so that an empty set is not silently
    # replaced by the defaults.
    if retryable_codes is None:
        retryable_codes = {
            grpc.StatusCode.UNAVAILABLE,
            grpc.StatusCode.DEADLINE_EXCEEDED,
        }

    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return fn(*args, **kwargs)
                except RpcError as e:
                    out_of_attempts = attempt == max_retries - 1
                    if out_of_attempts or e.code() not in retryable_codes:
                        raise
                    time.sleep(base_delay * (2 ** attempt))
        return wrapper
    return decorator

@grpc_retry(max_retries=3)
def reliable_get_user(user_id: str):
    """Call GetUser with a 5-second deadline, under the grpc_retry decorator (max_retries=3)."""
    client = create_client()
    lookup = pb2.GetUserRequest(user_id=user_id)
    return client.GetUser(lookup, timeout=5.0)

TLS, Reflection, and Production

Production gRPC services need TLS for encryption, server reflection for tooling like grpcurl and Postman, health checks for load balancers, and connection pooling on the client side. These are all standard patterns in the gRPC ecosystem.

import grpc
from grpc_reflection.v1alpha import reflection
from grpc_health.v1 import health, health_pb2, health_pb2_grpc

# TLS server
def serve_with_tls():
    """Start a TLS server on port 443 with health checking and reflection enabled.

    Reads the private key from ``server.key`` and the certificate chain from
    ``server.crt`` in the working directory, then blocks until the server
    terminates.
    """
    with open("server.key", "rb") as key_file, open("server.crt", "rb") as cert_file:
        key_pem = key_file.read()
        chain_pem = cert_file.read()

    tls_credentials = grpc.ssl_server_credentials([(key_pem, chain_pem)])
    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    pb2_grpc.add_UserServiceServicer_to_server(UserServicer(), grpc_server)

    # Health service: register it, then report "UserService" as SERVING.
    health_service = health.HealthServicer()
    health_pb2_grpc.add_HealthServicer_to_server(health_service, grpc_server)
    health_service.set("UserService", health_pb2.HealthCheckResponse.SERVING)

    # Reflection lets tools such as grpcurl and Postman discover the API.
    user_service_name = pb2.DESCRIPTOR.services_by_name["UserService"].full_name
    service_names = (user_service_name, reflection.SERVICE_NAME)
    reflection.enable_server_reflection(service_names, grpc_server)

    grpc_server.add_secure_port("[::]:443", tls_credentials)
    grpc_server.start()
    grpc_server.wait_for_termination()

# TLS client
def create_tls_client():
    """Return a UserService stub on a TLS channel to api.techoral.com:443.

    The root certificates used to verify the server are read from ``ca.crt``
    in the working directory.
    """
    with open("ca.crt", "rb") as ca_file:
        root_pem = ca_file.read()
    tls_credentials = grpc.ssl_channel_credentials(root_pem)
    return pb2_grpc.UserServiceStub(
        grpc.secure_channel("api.techoral.com:443", tls_credentials)
    )

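# Test with grpcurl (`list` and calls by method name need server reflection enabled on the target server):
# Plaintext development server on :50051:
# grpcurl -plaintext localhost:50051 list
# grpcurl -plaintext -d '{"user_id":"1"}' localhost:50051 techoral.users.v1.UserService/GetUser
# TLS server on :443 (verify it against the same CA the client trusts):
# grpcurl -cacert ca.crt api.techoral.com:443 list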
gRPC vs REST: gRPC excels for internal service-to-service communication (microservices), real-time streaming, and polyglot environments where one .proto file generates clients in Go, Java, Python, and TypeScript simultaneously. REST is better for public APIs where browser access and human readability matter. Many teams use both: REST for external APIs, gRPC internally.