Added test cases for enableQueryControl flag in FeedOptions #32499

Merged
merged 14 commits on Jan 17, 2025
@@ -167,11 +167,6 @@ export class PipelinedQueryExecutionContext implements ExecutionContext {

// Removed callback here because it wouldn't have ever worked...
public hasMoreResults(): boolean {
console.log(
"this.fetchBuffer.length, this.endpoint.hasMoreResults: ",
this.fetchBuffer.length,
this.endpoint.hasMoreResults(),
);
return this.fetchBuffer.length !== 0 || this.endpoint.hasMoreResults();
}

7 changes: 0 additions & 7 deletions sdk/cosmosdb/cosmos/src/queryIterator.ts
@@ -241,13 +241,8 @@ export class QueryIterator<T> {
if (!this.isInitialized) {
await this.init(diagnosticNode);
}
console.log("toArrayImplementation");
while (this.queryExecutionContext.hasMoreResults()) {
let response: Response<any>;
console.log(
"toArrayImplementation fetchMore loop,",
this.queryExecutionContext.hasMoreResults(),
);
try {
response = await this.queryExecutionContext.fetchMore(diagnosticNode);
} catch (error: any) {
@@ -259,14 +254,12 @@
}
}
const { result, headers } = response;
console.log("toArrayImplementation fetchMore result", result);
// concatenate the results and fetch more
mergeHeaders(this.fetchAllLastResHeaders, headers);
if (result) {
this.fetchAllTempResources.push(...result);
}
}
console.log("toArrayImplementation fetchAllTempResources", this.fetchAllTempResources);
return new FeedResponse(
this.fetchAllTempResources,
this.fetchAllLastResHeaders,
@@ -6,6 +6,8 @@ import {
ExecutionContext,
ParallelQueryExecutionContextBase,
} from "../../../../src/queryExecutionContext";
import { Response } from "../../../../src/request";
import { DiagnosticNodeInternal } from "../../../../src/diagnostics/DiagnosticNodeInternal";

export class TestParallelQueryExecutionContext
extends ParallelQueryExecutionContextBase
@@ -17,4 +19,15 @@
): number {
return docProd1.generation - docProd2.generation;
}

private async bufferMore(diagnosticNode?: DiagnosticNodeInternal): Promise<void> {
// TODO: need to update headers from here, so make sure it returns them
await this.bufferDocumentProducers(diagnosticNode);
await this.fillBufferFromBufferQueue();
}

public async fetchMore(diagnosticNode?: DiagnosticNodeInternal): Promise<Response<any>> {
await this.bufferMore(diagnosticNode);
return this.drainBufferedItems();
}
}
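
Not part of the PR diff: a minimal usage sketch for the test helper above, showing how its fetchMore (which chains bufferDocumentProducers, fillBufferFromBufferQueue, and drainBufferedItems) could be drained page by page. It assumes hasMoreResults is inherited from ParallelQueryExecutionContextBase, and that context and diagnosticNode are constructed the same way as in the partitionMerge spec below; drainAll is a hypothetical name introduced only for illustration.

async function drainAll(
  context: TestParallelQueryExecutionContext,
  diagnosticNode: DiagnosticNodeInternal,
): Promise<any[]> {
  const items: any[] = [];
  // Each fetchMore call buffers document producers, fills the buffer queue,
  // and then drains one page of buffered items.
  while (context.hasMoreResults()) {
    const { result } = await context.fetchMore(diagnosticNode);
    if (result) {
      items.push(...result);
    }
  }
  return items;
}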
266 changes: 266 additions & 0 deletions sdk/cosmosdb/cosmos/test/internal/unit/partitionMerge.spec.ts
@@ -0,0 +1,266 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
import {
ClientConfigDiagnostic,
ClientContext,
ConsistencyLevel,
Constants,
CosmosClientOptions,
CosmosDbDiagnosticLevel,
DiagnosticNodeInternal,
FeedOptions,
GlobalEndpointManager,
QueryInfo,
RequestOptions,
QueryIterator,
PartitionKeyRange,
Resource,
StatusCodes,
} from "../../../src";
import { expect, assert } from "chai";
import { TestParallelQueryExecutionContext } from "./common/TestParallelQueryExecutionContext";
import sinon from "sinon";
import { SubStatusCodes } from "../../../src/common";

const createMockPartitionKeyRange = (id: string, minInclusive: string, maxExclusive: string) => ({
id, // Range ID
_rid: "range-rid", // Resource ID of the partition key range
minInclusive, // Minimum value of the partition key range
maxExclusive, // Maximum value of the partition key range
_etag: "sample-etag", // ETag for concurrency control
_self: `/dbs/sample-db/colls/sample-collection/pkranges/${id}`, // Self-link
throughputFraction: 1.0, // Throughput assigned to this partition
status: "Online", // Status of the partition
});

const createMockDocument = (id: string, name: string, value: string) => ({
id,
_rid: "sample-rid-2",
_ts: Date.now(),
_self: "/dbs/sample-db/colls/sample-collection/docs/sample-id-2",
_etag: "sample-etag-2",
name: name,
value: value,
});

function createTestClientContext(
options: Partial<CosmosClientOptions>,
diagnosticLevel: CosmosDbDiagnosticLevel,
) {
const clientOps: CosmosClientOptions = {
endpoint: "",
connectionPolicy: {
enableEndpointDiscovery: false,
preferredLocations: ["https://localhhost"],
},
...options,
};
const globalEndpointManager = new GlobalEndpointManager(
clientOps,
async (diagnosticNode: DiagnosticNodeInternal, opts: RequestOptions) => {
expect(opts).to.exist; // eslint-disable-line no-unused-expressions
const dummyAccount: any = diagnosticNode;
return dummyAccount;
},
);
const clientConfig: ClientConfigDiagnostic = {
endpoint: "",
resourceTokensConfigured: true,
tokenProviderConfigured: true,
aadCredentialsConfigured: true,
connectionPolicyConfigured: true,
consistencyLevel: ConsistencyLevel.BoundedStaleness,
defaultHeaders: {},
agentConfigured: true,
userAgentSuffix: "",
pluginsConfigured: true,
sDKVersion: Constants.SDKVersion,
...options,
};
const clientContext = new ClientContext(
clientOps,
globalEndpointManager,
clientConfig,
diagnosticLevel,
);
return clientContext;
}

const collectionLink = "/dbs/testDb/colls/testCollection"; // Sample collection link
const query = "SELECT * FROM c"; // Example query string or SqlQuerySpec object
const options: FeedOptions = { maxItemCount: 2, maxDegreeOfParallelism: 1 };
const queryInfo: QueryInfo = {
orderBy: ["Ascending"],
rewrittenQuery: "SELECT * FROM c",
} as QueryInfo;
const partitionedQueryExecutionInfo = {
queryRanges: [
{
min: "",
max: "1FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
isMinInclusive: true, // Whether the minimum value is inclusive
isMaxInclusive: false,
},
{
min: "1FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
max: "FF",
isMinInclusive: true, // Whether the minimum value is inclusive
isMaxInclusive: false,
},
],
queryInfo: queryInfo,
partitionedQueryExecutionInfoVersion: 1,
};
const cosmosClientOptions = {
endpoint: "https://your-cosmos-db.documents.azure.com:443/",
key: "your-cosmos-db-key",
userAgentSuffix: "MockClient",
};
const correlatedActivityId = "sample-activity-id"; // Example correlated activity ID

const diagnosticLevel = CosmosDbDiagnosticLevel.info;

describe("Partition-Merge", function () {
const clientContext = createTestClientContext(cosmosClientOptions, diagnosticLevel); // Mock ClientContext instance
const mockPartitionKeyRange1 = createMockPartitionKeyRange(
"parent1",
"",
"1FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
);
const mockPartitionKeyRange2 = createMockPartitionKeyRange(
"parent2",
"1FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",
"FF",
);

const fetchAllInternalStub = sinon.stub().resolves({
resources: [mockPartitionKeyRange1, mockPartitionKeyRange2],
headers: { "x-ms-request-charge": "1.23" },
code: 200,
});
sinon.stub(clientContext, "queryPartitionKeyRanges").returns({
fetchAllInternal: fetchAllInternalStub, // Add fetchAllInternal to mimic expected structure
} as unknown as QueryIterator<PartitionKeyRange>);

const mockDocument1 = createMockDocument(
"sample-id-1",
"Sample Document 1",
"This is the first sample document",
);
const mockDocument2 = createMockDocument(
"sample-id-2",
"Sample Document 2",
"This is the second sample document",
);

// Define a stub for queryFeed in clientContext
sinon.stub(clientContext, "queryFeed").resolves({
result: [mockDocument1, mockDocument2] as unknown as Resource, // Add result to mimic expected structure
headers: {
"x-ms-request-charge": "3.5", // Example RU charge
"x-ms-continuation": "token-for-next-page", // Continuation token for pagination
},
code: 200, // Optional status code
});

// Create a new instance of TestParallelQueryExecutionContext
const context = new TestParallelQueryExecutionContext(
clientContext,
collectionLink,
query,
options,
partitionedQueryExecutionInfo,
correlatedActivityId,
);
context["options"] = options;

it("there should be 2 document producers in the unfilledDocumentProducersQueue as there are two partition key ranges", async function () {
// Assert that the priority queue has 2 document producers
assert.equal(context["unfilledDocumentProducersQueue"].size(), 2);

// Assert that the document producers have the correct start and end EPKs and populateEpkRangeHeaders is false
context["unfilledDocumentProducersQueue"].forEach((docProd) => {
if (docProd.targetPartitionKeyRange.id === mockPartitionKeyRange1.id) {
assert.equal(docProd.startEpk, mockPartitionKeyRange1.minInclusive);
assert.equal(docProd.endEpk, mockPartitionKeyRange1.maxExclusive);
} else if (docProd.targetPartitionKeyRange.id === mockPartitionKeyRange2.id) {
assert.equal(docProd.startEpk, mockPartitionKeyRange2.minInclusive);
assert.equal(docProd.endEpk, mockPartitionKeyRange2.maxExclusive);
}
assert.equal(docProd.populateEpkRangeHeaders, false);
});
});

it("Correct parent epk ranges are picked up in the newly created child document producers and _enqueueReplacementDocumentProducers function should be called if partition is gone due to merge", async function () {
const parentDocProd1 = context["unfilledDocumentProducersQueue"].peek();

// Stub the bufferMore method of the document producers to throw a Gone error
context["unfilledDocumentProducersQueue"].forEach((docProd) => {
sinon.stub(docProd, "bufferMore").rejects({
code: StatusCodes.Gone,
substatus: SubStatusCodes.PartitionKeyRangeGone,
message: "Partition key range is gone",
});
});
const parentDocumentProducer1StartEpk = parentDocProd1.startEpk;
const parentDocumentProducer1EndEpk = parentDocProd1.endEpk;

// Mocking the _getReplacementPartitionKeyRanges function to return a single partition key range
const getReplacementPartitionKeyRangesStub = sinon
.stub(context as any, "_getReplacementPartitionKeyRanges")
.resolves([createMockPartitionKeyRange("child1", "", "1FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF")]);

// Creating a spy on the _enqueueReplacementDocumentProducers function
const enqueueSpy = sinon.spy(context as any, "_enqueueReplacementDocumentProducers");

try {
// The query fails because the fetchMore method of the first document producer throws a Gone error
await context.fetchMore(context["diagnosticNodeWrapper"]["diagnosticNode"]);
assert.fail("Expected query to fail");
} catch (err) {
assert(err);
}

// Assert that the _enqueueReplacementDocumentProducers function was called once
assert(enqueueSpy.calledOnce);
enqueueSpy.restore();

// Assert that the priority queue has 2 document producers. One parent and one newly created child
assert.equal(context["unfilledDocumentProducersQueue"].size(), 2);

// Assert that the newly created document producer has the correct start and end EPKs from Parent and populateEpkRangeHeaders is true
context["unfilledDocumentProducersQueue"].forEach((docProd) => {
if (docProd.targetPartitionKeyRange.id === "child1") {
assert.equal(docProd.startEpk, parentDocumentProducer1StartEpk);
assert.equal(docProd.endEpk, parentDocumentProducer1EndEpk);
assert.equal(docProd.populateEpkRangeHeaders, true);
}
});

// Removing the child document producer from the priority queue
context["unfilledDocumentProducersQueue"].deq();

// Assert that the priority queue has 1 document producer
assert.equal(context["unfilledDocumentProducersQueue"].size(), 1);

const parentDocProd2 = context["unfilledDocumentProducersQueue"].peek();

const parentDocumentProducer2StartEpk = parentDocProd2.startEpk;
const parentDocumentProducer2EndEpk = parentDocProd2.endEpk;

// Restoring and mocking again the _getReplacementPartitionKeyRanges function
getReplacementPartitionKeyRangesStub.restore();
sinon
.stub(context as any, "_getReplacementPartitionKeyRanges")
.resolves([createMockPartitionKeyRange("child2", "1FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF", "FF")]);

// Assert that the newly created document producer has the correct start and end EPKs from Parent and populateEpkRangeHeaders is true
context["unfilledDocumentProducersQueue"].forEach((docProd) => {
if (docProd.targetPartitionKeyRange.id === "child2") {
assert.equal(docProd.startEpk, parentDocumentProducer2StartEpk);
assert.equal(docProd.endEpk, parentDocumentProducer2EndEpk);
assert.equal(docProd.populateEpkRangeHeaders, true);
}
});
});
});