在上那篇提到的我手搓的那个 RAG 项目新增功能中,漏掉了递归检索与迭代查询,这篇补上(源码见知识星球)。经过初步调试对召回效果有明显提升,这种方法解决了传统 RAG 的几个关键问题:
- 处理复杂多步骤问题:通过多次迭代,分解复杂问题
- 信息不足的补充:当初始检索结果不足以回答问题时,自动生成补充查询
- 多角度信息收集:能够从不同角度收集相关信息
1、递归检索具体实现
递归检索函数(recursive_retrieval)
(支持最多三次迭代查询)
每次迭代使用混合检索(向量检索+BM25)获取信息
使用 LLM 分析当前检索结果,判断是否需要进一步查询
如果需要,LLM 会生成新的查询问题,用于下一轮检索
换句话说,递归检索的工作原理可以理解为"先检索-后思考-再检索"的过程,模拟了人解决问题的方式:先获取一些信息,思考下是否足够,如果不够则继续查找更多相关信息。总之,好的结果不是一蹴而就的。
为了更直观的让大家了解这个递归检索的过程,贴几张本地测试图片,供参考。需要说明的是,目前这个版本中还没做多模态的预处理和图片的召回,后续结合具体案例更新在知识星球中。
2、核心组件全解析
下面再对整个代码的核心组件做进一步的解析,大家可以在这个基础上按照业务需求进一步优化:
2.1文档处理和向量化
文本提取:使用 pdfminer.high_level.extract_text_to_fp 从 PDF 提取文本内容
文本分块:使用 RecursiveCharacterTextSplitter 将长文本分割成更小的块(在代码中块大小设为 400 字符,重叠为 40 字符)
向量化:使用 all-MiniLM-L6-v2 模型为每个文本块生成嵌入向量
存储:将文本块、向量和元数据存储到 ChromaDB 向量数据库
辅助索引:同时构建 BM25Okapi 索引用于基于关键词的检索
分块粒度和嵌入质量直接影响检索性能,这部分大家需要结合自己的文档结构特点自行调整,上述列出的做法仅供参考。
2.2混合检索机制
结合了两种不同的检索方法:
语义向量检索:基于嵌入向量的相似度搜索,能够捕捉语义关系
BM25 关键词检索:基于词频的经典检索算法,专注于关键词匹配
混合策略:通过 hybrid_merge 函数融合两种检索结果,使用α参数(0.7)控制两种方法的权重
这种混合策略结合了语义理解和关键词匹配的优势,提高了检索的综合表现。
2.3重排序机制
检索到初步结果后,使用更精确的模型进行重排序:
交叉编码器重排序:使用 CrossEncoder 模型,能够同时考虑查询和文档内容进行更精确的相关性评分
LLM 重排序:可选使用 LLM 对查询和文档的相关性进行评分,利用大模型的理解能力
缓存机制:使用@lru_cache 减少重复计算,提高效率
重排序步骤使检索结果更符合用户的实际需求,大幅提升了检索质量。
2.4递归检索与迭代查询
这是新增的一个功能,也是实测下来对最终效果有明显提升的一点,由 recursive_retrieval 函数实现:
初始检索:使用原始查询进行首轮检索
结果分析:使用 LLM 分析当前检索结果是否充分回答问题
查询改写:如果信息不足,LLM 会生成一个新的、更具体或从不同角度的查询
迭代过程:使用新查询继续检索,直到获取足够信息或达到最大迭代次数
这个机制解决了单次检索的局限性,能够处理复杂问题、多步骤推理,以及需要从多个角度收集信息的场景。
2.5生成回答
系统支持两种回答生成方式:
本地 Ollama 模型:使用 deepseek-r1:1.5b 或 deepseek-r1:7b 模型在本地生成回答
SiliconFlow API:调用云端 API 使用更强大的模型生成回答
https://cloud.siliconflow.cn/i/cmXGS5IJ
思维链处理:支持分离思考过程,以可折叠的形式展示给用户
之所以选择新增一个商业api选项,也是因为我自己的电脑跑本地模型是在太卡,问了一些复杂问题容易触发超时机制。当然另外还有个重要原因是,在针对不同核心组件做调优时,保持 chat 模型的水准个人实践下来也很重要,否则就变成了过度雕花。
3、优化方向参考
因为只是方便大家练手,大家感兴趣的可以在目前代码基础上做如下优化:
3.1文档格式支持扩展
当前系统仅支持 PDF 文件,可扩展支持:
复制# 支持更多文档格式 def extract_text_from_file(file_path): """根据文件类型选择适当的提取方法""" ext = os.path.splitext(file_path)[1].lower() if ext == '.pdf': return extract_text_from_pdf(file_path) elif ext == '.docx': return extract_text_from_docx(file_path) elif ext == '.pptx': return extract_text_from_pptx(file_path) elif ext in ['.txt', '.md', '.py', '.java', '.js', '.html', '.css']: with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: return f.read() elif ext in ['.csv', '.xlsx', '.xls']: return extract_text_from_spreadsheet(file_path) else: raise ValueError(f"不支持的文件类型: {ext}")
3.2高级分块策略
可以实现更智能的分块策略:
语义分块:基于段落、章节或自然语义边界进行分块
结构感知分块:识别文档结构(标题、列表、表格等)
多粒度分块:同时维护不同粒度的块,灵活应对不同类型的查询
复制# 多粒度分块示例 def create_multi_granularity_chunks(text): """创建多粒度分块""" # 大块(1000字符):适合概述类问题 large_splitter = RecursiveCharacterTextSplitter( chunk_size=1000, chunk_overlap=100 ) # 中块(400字符):平衡粒度 medium_splitter = RecursiveCharacterTextSplitter( chunk_size=400, chunk_overlap=40 ) # 小块(150字符):适合精确问答 small_splitter = RecursiveCharacterTextSplitter( chunk_size=150, chunk_overlap=20 ) large_chunks = large_splitter.split_text(text) medium_chunks = medium_splitter.split_text(text) small_chunks = small_splitter.split_text(text) return { "large": large_chunks, "medium": medium_chunks, "small": small_chunks }
3.3嵌入模型优化
当前系统使用 all-MiniLM-L6-v2,可以考虑:
更强大的多语言模型:如 multilingual-e5-large 提升中文理解能力
领域特定微调:针对特定领域数据微调嵌入模型
多模型集成:使用多个嵌入模型并融合结果
复制# 多模型嵌入示例 def generate_multi_model_embeddings(text): """使用多个模型生成嵌入并融合""" model1 = SentenceTransformer('all-MiniLM-L6-v2') model2 = SentenceTransformer('paraphrase-multilingual-mpnet-base-v2') # 生成嵌入 emb1 = model1.encode(text) emb2 = model2.encode(text) # 简单融合策略:归一化后拼接 emb1_norm = emb1 / np.linalg.norm(emb1) emb2_norm = emb2 / np.linalg.norm(emb2) # 返回融合嵌入 return np.concatenate([emb1_norm, emb2_norm])
3.4检索与重排序改进
可以考虑以下改进:
查询扩展:使用同义词扩展或关键词扩展增强原始查询
密集检索与稀疏检索结合:更高级的混合检索方法
上下文感知重排序:考虑已检索文档间的关系进行重排序
多轮对话记忆:在多轮对话中利用历史上下文改进检索
复制# 查询扩展示例 def expand_query(query): """使用LLM扩展查询,生成多个角度的查询变体""" prompt = f""" 为以下查询生成3个不同角度的变体,以提高检索召回率: 原始查询: {query} 变体应当包含同义词替换、关键词扩展或问题重构。 仅返回三个变体,每行一个,不要有额外解释。 """ response = call_llm_api(prompt) variants = [line.strip() for line in response.strip().split('\n')] return [query] + variants # 返回原始查询和变体
3.5其他可能的扩展方向
结果验证机制:使用外部知识或规则验证生成结果的准确性
用户反馈学习:根据用户反馈调整检索策略和参数
多模态支持:处理图像、表格等非文本内容
层次化索引:构建文档的层次化表示,从概览到细节
个性化调整:根据用户历史查询和偏好调整检索策略
4、RAG 学习路线全梳理
最后回到学习路线上来,最开始手搓这个 demo,也是自己为了更好的从底层理解 RAG 系统,不曾想收到了很多正反馈。
https://github.com/weiwill88/Local_Pdf_Chat_RAG
另外正如上篇所说,直接使用封装度高的框架容易让人"知其然不知其所以然"。如果你已经掌握了基础原理,就可以考虑进一步探索 RAGFlow 或者 LlamaIndex 等成熟框架的 Python API,充分利用成熟框架提供的生产力优势。按照个人近几个月的学习和实践经验,整体学习路线参考如下:
4.1从自建到框架的过渡期
对比学习:将手搓实现的组件与成熟框架中对应功能进行比较
框架源码阅读:尝试阅读 LlamaIndex 或 RAGFlow 等框架的核心源码,看看他们如何实现相同功能
增量替换:逐步用框架组件替换自建组件,对比性能和结果差异
4.2成熟框架深入学习
从示例入手:优先研究官方提供的示例代码和教程
构建微型项目:使用框架 API 实现小型但完整的应用
实验各种组件:测试不同的检索器、嵌入模型、重排序策略等
4.3高级应用与定制
探索高级功能:如多模态 RAG、Agent 集成、自定义索引等
性能优化:学习如何调整参数提高检索质量和速度
领域适配:根据特定行业或领域需求定制 RAG 系统
5、Python API vs Web 界面能做什么?
公众号之前有篇文章详细介绍了 RAGFlow 官方 Python API 各个模块使用,后续有些盆友私信我问到,这些 api 除了可以批量的按照自定义方式处理文件外,还可以干啥?结合近期学习和实践,提供以下五个方向供大家参考,具体参考示例代码:
5.1系统集成能力
复制# 与现有系统集成示例 from ragflow import RAGPipeline import your_company_database # 从公司数据库获取文档 documents = your_company_database.get_latest_documents() # 使用RAGFlow处理 pipeline = RAGPipeline() for doc in documents: pipeline.add_document(doc) # 将结果同步回公司系统 processed_results = pipeline.get_indexed_status() your_company_database.update_document_status(processed_results)
5.2定制化检索策略
复制# 自定义混合检索策略 from ragflow import RetrievalPipeline from ragflow.retrievers import VectorRetriever, KeywordRetriever # 创建自定义的检索器组合 def custom_hybrid_retriever(query, top_k=10): # 使用向量检索获取语义相关结果 semantic_results = vector_retriever.retrieve(query, top_k=top_k) # 使用关键词检索获取精确匹配 keyword_results = keyword_retriever.retrieve(query, top_k=top_k) # 自定义融合策略(比如考虑文档及时性等) results = custom_merge_function(semantic_results, keyword_results, recency_weight=0.2) return results
5.3高级处理流程自动化
复制# 创建复杂的数据处理和RAG工作流 from ragflow import DocumentProcessor, Indexer, QueryEngine import schedule import time def daily_knowledge_update(): # 从多个来源获取新文档 new_docs = fetch_documents_from_sources() # 文档预处理(去重、格式转换等) processed_docs = DocumentProcessor().process_batch(new_docs) # 增量更新索引 indexer = Indexer() indexer.update_incrementally(processed_docs) # 生成日报 report = generate_daily_summary() send_email(report) # 设置定时任务 schedule.every().day.at("01:00").do(daily_knowledge_update) while True: schedule.run_pending() time.sleep(60)
5.4自定义评估和优化
复制# 构建RAG系统评估框架 from ragflow import RAGEvaluator import matplotlib.pyplot as plt # 准备测试数据集 test_queries = load_test_queries() ground_truth = load_ground_truth_answers() # 测试不同的配置 configurations = [ {"embeddings": "minilm", "chunk_size": 200, "retriever": "hybrid"}, {"embeddings": "e5-large", "chunk_size": 500, "retriever": "vector"}, {"embeddings": "bge-large", "chunk_size": 300, "retriever": "hybrid"} ] results = {} for config in configurations: # 构建并测试每种配置 rag_system = build_rag_with_config(config) eval_results = RAGEvaluator.evaluate( rag_system, test_queries, ground_truth ) results[str(config)] = eval_results # 可视化不同配置的性能比较 plot_evaluation_results(results)
5.5领域特定的增强
复制# 医疗领域RAG增强示例 from ragflow import RAGPipeline from custom_medical_modules import MedicalTermNormalizer, DiseaseEntityExtractor # 创建领域特定的文档处理器 medical_processor = DocumentProcessor() medical_processor.add_transformer(MedicalTermNormalizer()) medical_processor.add_transformer(DiseaseEntityExtractor()) # 构建医疗特定的RAG系统 medical_rag = RAGPipeline( document_processor=medical_processor, retrieval_augmentors=[ CitationRetriever(), # 添加医学文献引用 EvidenceLevelClassifier() # 分类证据等级 ], response_formatters=[ MedicalDisclaimerFormatter(), # 添加医疗免责声明 CitationFormatter() # 格式化引用 ] )
当然,还有结合多个框架使用的示例,我在Medium看到一个博主使用 LlamaIndex 整合多源数据并构建索引,然后通过 RAGFlow 的深度解析和重排序优化结果。最后利用 RAGFlow 的引用功能增强答案可信度,同时调用 LlamaIndex 的代理进行动态数据补充。不过,我还没试过,感兴趣的可以测试下看看。
RAGFlow 等框架的实际业务价值,不同实践经验的盆友可能有不同的体会,个人总结其实就是一句话,使用这些框架的 Python API 可以让我们快速构建原型并迭代改进,比从 0-1 开发节省了大量时间。从而专注于解决业务问题,而非技术细节。