diff --git a/reposerver/src/main/scala/com/advancedtelematic/tuf/reposerver/target_store/AzureTargetStoreEngine.scala b/reposerver/src/main/scala/com/advancedtelematic/tuf/reposerver/target_store/AzureTargetStoreEngine.scala index d72baf74..e1dac67f 100644 --- a/reposerver/src/main/scala/com/advancedtelematic/tuf/reposerver/target_store/AzureTargetStoreEngine.scala +++ b/reposerver/src/main/scala/com/advancedtelematic/tuf/reposerver/target_store/AzureTargetStoreEngine.scala @@ -1,37 +1,27 @@ package com.advancedtelematic.tuf.reposerver.target_store -import java.nio.charset.StandardCharsets -import java.nio.ByteBuffer -import java.time.OffsetDateTime -import java.util.{Base64, UUID} - import akka.http.scaladsl.model.Uri import akka.http.scaladsl.util.FastFuture -import akka.stream.scaladsl.{Flow, Framing, Keep, Sink, Source} -import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler} -import akka.stream.{Attributes, Inlet, Materializer, SinkShape} +import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, Sink, Source} +import akka.stream.{Materializer, SinkShape} import akka.util.ByteString -import akka.NotUsed -import akka.actor.ActorSystem -import com.advancedtelematic.libats.data.DataType.{Checksum, HashMethod, ValidChecksum} import com.advancedtelematic.libtuf.data.TufDataType import com.advancedtelematic.libtuf.data.TufDataType.{MultipartUploadId, RepoId, TargetFilename} import com.advancedtelematic.libtuf_server.crypto.Sha256Digest import com.advancedtelematic.tuf.reposerver.http.Errors import com.advancedtelematic.tuf.reposerver.target_store.AzureTargetStoreEngine.BlobStorageSettings -import com.azure.core.util.logging.ClientLogger -import com.azure.identity.{ClientSecretCredentialBuilder, DefaultAzureCredentialBuilder} -import com.azure.storage.blob.{BlobAsyncClient, BlobClientBuilder, BlobContainerAsyncClient, BlobContainerClient, BlobContainerClientBuilder, BlobServiceAsyncClient, BlobServiceClientBuilder} -import com.azure.storage.blob.models.BlockBlobItem import com.azure.storage.blob.sas.{BlobSasPermission, BlobServiceSasSignatureValues} import com.azure.storage.blob.specialized.BlockBlobAsyncClient -import com.azure.storage.common.implementation.connectionstring.StorageConnectionString -import org.bouncycastle.util.encoders.Hex +import com.azure.storage.blob.{BlobClientBuilder, BlobContainerAsyncClient, BlobContainerClientBuilder} import org.slf4j.LoggerFactory import reactor.core.publisher.{Flux, Mono} +import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets +import java.time.OffsetDateTime +import java.util.{Base64, UUID} import scala.collection.mutable.ListBuffer -import scala.concurrent.{ExecutionContext, Future, Promise} import scala.concurrent.duration.FiniteDuration +import scala.concurrent.{ExecutionContext, Future} object AzureTargetStoreEngine { @@ -41,9 +31,13 @@ object AzureTargetStoreEngine { class AzureTargetStoreEngine(private val settings: BlobStorageSettings)(implicit mat: Materializer, ec: ExecutionContext) extends TargetStoreEngine { import scala.compat.java8.FutureConverters._ + import scala.concurrent.duration._ private[this] val log = LoggerFactory.getLogger(this.getClass) + private[this] val GroupWithinBytes = 1024 * 1024 * 25 //25Mb + private[this] val GroupWithinTime = 2.seconds + private[this] def containerClientFor(repoId: TufDataType.RepoId): BlobContainerAsyncClient = { new BlobContainerClientBuilder() .connectionString(settings.connectionString) @@ -58,23 +52,41 @@ class AzureTargetStoreEngine(private val settings: BlobStorageSettings)(implicit blobClient.stageBlock(blockId, Flux.just[ByteBuffer](data.toByteBuffer), data.length.toLong).toFuture.toScala.map(_ => blockId) } - def upload(blobClient: BlockBlobAsyncClient): Sink[ByteString, Future[TargetStoreEngine.TargetStoreResult]] = - Flow[ByteString] - .alsoToMat(Sha256Digest.asSink)(Keep.right) - .alsoToMat(Sink.fold(0)(_ + _.length))(Keep.both) + def upload(blobClient: BlockBlobAsyncClient): Sink[ByteString, Future[TargetStoreEngine.TargetStoreResult]] = { + + val checksumSink = Flow[ByteString].toMat(Sha256Digest.asSink)(Keep.right) + val lengthSink = Flow[ByteString].toMat(Sink.fold(0)(_ + _.length))(Keep.right) + val uploadSink = Flow[ByteString] .mapAsync(1)(uploadBlock(blobClient)) .fold(ListBuffer.empty[String])((xs, x) => xs += x) .mapAsync(1) { xs => import scala.collection.JavaConverters._ blobClient.commitBlockList(xs.asJava).toFuture.toScala + }.toMat(Sink.head)(Keep.right) + + val combinedSink = Sink.fromGraph(GraphDSL.create(checksumSink, lengthSink, uploadSink)(Tuple3.apply) { + implicit builder => + (checksumS, lengthS, uploadS) => + import GraphDSL.Implicits._ + val broadcast = builder.add(Broadcast[ByteString](3)) + broadcast.out(0) ~> checksumS + broadcast.out(1) ~> lengthS + broadcast.out(2) ~> uploadS + SinkShape(broadcast.in) + }) + + Flow[ByteString] + .groupedWeightedWithin(GroupWithinBytes, GroupWithinTime)(_.size) + .map(_.fold(ByteString.empty)(_.concat(_))) + .toMat(combinedSink)(Keep.right) + .mapMaterializedValue { case (checksumF, lengthF, uploadResultF) => + for { + _ <- uploadResultF + cs <- checksumF + ln <- lengthF + } yield TargetStoreEngine.TargetStoreResult(Uri(blobClient.getBlobUrl), cs, ln) } - .mapMaterializedValue { - case (futureCs, futureLn) => - for { - cs <- futureCs - ln <- futureLn - } yield TargetStoreEngine.TargetStoreResult(Uri(blobClient.getBlobUrl), cs, ln) - }.to(Sink.ignore) + } override def storeStream(repoId: TufDataType.RepoId, filename: TargetFilename,