AWS EventBridge Pipes: Point-to-Point Event Integration Made Simple (2026)
AWS EventBridge Pipes, launched in late 2022 and now a production-grade integration primitive, solves a specific and common problem: you have a stream or queue (SQS, DynamoDB Streams, Kinesis, MSK, MQ) and you want to connect it to a target (Lambda, Step Functions, SQS, EventBridge bus, API destination) — optionally filtering noise and enriching payloads along the way. Before Pipes, this meant writing and maintaining Lambda functions whose only job was to poll a queue, filter records, call an enrichment service, and forward to a target. Pipes does all of that without any glue code, at $0.40 per million events processed.
This guide covers everything: Pipes anatomy, all supported sources and targets, SQS→Lambda, DynamoDB Streams→SQS, Kinesis→DynamoDB, filter criteria syntax, enrichment patterns, real-world use cases, full Terraform configuration, and monitoring. By the end you'll know exactly when to reach for Pipes instead of an Event Bus rule or a Step Functions workflow.
Pipes vs Event Bus vs Step Functions — When Pipes Wins
Choosing the right EventBridge primitive matters for architecture clarity, cost, and operational simplicity. The three options — Event Bus rules, EventBridge Pipes, and Step Functions — overlap in surface area but serve different patterns.
| Dimension | Event Bus Rule | EventBridge Pipes | Step Functions |
|---|---|---|---|
| Topology | 1 bus → N targets (fan-out) | 1 source → 1 target (point-to-point) | Orchestrator calling N services |
| Source types | Events put onto the bus | SQS, Kinesis, DynamoDB Streams, MSK, MQ, self-managed Kafka | Manual start, API Gateway, EventBridge rule |
| Built-in polling | No — must use event trigger | Yes — managed polling of streams/queues | No |
| Filtering | Rich JSON pattern on bus | Filter criteria before enrichment and target | Choice/condition states inside workflow |
| Enrichment | No built-in enrichment | Lambda / API GW / Step Functions sync | Every step can call any service |
| Glue code needed | None (for routing) | None (for polling + filter + enrich + route) | Yes — define every state explicitly |
| Cost model | $1.00/M events (custom bus) | $0.40/M events processed | $0.025/1000 state transitions |
| Max targets | 5 per rule | 1 (by design) | Unlimited via states |
| When to use | Fan-out, cross-account routing, SaaS events | Stream/queue → single consumer with filter + enrich | Multi-step orchestration, long-running workflows |
Key differentiator: Event Bus rules don't poll. They react to events already on the bus. To get events from SQS or DynamoDB Streams onto a bus you'd need a Lambda trigger — and Pipes is that trigger, but fully managed with no code. Step Functions is the right choice when you need branching logic, retries across multiple services, or workflows that span minutes or hours.
Pipe Anatomy: Source → Filter → Enrich → Target
A Pipe is a fully managed, serverless pipeline with four stages. Only source and target are required; filter and enrichment are optional.
┌──────────────┐ ┌──────────────┐ ┌──────────────────┐ ┌──────────────┐
│ SOURCE │───▶│ FILTER │───▶│ ENRICHMENT │───▶│ TARGET │
│ (polling) │ │ (drop noise) │ │ (Lambda/APIGW/ │ │ (invoke, │
│ │ │ │ │ Step Functions) │ │ send, put) │
└──────────────┘ └──────────────┘ └──────────────────┘ └──────────────┘
Stage 1 — Source: Pipes polls the source on your behalf. You configure batch size, batch window, starting position (for Kinesis/DynamoDB), and concurrency. The managed poller handles checkpointing, retries, and shard/partition scaling.
Stage 2 — Filter: Before any enrichment or target invocation, Pipes evaluates filter criteria against each record in the batch. Records that don't match are silently dropped. This is the cheapest way to reduce downstream invocations — you're billed only for records that pass the filter.
Stage 3 — Enrichment (optional): The filtered batch is sent synchronously to a Lambda function, API Gateway REST/HTTP endpoint, or a Step Functions Express Workflow. The enrichment service receives the batch, augments the records (e.g., fetches additional data from DynamoDB or a third-party API), and returns the enriched payload. The enriched payload is what the target receives.
Stage 4 — Target: The enriched records are delivered to the target. Pipes supports a wide range of targets.
| Supported Sources | Supported Targets |
|---|---|
| Amazon SQS (standard & FIFO) | AWS Lambda |
| Amazon Kinesis Data Streams | Amazon SQS (standard & FIFO) |
| Amazon DynamoDB Streams | Amazon SNS |
| Amazon MSK (managed Kafka) | Amazon Kinesis Data Streams |
| Self-managed Apache Kafka | Amazon EventBridge event bus |
| Amazon MQ (ActiveMQ / RabbitMQ) | Amazon EventBridge API Destination |
| | AWS Step Functions (Express or Standard) |
| | Amazon ECS task (run task) |
| | Amazon API Gateway REST / HTTP endpoint |
| | AWS Batch job queue |
| | Amazon SageMaker Pipeline |
| | Amazon Redshift Data API |
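To make the four stages concrete, here is a minimal in-memory sketch of how one polled batch moves through a pipe. It only illustrates the ordering of the stages described above and is not how the managed service is implemented; `run_pipe`, `keep_pending`, and `add_tier` are hypothetical names.

```python
from typing import Callable, Iterable, Optional

Record = dict


def run_pipe(
    batch: Iterable[Record],
    target: Callable[[list], None],
    record_filter: Optional[Callable[[Record], bool]] = None,
    enrich: Optional[Callable[[list], list]] = None,
) -> int:
    """Push one polled batch through filter -> enrich -> target.

    Returns the number of records handed to the target.
    """
    records = list(batch)
    # Stage 2 (optional): drop records that do not match.
    if record_filter is not None:
        records = [r for r in records if record_filter(r)]
    if not records:
        return 0  # nothing left, so enrichment and target are never called
    # Stage 3 (optional): one synchronous call with the whole filtered batch.
    if enrich is not None:
        records = enrich(records)
    # Stage 4: deliver whatever the enrichment step returned.
    target(records)
    return len(records)


def keep_pending(record: Record) -> bool:
    return record.get("orderStatus") == "PENDING"


def add_tier(records: list) -> list:
    return [{**r, "customerTier": "standard"} for r in records]


delivered: list = []
count = run_pipe(
    [{"orderId": "1", "orderStatus": "PENDING"},
     {"orderId": "2", "orderStatus": "SHIPPED"}],
    target=delivered.extend,
    record_filter=keep_pending,
    enrich=add_tier,
)
print(count, delivered)
```

Only the PENDING record reaches the enrichment and target callables, which mirrors why filtering early is the cheapest place to cut volume.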
SQS → Lambda Pipe
The most common Pipes pattern replaces a manually-wired SQS trigger on a Lambda function. The managed Pipes version adds built-in filtering and enrichment that would otherwise require code inside the Lambda.
Creating the Pipe via AWS CLI
# 1. Create an IAM role that Pipes can assume
aws iam create-role \
--role-name EventBridgePipesSQSRole \
--assume-role-policy-document '{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": {"Service": "pipes.amazonaws.com"},
"Action": "sts:AssumeRole"
}]
}'
# 2. Attach permissions for SQS read and Lambda invoke
aws iam put-role-policy \
--role-name EventBridgePipesSQSRole \
--policy-name PipesPolicy \
--policy-document '{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"sqs:ReceiveMessage",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes"
],
"Resource": "arn:aws:sqs:us-east-1:123456789012:my-orders-queue"
},
{
"Effect": "Allow",
"Action": "lambda:InvokeFunction",
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:process-order"
}
]
}'
# 3. Create the pipe with filter criteria
aws pipes create-pipe \
--name "orders-sqs-to-lambda" \
--role-arn "arn:aws:iam::123456789012:role/EventBridgePipesSQSRole" \
--source "arn:aws:sqs:us-east-1:123456789012:my-orders-queue" \
--source-parameters '{
"SqsQueueParameters": {
"BatchSize": 10,
"MaximumBatchingWindowInSeconds": 5
},
"FilterCriteria": {
"Filters": [{
"Pattern": "{\"body\":{\"orderStatus\":[\"PENDING\"]}}"
}]
}
}' \
--target "arn:aws:lambda:us-east-1:123456789012:function:process-order" \
--target-parameters '{
"LambdaFunctionParameters": {
"InvocationType": "FIRE_AND_FORGET"
}
}'
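The `Pattern` value is a JSON document stored as a string inside another JSON document, so hand-escaped quotes are easy to get wrong. A minimal sketch that generates the same source parameters with `json.dumps` instead; `build_filter_criteria` is a hypothetical helper, not part of any AWS SDK.

```python
import json


def build_filter_criteria(*patterns: dict) -> dict:
    """Wrap each pattern dict in the {"Filters": [{"Pattern": "<json string>"}]} shape."""
    return {
        "Filters": [
            {"Pattern": json.dumps(p, separators=(",", ":"))} for p in patterns
        ]
    }


source_parameters = {
    "SqsQueueParameters": {"BatchSize": 10, "MaximumBatchingWindowInSeconds": 5},
    "FilterCriteria": build_filter_criteria({"body": {"orderStatus": ["PENDING"]}}),
}

# The printed document can be saved to a file or passed to --source-parameters.
print(json.dumps(source_parameters, indent=2))
```

Passing several dicts to the helper produces several filter objects, which the pipe evaluates with OR logic.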
Dead-Letter Queue Configuration
For SQS sources, configure a DLQ directly on the SQS queue (not on the Pipe). Messages that fail Lambda processing are returned to the queue, retried up to maxReceiveCount times, then moved to the DLQ. For Kinesis and DynamoDB Streams sources, configure the DLQ on the Pipe's source parameters.
# Set DLQ on the SQS queue itself (recommended for SQS sources)
aws sqs set-queue-attributes \
--queue-url https://sqs.us-east-1.amazonaws.com/123456789012/my-orders-queue \
--attributes '{
"RedrivePolicy": "{\"deadLetterTargetArn\":\"arn:aws:sqs:us-east-1:123456789012:orders-dlq\",\"maxReceiveCount\":\"3\"}"
}'
Use FIRE_AND_FORGET for async Lambda invocation (higher throughput, no response needed). Use REQUEST_RESPONSE when the Lambda response will be used; this is required for enrichment Lambdas. For targets, FIRE_AND_FORGET is typically correct; for enrichment, Pipes always uses synchronous invocation internally.
DynamoDB Streams → EventBridge Pipe
DynamoDB Streams emit change records for every item write. A common problem: you want to process only INSERT events (ignoring MODIFY and REMOVE), and enrich the record with additional attributes before forwarding downstream. Without Pipes, this requires a Lambda trigger with conditional logic. With Pipes, the filter handles the INSERT-only requirement, and enrichment handles the data augmentation — zero glue code.
Stream View Type
Choose the right stream view type when enabling DynamoDB Streams. This cannot be changed without disabling and re-enabling the stream:
NEW_IMAGE: The item after the change (sufficient for INSERT and MODIFY processing)
OLD_IMAGE: The item before the change (useful for REMOVE and audit logs)
NEW_AND_OLD_IMAGES: Both images (most flexible, higher data volume)
KEYS_ONLY: Only the primary key (lowest overhead; use when you'll fetch the full item separately)
# Enable DynamoDB Streams with NEW_IMAGE view
aws dynamodb update-table \
--table-name orders \
--stream-specification StreamEnabled=true,StreamViewType=NEW_IMAGE
# Get the stream ARN
aws dynamodb describe-table --table-name orders \
--query 'Table.LatestStreamArn' --output text
Pipe Configuration — INSERT Only, Enrichment to Lambda, Target to SQS
aws pipes create-pipe \
--name "orders-dynamodb-to-sqs" \
--role-arn "arn:aws:iam::123456789012:role/EventBridgePipesDDBRole" \
--source "arn:aws:dynamodb:us-east-1:123456789012:table/orders/stream/2026-06-09T00:00:00.000" \
--source-parameters '{
"DynamoDBStreamParameters": {
"StartingPosition": "TRIM_HORIZON",
"BatchSize": 25,
"MaximumBatchingWindowInSeconds": 10,
"MaximumRetryAttempts": 3,
"DeadLetterConfig": {
"Arn": "arn:aws:sqs:us-east-1:123456789012:orders-ddb-dlq"
}
},
"FilterCriteria": {
"Filters": [{
"Pattern": "{\"dynamodb\":{\"NewImage\":{\"status\":{\"S\":[\"PLACED\"]}},\"StreamViewType\":[\"NEW_IMAGE\"]},\"eventName\":[\"INSERT\"]}"
}]
}
}' \
--enrichment "arn:aws:lambda:us-east-1:123456789012:function:enrich-order" \
--target "arn:aws:sqs:us-east-1:123456789012:downstream-orders-queue"
Enrichment Lambda (Python)
import json
import boto3

dynamodb = boto3.resource('dynamodb')
customers_table = dynamodb.Table('customers')


def lambda_handler(event, context):
    """
    Enrichment Lambda receives a list of DynamoDB stream records.
    Must return a list of the same length; each item is the enriched payload
    for the corresponding input record. Return None for a record to drop it.
    """
    enriched_records = []
    for record in event:
        new_image = record.get('dynamodb', {}).get('NewImage', {})
        customer_id = new_image.get('customerId', {}).get('S')
        order_id = new_image.get('orderId', {}).get('S')
        amount = float(new_image.get('totalAmount', {}).get('N', '0'))

        # Fetch customer tier from a separate table
        customer_tier = 'standard'
        if customer_id:
            response = customers_table.get_item(Key={'customerId': customer_id})
            if 'Item' in response:
                customer_tier = response['Item'].get('tier', 'standard')

        enriched_records.append({
            'orderId': order_id,
            'customerId': customer_id,
            'totalAmount': amount,
            'customerTier': customer_tier,
            'priorityProcessing': customer_tier == 'premium',
            'originalRecord': record
        })
    return enriched_records
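Stream records carry DynamoDB-typed attribute values such as `{"S": "..."}` and `{"N": "..."}`, which is why the handler above reaches into `.get('S')` and `.get('N')`. The sketch below is a standard-library-only approximation of that unwrapping for the most common types; `unmarshal` and `unmarshal_image` are hypothetical helper names, and binary types are deliberately left out.

```python
from decimal import Decimal


def unmarshal(attr: dict):
    """Convert one DynamoDB-typed attribute value into a plain Python value."""
    (type_tag, value), = attr.items()  # each attribute value has exactly one type key
    if type_tag == "S":
        return value
    if type_tag == "N":
        return Decimal(value)  # Decimal avoids float rounding on money-like values
    if type_tag == "BOOL":
        return value
    if type_tag == "NULL":
        return None
    if type_tag == "L":
        return [unmarshal(v) for v in value]
    if type_tag == "M":
        return {k: unmarshal(v) for k, v in value.items()}
    if type_tag == "SS":
        return set(value)
    if type_tag == "NS":
        return {Decimal(v) for v in value}
    raise ValueError(f"unsupported DynamoDB type tag in this sketch: {type_tag}")


def unmarshal_image(image: dict) -> dict:
    """Unwrap every attribute of a NewImage or OldImage map."""
    return {name: unmarshal(attr) for name, attr in image.items()}


new_image = {
    "orderId": {"S": "ORD-001"},
    "totalAmount": {"N": "129.90"},
    "gift": {"BOOL": False},
    "items": {"L": [{"M": {"sku": {"S": "A1"}, "qty": {"N": "2"}}}]},
}
order = unmarshal_image(new_image)
print(order["orderId"], order["totalAmount"], order["items"][0]["qty"])
```

Unwrapping once at the top of the handler keeps the rest of the enrichment logic free of type-tag lookups.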
Return a sentinel value (e.g., {"skip": true}) for records you want to pass through unchanged, and handle it in the target Lambda.
Kinesis → Pipe
Kinesis Data Streams are the highest-throughput source for Pipes. The key configuration choices are starting position, batch size, bisect-on-error, and partial batch response — all of which affect how failures behave.
aws pipes create-pipe \
--name "iot-kinesis-to-dynamodb" \
--role-arn "arn:aws:iam::123456789012:role/EventBridgePipesKinesisRole" \
--source "arn:aws:kinesis:us-east-1:123456789012:stream/iot-telemetry" \
--source-parameters '{
"KinesisStreamParameters": {
"StartingPosition": "LATEST",
"BatchSize": 100,
"MaximumBatchingWindowInSeconds": 30,
"MaximumRetryAttempts": 5,
"OnPartialBatchItemFailure": "AUTOMATIC_BISECT",
"ParallelizationFactor": 2,
"DeadLetterConfig": {
"Arn": "arn:aws:sqs:us-east-1:123456789012:iot-dlq"
}
},
"FilterCriteria": {
"Filters": [{
"Pattern": "{\"data\":{\"sensorType\":[\"TEMPERATURE\",\"HUMIDITY\"]}}"
}]
}
}' \
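--target "arn:aws:lambda:us-east-1:123456789012:function:write-iot-readings" \
--target-parameters '{
"LambdaFunctionParameters": {
"InvocationType": "REQUEST_RESPONSE"
}
}'
# Note: DynamoDB tables are not among the supported Pipes targets listed earlier,
# so this pipe invokes a Lambda function (write-iot-readings, a hypothetical name)
# that writes each batch to the iot-readings table.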
Starting Position options:
LATEST: Process only new records (use for live data; skips historical backlog)
TRIM_HORIZON: Process all records in the stream (use for backfill or initial load)
AT_TIMESTAMP: Start from a specific point in time (use for recovery scenarios)
Bisect on error (OnPartialBatchItemFailure set to AUTOMATIC_BISECT in the Pipes API): When a batch fails, Pipes splits it in half and retries each half independently. This isolates poison-pill records efficiently: instead of retrying the full batch of 100 records every time one bad record fails, bisection narrows the failure down to the bad record in O(log N) splits and sends only that record to the DLQ.
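The bisection strategy described above can be simulated in a few lines. This is a simplified model that assumes a deterministic failure and ignores retry limits and shard ordering; `bisect_batch` and `process` are hypothetical names, and a failing single-record batch is treated as going straight to the DLQ.

```python
def bisect_batch(batch, process, dlq, stats):
    """Process a batch; on failure split it in half and retry each half."""
    stats["invocations"] += 1
    try:
        process(batch)
        return
    except Exception:
        if len(batch) == 1:
            dlq.append(batch[0])  # the failure is isolated to a single record
            return
    mid = len(batch) // 2
    bisect_batch(batch[:mid], process, dlq, stats)
    bisect_batch(batch[mid:], process, dlq, stats)


def process(batch):
    """Stand-in for the target: fails whenever the batch contains the bad record."""
    if any(record == "bad" for record in batch):
        raise ValueError("poison pill in batch")


records = [f"rec-{i}" for i in range(100)]
records[37] = "bad"
dlq, stats = [], {"invocations": 0}
bisect_batch(records, process, dlq, stats)
print(dlq, stats["invocations"])
```

With one bad record, each split level costs two invocations (one failing half, one succeeding half), so the total grows with log N rather than with N.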
To report a partial batch failure, return {"batchItemFailures": [{"itemIdentifier": "shardId-sequence"}]} from the Lambda. This prevents successfully processed records from being retried while re-driving only the failed ones. Pipes supports this response natively.
Filter Criteria Syntax
Pipes filter criteria use the same JSON pattern syntax as EventBridge event bus rules. Filters are evaluated before enrichment — records that don't match are dropped and never billed as enrichment or target invocations. This is the most cost-effective optimization in a Pipes pipeline.
Basic Equality Filter
{
"Filters": [{
"Pattern": "{\"body\":{\"eventType\":[\"ORDER_PLACED\"]}}"
}]
}
Multiple Values (OR within a field)
{
"Filters": [{
"Pattern": "{\"body\":{\"status\":[\"PENDING\",\"PROCESSING\"]}}"
}]
}
AND Conditions (multiple fields in same filter)
{
"Filters": [{
"Pattern": "{\"body\":{\"region\":[\"us-east-1\"],\"amount\":[{\"numeric\":[\">=\",100]}]}}"
}]
}
Numeric Comparisons
{
"Filters": [{
"Pattern": "{\"body\":{\"amount\":[{\"numeric\":[\">\",0,\"<=\",10000]}]}}"
}]
}
Supported numeric operators: =, >, >=, <, <=. Combine two into a range with a single array: [">", 0, "<=", 1000]. To exclude a value, use an anything-but rule rather than a numeric operator.
Exists / Not-Exists Patterns
{
"Filters": [{
"Pattern": "{\"body\":{\"couponCode\":[{\"exists\":true}]}}"
}]
}
// Drop records where errorCode is present
{
"Filters": [{
"Pattern": "{\"body\":{\"errorCode\":[{\"exists\":false}]}}"
}]
}
Prefix Matching
{
"Filters": [{
"Pattern": "{\"body\":{\"orderId\":[{\"prefix\":\"ORD-2026\"}]}}"
}]
}
OR across Filters (multiple filter objects)
Multiple filter objects in the Filters array are evaluated with OR logic — a record passes if it matches ANY filter.
{
"Filters": [
{"Pattern": "{\"body\":{\"eventType\":[\"ORDER_PLACED\"]}}"},
{"Pattern": "{\"body\":{\"eventType\":[\"PAYMENT_RECEIVED\"]}}"}
]
}
Testing Filters Without Deploying
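# Note: the pipes CLI has no test-event-pattern subcommand. Pipes filters share the
# event bus pattern syntax, so `aws events test-event-pattern` can check a pattern if the
# record is nested under "detail" in a standard event envelope (drop that wrapper on the pipe).
aws events test-event-pattern \
--event-pattern '{"detail":{"body":{"eventType":["ORDER_PLACED"]}}}' \
--event '{"id":"1","account":"123456789012","source":"test","detail-type":"test","time":"2026-01-01T00:00:00Z","region":"us-east-1","resources":[],"detail":{"body":{"eventType":"ORDER_PLACED","orderId":"ORD-001"}}}'
# Output: {"Result": true}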
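For quick local checks without any AWS call, the matching rules from this section can be approximated in plain Python. The sketch below covers only the operators shown here (exact values, numeric ranges, exists, prefix, AND across fields, OR across filters) and works on already-parsed records; `matches` and `passes_filters` are hypothetical helpers, and the real service may differ on edge cases.

```python
import json
import operator

_NUMERIC_OPS = {
    "=": operator.eq, ">": operator.gt, ">=": operator.ge,
    "<": operator.lt, "<=": operator.le,
}


def _matches_rule(rule, present: bool, value) -> bool:
    """Check one element of a pattern's value list against a field."""
    if isinstance(rule, dict):
        if "exists" in rule:
            return present == rule["exists"]
        if not present:
            return False
        if "prefix" in rule:
            return isinstance(value, str) and value.startswith(rule["prefix"])
        if "numeric" in rule:
            if isinstance(value, bool) or not isinstance(value, (int, float)):
                return False
            spec = rule["numeric"]  # e.g. [">", 0, "<=", 10000]
            pairs = zip(spec[0::2], spec[1::2])
            return all(_NUMERIC_OPS[op](value, bound) for op, bound in pairs)
        raise ValueError(f"unsupported rule in this sketch: {rule}")
    return present and value == rule


def matches(pattern: dict, event: dict) -> bool:
    """True when every field in the pattern matches (AND across fields)."""
    for key, expected in pattern.items():
        present = isinstance(event, dict) and key in event
        value = event[key] if present else None
        if isinstance(expected, dict):  # nested object: recurse into it
            if not (present and isinstance(value, dict) and matches(expected, value)):
                return False
        elif not any(_matches_rule(rule, present, value) for rule in expected):
            return False  # a value list is an OR over its elements
    return True


def passes_filters(filter_criteria: dict, event: dict) -> bool:
    """A record passes if it matches ANY filter in the Filters array."""
    return any(
        matches(json.loads(f["Pattern"]), event) for f in filter_criteria["Filters"]
    )


criteria = {"Filters": [
    {"Pattern": json.dumps(
        {"body": {"region": ["us-east-1"], "amount": [{"numeric": [">=", 100]}]}})},
    {"Pattern": json.dumps({"body": {"eventType": ["PAYMENT_RECEIVED"]}})},
]}
print(passes_filters(criteria, {"body": {"region": "us-east-1", "amount": 250}}))
print(passes_filters(criteria, {"body": {"region": "us-east-1", "amount": 50}}))
```

The first record satisfies both conditions of the first filter and passes; the second fails the numeric condition and has no `eventType`, so neither filter matches.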
Enrichment: Lambda, API Gateway, Step Functions
Enrichment is the optional middle stage where Pipes pauses the pipeline, calls a service synchronously, waits for a response, and uses that response as the input to the target. Three enrichment types are supported.
Lambda Enrichment
The enrichment Lambda receives the filtered batch as a JSON array. It must return a JSON array of the same length. Pipes enforces the one-to-one mapping — the Nth output element becomes the input to the target for the Nth input record.
import json
import boto3
import requests  # third-party: package it with the function or add it via a layer

ssm = boto3.client('ssm')
_catalog_api_url = None  # module-level cache, reused while the runtime stays warm


def get_catalog_api_url():
    """Read the catalog API URL from SSM once per execution environment."""
    global _catalog_api_url
    if _catalog_api_url is None:
        response = ssm.get_parameter(Name='/myapp/catalog-api-url')
        _catalog_api_url = response['Parameter']['Value']
    return _catalog_api_url


def lambda_handler(event, context):
    """
    Enrichment Lambda: augments SQS messages with product catalog data.
    event = list of SQS message objects from the Pipe
    """
    catalog_api_url = get_catalog_api_url()
    enriched = []
    for record in event:
        body = json.loads(record['body'])
        product_id = body.get('productId')

        # Call external catalog API
        product_details = {}
        if product_id:
            resp = requests.get(f"{catalog_api_url}/products/{product_id}", timeout=2)
            if resp.status_code == 200:
                product_details = resp.json()

        enriched.append({
            **body,
            'productName': product_details.get('name', 'Unknown'),
            'productCategory': product_details.get('category', 'Unknown'),
            'productWeight': product_details.get('weightKg', 0)
        })
    return enriched
API Gateway / HTTP Endpoint Enrichment
When enrichment is an API Gateway REST or HTTP endpoint, Pipes POSTs the batch as the request body and expects a JSON array response. This is useful for enrichment services that are already deployed as REST APIs — no Lambda wrapper needed.
aws pipes create-pipe \
--name "orders-enriched-via-api" \
--source "arn:aws:sqs:us-east-1:123456789012:raw-orders" \
--enrichment "arn:aws:execute-api:us-east-1:123456789012:abc123def/prod/POST/enrich" \
--enrichment-parameters '{
"HttpParameters": {
"PathParameterValues": [],
"HeaderParameters": {"Content-Type": "application/json"},
"QueryStringParameters": {"version": "v2"}
}
}' \
--target "arn:aws:sqs:us-east-1:123456789012:enriched-orders" \
--role-arn "arn:aws:iam::123456789012:role/PipesRole"
Step Functions Express Workflow Enrichment
For enrichment that requires its own branching logic — such as conditionally calling different APIs based on record content — use a Step Functions Express Workflow as the enrichment stage. Pipes invokes it synchronously (StartSyncExecution), waits for completion, and uses the workflow output as the enriched payload.
Monitor the EnrichmentStageDuration CloudWatch metric and set alarms at 60 seconds to catch enrichment bottlenecks early.
Real-World Patterns
1. CDC: DynamoDB → Pipe → OpenSearch Sync
Change Data Capture (CDC) keeps a search index in sync with a primary database. The Pipe listens to DynamoDB Streams, filters to INSERT and MODIFY events (ignoring REMOVE), enriches with full item data, and writes to an OpenSearch endpoint via API Destination.
# Filter: only INSERT and MODIFY, ignore deletes
FILTER='{
"Filters": [{
"Pattern": "{\"eventName\":[\"INSERT\",\"MODIFY\"]}"
}]
}'
# API Destination targeting OpenSearch bulk API
aws events create-connection \
--name opensearch-conn \
--authorization-type API_KEY \
--auth-parameters '{
"ApiKeyAuthParameters": {
"ApiKeyName": "Authorization",
"ApiKeyValue": "Basic "
}
}'
aws events create-api-destination \
--name opensearch-bulk \
--connection-arn arn:aws:events:us-east-1:123456789012:connection/opensearch-conn \
--invocation-endpoint https://my-opensearch-domain.us-east-1.es.amazonaws.com/orders/_bulk \
--http-method POST \
--invocation-rate-limit-per-second 300
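The `_bulk` endpoint expects newline-delimited JSON: an action line followed by a document line for each item, with a trailing newline at the end. Here is a minimal sketch of the transformation an enrichment function could apply to stream records before they reach the API destination. It assumes the stream includes the new image and that `orderId` is the document ID; `to_bulk_body` and `_plain` are hypothetical helpers that handle only string and number attributes.

```python
import json


def _plain(attr: dict):
    """Unwrap simple DynamoDB-typed values; only S and N are handled in this sketch."""
    if "S" in attr:
        return attr["S"]
    if "N" in attr:
        return float(attr["N"])
    raise ValueError(f"unsupported attribute type: {attr}")


def to_bulk_body(stream_records: list) -> str:
    """Build an NDJSON body: one action line plus one document line per record."""
    lines = []
    for record in stream_records:
        image = record["dynamodb"]["NewImage"]
        doc = {name: _plain(attr) for name, attr in image.items()}
        # The index name comes from the URL path (/orders/_bulk), so only _id is set.
        lines.append(json.dumps({"index": {"_id": doc["orderId"]}}))
        lines.append(json.dumps(doc))
    return "\n".join(lines) + "\n"  # the bulk API requires a trailing newline


records = [
    {"eventName": "INSERT",
     "dynamodb": {"NewImage": {"orderId": {"S": "ORD-001"},
                               "totalAmount": {"N": "42.5"}}}},
]
body = to_bulk_body(records)
print(body, end="")
```

Using `index` as the action makes INSERT and MODIFY events interchangeable, since indexing an existing ID overwrites the previous document.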
2. Order Processing: SQS → Pipe → Step Functions
Raw order messages arrive in SQS. The Pipe filters to PENDING orders only, enriches with customer fraud score (Lambda enrichment), and starts a Step Functions Standard Workflow for each order. The workflow handles payment authorization, inventory reservation, and fulfillment — all with built-in retries and error handling.
aws pipes create-pipe \
--name "pending-orders-to-sfn" \
--source "arn:aws:sqs:us-east-1:123456789012:order-intake-queue" \
--source-parameters '{
"SqsQueueParameters": {"BatchSize": 1},
"FilterCriteria": {
"Filters": [{"Pattern": "{\"body\":{\"status\":[\"PENDING\"]}}"}]
}
}' \
--enrichment "arn:aws:lambda:us-east-1:123456789012:function:fraud-score-enricher" \
--target "arn:aws:states:us-east-1:123456789012:stateMachine:OrderProcessing" \
--target-parameters '{
"StepFunctionStateMachineParameters": {
"InvocationType": "FIRE_AND_FORGET"
}
}' \
--role-arn "arn:aws:iam::123456789012:role/PipesOrderRole"
3. IoT Data Pipeline: Kinesis → Pipe → DynamoDB
IoT sensors publish telemetry to a Kinesis stream at high volume. The Pipe filters to critical alert readings only (temperature above threshold), enriches by resolving the device ID to a facility name, and writes directly to DynamoDB for real-time dashboard queries.
# Enrichment Lambda for IoT pipe
import base64
import json
import boto3

devices_table = boto3.resource('dynamodb').Table('device-registry')


def lambda_handler(event, context):
    enriched = []
    for record in event:
        # Pipes delivers Kinesis records with the base64-encoded payload in the
        # top-level "data" field (the same field the filter pattern matches on)
        data = json.loads(base64.b64decode(record['data']))
        device_id = data.get('deviceId')

        facility = 'unknown'
        alert_level = 'LOW'
        if device_id:
            resp = devices_table.get_item(Key={'deviceId': device_id})
            if 'Item' in resp:
                facility = resp['Item'].get('facilityName', 'unknown')

        temp = data.get('temperatureCelsius', 0)
        if temp > 90:
            alert_level = 'CRITICAL'
        elif temp > 75:
            alert_level = 'HIGH'
        elif temp > 60:
            alert_level = 'MEDIUM'

        enriched.append({
            'deviceId': device_id,
            'facilityName': facility,
            'temperatureCelsius': temp,
            'alertLevel': alert_level,
            'timestamp': data.get('timestamp'),
            'rawData': data
        })
    return enriched
Terraform Setup
The Terraform AWS provider has supported aws_pipes_pipe since version 4.56. Here is a production-ready Pipes configuration for the SQS → Lambda pattern including IAM roles, CloudWatch log group, and dead-letter queue.
terraform {
required_providers {
aws = { source = "hashicorp/aws", version = "~> 5.0" }
}
}
# --- SQS Source Queue ---
resource "aws_sqs_queue" "source" {
name = "order-intake"
message_retention_seconds = 86400
redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.dlq.arn
maxReceiveCount = 3
})
}
resource "aws_sqs_queue" "dlq" {
name = "order-intake-dlq"
}
# --- IAM Role for Pipes ---
data "aws_iam_policy_document" "pipes_assume" {
statement {
effect = "Allow"
actions = ["sts:AssumeRole"]
principals {
type = "Service"
identifiers = ["pipes.amazonaws.com"]
}
condition {
test = "StringEquals"
variable = "aws:SourceAccount"
values = [data.aws_caller_identity.current.account_id]
}
}
}
resource "aws_iam_role" "pipes_role" {
name = "eventbridge-pipes-orders-role"
assume_role_policy = data.aws_iam_policy_document.pipes_assume.json
}
data "aws_iam_policy_document" "pipes_policy" {
# Source: SQS read permissions
statement {
effect = "Allow"
actions = [
"sqs:ReceiveMessage",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes"
]
resources = [aws_sqs_queue.source.arn]
}
# Enrichment: Lambda invoke
statement {
effect = "Allow"
actions = ["lambda:InvokeFunction"]
resources = [aws_lambda_function.enrichment.arn]
}
# Target: Lambda invoke
statement {
effect = "Allow"
actions = ["lambda:InvokeFunction"]
resources = [aws_lambda_function.target.arn]
}
# Logging: CloudWatch
statement {
effect = "Allow"
actions = [
"logs:CreateLogStream",
"logs:PutLogEvents"
]
resources = ["${aws_cloudwatch_log_group.pipes_logs.arn}:*"]
}
}
resource "aws_iam_role_policy" "pipes_policy" {
name = "pipes-policy"
role = aws_iam_role.pipes_role.id
policy = data.aws_iam_policy_document.pipes_policy.json
}
data "aws_caller_identity" "current" {}
# --- CloudWatch Log Group ---
resource "aws_cloudwatch_log_group" "pipes_logs" {
name = "/aws/pipes/orders-sqs-to-lambda"
retention_in_days = 14
}
# --- EventBridge Pipe ---
resource "aws_pipes_pipe" "orders" {
name = "orders-sqs-to-lambda"
role_arn = aws_iam_role.pipes_role.arn
source = aws_sqs_queue.source.arn
target = aws_lambda_function.target.arn
source_parameters {
sqs_queue_parameters {
batch_size = 10
maximum_batching_window_in_seconds = 5
}
filter_criteria {
filter {
pattern = jsonencode({
body = {
orderStatus = ["PENDING"]
}
})
}
}
}
enrichment = aws_lambda_function.enrichment.arn
enrichment_parameters {
# No additional HTTP parameters needed for Lambda enrichment
}
target_parameters {
lambda_function_parameters {
invocation_type = "FIRE_AND_FORGET"
}
}
log_configuration {
cloudwatch_logs_log_destination {
log_group_arn = aws_cloudwatch_log_group.pipes_logs.arn
}
level = "ERROR" # Options: OFF, ERROR, INFO, TRACE
}
tags = {
Team = "platform"
Environment = "production"
ManagedBy = "terraform"
}
}
# --- Lambda placeholder (your actual Lambda resources) ---
resource "aws_lambda_function" "enrichment" {
function_name = "order-enrichment"
# ... runtime, handler, role, filename
}
resource "aws_lambda_function" "target" {
function_name = "order-processor"
# ... runtime, handler, role, filename
}
The aws:SourceAccount condition on the Pipes assume-role policy prevents confused deputy attacks: it ensures only Pipes from your own account can assume the role, not Pipes in another account that might reference your resources.
Monitoring and Observability
EventBridge Pipes emits CloudWatch metrics in the AWS/EventBridge/Pipes namespace. Enable X-Ray tracing and structured logging for end-to-end visibility across the source → enrichment → target path.
Key CloudWatch Metrics
| Metric | Description | Alarm Threshold |
|---|---|---|
| ExecutionThrottled | Executions that were throttled due to concurrency limits | > 0 for 5 minutes |
| ExecutionFailed | Executions that failed after retries | > 0 for any period |
| Invocations | Total pipe invocations (throughput metric) | Baseline alert for anomaly |
| EnrichmentStageDuration | Time spent in the enrichment stage (ms) | > 30000 ms (30 s) |
| GetRecords.IteratorAgeMilliseconds (AWS/Kinesis namespace, Kinesis sources) | Age of the last record read from the source stream | > 60000 ms (1 min) |
| EventCount | Events processed by the pipe | Drops > 20% from baseline |
CloudWatch Alarms via CLI
# Alarm: any execution failure
aws cloudwatch put-metric-alarm \
--alarm-name "pipes-orders-execution-failed" \
--namespace "AWS/EventBridge/Pipes" \
--metric-name "ExecutionFailed" \
--dimensions "Name=PipeName,Value=orders-sqs-to-lambda" \
--statistic Sum \
--period 300 \
--threshold 1 \
--comparison-operator GreaterThanOrEqualToThreshold \
--evaluation-periods 1 \
--alarm-actions "arn:aws:sns:us-east-1:123456789012:ops-alerts"
# Alarm: high iterator age (Kinesis source falling behind)
# Iterator age is published per stream in the AWS/Kinesis namespace
aws cloudwatch put-metric-alarm \
--alarm-name "pipes-iot-iterator-age-high" \
--namespace "AWS/Kinesis" \
--metric-name "GetRecords.IteratorAgeMilliseconds" \
--dimensions "Name=StreamName,Value=iot-telemetry" \
--statistic Maximum \
--period 60 \
--threshold 60000 \
--comparison-operator GreaterThanThreshold \
--evaluation-periods 3 \
--alarm-actions "arn:aws:sns:us-east-1:123456789012:ops-alerts"
Enable Execution Logging and X-Ray Tracing
aws pipes update-pipe \
--name "orders-sqs-to-lambda" \
--role-arn "arn:aws:iam::123456789012:role/EventBridgePipesSQSRole" \
--desired-state RUNNING \
--log-configuration '{
"CloudwatchLogsLogDestination": {
"LogGroupArn": "arn:aws:logs:us-east-1:123456789012:log-group:/aws/pipes/orders-sqs-to-lambda"
},
"Level": "INFO",
"IncludeExecutionData": ["ALL"]
}'
# Note: X-Ray is enabled at the Lambda function level for enrichment/target Lambdas
# Set TracingConfig.Mode = Active on each Lambda to get end-to-end traces
Structured Logging — What Pipes Logs
When logging is set to INFO or TRACE, Pipes writes structured JSON to the CloudWatch log group for each execution. Each log entry includes:
executionId: unique ID for the pipe execution (correlates source → enrichment → target)
status: STARTED, ENRICHMENT_SUCCESS, ENRICHMENT_ERROR, TARGET_SUCCESS, TARGET_ERROR
source: source ARN and partition/shard identifier
batchSize: number of records in this execution
filteredCount: records dropped by filter criteria
enrichmentDuration: milliseconds spent in enrichment
TRACE includes the full record payload in every log entry. For high-volume pipes processing large records, this can generate significant CloudWatch Logs costs. Use ERROR in production and switch to INFO only during debugging.
FAQ
Q: What is the maximum batch size for each source?
SQS: up to 10,000 messages per batch. Kinesis: up to 10,000 records or 6 MB, whichever is smaller. DynamoDB Streams: up to 10,000 records or 6 MB. MSK / self-managed Kafka: up to 10,000 records or 6 MB. In practice, keep batches small enough that your enrichment Lambda completes well within its timeout, and your target can handle the full batch atomically.
Q: Does Pipes support FIFO SQS queues?
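Yes, both standard and FIFO SQS queues are supported as sources and targets. For FIFO sources, Pipes processes messages in order per message group. Delivery to the target is at-least-once, so a batch can be redelivered after a failure or timeout; make the target idempotent rather than relying on exactly-once semantics.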
Q: What happens if the enrichment Lambda throws an exception?
Pipes treats enrichment failures as pipeline errors and applies the retry policy configured on the source (for Kinesis/DynamoDB Streams) or returns messages to the queue (for SQS). After retries are exhausted, the batch is sent to the DLQ if configured. The enrichment error is logged with the execution ID for debugging.
Q: Can I pause a Pipe without deleting it?
Yes. Stop it with aws pipes stop-pipe --name my-pipe and resume it with aws pipes start-pipe --name my-pipe. While stopped, no records are polled from the source. For SQS, messages accumulate in the queue. For Kinesis and DynamoDB Streams, processing resumes from where it paused when the Pipe restarts, so records are not lost as long as they are still within the stream's retention period.
Q: How does Pipes pricing compare to a Lambda trigger?
Pipes costs $0.40 per million events processed (records that pass the filter). A native Lambda SQS trigger has no extra cost beyond Lambda invocation charges. Pipes removes the glue Lambda entirely: you pay $0.40/M for Pipes instead of paying request and duration charges for a function that only filters and forwards. Which option is cheaper in raw spend depends on batch size and on how long the glue function runs; a small, well-batched function can cost less in compute, while Pipes saves the developer time of writing and maintaining it.
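A rough way to compare the two options is to put numbers on them. The sketch below uses the $0.40 per million figure from this article; the Lambda rates ($0.20 per million requests and $0.0000166667 per GB-second) are assumed list prices that should be checked against current pricing, and the batch size, duration, and memory values are made-up workload inputs. It ignores free tiers and payload-size metering.

```python
PIPES_PER_MILLION = 0.40             # from this article
LAMBDA_PER_MILLION_REQUESTS = 0.20   # assumption: list price, verify before relying on it
LAMBDA_PER_GB_SECOND = 0.0000166667  # assumption: x86 list price, verify as well


def pipes_cost(events: int) -> float:
    """Pipes charge for a number of events that passed the filter."""
    return events / 1_000_000 * PIPES_PER_MILLION


def glue_lambda_cost(events: int, batch_size: int, avg_ms: float, memory_mb: int) -> float:
    """Request plus duration charge for a function that only filters and forwards."""
    invocations = -(-events // batch_size)  # ceiling division
    request_cost = invocations / 1_000_000 * LAMBDA_PER_MILLION_REQUESTS
    gb_seconds = invocations * (avg_ms / 1000) * (memory_mb / 1024)
    return request_cost + gb_seconds * LAMBDA_PER_GB_SECOND


events = 100_000_000
print(f"Pipes:                       ${pipes_cost(events):.2f}")
print(f"Glue Lambda (batched, fast): ${glue_lambda_cost(events, 10, 50, 128):.2f}")
print(f"Glue Lambda (unbatched):     ${glue_lambda_cost(events, 1, 200, 512):.2f}")
```

Under these assumptions the result swings widely with batch size and duration, so it is worth plugging in your own workload figures before deciding on cost alone.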
Quick Reference
Pipes Pricing: $0.40 per million events processed
Supported Sources: SQS, Kinesis, DynamoDB Streams, MSK, MQ, Kafka
Enrichment Types: Lambda, API Gateway, Step Functions Express
Max Batch Size: Up to 10,000 records (source-dependent)
Filter Logic: Multiple filters = OR; multiple conditions in one filter = AND