Azure Event Hubs — Kafka Protocol on Event Hubs
Use Kafka SDK Without Changing Code
Introduction
Azure Event Hubs exposes a Kafka-compatible endpoint, so existing Kafka applications can usually connect to it by changing only their client configuration (bootstrap servers and SASL settings), not their code. This enables:
- Lift-and-shift migration: move Kafka workloads to Azure without rewriting them
- Hybrid architectures: use both Kafka and Event Hubs in the same pipeline
- Kafka ecosystem tools: use the Kafka CLI, ksqlDB, and Kafka Connect
- Azure integration: use Event Hubs features such as Capture and scaling
This guide covers:
- Protocol compatibility: how it works under the hood
- Configuration: setup for different clients
- Code examples: producer and consumer implementations
- Advanced features: transactions and exactly-once semantics
- Migration strategies: moving from self-managed Kafka
How Kafka Compatibility Works
Architecture Overview
┌─────────────────────────────────────────────────────────────────────┐
│                   EVENT HUBS KAFKA COMPATIBILITY                    │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   ┌─────────────────┐              ┌─────────────────┐              │
│   │  Kafka Client   │   ───────▶   │  Event Hubs     │              │
│   │                 │              │  Kafka Endpoint │              │
│   │  • Producer     │              │                 │              │
│   │  • Consumer     │     SASL     │  • AMQP Bridge  │              │
│   │  • Streams      │     over     │  • Partition    │              │
│   │  • Connect      │     SSL      │    Mapping      │              │
│   └─────────────────┘              └────────┬────────┘              │
│                                             │                       │
│                                   ┌─────────▼─────────┐             │
│                                   │    Event Hubs     │             │
│                                   │       Core        │             │
│                                   └───────────────────┘             │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
Protocol Translation
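The Event Hubs Kafka endpoint translates:
- Kafka protocol → AMQP: for internal processing
- SASL/PLAIN → shared access signature (SAS) connection string, or SASL/OAUTHBEARER → Microsoft Entra ID (formerly Azure AD): authentication
- Kafka offsets → Event Hubs offsets: checkpointing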
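The translation is easier to follow with the usual concept mapping in mind: a Kafka cluster corresponds to an Event Hubs namespace, a topic to an event hub, and partitions, consumer groups, and offsets keep their names. The lookup below is only an illustration of that mapping; `KAFKA_TO_EVENT_HUBS` and `to_event_hubs_term` are hypothetical names, not part of any SDK.

```python
# Conceptual mapping between Kafka and Event Hubs terminology.
KAFKA_TO_EVENT_HUBS = {
    "cluster": "namespace",
    "topic": "event hub",
    "partition": "partition",
    "consumer group": "consumer group",
    "offset": "offset",
}


def to_event_hubs_term(kafka_term: str) -> str:
    """Return the Event Hubs concept for a Kafka concept (illustrative helper)."""
    return KAFKA_TO_EVENT_HUBS[kafka_term.strip().lower()]
```

In practice this means the topic name passed to a Kafka client, such as `my-topic` in the examples in this guide, is the name of an event hub inside the namespace.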
Configure Kafka Endpoint
Namespace Configuration
# Create an Event Hubs namespace; the Kafka endpoint is enabled automatically on Standard and higher tiers
az eventhubs namespace create \
  --name my-kafka-namespace \
  --resource-group my-rg \
  --location eastus \
  --sku Standard  # Standard or Premium; the Basic tier has no Kafka endpoint
# Verify that the Kafka endpoint is available
az eventhubs namespace show \
  --name my-kafka-namespace \
  --resource-group my-rg \
  --query kafkaEnabled
Connection Details
| Property | Value |
|---|---|
| Bootstrap Servers | <namespace>.servicebus.windows.net:9093 |
| Protocol | SASL_SSL |
| SASL Mechanism | PLAIN (connection string) or OAUTHBEARER (Microsoft Entra ID) |
| Port | 9093 (TLS, used by both SASL mechanisms) |
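As a minimal sketch, the connection details in the table can be collected into one settings dictionary. The keys are the librdkafka-style names used by the `confluent_kafka` client in this guide; the function name `kafka_client_config` and the constant `KAFKA_PORT` are assumptions made for illustration.

```python
KAFKA_PORT = 9093  # port of the Event Hubs Kafka endpoint


def kafka_client_config(namespace: str, connection_string: str) -> dict:
    """Build connection settings for the Event Hubs Kafka endpoint (SASL/PLAIN)."""
    return {
        "bootstrap.servers": f"{namespace}.servicebus.windows.net:{KAFKA_PORT}",
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        # The username is the literal string "$ConnectionString";
        # the password is the connection string itself.
        "sasl.username": "$ConnectionString",
        "sasl.password": connection_string,
    }
```

The returned dictionary can be passed to a `confluent_kafka` `Producer` or `Consumer` after adding client-specific settings such as `group.id`.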
Connection String Format
Endpoint=sb://mynamespace.servicebus.windows.net/;
SharedAccessKeyName=RootManageSharedAccessKey;
SharedAccessKey=xxx;
EntityPath=myeventhub
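The connection string already contains the host name that the Kafka client needs, so the bootstrap server can be derived from it rather than configured separately. The sketch below assumes the `key=value;` layout shown above (written as a single line in practice); `parse_connection_string` and `bootstrap_server` are hypothetical helper names.

```python
from urllib.parse import urlparse


def parse_connection_string(connection_string: str) -> dict:
    """Split an Event Hubs connection string into its key/value parts."""
    parts = {}
    for segment in connection_string.split(";"):
        segment = segment.strip()
        if not segment:
            continue
        # Split on the first "=" only: base64 keys often end in "=" padding.
        key, _, value = segment.partition("=")
        parts[key] = value
    return parts


def bootstrap_server(connection_string: str, port: int = 9093) -> str:
    """Derive the Kafka bootstrap server from the Endpoint part."""
    endpoint = parse_connection_string(connection_string)["Endpoint"]
    host = urlparse(endpoint).hostname
    if host is None:
        raise ValueError(f"No host found in endpoint: {endpoint!r}")
    return f"{host}:{port}"
```

`EntityPath` is only present in connection strings scoped to a single event hub; a namespace-level connection string omits it.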
Kafka Producer Implementation
Java Producer
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class EventHubsProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Event Hubs Kafka endpoint
        props.put("bootstrap.servers", "mynamespace.servicebus.windows.net:9093");
        // Security configuration
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        // Authentication - use connection string as password
        String connectionString = System.getenv("EVENTHUB_CONNECTION_STRING");
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required " +
            "username=\"$ConnectionString\" " +
            "password=\"" + connectionString + "\";");
        // Producer settings
        props.put("client.id", "my-producer");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Reliability settings
        props.put("acks", "all");
        props.put("retries", 3);
        props.put("enable.idempotence", true);
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // Send messages
        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> record =
                new ProducerRecord<>("my-topic", "key-" + i, "message-" + i);
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    System.err.println("Send failed: " + exception.getMessage());
                } else {
                    System.out.println("Sent to " + metadata.topic() +
                        ":" + metadata.partition() + ":" + metadata.offset());
                }
            });
        }
        producer.flush();
        producer.close();
    }
}
.NET Producer
using System;
using System.Threading.Tasks;
using Confluent.Kafka;
public class EventHubsProducer
{
    public static async Task Main(string[] args)
    {
        var config = new ProducerConfig
        {
            BootstrapServers = "mynamespace.servicebus.windows.net:9093",
            SecurityProtocol = SecurityProtocol.SaslSsl,
            SaslMechanism = SaslMechanism.Plain,
            SaslUsername = "$ConnectionString",
            SaslPassword = Environment.GetEnvironmentVariable("EVENTHUB_CONNECTION_STRING"),
            // Idempotence requires acknowledgement from all in-sync replicas
            Acks = Acks.All,
            EnableIdempotence = true,
            ClientId = "my-dotnet-producer"
        };
        using var producer = new ProducerBuilder<string, string>(config).Build();
        for (int i = 0; i < 100; i++)
        {
            var message = new Message<string, string>
            {
                Key = $"key-{i}",
                Value = $"message-{i}",
                Timestamp = Timestamp.Default
            };
            // Awaiting each send keeps the example simple at the cost of throughput
            var result = await producer.ProduceAsync("my-topic", message);
            Console.WriteLine($"Delivered to {result.TopicPartitionOffset}");
        }
        producer.Flush(TimeSpan.FromSeconds(10));
    }
}
Python Producer
from confluent_kafka import Producer
import os

def delivery_callback(err, msg):
    # Called once per message from poll() or flush()
    if err:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

producer = Producer({
    'bootstrap.servers': 'mynamespace.servicebus.windows.net:9093',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': '$ConnectionString',
    'sasl.password': os.environ['EVENTHUB_CONNECTION_STRING'],
    'client.id': 'my-python-producer'
})
for i in range(100):
    producer.produce(
        'my-topic',
        key=f'key-{i}',
        value=f'message-{i}',
        callback=delivery_callback
    )
    # Serve delivery callbacks for messages sent so far
    producer.poll(0)
# Block until every queued message has been delivered or has failed
producer.flush()
Kafka Consumer Implementation
Java Consumer
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class EventHubsConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Event Hubs Kafka endpoint
        props.put("bootstrap.servers", "mynamespace.servicebus.windows.net:9093");
        // Security configuration
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        // Authentication
        String connectionString = System.getenv("EVENTHUB_CONNECTION_STRING");
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required " +
            "username=\"$ConnectionString\" " +
            "password=\"" + connectionString + "\";");
        // Consumer settings
        props.put("group.id", "my-consumer-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Starting offset and commit behavior
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "true");
        // Fetch and session settings
        props.put("fetch.min.bytes", 1);
        props.put("max.poll.records", 500);
        props.put("session.timeout.ms", 30000);
        // try-with-resources closes the consumer if the loop exits with an exception
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Subscribe to topics
            consumer.subscribe(Arrays.asList("my-topic", "another-topic"));
            // Poll for messages
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s%n",
                        record.topic(), record.partition(), record.offset(),
                        record.key(), record.value());
                }
            }
        }
    }
}
.NET Consumer
using System;
using System.Threading;
using Confluent.Kafka;
public class EventHubsConsumer
{
    public static void Main()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "mynamespace.servicebus.windows.net:9093",
            SecurityProtocol = SecurityProtocol.SaslSsl,
            SaslMechanism = SaslMechanism.Plain,
            SaslUsername = "$ConnectionString",
            SaslPassword = Environment.GetEnvironmentVariable("EVENTHUB_CONNECTION_STRING"),
            GroupId = "my-consumer-group",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnableAutoCommit = true,
            EnablePartitionEof = true
        };
        // Cancel the token on Ctrl+C so the loop can exit cleanly
        using var cts = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };
        using var consumer = new ConsumerBuilder<string, string>(config).Build();
        consumer.Subscribe("my-topic");
        try
        {
            while (true)
            {
                try
                {
                    var cr = consumer.Consume(cts.Token);
                    // With EnablePartitionEof, end-of-partition events carry no message
                    if (cr.IsPartitionEOF) continue;
                    Console.WriteLine($"Consumed: {cr.Message.Value} from {cr.TopicPartitionOffset}");
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Error: {e.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Leave the consumer group and commit final offsets
            consumer.Close();
        }
    }
}
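The Java and .NET consumers above rely on automatic offset commits. As a complement in Python, the sketch below commits each offset only after the handler has succeeded, which gives at-least-once processing. It uses the `poll` and `commit` methods of a `confluent_kafka` `Consumer` but takes the consumer as a parameter, so it can be exercised without a broker. The names `process_then_commit` and `handler` are hypothetical, and the consumer is assumed to be created with `enable.auto.commit` set to `False`.

```python
def process_then_commit(consumer, handler, max_messages, poll_timeout=1.0):
    """Process up to max_messages, committing each offset after its handler succeeds."""
    processed = 0
    while processed < max_messages:
        msg = consumer.poll(poll_timeout)
        if msg is None:
            break  # nothing arrived within the timeout
        if msg.error():
            continue  # this sketch simply skips error events
        handler(msg)  # may raise; the offset then stays uncommitted
        consumer.commit(message=msg, asynchronous=False)
        processed += 1
    return processed


# Usage sketch (assumes confluent_kafka is installed and a config dict `conf`
# with the Event Hubs connection settings, a group.id, and
# 'enable.auto.commit': False):
#   from confluent_kafka import Consumer
#   consumer = Consumer(conf)
#   consumer.subscribe(['my-topic'])
#   process_then_commit(consumer, lambda m: print(m.value()), max_messages=100)
#   consumer.close()
```

If the handler raises, the message is not committed and is delivered again after a restart or rebalance, so the handler should tolerate duplicates.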
Advanced Features
Partition Assignment
// Manual partition assignment
consumer.assign(Arrays.asList(
    new TopicPartition("my-topic", 0),
    new TopicPartition("my-topic", 1),
    new TopicPartition("my-topic", 2)
));
Offset Management
// Commit offset manually
consumer.commitSync();
// Seek to specific offset
consumer.seek(new TopicPartition("my-topic", 0), 100L);
// Get current position
long currentPosition = consumer.position(new TopicPartition("my-topic", 0));
// Get committed offset
OffsetAndMetadata committed = consumer.committed(new TopicPartition("my-topic", 0));
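The committed offset is one half of a consumer-lag calculation; the other half is the partition's high watermark. A minimal Python sketch, assuming a `confluent_kafka`-style consumer whose `committed` and `get_watermark_offsets` methods behave as documented for that client (`partition_lag` and `total_lag` are hypothetical helper names):

```python
def partition_lag(low: int, high: int, committed: int) -> int:
    """Number of messages between the committed offset and the end of the partition."""
    if committed < 0:
        # Negative sentinel: nothing committed yet, so the whole retained range is pending.
        return high - low
    return max(high - committed, 0)


def total_lag(consumer, partitions, timeout=10.0) -> int:
    """Sum the lag over the given partitions."""
    lag = 0
    for tp in consumer.committed(partitions, timeout=timeout):
        low, high = consumer.get_watermark_offsets(tp, timeout=timeout)
        lag += partition_lag(low, high, tp.offset)
    return lag
```

With `confluent_kafka`, `partitions` would be a list of `TopicPartition("my-topic", n)` objects for the partitions of interest.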
Transactions (Premium and Dedicated Tiers)
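// Enable transactions (transactional.id must be stable and unique per producer instance)
props.put("transactional.id", "my-transactional-producer");
props.put("enable.idempotence", true);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
producer.beginTransaction();
try {
    for (int i = 0; i < 1000; i++) {
        producer.send(new ProducerRecord<>("topic1", "key", "value1"));
        producer.send(new ProducerRecord<>("topic2", "key", "value2"));
    }
    // Both topics are written atomically
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Fatal errors (org.apache.kafka.common.errors): the producer cannot continue
    producer.close();
} catch (KafkaException e) {
    // Other errors (org.apache.kafka.common.KafkaException): abort so the work can be retried
    producer.abortTransaction();
}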
Exactly-Once Semantics
// Producer configuration for exactly-once
producerProps.put("enable.idempotence", true);
producerProps.put("transactional.id", "unique-id");
producerProps.put("transaction.timeout.ms", 10000);
// Consumer configuration: read only messages from committed transactions
consumerProps.put("isolation.level", "read_committed");
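Exactly-once processing usually means a consume-transform-produce loop in which the output records and the consumed offsets are committed in one transaction. The Python sketch below shows that pattern using the transactional methods of `confluent_kafka` (`begin_transaction`, `send_offsets_to_transaction`, `commit_transaction`, `abort_transaction`). The function name `forward_in_transaction` and its parameters are hypothetical. The producer is assumed to have a `transactional.id` and to have called `init_transactions()` once, and the consumer is assumed to have auto-commit disabled.

```python
def forward_in_transaction(consumer, producer, messages, output_topic, transform):
    """Produce transformed messages and commit the consumed offsets atomically."""
    producer.begin_transaction()
    try:
        for msg in messages:
            producer.produce(output_topic, key=msg.key(), value=transform(msg.value()))
        # Commit the consumer's current positions as part of the same transaction.
        producer.send_offsets_to_transaction(
            consumer.position(consumer.assignment()),
            consumer.consumer_group_metadata(),
        )
        producer.commit_transaction()
    except Exception:
        # Simplified handling: a real application would distinguish fatal errors,
        # which require closing the producer, from abortable ones.
        producer.abort_transaction()
        raise
```

Downstream consumers only see the output once the transaction commits, provided they read with `isolation.level` set to `read_committed`.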
Supported Kafka Operations
| Operation | Support | Notes |
|---|---|---|
| Produce | ✅ | Full support |
| Consume | ✅ | Full support |
| Create Topics | ✅ | Via Azure Portal or ARM |
| List Topics | ✅ | Automatic with Event Hubs |
| Offset Management | ✅ | Commit/checkpoint |
| Partition Management | ✅ | Built-in |
| Rebalancing | ✅ | Consumer groups |
| Admin Client | ✅ | Limited |
| Transactions | ✅ | Premium and Dedicated tiers |
| Exactly-Once | ✅ | Premium and Dedicated tiers (requires transactions) |
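| Kafka Streams | ✅ | Relies on transactions, so Premium and Dedicated tiers; verify current feature status |
| ksqlDB | ✅ | Built on Kafka Streams, so the same tier requirements apply |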
| Kafka Connect | ✅ | Full support |
Migrate from Self-Managed Kafka
Step-by-Step Migration
1. Inventory Kafka Clusters
# List current topics
kafka-topics.sh --bootstrap-server localhost:9092 --list
# Check consumer groups
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
2. Update Producer Configuration
# Before (self-managed Kafka)
bootstrap.servers=my-kafka-cluster:9092
# After (Event Hubs)
bootstrap.servers=mynamespace.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{connection-string}";
3. Update Consumer Configuration
# Same changes as producer + consumer group
group.id=my-consumer-group
4. Test in Staging
- Deploy to staging environment
- Run smoke tests
- Verify throughput and ordering
5. Production Cutover
- Use blue-green deployment pattern
- Route a small percentage of traffic initially
- Monitor for issues
- Increase traffic gradually
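The configuration changes in steps 2 and 3 are mechanical, so they can be scripted when many clients need migrating. A minimal Python sketch under that assumption follows; `migrate_client_config` and `render_properties` are hypothetical helpers that take an existing client configuration as a dictionary and return a copy pointed at Event Hubs.

```python
def migrate_client_config(old_config: dict, namespace: str, connection_string: str) -> dict:
    """Return a copy of a Kafka client config that targets the Event Hubs endpoint."""
    new_config = dict(old_config)  # keep group.id, serializers, and other settings
    new_config.update({
        "bootstrap.servers": f"{namespace}.servicebus.windows.net:9093",
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "sasl.jaas.config": (
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
            'username="$ConnectionString" '
            f'password="{connection_string}";'
        ),
    })
    return new_config


def render_properties(config: dict) -> str:
    """Render the config in Java .properties form, one key=value per line."""
    return "\n".join(f"{key}={value}" for key, value in config.items())
```

Because the rendered output contains the connection string, it should be written to a secret store or injected at deploy time rather than committed to source control.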
Connection String Management
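# Get the connection string for an authorization rule
az eventhubs namespace authorization-rule keys list \
  --resource-group my-rg \
  --namespace-name my-kafka-namespace \
  --name RootManageSharedAccessKey \
  --query primaryConnectionString \
  --output tsv
// Keep the connection string in a secret store such as Azure Key Vault and
// expose it to the application at runtime, for example as an environment variable.
// In Java:
props.put("sasl.jaas.config",
    "org.apache.kafka.common.security.plain.PlainLoginModule required " +
    "username=\"$ConnectionString\" " +
    "password=\"" + System.getenv("EVENTHUB_KEY_VAULT_SECRET") + "\";");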
Best Practices
| Practice | Description |
|---|---|
| Use Managed Identity | Avoid storing connection strings |
| Configure retries | Handle transient failures |
| Set appropriate batch size | Balance latency and throughput |
| Use compression | Use gzip, the codec the Event Hubs Kafka endpoint accepts (Premium and Dedicated tiers) |
| Monitor consumer lag | Track how far committed offsets trail the latest offset |
| Use consumer groups | Scale consumers horizontally |
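For the managed identity practice, Kafka clients authenticate with SASL/OAUTHBEARER instead of a connection string. The Python sketch below wires a token credential into the `oauth_cb` setting of `confluent_kafka`, which expects a callback returning the token and its expiry in epoch seconds. The names `make_oauth_cb` and `oauth_client_config` are hypothetical. The credential is assumed to be an `azure.identity` credential such as `DefaultAzureCredential`, and the identity is assumed to hold an Event Hubs data role such as Azure Event Hubs Data Sender or Data Receiver.

```python
def make_oauth_cb(credential, namespace_fqdn: str):
    """Create an oauth_cb callback that fetches tokens for the given namespace."""
    scope = f"https://{namespace_fqdn}/.default"

    def oauth_cb(_config_str):
        token = credential.get_token(scope)
        # confluent_kafka expects (token string, expiry as epoch seconds)
        return token.token, float(token.expires_on)

    return oauth_cb


def oauth_client_config(namespace_fqdn: str, credential) -> dict:
    """Connection settings that use Microsoft Entra ID instead of a connection string."""
    return {
        "bootstrap.servers": f"{namespace_fqdn}:9093",
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "OAUTHBEARER",
        "oauth_cb": make_oauth_cb(credential, namespace_fqdn),
    }


# Usage sketch (assumes the azure-identity and confluent-kafka packages):
#   from azure.identity import DefaultAzureCredential
#   from confluent_kafka import Producer
#   producer = Producer(oauth_client_config(
#       "mynamespace.servicebus.windows.net", DefaultAzureCredential()))
```

With this setup no secret is stored in application configuration; access is granted or revoked through role assignments on the namespace.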
Performance Tuning
// High throughput configuration
props.put("batch.size", 16384);
props.put("linger.ms", 10);
props.put("buffer.memory", 33554432);
props.put("compression.type", "gzip"); // gzip is the codec Event Hubs accepts from Kafka clients
props.put("max.in.flight.requests.per.connection", 5);
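To see what these numbers imply, a little arithmetic helps: `buffer.memory` bounds how many full batches the producer can hold while waiting on the network, and `batch.size` bounds how many records fit in one request per partition. The sketch below works through the values above; the 512-byte record size is an assumed example and per-record overhead is ignored.

```python
BATCH_SIZE = 16_384         # batch.size: bytes per partition batch
BUFFER_MEMORY = 33_554_432  # buffer.memory: total bytes the producer may buffer


def batches_in_buffer(buffer_memory: int = BUFFER_MEMORY, batch_size: int = BATCH_SIZE) -> int:
    """How many full batches fit in the producer buffer."""
    return buffer_memory // batch_size


def records_per_batch(record_size: int, batch_size: int = BATCH_SIZE) -> int:
    """Approximate records per batch, ignoring per-record overhead."""
    return max(batch_size // record_size, 1)


print(batches_in_buffer())     # 2048
print(records_per_batch(512))  # 32
```

With `linger.ms` set to 10, a batch is sent when it fills or after roughly 10 ms, whichever comes first, so low-volume partitions trade a little latency for fewer requests.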
Comparison: Kafka vs Event Hubs
| Aspect | Self-Managed Kafka | Event Hubs Kafka |
|---|---|---|
| Infrastructure | You manage | Fully managed |
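| Scaling | Manual partition management | Auto-inflate (Standard tier) or additional processing/capacity units |
| Availability | Requires replication setup | 99.9% SLA |
| Security | You configure | Microsoft Entra ID (OAuth) or SAS keys, over SASL_SSL |
| Monitoring | Self-configured | Azure Monitor |
| Cost | Infrastructure + operations | Per throughput, processing, or capacity unit, depending on tier |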