4 智能体的推理引擎和执行模块
4.1 推理引擎:智能体的"思考中枢"
推理引擎是智能体分析信息、形成判断和生成行动计划的核心组件。一个强大的推理引擎使智能体能够处理复杂问题、适应新情境并做出合理决策。
4.1.1 推理的基本类型
演绎推理
从一般原则推导出特定结论:
class DeductiveReasoning:
def __init__(self):
# 存储规则库
self.rules = []
def add_rule(self, condition, conclusion):
"""添加规则到规则库"""
self.rules.append({
"condition": condition,
"conclusion": conclusion
})
def reason(self, facts):
"""基于事实和规则进行推理"""
conclusions = []
# 应用所有规则
for rule in self.rules:
# 检查条件是否满足
if self._evaluate_condition(rule["condition"], facts):
# 添加结论到结论列表
conclusion = rule["conclusion"]
conclusions.append(conclusion)
# 将新结论添加到事实列表以支持链式推理
if isinstance(conclusion, dict):
facts.update(conclusion)
return conclusions
def _evaluate_condition(self, condition, facts):
"""评估条件是否满足"""
if callable(condition):
# 函数形式的条件
return condition(facts)
elif isinstance(condition, dict):
# 字典形式的条件
for key, value in condition.items():
if key not in facts or facts[key] != value:
return False
return True
else:
# 不支持的条件类型
raise ValueError(f"不支持的条件类型: {type(condition)}")
使用示例:
# 创建演绎推理引擎
deductive = DeductiveReasoning()
# 添加规则
deductive.add_rule(
# 条件: 如果是哺乳动物且会飞
{"is_mammal": True, "can_fly": True},
# 结论: 可能是蝙蝠
{"likely_animal": "bat"}
)
deductive.add_rule(
# 条件: 如果是哺乳动物且生活在水中
{"is_mammal": True, "lives_in_water": True},
# 结论: 可能是鲸鱼或海豚
{"likely_animal": ["whale", "dolphin"]}
)
# 使用推理引擎
facts = {"is_mammal": True, "can_fly": True}
conclusions = deductive.reason(facts)
print("推理结论:", conclusions)
归纳推理
从特定实例归纳出一般规律:
import numpy as np
from sklearn.tree import DecisionTreeClassifier
class InductiveReasoning:
def __init__(self):
self.classifier = DecisionTreeClassifier()
self.trained = False
def learn_from_examples(self, examples, labels):
"""从例子中学习模式"""
if len(examples) != len(labels):
raise ValueError("例子和标签数量不匹配")
# 训练分类器
self.classifier.fit(examples, labels)
self.trained = True
def infer_pattern(self, new_instance):
"""推断新实例的模式"""
if not self.trained:
raise RuntimeError("推理引擎尚未通过例子学习")
# 预测新实例的类别
prediction = self.classifier.predict([new_instance])[0]
# 计算置信度(使用类概率)
probabilities = self.classifier.predict_proba([new_instance])[0]
confidence = max(probabilities)
return prediction, confidence
def extract_rules(self):
"""从学习的模型中提取规则"""
if not self.trained:
raise RuntimeError("推理引擎尚未通过例子学习")
# 获取特征重要性
importances = self.classifier.feature_importances_
# 提取规则(简化实现)
rules = []
for i, importance in enumerate(importances):
if importance > 0.1: # 只关注重要特征
rules.append(f"特征{i}对分类很重要,重要性为{importance:.4f}")
return rules
使用示例:
# 创建归纳推理引擎
inductive = InductiveReasoning()
# 准备训练数据
# 每个例子是[大小, 重量, 颜色编码]
examples = [
[2, 200, 1], # 小,轻,红色 -> 苹果
[3, 300, 1], # 中,中,红色 -> 苹果
[2, 150, 2], # 小,轻,橙色 -> 橘子
[4, 150, 2], # 中,轻,橙色 -> 橘子
[8, 1000, 3], # 大,重,黄色 -> 香蕉
[9, 900, 3] # 大,重,黄色 -> 香蕉
]
labels = ["apple", "apple", "orange", "orange", "banana", "banana"]
# 从例子中学习
inductive.learn_from_examples(examples, labels)
# 推断新实例
new_fruit = [2, 180, 1] # 小,轻,红色
prediction, confidence = inductive.infer_pattern(new_fruit)
print(f"推断结果: {prediction},置信度: {confidence:.4f}")
# 提取学到的规则
rules = inductive.extract_rules()
print("提取的规则:")
for rule in rules:
print(f"- {rule}")
溯因推理
从结果推断可能的原因:
class AbductiveReasoning:
def __init__(self):
# 存储因果关系
self.causal_relations = {}
# 存储先验概率
self.priors = {}
def add_causal_relation(self, cause, effect, likelihood):
"""添加因果关系
参数:
cause: 原因
effect: 结果
likelihood: P(effect|cause) 原因导致结果的概率
"""
if effect not in self.causal_relations:
self.causal_relations[effect] = []
self.causal_relations[effect].append({
"cause": cause,
"likelihood": likelihood
})
def set_prior(self, cause, probability):
"""设置原因的先验概率"""
self.priors[cause] = probability
def reason(self, observed_effects):
"""基于观察结果推断最可能的原因"""
if not observed_effects:
return []
# 计算每个原因的后验概率
posterior_probs = {}
# 收集所有可能的原因
all_causes = set()
for effect in observed_effects:
if effect in self.causal_relations:
for relation in self.causal_relations[effect]:
all_causes.add(relation["cause"])
# 对每个原因计算后验概率
for cause in all_causes:
# 获取先验概率,默认为0.5
prior = self.priors.get(cause, 0.5)
# 计算似然率
likelihood = 1.0
for effect in observed_effects:
effect_likelihood = 0.01 # 很小的默认值
if effect in self.causal_relations:
for relation in self.causal_relations[effect]:
if relation["cause"] == cause:
effect_likelihood = relation["likelihood"]
break
likelihood *= effect_likelihood
# 简化的后验计算(非标准贝叶斯公式)
posterior = prior * likelihood
posterior_probs[cause] = posterior
# 对原因按后验概率排序
sorted_causes = sorted(posterior_probs.items(), key=lambda x: x[1], reverse=True)
return sorted_causes
使用示例:
# 创建溯因推理引擎
abductive = AbductiveReasoning()
# 添加医疗诊断的因果关系
# 疾病 -> 症状,概率
abductive.add_causal_relation("感冒", "发热", 0.8)
abductive.add_causal_relation("感冒", "咳嗽", 0.9)
abductive.add_causal_relation("感冒", "流鼻涕", 0.95)
abductive.add_causal_relation("流感", "发热", 0.95)
abductive.add_causal_relation("流感", "咳嗽", 0.8)
abductive.add_causal_relation("流感", "肌肉疼痛", 0.7)
abductive.add_causal_relation("新冠", "发热", 0.9)
abductive.add_causal_relation("新冠", "咳嗽", 0.85)
abductive.add_causal_relation("新冠", "呼吸困难", 0.4)
abductive.add_causal_relation("新冠", "味觉丧失", 0.6)
# 设置先验概率(疾病发生率)
abductive.set_prior("感冒", 0.1) # 感冒相对常见
abductive.set_prior("流感", 0.05) # 流感次之
abductive.set_prior("新冠", 0.01) # 新冠较少见
# 观察症状
observed_symptoms = ["发热", "咳嗽", "味觉丧失"]
# 推断可能的疾病
likely_diseases = abductive.reason(observed_symptoms)
print("可能的诊断结果:")
for disease, probability in likely_diseases:
print(f"- {disease}: 概率 {probability:.4f}")
4.1.2 符号推理与概率推理
基于规则的符号推理
class RuleBasedSystem:
def __init__(self):
        self.facts = []  # 用列表存储事实,以便容纳字典等不可哈希的结论
self.rules = []
    def add_fact(self, fact):
        """添加事实(重复事实不再加入)"""
        if fact not in self.facts:
            self.facts.append(fact)
def add_rule(self, conditions, conclusion):
"""添加规则"""
self.rules.append({
"conditions": conditions,
"conclusion": conclusion
})
def infer(self, max_iterations=100):
"""进行推理,返回推导出的新事实"""
        new_facts = []
        iterations = 0
        while iterations < max_iterations:
            iteration_new_facts = []
            for rule in self.rules:
                # 检查所有条件是否满足
                if all(cond in self.facts for cond in rule["conditions"]):
                    # 如果结论还不是已知事实且本轮尚未推出,则记录
                    if rule["conclusion"] not in self.facts and rule["conclusion"] not in iteration_new_facts:
                        iteration_new_facts.append(rule["conclusion"])
            # 如果没有新事实被添加,则结束推理
            if not iteration_new_facts:
                break
            # 添加新推导出的事实
            for fact in iteration_new_facts:
                self.facts.append(fact)
                new_facts.append(fact)
iterations += 1
return new_facts
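在进入概率推理之前,先给出 RuleBasedSystem 的一个最小使用示例(事实与规则内容均为演示而设):
# 创建规则推理系统
rule_system = RuleBasedSystem()
# 添加已知事实
rule_system.add_fact("会飞")
rule_system.add_fact("会下蛋")
# 添加规则: 条件列表 -> 结论
rule_system.add_rule(["会飞", "会下蛋"], "可能是鸟类")
rule_system.add_rule(["可能是鸟类"], "应该有羽毛")
# 前向链式推理
new_facts = rule_system.infer()
print("推导出的新事实:", new_facts)  # 期望包含"可能是鸟类"和"应该有羽毛"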
贝叶斯网络的概率推理
import numpy as np
class BayesianNode:
def __init__(self, name, states):
self.name = name
self.states = states
self.parents = []
self.children = []
self.cpt = None # 条件概率表
def set_parents(self, parents):
"""设置父节点"""
self.parents = parents
# 为父节点添加当前节点为子节点
for parent in parents:
if self not in parent.children:
parent.children.append(self)
def set_cpt(self, cpt):
"""设置条件概率表"""
self.cpt = cpt
class BayesianNetwork:
def __init__(self):
self.nodes = {}
def add_node(self, name, states):
"""添加节点"""
node = BayesianNode(name, states)
self.nodes[name] = node
return node
def add_edge(self, parent_name, child_name):
"""添加边(因果关系)"""
if parent_name not in self.nodes or child_name not in self.nodes:
raise ValueError("父节点或子节点不存在")
parent = self.nodes[parent_name]
child = self.nodes[child_name]
# 设置父子关系
if parent not in child.parents:
child.parents.append(parent)
if child not in parent.children:
parent.children.append(child)
def query(self, query_var, evidence):
"""查询变量的后验概率
参数:
query_var: 查询变量名
evidence: 证据变量及其观察值的字典
"""
# 简化实现,使用近似推理
# 实际应用中可使用变量消除或采样等方法
# 执行简化的吉布斯采样
samples = self._gibbs_sampling(evidence, num_samples=1000)
# 计算查询变量的后验分布
query_node = self.nodes[query_var]
counts = {state: 0 for state in query_node.states}
for sample in samples:
state = sample[query_var]
counts[state] += 1
# 归一化
total = sum(counts.values())
distribution = {state: count/total for state, count in counts.items()}
return distribution
def _gibbs_sampling(self, evidence, num_samples=1000, burn_in=100):
"""吉布斯采样实现"""
# 初始化样本
current_state = self._initialize_state(evidence)
samples = []
# 生成样本
for i in range(num_samples + burn_in):
# 对每个非证据变量进行采样
for node_name, node in self.nodes.items():
if node_name not in evidence:
# 计算条件概率
probs = self._compute_conditional_probability(node, current_state)
# 采样新状态
current_state[node_name] = np.random.choice(node.states, p=probs)
# 忽略预热期
if i >= burn_in:
samples.append(current_state.copy())
return samples
def _initialize_state(self, evidence):
"""初始化采样状态"""
state = {}
# 设置证据变量
for var, val in evidence.items():
state[var] = val
# 随机初始化非证据变量
for node_name, node in self.nodes.items():
if node_name not in evidence:
state[node_name] = np.random.choice(node.states)
return state
    def _compute_conditional_probability(self, node, state):
        """基于马尔可夫毯计算节点的条件概率分布
        吉布斯采样需要 P(节点 | 其余变量),它正比于 P(节点 | 父节点) 乘以各子节点在当前赋值下的条件概率
        """
        def cpt_prob(n, assignment):
            # 在给定赋值下查询 P(n = assignment[n.name] | n的父节点)
            if not n.parents:
                dist = n.cpt
            else:
                parent_states = tuple(assignment[p.name] for p in n.parents)
                dist = n.cpt.get(parent_states, [1.0 / len(n.states)] * len(n.states))
            return dist[n.states.index(assignment[n.name])]
        weights = []
        for candidate in node.states:
            assignment = dict(state)
            assignment[node.name] = candidate
            weight = cpt_prob(node, assignment)
            for child in node.children:
                weight *= cpt_prob(child, assignment)
            weights.append(weight)
        # 归一化为合法的概率分布
        total = sum(weights)
        if total == 0:
            return [1.0 / len(node.states)] * len(node.states)
        return [w / total for w in weights]
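下面是一个示意性的使用示例:构造"下雨 → 草地湿"的两节点网络(条件概率数值为假设值),观察到草地湿后查询下雨的后验分布。按马尔可夫毯方式计算条件概率时,理论后验约为 0.53,采样估计会在其附近波动:
# 构建两节点网络: rain -> wet
bn = BayesianNetwork()
bn.add_node("rain", ["yes", "no"])
bn.add_node("wet", ["yes", "no"])
bn.add_edge("rain", "wet")
# 根节点的CPT是先验分布,顺序与states一致: P(rain=yes)=0.2
bn.nodes["rain"].set_cpt([0.2, 0.8])
# 子节点的CPT按父节点状态组合索引
bn.nodes["wet"].set_cpt({
    ("yes",): [0.9, 0.1],  # 下雨时: P(wet=yes)=0.9
    ("no",): [0.2, 0.8]    # 不下雨时: P(wet=yes)=0.2
})
# 观察到草地湿,查询下雨的后验分布
posterior = bn.query("rain", evidence={"wet": "yes"})
print("P(rain | wet=yes) ≈", posterior)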
4.1.3 基于神经网络的深度推理
深度神经网络可用于各种推理任务,特别是在处理非结构化数据时:
import torch
import torch.nn as nn
import torch.optim as optim
class DeepReasoningNetwork(nn.Module):
def __init__(self, input_size, hidden_size, output_size):
super(DeepReasoningNetwork, self).__init__()
# 模型架构
self.model = nn.Sequential(
nn.Linear(input_size, hidden_size),
nn.ReLU(),
nn.Dropout(0.2),
nn.Linear(hidden_size, hidden_size),
nn.ReLU(),
nn.Dropout(0.2),
            # 输出原始logits;CrossEntropyLoss内部已包含softmax,这里不再叠加Softmax层
            nn.Linear(hidden_size, output_size)
)
def forward(self, x):
return self.model(x)
class DeepReasoning:
def __init__(self, input_size, hidden_size, output_size, learning_rate=0.001):
# 创建模型
self.model = DeepReasoningNetwork(input_size, hidden_size, output_size)
# 损失函数和优化器
self.criterion = nn.CrossEntropyLoss()
self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
# 输出标签映射
self.output_labels = None
def train(self, inputs, labels, epochs=100, batch_size=32):
"""训练模型"""
# 转换为PyTorch张量
inputs_tensor = torch.FloatTensor(inputs)
labels_tensor = torch.LongTensor(labels)
# 训练循环
for epoch in range(epochs):
# 随机打乱数据
indices = torch.randperm(len(inputs))
total_loss = 0
batch_count = 0
# 批量训练
for i in range(0, len(inputs), batch_size):
# 获取批次索引
batch_indices = indices[i:i+batch_size]
# 准备批次数据
batch_inputs = inputs_tensor[batch_indices]
batch_labels = labels_tensor[batch_indices]
# 前向传播
outputs = self.model(batch_inputs)
loss = self.criterion(outputs, batch_labels)
# 反向传播和优化
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
total_loss += loss.item()
batch_count += 1
# 打印训练进度
if (epoch + 1) % 10 == 0:
print(f'Epoch [{epoch+1}/{epochs}], Loss: {total_loss/batch_count:.4f}')
def reason(self, inputs):
"""使用训练好的模型进行推理"""
# 转换为PyTorch张量
inputs_tensor = torch.FloatTensor(inputs)
# 使用模型推理
with torch.no_grad():
            logits = self.model(inputs_tensor)
            # 模型输出的是logits,显式做softmax得到各类别概率
            outputs = torch.softmax(logits, dim=1)
            # 获取最可能的类别及其概率(torch.max同时返回最大概率与对应索引)
            probabilities, predicted = torch.max(outputs, 1)
# 如果有标签映射,转换为标签
if self.output_labels is not None:
predictions = [self.output_labels[p.item()] for p in predicted]
else:
predictions = predicted.numpy()
# 返回预测结果和置信度
return predictions, probabilities.numpy()
def set_output_labels(self, labels):
"""设置输出标签映射"""
self.output_labels = labels
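下面是一个示意性的训练与推理示例,数据为随机构造的二分类样本,类别标签"正常"/"异常"仅作演示:
import numpy as np
np.random.seed(0)
# 两类样本分别围绕不同中心分布,各50条,每条4个特征
train_inputs = np.vstack([
    np.random.randn(50, 4) + 2.0,   # 类别0
    np.random.randn(50, 4) - 2.0    # 类别1
]).tolist()
train_labels = [0] * 50 + [1] * 50
deep = DeepReasoning(input_size=4, hidden_size=16, output_size=2)
deep.set_output_labels({0: "正常", 1: "异常"})
deep.train(train_inputs, train_labels, epochs=50, batch_size=16)
# 对两条新样本进行推理
predictions, confidences = deep.reason([[2.1, 1.8, 2.3, 1.9], [-2.0, -1.7, -2.2, -2.4]])
print("预测结果:", predictions)
print("置信度:", confidences)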
4.1.4 混合推理系统
组合多种推理方法可以创建更强大的推理系统:
class HybridReasoningSystem:
def __init__(self):
# 初始化各种推理引擎
self.rule_based = RuleBasedSystem()
self.bayesian = BayesianNetwork()
self.deep_reasoning = None # 延迟初始化
# 推理结果置信度阈值
self.confidence_threshold = 0.7
def configure_deep_reasoning(self, input_size, hidden_size, output_size):
"""配置深度推理模块"""
self.deep_reasoning = DeepReasoning(input_size, hidden_size, output_size)
def add_knowledge(self, knowledge_type, *args, **kwargs):
"""添加知识到相应的推理引擎"""
if knowledge_type == "fact":
self.rule_based.add_fact(*args, **kwargs)
elif knowledge_type == "rule":
self.rule_based.add_rule(*args, **kwargs)
elif knowledge_type == "bayes_node":
return self.bayesian.add_node(*args, **kwargs)
elif knowledge_type == "bayes_edge":
self.bayesian.add_edge(*args, **kwargs)
else:
raise ValueError(f"不支持的知识类型: {knowledge_type}")
def reason(self, input_data, reasoning_type=None):
"""进行推理
参数:
input_data: 输入数据
reasoning_type: 推理类型,如果为None则自动选择
"""
results = {}
# 根据输入数据和任务选择最合适的推理方法
if reasoning_type is None:
reasoning_type = self._select_reasoning_type(input_data)
# 执行指定的推理
if reasoning_type == "rule":
# 使用规则推理
new_facts = self.rule_based.infer()
results["conclusions"] = list(new_facts)
results["confidence"] = 1.0 # 规则推理确定性为1
elif reasoning_type == "bayes":
# 检查输入是否包含查询和证据
if "query" not in input_data or "evidence" not in input_data:
raise ValueError("贝叶斯推理需要查询变量和证据")
# 贝叶斯推理
query_var = input_data["query"]
evidence = input_data["evidence"]
distribution = self.bayesian.query(query_var, evidence)
# 找出最可能的状态
most_likely_state = max(distribution.items(), key=lambda x: x[1])
results["distribution"] = distribution
results["most_likely"] = most_likely_state[0]
results["confidence"] = most_likely_state[1]
elif reasoning_type == "deep":
# 检查深度推理模型是否初始化
if self.deep_reasoning is None:
raise RuntimeError("深度推理模型尚未配置")
# 检查输入格式
if not isinstance(input_data, list):
raise ValueError("深度推理需要特征向量列表")
# 深度推理
predictions, confidences = self.deep_reasoning.reason(input_data)
results["predictions"] = predictions
results["confidences"] = confidences
else:
raise ValueError(f"不支持的推理类型: {reasoning_type}")
return results
def _select_reasoning_type(self, input_data):
"""自动选择最合适的推理类型"""
# 简化的选择逻辑
if isinstance(input_data, dict) and "query" in input_data and "evidence" in input_data:
return "bayes"
elif isinstance(input_data, list) and all(isinstance(x, (list, tuple, np.ndarray)) for x in input_data):
return "deep"
else:
return "rule"
4.2 执行模块:智能体的"行动系统"
执行模块负责将智能体的决策转化为具体行动,与环境进行交互。
4.2.1 执行模块的基本架构
import time
class ExecutionModule:
def __init__(self):
# 注册可用的行动
self.available_actions = {}
# 执行历史
self.execution_history = []
# 行动监控标志
self.monitor_enabled = True
def register_action(self, action_name, action_func, required_args=None):
"""注册一个可执行的行动"""
if required_args is None:
required_args = []
self.available_actions[action_name] = {
"function": action_func,
"required_args": required_args
}
def execute(self, action_name, **kwargs):
"""执行指定的行动"""
# 检查行动是否存在
if action_name not in self.available_actions:
raise ValueError(f"未知行动: {action_name}")
action_info = self.available_actions[action_name]
# 检查必要参数
for arg in action_info["required_args"]:
if arg not in kwargs:
raise ValueError(f"执行{action_name}缺少必要参数: {arg}")
# 记录执行前的时间
start_time = time.time()
try:
# 执行行动
result = action_info["function"](**kwargs)
success = True
error = None
except Exception as e:
# 捕获执行错误
result = None
success = False
error = str(e)
# 记录执行后的时间
end_time = time.time()
execution_time = end_time - start_time
# 记录执行历史
execution_record = {
"action": action_name,
"parameters": kwargs,
"timestamp": start_time,
"execution_time": execution_time,
"success": success,
"result": result,
"error": error
}
self.execution_history.append(execution_record)
# 如果启用监控,检查执行结果
if self.monitor_enabled:
self._monitor_execution(execution_record)
# 如果执行失败,抛出异常
if not success:
raise RuntimeError(f"执行{action_name}失败: {error}")
return result
def get_history(self, n=None):
"""获取执行历史"""
if n is None:
return self.execution_history
return self.execution_history[-n:]
def clear_history(self):
"""清空执行历史"""
self.execution_history = []
def _monitor_execution(self, execution_record):
"""监控执行情况"""
# 检查执行时间
if execution_record["execution_time"] > 5.0:
print(f"警告: 行动{execution_record['action']}执行时间过长: {execution_record['execution_time']:.2f}秒")
# 检查执行成功率
recent_records = self.execution_history[-10:]
success_rate = sum(1 for r in recent_records if r["success"]) / len(recent_records)
if success_rate < 0.7:
print(f"警告: 最近行动成功率低: {success_rate:.2f}")
4.2.2 常见执行操作类型
API调用执行器
import requests
class APIExecutor:
def __init__(self, base_url, auth_token=None, timeout=10):
self.base_url = base_url
self.auth_token = auth_token
self.timeout = timeout
def make_request(self, endpoint, method="GET", params=None, data=None, headers=None):
"""发送API请求"""
# 构建完整URL
url = f"{self.base_url}/{endpoint}"
# 准备请求头
request_headers = {}
if self.auth_token:
request_headers["Authorization"] = f"Bearer {self.auth_token}"
if headers:
request_headers.update(headers)
# 发送请求
try:
if method.upper() == "GET":
response = requests.get(url, params=params, headers=request_headers, timeout=self.timeout)
elif method.upper() == "POST":
response = requests.post(url, params=params, json=data, headers=request_headers, timeout=self.timeout)
elif method.upper() == "PUT":
response = requests.put(url, params=params, json=data, headers=request_headers, timeout=self.timeout)
elif method.upper() == "DELETE":
response = requests.delete(url, params=params, headers=request_headers, timeout=self.timeout)
else:
raise ValueError(f"不支持的HTTP方法: {method}")
# 检查响应状态
response.raise_for_status()
# 解析响应数据
            if "application/json" in response.headers.get("content-type", ""):
result = response.json()
else:
result = response.text
return {
"status_code": response.status_code,
"headers": dict(response.headers),
"data": result
}
except requests.exceptions.RequestException as e:
# 处理请求异常
raise RuntimeError(f"API请求失败: {str(e)}")
数据库操作执行器
import sqlite3
class DatabaseExecutor:
def __init__(self, db_path):
self.db_path = db_path
self.connection = None
self.cursor = None
def connect(self):
"""连接到数据库"""
try:
self.connection = sqlite3.connect(self.db_path)
self.cursor = self.connection.cursor()
return True
except sqlite3.Error as e:
raise RuntimeError(f"数据库连接失败: {str(e)}")
def disconnect(self):
"""断开数据库连接"""
if self.connection:
self.connection.close()
self.connection = None
self.cursor = None
def execute_query(self, query, params=None):
"""执行查询语句"""
if not self.connection:
self.connect()
try:
if params:
self.cursor.execute(query, params)
else:
self.cursor.execute(query)
# 获取列名
if self.cursor.description:
columns = [desc[0] for desc in self.cursor.description]
# 获取结果
results = []
for row in self.cursor.fetchall():
results.append(dict(zip(columns, row)))
return results
return None
except sqlite3.Error as e:
raise RuntimeError(f"查询执行失败: {str(e)}")
def execute_update(self, query, params=None):
"""执行更新语句"""
if not self.connection:
self.connect()
try:
if params:
self.cursor.execute(query, params)
else:
self.cursor.execute(query)
# 提交更改
self.connection.commit()
# 返回影响的行数
return self.cursor.rowcount
except sqlite3.Error as e:
# 回滚更改
if self.connection:
self.connection.rollback()
raise RuntimeError(f"更新执行失败: {str(e)}")
def execute_transaction(self, queries):
"""执行事务"""
if not self.connection:
self.connect()
try:
# 开始事务
self.connection.execute("BEGIN TRANSACTION")
results = []
# 执行所有查询
for query, params in queries:
if params:
self.cursor.execute(query, params)
else:
self.cursor.execute(query)
# 如果是查询语句,获取结果
if query.strip().upper().startswith("SELECT"):
columns = [desc[0] for desc in self.cursor.description]
query_results = []
for row in self.cursor.fetchall():
query_results.append(dict(zip(columns, row)))
results.append(query_results)
else:
results.append({"rowcount": self.cursor.rowcount})
# 提交事务
self.connection.commit()
return results
except sqlite3.Error as e:
# 回滚事务
if self.connection:
self.connection.rollback()
raise RuntimeError(f"事务执行失败: {str(e)}")
文件操作执行器
import os
import json
import csv
class FileExecutor:
def __init__(self, base_directory="."):
self.base_directory = base_directory
def read_text_file(self, file_path, encoding="utf-8"):
"""读取文本文件"""
full_path = os.path.join(self.base_directory, file_path)
try:
with open(full_path, "r", encoding=encoding) as file:
content = file.read()
return content
except Exception as e:
raise RuntimeError(f"读取文件失败: {str(e)}")
def write_text_file(self, file_path, content, encoding="utf-8", mode="w"):
"""写入文本文件"""
full_path = os.path.join(self.base_directory, file_path)
# 确保目录存在
os.makedirs(os.path.dirname(full_path), exist_ok=True)
try:
with open(full_path, mode, encoding=encoding) as file:
file.write(content)
return {"success": True, "path": full_path}
except Exception as e:
raise RuntimeError(f"写入文件失败: {str(e)}")
def read_json_file(self, file_path, encoding="utf-8"):
"""读取JSON文件"""
content = self.read_text_file(file_path, encoding)
try:
return json.loads(content)
except json.JSONDecodeError as e:
raise RuntimeError(f"JSON解析失败: {str(e)}")
def write_json_file(self, file_path, data, encoding="utf-8", indent=2):
"""写入JSON文件"""
try:
content = json.dumps(data, ensure_ascii=False, indent=indent)
return self.write_text_file(file_path, content, encoding)
except Exception as e:
raise RuntimeError(f"JSON写入失败: {str(e)}")
def read_csv_file(self, file_path, encoding="utf-8", delimiter=","):
"""读取CSV文件"""
full_path = os.path.join(self.base_directory, file_path)
try:
rows = []
with open(full_path, "r", encoding=encoding, newline="") as file:
reader = csv.DictReader(file, delimiter=delimiter)
for row in reader:
rows.append(dict(row))
return rows
except Exception as e:
raise RuntimeError(f"CSV读取失败: {str(e)}")
def write_csv_file(self, file_path, data, encoding="utf-8", delimiter=","):
"""写入CSV文件"""
if not data:
raise ValueError("没有数据可写入CSV")
full_path = os.path.join(self.base_directory, file_path)
# 确保目录存在
os.makedirs(os.path.dirname(full_path), exist_ok=True)
try:
with open(full_path, "w", encoding=encoding, newline="") as file:
# 获取列名
fieldnames = data[0].keys()
writer = csv.DictWriter(file, fieldnames=fieldnames, delimiter=delimiter)
# 写入表头
writer.writeheader()
# 写入数据
writer.writerows(data)
return {"success": True, "path": full_path, "rows": len(data)}
except Exception as e:
raise RuntimeError(f"CSV写入失败: {str(e)}")
4.2.3 执行监控与错误处理
class ExecutionMonitor:
def __init__(self, execution_module):
self.execution_module = execution_module
self.error_counts = {}
self.performance_metrics = {
"total_executions": 0,
"successful_executions": 0,
"failed_executions": 0,
"total_execution_time": 0
}
def start_monitoring(self):
"""开始监控执行模块"""
self.execution_module.monitor_enabled = True
# 清空现有指标
self.error_counts = {}
self.performance_metrics = {
"total_executions": 0,
"successful_executions": 0,
"failed_executions": 0,
"total_execution_time": 0
}
def stop_monitoring(self):
"""停止监控"""
self.execution_module.monitor_enabled = False
def update_metrics(self, execution_record):
"""更新性能指标"""
# 更新总执行次数
self.performance_metrics["total_executions"] += 1
# 更新执行时间
self.performance_metrics["total_execution_time"] += execution_record["execution_time"]
# 更新成功/失败计数
if execution_record["success"]:
self.performance_metrics["successful_executions"] += 1
else:
self.performance_metrics["failed_executions"] += 1
        # 更新错误计数(只统计失败的执行)
        if not execution_record["success"]:
            error_type = execution_record["error"] or "未知错误"
            if error_type not in self.error_counts:
                self.error_counts[error_type] = 0
            self.error_counts[error_type] += 1
def get_report(self):
"""获取监控报告"""
# 计算平均执行时间
avg_execution_time = 0
if self.performance_metrics["total_executions"] > 0:
avg_execution_time = (
self.performance_metrics["total_execution_time"] /
self.performance_metrics["total_executions"]
)
# 计算成功率
success_rate = 0
if self.performance_metrics["total_executions"] > 0:
success_rate = (
self.performance_metrics["successful_executions"] /
self.performance_metrics["total_executions"]
)
# 组装报告
report = {
"total_executions": self.performance_metrics["total_executions"],
"successful_executions": self.performance_metrics["successful_executions"],
"failed_executions": self.performance_metrics["failed_executions"],
"success_rate": success_rate,
"avg_execution_time": avg_execution_time,
"top_errors": sorted(
self.error_counts.items(),
key=lambda x: x[1],
reverse=True
)[:5] # 前5个最常见错误
}
return report
def handle_error(self, execution_record, max_retries=3):
"""处理执行错误"""
action = execution_record["action"]
parameters = execution_record["parameters"]
error = execution_record["error"]
# 检查是否可以重试
if self._is_retryable_error(error):
# 获取此行动的历史失败次数
action_history = [
r for r in self.execution_module.execution_history
if r["action"] == action and not r["success"]
]
retry_count = len(action_history)
# 如果未超过最大重试次数,则重试
if retry_count < max_retries:
print(f"重试行动 {action} (第 {retry_count + 1}/{max_retries} 次)")
# 添加重试标记
parameters["_retry_count"] = retry_count + 1
# 重试执行
try:
return self.execution_module.execute(action, **parameters)
except Exception as e:
print(f"重试失败: {str(e)}")
# 不能重试或重试失败,返回备用响应
return self._get_fallback_response(action, error)
def _is_retryable_error(self, error):
"""判断错误是否可重试"""
# 简化实现,实际应用需要更详细的错误分类
non_retryable_errors = [
"权限拒绝",
"资源不存在",
"参数无效",
"语法错误"
]
for non_retryable in non_retryable_errors:
if non_retryable in error:
return False
return True
def _get_fallback_response(self, action, error):
"""获取备用响应"""
# 根据行动类型返回不同的备用响应
if "query" in action.lower():
return {"error": error, "fallback": "无法检索数据,请稍后重试"}
elif "create" in action.lower() or "update" in action.lower() or "delete" in action.lower():
return {"error": error, "fallback": "操作失败,请检查输入并重试"}
else:
return {"error": error, "fallback": "行动执行失败,请联系管理员"}
4.3 整合推理引擎和执行模块
现在,我们将推理引擎和执行模块整合起来,构建一个功能完整的智能体核心:
class AgentCore:
def __init__(self):
# 初始化推理引擎
self.reasoning = HybridReasoningSystem()
# 初始化执行模块
self.execution = ExecutionModule()
# 初始化执行监控器
self.monitor = ExecutionMonitor(self.execution)
# 初始化各种执行器
self.api_executor = None
self.db_executor = None
self.file_executor = None
# 注册基本行动
self._register_basic_actions()
def configure_api_executor(self, base_url, auth_token=None):
"""配置API执行器"""
self.api_executor = APIExecutor(base_url, auth_token)
self._register_api_actions()
def configure_db_executor(self, db_path):
"""配置数据库执行器"""
self.db_executor = DatabaseExecutor(db_path)
self._register_db_actions()
def configure_file_executor(self, base_directory="."):
"""配置文件执行器"""
self.file_executor = FileExecutor(base_directory)
self._register_file_actions()
def _register_basic_actions(self):
"""注册基本行动"""
# 打印消息
self.execution.register_action(
"print_message",
lambda message: print(message),
["message"]
)
# 等待指定时间
self.execution.register_action(
"wait",
lambda seconds: time.sleep(seconds),
["seconds"]
)
def _register_api_actions(self):
"""注册API相关行动"""
if not self.api_executor:
return
# GET请求
self.execution.register_action(
"api_get",
lambda endpoint, params=None, headers=None:
self.api_executor.make_request(endpoint, "GET", params, None, headers),
["endpoint"]
)
# POST请求
self.execution.register_action(
"api_post",
lambda endpoint, data=None, params=None, headers=None:
self.api_executor.make_request(endpoint, "POST", params, data, headers),
["endpoint"]
)
# PUT请求
self.execution.register_action(
"api_put",
lambda endpoint, data=None, params=None, headers=None:
self.api_executor.make_request(endpoint, "PUT", params, data, headers),
["endpoint"]
)
# DELETE请求
self.execution.register_action(
"api_delete",
lambda endpoint, params=None, headers=None:
self.api_executor.make_request(endpoint, "DELETE", params, None, headers),
["endpoint"]
)
def _register_db_actions(self):
"""注册数据库相关行动"""
if not self.db_executor:
return
# 查询
self.execution.register_action(
"db_query",
lambda query, params=None: self.db_executor.execute_query(query, params),
["query"]
)
# 更新
self.execution.register_action(
"db_update",
lambda query, params=None: self.db_executor.execute_update(query, params),
["query"]
)
# 事务
self.execution.register_action(
"db_transaction",
lambda queries: self.db_executor.execute_transaction(queries),
["queries"]
)
def _register_file_actions(self):
"""注册文件相关行动"""
if not self.file_executor:
return
# 读取文本文件
self.execution.register_action(
"read_text_file",
lambda file_path, encoding="utf-8":
self.file_executor.read_text_file(file_path, encoding),
["file_path"]
)
# 写入文本文件
self.execution.register_action(
"write_text_file",
lambda file_path, content, encoding="utf-8", mode="w":
self.file_executor.write_text_file(file_path, content, encoding, mode),
["file_path", "content"]
)
# 读取JSON文件
self.execution.register_action(
"read_json_file",
lambda file_path, encoding="utf-8":
self.file_executor.read_json_file(file_path, encoding),
["file_path"]
)
# 写入JSON文件
self.execution.register_action(
"write_json_file",
lambda file_path, data, encoding="utf-8", indent=2:
self.file_executor.write_json_file(file_path, data, encoding, indent),
["file_path", "data"]
)
# 读取CSV文件
self.execution.register_action(
"read_csv_file",
lambda file_path, encoding="utf-8", delimiter=",":
self.file_executor.read_csv_file(file_path, encoding, delimiter),
["file_path"]
)
# 写入CSV文件
self.execution.register_action(
"write_csv_file",
lambda file_path, data, encoding="utf-8", delimiter=",":
self.file_executor.write_csv_file(file_path, data, encoding, delimiter),
["file_path", "data"]
)
def process(self, input_data, execution_required=True):
"""处理输入,进行推理和执行"""
# 开始监控
self.monitor.start_monitoring()
try:
# 1. 进行推理
reasoning_result = self.reasoning.reason(input_data)
# 如果不需要执行,直接返回推理结果
if not execution_required:
return {
"reasoning": reasoning_result,
"execution": None,
"status": "reasoning_only"
}
# 2. 根据推理结果确定执行行动
action_plan = self._create_action_plan(reasoning_result)
# 3. 执行行动
execution_results = []
for action in action_plan:
try:
result = self.execution.execute(
action["name"],
**action["parameters"]
)
execution_results.append({
"action": action["name"],
"success": True,
"result": result
})
                except Exception as e:
                    # 处理执行错误;参数校验类错误发生在历史记录写入之前,需要兜底构造记录
                    history = self.execution.execution_history
                    if history and history[-1]["action"] == action["name"]:
                        error_record = history[-1]
                    else:
                        error_record = {"action": action["name"], "parameters": action["parameters"],
                                        "error": str(e), "success": False}
                    fallback = self.monitor.handle_error(error_record)
execution_results.append({
"action": action["name"],
"success": False,
"error": str(e),
"fallback": fallback
})
# 4. 返回完整结果
return {
"reasoning": reasoning_result,
"execution": execution_results,
"status": "completed",
"metrics": self.monitor.get_report()
}
finally:
# 停止监控
self.monitor.stop_monitoring()
def _create_action_plan(self, reasoning_result):
"""基于推理结果创建行动计划"""
# 简化实现,根据推理类型创建不同的行动
action_plan = []
# 根据推理类型处理
if "conclusions" in reasoning_result:
# 规则推理结果
for conclusion in reasoning_result["conclusions"]:
# 创建对应的行动
if isinstance(conclusion, dict) and "action" in conclusion:
action_plan.append({
"name": conclusion["action"],
"parameters": conclusion.get("parameters", {})
})
else:
# 默认行动:打印结论
action_plan.append({
"name": "print_message",
"parameters": {"message": f"结论: {conclusion}"}
})
elif "most_likely" in reasoning_result:
# 贝叶斯推理结果
state = reasoning_result["most_likely"]
confidence = reasoning_result["confidence"]
action_name = f"handle_{state}"
if action_name in self.execution.available_actions:
action_plan.append({
"name": action_name,
"parameters": {"confidence": confidence}
})
else:
# 默认行动:打印最可能状态
action_plan.append({
"name": "print_message",
"parameters": {
"message": f"最可能状态: {state}, 置信度: {confidence:.4f}"
}
})
elif "predictions" in reasoning_result:
# 深度推理结果
for i, (prediction, confidence) in enumerate(
zip(reasoning_result["predictions"], reasoning_result["confidences"])
):
# 只执行高置信度的预测
if confidence > 0.7:
action_name = f"execute_prediction_{prediction}"
if action_name in self.execution.available_actions:
action_plan.append({
"name": action_name,
"parameters": {"confidence": float(confidence)}
})
else:
# 默认行动:打印预测
action_plan.append({
"name": "print_message",
"parameters": {
"message": f"预测 {i+1}: {prediction}, 置信度: {float(confidence):.4f}"
}
})
return action_plan
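在进入完整案例之前,先用一个最小的端到端示例验证 AgentCore 的推理-执行闭环(事实与规则为演示而设;metrics 字段依赖上一节的指标接入,默认实现下基本为初始值):
core = AgentCore()
# 向规则引擎添加事实与规则
core.reasoning.add_knowledge("fact", "库存低于安全水位")
core.reasoning.add_knowledge("rule", ["库存低于安全水位"], "需要触发补货流程")
# 推理得到结论后,由于没有注册对应行动,默认通过print_message打印结论
result = core.process({"source": "库存监控"})
print(result["status"], result["reasoning"]["conclusions"])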
4.4 实例:构建信息检索和分析智能体
下面我们将构建一个具备推理和执行能力的信息检索与分析智能体,展示如何应用本章所学概念:
import os
import time
import json
import requests
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime
class InfoRetrievalAgent:
def __init__(self):
# 初始化代理核心
self.core = AgentCore()
# 配置执行器
self.core.configure_file_executor("./data")
# 添加自定义行动
self._register_custom_actions()
# 配置推理系统
self._configure_reasoning()
def _register_custom_actions(self):
"""注册自定义行动"""
# 数据分析
self.core.execution.register_action(
"analyze_data",
self._action_analyze_data,
["data", "analysis_type"]
)
# 可视化
self.core.execution.register_action(
"visualize_data",
self._action_visualize_data,
["data", "visualization_type", "output_path"]
)
# 信息提取
self.core.execution.register_action(
"extract_information",
self._action_extract_information,
["text", "extraction_type"]
)
# 结果总结
self.core.execution.register_action(
"summarize_results",
self._action_summarize_results,
["results", "summary_type"]
)
def _configure_reasoning(self):
"""配置推理系统"""
# 添加推理规则
self.core.reasoning.add_knowledge("rule",
[{"request_type": "data_analysis"}],
{"action": "analyze_data", "parameters": {"analysis_type": "statistical"}}
)
self.core.reasoning.add_knowledge("rule",
[{"request_type": "visualization"}],
{"action": "visualize_data", "parameters": {"visualization_type": "bar_chart"}}
)
self.core.reasoning.add_knowledge("rule",
[{"request_type": "information_extraction"}],
{"action": "extract_information", "parameters": {"extraction_type": "entities"}}
)
self.core.reasoning.add_knowledge("rule",
[{"request_type": "summarization"}],
{"action": "summarize_results", "parameters": {"summary_type": "brief"}}
)
# 添加更复杂的规则
self.core.reasoning.add_knowledge("rule",
lambda facts: "data_source" in facts and facts["data_source"].endswith(".csv"),
{"action": "read_csv_file", "parameters": {"file_path": lambda facts: facts["data_source"]}}
)
self.core.reasoning.add_knowledge("rule",
lambda facts: "data_source" in facts and facts["data_source"].endswith(".json"),
{"action": "read_json_file", "parameters": {"file_path": lambda facts: facts["data_source"]}}
)
def process_request(self, request):
"""处理用户请求"""
# 解析请求
try:
if isinstance(request, str):
# 如果是字符串,解析为字典
request_facts = self._parse_text_request(request)
else:
# 已经是字典格式
request_facts = request
# 处理请求
result = self.core.process(request_facts)
return {
"success": True,
"result": result,
"timestamp": datetime.now().isoformat()
}
except Exception as e:
return {
"success": False,
"error": str(e),
"timestamp": datetime.now().isoformat()
}
def _parse_text_request(self, text):
"""解析文本请求为结构化格式"""
request_facts = {"original_text": text}
# 识别请求类型
if "分析" in text or "统计" in text or "计算" in text:
request_facts["request_type"] = "data_analysis"
elif "可视化" in text or "图表" in text or "绘制" in text:
request_facts["request_type"] = "visualization"
elif "提取" in text or "找出" in text or "识别" in text:
request_facts["request_type"] = "information_extraction"
elif "总结" in text or "概括" in text or "汇总" in text:
request_facts["request_type"] = "summarization"
# 识别数据源
if "文件" in text:
# 简单的文件名提取
words = text.split()
for word in words:
if word.endswith(".csv") or word.endswith(".json") or word.endswith(".txt"):
request_facts["data_source"] = word
break
return request_facts
# 行动实现
def _action_analyze_data(self, data, analysis_type):
"""数据分析行动"""
if isinstance(data, str):
# 如果是文件路径,先读取数据
if data.endswith(".csv"):
data = self.core.execution.execute("read_csv_file", file_path=data)
elif data.endswith(".json"):
data = self.core.execution.execute("read_json_file", file_path=data)
# 转换为pandas DataFrame
if isinstance(data, list):
df = pd.DataFrame(data)
else:
df = pd.DataFrame([data])
# 根据分析类型执行不同分析
if analysis_type == "statistical":
# 统计分析
results = {
"row_count": len(df),
"column_count": len(df.columns),
"column_types": {col: str(dtype) for col, dtype in df.dtypes.items()},
"missing_values": df.isnull().sum().to_dict(),
}
# 数值列的描述性统计
numeric_columns = df.select_dtypes(include=["number"])
if not numeric_columns.empty:
results["statistics"] = {}
for col in numeric_columns.columns:
results["statistics"][col] = {
"mean": df[col].mean(),
"median": df[col].median(),
"std": df[col].std(),
"min": df[col].min(),
"max": df[col].max()
}
return results
elif analysis_type == "correlation":
# 相关性分析
numeric_columns = df.select_dtypes(include=["number"])
if numeric_columns.empty:
return {"error": "没有数值列可进行相关性分析"}
corr_matrix = numeric_columns.corr().to_dict()
return {"correlation_matrix": corr_matrix}
elif analysis_type == "groupby":
# 分组分析
if "group_column" not in data and len(df.columns) > 0:
group_column = df.columns[0] # 默认使用第一列
else:
group_column = data["group_column"]
if "agg_column" not in data and len(df.columns) > 1:
# 尝试找到数值列
numeric_cols = df.select_dtypes(include=["number"]).columns
if not numeric_cols.empty:
agg_column = numeric_cols[0]
else:
agg_column = df.columns[1] # 退化为使用第二列
else:
agg_column = data["agg_column"]
# 执行分组聚合
grouped = df.groupby(group_column)[agg_column].agg(['count', 'mean', 'sum']).reset_index()
return grouped.to_dict(orient="records")
else:
return {"error": f"不支持的分析类型: {analysis_type}"}
def _action_visualize_data(self, data, visualization_type, output_path):
"""数据可视化行动"""
if isinstance(data, str):
# 如果是文件路径,先读取数据
if data.endswith(".csv"):
data = self.core.execution.execute("read_csv_file", file_path=data)
elif data.endswith(".json"):
data = self.core.execution.execute("read_json_file", file_path=data)
# 转换为pandas DataFrame
if isinstance(data, list):
df = pd.DataFrame(data)
else:
df = pd.DataFrame([data])
# 创建图表
plt.figure(figsize=(10, 6))
if visualization_type == "bar_chart":
# 条形图
if len(df.columns) < 2:
return {"error": "数据至少需要两列才能创建条形图"}
# 选择列
            x_column = df.columns[0]
            numeric_cols = df.select_dtypes(include=["number"]).columns
            y_column = numeric_cols[0] if len(numeric_cols) > 0 else df.columns[1]
plt.bar(df[x_column], df[y_column])
plt.xlabel(x_column)
plt.ylabel(y_column)
plt.title(f"{y_column} by {x_column}")
plt.xticks(rotation=45)
elif visualization_type == "line_chart":
# 折线图
if len(df.columns) < 2:
return {"error": "数据至少需要两列才能创建折线图"}
# 选择列
            x_column = df.columns[0]
            numeric_cols = df.select_dtypes(include=["number"]).columns
            y_column = numeric_cols[0] if len(numeric_cols) > 0 else df.columns[1]
plt.plot(df[x_column], df[y_column], marker='o')
plt.xlabel(x_column)
plt.ylabel(y_column)
plt.title(f"{y_column} Trend by {x_column}")
plt.xticks(rotation=45)
elif visualization_type == "pie_chart":
# 饼图
if len(df.columns) < 2:
return {"error": "数据至少需要两列才能创建饼图"}
# 选择列
            label_column = df.columns[0]
            numeric_cols = df.select_dtypes(include=["number"]).columns
            value_column = numeric_cols[0] if len(numeric_cols) > 0 else df.columns[1]
plt.pie(df[value_column], labels=df[label_column], autopct='%1.1f%%')
plt.title(f"Distribution of {value_column}")
elif visualization_type == "scatter_plot":
# 散点图
if len(df.select_dtypes(include=["number"]).columns) < 2:
return {"error": "数据至少需要两个数值列才能创建散点图"}
# 选择列
numeric_columns = df.select_dtypes(include=["number"]).columns
x_column = numeric_columns[0]
y_column = numeric_columns[1]
plt.scatter(df[x_column], df[y_column])
plt.xlabel(x_column)
plt.ylabel(y_column)
plt.title(f"{y_column} vs {x_column}")
else:
return {"error": f"不支持的可视化类型: {visualization_type}"}
# 保存图表
plt.tight_layout()
plt.savefig(output_path)
plt.close()
return {
"visualization_type": visualization_type,
"output_path": output_path,
"success": True
}
def _action_extract_information(self, text, extraction_type):
"""信息提取行动"""
if isinstance(text, str) and text.endswith((".txt", ".csv", ".json")):
# 如果是文件路径,先读取内容
try:
text = self.core.execution.execute("read_text_file", file_path=text)
            except Exception:
                # 读取失败时直接把输入当作原始文本处理
                pass
if extraction_type == "entities":
# 简单的实体提取(实际应用需要使用NLP库)
entities = {
"persons": [],
"organizations": [],
"locations": [],
"dates": []
}
# 简化实现,使用关键词匹配
common_orgs = ["公司", "集团", "企业", "部门", "机构", "学校", "大学", "协会"]
common_locs = ["市", "省", "区", "县", "路", "街", "国", "城"]
words = text.split()
for word in words:
# 日期检测
if word.count("-") == 2 and len(word) >= 8:
entities["dates"].append(word)
# 组织检测
for org in common_orgs:
if org in word and len(word) > len(org):
entities["organizations"].append(word)
break
# 地点检测
for loc in common_locs:
if word.endswith(loc) and len(word) > len(loc):
entities["locations"].append(word)
break
return {"entities": entities}
elif extraction_type == "keywords":
# 关键词提取
words = text.split()
# 简化实现,仅计数
word_counts = {}
for word in words:
if len(word) > 1: # 忽略单字符
if word not in word_counts:
word_counts[word] = 0
word_counts[word] += 1
# 按频率排序
sorted_words = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)
keywords = [word for word, count in sorted_words[:10]]
return {"keywords": keywords}
elif extraction_type == "summary":
# 简单的提取式摘要
sentences = text.split("。")
if len(sentences) <= 3:
return {"summary": text}
# 简化实现,选择前三个句子
summary = "。".join(sentences[:3]) + "。"
return {"summary": summary}
else:
return {"error": f"不支持的提取类型: {extraction_type}"}
def _action_summarize_results(self, results, summary_type):
"""结果总结行动"""
if not results:
return {"error": "没有结果可总结"}
if summary_type == "brief":
# 简要总结
summary = {
"result_type": type(results).__name__,
"timestamp": datetime.now().isoformat()
}
if isinstance(results, dict):
summary["keys"] = list(results.keys())
summary["total_items"] = len(results)
elif isinstance(results, list):
summary["total_items"] = len(results)
if results and isinstance(results[0], dict):
summary["sample_keys"] = list(results[0].keys())
else:
summary["data"] = str(results)
return {"summary": summary}
elif summary_type == "detailed":
# 详细总结
summary = {
"result_type": type(results).__name__,
"timestamp": datetime.now().isoformat(),
"analysis": {}
}
if isinstance(results, dict):
summary["keys"] = list(results.keys())
summary["total_items"] = len(results)
# 分析每个键的值类型
for key, value in results.items():
summary["analysis"][key] = {
"type": type(value).__name__,
"sample": str(value)[:100] if isinstance(value, str) else value
}
elif isinstance(results, list):
summary["total_items"] = len(results)
if results:
first_item = results[0]
summary["first_item_type"] = type(first_item).__name__
if isinstance(first_item, dict):
summary["common_keys"] = list(first_item.keys())
# 检查是否所有项目都有相同的键
all_keys_same = all(set(item.keys()) == set(first_item.keys()) for item in results)
summary["all_items_same_structure"] = all_keys_same
return {"summary": summary}
else:
return {"error": f"不支持的总结类型: {summary_type}"}
使用示例:
# 创建信息检索分析智能体
agent = InfoRetrievalAgent()
# 示例1:数据分析请求
result1 = agent.process_request("分析 sales_data.csv 文件")
print(json.dumps(result1, indent=2))
# 示例2:可视化请求
result2 = agent.process_request("为 sales_data.csv 创建销售趋势的折线图")
print(json.dumps(result2, indent=2))
# 示例3:信息提取请求
result3 = agent.process_request("从 news_articles.txt 中提取主要实体")
print(json.dumps(result3, indent=2))
# 示例4:结果总结请求
result4 = agent.process_request("总结 customer_feedback.json 的内容")
print(json.dumps(result4, indent=2))
4.5 总结与下一步
本章详细探讨了智能体的推理引擎和执行模块,这两个组件共同支撑智能体从信息分析走向实际行动、完成复杂任务。我们学习了:
- 各种推理方法及其实现,包括演绎推理、归纳推理、溯因推理
- 符号推理和概率推理的区别与应用
- 基于神经网络的深度推理及其与传统推理的结合
- 执行模块的设计与实现,包括API调用、数据库操作和文件操作
- 执行监控与错误处理机制
- 如何整合推理引擎和执行模块构建完整的智能体核心
通过构建信息检索和分析智能体,我们展示了这些概念的实际应用。这种智能体能够:
- 分析不同格式的数据
- 创建各种类型的可视化
- 从文本中提取有用信息
- 对结果进行总结
在实际应用中,推理引擎和执行模块使智能体能够在复杂环境中自主运作,是构建高级智能体的关键组件。
至此,我们已经完成了AI智能体五大核心组件的学习。在下一章中,我们将探讨如何将这些组件整合成一个完整的智能体系统,并介绍智能体的高级应用场景。