Azure Event Hubs — Capture to ADLS Gen2
Auto-Capture Events as Avro/Parquet for Analytics
Introduction
Azure Event Hubs Capture automatically writes the events flowing through an event hub to Azure Data Lake Storage Gen2 (ADLS Gen2) or Blob Storage for long-term storage, batch analytics, and data warehousing. Capture runs alongside any real-time consumers, so you can process events live and still keep a copy for later analysis.
This guide covers:
- Capture fundamentals — How automatic capture works
- Format options — Avro vs Parquet comparison
- Configuration — Azure Portal and CLI setup
- Data pipeline — Building analytics workflows
- Optimization — Performance and cost best practices
How Event Hubs Capture Works
Architecture Overview
┌─────────────────────────────────────────────────────────────────────┐
│ EVENT HUBS CAPTURE ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Producers │ ───▶ │ Event Hub │ ───▶ │ Capture │ │
│ │ (Apps/Devices) │ │ (Partitioned) │ │ (Auto-Flow) │ │
│ └──────────────┘ └──────────────┘ └──────┬───────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Azure Data Lake Gen2 │ │
│ │ /namespace/eventhub/partition/YYYY/MM/DD/HH/mm/ss.avro │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Synapse │ │ Spark │ │ ADF │ │
│ │ Analytics │ │ Databricks │ │ pipelines │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
Capture Flow
- Events arrive — Producers send events to Event Hub partitions
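- Window closes — Capture writes a file when the time interval or the size limit is reached, whichever comes first
- Batch creation — Events that each partition received during the window are grouped into one file
- Format conversion — Events are serialized to Avro together with their Event Hubs metadata (Parquet output goes through a Stream Analytics job instead)
- File upload — The file is written to the storage container as a block blob, one file per partition per window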
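The "whichever comes first" rule can be modeled in a few lines. This is a simplified, hypothetical sketch (group_into_files is not part of any Azure SDK): it tracks a single partition, opens a window at the first buffered event, and only checks the time threshold when the next event arrives, ignoring the clock alignment and empty files that the real service handles itself.

```python
from typing import List, Tuple


def group_into_files(events: List[Tuple[float, int]],
                     interval_seconds: int = 300,
                     size_limit_bytes: int = 314_572_800) -> List[List[int]]:
    """Simplified model of one partition's capture windows.

    events: (arrival_seconds, size_bytes) pairs sorted by arrival time.
    Returns the event sizes grouped into the files that would be written.
    """
    files: List[List[int]] = []
    current: List[int] = []
    buffered = 0
    window_start = 0.0
    for arrival, size in events:
        if not current:
            window_start = arrival            # first event opens a new window
        elif arrival - window_start >= interval_seconds:
            files.append(current)             # time threshold reached
            current, buffered, window_start = [], 0, arrival
        current.append(size)
        buffered += size
        if buffered >= size_limit_bytes:
            files.append(current)             # size threshold reached
            current, buffered = [], 0
    if current:
        files.append(current)                 # last, still-open window
    return files
```

For example, with a 300-second interval and a 1000-byte limit, two small events 10 seconds apart share a file, and an event arriving 400 seconds later starts a new one.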
Why Use Capture?
| Scenario | Benefit |
|---|---|
| Historical analysis | Query old events for trends |
| Regulatory compliance | Retain data for audit requirements |
| Data warehousing | Load into Synapse for BI reporting |
| Machine learning | Train models on historical data |
| Disaster recovery | Replay events after outages |
| Cost optimization | Use cheaper storage for cold data |
Enable Capture
Prerequisites
- Event Hubs namespace — Standard, Premium, or Dedicated tier (Capture is not available in the Basic tier)
- Storage account — ADLS Gen2 enabled with hierarchical namespace
- Container — Blob container for captured files
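- Permissions — Storage Blob Data Contributor role on the storage account for the namespace's managed identity, when Capture is configured to authenticate with managed identity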
Azure Portal Configuration
- Navigate to your Event Hubs namespace
- Select the Event Hub to configure
- Go to "Capture" settings
- Toggle "Enable Capture" to ON
- Select Azure Storage as the capture provider (this covers ADLS Gen2 accounts; the Azure Data Lake option targets Data Lake Storage Gen1)
- Choose container or file system
- Set capture window (time interval) and size limit
- Save configuration
CLI Configuration
# Create Event Hub with capture enabled
az eventhubs eventhub create \
--name my-eventhub \
--namespace-name my-namespace \
--resource-group my-rg \
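--enable-capture true \
--storage-account mystorageaccount \
--blob-container captures \
--destination-name EventHubArchive.AzureBlockBlob \
--capture-interval 300 \
--capture-size-limit 314572800
Parameters explained:
- capture-interval — Time window in seconds (default: 300 = 5 minutes; allowed: 60 to 900)
- capture-size-limit — Size threshold in bytes (default: 314572800 = 300 MiB; allowed: 10 to 500 MiB)
- storage-account — Storage account name (if in the same resource group) or its full resource ID
- destination-name — EventHubArchive.AzureBlockBlob for Blob Storage and ADLS Gen2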
- Capture triggers when either threshold is reached
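To catch out-of-range values before calling the CLI, a small helper can validate the two thresholds and emit the matching az eventhubs eventhub options (--capture-interval and --capture-size-limit). This sketch assumes the documented limits of 60 to 900 seconds and 10 to 500 MiB; capture_cli_args is a hypothetical name.

```python
from typing import List

MIB = 1024 * 1024

# Capture limits assumed here (1 to 15 minutes, 10 to 500 MiB); confirm for your tier.
MIN_INTERVAL_S, MAX_INTERVAL_S = 60, 900
MIN_SIZE_BYTES, MAX_SIZE_BYTES = 10 * MIB, 500 * MIB


def capture_cli_args(interval_seconds: int = 300,
                     size_limit_bytes: int = 300 * MIB) -> List[str]:
    """Validate capture thresholds and return the matching az CLI arguments."""
    if not MIN_INTERVAL_S <= interval_seconds <= MAX_INTERVAL_S:
        raise ValueError(
            f"interval must be {MIN_INTERVAL_S}-{MAX_INTERVAL_S} s, got {interval_seconds}")
    if not MIN_SIZE_BYTES <= size_limit_bytes <= MAX_SIZE_BYTES:
        raise ValueError(
            f"size limit must be {MIN_SIZE_BYTES}-{MAX_SIZE_BYTES} bytes, got {size_limit_bytes}")
    return ["--capture-interval", str(interval_seconds),
            "--capture-size-limit", str(size_limit_bytes)]
```

The defaults reproduce the 300-second, 314572800-byte settings used in the examples.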
Update Existing Event Hub
# Enable capture on existing Event Hub
az eventhubs eventhub update \
--name my-eventhub \
--namespace-name my-namespace \
--resource-group my-rg \
--enable-capture true \
--storage-account mystorageaccount \
--blob-container captures \
--destination-name EventHubArchive.AzureBlockBlob
ARM Template Configuration
{
  "resources": [
    {
      "type": "Microsoft.EventHub/namespaces/eventhubs",
      "apiVersion": "2022-10-01-preview",
      "name": "[format('{0}/{1}', parameters('namespaceName'), parameters('eventHubName'))]",
      "properties": {
        "captureDescription": {
          "enabled": true,
          "encoding": "Avro",
          "intervalInSeconds": 300,
          "sizeLimitInBytes": 314572800,
          "destination": {
            "name": "EventHubArchive.AzureBlockBlob",
            "properties": {
              "storageAccountResourceId": "[resourceId(parameters('storageResourceGroup'), 'Microsoft.Storage/storageAccounts', parameters('storageAccountName'))]",
              "blobContainer": "captures",
              "archiveNameFormat": "{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}"
            }
          }
        }
      }
    }
  ]
}
Capture File Format
Directory Structure
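https://<storage>.blob.core.windows.net/<container>/
└── <namespace>/
    └── <eventhub-name>/
        └── <partition-id>/
            └── <year>/
                └── <month>/
                    └── <day>/
                        └── <hour>/
                            └── <minute>/
                                └── <second>.avro
This is the default archive name format, {Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}. It can be reordered, but every one of these fields must appear.
Example path:
https://mystorage.blob.core.windows.net/captures/
mynamespace/
orders-events/
2/
2024/
05/
16/
10/
03/
17.avro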
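Because the folder layout is deterministic, you can compute where a given partition and time window will land, which helps when targeting a specific hour in a query. A minimal sketch, assuming the default archive name format with zero-padded UTC date parts and an .avro suffix; capture_blob_path is a hypothetical helper.

```python
from datetime import datetime, timezone

# Default Capture archive name format (the service appends ".avro").
DEFAULT_ARCHIVE_NAME_FORMAT = (
    "{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}"
)


def capture_blob_path(namespace: str, eventhub: str, partition_id: int,
                      window_time: datetime,
                      name_format: str = DEFAULT_ARCHIVE_NAME_FORMAT) -> str:
    """Return the blob path (relative to the container) for one capture file."""
    # Treat naive datetimes as UTC; convert aware ones to UTC.
    if window_time.tzinfo is None:
        t = window_time.replace(tzinfo=timezone.utc)
    else:
        t = window_time.astimezone(timezone.utc)
    return name_format.format(
        Namespace=namespace, EventHub=eventhub, PartitionId=partition_id,
        Year=f"{t.year:04d}", Month=f"{t.month:02d}", Day=f"{t.day:02d}",
        Hour=f"{t.hour:02d}", Minute=f"{t.minute:02d}", Second=f"{t.second:02d}",
    ) + ".avro"
```

Prefix the result with the container URL to get the full blob address.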
Avro Schema
Each capture file embeds its Avro schema in the file header:
{
  "type": "record",
  "name": "EventData",
  "namespace": "Microsoft.ServiceBus.Messaging",
  "fields": [
    {"name": "SequenceNumber", "type": "long"},
    {"name": "Offset", "type": "string"},
    {"name": "EnqueuedTimeUtc", "type": "string"},
    {"name": "SystemProperties", "type": {"type": "map", "values": ["long", "double", "string", "bytes"]}},
    {"name": "Properties", "type": {"type": "map", "values": ["long", "double", "string", "bytes", "null"]}},
    {"name": "Body", "type": ["null", "bytes"]}
  ]
}
Note that Offset and EnqueuedTimeUtc are strings, so convert EnqueuedTimeUtc to a timestamp before time-based queries.
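Because EnqueuedTimeUtc is a string, it has to be parsed before any time-based filtering. The sketch below assumes a US-style layout such as 5/16/2024 10:03:17 AM; that layout is an assumption to check against a sample of your own files, and parse_enqueued_time is a hypothetical helper.

```python
from datetime import datetime, timezone

# Assumed layout of EnqueuedTimeUtc, e.g. "5/16/2024 10:03:17 AM"; verify on your data.
ENQUEUED_FORMAT = "%m/%d/%Y %I:%M:%S %p"


def parse_enqueued_time(value: str) -> datetime:
    """Parse a captured EnqueuedTimeUtc string into a timezone-aware UTC datetime."""
    return datetime.strptime(value.strip(), ENQUEUED_FORMAT).replace(tzinfo=timezone.utc)
```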
Body Decoding
The event body is stored as bytes — decode based on your content type:
# Python (PySpark): decode event bodies from captured Avro files
# (the "avro" source is built into Synapse and Databricks Spark; elsewhere add the spark-avro package)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, when
from pyspark.sql.types import DoubleType, StringType, StructField, StructType
spark = SparkSession.builder.getOrCreate()
# Read captured events from all partition and date folders
df = spark.read.format("avro") \
    .option("recursiveFileLookup", "true") \
    .load("abfss://container@storage.dfs.core.windows.net/events/")
# Body is binary; casting to string decodes it as UTF-8 (suitable for JSON or text payloads)
df_decoded = df.withColumn(
    "body_decoded",
    when(col("Body").isNotNull(), col("Body").cast("string"))
)
# For JSON content, parse it against an explicit schema
json_schema = StructType([
    StructField("orderId", StringType()),
    StructField("amount", DoubleType()),
])
df_json = df_decoded.withColumn("body_json", from_json(col("body_decoded"), json_schema))
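Outside Spark, the same decoding can be done per record, for example after reading a capture file with an Avro library. In this sketch the content type is supplied by the caller, since Capture stores Body as raw bytes and does not decode it; decode_body and its content-type handling are illustrative assumptions.

```python
import json
from typing import Any, Optional


def decode_body(body: Optional[bytes], content_type: str = "application/json") -> Any:
    """Decode a captured Body field according to a caller-supplied content type."""
    if body is None:
        return None                          # events may have no body
    if content_type.startswith("application/json"):
        return json.loads(body.decode("utf-8"))
    if content_type.startswith("text/"):
        return body.decode("utf-8")
    return body                              # unknown or binary payloads stay as bytes
```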
Use with Azure Synapse
Direct Query with OPENROWSET
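-- Serverless SQL pools read CSV, Parquet, and Delta Lake files, not Avro,
-- so query a Parquet copy of the captured data (for example, one written by Spark).
-- The captures-parquet path is a placeholder for wherever that copy lives.
SELECT TOP 100
    CAST(Body AS VARCHAR(MAX)) AS EventBody,
    SequenceNumber,
    Offset,
    EnqueuedTimeUtc
FROM OPENROWSET(
    BULK 'https://mystorage.dfs.core.windows.net/captures-parquet/mynamespace/orders/**',
    FORMAT = 'PARQUET'
) AS rows
ORDER BY EnqueuedTimeUtc DESC;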
Create External Table
-- External tables cannot read Avro either, so point this at a Parquet copy of the
-- captured data (placeholder path) in which EnqueuedTimeUtc was converted to a timestamp
CREATE EXTERNAL DATA SOURCE CaptureStorage
WITH (
    LOCATION = 'https://mystorage.dfs.core.windows.net/captures-parquet'
);
CREATE EXTERNAL FILE FORMAT ParquetFormat
WITH (
    FORMAT_TYPE = PARQUET
);
CREATE EXTERNAL TABLE EventsRaw
(
    Body VARBINARY(MAX),
    SequenceNumber BIGINT,
    Offset VARCHAR(50),
    EnqueuedTimeUtc DATETIME2
)
WITH (
    LOCATION = 'mynamespace/orders/**',
    DATA_SOURCE = CaptureStorage,
    FILE_FORMAT = ParquetFormat
);
-- Query like a regular table
SELECT
    DATEPART(HOUR, EnqueuedTimeUtc) AS Hour,
    COUNT(*) AS EventCount
FROM EventsRaw
WHERE EnqueuedTimeUtc >= DATEADD(DAY, -7, GETUTCDATE())
GROUP BY DATEPART(HOUR, EnqueuedTimeUtc)
ORDER BY Hour;
PySpark in Synapse Notebook
# Read and analyze events with PySpark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, countDistinct, from_json, to_timestamp, window
from pyspark.sql.functions import sum as spark_sum
# Create Spark session
spark = SparkSession.builder \
    .appName("EventHubsCaptureAnalysis") \
    .getOrCreate()
# Read captured events from every partition and date folder
df = spark.read.format("avro") \
    .option("recursiveFileLookup", "true") \
    .load("abfss://captures@mystorage.dfs.core.windows.net/mynamespace/orders/")
# EnqueuedTimeUtc is a string; this pattern is an assumption, so check it against your files
df = df.withColumn("enqueued_ts", to_timestamp(col("EnqueuedTimeUtc"), "M/d/yyyy h:mm:ss a"))
# Basic aggregations per one-hour window
event_stats = df.groupBy(
    window("enqueued_ts", "1 hour")
).agg(
    count("*").alias("event_count"),
    # Offsets are unique only within a partition, so this is not a global unique-event count
    countDistinct("Offset").alias("distinct_offsets")
).orderBy("window")
event_stats.show()
# Parse JSON body for specific analysis
json_schema = "struct<orderId:string, amount:double, customerId:string, items:array<struct<productId:string, quantity:int>>>"
df_parsed = df.withColumn(
    "order_data",
    from_json(col("Body").cast("string"), json_schema)
)
# Aggregate by customer
customer_stats = df_parsed.groupBy("order_data.customerId").agg(
    spark_sum("order_data.amount").alias("total_spend"),
    count("*").alias("order_count")
).orderBy(col("total_spend").desc())
customer_stats.show(20)
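The one-hour window aggregation above is a tumbling window: each event falls into exactly one hour-long bucket aligned to whole UTC hours. A plain-Python equivalent makes the bucketing explicit; it assumes timestamps have already been parsed into timezone-aware datetimes, and hourly_counts is a hypothetical name.

```python
from collections import Counter
from datetime import datetime, timezone
from typing import Dict, Iterable


def hourly_counts(enqueued_times: Iterable[datetime]) -> Dict[datetime, int]:
    """Count events per one-hour tumbling window, keyed by window start in UTC.

    Expects timezone-aware datetimes (e.g. parsed EnqueuedTimeUtc values).
    """
    buckets = Counter(
        t.astimezone(timezone.utc).replace(minute=0, second=0, microsecond=0)
        for t in enqueued_times
    )
    return dict(sorted(buckets.items()))
```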
Configure Capture Format
Avro Format (Default)
Avro is Capture's native output format. Capture is available on the Standard, Premium, and Dedicated tiers (not Basic):
az eventhubs eventhub update \
--name my-eventhub \
--namespace-name my-namespace \
--resource-group my-rg \
--capture-interval 300 \
--capture-size-limit 314572800 \
--destination-name EventHubArchive.AzureBlockBlob
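Parquet Format (via Stream Analytics)
Parquet provides better query performance, but it is not one of Capture's own encodings (Capture writes Avro). Choosing Parquet on the Event Hub's Capture page in the portal opens the Azure Stream Analytics no-code editor, which creates a Stream Analytics job that reads the event hub and writes Parquet files to ADLS Gen2:
- Open the Event Hub and go to "Capture"
- Select Parquet as the output format
- In the no-code editor, choose the ADLS Gen2 account, container, and path pattern
- Start the generated Stream Analytics job (it is billed separately from Event Hubs)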
Destination Configuration
The capture destination is the destination object inside the Event Hub's captureDescription. For Blob Storage and ADLS Gen2 it identifies the storage account by resource ID, the container, and the file-naming pattern:
{
  "name": "EventHubArchive.AzureBlockBlob",
  "properties": {
    "storageAccountResourceId": "/subscriptions/xxx-xxx-xxx/resourceGroups/myrg/providers/Microsoft.Storage/storageAccounts/mystorage",
    "blobContainer": "eventhubs-capture",
    "archiveNameFormat": "{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}"
  }
}
Capture reads each partition itself, so its configuration has no consumer group.
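When the destination is generated by a deployment script rather than written by hand, the storage account resource ID and the destination object can be assembled in code. This is a sketch assuming a Blob Storage or ADLS Gen2 target (destination name EventHubArchive.AzureBlockBlob) and the default archive name format; both helper names are hypothetical.

```python
from typing import Any, Dict

DEFAULT_ARCHIVE_NAME_FORMAT = (
    "{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}"
)


def storage_account_resource_id(subscription_id: str, resource_group: str,
                                account_name: str) -> str:
    """ARM resource ID of a storage account."""
    return (f"/subscriptions/{subscription_id}/resourceGroups/{resource_group}"
            f"/providers/Microsoft.Storage/storageAccounts/{account_name}")


def capture_destination(subscription_id: str, resource_group: str, account_name: str,
                        container: str,
                        archive_name_format: str = DEFAULT_ARCHIVE_NAME_FORMAT) -> Dict[str, Any]:
    """Capture destination object for Blob Storage / ADLS Gen2."""
    return {
        "name": "EventHubArchive.AzureBlockBlob",
        "properties": {
            "storageAccountResourceId": storage_account_resource_id(
                subscription_id, resource_group, account_name),
            "blobContainer": container,
            "archiveNameFormat": archive_name_format,
        },
    }
```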
Event Processing Pipeline
Batch Analytics Architecture
┌─────────────────────────────────────────────────────────────────────┐
│ EVENT CAPTURE PIPELINE │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Event Hub │ ───▶ │ Capture │ ───▶ │ ADLS Gen2 │ │
│ │ (Streaming) │ │ (5min/300MB)│ │ (Raw Zone) │ │
│ └──────────────┘ └──────────────┘ └──────┬───────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Azure Data Factory │ │
│ │ Copy & Transform: Raw → Curated (Bronze→Silver→Gold) │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Synapse │ │ Power BI │ │ Databricks │ │
│ │ SQL Pool │ │ Reports │ │ ML Models │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
Bronze/Silver/Gold Pattern
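# Imports and the JSON schema for this example (the schema fields are assumed)
from pyspark.sql import SparkSession
from pyspark.sql.functions import (col, count, dayofmonth, from_json, month,
    to_date, to_timestamp, year)
from pyspark.sql.functions import sum as spark_sum
spark = SparkSession.builder.getOrCreate()
json_schema = "struct<orderId:string, amount:double, customerId:string>"
# Bronze: Raw captured data
bronze_df = spark.read.format("avro") \
    .option("recursiveFileLookup", "true") \
    .load("abfss://raw@mystorage.dfs.core.windows.net/events/")
# Silver: Cleaned and enriched data (the EnqueuedTimeUtc pattern is assumed; verify it)
silver_df = bronze_df.select(
    to_timestamp(col("EnqueuedTimeUtc"), "M/d/yyyy h:mm:ss a").alias("enqueued_ts"),
    "Offset",
    "SequenceNumber",
    from_json(col("Body").cast("string"), json_schema).alias("data")
).select(
    "enqueued_ts",
    "data.*",
    to_date("enqueued_ts").alias("date"),
    year("enqueued_ts").alias("year"),
    month("enqueued_ts").alias("month"),
    dayofmonth("enqueued_ts").alias("day")
)
# Write to Silver layer (requires Delta Lake, as in Synapse or Databricks)
silver_df.write.partitionBy("year", "month", "day") \
    .format("delta") \
    .mode("append") \
    .save("abfss://silver@mystorage.dfs.core.windows.net/events/")
# Gold: Aggregated metrics per day and customer
gold_df = silver_df.groupBy("date", "customerId").agg(
    spark_sum("amount").alias("total_amount"),
    count("*").alias("order_count")
)
gold_df.write.format("delta") \
    .mode("overwrite") \
    .save("abfss://gold@mystorage.dfs.core.windows.net/daily_metrics/")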
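To make the Bronze → Silver → Gold flow concrete without a Spark cluster, here is the same logic over in-memory records. It is a minimal sketch: records are dicts shaped like captured Avro rows, the body is assumed to be JSON with customerId and amount fields, and EnqueuedTimeUtc is assumed to use the M/d/yyyy h:mm:ss AM/PM layout.

```python
import json
from collections import defaultdict
from datetime import date, datetime, timezone
from typing import Any, Dict, List, Tuple

# Assumed EnqueuedTimeUtc layout, e.g. "5/16/2024 10:03:17 AM".
ENQUEUED_FORMAT = "%m/%d/%Y %I:%M:%S %p"


def to_silver(bronze: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Bronze -> Silver: parse timestamps and JSON bodies, drop empty events."""
    silver = []
    for record in bronze:
        if record.get("Body") is None:
            continue  # nothing to parse
        data = json.loads(record["Body"].decode("utf-8"))
        enqueued = datetime.strptime(record["EnqueuedTimeUtc"], ENQUEUED_FORMAT)
        silver.append({
            "date": enqueued.replace(tzinfo=timezone.utc).date(),
            "customerId": data["customerId"],
            "amount": float(data["amount"]),
        })
    return silver


def to_gold(silver: List[Dict[str, Any]]) -> Dict[Tuple[date, str], Dict[str, Any]]:
    """Silver -> Gold: total amount and order count per (date, customerId)."""
    gold: Dict[Tuple[date, str], Dict[str, Any]] = defaultdict(
        lambda: {"total_amount": 0.0, "order_count": 0})
    for row in silver:
        agg = gold[(row["date"], row["customerId"])]
        agg["total_amount"] += row["amount"]
        agg["order_count"] += 1
    return dict(gold)
```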
Performance Optimization
Partition Strategy
| Consideration | Recommendation |
|---|---|
| Partition count | Match to downstream parallelism |
| Capture frequency | Smaller intervals = more files |
| File size | Larger files = better compression |
| Cost | Balance compute vs. storage |
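The file-count trade-off in the table can be estimated up front. This rough sketch assumes every partition writes at least one file per time window (true when empty files are not skipped) and that size-triggered files take over when traffic is high enough; files_per_day is a hypothetical helper, not an Azure formula.

```python
import math

SECONDS_PER_DAY = 86_400


def files_per_day(partition_count: int, interval_seconds: int = 300,
                  daily_bytes_per_partition: int = 0,
                  size_limit_bytes: int = 314_572_800) -> int:
    """Rough estimate of capture files written per day across all partitions."""
    time_windows = math.ceil(SECONDS_PER_DAY / interval_seconds)
    size_windows = math.ceil(daily_bytes_per_partition / size_limit_bytes)
    # Each partition closes a file per time window, or more often if the size limit hits first.
    return partition_count * max(time_windows, size_windows)
```

With 4 partitions and the default 5-minute window this gives 1,152 files per day; shrinking the window to 1 minute raises it to 5,760.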
Optimization Tips
# Read every capture folder (Avro has no mergeSchema option, and Capture's schema is fixed)
df = spark.read \
    .format("avro") \
    .option("recursiveFileLookup", "true") \
    .load("abfss://container@storage.dfs.core.windows.net/events/")
# Prune by path: Capture folders are <partition>/<year>/<month>/<day>/<hour>/<minute>/,
# not year=/month= partitions, so select a time range with globs
df_filtered = spark.read \
    .format("avro") \
    .load("abfss://container@storage.dfs.core.windows.net/events/*/2024/05/*/*/*/*.avro")
# Coalesce to write fewer, larger files
df_filtered.coalesce(10).write \
    .format("parquet") \
    .save("output-path")
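Since Capture folders are plain date levels rather than key=value partitions, pruning is done by choosing paths. This sketch builds one glob per UTC day for a date range, assuming the default <partition>/<year>/<month>/<day>/<hour>/<minute>/<second>.avro layout under the event hub folder; capture_day_globs is a hypothetical helper.

```python
from datetime import date, timedelta
from typing import List


def capture_day_globs(eventhub_root: str, start: date, end: date) -> List[str]:
    """One glob per UTC day in [start, end] under an event hub's capture folder."""
    root = eventhub_root.rstrip("/")
    globs = []
    day = start
    while day <= end:
        # <partition>/<year>/<month>/<day>/<hour>/<minute>/<second>.avro
        globs.append(f"{root}/*/{day:%Y/%m/%d}/*/*/*.avro")
        day += timedelta(days=1)
    return globs
```

The resulting list can be passed to spark.read.format("avro").load(...), which accepts a list of paths.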
Cost Management
- Storage tiering — Move old data to Cool/Archive tier
- Compression — Write curated Parquet or Delta output with Snappy (Spark's default Parquet codec) or GZIP
- Retention — Set lifecycle policies
- Partition pruning — Only read needed data
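Tiering and retention for captured files are configured with an Azure Storage lifecycle management policy. The sketch below generates one as a Python dict; the rule name, prefix, and day thresholds are assumptions to adjust, and the prefix must start with the container name.

```python
import json
from typing import Any, Dict


def capture_lifecycle_policy(prefix: str, cool_after_days: int = 30,
                             archive_after_days: int = 90,
                             delete_after_days: int = 365) -> Dict[str, Any]:
    """Build a lifecycle policy that tiers capture files to Cool, then Archive, then deletes them."""
    if not 0 <= cool_after_days < archive_after_days < delete_after_days:
        raise ValueError("expected cool < archive < delete thresholds")
    return {
        "rules": [
            {
                "enabled": True,
                "name": "eventhubs-capture-tiering",  # hypothetical rule name
                "type": "Lifecycle",
                "definition": {
                    "filters": {
                        "blobTypes": ["blockBlob"],
                        "prefixMatch": [prefix],  # e.g. "captures/mynamespace/"
                    },
                    "actions": {
                        "baseBlob": {
                            "tierToCool": {"daysAfterModificationGreaterThan": cool_after_days},
                            "tierToArchive": {"daysAfterModificationGreaterThan": archive_after_days},
                            "delete": {"daysAfterModificationGreaterThan": delete_after_days},
                        }
                    },
                },
            }
        ]
    }


policy_json = json.dumps(capture_lifecycle_policy("captures/mynamespace/"), indent=2)
```

Save policy_json to policy.json and apply it with az storage account management-policy create --account-name mystorage --resource-group myrg --policy @policy.json.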
Best Practices
| Practice | Description |
|---|---|
| Use Parquet | Better compression and query performance |
| Set appropriate intervals | Balance file size vs. latency |
| Enable partition pruning | Filter by date in path |
| Use Delta format | Enable time travel and ACID |
| Implement lifecycle policies | Auto-archive old files |
| Monitor capture lag | Watch the Capture Backlog metric to detect when capture falls behind incoming events |
Troubleshooting
Capture Not Working
# Check capture status
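az eventhubs eventhub show \
--name my-eventhub \
--namespace-name my-namespace \
--resource-group my-rg \
--query "captureDescription"
# Verify storage permissions (when Capture authenticates with the namespace's managed identity)
az role assignment list \
--assignee <namespace-managed-identity-principal-id> \
--role "Storage Blob Data Contributor" \
--scope <storage-account-resource-id>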
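The checks in this section can be scripted against the JSON returned by az eventhubs eventhub show. This sketch assumes the output contains a captureDescription object with a flattened destination (name, blobContainer, storageAccountResourceId), which may differ across CLI versions; capture_problems is a hypothetical helper.

```python
from typing import Any, Dict, List

EXPECTED_DESTINATION = "EventHubArchive.AzureBlockBlob"  # Blob Storage / ADLS Gen2


def capture_problems(eventhub: Dict[str, Any]) -> List[str]:
    """Return configuration problems found in `az eventhubs eventhub show` output."""
    capture = eventhub.get("captureDescription")
    if not capture:
        return ["captureDescription is missing: Capture was never configured"]
    problems = []
    if not capture.get("enabled"):
        problems.append("Capture is configured but not enabled")
    destination = capture.get("destination") or {}
    if destination.get("name") != EXPECTED_DESTINATION:
        problems.append(f"unexpected destination name: {destination.get('name')!r}")
    if not destination.get("blobContainer"):
        problems.append("no blob container set")
    if not destination.get("storageAccountResourceId"):
        problems.append("no storage account resource ID set")
    return problems
```

Load the CLI output (for example, saved with -o json) with json.load and pass the resulting dict to capture_problems; an empty list means none of these checks failed.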
Files Not Appearing
- Check Event Hub is receiving events
- Verify storage account has hierarchical namespace
- Confirm container exists and permissions are correct
- Check capture is enabled (not just configured)
Performance Issues
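- Increase partition count for more parallelism (after creation, partition count can only be changed on the Premium and Dedicated tiers)
- Use Premium namespace for lower latency
- Convert captured Avro to compressed columnar formats (Parquet or Delta) for downstream queries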
- Optimize Spark cluster sizing