Azure Event Grid + Blob Storage Integration
Event-Driven Blob Processing
Introduction
Azure Event Grid brings event-driven processing to Azure Blob Storage. Instead of polling for changes or running timer-based triggers, the storage account publishes an event when a blob operation completes, and Event Grid pushes that event to every subscriber. This enables near-real-time processing of uploaded files, prompt reactions to deletions, and event-driven workflows without the overhead of continuous polling.
This guide covers:
- Blob events — Available event types and when they fire
- Event Grid topic — Creating and configuring topics
- Event subscriptions — Subscribing to blob events
- Function triggers — Processing events in Azure Functions
- Use cases — Real-world event-driven patterns
- Best practices — Production implementation guidance
Understanding the Architecture
Event-Driven Blob Processing Flow
┌─────────────────────────────────────────────────────────────────────┐
│ EVENT-DRIVEN BLOB PROCESSING FLOW │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ BLOB STORAGE │ │
│ │ │ │
│ │ User uploads file │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ Blob written to container │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ Event Grid emits: BlobCreated │ │
│ └────────────────────────────┬────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ EVENT GRID │ │
│ │ │ │
│ │ Event: { │ │
│ │ eventType: Microsoft.Storage.BlobCreated, │ │
│ │ subject: /blobServices/default/containers/input/... │ │
│ │ data: { url: "...", api: "PutBlockList" } │ │
│ │ } │ │
│ │ │ │
│ └────────────────────────────┬────────────────────────────────┘ │
│ │ │
│ ┌───────────────────┼───────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Azure Func │ │ Logic Apps │ │ Webhook │ │
│ │ (Process) │ │ (Workflow) │ │ (Custom) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
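The subject shown in the diagram encodes both the container and the blob name, which handlers often need before they touch the blob itself. The following is a minimal sketch of splitting it, assuming the subject has the form `/blobServices/default/containers/{container}/blobs/{blob}`. The `BlobSubject` class and its `TryParse` method are hypothetical names introduced for illustration, not part of any SDK.

```csharp
using System;

// Hypothetical helper: not part of the Azure SDK.
public static class BlobSubject
{
    private const string ContainerMarker = "/containers/";
    private const string BlobMarker = "/blobs/";

    // Splits "/blobServices/default/containers/{container}/blobs/{blob}"
    // into its container and blob-name parts. Returns false for any other shape.
    public static bool TryParse(string subject, out string container, out string blobName)
    {
        container = string.Empty;
        blobName = string.Empty;
        if (string.IsNullOrEmpty(subject)) return false;

        int containerStart = subject.IndexOf(ContainerMarker, StringComparison.Ordinal);
        if (containerStart < 0) return false;
        containerStart += ContainerMarker.Length;

        // Container names cannot contain '/', so the first "/blobs/" after the
        // container name marks where the blob name begins.
        int blobMarkerStart = subject.IndexOf(BlobMarker, containerStart, StringComparison.Ordinal);
        if (blobMarkerStart < 0) return false;

        container = subject.Substring(containerStart, blobMarkerStart - containerStart);
        blobName = subject.Substring(blobMarkerStart + BlobMarker.Length);
        return container.Length > 0 && blobName.Length > 0;
    }
}
```

Blob names may themselves contain slashes (virtual directories), so everything after the marker is kept as the blob name.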
Event Types Reference
┌─────────────────────────────────────────────────────────────────────┐
│ BLOB STORAGE EVENT TYPES │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ Microsoft.Storage.BlobCreated │
│ ───────────────────────────────── │
│ When: New blob created, overwritten, or copied │
│ Trigger: PutBlob, PutBlockList, CopyBlob, etc. │
│ Use: Process uploads, validate content │
│ │
│ Microsoft.Storage.BlobDeleted │
│ ───────────────────────────── │
│ When: Blob is deleted │
│ Trigger: DeleteBlob operation │
│ Use: Log deletions, trigger cleanup workflows │
│ │
│ Microsoft.Storage.BlobTierChanged │
│ ───────────────────────────────── │
│ When: Access tier changed (for example Hot → Cool or Cool → Archive) │
│ Trigger: SetBlobTier operation │
│ Use: Monitor storage costs, compliance tracking │
│ │
│ Microsoft.Storage.BlobRenamed │
│ ────────────────────────────── │
│ When: Blob is renamed (hierarchical namespace accounts only) │
│ Trigger: RenameFile operation │
│ Use: Track file movements, update references │
│ │
│ Microsoft.Storage.BlobSnapshotCreated │
│ ──────────────────────────────────── │
│ When: Snapshot of blob is created │
│ Trigger: CreateSnapshot operation │
│ Use: Version management, backup tracking │
│ │
│ Microsoft.Storage.AsyncOperationInitiated │
│ ────────────────────────────────────────── │
│ When: Data movement out of the Archive tier has started │
│ Trigger: SetBlobTier or CopyBlob on an archived blob │
│ Use: Monitor async operation status │
│ │
└─────────────────────────────────────────────────────────────────────┘
Enable Event Grid on Storage
Enable via Azure Portal
- Navigate to your Storage Account
- Select Events from the left menu
- Click + Event Subscription
- Configure:
  - Name: blob-events
  - Topic: System topic (auto-created)
  - Event Types: Select events to subscribe to
  - Endpoint: Choose Function App, Logic App, or Webhook
Enable via Azure CLI
# Create the system topic explicitly (Event Grid otherwise creates one
# automatically with the first event subscription).
# --location must match the storage account's region.
az eventgrid system-topic create \
  --name storage-events-topic \
  --resource-group my-rg \
  --location eastus \
  --topic-type Microsoft.Storage.StorageAccounts \
  --source /subscriptions/xxx/resourceGroups/my-rg/providers/Microsoft.Storage/storageAccounts/mystorage
# Create event subscription
az eventgrid event-subscription create \
  --name blob-processor \
  --source-resource-id /subscriptions/xxx/resourceGroups/my-rg/providers/Microsoft.Storage/storageAccounts/mystorage \
  --endpoint-type azurefunction \
  --endpoint /subscriptions/xxx/resourceGroups/my-rg/providers/Microsoft.Web/sites/my-function-app/functions/BlobHandler \
  --included-event-types Microsoft.Storage.BlobCreated Microsoft.Storage.BlobDeleted \
  --subject-begins-with /blobServices/default/containers/uploads/
Filter Events
{
  "filter": {
    "subjectBeginsWith": "/blobServices/default/containers/uploads/",
    "includedEventTypes": [
      "Microsoft.Storage.BlobCreated",
      "Microsoft.Storage.BlobDeleted"
    ],
    "advancedFilters": [
      {
        "key": "data.api",
        "operatorType": "StringContains",
        "values": ["PutBlockList"]
      }
    ]
  }
}
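Event Grid evaluates these filters on the service side, but it can help to reason about them locally, for example in unit tests that decide which sample events a handler should ever see. The sketch below approximates the filter above. It assumes that all conditions must hold together, that an empty event-type list matches every type, and that string comparisons ignore case. `LocalBlobFilter` is a hypothetical class for illustration and does not reproduce every Event Grid operator.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical local approximation of an Event Grid subscription filter.
public sealed class LocalBlobFilter
{
    public string SubjectBeginsWith { get; set; } = "";
    public IReadOnlyList<string> IncludedEventTypes { get; set; } = Array.Empty<string>();
    public IReadOnlyList<string> ApiContainsAny { get; set; } = Array.Empty<string>();

    public bool Matches(string eventType, string subject, string api)
    {
        // Assumption: comparisons ignore case.
        const StringComparison IgnoreCase = StringComparison.OrdinalIgnoreCase;

        // An empty list means "all event types".
        bool typeOk = IncludedEventTypes.Count == 0
            || IncludedEventTypes.Any(t => string.Equals(t, eventType, IgnoreCase));

        bool subjectOk = subject.StartsWith(SubjectBeginsWith, IgnoreCase);

        // StringContains with several values matches if any value is contained.
        bool apiOk = ApiContainsAny.Count == 0
            || ApiContainsAny.Any(v => api.IndexOf(v, IgnoreCase) >= 0);

        return typeOk && subjectOk && apiOk;
    }
}
```

With the values from the JSON above, a `BlobCreated` event for `uploads/` written through `PutBlockList` matches, while the same event written through `PutBlob` does not.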
Function Implementation
Blob Trigger Function
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

public class BlobProcessorFunction
{
    private readonly ILogger<BlobProcessorFunction> _logger;

    public BlobProcessorFunction(ILogger<BlobProcessorFunction> logger)
    {
        _logger = logger;
    }

    // Note: by default the blob trigger discovers new blobs by polling the container.
    // To drive it from Event Grid instead, set Source = BlobTriggerSource.EventGrid
    // on the attribute (Storage extension 5.x or later).
    [FunctionName("ProcessUploadedBlob")]
    public async Task Run(
        [BlobTrigger("uploads/{name}", Connection = "StorageConnection")]
        Stream blobStream,
        string name,
        Uri uri)
    {
        _logger.LogInformation("Processing blob: {Name}", name);
        _logger.LogInformation("Blob URI: {Uri}", uri);
        _logger.LogInformation("Blob size: {Size} bytes", blobStream.Length);

        // Determine file type and process accordingly
        var extension = Path.GetExtension(name).ToLowerInvariant();
        switch (extension)
        {
            case ".csv":
                await ProcessCsvAsync(blobStream, name);
                break;
            case ".json":
                await ProcessJsonAsync(blobStream, name);
                break;
            case ".png":
            case ".jpg":
            case ".jpeg":
                await ProcessImageAsync(blobStream, name);
                break;
            default:
                _logger.LogWarning("Unsupported file type: {Extension}", extension);
                break;
        }
    }

    private async Task ProcessCsvAsync(Stream stream, string fileName)
    {
        using var reader = new StreamReader(stream);
        var content = await reader.ReadToEndAsync();
        _logger.LogInformation("Processing CSV file: {FileName}, {LineCount} lines",
            fileName, content.Split('\n').Length);
        // Process CSV data...
    }

    // Placeholders: replace with real JSON and image processing.
    private Task ProcessJsonAsync(Stream stream, string fileName) => Task.CompletedTask;
    private Task ProcessImageAsync(Stream stream, string fileName) => Task.CompletedTask;
}
Event Grid Trigger Function
using System.Threading.Tasks;
using Azure.Messaging.EventGrid;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.EventGrid;
using Microsoft.Extensions.Logging;
public class EventGridBlobHandler
{
private readonly ILogger<EventGridBlobHandler> _logger;
public EventGridBlobHandler(ILogger<EventGridBlobHandler> logger)
{
_logger = logger;
}
[FunctionName("BlobEventHandler")]
public async Task Run(
[EventGridTrigger] EventGridEvent[] events)
{
foreach (var eventGridEvent in events)
{
_logger.LogInformation("Received event: {EventType}", eventGridEvent.EventType);
switch (eventGridEvent.EventType)
{
case "Microsoft.Storage.BlobCreated":
await HandleBlobCreatedAsync(eventGridEvent);
break;
case "Microsoft.Storage.BlobDeleted":
await HandleBlobDeletedAsync(eventGridEvent);
break;
case "Microsoft.Storage.BlobTierChanged":
await HandleBlobTierChangedAsync(eventGridEvent);
break;
default:
_logger.LogWarning("Unhandled event type: {EventType}", eventGridEvent.EventType);
break;
}
}
}
private async Task HandleBlobCreatedAsync(EventGridEvent eventGridEvent)
{
var blobData = eventGridEvent.Data.ToObjectFromJson<BlobCreatedEventData>();
_logger.LogInformation("Blob created: {Url}", blobData.Url);
_logger.LogInformation("API: {Api}", blobData.Api);
// Process the new blob
await ProcessBlobAsync(blobData.Url);
}
private async Task HandleBlobDeletedAsync(EventGridEvent eventGridEvent)
{
var blobData = eventGridEvent.Data.ToObjectFromJson<BlobDeletedEventData>();
_logger.LogInformation("Blob deleted: {Url}", blobData.Url);
// Log deletion, update references
await LogDeletionAsync(blobData.Url);
}
private async Task HandleBlobTierChangedAsync(EventGridEvent eventGridEvent)
{
var tierData = eventGridEvent.Data.ToObjectFromJson<BlobTierChangedEventData>();
_logger.LogInformation("Blob tier changed: {Url}, New tier: {Tier}",
tierData.Url, tierData.Tier);
// Track tier changes for cost management
await TrackTierChangeAsync(tierData);
}
}
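// Data models. Event payloads use camelCase names, while System.Text.Json matches
// property names case-sensitively by default, so each property is mapped explicitly.
// Requires: using System.Text.Json.Serialization;
public class BlobCreatedEventData
{
    [JsonPropertyName("url")] public string Url { get; set; }
    [JsonPropertyName("api")] public string Api { get; set; }
    [JsonPropertyName("clientRequestId")] public string ClientRequestId { get; set; }
    [JsonPropertyName("requestId")] public string RequestId { get; set; }
}
public class BlobDeletedEventData
{
    [JsonPropertyName("url")] public string Url { get; set; }
    [JsonPropertyName("clientRequestId")] public string ClientRequestId { get; set; }
    [JsonPropertyName("requestId")] public string RequestId { get; set; }
}
public class BlobTierChangedEventData
{
    [JsonPropertyName("url")] public string Url { get; set; }
    // The payload field is named "accessTier".
    [JsonPropertyName("accessTier")] public string Tier { get; set; }
    [JsonPropertyName("previousTier")] public string PreviousTier { get; set; }
}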
Real-World Use Cases
Image Processing Pipeline
┌─────────────────────────────────────────────────────────────────────┐
│ IMAGE PROCESSING WORKFLOW │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ [Event: BlobCreated] │
│ │ Filter: containers/images/* │
│ └──────┬────────────────────────────────┘ │
│ │ │
│ ▼ │
│ [Check File Type] │
│ └──────┬────────────────────────────────┘ │
│ │ │
│ ▼ │
│ [Resize Image] ────▶ [thumbnail] │
│ └──────┬────────────────────────────────┘ │
│ │ │
│ ▼ │
│ [Extract Metadata] │
│ │ - Width, height │
│ │ - Format │
│ └──────┬────────────────────────────────┘ │
│ │ │
│ ▼ │
│ [Save to Cosmos DB] │
│ └──────┬────────────────────────────────┘ │
│ │ │
│ ▼ │
│ [Send Notification] │
│ │
└─────────────────────────────────────────────────────────────────────┘
[Function("ImageProcessingPipeline")]
public async Task Run(
[EventGridTrigger] EventGridEvent eventGridEvent)
{
var blobData = eventGridEvent.Data.ToObjectFromJson<BlobCreatedEventData>();
// Only process images
if (!IsImageFile(blobData.Url))
{
_logger.LogInformation("Not an image file, skipping");
return;
}
// Download and process
// BlobClient expects a Uri, not a string
var blobClient = new BlobClient(new Uri(blobData.Url), new DefaultAzureCredential());
var response = await blobClient.DownloadContentAsync();
var imageData = response.Value.Content;
// Generate thumbnail
await GenerateThumbnailAsync(blobClient);
// Extract and store metadata
var metadata = await ExtractMetadataAsync(imageData);
await StoreMetadataAsync(metadata);
// Send notification
await SendCompletionNotificationAsync(blobData.Url);
}
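The pipeline above calls `IsImageFile` without defining it. One possible shape for that check is sketched below, assuming the same three extensions the blob trigger example handles (`.png`, `.jpg`, `.jpeg`) and that the event's `url` field is an absolute URL. The `BlobFileTypes` class name is hypothetical. An extension check is only a first filter; it does not prove that the content is a valid image.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical helper backing the IsImageFile call in the pipeline.
public static class BlobFileTypes
{
    // Assumption: these are the only image types the pipeline accepts.
    private static readonly HashSet<string> ImageExtensions =
        new HashSet<string>(StringComparer.OrdinalIgnoreCase) { ".png", ".jpg", ".jpeg" };

    public static bool IsImageFile(string blobUrl)
    {
        // Reject anything that is not an absolute URL.
        if (!Uri.TryCreate(blobUrl, UriKind.Absolute, out var uri)) return false;

        // Use only the path so a query string cannot hide the extension.
        string extension = Path.GetExtension(uri.AbsolutePath);
        return ImageExtensions.Contains(extension);
    }
}
```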
Data Ingestion Pipeline
[Function("DataIngestionHandler")]
public async Task Run(
[EventGridTrigger] EventGridEvent eventGridEvent)
{
var blobData = eventGridEvent.Data.ToObjectFromJson<BlobCreatedEventData>();
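// Parse the URL first so only the (unescaped) path contributes to the file name
var fileName = Path.GetFileName(new Uri(blobData.Url).LocalPath);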
var extension = Path.GetExtension(fileName).ToLowerInvariant();
switch (extension)
{
case ".csv":
await IngestCsvAsync(blobData.Url);
break;
case ".json":
await IngestJsonAsync(blobData.Url);
break;
case ".parquet":
await IngestParquetAsync(blobData.Url);
break;
default:
_logger.LogWarning("Unsupported file type: {Extension}", extension);
break;
}
}
Filter and Route Events
Subject-Based Filtering
# Subscribe only to a specific container
az eventgrid event-subscription create \
  --name process-uploads \
  --source-resource-id /subscriptions/xxx/resourceGroups/my-rg/providers/Microsoft.Storage/storageAccounts/mystorage \
  --endpoint-type azurefunction \
  --endpoint /subscriptions/.../functions/Handler \
  --subject-begins-with /blobServices/default/containers/uploads/
Advanced Filtering
# Filter by API operation (syntax: KEY OPERATOR VALUE [VALUE ...])
az eventgrid event-subscription create \
  --name process-block-list \
  --source-resource-id /subscriptions/.../storageAccounts/mystorage \
  --endpoint-type azurefunction \
  --endpoint /subscriptions/.../functions/Handler \
  --advanced-filter data.api StringContains PutBlockList
# Filter by file extension
az eventgrid event-subscription create \
  --name process-csv \
  --source-resource-id /subscriptions/.../storageAccounts/mystorage \
  --endpoint-type azurefunction \
  --endpoint /subscriptions/.../functions/Handler \
  --advanced-filter subject StringEndsWith .csv
Best Practices
Event Handling
| Practice | Description |
|---|---|
| Use batching | Process multiple events in single invocation |
| Implement idempotency | Handle duplicate events gracefully |
| Add error handling | Catch and log errors properly |
| Use dead-letter | Capture failed events for analysis |
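Event Grid delivers at least once, so the same event can arrive more than once; the idempotency row in the table is what protects a handler from doing the work twice. The sketch below shows one way to apply it, keyed on the event's `id`. It is deliberately in-memory, which only deduplicates within a single process; a production handler would need a durable store shared across instances. `IdempotentEventProcessor` is a hypothetical name.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical in-memory de-duplicator keyed by event id.
public sealed class IdempotentEventProcessor
{
    private readonly ConcurrentDictionary<string, byte> _seen =
        new ConcurrentDictionary<string, byte>();

    // Runs the handler at most once per event id.
    // Returns true if the handler ran, false if the id was already processed.
    public async Task<bool> ProcessOnceAsync(string eventId, Func<Task> handler)
    {
        // TryAdd is atomic, so only one caller wins for a given id.
        if (!_seen.TryAdd(eventId, 0)) return false;

        try
        {
            await handler();
            return true;
        }
        catch
        {
            // Forget the id so a later redelivery can attempt the work again.
            _seen.TryRemove(eventId, out _);
            throw;
        }
    }
}
```

Rethrowing after a failure lets the function invocation fail, which in turn lets Event Grid retry and eventually dead-letter the event if a dead-letter destination is configured.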
Performance Considerations
[Function("BatchedBlobHandler")]
public async Task Run(
[EventGridTrigger] EventGridEvent[] events)
{
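// Process the batch concurrently. Batched delivery must be enabled on the
// subscription (for example with --max-events-per-batch); otherwise the
// array holds a single event.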
var tasks = events.Select(ProcessEventAsync);
await Task.WhenAll(tasks);
}
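`Task.WhenAll` over the whole batch starts every handler at once, which can overwhelm a downstream service when batches are large. A common alternative is to cap the number of handlers in flight. The sketch below does this with `SemaphoreSlim`; `BoundedBatch.ForEachAsync` and its `maxConcurrency` parameter are hypothetical names, and the right limit depends on the workload.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper that limits how many handlers run at the same time.
public static class BoundedBatch
{
    public static async Task ForEachAsync<T>(
        IEnumerable<T> items, int maxConcurrency, Func<T, Task> handler)
    {
        if (maxConcurrency < 1)
            throw new ArgumentOutOfRangeException(nameof(maxConcurrency));

        using var gate = new SemaphoreSlim(maxConcurrency);

        // ToList forces every task to start; each one waits at the gate
        // until a slot is free.
        var tasks = items.Select(async item =>
        {
            await gate.WaitAsync();
            try
            {
                await handler(item);
            }
            finally
            {
                gate.Release();
            }
        }).ToList();

        // Completes when all items are done; faults if any handler threw.
        await Task.WhenAll(tasks);
    }
}
```

In the handler above, the body would become something like `await BoundedBatch.ForEachAsync(events, 4, ProcessEventAsync);`, where the limit of 4 is an arbitrary illustrative value.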
Related Topics
- Blob Storage — Storage fundamentals
- Blob Managed Identity — Secure access
- Event Grid — Event Grid fundamentals
Azure Integration Hub - Intermediate Level