Python Elasticsearch: Indexing and Full-Text Search
Elasticsearch is the go-to engine for full-text search, log analytics, and vector search. The official elasticsearch-py client provides both a synchronous and async API with a Pythonic interface for indexing, querying, aggregating, and managing indices. This guide covers index mapping, bulk indexing large datasets, multi-field search queries, aggregations, and integrating Elasticsearch search into a FastAPI service.
Table of Contents
Installation and Connection
pip install elasticsearch[async]
from elasticsearch import Elasticsearch, AsyncElasticsearch
import os
# Secured cluster (Elastic Cloud or self-hosted with security enabled):
# basic auth over HTTPS, verifying the server certificate against a CA file.
_basic_credentials = ("elastic", os.environ["ES_PASSWORD"])
es = Elasticsearch(
    hosts=["https://localhost:9200"],
    basic_auth=_basic_credentials,
    ca_certs="/path/to/ca.crt",
    verify_certs=True,
)

# Elastic Cloud: the deployment is selected by its cloud ID and the client
# authenticates with an API key; both are read from the environment.
_cloud_options = {
    "cloud_id": os.environ["ELASTIC_CLOUD_ID"],
    "api_key": os.environ["ELASTIC_API_KEY"],
}
es_cloud = Elasticsearch(**_cloud_options)

# Local development: plain HTTP, no authentication.
es_dev = Elasticsearch(hosts="http://localhost:9200")

# Health check: fetch the cluster info once and report version and name.
info = es.info()
print(f"Elasticsearch {info['version']['number']} connected")
print(f"Cluster: {info['cluster_name']}")
Index Mapping and Settings
Define explicit mappings to control how fields are analyzed, tokenized, and stored. Without an explicit mapping, Elasticsearch infers field types (dynamic mapping), which often produces suboptimal results for production search.
# Custom analyzer used for product text: standard tokenizer followed by the
# lowercase, asciifolding, stop and snowball token filters.
_PRODUCT_ANALYZER = {
    "type": "custom",
    "tokenizer": "standard",
    "filter": ["lowercase", "asciifolding", "stop", "snowball"],
}

# Base definition of a text field analyzed with "product_analyzer".
_ANALYZED_TEXT = {"type": "text", "analyzer": "product_analyzer"}

# Index settings and field mappings for the products index.
PRODUCTS_MAPPING = {
    "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1,
        "analysis": {"analyzer": {"product_analyzer": _PRODUCT_ANALYZER}},
    },
    "mappings": {
        "properties": {
            "id": {"type": "keyword"},
            # "name" additionally exposes an un-analyzed "name.raw" keyword
            # sub-field alongside the analyzed text.
            "name": {**_ANALYZED_TEXT, "fields": {"raw": {"type": "keyword"}}},
            "description": dict(_ANALYZED_TEXT),
            "category": {"type": "keyword"},
            "brand": {"type": "keyword"},
            "price": {"type": "float"},
            "stock": {"type": "integer"},
            "tags": {"type": "keyword"},
            "created_at": {"type": "date"},
            # 768-dimensional indexed vector compared by cosine similarity.
            "embedding": {
                "type": "dense_vector",
                "dims": 768,
                "index": True,
                "similarity": "cosine",
            },
        },
    },
}
def create_index(es: Elasticsearch, index: str):
    """Create *index* with ``PRODUCTS_MAPPING`` unless it already exists.

    Prints which of the two outcomes happened and returns nothing.
    """
    if es.indices.exists(index=index):
        print(f"Index {index} already exists")
        return

    es.indices.create(index=index, body=PRODUCTS_MAPPING)
    print(f"Created index: {index}")


create_index(es, "products")
Indexing Documents
from datetime import datetime, timezone
# Sample product document; its "id" value is reused as the document _id.
doc = {
    "id": "prod-001",
    "name": "Mechanical Gaming Keyboard",
    "description": "RGB backlit mechanical keyboard with Cherry MX switches, N-key rollover",
    "category": "electronics",
    "brand": "TechGear",
    "price": 149.99,
    "stock": 250,
    "tags": ["keyboard", "gaming", "mechanical", "rgb"],
    "created_at": datetime.now(timezone.utc).isoformat(),
}

# Index the single document under an explicit id and report the outcome.
result = es.index(index="products", id=doc["id"], document=doc)
print(f"Indexed: {result['result']} — id={result['_id']}")

# The remaining calls all address the same document.
_target = {"index": "products", "id": "prod-001"}

# Partial update: send only the fields to change.
es.update(**_target, doc={"price": 139.99, "stock": 245})

# Delete the document.
es.delete(**_target)

# Existence check for the same id (issued after the delete above).
exists = es.exists(**_target)
Bulk Indexing with helpers
For indexing thousands of documents, use elasticsearch.helpers.bulk() or streaming_bulk(). Bulk operations can be 50–100x faster than individual index calls because each HTTP request carries many operations, avoiding one network round trip per document.
from elasticsearch.helpers import bulk, streaming_bulk
from typing import Generator
def generate_products(products: list[dict]) -> Generator:
    """Lazily yield one bulk action per product.

    Each action targets the "products" index, uses the product's "id" value
    as ``_id`` and carries the product dict itself as ``_source``.
    """
    yield from (
        {"_index": "products", "_id": item["id"], "_source": item}
        for item in products
    )
# Sample data: 10,000 synthetic product documents built in memory.
products = [
    {
        "id": f"prod-{i:04d}",
        "name": f"Product {i}",
        "price": i * 9.99,
        "category": "electronics",
        "stock": 100,
        "created_at": "2026-06-14",
    }
    for i in range(1, 10001)
]

# bulk() consumes the action generator in chunks and returns a summary:
# the number of successful operations and, because raise_on_error=False,
# the list of failed items instead of raising on the first failure.
#
# The timeout is set through .options(): passing transport options such as
# request_timeout as per-call keyword arguments is deprecated in the 8.x
# client, and the helper forwards extra keyword arguments to every bulk call.
success, failed = bulk(
    es.options(request_timeout=60),
    generate_products(products),
    chunk_size=500,  # documents per bulk request
    raise_on_error=False,
)
print(f"Indexed: {success} | Failed: {len(failed)}")

# streaming_bulk() yields an (ok, result) pair for every action as chunks
# complete, so failures can be handled one by one without collecting a
# summary first. Only as much is held in memory as the action iterable
# itself requires (here the list built above).
for ok, result in streaming_bulk(
    es,
    generate_products(products),
    chunk_size=500,
    raise_on_error=False,
):
    if not ok:
        print(f"Failed to index: {result}")
Search Queries
Elasticsearch's Query DSL is a JSON structure. The most common patterns are multi-match for full-text search, bool queries for combining conditions, and term/range for exact filtering.
def search_products(
    es: Elasticsearch,
    query: str,
    category: str | None = None,
    min_price: float | None = None,
    max_price: float | None = None,
    page: int = 1,
    size: int = 20,
) -> dict:
    """Run a paginated full-text search over the "products" index.

    Args:
        es: Client whose ``search`` method is called with the built body.
        query: Free-text query matched against name, description and tags
            (name weighted x3, tags x2) with automatic fuzziness. An empty
            query matches all documents.
        category: If given, restrict results to this exact category.
        min_price: If given, inclusive lower price bound.
        max_price: If given, inclusive upper price bound.
        page: 1-based page number.
        size: Number of hits per page.

    Returns:
        A dict with "total", "page", "size" and "results"; each result is
        the hit's ``_source`` plus its "score" and "highlights" (an empty
        dict when the hit has no highlight).

    Raises:
        ValueError: If ``page`` is less than 1 or ``size`` is negative.
    """
    # Reject bad paging up front: a negative offset or size would otherwise
    # be sent to the server and only fail there.
    if page < 1:
        raise ValueError(f"page must be >= 1, got {page}")
    if size < 0:
        raise ValueError(f"size must be >= 0, got {size}")

    if query:
        text_clause = {
            "multi_match": {
                "query": query,
                "fields": ["name^3", "description^1", "tags^2"],
                "type": "best_fields",
                "fuzziness": "AUTO",
            }
        }
    else:
        text_clause = {"match_all": {}}

    # Filters narrow the result set without contributing to the score.
    filter_clauses = []
    if category:
        filter_clauses.append({"term": {"category": category}})

    price_range = {}
    if min_price is not None:
        price_range["gte"] = min_price
    if max_price is not None:
        price_range["lte"] = max_price
    if price_range:
        filter_clauses.append({"range": {"price": price_range}})

    body = {
        "query": {"bool": {"must": [text_clause], "filter": filter_clauses}},
        "from": (page - 1) * size,
        "size": size,
        # Most relevant first; cheaper product wins ties.
        "sort": [{"_score": "desc"}, {"price": "asc"}],
        "highlight": {
            "fields": {
                "name": {},
                "description": {"fragment_size": 150, "number_of_fragments": 2},
            }
        },
        "_source": ["id", "name", "price", "category", "brand", "stock"],
    }
    response = es.search(index="products", body=body)
    hits = response["hits"]

    return {
        # NOTE: by default Elasticsearch reports this count as a lower bound
        # once it exceeds 10,000 (see the track_total_hits search option).
        "total": hits["total"]["value"],
        "page": page,
        "size": size,
        "results": [
            {
                **hit["_source"],
                "score": hit["_score"],
                "highlights": hit.get("highlight", {}),
            }
            for hit in hits["hits"]
        ],
    }
# Example: search electronics for "mechanical keyboard" priced at 200 or
# less, then report the total hit count from the returned summary.
results = search_products(es, "mechanical keyboard", category="electronics", max_price=200)
print(f"Found {results['total']} products")
Aggregations
Aggregations compute analytics on search results — counts, averages, histograms, and nested bucket groupings. They run server-side over matching documents and return summaries without streaming all documents.
def _facet_buckets(aggregation: dict) -> list[dict]:
    """Convert a bucket aggregation result into name/count pairs."""
    return [
        {"name": bucket["key"], "count": bucket["doc_count"]}
        for bucket in aggregation["buckets"]
    ]


def get_product_facets(es: Elasticsearch, query: str = "") -> dict:
    """Compute facet data for the "products" index.

    Args:
        es: Client whose ``search`` method is called with the built body.
        query: Optional free-text query matched against name and
            description; an empty query aggregates over all documents.

    Returns:
        A dict with:
        "categories" / "brands": name/count pairs per bucket;
        "price_range": min, max and avg price (avg rounded to 2 decimals;
            avg is None when the aggregation reports no average, as
            happens when no document matches);
        "price_histogram": document counts per price bucket of width 50,
            each as {"from": bucket lower bound, "count": documents};
        "in_stock_count": number of matching documents with stock > 0.
    """
    if query:
        query_clause = {
            "multi_match": {"query": query, "fields": ["name", "description"]}
        }
    else:
        query_clause = {"match_all": {}}

    body = {
        "query": query_clause,
        "size": 0,  # don't return documents — only aggregations
        "aggs": {
            "by_category": {"terms": {"field": "category", "size": 20}},
            "by_brand": {"terms": {"field": "brand", "size": 10}},
            "price_stats": {"stats": {"field": "price"}},  # min, max, avg, sum, count
            "price_histogram": {"histogram": {"field": "price", "interval": 50}},
            "in_stock": {"filter": {"range": {"stock": {"gt": 0}}}},
        },
    }
    response = es.search(index="products", body=body)
    aggs = response["aggregations"]

    price_stats = aggs["price_stats"]
    # The stats aggregation returns null for avg when nothing matched;
    # round(None, 2) would raise TypeError, so pass None through unchanged.
    average = price_stats["avg"]
    if average is not None:
        average = round(average, 2)

    return {
        "categories": _facet_buckets(aggs["by_category"]),
        "brands": _facet_buckets(aggs["by_brand"]),
        "price_range": {
            "min": price_stats["min"],
            "max": price_stats["max"],
            "avg": average,
        },
        "price_histogram": [
            {"from": bucket["key"], "count": bucket["doc_count"]}
            for bucket in aggs["price_histogram"]["buckets"]
        ],
        "in_stock_count": aggs["in_stock"]["doc_count"],
    }
Async Client with FastAPI
from contextlib import asynccontextmanager
from fastapi import FastAPI, Query
from elasticsearch import AsyncElasticsearch
import os
# Shared async client; set while the application's lifespan is active.
_es: AsyncElasticsearch | None = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the shared Elasticsearch client for the app's lifetime.

    The client is built from the ELASTIC_CLOUD_ID and ELASTIC_API_KEY
    environment variables before the app starts serving, and closed when
    the lifespan context exits.
    """
    global _es
    _es = AsyncElasticsearch(
        cloud_id=os.environ["ELASTIC_CLOUD_ID"],
        api_key=os.environ["ELASTIC_API_KEY"],
    )
    try:
        yield
    finally:
        # Close on every exit path (including errors and cancellation) so
        # the client's connections are not leaked, then drop the reference
        # so a closed client is never reused.
        await _es.close()
        _es = None


app = FastAPI(lifespan=lifespan)
# Fixed page size for the endpoint, and the deepest page that keeps
# from + size within Elasticsearch's default result window of 10,000 hits
# (index.max_result_window). Raise the cap if the index overrides that
# setting.
_PAGE_SIZE = 20
_MAX_PAGE = 10_000 // _PAGE_SIZE


@app.get("/search")
async def search(
    q: str = Query(..., min_length=1),
    category: str | None = None,
    page: int = Query(1, ge=1, le=_MAX_PAGE),
):
    """Full-text product search endpoint.

    Query parameters:
        q: Required, non-empty search text matched against name (weighted
            x3) and description.
        category: Optional exact-match category filter.
        page: 1-based page number. It is capped so the request never asks
            for hits beyond the default result window; without the cap the
            search call would fail and surface as a server error.

    Returns:
        A dict with the "total" hit count and the "results" for the
        requested page (each hit's ``_source``).
    """
    category_filter = [{"term": {"category": category}}] if category else []
    body = {
        "query": {
            "bool": {
                "must": [
                    {"multi_match": {"query": q, "fields": ["name^3", "description"]}}
                ],
                "filter": category_filter,
            }
        },
        "from": (page - 1) * _PAGE_SIZE,
        "size": _PAGE_SIZE,
    }
    response = await _es.search(index="products", body=body)
    hits = response["hits"]
    return {
        "total": hits["total"]["value"],
        "results": [hit["_source"] for hit in hits["hits"]],
    }
Frequently Asked Questions
- When should I use Elasticsearch vs PostgreSQL full-text search?
- Use Elasticsearch when you need relevance ranking, fuzzy matching, multi-language analyzers, autocomplete, aggregations over millions of documents, or vector/semantic search. PostgreSQL's `tsvector` is excellent for simple full-text search on structured data already in Postgres — it avoids a separate service and has good performance for <10M rows.
- How do I keep Elasticsearch in sync with my database?
- Common patterns: (1) dual-write — write to DB and ES in the same request handler; (2) change data capture (CDC) — Debezium reads Postgres WAL and publishes to Kafka, a consumer syncs to ES; (3) scheduled reindex — a batch job periodically rebuilds the ES index. Dual-write is simplest; CDC is most reliable at scale.
- What is the performance impact of many aggregation fields?
- Aggregations run on doc values (columnar storage), not the inverted index. They are fast even over millions of documents. Cardinality aggregations on high-cardinality keyword fields can be memory-intensive — use `execution_hint: map` for small sets or `precision_threshold` to trade accuracy for memory.