[Cosmos] Query pipeline rewrite #32578

Open · wants to merge 18 commits into main
1 change: 1 addition & 0 deletions sdk/cosmosdb/cosmos/review/cosmos.api.md
@@ -1054,6 +1054,7 @@ export interface FeedOptions extends SharedOptions {
continuationToken?: string;
continuationTokenLimitInKB?: number;
disableNonStreamingOrderByQuery?: boolean;
enableQueryControl?: boolean;
enableScanInQuery?: boolean;
forceQueryPlan?: boolean;
maxDegreeOfParallelism?: number;
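
Side note (not part of the diff): the public API gains a new enableQueryControl flag on FeedOptions. Below is a minimal usage sketch, assuming the flag opts a query into the rewritten fetchMore-driven pipeline so that a single fetchNext call performs at most one round of backend fetching; the account, key, database, and container names are placeholders.

import { CosmosClient } from "@azure/cosmos";
import type { FeedOptions } from "@azure/cosmos";

// Placeholder endpoint, key, database, and container names for illustration only.
const client = new CosmosClient({
  endpoint: "https://<account>.documents.azure.com",
  key: "<key>",
});
const container = client.database("<database>").container("<container>");

async function queryWithQueryControl(): Promise<void> {
  const options: FeedOptions = {
    enableQueryControl: true, // new flag added in this PR; behavior assumed as described above
    maxItemCount: 100,
    maxDegreeOfParallelism: 4,
  };
  const iterator = container.items.query("SELECT c.id FROM c", options);
  while (iterator.hasMoreResults()) {
    const { resources } = await iterator.fetchNext();
    // With the rewritten pipeline a page can be empty while more results remain,
    // so keep polling until hasMoreResults() returns false.
    if (resources && resources.length > 0) {
      console.log(resources);
    }
  }
}
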
@@ -1,8 +1,8 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

import { GlobalStatistics } from "../../request/globalStatistics";
import { Aggregator } from "./Aggregator";
import type { GlobalStatistics } from "../../request/globalStatistics";
import type { Aggregator } from "./Aggregator";

export class GlobalStatisticsAggregator implements Aggregator {
private globalStatistics: GlobalStatistics;
@@ -11,11 +11,6 @@ import { getInitialHeader, mergeHeaders } from "../headerUtils";
import { emptyGroup, extractAggregateResult } from "./emptyGroup";
import type { DiagnosticNodeInternal } from "../../diagnostics/DiagnosticNodeInternal";

interface GroupByResponse {
result: GroupByResult;
headers: CosmosHeaders;
}

interface GroupByResult {
groupByItems: any[];
payload: any;
@@ -32,52 +27,51 @@ export class GroupByEndpointComponent implements ExecutionContext {
private readonly aggregateResultArray: any[] = [];
private completed: boolean = false;

public async nextItem(diagnosticNode: DiagnosticNodeInternal): Promise<Response<any>> {
// If we have a full result set, begin returning results
if (this.aggregateResultArray.length > 0) {
return {
result: this.aggregateResultArray.pop(),
headers: getInitialHeader(),
};
}
public hasMoreResults(): boolean {
return this.executionContext.hasMoreResults();
}

public async fetchMore(diagnosticNode: DiagnosticNodeInternal): Promise<Response<any>> {
if (this.completed) {
return {
result: undefined,
headers: getInitialHeader(),
};
}

const aggregateHeaders = getInitialHeader();
const response = await this.executionContext.fetchMore(diagnosticNode);
mergeHeaders(aggregateHeaders, response.headers);

while (this.executionContext.hasMoreResults()) {
// Grab the next result
const { result, headers } = (await this.executionContext.nextItem(
diagnosticNode,
)) as GroupByResponse;
mergeHeaders(aggregateHeaders, headers);
if (response === undefined || response.result === undefined) {
// If there are any groupings, consolidate and return them
if (this.groupings.size > 0) {
return this.consolidateGroupResults(aggregateHeaders);
}
return { result: undefined, headers: aggregateHeaders };
}

for (const item of response.result as GroupByResult[]) {
// If it exists, process it via aggregators
if (result) {
const group = result.groupByItems ? await hashObject(result.groupByItems) : emptyGroup;
if (item) {
const group = item.groupByItems ? await hashObject(item.groupByItems) : emptyGroup;
const aggregators = this.groupings.get(group);
const payload = result.payload;
const payload = item.payload;
if (aggregators) {
// Iterate over all results in the payload
Object.keys(payload).map((key) => {
for (const key of Object.keys(payload)) {
// in case the value of a group is null make sure we create a dummy payload with item2==null
const effectiveGroupByValue = payload[key]
? payload[key]
: new Map().set("item2", null);
const aggregateResult = extractAggregateResult(effectiveGroupByValue);
aggregators.get(key).aggregate(aggregateResult);
});
}
} else {
// This is the first time we have seen a grouping. Setup the initial result without aggregate values
const grouping = new Map();
this.groupings.set(group, grouping);
// Iterate over all results in the payload
Object.keys(payload).map((key) => {
for (const key of Object.keys(payload)) {
const aggregateType = this.queryInfo.groupByAliasToAggregateType[key];
// Create a new aggregator for this specific aggregate field
const aggregator = createAggregator(aggregateType);
@@ -88,11 +82,22 @@ } else {
} else {
aggregator.aggregate(payload[key]);
}
});
}
}
}
}

if (this.executionContext.hasMoreResults()) {
return {
result: [],
headers: aggregateHeaders,
};
} else {
return this.consolidateGroupResults(aggregateHeaders);
}
}

private consolidateGroupResults(aggregateHeaders: CosmosHeaders): Response<any> {
for (const grouping of this.groupings.values()) {
const groupResult: any = {};
for (const [aggregateKey, aggregator] of grouping.entries()) {
@@ -101,13 +106,6 @@
this.aggregateResultArray.push(groupResult);
}
this.completed = true;
return {
result: this.aggregateResultArray.pop(),
headers: aggregateHeaders,
};
}

public hasMoreResults(): boolean {
return this.executionContext.hasMoreResults() || this.aggregateResultArray.length > 0;
return { result: this.aggregateResultArray, headers: aggregateHeaders };
}
}
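
Illustrative note (not part of the diff): after this change the GroupBy component no longer buffers results and pops them one at a time from nextItem; callers drive it through fetchMore, which returns an empty array while the underlying context still has pages and the consolidated group array once aggregation is complete. A minimal drain-loop sketch over an ExecutionContext-shaped object, using simplified local types, follows.

interface ResponseLike<T> {
  result: T;
  headers: Record<string, unknown>;
}

interface ExecutionContextLike {
  hasMoreResults(): boolean;
  fetchMore(): Promise<ResponseLike<any[] | undefined>>;
}

// Drain the component until hasMoreResults() is false, collecting the
// consolidated groups that arrive in the final non-empty page.
async function drainGroupBy(ctx: ExecutionContextLike): Promise<any[]> {
  const groups: any[] = [];
  while (ctx.hasMoreResults()) {
    const { result } = await ctx.fetchMore();
    // Intermediate pages come back as [] while the wrapped context still has
    // results; only the last page carries the consolidated group objects.
    if (result && result.length > 0) {
      groups.push(...result);
    }
  }
  return groups;
}

For example, if the wrapped context yields two raw pages and then runs dry, drainGroupBy sees [] for the intermediate calls and the full group array on the final call.
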
@@ -11,11 +11,6 @@ import { getInitialHeader, mergeHeaders } from "../headerUtils";
import { emptyGroup, extractAggregateResult } from "./emptyGroup";
import type { DiagnosticNodeInternal } from "../../diagnostics/DiagnosticNodeInternal";

interface GroupByResponse {
result: GroupByResult;
headers: CosmosHeaders;
}

interface GroupByResult {
groupByItems: any[];
payload: any;
@@ -36,39 +31,36 @@ export class GroupByValueEndpointComponent implements ExecutionContext {
this.aggregateType = this.queryInfo.aggregates[0];
}

public async nextItem(diagnosticNode: DiagnosticNodeInternal): Promise<Response<any>> {
// Start returning results if we have processed a full results set
if (this.aggregateResultArray.length > 0) {
return {
result: this.aggregateResultArray.pop(),
headers: getInitialHeader(),
};
}
public hasMoreResults(): boolean {
return this.executionContext.hasMoreResults();
}

public async fetchMore(diagnosticNode: DiagnosticNodeInternal): Promise<Response<any>> {
if (this.completed) {
return {
result: undefined,
headers: getInitialHeader(),
};
}

const aggregateHeaders = getInitialHeader();
const response = await this.executionContext.fetchMore(diagnosticNode);
mergeHeaders(aggregateHeaders, response.headers);

while (this.executionContext.hasMoreResults()) {
// Grab the next result
const { result, headers } = (await this.executionContext.nextItem(
diagnosticNode,
)) as GroupByResponse;
mergeHeaders(aggregateHeaders, headers);
if (response === undefined || response.result === undefined) {
if (this.aggregators.size > 0) {
return this.generateAggregateResponse(aggregateHeaders);
}
return { result: undefined, headers: aggregateHeaders };
}

// If it exists, process it via aggregators
if (result) {
for (const item of response.result as GroupByResult[]) {
if (item) {
let grouping: string = emptyGroup;
let payload: any = result;
if (result.groupByItems) {
let payload: any = item;
if (item.groupByItems) {
// If the query contains a GROUP BY clause, it will have a payload property and groupByItems
payload = result.payload;
grouping = await hashObject(result.groupByItems);
payload = item.payload;
grouping = await hashObject(item.groupByItems);
}

const aggregator = this.aggregators.get(grouping);
@@ -99,18 +91,26 @@ export class GroupByValueEndpointComponent implements ExecutionContext {
headers: aggregateHeaders,
};
}
// If no results are left in the underlying execution context, convert our aggregate results to an array

if (this.executionContext.hasMoreResults()) {
return { result: [], headers: aggregateHeaders };
} else {
// If no results are left in the underlying execution context, convert our aggregate results to an array
return this.generateAggregateResponse(aggregateHeaders);
}
}

private generateAggregateResponse(aggregateHeaders: CosmosHeaders): Response<any> {
for (const aggregator of this.aggregators.values()) {
this.aggregateResultArray.push(aggregator.getResult());
const result = aggregator.getResult();
if (result !== undefined) {
this.aggregateResultArray.push(result);
}
}
this.completed = true;
return {
result: this.aggregateResultArray.pop(),
result: this.aggregateResultArray,
headers: aggregateHeaders,
};
}

public hasMoreResults(): boolean {
return this.executionContext.hasMoreResults() || this.aggregateResultArray.length > 0;
}
}
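
Illustrative note (not part of the diff): the consolidation step now returns the whole aggregate array in one response and skips any aggregator whose getResult() is undefined. A small sketch of that filtering, with an assumed Aggregator-like shape, is shown below.

interface AggregatorLike {
  getResult(): any;
}

// Collect one result per group, dropping groups whose aggregator produced no
// value (getResult() returned undefined), and return them as a single array.
function consolidateAggregates(aggregators: Map<string, AggregatorLike>): any[] {
  const aggregateResultArray: any[] = [];
  for (const aggregator of aggregators.values()) {
    const result = aggregator.getResult();
    if (result !== undefined) {
      aggregateResultArray.push(result);
    }
  }
  return aggregateResultArray;
}
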
@@ -6,7 +6,6 @@ import { getInitialHeader } from "../headerUtils";
import type { DiagnosticNodeInternal } from "../../diagnostics/DiagnosticNodeInternal";
import { hashObject } from "../../utils/hashObject";
import type { NonStreamingOrderByResult } from "../nonStreamingOrderByResult";
import type { NonStreamingOrderByResponse } from "../nonStreamingOrderByResponse";
import { FixedSizePriorityQueue } from "../../utils/fixedSizePriorityQueue";
import { NonStreamingOrderByMap } from "../../utils/nonStreamingOrderByMap";
import { OrderByComparator } from "../orderByComparator";
@@ -56,7 +55,48 @@ export class NonStreamingOrderByDistinctEndpointComponent implements ExecutionCo
);
}

public async nextItem(diagnosticNode: DiagnosticNodeInternal): Promise<Response<any>> {
/**
* Build final sorted result array from which responses will be served.
*/
private async buildFinalResultArray(): Promise<void> {
// Fetch all distinct values from the map and store in priority queue.
const allValues = this.aggregateMap.getAllValuesAndReset();
for (const value of allValues) {
this.nonStreamingOrderByPQ.enqueue(value);
}

// Compute the final result array size based on offset and limit.
const offSet = this.queryInfo.offset ? this.queryInfo.offset : 0;
const queueSize = this.nonStreamingOrderByPQ.size();
const finalArraySize = queueSize - offSet;

if (finalArraySize <= 0) {
this.finalResultArray = [];
} else {
this.finalResultArray = new Array(finalArraySize);
// Only keep the final result array size number of items in the final result array and discard the rest.
for (let count = finalArraySize - 1; count >= 0; count--) {
if (this.emitRawOrderByPayload) {
this.finalResultArray[count] = this.nonStreamingOrderByPQ.dequeue();
} else {
this.finalResultArray[count] = this.nonStreamingOrderByPQ.dequeue()?.payload;
}
}
}
}

public hasMoreResults(): boolean {
if (this.priorityQueueBufferSize === 0) return false;
return this.executionContext.hasMoreResults();
}

public async fetchMore(diagnosticNode?: DiagnosticNodeInternal): Promise<Response<any>> {
if (this.isCompleted) {
return {
result: undefined,
headers: getInitialHeader(),
};
}
let resHeaders = getInitialHeader();
// if size is 0, just return undefined to signal no more results. Valid if query is TOP 0 or LIMIT 0
if (this.priorityQueueBufferSize <= 0) {
@@ -69,20 +109,30 @@ export class NonStreamingOrderByDistinctEndpointComponent implements ExecutionCo
// If there are more results in backend, keep filling map.
if (this.executionContext.hasMoreResults()) {
// Grab the next result
const { result, headers } = (await this.executionContext.nextItem(
diagnosticNode,
)) as NonStreamingOrderByResponse;
resHeaders = headers;
if (result) {
// make hash of result object and update the map if required.
const key = await hashObject(result?.payload);
this.aggregateMap.set(key, result);
const response = await this.executionContext.fetchMore(diagnosticNode);
if (response === undefined || response.result === undefined) {
this.isCompleted = true;
if (this.aggregateMap.size() > 0) {
await this.buildFinalResultArray();
return {
result: this.finalResultArray,
headers: response.headers,
};
}
return { result: undefined, headers: response.headers };
}
resHeaders = response.headers;
for (const item of response.result) {
if (item) {
const key = await hashObject(item?.payload);
this.aggregateMap.set(key, item);
}
}

// return {} to signal that there are more results to fetch.
// return [] to signal that there are more results to fetch.
if (this.executionContext.hasMoreResults()) {
return {
result: {},
result: [],
headers: resHeaders,
};
}
@@ -92,12 +142,8 @@ export class NonStreamingOrderByDistinctEndpointComponent implements ExecutionCo
if (!this.executionContext.hasMoreResults() && !this.isCompleted) {
this.isCompleted = true;
await this.buildFinalResultArray();
}

// Return results from final array.
if (this.finalResultArray.length > 0) {
return {
result: this.finalResultArray.shift(),
result: this.finalResultArray,
headers: resHeaders,
};
}
@@ -107,39 +153,4 @@ export class NonStreamingOrderByDistinctEndpointComponent implements ExecutionCo
headers: resHeaders,
};
}

/**
* Build final sorted result array from which responses will be served.
*/
private async buildFinalResultArray(): Promise<void> {
// Fetch all distinct values from the map and store in priority queue.
const allValues = this.aggregateMap.getAllValuesAndReset();
for (const value of allValues) {
this.nonStreamingOrderByPQ.enqueue(value);
}

// Compute the final result array size based on offset and limit.
const offSet = this.queryInfo.offset ? this.queryInfo.offset : 0;
const queueSize = this.nonStreamingOrderByPQ.size();
const finalArraySize = queueSize - offSet;

if (finalArraySize <= 0) {
this.finalResultArray = [];
} else {
this.finalResultArray = new Array(finalArraySize);
// Only keep the final result array size number of items in the final result array and discard the rest.
for (let count = finalArraySize - 1; count >= 0; count--) {
if (this.emitRawOrderByPayload) {
this.finalResultArray[count] = this.nonStreamingOrderByPQ.dequeue();
} else {
this.finalResultArray[count] = this.nonStreamingOrderByPQ.dequeue()?.payload;
}
}
}
}

public hasMoreResults(): boolean {
if (this.priorityQueueBufferSize === 0) return false;
return this.executionContext.hasMoreResults() || this.finalResultArray.length > 0;
}
}
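
Illustrative note (not part of the diff): buildFinalResultArray implements OFFSET by filling the final array from the back while dequeuing from the priority queue, so the items that would appear first in the output order are never placed. The sketch below mirrors only that indexing logic, using a plain array sorted in output order in place of FixedSizePriorityQueue, where pop() yields the element that should appear last (the dequeue order assumed here).

// A plain array sorted in the final output order stands in for the
// FixedSizePriorityQueue; pop() returns the element that should appear last.
function trimWithOffset<T>(sortedInOutputOrder: T[], offset: number): T[] {
  const queueSize = sortedInOutputOrder.length;
  const finalArraySize = queueSize - offset;
  if (finalArraySize <= 0) {
    return [];
  }
  const finalResultArray: T[] = new Array(finalArraySize);
  // Fill from the back: the loop stops after finalArraySize pops, so the first
  // "offset" elements in output order are never placed, which implements OFFSET.
  for (let count = finalArraySize - 1; count >= 0; count--) {
    finalResultArray[count] = sortedInOutputOrder.pop() as T;
  }
  return finalResultArray;
}

For example, trimWithOffset([1, 2, 3, 4, 5], 2) returns [3, 4, 5]: the two earliest items in output order are skipped and the rest keep their order.
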