From cd8dbe5b58b9fb7396c9a229134e4a1d77759de2 Mon Sep 17 00:00:00 2001 From: Roar Mjelde Date: Sat, 9 Nov 2024 22:51:45 +0100 Subject: [PATCH] Multi-threaded upload --- .../Azure/AzureStorageService.cs | 40 +++++++++++++++---- 1 file changed, 32 insertions(+), 8 deletions(-) diff --git a/src/Altinn.Broker.Integrations/Azure/AzureStorageService.cs b/src/Altinn.Broker.Integrations/Azure/AzureStorageService.cs index ea768fd6..499841b6 100644 --- a/src/Altinn.Broker.Integrations/Azure/AzureStorageService.cs +++ b/src/Altinn.Broker.Integrations/Azure/AzureStorageService.cs @@ -18,6 +18,7 @@ public class AzureStorageService(IResourceManager resourceManager, ILogger GetBlobContainerClient(FileTransferEntity fileTransferEntity, ServiceOwnerEntity serviceOwnerEntity) { @@ -51,7 +52,7 @@ public async Task DownloadFile(ServiceOwnerEntity serviceOwnerEntity, Fi } public async Task UploadFile(ServiceOwnerEntity serviceOwnerEntity, FileTransferEntity fileTransferEntity, - Stream stream, long streamLength, CancellationToken cancellationToken) + Stream stream, long streamLength, CancellationToken cancellationToken) { logger.LogInformation($"Starting upload of {fileTransferEntity.FileTransferId} for {serviceOwnerEntity.Name}"); var stopwatch = System.Diagnostics.Stopwatch.StartNew(); @@ -61,12 +62,14 @@ public async Task DownloadFile(ServiceOwnerEntity serviceOwnerEntity, Fi BlockBlobClient blockBlobClient = blobContainerClient.GetBlockBlobClient(fileTransferEntity.FileTransferId.ToString()); using var accumulationBuffer = new MemoryStream(); - var networkReadBuffer = new byte[1024*1024]; + var networkReadBuffer = new byte[1024 * 1024]; var blockList = new List(); long position = 0; using var blobMd5 = MD5.Create(); int blocksInBatch = 0; + var commitTasks = new List(); + var semaphore = new SemaphoreSlim(UPLOAD_THREADS); // Limit to 3 concurrent commit operations while (position < streamLength) { @@ -88,21 +91,41 @@ public async Task DownloadFile(ServiceOwnerEntity serviceOwnerEntity, Fi blockList.Add(blockId); blocksInBatch++; accumulationBuffer.SetLength(0); // Clear accumulation buffer for next block + if (blocksInBatch >= BLOCKS_BEFORE_COMMIT) { - logger.LogInformation($"Committing intermediate batch of {blocksInBatch} blocks at position " + + logger.LogInformation($"Queuing commit of {blocksInBatch} blocks at position " + $"{position / (1024.0 * 1024.0 * 1024.0):N2} GiB"); - await CommitBlocks(blockBlobClient, blockList, firstCommit: blockList.Count == blocksInBatch, null, cancellationToken); + var blocksToCommit = blockList.ToList(); + var isFirstCommit = blockList.Count == blocksInBatch; + + commitTasks.Add(CommitBlocksAsync()); + async Task CommitBlocksAsync() + { + try + { + await semaphore.WaitAsync(cancellationToken); + await CommitBlocks(blockBlobClient, blocksToCommit, firstCommit: isFirstCommit, null, cancellationToken); + + var uploadSpeedMBps = position / (1024.0 * 1024) / (stopwatch.ElapsedMilliseconds / 1000.0); + logger.LogInformation($"Completed intermediate commit. Upload progress for {fileTransferEntity.FileTransferId}: " + + $"{position / (1024.0 * 1024.0 * 1024.0):N2} GiB ({uploadSpeedMBps:N2} MB/s)"); + } + finally + { + semaphore.Release(); + } + } + blocksInBatch = 0; - // Keep the block list for the final commit - var uploadSpeedMBps = position / (1024.0 * 1024) / (stopwatch.ElapsedMilliseconds / 1000.0); - logger.LogInformation($"Upload progress for {fileTransferEntity.FileTransferId}: " + - $"{position / (1024.0 * 1024.0 * 1024.0):N2} GiB ({uploadSpeedMBps:N2} MB/s)"); } } } + // Wait for all pending commits to complete + await Task.WhenAll(commitTasks); + // Final commit with all blocks blobMd5.TransformFinalBlock(Array.Empty(), 0, 0); if (blobMd5.Hash is null) @@ -126,6 +149,7 @@ public async Task DownloadFile(ServiceOwnerEntity serviceOwnerEntity, Fi } } + private async Task UploadBlock(BlockBlobClient client, string blockId, byte[] blockData, CancellationToken cancellationToken) { await BlobRetryPolicy.ExecuteAsync(async () =>