Azure Event Hubs — Capture to ADLS Gen2

Auto-Capture Events as Avro/Parquet for Analytics


Introduction

Azure Event Hubs Capture automatically writes the events flowing through an Event Hub to Azure Data Lake Storage Gen2 (ADLS Gen2) for long-term storage, batch analytics, and data warehousing. Capture runs alongside any real-time consumers, so the same stream can be processed live and also kept for later analysis.

This guide covers:

  • Capture fundamentals — How automatic capture works
  • Format options — Avro vs Parquet comparison
  • Configuration — Azure Portal and CLI setup
  • Data pipeline — Building analytics workflows
  • Optimization — Performance and cost best practices

How Event Hubs Capture Works

Architecture Overview

┌─────────────────────────────────────────────────────────────────────┐
│                    EVENT HUBS CAPTURE ARCHITECTURE                  │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌──────────────┐      ┌──────────────┐      ┌──────────────┐       │
│  │  Producers   │ ───▶ │  Event Hub   │ ───▶ │    Capture   │       │
│  │(Apps/Devices)│      │ (Partitioned)│      │  (Auto-Flow) │       │
│  └──────────────┘      └──────────────┘      └──────┬───────┘       │
│                                                     │               │
│                                                     ▼               │
│  ┌─────────────────────────────────────────────────────────────┐    │
│  │                    Azure Data Lake Gen2                     │    │
│  │   /namespace/eventhub/partition/YYYY/MM/DD/HH/mm/ss.avro    │    │
│  └─────────────────────────────────────────────────────────────┘    │
│                              │                                      │
│                              ▼                                      │
│  ┌──────────────┐      ┌──────────────┐      ┌──────────────┐       │
│  │   Synapse    │      │    Spark     │      │    ADF       │       │
│  │   Analytics  │      │  Databricks  │      │  pipelines   │       │
│  └──────────────┘      └──────────────┘      └──────────────┘       │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

Capture Flow

  1. Events arrive — Producers send events to Event Hub partitions
  2. Capture triggers — When interval OR size threshold is reached
  3. Batch creation — Events within time window are grouped
  4. Format conversion — Events serialized to Avro or Parquet
  5. File upload — Written to ADLS Gen2 as block blobs
  6. Finalization — File closed and renamed for reading
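
The trigger rule in step 2 can be sketched as a small model. This is an illustration only, not the service's implementation: the class name `CaptureWindow` and its methods are hypothetical, and the real service evaluates the time threshold on its own clock rather than only when an event arrives.

```python
from dataclasses import dataclass


@dataclass
class CaptureWindow:
    """Illustrative model of one partition's open capture window."""
    interval_seconds: int = 300          # time threshold (5 minutes)
    size_limit_bytes: int = 314_572_800  # size threshold (300 MB)
    opened_at: float = 0.0               # when the current window opened, in seconds
    buffered_bytes: int = 0              # bytes collected in the current window

    def add(self, event_size: int, now: float) -> bool:
        """Buffer one event and report whether either threshold is now reached."""
        self.buffered_bytes += event_size
        elapsed = now - self.opened_at
        return (elapsed >= self.interval_seconds
                or self.buffered_bytes >= self.size_limit_bytes)

    def flush(self, now: float) -> int:
        """Close the window, return the bytes it held, and open a new window."""
        written = self.buffered_bytes
        self.buffered_bytes = 0
        self.opened_at = now
        return written


# Small size limit so the size threshold fires before the time threshold.
window = CaptureWindow(interval_seconds=300, size_limit_bytes=1000)
for size, now in [(400, 10.0), (700, 20.0)]:
    if window.add(size, now):
        print(f"flush at t={now}: {window.flush(now)} bytes")
```

Whichever threshold is reached first closes the window, so a busy partition produces files more often than the configured interval suggests.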

Why Use Capture?

Scenario                Benefit
Historical analysis     Query old events for trends
Regulatory compliance   Retain data for audit requirements
Data warehousing        Load into Synapse for BI reporting
Machine learning        Train models on historical data
Disaster recovery       Replay events after outages
Cost optimization       Use cheaper storage for cold data

Enable Capture

Prerequisites

  • Event Hubs namespace — Standard tier or higher (Capture is not offered on the Basic tier)
  • Storage account — ADLS Gen2 enabled with hierarchical namespace
  • Container — Blob container for captured files
  • Permissions — Storage Blob Data Contributor role on the storage account for the identity Capture uses to write

Azure Portal Configuration

  1. Navigate to your Event Hubs namespace
  2. Select the Event Hub to configure
  3. Go to "Capture" settings
  4. Toggle "Enable Capture" to ON
  5. Select Azure Storage or Azure Data Lake
  6. Choose container or file system
  7. Set capture window (time interval) and size limit
  8. Save configuration

CLI Configuration

# Create Event Hub with capture enabled
az eventhubs eventhub create \
  --name my-eventhub \
  --namespace-name my-namespace \
  --resource-group my-rg \
  --enable-capture true \
  --destination-name EventHubArchive.AzureBlockBlob \
  --storage-account mystorageaccount \
  --blob-container captures \
  --capture-interval 300 \
  --capture-size-limit 314572800

Parameters explained:

  • capture-interval — Time window in seconds (default: 300 = 5 minutes; allowed range 60 to 900)
  • capture-size-limit — Size threshold in bytes (default: 314572800 = 300 MB; allowed range 10 MB to 500 MB)
  • destination-name — EventHubArchive.AzureBlockBlob for Blob Storage and ADLS Gen2
  • Capture triggers when either threshold is reached
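
Both thresholds have limits, so it can help to check values before sending them to the CLI or a template. The sketch below assumes the documented ranges of 60 to 900 seconds and 10 MB to 500 MB (worth re-checking against current Azure documentation); the function name `validate_capture_settings` is hypothetical.

```python
from typing import List

MIN_INTERVAL_S, MAX_INTERVAL_S = 60, 900   # assumed allowed window, in seconds
MIN_SIZE_BYTES = 10 * 1024 * 1024          # 10 MB
MAX_SIZE_BYTES = 500 * 1024 * 1024         # 500 MB


def validate_capture_settings(interval_seconds: int, size_limit_bytes: int) -> List[str]:
    """Return a list of problems; an empty list means both values are in range."""
    problems = []
    if not MIN_INTERVAL_S <= interval_seconds <= MAX_INTERVAL_S:
        problems.append(
            f"interval {interval_seconds}s is outside {MIN_INTERVAL_S}-{MAX_INTERVAL_S}s"
        )
    if not MIN_SIZE_BYTES <= size_limit_bytes <= MAX_SIZE_BYTES:
        problems.append(
            f"size limit {size_limit_bytes} bytes is outside "
            f"{MIN_SIZE_BYTES}-{MAX_SIZE_BYTES} bytes"
        )
    return problems


# The defaults used in this guide: 5 minutes and 300 MB.
print(validate_capture_settings(300, 314_572_800))  # no problems reported
print(validate_capture_settings(30, 1024))          # both values rejected
```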

Update Existing Event Hub

# Enable capture on existing Event Hub
az eventhubs eventhub update \
  --name my-eventhub \
  --namespace-name my-namespace \
  --resource-group my-rg \
  --enable-capture true \
  --destination-name EventHubArchive.AzureBlockBlob \
  --storage-account mystorageaccount \
  --blob-container captures

ARM Template Configuration

{
  "resources": [
    {
      "type": "Microsoft.EventHub/namespaces/eventhubs",
      "apiVersion": "2022-10-01-preview",
      "name": "[concat(parameters('namespaceName'), '/', parameters('eventHubName'))]",
      "properties": {
        "captureDescription": {
          "enabled": true,
          "encoding": "Avro",
          "intervalInSeconds": 300,
          "sizeLimitInBytes": 314572800,
          "destination": {
            "name": "EventHubArchive.AzureBlockBlob",
            "properties": {
              "storageAccountResourceId": "[resourceId(parameters('storageResourceGroup'), 'Microsoft.Storage/storageAccounts', parameters('storageAccountName'))]",
              "blobContainer": "captures",
              "archiveNameFormat": "{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}"
            }
          }
        }
      }
    }
  ]
}

Capture File Format

Directory Structure

https://<storage>.blob.core.windows.net/<container>/
  └── <namespace>/
      └── <eventhub-name>/
          └── <partition-id>/
              └── <year>/
                  └── <month>/
                      └── <day>/
                          └── <hour>/
                              └── <minute>/
                                  └── <second>.avro

Example path:

https://mystorage.blob.core.windows.net/captures/
  mynamespace/
    orders-events/
      2/
        2024/
          05/
            16/
              10/
                05/
                  00.avro
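
Because the path starts with namespace, Event Hub, partition, and date parts, a reader can target one partition-hour by prefix instead of listing the whole container. The helper below is a sketch under the assumption that the default naming convention is in use and that the date parts are in UTC; `capture_hour_prefix` is a hypothetical name.

```python
from datetime import datetime, timedelta, timezone


def capture_hour_prefix(namespace: str, event_hub: str,
                        partition_id: int, hour: datetime) -> str:
    """Return the blob prefix holding one partition's files for one UTC hour."""
    if hour.tzinfo is None:
        raise ValueError("pass a timezone-aware datetime")
    hour_utc = hour.astimezone(timezone.utc)
    return f"{namespace}/{event_hub}/{partition_id}/{hour_utc:%Y/%m/%d/%H}/"


# 12:00 at UTC+02:00 is 10:00 UTC, so both calls yield the same prefix.
utc_hour = datetime(2024, 5, 16, 10, tzinfo=timezone.utc)
local_hour = datetime(2024, 5, 16, 12, tzinfo=timezone(timedelta(hours=2)))
print(capture_hour_prefix("mynamespace", "orders-events", 2, utc_hour))
print(capture_hour_prefix("mynamespace", "orders-events", 2, local_hour))
```

The resulting string can be passed as the name prefix to whatever blob-listing call your storage client provides.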

Avro Schema

Each captured file embeds its Avro schema in the file header. Note that Offset and EnqueuedTimeUtc are stored as strings:

{
  "type": "record",
  "name": "EventData",
  "namespace": "Microsoft.ServiceBus.Messaging",
  "fields": [
    {
      "name": "SequenceNumber",
      "type": "long"
    },
    {
      "name": "Offset",
      "type": "string"
    },
    {
      "name": "EnqueuedTimeUtc",
      "type": "string"
    },
    {
      "name": "SystemProperties",
      "type": {
        "type": "map",
        "values": ["long", "double", "string", "bytes"]
      }
    },
    {
      "name": "Properties",
      "type": {
        "type": "map",
        "values": ["long", "double", "string", "bytes"]
      }
    },
    {
      "name": "Body",
      "type": ["null", "bytes"]
    }
  ]
}

Body Decoding

The event body is stored as bytes — decode based on your content type:

# Python: Decode event body from Avro
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType

spark = SparkSession.builder.getOrCreate()

# Read captured events (needs the spark-avro module on the classpath)
df = spark.read.format("avro").load(
    "abfss://container@storage.dfs.core.windows.net/events/*"
)

# Body is binary; casting to string decodes it as UTF-8 (null stays null)
df_decoded = df.withColumn("body_decoded", col("Body").cast("string"))

# For JSON content, parse it against an explicit schema
json_schema = StructType().add("orderId", "string").add("amount", "double")
df_json = df_decoded.withColumn("body_json", from_json(col("body_decoded"), json_schema))
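
Outside Spark, the same decoding decision can be made per record with the standard library. This is a minimal sketch: `decode_body` is a hypothetical helper, and it assumes text payloads are UTF-8 and that anything else should be left as raw bytes.

```python
import json
from typing import Any, Optional


def decode_body(body: Optional[bytes]) -> Any:
    """Decode a captured Body: JSON if it parses, else text, else raw bytes."""
    if body is None:
        return None
    try:
        text = body.decode("utf-8")
    except UnicodeDecodeError:
        return body   # binary payload: leave it untouched
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return text   # valid UTF-8 but not JSON: return plain text


for sample in (b'{"orderId": "A1", "amount": 9.5}', b"hello", b"\xff\xfe", None):
    print(repr(decode_body(sample)))
```

Falling back instead of raising keeps one malformed event from aborting a batch; whether that is the right choice depends on how strict the downstream pipeline needs to be.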

Use with Azure Synapse

Direct Query with OPENROWSET

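-- Serverless SQL pools read Parquet, Delta, and CSV through OPENROWSET; Avro is
-- not a supported format. This query assumes the events are available as
-- Parquet with the same columns (for example, a converted copy of the capture).
SELECT TOP 100
    CAST(Body AS VARCHAR(MAX)) AS EventBody,
    SequenceNumber,
    [Offset],
    EnqueuedTimeUtc
FROM OPENROWSET(
    BULK 'https://mystorage.dfs.core.windows.net/captures/mynamespace/orders/**',
    FORMAT = 'PARQUET'
) AS [rows]
ORDER BY EnqueuedTimeUtc DESC;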

Create External Table

-- Create external table over captured data
-- (Synapse external file formats do not include Avro, so this assumes a
-- Parquet copy of the capture with the columns below)
CREATE EXTERNAL DATA SOURCE CaptureStorage
WITH (
    LOCATION = 'https://mystorage.blob.core.windows.net/captures'
);

CREATE EXTERNAL FILE FORMAT ParquetFormat
WITH (FORMAT_TYPE = PARQUET);

CREATE EXTERNAL TABLE EventsRaw
(
    Body NVARCHAR(MAX),
    SequenceNumber BIGINT,
    [Offset] BIGINT,
    EnqueuedTimeUtc DATETIME2
)
WITH (
    LOCATION = 'mynamespace/orders/**',
    DATA_SOURCE = CaptureStorage,
    FILE_FORMAT = ParquetFormat
);

-- Query like a regular table (EnqueuedTimeUtc is UTC, so compare with GETUTCDATE)
SELECT 
    DATEPART(HOUR, EnqueuedTimeUtc) AS Hour,
    COUNT(*) AS EventCount
FROM EventsRaw
WHERE EnqueuedTimeUtc >= DATEADD(DAY, -7, GETUTCDATE())
GROUP BY DATEPART(HOUR, EnqueuedTimeUtc)
ORDER BY Hour;

PySpark in Synapse Notebook

# Read and analyze events with PySpark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, countDistinct, from_json, to_timestamp, window
from pyspark.sql.functions import sum as spark_sum  # avoid shadowing Python's built-in sum

# Create Spark session
spark = SparkSession.builder \
    .appName("EventHubsCaptureAnalysis") \
    .getOrCreate()

# Read captured events
df = spark.read.format("avro").load(
    "abfss://captures@mystorage.dfs.core.windows.net/mynamespace/orders/*"
)

# EnqueuedTimeUtc is stored as a string; convert it before windowing.
# The pattern below is an assumption; check it against a sample record.
df = df.withColumn("enqueued_ts", to_timestamp("EnqueuedTimeUtc", "M/d/yyyy h:mm:ss a"))

# Basic aggregations
event_stats = df.groupBy(
    window("enqueued_ts", "1 hour")
).agg(
    count("*").alias("event_count"),
    countDistinct("Offset").alias("unique_events")
).orderBy("window")

event_stats.show()

# Parse JSON body for specific analysis
json_schema = """
    struct<orderId:string, amount:double, customerId:string, items:array<struct<productId:string, quantity:int>>>
"""

df_parsed = df.withColumn(
    "order_data", 
    from_json(col("Body").cast("string"), json_schema)
)

# Aggregate by customer
customer_stats = df_parsed.groupBy("order_data.customerId").agg(
    spark_sum("order_data.amount").alias("total_spend"),
    count("*").alias("order_count")
).orderBy(col("total_spend").desc())

customer_stats.show(20)

Configure Capture Format

Avro Format (Default)

Avro is the default format and is available on every tier that supports Capture:

az eventhubs eventhub update \
  --name my-eventhub \
  --namespace-name my-namespace \
  --resource-group my-rg \
  --encoding Avro \
  --capture-interval 300 \
  --capture-size-limit 314572800 \
  --destination-name EventHubArchive.AzureBlockBlob

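Parquet Format

Parquet is columnar, so queries that touch only a few fields scan less data than they would with Avro. The built-in Capture feature writes Avro only. Parquet output is configured in the Azure portal through the no-code editor on the Event Hub's "Process data" page ("Capture data to ADLS Gen2 in Parquet format"), which creates a Stream Analytics job. It is not a flag on az eventhubs eventhub update, and the output window is set in that editor rather than through the Capture interval and size settings.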

Destination Configuration

{
  "StorageAccount": {
    "subscriptionId": "xxx-xxx-xxx",
    "resourceGroupName": "myrg",
    "accountName": "mystorage",
    "blobContainerName": "captures"
  },
  "EventHubName": "my-eventhub"
}

Destination Properties File

{
  "StorageAccount": {
    "subscriptionId": "xxx-xxx-xxx",
    "resourceGroupName": "myrg",
    "accountName": "mystorage",
    "blobContainerName": "eventhubs-capture"
  },
  "EventHubName": "orders",
  "ConsumerGroup": "$Default"
}

Event Processing Pipeline

Batch Analytics Architecture

┌─────────────────────────────────────────────────────────────────────┐
│                    EVENT CAPTURE PIPELINE                           │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌──────────────┐      ┌──────────────┐      ┌──────────────┐       │
│  │  Event Hub   │ ───▶ │   Capture    │ ───▶ │  ADLS Gen2   │       │
│  │  (Streaming) │      │  (5min/300MB)│      │  (Raw Zone)  │       │
│  └──────────────┘      └──────────────┘      └──────┬───────┘       │
│                                                     │               │
│                                                     ▼               │
│  ┌─────────────────────────────────────────────────────────────┐    │
│  │                    Azure Data Factory                       │    │
│  │     Copy & Transform: Raw → Curated (Bronze→Silver→Gold)    │    │
│  └─────────────────────────────────────────────────────────────┘    │
│                              │                                      │
│                              ▼                                      │
│  ┌──────────────┐      ┌──────────────┐      ┌──────────────┐       │
│  │   Synapse    │      │    Power BI  │      │   Databricks │       │
│  │   SQL Pool   │      │   Reports    │      │   ML Models  │       │
│  └──────────────┘      └──────────────┘      └──────────────┘       │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

Bronze/Silver/Gold Pattern

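from pyspark.sql.functions import col, count, dayofmonth, from_json, month, to_date, to_timestamp, year
from pyspark.sql.functions import sum as spark_sum  # avoid shadowing Python's built-in sum

# Schema of the JSON payload (example fields)
json_schema = "orderId STRING, amount DOUBLE, customerId STRING"

# Bronze: Raw captured data
bronze_df = spark.read.format("avro").load(
    "abfss://raw@mystorage.dfs.core.windows.net/events/*"
)

# Silver: Cleaned and enriched data
# EnqueuedTimeUtc is a string; the timestamp pattern is an assumption to verify
silver_df = bronze_df.select(
    to_timestamp("EnqueuedTimeUtc", "M/d/yyyy h:mm:ss a").alias("enqueued_ts"),
    "Offset",
    "SequenceNumber",
    from_json(col("Body").cast("string"), json_schema).alias("data")
).select(
    "enqueued_ts",
    "data.*",
    to_date("enqueued_ts").alias("date"),
    year("enqueued_ts").alias("year"),
    month("enqueued_ts").alias("month"),
    dayofmonth("enqueued_ts").alias("day")
)

# Write to Silver layer
silver_df.write.partitionBy("year", "month", "day") \
    .format("delta") \
    .mode("append") \
    .save("abfss://silver@mystorage.dfs.core.windows.net/events/")

# Gold: Aggregated metrics per day and customer
gold_df = silver_df.groupBy("date", "customerId").agg(
    spark_sum("amount").alias("total_amount"),
    count("*").alias("order_count")
)

gold_df.write.format("delta") \
    .mode("overwrite") \
    .save("abfss://gold@mystorage.dfs.core.windows.net/daily_metrics/")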

Performance Optimization

Partition Strategy

Consideration       Recommendation
Partition count     Match to downstream parallelism
Capture frequency   Smaller intervals = more files
File size           Larger files = better compression
Cost                Balance compute vs. storage
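
The trade-off between capture frequency and file size can be estimated with simple arithmetic, since each partition writes its own file per window. The sketch below assumes only the time threshold fires and that every window produces a file; windows closed early by the size limit add files, and skipping empty windows removes some. Both function names are hypothetical.

```python
def capture_files_per_day(partition_count: int, interval_seconds: int) -> int:
    """Estimate files per day when only the time threshold closes windows."""
    windows_per_day = 86_400 // interval_seconds
    return partition_count * windows_per_day


def average_file_mb(daily_volume_gb: float, files_per_day: int) -> float:
    """Average file size in MB for a given daily volume spread evenly over the files."""
    return round(daily_volume_gb * 1024 / files_per_day, 1)


files = capture_files_per_day(partition_count=4, interval_seconds=300)
print(files)                       # 4 partitions x 288 five-minute windows
print(average_file_mb(50, files))  # 50 GB/day spread across those files
```

With 4 partitions and the default 5-minute window this comes to 1,152 files per day; dropping the window to 60 seconds multiplies that by five and shrinks each file accordingly.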

Optimization Tips

# Read the full capture (every partition and date)
# (mergeSchema applies to Parquet/ORC sources, not Avro, so it is omitted)
df = spark.read \
    .format("avro") \
    .load("abfss://container@storage.dfs.core.windows.net/events/*")

# Prune by date in the path. In the capture layout the partition ID comes
# before the date, so wildcard it and pin year/month: <partition>/<year>/<month>/
df_filtered = spark.read \
    .format("avro") \
    .load("abfss://container@storage.dfs.core.windows.net/events/*/2024/05/")

# Coalesce into fewer, larger output files
df.coalesce(10).write \
    .format("parquet") \
    .save("output-path")

Cost Management

  • Storage tiering — Move old data to Cool/Archive tier
  • Compression — Write converted data as Parquet or Delta with Snappy or GZIP compression
  • Retention — Set lifecycle policies that delete data past its retention period
  • Partition pruning — Only read needed data
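
Tiering and retention can both be expressed as one storage lifecycle management rule. The sketch below builds such a policy document as JSON; the rule name, prefix, and day counts are illustrative assumptions, and Archive-tier availability should be confirmed for your account type before relying on it.

```python
import json


def build_lifecycle_policy(prefix: str, cool_after_days: int,
                           archive_after_days: int, delete_after_days: int) -> dict:
    """Build a lifecycle policy that tiers and then deletes blobs under a prefix."""
    if not cool_after_days < archive_after_days < delete_after_days:
        raise ValueError("expected cool < archive < delete day counts")
    return {
        "rules": [{
            "enabled": True,
            "name": "capture-tiering",  # hypothetical rule name
            "type": "Lifecycle",
            "definition": {
                # prefixMatch starts with the container name
                "filters": {"blobTypes": ["blockBlob"], "prefixMatch": [prefix]},
                "actions": {"baseBlob": {
                    "tierToCool": {"daysAfterModificationGreaterThan": cool_after_days},
                    "tierToArchive": {"daysAfterModificationGreaterThan": archive_after_days},
                    "delete": {"daysAfterModificationGreaterThan": delete_after_days},
                }},
            },
        }]
    }


policy = build_lifecycle_policy("captures/mynamespace/", 30, 90, 365)
print(json.dumps(policy, indent=2))
```

Saved to a file such as policy.json, the output can be applied with az storage account management-policy create --account-name mystorageaccount --resource-group my-rg --policy @policy.json.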

Best Practices

Practice                       Description
Use Parquet                    Better compression and query performance
Set appropriate intervals      Balance file size vs. latency
Enable partition pruning       Filter by date in path
Use Delta format               Enable time travel and ACID
Implement lifecycle policies   Auto-archive old files
Monitor capture lag            Track how far Capture trails incoming events

Troubleshooting

Capture Not Working

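# Check capture status
az eventhubs eventhub show \
  --name my-eventhub \
  --namespace-name my-namespace \
  --resource-group my-rg \
  --query "captureDescription"

# Verify storage permissions (when Capture writes with a managed identity)
az role assignment list \
  --assignee <managed-identity-principal-id> \
  --scope <storage-account-resource-id> \
  --role "Storage Blob Data Contributor"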

Files Not Appearing

  1. Check Event Hub is receiving events
  2. Verify storage account has hierarchical namespace
  3. Confirm container exists and permissions are correct
  4. Check capture is enabled (not just configured)
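
When some files appear but others seem to be missing, comparing the partitions present in a blob listing against the expected partition count narrows the search. The sketch below works on a plain list of blob names, however you obtain it; the function names are hypothetical, and it assumes the partition ID is the third path segment as in the directory structure shown earlier.

```python
from typing import Iterable, List, Set


def partitions_with_files(blob_names: Iterable[str],
                          namespace: str, event_hub: str) -> Set[int]:
    """Collect the partition IDs that have at least one captured blob."""
    prefix = f"{namespace}/{event_hub}/"
    found = set()
    for name in blob_names:
        if not name.startswith(prefix):
            continue  # blob belongs to another namespace or Event Hub
        partition = name[len(prefix):].split("/", 1)[0]
        if partition.isdigit():
            found.add(int(partition))
    return found


def missing_partitions(blob_names: Iterable[str], namespace: str,
                       event_hub: str, partition_count: int) -> List[int]:
    """Return partition IDs in [0, partition_count) that have no blobs."""
    present = partitions_with_files(blob_names, namespace, event_hub)
    return sorted(set(range(partition_count)) - present)


blobs = [
    "mynamespace/orders/0/2024/05/16/10/05/00.avro",
    "mynamespace/orders/2/2024/05/16/10/05/00.avro",
    "othernamespace/orders/1/2024/05/16/10/05/00.avro",
]
print(missing_partitions(blobs, "mynamespace", "orders", partition_count=4))
```

A partition with no files is a hint rather than proof of a fault: it may simply have received no events if empty files are being skipped.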

Performance Issues

  • Increase partition count for more parallelism
  • Use Premium namespace for lower latency
  • Store converted data in a compressed columnar format (Parquet or Delta)
  • Optimize Spark cluster sizing
