1
1
"""
2
2
Code Implementation Agent for File-by-File Development
3
- 文件逐个开发的代码实现代理
4
3
5
4
Handles systematic code implementation with progress tracking and
6
5
memory optimization for long-running development sessions.
7
- 处理系统性代码实现,具有进度跟踪和长时间开发会话的内存优化。
8
6
"""
9
7
10
8
import json
35
33
class CodeImplementationAgent :
36
34
"""
37
35
Code Implementation Agent for systematic file-by-file development
38
- 用于系统性文件逐个开发的代码实现代理
39
-
40
- Responsibilities / 职责:
41
- - Track file implementation progress / 跟踪文件实现进度
42
- - Execute MCP tool calls for code generation / 执行MCP工具调用进行代码生成
43
- - Monitor implementation status / 监控实现状态
44
- - Coordinate with Summary Agent for memory optimization / 与总结代理协调进行内存优化
45
- - Calculate token usage for context management / 计算token使用量用于上下文管理
36
+
37
+ Responsibilities:
38
+ - Track file implementation progress
39
+ - Execute MCP tool calls for code generation
40
+ - Monitor implementation status
41
+ - Coordinate with Summary Agent for memory optimization
42
+ - Calculate token usage for context management
46
43
"""
47
44
48
45
def __init__ (
@@ -53,7 +50,6 @@ def __init__(
53
50
):
54
51
"""
55
52
Initialize Code Implementation Agent
56
- 初始化代码实现代理
57
53
58
54
Args:
59
55
mcp_agent: MCP agent instance for tool calls
@@ -72,26 +68,32 @@ def __init__(
72
68
"dependency_analysis" : [], # Track dependency analysis and file reads
73
69
}
74
70
self .files_implemented_count = 0
75
- self .implemented_files_set = set () # Track unique file paths to avoid duplicate counting / 跟踪唯一文件路径以避免重复计数
71
+ self .implemented_files_set = (
72
+ set ()
73
+ ) # Track unique file paths to avoid duplicate counting
76
74
self .files_read_for_dependencies = (
77
75
set ()
78
- ) # Track files read for dependency analysis / 跟踪为依赖分析而读取的文件
79
- self .last_summary_file_count = 0 # Track the file count when last summary was triggered / 跟踪上次触发总结时的文件数
76
+ ) # Track files read for dependency analysis
77
+ self .last_summary_file_count = (
78
+ 0 # Track the file count when last summary was triggered
79
+ )
80
80
81
- # Token calculation settings / Token计算设置
82
- self .max_context_tokens = 200000 # Default max context tokens for Claude-3.5-Sonnet / Claude-3.5-Sonnet的默认最大上下文tokens
83
- self .token_buffer = (
84
- 10000 # Safety buffer before reaching max / 达到最大值前的安全缓冲区
81
+ # Token calculation settings
82
+ self .max_context_tokens = (
83
+ 200000 # Default max context tokens for Claude-3.5-Sonnet
85
84
)
85
+ self .token_buffer = 10000 # Safety buffer before reaching max
86
86
self .summary_trigger_tokens = (
87
87
self .max_context_tokens - self .token_buffer
88
- ) # Trigger summary when approaching limit / 接近限制时触发总结
89
- self .last_summary_token_count = 0 # Track token count when last summary was triggered / 跟踪上次触发总结时的token数
88
+ ) # Trigger summary when approaching limit
89
+ self .last_summary_token_count = (
90
+ 0 # Track token count when last summary was triggered
91
+ )
90
92
91
- # Initialize tokenizer / 初始化tokenizer
93
+ # Initialize tokenizer
92
94
if TIKTOKEN_AVAILABLE :
93
95
try :
94
- # Use Claude-3 tokenizer (approximation with OpenAI's o200k_base) / 使用Claude-3 tokenizer(用OpenAI的o200k_base近似)
96
+ # Use Claude-3 tokenizer (approximation with OpenAI's o200k_base)
95
97
self .tokenizer = tiktoken .get_encoding ("o200k_base" )
96
98
self .logger .info ("Token calculation enabled with o200k_base encoding" )
97
99
except Exception as e :
@@ -103,14 +105,14 @@ def __init__(
103
105
"tiktoken not available, token-based summary triggering disabled"
104
106
)
105
107
106
- # Analysis loop detection / 分析循环检测
107
- self .recent_tool_calls = [] # Track recent tool calls to detect analysis loops / 跟踪最近的工具调用以检测分析循环
108
- self .max_read_without_write = 5 # Max read_file calls without write_file / 没有write_file的最大read_file调用次数
108
+ # Analysis loop detection
109
+ self .recent_tool_calls = [] # Track recent tool calls to detect analysis loops
110
+ self .max_read_without_write = 5 # Max read_file calls without write_file
109
111
110
- # Memory agent integration / 内存代理集成
111
- self .memory_agent = None # Will be set externally / 将从外部设置
112
- self .llm_client = None # Will be set externally / 将从外部设置
113
- self .llm_client_type = None # Will be set externally / 将从外部设置
112
+ # Memory agent integration
113
+ self .memory_agent = None # Will be set externally
114
+ self .llm_client = None # Will be set externally
115
+ self .llm_client_type = None # Will be set externally
114
116
115
117
# Log read tools configuration
116
118
read_tools_status = "ENABLED" if self .enable_read_tools else "DISABLED"
@@ -123,7 +125,7 @@ def __init__(
123
125
)
124
126
125
127
def _create_default_logger (self ) -> logging .Logger :
126
- """Create default logger if none provided / 如果未提供则创建默认日志记录器 """
128
+ """Create default logger if none provided"""
127
129
logger = logging .getLogger (f"{ __name__ } .CodeImplementationAgent" )
128
130
# Don't add handlers to child loggers - let them propagate to root
129
131
logger .setLevel (logging .INFO )
@@ -132,14 +134,12 @@ def _create_default_logger(self) -> logging.Logger:
132
134
def get_system_prompt(self) -> str:
    """Return the system prompt used for code implementation.

    Returns:
        The module-level general code-implementation prompt string.
    """
    return GENERAL_CODE_IMPLEMENTATION_SYSTEM_PROMPT
138
139
139
140
def set_memory_agent (self , memory_agent , llm_client = None , llm_client_type = None ):
140
141
"""
141
142
Set memory agent for code summary generation
142
- 设置内存代理用于代码总结生成
143
143
144
144
Args:
145
145
memory_agent: Memory agent instance
@@ -154,7 +154,6 @@ def set_memory_agent(self, memory_agent, llm_client=None, llm_client_type=None):
154
154
async def execute_tool_calls (self , tool_calls : List [Dict ]) -> List [Dict ]:
155
155
"""
156
156
Execute MCP tool calls and track implementation progress
157
- 执行MCP工具调用并跟踪实现进度
158
157
159
158
Args:
160
159
tool_calls: List of tool calls to execute
@@ -226,18 +225,18 @@ async def execute_tool_calls(self, tool_calls: List[Dict]) -> List[Dict]:
226
225
)
227
226
228
227
if self .mcp_agent :
229
- # Execute tool call through MCP protocol / 通过MCP协议执行工具调用
228
+ # Execute tool call through MCP protocol
230
229
result = await self .mcp_agent .call_tool (tool_name , tool_input )
231
230
232
- # Track file implementation progress / 跟踪文件实现进度
231
+ # Track file implementation progress
233
232
if tool_name == "write_file" :
234
233
await self ._track_file_implementation_with_summary (
235
234
tool_call , result
236
235
)
237
236
elif tool_name == "read_file" :
238
237
self ._track_dependency_analysis (tool_call , result )
239
238
240
- # Track tool calls for analysis loop detection / 跟踪工具调用以检测分析循环
239
+ # Track tool calls for analysis loop detection
241
240
self ._track_tool_call_for_loop_detection (tool_name )
242
241
243
242
results .append (
@@ -282,8 +281,6 @@ async def _handle_read_file_with_memory_optimization(self, tool_call: Dict) -> D
282
281
"""
283
282
Intercept read_file calls and redirect to read_code_mem if a summary exists.
284
283
This prevents unnecessary file reads if the summary is already available.
285
- 拦截read_file调用,如果存在摘要则重定向到read_code_mem。
286
- 这可以防止在摘要已经存在时进行不必要的文件读取。
287
284
"""
288
285
file_path = tool_call ["input" ].get ("file_path" )
289
286
if not file_path :
@@ -389,7 +386,6 @@ async def _track_file_implementation_with_summary(
389
386
):
390
387
"""
391
388
Track file implementation and create code summary
392
- 跟踪文件实现并创建代码总结
393
389
394
390
Args:
395
391
tool_call: The write_file tool call
@@ -428,31 +424,30 @@ async def _track_file_implementation_with_summary(
428
424
def _track_file_implementation (self , tool_call : Dict , result : Any ):
429
425
"""
430
426
Track file implementation progress
431
- 跟踪文件实现进度
432
427
"""
433
428
try :
434
- # Handle different result types from MCP / 处理MCP的不同结果类型
429
+ # Handle different result types from MCP
435
430
result_data = None
436
431
437
- # Check if result is a CallToolResult object / 检查结果是否为CallToolResult对象
432
+ # Check if result is a CallToolResult object
438
433
if hasattr (result , "content" ):
439
- # Extract content from CallToolResult / 从CallToolResult提取内容
434
+ # Extract content from CallToolResult
440
435
if hasattr (result .content , "text" ):
441
436
result_content = result .content .text
442
437
else :
443
438
result_content = str (result .content )
444
439
445
- # Try to parse as JSON / 尝试解析为JSON
440
+ # Try to parse as JSON
446
441
try :
447
442
result_data = json .loads (result_content )
448
443
except json .JSONDecodeError :
449
- # If not JSON, create a structure / 如果不是JSON,创建一个结构
444
+ # If not JSON, create a structure
450
445
result_data = {
451
446
"status" : "success" ,
452
447
"file_path" : tool_call ["input" ].get ("file_path" , "unknown" ),
453
448
}
454
449
elif isinstance (result , str ):
455
- # Try to parse string result / 尝试解析字符串结果
450
+ # Try to parse string result
456
451
try :
457
452
result_data = json .loads (result )
458
453
except json .JSONDecodeError :
@@ -461,16 +456,16 @@ def _track_file_implementation(self, tool_call: Dict, result: Any):
461
456
"file_path" : tool_call ["input" ].get ("file_path" , "unknown" ),
462
457
}
463
458
elif isinstance (result , dict ):
464
- # Direct dictionary result / 直接字典结果
459
+ # Direct dictionary result
465
460
result_data = result
466
461
else :
467
- # Fallback: assume success and extract file path from input / 后备方案:假设成功并从输入中提取文件路径
462
+ # Fallback: assume success and extract file path from input
468
463
result_data = {
469
464
"status" : "success" ,
470
465
"file_path" : tool_call ["input" ].get ("file_path" , "unknown" ),
471
466
}
472
467
473
- # Extract file path for tracking / 提取文件路径用于跟踪
468
+ # Extract file path for tracking
474
469
file_path = None
475
470
if result_data and result_data .get ("status" ) == "success" :
476
471
file_path = result_data .get (
@@ -479,15 +474,15 @@ def _track_file_implementation(self, tool_call: Dict, result: Any):
479
474
else :
480
475
file_path = tool_call ["input" ].get ("file_path" )
481
476
482
- # Only count unique files, not repeated tool calls on same file / 只计数唯一文件,不重复计数同一文件的工具调用
477
+ # Only count unique files, not repeated tool calls on same file
483
478
if file_path and file_path not in self .implemented_files_set :
484
- # This is a new file implementation / 这是一个新的文件实现
479
+ # This is a new file implementation
485
480
self .implemented_files_set .add (file_path )
486
481
self .files_implemented_count += 1
487
482
# self.logger.info(f"New file implementation tracked: count={self.files_implemented_count}, file={file_path}")
488
483
# print(f"New file implementation tracked: count={self.files_implemented_count}, file={file_path}")
489
484
490
- # Add to completed files list / 添加到已完成文件列表
485
+ # Add to completed files list
491
486
self .implementation_summary ["completed_files" ].append (
492
487
{
493
488
"file" : file_path ,
@@ -503,17 +498,17 @@ def _track_file_implementation(self, tool_call: Dict, result: Any):
503
498
# print(f"📝 NEW FILE IMPLEMENTED: count={self.files_implemented_count}, file={file_path}")
504
499
# print(f"🔧 OPTIMIZATION NOW ENABLED: files_implemented_count > 0 = {self.files_implemented_count > 0}")
505
500
elif file_path and file_path in self .implemented_files_set :
506
- # This file was already implemented (duplicate tool call) / 这个文件已经被实现过了(重复工具调用)
501
+ # This file was already implemented (duplicate tool call)
507
502
self .logger .debug (
508
503
f"File already tracked, skipping duplicate count: { file_path } "
509
504
)
510
505
else :
511
- # No valid file path found / 没有找到有效的文件路径
506
+ # No valid file path found
512
507
self .logger .warning ("No valid file path found for tracking" )
513
508
514
509
except Exception as e :
515
510
self .logger .warning (f"Failed to track file implementation: { e } " )
516
- # Even if tracking fails, try to count based on tool input (but check for duplicates) / 即使跟踪失败,也尝试根据工具输入计数(但检查重复)
511
+ # Even if tracking fails, try to count based on tool input (but check for duplicates)
517
512
518
513
file_path = tool_call ["input" ].get ("file_path" )
519
514
if file_path and file_path not in self .implemented_files_set :
@@ -526,16 +521,15 @@ def _track_file_implementation(self, tool_call: Dict, result: Any):
526
521
def _track_dependency_analysis (self , tool_call : Dict , result : Any ):
527
522
"""
528
523
Track dependency analysis through read_file calls
529
- 跟踪通过read_file调用进行的依赖分析
530
524
"""
531
525
try :
532
526
file_path = tool_call ["input" ].get ("file_path" )
533
527
if file_path :
534
- # Track unique files read for dependency analysis / 跟踪为依赖分析而读取的唯一文件
528
+ # Track unique files read for dependency analysis
535
529
if file_path not in self .files_read_for_dependencies :
536
530
self .files_read_for_dependencies .add (file_path )
537
531
538
- # Add to dependency analysis summary / 添加到依赖分析总结
532
+ # Add to dependency analysis summary
539
533
self .implementation_summary ["dependency_analysis" ].append (
540
534
{
541
535
"file_read" : file_path ,
@@ -554,7 +548,6 @@ def _track_dependency_analysis(self, tool_call: Dict, result: Any):
554
548
def calculate_messages_token_count (self , messages : List [Dict ]) -> int :
555
549
"""
556
550
Calculate total token count for a list of messages
557
- 计算消息列表的总token数
558
551
559
552
Args:
560
553
messages: List of chat messages with 'role' and 'content' keys
@@ -563,9 +556,9 @@ def calculate_messages_token_count(self, messages: List[Dict]) -> int:
563
556
Total token count
564
557
"""
565
558
if not self .tokenizer :
566
- # Fallback: rough estimation based on character count / 回退:基于字符数的粗略估计
559
+ # Fallback: rough estimation based on character count
567
560
total_chars = sum (len (str (msg .get ("content" , "" ))) for msg in messages )
568
- # Rough approximation: 1 token ≈ 4 characters / 粗略近似:1个token ≈ 4个字符
561
+ # Rough approximation: 1 token ≈ 4 characters
569
562
return total_chars // 4
570
563
571
564
try :
@@ -574,31 +567,28 @@ def calculate_messages_token_count(self, messages: List[Dict]) -> int:
574
567
content = str (message .get ("content" , "" ))
575
568
role = message .get ("role" , "" )
576
569
577
- # Count tokens for content / 计算内容的token数
570
+ # Count tokens for content
578
571
if content :
579
572
content_tokens = len (
580
573
self .tokenizer .encode (content , disallowed_special = ())
581
574
)
582
575
total_tokens += content_tokens
583
576
584
- # Add tokens for role and message structure / 为角色和消息结构添加token
577
+ # Add tokens for role and message structure
585
578
role_tokens = len (self .tokenizer .encode (role , disallowed_special = ()))
586
- total_tokens += (
587
- role_tokens + 4
588
- ) # Extra tokens for message formatting / 消息格式化的额外token
579
+ total_tokens += role_tokens + 4 # Extra tokens for message formatting
589
580
590
581
return total_tokens
591
582
592
583
except Exception as e :
593
584
self .logger .warning (f"Token calculation failed: { e } " )
594
- # Fallback estimation / 回退估计
585
+ # Fallback estimation
595
586
total_chars = sum (len (str (msg .get ("content" , "" ))) for msg in messages )
596
587
return total_chars // 4
597
588
598
589
def should_trigger_summary_by_tokens (self , messages : List [Dict ]) -> bool :
599
590
"""
600
591
Check if summary should be triggered based on token count
601
- 根据token数检查是否应触发总结
602
592
603
593
Args:
604
594
messages: Current conversation messages
0 commit comments