
Commit

Merge branch 'bugfix/OTAPMCL-92/fix-large-file-upload-to-azure-blob-storage' into 'master'

OTAPMCL-92 Fix uploading file to AzureBlobStorage

Closes OTAPMCL-92

See merge request olp/edge/ota/connect/back-end/ota-tuf!309
viktormolitskyihere committed Apr 9, 2021
2 parents df073eb + 74b05ce commit 87e8d57
Showing 1 changed file with 41 additions and 29 deletions.
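
Reading the diff below: the previous upload sink staged one Azure block per incoming ByteString element, so a large file produced a very large number of small stageBlock calls (Azure block blobs allow at most 50,000 committed blocks per blob, which an upload split that finely can exceed; presumably that is the failure behind OTAPMCL-92). The new code first batches the stream into blocks of up to 25 MB with groupedWeightedWithin before staging them. Below is a minimal, self-contained sketch of just that chunking stage; it assumes Akka 2.6-style APIs, and the object name and simulated source are illustrative, not from the repository.

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import scala.concurrent.duration._

object ChunkingSketch extends App {
  // Akka 2.6-style: the implicit ActorSystem also provides the materializer.
  implicit val system: ActorSystem = ActorSystem("chunking-sketch")
  import system.dispatcher

  val GroupWithinBytes = 1024 * 1024 * 25 // 25 MB, as in the diff
  val GroupWithinTime  = 2.seconds

  // Simulate an upload arriving as many small 64 KiB chunks.
  val smallChunks = Source(1 to 1000).map(_ => ByteString(Array.fill[Byte](64 * 1024)(0)))

  smallChunks
    .groupedWeightedWithin(GroupWithinBytes, GroupWithinTime)(_.size) // batch by byte count or by time
    .map(_.fold(ByteString.empty)(_.concat(_)))                       // concatenate each batch into one block
    .runWith(Sink.foreach(block => println(s"would stage a block of ${block.size} bytes")))
    .onComplete(_ => system.terminate()) // prints a handful of ~25 MB blocks instead of 1000 tiny ones
}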
@@ -1,37 +1,27 @@
package com.advancedtelematic.tuf.reposerver.target_store
import java.nio.charset.StandardCharsets
import java.nio.ByteBuffer
import java.time.OffsetDateTime
import java.util.{Base64, UUID}

import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.util.FastFuture
import akka.stream.scaladsl.{Flow, Framing, Keep, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler}
import akka.stream.{Attributes, Inlet, Materializer, SinkShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, Sink, Source}
import akka.stream.{Materializer, SinkShape}
import akka.util.ByteString
import akka.NotUsed
import akka.actor.ActorSystem
import com.advancedtelematic.libats.data.DataType.{Checksum, HashMethod, ValidChecksum}
import com.advancedtelematic.libtuf.data.TufDataType
import com.advancedtelematic.libtuf.data.TufDataType.{MultipartUploadId, RepoId, TargetFilename}
import com.advancedtelematic.libtuf_server.crypto.Sha256Digest
import com.advancedtelematic.tuf.reposerver.http.Errors
import com.advancedtelematic.tuf.reposerver.target_store.AzureTargetStoreEngine.BlobStorageSettings
import com.azure.core.util.logging.ClientLogger
import com.azure.identity.{ClientSecretCredentialBuilder, DefaultAzureCredentialBuilder}
import com.azure.storage.blob.{BlobAsyncClient, BlobClientBuilder, BlobContainerAsyncClient, BlobContainerClient, BlobContainerClientBuilder, BlobServiceAsyncClient, BlobServiceClientBuilder}
import com.azure.storage.blob.models.BlockBlobItem
import com.azure.storage.blob.sas.{BlobSasPermission, BlobServiceSasSignatureValues}
import com.azure.storage.blob.specialized.BlockBlobAsyncClient
import com.azure.storage.common.implementation.connectionstring.StorageConnectionString
import org.bouncycastle.util.encoders.Hex
import com.azure.storage.blob.{BlobClientBuilder, BlobContainerAsyncClient, BlobContainerClientBuilder}
import org.slf4j.LoggerFactory
import reactor.core.publisher.{Flux, Mono}

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.time.OffsetDateTime
import java.util.{Base64, UUID}
import scala.collection.mutable.ListBuffer
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}

object AzureTargetStoreEngine {

@@ -41,9 +31,13 @@ object AzureTargetStoreEngine {

class AzureTargetStoreEngine(private val settings: BlobStorageSettings)(implicit mat: Materializer, ec: ExecutionContext) extends TargetStoreEngine {
import scala.compat.java8.FutureConverters._
import scala.concurrent.duration._

private[this] val log = LoggerFactory.getLogger(this.getClass)

private[this] val GroupWithinBytes = 1024 * 1024 * 25 //25Mb
private[this] val GroupWithinTime = 2.seconds

private[this] def containerClientFor(repoId: TufDataType.RepoId): BlobContainerAsyncClient = {
new BlobContainerClientBuilder()
.connectionString(settings.connectionString)
@@ -58,23 +52,41 @@ class AzureTargetStoreEngine(private val settings: BlobStorageSettings)(implicit
blobClient.stageBlock(blockId, Flux.just[ByteBuffer](data.toByteBuffer), data.length.toLong).toFuture.toScala.map(_ => blockId)
}

def upload(blobClient: BlockBlobAsyncClient): Sink[ByteString, Future[TargetStoreEngine.TargetStoreResult]] =
Flow[ByteString]
.alsoToMat(Sha256Digest.asSink)(Keep.right)
.alsoToMat(Sink.fold(0)(_ + _.length))(Keep.both)
def upload(blobClient: BlockBlobAsyncClient): Sink[ByteString, Future[TargetStoreEngine.TargetStoreResult]] = {

val checksumSink = Flow[ByteString].toMat(Sha256Digest.asSink)(Keep.right)
val lengthSink = Flow[ByteString].toMat(Sink.fold(0)(_ + _.length))(Keep.right)
val uploadSink = Flow[ByteString]
.mapAsync(1)(uploadBlock(blobClient))
.fold(ListBuffer.empty[String])((xs, x) => xs += x)
.mapAsync(1) { xs =>
import scala.collection.JavaConverters._
blobClient.commitBlockList(xs.asJava).toFuture.toScala
}.toMat(Sink.head)(Keep.right)

val combinedSink = Sink.fromGraph(GraphDSL.create(checksumSink, lengthSink, uploadSink)(Tuple3.apply) {
implicit builder =>
(checksumS, lengthS, uploadS) =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[ByteString](3))
broadcast.out(0) ~> checksumS
broadcast.out(1) ~> lengthS
broadcast.out(2) ~> uploadS
SinkShape(broadcast.in)
})

Flow[ByteString]
.groupedWeightedWithin(GroupWithinBytes, GroupWithinTime)(_.size)
.map(_.fold(ByteString.empty)(_.concat(_)))
.toMat(combinedSink)(Keep.right)
.mapMaterializedValue { case (checksumF, lengthF, uploadResultF) =>
for {
_ <- uploadResultF
cs <- checksumF
ln <- lengthF
} yield TargetStoreEngine.TargetStoreResult(Uri(blobClient.getBlobUrl), cs, ln)
}
.mapMaterializedValue {
case (futureCs, futureLn) =>
for {
cs <- futureCs
ln <- futureLn
} yield TargetStoreEngine.TargetStoreResult(Uri(blobClient.getBlobUrl), cs, ln)
}.to(Sink.ignore)
}

override def storeStream(repoId: TufDataType.RepoId,
filename: TargetFilename,
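
For reference, the reworked upload broadcasts each batched block to three sinks (SHA-256 checksum, byte length, and block staging plus commitBlockList) and, unlike the old version, which combined only the checksum and length futures and terminated in Sink.ignore, its materialized future now also waits on uploadResultF before yielding the TargetStoreResult. Below is a minimal, self-contained sketch of that Broadcast/GraphDSL fan-out pattern with toy sinks standing in for the real ones; the object name and sinks are illustrative, and Akka 2.6-style APIs are assumed.

import akka.actor.ActorSystem
import akka.stream.SinkShape
import akka.stream.scaladsl.{Broadcast, GraphDSL, Keep, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Future

object FanOutSinkSketch extends App {
  implicit val system: ActorSystem = ActorSystem("fan-out-sketch")
  import system.dispatcher

  // Toy stand-ins for the checksum, length and block-upload sinks of the diff.
  val lengthSink: Sink[ByteString, Future[Int]]      = Sink.fold[Int, ByteString](0)(_ + _.length)
  val countSink: Sink[ByteString, Future[Int]]       = Sink.fold[Int, ByteString](0)((n, _) => n + 1)
  val lastSink: Sink[ByteString, Future[ByteString]] = Sink.last[ByteString]

  // One sink that drives all three and materializes their three futures.
  val combined: Sink[ByteString, (Future[Int], Future[Int], Future[ByteString])] =
    Sink.fromGraph(GraphDSL.create(lengthSink, countSink, lastSink)(Tuple3.apply) {
      implicit builder => (lengthS, countS, lastS) =>
        import GraphDSL.Implicits._
        val broadcast = builder.add(Broadcast[ByteString](3))
        broadcast.out(0) ~> lengthS
        broadcast.out(1) ~> countS
        broadcast.out(2) ~> lastS
        SinkShape(broadcast.in)
    })

  val (lengthF, countF, lastF) =
    Source(List(ByteString("hello "), ByteString("blob"))).toMat(combined)(Keep.right).run()

  // As in the diff, the final result is assembled from the materialized futures.
  val summary = for { len <- lengthF; n <- countF; last <- lastF }
    yield s"bytes=$len elements=$n last=${last.utf8String}"

  summary.onComplete { result =>
    println(result) // Success(bytes=10 elements=2 last=blob)
    system.terminate()
  }
}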
