11import { z } from "zod" ;
2+ import type { AggregationCursor } from "mongodb" ;
23import type { CallToolResult } from "@modelcontextprotocol/sdk/types.js" ;
4+ import type { NodeDriverServiceProvider } from "@mongosh/service-provider-node-driver" ;
35import { DbOperationArgs , MongoDBToolBase } from "../mongodbTool.js" ;
4- import type { ToolArgs , OperationType } from "../../tool.js" ;
6+ import type { ToolArgs , OperationType , ToolExecutionContext } from "../../tool.js" ;
57import { formatUntrustedData } from "../../tool.js" ;
68import { checkIndexUsage } from "../../../helpers/indexCheck.js" ;
7- import { EJSON } from "bson" ;
9+ import { type Document , EJSON } from "bson" ;
810import { ErrorCodes , MongoDBError } from "../../../common/errors.js" ;
11+ import { collectCursorUntilMaxBytesLimit } from "../../../helpers/collectCursorUntilMaxBytes.js" ;
12+ import { operationWithFallback } from "../../../helpers/operationWithFallback.js" ;
13+ import { AGG_COUNT_MAX_TIME_MS_CAP , ONE_MB , CURSOR_LIMITS_TO_LLM_TEXT } from "../../../helpers/constants.js" ;
914import { zEJSON } from "../../args.js" ;
15+ import { LogId } from "../../../common/logger.js" ;
1016
/**
 * Argument shape accepted by the aggregate tool.
 *
 * - `pipeline`: the aggregation stages to run, each validated through `zEJSON()`.
 * - `responseBytesLimit`: optional byte budget for the response; falls back to
 *   `ONE_MB` when the caller does not supply a value.
 */
export const AggregateArgs = {
    pipeline: z.array(zEJSON()).describe("An array of aggregation stages to execute"),
    responseBytesLimit: z
        .number()
        .optional()
        .default(ONE_MB)
        .describe(
            [
                "The maximum number of bytes to return in the response. This value is capped by the server’s configured maxBytesPerQuery and cannot be exceeded.",
                'Note to LLM: If the entire aggregation result is required, use the "export" tool instead of increasing this limit.',
            ].join(" ")
        ),
};
1424
1525export class AggregateTool extends MongoDBToolBase {
@@ -21,32 +31,80 @@ export class AggregateTool extends MongoDBToolBase {
2131 } ;
2232 public operationType : OperationType = "read" ;
2333
/**
 * Runs the requested aggregation and returns its documents together with a
 * summary message describing how many documents exist and which limits were applied.
 *
 * Steps, in order:
 *  1. Connect and reject pipelines that use non-permitted stages.
 *  2. When `config.indexCheck` is enabled, explain the pipeline and check index usage.
 *  3. Open a cursor over the pipeline, appending a `$limit` stage when
 *     `config.maxDocumentsPerQuery` is positive.
 *  4. Concurrently count the results of the un-capped pipeline and collect
 *     documents from the cursor within the byte limits.
 *
 * The cursor, if one was opened, is always closed in the `finally` clause; the
 * close is fire-and-forget so it never delays or fails the tool response.
 *
 * @param args - Tool arguments: target namespace, pipeline and response byte limit.
 * @param context - Execution context; its `signal` is forwarded to the cursor collection helper.
 * @returns The tool result containing the summary and, when non-empty, the EJSON-serialized documents.
 */
protected async execute(
    { database, collection, pipeline, responseBytesLimit }: ToolArgs<typeof this.argsShape>,
    { signal }: ToolExecutionContext
): Promise<CallToolResult> {
    let cursor: AggregationCursor | undefined = undefined;
    try {
        const provider = await this.ensureConnected();

        this.assertOnlyUsesPermittedStages(pipeline);

        // Optional guard: verify the aggregation is backed by an index.
        if (this.config.indexCheck) {
            await checkIndexUsage(provider, database, collection, "aggregate", async () =>
                provider
                    .aggregate(database, collection, pipeline, {}, { writeConcern: undefined })
                    .explain("queryPlanner")
            );
        }

        // Work on a copy so the caller's pipeline is never mutated by the cap.
        const limitedPipeline = [...pipeline];
        if (this.config.maxDocumentsPerQuery > 0) {
            limitedPipeline.push({ $limit: this.config.maxDocumentsPerQuery });
        }
        cursor = provider.aggregate(database, collection, limitedPipeline);

        // The count runs against the original (un-capped) pipeline while the
        // documents are being collected from the capped cursor.
        const pendingCount = this.countAggregationResultDocuments({ provider, database, collection, pipeline });
        const pendingDocuments = collectCursorUntilMaxBytesLimit({
            cursor,
            configuredMaxBytesPerQuery: this.config.maxBytesPerQuery,
            toolResponseBytesLimit: responseBytesLimit,
            abortSignal: signal,
        });
        const [totalDocuments, cursorResults] = await Promise.all([pendingCount, pendingDocuments]);

        // A known total that exceeds a positive maxDocumentsPerQuery means the
        // `$limit` stage above definitely truncated the results.
        const cappedByDocumentLimit =
            this.config.maxDocumentsPerQuery > 0 &&
            totalDocuments !== undefined &&
            totalDocuments > this.config.maxDocumentsPerQuery;

        const appliedLimits = [
            cappedByDocumentLimit ? "config.maxDocumentsPerQuery" : undefined,
            cursorResults.cappedBy,
        ].filter((limit): limit is keyof typeof CURSOR_LIMITS_TO_LLM_TEXT => !!limit);

        const summary = this.generateMessage({
            aggResultsCount: totalDocuments,
            documents: cursorResults.documents,
            appliedLimits,
        });
        const serializedDocuments =
            cursorResults.documents.length > 0 ? EJSON.stringify(cursorResults.documents) : undefined;

        return {
            content: formatUntrustedData(summary, serializedDocuments),
        };
    } finally {
        if (cursor) {
            void this.safeCloseCursor(cursor);
        }
    }
}
97+
/**
 * Closes the given cursor without ever throwing.
 *
 * Any failure raised by `cursor.close()` is reported as a warning on the
 * session logger instead of being propagated to the caller.
 *
 * @param cursor - The aggregation cursor to close.
 */
private async safeCloseCursor(cursor: AggregationCursor<unknown>): Promise<void> {
    try {
        await cursor.close();
    } catch (closeError) {
        // Non-Error throwables are stringified so the log message is always text.
        const reason = closeError instanceof Error ? closeError.message : String(closeError);
        this.session.logger.warning({
            id: LogId.mongodbCursorCloseError,
            context: "aggregate tool",
            message: `Error when closing the cursor - ${reason}`,
        });
    }
}
51109
52110 private assertOnlyUsesPermittedStages ( pipeline : Record < string , unknown > [ ] ) : void {
@@ -70,4 +128,57 @@ export class AggregateTool extends MongoDBToolBase {
70128 }
71129 }
72130 }
131+
/**
 * Counts how many documents the given pipeline produces by running it with a
 * trailing `$count` stage, bounded by `AGG_COUNT_MAX_TIME_MS_CAP`.
 *
 * @param params.provider - Connected service provider used to run the aggregation.
 * @param params.database - Database name.
 * @param params.collection - Collection name.
 * @param params.pipeline - The pipeline whose results are counted; it is not mutated.
 * @returns The count; `0` when the count aggregation yields anything other than a
 *          single document with a numeric `totalDocuments` field; or the `undefined`
 *          fallback handed to `operationWithFallback`.
 */
private async countAggregationResultDocuments({
    provider,
    database,
    collection,
    pipeline,
}: {
    provider: NodeDriverServiceProvider;
    database: string;
    collection: string;
    pipeline: Document[];
}): Promise<number | undefined> {
    const countingPipeline = [...pipeline, { $count: "totalDocuments" }];

    const runCount = async (): Promise<number | undefined> => {
        const rows = await provider
            .aggregate(database, collection, countingPipeline)
            .maxTimeMS(AGG_COUNT_MAX_TIME_MS_CAP)
            .toArray();

        // Anything other than exactly one result row is reported as zero.
        if (rows.length !== 1) {
            return 0;
        }

        const countRow: unknown = rows[0];
        if (
            countRow &&
            typeof countRow === "object" &&
            "totalDocuments" in countRow &&
            typeof countRow.totalDocuments === "number"
        ) {
            return countRow.totalDocuments;
        }
        return 0;
    };

    return await operationWithFallback(runCount, undefined);
}
162+
/**
 * Builds the human/LLM-readable summary that precedes the returned documents.
 *
 * @param params.aggResultsCount - Total number of documents the aggregation produces,
 *        or `undefined` when that number could not be determined.
 * @param params.documents - The documents actually being returned.
 * @param params.appliedLimits - Keys of `CURSOR_LIMITS_TO_LLM_TEXT` naming the limits
 *        that truncated the results; empty when nothing was capped.
 * @returns A single-line message; when limits were applied it also lists them and
 *          points to the "export" tool.
 */
private generateMessage({
    aggResultsCount,
    documents,
    appliedLimits,
}: {
    aggResultsCount: number | undefined;
    documents: unknown[];
    appliedLimits: (keyof typeof CURSOR_LIMITS_TO_LLM_TEXT)[];
}): string {
    const countText = aggResultsCount === undefined ? "indeterminable number of" : aggResultsCount;
    const summary = `The aggregation resulted in ${countText} documents. Returning ${documents.length} documents`;

    // Nothing was capped: finish the sentence and stop.
    if (appliedLimits.length === 0) {
        return `${summary}.`;
    }

    const limitsText = appliedLimits.map((limit) => CURSOR_LIMITS_TO_LLM_TEXT[limit]).join(", ");
    return `${summary} while respecting the applied limits of ${limitsText}. Note to LLM: If the entire query result is required then use "export" tool to export the query results.`;
}
73184}
0 commit comments