← Back to ArticlesStorage

Blob Storage — Concurrent Uploads and Lease Lock Patterns

Implementing high-performance concurrent uploads, file locking, and preventing race conditions with Azure Blob Storage.

Blob Storage — Concurrent Uploads and Lease Lock Patterns

The Problem

Your blob upload system has issues:

You need robust concurrent upload handling with proper locking.

Solution Implementation

Step 1: Parallel Block Upload for Large Files

import { BlockBlobClient, StorageClient } from "@azure/storage-blob";

class ParallelUploader {
    private blockSize: number = 4 * 1024 * 1024; // 4MB blocks
    
    async uploadLargeFile(
        localFilePath: string,
        containerName: string,
        blobName: string,
        connectionString: string
    ): Promise<void> {
        const blockBlobClient = new BlockBlobClient(
            connectionString,
            containerName,
            blobName
        );
        
        // Get file size
        const fs = require("fs");
        const stats = fs.statSync(localFilePath);
        const fileSize = stats.size;
        
        // Calculate blocks
        const blockCount = Math.ceil(fileSize / this.blockSize);
        const blockIds: string[] = [];
        
        // Upload blocks in parallel
        const uploadPromises: Promise<void>[] = [];
        
        for (let i = 0; i < blockCount; i++) {
            const start = i * this.blockSize;
            const end = Math.min(start + this.blockSize, fileSize);
            
            // Read block
            const buffer = Buffer.alloc(end - start);
            const fd = fs.openSync(localFilePath, "r");
            fs.readSync(fd, buffer, 0, end - start, start);
            fs.closeSync(fd);
            
            // Generate block ID
            const blockId = Buffer.from(`block-${i.toString().padStart(6, "0")}`).toString("base64");
            blockIds.push(blockId);
            
            // Upload block
            const uploadPromise = blockBlobClient.stageBlock(blockId, buffer, buffer.length)
                .then(() => console.log(`Uploaded block ${i + 1}/${blockCount}`));
            
            uploadPromises.push(uploadPromise);
            
            // Limit concurrent uploads to prevent memory issues
            if (uploadPromises.length >= 10) {
                await Promise.all(uploadPromises);
                uploadPromises.length = 0;
            }
        }
        
        // Wait for remaining uploads
        if (uploadPromises.length > 0) {
            await Promise.all(uploadPromises);
        }
        
        // Commit blocks
        await blockBlobClient.commitBlockList(blockIds);
        
        console.log(`Upload complete: ${blobName}`);
    }
}

Step 2: Blob Lease for Safe Updates

import { BlobServiceClient, LeaseClient } from "@azure/storage-blob";

class SafeBlobUpdater {
    private blobServiceClient: BlobServiceClient;
    
    constructor(connectionString: string) {
        this.blobServiceClient = new BlobServiceClient(connectionString);
    }
    
    // Acquire lease, update, release
    async updateBlobSafely(
        containerName: string,
        blobName: string,
        newContent: string
    ): Promise<void> {
        const containerClient = this.blobServiceClient.getContainerClient(containerName);
        const blobClient = containerClient.getBlockBlobClient(blobName);
        
        // Step 1: Acquire lease (exclusive, 60 seconds)
        const leaseClient = new LeaseClient(blobClient);
        const leaseDuration = 60;
        
        let leaseResponse;
        try {
            leaseResponse = await leaseClient.acquireLease(leaseDuration);
        } catch (error) {
            if (error.statusCode === 409) {
                throw new Error("Blob is already locked by another process");
            }
            throw error;
        }
        
        try {
            // Step 2: Read current content while holding lease
            const existingContent = await this.downloadBlob(containerName, blobName);
            const updatedContent = this.mergeContent(existingContent, newContent);
            
            // Step 3: Upload new content
            await blobClient.upload(updatedContent, { 
                conditions: { leaseId: leaseResponse.leaseId }
            });
            
            console.log(`Blob ${blobName} updated successfully`);
        }
        finally {
            // Step 4: Release lease
            await leaseClient.releaseLease(leaseResponse.leaseId);
        }
    }
    
    private async downloadBlob(containerName: string, blobName: string): Promise<string> {
        const containerClient = this.blobServiceClient.getContainerClient(containerName);
        const blobClient = containerClient.getBlockBlobClient(blobName);
        
        const response = await blobClient.download();
        const chunks: Buffer[] = [];
        
        for await (const chunk of response.value) {
            chunks.push(chunk);
        }
        
        return Buffer.concat(chunks).toString();
    }
    
    private mergeContent(existing: string, update: string): string {
        // Implement your merge logic
        return update;
    }
}

Step 3: Optimistic Concurrency with ETag

class OptimisticConcurrencyUploader {
    async uploadWithETagCheck(
        containerName: string,
        blobName: string,
        content: string,
        expectedETag?: string
    ): Promise<boolean> {
        const blobServiceClient = new BlobServiceClient(process.env["AZURE_STORAGE_CONNECTION"]!);
        const containerClient = blobServiceClient.getContainerClient(containerName);
        const blobClient = containerClient.getBlockBlobClient(blobName);
        
        try {
            const uploadResponse = await blobClient.upload(content, {
                conditions: expectedETag 
                    ? { ifMatch: expectedETag }
                    : undefined
            });
            
            console.log(`Upload successful, ETag: ${uploadResponse.etag}`);
            return true;
        }
        catch (error: any) {
            if (error.statusCode === 412) {
                // Precondition failed - blob was modified
                console.log("Blob was modified by another process");
                return false;
            }
            throw error;
        }
    }
    
    async getBlobETag(containerName: string, blobName: string): Promise<string> {
        const blobServiceClient = new BlobServiceClient(process.env["AZURE_STORAGE_CONNECTION"]!);
        const containerClient = blobServiceClient.getContainerClient(containerName);
        const blobClient = containerClient.getBlockBlobClient(blobName);
        
        const props = await blobClient.getProperties();
        return props.etag;
    }
    
    // Use case: update configuration safely
    async updateConfig(newConfig: any): Promise<void> {
        const containerName = "config";
        const blobName = "settings.json";
        
        // Get current ETag
        const currentETag = await this.getBlobETag(containerName, blobName);
        
        // Modify config
        const newContent = JSON.stringify(newConfig, null, 2);
        
        // Try upload
        const success = await this.uploadWithETagCheck(
            containerName,
            blobName,
            newContent,
            currentETag
        );
        
        if (!success) {
            // Fetch latest, merge, retry
            const latestConfig = await this.downloadConfig();
            const mergedConfig = this.mergeConfig(latestConfig, newConfig);
            await this.updateConfig(mergedConfig);
        }
    }
}

Step 4: Distributed Lock with Blob

class DistributedBlobLock {
    private blobServiceClient: BlobServiceClient;
    private lockContainer = "locks";
    private defaultTimeout = 300; // 5 minutes
    
    constructor(connectionString: string) {
        this.blobServiceClient = new BlobServiceClient(connectionString);
    }
    
    async acquireLock(
        resourceName: string,
        ownerId: string,
        timeoutSeconds: number = this.defaultTimeout
    ): Promise<boolean> {
        const containerClient = this.blobServiceClient.getContainerClient(this.lockContainer);
        await containerClient.createIfNotExists();
        
        const lockBlobClient = containerClient.getBlockBlobClient(`${resourceName}.lock`);
        
        const lockContent = JSON.stringify({
            owner: ownerId,
            acquiredAt: new Date().toISOString(),
            expiresAt: new Date(Date.now() + timeoutSeconds * 1000).toISOString()
        });
        
        try {
            await lockBlobClient.upload(lockContent, { 
                conditions: { ifNoneMatch: "*" }  // Only if doesn't exist
            });
            console.log(`Lock acquired for ${resourceName}`);
            return true;
        }
        catch (error: any) {
            if (error.statusCode === 412) {
                // Lock exists, check if expired
                const existing = await this.getLockInfo(resourceName);
                if (this.isLockExpired(existing)) {
                    // Force acquire
                    await this.forceAcquireLock(resourceName, ownerId, timeoutSeconds);
                    return true;
                }
                return false;
            }
            throw error;
        }
    }
    
    async releaseLock(resourceName: string, ownerId: string): Promise<void> {
        const containerClient = this.blobServiceClient.getContainerClient(this.lockContainer);
        const lockBlobClient = containerClient.getBlockBlobClient(`${resourceName}.lock`);
        
        try {
            // Only release if we own it
            const props = await lockBlobClient.getProperties();
            const lockContent = await this.downloadLockContent(resourceName);
            
            if (lockContent.owner === ownerId) {
                await lockBlobClient.delete();
                console.log(`Lock released for ${resourceName}`);
            }
        }
        catch (error) {
            console.log(`Failed to release lock: ${error}`);
        }
    }
    
    private async getLockInfo(resourceName: string): Promise<any> {
        const containerClient = this.blobServiceClient.getContainerClient(this.lockContainer);
        const lockBlobClient = containerClient.getBlockBlobClient(`${resourceName}.lock`);
        
        try {
            const response = await lockBlobClient.download();
            const chunks: Buffer[] = [];
            for await (const chunk of response.value) {
                chunks.push(chunk);
            }
            return JSON.parse(Buffer.concat(chunks).toString());
        }
        catch {
            return null;
        }
    }
    
    private isLockExpired(lock: any): boolean {
        if (!lock) return true;
        return new Date(lock.expiresAt) < new Date();
    }
}

// Usage with lock
async function processWithLock() {
    const lockManager = new DistributedBlobLock(process.env["AZURE_STORAGE_CONNECTION"]!);
    const ownerId = `worker-${process.pid}`;
    const resource = "important-file.json";
    
    const acquired = await lockManager.acquireLock(resource, ownerId);
    
    if (acquired) {
        try {
            // Do work
            await processImportantFile();
        }
        finally {
            await lockManager.releaseLock(resource, ownerId);
        }
    }
    else {
        console.log("Could not acquire lock, will retry later");
    }
}

Step 5: Append Blob for Streaming Writes

import { AppendBlobClient } from "@azure/storage-blob";

class StreamingLogWriter {
    private appendBlobClient: AppendBlobClient;
    
    constructor(connectionString: string) {
        const blobServiceClient = new BlobServiceClient(connectionString);
        const containerClient = blobServiceClient.getContainerClient("logs");
        this.appendBlobClient = containerClient.getAppendBlobClient("application.log");
    }
    
    async appendLog(entry: string): Promise<void> {
        // Create if doesn't exist
        try {
            await this.appendBlobClient.createIfNotExists({
                headers: { 
                    "x-ms-blob-content-type": "text/plain" 
                }
            });
        }
        catch (e) {
            // Already exists
        }
        
        // Append new content
        const content = `${new Date().toISOString()} - ${entry}\n`;
        const buffer = Buffer.from(content, "utf-8");
        
        await this.appendBlobClient.appendBlock(buffer);
    }
    
    // Batch appends for performance
    async appendLogsBatch(entries: string[]): Promise<void> {
        const content = entries
            .map(e => `${new Date().toISOString()} - ${e}\n`)
            .join("");
        
        const buffer = Buffer.from(content, "utf-8");
        
        await this.appendBlobClient.appendBlock(buffer);
    }
}

Step 6: Resume Failed Upload

class ResumableUploader {
    private chunkSize = 1024 * 1024; // 1MB
    
    async uploadWithResume(
        localPath: string,
        containerName: string,
        blobName: string
    ): Promise<void> {
        const blobServiceClient = new BlobServiceClient(process.env["AZURE_STORAGE_CONNECTION"]!);
        const containerClient = blobServiceClient.getContainerClient(containerName);
        const blockBlobClient = containerClient.getBlockBlobClient(blobName);
        
        const fs = require("fs");
        const stats = fs.statSync(localPath);
        const totalSize = stats.size;
        
        // Get existing blocks
        let committedBlocks: string[] = [];
        try {
            const blockList = await blockBlobClient.getBlockList("committed");
            committedBlocks = blockList.blockList?.map(b => b.name) || [];
        }
        catch {
            // No existing upload
        }
        
        // Calculate which blocks need upload
        const blockCount = Math.ceil(totalSize / this.chunkSize);
        const blocksToUpload: number[] = [];
        
        for (let i = 0; i < blockCount; i++) {
            const blockId = Buffer.from(`block-${i.toString().padStart(6, "0")}`).toString("base64");
            if (!committedBlocks.includes(blockId)) {
                blocksToUpload.push(i);
            }
        }
        
        console.log(`Resuming: ${blocksToUpload.length} blocks to upload`);
        
        // Upload missing blocks
        for (const blockIndex of blocksToUpload) {
            const start = blockIndex * this.chunkSize;
            const end = Math.min(start + this.chunkSize, totalSize);
            
            const buffer = Buffer.alloc(end - start);
            const fd = fs.openSync(localPath, "r");
            fs.readSync(fd, buffer, 0, end - start, start);
            fs.closeSync(fd);
            
            const blockId = Buffer.from(`block-${blockIndex.toString().padStart(6, "0")}`).toString("base64");
            await blockBlobClient.stageBlock(blockId, buffer);
        }
        
        // Commit all blocks
        const allBlockIds = Array.from({ length: blockCount }, (_, i) => 
            Buffer.from(`block-${i.toString().padStart(6, "0")}`).toString("base64")
        );
        
        await blockBlobClient.commitBlockList(allBlockIds);
        console.log("Upload complete");
    }
}

Best Practices

PracticeBenefit
Use block blobs for large filesParallel upload
Use append blobs for logsIncrementally add
Implement leases for updatesPrevent corruption
Use ETags for optimistic lockingDetect conflicts
Use distributed locksCoordinate workers
Implement resume logicHandle failures

Summary