手搓RAG新增功能:递归检索与迭代查询+重回成熟框架API

在上那篇提到的我手搓的那个 RAG 项目新增功能中,漏掉了递归检索与迭代查询,这篇补上(源码见知识星球)。 经过初步调试对召回效果有明显提升,这种方法解决了传统 RAG 的几个关键问题:处理复杂多步骤问题:通过多次迭代,分解复杂问题信息不足的补充:当初始检索结果不足以回答问题时,自动生成补充查询多角度信息收集:能够从不同角度收集相关信息1、递归检索具体实现递归检索函数(recursive_retrieval)(支持最多三次迭代查询)每次迭代使用混合检索(向量检索 BM25)获取信息使用 LLM 分析当前检索结果,判断是否需要进一步查询如果需要,LLM 会生成新的查询问题,用于下一轮检索换句话说,递归检索的工作原理可以理解为"先检索-后思考-再检索"的过程,模拟了人解决问题的方式:先获取一些信息,思考下是否足够,如果不够则继续查找更多相关信息。 总之,好的结果不是一蹴而就的。

手搓RAG新增功能:递归检索与迭代查询+重回成熟框架API

在上那篇提到的我手搓的那个 RAG 项目新增功能中,漏掉了递归检索与迭代查询,这篇补上(源码见知识星球)。经过初步调试对召回效果有明显提升,这种方法解决了传统 RAG 的几个关键问题:

  • 处理复杂多步骤问题:通过多次迭代,分解复杂问题
  • 信息不足的补充:当初始检索结果不足以回答问题时,自动生成补充查询
  • 多角度信息收集:能够从不同角度收集相关信息

1、递归检索具体实现

递归检索函数(recursive_retrieval)

(支持最多三次迭代查询)

每次迭代使用混合检索(向量检索+BM25)获取信息

使用 LLM 分析当前检索结果,判断是否需要进一步查询

如果需要,LLM 会生成新的查询问题,用于下一轮检索

换句话说,递归检索的工作原理可以理解为"先检索-后思考-再检索"的过程,模拟了人解决问题的方式:先获取一些信息,思考下是否足够,如果不够则继续查找更多相关信息。总之,好的结果不是一蹴而就的。

为了更直观的让大家了解这个递归检索的过程,贴几张本地测试图片,供参考。需要说明的是,目前这个版本中还没做多模态的预处理和图片的召回,后续结合具体案例更新在知识星球中。

图片

图片

图片

图片

2、核心组件全解析

下面再对整个代码的核心组件做进一步的解析,大家可以在这个基础上按照业务需求进一步优化:

图片

2.1文档处理和向量化

文本提取:使用 pdfminer.high_level.extract_text_to_fp 从 PDF 提取文本内容

文本分块:使用 RecursiveCharacterTextSplitter 将长文本分割成更小的块(在代码中块大小设为 400 字符,重叠为 40 字符)

向量化:使用 all-MiniLM-L6-v2 模型为每个文本块生成嵌入向量

存储:将文本块、向量和元数据存储到 ChromaDB 向量数据库

辅助索引:同时构建 BM25Okapi 索引用于基于关键词的检索

分块粒度和嵌入质量直接影响检索性能,这部分大家需要结合自己的文档结构特点自行调整,上述列出的做法仅供参考。

2.2混合检索机制

结合了两种不同的检索方法:

语义向量检索:基于嵌入向量的相似度搜索,能够捕捉语义关系

BM25 关键词检索:基于词频的经典检索算法,专注于关键词匹配

混合策略:通过 hybrid_merge 函数融合两种检索结果,使用α参数(0.7)控制两种方法的权重

这种混合策略结合了语义理解和关键词匹配的优势,提高了检索的综合表现。

2.3重排序机制

检索到初步结果后,使用更精确的模型进行重排序:

交叉编码器重排序:使用 CrossEncoder 模型,能够同时考虑查询和文档内容进行更精确的相关性评分

LLM 重排序:可选使用 LLM 对查询和文档的相关性进行评分,利用大模型的理解能力

缓存机制:使用@lru_cache 减少重复计算,提高效率

重排序步骤使检索结果更符合用户的实际需求,大幅提升了检索质量。

2.4递归检索与迭代查询

这是新增的一个功能,也是实测下来对最终效果有明显提升的一点,由 recursive_retrieval 函数实现:

初始检索:使用原始查询进行首轮检索

结果分析:使用 LLM 分析当前检索结果是否充分回答问题

查询改写:如果信息不足,LLM 会生成一个新的、更具体或从不同角度的查询

迭代过程:使用新查询继续检索,直到获取足够信息或达到最大迭代次数

这个机制解决了单次检索的局限性,能够处理复杂问题、多步骤推理,以及需要从多个角度收集信息的场景。

2.5生成回答

系统支持两种回答生成方式:

本地 Ollama 模型:使用 deepseek-r1:1.5b 或 deepseek-r1:7b 模型在本地生成回答

SiliconFlow API:调用云端 API 使用更强大的模型生成回答

图片

https://cloud.siliconflow.cn/i/cmXGS5IJ

思维链处理:支持分离思考过程,以可折叠的形式展示给用户

之所以选择新增一个商业api选项,也是因为我自己的电脑跑本地模型是在太卡,问了一些复杂问题容易触发超时机制。当然另外还有个重要原因是,在针对不同核心组件做调优时,保持 chat 模型的水准个人实践下来也很重要,否则就变成了过度雕花。

3、优化方向参考

因为只是方便大家练手,大家感兴趣的可以在目前代码基础上做如下优化:

3.1文档格式支持扩展

当前系统仅支持 PDF 文件,可扩展支持:

复制
# 支持更多文档格式
def extract_text_from_file(file_path):
    """根据文件类型选择适当的提取方法"""
    ext = os.path.splitext(file_path)[1].lower()
   
    if ext == '.pdf':
        return extract_text_from_pdf(file_path)
    elif ext == '.docx':
        return extract_text_from_docx(file_path)
    elif ext == '.pptx':
        return extract_text_from_pptx(file_path)
    elif ext in ['.txt', '.md', '.py', '.java', '.js', '.html', '.css']:
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
            return f.read()
    elif ext in ['.csv', '.xlsx', '.xls']:
        return extract_text_from_spreadsheet(file_path)
    else:
        raise ValueError(f"不支持的文件类型: {ext}")

3.2高级分块策略

可以实现更智能的分块策略:

语义分块:基于段落、章节或自然语义边界进行分块

结构感知分块:识别文档结构(标题、列表、表格等)

多粒度分块:同时维护不同粒度的块,灵活应对不同类型的查询 

复制
# 多粒度分块示例
def create_multi_granularity_chunks(text):
    """创建多粒度分块"""
    # 大块(1000字符):适合概述类问题
    large_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000, chunk_overlap=100
    )
    # 中块(400字符):平衡粒度
    medium_splitter = RecursiveCharacterTextSplitter(
        chunk_size=400, chunk_overlap=40
    )
    # 小块(150字符):适合精确问答
    small_splitter = RecursiveCharacterTextSplitter(
        chunk_size=150, chunk_overlap=20
    )
    
    large_chunks = large_splitter.split_text(text)
    medium_chunks = medium_splitter.split_text(text)
    small_chunks = small_splitter.split_text(text)
    
    return {
        "large": large_chunks,
        "medium": medium_chunks,
        "small": small_chunks
    }

3.3嵌入模型优化

当前系统使用 all-MiniLM-L6-v2,可以考虑:

更强大的多语言模型:如 multilingual-e5-large 提升中文理解能力

领域特定微调:针对特定领域数据微调嵌入模型

多模型集成:使用多个嵌入模型并融合结果 

复制
# 多模型嵌入示例
def generate_multi_model_embeddings(text):
    """使用多个模型生成嵌入并融合"""
    model1 = SentenceTransformer('all-MiniLM-L6-v2')
    model2 = SentenceTransformer('paraphrase-multilingual-mpnet-base-v2')
    
    # 生成嵌入
    emb1 = model1.encode(text)
    emb2 = model2.encode(text)
    
    # 简单融合策略:归一化后拼接
    emb1_norm = emb1 / np.linalg.norm(emb1)
    emb2_norm = emb2 / np.linalg.norm(emb2)
    
    # 返回融合嵌入
    return np.concatenate([emb1_norm, emb2_norm])

3.4检索与重排序改进

可以考虑以下改进:

查询扩展:使用同义词扩展或关键词扩展增强原始查询

密集检索与稀疏检索结合:更高级的混合检索方法

上下文感知重排序:考虑已检索文档间的关系进行重排序

多轮对话记忆:在多轮对话中利用历史上下文改进检索

复制
# 查询扩展示例
def expand_query(query):
    """使用LLM扩展查询,生成多个角度的查询变体"""
    prompt = f"""
    为以下查询生成3个不同角度的变体,以提高检索召回率:
    
    原始查询: {query}
    
    变体应当包含同义词替换、关键词扩展或问题重构。
    仅返回三个变体,每行一个,不要有额外解释。
    """
    
    response = call_llm_api(prompt)
    variants = [line.strip() for line in response.strip().split('\n')]
    return [query] + variants  # 返回原始查询和变体

3.5其他可能的扩展方向

结果验证机制:使用外部知识或规则验证生成结果的准确性

用户反馈学习:根据用户反馈调整检索策略和参数

多模态支持:处理图像、表格等非文本内容

层次化索引:构建文档的层次化表示,从概览到细节

个性化调整:根据用户历史查询和偏好调整检索策略

4、RAG 学习路线全梳理

最后回到学习路线上来,最开始手搓这个 demo,也是自己为了更好的从底层理解 RAG 系统,不曾想收到了很多正反馈。

图片

https://github.com/weiwill88/Local_Pdf_Chat_RAG

另外正如上篇所说,直接使用封装度高的框架容易让人"知其然不知其所以然"。如果你已经掌握了基础原理,就可以考虑进一步探索 RAGFlow 或者 LlamaIndex 等成熟框架的 Python API,充分利用成熟框架提供的生产力优势。按照个人近几个月的学习和实践经验,整体学习路线参考如下:

4.1从自建到框架的过渡期

对比学习:将手搓实现的组件与成熟框架中对应功能进行比较

框架源码阅读:尝试阅读 LlamaIndex 或 RAGFlow 等框架的核心源码,看看他们如何实现相同功能

增量替换:逐步用框架组件替换自建组件,对比性能和结果差异

4.2成熟框架深入学习

从示例入手:优先研究官方提供的示例代码和教程

构建微型项目:使用框架 API 实现小型但完整的应用

实验各种组件:测试不同的检索器、嵌入模型、重排序策略等

4.3高级应用与定制

探索高级功能:如多模态 RAG、Agent 集成、自定义索引等

性能优化:学习如何调整参数提高检索质量和速度

领域适配:根据特定行业或领域需求定制 RAG 系统

5、Python API vs Web 界面能做什么?

公众号之前有篇文章详细介绍了 RAGFlow 官方 Python API 各个模块使用,后续有些盆友私信我问到,这些 api 除了可以批量的按照自定义方式处理文件外,还可以干啥?结合近期学习和实践,提供以下五个方向供大家参考,具体参考示例代码:

5.1系统集成能力

复制
# 与现有系统集成示例
from ragflow import RAGPipeline
import your_company_database


# 从公司数据库获取文档
documents = your_company_database.get_latest_documents()


# 使用RAGFlow处理
pipeline = RAGPipeline()
for doc in documents:
    pipeline.add_document(doc)
    
# 将结果同步回公司系统
processed_results = pipeline.get_indexed_status()
your_company_database.update_document_status(processed_results)

5.2定制化检索策略

复制
# 自定义混合检索策略
from ragflow import RetrievalPipeline
from ragflow.retrievers import VectorRetriever, KeywordRetriever


# 创建自定义的检索器组合
def custom_hybrid_retriever(query, top_k=10):
    # 使用向量检索获取语义相关结果
    semantic_results = vector_retriever.retrieve(query, top_k=top_k)
    
    # 使用关键词检索获取精确匹配
    keyword_results = keyword_retriever.retrieve(query, top_k=top_k)
    
    # 自定义融合策略(比如考虑文档及时性等)
    results = custom_merge_function(semantic_results, keyword_results, 
                                    recency_weight=0.2)
    return results

5.3高级处理流程自动化

复制
# 创建复杂的数据处理和RAG工作流
from ragflow import DocumentProcessor, Indexer, QueryEngine
import schedule
import time


def daily_knowledge_update():
    # 从多个来源获取新文档
    new_docs = fetch_documents_from_sources()
    
    # 文档预处理(去重、格式转换等)
    processed_docs = DocumentProcessor().process_batch(new_docs)
    
    # 增量更新索引
    indexer = Indexer()
    indexer.update_incrementally(processed_docs)
    
    # 生成日报
    report = generate_daily_summary()
    send_email(report)


# 设置定时任务
schedule.every().day.at("01:00").do(daily_knowledge_update)


while True:
    schedule.run_pending()
    time.sleep(60)

5.4自定义评估和优化

复制
# 构建RAG系统评估框架
from ragflow import RAGEvaluator
import matplotlib.pyplot as plt


# 准备测试数据集
test_queries = load_test_queries()
ground_truth = load_ground_truth_answers()


# 测试不同的配置
configurations = [
    {"embeddings": "minilm", "chunk_size": 200, "retriever": "hybrid"},
    {"embeddings": "e5-large", "chunk_size": 500, "retriever": "vector"},
    {"embeddings": "bge-large", "chunk_size": 300, "retriever": "hybrid"}
]


results = {}
for config in configurations:
    # 构建并测试每种配置
    rag_system = build_rag_with_config(config)
    eval_results = RAGEvaluator.evaluate(
        rag_system, test_queries, ground_truth
    )
    results[str(config)] = eval_results


# 可视化不同配置的性能比较
plot_evaluation_results(results)

5.5领域特定的增强

复制
# 医疗领域RAG增强示例
from ragflow import RAGPipeline
from custom_medical_modules import MedicalTermNormalizer, DiseaseEntityExtractor


# 创建领域特定的文档处理器
medical_processor = DocumentProcessor()
medical_processor.add_transformer(MedicalTermNormalizer())
medical_processor.add_transformer(DiseaseEntityExtractor())


# 构建医疗特定的RAG系统
medical_rag = RAGPipeline(
    document_processor=medical_processor,
    retrieval_augmentors=[
        CitationRetriever(),  # 添加医学文献引用
        EvidenceLevelClassifier()  # 分类证据等级
    ],
    response_formatters=[
        MedicalDisclaimerFormatter(),  # 添加医疗免责声明
        CitationFormatter()  # 格式化引用
    ]
)

当然,还有结合多个框架使用的示例,我在Medium看到一个博主使用 LlamaIndex 整合多源数据并构建索引,然后通过 RAGFlow 的深度解析和重排序优化结果。最后利用 RAGFlow 的引用功能增强答案可信度,同时调用 LlamaIndex 的代理进行动态数据补充。不过,我还没试过,感兴趣的可以测试下看看。

RAGFlow 等框架的实际业务价值,不同实践经验的盆友可能有不同的体会,个人总结其实就是一句话,使用这些框架的 Python API 可以让我们快速构建原型并迭代改进,比从 0-1 开发节省了大量时间。从而专注于解决业务问题,而非技术细节。

相关资讯

推荐一个企业级知识图谱增强的检索增强生成(RAG)的项目

介绍Microsoft GraphRAG 是一个开源项目,旨在利用 Microsoft Graph 的强大功能构建企业级的知识图谱增强的检索增强生成(RAG)方案。 简单来说,它将企业内部的各种数据源(如邮件、文档、日历、联系人等)通过 Microsoft Graph 连接起来,形成一个结构化的知识图谱,然后利用这个知识图谱来增强 RAG 系统的检索能力,从而提升大语言模型(LLM)在企业应用中的问答和生成效果。 项目架构GraphRAG 的架构设计清晰且模块化,主要包含以下几个核心组件:(1) 数据连接器(Data Connectors):负责从各种企业数据源(如 Microsoft 365 服务,包括 Exchange Online、SharePoint Online、OneDrive、Teams 等)提取数据。

RAG(一)RAG开山之作:知识密集型NLP任务的“新范式”

在AI应用爆发的时代,RAG(Retrieval-Augmented Generation,检索增强生成)技术正逐渐成为AI 2.0时代的“杀手级”应用。 它通过将信息检索与文本生成相结合,突破了传统生成模型在知识覆盖和回答准确性上的瓶颈。 不仅提升了模型的性能和可靠性,还降低了成本,增强了可解释性。

基于阿里开源Qwen2.5-7B-Instruct模型进行多代理RAG开发实战

译者 | 朱先忠审校 | 重楼引言大型语言模型已经展现出令人印象深刻的能力,并且随着每一代新模型的发布,它们仍在稳步改进。 例如,聊天机器人和自动摘要器等应用程序可以直接利用LLM的语言能力,因为这些LLM只要求生成文本输出——这也是该类模型的自然设置。 此外,大型语言模型还表现出了理解和解决复杂任务的令人印象深刻的能力,但是只要它们的解决方案保持“纸上谈兵”,即纯文本形式,那么它们就需要外部人类用户代表它们行事并报告所提议操作的结果。