Jupyter AI

4 智能体的推理引擎和执行模块

📅发表日期: 2025-03-12

🏷️分类: AI智能体Agent

👁️阅读次数: 0

4.1 推理引擎:智能体的"思考中枢"

推理引擎是智能体分析信息、形成判断和生成行动计划的核心组件。一个强大的推理引擎使智能体能够处理复杂问题、适应新情境并做出合理决策。

4.1.1 推理的基本类型

演绎推理

从一般原则推导出特定结论:

class DeductiveReasoning:
    """Forward-chaining deductive engine: derives conclusions from facts.

    A rule pairs a condition with a conclusion. A condition is either a
    callable that receives the facts and returns a truthy/falsy value, or a
    dict whose every key/value pair must be present in the facts.
    """

    def __init__(self):
        # Rule base: a list of {"condition": ..., "conclusion": ...} dicts.
        self.rules = []

    def add_rule(self, condition, conclusion):
        """Append a rule to the rule base.

        Args:
            condition: callable(facts) -> bool, or a dict of required facts.
            conclusion: value reported when the condition holds. A dict
                conclusion is also merged into the working facts so that
                later rules can build on it.
        """
        self.rules.append({"condition": condition, "conclusion": conclusion})

    def reason(self, facts):
        """Apply the rules to ``facts`` until no further rule fires.

        The rules are scanned repeatedly, so a rule fires even when the rule
        that enables it appears later in the rule base. Each rule fires at
        most once, which guarantees termination.

        Args:
            facts: the known facts. A dict is copied before use, so the
                caller's object is never modified.

        Returns:
            The conclusions of all fired rules, in firing order.

        Raises:
            ValueError: if a rule condition is neither callable nor a dict.
        """
        # Work on a private copy so derived facts do not leak to the caller.
        known = dict(facts) if isinstance(facts, dict) else facts
        conclusions = []
        pending = list(self.rules)

        progressed = True
        while pending and progressed:
            progressed = False
            deferred = []
            for rule in pending:
                if not self._evaluate_condition(rule["condition"], known):
                    # Not satisfied yet; retry once more facts are known.
                    deferred.append(rule)
                    continue

                conclusion = rule["conclusion"]
                conclusions.append(conclusion)
                progressed = True

                # Dict conclusions become new facts for chained reasoning.
                if isinstance(conclusion, dict):
                    known.update(conclusion)
            pending = deferred

        return conclusions

    def _evaluate_condition(self, condition, facts):
        """Return whether ``condition`` holds for ``facts``.

        Raises:
            ValueError: if ``condition`` is neither callable nor a dict.
        """
        if callable(condition):
            return condition(facts)
        if isinstance(condition, dict):
            return all(
                key in facts and facts[key] == value
                for key, value in condition.items()
            )
        raise ValueError(f"不支持的条件类型: {type(condition)}")

使用示例:

# Build a deductive reasoning engine.
deductive = DeductiveReasoning()

# Rule: a mammal that can fly is probably a bat.
deductive.add_rule(
    condition={"is_mammal": True, "can_fly": True},
    conclusion={"likely_animal": "bat"},
)

# Rule: a mammal that lives in water is probably a whale or a dolphin.
deductive.add_rule(
    condition={"is_mammal": True, "lives_in_water": True},
    conclusion={"likely_animal": ["whale", "dolphin"]},
)

# Run the engine on the observed facts and show what it derived.
facts = dict(is_mammal=True, can_fly=True)
conclusions = deductive.reason(facts)
print("推理结论:", conclusions)

归纳推理

从特定实例归纳出一般规律:

import numpy as np
from sklearn.tree import DecisionTreeClassifier

class InductiveReasoning:
    """Inductive engine that generalises from labelled examples.

    Wraps a ``DecisionTreeClassifier`` and refuses to answer until it has
    been fitted through :meth:`learn_from_examples`.
    """

    def __init__(self):
        self.classifier = DecisionTreeClassifier()
        # Flipped to True after the first successful fit.
        self.trained = False

    def _ensure_trained(self):
        """Raise RuntimeError if the classifier has not been fitted yet."""
        if not self.trained:
            raise RuntimeError("推理引擎尚未通过例子学习")

    def learn_from_examples(self, examples, labels):
        """Fit the classifier on ``examples`` and their ``labels``.

        Raises:
            ValueError: if the two sequences differ in length.
        """
        if len(examples) != len(labels):
            raise ValueError("例子和标签数量不匹配")

        self.classifier.fit(examples, labels)
        self.trained = True

    def infer_pattern(self, new_instance):
        """Classify ``new_instance``.

        Returns:
            A ``(prediction, confidence)`` tuple, where confidence is the
            largest class probability reported by the classifier.

        Raises:
            RuntimeError: if no examples have been learned yet.
        """
        self._ensure_trained()

        label = self.classifier.predict([new_instance])[0]
        class_probabilities = self.classifier.predict_proba([new_instance])[0]
        return label, max(class_probabilities)

    def extract_rules(self):
        """Describe the features the fitted model considers important.

        Simplified: reports one sentence per feature whose importance
        exceeds 0.1 rather than real decision rules.

        Raises:
            RuntimeError: if no examples have been learned yet.
        """
        self._ensure_trained()

        return [
            f"特征{index}对分类很重要,重要性为{weight:.4f}"
            for index, weight in enumerate(self.classifier.feature_importances_)
            if weight > 0.1
        ]

使用示例:

# Build an inductive reasoning engine.
inductive = InductiveReasoning()

# Training set as (features, label) pairs; features are
# [size, weight, colour code].
labelled_examples = [
    ([2, 200, 1], "apple"),     # small, light, red
    ([3, 300, 1], "apple"),     # medium, medium, red
    ([2, 150, 2], "orange"),    # small, light, orange
    ([4, 150, 2], "orange"),    # medium, light, orange
    ([8, 1000, 3], "banana"),   # large, heavy, yellow
    ([9, 900, 3], "banana"),    # large, heavy, yellow
]
examples = [features for features, _ in labelled_examples]
labels = [label for _, label in labelled_examples]

# Learn from the labelled examples.
inductive.learn_from_examples(examples, labels)

# Classify an unseen instance: small, light, red.
new_fruit = [2, 180, 1]
prediction, confidence = inductive.infer_pattern(new_fruit)
print(f"推断结果: {prediction},置信度: {confidence:.4f}")

# Show what the model considers important.
rules = inductive.extract_rules()
print("提取的规则:")
for rule in rules:
    print(f"- {rule}")

溯因推理

从结果推断可能的原因:

class AbductiveReasoning:
    """Abductive engine: ranks candidate causes for a set of observed effects."""

    def __init__(self):
        # effect -> list of {"cause": ..., "likelihood": P(effect | cause)}
        self.causal_relations = {}
        # cause -> prior probability
        self.priors = {}

    def add_causal_relation(self, cause, effect, likelihood):
        """Register that ``cause`` produces ``effect``.

        Args:
            cause: the cause.
            effect: the effect it can produce.
            likelihood: P(effect | cause).
        """
        entry = {"cause": cause, "likelihood": likelihood}
        self.causal_relations.setdefault(effect, []).append(entry)

    def set_prior(self, cause, probability):
        """Set the prior probability of ``cause``."""
        self.priors[cause] = probability

    def reason(self, observed_effects):
        """Rank the causes that could explain ``observed_effects``.

        Every cause linked to at least one observed effect is scored as
        ``prior * product of per-effect likelihoods``. An effect the cause
        is not linked to contributes 0.01; a cause without a registered
        prior uses 0.5. The scores are not normalised, so this is a
        simplified ranking rather than a true Bayesian posterior.

        Returns:
            A list of ``(cause, score)`` tuples sorted by descending score;
            an empty list when nothing was observed.
        """
        if not observed_effects:
            return []

        # Every cause linked to at least one of the observed effects.
        candidates = {
            relation["cause"]
            for effect in observed_effects
            for relation in self.causal_relations.get(effect, ())
        }

        scores = {}
        for cause in candidates:
            joint_likelihood = 1.0
            for effect in observed_effects:
                # First registered likelihood for this (cause, effect) pair,
                # or a small default when the pair is unknown.
                joint_likelihood *= next(
                    (
                        relation["likelihood"]
                        for relation in self.causal_relations.get(effect, ())
                        if relation["cause"] == cause
                    ),
                    0.01,
                )
            scores[cause] = self.priors.get(cause, 0.5) * joint_likelihood

        return sorted(scores.items(), key=lambda item: item[1], reverse=True)

使用示例:

# Build an abductive reasoning engine.
abductive = AbductiveReasoning()

# Medical-diagnosis knowledge: disease -> symptom with P(symptom | disease).
abductive.add_causal_relation(cause="感冒", effect="发热", likelihood=0.8)
abductive.add_causal_relation(cause="感冒", effect="咳嗽", likelihood=0.9)
abductive.add_causal_relation(cause="感冒", effect="流鼻涕", likelihood=0.95)

abductive.add_causal_relation(cause="流感", effect="发热", likelihood=0.95)
abductive.add_causal_relation(cause="流感", effect="咳嗽", likelihood=0.8)
abductive.add_causal_relation(cause="流感", effect="肌肉疼痛", likelihood=0.7)

abductive.add_causal_relation(cause="新冠", effect="发热", likelihood=0.9)
abductive.add_causal_relation(cause="新冠", effect="咳嗽", likelihood=0.85)
abductive.add_causal_relation(cause="新冠", effect="呼吸困难", likelihood=0.4)
abductive.add_causal_relation(cause="新冠", effect="味觉丧失", likelihood=0.6)

# Priors (disease incidence), from most to least common.
abductive.set_prior(cause="感冒", probability=0.1)
abductive.set_prior(cause="流感", probability=0.05)
abductive.set_prior(cause="新冠", probability=0.01)

# Observed symptoms.
observed_symptoms = ["发热", "咳嗽", "味觉丧失"]

# Rank the diseases that could explain them.
likely_diseases = abductive.reason(observed_symptoms)
print("可能的诊断结果:")
for disease, probability in likely_diseases:
    print(f"- {disease}: 概率 {probability:.4f}")

4.1.2 符号推理与概率推理

基于规则的符号推理

class RuleBasedSystem:
    """Symbolic forward-chaining system over a set of hashable facts."""

    def __init__(self):
        self.facts = set()
        # Each rule is {"conditions": iterable of facts, "conclusion": fact}.
        self.rules = []

    def add_fact(self, fact):
        """Record ``fact`` as known."""
        self.facts.add(fact)

    def add_rule(self, conditions, conclusion):
        """Register a rule: when all ``conditions`` are known, so is ``conclusion``."""
        self.rules.append({"conditions": conditions, "conclusion": conclusion})

    def infer(self, max_iterations=100):
        """Derive new facts until a fixed point or ``max_iterations`` rounds.

        Each round evaluates every rule against the facts known at the start
        of the round, then adds all freshly derived conclusions at once.

        Returns:
            The set of facts derived by this call (also added to ``facts``).
        """
        derived = set()

        for _ in range(max_iterations):
            round_facts = {
                rule["conclusion"]
                for rule in self.rules
                if all(condition in self.facts for condition in rule["conditions"])
                and rule["conclusion"] not in self.facts
            }

            # Fixed point reached: nothing new can be derived.
            if not round_facts:
                break

            self.facts |= round_facts
            derived |= round_facts

        return derived

贝叶斯网络的概率推理

import numpy as np

class BayesianNode:
    """A variable in a Bayesian network, linked to its parents and children."""

    def __init__(self, name, states):
        self.name, self.states = name, states
        self.parents, self.children = [], []
        # Conditional probability table; unset until set_cpt() is called.
        self.cpt = None

    def set_parents(self, parents):
        """Replace this node's parents and register it as a child of each.

        Previous parents are not unlinked; a parent that already lists this
        node as a child is left untouched.
        """
        self.parents = parents

        for parent in parents:
            if self in parent.children:
                continue
            parent.children.append(self)

    def set_cpt(self, cpt):
        """Store the conditional probability table."""
        self.cpt = cpt

class BayesianNetwork:
    """Discrete Bayesian network with a simplified sampling-based query.

    NOTE(review): ``_compute_conditional_probability`` conditions a node on
    its parents only. Evidence placed on a node's descendants therefore
    does not influence the sampled distribution, unlike a full Gibbs
    sampler that uses the whole Markov blanket.
    """

    def __init__(self):
        # node name -> BayesianNode
        self.nodes = {}

    def add_node(self, name, states):
        """Create a node, register it under ``name`` and return it."""
        node = BayesianNode(name, states)
        self.nodes[name] = node
        return node

    def add_edge(self, parent_name, child_name):
        """Add a directed edge (causal link) from parent to child.

        Raises:
            ValueError: if either node name is unknown.
        """
        if parent_name not in self.nodes or child_name not in self.nodes:
            raise ValueError("父节点或子节点不存在")

        parent = self.nodes[parent_name]
        child = self.nodes[child_name]

        # Link both directions, avoiding duplicates.
        if parent not in child.parents:
            child.parents.append(parent)
        if child not in parent.children:
            parent.children.append(child)

    def query(self, query_var, evidence):
        """Estimate the distribution of ``query_var`` given ``evidence``.

        Args:
            query_var: name of the variable to query.
            evidence: dict mapping evidence variable names to observed states.

        Returns:
            dict mapping each state of ``query_var`` to its relative
            frequency among 1000 samples.

        Raises:
            KeyError: if ``query_var`` is not a node of the network.
        """
        # Look the node up first so an unknown name fails before sampling.
        query_node = self.nodes[query_var]

        samples = self._gibbs_sampling(evidence, num_samples=1000)

        counts = {state: 0 for state in query_node.states}
        for sample in samples:
            counts[sample[query_var]] += 1

        # Normalise the counts into frequencies.
        total = sum(counts.values())
        return {state: count / total for state, count in counts.items()}

    def _gibbs_sampling(self, evidence, num_samples=1000, burn_in=100):
        """Draw ``num_samples`` joint samples after ``burn_in`` discarded sweeps.

        Evidence variables keep their observed value; every other node is
        resampled once per sweep, in ``self.nodes`` order.
        """
        current_state = self._initialize_state(evidence)
        samples = []

        for sweep in range(num_samples + burn_in):
            for node_name, node in self.nodes.items():
                if node_name in evidence:
                    continue
                probs = self._compute_conditional_probability(node, current_state)
                current_state[node_name] = np.random.choice(node.states, p=probs)

            # Keep a snapshot only once the burn-in period is over.
            if sweep >= burn_in:
                samples.append(current_state.copy())

        return samples

    def _initialize_state(self, evidence):
        """Return a start state: evidence as observed, other nodes random."""
        state = dict(evidence)

        for node_name, node in self.nodes.items():
            if node_name not in evidence:
                state[node_name] = np.random.choice(node.states)

        return state

    def _compute_conditional_probability(self, node, state):
        """Return the distribution over ``node.states`` given its parents.

        A root node returns its CPT as-is; ``None`` makes
        ``np.random.choice`` sample uniformly. A node with parents looks up
        the tuple of current parent states in its CPT and falls back to a
        uniform distribution when the CPT is unset or lacks that
        combination.
        """
        if not node.parents:
            return node.cpt

        parent_states = tuple(state[parent.name] for parent in node.parents)

        # An unset CPT must not crash the sampler; treat it as "no entry".
        if node.cpt is not None and parent_states in node.cpt:
            return node.cpt[parent_states]

        return [1.0 / len(node.states)] * len(node.states)

4.1.3 基于神经网络的深度推理

深度神经网络可用于各种推理任务,特别是在处理非结构化数据时:

import torch
import torch.nn as nn
import torch.optim as optim

class DeepReasoningNetwork(nn.Module):
    """Three-layer MLP classifier whose forward pass returns class probabilities."""

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()

        # Two hidden ReLU layers with dropout, then a softmax over classes.
        self.model = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(hidden_size, output_size),
            nn.Softmax(dim=1)
        )

    def forward(self, x):
        """Return per-class probabilities for a batch ``x`` of feature rows."""
        return self.model(x)


class DeepReasoning:
    """Trains a :class:`DeepReasoningNetwork` and uses it for classification."""

    # Lower bound applied to probabilities before taking the logarithm,
    # so that a probability of exactly 0 does not produce -inf.
    _LOG_EPS = 1e-12

    def __init__(self, input_size, hidden_size, output_size, learning_rate=0.001):
        self.model = DeepReasoningNetwork(input_size, hidden_size, output_size)

        # The network already ends in a softmax. CrossEntropyLoss expects
        # raw logits and would apply softmax a second time, so the loss is
        # computed as NLLLoss over the log of the network's probabilities.
        self.criterion = nn.NLLLoss()
        self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)

        # Optional mapping from class index to a human-readable label.
        self.output_labels = None

    def train(self, inputs, labels, epochs=100, batch_size=32):
        """Train the model with shuffled mini-batches.

        Args:
            inputs: sequence of feature vectors.
            labels: sequence of integer class indices, one per input.
            epochs: number of passes over the data.
            batch_size: mini-batch size.

        Raises:
            ValueError: if ``inputs`` is empty.
        """
        inputs_tensor = torch.FloatTensor(inputs)
        labels_tensor = torch.LongTensor(labels)

        sample_count = len(inputs_tensor)
        if sample_count == 0:
            # Without this, the progress report would divide by zero.
            raise ValueError("训练数据不能为空")

        # Dropout must be active while training; reason() switches it off.
        self.model.train()

        for epoch in range(epochs):
            # Fresh random order every epoch.
            indices = torch.randperm(sample_count)

            total_loss = 0
            batch_count = 0

            for start in range(0, sample_count, batch_size):
                batch_indices = indices[start:start + batch_size]
                batch_inputs = inputs_tensor[batch_indices]
                batch_labels = labels_tensor[batch_indices]

                # Forward pass: probabilities -> log-probabilities -> NLL.
                outputs = self.model(batch_inputs)
                log_probs = torch.log(outputs.clamp_min(self._LOG_EPS))
                loss = self.criterion(log_probs, batch_labels)

                # Backward pass and parameter update.
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

                total_loss += loss.item()
                batch_count += 1

            # Report the mean batch loss every 10 epochs.
            if (epoch + 1) % 10 == 0:
                print(f'Epoch [{epoch+1}/{epochs}], Loss: {total_loss/batch_count:.4f}')

    def reason(self, inputs):
        """Classify ``inputs`` with the current model.

        Returns:
            A ``(predictions, confidences)`` tuple. Predictions are mapped
            labels (a list) when ``set_output_labels`` was called, otherwise
            a NumPy array of class indices. Confidences is a NumPy array
            holding the probability of each predicted class.
        """
        inputs_tensor = torch.FloatTensor(inputs)

        # Evaluation mode disables dropout so that inference is deterministic.
        self.model.eval()
        with torch.no_grad():
            outputs = self.model(inputs_tensor)
            probabilities, predicted = torch.max(outputs, 1)

        if self.output_labels is not None:
            predictions = [self.output_labels[p.item()] for p in predicted]
        else:
            predictions = predicted.numpy()

        return predictions, probabilities.numpy()

    def set_output_labels(self, labels):
        """Set the mapping from class index to label used by ``reason``."""
        self.output_labels = labels

4.1.4 混合推理系统

组合多种推理方法可以创建更强大的推理系统:

class HybridReasoningSystem:
    """Facade that routes a request to rule-based, Bayesian or deep reasoning."""

    def __init__(self):
        self.rule_based = RuleBasedSystem()
        self.bayesian = BayesianNetwork()
        # Created on demand by configure_deep_reasoning().
        self.deep_reasoning = None

        # Confidence threshold for reasoning results.
        self.confidence_threshold = 0.7

    def configure_deep_reasoning(self, input_size, hidden_size, output_size):
        """Create the deep-reasoning model with the given layer sizes."""
        self.deep_reasoning = DeepReasoning(input_size, hidden_size, output_size)

    def add_knowledge(self, knowledge_type, *args, **kwargs):
        """Forward a piece of knowledge to the engine that stores it.

        ``knowledge_type`` is one of ``"fact"``, ``"rule"``, ``"bayes_node"``
        or ``"bayes_edge"``; the remaining arguments are passed through.

        Returns:
            The created node for ``"bayes_node"``, otherwise ``None``.

        Raises:
            ValueError: for any other knowledge type.
        """
        if knowledge_type == "bayes_node":
            return self.bayesian.add_node(*args, **kwargs)

        if knowledge_type == "fact":
            self.rule_based.add_fact(*args, **kwargs)
            return None
        if knowledge_type == "rule":
            self.rule_based.add_rule(*args, **kwargs)
            return None
        if knowledge_type == "bayes_edge":
            self.bayesian.add_edge(*args, **kwargs)
            return None

        raise ValueError(f"不支持的知识类型: {knowledge_type}")

    def reason(self, input_data, reasoning_type=None):
        """Run one reasoning step and return its result dict.

        Args:
            input_data: the request; its expected shape depends on the engine.
            reasoning_type: ``"rule"``, ``"bayes"`` or ``"deep"``; chosen
                automatically from ``input_data`` when ``None``.

        Raises:
            ValueError: unknown reasoning type or malformed input.
            RuntimeError: deep reasoning requested before configuration.
        """
        chosen = reasoning_type
        if chosen is None:
            chosen = self._select_reasoning_type(input_data)

        if chosen == "rule":
            return self._reason_with_rules()
        if chosen == "bayes":
            return self._reason_with_bayes(input_data)
        if chosen == "deep":
            return self._reason_with_deep(input_data)

        raise ValueError(f"不支持的推理类型: {chosen}")

    def _reason_with_rules(self):
        """Run forward chaining; rule conclusions are reported as certain."""
        derived = self.rule_based.infer()
        return {"conclusions": list(derived), "confidence": 1.0}

    def _reason_with_bayes(self, input_data):
        """Query the Bayesian network and report the most likely state."""
        if not ("query" in input_data and "evidence" in input_data):
            raise ValueError("贝叶斯推理需要查询变量和证据")

        distribution = self.bayesian.query(input_data["query"], input_data["evidence"])

        # First state with the highest probability.
        best_state = max(distribution, key=distribution.get)

        return {
            "distribution": distribution,
            "most_likely": best_state,
            "confidence": distribution[best_state],
        }

    def _reason_with_deep(self, input_data):
        """Classify a list of feature vectors with the deep model."""
        if self.deep_reasoning is None:
            raise RuntimeError("深度推理模型尚未配置")
        if not isinstance(input_data, list):
            raise ValueError("深度推理需要特征向量列表")

        predictions, confidences = self.deep_reasoning.reason(input_data)
        return {"predictions": predictions, "confidences": confidences}

    def _select_reasoning_type(self, input_data):
        """Pick an engine from the shape of ``input_data`` (simplified heuristic)."""
        if isinstance(input_data, dict):
            has_query = "query" in input_data and "evidence" in input_data
            return "bayes" if has_query else "rule"

        looks_like_vectors = isinstance(input_data, list) and all(
            isinstance(item, (list, tuple, np.ndarray)) for item in input_data
        )
        return "deep" if looks_like_vectors else "rule"

4.2 执行模块:智能体的"行动系统"

执行模块负责将智能体的决策转化为具体行动,与环境进行交互。

4.2.1 执行模块的基本架构

class ExecutionModule:
    """Registry and runner for named actions, with history and basic monitoring."""

    def __init__(self):
        # action name -> {"function": callable, "required_args": [names]}
        self.available_actions = {}

        # One record dict per execute() call that reached the action.
        self.execution_history = []

        # When True, every execution is checked by _monitor_execution().
        self.monitor_enabled = True

    def register_action(self, action_name, action_func, required_args=None):
        """Register ``action_func`` under ``action_name``.

        Args:
            action_name: name used to invoke the action.
            action_func: callable invoked with keyword arguments.
            required_args: keyword names that must be supplied to execute().
        """
        if required_args is None:
            required_args = []

        self.available_actions[action_name] = {
            "function": action_func,
            "required_args": required_args
        }

    def execute(self, action_name, **kwargs):
        """Run a registered action and record the outcome.

        Returns:
            Whatever the action returns.

        Raises:
            ValueError: unknown action or a missing required argument
                (nothing is recorded in that case).
            RuntimeError: the action raised; the original exception is
                attached as ``__cause__``.
        """
        # Imported here because the visible part of the file never imports it.
        import time

        if action_name not in self.available_actions:
            raise ValueError(f"未知行动: {action_name}")

        action_info = self.available_actions[action_name]

        for arg in action_info["required_args"]:
            if arg not in kwargs:
                raise ValueError(f"执行{action_name}缺少必要参数: {arg}")

        # Wall-clock time for the record; monotonic clock for the duration.
        start_time = time.time()
        started = time.perf_counter()

        failure = None
        try:
            result = action_info["function"](**kwargs)
        except Exception as exc:
            # Record the failure first; it is re-raised below.
            result = None
            failure = exc

        execution_time = time.perf_counter() - started
        success = failure is None
        error = None if success else str(failure)

        execution_record = {
            "action": action_name,
            "parameters": kwargs,
            "timestamp": start_time,
            "execution_time": execution_time,
            "success": success,
            "result": result,
            "error": error
        }
        self.execution_history.append(execution_record)

        if self.monitor_enabled:
            self._monitor_execution(execution_record)

        if not success:
            raise RuntimeError(f"执行{action_name}失败: {error}") from failure

        return result

    def get_history(self, n=None):
        """Return the execution history.

        Args:
            n: when given, return only the ``n`` most recent records; a
                value of 0 or less yields an empty list.
        """
        if n is None:
            return self.execution_history
        if n <= 0:
            # history[-0:] would return the whole list.
            return []
        return self.execution_history[-n:]

    def clear_history(self):
        """Discard all execution records."""
        self.execution_history = []

    def _monitor_execution(self, execution_record):
        """Print warnings for slow actions and a low recent success rate."""
        if execution_record["execution_time"] > 5.0:
            print(f"警告: 行动{execution_record['action']}执行时间过长: {execution_record['execution_time']:.2f}秒")

        # Success rate over the 10 most recent executions.
        recent_records = self.execution_history[-10:]
        if not recent_records:
            return
        success_rate = sum(1 for r in recent_records if r["success"]) / len(recent_records)

        if success_rate < 0.7:
            print(f"警告: 最近行动成功率低: {success_rate:.2f}")

4.2.2 常见执行操作类型

API调用执行器

import requests

class APIExecutor:
    """Thin HTTP client for a JSON API rooted at ``base_url``."""

    # Supported HTTP methods -> whether ``data`` is sent as a JSON body.
    _SENDS_BODY = {"GET": False, "POST": True, "PUT": True, "DELETE": False}

    def __init__(self, base_url, auth_token=None, timeout=10):
        self.base_url = base_url
        self.auth_token = auth_token
        self.timeout = timeout

    @staticmethod
    def _is_json_content(content_type):
        """Return True if a Content-Type header value denotes JSON.

        Parameters such as ``; charset=utf-8`` are ignored, and structured
        suffixes such as ``application/problem+json`` are accepted.
        """
        media_type = (content_type or "").split(";", 1)[0].strip().lower()
        return media_type == "application/json" or media_type.endswith("+json")

    def make_request(self, endpoint, method="GET", params=None, data=None, headers=None):
        """Send a request and return status code, headers and decoded body.

        Args:
            endpoint: path relative to ``base_url``.
            method: GET, POST, PUT or DELETE (case-insensitive).
            params: query-string parameters.
            data: JSON body, sent for POST and PUT only.
            headers: extra headers; these override the Authorization header.

        Returns:
            dict with ``status_code``, ``headers`` and ``data``. ``data`` is
            the parsed JSON for JSON responses, otherwise the body text.

        Raises:
            ValueError: unsupported HTTP method.
            RuntimeError: the request failed or returned an error status.
        """
        verb = method.upper()
        if verb not in self._SENDS_BODY:
            raise ValueError(f"不支持的HTTP方法: {method}")

        # Join with exactly one slash regardless of how the parts are written.
        url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"

        request_headers = {}
        if self.auth_token:
            request_headers["Authorization"] = f"Bearer {self.auth_token}"
        if headers:
            request_headers.update(headers)

        request_kwargs = {
            "params": params,
            "headers": request_headers,
            "timeout": self.timeout,
        }
        if self._SENDS_BODY[verb]:
            request_kwargs["json"] = data

        try:
            response = requests.request(verb, url, **request_kwargs)

            # Turn 4xx/5xx responses into exceptions.
            response.raise_for_status()

            if self._is_json_content(response.headers.get("content-type")):
                result = response.json()
            else:
                result = response.text
        except requests.exceptions.RequestException as e:
            raise RuntimeError(f"API请求失败: {str(e)}") from e

        return {
            "status_code": response.status_code,
            "headers": dict(response.headers),
            "data": result
        }

数据库操作执行器

import sqlite3

class DatabaseExecutor:
    """Small SQLite helper that returns rows as dicts and wraps errors.

    The connection is opened lazily by the first statement. The object can
    also be used as a context manager, which connects on entry and
    disconnects on exit.
    """

    def __init__(self, db_path):
        self.db_path = db_path
        self.connection = None
        self.cursor = None

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Returns None, so exceptions from the with-body propagate.
        self.disconnect()

    def connect(self):
        """Open the database, closing any connection that is already open.

        Returns:
            True on success.

        Raises:
            RuntimeError: if the database cannot be opened.
        """
        # Avoid leaking a previous connection when connect() is called twice.
        self.disconnect()
        try:
            self.connection = sqlite3.connect(self.db_path)
            self.cursor = self.connection.cursor()
            return True
        except sqlite3.Error as e:
            raise RuntimeError(f"数据库连接失败: {str(e)}") from e

    def disconnect(self):
        """Close the connection if one is open."""
        if self.connection:
            self.connection.close()
            self.connection = None
            self.cursor = None

    def _run(self, query, params):
        """Execute one statement on the shared cursor, binding ``params`` if any."""
        if params:
            self.cursor.execute(query, params)
        else:
            self.cursor.execute(query)

    def _fetch_dicts(self):
        """Fetch all rows of the current result set as column -> value dicts."""
        columns = [desc[0] for desc in self.cursor.description]
        return [dict(zip(columns, row)) for row in self.cursor.fetchall()]

    def execute_query(self, query, params=None):
        """Run a statement and return its rows.

        Returns:
            A list of dicts, or ``None`` when the statement produces no
            result set.

        Raises:
            RuntimeError: if SQLite reports an error.
        """
        if not self.connection:
            self.connect()

        try:
            self._run(query, params)
            if self.cursor.description:
                return self._fetch_dicts()
            return None
        except sqlite3.Error as e:
            raise RuntimeError(f"查询执行失败: {str(e)}") from e

    def execute_update(self, query, params=None):
        """Run a modifying statement and commit it.

        Returns:
            The cursor's ``rowcount`` for the statement.

        Raises:
            RuntimeError: if SQLite reports an error; the change is rolled back.
        """
        if not self.connection:
            self.connect()

        try:
            self._run(query, params)
            self.connection.commit()
            return self.cursor.rowcount
        except sqlite3.Error as e:
            if self.connection:
                self.connection.rollback()
            raise RuntimeError(f"更新执行失败: {str(e)}") from e

    def execute_transaction(self, queries):
        """Run several statements atomically.

        Args:
            queries: iterable of ``(query, params)`` pairs; ``params`` may
                be ``None``.

        Returns:
            One entry per statement: a list of row dicts when the statement
            produced a result set, otherwise ``{"rowcount": n}``.

        Raises:
            RuntimeError: if SQLite reports an error; everything is rolled back.
        """
        if not self.connection:
            self.connect()

        try:
            # A transaction may already be open (e.g. an uncommitted
            # statement run through execute_query); a second BEGIN would
            # fail, so join the open transaction instead.
            if not self.connection.in_transaction:
                self.connection.execute("BEGIN TRANSACTION")

            results = []
            for query, params in queries:
                self._run(query, params)

                # cursor.description is set only for statements that return
                # rows. Unlike a "SELECT" prefix test, this also covers
                # WITH, PRAGMA and RETURNING, and matches execute_query.
                if self.cursor.description is not None:
                    results.append(self._fetch_dicts())
                else:
                    results.append({"rowcount": self.cursor.rowcount})

            self.connection.commit()
            return results
        except sqlite3.Error as e:
            if self.connection:
                self.connection.rollback()
            raise RuntimeError(f"事务执行失败: {str(e)}") from e

文件操作执行器

import os
import json
import csv

class FileExecutor:
    """Reads and writes text, JSON and CSV files relative to a base directory.

    NOTE(review): paths are combined with ``os.path.join`` only, so an
    absolute ``file_path`` or one containing ``..`` can reach outside
    ``base_directory``. Confirm whether callers pass untrusted paths.
    """

    def __init__(self, base_directory="."):
        self.base_directory = base_directory

    def _resolve(self, file_path):
        """Return ``file_path`` joined onto the base directory."""
        return os.path.join(self.base_directory, file_path)

    @staticmethod
    def _ensure_parent_dir(full_path):
        """Create the parent directory of ``full_path`` if it has one.

        ``os.makedirs("")`` raises, so a path with no directory component
        is skipped.
        """
        parent = os.path.dirname(full_path)
        if parent:
            os.makedirs(parent, exist_ok=True)

    def read_text_file(self, file_path, encoding="utf-8"):
        """Return the whole content of a text file.

        Raises:
            RuntimeError: if the file cannot be read or decoded.
        """
        full_path = self._resolve(file_path)

        try:
            with open(full_path, "r", encoding=encoding) as file:
                return file.read()
        except Exception as e:
            raise RuntimeError(f"读取文件失败: {str(e)}") from e

    def write_text_file(self, file_path, content, encoding="utf-8", mode="w"):
        """Write ``content`` to a text file, creating parent directories.

        Args:
            mode: ``open()`` mode, e.g. ``"w"`` to overwrite or ``"a"`` to append.

        Returns:
            ``{"success": True, "path": <full path>}``.

        Raises:
            RuntimeError: if the file cannot be written.
        """
        full_path = self._resolve(file_path)
        self._ensure_parent_dir(full_path)

        try:
            with open(full_path, mode, encoding=encoding) as file:
                file.write(content)
            return {"success": True, "path": full_path}
        except Exception as e:
            raise RuntimeError(f"写入文件失败: {str(e)}") from e

    def read_json_file(self, file_path, encoding="utf-8"):
        """Read and parse a JSON file.

        Raises:
            RuntimeError: if the file cannot be read or is not valid JSON.
        """
        content = self.read_text_file(file_path, encoding)
        try:
            return json.loads(content)
        except json.JSONDecodeError as e:
            raise RuntimeError(f"JSON解析失败: {str(e)}") from e

    def write_json_file(self, file_path, data, encoding="utf-8", indent=2):
        """Serialise ``data`` as JSON, keeping non-ASCII characters, and write it.

        Returns:
            The result dict of :meth:`write_text_file`.

        Raises:
            RuntimeError: if serialisation or writing fails.
        """
        try:
            content = json.dumps(data, ensure_ascii=False, indent=indent)
            return self.write_text_file(file_path, content, encoding)
        except Exception as e:
            raise RuntimeError(f"JSON写入失败: {str(e)}") from e

    def read_csv_file(self, file_path, encoding="utf-8", delimiter=","):
        """Read a CSV file with a header row into a list of dicts.

        Raises:
            RuntimeError: if the file cannot be read or parsed.
        """
        full_path = self._resolve(file_path)

        try:
            with open(full_path, "r", encoding=encoding, newline="") as file:
                reader = csv.DictReader(file, delimiter=delimiter)
                return [dict(row) for row in reader]
        except Exception as e:
            raise RuntimeError(f"CSV读取失败: {str(e)}") from e

    def write_csv_file(self, file_path, data, encoding="utf-8", delimiter=","):
        """Write a list of dicts as CSV; columns come from the first row's keys.

        Returns:
            ``{"success": True, "path": <full path>, "rows": <row count>}``.

        Raises:
            ValueError: if ``data`` is empty.
            RuntimeError: if the file cannot be written.
        """
        if not data:
            raise ValueError("没有数据可写入CSV")

        full_path = self._resolve(file_path)
        self._ensure_parent_dir(full_path)

        try:
            with open(full_path, "w", encoding=encoding, newline="") as file:
                fieldnames = data[0].keys()
                writer = csv.DictWriter(file, fieldnames=fieldnames, delimiter=delimiter)
                writer.writeheader()
                writer.writerows(data)

            return {"success": True, "path": full_path, "rows": len(data)}
        except Exception as e:
            raise RuntimeError(f"CSV写入失败: {str(e)}") from e

4.2.3 执行监控与错误处理

class ExecutionMonitor:
    """Track execution statistics for an execution module and recover from failures.

    The monitor keeps running counters (total / successful / failed executions,
    accumulated execution time, per-error counts), can summarise them as a
    report, and offers a retry-then-fallback strategy for failed executions.
    """

    # Substrings marking errors for which a retry cannot help.
    # Simplified: real applications need a finer-grained error classification.
    _NON_RETRYABLE_ERRORS = ("权限拒绝", "资源不存在", "参数无效", "语法错误")

    def __init__(self, execution_module):
        """Attach the monitor to ``execution_module`` with zeroed metrics."""
        self.execution_module = execution_module
        self.error_counts = {}
        self.performance_metrics = self._empty_metrics()

    @staticmethod
    def _empty_metrics():
        """Return a fresh metrics dictionary with every counter at zero."""
        return {
            "total_executions": 0,
            "successful_executions": 0,
            "failed_executions": 0,
            "total_execution_time": 0
        }

    def start_monitoring(self):
        """Enable monitoring on the execution module and reset all metrics."""
        self.execution_module.monitor_enabled = True

        # Discard metrics collected so far
        self.error_counts = {}
        self.performance_metrics = self._empty_metrics()

    def stop_monitoring(self):
        """Disable monitoring on the execution module; metrics are kept."""
        self.execution_module.monitor_enabled = False

    def update_metrics(self, execution_record):
        """Fold one execution record into the performance metrics.

        Reads ``execution_record["execution_time"]`` and
        ``execution_record["success"]``; for failed records it also reads
        ``execution_record["error"]`` and counts it (a falsy error is counted
        under the "unknown error" label).
        """
        metrics = self.performance_metrics
        metrics["total_executions"] += 1
        metrics["total_execution_time"] += execution_record["execution_time"]

        if execution_record["success"]:
            metrics["successful_executions"] += 1
            return

        metrics["failed_executions"] += 1

        # Count the failure under its error label
        error_type = execution_record["error"] or "未知错误"
        self.error_counts[error_type] = self.error_counts.get(error_type, 0) + 1

    def get_report(self):
        """Build a summary report from the metrics collected so far.

        Returns:
            A dict with the raw counters, ``success_rate`` and
            ``avg_execution_time`` (both 0 when nothing was executed) and
            ``top_errors``: up to five ``(error, count)`` pairs, most frequent first.
        """
        metrics = self.performance_metrics
        total = metrics["total_executions"]

        # Guard against division by zero when nothing has been executed yet
        avg_execution_time = 0
        success_rate = 0
        if total > 0:
            avg_execution_time = metrics["total_execution_time"] / total
            success_rate = metrics["successful_executions"] / total

        top_errors = sorted(
            self.error_counts.items(),
            key=lambda item: item[1],
            reverse=True
        )[:5]  # the five most frequent errors

        return {
            "total_executions": total,
            "successful_executions": metrics["successful_executions"],
            "failed_executions": metrics["failed_executions"],
            "success_rate": success_rate,
            "avg_execution_time": avg_execution_time,
            "top_errors": top_errors
        }

    def handle_error(self, execution_record, max_retries=3):
        """Retry a failed execution if possible, otherwise return a fallback.

        Args:
            execution_record: record of the failed execution; its ``"action"``,
                ``"parameters"`` and ``"error"`` entries are read.
            max_retries: upper bound on the number of failures recorded in the
                execution history for this action before retries stop.

        Returns:
            The result of the retried execution when the retry succeeds,
            otherwise the dict produced by ``_get_fallback_response``.
        """
        action = execution_record["action"]
        parameters = execution_record["parameters"]
        error = execution_record["error"]

        if self._is_retryable_error(error):
            # Number of failures of this action recorded in the history
            retry_count = sum(
                1
                for record in self.execution_module.execution_history
                if record["action"] == action and not record["success"]
            )

            if retry_count < max_retries:
                print(f"重试行动 {action} (第 {retry_count + 1}/{max_retries} 次)")

                # Tag the retry on a copy: writing into the record's own dict
                # would alter the stored execution record.
                retry_parameters = dict(parameters or {})
                retry_parameters["_retry_count"] = retry_count + 1

                try:
                    return self.execution_module.execute(action, **retry_parameters)
                except Exception as e:
                    print(f"重试失败: {str(e)}")

        # Not retryable, retries exhausted, or the retry itself failed
        return self._get_fallback_response(action, error)

    def _is_retryable_error(self, error):
        """Return False when ``error`` contains a known non-retryable marker.

        A missing error (``None``) is treated as retryable, and non-string
        errors are compared by their ``str()`` form.
        """
        if error is None:
            return True

        error_text = error if isinstance(error, str) else str(error)
        return not any(
            marker in error_text for marker in self._NON_RETRYABLE_ERRORS
        )

    def _get_fallback_response(self, action, error):
        """Return a fallback response dict chosen by keywords in the action name."""
        action_name = action.lower()

        if "query" in action_name:
            return {"error": error, "fallback": "无法检索数据,请稍后重试"}
        if any(verb in action_name for verb in ("create", "update", "delete")):
            return {"error": error, "fallback": "操作失败,请检查输入并重试"}
        return {"error": error, "fallback": "行动执行失败,请联系管理员"}

4.3 整合推理引擎和执行模块

现在,我们将推理引擎和执行模块整合起来,构建一个功能完整的智能体核心:

class AgentCore:
    """Agent core that wires a reasoning system to an execution module.

    ``process`` runs the reasoning system on the input, turns the reasoning
    result into an action plan, executes every planned action through the
    execution module and reports the outcome together with monitor metrics.
    """

    def __init__(self):
        """Create the reasoning system, execution module and monitor."""
        # Reasoning engine
        self.reasoning = HybridReasoningSystem()

        # Execution module
        self.execution = ExecutionModule()

        # Execution monitor
        self.monitor = ExecutionMonitor(self.execution)

        # Optional executors, created by the configure_* methods
        self.api_executor = None
        self.db_executor = None
        self.file_executor = None

        # Actions that are always available
        self._register_basic_actions()

    def configure_api_executor(self, base_url, auth_token=None):
        """Create the API executor and register the ``api_*`` actions."""
        self.api_executor = APIExecutor(base_url, auth_token)
        self._register_api_actions()

    def configure_db_executor(self, db_path):
        """Create the database executor and register the ``db_*`` actions."""
        self.db_executor = DatabaseExecutor(db_path)
        self._register_db_actions()

    def configure_file_executor(self, base_directory="."):
        """Create the file executor and register the file actions."""
        self.file_executor = FileExecutor(base_directory)
        self._register_file_actions()

    def _register_basic_actions(self):
        """Register the built-in ``print_message`` and ``wait`` actions."""
        # Print a message
        self.execution.register_action(
            "print_message",
            lambda message: print(message),
            ["message"]
        )

        # Sleep for the given number of seconds
        self.execution.register_action(
            "wait",
            lambda seconds: time.sleep(seconds),
            ["seconds"]
        )

    def _register_api_actions(self):
        """Register GET/POST/PUT/DELETE actions; no-op without an API executor."""
        if not self.api_executor:
            return

        # GET request
        self.execution.register_action(
            "api_get",
            lambda endpoint, params=None, headers=None:
                self.api_executor.make_request(endpoint, "GET", params, None, headers),
            ["endpoint"]
        )

        # POST request
        self.execution.register_action(
            "api_post",
            lambda endpoint, data=None, params=None, headers=None:
                self.api_executor.make_request(endpoint, "POST", params, data, headers),
            ["endpoint"]
        )

        # PUT request
        self.execution.register_action(
            "api_put",
            lambda endpoint, data=None, params=None, headers=None:
                self.api_executor.make_request(endpoint, "PUT", params, data, headers),
            ["endpoint"]
        )

        # DELETE request
        self.execution.register_action(
            "api_delete",
            lambda endpoint, params=None, headers=None:
                self.api_executor.make_request(endpoint, "DELETE", params, None, headers),
            ["endpoint"]
        )

    def _register_db_actions(self):
        """Register query/update/transaction actions; no-op without a DB executor."""
        if not self.db_executor:
            return

        # Query
        self.execution.register_action(
            "db_query",
            lambda query, params=None: self.db_executor.execute_query(query, params),
            ["query"]
        )

        # Update
        self.execution.register_action(
            "db_update",
            lambda query, params=None: self.db_executor.execute_update(query, params),
            ["query"]
        )

        # Transaction
        self.execution.register_action(
            "db_transaction",
            lambda queries: self.db_executor.execute_transaction(queries),
            ["queries"]
        )

    def _register_file_actions(self):
        """Register text/JSON/CSV read and write actions; no-op without a file executor."""
        if not self.file_executor:
            return

        # Read a text file
        self.execution.register_action(
            "read_text_file",
            lambda file_path, encoding="utf-8":
                self.file_executor.read_text_file(file_path, encoding),
            ["file_path"]
        )

        # Write a text file
        self.execution.register_action(
            "write_text_file",
            lambda file_path, content, encoding="utf-8", mode="w":
                self.file_executor.write_text_file(file_path, content, encoding, mode),
            ["file_path", "content"]
        )

        # Read a JSON file
        self.execution.register_action(
            "read_json_file",
            lambda file_path, encoding="utf-8":
                self.file_executor.read_json_file(file_path, encoding),
            ["file_path"]
        )

        # Write a JSON file
        self.execution.register_action(
            "write_json_file",
            lambda file_path, data, encoding="utf-8", indent=2:
                self.file_executor.write_json_file(file_path, data, encoding, indent),
            ["file_path", "data"]
        )

        # Read a CSV file
        self.execution.register_action(
            "read_csv_file",
            lambda file_path, encoding="utf-8", delimiter=",":
                self.file_executor.read_csv_file(file_path, encoding, delimiter),
            ["file_path"]
        )

        # Write a CSV file
        self.execution.register_action(
            "write_csv_file",
            lambda file_path, data, encoding="utf-8", delimiter=",":
                self.file_executor.write_csv_file(file_path, data, encoding, delimiter),
            ["file_path", "data"]
        )

    def process(self, input_data, execution_required=True):
        """Reason about ``input_data`` and, unless disabled, execute the resulting plan.

        Args:
            input_data: facts handed to the reasoning system. Callable
                parameter values in the action plan are invoked with this
                object to obtain the concrete parameter value.
            execution_required: when False, only the reasoning result is returned.

        Returns:
            A dict with ``"reasoning"``, ``"execution"`` and ``"status"``
            entries; completed runs additionally carry ``"metrics"``.
            Every executed action contributes one entry to ``"execution"``:
            either ``success=True`` with its ``"result"``, or ``success=False``
            with the ``"error"`` text and the ``"fallback"`` returned by the
            monitor's error handler.
        """
        self.monitor.start_monitoring()

        try:
            # 1. Reason about the input
            reasoning_result = self.reasoning.reason(input_data)

            # Reasoning only: skip planning and execution
            if not execution_required:
                return {
                    "reasoning": reasoning_result,
                    "execution": None,
                    "status": "reasoning_only"
                }

            # 2. Derive the action plan from the reasoning result
            action_plan = self._create_action_plan(reasoning_result)

            # 3. Execute the plan; one failing action does not stop the others
            execution_results = []
            for action in action_plan:
                name = action["name"]
                parameters = action["parameters"]
                try:
                    parameters = self._resolve_parameters(parameters, input_data)
                    result = self.execution.execute(name, **parameters)
                    execution_results.append({
                        "action": name,
                        "success": True,
                        "result": result
                    })
                except Exception as e:
                    error_record = self._failure_record(name, parameters, e)
                    fallback = self.monitor.handle_error(error_record)

                    execution_results.append({
                        "action": name,
                        "success": False,
                        "error": str(e),
                        "fallback": fallback
                    })

            # 4. Full result including monitor metrics
            return {
                "reasoning": reasoning_result,
                "execution": execution_results,
                "status": "completed",
                "metrics": self.monitor.get_report()
            }

        finally:
            self.monitor.stop_monitoring()

    @staticmethod
    def _resolve_parameters(parameters, facts):
        """Return a copy of ``parameters`` with callable values replaced by ``value(facts)``."""
        return {
            key: value(facts) if callable(value) else value
            for key, value in parameters.items()
        }

    def _failure_record(self, name, parameters, error):
        """Return the execution record describing the failure of action ``name``.

        The latest history entry is used only when it is a failed record of
        this very action. If the history is empty or its last entry belongs
        to something else, a minimal record is built from the known data
        instead of handing an unrelated record to the error handler.
        """
        history = self.execution.execution_history
        if history:
            latest = history[-1]
            if (
                isinstance(latest, dict)
                and latest.get("action") == name
                and not latest.get("success")
            ):
                return latest

        return {
            "action": name,
            "parameters": parameters,
            "success": False,
            "error": str(error)
        }

    def _create_action_plan(self, reasoning_result):
        """Translate a reasoning result into a list of ``{"name", "parameters"}`` actions.

        Simplified implementation. The shape of ``reasoning_result`` selects
        the strategy:

        * ``"conclusions"``: each dict conclusion with an ``"action"`` key
          becomes that action; every other conclusion is printed.
        * ``"most_likely"``: runs ``handle_<state>`` when such an action is
          registered, otherwise prints the state and its confidence.
        * ``"predictions"``: for predictions with confidence above 0.7, runs
          ``execute_prediction_<prediction>`` when registered, otherwise
          prints the prediction.
        """
        action_plan = []

        if "conclusions" in reasoning_result:
            # Rule-based reasoning result
            for conclusion in reasoning_result["conclusions"]:
                if isinstance(conclusion, dict) and "action" in conclusion:
                    action_plan.append({
                        "name": conclusion["action"],
                        "parameters": conclusion.get("parameters", {})
                    })
                else:
                    # Default action: print the conclusion
                    action_plan.append({
                        "name": "print_message",
                        "parameters": {"message": f"结论: {conclusion}"}
                    })

        elif "most_likely" in reasoning_result:
            # Bayesian reasoning result
            state = reasoning_result["most_likely"]
            confidence = reasoning_result["confidence"]

            action_name = f"handle_{state}"
            if action_name in self.execution.available_actions:
                action_plan.append({
                    "name": action_name,
                    "parameters": {"confidence": confidence}
                })
            else:
                # Default action: print the most likely state
                action_plan.append({
                    "name": "print_message",
                    "parameters": {
                        "message": f"最可能状态: {state}, 置信度: {confidence:.4f}"
                    }
                })

        elif "predictions" in reasoning_result:
            # Deep reasoning result
            scored_predictions = zip(
                reasoning_result["predictions"], reasoning_result["confidences"]
            )
            for i, (prediction, confidence) in enumerate(scored_predictions):
                # Only act on high-confidence predictions
                if not confidence > 0.7:
                    continue

                action_name = f"execute_prediction_{prediction}"
                if action_name in self.execution.available_actions:
                    action_plan.append({
                        "name": action_name,
                        "parameters": {"confidence": float(confidence)}
                    })
                else:
                    # Default action: print the prediction
                    action_plan.append({
                        "name": "print_message",
                        "parameters": {
                            "message": f"预测 {i+1}: {prediction}, 置信度: {float(confidence):.4f}"
                        }
                    })

        return action_plan

4.4 实例:构建信息检索和分析智能体

下面我们将构建一个具备推理和执行能力的信息检索与分析智能体,展示如何应用本章所学概念:

import os
import time
import json
import requests
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime

class InfoRetrievalAgent:
    """Information retrieval and analysis agent built on top of ``AgentCore``.

    The agent registers four custom actions (data analysis, visualization,
    information extraction, result summarization), installs reasoning rules
    that map request types and data sources to actions, and exposes
    ``process_request`` as its entry point.
    """

    # File suffixes recognised as data sources inside free-text requests
    _DATA_SOURCE_SUFFIXES = (".csv", ".json", ".txt")

    def __init__(self):
        """Create the agent core, file executor, custom actions and rules."""
        # Agent core
        self.core = AgentCore()

        # File executor rooted at ./data
        self.core.configure_file_executor("./data")

        # Custom actions
        self._register_custom_actions()

        # Reasoning rules
        self._configure_reasoning()

    def _register_custom_actions(self):
        """Register the agent's custom actions with the execution module."""
        custom_actions = (
            # Data analysis
            ("analyze_data", self._action_analyze_data,
             ["data", "analysis_type"]),
            # Visualization
            ("visualize_data", self._action_visualize_data,
             ["data", "visualization_type", "output_path"]),
            # Information extraction
            ("extract_information", self._action_extract_information,
             ["text", "extraction_type"]),
            # Result summarization
            ("summarize_results", self._action_summarize_results,
             ["results", "summary_type"]),
        )
        for name, handler, required_parameters in custom_actions:
            self.core.execution.register_action(name, handler, required_parameters)

    def _configure_reasoning(self):
        """Install the reasoning rules that map requests to actions."""
        # Rules keyed on the request type
        self.core.reasoning.add_knowledge("rule",
            [{"request_type": "data_analysis"}],
            {"action": "analyze_data", "parameters": {"analysis_type": "statistical"}}
        )

        self.core.reasoning.add_knowledge("rule",
            [{"request_type": "visualization"}],
            {"action": "visualize_data", "parameters": {"visualization_type": "bar_chart"}}
        )

        self.core.reasoning.add_knowledge("rule",
            [{"request_type": "information_extraction"}],
            {"action": "extract_information", "parameters": {"extraction_type": "entities"}}
        )

        self.core.reasoning.add_knowledge("rule",
            [{"request_type": "summarization"}],
            {"action": "summarize_results", "parameters": {"summary_type": "brief"}}
        )

        # Rules with callable conditions: pick the reader by data source suffix.
        # The file_path parameter is a callable taking the facts dict.
        self.core.reasoning.add_knowledge("rule",
            lambda facts: "data_source" in facts and facts["data_source"].endswith(".csv"),
            {"action": "read_csv_file", "parameters": {"file_path": lambda facts: facts["data_source"]}}
        )

        self.core.reasoning.add_knowledge("rule",
            lambda facts: "data_source" in facts and facts["data_source"].endswith(".json"),
            {"action": "read_json_file", "parameters": {"file_path": lambda facts: facts["data_source"]}}
        )

    def process_request(self, request):
        """Process a user request given as text or as a facts dict.

        Returns:
            ``{"success": True, "result": ..., "timestamp": ...}`` on success,
            or ``{"success": False, "error": ..., "timestamp": ...}`` when
            parsing or processing raises.
        """
        try:
            if isinstance(request, str):
                # Free text: derive structured facts first
                request_facts = self._parse_text_request(request)
            else:
                # Already structured
                request_facts = request

            result = self.core.process(request_facts)

            return {
                "success": True,
                "result": result,
                "timestamp": datetime.now().isoformat()
            }

        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            }

    def _parse_text_request(self, text):
        """Turn a free-text request into a facts dict via keyword matching.

        The result always contains ``"original_text"``. ``"request_type"`` is
        added when a known keyword occurs, and ``"data_source"`` when the text
        mentions a file and contains a whitespace-separated token ending in a
        known data file suffix.
        """
        request_facts = {"original_text": text}

        # Request type: the first matching keyword group wins
        if any(keyword in text for keyword in ("分析", "统计", "计算")):
            request_facts["request_type"] = "data_analysis"
        elif any(keyword in text for keyword in ("可视化", "图表", "绘制")):
            request_facts["request_type"] = "visualization"
        elif any(keyword in text for keyword in ("提取", "找出", "识别")):
            request_facts["request_type"] = "information_extraction"
        elif any(keyword in text for keyword in ("总结", "概括", "汇总")):
            request_facts["request_type"] = "summarization"

        # Data source: naive file name detection
        if "文件" in text:
            for word in text.split():
                if word.endswith(self._DATA_SOURCE_SUFFIXES):
                    request_facts["data_source"] = word
                    break

        return request_facts

    # Helpers shared by the action implementations
    def _load_tabular_data(self, data):
        """Replace a ``.csv`` / ``.json`` path string by the file's contents.

        Anything else is returned unchanged.
        """
        if isinstance(data, str):
            if data.endswith(".csv"):
                return self.core.execution.execute("read_csv_file", file_path=data)
            if data.endswith(".json"):
                return self.core.execution.execute("read_json_file", file_path=data)
        return data

    @staticmethod
    def _to_dataframe(data):
        """Build a DataFrame: a list is used as rows, anything else as one row."""
        if isinstance(data, list):
            return pd.DataFrame(data)
        return pd.DataFrame([data])

    @staticmethod
    def _pick_value_column(df):
        """Return the first numeric column of ``df``, or its second column if none is numeric."""
        numeric_columns = df.select_dtypes(include=["number"]).columns
        if len(numeric_columns) > 0:
            return numeric_columns[0]
        return df.columns[1]

    # Action implementations
    def _action_analyze_data(self, data, analysis_type):
        """Analyze tabular data.

        Args:
            data: list of row dicts, a single dict, or a ``.csv`` / ``.json``
                path that is read through the execution module first.
            analysis_type: ``"statistical"``, ``"correlation"`` or ``"groupby"``.

        Returns:
            The analysis result, or ``{"error": ...}`` when the analysis type
            is unsupported or the data does not allow the analysis.
        """
        data = self._load_tabular_data(data)
        df = self._to_dataframe(data)

        if analysis_type == "statistical":
            results = {
                "row_count": len(df),
                "column_count": len(df.columns),
                "column_types": {col: str(dtype) for col, dtype in df.dtypes.items()},
                "missing_values": df.isnull().sum().to_dict(),
            }

            # Descriptive statistics for numeric columns
            numeric_columns = df.select_dtypes(include=["number"])
            if not numeric_columns.empty:
                results["statistics"] = {
                    col: {
                        "mean": df[col].mean(),
                        "median": df[col].median(),
                        "std": df[col].std(),
                        "min": df[col].min(),
                        "max": df[col].max()
                    }
                    for col in numeric_columns.columns
                }

            return results

        elif analysis_type == "correlation":
            numeric_columns = df.select_dtypes(include=["number"])
            if numeric_columns.empty:
                return {"error": "没有数值列可进行相关性分析"}

            corr_matrix = numeric_columns.corr().to_dict()
            return {"correlation_matrix": corr_matrix}

        elif analysis_type == "groupby":
            # Explicit column choices can only come from a dict input; a list
            # of rows has no such keys and must not be indexed by name.
            options = data if isinstance(data, dict) else {}

            if "group_column" in options:
                group_column = options["group_column"]
            elif len(df.columns) > 0:
                group_column = df.columns[0]  # default: first column
            else:
                return {"error": "没有可用于分组的列"}

            if "agg_column" in options:
                agg_column = options["agg_column"]
            elif len(df.columns) > 1:
                # Prefer a numeric column, fall back to the second column
                numeric_cols = df.select_dtypes(include=["number"]).columns
                if not numeric_cols.empty:
                    agg_column = numeric_cols[0]
                else:
                    agg_column = df.columns[1]
            else:
                return {"error": "数据至少需要两列才能进行分组分析"}

            grouped = df.groupby(group_column)[agg_column].agg(['count', 'mean', 'sum']).reset_index()
            return grouped.to_dict(orient="records")

        else:
            return {"error": f"不支持的分析类型: {analysis_type}"}

    def _action_visualize_data(self, data, visualization_type, output_path):
        """Render a chart of ``data`` and save it to ``output_path``.

        Args:
            data: list of row dicts, a single dict, or a ``.csv`` / ``.json``
                path that is read through the execution module first.
            visualization_type: ``"bar_chart"``, ``"line_chart"``,
                ``"pie_chart"`` or ``"scatter_plot"``.
            output_path: path handed to ``plt.savefig``.

        Returns:
            ``{"visualization_type", "output_path", "success": True}`` on
            success, or ``{"error": ...}`` when the type is unsupported or the
            data has too few suitable columns.
        """
        df = self._to_dataframe(self._load_tabular_data(data))

        figure = plt.figure(figsize=(10, 6))
        try:
            if visualization_type == "bar_chart":
                if len(df.columns) < 2:
                    return {"error": "数据至少需要两列才能创建条形图"}

                x_column = df.columns[0]
                y_column = self._pick_value_column(df)

                plt.bar(df[x_column], df[y_column])
                plt.xlabel(x_column)
                plt.ylabel(y_column)
                plt.title(f"{y_column} by {x_column}")
                plt.xticks(rotation=45)

            elif visualization_type == "line_chart":
                if len(df.columns) < 2:
                    return {"error": "数据至少需要两列才能创建折线图"}

                x_column = df.columns[0]
                y_column = self._pick_value_column(df)

                plt.plot(df[x_column], df[y_column], marker='o')
                plt.xlabel(x_column)
                plt.ylabel(y_column)
                plt.title(f"{y_column} Trend by {x_column}")
                plt.xticks(rotation=45)

            elif visualization_type == "pie_chart":
                if len(df.columns) < 2:
                    return {"error": "数据至少需要两列才能创建饼图"}

                label_column = df.columns[0]
                value_column = self._pick_value_column(df)

                plt.pie(df[value_column], labels=df[label_column], autopct='%1.1f%%')
                plt.title(f"Distribution of {value_column}")

            elif visualization_type == "scatter_plot":
                numeric_columns = df.select_dtypes(include=["number"]).columns
                if len(numeric_columns) < 2:
                    return {"error": "数据至少需要两个数值列才能创建散点图"}

                x_column = numeric_columns[0]
                y_column = numeric_columns[1]

                plt.scatter(df[x_column], df[y_column])
                plt.xlabel(x_column)
                plt.ylabel(y_column)
                plt.title(f"{y_column} vs {x_column}")

            else:
                return {"error": f"不支持的可视化类型: {visualization_type}"}

            plt.tight_layout()
            plt.savefig(output_path)
        finally:
            # Always release the figure, including on the early error returns
            # above and when plotting or saving raises.
            plt.close(figure)

        return {
            "visualization_type": visualization_type,
            "output_path": output_path,
            "success": True
        }

    def _action_extract_information(self, text, extraction_type):
        """Extract entities, keywords or a summary from ``text``.

        When ``text`` looks like a ``.txt`` / ``.csv`` / ``.json`` path, the
        file is read through the execution module first; if reading fails the
        string itself is analyzed.

        Args:
            text: the text to analyze, or a path to a text file.
            extraction_type: ``"entities"``, ``"keywords"`` or ``"summary"``.

        Returns:
            A dict keyed by the extraction type, or ``{"error": ...}`` for an
            unsupported type.
        """
        if isinstance(text, str) and text.endswith((".txt", ".csv", ".json")):
            try:
                text = self.core.execution.execute("read_text_file", file_path=text)
            except Exception:
                # Best effort: the string may be plain text that merely ends
                # with a file suffix, so fall back to analyzing it as given.
                pass

        if extraction_type == "entities":
            # Simplified keyword matching; real applications need an NLP library.
            # "persons" is part of the result but nothing here fills it.
            entities = {
                "persons": [],
                "organizations": [],
                "locations": [],
                "dates": []
            }

            common_orgs = ["公司", "集团", "企业", "部门", "机构", "学校", "大学", "协会"]
            common_locs = ["市", "省", "区", "县", "路", "街", "国", "城"]

            for word in text.split():
                # Date: two hyphens and at least eight characters
                if word.count("-") == 2 and len(word) >= 8:
                    entities["dates"].append(word)

                # Organization: contains an organization marker plus more text
                if any(org in word and len(word) > len(org) for org in common_orgs):
                    entities["organizations"].append(word)

                # Location: ends with a location marker plus more text
                if any(word.endswith(loc) and len(word) > len(loc) for loc in common_locs):
                    entities["locations"].append(word)

            return {"entities": entities}

        elif extraction_type == "keywords":
            # Simplified: plain frequency count, ignoring single characters
            word_counts = {}
            for word in text.split():
                if len(word) > 1:
                    word_counts[word] = word_counts.get(word, 0) + 1

            # Most frequent first; ties keep first-seen order
            sorted_words = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)
            keywords = [word for word, count in sorted_words[:10]]

            return {"keywords": keywords}

        elif extraction_type == "summary":
            # Simplified extractive summary: the first three sentences
            sentences = text.split("。")
            if len(sentences) <= 3:
                return {"summary": text}

            summary = "。".join(sentences[:3]) + "。"

            return {"summary": summary}

        else:
            return {"error": f"不支持的提取类型: {extraction_type}"}

    def _action_summarize_results(self, results, summary_type):
        """Summarize the structure of ``results``.

        Args:
            results: the value to summarize; falsy values yield an error dict.
            summary_type: ``"brief"`` or ``"detailed"``.

        Returns:
            ``{"summary": {...}}`` describing type, size and keys of
            ``results``, or ``{"error": ...}``.
        """
        if not results:
            return {"error": "没有结果可总结"}

        if summary_type == "brief":
            summary = {
                "result_type": type(results).__name__,
                "timestamp": datetime.now().isoformat()
            }

            if isinstance(results, dict):
                summary["keys"] = list(results.keys())
                summary["total_items"] = len(results)
            elif isinstance(results, list):
                summary["total_items"] = len(results)
                if results and isinstance(results[0], dict):
                    summary["sample_keys"] = list(results[0].keys())
            else:
                summary["data"] = str(results)

            return {"summary": summary}

        elif summary_type == "detailed":
            summary = {
                "result_type": type(results).__name__,
                "timestamp": datetime.now().isoformat(),
                "analysis": {}
            }

            if isinstance(results, dict):
                summary["keys"] = list(results.keys())
                summary["total_items"] = len(results)

                # Value type and a sample for every key
                for key, value in results.items():
                    summary["analysis"][key] = {
                        "type": type(value).__name__,
                        "sample": str(value)[:100] if isinstance(value, str) else value
                    }

            elif isinstance(results, list):
                summary["total_items"] = len(results)

                if results:
                    first_item = results[0]
                    summary["first_item_type"] = type(first_item).__name__

                    if isinstance(first_item, dict):
                        summary["common_keys"] = list(first_item.keys())

                        # Do all items share the first item's keys? Non-dict
                        # items count as a different structure.
                        reference_keys = set(first_item.keys())
                        all_keys_same = all(
                            isinstance(item, dict) and set(item.keys()) == reference_keys
                            for item in results
                        )
                        summary["all_items_same_structure"] = all_keys_same

            return {"summary": summary}

        else:
            return {"error": f"不支持的总结类型: {summary_type}"}

使用示例

# Create the information retrieval and analysis agent
agent = InfoRetrievalAgent()

# ensure_ascii=False keeps the Chinese text in the results readable instead of
# \uXXXX escapes; default=str renders values json cannot encode natively
# (e.g. numpy scalars coming out of the pandas-based analysis) as strings.

# Example 1: data analysis request
result1 = agent.process_request("分析 sales_data.csv 文件")
print(json.dumps(result1, indent=2, ensure_ascii=False, default=str))

# Example 2: visualization request
result2 = agent.process_request("为 sales_data.csv 创建销售趋势的折线图")
print(json.dumps(result2, indent=2, ensure_ascii=False, default=str))

# Example 3: information extraction request
result3 = agent.process_request("从 news_articles.txt 中提取主要实体")
print(json.dumps(result3, indent=2, ensure_ascii=False, default=str))

# Example 4: result summarization request
result4 = agent.process_request("总结 customer_feedback.json 的内容")
print(json.dumps(result4, indent=2, ensure_ascii=False, default=str))

4.5 总结与下一步

本章详细探讨了智能体的推理引擎和执行模块。这两个组件使智能体能够从信息分析走向实际行动,进而完成复杂任务。我们学习了:

  1. 各种推理方法及其实现,包括演绎推理、归纳推理、溯因推理
  2. 符号推理和概率推理的区别与应用
  3. 基于神经网络的深度推理及其与传统推理的结合
  4. 执行模块的设计与实现,包括API调用、数据库操作和文件操作
  5. 执行监控与错误处理机制
  6. 如何整合推理引擎和执行模块构建完整的智能体核心

通过构建信息检索和分析智能体,我们展示了这些概念的实际应用。这种智能体能够:

  • 分析不同格式的数据
  • 创建各种类型的可视化
  • 从文本中提取有用信息
  • 对结果进行总结

在实际应用中,推理引擎和执行模块使智能体能够在复杂环境中自主运作,是构建高级智能体的关键组件。

至此,我们已经完成了AI智能体五大核心组件的学习。在下一章中,我们将探讨如何将这些组件整合成一个完整的智能体系统,并介绍智能体的高级应用场景。

💬 评论

暂无评论