AWS OpenSearch Service: Search, Analytics and Log Management (2026)

AWS OpenSearch Service — Search and Analytics

AWS OpenSearch Service is a fully managed, open-source search and analytics engine built on OpenSearch (the Apache-licensed fork of Elasticsearch 7.10). It handles petabyte-scale log analytics, sub-second full-text search, and real-time application monitoring — without the operational overhead of running your own cluster. This guide covers everything: domain creation, index design, the full REST query DSL, Python and Java SDK integration, Kinesis-powered ingestion pipelines, ALB/CloudFront log analytics, OpenSearch Dashboards, fine-grained access control, and storage-tier cost optimisation.

1. OpenSearch vs Elasticsearch — History and Why AWS Forked It

Elasticsearch was first released by Shay Banon in 2010 (Elastic NV, the company behind it, was founded in 2012) and quickly became the standard for distributed full-text search and log analytics. For years, Amazon ran a managed Elasticsearch Service, contributing patches and providing enterprise-grade availability. In January 2021, Elastic announced that Elasticsearch and Kibana would move from Apache 2.0 to a dual licence, the Server Side Public License (SSPL) and the Elastic License, starting with version 7.11. The SSPL is a copyleft licence that prevents cloud providers from offering the software as a service without open-sourcing their entire service stack.

AWS responded by announcing a fork of Elasticsearch 7.10 (the last Apache 2.0 release) and launched the OpenSearch Project under the Apache 2.0 licence in April 2021. The project's community includes AWS, SAP, Aryn, and others, and since September 2024 it has been hosted by the OpenSearch Software Foundation under the Linux Foundation. OpenSearch and OpenSearch Dashboards (the Kibana fork) have continued through the 1.x and 2.x release lines (the examples in this guide target 2.11), with active development adding k-NN vector search, SQL and PPL query languages, anomaly detection, alerting, index management policies (ISM), and a neural search plugin for semantic search via ML models.

Key differences from Elasticsearch today:
  • OpenSearch 2.x has diverged meaningfully — cross-cluster replication, point-in-time (PIT) search, and new ML APIs differ from Elastic's versions.
  • AWS OpenSearch Service supports OpenSearch 1.x and 2.x as well as the legacy Elasticsearch 7.x and 6.x for migrations.
  • The managed service handles patching, node replacement, cross-zone replica placement, automated snapshots, and VPC integration, which removes a large share of the operational work of running a self-managed cluster.

The managed service benefits are significant: you get automated hourly snapshots, blue/green deployments for configuration changes and version upgrades (a new environment is brought up and traffic is switched over, so the domain stays available), UltraWarm and Cold Storage tiers for dramatic cost reduction on historical data, and native integrations with Kinesis Data Firehose, CloudWatch Logs, and IAM, all without touching a single systemd unit file.

2. Creating a Domain — Console, CLI, and Terraform

An OpenSearch domain is your managed cluster. Choose instance type and count, storage, and network settings at creation. Production domains should always span three Availability Zones with dedicated master nodes.

AWS CLI — create a production domain

aws opensearch create-domain \
  --domain-name techoral-search \
  --engine-version OpenSearch_2.11 \
  --cluster-config '{
    "InstanceType": "r6g.large.search",
    "InstanceCount": 3,
    "DedicatedMasterEnabled": true,
    "DedicatedMasterType": "c6g.large.search",
    "DedicatedMasterCount": 3,
    "MultiAZWithStandbyEnabled": false,
    "ZoneAwarenessEnabled": true,
    "ZoneAwarenessConfig": {"AvailabilityZoneCount": 3}
  }' \
  --ebs-options '{
    "EBSEnabled": true,
    "VolumeType": "gp3",
    "VolumeSize": 100,
    "Iops": 3000,
    "Throughput": 125
  }' \
  --node-to-node-encryption-options '{"Enabled": true}' \
  --encryption-at-rest-options '{"Enabled": true}' \
  --domain-endpoint-options '{"EnforceHTTPS": true, "TLSSecurityPolicy": "Policy-Min-TLS-1-2-2019-07"}' \
  --advanced-security-options '{
    "Enabled": true,
    "InternalUserDatabaseEnabled": true,
    "MasterUserOptions": {"MasterUserName": "admin", "MasterUserPassword": "Str0ng!Pass"}
  }' \
  --region us-east-1
Instance type guide: Use r6g (memory-optimised Graviton2) for log analytics and aggregation-heavy workloads. Use c6g for compute-intensive indexing pipelines. For small dev domains, t3.small.search is the cheapest option, but T3 instances do not support UltraWarm, cold storage, or Auto-Tune and are not recommended for production. Always enable encryption at rest and node-to-node encryption: they add negligible latency, are prerequisites for fine-grained access control, and are commonly required for compliance (HIPAA, PCI).

Terraform — production-ready module

resource "aws_opensearch_domain" "techoral" {
  domain_name    = "techoral-search"
  engine_version = "OpenSearch_2.11"

  cluster_config {
    instance_type            = "r6g.large.search"
    instance_count           = 3
    dedicated_master_enabled = true
    dedicated_master_type    = "c6g.large.search"
    dedicated_master_count   = 3
    zone_awareness_enabled   = true
    zone_awareness_config {
      availability_zone_count = 3
    }
  }

  ebs_options {
    ebs_enabled = true
    volume_type = "gp3"
    volume_size = 100
    iops        = 3000
    throughput  = 125
  }

  encrypt_at_rest { enabled = true }
  node_to_node_encryption { enabled = true }

  domain_endpoint_options {
    enforce_https       = true
    tls_security_policy = "Policy-Min-TLS-1-2-2019-07"
  }

  advanced_security_options {
    enabled                        = true
    internal_user_database_enabled = true
    master_user_options {
      master_user_name     = var.os_master_user
      master_user_password = var.os_master_password
    }
  }

  log_publishing_options {
    cloudwatch_log_group_arn = aws_cloudwatch_log_group.opensearch.arn
    log_type                 = "INDEX_SLOW_LOGS"
  }

  tags = { Environment = "production", Team = "platform" }
}

resource "aws_opensearch_domain_policy" "main" {
  domain_name     = aws_opensearch_domain.techoral.domain_name
  access_policies = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect    = "Allow"
      Principal = { AWS = "arn:aws:iam::${var.account_id}:root" }
      Action    = "es:*"
      Resource  = "${aws_opensearch_domain.techoral.arn}/*"
    }]
  })
}

3. Index Management — Mappings, Analyzers and Templates

Every document in OpenSearch belongs to an index. An index has a mapping (schema) and settings (shards, replicas, analyzers). Getting your mapping right before ingestion is critical — you cannot change field types after documents are indexed without a full reindex.

Create an index with custom mapping

curl -X PUT "https://<domain-endpoint>/app-logs-2026" \
  -H "Content-Type: application/json" \
  -u 'admin:Str0ng!Pass' \
  -d '{
    "settings": {
      "number_of_shards": 3,
      "number_of_replicas": 1,
      "analysis": {
        "analyzer": {
          "log_analyzer": {
            "type": "custom",
            "tokenizer": "standard",
            "filter": ["lowercase", "stop", "snowball"]
          },
          "url_analyzer": {
            "type": "custom",
            "tokenizer": "uax_url_email",
            "filter": ["lowercase"]
          }
        }
      },
      "index.mapping.total_fields.limit": 2000
    },
    "mappings": {
      "properties": {
        "@timestamp":   { "type": "date" },
        "level":        { "type": "keyword" },
        "service":      { "type": "keyword" },
        "host":         { "type": "keyword" },
        "message":      { "type": "text", "analyzer": "log_analyzer" },
        "trace_id":     { "type": "keyword", "index": false },
        "duration_ms":  { "type": "float" },
        "status_code":  { "type": "short" },
        "request_path": { "type": "text", "analyzer": "url_analyzer",
                          "fields": { "raw": { "type": "keyword" } } },
        "user_id":      { "type": "keyword" },
        "tags":         { "type": "keyword" }
      }
    }
  }'
Mapping best practices:
  • Use keyword for fields you filter, sort, or aggregate on (log level, service name, status codes).
  • Use text for full-text search fields (log messages, descriptions).
  • Set "index": false on high-cardinality fields you never search (trace IDs, raw UUIDs) to save index space.
  • Use "fields" to dual-map a field as both text (for search) and keyword (for aggregations) — essential for request paths.
  • Disable dynamic mapping in production: "dynamic": "strict" prevents accidental field explosion from uncontrolled log sources.
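
The practices above can be sketched as a small Python helper that assembles a mapping body. The function name `build_log_mapping` and its parameters are hypothetical and not part of any OpenSearch client; the result is a plain dict meant to be used as the `mappings` section of a create-index request.

```python
import json


def build_log_mapping(keyword_fields, text_fields, dual_fields, unindexed_fields):
    """Assemble a strict mapping dict from lists of field names (hypothetical helper)."""
    props = {"@timestamp": {"type": "date"}}
    for name in keyword_fields:
        # Exact-match fields used for filters, sorting, and aggregations
        props[name] = {"type": "keyword"}
    for name in text_fields:
        # Analysed fields used for full-text search
        props[name] = {"type": "text"}
    for name in dual_fields:
        # text for search, plus a .raw keyword sub-field for aggregations
        props[name] = {"type": "text", "fields": {"raw": {"type": "keyword"}}}
    for name in unindexed_fields:
        # Kept in _source but not searchable
        props[name] = {"type": "keyword", "index": False}
    # "strict" rejects documents that contain unmapped fields
    return {"dynamic": "strict", "properties": props}


mapping = build_log_mapping(
    keyword_fields=["level", "service"],
    text_fields=["message"],
    dual_fields=["request_path"],
    unindexed_fields=["trace_id"],
)
print(json.dumps(mapping, indent=2))
```

Generating the mapping from lists like this keeps the keyword/text decision explicit for every field, which is the part that cannot be changed later without a reindex.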

Index Template — auto-apply settings to rolling daily indices

curl -X PUT "https://<domain-endpoint>/_index_template/app-logs-template" \
  -H "Content-Type: application/json" \
  -u 'admin:Str0ng!Pass' \
  -d '{
    "index_patterns": ["app-logs-*"],
    "priority": 100,
    "template": {
      "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1,
        "plugins.index_state_management.rollover_alias": "app-logs"
      },
      "mappings": {
        "dynamic": "strict",
        "properties": {
          "@timestamp": { "type": "date" },
          "level":      { "type": "keyword" },
          "message":    { "type": "text" },
          "service":    { "type": "keyword" },
          "duration_ms": { "type": "float" }
        }
      }
    }
  }'

Index State Management (ISM) Policy — auto-rollover and archive

{
  "policy": {
    "description": "Rollover daily, move to UltraWarm after 7 days, to cold storage after 30, delete after 90",
    "default_state": "hot",
    "states": [
      {
        "name": "hot",
        "actions": [{"rollover": {"min_index_age": "1d", "min_primary_shard_size": "30gb"}}],
        "transitions": [{"state_name": "warm", "conditions": {"min_index_age": "7d"}}]
      },
      {
        "name": "warm",
        "actions": [{"warm_migration": {}}, {"replica_count": {"number_of_replicas": 0}}],
        "transitions": [{"state_name": "cold", "conditions": {"min_index_age": "30d"}}]
      },
      {
        "name": "cold",
        "actions": [{"cold_migration": {"timestamp_field": "@timestamp"}}],
        "transitions": [{"state_name": "delete", "conditions": {"min_index_age": "90d"}}]
      },
      {"name": "delete", "actions": [{"cold_delete": {}}], "transitions": []}
    ],
    "ism_template": {"index_patterns": ["app-logs-*"], "priority": 100}
  }
}
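
To sanity-check the retention timeline in a policy like this one, the transition thresholds can be modelled in a few lines of Python. This is only a sketch: `expected_state` is a hypothetical helper, the thresholds are copied by hand from the `min_index_age` conditions above, and real ISM evaluates policies periodically and waits for each action to finish, so actual transitions lag behind these boundaries.

```python
# (current state, minimum index age in days, next state), copied from the policy above
TRANSITIONS = [("hot", 7, "warm"), ("warm", 30, "cold"), ("cold", 90, "delete")]


def expected_state(index_age_days: float) -> str:
    """Return the state an index of the given age should eventually reach."""
    state = "hot"
    for current, min_age, next_state in TRANSITIONS:
        # Follow each transition whose age condition is already satisfied
        if state == current and index_age_days >= min_age:
            state = next_state
    return state


for age in (0, 7, 29, 30, 89, 90):
    print(age, expected_state(age))
```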

4. REST API Queries — match, bool, aggregations

OpenSearch uses the same JSON query DSL as Elasticsearch. Here are the most important query patterns with real examples.

Full-text search with highlighting

curl -X GET "https://<domain-endpoint>/app-logs-*/_search" \
  -H "Content-Type: application/json" \
  -u 'admin:Str0ng!Pass' \
  -d '{
    "query": {
      "match": {
        "message": {
          "query": "NullPointerException connection timeout",
          "operator": "or",
          "fuzziness": "AUTO"
        }
      }
    },
    "highlight": {
      "fields": { "message": {} }
    },
    "sort": [{"@timestamp": "desc"}],
    "size": 20
  }'

Bool query — filter + must + must_not

curl -X GET "https://<domain-endpoint>/app-logs-*/_search" \
  -H "Content-Type: application/json" \
  -u 'admin:Str0ng!Pass' \
  -d '{
    "query": {
      "bool": {
        "must": [
          { "match": { "message": "database connection" } }
        ],
        "filter": [
          { "term": { "level": "ERROR" } },
          { "term": { "service": "payment-service" } },
          { "range": {
            "@timestamp": {
              "gte": "now-1h/h",
              "lt": "now/h"
            }
          }}
        ],
        "must_not": [
          { "term": { "message": "test" } }
        ]
      }
    },
    "size": 100
  }'
filter vs must: Use filter (not must) for exact-match and range conditions that do not need relevance scoring — filter clauses are cached and dramatically faster for log analytics queries where you don't need BM25 scoring.
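
As an illustration of that split, the sketch below builds a bool query in Python where only the free-text part goes into `must` and every exact-match or range condition goes into `filter`. The helper name `build_log_query` and its parameters are hypothetical; the field names match the mapping used earlier in this guide.

```python
def build_log_query(text=None, filters=None, time_range=None, exclude=None, size=100):
    """Build a search body: scored clauses in must, unscored ones in filter."""
    bool_query = {}
    if text:
        # Relevance-scored full-text clause
        bool_query["must"] = [{"match": {"message": text}}]
    # Exact matches do not need scoring, so they go into filter
    filter_clauses = [{"term": {field: value}} for field, value in (filters or {}).items()]
    if time_range:
        gte, lt = time_range
        filter_clauses.append({"range": {"@timestamp": {"gte": gte, "lt": lt}}})
    if filter_clauses:
        bool_query["filter"] = filter_clauses
    if exclude:
        bool_query["must_not"] = [{"match": {"message": exclude}}]
    return {"query": {"bool": bool_query}, "size": size}


body = build_log_query(
    text="database connection",
    filters={"level": "ERROR", "service": "payment-service"},
    time_range=("now-1h/h", "now/h"),
    exclude="test",
)
print(body)
```

The resulting dict can be passed as the `body` argument of a search call or serialised with `json.dumps` for curl.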

Aggregations — error rate by service with time histogram

curl -X GET "https://<domain-endpoint>/app-logs-*/_search" \
  -H "Content-Type: application/json" \
  -u 'admin:Str0ng!Pass' \
  -d '{
    "size": 0,
    "query": {
      "range": { "@timestamp": { "gte": "now-24h", "lt": "now" } }
    },
    "aggs": {
      "by_service": {
        "terms": { "field": "service", "size": 20 },
        "aggs": {
          "error_count": {
            "filter": { "term": { "level": "ERROR" } }
          },
          "duration_percentiles": {
            "percentiles": { "field": "duration_ms", "percents": [50, 95, 99] }
          },
          "over_time": {
            "date_histogram": {
              "field": "@timestamp",
              "calendar_interval": "1h"
            },
            "aggs": {
              "errors": { "filter": { "term": { "level": "ERROR" } } }
            }
          }
        }
      }
    }
  }'
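
The query above returns counts rather than a rate, so the error rate per service still has to be derived from each bucket. A minimal sketch of that step follows; `error_rates` is a hypothetical helper and `sample` is a hand-written stand-in for the relevant part of a search response, not real output.

```python
def error_rates(response: dict) -> dict:
    """Divide each service's error_count by its total doc_count."""
    rates = {}
    for bucket in response["aggregations"]["by_service"]["buckets"]:
        total = bucket["doc_count"]
        errors = bucket["error_count"]["doc_count"]
        # Guard against empty buckets to avoid division by zero
        rates[bucket["key"]] = errors / total if total else 0.0
    return rates


# Hand-written sample shaped like the by_service aggregation result
sample = {"aggregations": {"by_service": {"buckets": [
    {"key": "payment-service", "doc_count": 200, "error_count": {"doc_count": 10}},
    {"key": "auth-service", "doc_count": 50, "error_count": {"doc_count": 0}},
]}}}

rates = error_rates(sample)
print(rates)
```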

Bulk indexing — high-throughput document ingestion

curl -X POST "https://<domain-endpoint>/_bulk" \
  -H "Content-Type: application/x-ndjson" \
  -u 'admin:Str0ng!Pass' \
  --data-binary @- <<'EOF'
{"index":{"_index":"app-logs-2026.06.08"}}
{"@timestamp":"2026-06-08T10:00:00Z","level":"ERROR","service":"payment-service","message":"DB connection timeout after 5000ms","duration_ms":5000}
{"index":{"_index":"app-logs-2026.06.08"}}
{"@timestamp":"2026-06-08T10:00:01Z","level":"INFO","service":"auth-service","message":"User login successful","duration_ms":45}
EOF
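
The bulk body is newline-delimited JSON: one action line followed by one document line, with a trailing newline after the last line. The sketch below produces that format from a list of dicts using only the standard library; `to_bulk_ndjson` is a hypothetical helper name, and the index name is just the one used in the example above.

```python
import json


def to_bulk_ndjson(index: str, events: list) -> str:
    """Serialise events into the action/document line pairs expected by _bulk."""
    if not events:
        return ""
    lines = []
    for event in events:
        lines.append(json.dumps({"index": {"_index": index}}))  # action line
        lines.append(json.dumps(event))                         # document line
    # The bulk API requires the body to end with a newline
    return "\n".join(lines) + "\n"


payload = to_bulk_ndjson("app-logs-2026.06.08", [
    {"@timestamp": "2026-06-08T10:00:00Z", "level": "ERROR", "service": "payment-service"},
    {"@timestamp": "2026-06-08T10:00:01Z", "level": "INFO", "service": "auth-service"},
])
print(payload)
```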

5. Python and Java Client Code

Python — opensearch-py

from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
import boto3, json
from datetime import datetime

# Option A: basic auth (dev/test)
client = OpenSearch(
    hosts=[{"host": "<domain-endpoint>", "port": 443}],
    http_auth=("admin", "Str0ng!Pass"),
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
)

# Option B: IAM SigV4 (production — no stored credentials)
credentials = boto3.Session().get_credentials()
auth = AWSV4SignerAuth(credentials, "us-east-1", "es")
client_iam = OpenSearch(
    hosts=[{"host": "<domain-endpoint>", "port": 443}],
    http_auth=auth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
)

# Index a document
def index_log_event(service: str, level: str, message: str, duration_ms: float):
    doc = {
        "@timestamp": datetime.utcnow().isoformat() + "Z",
        "service": service,
        "level": level,
        "message": message,
        "duration_ms": duration_ms,
    }
    response = client.index(
        index=f"app-logs-{datetime.utcnow().strftime('%Y.%m.%d')}",
        body=doc
    )
    return response["_id"]

# Bulk index with error handling
def bulk_index(events: list[dict]) -> dict:
    actions = []
    for event in events:
        actions.append({"index": {"_index": f"app-logs-{datetime.utcnow().strftime('%Y.%m.%d')}"}})
        actions.append(event)
    response = client.bulk(body=actions)
    errors = [item for item in response["items"] if "error" in item.get("index", {})]
    return {"indexed": len(events) - len(errors), "errors": len(errors)}

# Search with aggregations
def get_error_summary(hours: int = 24) -> dict:
    query = {
        "size": 0,
        "query": {"range": {"@timestamp": {"gte": f"now-{hours}h", "lt": "now"}}},
        "aggs": {
            "by_service": {
                "terms": {"field": "service", "size": 50},
                "aggs": {
                    "errors": {"filter": {"term": {"level": "ERROR"}}},
                    "p99": {"percentiles": {"field": "duration_ms", "percents": [99]}}
                }
            }
        }
    }
    result = client.search(index="app-logs-*", body=query)
    buckets = result["aggregations"]["by_service"]["buckets"]
    return {
        b["key"]: {
            "total": b["doc_count"],
            "errors": b["errors"]["doc_count"],
            "p99_ms": b["p99"]["values"].get("99.0")
        }
        for b in buckets
    }

# Scroll through large result sets
def scroll_all_errors(index_pattern: str):
    query = {"query": {"term": {"level": "ERROR"}}, "size": 500}
    response = client.search(index=index_pattern, body=query, scroll="2m")
    scroll_id = response["_scroll_id"]
    try:
        hits = response["hits"]["hits"]
        while hits:
            for hit in hits:
                yield hit["_source"]
            response = client.scroll(scroll_id=scroll_id, scroll="2m")
            scroll_id = response["_scroll_id"]
            hits = response["hits"]["hits"]
    finally:
        # Release the scroll context even if the caller stops iterating early
        client.clear_scroll(scroll_id=scroll_id)

Java — OpenSearch Java Client (2.x)

import org.opensearch.client.json.JsonData;
import org.opensearch.client.json.jackson.JacksonJsonpMapper;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.SortOrder;
import org.opensearch.client.opensearch._types.query_dsl.*;
import org.opensearch.client.opensearch.core.*;
import org.opensearch.client.opensearch.core.search.*;
import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder;
import org.apache.hc.core5.http.HttpHost;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.IOException;
import java.time.Instant;
import java.util.List;

// Domain model
record LogEvent(
    @JsonProperty("@timestamp") String timestamp,
    String level,
    String service,
    String message,
    @JsonProperty("duration_ms") double durationMs
) {}

public class OpenSearchService {

    private final OpenSearchClient client;

    public OpenSearchService(String host, int port) {
        var transport = ApacheHttpClient5TransportBuilder
            .builder(new HttpHost("https", host, port))
            .setMapper(new JacksonJsonpMapper())
            .build();
        this.client = new OpenSearchClient(transport);
    }

    // Index a log event
    public String indexEvent(String index, LogEvent event) throws IOException {
        IndexResponse response = client.index(i -> i
            .index(index)
            .document(event)
        );
        return response.id();
    }

    // Full-text search with bool filter
    public List<LogEvent> searchErrors(String service, int lastHours) throws IOException {
        SearchResponse<LogEvent> response = client.search(s -> s
            .index("app-logs-*")
            .query(q -> q
                .bool(b -> b
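                    // term values are FieldValue objects in the 2.x Java client
                    .filter(f -> f.term(t -> t.field("service").value(v -> v.stringValue(service))))
                    .filter(f -> f.term(t -> t.field("level").value(v -> v.stringValue("ERROR"))))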
                    .filter(f -> f.range(r -> r
                        .field("@timestamp")
                        .gte(JsonData.of("now-" + lastHours + "h"))
                        .lt(JsonData.of("now"))
                    ))
                )
            )
            .size(1000)
            .sort(so -> so.field(f -> f.field("@timestamp").order(SortOrder.Desc))),
            LogEvent.class
        );

        return response.hits().hits().stream()
            .map(Hit::source)
            .toList();
    }

    // Aggregation — p99 latency per service
    public void getLatencyStats() throws IOException {
        SearchResponse<Void> response = client.search(s -> s
            .index("app-logs-*")
            .size(0)
            .aggregations("by_service", a -> a
                .terms(t -> t.field("service").size(20))
                .aggregations("p99", pa -> pa
                    .percentiles(p -> p.field("duration_ms").percents(99.0))
                )
            ),
            Void.class
        );

        var buckets = response.aggregations()
            .get("by_service").sterms().buckets().array();
        buckets.forEach(bucket -> {
            String service = bucket.key().stringValue();
            double p99 = bucket.aggregations().get("p99")
                .tdigestPercentiles().values().keyed().get("99.0");
            System.out.printf("Service: %-30s P99: %.1fms%n", service, p99);
        });
    }
}

6. Ingestion Pipelines — Kinesis Firehose and Lambda

For high-volume log and event ingestion, avoid writing to OpenSearch directly from every application instance. Instead, use Kinesis Data Firehose (renamed Amazon Data Firehose in 2024) as a buffering layer that batches records, handles back-pressure, and delivers to OpenSearch with built-in retries.

Architecture: Application → Kinesis Firehose → OpenSearch

# Step 1: Create the Firehose delivery stream
aws firehose create-delivery-stream \
  --delivery-stream-name app-logs-to-opensearch \
  --delivery-stream-type DirectPut \
  --amazonopensearchservice-destination-configuration '{
    "RoleARN": "arn:aws:iam::123456789:role/firehose-opensearch-role",
    "DomainARN": "arn:aws:es:us-east-1:123456789:domain/techoral-search",
    "IndexName": "app-logs",
    "IndexRotationPeriod": "OneDay",
    "TypeName": "_doc",
    "S3BackupMode": "FailedDocumentsOnly",
    "S3Configuration": {
      "RoleARN": "arn:aws:iam::123456789:role/firehose-opensearch-role",
      "BucketARN": "arn:aws:s3:::my-firehose-backup",
      "Prefix": "opensearch-failed/",
      "BufferingHints": {"SizeInMBs": 5, "IntervalInSeconds": 60}
    },
    "BufferingHints": {"IntervalInSeconds": 60, "SizeInMBs": 5},
    "RetryOptions": {"DurationInSeconds": 300},
    "ProcessingConfiguration": {
      "Enabled": true,
      "Processors": [{
        "Type": "Lambda",
        "Parameters": [{
          "ParameterName": "LambdaArn",
          "ParameterValue": "arn:aws:lambda:us-east-1:123456789:function:log-transformer"
        }]
      }]
    }
  }'

Lambda transformation function — enrich records before OpenSearch

import base64, json, time

def lambda_handler(event, context):
    output = []
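    for record in event["records"]:
        try:
            payload = json.loads(base64.b64decode(record["data"]).decode("utf-8"))
        except (ValueError, UnicodeDecodeError):
            # Mark unparseable records as failed so Firehose sends them to the S3 backup
            output.append({
                "recordId": record["recordId"],
                "result": "ProcessingFailed",
                "data": record["data"]
            })
            continue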

        # Enrich with derived fields
        payload["@timestamp"] = payload.get("timestamp") or time.strftime(
            "%Y-%m-%dT%H:%M:%SZ", time.gmtime()
        )
        # Classify severity
        message = payload.get("message", "").lower()
        if "exception" in message or "error" in message:
            payload["severity"] = "high"
        elif "warn" in message:
            payload["severity"] = "medium"
        else:
            payload["severity"] = "low"

        # Firehose requires base64-encoded newline-terminated JSON
        transformed = json.dumps(payload) + "\n"
        output.append({
            "recordId": record["recordId"],
            "result": "Ok",
            "data": base64.b64encode(transformed.encode("utf-8")).decode("utf-8")
        })
    return {"records": output}
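
A transformation function like this is easy to exercise locally before deploying it. The sketch below builds a Firehose-shaped test event and decodes the handler's output; `make_firehose_event`, `decode_firehose_result`, and `echo_handler` are hypothetical names, and `echo_handler` is only a stand-in so the block runs on its own. In practice you would pass the real `lambda_handler` instead.

```python
import base64
import json


def make_firehose_event(payloads):
    """Wrap dict payloads the way Firehose hands them to a transformation Lambda."""
    records = []
    for i, payload in enumerate(payloads):
        data = base64.b64encode(json.dumps(payload).encode("utf-8")).decode("utf-8")
        records.append({"recordId": f"rec-{i}", "data": data})
    return {"records": records}


def decode_firehose_result(result):
    """Return {recordId: decoded payload} for every record marked Ok."""
    decoded = {}
    for record in result["records"]:
        if record["result"] == "Ok":
            decoded[record["recordId"]] = json.loads(base64.b64decode(record["data"]))
    return decoded


def echo_handler(event, context):
    """Stand-in handler that returns each record unchanged."""
    return {"records": [
        {"recordId": r["recordId"], "result": "Ok", "data": r["data"]}
        for r in event["records"]
    ]}


event = make_firehose_event([{"message": "DB connection timeout", "service": "payment-service"}])
result = decode_firehose_result(echo_handler(event, None))
print(result)
```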

Direct Lambda → OpenSearch (low-latency path)

For real-time alerting use cases where sub-second indexing matters, Lambda can write directly:

import boto3, json
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
from datetime import datetime

credentials = boto3.Session().get_credentials()
auth = AWSV4SignerAuth(credentials, "us-east-1", "es")

os_client = OpenSearch(
    hosts=[{"host": "DOMAIN_ENDPOINT", "port": 443}],
    http_auth=auth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
)

def lambda_handler(event, context):
    # Process SQS messages and index alerts immediately
    docs = []
    for record in event["Records"]:
        body = json.loads(record["body"])
        docs.append({"index": {"_index": f"alerts-{datetime.utcnow().strftime('%Y.%m')}"}})
        docs.append({
            "@timestamp": datetime.utcnow().isoformat() + "Z",
            "alert_type": body.get("type"),
            "severity": body.get("severity"),
            "resource": body.get("resource"),
            "message": body.get("message"),
        })

    if docs:
        result = os_client.bulk(body=docs)
        errors = [i for i in result["items"] if "error" in i.get("index", {})]
        if errors:
            print(f"Bulk errors: {json.dumps(errors)}")
    return {"indexed": len(docs) // 2}

7. Log Analytics — Parsing ALB and CloudFront Logs

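One of the most common OpenSearch use cases is centralising AWS access logs for security analysis, performance monitoring, and compliance. ALB and CloudFront logs land in S3. From there, stream them into OpenSearch with a Lambda function triggered by S3 event notifications, which parses each log file and bulk-indexes the entries (the Lambda can also forward parsed records to Kinesis Data Firehose if you want buffered delivery).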

ALB log field mapping

curl -X PUT "https://<domain-endpoint>/alb-logs-2026" \
  -H "Content-Type: application/json" \
  -u 'admin:Str0ng!Pass' \
  -d '{
    "mappings": {
      "properties": {
        "@timestamp":      { "type": "date" },
        "type":            { "type": "keyword" },
        "elb":             { "type": "keyword" },
        "client_ip":       { "type": "ip" },
        "client_port":     { "type": "integer" },
        "target_ip":       { "type": "ip" },
        "request_method":  { "type": "keyword" },
        "request_url":     { "type": "keyword" },
        "http_version":    { "type": "keyword" },
        "status_code":     { "type": "short" },
        "target_status":   { "type": "short" },
        "received_bytes":  { "type": "long" },
        "sent_bytes":      { "type": "long" },
        "request_time":    { "type": "float" },
        "target_time":     { "type": "float" },
        "user_agent":      { "type": "text",
                             "fields": { "raw": { "type": "keyword" } } },
        "ssl_cipher":      { "type": "keyword" },
        "ssl_protocol":    { "type": "keyword" },
        "target_group_arn":{ "type": "keyword" },
        "trace_id":        { "type": "keyword", "index": false },
        "country":         { "type": "keyword" },
        "city":            { "type": "keyword" }
      }
    }
  }'

Lambda — parse and index ALB access logs from S3

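import gzip, re
from urllib.parse import unquote_plus

import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth

s3 = boto3.client("s3")
credentials = boto3.Session().get_credentials()
auth = AWSV4SignerAuth(credentials, "us-east-1", "es")
os_client = OpenSearch(
    hosts=[{"host": "DOMAIN_ENDPOINT", "port": 443}],
    http_auth=auth, use_ssl=True, verify_certs=True,
    connection_class=RequestsHttpConnection,
)

# ALB access log regex (space-delimited; quoted fields may contain spaces).
# Field order: type, time, elb, client:port, target:port, request/target/response
# processing times, elb status, target status, received bytes, sent bytes,
# "request", "user_agent". Later fields are ignored here.
ALB_PATTERN = re.compile(
    r'(?P<type>\S+) (?P<timestamp>\S+) (?P<elb>\S+) '
    r'(?P<client>\S+) (?P<target>\S+) '
    r'(?P<request_time>[-\d.]+) (?P<target_time>[-\d.]+) (?P<response_time>[-\d.]+) '
    r'(?P<status>\d+) (?P<target_status>\d+|-) '
    r'(?P<received_bytes>\d+) (?P<sent_bytes>\d+) '
    r'"(?P<method>\S+) (?P<url>\S+) (?P<http_version>\S+)" '
    r'"(?P<user_agent>[^"]*)"'
)

def flush(docs):
    """Send one bulk request and report per-item failures."""
    result = os_client.bulk(body=docs)
    if result.get("errors"):
        failed = [i for i in result["items"] if "error" in i.get("index", {})]
        print(f"Bulk errors: {len(failed)}")

def lambda_handler(event, context):
    docs = []
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        # Object keys arrive URL-encoded in S3 event notifications
        key = unquote_plus(record["s3"]["object"]["key"])
        raw = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
        # ALB delivers access logs to S3 as gzip-compressed files
        if key.endswith(".gz"):
            raw = gzip.decompress(raw)
        for line in raw.decode("utf-8").splitlines():
            m = ALB_PATTERN.match(line)
            if not m:
                continue
            g = m.groupdict()
            client_ip = g["client"].rsplit(":", 1)[0]
            docs.append({"index": {"_index": "alb-logs-2026"}})
            docs.append({
                "@timestamp": g["timestamp"],
                "type": g["type"],
                "elb": g["elb"],
                "client_ip": client_ip,
                "request_method": g["method"],
                "request_url": g["url"],
                "http_version": g["http_version"],
                "status_code": int(g["status"]),
                "received_bytes": int(g["received_bytes"]),
                "sent_bytes": int(g["sent_bytes"]),
                # request_processing_time and target_processing_time (-1 if unavailable)
                "request_time": float(g["request_time"]),
                "target_time": float(g["target_time"]),
                "user_agent": g["user_agent"],
            })
            # docs holds two entries per document, so this flushes every 500 documents
            if len(docs) >= 1000:
                flush(docs)
                docs = []
    if docs:
        flush(docs)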
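
As an alternative to a regex, an ALB log line can be tokenised with the standard library's `shlex.split`, which keeps quoted fields such as the request and user agent together. This is a sketch under a few assumptions: `parse_alb_line` is a hypothetical helper, the field positions follow the documented ALB access log layout, and `SAMPLE_LINE` is a shortened illustrative entry rather than real traffic. Lines with unbalanced quotes or non-numeric status fields are skipped by returning `None`.

```python
import shlex

# Shortened illustrative ALB entry (fields after ssl_protocol omitted)
SAMPLE_LINE = (
    'http 2018-07-02T22:23:00.186641Z app/my-loadbalancer/50dc6c495c0c9188 '
    '192.168.131.39:2817 10.0.0.1:80 0.000 0.001 0.000 200 200 34 366 '
    '"GET http://www.example.com:80/ HTTP/1.1" "curl/7.46.0" - -'
)


def parse_alb_line(line: str):
    """Split one ALB log line into a document dict, or None if it cannot be parsed."""
    try:
        f = shlex.split(line)
        # The request field is "METHOD URL PROTOCOL"; pad if it is malformed
        method, url, http_version = (f[12].split(" ", 2) + ["-", "-", "-"])[:3]
        return {
            "@timestamp": f[1],
            "type": f[0],
            "elb": f[2],
            "client_ip": f[3].rsplit(":", 1)[0],
            "target_time": float(f[6]),
            "status_code": int(f[8]),
            "received_bytes": int(f[10]),
            "sent_bytes": int(f[11]),
            "request_method": method,
            "request_url": url,
            "http_version": http_version,
            "user_agent": f[13],
        }
    except (ValueError, IndexError):
        return None


doc = parse_alb_line(SAMPLE_LINE)
print(doc)
```

Positional parsing is easier to extend when you later want fields near the end of the line, such as the target group ARN or trace ID, at the cost of being stricter about quoting.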

Useful ALB analytics queries

# Top 10 slowest endpoints in the last hour
curl -X GET "https://<domain-endpoint>/alb-logs-*/_search" \
  -u 'admin:Str0ng!Pass' -H "Content-Type: application/json" \
  -d '{
    "size": 0,
    "query": {"range": {"@timestamp": {"gte": "now-1h"}}},
    "aggs": {
      "by_url": {
        "terms": {"field": "request_url", "size": 10,
                  "order": {"avg_time": "desc"}},
        "aggs": {
          "avg_time": {"avg": {"field": "request_time"}},
          "p99_time": {"percentiles": {"field": "request_time", "percents": [99]}}
        }
      }
    }
  }'

# 5xx error count by client IP (detect scrapers / attacks)
curl -X GET "https://<domain-endpoint>/alb-logs-*/_search" \
  -u 'admin:Str0ng!Pass' -H "Content-Type: application/json" \
  -d '{
    "size": 0,
    "query": {
      "bool": {
        "filter": [
          {"range": {"status_code": {"gte": 500}}},
          {"range": {"@timestamp": {"gte": "now-24h"}}}
        ]
      }
    },
    "aggs": {
      "top_offenders": {
        "terms": {"field": "client_ip", "size": 20}
      }
    }
  }'

8. OpenSearch Dashboards

OpenSearch Dashboards (the Kibana fork) ships with every AWS OpenSearch domain. Access it at https://<domain-endpoint>/_dashboards. It provides data visualisation, saved queries, alerting, anomaly detection, and the Dev Tools console for running DSL queries interactively.

Initial setup

  1. Navigate to Management → Dashboards Management (Stack Management in older versions) → Index patterns.
  2. Create an index pattern: app-logs-* with @timestamp as the time field.
  3. Open Discover to explore raw documents. Use the DQL (Dashboards Query Language) bar: level: "ERROR" and service: "payment-service".
  4. Create a visualisation in Visualize (OpenSearch Dashboards does not include Kibana's Lens editor): choose a vertical bar chart, put a date histogram on @timestamp on the X-axis and Count on the Y-axis to get an event rate histogram.
  5. Add a Data Table aggregation: split rows by service keyword field, then level. Add a metric column for 99th percentile of duration_ms.
  6. Pin visualisations to a Dashboard and set auto-refresh to 30 seconds for live monitoring.
Useful Dashboards panels for log analytics:
  • Error rate over time — date histogram + filter on level=ERROR.
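  • Top error messages: terms aggregation on a keyword sub-field such as message.raw, sorted by count. The message field in the mapping shown earlier is text only, so add the keyword sub-field to the mapping first.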
  • Service heat map — service on X, hour-of-day on Y, error count as metric.
  • P99 latency per service — horizontal bar chart with percentile metric.
  • Geo map: if you enrich documents at ingest time with a geo_point field (for example from a GeoIP lookup on client_ip), plot request origins on a world map. An ip field on its own cannot be plotted.

Alerting — notify on anomalies

{
  "name": "High error rate alert",
  "type": "monitor",
  "monitor_type": "query_level_monitor",
  "enabled": true,
  "schedule": {"period": {"interval": 5, "unit": "MINUTES"}},
  "inputs": [{
    "search": {
      "indices": ["app-logs-*"],
      "query": {
        "size": 0,
        "query": {
          "bool": {
            "filter": [
              {"term": {"level": "ERROR"}},
              {"range": {"@timestamp": {"gte": "now-5m", "lt": "now"}}}
            ]
          }
        }
      }
    }
  }],
  "triggers": [{
    "name": "more than 50 errors in 5 min",
    "severity": "2",
    "condition": {"script": {"source": "ctx.results[0].hits.total.value > 50"}},
    "actions": [{
      "name": "notify-slack",
      "destination_id": "<slack-destination-id>",
      "message_template": {
        "source": "Error spike: {{ctx.results.0.hits.total.value}} errors in the last 5 minutes on {{ctx.monitor.name}}"
      }
    }]
  }]
}
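
The trigger condition in this monitor is a simple threshold on the hit count of the monitor's search. To make the logic explicit, here is the same check as a Python sketch; `evaluate_trigger` is a hypothetical helper and `sample_result` is a hand-written stand-in for the search response the monitor sees as `ctx.results[0]`.

```python
def evaluate_trigger(result: dict, threshold: int = 50):
    """Return an alert message if the hit count exceeds the threshold, else None."""
    count = result["hits"]["total"]["value"]
    if count > threshold:
        return f"Error spike: {count} errors in the last 5 minutes"
    return None


# Hand-written sample shaped like the relevant part of a search response
sample_result = {"hits": {"total": {"value": 73, "relation": "eq"}}}

message = evaluate_trigger(sample_result)
print(message)
```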

9. Security — Fine-Grained Access Control, SAML, VPC

OpenSearch Service provides multiple security layers that work together. You should enable all of them.

Fine-Grained Access Control (FGAC)

FGAC lets you control access at the index, document, and field level using roles mapped to IAM principals or internal users. The master user (set at domain creation) configures roles via the _plugins/_security API:

# Create a read-only role for a specific index
curl -X PUT "https://<domain-endpoint>/_plugins/_security/api/roles/app-logs-reader" \
  -H "Content-Type: application/json" \
  -u 'admin:Str0ng!Pass' \
  -d '{
    "index_permissions": [{
      "index_patterns": ["app-logs-*"],
      "allowed_actions": ["read", "indices:data/read/search", "indices:data/read/msearch"]
    }],
    "cluster_permissions": ["cluster_composite_ops_ro"]
  }'

# Map an IAM role to the OpenSearch role
curl -X PUT "https://<domain-endpoint>/_plugins/_security/api/rolesmapping/app-logs-reader" \
  -H "Content-Type: application/json" \
  -u 'admin:Str0ng!Pass' \
  -d '{
    "backend_roles": ["arn:aws:iam::123456789:role/AppLogsReadRole"],
    "hosts": [],
    "users": []
  }'

# Create an internal user for OpenSearch Dashboards
curl -X PUT "https://<domain-endpoint>/_plugins/_security/api/internalusers/dashboard-user" \
  -H "Content-Type: application/json" \
  -u 'admin:Str0ng!Pass' \
  -d '{
    "password": "DashboardPass#99",
    "opendistro_security_roles": ["app-logs-reader"],
    "attributes": {"department": "engineering"}
  }'
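
The role above restricts access at the index level only. Document- and field-level restrictions go in the same role body. The following is a minimal sketch, assuming the security plugin's dls and fls keys inside index_permissions; the helper name build_restricted_role and the department field are hypothetical and used only for illustration.

```python
import json


def build_restricted_role(index_pattern, dls_query, visible_fields):
    """Return a security-plugin role body with document- and field-level limits."""
    return {
        "cluster_permissions": ["cluster_composite_ops_ro"],
        "index_permissions": [{
            "index_patterns": [index_pattern],
            # dls takes the filter query as a JSON *string*, not a nested object.
            "dls": json.dumps(dls_query),
            # fls lists the fields to include; a "~" prefix would exclude a field instead.
            "fls": list(visible_fields),
            "allowed_actions": ["read"],
        }],
    }


role = build_restricted_role(
    "app-logs-*",
    {"term": {"department": "engineering"}},  # hypothetical field in the log documents
    ["@timestamp", "level", "message"],
)
print(json.dumps(role, indent=2))
```

The printed JSON can be sent as the body of the same PUT _plugins/_security/api/roles/<role-name> call shown above. Test the role with a non-admin user before relying on it, because a malformed dls string fails at query time rather than at creation.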

SAML SSO for OpenSearch Dashboards

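# SAML settings in security-plugin format (requires master credentials).
# Note: PUT on securityconfig is disabled by default in the security plugin. On the
# managed service, SAML is normally enabled through the domain's SAML options
# (console, or update-domain-config --advanced-security-options) rather than this call.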
curl -X PUT "https://<domain-endpoint>/_plugins/_security/api/securityconfig" \
  -H "Content-Type: application/json" \
  -u 'admin:Str0ng!Pass' \
  -d '{
    "dynamic": {
      "authc": {
        "saml_auth_domain": {
          "http_enabled": true,
          "order": 5,
          "http_authenticator": {
            "type": "saml",
            "challenge": true,
            "config": {
              "idp": {
                "metadata_url": "https://sso.company.com/saml/metadata",
                "entity_id": "https://sso.company.com"
              },
              "sp": {
                "entity_id": "https://<domain-endpoint>"
              },
              "kibana_url": "https://<domain-endpoint>/_dashboards",
              "roles_key": "Role",
              "exchange_key": "<random-256-bit-string>"
            }
          },
          "authentication_backend": {"type": "noop"}
        }
      }
    }
  }'

VPC deployment: For production, always place your OpenSearch domain inside a VPC. Set VPCOptions with the appropriate subnet IDs and security groups at domain creation. VPC domains are not reachable from the public internet; clients connect from inside the VPC or through VPC endpoints, Transit Gateway, or VPN. Network placement is the most important security control, and the choice between a VPC and a public endpoint cannot be changed after domain creation.

Resource-based access policy (deny public access)

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": [
          "arn:aws:iam::123456789:role/OpenSearchIngestionRole",
          "arn:aws:iam::123456789:role/DashboardAccessRole"
        ]
      },
      "Action": "es:*",
      "Resource": "arn:aws:es:us-east-1:123456789:domain/techoral-search/*"
    },
    {
      "Effect": "Deny",
      "Principal": {"AWS": "*"},
      "Action": "es:*",
      "Resource": "arn:aws:es:us-east-1:123456789:domain/techoral-search/*",
      "Condition": {
        "StringNotEquals": {
          "aws:PrincipalOrgID": "o-xxxxxxxxxx"
        }
      }
    }
  ]
}

10. Cost Optimisation — UltraWarm and Cold Storage

Hot storage (EBS gp3) is expensive for data older than a few days that is rarely queried. AWS OpenSearch Service provides two lower-cost storage tiers that can reduce your storage bill by up to 90%.

Storage tier comparison:
Tier        Storage            Cost (approx)      Query latency             Use case
Hot         EBS gp3            $0.10/GB/month     Milliseconds              Last 7 days of active data
UltraWarm   S3 + node cache    $0.024/GB/month    Seconds                   7–30 days of searchable history
Cold        S3 only            $0.007/GB/month    Minutes (after attach)    Compliance / rare access
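
To see what the tiers mean for a monthly bill, the arithmetic can be sketched as below. The per-GB prices are the approximate figures from the table; the workload (100 GB/day kept for 90 days) and the 7/23/60-day split are hypothetical, and replicas and node-hour charges are deliberately ignored.

```python
# Approximate storage prices from the comparison table, in USD per GB per month.
PRICE_PER_GB_MONTH = {"hot": 0.10, "ultrawarm": 0.024, "cold": 0.007}


def monthly_storage_cost(gb_by_tier):
    """Sum the storage cost for a mapping of tier name -> stored GB."""
    return sum(PRICE_PER_GB_MONTH[tier] * gb for tier, gb in gb_by_tier.items())


def saving_vs_hot(tier):
    """Fractional per-GB saving of a tier relative to hot storage."""
    return 1 - PRICE_PER_GB_MONTH[tier] / PRICE_PER_GB_MONTH["hot"]


# Hypothetical workload: 100 GB/day retained for 90 days = 9000 GB in total.
all_hot = monthly_storage_cost({"hot": 9000})
# Days 1-7 hot, days 8-30 UltraWarm, days 31-90 cold.
tiered = monthly_storage_cost({"hot": 700, "ultrawarm": 2300, "cold": 6000})

print(f"all hot: ${all_hot:.2f}/month")
print(f"tiered:  ${tiered:.2f}/month")
print(f"saving:  {1 - tiered / all_hot:.0%}")
print(f"per-GB saving, cold vs hot: {saving_vs_hot('cold'):.0%}")
```

Under these assumptions the tiered layout costs about $167 a month against $900 for keeping everything hot. Real bills also include UltraWarm node hours, so treat the result as an upper bound on the saving.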

Enable UltraWarm on an existing domain

# UltraWarm requires dedicated master nodes on the domain
aws opensearch update-domain-config \
  --domain-name techoral-search \
  --cluster-config WarmEnabled=true,WarmType=ultrawarm1.medium.search,WarmCount=2 \
  --region us-east-1

Manually migrate an index to UltraWarm

# Migrate index older than 7 days to UltraWarm
curl -X POST "https://<domain-endpoint>/_ultrawarm/migration/app-logs-2026.05.30/_warm" \
  -u 'admin:Str0ng!Pass'

# Check migration status
curl -X GET "https://<domain-endpoint>/_ultrawarm/migration/app-logs-2026.05.30/_status" \
  -u 'admin:Str0ng!Pass'

Cold Storage — attach/detach for compliance access

# Move index from UltraWarm to Cold Storage
curl -X POST "https://<domain-endpoint>/_ultrawarm/migration/app-logs-2026.04.01/_cold" \
  -u 'admin:Str0ng!Pass'

# Attach a cold index for querying (takes 1–5 minutes); it returns to UltraWarm
curl -X POST "https://<domain-endpoint>/_cold/migration/_warm" \
  -H "Content-Type: application/json" \
  -u 'admin:Str0ng!Pass' \
  -d '{"indices": "app-logs-2026.04.01"}'

Cost optimisation checklist

  • Use ISM policies to automatically migrate indices through hot → warm → cold → delete tiers. A 90-day lifecycle can cut storage costs by 70%.
  • Right-size your data nodes — don't use r6g.2xlarge for a cluster that indexes 1GB/day. Start with r6g.large and scale out (add nodes) rather than up.
  • Use Graviton2 instances (r6g, c6g, m6g) over r5/c5 — same performance, ~20% cheaper.
  • Don't budget for replicas on UltraWarm indices: S3 provides the durability, so warm indices are stored without replicas and only primary shard size counts toward warm storage.
  • Enable gp3 EBS (not gp2) for hot nodes — gp3 provides 3000 IOPS baseline included, no need to over-provision storage just to get IOPS.
  • Use Reserved Instance pricing for 1- or 3-year terms on stable baseline nodes — up to 40% off on-demand pricing.
  • Monitor HotToWarmMigrationQueueSize and WarmFreeStorageSpace CloudWatch metrics to detect migration bottlenecks early.
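
The hot → warm → cold → delete lifecycle from the first checklist item can be sketched as an ISM policy body. This is a minimal sketch, not a tuned policy: it assumes the warm_migration, cold_migration, and cold_delete actions that the managed service documents for ISM, the helper name build_lifecycle_policy is hypothetical, and the 7/30/90-day thresholds are illustrative defaults.

```python
import json


def build_lifecycle_policy(index_pattern, warm_after="7d", cold_after="30d",
                           delete_after="90d", timestamp_field="@timestamp"):
    """Build an ISM policy body that walks an index through the storage tiers."""
    def transition(state, age):
        return [{"state_name": state, "conditions": {"min_index_age": age}}]

    return {"policy": {
        "description": "hot -> warm -> cold -> delete",
        "default_state": "hot",
        "states": [
            {"name": "hot", "actions": [],
             "transitions": transition("warm", warm_after)},
            {"name": "warm", "actions": [{"warm_migration": {}}],
             "transitions": transition("cold", cold_after)},
            {"name": "cold",
             "actions": [{"cold_migration": {"timestamp_field": timestamp_field}}],
             "transitions": transition("delete", delete_after)},
            # cold_delete removes the index from cold storage (assumed action name).
            {"name": "delete", "actions": [{"cold_delete": {}}], "transitions": []},
        ],
        # Attach the policy automatically to new indices matching the pattern.
        "ism_template": {"index_patterns": [index_pattern], "priority": 100},
    }}


policy = build_lifecycle_policy("app-logs-*")
print(json.dumps(policy, indent=2))
```

The printed body is intended for PUT _plugins/_ism/policies/<policy-id>. Check the action names against your engine version first, since the tier-migration actions are specific to the managed service.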

Frequently Asked Questions

Can I use the Elasticsearch client libraries with AWS OpenSearch?

Only partly. Elasticsearch clients up to version 7.13 work against OpenSearch for basic operations, but clients from 7.14 onward run a product check and refuse to connect to non-Elastic clusters. AWS recommends the official opensearch-py (Python) or opensearch-java (Java) clients, which also cover OpenSearch-specific APIs such as ISM, anomaly detection, and the k-NN plugin that the Elasticsearch clients lack. Migration is straightforward because the client API surface is nearly identical.

How many shards should I use per index?

Target 10–50 GB per primary shard for most log analytics workloads. A good starting formula is primary_shards = ceil(daily_data_GB / 30). For a daily index that receives 30 GB, that gives 1 primary shard; 3 primary shards (10 GB each) is still within range and makes sense when you want indexing spread across three data nodes. Avoid oversharding: too many small shards create coordination overhead that outweighs any query latency savings. Always set replicas to at least 1 in production for high availability.
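
The sizing formula is easy to try against a few daily volumes. A minimal sketch, assuming the 30 GB target from the formula above; the function name primary_shards and the sample volumes are illustrative.

```python
import math


def primary_shards(daily_data_gb, target_shard_gb=30):
    """primary_shards = ceil(daily_data_GB / target), never fewer than one shard."""
    if daily_data_gb <= 0 or target_shard_gb <= 0:
        raise ValueError("sizes must be positive")
    return max(1, math.ceil(daily_data_gb / target_shard_gb))


for daily_gb in (1, 30, 90, 400):
    shards = primary_shards(daily_gb)
    print(f"{daily_gb:>4} GB/day -> {shards} primary shard(s), "
          f"~{daily_gb / shards:.1f} GB each")
```

Treat the result as a floor: raising the count to match the number of data nodes is fine as long as each shard stays within the 10–50 GB range.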

What is the difference between OpenSearch Ingestion and Kinesis Firehose?

Amazon OpenSearch Ingestion (OSI, launched 2023) is a serverless, purpose-built data pipeline for OpenSearch based on the open-source Data Prepper engine, which is part of the OpenSearch project. It supports Grok parsing, routing, conditional filtering, and aggregations, which makes it closer to a Logstash replacement. Kinesis Firehose (renamed Amazon Data Firehose in 2024) is simpler and better for pure buffering/delivery with Lambda transformation. Use OSI when you need complex log parsing or fan-out to multiple indices. Use Firehose when you already have Kinesis streams or need the simplest possible path from an AWS service to OpenSearch.

How do I perform a zero-downtime reindex (change mapping)?

Use the Reindex API combined with index aliases. Create the new index with the corrected mapping, reindex from old to new, then atomically swap the alias: POST /_aliases {"actions": [{"remove": {"index": "logs-v1", "alias": "logs"}}, {"add": {"index": "logs-v2", "alias": "logs"}}]}. Applications that read and write through the alias logs see no disruption at the swap itself, but documents written to logs-v1 after the reindex has passed them are not copied, so pause writers or run a catch-up reindex before swapping. The reindex of a large index can take hours, so run it during low-traffic periods with throttling. requests_per_second is a URL parameter, not a body field: POST /_reindex?requests_per_second=500.
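
The two requests involved can be sketched as plain data so they can be reviewed before anything is sent. The helper names below are hypothetical. Throttling is passed as the requests_per_second URL parameter, and wait_for_completion=false is added so a long reindex runs as a background task.

```python
import json


def reindex_request(source, dest, requests_per_second=500):
    """Return (method, path, body) for a throttled, asynchronous reindex."""
    path = ("/_reindex?wait_for_completion=false"
            f"&requests_per_second={requests_per_second}")
    body = {"source": {"index": source}, "dest": {"index": dest}}
    return "POST", path, body


def alias_swap_request(alias, old_index, new_index):
    """Return (method, path, body) that moves an alias in one atomic call."""
    body = {"actions": [
        {"remove": {"index": old_index, "alias": alias}},
        {"add": {"index": new_index, "alias": alias}},
    ]}
    return "POST", "/_aliases", body


for method, path, body in (
    reindex_request("logs-v1", "logs-v2"),
    alias_swap_request("logs", "logs-v1", "logs-v2"),
):
    print(method, path)
    print(json.dumps(body, indent=2))
```

Send the second request only after the reindex task has finished and document counts in logs-v2 match expectations.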

How do I monitor OpenSearch domain health?

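Key CloudWatch metrics to alarm on: ClusterStatus.red (at least one primary shard unassigned, critical), ClusterStatus.yellow (replica shards unassigned, warning), FreeStorageSpace (alert when a node drops below about 20% of its volume), JVMMemoryPressure (alert above 85%), CPUUtilization (alert above 80% sustained), OpenSearchDashboardsHealthyNodes (KibanaHealthyNodes on older domains), and SearchLatency p99. FreeStorageSpace is reported in MiB, so a percentage target has to be converted to an absolute value. For example, alarm on the Minimum statistic for FreeStorageSpace < 20480 (20 GiB) so that you can scale out or migrate to UltraWarm before the domain starts blocking writes.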
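
The FreeStorageSpace alarm can be sketched as a parameter set for CloudWatch's PutMetricAlarm call. This is a sketch under stated assumptions: the function name free_storage_alarm, the alarm name, and the account ID are hypothetical, and the one-minute period with a single evaluation is a starting point rather than a recommendation from this article. The namespace AWS/ES and the DomainName/ClientId dimensions are the ones OpenSearch Service domains publish under.

```python
def free_storage_alarm(domain_name, account_id, threshold_mib=20480):
    """Build keyword arguments for CloudWatch put_metric_alarm."""
    return {
        "AlarmName": f"{domain_name}-free-storage-low",
        "Namespace": "AWS/ES",
        "MetricName": "FreeStorageSpace",
        "Dimensions": [
            {"Name": "DomainName", "Value": domain_name},
            {"Name": "ClientId", "Value": account_id},  # the AWS account ID
        ],
        # Minimum tracks the data node with the least free space.
        "Statistic": "Minimum",
        "Period": 60,
        "EvaluationPeriods": 1,
        "Threshold": float(threshold_mib),  # metric is reported in MiB
        "ComparisonOperator": "LessThanThreshold",
    }


params = free_storage_alarm("techoral-search", "123456789012")
print(params)

# With boto3 installed and credentials configured, the alarm could be created with:
#   import boto3
#   boto3.client("cloudwatch").put_metric_alarm(**params)
```

Add an AlarmActions entry with an SNS topic ARN if the alarm should notify someone rather than only change state.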
