Skip to content

Commit

Permalink
Implement quantized stream upload #6
Browse files Browse the repository at this point in the history
  • Loading branch information
iShafayet committed Aug 29, 2022
1 parent acd78a6 commit 6b73189
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 12 deletions.
8 changes: 6 additions & 2 deletions src/component/page/ExplorePage/FileUploadModal.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,12 @@
<span slot="label">Streaming</span>
</FormField>
<FormField>
<Radio bind:group={selectedUploadMethod} value={"vfs"} touch />
<span slot="label">Virtual File System</span>
<Radio
bind:group={selectedUploadMethod}
value={"chunkedStream"}
touch
/>
<span slot="label">Quantized Streams (Multi-request)</span>
</FormField>
</div>
{/if}
Expand Down
3 changes: 3 additions & 0 deletions src/constant/common-constants.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { BLOB_CHUNK_SIZE_INCLUDING_TAG_BYTES } from "./crypto-specs.js";

export const CommonConstant = {
APP_VERSION: "0.0.1",
COPYRIGHT: "Sayem Shafayet",
COPYRIGHT_HREF: "https://ishafayet.me",
DEFAULT_SERVER_URL: "http://localhost:9041",
PACKET_SIZE_FOR_QUANTIZED_STREAMS: 100 * BLOB_CHUNK_SIZE_INCLUDING_TAG_BYTES,
};
22 changes: 22 additions & 0 deletions src/integration/blob-apis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,25 @@ export const callBlobReadBasicApi = async (
progressNotifierFn
);
};

export const callBlobWriteQuantizedApi = async (
bucketId,
fileId,
sourceContentLength: number,
data: ArrayBuffer,
cryptoHeaderContent: string,
progressNotifierFn: Function,
blobId,
offset,
shouldEnd
) => {
return await callPostArrayBufferUploadApi(
_storedSession.serverUrl,
_storedSession.apiKey,
`/api/blob/write-quantized/${bucketId}/${fileId}/${blobId}/${offset}/${shouldEnd}`,
sourceContentLength,
data,
cryptoHeaderContent,
progressNotifierFn
);
};
2 changes: 1 addition & 1 deletion src/lib/crypto-transit-basic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ export const encryptAndUploadFile = async (
bucketId,
fileId,
file.size,
encryptedBuffer,
encryptedBuffer.buffer,
cryptoHeader,
progressNotifierFn
);
Expand Down
107 changes: 100 additions & 7 deletions src/lib/crypto-transit-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
} from "../utility/crypto-api-utils.js";
import {
callBlobReadStreamApi,
callBlobWriteQuantizedApi,
callBlobWriteStreamApi,
} from "../integration/blob-apis.js";
import {
Expand All @@ -21,11 +22,12 @@ import {
} from "../utility/stream-and-buffer-utils.js";
import {
BLOB_CHUNK_SIZE_BYTES,
BLOB_CHUNK_SIZE_INCLUDING_TAG_BYTES,
ENCRYPTION_TAGLENGTH_IN_BITS,
IV_LENGTH,
SALT_LENGTH,
} from "./crypto-specs.js";
import { handleErrorIfAny } from "./error-handling.js";
} from "../constant/crypto-specs.js";
import { CodedError, handleErrorIfAny } from "./error-handling.js";

import streamSaver from "streamsaver";
import { CommonConstant } from "../constant/common-constants.js";
Expand All @@ -51,12 +53,12 @@ const createEncryptedPseudoTransformStream = async (

let inputStream: ReadableStream = file.stream() as any;
// let inputStreamReader = inputStream.getReader();
let meteredBytedReader = new MeteredByteStreamReader(inputStream);
let meteredByteReader = new MeteredByteStreamReader(inputStream, "PlaintextStreamForEncryption");

// Note: We are not using transform streams due to a lack of browser support.
return new ReadableStream({
async pull(controller) {
const { value: chunk, done } = await meteredBytedReader.readBytes(
const { value: chunk, done } = await meteredByteReader.readBytes(
BLOB_CHUNK_SIZE_BYTES
);

Expand Down Expand Up @@ -111,6 +113,97 @@ export const encryptAndUploadFile = async (
return response;
};

const collectChunksAndUploadFromStream = async (
bucketId,
fileId,
fileSize,
encryptedDataStream: ReadableStream<any>,
cryptoHeader: string
) => {
const packetSize = CommonConstant.PACKET_SIZE_FOR_QUANTIZED_STREAMS;

let meteredByteReader = new MeteredByteStreamReader(
encryptedDataStream,
"EncryptedStreamForPacketing"
);

let blobId = null;
let startingOffset = 0;
while (true) {
const { value: packet, done } = await meteredByteReader.readBytes(
packetSize
);

if (done) return true;

let shouldEnd = packet.length < packetSize;
const progressNotifierFnOverride = (total, encrypted, uploaded) => {
console.log("PACKET PROGRESS", total, encrypted, uploaded);
};

let response = await callBlobWriteQuantizedApi(
bucketId,
fileId,
Math.min(packetSize, packet.length),
packet.buffer,
cryptoHeader,
progressNotifierFnOverride,
blobId,
startingOffset,
shouldEnd
);

if (await handleErrorIfAny(response)) return null;

if (!response.blobId) {
throw new CodedError(
"BLOBID_NOT_GIVEN",
"BlobId was not propagated by server"
);
}
blobId = response.blobId;

startingOffset += Math.min(packetSize, packet.length);
}

return true;
};

export const encryptAndUploadFileAsChunkedStream = async (
bucketId: string,
fileId: string,
file: File,
bucketPassword: string,
progressNotifierFn: Function
) => {
let cipherProps = await createCipherProperties(bucketPassword);

let encryptedDataStream = await createEncryptedPseudoTransformStream(
file,
cipherProps,
progressNotifierFn,
bucketPassword
);

let iv = convertSmallUint8ArrayToString(cipherProps.iv);
let salt = convertSmallUint8ArrayToString(cipherProps.salt);
let cryptoHeader = buildCryptoHeader(iv, salt);

let inputStream: ReadableStream = file.stream() as any;

let response = await collectChunksAndUploadFromStream(
bucketId,
fileId,
file.size,
encryptedDataStream,
cryptoHeader
);

progressNotifierFn(file.size, file.size, file.size);

return response;
};

const createDeryptedPseudoTransformStream = async (
inputStream: ReadableStream,
{ iv, key },
Expand All @@ -121,13 +214,13 @@ const createDeryptedPseudoTransformStream = async (
let bytesRead = 0;
progressNotifierFn(totalBytes, 0, 0);

let meteredBytedReader = new MeteredByteStreamReader(inputStream);
let meteredByteReader = new MeteredByteStreamReader(inputStream, "EncryptedStreamForDecryption");

// Note: We are not using transform streams due to a lack of browser support.
return new ReadableStream({
async pull(controller) {
const { value: chunk, done } = await meteredBytedReader.readBytes(
BLOB_CHUNK_SIZE_BYTES + ENCRYPTION_TAGLENGTH_IN_BITS / 8
const { value: chunk, done } = await meteredByteReader.readBytes(
BLOB_CHUNK_SIZE_INCLUDING_TAG_BYTES
);

if (done) {
Expand Down
8 changes: 8 additions & 0 deletions src/lib/crypto-transit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ export const encryptAndUploadFile = async (
bucketPassword,
progressNotifierFn
);
} else if (uploadMethod === "chunkedStream") {
return await cryptoTransitStream.encryptAndUploadFileAsChunkedStream(
bucketId,
fileId,
file,
bucketPassword,
progressNotifierFn
);
}

return null;
Expand Down
4 changes: 2 additions & 2 deletions src/utility/stream-and-buffer-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ export class MeteredByteStreamReader {

private remainingChunkOffset: number = 0;
private remainingChunk: Uint8Array = null;
id: number;
id: string;

constructor(readableStream: ReadableStream, id) {
constructor(readableStream: ReadableStream, id: string) {
this.readableStream = readableStream;
this.reader = this.readableStream.getReader();
this.id = id;
Expand Down

0 comments on commit 6b73189

Please sign in to comment.