Azure Event Hubs — Function App + Event Hubs Trigger
Batch Processing, Max Batch Size, Prefetch, and Performance Optimization
Introduction
Azure Functions provides native integration with Azure Event Hubs through the Event Hubs trigger. This integration enables serverless event processing with automatic scaling and optimized throughput.
This guide covers:
- Trigger configuration — How Functions connect to Event Hubs
- Batch processing — Handling multiple events efficiently
- Prefetch optimization — Improving throughput with prefetch
- Error handling — Retry policies and dead letter queues
- Performance tuning — Optimizing for your workload
How the Trigger Works
Architecture
┌─────────────────────────────────────────────────────────────────────┐
│ EVENT HUBS TRIGGER ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Event Hubs │ │ SDK │ │ Azure │ │
│ │ Events │─────▶│ Polling │─────▶│ Functions │ │
│ │ (Partition) │ │ (Checkpoint) │ │ Runtime │ │
│ └──────────────┘ └──────────────┘ └──────┬───────┘ │
│ │ │
│ Checkpointing ▼ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Azure │◀──────────────────────│ Processing │ │
│ │ Storage │ │ Complete │ │
│ └──────────────┘ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
Event Processor Client
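Under the hood, the trigger runs an event processor built on the same model as the SDK's EventProcessorClient: it claims partitions, reads events, and stores checkpoints in Azure Storage.
// Conceptual equivalent of what the trigger does for you (storageConnectionString,
// eventHubConnectionString, and eventHubName are placeholders)
var checkpointStore = new BlobContainerClient(storageConnectionString, "checkpoints");
var processor = new EventProcessorClient(
    checkpointStore,
    EventHubConsumerClient.DefaultConsumerGroupName, // "$Default"
    eventHubConnectionString,
    eventHubName);
processor.ProcessEventAsync += async args =>
{
    // Handle args.Data, then store the position of the last processed event
    await args.UpdateCheckpointAsync();
};
processor.ProcessErrorAsync += args => Task.CompletedTask; // An error handler is required
await processor.StartProcessingAsync();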
Basic Trigger Implementation
Simple Event Processing
using System;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
public static class OrderProcessor
{
[FunctionName("ProcessOrders")]
public static async Task ProcessOrders(
[EventHubTrigger("orders", Connection = "EventHubConnectionString")]
EventData[] events,
ILogger log)
{
log.LogInformation($"Received batch of {events.Length} events");
foreach (var eventData in events)
{
try
{
// Get message body (EventBody is BinaryData; ToString() decodes it as UTF-8)
var body = eventData.EventBody.ToString();
// Deserialize order
var order = JsonSerializer.Deserialize<Order>(body, new JsonSerializerOptions
{
PropertyNameCaseInsensitive = true
});
// Process the order
await ProcessOrderAsync(order, log);
log.LogInformation("Order {OrderId} processed successfully", order.OrderId);
}
catch (Exception ex)
{
log.LogError(ex, "Failed to process event: {EventId}", eventData.MessageId);
throw; // Rethrow so a configured retry policy can re-run the batch; without one, the checkpoint still advances
}
}
}
private static async Task ProcessOrderAsync(Order order, ILogger log)
{
// Business logic here
await Task.CompletedTask;
}
}
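Configuration in host.json
The settings below use the host.json schema of Event Hubs extension 5.x and later, which is the version that binds to the Azure.Messaging.EventHubs types. Earlier extension versions place maxBatchSize and prefetchCount under eventProcessorOptions.
{
    "version": "2.0",
    "extensions": {
        "eventHubs": {
            "maxEventBatchSize": 10
        }
    },
    "functionTimeout": "00:05:00"
}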
Batch Processing Deep Dive
Understanding Batch Settings
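| Setting | Description | Recommended Value |
|---|---|---|
| maxEventBatchSize | Maximum events per function invocation (named maxBatchSize before extension 5.x) | 10-100 |
| prefetchCount | Events to fetch ahead of processing | 2x the batch size |
| batchCheckpointFrequency | Number of batches processed between checkpoints | 1; higher for high throughput |
| clientRetryOptions | Retries for the client's own calls to the Event Hubs service | 3 retries |
Parallelism and redelivery are not host.json settings for this trigger: maxConcurrentBatches and visibilityTimeout are not part of the Event Hubs extension schema. Each partition is read by one function instance at a time, so the partition count sets the upper bound on batches processed in parallel.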
High-Throughput Configuration
{
    "extensions": {
        "eventHubs": {
            "maxEventBatchSize": 100,
            "prefetchCount": 200,
            "clientRetryOptions": {
                "mode": "fixed",
                "delay": "00:00:05",
                "maximumRetries": 3
            }
        }
    }
}
Batch Processing Logic
[FunctionName("ProcessHighVolume")]
public static async Task ProcessHighVolume(
    [EventHubTrigger("high-volume", Connection = "EventHubConnection")]
    EventData[] events,
    ILogger log)
{
    // events contains at most the configured maximum batch size, all from one partition
    log.LogInformation("Processing {Count} events", events.Length);

    // Choose ONE of the options below; running all three would process every event three times.

    // Option 1: Sequential processing (preserves the order within the partition)
    foreach (var evt in events)
    {
        await ProcessSingleAsync(evt);
    }

    // Option 2: Parallel processing with Task.WhenAll (needs System.Linq; order is not preserved)
    // await Task.WhenAll(events.Select(evt => ProcessSingleAsync(evt)));

    // Option 3: Batch processing - hand the whole batch to one call, such as a bulk write
    // await ProcessBatchAsync(events.ToList());
}
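Task.WhenAll over the whole batch starts every handler at once, which can overwhelm a downstream service when batches are large. The following is a minimal sketch of one way to cap concurrency. BoundedParallel and ForEachAsync are hypothetical names, not part of the Functions or Event Hubs SDKs, and the item type is generic so the helper does not depend on EventData.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: not part of any Azure SDK.
public static class BoundedParallel
{
    // Runs handler for every item, with at most maxConcurrency handlers in flight.
    public static async Task ForEachAsync<T>(
        IEnumerable<T> items, int maxConcurrency, Func<T, Task> handler)
    {
        if (maxConcurrency < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxConcurrency));
        }

        using var gate = new SemaphoreSlim(maxConcurrency);

        // ToList() starts all wrappers now; each one waits at the gate before calling handler.
        var tasks = items.Select(async item =>
        {
            await gate.WaitAsync();
            try
            {
                await handler(item);
            }
            finally
            {
                gate.Release();
            }
        }).ToList();

        // Completes when every handler has finished; rethrows the first failure, if any.
        await Task.WhenAll(tasks);
    }
}
```

In the function above, Option 2 could then read `await BoundedParallel.ForEachAsync(events, 8, ProcessSingleAsync);`, where 8 is an arbitrary limit to tune against the downstream dependency.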
Prefetch Optimization
Why Prefetch Matters:
WITHOUT PREFETCH: WITH PREFETCH:
┌────────────────────────────┐ ┌────────────────────────────┐
│ Time │ │ Time │
│ │ │ │
│ ┌────┐ ┌────┐ ┌────┐ │ │ ┌────┐┌────┐┌────┐ │
│ │Wait│ │Wait│ │Wait│ │ │ │████││████││████│ │
│ └────┘ └────┘ └────┘ │ │ └────┘└────┘└────┘ │
│ Fetch Process Fetch │ │ Prefetch Processing │
│ └────────┬─────────┘ │ │ └────────┬─────────┘ │
│ Idle time │ │ No idle │
└────────────────────────────┘ └────────────────────────────┘
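The diagram can be turned into rough arithmetic. The sketch below assumes a deliberately simple model that is not taken from the Event Hubs SDK: every batch costs a fixed fetch time and a fixed processing time, and prefetch overlaps each fetch with the processing of the previous batch. PrefetchModel and its methods are hypothetical names.

```csharp
using System;

// Hypothetical back-of-the-envelope model; real fetch and processing times vary.
public static class PrefetchModel
{
    // Without prefetch: every batch waits for its own fetch before processing starts.
    public static double SerialMs(int batches, double fetchMs, double processMs)
        => batches * (fetchMs + processMs);

    // With prefetch: only the first fetch is exposed; afterwards the slower
    // of the two stages sets the pace.
    public static double PipelinedMs(int batches, double fetchMs, double processMs)
        => batches == 0
            ? 0
            : fetchMs + processMs + (batches - 1) * Math.Max(fetchMs, processMs);
}
```

For 10 batches with a 50 ms fetch and 100 ms of processing, the model gives 1500 ms without prefetch and 1050 ms with it. The saving disappears once processing time dominates, which is why prefetch helps most with fast handlers.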
Configuration:
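// Programmatic configuration (in-process class library with FunctionsStartup).
// Requires: using Microsoft.Azure.Functions.Extensions.DependencyInjection;
//           using Microsoft.Azure.WebJobs.EventHubs;
//           using Microsoft.Extensions.DependencyInjection;
[assembly: FunctionsStartup(typeof(Startup))]
public class Startup : FunctionsStartup
{
    public override void Configure(IFunctionsHostBuilder builder)
    {
        // Sets the same value as "prefetchCount" in host.json; prefer host.json
        // unless the value has to be computed at startup.
        builder.Services.Configure<EventHubOptions>(options =>
        {
            options.PrefetchCount = 200; // Fetch up to 200 events ahead
        });
    }
}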
Retry Configuration
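Built-in Retry
The clientRetryOptions section controls how the Event Hubs client retries its own calls to the service, such as a read that fails with a transient network error. It does not re-run a function that throws; that requires a function-level retry policy.
{
    "extensions": {
        "eventHubs": {
            "clientRetryOptions": {
                "mode": "fixed",
                "delay": "00:00:05",
                "maximumRetries": 3
            }
        }
    }
}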
Custom Retry with Exponential Backoff
[FunctionName("ProcessWithRetry")]
public static async Task ProcessWithRetry(
[EventHubTrigger("orders", Connection = "EventHubConnection")]
EventData[] events,
ILogger log)
{
foreach (var evt in events)
{
var retryCount = 0;
var maxRetries = 3;
while (retryCount < maxRetries)
{
try
{
await ProcessOrderAsync(evt, log);
break; // Success, exit retry loop
}
catch (Exception ex)
{
retryCount++;
if (retryCount >= maxRetries)
{
log.LogError(ex, "All retries exhausted for message");
throw;
}
// Exponential backoff
var delay = TimeSpan.FromSeconds(Math.Pow(2, retryCount));
log.LogWarning("Retry {RetryCount} after {Delay}s", retryCount, delay.TotalSeconds);
await Task.Delay(delay);
}
}
}
}
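The delay calculation in the loop above grows without bound if maxRetries is raised. A minimal sketch of a capped variant with optional jitter follows; Backoff, Delay, and WithJitter are hypothetical names, and the "full jitter" strategy is one common choice, not something the Functions runtime prescribes.

```csharp
using System;

// Hypothetical helper: not part of the Functions runtime.
public static class Backoff
{
    // Delay before retry number `attempt` (1-based): baseDelay * 2^(attempt - 1), capped at maxDelay.
    public static TimeSpan Delay(int attempt, TimeSpan baseDelay, TimeSpan maxDelay)
    {
        if (attempt < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(attempt));
        }

        double seconds = baseDelay.TotalSeconds * Math.Pow(2, attempt - 1);
        return TimeSpan.FromSeconds(Math.Min(seconds, maxDelay.TotalSeconds));
    }

    // Full jitter: picks a random delay between zero and the computed delay,
    // so that many failing instances do not retry in lockstep.
    public static TimeSpan WithJitter(TimeSpan delay, Random random)
        => TimeSpan.FromSeconds(delay.TotalSeconds * random.NextDouble());
}
```

With a base of 2 seconds and a cap of 60 seconds, retries 1, 2, and 3 wait 2, 4, and 8 seconds, and retry 10 waits the capped 60 seconds. Keep in mind that waiting inside the function holds the partition for the whole delay.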
Error Handling and Dead Letter Queue
Handling Failed Events
// Event Hubs has no built-in dead letter queue; this function parks failed events in a Service Bus queue.
// Requires: using Azure.Messaging.ServiceBus; and version 5.x of the Service Bus extension
[FunctionName("ProcessWithDLQ")]
public static async Task ProcessWithDLQ(
    [EventHubTrigger("orders", Connection = "EventHubConnection")]
    EventData[] events,
    [ServiceBus("orders-dlq", Connection = "ServiceBusConnection")]
    ServiceBusSender dlqSender,
    ILogger log)
{
    var failedEvents = new List<EventData>();
    foreach (var evt in events)
    {
        try
        {
            await ProcessOrderAsync(evt, log);
        }
        catch (Exception ex)
        {
            log.LogError(ex, "Failed to process event {MessageId}", evt.MessageId);
            failedEvents.Add(evt);
        }
    }
    // Send failed events to dead letter queue
    if (failedEvents.Any())
    {
        log.LogWarning("Sending {Count} failed events to DLQ", failedEvents.Count);
        foreach (var failed in failedEvents)
        {
            var dlqMessage = new ServiceBusMessage(failed.EventBody)
            {
                ContentType = failed.ContentType
            };
            // MessageId is optional on EventData; only copy it when it is set
            if (!string.IsNullOrEmpty(failed.MessageId))
            {
                dlqMessage.MessageId = failed.MessageId;
            }
            // ApplicationProperties is read-only, so copy the entries one by one
            foreach (var property in failed.Properties)
            {
                dlqMessage.ApplicationProperties[property.Key] = property.Value;
            }
            // Add failure info
            dlqMessage.ApplicationProperties["FailedAt"] = DateTime.UtcNow.ToString("O");
            dlqMessage.ApplicationProperties["OriginalQueue"] = "orders";
            await dlqSender.SendMessageAsync(dlqMessage);
        }
    }
}
Poison Message Handling
[FunctionName("ProcessWithPoisonHandling")]
public static async Task ProcessWithPoisonHandling(
[EventHubTrigger("orders", Connection = "EventHubConnection")]
EventData[] events,
ILogger log)
{
foreach (var evt in events)
{
try
{
// Check for malformed messages (Body is a ReadOnlyMemory<byte>, which is never null)
if (evt.Body.IsEmpty)
{
log.LogWarning("Empty message detected, skipping");
continue; // Don't retry empty messages
}
var body = evt.EventBody.ToString();
// Validate JSON
if (!IsValidJson(body))
{
log.LogWarning("Invalid JSON, skipping message");
continue; // Skip invalid JSON
}
await ProcessOrderAsync(evt, log);
}
catch (JsonException ex)
{
// Don't retry - message is permanently invalid
log.LogError(ex, "Invalid JSON format, skipping message");
}
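catch (TimeoutException ex)
{
// Match on the exception type, not the message text, and let the
// timeout surface so a configured retry policy can re-run the batch
log.LogWarning(ex, "Timeout while processing event, rethrowing");
throw;
}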
}
}
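private static bool IsValidJson(string text)
{
try
{
// JsonDocument rents pooled buffers, so dispose it once parsing succeeds
using var document = JsonDocument.Parse(text);
return true;
}
catch (JsonException)
{
return false;
}
}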
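Deciding between "skip" and "retry" is easier to keep consistent when it lives in one place and relies on exception types, not message text. The following is a minimal sketch under that assumption. FailureKind and FailureClassifier are hypothetical names, and the exact split between transient and permanent errors depends on the workload.

```csharp
using System;
using System.IO;
using System.Net.Http;
using System.Text.Json;
using System.Threading.Tasks;

public enum FailureKind { Transient, Permanent }

// Hypothetical helper: the mapping below is an example, not a rule from the SDK.
public static class FailureClassifier
{
    public static FailureKind Classify(Exception ex)
    {
        switch (ex)
        {
            // The payload itself is bad; retrying cannot fix it.
            case JsonException _:
            case FormatException _:
            case ArgumentException _:
                return FailureKind.Permanent;

            // The environment misbehaved; a later attempt may succeed.
            case TimeoutException _:
            case TaskCanceledException _:
            case HttpRequestException _:
            case IOException _:
                return FailureKind.Transient;

            // Unknown errors are retried; the retry limit bounds the cost.
            default:
                return FailureKind.Transient;
        }
    }
}
```

A handler can then catch Exception once, skip or park the event when Classify returns Permanent, and rethrow when it returns Transient.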
Checkpointing
Automatic Checkpointing
// The Functions host writes the checkpoint for you when the invocation finishes
[FunctionName("ProcessAndCheckpoint")]
public static async Task ProcessAndCheckpoint(
    [EventHubTrigger("orders", Connection = "EventHubConnection")]
    EventData[] events,
    PartitionContext context,
    ILogger log)
{
    log.LogInformation("Processing {Count} events from partition {PartitionId}", events.Length, context.PartitionId);
    // Process all events
    foreach (var evt in events)
    {
        await ProcessAsync(evt);
    }
    // No explicit checkpoint call is needed here. The host advances the checkpoint
    // after the invocation completes, every batchCheckpointFrequency batches.
    // Without a retry policy it also advances when the function throws, so
    // handle or park failed events inside the function.
    log.LogInformation("Batch finished for partition {PartitionId}", context.PartitionId);
}
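Manual Checkpointing
Calling CheckpointAsync from the function is only possible with the legacy PartitionContext type (Microsoft.Azure.EventHubs.Processor, used by extension versions before 5.x). With extension 5.x and later, PartitionContext comes from Azure.Messaging.EventHubs.Consumer and has no checkpoint method; the host owns checkpointing, and batchCheckpointFrequency in host.json controls how often it happens. The following example applies to the legacy extension.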
[FunctionName("ProcessWithManualCheckpoint")]
public static async Task ProcessWithManualCheckpoint(
[EventHubTrigger("orders", Connection = "EventHubConnection")]
EventData[] events,
PartitionContext context,
ILogger log)
{
var successCount = 0;
foreach (var evt in events)
{
try
{
await ProcessAsync(evt);
successCount++;
// Checkpoint every 10 successful events
if (successCount % 10 == 0)
{
await context.CheckpointAsync();
log.LogInformation("Manual checkpoint at {Count}", successCount);
}
}
catch (Exception ex)
{
log.LogError(ex, "Failed at event {Offset}", evt.Offset);
// Skip the manual checkpoint and surface the failure
throw;
}
}
// Final checkpoint
await context.CheckpointAsync();
}
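Event Hubs delivery is at-least-once, so a handler can see the same event again, for example when a retry policy re-runs a batch that failed halfway. A minimal sketch of skipping events that were already handled follows, keyed on the partition ID and the event's sequence number. SequenceTracker is a hypothetical name, and the state lives only in memory: it covers re-runs inside one host instance, while duplicates after a restart need the high-water mark persisted together with the processed data.

```csharp
using System.Collections.Concurrent;

// Hypothetical helper: tracks the highest sequence number handled per partition.
public sealed class SequenceTracker
{
    private readonly ConcurrentDictionary<string, long> _lastSeen =
        new ConcurrentDictionary<string, long>();

    // Returns true and records the number if the event is newer than anything
    // seen for its partition; returns false if it was already handled.
    public bool TryAdvance(string partitionId, long sequenceNumber)
    {
        while (true)
        {
            if (!_lastSeen.TryGetValue(partitionId, out var last))
            {
                if (_lastSeen.TryAdd(partitionId, sequenceNumber))
                {
                    return true;
                }
                continue; // Another thread added the partition first; re-read.
            }

            if (sequenceNumber <= last)
            {
                return false;
            }

            if (_lastSeen.TryUpdate(partitionId, sequenceNumber, last))
            {
                return true;
            }
            // The value changed between read and update; loop and compare again.
        }
    }
}
```

Inside a function, a shared instance would be consulted with `tracker.TryAdvance(context.PartitionId, evt.SequenceNumber)` after an event has been processed successfully, and events for which it returns false would be skipped on a re-run.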
Dependency Injection
Using DI with Event Hubs Trigger
public class Startup : FunctionsStartup
{
public override void Configure(IFunctionsHostBuilder builder)
{
builder.Services.AddSingleton<IOrderService, OrderService>();
builder.Services.AddSingleton<IInventoryService, InventoryService>();
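// EventHubOptions is the options type of the Event Hubs extension
// (property names as in extension 5.x; earlier versions differ)
builder.Services.AddOptions<EventHubOptions>()
.Configure(config =>
{
config.MaxEventBatchSize = 50;
config.PrefetchCount = 100;
});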
}
}
public class OrderFunction
{
private readonly IOrderService _orderService;
private readonly ILogger<OrderFunction> _logger;
public OrderFunction(IOrderService orderService, ILogger<OrderFunction> logger)
{
_orderService = orderService;
_logger = logger;
}
[FunctionName("ProcessOrdersDI")]
public async Task ProcessOrdersDI(
[EventHubTrigger("orders", Connection = "EventHubConnection")]
EventData[] events)
{
foreach (var evt in events)
{
await _orderService.ProcessOrderAsync(evt);
}
}
}
Performance Optimization
Best Practices
| Practice | Description |
|---|---|
| Use prefetch | Set prefetchCount to 2-3x maxBatchSize |
| Process in parallel | Use Task.WhenAll for independent processing |
| Minimize logging | Avoid excessive logging in hot path |
| Reuse connections | Use singleton HttpClient |
| Batch checkpoint | Checkpoint less frequently for high throughput |
| Avoid large messages | Keep message size under 256KB |
Monitoring
[FunctionName("MonitoredProcessor")]
public static async Task MonitoredProcessor(
[EventHubTrigger("orders", Connection = "EventHubConnection")]
EventData[] events,
ILogger log)
{
var sw = System.Diagnostics.Stopwatch.StartNew();
// Process events
await ProcessEventsAsync(events);
sw.Stop();
// Log metrics
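log.LogInformation(
"Processed {Count} events in {ElapsedMs}ms ({EventsPerSec:F1} events/sec)",
events.Length,
sw.ElapsedMilliseconds,
// Divide by fractional seconds with a floor so a sub-millisecond batch cannot divide by zero
events.Length / Math.Max(sw.Elapsed.TotalSeconds, 0.001));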
}
Complete Configuration Example
{
    "version": "2.0",
    "logging": {
        "logLevel": {
            "default": "Information",
            "Microsoft.Azure.WebJobs": "Warning",
            "Microsoft.Azure.WebJobs.Extensions.EventHubs": "Information"
        }
    },
    "extensions": {
        "eventHubs": {
            "maxEventBatchSize": 100,
            "prefetchCount": 200,
            "clientRetryOptions": {
                "mode": "exponential",
                "delay": "00:00:05",
                "maximumRetries": 3
            }
        }
    },
    "functionTimeout": "00:10:00",
    "retry": {
        "strategy": "exponentialBackoff",
        "maxRetryCount": 3,
        "minimumInterval": "00:00:05",
        "maximumInterval": "00:01:00"
    }
}
Troubleshooting
Common Issues
| Issue | Solution |
|---|---|
| Function not triggered | Check connection string, ensure Event Hub has events |
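| Scale not working | Verify the app runs on a Consumption or Premium plan and check the partition count, which caps how many instances can read in parallel |
| Checkpoint errors | Ensure Storage account is accessible |
| Duplicate processing | Delivery is at-least-once, so make handlers idempotent; checkpoint more often to shrink the replay window after a restart |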
| Memory issues | Reduce prefetchCount or batch size |
Diagnostic Queries
// Check for errors in Application Insights
requests
| where timestamp > ago(1h)
| where cloud_RoleName == "YourFunctionApp"
| where name == "ProcessOrders"
| where success == false
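| project timestamp, operation_Name, resultCode, duration, operation_Id // requests has no exception column; join to the exceptions table on operation_Id for details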
Azure Integration Hub - Advanced Level