Python Elasticsearch: Indexing and Full-Text Search

Elasticsearch is the go-to engine for full-text search, log analytics, and vector search. The official elasticsearch-py client provides both synchronous and asynchronous APIs with a Pythonic interface for indexing, querying, aggregating, and managing indices. This guide covers index mappings, bulk indexing of large datasets, multi-field search queries, aggregations, and integrating Elasticsearch search into a FastAPI service.

Installation and Connection

pip install elasticsearch[async]
from elasticsearch import Elasticsearch, AsyncElasticsearch
import os

# Self-hosted cluster with security enabled: HTTPS + basic auth, verifying the
# server certificate against the given CA bundle.
# os.environ[...] raises KeyError if ES_PASSWORD is not set.
es = Elasticsearch(
    hosts=["https://localhost:9200"],
    basic_auth=("elastic", os.environ["ES_PASSWORD"]),
    ca_certs="/path/to/ca.crt",  # placeholder: point at your cluster's CA certificate
    verify_certs=True,
)

# Elastic Cloud: deployment addressed by cloud_id, authenticated with an API key.
# Both values come from environment variables (KeyError if either is unset).
es_cloud = Elasticsearch(
    cloud_id=os.environ["ELASTIC_CLOUD_ID"],
    api_key=os.environ["ELASTIC_API_KEY"],
)

# Local development (no auth, plain HTTP) — only for a local, unsecured node.
es_dev = Elasticsearch("http://localhost:9200")

# Health check: info() makes a request to the cluster, so connection, TLS or
# auth problems with `es` surface here. Prints the server version and cluster name.
info = es.info()
print(f"Elasticsearch {info['version']['number']} connected")
print(f"Cluster: {info['cluster_name']}")

Index Mapping and Settings

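Define explicit mappings to control how fields are analyzed, tokenized, and stored. Without an explicit mapping, Elasticsearch infers field types from incoming documents (dynamic mapping), which often produces suboptimal results for production search — for example, every string is indexed both as analyzed text and as a keyword sub-field, whether or not you need both.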

# Name under which the custom analyzer is registered in the index settings and
# referenced by the full-text fields below.
_ANALYZER_NAME = "product_analyzer"

# Custom analysis chain: standard tokenization, then lowercasing, accent
# folding (asciifolding), stop-word removal and snowball stemming.
_PRODUCT_ANALYSIS = {
    "analyzer": {
        _ANALYZER_NAME: {
            "type": "custom",
            "tokenizer": "standard",
            "filter": ["lowercase", "asciifolding", "stop", "snowball"],
        }
    }
}

# Field definitions. "text" fields are analyzed for full-text matching;
# "keyword" fields hold exact values, which this guide uses for term filters
# and terms aggregations.
_PRODUCT_PROPERTIES = {
    "id": {"type": "keyword"},
    # Analyzed for search, plus a name.raw keyword sub-field holding the
    # untouched value.
    "name": {
        "type": "text",
        "analyzer": _ANALYZER_NAME,
        "fields": {"raw": {"type": "keyword"}},
    },
    "description": {"type": "text", "analyzer": _ANALYZER_NAME},
    "category": {"type": "keyword"},
    "brand": {"type": "keyword"},
    "price": {"type": "float"},
    "stock": {"type": "integer"},
    "tags": {"type": "keyword"},
    "created_at": {"type": "date"},
    # 768-dimensional vector, indexed with cosine similarity for vector search.
    "embedding": {
        "type": "dense_vector",
        "dims": 768,
        "index": True,
        "similarity": "cosine",
    },
}

# Complete index-creation body: shard/replica layout and analysis settings,
# followed by the explicit field mappings.
PRODUCTS_MAPPING = {
    "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1,
        "analysis": _PRODUCT_ANALYSIS,
    },
    "mappings": {"properties": _PRODUCT_PROPERTIES},
}

def create_index(es: Elasticsearch, index: str, mapping: dict | None = None) -> bool:
    """Create *index* with the given settings/mappings unless it already exists.

    Args:
        es: Connected Elasticsearch client.
        index: Name of the index to create.
        mapping: Index-creation body ("settings" and/or "mappings" keys).
            Defaults to PRODUCTS_MAPPING, so existing two-argument calls
            behave exactly as before.

    Returns:
        True if this call created the index, False if it already existed.

    Note:
        The exists/create pair is not atomic. If another process creates the
        index between the two calls, the create request will presumably be
        rejected by the server (index already exists). Callers running several
        workers at startup should be prepared for that.
    """
    body = PRODUCTS_MAPPING if mapping is None else mapping
    if es.indices.exists(index=index):
        print(f"Index {index} already exists")
        return False
    es.indices.create(index=index, body=body)
    print(f"Created index: {index}")
    return True

# Ensure the "products" index exists before indexing. create_index() checks for
# it first and creates it (with PRODUCTS_MAPPING) only when it is missing.
create_index(es, "products")

Indexing Documents

from datetime import datetime, timezone

# Index a single document. doc["id"] is reused as the Elasticsearch _id, so
# re-running this call targets the same stored document instead of adding a new one.
doc = {
    "id": "prod-001",
    "name": "Mechanical Gaming Keyboard",
    "description": "RGB backlit mechanical keyboard with Cherry MX switches, N-key rollover",
    "category": "electronics",
    "brand": "TechGear",
    "price": 149.99,
    "stock": 250,
    "tags": ["keyboard", "gaming", "mechanical", "rgb"],
    # ISO-8601 timestamp in UTC; the field is mapped as "date".
    "created_at": datetime.now(timezone.utc).isoformat(),
}

result = es.index(index="products", id=doc["id"], document=doc)
# result["result"] reports what happened to the document (e.g. "created").
print(f"Indexed: {result['result']} — id={result['_id']}")

# Update a document (partial): only the fields in `doc` are changed.
es.update(index="products", id="prod-001", doc={"price": 139.99, "stock": 245})

# Delete a document
es.delete(index="products", id="prod-001")

# Check existence — after the delete above, this is expected to be falsy.
exists = es.exists(index="products", id="prod-001")

Bulk Indexing with helpers

For indexing thousands of documents, use elasticsearch.helpers.bulk() or streaming_bulk(). Bulk operations can be 50–100x faster than individual index calls. They batch many operations into each HTTP request instead of paying a network round trip per document.

from elasticsearch.helpers import bulk, streaming_bulk
from typing import Generator


def generate_products(
    products: list[dict], index: str = "products"
) -> Generator[dict, None, None]:
    """Yield one bulk "index" action per product, lazily.

    Args:
        products: Product documents; each must have an "id" key.
        index: Target index for every action (defaults to "products").

    Yields:
        Action dicts for elasticsearch.helpers.bulk()/streaming_bulk(). Each
        product's "id" becomes the document _id, so re-loading the same
        products targets the same documents. The product dict itself is sent
        unchanged as the document source.

    Raises:
        KeyError: if a product has no "id" (raised when that item is reached).
    """
    for product in products:
        yield {
            "_index": index,
            "_id": product["id"],
            "_source": product,
        }


# Bulk index with helpers.bulk(): it consumes the action generator in chunks
# and returns a summary, (number of successes, list of errors). With
# raise_on_error=False, failures are collected in `failed` instead of raised.
# Note that `products` itself is a fully materialized list here.
products = [
    {"id": f"prod-{i:04d}", "name": f"Product {i}", "price": i * 9.99,
     "category": "electronics", "stock": 100, "created_at": "2026-06-14"}
    for i in range(1, 10001)
]

success, failed = bulk(
    # Transport options such as request_timeout are set on the client via
    # .options() in the 8.x client, not passed through to the API call.
    es.options(request_timeout=60),
    generate_products(products),
    chunk_size=500,       # documents per bulk request
    raise_on_error=False,
)
print(f"Indexed: {success} | Failed: {len(failed)}")


# Streaming bulk — yields an (ok, result) pair per document, so each failure
# can be handled individually instead of being collected into one list.
# The action generator is consumed lazily, but `products` above is already an
# in-memory list. To keep memory flat, feed a generator that reads from the
# source (file, DB cursor) instead.
# Note: this re-sends the same products with the same _id values as the
# bulk() example above, so it targets the same documents.
for ok, result in streaming_bulk(
    es,
    generate_products(products),
    chunk_size=500,
    raise_on_error=False,
):
    if not ok:
        print(f"Failed to index: {result}")

Full-Text Search Queries

Elasticsearch's Query DSL is a JSON structure. The most common patterns are multi-match for full-text search across several fields, bool queries for combining conditions, and term/range queries for exact filtering.

def search_products(
    es: Elasticsearch,
    query: str,
    category: str | None = None,
    min_price: float | None = None,
    max_price: float | None = None,
    page: int = 1,
    size: int = 20,
    index: str = "products",
) -> dict:
    """Run a paginated, filtered full-text product search.

    Args:
        es: Connected Elasticsearch client.
        query: Free text matched against name (boost 3), tags (boost 2) and
            description, with typo tolerance. An empty query matches every
            document, so only the filters apply.
        category: Exact category to filter on (term filter); skipped if falsy.
        min_price: Inclusive lower price bound; omitted when None.
        max_price: Inclusive upper price bound; omitted when None.
        page: 1-based page number.
        size: Hits per page.
        index: Index to search (defaults to "products").

    Returns:
        dict with "total", "total_relation", "page", "size" and "results".
        Each result is the selected _source fields plus "score" and "highlights".

    Raises:
        ValueError: if page < 1 or size < 0. These would produce a negative
            ``from``/``size`` that Elasticsearch rejects, so fail fast instead.
    """
    if page < 1:
        raise ValueError(f"page must be >= 1, got {page}")
    if size < 0:
        raise ValueError(f"size must be >= 0, got {size}")

    if query:
        must = [{
            "multi_match": {
                "query": query,
                "fields": ["name^3", "description^1", "tags^2"],
                "type": "best_fields",
                "fuzziness": "AUTO",  # tolerate small typos
            }
        }]
    else:
        must = [{"match_all": {}}]

    # Filter clauses narrow the result set without contributing to the score.
    filter_clauses = []
    if category:
        filter_clauses.append({"term": {"category": category}})
    price_range = {}
    if min_price is not None:
        price_range["gte"] = min_price
    if max_price is not None:
        price_range["lte"] = max_price
    if price_range:
        filter_clauses.append({"range": {"price": price_range}})

    body = {
        "query": {"bool": {"must": must, "filter": filter_clauses}},
        "from": (page - 1) * size,
        "size": size,
        # Relevance first; price breaks ties between equally scored hits.
        "sort": [{"_score": "desc"}, {"price": "asc"}],
        "highlight": {
            "fields": {"name": {}, "description": {"fragment_size": 150, "number_of_fragments": 2}}
        },
        # Fetch only the fields the results list needs.
        "_source": ["id", "name", "price", "category", "brand", "stock"],
    }

    response = es.search(index=index, body=body)
    hits = response["hits"]
    total = hits["total"]
    return {
        "total": total["value"],
        # "eq" means the count is exact. "gte" means it is a lower bound:
        # Elasticsearch stops counting exactly past its track_total_hits
        # threshold (10,000 by default).
        "total_relation": total.get("relation", "eq"),
        "page": page,
        "size": size,
        "results": [
            {**h["_source"], "score": h["_score"], "highlights": h.get("highlight", {})}
            for h in hits["hits"]
        ],
    }


# Example: typo-tolerant search for "mechanical keyboard", restricted to the
# electronics category and prices up to 200 (first page, 20 hits).
results = search_products(es, "mechanical keyboard", category="electronics", max_price=200)
# NOTE(review): Elasticsearch may report the total as a lower bound for large
# result sets (hits.total.relation == "gte"); confirm before presenting it as exact.
print(f"Found {results['total']} products")

Aggregations

Aggregations compute analytics on search results — counts, averages, histograms, and nested bucket groupings. They run server-side over matching documents and return summaries without streaming all documents.

def get_product_facets(es: Elasticsearch, query: str = "", index: str = "products") -> dict:
    """Compute facet counts and price statistics for products matching *query*.

    Args:
        es: Connected Elasticsearch client.
        query: Free text matched against name and description. An empty
            string aggregates over every document.
        index: Index to aggregate over (defaults to "products").

    Returns:
        dict with:
          - "categories": top 20 categories as [{"name", "count"}, ...]
          - "brands": top 10 brands as [{"name", "count"}, ...]
          - "price_range": {"min", "max", "avg"}; all None when nothing matches
          - "price_histogram": [{"key", "count"}, ...] in buckets of width 50,
            where "key" is the bucket's lower bound
          - "in_stock_count": number of matching products with stock > 0
    """
    query_clause = (
        {"multi_match": {"query": query, "fields": ["name", "description"]}}
        if query
        else {"match_all": {}}
    )
    body = {
        "query": query_clause,
        "size": 0,   # don't return documents — only aggregations
        "aggs": {
            "by_category": {
                "terms": {"field": "category", "size": 20}
            },
            "by_brand": {
                "terms": {"field": "brand", "size": 10}
            },
            "price_stats": {
                "stats": {"field": "price"}  # min, max, avg, sum, count
            },
            "price_histogram": {
                "histogram": {"field": "price", "interval": 50}
            },
            "in_stock": {
                "filter": {"range": {"stock": {"gt": 0}}}
            },
        },
    }

    response = es.search(index=index, body=body)
    aggs = response["aggregations"]
    stats = aggs["price_stats"]
    # The stats aggregation reports min/max/avg as null (None) when no
    # document matches; round() would raise TypeError on None.
    avg = stats["avg"]

    return {
        "categories": [
            {"name": b["key"], "count": b["doc_count"]}
            for b in aggs["by_category"]["buckets"]
        ],
        "brands": [
            {"name": b["key"], "count": b["doc_count"]}
            for b in aggs["by_brand"]["buckets"]
        ],
        "price_range": {
            "min": stats["min"],
            "max": stats["max"],
            "avg": round(avg, 2) if avg is not None else None,
        },
        "price_histogram": [
            {"key": b["key"], "count": b["doc_count"]}
            for b in aggs["price_histogram"]["buckets"]
        ],
        "in_stock_count": aggs["in_stock"]["doc_count"],
    }

Async Client with FastAPI

from contextlib import asynccontextmanager
from fastapi import FastAPI, Query
from elasticsearch import AsyncElasticsearch
import os

# Shared async client, created in lifespan() at startup and used by the routes.
_es: AsyncElasticsearch | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Open one shared AsyncElasticsearch client for the application's lifetime.

    The client is built from ELASTIC_CLOUD_ID / ELASTIC_API_KEY (KeyError at
    startup if either is unset). The try/finally guarantees close() also runs
    when the lifespan is exited with an exception (e.g. cancellation), so the
    client's connections are not leaked.
    """
    global _es
    _es = AsyncElasticsearch(
        cloud_id=os.environ["ELASTIC_CLOUD_ID"],
        api_key=os.environ["ELASTIC_API_KEY"],
    )
    try:
        yield
    finally:
        await _es.close()

app = FastAPI(lifespan=lifespan)

# Fixed page size for the public search endpoint.
_SEARCH_PAGE_SIZE = 20
# Elasticsearch's default index.max_result_window: from + size may not exceed
# it, so deeper pages would be rejected by the server.
# NOTE: adjust if the index overrides max_result_window.
_MAX_RESULT_WINDOW = 10_000


@app.get("/search")
async def search(
    q: str = Query(..., min_length=1),
    category: str | None = None,
    # Capping page here turns an over-deep request into a 422 validation error
    # instead of an unhandled Elasticsearch error (HTTP 500).
    page: int = Query(1, ge=1, le=_MAX_RESULT_WINDOW // _SEARCH_PAGE_SIZE),
):
    """Full-text product search with an optional exact category filter.

    Matches ``q`` against name (boosted x3) and description, and returns one
    page of hits (20 per page) together with the total hit count.
    """
    body = {
        "query": {
            "bool": {
                "must": [{"multi_match": {"query": q, "fields": ["name^3", "description"]}}],
                "filter": [{"term": {"category": category}}] if category else [],
            }
        },
        "from": (page - 1) * _SEARCH_PAGE_SIZE,
        "size": _SEARCH_PAGE_SIZE,
    }
    response = await _es.search(index="products", body=body)
    return {
        "total": response["hits"]["total"]["value"],
        "results": [h["_source"] for h in response["hits"]["hits"]],
    }

Frequently Asked Questions

When should I use Elasticsearch vs PostgreSQL full-text search?
Use Elasticsearch when you need relevance ranking, fuzzy matching, multi-language analyzers, autocomplete, aggregations over millions of documents, or vector/semantic search. PostgreSQL's tsvector is excellent for simple full-text search on structured data already in Postgres — it avoids a separate service and has good performance for <10M rows.
How do I keep Elasticsearch in sync with my database?
Common patterns:
1. Dual-write: write to the database and to Elasticsearch in the same request handler.
2. Change data capture (CDC): Debezium reads the Postgres WAL and publishes changes to Kafka, and a consumer syncs them to Elasticsearch.
3. Scheduled reindex: a batch job periodically rebuilds the index.

Dual-write is the simplest, but it can leave the two stores inconsistent when one write succeeds and the other fails. CDC is the most reliable at scale.
What is the performance impact of many aggregation fields?
Aggregations run on doc values (columnar storage), not the inverted index, so they stay fast even over millions of documents. Cardinality aggregations on high-cardinality keyword fields can be memory-intensive; lower precision_threshold to trade accuracy for memory. For terms aggregations over only a small set of matching documents, execution_hint: map can avoid the cost of building global ordinals.