Blob Storage — Concurrent Uploads and Lease Lock Patterns
The Problem
Your blob upload system has issues:
- Multiple workers uploading the same file cause corruption
- Large files time out or fail mid-upload
- No way to safely update files being read by others
- Upload performance is poor (not utilizing bandwidth)
- Race conditions cause data loss
You need robust concurrent upload handling with proper locking.
Solution Implementation
Step 1: Parallel Block Upload for Large Files
import { BlockBlobClient, StorageClient } from "@azure/storage-blob";
/**
 * Uploads a local file to a block blob by staging fixed-size blocks in
 * bounded parallel batches and then committing the ordered block list.
 */
class ParallelUploader {
  /** Size of each staged block in bytes (4 MiB). */
  private blockSize: number = 4 * 1024 * 1024;

  /** Upper bound on blocks held in memory / in flight at the same time. */
  private maxConcurrentUploads = 10;

  /** Block blobs accept at most this many blocks in a committed block list. */
  private maxBlockCount = 50000;

  /**
   * Stages every block of `localFilePath` and commits them as `blobName`.
   *
   * @param localFilePath - Path of the file to upload.
   * @param containerName - Target container.
   * @param blobName - Target blob name.
   * @param connectionString - Storage account connection string.
   * @throws Error if the file needs more blocks than a block blob allows, or
   *   if the file yields fewer bytes than its reported size (e.g. it was
   *   truncated while being uploaded). SDK errors are propagated unchanged.
   */
  async uploadLargeFile(
    localFilePath: string,
    containerName: string,
    blobName: string,
    connectionString: string
  ): Promise<void> {
    const blockBlobClient = new BlockBlobClient(
      connectionString,
      containerName,
      blobName
    );

    const fs = require("fs");
    const fileSize: number = fs.statSync(localFilePath).size;
    const blockCount = Math.ceil(fileSize / this.blockSize);
    if (blockCount > this.maxBlockCount) {
      throw new Error(
        `File needs ${blockCount} blocks, which exceeds the limit of ${this.maxBlockCount}`
      );
    }

    const blockIds: string[] = [];
    let inFlight: Promise<void>[] = [];

    const stageOne = async (blockId: string, data: Buffer, index: number): Promise<void> => {
      await blockBlobClient.stageBlock(blockId, data, data.length);
      console.log(`Uploaded block ${index + 1}/${blockCount}`);
    };

    // Open the file once for the whole upload; the finally block guarantees
    // the descriptor is closed even when a read or an upload fails.
    const fd: number = fs.openSync(localFilePath, "r");
    try {
      for (let i = 0; i < blockCount; i++) {
        const start = i * this.blockSize;
        const length = Math.min(this.blockSize, fileSize - start);

        const buffer = Buffer.alloc(length);
        const bytesRead: number = fs.readSync(fd, buffer, 0, length, start);
        if (bytesRead !== length) {
          throw new Error(
            `Short read at offset ${start}: expected ${length} bytes, got ${bytesRead}`
          );
        }

        // All block IDs of a blob must have the same length, hence the padding.
        const blockId = Buffer.from(`block-${i.toString().padStart(6, "0")}`).toString("base64");
        blockIds.push(blockId);
        inFlight.push(stageOne(blockId, buffer, i));

        // Drain the batch before reading more so memory use stays bounded.
        if (inFlight.length >= this.maxConcurrentUploads) {
          await Promise.all(inFlight);
          inFlight = [];
        }
      }

      await Promise.all(inFlight);
    } catch (error: unknown) {
      // Let any still-running uploads settle so their rejections are observed
      // before the original failure is reported.
      await Promise.allSettled(inFlight);
      throw error;
    } finally {
      fs.closeSync(fd);
    }

    // The order of IDs in the committed list defines the blob's byte order.
    await blockBlobClient.commitBlockList(blockIds);
    console.log(`Upload complete: ${blobName}`);
  }
}
Step 2: Blob Lease for Safe Updates
import { BlobServiceClient, LeaseClient } from "@azure/storage-blob";
/**
 * Performs read-modify-write updates of a blob under an exclusive lease so
 * that concurrent writers cannot interleave their changes.
 */
class SafeBlobUpdater {
  private blobServiceClient: BlobServiceClient;

  /**
   * @param connectionString - Storage account connection string.
   */
  constructor(connectionString: string) {
    // The BlobServiceClient constructor expects a URL; connection strings
    // must go through the static factory.
    this.blobServiceClient = BlobServiceClient.fromConnectionString(connectionString);
  }

  /**
   * Acquires a 60-second lease on the blob, merges `newContent` into the
   * current content, uploads the result under the lease, and releases it.
   *
   * The whole read-merge-write cycle must finish within the lease duration;
   * if the lease expires first the upload is rejected by the service.
   *
   * @param containerName - Container holding the blob.
   * @param blobName - Blob to update (must already exist to be leased).
   * @param newContent - Content to merge into the blob.
   * @throws Error "Blob is already locked by another process" when the lease
   *   request is answered with HTTP 409; other errors are propagated.
   */
  async updateBlobSafely(
    containerName: string,
    blobName: string,
    newContent: string
  ): Promise<void> {
    const containerClient = this.blobServiceClient.getContainerClient(containerName);
    const blobClient = containerClient.getBlockBlobClient(blobName);

    // Step 1: Acquire lease (exclusive, 60 seconds)
    const leaseClient = blobClient.getBlobLeaseClient();
    const leaseDuration = 60;
    let leaseId: string;
    try {
      const leaseResponse = await leaseClient.acquireLease(leaseDuration);
      leaseId = leaseResponse.leaseId ?? leaseClient.leaseId;
    } catch (error: unknown) {
      if (SafeBlobUpdater.statusCodeOf(error) === 409) {
        throw new Error("Blob is already locked by another process");
      }
      throw error;
    }

    try {
      // Step 2: Read current content while holding lease
      const existingContent = await this.downloadBlob(containerName, blobName);
      const updatedContent = this.mergeContent(existingContent, newContent);

      // Step 3: Upload new content. The length is in bytes, not characters.
      await blobClient.upload(updatedContent, Buffer.byteLength(updatedContent), {
        conditions: { leaseId }
      });
      console.log(`Blob ${blobName} updated successfully`);
    } catch (error: unknown) {
      // Release on a best-effort basis so a release failure cannot hide the
      // error that actually broke the update.
      try {
        await leaseClient.releaseLease();
      } catch (releaseError: unknown) {
        console.warn(`Failed to release lease on ${blobName}: ${String(releaseError)}`);
      }
      throw error;
    }

    // Step 4: Release lease
    await leaseClient.releaseLease();
  }

  /** Downloads the whole blob and decodes it as UTF-8 text. */
  private async downloadBlob(containerName: string, blobName: string): Promise<string> {
    const containerClient = this.blobServiceClient.getContainerClient(containerName);
    const blobClient = containerClient.getBlockBlobClient(blobName);
    const content = await blobClient.downloadToBuffer();
    return content.toString();
  }

  /**
   * Combines the stored content with the incoming update.
   * Placeholder: the update currently replaces the existing content.
   */
  private mergeContent(existing: string, update: string): string {
    // Implement your merge logic
    return update;
  }

  /** Extracts a numeric `statusCode` from an unknown thrown value, if any. */
  private static statusCodeOf(error: unknown): number | undefined {
    if (typeof error !== "object" || error === null) {
      return undefined;
    }
    const code = (error as { statusCode?: unknown }).statusCode;
    return typeof code === "number" ? code : undefined;
  }
}
Step 3: Optimistic Concurrency with ETag
/**
 * Writes blobs using ETag-based optimistic concurrency: an upload succeeds
 * only if the blob still has the ETag the caller last observed.
 */
class OptimisticConcurrencyUploader {
  private readonly configContainer = "config";
  private readonly configBlob = "settings.json";

  /** Maximum number of upload attempts made by `updateConfig`. */
  private readonly maxUpdateAttempts = 5;

  /**
   * Uploads `content`, optionally guarded by an `If-Match` condition.
   *
   * @param containerName - Target container.
   * @param blobName - Target blob.
   * @param content - Text to upload.
   * @param expectedETag - When given, the upload only succeeds if the blob's
   *   current ETag matches; when omitted, the upload is unconditional.
   * @returns `true` on success, `false` if the service answered HTTP 412
   *   (the blob was modified since `expectedETag` was read).
   */
  async uploadWithETagCheck(
    containerName: string,
    blobName: string,
    content: string,
    expectedETag?: string
  ): Promise<boolean> {
    const blobClient = this.getBlockBlobClient(containerName, blobName);
    try {
      const uploadResponse = await blobClient.upload(content, Buffer.byteLength(content), {
        conditions: expectedETag ? { ifMatch: expectedETag } : undefined
      });
      console.log(`Upload successful, ETag: ${uploadResponse.etag}`);
      return true;
    } catch (error: unknown) {
      if (OptimisticConcurrencyUploader.statusCodeOf(error) === 412) {
        // Precondition failed - blob was modified
        console.log("Blob was modified by another process");
        return false;
      }
      throw error;
    }
  }

  /**
   * Returns the blob's current ETag.
   *
   * @throws Error if the service response carries no ETag.
   */
  async getBlobETag(containerName: string, blobName: string): Promise<string> {
    const props = await this.getBlockBlobClient(containerName, blobName).getProperties();
    if (props.etag === undefined) {
      throw new Error(`No ETag returned for ${containerName}/${blobName}`);
    }
    return props.etag;
  }

  /**
   * Use case: update configuration safely.
   *
   * Tries to write `newConfig`; on a conflict it downloads the latest
   * configuration together with its ETag, merges `newConfig` into it and
   * retries against that ETag, so the merge base and the precondition always
   * describe the same blob version.
   *
   * @throws Error after `maxUpdateAttempts` consecutive conflicts.
   */
  async updateConfig(newConfig: any): Promise<void> {
    let candidate: unknown = newConfig;
    let etag = await this.getBlobETag(this.configContainer, this.configBlob);

    for (let attempt = 1; attempt <= this.maxUpdateAttempts; attempt++) {
      const success = await this.uploadWithETagCheck(
        this.configContainer,
        this.configBlob,
        JSON.stringify(candidate, null, 2),
        etag
      );
      if (success) {
        return;
      }

      // Fetch latest, merge, retry
      const latest = await this.downloadConfig();
      candidate = this.mergeConfig(latest.config, newConfig);
      etag = latest.etag;
    }

    throw new Error(
      `Could not update ${this.configContainer}/${this.configBlob} after ${this.maxUpdateAttempts} attempts`
    );
  }

  /** Downloads and parses the configuration blob along with its ETag. */
  private async downloadConfig(): Promise<{ config: unknown; etag: string }> {
    const blobClient = this.getBlockBlobClient(this.configContainer, this.configBlob);
    const response = await blobClient.download();
    const body = response.readableStreamBody;
    if (body === undefined || response.etag === undefined) {
      throw new Error(`Incomplete download response for ${this.configContainer}/${this.configBlob}`);
    }

    const chunks: Buffer[] = [];
    for await (const chunk of body) {
      chunks.push(typeof chunk === "string" ? Buffer.from(chunk) : chunk);
    }
    const config: unknown = JSON.parse(Buffer.concat(chunks).toString());
    return { config, etag: response.etag };
  }

  /**
   * Merges the incoming configuration into the latest stored one.
   * Placeholder strategy: a shallow merge in which incoming keys win; if
   * either side is not a plain object the incoming value replaces the old.
   */
  private mergeConfig(latest: unknown, incoming: unknown): unknown {
    const isPlainObject = (value: unknown): value is Record<string, unknown> =>
      typeof value === "object" && value !== null && !Array.isArray(value);

    if (isPlainObject(latest) && isPlainObject(incoming)) {
      return { ...latest, ...incoming };
    }
    return incoming;
  }

  /** Builds a block blob client from the `AZURE_STORAGE_CONNECTION` variable. */
  private getBlockBlobClient(containerName: string, blobName: string) {
    const connectionString = process.env["AZURE_STORAGE_CONNECTION"];
    if (!connectionString) {
      throw new Error("AZURE_STORAGE_CONNECTION is not set");
    }
    return BlobServiceClient.fromConnectionString(connectionString)
      .getContainerClient(containerName)
      .getBlockBlobClient(blobName);
  }

  /** Extracts a numeric `statusCode` from an unknown thrown value, if any. */
  private static statusCodeOf(error: unknown): number | undefined {
    if (typeof error !== "object" || error === null) {
      return undefined;
    }
    const code = (error as { statusCode?: unknown }).statusCode;
    return typeof code === "number" ? code : undefined;
  }
}
Step 4: Distributed Lock with Blob
/** Contents of a lock blob plus the ETag of the version that was read. */
interface BlobLockRecord {
  owner?: string;
  acquiredAt?: string;
  expiresAt?: string;
  etag?: string;
}

/**
 * Advisory distributed lock backed by one small blob per resource.
 *
 * A lock is held by whoever created `<resource>.lock`; the blob records the
 * owner and an expiry time. Expiry is evaluated against the local clock, so
 * workers are assumed to have reasonably synchronized clocks.
 */
class DistributedBlobLock {
  private blobServiceClient: BlobServiceClient;
  private lockContainer = "locks";
  private defaultTimeout = 300; // 5 minutes

  /**
   * @param connectionString - Storage account connection string.
   */
  constructor(connectionString: string) {
    this.blobServiceClient = BlobServiceClient.fromConnectionString(connectionString);
  }

  /**
   * Tries to take the lock for `resourceName`.
   *
   * @param resourceName - Logical name of the protected resource.
   * @param ownerId - Identifier of the caller; must be unique per worker.
   * @param timeoutSeconds - Lifetime of the lock before others may take it over.
   * @returns `true` if the lock was acquired (fresh or by taking over an
   *   expired lock), `false` if another owner holds a live lock.
   */
  async acquireLock(
    resourceName: string,
    ownerId: string,
    timeoutSeconds: number = this.defaultTimeout
  ): Promise<boolean> {
    const containerClient = this.blobServiceClient.getContainerClient(this.lockContainer);
    await containerClient.createIfNotExists();

    const lockContent = this.buildLockContent(ownerId, timeoutSeconds);
    try {
      await this.getLockBlobClient(resourceName).upload(
        lockContent,
        Buffer.byteLength(lockContent),
        { conditions: { ifNoneMatch: "*" } } // Only if doesn't exist
      );
      console.log(`Lock acquired for ${resourceName}`);
      return true;
    } catch (error: unknown) {
      if (!DistributedBlobLock.isConflict(error)) {
        throw error;
      }
    }

    // Lock exists, check if expired
    const existing = await this.getLockInfo(resourceName);
    if (!this.isLockExpired(existing)) {
      return false;
    }
    return this.forceAcquireLock(resourceName, ownerId, timeoutSeconds, existing?.etag);
  }

  /**
   * Deletes the lock blob if, and only if, it is still the version owned by
   * `ownerId`. Failures are logged, not thrown.
   */
  async releaseLock(resourceName: string, ownerId: string): Promise<void> {
    try {
      // Only release if we own it
      const current = await this.downloadLockContent(resourceName);
      if (current.owner === ownerId) {
        // The ETag condition stops us from deleting a lock that someone else
        // took over between the read above and this delete.
        await this.getLockBlobClient(resourceName).delete({
          conditions: { ifMatch: current.etag }
        });
        console.log(`Lock released for ${resourceName}`);
      }
    } catch (error: unknown) {
      console.log(`Failed to release lock: ${String(error)}`);
    }
  }

  /**
   * Replaces an expired lock. The write is conditional on the ETag that was
   * inspected (or on the blob still being absent), so when several workers
   * race for the same expired lock only one of them wins.
   *
   * @returns `true` if this caller now owns the lock, `false` if it lost the race.
   */
  private async forceAcquireLock(
    resourceName: string,
    ownerId: string,
    timeoutSeconds: number,
    expectedETag?: string
  ): Promise<boolean> {
    const lockContent = this.buildLockContent(ownerId, timeoutSeconds);
    try {
      await this.getLockBlobClient(resourceName).upload(
        lockContent,
        Buffer.byteLength(lockContent),
        { conditions: expectedETag ? { ifMatch: expectedETag } : { ifNoneMatch: "*" } }
      );
      console.log(`Expired lock taken over for ${resourceName}`);
      return true;
    } catch (error: unknown) {
      if (DistributedBlobLock.isConflict(error)) {
        return false;
      }
      throw error;
    }
  }

  /** Returns the current lock record, or `null` if it cannot be downloaded. */
  private async getLockInfo(resourceName: string): Promise<BlobLockRecord | null> {
    try {
      return await this.downloadLockContent(resourceName);
    } catch {
      return null;
    }
  }

  /**
   * Downloads the lock blob and parses it. Content that is not a JSON object
   * yields a record carrying only the ETag, so a corrupt lock can still be
   * replaced conditionally.
   *
   * @throws if the blob cannot be downloaded (for example, it does not exist).
   */
  private async downloadLockContent(resourceName: string): Promise<BlobLockRecord> {
    const response = await this.getLockBlobClient(resourceName).download();
    const body = response.readableStreamBody;
    if (body === undefined) {
      throw new Error(`Lock blob for ${resourceName} has no readable body`);
    }

    const chunks: Buffer[] = [];
    for await (const chunk of body) {
      chunks.push(typeof chunk === "string" ? Buffer.from(chunk) : chunk);
    }

    let parsed: unknown;
    try {
      parsed = JSON.parse(Buffer.concat(chunks).toString());
    } catch {
      parsed = undefined;
    }

    const record: BlobLockRecord = { etag: response.etag };
    if (typeof parsed === "object" && parsed !== null) {
      const fields = parsed as Record<string, unknown>;
      if (typeof fields["owner"] === "string") record.owner = fields["owner"];
      if (typeof fields["acquiredAt"] === "string") record.acquiredAt = fields["acquiredAt"];
      if (typeof fields["expiresAt"] === "string") record.expiresAt = fields["expiresAt"];
    }
    return record;
  }

  /**
   * A lock is expired when it is missing, when its expiry cannot be read, or
   * when its expiry lies in the past. Treating an unreadable expiry as
   * expired keeps a corrupt lock blob from blocking the resource forever.
   */
  private isLockExpired(lock: BlobLockRecord | null): boolean {
    if (!lock || lock.expiresAt === undefined) return true;
    const expiresAt = new Date(lock.expiresAt).getTime();
    return Number.isNaN(expiresAt) || expiresAt < Date.now();
  }

  /** Serializes a fresh lock record for `ownerId`. */
  private buildLockContent(ownerId: string, timeoutSeconds: number): string {
    const now = Date.now();
    return JSON.stringify({
      owner: ownerId,
      acquiredAt: new Date(now).toISOString(),
      expiresAt: new Date(now + timeoutSeconds * 1000).toISOString()
    });
  }

  /** Client for the lock blob that guards `resourceName`. */
  private getLockBlobClient(resourceName: string) {
    return this.blobServiceClient
      .getContainerClient(this.lockContainer)
      .getBlockBlobClient(`${resourceName}.lock`);
  }

  /**
   * True for the status codes a failed conditional write can produce:
   * 409 (blob already exists, for `If-None-Match: *`) and 412 (ETag mismatch).
   */
  private static isConflict(error: unknown): boolean {
    if (typeof error !== "object" || error === null) {
      return false;
    }
    const code = (error as { statusCode?: unknown }).statusCode;
    return code === 409 || code === 412;
  }
}
// Usage with lock
/**
 * Example worker: runs `processImportantFile` only while holding the
 * distributed lock for "important-file.json", and always releases the lock
 * afterwards. If the lock is held by someone else it logs and returns.
 */
async function processWithLock() {
  const lockManager = new DistributedBlobLock(process.env["AZURE_STORAGE_CONNECTION"]!);

  // A PID alone is not unique across machines (containers commonly run their
  // main process as PID 1), and a shared owner ID would let one worker
  // release another worker's lock. Qualify it with the host name.
  const os = require("os");
  const ownerId = `worker-${os.hostname()}-${process.pid}`;
  const resource = "important-file.json";

  const acquired = await lockManager.acquireLock(resource, ownerId);
  if (!acquired) {
    console.log("Could not acquire lock, will retry later");
    return;
  }

  try {
    // Do work
    await processImportantFile();
  } finally {
    await lockManager.releaseLock(resource, ownerId);
  }
}
Step 5: Append Blob for Streaming Writes
import { AppendBlobClient } from "@azure/storage-blob";
/**
 * Appends timestamped log lines to the append blob `logs/application.log`.
 * The `logs` container is expected to exist already.
 */
class StreamingLogWriter {
  private appendBlobClient: AppendBlobClient;

  /** Largest payload sent in a single append-block request (4 MiB). */
  private maxAppendBlockBytes = 4 * 1024 * 1024;

  /** Cached result of the one-time "create if missing" call. */
  private blobReady?: Promise<void>;

  /**
   * @param connectionString - Storage account connection string.
   */
  constructor(connectionString: string) {
    const blobServiceClient = BlobServiceClient.fromConnectionString(connectionString);
    const containerClient = blobServiceClient.getContainerClient("logs");
    this.appendBlobClient = containerClient.getAppendBlobClient("application.log");
  }

  /**
   * Appends one log line of the form `<ISO timestamp> - <entry>\n`.
   */
  async appendLog(entry: string): Promise<void> {
    await this.appendLogsBatch([entry]);
  }

  /**
   * Batch appends for performance: all entries are written with as few
   * append-block requests as possible. An empty batch is a no-op.
   *
   * A batch larger than `maxAppendBlockBytes` is split by byte offset into
   * several requests; the resulting blob content is the same, but other
   * writers may interleave between those requests.
   */
  async appendLogsBatch(entries: string[]): Promise<void> {
    if (entries.length === 0) {
      return;
    }
    await this.ensureBlobExists();

    const content = entries
      .map(e => `${new Date().toISOString()} - ${e}\n`)
      .join("");
    const buffer = Buffer.from(content, "utf-8");

    for (let offset = 0; offset < buffer.length; offset += this.maxAppendBlockBytes) {
      const block = buffer.subarray(offset, offset + this.maxAppendBlockBytes);
      await this.appendBlobClient.appendBlock(block, block.length);
    }
  }

  /**
   * Creates the append blob (content type text/plain) the first time it is
   * needed. `createIfNotExists` already tolerates an existing blob, so any
   * error it raises is a real failure and is propagated; the cache is reset
   * in that case so a later call can try again.
   */
  private ensureBlobExists(): Promise<void> {
    if (this.blobReady === undefined) {
      this.blobReady = (async (): Promise<void> => {
        try {
          await this.appendBlobClient.createIfNotExists({
            blobHTTPHeaders: { blobContentType: "text/plain" }
          });
        } catch (error: unknown) {
          this.blobReady = undefined;
          throw error;
        }
      })();
    }
    return this.blobReady;
  }
}
Step 6: Resume Failed Upload
/**
 * Uploads a file as a block blob and can pick up where a previous, failed
 * attempt stopped by reusing blocks that were staged but never committed.
 */
class ResumableUploader {
  private chunkSize = 1024 * 1024; // 1MB

  /**
   * Uploads `localPath` to `containerName/blobName`, skipping blocks that a
   * previous attempt already staged.
   *
   * A staged block is reused only when its ID and its size match what this
   * upload would stage at that position. Content is not compared, so resume
   * assumes the local file has not changed since the failed attempt.
   *
   * @throws Error if `AZURE_STORAGE_CONNECTION` is not set or the file yields
   *   fewer bytes than its reported size; SDK errors are propagated.
   */
  async uploadWithResume(
    localPath: string,
    containerName: string,
    blobName: string
  ): Promise<void> {
    const connectionString = process.env["AZURE_STORAGE_CONNECTION"];
    if (!connectionString) {
      throw new Error("AZURE_STORAGE_CONNECTION is not set");
    }
    const blockBlobClient = BlobServiceClient.fromConnectionString(connectionString)
      .getContainerClient(containerName)
      .getBlockBlobClient(blobName);

    const fs = require("fs");
    const totalSize: number = fs.statSync(localPath).size;
    const blockCount = Math.ceil(totalSize / this.chunkSize);

    // Blocks left behind by an interrupted upload are *uncommitted*; the
    // committed list only describes a blob that was already completed.
    const stagedSizes = new Map<string, number>();
    try {
      const blockList = await blockBlobClient.getBlockList("uncommitted");
      for (const block of blockList.uncommittedBlocks ?? []) {
        stagedSizes.set(block.name, block.size);
      }
    } catch (error: unknown) {
      // 404 means nothing has been staged or committed yet: no existing upload.
      if (ResumableUploader.statusCodeOf(error) !== 404) {
        throw error;
      }
    }

    // Calculate which blocks need upload
    const blockIds = Array.from({ length: blockCount }, (_, i) => this.blockIdFor(i));
    const blocksToUpload: number[] = [];
    for (let i = 0; i < blockCount; i++) {
      if (stagedSizes.get(blockIds[i]!) !== this.blockLength(i, totalSize)) {
        blocksToUpload.push(i);
      }
    }
    console.log(`Resuming: ${blocksToUpload.length} blocks to upload`);

    // Upload missing blocks
    if (blocksToUpload.length > 0) {
      const fd: number = fs.openSync(localPath, "r");
      try {
        for (const blockIndex of blocksToUpload) {
          const start = blockIndex * this.chunkSize;
          const length = this.blockLength(blockIndex, totalSize);
          const buffer = Buffer.alloc(length);
          const bytesRead: number = fs.readSync(fd, buffer, 0, length, start);
          if (bytesRead !== length) {
            throw new Error(
              `Short read at offset ${start}: expected ${length} bytes, got ${bytesRead}`
            );
          }
          await blockBlobClient.stageBlock(blockIds[blockIndex]!, buffer, buffer.length);
        }
      } finally {
        fs.closeSync(fd);
      }
    }

    // Commit all blocks
    await blockBlobClient.commitBlockList(blockIds);
    console.log("Upload complete");
  }

  /** Base64 block ID for the block at `index`; padded so all IDs share one length. */
  private blockIdFor(index: number): string {
    return Buffer.from(`block-${index.toString().padStart(6, "0")}`).toString("base64");
  }

  /** Number of bytes the block at `index` covers in a file of `totalSize` bytes. */
  private blockLength(index: number, totalSize: number): number {
    return Math.min(this.chunkSize, totalSize - index * this.chunkSize);
  }

  /** Extracts a numeric `statusCode` from an unknown thrown value, if any. */
  private static statusCodeOf(error: unknown): number | undefined {
    if (typeof error !== "object" || error === null) {
      return undefined;
    }
    const code = (error as { statusCode?: unknown }).statusCode;
    return typeof code === "number" ? code : undefined;
  }
}
Best Practices
| Practice | Benefit |
|---|---|
| Use block blobs for large files | Parallel upload |
| Use append blobs for logs | Incremental, append-only writes |
| Implement leases for updates | Prevent corruption |
| Use ETags for optimistic locking | Detect conflicts |
| Use distributed locks | Coordinate workers |
| Implement resume logic | Handle failures |
Summary
- Use parallel block uploads for large files (this guide stages 4 MB blocks, up to 10 at a time)
- Implement blob leases to prevent concurrent modification
- Use ETag-based optimistic concurrency for safe updates
- Create distributed locks using blobs for multi-worker coordination
- Implement append blobs for streaming log writes
- Build resume capability for failed uploads