Skip to content

Commit

Permalink
Multi-threaded upload
Browse files Browse the repository at this point in the history
  • Loading branch information
Ceredron committed Nov 9, 2024
1 parent 3ddb3ab commit cd8dbe5
Showing 1 changed file with 32 additions and 8 deletions.
40 changes: 32 additions & 8 deletions src/Altinn.Broker.Integrations/Azure/AzureStorageService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public class AzureStorageService(IResourceManager resourceManager, ILogger<Azure
{
private const int BLOCK_SIZE = 1024 * 1024 * 32; // 32MB
private const int BLOCKS_BEFORE_COMMIT = 5;
private const int UPLOAD_THREADS = 3;

private async Task<BlobContainerClient> GetBlobContainerClient(FileTransferEntity fileTransferEntity, ServiceOwnerEntity serviceOwnerEntity)
{
Expand Down Expand Up @@ -51,7 +52,7 @@ public async Task<Stream> DownloadFile(ServiceOwnerEntity serviceOwnerEntity, Fi
}

public async Task<string?> UploadFile(ServiceOwnerEntity serviceOwnerEntity, FileTransferEntity fileTransferEntity,
Stream stream, long streamLength, CancellationToken cancellationToken)
Stream stream, long streamLength, CancellationToken cancellationToken)
{
logger.LogInformation($"Starting upload of {fileTransferEntity.FileTransferId} for {serviceOwnerEntity.Name}");
var stopwatch = System.Diagnostics.Stopwatch.StartNew();
Expand All @@ -61,12 +62,14 @@ public async Task<Stream> DownloadFile(ServiceOwnerEntity serviceOwnerEntity, Fi
BlockBlobClient blockBlobClient = blobContainerClient.GetBlockBlobClient(fileTransferEntity.FileTransferId.ToString());

using var accumulationBuffer = new MemoryStream();
var networkReadBuffer = new byte[1024*1024];
var networkReadBuffer = new byte[1024 * 1024];
var blockList = new List<string>();
long position = 0;
using var blobMd5 = MD5.Create();

int blocksInBatch = 0;
var commitTasks = new List<Task>();
var semaphore = new SemaphoreSlim(UPLOAD_THREADS); // Limit to 3 concurrent commit operations

while (position < streamLength)
{
Expand All @@ -88,21 +91,41 @@ public async Task<Stream> DownloadFile(ServiceOwnerEntity serviceOwnerEntity, Fi
blockList.Add(blockId);
blocksInBatch++;
accumulationBuffer.SetLength(0); // Clear accumulation buffer for next block

if (blocksInBatch >= BLOCKS_BEFORE_COMMIT)
{
logger.LogInformation($"Committing intermediate batch of {blocksInBatch} blocks at position " +
logger.LogInformation($"Queuing commit of {blocksInBatch} blocks at position " +
$"{position / (1024.0 * 1024.0 * 1024.0):N2} GiB");

await CommitBlocks(blockBlobClient, blockList, firstCommit: blockList.Count == blocksInBatch, null, cancellationToken);
var blocksToCommit = blockList.ToList();
var isFirstCommit = blockList.Count == blocksInBatch;

commitTasks.Add(CommitBlocksAsync());
async Task CommitBlocksAsync()
{
try
{
await semaphore.WaitAsync(cancellationToken);
await CommitBlocks(blockBlobClient, blocksToCommit, firstCommit: isFirstCommit, null, cancellationToken);

var uploadSpeedMBps = position / (1024.0 * 1024) / (stopwatch.ElapsedMilliseconds / 1000.0);
logger.LogInformation($"Completed intermediate commit. Upload progress for {fileTransferEntity.FileTransferId}: " +
$"{position / (1024.0 * 1024.0 * 1024.0):N2} GiB ({uploadSpeedMBps:N2} MB/s)");
}
finally
{
semaphore.Release();
}
}

blocksInBatch = 0;
// Keep the block list for the final commit
var uploadSpeedMBps = position / (1024.0 * 1024) / (stopwatch.ElapsedMilliseconds / 1000.0);
logger.LogInformation($"Upload progress for {fileTransferEntity.FileTransferId}: " +
$"{position / (1024.0 * 1024.0 * 1024.0):N2} GiB ({uploadSpeedMBps:N2} MB/s)");
}
}
}

// Wait for all pending commits to complete
await Task.WhenAll(commitTasks);

// Final commit with all blocks
blobMd5.TransformFinalBlock(Array.Empty<byte>(), 0, 0);
if (blobMd5.Hash is null)
Expand All @@ -126,6 +149,7 @@ public async Task<Stream> DownloadFile(ServiceOwnerEntity serviceOwnerEntity, Fi
}
}


private async Task UploadBlock(BlockBlobClient client, string blockId, byte[] blockData, CancellationToken cancellationToken)
{
await BlobRetryPolicy.ExecuteAsync(async () =>
Expand Down

0 comments on commit cd8dbe5

Please sign in to comment.