
3 Agent Memory Systems and Learning Mechanisms

📅 Published: 2025-03-12

🏷️ Category: AI Agents


3.1 The Memory System: The Agent's "Experience Store"

The memory system is the key component with which an agent stores and retrieves past experience. It allows the agent to make better-informed decisions based on historical information, enabling context awareness and long-term learning.

3.1.1 Basic Types of Memory Systems

An agent's memory system is usually divided into the following types, each serving a different functional need:

Short-Term Memory (Working Memory)

Short-term memory stores the immediate context of the current task or conversation, much like human working memory:

import time

class ShortTermMemory:
    def __init__(self, capacity=5):
        self.capacity = capacity  # maximum number of entries
        self.memory = []          # most recent interactions
        
    def add(self, interaction):
        """Add a new interaction to short-term memory."""
        # Evict the oldest entry once capacity is reached
        if len(self.memory) >= self.capacity:
            self.memory.pop(0)
        
        # Store the new interaction together with a timestamp
        self.memory.append({
            "timestamp": time.time(),
            "data": interaction,
        })
        
    def get_recent(self, n=None):
        """Return the n most recent entries (all of them if n is None)."""
        if n is None or n > len(self.memory):
            return self.memory
        return self.memory[-n:]
    
    def clear(self):
        """Clear short-term memory."""
        self.memory = []
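
A quick sanity check of the eviction behavior — a minimal sketch, assuming only the ShortTermMemory class above (the message dicts are invented for illustration):

stm = ShortTermMemory(capacity=3)
for i in range(5):
    stm.add({"turn": i, "text": f"message {i}"})

print(len(stm.memory))    # 3: the two oldest entries were evicted
print(stm.get_recent(2))  # the last two entries, each wrapped with a timestamp
stm.clear()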

Long-Term Memory

Long-term memory stores persistent knowledge and experience, preserving the agent's knowledge across sessions:

import sqlite3
import json
import time

class LongTermMemory:
    def __init__(self, db_path="agent_memory.db"):
        # Connect to the database
        self.conn = sqlite3.connect(db_path)
        self.cursor = self.conn.cursor()
        
        # Create the memories table if it does not exist
        self.cursor.execute('''
        CREATE TABLE IF NOT EXISTS memories (
            id INTEGER PRIMARY KEY,
            type TEXT,
            content TEXT,
            timestamp REAL,
            importance REAL,
            metadata TEXT
        )
        ''')
        self.conn.commit()
        
    def add(self, memory_type, content, importance=0.5, metadata=None):
        """Add a new memory to long-term storage."""
        if metadata is None:
            metadata = {}
            
        # ensure_ascii=False keeps CJK content searchable with LIKE
        self.cursor.execute('''
        INSERT INTO memories (type, content, timestamp, importance, metadata)
        VALUES (?, ?, ?, ?, ?)
        ''', (memory_type, json.dumps(content, ensure_ascii=False), time.time(),
              importance, json.dumps(metadata, ensure_ascii=False)))
        
        self.conn.commit()
        
    def retrieve(self, memory_type=None, query=None, limit=10):
        """Retrieve memories, optionally filtered by type and a substring query."""
        sql = "SELECT * FROM memories"
        params = []
        
        # Build the WHERE clause
        conditions = []
        
        if memory_type:
            conditions.append("type = ?")
            params.append(memory_type)
            
        if query:
            conditions.append("content LIKE ?")
            params.append(f"%{query}%")
            
        if conditions:
            sql += " WHERE " + " AND ".join(conditions)
            
        # Order by importance, then by recency
        sql += " ORDER BY importance DESC, timestamp DESC LIMIT ?"
        params.append(limit)
        
        # Run the query
        self.cursor.execute(sql, params)
        results = self.cursor.fetchall()
        
        # Convert rows into dictionaries
        memories = []
        for row in results:
            memories.append({
                "id": row[0],
                "type": row[1],
                "content": json.loads(row[2]),
                "timestamp": row[3],
                "importance": row[4],
                "metadata": json.loads(row[5])
            })
            
        return memories
    
    def get(self, memory_id):
        """Fetch a single memory by its primary key (used by MemoryManager below)."""
        self.cursor.execute("SELECT * FROM memories WHERE id = ?", (memory_id,))
        row = self.cursor.fetchone()
        if row is None:
            return None
        return {
            "id": row[0], "type": row[1], "content": json.loads(row[2]),
            "timestamp": row[3], "importance": row[4], "metadata": json.loads(row[5])
        }
    
    def delete(self, memory_id):
        """Permanently remove a memory (used by MemoryManager below)."""
        self.cursor.execute("DELETE FROM memories WHERE id = ?", (memory_id,))
        self.conn.commit()
    
    def update_importance(self, memory_id, new_importance):
        """Update a memory's importance score."""
        self.cursor.execute('''
        UPDATE memories
        SET importance = ?
        WHERE id = ?
        ''', (new_importance, memory_id))
        
        self.conn.commit()
        
    def close(self):
        """Close the database connection."""
        self.conn.close()
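
A minimal usage sketch, assuming only the class above; ":memory:" is SQLite's built-in in-memory database, so nothing is written to disk:

ltm = LongTermMemory(db_path=":memory:")
ltm.add("fact", "水在标准大气压下100摄氏度沸腾", importance=0.8)
ltm.add("episode", {"user_input": "水什么时候沸腾?"}, importance=0.4)

# Substring search over the serialized content, most important first
for m in ltm.retrieve(memory_type="fact", query="沸腾"):
    print(m["id"], m["importance"], m["content"])

ltm.close()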

Episodic Memory

Episodic memory stores detailed information about specific events and experiences:

class EpisodicMemory:
    def __init__(self, long_term_memory):
        self.ltm = long_term_memory  # backed by long-term memory
        
    def store_episode(self, episode_data):
        """Store a complete interaction episode."""
        # Extract metadata
        metadata = {
            "entities": episode_data.get("entities", []),
            "outcome": episode_data.get("outcome", "unknown"),
            "location": episode_data.get("location", "unknown")
        }
        
        # Persist the episode to long-term memory
        self.ltm.add(
            memory_type="episode",
            content=episode_data,
            importance=episode_data.get("importance", 0.5),
            metadata=metadata
        )
        
    def retrieve_similar_episodes(self, current_situation, limit=3):
        """Retrieve episodes similar to the current situation."""
        # Extract the key entities of the current situation
        key_entities = [entity[0] for entity in current_situation.get("entities", [])]
        
        # Build a query string from the entity names
        query = " ".join(key_entities)
        
        # Retrieve matching episodes
        similar_episodes = self.ltm.retrieve(
            memory_type="episode",
            query=query,
            limit=limit
        )
        
        return similar_episodes

Semantic Memory

Semantic memory stores conceptual knowledge and factual information:

class SemanticMemory:
    def __init__(self, long_term_memory):
        self.ltm = long_term_memory  # backed by long-term memory
        
    def store_fact(self, fact, subject=None, predicate=None, object=None):
        """Store a piece of factual information."""
        # Build metadata from the (subject, predicate, object) triple
        metadata = {
            "subject": subject,
            "predicate": predicate,
            "object": object
        }
        
        # Persist the fact to long-term memory
        self.ltm.add(
            memory_type="fact",
            content=fact,
            importance=0.7,  # facts tend to be important
            metadata=metadata
        )
        
    def retrieve_facts(self, subject=None, predicate=None, object=None, query=None, limit=10):
        """Retrieve relevant facts.
        
        Note: the combined query is matched as one substring via LIKE;
        a real system would use proper indexing or vector search.
        """
        # Build the search query from any provided triple elements
        search_query = query or ""
        if subject:
            search_query += f" {subject}"
        if predicate:
            search_query += f" {predicate}"
        if object:
            search_query += f" {object}"
            
        # Retrieve matching facts
        facts = self.ltm.retrieve(
            memory_type="fact",
            query=search_query.strip(),
            limit=limit
        )
        
        return facts
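
Both layers above can share one LongTermMemory backend. A small sketch (the fact and episode contents are invented for illustration):

ltm = LongTermMemory(db_path=":memory:")
episodic = EpisodicMemory(ltm)
semantic = SemanticMemory(ltm)

semantic.store_fact("Python是一种编程语言", subject="Python", predicate="是", object="编程语言")
episodic.store_episode({
    "entities": [("Python", "语言")],
    "actions": ["answer"],
    "outcome": "success",
    "importance": 0.6,
})

print(semantic.retrieve_facts(subject="Python", limit=3))
print(episodic.retrieve_similar_episodes({"entities": [("Python", "语言")]}))
ltm.close()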

3.1.2 Memory Retrieval Mechanisms

An agent needs to retrieve relevant memories efficiently; here are several common retrieval strategies:

Keyword-Based Retrieval

import json

def keyword_retrieval(memory_system, query, threshold=0.5):
    """Simple keyword-based retrieval."""
    # Extract keywords from the query
    keywords = extract_keywords(query)
    
    # Fetch candidate memories
    all_memories = memory_system.retrieve()
    
    # Score each memory by keyword overlap
    relevant_memories = []
    for memory in all_memories:
        memory_content = json.dumps(memory["content"], ensure_ascii=False)
        score = calculate_keyword_similarity(memory_content, keywords)
        
        if score >= threshold:
            relevant_memories.append((memory, score))
    
    # Sort by relevance, highest first
    relevant_memories.sort(key=lambda x: x[1], reverse=True)
    
    return [memory for memory, _ in relevant_memories]

def extract_keywords(text):
    """Extract keywords from text.
    
    Simplified: splits on whitespace and drops common Chinese stop words.
    A real implementation would use an NLP library, including a word
    segmenter such as jieba, since Chinese is not space-delimited.
    """
    common_words = {"的", "了", "是", "在", "我", "有", "和", "就", "不", "人", "都", "一", "一个", "这", "那", "你", "他", "她", "它"}
    words = text.lower().split()
    return [word for word in words if word not in common_words]

def calculate_keyword_similarity(text, keywords):
    """Compute the fraction of keywords that appear in the text."""
    matches = 0
    for keyword in keywords:
        if keyword in text.lower():
            matches += 1
    
    if not keywords:
        return 0
    
    return matches / len(keywords)
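
A short end-to-end demo of the keyword path, assuming the LongTermMemory class from 3.1.1 (the stored sentence is pre-segmented with spaces, because extract_keywords splits on whitespace):

ltm = LongTermMemory(db_path=":memory:")
ltm.add("fact", "地球 是 太阳系 的 行星", importance=0.7)

# "是" is dropped as a stop word; "地球" matches the stored content
results = keyword_retrieval(ltm, "地球 是 什么", threshold=0.3)
print(results)
ltm.close()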

Semantic Retrieval

Use vector embeddings for semantic-similarity retrieval:

import json
import numpy as np
from sentence_transformers import SentenceTransformer

class SemanticRetrieval:
    def __init__(self):
        # Load a pretrained multilingual embedding model
        self.model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
        self.memory_embeddings = []  # [(memory, embedding), ...]
        
    def add_memory_embedding(self, memory):
        """Compute and store the embedding vector for a memory."""
        # Serialize the memory content to text
        if isinstance(memory["content"], dict):
            content = json.dumps(memory["content"], ensure_ascii=False)
        else:
            content = str(memory["content"])
            
        # Compute the embedding
        embedding = self.model.encode(content)
        
        # Keep the memory together with its embedding
        self.memory_embeddings.append((memory, embedding))
        
    def retrieve(self, query, top_k=5):
        """Retrieve the memories semantically closest to the query."""
        # Embed the query
        query_embedding = self.model.encode(query)
        
        # Score every stored memory by cosine similarity
        similarities = []
        for memory, embedding in self.memory_embeddings:
            similarity = self._cosine_similarity(query_embedding, embedding)
            similarities.append((memory, similarity))
            
        # Sort by similarity, highest first
        similarities.sort(key=lambda x: x[1], reverse=True)
        
        # Return the top_k results
        return [memory for memory, _ in similarities[:top_k]]
    
    def _cosine_similarity(self, vec1, vec2):
        """Cosine similarity between two vectors."""
        return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))
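
Usage is straightforward, though sentence-transformers must be installed (pip install sentence-transformers) and the first call downloads the pretrained model. The sample memories are invented:

retriever = SemanticRetrieval()
retriever.add_memory_embedding({"content": "地球是太阳系中的第三颗行星"})
retriever.add_memory_embedding({"content": "水的化学式是H2O"})

# Matches by meaning, not by shared keywords
print(retriever.retrieve("哪颗行星离太阳第三近?", top_k=1))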

Memory Importance and Forgetting

An agent needs to manage the importance of its memories and implement a sensible forgetting mechanism:

import random

class MemoryManager:
    def __init__(self, long_term_memory, forgetting_threshold=0.2):
        self.ltm = long_term_memory
        self.forgetting_threshold = forgetting_threshold
        
    def update_memory_importance(self, memory_id, access_count, success_contribution):
        """Update a memory's importance.
        
        Args:
            memory_id: the memory's ID
            access_count: how many times the memory has been accessed
            success_contribution: how much the memory contributed to successful actions (0-1)
        """
        # Look up the current importance
        memory = self.ltm.get(memory_id)
        if not memory:
            return
            
        current_importance = memory["importance"]
        
        # Blend current importance with usage frequency and usefulness
        frequency_factor = min(access_count / 10, 1.0)  # normalized access frequency
        new_importance = 0.7 * current_importance + 0.15 * frequency_factor + 0.15 * success_contribution
        
        # Write the new importance back
        self.ltm.update_importance(memory_id, new_importance)
        
    def forget_memories(self):
        """Forget low-importance memories."""
        # Fetch a large batch of memories
        all_memories = self.ltm.retrieve(limit=1000)
        
        # Forget memories below the threshold
        forgotten_count = 0
        for memory in all_memories:
            if memory["importance"] < self.forgetting_threshold:
                # Decide stochastically: the less important, the likelier to be forgotten
                if random.random() > memory["importance"] * 2:
                    # A production system might mark memories inactive
                    # rather than delete them; this is a simplification.
                    self.ltm.delete(memory["id"])
                    forgotten_count += 1
                    
        return forgotten_count
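
A small sketch of the forgetting pass, assuming the LongTermMemory class above (including the get/delete helpers):

ltm = LongTermMemory(db_path=":memory:")
manager = MemoryManager(ltm, forgetting_threshold=0.2)

ltm.add("fact", "一次性的小细节", importance=0.05)  # very likely forgotten
ltm.add("fact", "核心知识", importance=0.9)         # kept

print("forgotten:", manager.forget_memories())
print("remaining:", len(ltm.retrieve()))
ltm.close()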

3.2 Learning Mechanisms: The Agent's "Capacity to Evolve"

Learning mechanisms allow an agent to improve its own performance from experience and feedback, enabling adaptive behavior.

3.2.1 Basic Types of Learning

Supervised Learning

Learn a mapping from inputs to outputs based on labeled data:

from sklearn.ensemble import RandomForestClassifier
import numpy as np

class SupervisedLearner:
    def __init__(self):
        self.classifier = RandomForestClassifier()
        self.trained = False
        self.previous_features = None
        self.previous_labels = None
        
    def train(self, features, labels):
        """Train the classifier.
        
        Args:
            features: list of feature vectors
            labels: list of labels
        """
        if len(features) != len(labels):
            raise ValueError("Number of features and labels do not match")
            
        # Fit the classifier and keep the data for later incremental updates
        self.classifier.fit(features, labels)
        self.previous_features = np.asarray(features)
        self.previous_labels = np.asarray(labels)
        self.trained = True
        
        return self.classifier.score(features, labels)  # training accuracy
        
    def predict(self, features):
        """Predict labels for new samples."""
        if not self.trained:
            raise RuntimeError("The classifier has not been trained yet")
            
        return self.classifier.predict(features)
    
    def update(self, new_features, new_labels):
        """Incrementally update the classifier.
        
        Simplified: retrains on the stored data plus the new batch.
        A real system might use an incremental learning algorithm instead.
        """
        if self.trained:
            all_features = np.vstack([self.previous_features, new_features])
            all_labels = np.concatenate([self.previous_labels, new_labels])
            self.train(all_features, all_labels)
        else:
            self.train(new_features, new_labels)
Reinforcement Learning

Learn an optimal policy through trial and error, guided by reward signals:

import numpy as np
import random

class QLearningAgent:
    def __init__(self, state_size, action_size, learning_rate=0.1, discount_factor=0.9, exploration_rate=0.1):
        self.state_size = state_size
        self.action_size = action_size
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.exploration_rate = exploration_rate
        self.q_table = np.zeros((state_size, action_size))
        
    def choose_action(self, state):
        """Choose an action, balancing exploration and exploitation."""
        # Explore: pick a random action
        if random.random() < self.exploration_rate:
            return random.randint(0, self.action_size - 1)
        
        # Exploit: pick the action with the highest Q-value
        return np.argmax(self.q_table[state, :])
    
    def learn(self, state, action, reward, next_state):
        """Update the Q-table."""
        # Current Q-value
        current_q = self.q_table[state, action]
        
        # Best Q-value achievable from the next state
        max_next_q = np.max(self.q_table[next_state, :])
        
        # Q-learning update rule:
        # Q(s,a) <- Q(s,a) + lr * (r + gamma * max_a' Q(s',a') - Q(s,a))
        new_q = current_q + self.learning_rate * (reward + self.discount_factor * max_next_q - current_q)
        
        # Write back
        self.q_table[state, action] = new_q
        
    def decrease_exploration(self, decay_rate=0.995):
        """Decay the exploration rate over time."""
        self.exploration_rate *= decay_rate
        self.exploration_rate = max(0.01, self.exploration_rate)  # floor on exploration
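
To watch the table converge, here is a toy corridor environment — an assumption made up for illustration: five states in a row, action 1 moves right, action 0 moves left, and reaching the rightmost state pays reward 1:

agent = QLearningAgent(state_size=5, action_size=2, exploration_rate=0.3)

for episode in range(300):
    state = 0
    for _ in range(50):  # step cap so a purely greedy loop cannot stall the sketch
        action = agent.choose_action(state)
        next_state = min(state + 1, 4) if action == 1 else max(state - 1, 0)
        reward = 1.0 if next_state == 4 else 0.0
        agent.learn(state, action, reward, next_state)
        state = next_state
        if state == 4:
            break
    agent.decrease_exploration()

print(agent.q_table)  # action 1 (move right) should dominate in every state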

Self-Supervised Learning

Learn useful representations from unlabeled data:

import torch
import torch.nn as nn
import torch.optim as optim

class SimpleAutoencoder(nn.Module):
    def __init__(self, input_size, encoding_size):
        super(SimpleAutoencoder, self).__init__()
        
        # Encoder
        self.encoder = nn.Sequential(
            nn.Linear(input_size, 128),
            nn.ReLU(),
            nn.Linear(128, encoding_size),
            nn.ReLU()
        )
        
        # Decoder
        self.decoder = nn.Sequential(
            nn.Linear(encoding_size, 128),
            nn.ReLU(),
            nn.Linear(128, input_size),
            nn.Sigmoid()  # outputs in [0, 1]
        )
        
    def forward(self, x):
        # Encode, then decode
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded
    
    def encode(self, x):
        """Run only the encoding step."""
        return self.encoder(x)

class SelfSupervisedLearner:
    def __init__(self, input_size, encoding_size):
        self.model = SimpleAutoencoder(input_size, encoding_size)
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)
        self.criterion = nn.MSELoss()
        
    def train(self, data, epochs=100, batch_size=32):
        """Train the autoencoder to reconstruct its input."""
        for epoch in range(epochs):
            total_loss = 0
            batch_count = 0
            
            # Simplified batching
            for i in range(0, len(data), batch_size):
                batch = data[i:i+batch_size]
                batch = torch.FloatTensor(batch)
                
                # Forward pass
                outputs = self.model(batch)
                loss = self.criterion(outputs, batch)
                
                # Backward pass and optimization
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()
                
                total_loss += loss.item()
                batch_count += 1
            
            # Report average loss every 10 epochs
            if (epoch + 1) % 10 == 0:
                print(f'Epoch [{epoch+1}/{epochs}], Loss: {total_loss/batch_count:.4f}')
                
    def extract_features(self, data):
        """Extract learned feature representations for the data."""
        with torch.no_grad():
            data_tensor = torch.FloatTensor(data)
            features = self.model.encode(data_tensor)
        return features.numpy()
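
Training on random vectors is enough to see the mechanics (the data here is synthetic; real inputs should also be scaled to [0, 1] to match the Sigmoid output):

import numpy as np

data = np.random.rand(256, 20).astype("float32")  # 256 synthetic 20-d samples

learner = SelfSupervisedLearner(input_size=20, encoding_size=4)
learner.train(data, epochs=50, batch_size=32)

features = learner.extract_features(data)
print(features.shape)  # (256, 4): each sample compressed to 4 learned features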

3.2.2 Experience Replay

Experience replay is an important technique in reinforcement learning that improves sample efficiency:

import random
from collections import deque

class ExperienceReplay:
    def __init__(self, capacity=10000):
        self.memory = deque(maxlen=capacity)
        
    def add_experience(self, state, action, reward, next_state, done):
        """Store one transition."""
        self.memory.append((state, action, reward, next_state, done))
        
    def sample_batch(self, batch_size=64):
        """Sample a random batch of transitions."""
        if len(self.memory) < batch_size:
            # Not enough experience yet: return everything available
            return list(self.memory)
        else:
            # Uniform random sample
            return random.sample(self.memory, batch_size)
    
    def __len__(self):
        return len(self.memory)
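
A minimal sketch of the buffer in use, reusing the imports above (the transitions are random placeholders, not a real environment):

replay = ExperienceReplay(capacity=1000)

for _ in range(100):
    s = random.randint(0, 4)
    a = random.randint(0, 1)
    replay.add_experience(s, a, random.random(), min(s + 1, 4), done=(s == 4))

batch = replay.sample_batch(32)
print(len(replay), "stored,", len(batch), "sampled")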

3.2.3 Meta-Learning (Learning to Learn)

Meta-learning enables an agent to adapt quickly to new tasks:

import copy

class MetaLearningAgent:
    def __init__(self, base_model, meta_learning_rate=0.01):
        self.base_model = base_model  # shared base model
        self.meta_learning_rate = meta_learning_rate
        self.task_adaptations = {}  # per-task adapted models
        
    def adapt_to_task(self, task_id, task_data, adapt_steps=5):
        """Quickly adapt to a new task."""
        if task_id in self.task_adaptations:
            # Reuse the existing adaptation
            adapted_model = self.task_adaptations[task_id]
        else:
            # Start a new adaptation from the base model
            adapted_model = self._clone_model(self.base_model)
            
        # Fine-tune on a small number of samples
        for _ in range(adapt_steps):
            # Simplified gradient-update logic
            loss = self._compute_loss(adapted_model, task_data)
            gradients = self._compute_gradients(loss)
            self._apply_gradients(adapted_model, gradients, self.meta_learning_rate)
            
        # Cache the adaptation
        self.task_adaptations[task_id] = adapted_model
        
        return adapted_model
    
    def meta_train(self, task_batch, meta_steps=10):
        """Meta-training loop."""
        # Update the base model using a batch of tasks
        for _ in range(meta_steps):
            meta_loss = 0
            
            # Adapt to and evaluate on each task
            for task_id, task_data in task_batch:
                # Adapt to the task
                adapted_model = self.adapt_to_task(task_id, task_data)
                
                # Evaluate on held-out validation data
                task_val_data = self._get_validation_data(task_id)
                val_loss = self._compute_loss(adapted_model, task_val_data)
                meta_loss += val_loss
                
            # Update the base model with the meta-gradient
            meta_gradients = self._compute_meta_gradients(meta_loss)
            self._apply_gradients(self.base_model, meta_gradients, self.meta_learning_rate / len(task_batch))
    
    # Helper stubs (a real implementation is considerably more involved)
    def _clone_model(self, model):
        # Simplified: a real implementation must deep-copy the model
        return copy.deepcopy(model)
    
    def _compute_loss(self, model, data):
        # Loss computation logic
        pass
    
    def _compute_gradients(self, loss):
        # Gradient computation logic
        pass
    
    def _apply_gradients(self, model, gradients, learning_rate):
        # Gradient application logic
        pass
    
    def _compute_meta_gradients(self, meta_loss):
        # Meta-gradient computation logic
        pass
    
    def _get_validation_data(self, task_id):
        # Validation-data lookup logic
        pass

3.3 Building a Learning Agent: Integrating Memory and Learning

Combining the memory systems and learning mechanisms into a complete learning agent:

import json
import numpy as np

class LearningAgent:
    def __init__(self, state_size, action_size):
        # Memory systems
        self.long_term_memory = LongTermMemory()
        self.short_term_memory = ShortTermMemory()
        self.episodic_memory = EpisodicMemory(self.long_term_memory)
        self.semantic_memory = SemanticMemory(self.long_term_memory)
        
        # Learning components
        self.rl_agent = QLearningAgent(state_size, action_size)
        self.experience_replay = ExperienceReplay()
        
        # Memory manager
        self.memory_manager = MemoryManager(self.long_term_memory)
        
        # Performance metrics
        self.rewards_history = []
        self.success_rate = 0.0
        
    def perceive_and_act(self, state):
        """Perceive the environment and choose an action."""
        # Retrieve memories relevant to the current state
        relevant_episodes = self.episodic_memory.retrieve_similar_episodes(state)
        relevant_facts = self.semantic_memory.retrieve_facts(query=json.dumps(state, ensure_ascii=False))
        
        # Augment the current state with memory
        augmented_state = self._augment_state_with_memory(state, relevant_episodes, relevant_facts)
        augmented_state_index = self._state_to_index(augmented_state)
        
        # Choose an action
        action = self.rl_agent.choose_action(augmented_state_index)
        
        # Record the interaction in short-term memory
        self.short_term_memory.add({
            "state": state,
            "augmented_state": augmented_state,
            "action": action,
            "relevant_episodes": relevant_episodes,
            "relevant_facts": relevant_facts
        })
        
        return action
    
    def learn_from_feedback(self, reward, next_state, done):
        """Learn from environment feedback."""
        # Get the most recent interaction (unwrap the short-term memory entry)
        if not self.short_term_memory.memory:
            return
            
        recent = self.short_term_memory.memory[-1]["data"]
        state = recent["augmented_state"]
        action = recent["action"]
        
        # Convert states to table indices
        state_index = self._state_to_index(state)
        next_state_index = self._state_to_index(next_state)
        
        # Update the Q-learning agent
        self.rl_agent.learn(state_index, action, reward, next_state_index)
        
        # Add the transition to experience replay
        self.experience_replay.add_experience(state_index, action, reward, next_state_index, done)
        
        # At the end of an episode, store it in episodic memory
        if done:
            episode_data = {
                "states": [entry["data"]["state"] for entry in self.short_term_memory.memory],
                "actions": [entry["data"]["action"] for entry in self.short_term_memory.memory],
                "final_reward": reward,
                "success": reward > 0,
                "importance": 0.5 + 0.5 * (1 if reward > 0 else 0)  # successful episodes matter more
            }
            
            self.episodic_memory.store_episode(episode_data)
            
            # Batch learning from replayed experience
            self._batch_learning()
            
            # Update performance metrics
            self.rewards_history.append(reward)
            self.success_rate = sum(1 for r in self.rewards_history[-100:] if r > 0) / min(100, len(self.rewards_history))
            
            # Periodic memory management
            if len(self.rewards_history) % 100 == 0:
                forgotten = self.memory_manager.forget_memories()
                print(f"Memory management: forgot {forgotten} low-value memories")
            
            # Reduce exploration over time
            self.rl_agent.decrease_exploration()
            
            # Clear short-term memory for the next episode
            self.short_term_memory.clear()
    
    def learn_fact(self, fact, subject=None, predicate=None, object=None):
        """Learn a new fact."""
        self.semantic_memory.store_fact(fact, subject, predicate, object)
    
    def _batch_learning(self, batch_size=32):
        """Batch-learn from experience replay."""
        if len(self.experience_replay) < batch_size:
            return
            
        # Sample a batch of experiences
        batch = self.experience_replay.sample_batch(batch_size)
        
        # Batch update
        for state, action, reward, next_state, done in batch:
            # If the episode ended, the next state's max Q-value is 0
            max_next_q = 0 if done else np.max(self.rl_agent.q_table[next_state, :])
            
            # Q-learning update rule
            current_q = self.rl_agent.q_table[state, action]
            new_q = current_q + self.rl_agent.learning_rate * (
                reward + self.rl_agent.discount_factor * max_next_q - current_q)
            
            # Write back
            self.rl_agent.q_table[state, action] = new_q
    
    def _state_to_index(self, state):
        """Map a state to a table index (simplified).
        
        A real application needs a proper state-encoding scheme. Note also
        that Python's hash() of strings varies between runs unless
        PYTHONHASHSEED is fixed.
        """
        return hash(str(state)) % self.rl_agent.state_size
    
    def _augment_state_with_memory(self, state, episodes, facts):
        """Augment the current state with retrieved memories."""
        # Start from a copy of the raw state
        augmented_state = state.copy() if isinstance(state, dict) else {"base_state": state}
        
        # Add information from similar past episodes
        if episodes:
            # Collect actions from episodes that ended in success
            successful_actions = []
            for episode in episodes:
                if episode["content"].get("success", False):
                    successful_actions.extend(episode["content"].get("actions", []))
            
            if successful_actions:
                # Suggest the most common successful action
                from collections import Counter
                action_counter = Counter(successful_actions)
                augmented_state["suggested_action"] = action_counter.most_common(1)[0][0]
        
        # Add relevant facts
        if facts:
            augmented_state["relevant_facts"] = [fact["content"] for fact in facts]
        
        return augmented_state
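
Wiring the agent to an environment looks like this — a toy sketch in which the "environment" is a hypothetical 10-step corridor and states are plain dicts; note that the default LongTermMemory() writes agent_memory.db to the working directory:

agent = LearningAgent(state_size=64, action_size=4)

state = {"position": 0}
for step in range(10):
    action = agent.perceive_and_act(state)           # memory-augmented action choice
    next_state = {"position": state["position"] + 1}
    done = next_state["position"] >= 10
    reward = 1.0 if done else 0.0
    agent.learn_from_feedback(reward, next_state, done)
    state = next_state

print(f"success rate so far: {agent.success_rate:.2f}")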

3.4 Example: Building a Conversational Agent with Memory and Learning

Below we implement a conversational agent with memory and learning capabilities, showing how to apply the concepts covered in this chapter:

import time
import json

class MemoryEnhancedChatAgent:
    def __init__(self):
        self.name = "学习助手"  # "Learning Assistant" -- this demo agent converses in Chinese
        
        # Memory systems
        self.short_term_memory = ShortTermMemory(capacity=10)
        self.long_term_memory = LongTermMemory(db_path="chat_agent_memory.db")
        self.episodic_memory = EpisodicMemory(self.long_term_memory)
        self.semantic_memory = SemanticMemory(self.long_term_memory)
        
        # Learning components
        self.learned_responses = {}  # learned question -> answer pairs
        self.feedback_history = {}   # user feedback history
        
        # Seed knowledge base
        self.initialize_knowledge()
        
    def initialize_knowledge(self):
        """Load some initial knowledge."""
        # Basic facts (kept in Chinese, matching the agent's language)
        facts = [
            {"fact": "地球是太阳系中的第三颗行星", "subject": "地球", "predicate": "是", "object": "行星"},
            {"fact": "水的化学式是H2O", "subject": "水", "predicate": "化学式", "object": "H2O"},
            {"fact": "人类大脑有约860亿个神经元", "subject": "人脑", "predicate": "包含", "object": "神经元"}
        ]
        
        for fact_data in facts:
            self.semantic_memory.store_fact(**fact_data)
    
    def chat(self, user_input, user_id="user1"):
        """Process user input and generate a reply."""
        # Timestamp for the current interaction
        timestamp = time.time()
        
        # Preprocess the input
        processed_input = self._process_input(user_input)
        
        # Retrieve relevant memories
        past_interactions = self._retrieve_user_history(user_id)
        relevant_facts = self.semantic_memory.retrieve_facts(query=user_input)
        
        # Check for a directly learned response first
        if user_input in self.learned_responses:
            response = self.learned_responses[user_input]
            confidence = 0.9
        else:
            # Otherwise generate one
            response, confidence = self._generate_response(
                processed_input, 
                past_interactions, 
                relevant_facts
            )
        
        # Record the interaction in short-term memory
        interaction = {
            "user_id": user_id,
            "user_input": user_input,
            "response": response,
            "timestamp": timestamp,
            "confidence": confidence
        }
        self.short_term_memory.add(interaction)
        
        # Store new questions in episodic memory
        if len(past_interactions) < 2 or past_interactions[-1].get("user_input") != user_input:
            self.episodic_memory.store_episode({
                "user_id": user_id,
                "user_input": user_input,
                "response": response,
                "timestamp": timestamp,
                "type": "conversation"
            })
        
        # Learn a new fact if the input states one
        self._learn_from_input(user_input)
        
        return response
    
    def provide_feedback(self, feedback, user_id="user1"):
        """Process user feedback on the latest reply."""
        # Get the most recent interaction (unwrap the short-term memory entry)
        if not self.short_term_memory.memory:
            return "没有最近的交互可以反馈。"
            
        recent = self.short_term_memory.memory[-1]["data"]
        
        # Build the feedback record
        feedback_entry = {
            "user_id": user_id,
            "user_input": recent["user_input"],
            "response": recent["response"],
            "feedback": feedback,
            "timestamp": time.time()
        }
        
        # Store the feedback
        interaction_key = f"{recent['user_input']}:{recent['response']}"
        if interaction_key not in self.feedback_history:
            self.feedback_history[interaction_key] = []
        self.feedback_history[interaction_key].append(feedback_entry)
        
        # Learn from positive feedback
        if feedback > 0:
            # Remember this successful answer
            self.learned_responses[recent["user_input"]] = recent["response"]
            
            # Raise the importance of the matching episodic memory.
            # Simplified: a real system would look up the exact memory ID.
            self.long_term_memory.update_importance_by_content(
                recent["user_input"], 
                min(0.8, recent.get("importance", 0.5) + 0.1)
            )
            
        return "感谢您的反馈!我会继续改进。"
    
    def _process_input(self, user_input):
        """Preprocess user input.
        
        Simplified NLP; a real application would use proper tokenization,
        entity recognition, and intent classification.
        """
        processed = {
            "text": user_input.lower(),
            "tokens": user_input.lower().split(),
            "entities": self._extract_simple_entities(user_input),
            "intent": self._determine_intent(user_input)
        }
        return processed
    
    def _extract_simple_entities(self, text):
        """Very simple entity extraction against a fixed dictionary.
        
        A real application would use named entity recognition.
        """
        entities = []
        common_entities = {
            "地球": "天体",
            "太阳": "天体",
            "水": "物质",
            "人脑": "器官"
        }
        
        for entity, entity_type in common_entities.items():
            if entity in text:
                entities.append((entity, entity_type))
                
        return entities
    
    def _determine_intent(self, text):
        """Simplified intent detection over Chinese input."""
        if "？" in text or "?" in text or text.startswith("什么") or text.startswith("如何") or text.startswith("为什么"):
            return "question"
        elif "谢谢" in text or "感谢" in text:
            return "thanks"
        elif "你好" in text or "您好" in text:
            return "greeting"
        else:
            return "statement"
    
    def _retrieve_user_history(self, user_id):
        """Retrieve the user's interaction history."""
        # From short-term memory (unwrap the stored entries)
        user_interactions = [m["data"] for m in self.short_term_memory.memory
                             if m["data"]["user_id"] == user_id]
        
        # From episodic memory via long-term storage
        past_episodes = self.long_term_memory.retrieve(
            query=user_id,
            limit=5
        )
        
        # Merge the results
        for episode in past_episodes:
            if episode["type"] == "episode":
                content = episode["content"]
                if content.get("user_id") == user_id:
                    user_interactions.append(content)
        
        return sorted(user_interactions, key=lambda x: x["timestamp"])
    
    def _generate_response(self, processed_input, past_interactions, relevant_facts):
        """Generate a reply."""
        intent = processed_input["intent"]
        confidence = 0.5  # default confidence
        
        # Respond based on intent
        if intent == "greeting":
            response = f"您好!我是{self.name},一个具有记忆和学习能力的AI助手。有什么我能帮您的吗?"
            confidence = 0.9
            
        elif intent == "thanks":
            response = "不用谢!很高兴能帮到您。还有其他问题吗?"
            confidence = 0.9
            
        elif intent == "question":
            # Try to answer the question
            answer, conf = self._answer_question(processed_input, relevant_facts, past_interactions)
            response = answer
            confidence = conf
            
        else:
            # General statement
            response = self._respond_to_statement(processed_input, past_interactions)
            confidence = 0.6
            
        return response, confidence
    
    def _answer_question(self, processed_input, facts, past_interactions):
        """Answer a question."""
        question = processed_input["text"]
        
        # First, check for a directly relevant fact
        if facts:
            # Use the most relevant fact
            most_relevant = facts[0]["content"]
            return f"根据我所知,{most_relevant}", 0.8
            
        # Next, look for a similar question in the history
        for interaction in reversed(past_interactions):
            if "user_input" in interaction and self._calculate_similarity(interaction["user_input"], question) > 0.8:
                return f"如我之前所说,{interaction['response']}", 0.7
                
        # No answer found
        return "很抱歉,我目前没有这个问题的答案。您能提供更多信息吗?", 0.3
    
    def _respond_to_statement(self, processed_input, past_interactions):
        """Respond to a statement."""
        # Pick out the main topic
        main_topic = None
        if processed_input["entities"]:
            main_topic = processed_input["entities"][0][0]
            
        if main_topic:
            # Look up facts about the topic
            facts = self.semantic_memory.retrieve_facts(subject=main_topic)
            if facts:
                return f"说到{main_topic},我知道{facts[0]['content']}。"
                
        # Generic response
        return "我明白了。请继续告诉我更多。"
    
    def _learn_from_input(self, user_input):
        """Learn a new fact from user input.
        
        Simplified fact extraction: splits "X是Y" statements.
        A real application needs proper information extraction.
        """
        if "是" in user_input and not user_input.startswith("请问") and not user_input.endswith(("吗?", "吗？")):
            parts = user_input.split("是")
            if len(parts) == 2:
                subject = parts[0].strip()
                object_part = parts[1].strip().rstrip("。")
                
                if subject and object_part:
                    # Store the new fact
                    self.semantic_memory.store_fact(
                        fact=user_input,
                        subject=subject,
                        predicate="是",
                        object=object_part
                    )
                    return True
        return False
    
    def _calculate_similarity(self, text1, text2):
        """Simple text similarity: Jaccard overlap of whitespace tokens."""
        words1 = set(text1.lower().split())
        words2 = set(text2.lower().split())
        
        intersection = words1.intersection(words2)
        union = words1.union(words2)
        
        if not union:
            return 0
            
        return len(intersection) / len(union)

# Extend LongTermMemory to support updating importance by matching content.
# Shown as a patch: in practice, add this method to the class defined in 3.1.1.
class LongTermMemory:
    # ... [code from before] ...
    
    def update_importance_by_content(self, content_query, new_importance):
        """Update the importance of the first memory whose content matches the query."""
        self.cursor.execute('''
        SELECT id FROM memories
        WHERE content LIKE ?
        LIMIT 1
        ''', (f'%{content_query}%',))
        
        result = self.cursor.fetchone()
        if result:
            memory_id = result[0]
            self.update_importance(memory_id, new_importance)
            return True
        return False

Usage Example

# Create the memory-enhanced chat agent
chat_agent = MemoryEnhancedChatAgent()

# Interact with the agent (the demo agent converses in Chinese)
print("Agent:", chat_agent.chat("你好!"))
print("User: 你能告诉我关于地球的信息吗?")
print("Agent:", chat_agent.chat("你能告诉我关于地球的信息吗?"))
print("User: 谢谢你的回答!")
print("Agent:", chat_agent.chat("谢谢你的回答!"))

# State a new fact
print("User: 月球是地球的卫星。")
print("Agent:", chat_agent.chat("月球是地球的卫星。"))

# Ask about what was just learned
print("User: 月球是什么?")
print("Agent:", chat_agent.chat("月球是什么?"))

# Provide positive feedback
print("User: [positive feedback]")
print("Agent:", chat_agent.provide_feedback(1))

# Ask the same question again
print("User: 月球是什么?")
print("Agent:", chat_agent.chat("月球是什么?"))

3.5 Summary and Next Steps

This chapter covered agent memory systems and learning mechanisms in detail — the components that give an agent adaptability and the capacity to evolve. We learned:

  1. The different types of memory systems and how to implement them
  2. Memory retrieval and management mechanisms
  3. A range of learning algorithms and their applications
  4. How to integrate memory and learning into a complete learning agent

By building a conversational agent with memory and learning abilities, we showed these concepts in practice. Such an agent can:

  • Remember a user's interaction history
  • Learn new knowledge from conversation
  • Improve its answers based on user feedback
  • Raise its performance over time

In real applications, memory and learning enable an agent to adapt to a changing environment, personalize the user experience, and improve continuously; they are core components of any advanced agent.

In the next chapter, we will explore the agent's reasoning engine and execution module, completing our tour of the five core components of AI agents.
