diff --git a/packages/runtime/container-runtime/package.json b/packages/runtime/container-runtime/package.json index a36dccb5803f..0cc1cf4ec745 100644 --- a/packages/runtime/container-runtime/package.json +++ b/packages/runtime/container-runtime/package.json @@ -63,16 +63,6 @@ "default": "./dist/deltaScheduler.js" } }, - "./internal/test/scheduleManager": { - "import": { - "types": "./lib/scheduleManager.d.ts", - "default": "./lib/scheduleManager.js" - }, - "require": { - "types": "./dist/scheduleManager.d.ts", - "default": "./dist/scheduleManager.js" - } - }, "./internal/test/blobManager": { "import": { "types": "./lib/blobManager/index.d.ts", @@ -122,7 +112,7 @@ "build:test": "npm run build:test:esm && npm run build:test:cjs", "build:test:cjs": "fluid-tsc commonjs --project ./src/test/tsconfig.cjs.json", "build:test:esm": "tsc --project ./src/test/tsconfig.json", - "check:are-the-types-wrong": "attw --pack . --exclude-entrypoints ./internal/test/containerRuntime ./internal/test/deltaScheduler ./internal/test/scheduleManager ./internal/test/blobManager ./internal/test/summary ./internal/test/gc", + "check:are-the-types-wrong": "attw --pack . --exclude-entrypoints ./internal/test/containerRuntime ./internal/test/deltaScheduler ./internal/test/blobManager ./internal/test/summary ./internal/test/gc", "check:biome": "biome check .", "check:exports": "concurrently \"npm:check:exports:*\"", "check:exports:bundle-release-tags": "api-extractor run --config api-extractor/api-extractor-lint-bundle.json", diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index 2f1a413b5782..d980f879a72d 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -156,6 +156,7 @@ import { DeltaManagerPendingOpsProxy, DeltaManagerSummarizerProxy, } from "./deltaManagerProxies.js"; +import { DeltaScheduler } from "./deltaScheduler.js"; import { GCNodeType, GarbageCollector, @@ -165,6 +166,7 @@ import { gcGenerationOptionName, type GarbageCollectionMessage, } from "./gc/index.js"; +import { InboundBatchAggregator } from "./inboundBatchAggregator.js"; import { ContainerMessageType, type ContainerRuntimeDocumentSchemaMessage, @@ -198,7 +200,6 @@ import { IPendingLocalState, PendingStateManager, } from "./pendingStateManager.js"; -import { ScheduleManager } from "./scheduleManager.js"; import { DocumentsSchemaController, EnqueueSummarizeResult, @@ -1410,7 +1411,8 @@ export class ContainerRuntime * It is created only by summarizing container (i.e. one with clientType === "summarizer") */ private readonly _summarizer?: Summarizer; - private readonly scheduleManager: ScheduleManager; + private readonly deltaScheduler: DeltaScheduler; + private readonly inboundBatchAggregator: InboundBatchAggregator; private readonly blobManager: BlobManager; private readonly pendingStateManager: PendingStateManager; private readonly duplicateBatchDetector: DuplicateBatchDetector | undefined; @@ -1901,12 +1903,17 @@ export class ContainerRuntime closeContainer: (error?: ICriticalContainerError) => this.closeFn(error), }); - this.scheduleManager = new ScheduleManager( + this.deltaScheduler = new DeltaScheduler( + this.innerDeltaManager, + createChildLogger({ logger: this.logger, namespace: "DeltaScheduler" }), + ); + + this.inboundBatchAggregator = new InboundBatchAggregator( this.innerDeltaManager, - this, () => this.clientId, - createChildLogger({ logger: this.logger, namespace: "ScheduleManager" }), + createChildLogger({ logger: this.logger, namespace: "InboundBatchAggregator" }), ); + this.inboundBatchAggregator.setupListeners(); const disablePartialFlush = this.mc.config.getBoolean( "Fluid.ContainerRuntime.DisablePartialFlush", @@ -2923,7 +2930,7 @@ export class ContainerRuntime private _processedClientSequenceNumber: number | undefined; /** - * Processes inbound message(s). It calls schedule manager according to the messages' location in the batch. + * Processes inbound message(s). It calls delta scheduler according to the messages' location in the batch. * @param messagesWithMetadata - messages to process along with their metadata. * @param locationInBatch - Are we processing the start and/or end of a batch? * @param local - true if the messages were originally generated by the client receiving it. @@ -2945,7 +2952,8 @@ export class ContainerRuntime if (locationInBatch.batchStart) { const firstMessage = messagesWithMetadata[0]?.message; assert(firstMessage !== undefined, 0xa31 /* Batch must have at least one message */); - this.scheduleManager.batchBegin(firstMessage); + this.deltaScheduler.batchBegin(firstMessage); + this.emit("batchBegin", firstMessage); } let error: unknown; @@ -3045,7 +3053,8 @@ export class ContainerRuntime if (locationInBatch.batchEnd) { const lastMessage = messagesWithMetadata[messagesWithMetadata.length - 1]?.message; assert(lastMessage !== undefined, 0xa32 /* Batch must have at least one message */); - this.scheduleManager.batchEnd(error, lastMessage); + this.deltaScheduler.batchEnd(lastMessage); + this.emit("batchEnd", error, lastMessage); } } } diff --git a/packages/runtime/container-runtime/src/scheduleManager.ts b/packages/runtime/container-runtime/src/inboundBatchAggregator.ts similarity index 73% rename from packages/runtime/container-runtime/src/scheduleManager.ts rename to packages/runtime/container-runtime/src/inboundBatchAggregator.ts index 2a11537a303a..b56b6881872e 100644 --- a/packages/runtime/container-runtime/src/scheduleManager.ts +++ b/packages/runtime/container-runtime/src/inboundBatchAggregator.ts @@ -3,24 +3,18 @@ * Licensed under the MIT License. */ -import type { EventEmitter } from "@fluid-internal/client-utils"; import { performance } from "@fluid-internal/client-utils"; import { IDeltaManagerFull } from "@fluidframework/container-definitions/internal"; import { assert } from "@fluidframework/core-utils/internal"; -import { - IDocumentMessage, - ISequencedDocumentMessage, -} from "@fluidframework/driver-definitions/internal"; +import { ISequencedDocumentMessage } from "@fluidframework/driver-definitions/internal"; import { isRuntimeMessage } from "@fluidframework/driver-utils/internal"; import { ITelemetryLoggerExt, DataCorruptionError, DataProcessingError, - createChildLogger, extractSafePropertiesFromMessage, } from "@fluidframework/telemetry-utils/internal"; -import { DeltaScheduler } from "./deltaScheduler.js"; import { IBatchMetadata } from "./metadata.js"; import { pkgVersion } from "./packageVersion.js"; @@ -31,46 +25,10 @@ type IRuntimeMessageMetadata = }; /** - * This class has the following responsibilities: - * - * 1. It tracks batches as we process ops and raises "batchBegin" and "batchEnd" events. - * As part of it, it validates batch correctness (i.e. no system ops in the middle of batch) - * - * 2. It creates instance of ScheduleManagerCore that ensures we never start processing ops from batch - * unless all ops of the batch are in. - */ -export class ScheduleManager { - private readonly deltaScheduler: DeltaScheduler; - - constructor( - private readonly deltaManager: IDeltaManagerFull, - private readonly emitter: EventEmitter, - readonly getClientId: () => string | undefined, - private readonly logger: ITelemetryLoggerExt, - ) { - this.deltaScheduler = new DeltaScheduler( - this.deltaManager, - createChildLogger({ logger: this.logger, namespace: "DeltaScheduler" }), - ); - void new ScheduleManagerCore(deltaManager, getClientId, logger); - } - - public batchBegin(message: ISequencedDocumentMessage) { - this.emitter.emit("batchBegin", message); - this.deltaScheduler.batchBegin(message); - } - - public batchEnd(error: any | undefined, message: ISequencedDocumentMessage) { - this.emitter.emit("batchEnd", error, message); - this.deltaScheduler.batchEnd(message); - } -} - -/** - * This class controls pausing and resuming of inbound queue to ensure that we never - * start processing ops in a batch IF we do not have all ops in the batch. + * This class ensures that we aggregate a complete batch of incoming ops before processing them. It basically ensures + * that we never start processing ops in ab batch IF we do not have all ops in the batch. */ -class ScheduleManagerCore { +export class InboundBatchAggregator { private pauseSequenceNumber: number | undefined; private currentBatchClientId: string | undefined; private localPaused = false; @@ -82,55 +40,29 @@ class ScheduleManagerCore { private readonly getClientId: () => string | undefined, private readonly logger: ITelemetryLoggerExt, ) { - // Listen for delta manager sends and add batch metadata to messages - this.deltaManager.on("prepareSend", (messages: IDocumentMessage[]) => { - if (messages.length === 0) { - return; - } - - // First message will have the batch flag set to true if doing a batched send - const firstMessageMetadata = messages[0].metadata as IRuntimeMessageMetadata; - if (!firstMessageMetadata?.batch) { - return; - } - - // If the batch contains only a single op, clear the batch flag. - if (messages.length === 1) { - delete firstMessageMetadata.batch; - return; - } - - // Set the batch flag to false on the last message to indicate the end of the send batch - const lastMessage = messages[messages.length - 1]; - // TODO: It's not clear if this shallow clone is required, as opposed to just setting "batch" to false. - - lastMessage.metadata = { ...(lastMessage.metadata as any), batch: false }; - }); + const allPending = this.deltaManager.inbound.toArray(); + for (const pending of allPending) { + this.trackPending(pending); + } + } + public setupListeners() { // Listen for updates and peek at the inbound this.deltaManager.inbound.on("push", (message: ISequencedDocumentMessage) => { this.trackPending(message); }); - // Start with baseline - empty inbound queue. - assert(!this.localPaused, 0x293 /* "initial state" */); - - const allPending = this.deltaManager.inbound.toArray(); - for (const pending of allPending) { - this.trackPending(pending); - } - // We are intentionally directly listening to the "op" to inspect system ops as well. - // If we do not observe system ops, we are likely to hit 0x296 assert when system ops + // If we do not observe system ops, we are likely to hit an error when system ops // precedes start of incomplete batch. this.deltaManager.on("op", (message) => this.afterOpProcessing(message)); } /** - * The only public function in this class - called when we processed an op, - * to make decision if op processing should be paused or not after that. + * This is called when delta manager processes an op to make decision if op processing should + * be paused or not after that. */ - public afterOpProcessing(message: ISequencedDocumentMessage) { + private afterOpProcessing(message: ISequencedDocumentMessage) { assert( !this.localPaused, 0x294 /* "can't have op processing paused if we are processing an op" */, @@ -157,7 +89,7 @@ class ScheduleManagerCore { throw DataProcessingError.create( // Former assert 0x296 "Incomplete batch", - "ScheduleManager", + "InboundBatchAggregator", message, { type: message.type, diff --git a/packages/runtime/container-runtime/src/pendingStateManager.ts b/packages/runtime/container-runtime/src/pendingStateManager.ts index 69b9ece6c000..30cb046e194b 100644 --- a/packages/runtime/container-runtime/src/pendingStateManager.ts +++ b/packages/runtime/container-runtime/src/pendingStateManager.ts @@ -638,7 +638,7 @@ export class PendingStateManager implements IDisposable { /** * We must preserve the distinct batches on resubmit. * Note: It is not possible for the PendingStateManager to receive a partially acked batch. It will - * either receive the whole batch ack or nothing at all. @see ScheduleManager for how this works. + * either receive the whole batch ack or nothing at all. @see InboundBatchAggregator for how this works. */ if (batchMetadataFlag === undefined) { // Single-message batch diff --git a/packages/runtime/container-runtime/src/test/batching.spec.ts b/packages/runtime/container-runtime/src/test/batching.spec.ts index 639f6511f810..225f664b8fd0 100644 --- a/packages/runtime/container-runtime/src/test/batching.spec.ts +++ b/packages/runtime/container-runtime/src/test/batching.spec.ts @@ -132,14 +132,14 @@ describe("Runtime batching", () => { * processing the messages in the queue. */ function processBatch(batch: ISequencedDocumentMessage[], cr: ContainerRuntime) { - // Push the messages in the inbound queue. This is done because ScheduleManager listens to the "push" event + // Push the messages in the inbound queue. This is done because InboundBatchAggregator listens to the "push" event // emitted by the inbound queue to do batch validations. for (const batchMessage of batch) { mockDeltaManager.inbound.push(batchMessage); } // Process the messages in the inbound queue. - // Process is called on the delta manager because ScheduleManager listens to the "op" event on delta manager + // Process is called on the delta manager because InboundBatchAggregator listens to the "op" event on delta manager // as well to do validation. // Process is called on the container runtime because it is the one that actually processes the messages and // has its own set of validations. @@ -278,19 +278,19 @@ describe("Runtime batching", () => { let schedulerBatchBeginStub: sinon.SinonStub; let schedulerBatchEndStub: sinon.SinonStub; - type ContainerRuntimeWithScheduler = Omit & { - scheduleManager: { deltaScheduler: DeltaScheduler }; + type ContainerRuntimeWithScheduler = Omit & { + deltaScheduler: DeltaScheduler; }; beforeEach(async () => { const containerRuntimeWithDeltaScheduler = containerRuntime as unknown as ContainerRuntimeWithScheduler; schedulerBatchBeginStub = sandbox.stub( - containerRuntimeWithDeltaScheduler.scheduleManager.deltaScheduler, + containerRuntimeWithDeltaScheduler.deltaScheduler, "batchBegin", ); schedulerBatchEndStub = sandbox.stub( - containerRuntimeWithDeltaScheduler.scheduleManager.deltaScheduler, + containerRuntimeWithDeltaScheduler.deltaScheduler, "batchEnd", ); containerRuntimeStub = patchContainerRuntime(containerRuntime); diff --git a/packages/runtime/container-runtime/src/test/scheduleManager.spec.ts b/packages/runtime/container-runtime/src/test/scheduleManager.spec.ts deleted file mode 100644 index be594207815c..000000000000 --- a/packages/runtime/container-runtime/src/test/scheduleManager.spec.ts +++ /dev/null @@ -1,424 +0,0 @@ -/*! - * Copyright (c) Microsoft Corporation and contributors. All rights reserved. - * Licensed under the MIT License. - */ - -import { strict as assert } from "assert"; - -import { EventEmitter } from "@fluid-internal/client-utils"; -import { - MessageType, - ISequencedDocumentMessage, -} from "@fluidframework/driver-definitions/internal"; -import { createChildLogger } from "@fluidframework/telemetry-utils/internal"; -import { MockDeltaManager } from "@fluidframework/test-runtime-utils/internal"; - -import { asBatchMetadata } from "../metadata.js"; -import { ScheduleManager } from "../scheduleManager.js"; - -describe("ScheduleManager", () => { - describe("Batch processing events", () => { - let batchBegin: number = 0; - let batchEnd: number = 0; - let sequenceNumber: number = 0; - let emitter: EventEmitter; - let deltaManager: MockDeltaManager; - let scheduleManager: ScheduleManager; - const testClientId = "test-client"; - let batchClientId: string | undefined; - - beforeEach(() => { - emitter = new EventEmitter(); - deltaManager = new MockDeltaManager(); - deltaManager.inbound.processCallback = (message: ISequencedDocumentMessage) => { - // Simulate batch accumulation by container runtime's remote message processor. - // It saves batch messages until the entire batch is received. It calls batch begin - // and end on schedule manager accordingly. - const batchMetadataFlag = asBatchMetadata(message.metadata)?.batch; - if (batchMetadataFlag === true) { - assert( - batchClientId === undefined, - "Received batch message while another batch is in progress", - ); - scheduleManager.batchBegin(message); - batchClientId = message.clientId ?? undefined; - } else if (batchMetadataFlag === false) { - assert(batchClientId !== undefined, "Received batch end message without batch"); - scheduleManager.batchEnd(undefined /* error */, message); - batchClientId = undefined; - } else if (batchClientId === undefined) { - scheduleManager.batchBegin(message); - scheduleManager.batchEnd(undefined /* error */, message); - } - deltaManager.emit("op", message); - }; - scheduleManager = new ScheduleManager( - deltaManager, - emitter, - () => testClientId, - createChildLogger({ namespace: "fluid:testScheduleManager" }), - ); - - emitter.on("batchBegin", () => { - // When we receive a "batchBegin" event, we should not have any outstanding - // events, i.e., batchBegin and batchEnd should be equal. - assert.strictEqual( - batchBegin, - batchEnd, - "Received batchBegin before previous batchEnd", - ); - batchBegin++; - }); - - emitter.on("batchEnd", () => { - batchEnd++; - // Every "batchEnd" event should correspond to a "batchBegin" event, i.e., - // batchBegin and batchEnd should be equal. - assert.strictEqual( - batchBegin, - batchEnd, - "Received batchEnd without corresponding batchBegin", - ); - }); - }); - - afterEach(() => { - batchBegin = 0; - batchEnd = 0; - sequenceNumber = 0; - }); - - /** - * Pushes single op to the inbound queue. Adds proper sequence numbers to them - */ - function pushOp(partialMessage: Partial) { - sequenceNumber++; - const message = { ...partialMessage, sequenceNumber }; - deltaManager.inbound.push(message as ISequencedDocumentMessage); - } - - /** - * awaits until all ops that could be processed are processed. - */ - async function processOps() { - const inbound = deltaManager.inbound; - while (!inbound.paused && inbound.length > 0) { - await Promise.resolve(); - } - } - - it("Single non-batch message", async () => { - const message: Partial = { - clientId: testClientId, - type: MessageType.Operation, - }; - - // Send a non-batch message. - pushOp(message); - - await processOps(); - - assert.strictEqual(deltaManager.inbound.length, 0, "Did not process all ops"); - assert.strictEqual(batchBegin, 1, "Did not receive correct batchBegin events"); - assert.strictEqual(batchEnd, 1, "Did not receive correct batchEnd events"); - }); - - it("Multiple non-batch messages", async () => { - const message: Partial = { - clientId: testClientId, - type: MessageType.Operation, - }; - - // Sent 5 non-batch messages. - pushOp(message); - pushOp(message); - pushOp(message); - pushOp(message); - pushOp(message); - - await processOps(); - - assert.strictEqual(deltaManager.inbound.length, 0, "Did not process all ops"); - assert.strictEqual(batchBegin, 5, "Did not receive correct batchBegin events"); - assert.strictEqual(batchEnd, 5, "Did not receive correct batchEnd events"); - }); - - it("Message with non batch-related metadata", async () => { - const message: Partial = { - clientId: testClientId, - type: MessageType.Operation, - metadata: { foo: 1 }, - }; - - pushOp(message); - await processOps(); - - // We should have a "batchBegin" and a "batchEnd" event for the batch. - assert.strictEqual(deltaManager.inbound.length, 0, "Did not process all ops"); - assert.strictEqual( - batchBegin, - 1, - "Did not receive correct batchBegin event for the batch", - ); - assert.strictEqual(batchEnd, 1, "Did not receive correct batchEnd event for the batch"); - }); - - it("Messages in a single batch", async () => { - const batchBeginMessage: Partial = { - clientId: testClientId, - type: MessageType.Operation, - metadata: { batch: true }, - }; - - const batchMessage: Partial = { - clientId: testClientId, - type: MessageType.Operation, - }; - - const batchEndMessage: Partial = { - clientId: testClientId, - type: MessageType.Operation, - metadata: { batch: false }, - }; - - // Send a batch with 4 messages. - pushOp(batchBeginMessage); - pushOp(batchMessage); - pushOp(batchMessage); - - await processOps(); - assert.strictEqual( - deltaManager.inbound.length, - 3, - "Some of partial batch ops were processed", - ); - - pushOp(batchEndMessage); - await processOps(); - - // We should have only received one "batchBegin" and one "batchEnd" event for the batch. - assert.strictEqual(deltaManager.inbound.length, 0, "Did not process all ops"); - assert.strictEqual( - batchBegin, - 1, - "Did not receive correct batchBegin event for the batch", - ); - assert.strictEqual(batchEnd, 1, "Did not receive correct batchEnd event for the batch"); - }); - - it("two batches", async () => { - const batchBeginMessage: Partial = { - clientId: testClientId, - type: MessageType.Operation, - metadata: { batch: true }, - }; - - const batchMessage: Partial = { - clientId: testClientId, - type: MessageType.Operation, - }; - - const batchEndMessage: Partial = { - clientId: testClientId, - type: MessageType.Operation, - metadata: { batch: false }, - }; - - // Pause to not allow ops to be processed while we accumulated them. - await deltaManager.inbound.pause(); - - // Send a batch with 4 messages. - pushOp(batchBeginMessage); - pushOp(batchMessage); - pushOp(batchMessage); - pushOp(batchEndMessage); - - // Add incomplete batch - pushOp(batchBeginMessage); - pushOp(batchMessage); - pushOp(batchMessage); - - assert.strictEqual( - deltaManager.inbound.length, - 7, - "none of the batched ops are processed yet", - ); - - void deltaManager.inbound.resume(); - await processOps(); - - assert.strictEqual( - deltaManager.inbound.length, - 3, - "none of the second batch ops are processed yet", - ); - assert.strictEqual( - batchBegin, - 1, - "Did not receive correct batchBegin event for the batch", - ); - assert.strictEqual(batchEnd, 1, "Did not receive correct batchEnd event for the batch"); - - // End the batch - all ops should be processed. - pushOp(batchEndMessage); - await processOps(); - - assert.strictEqual(deltaManager.inbound.length, 0, "processed all ops"); - assert.strictEqual( - batchBegin, - 2, - "Did not receive correct batchBegin event for the batch", - ); - assert.strictEqual(batchEnd, 2, "Did not receive correct batchEnd event for the batch"); - }); - - it("non-batched ops followed by batch", async () => { - const batchBeginMessage: Partial = { - clientId: testClientId, - type: MessageType.Operation, - metadata: { batch: true }, - }; - - const batchMessage: Partial = { - clientId: testClientId, - type: MessageType.Operation, - }; - - const batchEndMessage: Partial = { - clientId: testClientId, - type: MessageType.Operation, - metadata: { batch: false }, - }; - - // Pause to not allow ops to be processed while we accumulated them. - await deltaManager.inbound.pause(); - - // Send a batch with 2 messages. - pushOp(batchMessage); - pushOp(batchMessage); - - // Add incomplete batch - pushOp(batchBeginMessage); - pushOp(batchMessage); - pushOp(batchMessage); - - await processOps(); - - assert.strictEqual( - deltaManager.inbound.length, - 5, - "none of the batched ops are processed yet", - ); - - void deltaManager.inbound.resume(); - await processOps(); - - assert.strictEqual( - deltaManager.inbound.length, - 3, - "none of the second batch ops are processed yet", - ); - - // End the batch - all ops should be processed. - pushOp(batchEndMessage); - await processOps(); - - assert.strictEqual(deltaManager.inbound.length, 0, "processed all ops"); - assert.strictEqual( - batchBegin, - 3, - "Did not receive correct batchBegin event for the batch", - ); - assert.strictEqual(batchEnd, 3, "Did not receive correct batchEnd event for the batch"); - }); - - function testWrongBatches() { - const clientId1: string = "test-client-1"; - const clientId2: string = "test-client-2"; - - const batchBeginMessage: Partial = { - clientId: clientId1, - type: MessageType.Operation, - metadata: { batch: true }, - }; - - const batchMessage: Partial = { - clientId: clientId1, - type: MessageType.Operation, - }; - - const messagesToFail: Partial[] = [ - // System op from same client - { - clientId: clientId1, - type: MessageType.NoOp, - }, - - // Batch messages interleaved with a batch begin message from same client - batchBeginMessage, - - // Send a message from another client. This should result in a a violation! - { - clientId: clientId2, - type: MessageType.Operation, - }, - - // Send a message from another client with non batch-related metadata. This should result - // in a "batchEnd" event for the previous batch since the client id changes. Also, we - // should get a "batchBegin" and a "batchEnd" event for the new client. - { - clientId: clientId2, - type: MessageType.Operation, - metadata: { foo: 1 }, - }, - - // Send a batch from another client. This should result in a "batchEnd" event for the - // previous batch since the client id changes. Also, we should get one "batchBegin" and - // one "batchEnd" event for the batch from the new client. - { - clientId: clientId2, - type: MessageType.Operation, - metadata: { batch: true }, - }, - ]; - - let counter = 0; - for (const messageToFail of messagesToFail) { - counter++; - it(`Partial batch messages, case ${counter}`, async () => { - // Send a batch with 3 messages from first client but don't send batch end message. - pushOp(batchBeginMessage); - pushOp(batchMessage); - pushOp(batchMessage); - - await processOps(); - assert.strictEqual( - deltaManager.inbound.length, - 3, - "Some of partial batch ops were processed", - ); - - assert.throws(() => pushOp(messageToFail)); - - assert.strictEqual( - deltaManager.inbound.length, - 4, - "Some of batch ops were processed", - ); - assert.strictEqual( - batchBegin, - 0, - "Did not receive correct batchBegin event for the batch", - ); - assert.strictEqual( - batchEnd, - 0, - "Did not receive correct batchBegin event for the batch", - ); - }); - } - } - - testWrongBatches(); - }); -}); diff --git a/packages/test/functional-tests/package.json b/packages/test/functional-tests/package.json index 28cb60d360b6..3543d147b2ff 100644 --- a/packages/test/functional-tests/package.json +++ b/packages/test/functional-tests/package.json @@ -68,8 +68,10 @@ "@fluidframework/build-common": "^2.0.3", "@fluidframework/build-tools": "^0.51.0", "@fluidframework/cell": "workspace:~", + "@fluidframework/container-definitions": "workspace:~", "@fluidframework/container-loader": "workspace:~", "@fluidframework/container-runtime": "workspace:~", + "@fluidframework/container-runtime-definitions": "workspace:~", "@fluidframework/datastore-definitions": "workspace:~", "@fluidframework/driver-definitions": "workspace:~", "@fluidframework/eslint-config-fluid": "^5.6.0", diff --git a/packages/test/functional-tests/src/test/containerRuntime.spec.ts b/packages/test/functional-tests/src/test/containerRuntime.spec.ts index 8b645e6f5fc3..7200c7937116 100644 --- a/packages/test/functional-tests/src/test/containerRuntime.spec.ts +++ b/packages/test/functional-tests/src/test/containerRuntime.spec.ts @@ -9,26 +9,37 @@ import { MockDocumentDeltaConnection, MockDocumentService, } from "@fluid-private/test-loader-utils"; +import { + AttachState, + IContainerContext, + ICriticalContainerError, + IRuntime, +} from "@fluidframework/container-definitions/internal"; // eslint-disable-next-line import/no-internal-modules import { ConnectionManager } from "@fluidframework/container-loader/internal/test/connectionManager"; // eslint-disable-next-line import/no-internal-modules import { IConnectionManagerFactoryArgs } from "@fluidframework/container-loader/internal/test/contracts"; // eslint-disable-next-line import/no-internal-modules import { DeltaManager } from "@fluidframework/container-loader/internal/test/deltaManager"; +import { + ContainerMessageType, + loadContainerRuntime, +} from "@fluidframework/container-runtime/internal"; // eslint-disable-next-line import/no-internal-modules import { DeltaScheduler } from "@fluidframework/container-runtime/internal/test/deltaScheduler"; -// ADO:1981 // eslint-disable-next-line import/no-internal-modules -import { ScheduleManager } from "@fluidframework/container-runtime/internal/test/scheduleManager"; +import { IContainerRuntime } from "@fluidframework/container-runtime-definitions/internal"; import { IClient } from "@fluidframework/driver-definitions"; import { ISequencedDocumentSystemMessage, MessageType, ISequencedDocumentMessage, } from "@fluidframework/driver-definitions/internal"; -import { createChildLogger } from "@fluidframework/telemetry-utils/internal"; -import events_pkg from "events_pkg"; -const { EventEmitter } = events_pkg; +import { + createChildLogger, + mixinMonitoringContext, +} from "@fluidframework/telemetry-utils/internal"; +import { MockAudience, MockQuorumClients } from "@fluidframework/test-runtime-utils/internal"; describe("Container Runtime", () => { /** @@ -38,13 +49,32 @@ describe("Container Runtime", () => { */ describe("Async op processing", () => { let deltaManager: DeltaManager; - let scheduleManager: ScheduleManager; let deltaConnection: MockDocumentDeltaConnection; + let containerRuntime: IContainerRuntime & IRuntime; let seq: number; const docId = "docId"; let batchBegin: number = 0; let batchEnd: number = 0; - let batchClientId: string | undefined; + + // Create a mock container context to be used with container runtime. + const getMockContext = ( + dm: DeltaManager, + ): Partial => { + const mockContext = { + attachState: AttachState.Attached, + deltaManager: dm, + audience: new MockAudience(), + quorum: new MockQuorumClients(), + taggedLogger: mixinMonitoringContext(createChildLogger({})).logger, + clientDetails: { capabilities: { interactive: true } }, + closeFn: (_error?: ICriticalContainerError): void => {}, + updateDirtyContainerState: (_dirty: boolean) => {}, + getLoadedFromVersion: () => undefined, + clientId: "test-client-1", + connected: true, + }; + return mockContext; + }; const startDeltaManager = async (): Promise => new Promise((resolve) => { @@ -73,6 +103,11 @@ describe("Container Runtime", () => { minimumSequenceNumber: 0, sequenceNumber: seq++, type: MessageType.Operation, + // Use Rejoin message type to avoid processing the op. Rejoin is a no-op in container runtime. + contents: { + type: ContainerMessageType.Rejoin, + contents: "", + }, }; messages.push(message); } @@ -82,41 +117,14 @@ describe("Container Runtime", () => { // Function to process an inbound op. It adds delay to simulate time taken in processing an op. function processOp(message: ISequencedDocumentMessage): void { - // Simulate batch accumulation by container runtime's remote message processor. - // It saves batch messages until the entire batch is received. It calls batch begin - // and end on schedule manager accordingly. - const batchMetadataFlag = (message.metadata as { batch?: boolean } | undefined)?.batch; - let batchStart: boolean = false; - let batchComplete: boolean = false; - if (batchMetadataFlag === true) { - assert( - batchClientId === undefined, - "Received batch message while another batch is in progress", - ); - batchClientId = message.clientId ?? undefined; - batchStart = true; - } else if (batchMetadataFlag === false) { - assert(batchClientId !== undefined, "Received batch end message without batch"); - batchClientId = undefined; - batchComplete = true; - } else if (batchClientId === undefined) { - batchStart = true; - batchComplete = true; - } - - if (batchStart) { - scheduleManager.batchBegin(message); - } - - // Add delay such that each op takes greater than the DeltaScheduler's processing time to process. + // Add delay to container runtime's op processing such that each op takes greater than the + // DeltaScheduler's processing time to process. const processingDelay = DeltaScheduler.processingTime + 10; - const startTime = Date.now(); - while (Date.now() - startTime < processingDelay) {} - - if (batchComplete) { - scheduleManager.batchEnd(undefined /* error */, message); - } - + containerRuntime.once("op", () => { + const startTime = Date.now(); + while (Date.now() - startTime < processingDelay) {} + }); + containerRuntime.process(message, false); deltaManager.emit("op", message); } @@ -144,15 +152,19 @@ describe("Container Runtime", () => { ), ); - const emitter = new EventEmitter(); - scheduleManager = new ScheduleManager( - deltaManager, - emitter, - () => "test-client", // clientId, - createChildLogger({ namespace: "fluid:testScheduleManager" }), - ); + const mockProvideEntryPoint = async () => ({ + myProp: "myValue", + }); + containerRuntime = await loadContainerRuntime({ + context: getMockContext(deltaManager) as IContainerContext, + registryEntries: [], + existing: true, + runtimeOptions: {}, + provideEntryPoint: mockProvideEntryPoint, + }); + assert(containerRuntime !== undefined, "Container runtime should be defined"); - emitter.on("batchBegin", () => { + containerRuntime.on("batchBegin", () => { // When we receive a "batchBegin" event, we should not have any outstanding // events, i.e., batchBegin and batchEnd should be equal. assert.strictEqual( @@ -163,7 +175,7 @@ describe("Container Runtime", () => { batchBegin++; }); - emitter.on("batchEnd", () => { + containerRuntime.on("batchEnd", () => { batchEnd++; // Every "batchEnd" event should correspond to a "batchBegin" event, i.e., // batchBegin and batchEnd should be equal. @@ -204,8 +216,8 @@ describe("Container Runtime", () => { // Batch messages are processed in a single turn. So, we should have received the batch events. assert.strictEqual( - 1, batchBegin, + 1, "Did not receive correct batchBegin event for the batch", ); assert.strictEqual(1, batchEnd, "Did not receive correct batchEnd event for the batch"); @@ -239,13 +251,13 @@ describe("Container Runtime", () => { // We should have received all the batch events. assert.strictEqual( - count, batchBegin, + count, "Did not receive correct batchBegin event for the batch", ); assert.strictEqual( - count, batchEnd, + count, "Did not receive correct batchEnd event for the batch", ); }); @@ -267,11 +279,11 @@ describe("Container Runtime", () => { // We should have received the batch events for the non-batch message in the first turn. assert.strictEqual( - 1, batchBegin, + 1, "Did not receive correct batchBegin event for the batch", ); - assert.strictEqual(1, batchEnd, "Did not receive correct batchEnd event for the batch"); + assert.strictEqual(batchEnd, 1, "Did not receive correct batchEnd event for the batch"); // Yield the event loop so that the batch messages can be processed. await yieldEventLoop(); @@ -279,11 +291,11 @@ describe("Container Runtime", () => { // We should have now received the batch events for the batch ops since they would have processed in // a single turn. assert.strictEqual( - 2, batchBegin, + 2, "Did not receive correct batchBegin event for the batch", ); - assert.strictEqual(2, batchEnd, "Did not receive correct batchEnd event for the batch"); + assert.strictEqual(batchEnd, 2, "Did not receive correct batchEnd event for the batch"); }); it(`Batch messages followed by a non-batch message that take longer than @@ -303,11 +315,11 @@ describe("Container Runtime", () => { // We should have received the batch events for the batch messages in the first turn. assert.strictEqual( - 1, batchBegin, + 1, "Did not receive correct batchBegin event for the batch", ); - assert.strictEqual(1, batchEnd, "Did not receive correct batchEnd event for the batch"); + assert.strictEqual(batchEnd, 1, "Did not receive correct batchEnd event for the batch"); // Yield the event loop so that the single non-batch op can be processed. await yieldEventLoop(); @@ -315,11 +327,11 @@ describe("Container Runtime", () => { // We should have now received the batch events for the non-batch op since it would have processed in // a single turn. assert.strictEqual( - 2, batchBegin, + 2, "Did not receive correct batchBegin event for the batch", ); - assert.strictEqual(2, batchEnd, "Did not receive correct batchEnd event for the batch"); + assert.strictEqual(batchEnd, 2, "Did not receive correct batchEnd event for the batch"); }); it("Reconnects after receiving a leave op", async () => { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index ae203b8374d2..91f12db124d8 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -13488,12 +13488,18 @@ importers: '@fluidframework/cell': specifier: workspace:~ version: link:../../dds/cell + '@fluidframework/container-definitions': + specifier: workspace:~ + version: link:../../common/container-definitions '@fluidframework/container-loader': specifier: workspace:~ version: link:../../loader/container-loader '@fluidframework/container-runtime': specifier: workspace:~ version: link:../../runtime/container-runtime + '@fluidframework/container-runtime-definitions': + specifier: workspace:~ + version: link:../../runtime/container-runtime-definitions '@fluidframework/datastore-definitions': specifier: workspace:~ version: link:../../runtime/datastore-definitions