AWS EventBridge Pipes: Point-to-Point Event Integration Made Simple (2026)

AWS EventBridge Pipes, launched in late 2022 and now a production-grade integration primitive, solves a specific and common problem: you have a stream or queue (SQS, DynamoDB Streams, Kinesis, MSK, MQ) and you want to connect it to a target (Lambda, Step Functions, SQS, EventBridge bus, API destination) — optionally filtering noise and enriching payloads along the way. Before Pipes, this meant writing and maintaining Lambda functions whose only job was to poll a queue, filter records, call an enrichment service, and forward to a target. Pipes does all of that without any glue code, at $0.40 per million events processed.

This guide covers everything: Pipes anatomy, all supported sources and targets, SQS→Lambda, DynamoDB Streams→SQS, Kinesis→DynamoDB, filter criteria syntax, enrichment patterns, real-world use cases, full Terraform configuration, and monitoring. By the end you'll know exactly when to reach for Pipes instead of an Event Bus rule or a Step Functions workflow.

Pipes vs Event Bus vs Step Functions — When Pipes Wins

Choosing the right EventBridge primitive matters for architecture clarity, cost, and operational simplicity. The three options — Event Bus rules, EventBridge Pipes, and Step Functions — overlap in surface area but serve different patterns.

| Dimension | Event Bus Rule | EventBridge Pipes | Step Functions |
|---|---|---|---|
| Topology | 1 bus → N targets (fan-out) | 1 source → 1 target (point-to-point) | Orchestrator calling N services |
| Source types | Events put onto the bus | SQS, Kinesis, DynamoDB Streams, MSK, MQ, self-managed Kafka | Manual start, API Gateway, EventBridge rule |
| Built-in polling | No (must use event trigger) | Yes (managed polling of streams/queues) | No |
| Filtering | Rich JSON pattern on bus | Filter criteria before enrichment and target | Choice/condition states inside workflow |
| Enrichment | No built-in enrichment | Lambda / API GW / Step Functions sync | Every step can call any service |
| Glue code needed | None (for routing) | None (for polling + filter + enrich + route) | Yes (define every state explicitly) |
| Cost model | $1.00/M events (custom bus) | $0.40/M events processed | $0.025/1000 state transitions |
| Max targets | 5 per rule | 1 (by design) | Unlimited via states |
| When to use | Fan-out, cross-account routing, SaaS events | Stream/queue → single consumer with filter + enrich | Multi-step orchestration, long-running workflows |

Rule of Thumb: If you find yourself writing a Lambda that only polls SQS, checks a condition, and calls another service — that is exactly the Pipes use case. Replace the Lambda entirely. Pipes wins when the pattern is strictly point-to-point and the source is a polling-based stream or queue.

Key differentiator: Event Bus rules don't poll. They react to events already on the bus. To get events from SQS or DynamoDB Streams onto a bus you'd need a Lambda trigger — and Pipes is that trigger, but fully managed with no code. Step Functions is the right choice when you need branching logic, retries across multiple services, or workflows that span minutes or hours.

Pipe Anatomy: Source → Filter → Enrich → Target

A Pipe is a fully managed, serverless pipeline with four stages. Only source and target are required; filter and enrichment are optional.

┌──────────────┐    ┌──────────────┐    ┌──────────────────┐    ┌──────────────┐
│    SOURCE    │───▶│   FILTER     │───▶│   ENRICHMENT     │───▶│    TARGET    │
│ (polling)    │    │ (drop noise) │    │ (Lambda/APIGW/   │    │ (invoke,     │
│              │    │              │    │  Step Functions) │    │  send, put)  │
└──────────────┘    └──────────────┘    └──────────────────┘    └──────────────┘

Stage 1 — Source: Pipes polls the source on your behalf. You configure batch size, batch window, starting position (for Kinesis/DynamoDB), and concurrency. The managed poller handles checkpointing, retries, and shard/partition scaling.
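
How batch size and batching window interact can be sketched with a small simulation. This is not the Pipes poller itself, only an illustration under the assumption that a batch is flushed when it reaches batch_size records or when window_seconds have passed since its first record, whichever comes first. The function name and the sample arrival times are hypothetical.

```python
from typing import List


def simulate_batches(arrivals: List[float], batch_size: int, window_seconds: float) -> List[List[float]]:
    """Group arrival timestamps (in seconds) into batches.

    A batch closes when it holds batch_size records, or when a record arrives
    more than window_seconds after the first record of the open batch.
    """
    batches: List[List[float]] = []
    current: List[float] = []
    for t in sorted(arrivals):
        if current and t - current[0] > window_seconds:
            batches.append(current)  # window expired before this record arrived
            current = []
        current.append(t)
        if len(current) == batch_size:
            batches.append(current)  # size limit reached
            current = []
    if current:
        batches.append(current)      # flush the trailing partial batch
    return batches


# A burst of four records followed by one late record.
arrivals = [0.0, 0.1, 0.2, 0.3, 9.0]
batches = simulate_batches(arrivals, batch_size=3, window_seconds=5)
print(batches)
```

A larger window trades latency for fewer, fuller invocations; a smaller batch size does the opposite.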

Stage 2 — Filter: Before any enrichment or target invocation, Pipes evaluates filter criteria against each record in the batch. Records that don't match are silently dropped. This is the cheapest way to reduce downstream invocations — you're billed only for records that pass the filter.

Stage 3 — Enrichment (optional): The filtered batch is sent synchronously to a Lambda function, API Gateway REST/HTTP endpoint, or a Step Functions Express Workflow. The enrichment service receives the batch, augments the records (e.g., fetches additional data from DynamoDB or a third-party API), and returns the enriched payload. The enriched payload is what the target receives.

Stage 4 — Target: The enriched records are delivered to the target. Pipes supports a wide range of targets.

| Supported Sources | Supported Targets |
|---|---|
| Amazon SQS (standard & FIFO) | AWS Lambda |
| Amazon Kinesis Data Streams | Amazon SQS (standard & FIFO) |
| Amazon DynamoDB Streams | Amazon SNS |
| Amazon MSK (managed Kafka) | Amazon Kinesis Data Streams |
| Self-managed Apache Kafka | Amazon EventBridge event bus |
| Amazon MQ (ActiveMQ / RabbitMQ) | Amazon EventBridge API Destination |
| | AWS Step Functions (Express or Standard) |
| | Amazon ECS task (run task) |
| | Amazon API Gateway REST / HTTP endpoint |
| | AWS Batch job queue |
| | Amazon SageMaker Pipeline |
| | Amazon Redshift Data API |

Note: Each pipe connects exactly one source to exactly one target. For fan-out (one source, multiple consumers), set the target to an EventBridge event bus and add rules there, or use an SNS topic as the target.

SQS → Lambda Pipe

The most common Pipes pattern replaces a manually-wired SQS trigger on a Lambda function. The managed Pipes version adds built-in filtering and enrichment that would otherwise require code inside the Lambda.

Creating the Pipe via AWS CLI

# 1. Create an IAM role that Pipes can assume
aws iam create-role \
  --role-name EventBridgePipesSQSRole \
  --assume-role-policy-document '{
    "Version": "2012-10-17",
    "Statement": [{
      "Effect": "Allow",
      "Principal": {"Service": "pipes.amazonaws.com"},
      "Action": "sts:AssumeRole"
    }]
  }'

# 2. Attach permissions for SQS read and Lambda invoke
aws iam put-role-policy \
  --role-name EventBridgePipesSQSRole \
  --policy-name PipesPolicy \
  --policy-document '{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Action": [
          "sqs:ReceiveMessage",
          "sqs:DeleteMessage",
          "sqs:GetQueueAttributes"
        ],
        "Resource": "arn:aws:sqs:us-east-1:123456789012:my-orders-queue"
      },
      {
        "Effect": "Allow",
        "Action": "lambda:InvokeFunction",
        "Resource": "arn:aws:lambda:us-east-1:123456789012:function:process-order"
      }
    ]
  }'

# 3. Create the pipe with filter criteria
aws pipes create-pipe \
  --name "orders-sqs-to-lambda" \
  --role-arn "arn:aws:iam::123456789012:role/EventBridgePipesSQSRole" \
  --source "arn:aws:sqs:us-east-1:123456789012:my-orders-queue" \
  --source-parameters '{
    "SqsQueueParameters": {
      "BatchSize": 10,
      "MaximumBatchingWindowInSeconds": 5
    },
    "FilterCriteria": {
      "Filters": [{
        "Pattern": "{\"body\":{\"orderStatus\":[\"PENDING\"]}}"
      }]
    }
  }' \
  --target "arn:aws:lambda:us-east-1:123456789012:function:process-order" \
  --target-parameters '{
    "LambdaFunctionParameters": {
      "InvocationType": "FIRE_AND_FORGET"
    }
  }'

Dead-Letter Queue Configuration

For SQS sources, configure a DLQ directly on the SQS queue (not on the Pipe). Messages that fail Lambda processing are returned to the queue, retried up to maxReceiveCount times, then moved to the DLQ. For Kinesis and DynamoDB Streams sources, configure the DLQ on the Pipe's source parameters.

# Set DLQ on the SQS queue itself (recommended for SQS sources)
aws sqs set-queue-attributes \
  --queue-url https://sqs.us-east-1.amazonaws.com/123456789012/my-orders-queue \
  --attributes '{
    "RedrivePolicy": "{\"deadLetterTargetArn\":\"arn:aws:sqs:us-east-1:123456789012:orders-dlq\",\"maxReceiveCount\":\"3\"}"
  }'
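
Invocation Types: The Lambda target parameter is InvocationType. Use FIRE_AND_FORGET for asynchronous invocation (higher throughput, no response needed) and REQUEST_RESPONSE when the pipe should wait for the function's result. Enrichment is always invoked synchronously. With FIRE_AND_FORGET the pipe treats a batch as delivered once Lambda accepts the asynchronous invocation, so errors raised inside the function are handled by Lambda's own async retry settings rather than by the SQS redrive policy above. Choose REQUEST_RESPONSE if failed messages should return to the queue.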

DynamoDB Streams → EventBridge Pipe

DynamoDB Streams emit change records for every item write. A common problem: you want to process only INSERT events (ignoring MODIFY and REMOVE), and enrich the record with additional attributes before forwarding downstream. Without Pipes, this requires a Lambda trigger with conditional logic. With Pipes, the filter handles the INSERT-only requirement, and enrichment handles the data augmentation — zero glue code.

Stream View Type

Choose the right stream view type when enabling DynamoDB Streams. This cannot be changed without disabling and re-enabling the stream:

  • NEW_IMAGE — The item after the change (sufficient for INSERT and MODIFY processing)
  • OLD_IMAGE — The item before the change (useful for REMOVE and audit logs)
  • NEW_AND_OLD_IMAGES — Both images (most flexible, higher data volume)
  • KEYS_ONLY — Only the primary key (lowest overhead; use when you'll fetch full item separately)
# Enable DynamoDB Streams with NEW_IMAGE view
aws dynamodb update-table \
  --table-name orders \
  --stream-specification StreamEnabled=true,StreamViewType=NEW_IMAGE

# Get the stream ARN
aws dynamodb describe-table --table-name orders \
  --query 'Table.LatestStreamArn' --output text

Pipe Configuration — INSERT Only, Enrichment to Lambda, Target to SQS

aws pipes create-pipe \
  --name "orders-dynamodb-to-sqs" \
  --role-arn "arn:aws:iam::123456789012:role/EventBridgePipesDDBRole" \
  --source "arn:aws:dynamodb:us-east-1:123456789012:table/orders/stream/2026-06-09T00:00:00.000" \
  --source-parameters '{
    "DynamoDBStreamParameters": {
      "StartingPosition": "TRIM_HORIZON",
      "BatchSize": 25,
      "MaximumBatchingWindowInSeconds": 10,
      "MaximumRetryAttempts": 3,
      "DeadLetterConfig": {
        "Arn": "arn:aws:sqs:us-east-1:123456789012:orders-ddb-dlq"
      }
    },
    "FilterCriteria": {
      "Filters": [{
        "Pattern": "{\"dynamodb\":{\"NewImage\":{\"status\":{\"S\":[\"PLACED\"]}},\"StreamViewType\":[\"NEW_IMAGE\"]},\"eventName\":[\"INSERT\"]}"
      }]
    }
  }' \
  --enrichment "arn:aws:lambda:us-east-1:123456789012:function:enrich-order" \
  --target "arn:aws:sqs:us-east-1:123456789012:downstream-orders-queue"

Enrichment Lambda (Python)

import json
import boto3

dynamodb = boto3.resource('dynamodb')
customers_table = dynamodb.Table('customers')

def lambda_handler(event, context):
    """
    Enrichment Lambda receives the filtered batch as a list of DynamoDB stream records.
    Returns a list with one enriched payload per input record, in the same order;
    the returned list is what the pipe forwards to the target.
    """
    enriched_records = []

    for record in event:
        new_image = record.get('dynamodb', {}).get('NewImage', {})
        customer_id = new_image.get('customerId', {}).get('S')
        order_id = new_image.get('orderId', {}).get('S')
        amount = float(new_image.get('totalAmount', {}).get('N', '0'))

        # Fetch customer tier from a separate table
        customer_tier = 'standard'
        if customer_id:
            response = customers_table.get_item(Key={'customerId': customer_id})
            if 'Item' in response:
                customer_tier = response['Item'].get('tier', 'standard')

        enriched_records.append({
            'orderId': order_id,
            'customerId': customer_id,
            'totalAmount': amount,
            'customerTier': customer_tier,
            'priorityProcessing': customer_tier == 'premium',
            'originalRecord': record
        })

    return enriched_records

Enrichment Contract: Pipes uses the enrichment response as the payload sent to the target, so return a JSON array with one element for each record you want delivered. Keeping the output in the same order as the input makes a target payload easy to trace back to its source record. Do not return None for individual records. If a record should pass through unenriched, return it with a marker field (e.g., {"skip": true}) and handle that in the target Lambda.
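
The handler above reads DynamoDB-typed attributes by hand (.get('S'), .get('N')). A small helper can convert a whole NewImage at once. The sketch below is a minimal, dependency-free version that covers only the S, N, BOOL, NULL, L, and M type tags; the function name and sample item are hypothetical. boto3 also ships boto3.dynamodb.types.TypeDeserializer for the same job.

```python
from decimal import Decimal
from typing import Any, Dict


def unmarshal(attr: Dict[str, Any]) -> Any:
    """Convert one DynamoDB-typed attribute value into a plain Python value."""
    (type_tag, value), = attr.items()  # each attribute is a single {type: value} pair
    if type_tag == "S":
        return value
    if type_tag == "N":
        return Decimal(value)          # numbers arrive as strings; Decimal keeps precision
    if type_tag == "BOOL":
        return value
    if type_tag == "NULL":
        return None
    if type_tag == "L":
        return [unmarshal(v) for v in value]
    if type_tag == "M":
        return {k: unmarshal(v) for k, v in value.items()}
    raise ValueError(f"unsupported DynamoDB type tag: {type_tag}")


# Hypothetical NewImage from a stream record
new_image = {
    "orderId": {"S": "ORD-1"},
    "totalAmount": {"N": "42.50"},
    "items": {"L": [{"S": "a"}, {"S": "b"}]},
    "gift": {"BOOL": False},
}
item = {name: unmarshal(attr) for name, attr in new_image.items()}
print(item)
```

Decimal values are not JSON-serializable by default, so convert them (for example with float or str) before returning them from an enrichment function.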

Kinesis → Pipe

Kinesis Data Streams are the highest-throughput source for Pipes. The key configuration choices are starting position, batch size, bisect-on-error, and partial batch response — all of which affect how failures behave.

aws pipes create-pipe \
  --name "iot-kinesis-to-dynamodb" \
  --role-arn "arn:aws:iam::123456789012:role/EventBridgePipesKinesisRole" \
  --source "arn:aws:kinesis:us-east-1:123456789012:stream/iot-telemetry" \
  --source-parameters '{
    "KinesisStreamParameters": {
      "StartingPosition": "LATEST",
      "BatchSize": 100,
      "MaximumBatchingWindowInSeconds": 30,
      "MaximumRetryAttempts": 5,
      "OnPartialBatchItemFailure": "AUTOMATIC_BISECT",
      "ParallelizationFactor": 2,
      "DeadLetterConfig": {
        "Arn": "arn:aws:sqs:us-east-1:123456789012:iot-dlq"
      }
    },
    "FilterCriteria": {
      "Filters": [{
        "Pattern": "{\"data\":{\"sensorType\":[\"TEMPERATURE\",\"HUMIDITY\"]}}"
      }]
    }
  }' \
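  --target "arn:aws:lambda:us-east-1:123456789012:function:write-iot-reading" \
  --target-parameters '{
    "LambdaFunctionParameters": {
      "InvocationType": "REQUEST_RESPONSE"
    }
  }'
# Pipes has no DynamoDB table target, so a Lambda target (the function name here is
# a placeholder) writes each batch into the iot-readings table.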

Starting Position options:

  • LATEST — Process only new records (use for live data; skips historical backlog)
  • TRIM_HORIZON — Process all records in the stream (use for backfill or initial load)
  • AT_TIMESTAMP — Start from a specific point in time (use for recovery scenarios)

Automatic bisect (OnPartialBatchItemFailure set to AUTOMATIC_BISECT): When a batch fails, Pipes splits it in half and retries each half independently. This isolates poison-pill records efficiently: rather than retrying all 100 records every time one bad record fails, bisection narrows the failure down in roughly log2(N) splits, so only the bad record ends up in the DLQ. Note that BisectBatchOnFunctionError is the Lambda event source mapping name for this behavior and is not a Pipes parameter.
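
The bisection idea can be checked with a short simulation. This is a sketch of the general technique, not the service's implementation: it assumes a sub-batch fails whenever it contains a poison record, and it ignores retry limits and record expiry. The function name and the poison record's position are hypothetical.

```python
from typing import Callable, Dict, List


def bisect_until_isolated(batch: List[int], is_poison: Callable[[int], bool]) -> Dict[str, object]:
    """Simulate bisect-on-failure for a single batch.

    A failed batch is split in half and each half is retried on its own.
    A failed batch containing one record is sent to the dead-letter list.
    """
    invocations = 0
    delivered: List[int] = []
    dead_lettered: List[int] = []
    pending = [batch]
    while pending:
        current = pending.pop()
        invocations += 1
        if not any(is_poison(r) for r in current):
            delivered.extend(current)       # whole sub-batch succeeded
        elif len(current) == 1:
            dead_lettered.extend(current)   # poison record isolated
        else:
            mid = len(current) // 2
            pending.append(current[:mid])
            pending.append(current[mid:])
    return {
        "invocations": invocations,
        "delivered": sorted(delivered),
        "dead_lettered": dead_lettered,
    }


result = bisect_until_isolated(list(range(100)), is_poison=lambda r: r == 37)
print(result["invocations"], result["dead_lettered"])
```

With one bad record in a batch of 100, the simulation needs only a handful of invocations to deliver the other 99 records, compared with retrying the full batch until retries run out.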

Partial Batch Response: For Kinesis and DynamoDB Streams, your target Lambda can report partial success by returning {"batchItemFailures": [{"itemIdentifier": "<sequence number of the failed record>"}]}. This keeps successfully processed records from being reprocessed along with the failed ones. Pipes does not use the FunctionResponseTypes setting (that belongs to Lambda event source mappings). Instead, set OnPartialBatchItemFailure to AUTOMATIC_BISECT in the source parameters so that Pipes halves the batch and retries until the failing records are isolated.
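
A target function that reports partial failures might look like the sketch below. It assumes each record carries its sequence number in a top-level sequenceNumber field and its payload as base64 in data; verify both against a real payload from your pipe. The process function stands in for real business logic and is hypothetical.

```python
import base64
import json
from typing import Any, Dict, List


def process(record: Dict[str, Any]) -> None:
    """Hypothetical business logic: reject readings that lack a deviceId."""
    payload = json.loads(base64.b64decode(record["data"]))
    if "deviceId" not in payload:
        raise ValueError("missing deviceId")


def lambda_handler(event: List[Dict[str, Any]], context: Any) -> Dict[str, Any]:
    """Process every record and report only the ones that failed."""
    failures = []
    for record in event:
        try:
            process(record)
        except Exception:
            # Identify the failed record so the rest of the batch is not redone
            failures.append({"itemIdentifier": record["sequenceNumber"]})
    return {"batchItemFailures": failures}


def _encode(obj: Dict[str, Any]) -> str:
    return base64.b64encode(json.dumps(obj).encode()).decode()


# Local check with two fake records; the second one is invalid
sample = [
    {"sequenceNumber": "100", "data": _encode({"deviceId": "d-1"})},
    {"sequenceNumber": "101", "data": _encode({"temperature": 20})},
]
response = lambda_handler(sample, None)
print(response)
```

Returning an empty batchItemFailures list signals that the whole batch succeeded.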

Filter Criteria Syntax

Pipes filter criteria use EventBridge event-pattern syntax, although Pipes supports only a subset of the comparison operators available on event bus rules (the ones shown in this section are among them). Filters are evaluated before enrichment, so records that don't match are dropped and never incur enrichment or target invocations. This is the most cost-effective optimization in a Pipes pipeline.

Basic Equality Filter

{
  "Filters": [{
    "Pattern": "{\"body\":{\"eventType\":[\"ORDER_PLACED\"]}}"
  }]
}
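
Because Pattern is a JSON document stored as a string inside another JSON document, hand-escaping the quotes is an easy place to make mistakes. One option is to write the pattern as a normal dictionary and let json.dumps do the escaping. The helper name below is hypothetical; the output shape follows the FilterCriteria examples in this section.

```python
import json


def filter_criteria(*patterns: dict) -> dict:
    """Wrap each pattern dict as a compact JSON string under Filters[].Pattern."""
    return {
        "Filters": [
            {"Pattern": json.dumps(p, separators=(",", ":"))} for p in patterns
        ]
    }


criteria = filter_criteria({"body": {"eventType": ["ORDER_PLACED"]}})

# Serializing the outer document escapes the inner quotes automatically
print(json.dumps(criteria, indent=2))
```

The printed document can be pasted into the FilterCriteria field of --source-parameters.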

Multiple Values (OR within a field)

{
  "Filters": [{
    "Pattern": "{\"body\":{\"status\":[\"PENDING\",\"PROCESSING\"]}}"
  }]
}

AND Conditions (multiple fields in same filter)

{
  "Filters": [{
    "Pattern": "{\"body\":{\"region\":[\"us-east-1\"],\"amount\":[{\"numeric\":[\">=\",100]}]}}"
  }]
}

Numeric Comparisons

{
  "Filters": [{
    "Pattern": "{\"body\":{\"amount\":[{\"numeric\":[\">\",0,\"<=\",10000]}]}}"
  }]
}

Supported numeric operators: =, >, >=, <, <=. There is no != operator; use an anything-but match for "not equal". Combine two comparisons into a range with a single array: [">", 0, "<=", 10000].

Exists / Not-Exists Patterns

{
  "Filters": [{
    "Pattern": "{\"body\":{\"couponCode\":[{\"exists\":true}]}}"
  }]
}

// Drop records where errorCode is present
{
  "Filters": [{
    "Pattern": "{\"body\":{\"errorCode\":[{\"exists\":false}]}}"
  }]
}

Prefix Matching

{
  "Filters": [{
    "Pattern": "{\"body\":{\"orderId\":[{\"prefix\":\"ORD-2026\"}]}}"
  }]
}

OR across Filters (multiple filter objects)

Multiple filter objects in the Filters array are evaluated with OR logic — a record passes if it matches ANY filter.

{
  "Filters": [
    {"Pattern": "{\"body\":{\"eventType\":[\"ORDER_PLACED\"]}}"},
    {"Pattern": "{\"body\":{\"eventType\":[\"PAYMENT_RECEIVED\"]}}"}
  ]
}

Testing Filters Without Deploying

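# The pipes CLI has no pattern-test command; the EventBridge one is a close stand-in.
# It requires a full event envelope, so the record is wrapped under "detail" here.
aws events test-event-pattern \
  --event-pattern '{"detail":{"body":{"eventType":["ORDER_PLACED"]}}}' \
  --event '{"id":"1","account":"123456789012","source":"test","time":"2026-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail-type":"test","detail":{"body":{"eventType":"ORDER_PLACED","orderId":"ORD-001"}}}'
# Output: {"Result": true}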
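
For quick local checks, the filter semantics described in this section can also be approximated in a few lines of Python. This is a simplified sketch, not the matcher Pipes runs: it handles only literal values, numeric, exists, and prefix rules, and it does not match against array values in the record. All function names are hypothetical.

```python
import json
import operator
from typing import Any, Dict, List

_NUMERIC_OPS = {"=": operator.eq, ">": operator.gt, ">=": operator.ge,
                "<": operator.lt, "<=": operator.le}
_MISSING = object()  # marks a field that is absent from the record


def _value_matches(rule: Any, value: Any) -> bool:
    """Check one rule from a field's rule list against the record's value."""
    if isinstance(rule, dict):
        if "exists" in rule:
            return (value is not _MISSING) == rule["exists"]
        if value is _MISSING:
            return False
        if "prefix" in rule:
            return isinstance(value, str) and value.startswith(rule["prefix"])
        if "numeric" in rule:
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                return False
            spec = rule["numeric"]  # flat operator/operand pairs, e.g. [">", 0, "<=", 10000]
            return all(_NUMERIC_OPS[spec[i]](value, spec[i + 1])
                       for i in range(0, len(spec), 2))
        raise ValueError(f"unsupported rule: {rule}")
    return value is not _MISSING and value == rule


def matches(pattern: Dict[str, Any], record: Dict[str, Any]) -> bool:
    """Every field in the pattern must match (AND); any rule in a field's list may match (OR)."""
    for key, expected in pattern.items():
        actual = record.get(key, _MISSING)
        if isinstance(expected, dict):
            nested = actual if isinstance(actual, dict) else {}
            if not matches(expected, nested):
                return False
        elif not any(_value_matches(rule, actual) for rule in expected):
            return False
    return True


def passes(filter_criteria: Dict[str, List[Dict[str, str]]], record: Dict[str, Any]) -> bool:
    """A record passes if it matches ANY filter in the Filters array."""
    return any(matches(json.loads(f["Pattern"]), record)
               for f in filter_criteria["Filters"])


criteria = {"Filters": [
    {"Pattern": '{"body":{"eventType":["ORDER_PLACED"]}}'},
    {"Pattern": '{"body":{"amount":[{"numeric":[">",0,"<=",10000]}]}}'},
]}
print(passes(criteria, {"body": {"eventType": "ORDER_PLACED"}}))
print(passes(criteria, {"body": {"eventType": "REFUND", "amount": 0}}))
```

The sample records already have body as a dictionary; a raw SQS message carries body as a string, so parse it with json.loads before testing locally.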

Enrichment: Lambda, API Gateway, Step Functions

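Enrichment is the optional middle stage where Pipes pauses the pipeline, calls a service synchronously, waits for a response, and uses that response as the input to the target. This guide covers three enrichment types: Lambda, API Gateway, and Step Functions Express Workflows. Pipes also accepts an EventBridge API destination as an enrichment.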

Lambda Enrichment

The enrichment Lambda receives the filtered batch as a JSON array and should return a JSON array, which Pipes passes on to the target. The examples in this guide return one output element per input record, in the same order, so that each target payload can be traced back to its source record.

import json
import boto3
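import requests  # not bundled with the Lambda runtime; ship it in the package or a layer

ssm = boto3.client('ssm')
_catalog_api_url = None  # cached at module level so warm invocations skip the SSM call

def get_catalog_api_url():
    """Fetch the catalog API URL from SSM once per execution environment."""
    global _catalog_api_url
    if _catalog_api_url is None:
        _catalog_api_url = ssm.get_parameter(Name='/myapp/catalog-api-url')['Parameter']['Value']
    return _catalog_api_url

def lambda_handler(event, context):
    """
    Enrichment Lambda: augments SQS messages with product catalog data.
    event = list of SQS message objects from the Pipe
    """
    catalog_api_url = get_catalog_api_url()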

    enriched = []
    for record in event:
        body = json.loads(record['body'])
        product_id = body.get('productId')

        # Call external catalog API
        product_details = {}
        if product_id:
            resp = requests.get(f"{catalog_api_url}/products/{product_id}", timeout=2)
            if resp.status_code == 200:
                product_details = resp.json()

        enriched.append({
            **body,
            'productName': product_details.get('name', 'Unknown'),
            'productCategory': product_details.get('category', 'Unknown'),
            'productWeight': product_details.get('weightKg', 0)
        })

    return enriched

API Gateway / HTTP Endpoint Enrichment

When enrichment is an API Gateway REST or HTTP endpoint, Pipes POSTs the batch as the request body and expects a JSON array response. This is useful for enrichment services that are already deployed as REST APIs — no Lambda wrapper needed.

aws pipes create-pipe \
  --name "orders-enriched-via-api" \
  --source "arn:aws:sqs:us-east-1:123456789012:raw-orders" \
  --enrichment "arn:aws:execute-api:us-east-1:123456789012:abc123def/prod/POST/enrich" \
  --enrichment-parameters '{
    "HttpParameters": {
      "PathParameterValues": [],
      "HeaderParameters": {"Content-Type": "application/json"},
      "QueryStringParameters": {"version": "v2"}
    }
  }' \
  --target "arn:aws:sqs:us-east-1:123456789012:enriched-orders" \
  --role-arn "arn:aws:iam::123456789012:role/PipesRole"

Step Functions Express Workflow Enrichment

For enrichment that requires its own branching logic — such as conditionally calling different APIs based on record content — use a Step Functions Express Workflow as the enrichment stage. Pipes invokes it synchronously (StartSyncExecution), waits for completion, and uses the workflow output as the enriched payload.

Enrichment Timeout: The enrichment call must complete within 5 minutes. For Kinesis and DynamoDB Streams sources, slow enrichment also delays progress through the shard, so on a high-throughput stream the iterator age grows. Monitor the EnrichmentStageDuration CloudWatch metric and alarm well below the limit, for example at 30 to 60 seconds, to catch enrichment bottlenecks early.
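
Whether a pipe can keep up with a stream is simple arithmetic. The sketch below assumes at most one batch is in flight per shard multiplied by the parallelization factor, and that every batch is full. The function name and all sample numbers are hypothetical.

```python
def max_records_per_second(shards: int, parallelization_factor: int,
                           batch_size: int, seconds_per_batch: float) -> float:
    """Upper bound on sustained throughput when every batch is full."""
    concurrent_batches = shards * parallelization_factor
    return concurrent_batches * batch_size / seconds_per_batch


# 4 shards, 2 concurrent batches per shard, 100 records per batch,
# 2.5 seconds for enrichment plus target per batch
capacity = max_records_per_second(shards=4, parallelization_factor=2,
                                  batch_size=100, seconds_per_batch=2.5)
ingest_rate = 500.0  # records per second arriving on the stream (hypothetical)

print(f"capacity={capacity:.0f} rec/s, keeping up: {capacity >= ingest_rate}")
```

When capacity is below the ingest rate, iterator age grows steadily. The levers are a faster enrichment, a larger batch size, a higher parallelization factor, or more shards.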

Real-World Patterns

1. CDC: DynamoDB → Pipe → OpenSearch Sync

Change Data Capture (CDC) keeps a search index in sync with a primary database. The Pipe listens to DynamoDB Streams, filters to INSERT and MODIFY events (ignoring REMOVE), enriches with full item data, and writes to an OpenSearch endpoint via API Destination.

# Filter: only INSERT and MODIFY, ignore deletes
FILTER='{
  "Filters": [{
    "Pattern": "{\"eventName\":[\"INSERT\",\"MODIFY\"]}"
  }]
}'

# API Destination targeting OpenSearch bulk API
aws events create-connection \
  --name opensearch-conn \
  --authorization-type API_KEY \
  --auth-parameters '{
    "ApiKeyAuthParameters": {
      "ApiKeyName": "Authorization",
      "ApiKeyValue": "Basic "
    }
  }'

aws events create-api-destination \
  --name opensearch-bulk \
  --connection-arn arn:aws:events:us-east-1:123456789012:connection/opensearch-conn \
  --invocation-endpoint https://my-opensearch-domain.us-east-1.es.amazonaws.com/orders/_bulk \
  --http-method POST \
  --invocation-rate-limit-per-second 300

2. Order Processing: SQS → Pipe → Step Functions

Raw order messages arrive in SQS. The Pipe filters to PENDING orders only, enriches with customer fraud score (Lambda enrichment), and starts a Step Functions Standard Workflow for each order. The workflow handles payment authorization, inventory reservation, and fulfillment — all with built-in retries and error handling.

aws pipes create-pipe \
  --name "pending-orders-to-sfn" \
  --source "arn:aws:sqs:us-east-1:123456789012:order-intake-queue" \
  --source-parameters '{
    "SqsQueueParameters": {"BatchSize": 1},
    "FilterCriteria": {
      "Filters": [{"Pattern": "{\"body\":{\"status\":[\"PENDING\"]}}"}]
    }
  }' \
  --enrichment "arn:aws:lambda:us-east-1:123456789012:function:fraud-score-enricher" \
  --target "arn:aws:states:us-east-1:123456789012:stateMachine:OrderProcessing" \
  --target-parameters '{
    "StepFunctionStateMachineParameters": {
      "InvocationType": "FIRE_AND_FORGET"
    }
  }' \
  --role-arn "arn:aws:iam::123456789012:role/PipesOrderRole"

3. IoT Data Pipeline: Kinesis → Pipe → DynamoDB

IoT sensors publish telemetry to a Kinesis stream at high volume. The Pipe filters to critical alert readings only (temperature above threshold), enriches by resolving the device ID to a facility name, and hands the result to a Lambda target that writes it to DynamoDB for real-time dashboard queries. A Lambda sits at the end because Pipes cannot target a DynamoDB table directly.

# Enrichment Lambda for IoT pipe
import base64
import json
import boto3

devices_table = boto3.resource('dynamodb').Table('device-registry')

def lambda_handler(event, context):
    enriched = []
    for record in event:
        # Pipes delivers Kinesis records flattened: the base64-encoded payload
        # is at record['data'], not under a nested 'kinesis' key
        data = json.loads(base64.b64decode(record['data']))

        device_id = data.get('deviceId')
        facility = 'unknown'
        alert_level = 'LOW'

        if device_id:
            resp = devices_table.get_item(Key={'deviceId': device_id})
            if 'Item' in resp:
                facility = resp['Item'].get('facilityName', 'unknown')

        temp = data.get('temperatureCelsius', 0)
        if temp > 90:
            alert_level = 'CRITICAL'
        elif temp > 75:
            alert_level = 'HIGH'
        elif temp > 60:
            alert_level = 'MEDIUM'

        enriched.append({
            'deviceId': device_id,
            'facilityName': facility,
            'temperatureCelsius': temp,
            'alertLevel': alert_level,
            'timestamp': data.get('timestamp'),
            'rawData': data
        })

    return enriched

Terraform Setup

The Terraform AWS provider has supported aws_pipes_pipe since version 4.56. Here is a production-ready Pipes configuration for the SQS → Lambda pattern including IAM roles, CloudWatch log group, and dead-letter queue.

terraform {
  required_providers {
    aws = { source = "hashicorp/aws", version = "~> 5.0" }
  }
}

# --- SQS Source Queue ---
resource "aws_sqs_queue" "source" {
  name                      = "order-intake"
  message_retention_seconds = 86400
  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.dlq.arn
    maxReceiveCount     = 3
  })
}

resource "aws_sqs_queue" "dlq" {
  name = "order-intake-dlq"
}

# --- IAM Role for Pipes ---
data "aws_iam_policy_document" "pipes_assume" {
  statement {
    effect  = "Allow"
    actions = ["sts:AssumeRole"]
    principals {
      type        = "Service"
      identifiers = ["pipes.amazonaws.com"]
    }
    condition {
      test     = "StringEquals"
      variable = "aws:SourceAccount"
      values   = [data.aws_caller_identity.current.account_id]
    }
  }
}

resource "aws_iam_role" "pipes_role" {
  name               = "eventbridge-pipes-orders-role"
  assume_role_policy = data.aws_iam_policy_document.pipes_assume.json
}

data "aws_iam_policy_document" "pipes_policy" {
  # Source: SQS read permissions
  statement {
    effect  = "Allow"
    actions = [
      "sqs:ReceiveMessage",
      "sqs:DeleteMessage",
      "sqs:GetQueueAttributes"
    ]
    resources = [aws_sqs_queue.source.arn]
  }

  # Enrichment: Lambda invoke
  statement {
    effect    = "Allow"
    actions   = ["lambda:InvokeFunction"]
    resources = [aws_lambda_function.enrichment.arn]
  }

  # Target: Lambda invoke
  statement {
    effect    = "Allow"
    actions   = ["lambda:InvokeFunction"]
    resources = [aws_lambda_function.target.arn]
  }

  # Logging: CloudWatch
  statement {
    effect  = "Allow"
    actions = [
      "logs:CreateLogStream",
      "logs:PutLogEvents"
    ]
    resources = ["${aws_cloudwatch_log_group.pipes_logs.arn}:*"]
  }
}

resource "aws_iam_role_policy" "pipes_policy" {
  name   = "pipes-policy"
  role   = aws_iam_role.pipes_role.id
  policy = data.aws_iam_policy_document.pipes_policy.json
}

data "aws_caller_identity" "current" {}

# --- CloudWatch Log Group ---
resource "aws_cloudwatch_log_group" "pipes_logs" {
  name              = "/aws/pipes/orders-sqs-to-lambda"
  retention_in_days = 14
}

# --- EventBridge Pipe ---
resource "aws_pipes_pipe" "orders" {
  name     = "orders-sqs-to-lambda"
  role_arn = aws_iam_role.pipes_role.arn
  source   = aws_sqs_queue.source.arn
  target   = aws_lambda_function.target.arn

  source_parameters {
    sqs_queue_parameters {
      batch_size                         = 10
      maximum_batching_window_in_seconds = 5
    }

    filter_criteria {
      filter {
        pattern = jsonencode({
          body = {
            orderStatus = ["PENDING"]
          }
        })
      }
    }
  }

  enrichment = aws_lambda_function.enrichment.arn

  enrichment_parameters {
    # No additional HTTP parameters needed for Lambda enrichment
  }

  target_parameters {
    lambda_function_parameters {
      invocation_type = "FIRE_AND_FORGET"
    }
  }

  log_configuration {
    cloudwatch_logs_log_destination {
      log_group_arn = aws_cloudwatch_log_group.pipes_logs.arn
    }
    level = "ERROR"  # Options: OFF, ERROR, INFO, TRACE
  }

  tags = {
    Team        = "platform"
    Environment = "production"
    ManagedBy   = "terraform"
  }
}

# --- Lambda placeholder (your actual Lambda resources) ---
resource "aws_lambda_function" "enrichment" {
  function_name = "order-enrichment"
  # ... runtime, handler, role, filename
}

resource "aws_lambda_function" "target" {
  function_name = "order-processor"
  # ... runtime, handler, role, filename
}
IAM Condition Best Practice: The aws:SourceAccount condition on the Pipes assume-role policy prevents confused deputy attacks — it ensures only Pipes from your own account can assume the role, not Pipes in another account that might reference your resources.

Monitoring and Observability

EventBridge Pipes emits CloudWatch metrics in the AWS/EventBridge/Pipes namespace. Combine them with pipe execution logs, and with X-Ray tracing on the enrichment and target Lambdas, for visibility across the source → enrichment → target path.

Key CloudWatch Metrics

| Metric | Description | Alarm Threshold |
|---|---|---|
| ExecutionThrottled | Executions that were throttled due to concurrency limits | > 0 for 5 minutes |
| ExecutionFailed | Executions that failed after retries | > 0 for any period |
| Invocations | Invocations made by the pipe (throughput metric) | Baseline alert for anomaly |
| EnrichmentStageDuration | Time spent in enrichment stage (ms) | > 30000 ms (30 s) |
| GetRecords.IteratorAgeMilliseconds (AWS/Kinesis namespace, per stream) | Age of the oldest record read from a Kinesis source | > 60000 ms (1 min) |
| EventCount | Events processed by the pipe | Drops > 20% from baseline |

CloudWatch Alarms via CLI

# Alarm: any execution failure
aws cloudwatch put-metric-alarm \
  --alarm-name "pipes-orders-execution-failed" \
  --namespace "AWS/EventBridge/Pipes" \
  --metric-name "ExecutionFailed" \
  --dimensions "Name=PipeName,Value=orders-sqs-to-lambda" \
  --statistic Sum \
  --period 300 \
  --threshold 1 \
  --comparison-operator GreaterThanOrEqualToThreshold \
  --evaluation-periods 1 \
  --alarm-actions "arn:aws:sns:us-east-1:123456789012:ops-alerts"

# Alarm: high iterator age (Kinesis source falling behind)
# Iterator age is published by the stream in AWS/Kinesis, not by the pipe
aws cloudwatch put-metric-alarm \
  --alarm-name "pipes-iot-iterator-age-high" \
  --namespace "AWS/Kinesis" \
  --metric-name "GetRecords.IteratorAgeMilliseconds" \
  --dimensions "Name=StreamName,Value=iot-telemetry" \
  --statistic Maximum \
  --period 60 \
  --threshold 60000 \
  --comparison-operator GreaterThanThreshold \
  --evaluation-periods 3 \
  --alarm-actions "arn:aws:sns:us-east-1:123456789012:ops-alerts"

Enable Execution Logging and Lambda X-Ray Tracing

# update-pipe requires the pipe's role ARN on every call
aws pipes update-pipe \
  --name "orders-sqs-to-lambda" \
  --role-arn "arn:aws:iam::123456789012:role/EventBridgePipesSQSRole" \
  --desired-state RUNNING \
  --log-configuration '{
    "CloudwatchLogsLogDestination": {
      "LogGroupArn": "arn:aws:logs:us-east-1:123456789012:log-group:/aws/pipes/orders-sqs-to-lambda"
    },
    "Level": "INFO",
    "IncludeExecutionData": ["ALL"]
  }'
# Note: the pipe itself has no tracing switch; X-Ray is enabled at the Lambda function level
# Set TracingConfig.Mode = Active on each enrichment/target Lambda to trace their work

Structured Logging — What Pipes Logs

When logging is enabled, Pipes writes structured JSON records to the CloudWatch log group as each execution moves through its stages. ERROR records failures only, INFO adds the main execution steps, and TRACE records every step. Fields to look for in each entry include:

  • executionId: unique ID for the pipe execution (correlates source → enrichment → target)
  • messageType: the step being reported, such as execution start, enrichment success or failure, and target success or failure
  • resourceArn: ARN of the pipe that produced the entry
  • timestamp and logLevel: when the step happened and at which level it was logged
  • payload and request/response details: present only when IncludeExecutionData is set to ALL

Cost Warning: Including execution data writes record payloads into log entries, and TRACE logs every step of every execution. For high-volume pipes processing large records, the combination can generate significant CloudWatch Logs costs. Use ERROR in production and switch to INFO or TRACE only during debugging.

FAQ

Q: What is the maximum batch size for each source?

SQS: up to 10,000 messages per batch. Kinesis: up to 10,000 records or 6 MB, whichever is smaller. DynamoDB Streams: up to 10,000 records or 6 MB. MSK / self-managed Kafka: up to 10,000 records or 6 MB. In practice, keep batches small enough that your enrichment Lambda completes well within its timeout, and your target can handle the full batch atomically.

Q: Does Pipes support FIFO SQS queues?

Yes, both standard and FIFO SQS queues are supported as sources and targets. For FIFO sources, Pipes processes messages in order within each message group. Delivery to the target is still at-least-once, so a retried batch can reach the target more than once and the target should be idempotent.

Q: What happens if the enrichment Lambda throws an exception?

Pipes treats enrichment failures as pipeline errors and applies the retry policy configured on the source (for Kinesis/DynamoDB Streams) or returns messages to the queue (for SQS). After retries are exhausted, the batch is sent to the DLQ if configured. The enrichment error is logged with the execution ID for debugging.

Q: Can I pause a Pipe without deleting it?

Yes. Stop it with aws pipes stop-pipe --name my-pipe and resume it with aws pipes start-pipe --name my-pipe. While stopped, no records are polled from the source. For SQS, messages accumulate in the queue. For Kinesis and DynamoDB Streams, the pipe resumes from its last checkpoint, so nothing is lost as long as the records are still within the stream's retention period when the pipe restarts.

Q: How does Pipes pricing compare to a Lambda trigger?

Pipes costs $0.40 per million events processed (records that pass the filter). A native Lambda SQS trigger has no extra cost beyond Lambda invocation charges. Pipes replaces the glue Lambda, so you pay $0.40/M instead of that function's request and duration charges. Whether that is cheaper depends on how well the glue function batches: a function invoked once per message can easily cost more than Pipes, while one that handles large batches quickly can cost less. The saving in code to write, deploy, and maintain applies either way.
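
A back-of-the-envelope comparison makes the trade-off concrete. The Pipes rate comes from this guide; the Lambda rates are assumed list prices for x86 functions in us-east-1 and should be checked against the current pricing page. The calculation ignores free tiers and Pipes' payload-size metering, and all workload numbers are hypothetical.

```python
PIPES_PER_MILLION = 0.40          # from this guide
LAMBDA_REQ_PER_MILLION = 0.20     # assumed list price per million requests
LAMBDA_GB_SECOND = 0.0000166667   # assumed list price per GB-second


def pipes_cost(events: int) -> float:
    """Monthly Pipes charge for the given number of processed events."""
    return events / 1_000_000 * PIPES_PER_MILLION


def glue_lambda_cost(events: int, batch_size: int, memory_mb: int,
                     ms_per_invocation: float) -> float:
    """Monthly request plus duration charge for a filter-and-forward Lambda."""
    invocations = -(-events // batch_size)  # ceiling division
    gb_seconds = invocations * (memory_mb / 1024) * (ms_per_invocation / 1000)
    return (invocations / 1_000_000 * LAMBDA_REQ_PER_MILLION
            + gb_seconds * LAMBDA_GB_SECOND)


events = 100_000_000
unbatched = glue_lambda_cost(events, batch_size=1, memory_mb=256, ms_per_invocation=100)
batched = glue_lambda_cost(events, batch_size=10, memory_mb=256, ms_per_invocation=200)

print(f"Pipes:                 ${pipes_cost(events):.2f}")
print(f"Glue Lambda, batch 1:  ${unbatched:.2f}")
print(f"Glue Lambda, batch 10: ${batched:.2f}")
```

Under these assumptions the unbatched glue function costs more than Pipes and the batched one costs less, which is why batch size belongs in any cost comparison.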

Quick Reference

Pipes Pricing
$0.40 per million events processed

Supported Sources
SQS, Kinesis, DynamoDB Streams, MSK, MQ, Kafka

Enrichment Types
Lambda, API Gateway, Step Functions Express

Max Batch Size
Up to 10,000 records (source-dependent)

Filter Logic
Multiple filters = OR; multiple conditions in one filter = AND