Azure Event Hubs — Kafka Protocol on Event Hubs

Use Kafka SDK Without Changing Code


Introduction

Azure Event Hubs provides a Kafka-compatible endpoint, so existing Kafka applications can connect to Event Hubs without code changes: you update the client configuration (bootstrap server and SASL settings), not the application logic. This enables:

  • Lift-and-shift migration — Move Kafka workloads to Azure without rewriting
  • Hybrid architectures — Use both Kafka and Event Hubs in the same pipeline
  • Kafka ecosystem tools — Use Kafka CLI, ksqlDB, Kafka Connect
  • Azure integration — Leverage Event Hubs features (Capture, scaling)

This guide covers:

  • Protocol compatibility — How it works under the hood
  • Configuration — Complete setup for different clients
  • Code examples — Producer and consumer implementations
  • Advanced features — Transactions, exactly-once semantics
  • Migration strategies — Moving from self-managed Kafka

How Kafka Compatibility Works

Architecture Overview

┌─────────────────────────────────────────────────────────────────────┐
│              EVENT HUBS KAFKA COMPATIBILITY                         │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   ┌─────────────────┐         ┌─────────────────┐                   │
│   │   Kafka Client  │ ───────▶│  Event Hubs     │                   │
│   │                 │         │  Kafka Endpoint │                   │
│   │ • Producer      │         │                 │                   │ 
│   │ • Consumer      │   SASL  │  • Kafka API    │                   │
│   │ • Streams       │  over   │  • Partition    │                   │
│   │ • Connect       │   SSL   │    Mapping      │                   │
│   └─────────────────┘         └────────┬────────┘                   │
│                                        │                            │
│                              ┌─────────▼─────────┐                  │
│                              │   Event Hubs      │                  │
│                              │   Core            │                  │
│                              └───────────────────┘                  │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

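Concept Mapping

Event Hubs implements the Kafka wire protocol directly on its brokers; no Kafka cluster runs behind the endpoint. Kafka concepts map onto Event Hubs as follows:

  • Kafka cluster → Event Hubs namespace
  • Topic → Event hub
  • Partition → Partition
  • Consumer group → Consumer group
  • Offset → Offset (committed offsets act as checkpoints)

Clients authenticate with SASL over TLS: SASL/PLAIN carries a shared access signature (SAS) connection string, and SASL/OAUTHBEARER carries a Microsoft Entra ID (formerly Azure AD) token. Because the same brokers also serve AMQP, events produced through the Kafka endpoint can be read by AMQP clients and vice versa.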

Configure Kafka Endpoint

Namespace Configuration

# Create an Event Hubs namespace (the Kafka endpoint is enabled by default on Standard and above)
az eventhubs namespace create \
  --name my-kafka-namespace \
  --resource-group my-rg \
  --location eastus \
  --sku Standard  # Standard or Premium; Basic has no Kafka endpoint, and Dedicated namespaces live in a Dedicated cluster

# Verify that the Kafka endpoint is enabled
az eventhubs namespace show \
  --name my-kafka-namespace \
  --resource-group my-rg \
  --query "kafkaEnabled"

Connection Details

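Property           | Value
Bootstrap servers  | <namespace>.servicebus.windows.net:9093
Security protocol  | SASL_SSL
SASL mechanism     | PLAIN (SAS connection string) or OAUTHBEARER (Microsoft Entra ID token)
Port               | 9093 (TLS required; serves both PLAIN and OAUTHBEARER)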

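Connection String Format

A connection string is a single line; it is wrapped below for readability. The EntityPath segment appears only when the shared access policy belongs to one event hub rather than to the whole namespace.

Endpoint=sb://mynamespace.servicebus.windows.net/;
SharedAccessKeyName=RootManageSharedAccessKey;
SharedAccessKey=xxx;
EntityPath=myeventhub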
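
The Kafka clients in this guide pass the entire connection string as the SASL password and derive the bootstrap server from its Endpoint host. A minimal Python sketch of that derivation, assuming a connection string in the format above; parse_connection_string and kafka_config are hypothetical helper names, and the returned keys are the librdkafka-style settings used by the Python producer in this guide:

```python
from urllib.parse import urlparse


def parse_connection_string(conn_str: str) -> dict[str, str]:
    """Split 'Key=Value;Key=Value;...' into a dict.

    Each segment is split on its first '=' only, because SAS keys are
    base64 and may end with '=' padding.
    """
    parts = {}
    for segment in conn_str.strip().split(";"):
        if not segment:
            continue  # tolerate a trailing ';'
        key, sep, value = segment.partition("=")
        if not sep:
            raise ValueError(f"malformed segment: {segment!r}")
        parts[key.strip()] = value.strip()
    return parts


def kafka_config(conn_str: str) -> dict[str, str]:
    """Hypothetical helper: Kafka client settings for the Event Hubs endpoint."""
    parts = parse_connection_string(conn_str)
    host = urlparse(parts["Endpoint"]).hostname  # sb://<namespace>.servicebus.windows.net/
    if not host:
        raise ValueError("connection string has no Endpoint host")
    return {
        "bootstrap.servers": f"{host}:9093",
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "sasl.username": "$ConnectionString",  # the literal text, not a variable
        "sasl.password": conn_str.strip(),     # the whole connection string
    }


conn = ("Endpoint=sb://mynamespace.servicebus.windows.net/;"
        "SharedAccessKeyName=RootManageSharedAccessKey;"
        "SharedAccessKey=xxx;EntityPath=myeventhub")
cfg = kafka_config(conn)
print(cfg["bootstrap.servers"])                     # mynamespace.servicebus.windows.net:9093
print(parse_connection_string(conn)["EntityPath"])  # myeventhub
```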

Kafka Producer Implementation

Java Producer

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class EventHubsProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        
        // Event Hubs Kafka endpoint
        props.put("bootstrap.servers", "mynamespace.servicebus.windows.net:9093");
        
        // Security configuration
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        
        // Authentication - use connection string as password
        String connectionString = System.getenv("EVENTHUB_CONNECTION_STRING");
        props.put("sasl.jaas.config", 
            "org.apache.kafka.common.security.plain.PlainLoginModule required " +
            "username=\"$ConnectionString\" " +
            "password=\"" + connectionString + "\";");
        
        // Producer settings
        props.put("client.id", "my-producer");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        
        // Reliability settings
        props.put("acks", "all");
        props.put("retries", 3);
        props.put("enable.idempotence", true);
        
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        
        // Send messages
        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> record = 
                new ProducerRecord<>("my-topic", "key-" + i, "message-" + i);
            
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    System.err.println("Send failed: " + exception.getMessage());
                } else {
                    System.out.println("Sent to " + metadata.topic() + 
                        ":" + metadata.partition() + ":" + metadata.offset());
                }
            });
        }
        
        producer.flush();
        producer.close();
    }
}

.NET Producer

using System;
using System.Threading.Tasks;
using Confluent.Kafka;

public class EventHubsProducer
{
    public static async Task Main(string[] args)
    {
        var config = new ProducerConfig
        {
            BootstrapServers = "mynamespace.servicebus.windows.net:9093",
            SecurityProtocol = SecurityProtocol.SaslSsl,
            SaslMechanism = SaslMechanism.Plain,
            SaslUsername = "$ConnectionString",
            SaslPassword = Environment.GetEnvironmentVariable("EVENTHUB_CONNECTION_STRING"),
            Acks = Acks.All,  // librdkafka requires acks=all when idempotence is enabled
            EnableIdempotence = true,
            ClientId = "my-dotnet-producer"
        };
        
        using var producer = new ProducerBuilder<string, string>(config).Build();
        
        for (int i = 0; i < 100; i++)
        {
            var message = new Message<string, string>
            {
                Key = $"key-{i}",
                Value = $"message-{i}",
                Timestamp = Timestamp.Default
            };
            
            var result = await producer.ProduceAsync("my-topic", message);
            
            Console.WriteLine($"Delivered to {result.TopicPartitionOffset}");
        }
        
        producer.Flush(TimeSpan.FromSeconds(10));
    }
}

Python Producer

from confluent_kafka import Producer
import os

def delivery_callback(err, msg):
    if err:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

producer = Producer({
    'bootstrap.servers': 'mynamespace.servicebus.windows.net:9093',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': '$ConnectionString',
    'sasl.password': os.environ['EVENTHUB_CONNECTION_STRING'],
    'client.id': 'my-python-producer'
})

for i in range(100):
    producer.produce(
        'my-topic',
        key=f'key-{i}',
        value=f'message-{i}',
        callback=delivery_callback
    )
    # Serve delivery callbacks for earlier messages without blocking
    producer.poll(0)

producer.flush()
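
Keyed records keep their relative order only while every producer sends a given key to the same partition. The Java client hashes keys with murmur2, whereas librdkafka-based clients (this Python producer and the Confluent.Kafka .NET producer) default to the consistent_random partitioner, which uses CRC32 of the key, so a mixed fleet can split one key across partitions. The Python sketch below reproduces both hash rules for comparison; the function names are illustrative, and the murmur2 port is assumed to mirror the Java client's Utils.murmur2:

```python
import zlib


def murmur2(data: bytes) -> int:
    """Port of the Java client's murmur2 hash; returns a signed 32-bit int."""
    mask = 0xFFFFFFFF
    m, r = 0x5BD1E995, 24
    length = len(data)
    h = (0x9747B28C ^ length) & mask
    # Mix the input four bytes at a time (little-endian words)
    for i in range(0, length - length % 4, 4):
        k = int.from_bytes(data[i:i + 4], "little")
        k = (k * m) & mask
        k ^= k >> r
        k = (k * m) & mask
        h = (h * m) & mask
        h ^= k
    # Mix the remaining 1 to 3 bytes (the Java switch statement falls through)
    tail, rem = length & ~3, length % 4
    if rem == 3:
        h ^= data[tail + 2] << 16
    if rem >= 2:
        h ^= data[tail + 1] << 8
    if rem >= 1:
        h ^= data[tail]
        h = (h * m) & mask
    h ^= h >> 13
    h = (h * m) & mask
    h ^= h >> 15
    return h - (1 << 32) if h & 0x80000000 else h


def java_default_partition(key: bytes, num_partitions: int) -> int:
    # Java client: toPositive(murmur2(key)) % numPartitions; toPositive clears the sign bit
    return (murmur2(key) & 0x7FFFFFFF) % num_partitions


def librdkafka_consistent_partition(key: bytes, num_partitions: int) -> int:
    # librdkafka consistent_random with a non-empty key: CRC32(key) % partition count
    return zlib.crc32(key) % num_partitions


for key in (b"key-1", b"key-2", b"key-3"):
    print(key, java_default_partition(key, 32), librdkafka_consistent_partition(key, 32))
```

If Java and librdkafka producers write to the same topic, setting 'partitioner': 'murmur2_random' in the librdkafka configuration gives keyed records the Java-compatible placement.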

Kafka Consumer Implementation

Java Consumer

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class EventHubsConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        
        // Event Hubs Kafka endpoint
        props.put("bootstrap.servers", "mynamespace.servicebus.windows.net:9093");
        
        // Security configuration
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        
        // Authentication
        String connectionString = System.getenv("EVENTHUB_CONNECTION_STRING");
        props.put("sasl.jaas.config", 
            "org.apache.kafka.common.security.plain.PlainLoginModule required " +
            "username=\"$ConnectionString\" " +
            "password=\"" + connectionString + "\";");
        
        // Consumer settings
        props.put("group.id", "my-consumer-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        
        // Starting offset
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "true");
        
        // Fetch and session settings
        props.put("fetch.min.bytes", 1);
        props.put("max.poll.records", 500);
        props.put("session.timeout.ms", 30000);
        
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        
        // Subscribe to topics
        consumer.subscribe(Arrays.asList("my-topic", "another-topic"));
        
        // Poll for messages
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s%n",
                    record.topic(), record.partition(), record.offset(),
                    record.key(), record.value());
            }
        }
    }
}
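
The consumer above sets auto.offset.reset to earliest, but that setting applies only when the group has no usable committed offset for a partition: on a new group's first run, or after the committed offset has aged out of retention. A simplified Python model of that decision; starting_offset is a hypothetical helper, not a client API:

```python
from typing import Optional


def starting_offset(committed: Optional[int], beginning: int, end: int,
                    auto_offset_reset: str = "latest") -> int:
    """Simplified model of where a group consumer starts reading one partition.

    committed: the group's committed offset, or None if it never committed one.
    beginning: earliest offset still retained; end: offset the next record will get.
    """
    # A committed offset inside the retained range always wins
    if committed is not None and beginning <= committed <= end:
        return committed
    # Otherwise auto.offset.reset decides (Kafka's default is "latest")
    if auto_offset_reset == "earliest":
        return beginning
    if auto_offset_reset == "latest":
        return end
    raise LookupError("no valid committed offset and auto.offset.reset=none")


print(starting_offset(None, 0, 50, "earliest"))  # 0: new group reads from the start
print(starting_offset(20, 0, 50, "earliest"))    # 20: committed offset wins
print(starting_offset(5, 10, 50, "earliest"))    # 10: committed offset aged out
```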

.NET Consumer

using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;

public class EventHubsConsumer
{
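    public static void Main()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "mynamespace.servicebus.windows.net:9093",
            SecurityProtocol = SecurityProtocol.SaslSsl,
            SaslMechanism = SaslMechanism.Plain,
            SaslUsername = "$ConnectionString",
            SaslPassword = Environment.GetEnvironmentVariable("EVENTHUB_CONNECTION_STRING"),
            GroupId = "my-consumer-group",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnableAutoCommit = true,
            EnablePartitionEof = true
        };
        
        // Cancel the consume loop on Ctrl+C so the consumer can shut down cleanly
        using var cts = new CancellationTokenSource();
        Console.CancelKeyPress += (_, args) => { args.Cancel = true; cts.Cancel(); };
        
        using var consumer = new ConsumerBuilder<string, string>(config).Build();
        
        consumer.Subscribe("my-topic");
        
        try
        {
            while (true)
            {
                try
                {
                    // Blocks until a message arrives; throws OperationCanceledException on Ctrl+C
                    var cr = consumer.Consume(cts.Token);
                    
                    // With EnablePartitionEof, end-of-partition notifications carry no message
                    if (cr == null || cr.IsPartitionEOF) continue;
                    
                    Console.WriteLine($"Consumed: {cr.Message.Value} from {cr.TopicPartitionOffset}");
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Error: {e.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Ctrl+C pressed: fall through to Close()
        }
        finally
        {
            // Leave the consumer group cleanly
            consumer.Close();
        }
    }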
}

Advanced Features

Partition Assignment

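// Manual partition assignment (requires import org.apache.kafka.common.TopicPartition).
// assign() is an alternative to subscribe(): the consumer reads exactly these partitions
// and takes no part in consumer-group rebalancing.
consumer.assign(Arrays.asList(
    new TopicPartition("my-topic", 0),
    new TopicPartition("my-topic", 1),
    new TopicPartition("my-topic", 2)
));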
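
With subscribe(), the group coordinator divides partitions among the group's members instead, and rebalances when members join or leave. The Java consumer's default partition.assignment.strategy starts with RangeAssignor; a simplified Python sketch of its rule for a single topic (range_assign is a hypothetical name, and real member ids are generated by the coordinator) shows why consumers beyond the partition count sit idle:

```python
def range_assign(num_partitions: int, members: list[str]) -> dict[str, list[int]]:
    """Sketch of Kafka's RangeAssignor for one topic: contiguous partition ranges per member."""
    if not members:
        return {}
    ordered = sorted(members)  # members are ordered by member id
    per_member, extra = divmod(num_partitions, len(ordered))
    assignment: dict[str, list[int]] = {}
    start = 0
    for index, member in enumerate(ordered):
        # The first `extra` members each take one additional partition
        count = per_member + (1 if index < extra else 0)
        assignment[member] = list(range(start, start + count))
        start += count
    return assignment


print(range_assign(3, ["consumer-b", "consumer-a"]))
# {'consumer-a': [0, 1], 'consumer-b': [2]}
print(range_assign(2, ["c1", "c2", "c3"]))
# {'c1': [0], 'c2': [1], 'c3': []}
```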

Offset Management

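// Requires: import org.apache.kafka.common.TopicPartition;
//           import org.apache.kafka.clients.consumer.OffsetAndMetadata;
//           import java.util.Collections; import java.util.Map;
TopicPartition partition0 = new TopicPartition("my-topic", 0);

// Commit the offsets returned by the last poll() (use with enable.auto.commit=false)
consumer.commitSync();

// Seek to a specific offset (the partition must be assigned to this consumer)
consumer.seek(partition0, 100L);

// Offset of the next record this consumer will fetch from the partition
long currentPosition = consumer.position(partition0);

// Committed offset for the group (the map value is null if nothing was committed);
// the single-partition committed(TopicPartition) overload is deprecated since Kafka 2.4
Map<TopicPartition, OffsetAndMetadata> committed =
    consumer.committed(Collections.singleton(partition0));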
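
A committed offset names the next record to read, not the last one processed, which is why manual commits use offset + 1. A small Python sketch, assuming a batch of fully processed (topic, partition, offset) tuples; offsets_to_commit is a hypothetical helper whose result corresponds to the map that Java's commitSync(Map<TopicPartition, OffsetAndMetadata>) accepts:

```python
def offsets_to_commit(processed):
    """processed: iterable of (topic, partition, offset) for fully handled records.

    Returns {(topic, partition): offset_to_commit} using Kafka's convention that
    the committed offset is the next record to read (last processed offset + 1).
    """
    commits: dict[tuple[str, int], int] = {}
    for topic, partition, offset in processed:
        key = (topic, partition)
        commits[key] = max(commits.get(key, 0), offset + 1)
    return commits


batch = [("my-topic", 0, 41), ("my-topic", 0, 42), ("my-topic", 1, 7)]
print(offsets_to_commit(batch))
# {('my-topic', 0): 43, ('my-topic', 1): 8}
```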

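Transactions (Premium and Dedicated Tiers)

// Requires: import org.apache.kafka.common.KafkaException;
//           import org.apache.kafka.common.errors.AuthorizationException;
//           import org.apache.kafka.common.errors.OutOfOrderSequenceException;
//           import org.apache.kafka.common.errors.ProducerFencedException;

// Enable transactions (a transactional.id requires idempotence)
props.put("transactional.id", "my-transactional-producer");
props.put("enable.idempotence", true);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();

try {
    producer.beginTransaction();
    for (int i = 0; i < 1000; i++) {
        // Both writes become visible to read_committed consumers only on commit
        producer.send(new ProducerRecord<>("topic1", "key", "value1"));
        producer.send(new ProducerRecord<>("topic2", "key", "value2"));
    }
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Fatal errors: the producer can't continue, so close it instead of aborting
    producer.close();
} catch (KafkaException e) {
    // Any other error: abort so none of this transaction's records become visible
    producer.abortTransaction();
}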

Exactly-Once Semantics

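// Producer configuration for exactly-once
producerProps.put("enable.idempotence", true);
producerProps.put("transactional.id", "unique-id");  // stable and unique per producer instance
producerProps.put("transaction.timeout.ms", 10000);

// Consumer configuration: skip records from aborted or still-open transactions
consumerProps.put("isolation.level", "read_committed");
// In consume-transform-produce loops, commit consumed offsets inside the transaction
// with producer.sendOffsetsToTransaction(...) instead of auto-commit
consumerProps.put("enable.auto.commit", false);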
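
With isolation.level=read_committed, a consumer skips records from aborted transactions and stops at the first record of any transaction that is still open. The toy Python model below illustrates those two rules on a single partition; Record and visible_values are hypothetical names, and the model ignores real broker details such as transaction control markers:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Record:
    offset: int
    value: str
    txn: Optional[str] = None  # label of the writing transaction; None = non-transactional


def visible_values(log: list[Record], outcomes: dict[str, str], isolation_level: str) -> list[str]:
    """Toy model of which records one partition returns to a consumer.

    outcomes maps a transaction label to "committed" or "aborted";
    a label missing from outcomes is a transaction that is still open.
    """
    if isolation_level == "read_uncommitted":
        return [r.value for r in log]
    # read_committed never reads past the last stable offset (LSO):
    # the first record written by a still-open transaction
    open_offsets = [r.offset for r in log if r.txn is not None and r.txn not in outcomes]
    lso = min(open_offsets) if open_offsets else None
    return [
        r.value
        for r in log
        if (lso is None or r.offset < lso)
        and (r.txn is None or outcomes[r.txn] == "committed")
    ]


log = [Record(0, "a"), Record(1, "b", "t1"), Record(2, "c", "t2"),
       Record(3, "d", "t3"), Record(4, "e")]
outcomes = {"t1": "committed", "t2": "aborted"}  # t3 is still open
print(visible_values(log, outcomes, "read_uncommitted"))  # ['a', 'b', 'c', 'd', 'e']
print(visible_values(log, outcomes, "read_committed"))    # ['a', 'b']
```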

Supported Kafka Operations

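Operation             | Support and notes
Produce               | Full support
Consume               | Full support
Create topics         | Create event hubs via the Azure portal or ARM templates
List topics           | Event hubs in the namespace are listed as topics
Offset management     | Commit/checkpoint
Partition management  | Built-in
Rebalancing           | Consumer groups
Admin client          | Limited
Transactions          | Premium and Dedicated tiers only
Exactly-once          | Premium and Dedicated tiers only
Kafka Streams         | Premium and Dedicated tiers only
ksqlDB                | Runs on Kafka Streams; verify support for your tier before relying on it
Kafka Connect         | Full support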

Migrate from Self-Managed Kafka

Step-by-Step Migration

  1. Inventory Kafka Clusters

    # List current topics
    kafka-topics.sh --bootstrap-server localhost:9092 --list
    
    # Check consumer groups
    kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
    
  2. Update Producer Configuration

    # Before (self-managed Kafka)
    bootstrap.servers=my-kafka-cluster:9092
    
    # After (Event Hubs)
    bootstrap.servers=mynamespace.servicebus.windows.net:9093
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{connection-string}";
    
  3. Update Consumer Configuration

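    # Same connection settings as the producer, plus the consumer group.
    # Committed offsets from the old cluster aren't copied by switching bootstrap.servers,
    # so the group's first run starts wherever auto.offset.reset points.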
    group.id=my-consumer-group
    
  4. Test in Staging

    • Deploy to staging environment
    • Run smoke tests
    • Verify throughput and per-partition ordering

  5. Production Cutover

    • Use a blue-green deployment pattern
    • Route a small percentage of traffic first
    • Monitor for issues
    • Increase traffic gradually
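
Steps 2 and 3 amount to swapping the connection settings while keeping every other client setting. A minimal Python sketch of that rewrite, assuming simple key=value .properties lines (Java properties files also allow ':' and whitespace separators, which this sketch doesn't handle); to_event_hubs_properties is a hypothetical helper, and the password stays the {connection-string} placeholder to be filled from a secret store:

```python
# Connection settings that Event Hubs replaces; every other client setting is kept as-is
REPLACED_KEYS = {"bootstrap.servers", "security.protocol", "sasl.mechanism", "sasl.jaas.config"}


def to_event_hubs_properties(original: str, namespace: str) -> str:
    """Rewrite a self-managed Kafka client .properties file for the Event Hubs endpoint."""
    kept = []
    for line in original.splitlines():
        key = line.split("=", 1)[0].strip()
        if key in REPLACED_KEYS:
            continue  # drop the old cluster's connection settings
        kept.append(line)
    kept += [
        f"bootstrap.servers={namespace}.servicebus.windows.net:9093",
        "security.protocol=SASL_SSL",
        "sasl.mechanism=PLAIN",
        "sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required "
        'username="$ConnectionString" password="{connection-string}";',
    ]
    return "\n".join(kept) + "\n"


before = "bootstrap.servers=my-kafka-cluster:9092\ngroup.id=my-consumer-group\n"
print(to_event_hubs_properties(before, "mynamespace"))
```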

Connection String Management

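# Get the namespace-level connection string ("keys list" returns the secrets; plain "list" doesn't)
az eventhubs namespace authorization-rule keys list \
  --resource-group my-rg \
  --namespace-name my-namespace \
  --name RootManageSharedAccessKey \
  --query primaryConnectionString \
  --output tsv

# Store the connection string in Azure Key Vault and surface it to the app at runtime,
# for example as an environment variable populated from a Key Vault reference.
# In Java, read it at startup instead of hard-coding it:
props.put("sasl.jaas.config", 
    "org.apache.kafka.common.security.plain.PlainLoginModule required " +
    "username=\"$ConnectionString\" " +
    "password=\"" + System.getenv("EVENTHUB_KEY_VAULT_SECRET") + "\";");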

Best Practices

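Practice                       | Description
Use managed identity           | Authenticate with Microsoft Entra ID over SASL OAUTHBEARER instead of storing connection strings
Configure retries              | Handle transient failures
Set an appropriate batch size  | Balance latency and throughput
Use compression                | gzip is the codec Event Hubs documents for Kafka clients; confirm support before using LZ4 or Snappy
Monitor consumer lag           | Track how far committed offsets trail the latest offsets
Use consumer groups            | Scale consumers horizontally (at most one active consumer per partition in a group)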
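
Consumer lag for a group is, per partition, the log-end offset minus the group's committed offset. A Python sketch of that arithmetic; consumer_lag is a hypothetical helper, the offset values would come from your client or monitoring tooling, and it assumes that partitions with no committed offset are measured from the earliest retained offset:

```python
def consumer_lag(end_offsets: dict[int, int], committed: dict[int, int],
                 beginning_offsets: dict[int, int]) -> dict[int, int]:
    """Per-partition lag: log-end offset minus the committed offset."""
    lag = {}
    for partition, end in end_offsets.items():
        position = committed.get(partition)
        if position is None:
            # Assumption: an uncommitted partition would be read from its earliest offset
            position = beginning_offsets.get(partition, 0)
        lag[partition] = max(0, end - position)
    return lag


end = {0: 100, 1: 50, 2: 10}
committed = {0: 90, 1: 50}
beginning = {2: 4}
lag = consumer_lag(end, committed, beginning)
print(lag)                # {0: 10, 1: 0, 2: 6}
print(sum(lag.values()))  # 16
```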

Performance Tuning

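// High-throughput producer starting points (tune against your own workload)
props.put("batch.size", 65536);             // client default is 16384; larger batches favor throughput over latency
props.put("linger.ms", 10);                 // wait up to 10 ms for a batch to fill before sending
props.put("buffer.memory", 33554432);       // 32 MB of send buffer (the client default)
props.put("compression.type", "gzip");      // gzip is the codec Event Hubs documents for Kafka clients
props.put("max.in.flight.requests.per.connection", 5);  // keep <= 5 when idempotence is enabled
props.put("connections.max.idle.ms", 180000);           // stay below the ~240 s idle timeout Azure applies to connections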

Comparison: Kafka vs Event Hubs

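Aspect          | Self-managed Kafka            | Event Hubs Kafka
Infrastructure  | You manage                    | Fully managed
Scaling         | Manual partition management   | Throughput, processing, or capacity units; Standard can auto-inflate throughput units
Availability    | Requires replication setup    | SLA of 99.95% (Standard) or 99.99% (Premium, Dedicated)
Security        | You configure                 | Microsoft Entra ID (OAUTHBEARER) or SAS (PLAIN), always over TLS
Monitoring      | Self-configured               | Azure Monitor
Cost            | Infrastructure + operations   | Per throughput unit (Standard), processing unit (Premium), or capacity unit (Dedicated)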
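
Standard-tier capacity is bought in throughput units (TUs). Each TU allows up to 1 MB/s or 1,000 events/s of ingress and up to 2 MB/s or 4,096 events/s of egress, whichever limit is reached first. A rough Python estimate of how many TUs a workload needs; required_tus is a hypothetical helper, and it leaves out headroom for traffic spikes:

```python
import math

# Standard tier, per throughput unit: ingress limits and egress limits
INGRESS_MB_S, INGRESS_EVENTS_S = 1.0, 1000
EGRESS_MB_S, EGRESS_EVENTS_S = 2.0, 4096


def required_tus(ingress_mb_s: float, ingress_events_s: float,
                 egress_mb_s: float = 0.0, egress_events_s: float = 0.0) -> int:
    """Smallest TU count that satisfies every limit (at least 1)."""
    return max(
        1,
        math.ceil(ingress_mb_s / INGRESS_MB_S),
        math.ceil(ingress_events_s / INGRESS_EVENTS_S),
        math.ceil(egress_mb_s / EGRESS_MB_S),
        math.ceil(egress_events_s / EGRESS_EVENTS_S),
    )


print(required_tus(3.5, 2000))                # 4: the 3.5 MB/s ingress dominates
print(required_tus(0.5, 500, egress_mb_s=5))  # 3: 5 MB/s of egress needs 3 TUs
```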

Azure Integration Hub - Advanced Level