
Real-Time Data Streaming with Event Hubs

Comprehensive guide to building real-time data streaming solutions with Azure Event Hubs including checkpointing, consumer groups, Kafka protocol integration, and production patterns

Why Event Hubs?

Azure Event Hubs is a fully managed streaming platform capable of processing millions of events per second.


Understanding Event Hubs Architecture

Core Concepts

┌─────────────────────────────────────────────────────────────────────────────┐
│                        Event Hubs Architecture                              │
└─────────────────────────────────────────────────────────────────────────────┘

┌─────────────────┐         ┌──────────────────────────────────────────────┐
│  Publishers     │         │              Event Hubs Namespace            │
│                 │         │                                              │
│  - IoT Devices  │         │  ┌────────────────────────────────────────┐  │
│  - Applications │────────▶│  │         Event Hub (orders)             │  │
│  - Logs         │         │  │                                        │  │
│                 │         │  │  Partitions: 0, 1, 2, 3                │  │
│                 │         │  │  Partition Keys: orderId, customerId   │  │
│                 │         │  │  Retention: 7 days                     │  │
│                 │         │  │                                        │  │
└─────────────────┘         │  └────────────────────┬───────────────────┘  │
                            │                       │                      │
                            │  ┌────────────────────┴───────────────────┐  │
                            │  │           Consumer Groups              │  │
                            │  │                                        │  │
                            │  │   - processing-group-1 (Function)      │  │
                            │  │   - analytics-group (Stream Analytics) │  │
                            │  │   - archive-group (Blob Storage)       │  │
                            │  │                                        │  │
                            │  └────────────────────────────────────────┘  │
                            └──────────────────────────────────────────────┘
                                        │
                                        ▼
                           ┌─────────────────────────────────┐
                           │        Consumers                │
                           │                                 │
                           │  - Azure Functions              │
                           │  - Event Processor Client       │
                           │  - Kafka Consumers              │
                           │  - Stream Analytics             │
                           └─────────────────────────────────┘

Partition Keys and Ordering

// Why partition keys matter:

// Without partition key - round robin
// Events distributed evenly across partitions
// No ordering guaranteed between partitions

// With partition key - same key goes to same partition
// Events with same orderId go to same partition
// Ordering guaranteed within partition

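var eventData = new EventData(eventBody);

// EventData.PartitionKey is read-only; the key is supplied with the send options.
// All events sent with the same key go to the same partition.
var sendOptions = new SendEventOptions { PartitionKey = orderId };
await eventHubProducerClient.SendAsync(new[] { eventData }, sendOptions);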
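
To make the ordering guarantee concrete, the following is a minimal, self-contained sketch that simulates key-based routing in memory. The `PartitionSimulator` class and its FNV-1a hash are illustrative assumptions: the Event Hubs service applies its own internal hash, so real partition assignments will differ. What the sketch does show is the property that matters: events sharing a key always land in the same partition, in the order they were sent.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical simulator: this is NOT the hash Event Hubs uses internally.
public static class PartitionSimulator
{
    // FNV-1a returns the same value on every run, unlike string.GetHashCode().
    public static int PartitionFor(string key, int partitionCount)
    {
        unchecked
        {
            uint hash = 2166136261;
            foreach (byte b in Encoding.UTF8.GetBytes(key))
            {
                hash ^= b;
                hash *= 16777619;
            }
            return (int)(hash % (uint)partitionCount);
        }
    }

    // Appends each payload to the partition selected by its key.
    public static List<string>[] Route(
        IEnumerable<(string Key, string Payload)> events, int partitionCount)
    {
        var partitions = new List<string>[partitionCount];
        for (int i = 0; i < partitionCount; i++)
        {
            partitions[i] = new List<string>();
        }

        foreach (var (key, payload) in events)
        {
            partitions[PartitionFor(key, partitionCount)].Add(payload);
        }

        return partitions;
    }
}
```

Routing a mixed stream of `order-1` and `order-2` events through `Route` keeps every `order-1` event in one list in its original order, while events for different orders may interleave or land in different lists.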

Step 1: Publishing Events

Event Hub Producer

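using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Text.Json;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using Microsoft.Extensions.Logging;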

public class OrderEventProducer
{
    private readonly EventHubProducerClient _producer;
    private readonly ILogger<OrderEventProducer> _logger;

    public OrderEventProducer(
        EventHubProducerClient producer,
        ILogger<OrderEventProducer> logger)
    {
        _producer = producer;
        _logger = logger;
    }

    public async Task PublishOrderCreatedAsync(Order order)
    {
        // Create the event payload
        var eventData = new EventData(
            BinaryData.FromObjectAsJson(new OrderCreatedEvent
            {
                OrderId = order.Id,
                CustomerId = order.CustomerId,
                TotalAmount = order.TotalAmount,
                Items = order.Items,
                Region = order.Region,
                Timestamp = DateTime.UtcNow
            }))
        {
            // Properties for filtering and debugging
            Properties =
            {
                ["eventType"] = "OrderCreated",
                ["correlationId"] = order.CorrelationId,
                ["source"] = "order-service"
            }
        };

        // The partition key goes on the send options (EventData.PartitionKey is read-only).
        // It ensures all events for the same order go to the same partition.
        var sendOptions = new SendEventOptions { PartitionKey = order.Id.ToString() };

        // SendAsync accepts a collection of events, so wrap the single event
        await _producer.SendAsync(new[] { eventData }, sendOptions);

        _logger.LogInformation("Published order created event: {OrderId}", order.Id);
    }

    public async Task PublishOrderBatchAsync(List<Order> orders)
    {
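        // Create batch for efficient sending. A batch is sent as a unit to one partition,
        // so no per-event partition key is set here.
        var batchOptions = new CreateBatchOptions
        {
            MaximumSizeInBytes = 1024 * 1024  // 1 MB; must not exceed the limit of your tier
        };
        EventDataBatch eventBatch = await _producer.CreateBatchAsync(batchOptions);

        try
        {
            foreach (var order in orders)
            {
                var eventData = new EventData(
                    BinaryData.FromObjectAsJson(new OrderCreatedEvent
                    {
                        OrderId = order.Id,
                        CustomerId = order.CustomerId,
                        TotalAmount = order.TotalAmount
                    }));

                if (!eventBatch.TryAdd(eventData))
                {
                    // Batch is full: send it, then start a new one (a batch cannot be reused)
                    await _producer.SendAsync(eventBatch);
                    eventBatch.Dispose();
                    eventBatch = await _producer.CreateBatchAsync(batchOptions);

                    if (!eventBatch.TryAdd(eventData))
                    {
                        throw new InvalidOperationException(
                            $"Event for order {order.Id} is too large for an empty batch.");
                    }
                }
            }

            // Send remaining
            if (eventBatch.Count > 0)
            {
                await _producer.SendAsync(eventBatch);
            }
        }
        finally
        {
            eventBatch.Dispose();
        }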

        _logger.LogInformation("Published {Count} order events", orders.Count);
    }

    public async Task PublishWithCompressionAsync(Order order)
    {
        // Compress large events
        var json = JsonSerializer.Serialize(order);
        var compressed = Compress(json);
        
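        var eventData = new EventData(BinaryData.FromBytes(compressed))
        {
            ContentType = "application/json+gzip"
        };

        // The partition key is passed with the send options, not on the event
        await _producer.SendAsync(
            new[] { eventData },
            new SendEventOptions { PartitionKey = order.Id.ToString() });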
    }

    private byte[] Compress(string data)
    {
        using var output = new MemoryStream();
        using (var gzip = new GZipStream(output, CompressionLevel.Optimal))
        {
            using var writer = new StreamWriter(gzip);
            writer.Write(data);
        }
        return output.ToArray();
    }
}
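
The send-when-full loop above can be easier to reason about without the SDK in the way. The sketch below is a simplified stand-in, assuming a hypothetical `BatchPacker` helper that counts payload bytes only. A real `EventDataBatch` also accounts for per-event protocol overhead, so actual batch boundaries will fall slightly earlier than this model suggests.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper that mimics the TryAdd / send / start-new-batch pattern.
public static class BatchPacker
{
    // Greedily packs payloads into batches whose total size stays within maxBatchBytes.
    public static List<List<byte[]>> Pack(IEnumerable<byte[]> payloads, int maxBatchBytes)
    {
        var batches = new List<List<byte[]>>();
        var current = new List<byte[]>();
        int currentBytes = 0;

        foreach (var payload in payloads)
        {
            // Equivalent to TryAdd failing on an empty batch: the event can never be sent.
            if (payload.Length > maxBatchBytes)
            {
                throw new ArgumentException("A single payload exceeds the batch limit.");
            }

            // Equivalent to TryAdd returning false: close this batch and start a new one.
            if (currentBytes + payload.Length > maxBatchBytes)
            {
                batches.Add(current);
                current = new List<byte[]>();
                currentBytes = 0;
            }

            current.Add(payload);
            currentBytes += payload.Length;
        }

        // Equivalent to the final "send remaining" step.
        if (current.Count > 0)
        {
            batches.Add(current);
        }

        return batches;
    }
}
```

With a 1,000-byte limit, three 400-byte payloads produce two batches: the first holds two payloads and the second holds one.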

Using Partition Routing

// Route to specific partition for ordered processing
public async Task PublishInOrderAsync(List<OrderEvent> events)
{
    // Group by partition key
    var partitionGroups = events.GroupBy(e => e.OrderId);
    
    foreach (var group in partitionGroups)
    {
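        // Target one partition explicitly; do not combine this with a partition key.
        // 'using' disposes the batch at the end of each loop iteration.
        using var batch = await _producer.CreateBatchAsync(new CreateBatchOptions
        {
            PartitionId = GetPartitionForKey(group.Key)  // Specific partition
        });
        
        foreach (var evt in group)
        {
            if (!batch.TryAdd(new EventData(BinaryData.FromObjectAsJson(evt))))
            {
                throw new InvalidOperationException(
                    "Batch is full; split the group into smaller batches.");
            }
        }
        
        await _producer.SendAsync(batch);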
    }
}

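private string GetPartitionForKey(string key)
{
    // string.GetHashCode() is randomized per process in .NET, and Math.Abs(int.MinValue) throws,
    // so derive the partition from a stable hash instead.
    // Requires System.Security.Cryptography and System.Text.
    var digest = SHA256.HashData(Encoding.UTF8.GetBytes(key));
    var partition = BitConverter.ToUInt32(digest, 0) % 4;  // 4 partitions; read the real count from GetPartitionIdsAsync() in production
    return partition.ToString();
}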

Step 2: Processing Events

Azure Function with Event Hub Trigger

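using System;
using System.IO;
using System.IO.Compression;
using System.Text.Json;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;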

public class OrderEventProcessor
{
    private readonly IOrderService _orderService;
    private readonly ILogger<OrderEventProcessor> _logger;

    public OrderEventProcessor(
        IOrderService orderService,
        ILogger<OrderEventProcessor> logger)
    {
        _orderService = orderService;
        _logger = logger;
    }

    [Function("ProcessOrders")]
    public async Task Run(
        [EventHubTrigger("orders", Connection = "EventHubConnection")] 
        EventData[] events,
        PartitionContext partitionContext)
    {
        _logger.LogInformation(
            "Processing partition {PartitionId}, {Count} events",
            partitionContext.PartitionId,
            events.Length);

        foreach (var eventData in events)
        {
            try
            {
                await ProcessEventAsync(eventData, partitionContext);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, 
                    "Failed to process event {EventId}", 
                    eventData.MessageId);
                
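                // Rethrow so the invocation is recorded as failed.
                // By default the Functions host still advances the checkpoint after a failed batch,
                // so configure a retry policy or dead-letter the event if it must not be lost.
                throw;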
            }
        }
    }

    private async Task ProcessEventAsync(EventData eventData, PartitionContext context)
    {
        // Get event properties defensively; indexing a missing key would throw
        eventData.Properties.TryGetValue("eventType", out var eventTypeValue);
        eventData.Properties.TryGetValue("correlationId", out var correlationIdValue);
        var eventType = eventTypeValue?.ToString();
        var correlationId = correlationIdValue?.ToString();

        _logger.LogInformation(
            "Processing event {EventId}, Type: {Type}, CorrelationId: {CorrelationId}, Partition: {Partition}",
            eventData.MessageId, eventType, correlationId, context.PartitionId);

        // Decompress the body if the producer gzipped it
        var isGzip = eventData.ContentType?.Contains("gzip") ?? false;
        byte[] json = isGzip
            ? Decompress(eventData.Body.ToArray())
            : eventData.Body.ToArray();

        // Deserialize to the type that matches the event, then process it.
        // (Casting an OrderCreatedEvent to OrderUpdatedEvent would fail at runtime.)
        switch (eventType)
        {
            case "OrderCreated":
                var created = JsonSerializer.Deserialize<OrderCreatedEvent>(json);
                await _orderService.ProcessOrderCreatedAsync(created);
                break;
                
            case "OrderUpdated":
                var updated = JsonSerializer.Deserialize<OrderUpdatedEvent>(json);
                await _orderService.ProcessOrderUpdatedAsync(updated);
                break;
                
            default:
                _logger.LogWarning("Unknown event type: {Type}", eventType);
                break;
        }

        // No explicit checkpoint call here: the Functions host manages checkpoints
        // for the trigger after each batch.
    }

    private byte[] Decompress(byte[] data)
    {
        using var input = new MemoryStream(data);
        using var gzip = new GZipStream(input, CompressionMode.Decompress);
        using var output = new MemoryStream();
        gzip.CopyTo(output);
        return output.ToArray();
    }
}
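
Because the producer compresses and the consumer decompresses in separate services, it is worth checking the two helpers against each other in isolation. The sketch below is one way to do that, using only `System.IO.Compression`; the `GzipPayload` class name is an assumption made for this example and combines both directions in one place.

```csharp
using System.IO;
using System.IO.Compression;
using System.Text;

// Hypothetical helper pairing the producer-side and consumer-side gzip logic.
public static class GzipPayload
{
    public static byte[] Compress(string text)
    {
        using var output = new MemoryStream();
        using (var gzip = new GZipStream(output, CompressionLevel.Optimal))
        {
            byte[] raw = Encoding.UTF8.GetBytes(text);
            gzip.Write(raw, 0, raw.Length);
        } // Disposing the GZipStream writes the final gzip block and footer.

        // ToArray still works after the underlying MemoryStream has been closed.
        return output.ToArray();
    }

    public static string Decompress(byte[] compressed)
    {
        using var input = new MemoryStream(compressed);
        using var gzip = new GZipStream(input, CompressionMode.Decompress);
        using var output = new MemoryStream();
        gzip.CopyTo(output);
        return Encoding.UTF8.GetString(output.ToArray());
    }
}
```

A round trip such as `GzipPayload.Decompress(GzipPayload.Compress(json))` should return the original JSON string unchanged. Compression pays off mainly for larger payloads; very small events can grow because of the gzip header.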

Step 3: Event Processor Client

Using EventProcessorClient with Checkpointing

using System;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;
using Microsoft.Extensions.Logging;

public class EventHubConsumer
{
    private readonly BlobContainerClient _checkpointStore;
    private readonly EventProcessorClient _processor;
    private readonly ILogger<EventHubConsumer> _logger;

    public EventHubConsumer(
        string fullyQualifiedNamespace,          // e.g. "my-namespace.servicebus.windows.net"
        string eventHubName,
        Azure.Core.TokenCredential credential,   // e.g. DefaultAzureCredential from Azure.Identity
        BlobContainerClient checkpointStore,
        ILogger<EventHubConsumer> logger)
    {
        _checkpointStore = checkpointStore;
        _logger = logger;

        // The consumer group is passed once, followed by the namespace, hub name, and credential
        _processor = new EventProcessorClient(
            checkpointStore,
            "processing-group",
            fullyQualifiedNamespace,
            eventHubName,
            credential);

        _processor.ProcessEventAsync += ProcessEventAsync;
        _processor.ProcessErrorAsync += ProcessErrorAsync;
    }

    public async Task StartProcessingAsync()
    {
        await _processor.StartProcessingAsync();
        _logger.LogInformation("Started processing events");
    }

    private async Task ProcessEventAsync(ProcessEventArgs args)
    {
        try
        {
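            // When a maximum wait time is configured, the handler can fire without an event
            if (!args.HasEvent)
            {
                return;
            }

            var eventData = args.Data;
            
            // Process the event (EventBody exposes the payload as BinaryData)
            var orderEvent = eventData.EventBody.ToObjectFromJson<OrderCreatedEvent>();
            await ProcessOrderAsync(orderEvent);

            // Checkpoint - save position so we resume after this event on restart.
            // Checkpointing every event costs one storage write per event; high-volume
            // consumers usually checkpoint every N events or every few seconds instead.
            await args.UpdateCheckpointAsync();

            _logger.LogInformation(
                "Processed event {Id}, partition {Partition}, offset {Offset}",
                eventData.MessageId,
                args.Partition.PartitionId,
                eventData.Offset);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error processing event");
            
            // Do not rethrow: the processor neither catches nor retries handler exceptions.
            // This event was not checkpointed, but the next successful checkpoint moves past it,
            // so write it to a dead-letter store if it must not be lost.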
        }
    }

    private Task ProcessErrorAsync(ProcessErrorEventArgs args)
    {
        _logger.LogError(args.Exception,
            "Error in partition {Partition}, source: {Source}",
            args.Partition?.PartitionId,
            args.ErrorSource);

        return Task.CompletedTask;
    }

    private async Task ProcessOrderAsync(OrderCreatedEvent order)
    {
        // Your business logic here
        await Task.Delay(100);
    }
}

Why Checkpointing Matters

Without Checkpointing:                    With Checkpointing:

┌─────────────────────────────────┐     ┌─────────────────────────────────┐
│ Process events 1-100            │     │ Process events 1-100            │
│                                 │     │                                 │
│ Fails at event 50               │     │ Succeeds - checkpoint at 100    │
│                                 │     │                                 │
│ RESTART                         │     │ RESTART                         │
│                                 │     │                                 │
│ Process events 1-100 AGAIN!     │     │ Resume from 100                 │
│ Wasted work on 1-49             │     │ Process 101-200                 │
└─────────────────────────────────┘     └─────────────────────────────────┘

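Benefits:
- At-least-once processing: events after the last checkpoint are replayed, so handlers should be idempotent
- Resilient to failures
- Progress preserved across restarts
- Partitions can move between processor instances without losing position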
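
The replay behavior is easier to see in a small simulation. The sketch below is a minimal in-memory model, assuming a hypothetical `CheckpointSimulator` class that checkpoints every N events and can be told to crash at a given sequence number. It does not use the Event Hubs SDK or Blob Storage; it only illustrates why checkpointing gives at-least-once rather than exactly-once delivery.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of a consumer that checkpoints every N events.
public sealed class CheckpointSimulator
{
    private readonly int _checkpointEvery;

    // Sequence number of the last durably recorded event (-1 means nothing recorded yet).
    public long Checkpoint { get; private set; } = -1;

    // Every sequence number the handler has seen, including replays.
    public List<long> Processed { get; } = new List<long>();

    public CheckpointSimulator(int checkpointEvery)
    {
        _checkpointEvery = checkpointEvery;
    }

    // Processes events from just after the checkpoint up to lastSequence.
    // If crashAt is set, the run stops before that event is processed.
    public void Run(long lastSequence, long? crashAt = null)
    {
        int sinceCheckpoint = 0;
        for (long seq = Checkpoint + 1; seq <= lastSequence; seq++)
        {
            if (seq == crashAt)
            {
                return; // simulated crash: progress since the last checkpoint is lost
            }

            Processed.Add(seq);

            if (++sinceCheckpoint == _checkpointEvery)
            {
                Checkpoint = seq;
                sinceCheckpoint = 0;
            }
        }
    }
}
```

With a checkpoint every 10 events, a crash at sequence 25 leaves the checkpoint at 19. The next run resumes at 20, so events 20 through 24 are handled twice while everything before 20 is skipped. A smaller interval reduces replay at the cost of more checkpoint writes.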

Step 4: Consumer Groups

Why Multiple Consumer Groups?

┌─────────────────────────────────────────────────────────────────┐
│              Multiple Consumer Groups                          │
└─────────────────────────────────────────────────────────────────┘

Event Hub:
  Partition 0: ────Events: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10──
  Partition 1: ────Events: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10──

Consumer Groups:                         Each gets all events:
                                         
┌──────────────────────┐                ┌──────────────────────┐
│  processing-group    │                │  processing-group:   │
│  (Azure Functions)   │                │  Events 1-10 (all)   │
└──────────────────────┘                └──────────────────────┘
                                         
┌──────────────────────┐                ┌──────────────────────┐
│  analytics-group     │                │  analytics-group:    │
│  (Stream Analytics)  │                │  Events 1-10 (all)   │
└──────────────────────┘                └──────────────────────┘
                                         
┌──────────────────────┐                ┌──────────────────────┐
│  archive-group       │                │  archive-group:      │
│  (Blob Storage)      │                │  Events 1-10 (all)   │
└──────────────────────┘                └──────────────────────┘

Why use different groups?
- Different processing logic
- Independent scaling
- No interference between consumers
- Each maintains own position
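
The idea that each group keeps its own position over the same stream can be sketched in a few lines. The `EventLog` class below is a hypothetical in-memory model of a single partition, not the Event Hubs API: it stores events once and tracks a separate read position per consumer group name.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical single-partition log with an independent read position per consumer group.
public sealed class EventLog
{
    private readonly List<string> _events = new List<string>();
    private readonly Dictionary<string, int> _positions = new Dictionary<string, int>();

    public void Append(string evt)
    {
        _events.Add(evt);
    }

    // Returns up to max unread events for the group and advances only that group's position.
    public IReadOnlyList<string> Read(string consumerGroup, int max)
    {
        // A group that has never read starts at position 0.
        _positions.TryGetValue(consumerGroup, out int position);

        int count = Math.Min(max, _events.Count - position);
        List<string> slice = _events.GetRange(position, count);

        _positions[consumerGroup] = position + count;
        return slice;
    }
}
```

If `processing-group` reads three of five events and `analytics-group` then reads five, the analytics reader still receives all five from the start, and a second read by `processing-group` returns only the remaining two. Reading never removes events; only retention does.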

Configuring Consumer Groups

// Create processor with specific consumer group
var processor = new EventProcessorClient(
    checkpointStore,
    consumerGroup: "my-consumer-group",  // One group per independent processing path
    connectionString: "...",
    eventHubName: "orders");

// Azure Functions configuration
[EventHubTrigger("orders", 
    Connection = "EventHubConnection", 
    ConsumerGroup = "$Default")]  // Or custom group

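// Programmatic creation is a management-plane operation, so it uses the
// Azure.ResourceManager.EventHubs package (plus Azure.Identity) rather than the messaging SDK.
// subscriptionId, resourceGroup, and namespaceName are placeholders for your own values.
public async Task CreateConsumerGroupAsync(
    string subscriptionId, string resourceGroup, string namespaceName)
{
    var armClient = new ArmClient(new DefaultAzureCredential());

    var eventHubId = EventHubResource.CreateResourceIdentifier(
        subscriptionId, resourceGroup, namespaceName, "orders");
    var eventHub = armClient.GetEventHubResource(eventHubId);

    await eventHub.GetEventHubsConsumerGroups().CreateOrUpdateAsync(
        WaitUntil.Completed,
        "analytics-group",
        new EventHubsConsumerGroupData
        {
            UserMetadata = "Events for analytics pipeline"
        });
}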

Step 5: Kafka Integration

Using Kafka Protocol

using Confluent.Kafka;

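// Event Hubs exposes a Kafka-compatible endpoint on port 9093 (Standard tier and above).
// Point the client at the namespace and authenticate with SASL/PLAIN over TLS.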

var config = new ProducerConfig
{
    BootstrapServers = "my-namespace.servicebus.windows.net:9093",
    SecurityProtocol = SecurityProtocol.SaslSsl,
    SaslMechanism = SaslMechanism.Plain,
    SaslUsername = "$ConnectionString",
    SaslPassword = "Endpoint=sb://..."
};

using var producer = new ProducerBuilder<string, string>(config).Build();

await producer.ProduceAsync("orders", new Message<string, string>
{
    Key = orderId,
    Value = JsonSerializer.Serialize(order)
});

// Consume with Kafka consumer
var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "my-namespace.servicebus.windows.net:9093",
    GroupId = "my-consumer-group",
    SecurityProtocol = SecurityProtocol.SaslSsl,
    SaslMechanism = SaslMechanism.Plain,
    SaslUsername = "$ConnectionString",
    SaslPassword = "Endpoint=sb://...",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = false
};

using var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
consumer.Subscribe("orders");

while (true)
{
    var result = consumer.Consume();
    await ProcessOrderAsync(result.Message.Value);
    consumer.Commit(result);
}
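
Both configurations above repeat the namespace host by hand. As a convenience, the bootstrap server can be derived from the same connection string that is used as the SASL password. The `KafkaEndpoint` helper below is a hypothetical sketch, assuming the connection string contains a standard `Endpoint=sb://...` segment; the key in the usage example is a fake placeholder.

```csharp
using System;

// Hypothetical helper: derives the Kafka bootstrap server from an Event Hubs connection string.
public static class KafkaEndpoint
{
    private const int KafkaPort = 9093;

    public static string BootstrapServers(string connectionString)
    {
        foreach (var part in connectionString.Split(';', StringSplitOptions.RemoveEmptyEntries))
        {
            // Split on the first '=' only, because key values may themselves contain '='.
            var pair = part.Split('=', 2);
            if (pair.Length == 2 &&
                pair[0].Trim().Equals("Endpoint", StringComparison.OrdinalIgnoreCase))
            {
                string host = new Uri(pair[1].Trim()).Host;
                return host + ":" + KafkaPort;
            }
        }

        throw new FormatException("Connection string has no Endpoint segment.");
    }
}
```

For a connection string beginning with `Endpoint=sb://my-namespace.servicebus.windows.net/;`, the helper returns `my-namespace.servicebus.windows.net:9093`, which can be assigned to `BootstrapServers` in both `ProducerConfig` and `ConsumerConfig`.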

Why Kafka Compatibility?

Kafka Tools Work with Event Hubs:

┌─────────────────────────────────────────────────────────────────┐
│                    Kafka Ecosystem                              │
└─────────────────────────────────────────────────────────────────┘

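 Producers:                     Consumers:
 - Java/C# producers            - Kafka consumers
 - Flink                        - Flink
 - Spark                        - Spark
 - Custom apps                  - Custom apps

 Tools:
 - Kafka Connect                - Kafka Streams
 - ksqlDB                       - Debezium (CDC)
 - Kafka Monitor                - Consumer group management

Standard Kafka producers and consumers work with Event Hubs. Support for the other tools depends on which Kafka features they rely on and on your Event Hubs tier, so verify each one before migrating.

Benefits:
- No Kafka cluster to operate
- Existing Kafka clients keep working with a configuration change
- Easier migration from Kafka
- Managed scaling combined with the Kafka client ecosystem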

Step 6: Scaling and Performance

Partition Strategies

// Choosing partition count

// Low volume: 2-4 partitions
// - Up to 1K events/sec
// - Simple processing

// Medium volume: 8-16 partitions  
// - Up to 10K events/sec
// - Good parallelism

// High volume: 32+ partitions
// - 10K+ events/sec
// - Maximum throughput

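// Scaling throughput
// Capacity is a namespace setting, not a data-plane SDK call:
// - Standard tier: throughput units (1 TU = 1 MB/s or 1,000 events/s ingress),
//   optionally with Auto-inflate to grow automatically
// - Premium tier: processing units
// - Dedicated tier: capacity units
// Change these through the Azure portal, the Azure CLI, or ARM/Bicep templates.
// On Basic and Standard the partition count is fixed at creation, so size it for peak load.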
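
For the Standard tier, capacity planning comes down to simple arithmetic: one throughput unit covers ingress of up to 1 MB per second or 1,000 events per second, whichever limit is reached first. The `ThroughputPlanner` below is a hypothetical helper that applies that rule. It treats 1 MB as 1,048,576 bytes, which is an assumption, and it ignores egress limits and protocol overhead, so treat the result as a starting estimate.

```csharp
using System;

// Hypothetical planner for Standard-tier ingress; egress limits are not modeled.
public static class ThroughputPlanner
{
    private const double IngressEventsPerUnit = 1000;        // events per second per throughput unit
    private const double IngressBytesPerUnit = 1024 * 1024;  // bytes per second per throughput unit (assumes 1 MB = 1 MiB)

    public static int RequiredThroughputUnits(double eventsPerSecond, double averageEventBytes)
    {
        double byCount = eventsPerSecond / IngressEventsPerUnit;
        double byVolume = eventsPerSecond * averageEventBytes / IngressBytesPerUnit;

        // Whichever limit is hit first determines the requirement; a namespace has at least 1 unit.
        double required = Math.Max(byCount, byVolume);
        return Math.Max(1, (int)Math.Ceiling(required));
    }
}
```

Small, frequent events are bound by the event count (5,000 events per second at 512 bytes needs 5 units), while large, infrequent events are bound by volume (500 events per second at 10 KB also needs 5 units).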

Performance Optimization

// Optimize batch processing
// Batch size and prefetch are set in host.json, not on the trigger attribute.
// With version 5.x or later of the Event Hubs extension:
//
// {
//   "version": "2.0",
//   "extensions": {
//     "eventHubs": {
//       "maxEventBatchSize": 100,   // Process up to 100 events per invocation
//       "prefetchCount": 200        // Keep up to 200 events buffered for lower latency
//     }
//   }
// }
//
// Recent extension versions also offer minEventBatchSize and maxWaitTime for
// time-based batching; check the host.json reference for your version.
public class OptimizedProcessor
{
    // Batch events for better throughput
    [Function("ProcessBatch")]
    public async Task Run(
        [EventHubTrigger("orders", Connection = "EventHubConnection")]
        EventData[] events)
    {
        // Process in parallel. This gives up ordering within the batch,
        // so process sequentially when event order matters.
        var tasks = events.Select(ProcessEventAsync);
        await Task.WhenAll(tasks);
    }

    private Task ProcessEventAsync(EventData eventData)
    {
        // Your business logic here
        return Task.CompletedTask;
    }
}
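
Processing a whole batch in parallel is fast, but it discards the per-key ordering that partition keys were chosen to provide. One middle ground, sketched below with a hypothetical `KeyedProcessor` helper, is to run events for different keys concurrently while keeping events for the same key strictly sequential. The handler and key selector are supplied by the caller; nothing here depends on the Event Hubs SDK.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper: concurrent across keys, sequential within a key.
public static class KeyedProcessor
{
    public static Task ProcessAsync<T>(
        IEnumerable<T> events,
        Func<T, string> keySelector,
        Func<T, Task> handler)
    {
        // GroupBy keeps elements of each group in their original relative order.
        IEnumerable<Task> pipelines = events
            .GroupBy(keySelector)
            .Select(async group =>
            {
                foreach (T evt in group)
                {
                    // Awaiting each call ensures the next event for this key
                    // starts only after the previous one has finished.
                    await handler(evt);
                }
            });

        // Task.WhenAll enumerates the sequence, starting one pipeline per key.
        return Task.WhenAll(pipelines);
    }
}
```

Inside a batch-triggered function this could be called as `await KeyedProcessor.ProcessAsync(events, e => e.PartitionKey ?? string.Empty, ProcessEventAsync);`, assuming the events were published with a partition key. Events without a key all fall into a single sequential group.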

Step 7: Monitoring and Diagnostics

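public class EventHubMonitor
{
    // A consumer client (Azure.Messaging.EventHubs.Consumer) is bound to one event hub
    // and can read its runtime metadata.
    private readonly EventHubConsumerClient _client;
    private readonly ILogger<EventHubMonitor> _logger;

    public EventHubMonitor(EventHubConsumerClient client, ILogger<EventHubMonitor> logger)
    {
        _client = client;
        _logger = logger;
    }

    // EventHubMetrics and PartitionMetrics are simple DTOs that you define.
    public async Task<EventHubMetrics> GetMetricsAsync()
    {
        var metrics = new EventHubMetrics();

        // Get runtime info
        var hubProperties = await _client.GetEventHubPropertiesAsync();
        
        metrics.PartitionCount = hubProperties.PartitionIds.Length;
        metrics.PartitionIds = hubProperties.PartitionIds;

        // Get partition metrics
        foreach (var partitionId in hubProperties.PartitionIds)
        {
            var partition = await _client.GetPartitionPropertiesAsync(partitionId);
            
            metrics.Partitions[partitionId] = new PartitionMetrics
            {
                LastEnqueuedSequenceNumber = partition.LastEnqueuedSequenceNumber,
                LastEnqueuedTime = partition.LastEnqueuedTime,
                IsEmpty = partition.IsEmpty
            };
        }

        // Incoming and outgoing event rates are not exposed by the SDK;
        // read them from Azure Monitor metrics for the namespace.
        return metrics;
    }

    // The service does not report consumer lag. Derive it per partition as
    // (last enqueued sequence number) - (last checkpointed sequence number),
    // where the checkpointed values come from your checkpoint store.
    public async Task CheckBacklogAsync(
        IReadOnlyDictionary<string, long> checkpointedSequenceNumbers)
    {
        var metrics = await GetMetricsAsync();
        
        foreach (var (partitionId, partitionMetrics) in metrics.Partitions)
        {
            if (partitionMetrics.IsEmpty)
            {
                continue;
            }

            checkpointedSequenceNumbers.TryGetValue(partitionId, out var checkpointed);
            var lag = partitionMetrics.LastEnqueuedSequenceNumber - checkpointed;

            if (lag > 1000)
            {
                _logger.LogWarning(
                    "Partition {Partition} has backlog: {Lag} events",
                    partitionId, lag);
            }
        }
    }
}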

Best Practices Summary

Practice                  Why                                Implementation
Use partition keys        Ensure ordering within partition   Key by business entity
Implement checkpointing   Resume from last position          Use Blob/ADLS
Create consumer groups    Independent processing             One per processor type
Batch processing          Higher throughput                  Configure batch size
Monitor lag               Detect processing issues           Track partition lag

Conclusion

Event Hubs provides enterprise-grade streaming as a managed service.

Key takeaways:

  1. Use partition keys for related event ordering
  2. Implement checkpointing for resilience
  3. Create separate consumer groups for different processing paths
  4. Monitor partition lag to detect issues
  5. Take advantage of Kafka compatibility for tooling
