Azure Event Hubs — Function App + Event Hubs Trigger

Batch Processing, Max Batch Size, Prefetch, and Performance Optimization


Introduction

Azure Functions provides native integration with Azure Event Hubs through the Event Hubs trigger. This integration enables serverless event processing with automatic scaling and optimized throughput.

This guide covers:

  • Trigger configuration — How Functions connect to Event Hubs
  • Batch processing — Handling multiple events efficiently
  • Prefetch optimization — Improving throughput with prefetch
  • Error handling — Retry policies and dead letter queues
  • Performance tuning — Optimizing for your workload

How the Trigger Works

Architecture

┌─────────────────────────────────────────────────────────────────────┐
│              EVENT HUBS TRIGGER ARCHITECTURE                        │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   ┌──────────────┐      ┌──────────────┐      ┌──────────────┐      │
│   │ Event Hubs   │      │   SDK        │      │    Azure     │      │
│   │   Events     │─────▶│   Polling    │─────▶│   Functions  │      │
│   │  (Partition) │      │ (Checkpoint) │      │   Runtime    │      │
│   └──────────────┘      └──────────────┘      └──────┬───────┘      │
│                                                      │              │
│                                     Checkpointing    ▼              │
│   ┌──────────────┐                       ┌──────────────┐           │
│   │   Azure      │◀──────────────────────│   Processing │           │
│   │   Storage    │                       │   Complete   │           │
│   └──────────────┘                       └──────────────┘           │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

Event Processor Client

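Under the hood, the trigger runs an event processor from the Azure.Messaging.EventHubs SDK, which balances partitions across instances and stores checkpoints in Azure Storage. Conceptually, it behaves like an EventProcessorClient:

// Simplified illustration of what the trigger does (not the extension's actual code)
var checkpointStore = new BlobContainerClient(storageConnectionString, "checkpoints");
var processor = new EventProcessorClient(
    checkpointStore,
    EventHubConsumerClient.DefaultConsumerGroupName, // "$Default"
    eventHubConnectionString,
    eventHubName);

processor.ProcessEventAsync += async args =>
{
    // Invoke the handler, then store the last processed offset
    await args.UpdateCheckpointAsync();
};
processor.ProcessErrorAsync += args => Task.CompletedTask;

await processor.StartProcessingAsync();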

Basic Trigger Implementation

Simple Event Processing

using System;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

public static class OrderProcessor
{
    [FunctionName("ProcessOrders")]
    public static async Task ProcessOrders(
        [EventHubTrigger("orders", Connection = "EventHubConnectionString")] 
        EventData[] events,
        ILogger log)
    {
        log.LogInformation($"Received batch of {events.Length} events");
        
        foreach (var eventData in events)
        {
            try
            {
                // Get message body
                var body = Encoding.UTF8.GetString(eventData.Body.ToArray()); // Body is ReadOnlyMemory<byte>
                
                // Deserialize order
                var order = JsonSerializer.Deserialize<Order>(body, new JsonSerializerOptions
                {
                    PropertyNameCaseInsensitive = true
                });
                
                // Process the order
                await ProcessOrderAsync(order, log);
                
                log.LogInformation("Order {OrderId} processed successfully", order.OrderId);
            }
            catch (Exception ex)
            {
                log.LogError(ex, "Failed to process event: {EventId}", eventData.MessageId);
                // Rethrow so a configured retry policy can re-run the batch;
                // without one, the checkpoint still advances after the failure
                throw;
            }
        }
    }
    
    private static async Task ProcessOrderAsync(Order order, ILogger log)
    {
        // Business logic here
        await Task.CompletedTask;
    }
}

Configuration in host.json

{
  "version": "2.0",
  "extensions": {
    "eventHubs": {
      "maxEventBatchSize": 10,
      "batchCheckpointFrequency": 1
    }
  },
  "functionTimeout": "00:05:00"
}

Batch Processing Deep Dive

Understanding Batch Settings

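Setting                    Description                                            Recommended Value
maxEventBatchSize          Maximum events per function invocation                 10-100
batchCheckpointFrequency   Batches processed per partition between checkpoints    1 (raise for throughput)
prefetchCount              Events to fetch ahead of processing                    2x maxEventBatchSize

Event Hubs has no maxConcurrentBatches or visibilityTimeout setting. Events are not locked, and each partition is processed one batch at a time, so parallelism comes from the partition count.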
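
To reason about how large prefetchCount can safely be, the following sketch estimates the memory that prefetched events could occupy on one host. The helper name and its inputs (average event size, partitions owned by the instance) are assumptions for illustration, not part of the Functions API, and the model assumes each owned partition keeps its own prefetch buffer.

```csharp
using System;

// Hypothetical helper: rough upper bound on memory held by prefetched events.
// Assumes one prefetch buffer per owned partition.
static long EstimatePrefetchBytes(int prefetchCount, int averageEventBytes, int ownedPartitions)
{
    if (prefetchCount < 0 || averageEventBytes < 0 || ownedPartitions < 0)
        throw new ArgumentOutOfRangeException();
    return (long)prefetchCount * averageEventBytes * ownedPartitions;
}

// Example: 200 prefetched events of about 2 KB each across 8 partitions.
long estimate = EstimatePrefetchBytes(200, 2048, 8);
Console.WriteLine($"Estimated prefetch memory: {estimate} bytes");
```

If the estimate approaches the memory available to the plan, lowering prefetchCount is usually the first adjustment to try.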

High-Throughput Configuration

{
  "extensions": {
    "eventHubs": {
      "maxEventBatchSize": 100,
      "prefetchCount": 200,
      "clientRetryOptions": {
        "mode": "exponential",
        "maximumRetries": 3,
        "delay": "00:00:05",
        "maximumDelay": "00:01:00"
      }
    }
  }
}

Batch Processing Logic

[FunctionName("ProcessHighVolume")]
public static async Task ProcessHighVolume(
    [EventHubTrigger("high-volume", Connection = "EventHubConnection")] 
    EventData[] events,
    ILogger log)
{
    // events array contains up to the configured maximum batch size
    log.LogInformation($"Processing {events.Length} events");
    
    // Pick ONE of the options below; running all three would process every event three times
    
    // Option 1: Sequential processing (preserves order within the partition)
    foreach (var evt in events)
    {
        await ProcessSingleAsync(evt);
    }
    
    // Option 2: Parallel processing with Task.WhenAll (order is not preserved)
    // await Task.WhenAll(events.Select(evt => ProcessSingleAsync(evt)));
    
    // Option 3: Batch processing - process all at once
    // await ProcessBatchAsync(events.ToList());
}
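
A middle ground between sequential and fully parallel processing is to run events with different keys in parallel while keeping events that share a key in order. The sketch below shows one way to do that; ProcessGroupedAsync and its parameters are hypothetical names, and it is written generically so it does not depend on the Event Hubs SDK.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper: groups run concurrently, items inside a group run in order.
static Task ProcessGroupedAsync<T>(
    IEnumerable<T> items,
    Func<T, string> keySelector,
    Func<T, Task> handler)
{
    var groupTasks = items
        .GroupBy(keySelector)          // GroupBy keeps the source order inside each group
        .Select(async group =>
        {
            foreach (var item in group)
            {
                await handler(item);   // sequential within one key
            }
        });

    return Task.WhenAll(groupTasks);   // wait for every group to finish
}
```

With an EventData[] batch, the key selector could be something like the event's partition key or an order ID taken from the payload; which key is appropriate depends on the ordering guarantees the workload needs.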

Prefetch Optimization

Why Prefetch Matters:

WITHOUT PREFETCH:                    WITH PREFETCH:
┌────────────────────────────┐    ┌────────────────────────────┐
│ Time                       │    │ Time                       │
│                            │    │                            │
│ ┌────┐  ┌────┐  ┌────┐     │    │ ┌────┐┌────┐┌────┐         │
│ │Wait│  │Wait│  │Wait│     │    │ │████││████││████│         │
│ └────┘  └────┘  └────┘     │    │ └────┘└────┘└────┘         │
│ Fetch  Process  Fetch      │    │ Prefetch Processing        │
│    └────────┬─────────┘    │    │    └────────┬─────────┘    │
│            Idle time       │    │            No idle         │
└────────────────────────────┘    └────────────────────────────┘
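
The effect in the diagram can be approximated with simple arithmetic. The sketch below is a deliberately simplified model, assuming every batch costs one fetch and one processing step of fixed duration; the function names and timings are illustrative, not measurements.

```csharp
using System;

// Without prefetch: every batch waits for its fetch, then processes.
static double SequentialMs(int batches, double fetchMs, double processMs)
    => batches * (fetchMs + processMs);

// With prefetch: the next fetch overlaps the current processing step,
// so each steady-state step costs whichever of the two is slower.
static double PipelinedMs(int batches, double fetchMs, double processMs)
    => batches == 0
        ? 0
        : fetchMs + (batches - 1) * Math.Max(fetchMs, processMs) + processMs;

// Example: 10 batches, 20 ms to fetch, 50 ms to process.
Console.WriteLine(SequentialMs(10, 20, 50)); // 10 * (20 + 50) = 700
Console.WriteLine(PipelinedMs(10, 20, 50));  // 20 + 9 * 50 + 50 = 520
```

In this model the saving grows with the fetch time, which is why prefetch tends to matter most when network latency to the Event Hub is high relative to processing time.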

Configuration:

// Programmatic configuration (in-process model, Startup class)
public class Startup : FunctionsStartup
{
    public override void Configure(IFunctionsHostBuilder builder)
    {
        // The trigger creates its own processor, so adjust the extension's
        // options instead of registering a separate client
        builder.Services.Configure<EventHubOptions>(options =>
        {
            options.PrefetchCount = 200;  // Fetch 200 events ahead
        });
    }
}

Retry Configuration

Built-in Retry

{
  "retry": {
    "strategy": "fixedDelay",
    "maxRetryCount": 3,
    "delayInterval": "00:00:05"
  }
}

Custom Retry with Exponential Backoff

[FunctionName("ProcessWithRetry")]
public static async Task ProcessWithRetry(
    [EventHubTrigger("orders", Connection = "EventHubConnection")] 
    EventData[] events,
    ILogger log)
{
    foreach (var evt in events)
    {
        var retryCount = 0;
        var maxRetries = 3;
        
        while (retryCount < maxRetries)
        {
            try
            {
                await ProcessOrderAsync(evt, log);
                break; // Success, exit retry loop
            }
            catch (Exception ex)
            {
                retryCount++;
                if (retryCount >= maxRetries)
                {
                    log.LogError(ex, "All retries exhausted for message");
                    throw;
                }
                
                // Exponential backoff
                var delay = TimeSpan.FromSeconds(Math.Pow(2, retryCount));
                log.LogWarning("Retry {RetryCount} after {Delay}s", retryCount, delay.TotalSeconds);
                await Task.Delay(delay);
            }
        }
    }
}
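
The backoff calculation can be pulled into a small helper so the delay is capped and easy to test. This is a minimal sketch; ComputeBackoff and its parameters are hypothetical names, and the doubling schedule is one common choice rather than anything prescribed by Azure Functions.

```csharp
using System;

// Hypothetical helper: delay before retry number `attempt` (1-based).
// Doubles from baseDelay on each attempt and never exceeds maxDelay.
static TimeSpan ComputeBackoff(int attempt, TimeSpan baseDelay, TimeSpan maxDelay)
{
    if (attempt < 1) throw new ArgumentOutOfRangeException(nameof(attempt));

    // Clamp the exponent so very large attempt numbers cannot overflow.
    int exponent = Math.Min(attempt - 1, 30);
    double ticks = baseDelay.Ticks * Math.Pow(2, exponent);

    return ticks >= maxDelay.Ticks ? maxDelay : TimeSpan.FromTicks((long)ticks);
}
```

Inside a retry loop this would replace the inline Math.Pow call, for example `await Task.Delay(ComputeBackoff(retryCount, TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(30)))`. Keep in mind that any delay inside the function holds up the partition for that long.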

Error Handling and Dead Letter Queue

Handling Failed Events

[FunctionName("ProcessWithDLQ")]
public static async Task ProcessWithDLQ(
    [EventHubTrigger("orders", Connection = "EventHubConnection")] 
    EventData[] events,
    [ServiceBus("orders-dlq", Connection = "ServiceBusConnection")]
    IAsyncCollector<ServiceBusMessage> dlq,
    ILogger log)
{
    var failedEvents = new List<EventData>();
    
    foreach (var evt in events)
    {
        try
        {
            await ProcessOrderAsync(evt, log);
        }
        catch (Exception ex)
        {
            log.LogError(ex, "Failed to process event {MessageId}", evt.MessageId);
            failedEvents.Add(evt);
        }
    }
    
    // Event Hubs has no built-in dead letter queue, so failed events are
    // forwarded to a Service Bus queue that serves as one
    if (failedEvents.Any())
    {
        log.LogWarning("Sending {Count} failed events to DLQ", failedEvents.Count);
        
        foreach (var failed in failedEvents)
        {
            var dlqMessage = new ServiceBusMessage(failed.EventBody)
            {
                ContentType = failed.ContentType
            };
            
            // MessageId is optional on EventData but cannot be set to null here
            if (!string.IsNullOrEmpty(failed.MessageId))
            {
                dlqMessage.MessageId = failed.MessageId;
            }
            
            // Copy application properties, then add failure info
            foreach (var property in failed.Properties)
            {
                dlqMessage.ApplicationProperties[property.Key] = property.Value;
            }
            dlqMessage.ApplicationProperties["FailedAt"] = DateTime.UtcNow.ToString("O");
            dlqMessage.ApplicationProperties["OriginalEventHub"] = "orders";
            
            await dlq.AddAsync(dlqMessage);
        }
    }
}

Poison Message Handling

[FunctionName("ProcessWithPoisonHandling")]
public static async Task ProcessWithPoisonHandling(
    [EventHubTrigger("orders", Connection = "EventHubConnection")] 
    EventData[] events,
    ILogger log)
{
    foreach (var evt in events)
    {
        try
        {
            // Check for malformed messages (Body is a ReadOnlyMemory<byte>, never null)
            if (evt.Body.IsEmpty)
            {
                log.LogWarning("Empty message detected, skipping");
                continue; // Don't retry empty messages
            }
            
            var body = Encoding.UTF8.GetString(evt.Body.ToArray());
            
            // Validate JSON
            if (!IsValidJson(body))
            {
                log.LogWarning("Invalid JSON, skipping message");
                continue; // Skip invalid JSON
            }
            
            await ProcessOrderAsync(evt, log);
        }
        catch (JsonException ex)
        {
            // Don't retry - message is permanently invalid
            log.LogError(ex, "Invalid JSON format, skipping message");
        }
        catch (TimeoutException)
        {
            // Transient failure: rethrow so a retry policy can re-run the batch
            throw;
        }
    }
}

private static bool IsValidJson(string text)
{
    try
    {
        // JsonDocument is disposable; parse only to validate
        using var document = JsonDocument.Parse(text);
        return true;
    }
    catch (JsonException)
    {
        return false;
    }
}
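
The skip-or-retry decision can be centralized in one classifier so every function treats failures the same way. The following is a minimal sketch; IsTransient is a hypothetical helper, and the mapping of exception types to outcomes is an assumption that should be adapted to the dependencies a workload actually calls.

```csharp
using System;
using System.IO;
using System.Net.Http;
using System.Text.Json;

// Hypothetical helper: true means the failure may succeed on retry,
// false means the message itself is bad and should be skipped or dead-lettered.
static bool IsTransient(Exception ex) => ex switch
{
    TimeoutException => true,        // downstream call timed out
    HttpRequestException => true,    // network or remote service failure
    IOException => true,             // I/O hiccup
    JsonException => false,          // payload will never parse
    FormatException => false,        // payload has invalid field values
    _ => false                       // assumption: unknown errors are not retried
};
```

A catch block could then use an exception filter such as `catch (Exception ex) when (IsTransient(ex))` to rethrow only retryable failures.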

Checkpointing

Automatic Checkpointing

// The runtime checkpoints after the invocation completes (after retries, if a retry policy is set)
[FunctionName("ProcessAndCheckpoint")]
public static async Task ProcessAndCheckpoint(
    [EventHubTrigger("orders", Connection = "EventHubConnection")] 
    EventData[] events,
    PartitionContext context,
    ILogger log)
{
    log.LogInformation($"Processing {events.Length} events from partition {context.PartitionId}");
    
    // Process all events
    foreach (var evt in events)
    {
        await ProcessAsync(evt);
    }
    
    // No explicit checkpoint call is needed: the runtime saves the offset
    // after the invocation, every batchCheckpointFrequency batches (host.json)
    log.LogInformation("Finished batch for partition {PartitionId}", context.PartitionId);
}

Manual Checkpointing

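// Explicit checkpoints require the legacy extension (3.x/4.x), where PartitionContext is
// Microsoft.Azure.EventHubs.Processor.PartitionContext. In 5.x and later, PartitionContext
// has no CheckpointAsync; tune batchCheckpointFrequency in host.json instead.
[FunctionName("ProcessWithManualCheckpoint")]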
public static async Task ProcessWithManualCheckpoint(
    [EventHubTrigger("orders", Connection = "EventHubConnection")] 
    EventData[] events,
    PartitionContext context,
    ILogger log)
{
    var successCount = 0;
    
    foreach (var evt in events)
    {
        try
        {
            await ProcessAsync(evt);
            successCount++;
            
            // Checkpoint every 10 successful events
            if (successCount % 10 == 0)
            {
                await context.CheckpointAsync();
                log.LogInformation("Manual checkpoint at {Count}", successCount);
            }
        }
        catch (Exception ex)
        {
            log.LogError(ex, "Failed at event {Offset}", evt.Offset);
            // Don't checkpoint - message will be retried
            throw;
        }
    }
    
    // Final checkpoint
    await context.CheckpointAsync();
}
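
Because events received after the last checkpoint are delivered again following a restart or failure, handlers benefit from a way to recognize replays. The sketch below tracks the highest sequence number handled per partition, relying on the fact that sequence numbers increase within a partition. TryMarkProcessed is a hypothetical helper, and the in-memory dictionary is an assumption for illustration: it is lost on restart or scale-out, so a real deployment would need a durable store.

```csharp
using System;
using System.Collections.Concurrent;

// Highest sequence number already handled, per partition (in-memory only).
var lastProcessed = new ConcurrentDictionary<string, long>();

// Returns true when the event is new and records it; false for a replayed event.
bool TryMarkProcessed(string partitionId, long sequenceNumber)
{
    while (true)
    {
        if (!lastProcessed.TryGetValue(partitionId, out long current))
        {
            // First event seen for this partition.
            if (lastProcessed.TryAdd(partitionId, sequenceNumber)) return true;
            continue; // another thread added it first; re-read
        }

        if (sequenceNumber <= current) return false; // already handled

        // Only advance if nobody else changed the value in the meantime.
        if (lastProcessed.TryUpdate(partitionId, sequenceNumber, current)) return true;
    }
}
```

In a trigger function this would be called with the partition ID from PartitionContext and the SequenceNumber of each EventData before doing any work for that event.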

Dependency Injection

Using DI with Event Hubs Trigger

public class Startup : FunctionsStartup
{
    public override void Configure(IFunctionsHostBuilder builder)
    {
        builder.Services.AddSingleton<IOrderService, OrderService>();
        builder.Services.AddSingleton<IInventoryService, InventoryService>();
        builder.Services.AddOptions<EventHubOptions>()
            .Configure(config =>
            {
                config.MaxEventBatchSize = 50;
                config.PrefetchCount = 100;
            });
    }
}

public class OrderFunction
{
    private readonly IOrderService _orderService;
    private readonly ILogger<OrderFunction> _logger;
    
    public OrderFunction(IOrderService orderService, ILogger<OrderFunction> logger)
    {
        _orderService = orderService;
        _logger = logger;
    }
    
    [FunctionName("ProcessOrdersDI")]
    public async Task ProcessOrdersDI(
        [EventHubTrigger("orders", Connection = "EventHubConnection")] 
        EventData[] events)
    {
        foreach (var evt in events)
        {
            await _orderService.ProcessOrderAsync(evt);
        }
    }
}

Performance Optimization

Best Practices

Practice               Description
Use prefetch           Set prefetchCount to 2-3x the maximum batch size
Process in parallel    Use Task.WhenAll when events are independent and ordering does not matter
Minimize logging       Avoid per-event logging in the hot path
Reuse connections      Use a singleton HttpClient
Batch checkpoint       Checkpoint less frequently for high throughput (raise batchCheckpointFrequency)
Avoid large messages   Keep events small; the size limit is 256 KB on the Basic tier and 1 MB on Standard

Monitoring

[FunctionName("MonitoredProcessor")]
public static async Task MonitoredProcessor(
    [EventHubTrigger("orders", Connection = "EventHubConnection")] 
    EventData[] events,
    ILogger log)
{
    var sw = System.Diagnostics.Stopwatch.StartNew();
    
    // Process events
    await ProcessEventsAsync(events);
    
    sw.Stop();
    
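    // Log metrics (guard against a zero elapsed time to avoid dividing by zero)
    var seconds = sw.Elapsed.TotalSeconds;
    var eventsPerSec = seconds > 0 ? events.Length / seconds : 0;
    log.LogInformation(
        "Processed {Count} events in {ElapsedMs}ms ({EventsPerSec:F1} events/sec)",
        events.Length,
        sw.ElapsedMilliseconds,
        eventsPerSec);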
}

Complete Configuration Example

{
  "version": "2.0",
  "logging": {
    "logLevel": {
      "default": "Information",
      "Microsoft.Azure.WebJobs": "Warning",
      "Microsoft.Azure.WebJobs.Extensions.EventHubs": "Information"
    }
  },
  "extensions": {
    "eventHubs": {
      "maxEventBatchSize": 100,
      "prefetchCount": 200,
      "batchCheckpointFrequency": 1,
      "clientRetryOptions": {
        "mode": "exponential",
        "maximumRetries": 3,
        "delay": "00:00:05",
        "maximumDelay": "00:01:00"
      }
    }
  },
  "functionTimeout": "00:10:00",
  "retry": {
    "strategy": "exponentialBackoff",
    "maxRetryCount": 3,
    "minimumInterval": "00:00:05",
    "maximumInterval": "00:01:00"
  }
}

Troubleshooting

Common Issues

Issue                    Solution
Function not triggered   Check the connection string and confirm the Event Hub is receiving events
Scale not working        Verify the app runs on a Consumption or Premium plan; active instances are limited by the partition count
Checkpoint errors        Ensure the Storage account is accessible
Duplicate processing     Expected with at-least-once delivery; make handlers idempotent and checkpoint more often
Memory issues            Reduce prefetchCount or batch size

Diagnostic Queries

// Check for errors in Application Insights
requests
| where timestamp > ago(1h)
| where cloud_RoleName == "YourFunctionApp"
| where name == "ProcessOrders"
| where success == false
| project timestamp, operation_Name, resultCode, duration, operation_Id
