AWS Glue ETL: Serverless Data Transformation at Scale
AWS Glue is Amazon's fully managed, serverless ETL (Extract, Transform, Load) service. You write transformation logic once — in PySpark or Scala — and Glue provisions Apache Spark clusters, runs the job, then tears everything down. You pay per second of DPU (Data Processing Unit) usage only when the job is running. No cluster sizing, no Hadoop ops, no patching. When paired with the built-in Data Catalog, Glue becomes the metadata backbone of a modern AWS data lake: crawlers automatically discover schemas in S3, RDS, and Redshift; the Catalog stores the schemas for Athena, Redshift Spectrum, and EMR to query; and Glue ETL jobs transform raw data into clean, optimized Parquet at scale.
This guide goes deep — architecture internals, PySpark job writing, DynamicFrame vs DataFrame trade-offs, incremental processing with job bookmarks, orchestration with Glue Workflows, Data Quality rules, Streaming ETL on Kinesis and MSK, and real performance tuning. Whether you are migrating on-premises ETL workloads, building a fresh data lake, or looking to cut Spark cluster costs, this is the reference you need.
Table of Contents
- Glue Architecture: Data Catalog, Crawlers, Jobs, Triggers, Workflows
- Glue vs Lambda ETL vs EMR vs Databricks — Decision Matrix
- Crawlers: S3, RDS, JDBC — Setup and Partition Handling
- Glue Studio Visual ETL and Generated PySpark Code
- Writing Glue PySpark Jobs: DynamicFrame, Joins, Aggregations, Deduplication
- Job Bookmarks: Incremental Processing and Reset
- Glue Workflows: Orchestration with CLI and Terraform
- Glue Data Quality: DQDL Rules and Quarantine Pattern
- Glue Streaming ETL: Kinesis and MSK Sources
- Performance Tuning: Worker Types, Pushdown Predicates, Cost Optimization
Glue Architecture: Data Catalog, Crawlers, Jobs, Triggers, Workflows
AWS Glue is not a single service — it is a suite of loosely coupled components that work together. Understanding each component's role is the foundation for building reliable pipelines.
Data Catalog
The Glue Data Catalog is a fully managed Hive Metastore-compatible metadata repository. It stores databases, tables, partitions, and schema versions. Every AWS analytics service — Athena, Redshift Spectrum, EMR, Lake Formation — can query the same catalog, making it the single source of truth for your data lake schemas. Tables in the catalog are external: the schema lives in Glue, the data stays in S3 (or RDS, DynamoDB, etc.).
The catalog supports schema versioning. When a crawler detects a column was added or a type changed, it records a new schema version and optionally updates the table in place. You can retrieve any historical version via the AWS CLI or SDK. Each AWS account gets one Data Catalog per region; you can share catalogs across accounts using AWS RAM.
Crawlers
A crawler is a Glue-managed process that reads a data store (S3 prefix, RDS table, JDBC database, DynamoDB table, Delta Lake, etc.), infers the schema, and writes or updates a table definition in the Data Catalog. Crawlers understand partitions — if your S3 data follows the Hive partition convention (s3://bucket/prefix/year=2026/month=06/day=08/), the crawler registers those partition keys automatically. You schedule crawlers via cron or trigger them programmatically from a Glue Workflow.
Glue ETL Jobs
A Glue ETL job is the unit of computation. It runs Apache Spark (PySpark or Scala) on a managed cluster you never touch. You choose the worker type (G.1X, G.2X, G.4X, or G.8X for Spark jobs; Z.2X is reserved for Ray jobs), the number of workers, the IAM role, and whether to enable job bookmarks or automatic scaling. The job reads source data (via a Glue connection, S3, or catalog table), transforms it, and writes output to a destination (S3, Redshift, RDS, Kafka, etc.).
Glue jobs come in three flavors: Spark (batch, the most common), Spark Streaming (continuous micro-batch), and Python Shell (lightweight Python scripts for small datasets that don't need Spark). Python Shell jobs can run on as little as 0.0625 DPU (1/16 of a DPU) and are billed per second like other jobs, which makes them ideal for post-job notifications, small file merges, or API calls.
Triggers
Triggers fire jobs or crawlers based on three event types: Scheduled (cron), On-Demand (manual), or Conditional (when other jobs or crawlers in a Workflow finish in a specified state, such as SUCCEEDED, FAILED, STOPPED, or TIMEOUT for jobs). A conditional trigger's predicate combines its conditions with AND (all must match) or ANY (at least one must match). Conditional triggers are how you chain together multi-step pipelines within a Workflow.
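To make the conditional-trigger semantics concrete, here is a minimal pure-Python sketch of how a predicate could be evaluated against observed run states. The dictionary mirrors the shape of the Glue predicate JSON (Logical, Conditions, JobName/CrawlerName, State/CrawlState); the function `predicate_fires` and the `observed` mapping are hypothetical names for illustration and are not part of any Glue SDK.

```python
def predicate_fires(predicate, observed):
    """Return True if a trigger predicate is satisfied by the observed states.

    predicate: dict shaped like the Glue trigger predicate JSON.
    observed:  hypothetical mapping of job/crawler name -> last terminal state.
    """
    results = []
    for cond in predicate["Conditions"]:
        # A condition watches either a job (JobName/State)
        # or a crawler (CrawlerName/CrawlState).
        if "JobName" in cond:
            name, wanted = cond["JobName"], cond["State"]
        else:
            name, wanted = cond["CrawlerName"], cond["CrawlState"]
        results.append(observed.get(name) == wanted)

    # "AND" requires every condition to hold; "ANY" requires at least one.
    if predicate.get("Logical", "AND") == "AND":
        return all(results)
    return any(results)


predicate = {
    "Logical": "AND",
    "Conditions": [
        {"LogicalOperator": "EQUALS", "CrawlerName": "raw-orders-crawler", "CrawlState": "SUCCEEDED"},
        {"LogicalOperator": "EQUALS", "JobName": "orders-transform", "State": "SUCCEEDED"},
    ],
}

# The crawler succeeded but the job failed, so an AND predicate does not fire.
print(predicate_fires(predicate, {"raw-orders-crawler": "SUCCEEDED", "orders-transform": "FAILED"}))  # False
```

Switching "Logical" to "ANY" in the same dictionary would let the trigger fire as soon as either watched node reaches its target state.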
Glue Workflows
A Workflow is a directed acyclic graph (DAG) of crawlers, jobs, and triggers. Workflows give you visual orchestration inside the Glue console: you can see which steps succeeded, which failed, and drill into individual run logs. For complex pipelines (crawl raw → transform → crawl curated → aggregate → notify), Workflows replace ad-hoc Lambda state machines or Step Functions with a purpose-built ETL orchestrator.
Glue vs Lambda ETL vs EMR vs Databricks — Decision Matrix
Choosing the right ETL tool on AWS is a frequent source of confusion. Here is a direct comparison across the most common options.
| Dimension | AWS Glue | Lambda ETL | EMR | Databricks on AWS |
|---|---|---|---|---|
| Execution model | Managed Spark (serverless) | Function-as-a-Service | Managed Hadoop/Spark cluster | Managed Spark (collaborative notebooks) |
| Max data size | Petabytes (horizontal scale) | ~100 GB practical limit | Petabytes | Petabytes |
| Max runtime | 48 hours by default (job timeout is configurable) | 15 minutes | Indefinite (cluster lifetime) | Indefinite |
| Cold start | 2–4 minutes (Spark init) | <1 second | 3–10 minutes (cluster start) | <30 seconds (serverless); minutes (cluster) |
| Native AWS catalog integration | Built-in (first-class) | Via SDK calls | Native via EMR-Glue catalog link | Via Unity Catalog or Glue integration |
| Incremental processing | Job bookmarks (built-in) | Manual (DynamoDB watermark) | Manual | Delta Lake change data feed |
| Streaming ETL | Glue Streaming (micro-batch) | Via Kinesis trigger | Spark Streaming on EMR | Structured Streaming |
| Cost model | Per DPU-second (no idle cost) | Per invocation + duration | Per EC2 instance-hour (idle cost) | Per DBU-second + EC2 |
| Best for | Batch ETL, data lake pipelines, catalog management | Small files, event-driven transforms, fan-out | Custom Hadoop ecosystem, long-running ML workloads, spot fleets | Data science collaboration, Delta Lake, ML pipelines |
Crawlers: S3, RDS, JDBC — Setup and Partition Handling
Crawlers are often the first Glue component you interact with. A well-configured crawler populates your Data Catalog automatically as new data arrives, eliminating the need to write and maintain DDL by hand.
Creating an S3 Crawler with the AWS CLI
# 1. Create a Glue database to hold the crawled tables
aws glue create-database \
--database-input '{"Name":"raw_data","Description":"Raw landing zone tables"}'
# 2. Create an S3 crawler for a partitioned dataset
aws glue create-crawler \
--name raw-orders-crawler \
--role arn:aws:iam::123456789012:role/GlueServiceRole \
--database-name raw_data \
--targets '{"S3Targets":[{"Path":"s3://my-data-lake/raw/orders/","Exclusions":["**/_temporary/**","**/.spark-staging/**"]}]}' \
--recrawl-policy '{"RecrawlBehavior":"CRAWL_NEW_FOLDERS_ONLY"}' \
--schema-change-policy '{"UpdateBehavior":"LOG","DeleteBehavior":"LOG"}' \
--configuration '{"Version":1.0,"CrawlerOutput":{"Partitions":{"AddOrUpdateBehavior":"InheritFromTable"}}}' \
--schedule 'cron(0 2 * * ? *)'
# 3. Start the crawler manually for the first run
aws glue start-crawler --name raw-orders-crawler
# 4. Poll crawler status
aws glue get-crawler --name raw-orders-crawler \
--query 'Crawler.State' --output text
JDBC / RDS Crawler
To crawl a relational database you need a Glue connection that holds the JDBC URL and credentials. Store the password in AWS Secrets Manager and reference it from the connection.
# Create a Glue connection to RDS PostgreSQL
# Credentials come from the Secrets Manager secret named in SECRET_ID
# (the secret is expected to hold username and password keys)
aws glue create-connection \
--connection-input '{
"Name": "rds-postgres-prod",
"ConnectionType": "JDBC",
"ConnectionProperties": {
"JDBC_CONNECTION_URL": "jdbc:postgresql://prod-db.cluster-xyz.us-east-1.rds.amazonaws.com:5432/orders",
"SECRET_ID": "rds/glue-reader"
},
"PhysicalConnectionRequirements": {
"SubnetId": "subnet-0abc1234",
"SecurityGroupIdList": ["sg-0def5678"],
"AvailabilityZone": "us-east-1a"
}
}'
# Create a JDBC crawler
aws glue create-crawler \
--name rds-orders-crawler \
--role arn:aws:iam::123456789012:role/GlueServiceRole \
--database-name raw_data \
--targets '{
"JdbcTargets":[{
"ConnectionName":"rds-postgres-prod",
"Path":"orders/%",
"Exclusions":["orders/tmp_%"]
}]
}'
Partition Handling
By default, Glue crawlers detect Hive-style partition columns (e.g., year=2026/month=06/day=08) and register them as partition keys on the table. Use the CRAWL_NEW_FOLDERS_ONLY recrawl behavior in production: on a large partitioned dataset it shortens crawler runs considerably, because only folders added since the last run are examined. If your S3 data uses a non-Hive path style (e.g., 2026/06/08/ without key=value), the crawler still registers the folders as partitions but names the keys positionally (partition_0, partition_1, partition_2). Rename them in the table definition, or have the upstream writer emit key=value folders, so queries can filter on meaningful column names.
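The difference between the two path styles can be sketched in a few lines of plain Python. This is an illustration of the naming convention only, not crawler code: `partition_values` and its arguments are hypothetical, and the positional names (partition_0, partition_1, ...) follow the convention crawlers are documented to use when folders carry no key=value pair.

```python
def partition_values(key, table_prefix):
    """Map an S3 object key to partition key/value pairs.

    Hive-style folders (name=value) keep their names; bare folders get
    positional names partition_0, partition_1, ...
    """
    relative = key[len(table_prefix):].strip("/")
    folders = relative.split("/")[:-1]  # drop the file name itself
    values = {}
    for index, folder in enumerate(folders):
        if "=" in folder:
            name, value = folder.split("=", 1)
        else:
            name, value = f"partition_{index}", folder
        values[name] = value
    return values


hive_key = "raw/orders/year=2026/month=06/day=08/part-0000.csv"
plain_key = "raw/orders/2026/06/08/part-0000.csv"

print(partition_values(hive_key, "raw/orders/"))
print(partition_values(plain_key, "raw/orders/"))
```

The first call yields year/month/day keys, the second yields partition_0 through partition_2, which is why queries against non-Hive layouts are harder to read until the keys are renamed.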
Glue Studio Visual ETL and Generated PySpark Code
AWS Glue Studio is a visual drag-and-drop interface for building ETL pipelines. You connect source nodes (S3, catalog tables, JDBC), add transform nodes (Filter, Join, ApplyMapping, SelectFields, FillMissingValues, Aggregate), and connect to target nodes (S3, catalog tables, Redshift). Glue Studio generates production-ready PySpark code from your visual graph — making it an excellent tool for understanding the Glue APIs even if you prefer to write code directly.
Example: Visual ETL Scenario
Suppose you want to join a raw orders table (CSV in S3) with a customers reference table (RDS), rename columns, filter out cancelled orders, and write the result as Parquet partitioned by region. In Glue Studio:
- Add two source nodes: S3 (orders) and Catalog table (customers)
- Add a Join node (inner join on customer_id)
- Add ApplyMapping to rename order_ts→order_timestamp, drop internal columns
- Add a Filter node: status != 'CANCELLED'
- Add an S3 target: format Parquet, compression Snappy, partition by region
Generated PySpark Code (annotated)
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# --- Source 1: raw orders from S3 (CSV) ---
orders_dyf = glueContext.create_dynamic_frame.from_catalog(
database="raw_data",
table_name="orders",
transformation_ctx="orders_dyf"
)
# --- Source 2: customers from RDS via catalog ---
customers_dyf = glueContext.create_dynamic_frame.from_catalog(
database="raw_data",
table_name="customers",
transformation_ctx="customers_dyf"
)
# --- Join ---
joined_dyf = Join.apply(
frame1=orders_dyf,
frame2=customers_dyf,
keys1=["customer_id"],
keys2=["id"],
transformation_ctx="joined_dyf"
)
# --- ApplyMapping: rename + drop columns ---
mapped_dyf = ApplyMapping.apply(
frame=joined_dyf,
mappings=[
("order_id", "string", "order_id", "string"),
("customer_id", "long", "customer_id", "long"),
("order_ts", "string", "order_timestamp", "timestamp"),
("total_amount", "double", "total_amount", "double"),
("status", "string", "status", "string"),
("region", "string", "region", "string"),
("email", "string", "customer_email", "string"),
],
transformation_ctx="mapped_dyf"
)
# --- Filter: remove cancelled orders ---
filtered_dyf = Filter.apply(
frame=mapped_dyf,
f=lambda x: x["status"] != "CANCELLED",
transformation_ctx="filtered_dyf"
)
# --- Write to S3 as Parquet, partitioned by region ---
glueContext.write_dynamic_frame.from_options(
frame=filtered_dyf,
connection_type="s3",
connection_options={
"path": "s3://my-data-lake/curated/orders/",
"partitionKeys": ["region"]
},
format="glueparquet",
format_options={"compression": "snappy"},
transformation_ctx="output_s3"
)
job.commit()
Writing Glue PySpark Jobs: DynamicFrame, Joins, Aggregations, Deduplication
DynamicFrame vs DataFrame
The DynamicFrame is Glue's extension of the Spark DataFrame. The key difference: DynamicFrames support choice types — a column can hold both strings and integers across different rows without failing the schema. This makes them resilient for ingesting semi-structured or inconsistent data (raw JSON, CSV with missing fields). When you need full Spark SQL expressiveness (window functions, complex groupBy, ML), convert to a DataFrame with .toDF(), transform, then convert back with DynamicFrame.fromDF().
from awsglue.dynamicframe import DynamicFrame
# Convert DynamicFrame -> DataFrame for complex transforms
df = my_dyf.toDF()
# --- Aggregation: total revenue per customer per month ---
from pyspark.sql import functions as F
agg_df = df.groupBy("customer_id", F.date_trunc("month", "order_timestamp").alias("month")) \
.agg(
F.sum("total_amount").alias("monthly_revenue"),
F.count("order_id").alias("order_count"),
F.avg("total_amount").alias("avg_order_value")
)
# --- Window function: running total per customer ---
from pyspark.sql.window import Window
customer_window = Window.partitionBy("customer_id").orderBy("order_timestamp")
df_with_running = df.withColumn(
"running_revenue",
F.sum("total_amount").over(customer_window)
)
# --- Deduplication: keep the latest record per order_id ---
dedup_window = Window.partitionBy("order_id").orderBy(F.desc("order_timestamp"))
deduped_df = df \
.withColumn("rn", F.row_number().over(dedup_window)) \
.filter(F.col("rn") == 1) \
.drop("rn")
# --- Convert back to DynamicFrame for Glue sink ---
output_dyf = DynamicFrame.fromDF(deduped_df, glueContext, "output_dyf")
Resolving Choice Types
When a crawler infers a column as a choice type (e.g., some rows have an integer, others have a string), you must resolve it before writing to a typed format like Parquet. Use ResolveChoice:
from awsglue.transforms import ResolveChoice
# Cast each ambiguous column to a single type.
# Pass either specs (per-column resolutions) or choice (one default for
# every column), not both in the same call.
resolved_dyf = ResolveChoice.apply(
    frame=raw_dyf,
    specs=[("price", "cast:double"), ("quantity", "cast:long")]
)
# Or use make_struct to preserve both variants as a nested struct
resolved_dyf2 = ResolveChoice.apply(
frame=raw_dyf,
choice="make_struct"
)
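What a choice type is, and what the two resolution strategies do to it, can be shown without Spark. The following is a pure-Python analogy under stated assumptions: the helper names are hypothetical, and nulling values that cannot be cast is a choice made for this sketch rather than a statement about Glue's exact behavior.

```python
def infer_column_types(records):
    """Collect the set of Python type names seen per column."""
    seen = {}
    for record in records:
        for column, value in record.items():
            seen.setdefault(column, set()).add(type(value).__name__)
    return seen


def resolve_cast(records, column, caster):
    """Analogue of 'cast:<type>': coerce every value, None if it cannot be cast."""
    resolved = []
    for record in records:
        row = dict(record)
        try:
            row[column] = caster(row[column])
        except (TypeError, ValueError):
            row[column] = None
        resolved.append(row)
    return resolved


def resolve_make_struct(records, column):
    """Analogue of 'make_struct': keep each variant under its own struct field."""
    resolved = []
    for record in records:
        row = dict(record)
        value = row[column]
        row[column] = {type(value).__name__: value}
        resolved.append(row)
    return resolved


rows = [{"price": 10}, {"price": "12.5"}, {"price": "n/a"}]

print(infer_column_types(rows))            # price has been seen as both int and str
print(resolve_cast(rows, "price", float))  # one type, unparseable values become None
print(resolve_make_struct(rows, "price"))  # both variants preserved, nothing lost
```

Casting gives a clean column for Parquet at the price of losing unparseable values; the struct form loses nothing but pushes the ambiguity to whoever reads the data next.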
Writing to Multiple Destinations
# Write to S3 as partitioned Parquet and update the Data Catalog table
glueContext.write_dynamic_frame.from_options(
frame=output_dyf,
connection_type="s3",
connection_options={
"path": "s3://my-data-lake/curated/orders/",
"partitionKeys": ["region", "year", "month"],
"enableUpdateCatalog": True, # update Glue catalog after write
"updateBehavior": "UPDATE_IN_DATABASE",
"databaseName": "curated_data",
"tableName": "orders"
},
format="glueparquet",
format_options={"compression": "snappy", "useGlueParquetWriter": True}
)
# Also write summary to Redshift
glueContext.write_dynamic_frame.from_jdbc_conf(
frame=DynamicFrame.fromDF(agg_df, glueContext, "agg"),
catalog_connection="redshift-prod",
connection_options={
"dbtable": "analytics.monthly_revenue",
"database": "warehouse"
},
redshift_tmp_dir="s3://my-temp-bucket/redshift-staging/"
)
Set spark.sql.shuffle.partitions to match your expected output file count. The default is 200, which can produce up to 200 small files per output partition. For most production jobs, 50–100 shuffle partitions per 100 GB of data is a reasonable starting point.
Job Bookmarks: Incremental Processing and Reset
Glue job bookmarks are the built-in mechanism for incremental processing. When enabled, Glue tracks which S3 objects or JDBC rows have already been processed by recording a "bookmark" after each successful job run. The next run only processes data that arrived after the previous bookmark — without you writing any watermark logic.
Enabling Job Bookmarks
# Enable bookmark when creating a job
aws glue create-job \
--name orders-transform \
--role arn:aws:iam::123456789012:role/GlueServiceRole \
--command '{"Name":"glueetl","ScriptLocation":"s3://my-scripts/orders_transform.py","PythonVersion":"3"}' \
--default-arguments '{
"--job-bookmark-option": "job-bookmark-enable",
"--enable-metrics": "",
"--enable-continuous-cloudwatch-log": "true",
"--TempDir": "s3://my-temp-bucket/glue-temp/"
}' \
--worker-type G.1X \
--number-of-workers 10 \
--glue-version "4.0"
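# Or override the bookmark option for a single run of an existing job.
# (update-job replaces the whole job definition, so a --job-update that carries
# only DefaultArguments would drop the role, command, and other settings.)
aws glue start-job-run \
--job-name orders-transform \
--arguments '{"--job-bookmark-option":"job-bookmark-enable"}'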
How Bookmarks Work Internally
For S3 sources, Glue uses S3 object last-modified timestamps and ETags to identify new files. For JDBC sources, it tracks the maximum value of a user-specified sequential column (e.g., created_at timestamp or auto-increment id). After a successful run, Glue commits the bookmark. If the job fails mid-run, the bookmark is not committed — the next run re-processes the same data, making bookmark-enabled jobs effectively at-least-once.
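The commit-on-success behavior described above is what produces at-least-once semantics. Here is a minimal in-memory sketch of that logic; `BookmarkStore` and `run_incremental` are hypothetical stand-ins and do not reflect how Glue actually stores bookmark state.

```python
from datetime import datetime, timezone


class BookmarkStore:
    """Hypothetical in-memory stand-in for the bookmark state of one job."""

    def __init__(self):
        self.last_processed = datetime.min.replace(tzinfo=timezone.utc)


def run_incremental(objects, store, process):
    """Process objects newer than the bookmark and commit only on success.

    objects: list of (key, last_modified) tuples.
    process: callable invoked per key; it may raise to simulate a failed run.
    Returns the keys handled in this run.
    """
    pending = sorted(
        (obj for obj in objects if obj[1] > store.last_processed),
        key=lambda obj: obj[1],
    )
    handled = []
    for key, _ in pending:
        process(key)  # an exception here aborts the run before the commit
        handled.append(key)
    if pending:
        # Equivalent of job.commit(): advance the bookmark after success.
        store.last_processed = pending[-1][1]
    return handled


store = BookmarkStore()
day1 = datetime(2026, 6, 1, tzinfo=timezone.utc)
day2 = datetime(2026, 6, 2, tzinfo=timezone.utc)
objects = [("orders/a.csv", day1), ("orders/b.csv", day2)]

print(run_incremental(objects, store, lambda key: None))  # both files on the first run
print(run_incremental(objects, store, lambda key: None))  # nothing new on the second run
```

If `process` raises partway through a run, the bookmark stays where it was and the next run sees the same pending files again, so sinks should tolerate duplicates.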
Resetting and Pausing Bookmarks
# Reset the bookmark — next run reprocesses ALL data
aws glue reset-job-bookmark --job-name orders-transform
# Pause bookmark for one run: process data added since the last bookmark
# without advancing the bookmark state
aws glue start-job-run \
--job-name orders-transform \
--arguments '{"--job-bookmark-option":"job-bookmark-pause"}'
# Disable bookmark for one run: the job processes the entire dataset
aws glue start-job-run \
--job-name orders-transform \
--arguments '{"--job-bookmark-option":"job-bookmark-disable"}'
# Get current bookmark state
aws glue get-job-bookmark --job-name orders-transform
Bookmark Gotchas
Bookmarks work most reliably when S3 objects are immutable after write. For S3 sources Glue decides what to process from each object's last-modified time, so a file that an upstream process overwrites in place is picked up again as a whole on the next run, and rows that were already processed are emitted a second time unless the job deduplicates them. For mutable data sources, use JDBC bookmarks with a strictly increasing key column such as created_at, or implement your own watermark in DynamoDB and pass it as a job parameter.
Glue Workflows: Orchestration with CLI and Terraform
Glue Workflows let you build multi-step ETL pipelines that automatically chain crawlers, jobs, and conditional triggers. A typical production Workflow looks like: crawl raw S3 → transform job → crawl curated S3 → aggregate job → send SNS notification.
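Because a Workflow is a DAG, the order in which its nodes can run is a topological ordering of the dependency graph. The sketch below models the typical pipeline from the paragraph above with the standard library's `graphlib`; the node names orders-aggregate and notify-sns are hypothetical, chosen only to complete the example.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each node maps to the set of nodes that must finish before it starts.
pipeline = {
    "raw-orders-crawler": set(),
    "orders-transform": {"raw-orders-crawler"},
    "curated-orders-crawler": {"orders-transform"},
    "orders-aggregate": {"curated-orders-crawler"},
    "notify-sns": {"orders-aggregate"},
}

# static_order() raises graphlib.CycleError if the graph is not acyclic,
# which is the same property a Workflow relies on.
execution_order = list(TopologicalSorter(pipeline).static_order())
print(execution_order)
```

In Glue itself the edges are expressed as conditional triggers rather than a dictionary, but sketching the graph this way is a quick check that a planned pipeline has no circular dependency.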
Creating a Workflow with the CLI
# 1. Create the workflow container
aws glue create-workflow \
--name daily-orders-pipeline \
--description "Crawl raw orders, transform to Parquet, aggregate"
# 2. Add a start trigger (schedules the first crawler)
aws glue create-trigger \
--name start-raw-crawl \
--workflow-name daily-orders-pipeline \
--type SCHEDULED \
--schedule 'cron(0 3 * * ? *)' \
--actions '[{"CrawlerName":"raw-orders-crawler"}]' \
--start-on-creation
# 3. Trigger the transform job when the crawler succeeds
aws glue create-trigger \
--name after-raw-crawl \
--workflow-name daily-orders-pipeline \
--type CONDITIONAL \
--predicate '{
"Logical":"AND",
"Conditions":[{
"LogicalOperator":"EQUALS",
"CrawlerName":"raw-orders-crawler",
"CrawlState":"SUCCEEDED"
}]
}' \
--actions '[{"JobName":"orders-transform"}]'
# 4. Trigger the curated crawler after the job
aws glue create-trigger \
--name after-transform \
--workflow-name daily-orders-pipeline \
--type CONDITIONAL \
--predicate '{
"Logical":"AND",
"Conditions":[{
"LogicalOperator":"EQUALS",
"JobName":"orders-transform",
"State":"SUCCEEDED"
}]
}' \
--actions '[{"CrawlerName":"curated-orders-crawler"}]'
Glue Workflow with Terraform
resource "aws_glue_workflow" "daily_orders" {
name = "daily-orders-pipeline"
description = "Daily orders ETL: crawl → transform → aggregate"
}
resource "aws_glue_trigger" "start_schedule" {
name = "start-raw-crawl"
workflow_name = aws_glue_workflow.daily_orders.name
type = "SCHEDULED"
schedule = "cron(0 3 * * ? *)"
actions {
crawler_name = aws_glue_crawler.raw_orders.name
}
}
resource "aws_glue_trigger" "after_crawl" {
name = "after-raw-crawl"
workflow_name = aws_glue_workflow.daily_orders.name
type = "CONDITIONAL"
predicate {
logical = "AND"
conditions {
logical_operator = "EQUALS"
crawler_name = aws_glue_crawler.raw_orders.name
crawl_state = "SUCCEEDED"
}
}
actions {
job_name = aws_glue_job.orders_transform.name
arguments = {
"--job-bookmark-option" = "job-bookmark-enable"
}
}
}
resource "aws_glue_job" "orders_transform" {
name = "orders-transform"
role_arn = aws_iam_role.glue_role.arn
glue_version = "4.0"
worker_type = "G.1X"
number_of_workers = 10
command {
name = "glueetl"
script_location = "s3://${aws_s3_bucket.scripts.bucket}/orders_transform.py"
python_version = "3"
}
default_arguments = {
"--job-bookmark-option" = "job-bookmark-enable"
"--enable-metrics" = ""
"--enable-continuous-cloudwatch-log" = "true"
"--TempDir" = "s3://${aws_s3_bucket.temp.bucket}/glue-temp/"
}
}
resource "aws_glue_crawler" "raw_orders" {
name = "raw-orders-crawler"
role = aws_iam_role.glue_role.arn
database_name = aws_glue_catalog_database.raw.name
s3_target {
path = "s3://${aws_s3_bucket.raw.bucket}/orders/"
}
schema_change_policy {
# Incremental crawls (CRAWL_NEW_FOLDERS_ONLY) require LOG for both behaviors
update_behavior = "LOG"
delete_behavior = "LOG"
}
recrawl_policy {
recrawl_behavior = "CRAWL_NEW_FOLDERS_ONLY"
}
}
Use aws glue get-workflow-run --name daily-orders-pipeline --run-id wr_12345 --include-graph to get a full execution graph with timing for each node. Set up CloudWatch alarms on the glue.driver.aggregate.elapsedTime metric and an SNS action on workflow failure triggers for production alerting.
Glue Data Quality: DQDL Rules and Quarantine Pattern
AWS Glue Data Quality (GDQ) lets you define declarative data quality rules using DQDL (Data Quality Definition Language) and evaluate them as part of your ETL pipeline. Rules can check completeness, uniqueness, value ranges, referential integrity, and statistical anomalies — without writing custom validation code.
DQDL Rule Examples
# DQDL ruleset saved as a Glue Data Quality ruleset resource
Rules = [
    # Completeness: critical columns must be non-null
    Completeness "customer_id" >= 0.999,
    Completeness "order_id" >= 1.0,
    # Uniqueness: order_id should behave like a primary key
    Uniqueness "order_id" >= 0.999,
    # Value range
    ColumnValues "total_amount" between 0 and 100000,
    ColumnValues "status" in ["PENDING","PROCESSING","SHIPPED","DELIVERED","CANCELLED"],
    # Row count versus a reference dataset: fails if this dataset has fewer
    # than 80% of the rows in the dataset aliased orders_yesterday
    RowCountMatch "orders_yesterday" >= 0.80,
    # Referential integrity: every order's customer_id must exist in the customers table
    ReferentialIntegrity "customer_id" "customers.id" >= 0.999,
    # Custom SQL rule
    CustomSql "SELECT COUNT(*) FROM primary WHERE total_amount < 0" = 0
]
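To see what thresholds such as >= 0.999 are measured against, it helps to compute the underlying ratios by hand. The sketch below does this in plain Python for completeness and uniqueness. It is an approximation for intuition only: the function names are hypothetical, and uniqueness is defined here as the share of non-null values that occur exactly once, which may not match Glue's internal definition in every detail.

```python
from collections import Counter


def completeness(rows, column):
    """Fraction of rows in which the column is present and not None."""
    if not rows:
        return 1.0
    return sum(1 for r in rows if r.get(column) is not None) / len(rows)


def uniqueness(rows, column):
    """Fraction of non-null values that occur exactly once (assumed definition)."""
    values = [r[column] for r in rows if r.get(column) is not None]
    if not values:
        return 1.0
    counts = Counter(values)
    return sum(1 for v in values if counts[v] == 1) / len(values)


def check(rows, rules):
    """rules: list of (metric_fn, column, minimum) -> list of (rule, observed, passed)."""
    report = []
    for metric, column, minimum in rules:
        observed = metric(rows, column)
        report.append((f'{metric.__name__} "{column}" >= {minimum}', observed, observed >= minimum))
    return report


orders = [
    {"order_id": 1, "customer_id": 10},
    {"order_id": 2, "customer_id": None},
    {"order_id": 2, "customer_id": 11},
    {"order_id": 3, "customer_id": 12},
]

report = check(orders, [
    (completeness, "customer_id", 0.999),
    (completeness, "order_id", 1.0),
    (uniqueness, "order_id", 0.999),
])
for rule, observed, passed in report:
    print(f"{rule}: observed={observed:.2f} passed={passed}")
```

On this toy dataset one null customer_id and one duplicated order_id are enough to fail two of the three rules, which shows how strict a 0.999 threshold is on small inputs.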
Quarantine Pattern: Routing Bad Records
The cleanest production pattern is to route records that fail quality checks to a quarantine prefix in S3 for human review, rather than failing the entire job. This keeps the pipeline running while ensuring bad data doesn't pollute curated tables.
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from pyspark.sql import functions as F
from awsglue.dynamicframe import DynamicFrame
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# Load data
df = glueContext.create_dynamic_frame.from_catalog(
database="raw_data", table_name="orders"
).toDF()
# --- Define quality checks as boolean columns ---
df_validated = df \
.withColumn("_err_null_order_id", F.col("order_id").isNull()) \
.withColumn("_err_null_customer", F.col("customer_id").isNull()) \
.withColumn("_err_negative_amount", F.col("total_amount") < 0) \
.withColumn("_err_invalid_status", ~F.col("status").isin(
"PENDING","PROCESSING","SHIPPED","DELIVERED","CANCELLED")) \
.withColumn("_has_error",
F.col("_err_null_order_id") | F.col("_err_null_customer") |
F.col("_err_negative_amount") | F.col("_err_invalid_status"))
# --- Split into good and quarantine ---
good_df = df_validated.filter(~F.col("_has_error")) \
.drop("_err_null_order_id","_err_null_customer",
"_err_negative_amount","_err_invalid_status","_has_error")
quarantine_df = df_validated.filter(F.col("_has_error"))
# --- Write good records to curated zone ---
glueContext.write_dynamic_frame.from_options(
frame=DynamicFrame.fromDF(good_df, glueContext, "good"),
connection_type="s3",
connection_options={"path": "s3://my-data-lake/curated/orders/"},
format="glueparquet",
format_options={"compression": "snappy"}
)
# --- Write quarantine records with error flags for review ---
glueContext.write_dynamic_frame.from_options(
frame=DynamicFrame.fromDF(quarantine_df, glueContext, "quarantine"),
connection_type="s3",
connection_options={"path": "s3://my-data-lake/quarantine/orders/"},
format="glueparquet",
format_options={"compression": "snappy"}
)
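# --- Report the quarantine rate: log it and publish a CloudWatch custom metric ---
import boto3
quarantine_count = quarantine_df.count()
total_count = df.count()
quarantine_rate = quarantine_count / max(total_count, 1)
print(f"QUARANTINE_RATE: {quarantine_rate:.4f} ({quarantine_count}/{total_count})")
# The namespace and metric name are illustrative, not Glue defaults;
# the job role needs cloudwatch:PutMetricData
boto3.client("cloudwatch").put_metric_data(
    Namespace="Custom/GlueETL",
    MetricData=[{"MetricName": "QuarantineRate", "Value": quarantine_rate, "Unit": "None"}]
)
# Alarm in CloudWatch if QuarantineRate > 0.01 (1%)
"EnableRecommendedRules": true in your ruleset configuration.
Glue Streaming ETL: Kinesis and MSK Sources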
Glue Streaming ETL runs as a long-running Spark Structured Streaming job. Instead of reading a finite S3 dataset, it reads continuously from a Kinesis Data Stream or Apache Kafka (MSK) topic, processes micro-batches, and writes to S3 or another sink. The same DynamicFrame/PySpark APIs you use in batch jobs work in streaming mode — the context switches automatically based on the source type.
Streaming from Kinesis Data Streams
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import json
from pyspark.sql import functions as F
from pyspark.sql.types import *
from awsglue.dynamicframe import DynamicFrame
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'stream_name', 'output_path'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# --- Define the Kinesis source (create_data_frame returns a streaming Spark DataFrame) ---
kinesis_options = {
"streamARN": f"arn:aws:kinesis:us-east-1:123456789012:stream/{args['stream_name']}",
"startingPosition": "TRIM_HORIZON",
"inferSchema": "true",
"classification": "json"
}
streaming_dyf = glueContext.create_data_frame.from_options(
connection_type="kinesis",
connection_options=kinesis_options,
transformation_ctx="kinesis_src"
)
# --- Schema for the event payload ---
event_schema = StructType([
StructField("event_id", StringType(), True),
StructField("user_id", LongType(), True),
StructField("event_type", StringType(), True),
StructField("page", StringType(), True),
StructField("ts", LongType(), True),
StructField("properties", StringType(), True),
])
def process_batch(data_frame, batch_id):
    # Skip empty micro-batches
    if data_frame.count() == 0:
        return
    # With classification=json, Glue usually delivers the payload fields as
    # top-level columns; parse the raw `data` column only when it is present.
    if "data" in data_frame.columns:
        events_df = data_frame.select(
            F.from_json(F.col("data").cast("string"), event_schema).alias("evt")
        ).select("evt.*")
    else:
        events_df = data_frame
    # Derive the event timestamp (ts is epoch milliseconds) and partition columns
    parsed_df = events_df \
        .withColumn("event_dt", F.from_unixtime(F.col("ts") / 1000).cast("timestamp")) \
        .withColumn("year", F.year("event_dt").cast("string")) \
        .withColumn("month", F.month("event_dt").cast("string")) \
        .withColumn("day", F.dayofmonth("event_dt").cast("string"))
    # Deduplicate within this micro-batch only
    parsed_df = parsed_df.dropDuplicates(["event_id"])
    output_dyf = DynamicFrame.fromDF(parsed_df, glueContext, f"batch_{batch_id}")
    glueContext.write_dynamic_frame.from_options(
        frame=output_dyf,
        connection_type="s3",
        connection_options={
            "path": args['output_path'],
            "partitionKeys": ["year", "month", "day"]
        },
        format="glueparquet",
        format_options={"compression": "snappy"}
    )
glueContext.forEachBatch(
frame=streaming_dyf,
batch_function=process_batch,
options={
"windowSize": "60 seconds",
"checkpointLocation": "s3://my-temp-bucket/glue-streaming-checkpoint/"
}
)
job.commit()
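One consequence of deduplicating inside the batch function is worth spelling out: duplicates are removed only within a single micro-batch window. The plain-Python sketch below, with hypothetical names and toy data, shows an event that is redelivered in the next window surviving the per-batch deduplication.

```python
def dedupe_within_batch(batch, key="event_id"):
    """Keep the first record per key inside one micro-batch."""
    seen, unique = set(), []
    for record in batch:
        if record[key] not in seen:
            seen.add(record[key])
            unique.append(record)
    return unique


batches = [
    [{"event_id": "e1"}, {"event_id": "e1"}, {"event_id": "e2"}],
    [{"event_id": "e2"}, {"event_id": "e3"}],  # e2 is redelivered in the next window
]

written = [
    record["event_id"]
    for batch in batches
    for record in dedupe_within_batch(batch)
]
print(written)  # ['e1', 'e2', 'e2', 'e3']
```

The repeated e1 inside the first batch is dropped, but e2 is written twice because each batch is deduplicated in isolation. If exactly-once output matters, a downstream batch job can apply the row_number deduplication pattern over the curated data.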
Streaming from Amazon MSK (Kafka)
# MSK source — requires a Glue connection pointing to your MSK cluster
kafka_options = {
"connectionName": "msk-prod-connection",
"topicName": "orders-events",
"startingOffsets": "earliest",
"inferSchema": "true",
"classification": "json"
}
msk_streaming_dyf = glueContext.create_data_frame.from_options(
connection_type="kafka",
connection_options=kafka_options,
transformation_ctx="msk_src"
)
# Process with the same forEachBatch pattern as Kinesis above
glueContext.forEachBatch(
frame=msk_streaming_dyf,
batch_function=process_batch,
options={
"windowSize": "30 seconds",
"checkpointLocation": "s3://my-temp-bucket/msk-checkpoint/"
}
)
Enable auto scaling for streaming jobs ("--enable-auto-scaling": "true") so Glue automatically adds/removes workers as Kinesis or Kafka lag changes. Set a minimum of 2 workers to keep the checkpoint alive even during low-traffic periods. Monitor the glue.driver.streaming.numRecords CloudWatch metric to track throughput.
Performance Tuning: Worker Types, Pushdown Predicates, Cost Optimization
Worker Types Compared
| Worker Type | vCPUs | Memory | DPU | Best For |
|---|---|---|---|---|
| Standard | 4 | 16 GB | 1 | Legacy (avoid for new jobs) |
| G.1X | 4 | 16 GB | 1 | Most batch ETL, default choice |
| G.2X | 8 | 32 GB | 2 | Complex joins, shuffle-heavy workloads |
| G.4X | 16 | 64 GB | 4 | ML feature engineering, very wide schemas, OOM-prone jobs |
| G.8X | 32 | 128 GB | 8 | Single-node memory-intensive operations |
| Z.2X | 8 | 64 GB | 2 (M-DPU) | Ray jobs only; not available for Spark ETL jobs |
Start with G.1X and profile. Upgrade to G.2X only if you see executor OOM errors or excessive spill-to-disk in the Spark UI. Horizontal scale (more G.1X workers) is almost always cheaper than vertical scale (fewer G.4X workers) for data-parallel ETL.
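The cost side of this trade-off is simple arithmetic: workers times DPUs per worker times runtime times the DPU-hour price. The sketch below makes that explicit. The price constant is an assumption for illustration (it is not stated in this guide and varies by region), and the function name is hypothetical.

```python
def job_cost_usd(workers, dpu_per_worker, runtime_seconds, price_per_dpu_hour):
    """Cost = workers x DPUs per worker x hours x price per DPU-hour."""
    dpu_hours = workers * dpu_per_worker * runtime_seconds / 3600
    return dpu_hours * price_per_dpu_hour


PRICE = 0.44  # assumed USD per DPU-hour; check current pricing for your region

# Same total DPU count, same runtime: 20 x G.1X versus 5 x G.4X
wide = job_cost_usd(workers=20, dpu_per_worker=1, runtime_seconds=900, price_per_dpu_hour=PRICE)
tall = job_cost_usd(workers=5, dpu_per_worker=4, runtime_seconds=900, price_per_dpu_hour=PRICE)

# If the wide layout finishes a third faster, it is a third cheaper
wide_faster = job_cost_usd(workers=20, dpu_per_worker=1, runtime_seconds=600, price_per_dpu_hour=PRICE)

print(round(wide, 2), round(tall, 2), round(wide_faster, 2))
```

At equal DPU count and equal runtime the two layouts cost the same, so the choice comes down to which one finishes sooner. For data-parallel ETL that is usually the layout with more, smaller workers, provided no single task needs more memory than a G.1X offers.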
Pushdown Predicates
When reading from partitioned catalog tables, pass a push_down_predicate so Glue lists and reads only the partitions you need. This can reduce data scanned (and job runtime) by orders of magnitude on large partitioned datasets. For JDBC sources, push the filter into the database with a sampleQuery instead.
# Read only the June 2026 partitions (day 01 onward)
orders_dyf = glueContext.create_dynamic_frame.from_catalog(
    database="raw_data",
    table_name="orders",
    push_down_predicate="year = '2026' AND month = '06' AND day >= '01'",
    transformation_ctx="orders_dyf"
)
# For JDBC sources, filter in the database with sampleQuery.
# With enablePartitioningForSampleQuery, the query must end in WHERE or AND
# so Glue can append its own partitioning condition.
jdbc_dyf = glueContext.create_dynamic_frame.from_catalog(
    database="rds_data",
    table_name="transactions",
    additional_options={
        "sampleQuery": "SELECT * FROM transactions WHERE created_at >= '2026-06-01' AND",
        "enablePartitioningForSampleQuery": True,
        "hashfield": "transaction_id",  # column used to split parallel JDBC reads
        "hashpartitions": "20"          # 20 parallel JDBC partitions
    }
)
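A rolling window such as "the last 7 days" usually crosses a month boundary, so a single comparison on the day column is not enough. One way to handle this is to generate the predicate string per day, as in the sketch below. It assumes string partition columns named year, month, and day with zero-padded values, as in the examples in this guide; the helper name is hypothetical.

```python
from datetime import date, timedelta


def last_n_days_predicate(end_day, days):
    """Build a partition predicate covering `days` days ending at end_day (inclusive)."""
    clauses = []
    for offset in range(days):
        d = end_day - timedelta(days=offset)
        clauses.append(
            f"(year = '{d:%Y}' AND month = '{d:%m}' AND day = '{d:%d}')"
        )
    # Oldest day first, purely for readability
    return " OR ".join(reversed(clauses))


# Three days ending on 2 June 2026 span the May/June boundary
print(last_n_days_predicate(date(2026, 6, 2), 3))
```

The resulting string can be passed as push_down_predicate; in a scheduled job, `date.today()` would replace the fixed date.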
Partition and Shuffle Optimization
# Set shuffle partitions based on data size
# Rule of thumb: 1 partition per 200 MB of shuffled data
spark.conf.set("spark.sql.shuffle.partitions", "200")
# Enable adaptive query execution (Glue 3.0+)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# Broadcast small tables to avoid shuffle joins
from pyspark.sql.functions import broadcast
result_df = large_orders_df.join(
broadcast(small_lookup_df), "region_code"
)
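# Repartition before writing to control output file count (target: ~128 MB per Parquet file).
# Hash-partitioning on "region" sends all rows for a region to one of the 50 partitions,
# so a write partitioned by region produces roughly one file per region.
output_df = transformed_df.repartition(50, "region")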
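The two rules of thumb in the comments above (about 200 MB of shuffled data per partition, about 128 MB per output file) can be turned into numbers with a small helper. This is a sizing sketch with hypothetical function names; the targets are starting points to adjust after looking at the Spark UI.

```python
import math

MB = 1024 ** 2
GB = 1024 ** 3


def shuffle_partitions(shuffled_bytes, target_bytes=200 * MB, minimum=1):
    """Number of shuffle partitions so each holds about target_bytes."""
    return max(minimum, math.ceil(shuffled_bytes / target_bytes))


def output_files(partition_bytes, target_file_bytes=128 * MB):
    """Number of files to write for one output partition of the given size."""
    return max(1, math.ceil(partition_bytes / target_file_bytes))


print(shuffle_partitions(100 * GB))  # 512
print(output_files(1 * GB))          # 8
```

The first value is a candidate for spark.sql.shuffle.partitions, the second for the partition count passed to repartition before a write. With adaptive query execution enabled, Spark can coalesce partitions further at runtime, so erring slightly high is usually safe.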
Cost Optimization Strategies
- Enable auto-scaling ("--enable-auto-scaling": "true" in job args): Glue scales workers up and down automatically. Most jobs use 20–40% fewer DPU-seconds with auto-scaling vs a fixed worker count.
- Use job bookmarks: process only new data on each run instead of re-scanning the full dataset.
- Right-size Python Shell for small jobs: if the transform is simple and the dataset fits in a single node, use Python Shell (0.0625 DPU) instead of a Spark job (minimum 2 DPUs).
- Write Parquet, not CSV: Parquet with Snappy compression is 5–10x smaller than CSV, reducing both S3 storage costs and downstream Athena scan costs.
- Compact small files: if upstream writes many small files (common with Kinesis Firehose), run a periodic Glue compaction job that reads a partition and rewrites it as a few large Parquet files. This dramatically speeds up Athena queries.
- Schedule jobs intelligently: batch jobs at off-peak hours to take advantage of Glue's capacity pool; avoid scheduling hundreds of jobs at the top of the hour when capacity is contested.
Monitoring with CloudWatch Metrics
# Key Glue metrics to monitor in CloudWatch
# glue.driver.aggregate.bytesRead: total bytes read from source
# glue.driver.aggregate.elapsedTime: ETL elapsed time (ms)
# glue.driver.aggregate.recordsWritten: records written to sink
# glue.driver.jvm.heap.usage: JVM heap pressure on the driver (watch for OOM)
# glue.ALL.jvm.heap.usage: heap across all executors
# glue.driver.streaming.numRecords: streaming records processed per batch
# Create a CloudWatch alarm for job duration > 30 minutes.
# Glue job metrics carry three dimensions (JobName, JobRunId, Type), and
# elapsedTime is reported as deltas, so sum it over a window longer than the threshold.
aws cloudwatch put-metric-alarm \
--alarm-name "GlueJob-orders-transform-TooSlow" \
--metric-name "glue.driver.aggregate.elapsedTime" \
--namespace "Glue" \
--dimensions Name=JobName,Value=orders-transform Name=JobRunId,Value=ALL Name=Type,Value=count \
--statistic Sum \
--period 3600 \
--threshold 1800000 \
--comparison-operator GreaterThanThreshold \
--evaluation-periods 1 \
--alarm-actions arn:aws:sns:us-east-1:123456789012:glue-alerts
For non-urgent jobs, set "--execution-class": "FLEX" in job arguments. Flex jobs run on spare Glue capacity and cost approximately 34% less than standard jobs, at the cost of potentially longer startup times and possible preemption (the job is restarted, not lost).