Real-Time Data Streaming with Event Hubs
Why Event Hubs?
Azure Event Hubs is a fully managed streaming platform capable of processing millions of events per second:
- Massive throughput - Handle millions of events/second
- Low latency - Sub-second processing delay
- Kafka compatibility - Use Kafka ecosystem tools
- Scalability - Scale throughput capacity up or down; partitions can be added on Premium and Dedicated tiers (never removed)
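- Durability - Events are retained for a configurable period (up to 7 days on Standard, up to 90 days on Premium and Dedicated) and can be re-read until they expire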
Perfect for:
- IoT telemetry ingestion
- Clickstream analytics
- Application logging
- Real-time dashboards
Understanding Event Hubs Architecture
Core Concepts
┌─────────────────────────────────────────────────────────────────────────────┐
│ Event Hubs Architecture │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────┐ ┌──────────────────────────────────────────────┐
│ Publishers │ │ Event Hubs Namespace │
│ │ │ │
│ - IoT Devices │ │ ┌────────────────────────────────────────┐ │
│ - Applications │────────▶│ │ Event Hub (orders) │ │
│ - Logs │ │ │ │ │
│ │ │ │ Partitions: 0, 1, 2, 3, 4 │ │
│ │ │ │ Partition Keys: orderId, customerId │ │
│ │ │ │ Retention: 7 days │ │
│ │ │ │ │ │
└─────────────────┘ │ └────────────────────┬───────────────────┘ │
│ │ │
│ ┌────────────────────┴───────────────────┐ │
│ │ Consumer Groups │ │
│ │ │ │
│ │ - processing-group-1 (Function) │ │
│ │ - analytics-group (Stream Analytics) │ │
│ │ - archive-group (Blob Storage) │ │
│ │ │ │
│ └────────────────────────────────────────┘ │
└──────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────┐
│ Consumers │
│ │
│ - Azure Functions │
│ - Event Processor Client │
│ - Kafka Consumers │
│ - Stream Analytics │
└─────────────────────────────────┘
Partition Keys and Ordering
// Why partition keys matter:
// Without a partition key, the service spreads events across partitions
// (round robin), so there is no ordering guarantee between them.
// With a partition key, the service hashes the key, so every event with the
// same key (e.g. the same orderId) lands on the same partition, and
// ordering is guaranteed within a partition.
//
// EventData.PartitionKey is read-only on the publishing side (the SDK fills it
// in on received events). Supply the key through SendEventOptions (or
// CreateBatchOptions when sending a batch).
var eventData = new EventData(eventBody);
await eventHubProducerClient.SendAsync(
    new[] { eventData },
    new SendEventOptions { PartitionKey = orderId }); // All events for the same order go to the same partition
Step 1: Publishing Events
Event Hub Producer
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
/// <summary>
/// Publishes order events to an Event Hub. Every event is keyed by its order id so that all
/// events for one order are routed to the same partition and keep their relative order.
/// </summary>
/// <remarks>
/// EventData.PartitionKey is read-only on the publishing side (the SDK populates it on received
/// events). The key is therefore supplied through SendEventOptions for individual sends and
/// through CreateBatchOptions for batches.
/// </remarks>
public class OrderEventProducer
{
    // Application properties stamped on every event so consumers can dispatch on "eventType"
    // without first deserializing the body.
    private const string OrderCreatedEventType = "OrderCreated";
    private const string EventSource = "order-service";
    private const string GzipJsonContentType = "application/json+gzip";

    private readonly EventHubProducerClient _producer;
    private readonly ILogger<OrderEventProducer> _logger;

    public OrderEventProducer(
        EventHubProducerClient producer,
        ILogger<OrderEventProducer> logger)
    {
        _producer = producer ?? throw new ArgumentNullException(nameof(producer));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>Publishes a single OrderCreated event for <paramref name="order"/>.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="order"/> is null.</exception>
    public async Task PublishOrderCreatedAsync(Order order)
    {
        if (order == null)
        {
            throw new ArgumentNullException(nameof(order));
        }

        var eventData = CreateEventData(BinaryData.FromObjectAsJson(ToOrderCreatedEvent(order)), order);

        // SendAsync has no single-event overload: send a one-element set, with the
        // partition key carried by SendEventOptions.
        await _producer.SendAsync(
            new[] { eventData },
            new SendEventOptions { PartitionKey = PartitionKeyFor(order) });

        _logger.LogInformation("Published order created event: {OrderId}", order.Id);
    }

    /// <summary>
    /// Publishes one OrderCreated event per order using EventDataBatch for efficient sending.
    /// </summary>
    /// <remarks>
    /// A batch can carry only one partition key, so orders are grouped by key and each group is
    /// sent as one or more batches. When many distinct keys are published at high volume, the
    /// SDK's EventHubBufferedProducerClient is usually a better fit.
    /// </remarks>
    /// <exception cref="ArgumentNullException"><paramref name="orders"/> is null.</exception>
    /// <exception cref="InvalidOperationException">A single event exceeds the maximum batch size.</exception>
    public async Task PublishOrderBatchAsync(List<Order> orders)
    {
        if (orders == null)
        {
            throw new ArgumentNullException(nameof(orders));
        }

        foreach (var group in orders.GroupBy(PartitionKeyFor))
        {
            // MaxSizeInBytes is left unset so the SDK uses the largest size the namespace allows;
            // a hard-coded 1 MB is rejected on tiers whose limit is smaller.
            var options = new CreateBatchOptions { PartitionKey = group.Key };
            var batch = await _producer.CreateBatchAsync(options);
            try
            {
                foreach (var order in group)
                {
                    var eventData = CreateEventData(BinaryData.FromObjectAsJson(ToOrderCreatedEvent(order)), order);
                    if (batch.TryAdd(eventData))
                    {
                        continue;
                    }

                    if (batch.Count == 0)
                    {
                        // Does not fit even in an empty batch: retrying can never succeed.
                        throw EventTooLarge(order);
                    }

                    // Batch is full: send it and continue in a fresh one
                    // (EventDataBatch cannot be cleared and reused).
                    await _producer.SendAsync(batch);
                    batch.Dispose();
                    batch = null;
                    batch = await _producer.CreateBatchAsync(options);

                    if (!batch.TryAdd(eventData))
                    {
                        throw EventTooLarge(order);
                    }
                }

                // Send remaining
                if (batch.Count > 0)
                {
                    await _producer.SendAsync(batch);
                }
            }
            finally
            {
                batch?.Dispose();
            }
        }

        _logger.LogInformation("Published {Count} order events", orders.Count);
    }

    /// <summary>
    /// Publishes a gzip-compressed OrderCreated event (useful for large payloads).
    /// The body is the same OrderCreatedEvent JSON contract as the uncompressed paths, so a consumer
    /// that decompresses and deserializes OrderCreatedEvent can read it.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="order"/> is null.</exception>
    public async Task PublishWithCompressionAsync(Order order)
    {
        if (order == null)
        {
            throw new ArgumentNullException(nameof(order));
        }

        var json = JsonSerializer.Serialize(ToOrderCreatedEvent(order));
        var eventData = CreateEventData(BinaryData.FromBytes(Compress(json)), order);
        eventData.ContentType = GzipJsonContentType;

        await _producer.SendAsync(
            new[] { eventData },
            new SendEventOptions { PartitionKey = PartitionKeyFor(order) });
    }

    // Partition key shared by every publish path: all events for one order hash to one partition.
    private static string PartitionKeyFor(Order order) => order.Id.ToString();

    // Single definition of the event contract so every publish path emits the same shape.
    private static OrderCreatedEvent ToOrderCreatedEvent(Order order) => new OrderCreatedEvent
    {
        OrderId = order.Id,
        CustomerId = order.CustomerId,
        TotalAmount = order.TotalAmount,
        Items = order.Items,
        Region = order.Region,
        Timestamp = DateTime.UtcNow
    };

    // Wraps a body and attaches the properties used for filtering and debugging.
    private static EventData CreateEventData(BinaryData body, Order order)
    {
        var eventData = new EventData(body);
        eventData.Properties["eventType"] = OrderCreatedEventType;
        eventData.Properties["correlationId"] = order.CorrelationId;
        eventData.Properties["source"] = EventSource;
        return eventData;
    }

    private static InvalidOperationException EventTooLarge(Order order) =>
        new InvalidOperationException($"The event for order {order.Id} exceeds the maximum batch size and cannot be published.");

    // Gzip-compresses the UTF-8 encoding of data.
    private static byte[] Compress(string data)
    {
        using var output = new MemoryStream();
        // leaveOpen: true keeps `output` open after the gzip footer is flushed on dispose,
        // instead of relying on MemoryStream.ToArray working on a closed stream.
        using (var gzip = new GZipStream(output, CompressionLevel.Optimal, leaveOpen: true))
        using (var writer = new StreamWriter(gzip))
        {
            writer.Write(data);
        }
        return output.ToArray();
    }
}
Using Partition Routing
// Publish related events in order: events that share an OrderId are sent with the same
// partition key, so the service routes them to one partition and preserves their order.
public async Task PublishInOrderAsync(List<OrderEvent> events)
{
    if (events == null)
    {
        throw new ArgumentNullException(nameof(events));
    }

    // Group by partition key (GroupBy keeps the original order of events within each group)
    foreach (var group in events.GroupBy(e => e.OrderId))
    {
        // Use PartitionKey so the service applies the same hash it uses for every other
        // publisher of this key. A PartitionId computed client-side would send this key to a
        // different partition than those publishers and break per-order ordering.
        var options = new CreateBatchOptions { PartitionKey = group.Key };
        var batch = await _producer.CreateBatchAsync(options);
        try
        {
            foreach (var evt in group)
            {
                var eventData = new EventData(BinaryData.FromObjectAsJson(evt));
                if (batch.TryAdd(eventData))
                {
                    continue;
                }

                if (batch.Count == 0)
                {
                    throw new InvalidOperationException(
                        $"An event for order {group.Key} exceeds the maximum batch size and cannot be published.");
                }

                // Batch is full: send what we have (preserving order) and continue in a new batch.
                // Silently dropping the event here would lose data and break ordering.
                await _producer.SendAsync(batch);
                batch.Dispose();
                batch = null;
                batch = await _producer.CreateBatchAsync(options);

                if (!batch.TryAdd(eventData))
                {
                    throw new InvalidOperationException(
                        $"An event for order {group.Key} exceeds the maximum batch size and cannot be published.");
                }
            }

            if (batch.Count > 0)
            {
                await _producer.SendAsync(batch);
            }
        }
        finally
        {
            // EventDataBatch is IDisposable.
            batch?.Dispose();
        }
    }
}
/// <summary>
/// Maps <paramref name="key"/> to a partition id in "0".."partitionCount-1". The hash is stable
/// across processes, machines and restarts.
/// </summary>
/// <remarks>
/// string.GetHashCode() is randomized per process on .NET Core / .NET 5+, so it cannot be used for
/// routing that must agree between producers or survive a restart. This is also NOT the hash the
/// Event Hubs service applies to a PartitionKey. Do not mix this method and PartitionKey for the
/// same key, or its events will land on different partitions.
/// </remarks>
/// <param name="key">The routing key; must not be null.</param>
/// <param name="partitionCount">Number of partitions in the hub (defaults to 4). Pass the hub's actual count.</param>
/// <returns>The partition id as a decimal string.</returns>
/// <exception cref="ArgumentNullException"><paramref name="key"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="partitionCount"/> is not positive.</exception>
private string GetPartitionForKey(string key, int partitionCount = 4)
{
    if (key == null)
    {
        throw new ArgumentNullException(nameof(key));
    }
    if (partitionCount <= 0)
    {
        throw new ArgumentOutOfRangeException(nameof(partitionCount), partitionCount, "Partition count must be positive.");
    }

    // 32-bit FNV-1a over the UTF-8 bytes. Unsigned arithmetic avoids the
    // Math.Abs(int.MinValue) overflow of the signed approach.
    uint hash = 2166136261;
    foreach (var b in System.Text.Encoding.UTF8.GetBytes(key))
    {
        hash ^= b;
        hash = unchecked(hash * 16777619u);
    }

    var partition = hash % (uint)partitionCount;
    return partition.ToString(System.Globalization.CultureInfo.InvariantCulture);
}
Step 2: Processing Events
Azure Function with Event Hub Trigger
using Azure.Messaging.EventHubs;
/// <summary>
/// Azure Function that consumes order events from the "orders" Event Hub and dispatches them
/// to <see cref="IOrderService"/> based on the "eventType" application property.
/// </summary>
public class OrderEventProcessor
{
    private readonly IOrderService _orderService;
    private readonly ILogger<OrderEventProcessor> _logger;

    public OrderEventProcessor(
        IOrderService orderService,
        ILogger<OrderEventProcessor> logger)
    {
        _orderService = orderService;
        _logger = logger;
    }

    /// <summary>
    /// Event Hubs trigger entry point. Processes every event in the batch, then throws an
    /// AggregateException if any event failed.
    /// </summary>
    /// <remarks>
    /// The Functions host writes the checkpoint itself after the invocation, and it advances past
    /// the batch even when the function throws (unless a retry policy is configured). Stopping at
    /// the first failure would therefore silently skip the rest of the batch. Every event is
    /// attempted first, and the failure is surfaced afterwards for monitoring and retry policies.
    /// If a retry policy re-runs the batch, already-processed events are seen again, so handlers
    /// should be idempotent.
    /// NOTE(review): PartitionContext binding support depends on the Functions worker model and
    /// Event Hubs extension version. Confirm it binds in the target host.
    /// </remarks>
    [Function("ProcessOrders")]
    public async Task Run(
        [EventHubTrigger("orders", Connection = "EventHubConnection")]
        EventData[] events,
        PartitionContext partitionContext)
    {
        _logger.LogInformation(
            "Processing partition {PartitionId}, {Count} events",
            partitionContext.PartitionId,
            events.Length);

        var failures = new List<Exception>();
        foreach (var eventData in events)
        {
            try
            {
                await ProcessEventAsync(eventData, partitionContext);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex,
                    "Failed to process event {EventId}",
                    eventData.MessageId);
                failures.Add(ex);
            }
        }

        if (failures.Count > 0)
        {
            throw new AggregateException(
                $"{failures.Count} of {events.Length} events failed in partition {partitionContext.PartitionId}.",
                failures);
        }
    }

    // Reads metadata, deserializes the body for the declared event type and dispatches it.
    // No manual checkpoint: the Functions host owns checkpointing for triggered functions.
    private async Task ProcessEventAsync(EventData eventData, PartitionContext context)
    {
        // TryGetValue: a missing property must not throw KeyNotFoundException.
        var eventType = GetProperty(eventData, "eventType");

        _logger.LogInformation(
            "Processing event {EventId}, Type: {Type}, Partition: {Partition}",
            eventData.MessageId, eventType, context.PartitionId);

        // Deserialize per event type. Decoding every payload as OrderCreatedEvent and casting it to
        // OrderUpdatedEvent would throw InvalidCastException.
        switch (eventType)
        {
            case "OrderCreated":
                await _orderService.ProcessOrderCreatedAsync(ReadBody<OrderCreatedEvent>(eventData));
                break;
            case "OrderUpdated":
                await _orderService.ProcessOrderUpdatedAsync(ReadBody<OrderUpdatedEvent>(eventData));
                break;
            default:
                _logger.LogWarning("Unknown event type: {Type}", eventType);
                break;
        }
    }

    // Returns the string form of an application property, or null when it is absent.
    private static string GetProperty(EventData eventData, string name) =>
        eventData.Properties.TryGetValue(name, out var value) ? value?.ToString() : null;

    // Deserializes the JSON body, decompressing first when the content type marks it as gzip.
    // EventBody (BinaryData) is used because Body (ReadOnlyMemory<byte>) has no ToObjectFromJson.
    private static T ReadBody<T>(EventData eventData)
    {
        var isGzip = eventData.ContentType?.IndexOf("gzip", StringComparison.OrdinalIgnoreCase) >= 0;
        var body = isGzip
            ? BinaryData.FromBytes(Decompress(eventData.Body.ToArray()))
            : eventData.EventBody;
        return body.ToObjectFromJson<T>();
    }

    // Inflates a gzip payload.
    private static byte[] Decompress(byte[] data)
    {
        using var input = new MemoryStream(data);
        using var gzip = new GZipStream(input, CompressionMode.Decompress);
        using var output = new MemoryStream();
        gzip.CopyTo(output);
        return output.ToArray();
    }
}
Step 3: Event Processor Client
Using EventProcessorClient with Checkpointing
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;
/// <summary>
/// Long-running consumer built on EventProcessorClient. It uses Blob Storage checkpoints so that a
/// restarted or rebalanced processor resumes each partition after the last checkpointed event.
/// </summary>
public class EventHubConsumer
{
    private readonly EventProcessorClient _processor;
    private readonly ILogger<EventHubConsumer> _logger;

    /// <param name="client">
    /// Supplies the namespace and event hub name.
    /// NOTE(review): assumed to expose FullyQualifiedNamespace and EventHubName, as the original code read them.
    /// </param>
    /// <param name="checkpointStore">Blob container holding ownership and checkpoint blobs.</param>
    /// <param name="logger">Logger.</param>
    /// <param name="credential">
    /// Azure AD credential. The fully-qualified-namespace constructor of EventProcessorClient requires
    /// one. The parameter is optional only so existing call sites still compile; passing null throws.
    /// </param>
    /// <exception cref="ArgumentNullException">A required argument is null.</exception>
    public EventHubConsumer(
        EventHubClient client,
        BlobContainerClient checkpointStore,
        ILogger<EventHubConsumer> logger,
        Azure.Core.TokenCredential credential = null)
    {
        if (client == null)
        {
            throw new ArgumentNullException(nameof(client));
        }
        if (checkpointStore == null)
        {
            throw new ArgumentNullException(nameof(checkpointStore));
        }
        if (credential == null)
        {
            throw new ArgumentNullException(nameof(credential),
                "A TokenCredential is required to connect with a fully qualified namespace.");
        }

        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _processor = new EventProcessorClient(
            checkpointStore,
            consumerGroup: "processing-group",
            fullyQualifiedNamespace: client.FullyQualifiedNamespace,
            eventHubName: client.EventHubName,
            credential: credential);

        _processor.ProcessEventAsync += ProcessEventAsync;
        _processor.ProcessErrorAsync += ProcessErrorAsync;
    }

    public async Task StartProcessingAsync()
    {
        await _processor.StartProcessingAsync();
        _logger.LogInformation("Started processing events");
    }

    /// <summary>Stops processing and releases partition ownership so other instances can take over.</summary>
    public async Task StopProcessingAsync()
    {
        await _processor.StopProcessingAsync();
        _logger.LogInformation("Stopped processing events");
    }

    // Per-event handler. Exceptions must never escape: the processor does not handle exceptions
    // thrown from this handler on your behalf.
    private async Task ProcessEventAsync(ProcessEventArgs args)
    {
        // HasEvent is false when a MaximumWaitTime is configured and no event arrived in time.
        if (!args.HasEvent)
        {
            return;
        }

        var eventData = args.Data;
        try
        {
            // EventBody is BinaryData; Body (ReadOnlyMemory<byte>) has no ToObjectFromJson.
            var orderEvent = eventData.EventBody.ToObjectFromJson<OrderCreatedEvent>();
            await ProcessOrderAsync(orderEvent);

            // Checkpoint only after successful processing, so a restart resumes after this event.
            // Each checkpoint is a blob write. At high volume, checkpoint every N events or seconds instead.
            await args.UpdateCheckpointAsync(args.CancellationToken);

            _logger.LogInformation(
                "Processed event {Id}, partition {Partition}, offset {Offset}",
                eventData.MessageId,
                args.Partition.PartitionId,
                eventData.Offset);
        }
        catch (OperationCanceledException) when (args.CancellationToken.IsCancellationRequested)
        {
            // The processor is stopping. With no checkpoint written, this event is
            // delivered again when the partition is next processed.
        }
        catch (Exception ex)
        {
            // Do not rethrow (see above). Note that the next successful checkpoint moves past this
            // event, so it will not be redelivered. Route it to a dead-letter store if it must not be lost.
            _logger.LogError(ex,
                "Error processing event {Id}, partition {Partition}, offset {Offset}",
                eventData.MessageId,
                args.Partition.PartitionId,
                eventData.Offset);
        }
    }

    // Infrastructure errors (e.g. lost connection, ownership or checkpoint failures) reported by the processor.
    private Task ProcessErrorAsync(ProcessErrorEventArgs args)
    {
        _logger.LogError(args.Exception,
            "Error in partition {Partition}, source: {Source}",
            args.Partition?.PartitionId,
            args.ErrorSource);
        return Task.CompletedTask;
    }

    private async Task ProcessOrderAsync(OrderCreatedEvent order)
    {
        // Your business logic here
        await Task.Delay(100);
    }
}
Why Checkpointing Matters
Without Checkpointing: With Checkpointing:
┌─────────────────────────────────┐ ┌─────────────────────────────────┐
│ Process events 1-100 │ │ Process events 1-100 │
│ │ │ │
│ Fails at event 50 │ │ Succeeds - checkpoint at 100 │
│ │ │ │
│ RESTART │ │ RESTART │
│ │ │ │
│ Process events 1-100 AGAIN! │ │ Resume from 100 │
│ Wasted work on 1-49 │ │ Process 101-200 │
└─────────────────────────────────┘ └─────────────────────────────────┘
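Benefits:
- At-least-once processing: events after the last checkpoint are delivered again after a restart, so handlers should be idempotent
- Resilient to failures
- Progress preserved across restarts
- Partitions can move between processor instances (scale out/in) without losing progress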
Step 4: Consumer Groups
Why Multiple Consumer Groups?
┌─────────────────────────────────────────────────────────────────┐
│ Multiple Consumer Groups │
└─────────────────────────────────────────────────────────────────┘
Event Hub:
Partition 0: ────Events: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10──
Partition 1: ────Events: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10──
Consumer Groups: Each group independently reads ALL events from every partition:
┌──────────────────────┐ ┌──────────────────────┐
│ processing-group │ │ processing-group: │
│ (Azure Functions) │ │ P0: 1-10, P1: 1-10 │
└──────────────────────┘ └──────────────────────┘
┌──────────────────────┐ ┌──────────────────────┐
│ analytics-group │ │ analytics-group: │
│ (Stream Analytics) │ │ P0: 1-10, P1: 1-10 │
└──────────────────────┘ └──────────────────────┘
┌──────────────────────┐ ┌──────────────────────┐
│ archive-group │ │ archive-group: │
│ (Blob Storage) │ │ P0: 1-10, P1: 1-10 │
└──────────────────────┘ └──────────────────────┘
Why use different groups?
- Different processing logic
- Independent scaling
- No interference between consumers
- Each maintains own position
Configuring Consumer Groups
// Create processor with specific consumer group.
// Use one consumer group per processing path (application). All instances of the SAME processor
// share that group, which is how the processor load-balances partitions between them.
var processor = new EventProcessorClient(
    checkpointStore,
    consumerGroup: "my-consumer-group",
    connectionString: "...",
    eventHubName: "orders");

// Azure Functions configuration: the consumer group is set on the trigger parameter
[Function("ProcessWithConsumerGroup")]
public Task RunAsync(
    [EventHubTrigger("orders",
        Connection = "EventHubConnection",
        ConsumerGroup = "$Default")] // Or custom group
    EventData[] events)
{
    return Task.CompletedTask;
}

// Programmatic creation.
// Consumer groups are management-plane resources. Create them through Azure Resource Manager
// (Azure.ResourceManager.EventHubs package), the portal, the CLI or Bicep, not through the
// data-plane Event Hubs clients. The Basic tier only offers $Default.
// armClient, subscriptionId, resourceGroupName and namespaceName are assumed to be in scope.
public async Task CreateConsumerGroupAsync()
{
    var eventHub = armClient.GetEventHubResource(
        EventHubResource.CreateResourceIdentifier(subscriptionId, resourceGroupName, namespaceName, "orders"));

    await eventHub.GetEventHubsConsumerGroups().CreateOrUpdateAsync(
        WaitUntil.Completed,
        "analytics-group",
        new EventHubsConsumerGroupData
        {
            UserMetadata = "Events for analytics pipeline"
        });
}
Step 5: Kafka Integration
Using Kafka Protocol
using Confluent.Kafka;
// Event Hubs is Kafka-compatible (Standard tier and above; Basic has no Kafka endpoint).
// Just point the bootstrap servers at the namespace's Kafka endpoint on port 9093.
// Load the connection string from configuration / Key Vault rather than hard-coding it.
var config = new ProducerConfig
{
    BootstrapServers = "my-namespace.servicebus.windows.net:9093",
    SecurityProtocol = SecurityProtocol.SaslSsl,
    SaslMechanism = SaslMechanism.Plain,
    SaslUsername = "$ConnectionString",
    SaslPassword = "Endpoint=sb://..."
};

using var producer = new ProducerBuilder<string, string>(config).Build();

// The Kafka message key plays the role of the partition key: same orderId -> same partition.
await producer.ProduceAsync("orders", new Message<string, string>
{
    Key = orderId,
    Value = JsonSerializer.Serialize(order)
});

// Consume with Kafka consumer
var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "my-namespace.servicebus.windows.net:9093",
    GroupId = "my-consumer-group",
    SecurityProtocol = SecurityProtocol.SaslSsl,
    SaslMechanism = SaslMechanism.Plain,
    SaslUsername = "$ConnectionString",
    SaslPassword = "Endpoint=sb://...",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = false
};

// Stop cleanly on Ctrl+C instead of looping forever.
using var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true;
    cts.Cancel();
};

using var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
consumer.Subscribe("orders");

try
{
    while (!cts.IsCancellationRequested)
    {
        ConsumeResult<string, string> result;
        try
        {
            result = consumer.Consume(cts.Token);
        }
        catch (ConsumeException ex) when (!ex.Error.IsFatal)
        {
            // Non-fatal consume error: report it and keep consuming.
            Console.Error.WriteLine($"Consume error: {ex.Error.Reason}");
            continue;
        }

        await ProcessOrderAsync(result.Message.Value);

        // EnableAutoCommit = false: commit only after processing succeeded (at-least-once).
        consumer.Commit(result);
    }
}
catch (OperationCanceledException)
{
    // Shutdown requested.
}
finally
{
    // Leave the consumer group promptly so its partitions are reassigned
    // without waiting for the session timeout.
    consumer.Close();
}
Why Kafka Compatibility?
Kafka Tools Work with Event Hubs:
┌─────────────────────────────────────────────────────────────────┐
│ Kafka Ecosystem │
└─────────────────────────────────────────────────────────────────┘
Producers: Consumers:
- Java/C# producers - Kafka consumers
- Flink - Flink
- Spark - Spark
- Custom apps - Custom apps
Tools:
- Kafka Connect - Kafka Streams
- ksqlDB - Debezium (CDC)
- Kafka Monitor - Consumer group management
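Most of these work with Event Hubs. Check the Event Hubs Kafka feature-support matrix before relying on one: for example, Kafka Streams requires the Premium or Dedicated tier, and tools that depend on Kafka-specific broker administration may not work.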
Benefits:
- No Kafka clusters (brokers, ZooKeeper/KRaft) to run, patch or scale yourself
- Existing Kafka tools work
- Easy migration from Kafka
- Best of both worlds
Step 6: Scaling and Performance
Partition Strategies
// Choosing partition count
// (as a rough guide, one partition handles about 1 MB/s or 1,000 events/s of ingress)
// Low volume: 2-4 partitions
// - Up to 1K events/sec
// - Simple processing
// Medium volume: 8-16 partitions
// - Up to 10K events/sec
// - Good parallelism
// High volume: 32+ partitions
// - 10K+ events/sec
// - Maximum throughput
//
// Partition count is fixed at creation on Basic/Standard. On Premium and Dedicated it can be
// increased but never decreased. Increasing it changes which partition a given key hashes to.
//
// Throughput capacity is scaled separately, per namespace:
//   Standard  -> throughput units (TUs), optionally with auto-inflate
//   Premium   -> processing units (PUs)
//   Dedicated -> capacity units (CUs)
// This is a management-plane (Azure Resource Manager) operation, not a data-plane client call.
// armClient, subscriptionId, resourceGroupName and namespaceName are assumed to be in scope.
var eventHubsNamespace = armClient.GetEventHubsNamespaceResource(
    EventHubsNamespaceResource.CreateResourceIdentifier(subscriptionId, resourceGroupName, namespaceName));

EventHubsNamespaceData namespaceData = (await eventHubsNamespace.GetAsync()).Value.Data;
namespaceData.Sku.Capacity = 4; // 4 TUs / PUs / CUs, depending on the tier
await eventHubsNamespace.UpdateAsync(namespaceData);
Performance Optimization
// Optimize batch processing
//
// Batch size, batch wait time and prefetch are host settings, not trigger-attribute arguments
// (attribute arguments must be compile-time constants, so `new BatchOptions { ... }` cannot appear
// there). Configure them in host.json:
//
// {
//   "version": "2.0",
//   "extensions": {
//     "eventHubs": {
//       "maxEventBatchSize": 100,     // Process up to 100 at once
//       "minEventBatchSize": 10,      // maxWaitTime only applies when this is > 1
//       "maxWaitTime": "00:00:05",    // Or after 5 sec
//       "prefetchCount": 200          // Keep 200 events buffered in memory for lower latency
//     }
//   }
// }
public class OptimizedProcessor
{
    /// <summary>
    /// Processes a whole batch. Events that share a partition key are handled sequentially in
    /// arrival order, and different keys are processed in parallel.
    /// </summary>
    [Function("ProcessBatch")]
    public async Task Run(
        [EventHubTrigger("orders", Connection = "EventHubConnection")]
        EventData[] events)
    {
        // Running every event in parallel would break the per-key ordering that partition keys
        // exist to provide, so parallelize across keys only.
        var perKeyWork = events
            .GroupBy(e => e.PartitionKey)
            .Select(ProcessSequentiallyAsync);
        await Task.WhenAll(perKeyWork);
    }

    /// <summary>
    /// Low-latency variant. Latency is tuned through prefetchCount in host.json (see above).
    /// NOTE(review): two functions reading the same hub should normally use different consumer
    /// groups (ConsumerGroup = "...") so that they do not compete for the same partitions.
    /// </summary>
    [Function("OptimizedProcessor")]
    public Task RunLowLatency(
        [EventHubTrigger("orders", Connection = "EventHubConnection")]
        EventData[] events)
    {
        // Lower latency processing
        return Task.CompletedTask;
    }

    // Handles one key's events strictly in order.
    private async Task ProcessSequentiallyAsync(IEnumerable<EventData> eventsForKey)
    {
        foreach (var eventData in eventsForKey)
        {
            await ProcessEventAsync(eventData);
        }
    }

    private Task ProcessEventAsync(EventData eventData)
    {
        // Your business logic here
        return Task.CompletedTask;
    }
}
Step 7: Monitoring and Diagnostics
/// <summary>
/// Collects per-partition metrics for an Event Hub and logs a warning for partitions whose lag
/// exceeds a threshold.
/// </summary>
/// <remarks>
/// NOTE(review): EventHubAdministrationClient, GetEventHubAsync and GetPartitionAsync (with
/// IncomingEventsPerSecond / OutgoingEventsPerSecond / LagInEvents) are not part of
/// Azure.Messaging.EventHubs. Confirm which library provides them. With the official SDK,
/// EventHubConsumerClient.GetPartitionPropertiesAsync exposes LastEnqueuedSequenceNumber /
/// LastEnqueuedOffset / LastEnqueuedTime. Consumer lag must be computed as
/// (last enqueued sequence number - checkpointed sequence number), and event rates come from the
/// Azure Monitor metrics IncomingMessages / OutgoingMessages.
/// </remarks>
public class EventHubMonitor
{
    private readonly EventHubAdministrationClient _adminClient;
    private readonly ILogger<EventHubMonitor> _logger;

    // Without this constructor both readonly fields stay null and every call throws NullReferenceException.
    public EventHubMonitor(EventHubAdministrationClient adminClient, ILogger<EventHubMonitor> logger)
    {
        _adminClient = adminClient ?? throw new ArgumentNullException(nameof(adminClient));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>Gathers hub-level and per-partition metrics for <paramref name="eventHubName"/>.</summary>
    /// <exception cref="ArgumentException"><paramref name="eventHubName"/> is null or empty.</exception>
    public async Task<EventHubMetrics> GetMetricsAsync(string eventHubName)
    {
        if (string.IsNullOrEmpty(eventHubName))
        {
            throw new ArgumentException("An event hub name is required.", nameof(eventHubName));
        }

        // NOTE(review): assumes EventHubMetrics initializes its Partitions dictionary; confirm.
        var metrics = new EventHubMetrics();

        // Get runtime info
        var runtimeInfo = await _adminClient.GetEventHubAsync(eventHubName);
        metrics.PartitionCount = runtimeInfo.Value.PartitionCount;
        metrics.PartitionIds = runtimeInfo.Value.PartitionIds;

        // Get partition metrics
        foreach (var partitionId in runtimeInfo.Value.PartitionIds)
        {
            var partitionInfo = await _adminClient.GetPartitionAsync(eventHubName, partitionId);
            metrics.Partitions[partitionId] = new PartitionMetrics
            {
                LatestOffset = partitionInfo.Value.LastEnqueuedOffset,
                IncomingEvents = partitionInfo.Value.IncomingEventsPerSecond,
                OutgoingEvents = partitionInfo.Value.OutgoingEventsPerSecond,
                Lag = partitionInfo.Value.LagInEvents,
                LastEnqueuedTime = partitionInfo.Value.LastEnqueuedTime
            };
        }

        return metrics;
    }

    /// <summary>
    /// Logs a warning for every partition whose lag is greater than <paramref name="lagThreshold"/>
    /// events (default 1000, the previously hard-coded value).
    /// </summary>
    public async Task CheckBacklogAsync(string eventHubName, long lagThreshold = 1000)
    {
        var metrics = await GetMetricsAsync(eventHubName);

        foreach (var (partitionId, partitionMetrics) in metrics.Partitions)
        {
            if (partitionMetrics.Lag > lagThreshold)
            {
                _logger.LogWarning(
                    "Partition {Partition} has backlog: {Lag} events",
                    partitionId, partitionMetrics.Lag);
            }
        }
    }
}
Best Practices Summary
| Practice | Why | Implementation |
|---|---|---|
| Use partition keys | Ensure ordering within partition | Key by business entity |
| Implement checkpointing | Resume from last position | Use an Azure Blob Storage checkpoint store |
| Create consumer groups | Independent processing | One per processor type |
| Batch processing | Higher throughput | Configure batch size |
| Monitor lag | Detect processing issues | Track partition lag |
Conclusion
Event Hubs provides enterprise-grade streaming:
- Massive scale - Handle millions of events/second
- Kafka compatibility - Use Kafka ecosystem tools
- Checkpointing - Resumable, at-least-once processing (make handlers idempotent)
- Consumer groups - Multiple independent consumers
- Partitioning - Order and parallelism control
Key takeaways:
- Use partition keys for related event ordering
- Implement checkpointing for resilience
- Create separate consumer groups for different processing paths
- Monitor partition lag to detect issues
- Take advantage of Kafka compatibility for tooling
Azure Integration Hub - Event Hubs