Skip to content

Commit

Permalink
Simplified ScheduleManager and renamed it to InboundBatchAggregator
Browse files Browse the repository at this point in the history
  • Loading branch information
agarwal-navin committed Jan 3, 2025
1 parent cd34c30 commit d0f9420
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 594 deletions.
12 changes: 1 addition & 11 deletions packages/runtime/container-runtime/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,6 @@
"default": "./dist/deltaScheduler.js"
}
},
"./internal/test/scheduleManager": {
"import": {
"types": "./lib/scheduleManager.d.ts",
"default": "./lib/scheduleManager.js"
},
"require": {
"types": "./dist/scheduleManager.d.ts",
"default": "./dist/scheduleManager.js"
}
},
"./internal/test/blobManager": {
"import": {
"types": "./lib/blobManager/index.d.ts",
Expand Down Expand Up @@ -122,7 +112,7 @@
"build:test": "npm run build:test:esm && npm run build:test:cjs",
"build:test:cjs": "fluid-tsc commonjs --project ./src/test/tsconfig.cjs.json",
"build:test:esm": "tsc --project ./src/test/tsconfig.json",
"check:are-the-types-wrong": "attw --pack . --exclude-entrypoints ./internal/test/containerRuntime ./internal/test/deltaScheduler ./internal/test/scheduleManager ./internal/test/blobManager ./internal/test/summary ./internal/test/gc",
"check:are-the-types-wrong": "attw --pack . --exclude-entrypoints ./internal/test/containerRuntime ./internal/test/deltaScheduler ./internal/test/blobManager ./internal/test/summary ./internal/test/gc",
"check:biome": "biome check .",
"check:exports": "concurrently \"npm:check:exports:*\"",
"check:exports:bundle-release-tags": "api-extractor run --config api-extractor/api-extractor-lint-bundle.json",
Expand Down
25 changes: 17 additions & 8 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ import {
DeltaManagerPendingOpsProxy,
DeltaManagerSummarizerProxy,
} from "./deltaManagerProxies.js";
import { DeltaScheduler } from "./deltaScheduler.js";
import {
GCNodeType,
GarbageCollector,
Expand All @@ -165,6 +166,7 @@ import {
gcGenerationOptionName,
type GarbageCollectionMessage,
} from "./gc/index.js";
import { InboundBatchAggregator } from "./inboundBatchAggregator.js";
import {
ContainerMessageType,
type ContainerRuntimeDocumentSchemaMessage,
Expand Down Expand Up @@ -198,7 +200,6 @@ import {
IPendingLocalState,
PendingStateManager,
} from "./pendingStateManager.js";
import { ScheduleManager } from "./scheduleManager.js";
import {
DocumentsSchemaController,
EnqueueSummarizeResult,
Expand Down Expand Up @@ -1410,7 +1411,8 @@ export class ContainerRuntime
* It is created only by summarizing container (i.e. one with clientType === "summarizer")
*/
private readonly _summarizer?: Summarizer;
private readonly scheduleManager: ScheduleManager;
private readonly deltaScheduler: DeltaScheduler;
private readonly inboundBatchAggregator: InboundBatchAggregator;
private readonly blobManager: BlobManager;
private readonly pendingStateManager: PendingStateManager;
private readonly duplicateBatchDetector: DuplicateBatchDetector | undefined;
Expand Down Expand Up @@ -1901,12 +1903,17 @@ export class ContainerRuntime
closeContainer: (error?: ICriticalContainerError) => this.closeFn(error),
});

this.scheduleManager = new ScheduleManager(
this.deltaScheduler = new DeltaScheduler(
this.innerDeltaManager,
createChildLogger({ logger: this.logger, namespace: "DeltaScheduler" }),
);

this.inboundBatchAggregator = new InboundBatchAggregator(
this.innerDeltaManager,
this,
() => this.clientId,
createChildLogger({ logger: this.logger, namespace: "ScheduleManager" }),
createChildLogger({ logger: this.logger, namespace: "InboundBatchAggregator" }),
);
this.inboundBatchAggregator.setupListeners();

const disablePartialFlush = this.mc.config.getBoolean(
"Fluid.ContainerRuntime.DisablePartialFlush",
Expand Down Expand Up @@ -2923,7 +2930,7 @@ export class ContainerRuntime
private _processedClientSequenceNumber: number | undefined;

/**
* Processes inbound message(s). It calls schedule manager according to the messages' location in the batch.
* Processes inbound message(s). It calls delta scheduler according to the messages' location in the batch.
* @param messagesWithMetadata - messages to process along with their metadata.
* @param locationInBatch - Are we processing the start and/or end of a batch?
* @param local - true if the messages were originally generated by the client receiving it.
Expand All @@ -2945,7 +2952,8 @@ export class ContainerRuntime
if (locationInBatch.batchStart) {
const firstMessage = messagesWithMetadata[0]?.message;
assert(firstMessage !== undefined, 0xa31 /* Batch must have at least one message */);
this.scheduleManager.batchBegin(firstMessage);
this.deltaScheduler.batchBegin(firstMessage);
this.emit("batchBegin", firstMessage);
}

let error: unknown;
Expand Down Expand Up @@ -3045,7 +3053,8 @@ export class ContainerRuntime
if (locationInBatch.batchEnd) {
const lastMessage = messagesWithMetadata[messagesWithMetadata.length - 1]?.message;
assert(lastMessage !== undefined, 0xa32 /* Batch must have at least one message */);
this.scheduleManager.batchEnd(error, lastMessage);
this.deltaScheduler.batchEnd(lastMessage);
this.emit("batchEnd", error, lastMessage);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,18 @@
* Licensed under the MIT License.
*/

import type { EventEmitter } from "@fluid-internal/client-utils";
import { performance } from "@fluid-internal/client-utils";
import { IDeltaManagerFull } from "@fluidframework/container-definitions/internal";
import { assert } from "@fluidframework/core-utils/internal";
import {
IDocumentMessage,
ISequencedDocumentMessage,
} from "@fluidframework/driver-definitions/internal";
import { ISequencedDocumentMessage } from "@fluidframework/driver-definitions/internal";
import { isRuntimeMessage } from "@fluidframework/driver-utils/internal";
import {
ITelemetryLoggerExt,
DataCorruptionError,
DataProcessingError,
createChildLogger,
extractSafePropertiesFromMessage,
} from "@fluidframework/telemetry-utils/internal";

import { DeltaScheduler } from "./deltaScheduler.js";
import { IBatchMetadata } from "./metadata.js";
import { pkgVersion } from "./packageVersion.js";

Expand All @@ -31,46 +25,10 @@ type IRuntimeMessageMetadata =
};

/**
* This class has the following responsibilities:
*
* 1. It tracks batches as we process ops and raises "batchBegin" and "batchEnd" events.
* As part of it, it validates batch correctness (i.e. no system ops in the middle of batch)
*
* 2. It creates instance of ScheduleManagerCore that ensures we never start processing ops from batch
* unless all ops of the batch are in.
*/
export class ScheduleManager {
private readonly deltaScheduler: DeltaScheduler;

constructor(
private readonly deltaManager: IDeltaManagerFull,
private readonly emitter: EventEmitter,
readonly getClientId: () => string | undefined,
private readonly logger: ITelemetryLoggerExt,
) {
this.deltaScheduler = new DeltaScheduler(
this.deltaManager,
createChildLogger({ logger: this.logger, namespace: "DeltaScheduler" }),
);
void new ScheduleManagerCore(deltaManager, getClientId, logger);
}

public batchBegin(message: ISequencedDocumentMessage) {
this.emitter.emit("batchBegin", message);
this.deltaScheduler.batchBegin(message);
}

public batchEnd(error: any | undefined, message: ISequencedDocumentMessage) {
this.emitter.emit("batchEnd", error, message);
this.deltaScheduler.batchEnd(message);
}
}

/**
* This class controls pausing and resuming of inbound queue to ensure that we never
* start processing ops in a batch IF we do not have all ops in the batch.
* This class ensures that we aggregate a complete batch of incoming ops before processing them. It basically ensures
* that we never start processing ops in ab batch IF we do not have all ops in the batch.
*/
class ScheduleManagerCore {
export class InboundBatchAggregator {
private pauseSequenceNumber: number | undefined;
private currentBatchClientId: string | undefined;
private localPaused = false;
Expand All @@ -82,55 +40,29 @@ class ScheduleManagerCore {
private readonly getClientId: () => string | undefined,
private readonly logger: ITelemetryLoggerExt,
) {
// Listen for delta manager sends and add batch metadata to messages
this.deltaManager.on("prepareSend", (messages: IDocumentMessage[]) => {
if (messages.length === 0) {
return;
}

// First message will have the batch flag set to true if doing a batched send
const firstMessageMetadata = messages[0].metadata as IRuntimeMessageMetadata;
if (!firstMessageMetadata?.batch) {
return;
}

// If the batch contains only a single op, clear the batch flag.
if (messages.length === 1) {
delete firstMessageMetadata.batch;
return;
}

// Set the batch flag to false on the last message to indicate the end of the send batch
const lastMessage = messages[messages.length - 1];
// TODO: It's not clear if this shallow clone is required, as opposed to just setting "batch" to false.

lastMessage.metadata = { ...(lastMessage.metadata as any), batch: false };
});
const allPending = this.deltaManager.inbound.toArray();
for (const pending of allPending) {
this.trackPending(pending);
}
}

public setupListeners() {
// Listen for updates and peek at the inbound
this.deltaManager.inbound.on("push", (message: ISequencedDocumentMessage) => {
this.trackPending(message);
});

// Start with baseline - empty inbound queue.
assert(!this.localPaused, 0x293 /* "initial state" */);

const allPending = this.deltaManager.inbound.toArray();
for (const pending of allPending) {
this.trackPending(pending);
}

// We are intentionally directly listening to the "op" to inspect system ops as well.
// If we do not observe system ops, we are likely to hit 0x296 assert when system ops
// If we do not observe system ops, we are likely to hit an error when system ops
// precedes start of incomplete batch.
this.deltaManager.on("op", (message) => this.afterOpProcessing(message));
}

/**
* The only public function in this class - called when we processed an op,
* to make decision if op processing should be paused or not after that.
* This is called when delta manager processes an op to make decision if op processing should
* be paused or not after that.
*/
public afterOpProcessing(message: ISequencedDocumentMessage) {
private afterOpProcessing(message: ISequencedDocumentMessage) {
assert(
!this.localPaused,
0x294 /* "can't have op processing paused if we are processing an op" */,
Expand All @@ -157,7 +89,7 @@ class ScheduleManagerCore {
throw DataProcessingError.create(
// Former assert 0x296
"Incomplete batch",
"ScheduleManager",
"InboundBatchAggregator",
message,
{
type: message.type,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ export class PendingStateManager implements IDisposable {
/**
* We must preserve the distinct batches on resubmit.
* Note: It is not possible for the PendingStateManager to receive a partially acked batch. It will
* either receive the whole batch ack or nothing at all. @see ScheduleManager for how this works.
* either receive the whole batch ack or nothing at all. @see InboundBatchAggregator for how this works.
*/
if (batchMetadataFlag === undefined) {
// Single-message batch
Expand Down
12 changes: 6 additions & 6 deletions packages/runtime/container-runtime/src/test/batching.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,14 @@ describe("Runtime batching", () => {
* processing the messages in the queue.
*/
function processBatch(batch: ISequencedDocumentMessage[], cr: ContainerRuntime) {
// Push the messages in the inbound queue. This is done because ScheduleManager listens to the "push" event
// Push the messages in the inbound queue. This is done because InboundBatchAggregator listens to the "push" event
// emitted by the inbound queue to do batch validations.
for (const batchMessage of batch) {
mockDeltaManager.inbound.push(batchMessage);
}

// Process the messages in the inbound queue.
// Process is called on the delta manager because ScheduleManager listens to the "op" event on delta manager
// Process is called on the delta manager because InboundBatchAggregator listens to the "op" event on delta manager
// as well to do validation.
// Process is called on the container runtime because it is the one that actually processes the messages and
// has its own set of validations.
Expand Down Expand Up @@ -278,19 +278,19 @@ describe("Runtime batching", () => {
let schedulerBatchBeginStub: sinon.SinonStub;
let schedulerBatchEndStub: sinon.SinonStub;

type ContainerRuntimeWithScheduler = Omit<ContainerRuntime, "scheduleManager"> & {
scheduleManager: { deltaScheduler: DeltaScheduler };
type ContainerRuntimeWithScheduler = Omit<ContainerRuntime, "deltaScheduler"> & {
deltaScheduler: DeltaScheduler;
};

beforeEach(async () => {
const containerRuntimeWithDeltaScheduler =
containerRuntime as unknown as ContainerRuntimeWithScheduler;
schedulerBatchBeginStub = sandbox.stub(
containerRuntimeWithDeltaScheduler.scheduleManager.deltaScheduler,
containerRuntimeWithDeltaScheduler.deltaScheduler,
"batchBegin",
);
schedulerBatchEndStub = sandbox.stub(
containerRuntimeWithDeltaScheduler.scheduleManager.deltaScheduler,
containerRuntimeWithDeltaScheduler.deltaScheduler,
"batchEnd",
);
containerRuntimeStub = patchContainerRuntime(containerRuntime);
Expand Down
Loading

0 comments on commit d0f9420

Please sign in to comment.