手搓RAG新增功能:递归检索与迭代查询+重回成熟框架API
在上那篇提到的我手搓的那个 RAG 项目新增功能中,漏掉了递归检索与迭代查询,这篇补上(源码见知识星球)。经过初步调试对召回效果有明显提升,这种方法解决了传统 RAG 的几个关键问题:
[*]处理复杂多步骤问题:通过多次迭代,分解复杂问题
[*]信息不足的补充:当初始检索结果不足以回答问题时,自动生成补充查询
[*]多角度信息收集:能够从不同角度收集相关信息
1、递归检索具体实现
递归检索函数(recursive_retrieval)
(支持最多三次迭代查询)
每次迭代使用混合检索(向量检索+BM25)获取信息
使用 LLM 分析当前检索结果,判断是否需要进一步查询
如果需要,LLM 会生成新的查询问题,用于下一轮检索
换句话说,递归检索的工作原理可以理解为"先检索-后思考-再检索"的过程,模拟了人解决问题的方式:先获取一些信息,思考下是否足够,如果不够则继续查找更多相关信息。总之,好的结果不是一蹴而就的。
为了更直观的让大家了解这个递归检索的过程,贴几张本地测试图片,供参考。需要说明的是,目前这个版本中还没做多模态的预处理和图片的召回,后续结合具体案例更新在知识星球中。
2、核心组件全解析
下面再对整个代码的核心组件做进一步的解析,大家可以在这个基础上按照业务需求进一步优化:
2.1文档处理和向量化
文本提取:使用 pdfminer.high_level.extract_text_to_fp 从 PDF 提取文本内容
文本分块:使用 RecursiveCharacterTextSplitter 将长文本分割成更小的块(在代码中块大小设为 400 字符,重叠为 40 字符)
向量化:使用 all-MiniLM-L6-v2 模型为每个文本块生成嵌入向量
存储:将文本块、向量和元数据存储到 ChromaDB 向量数据库
辅助索引:同时构建 BM25Okapi 索引用于基于关键词的检索
分块粒度和嵌入质量直接影响检索性能,这部分大家需要结合自己的文档结构特点自行调整,上述列出的做法仅供参考。
2.2混合检索机制
结合了两种不同的检索方法:
语义向量检索:基于嵌入向量的相似度搜索,能够捕捉语义关系
BM25 关键词检索:基于词频的经典检索算法,专注于关键词匹配
混合策略:通过 hybrid_merge 函数融合两种检索结果,使用α参数(0.7)控制两种方法的权重
这种混合策略结合了语义理解和关键词匹配的优势,提高了检索的综合表现。
2.3重排序机制
检索到初步结果后,使用更精确的模型进行重排序:
交叉编码器重排序:使用 CrossEncoder 模型,能够同时考虑查询和文档内容进行更精确的相关性评分
LLM 重排序:可选使用 LLM 对查询和文档的相关性进行评分,利用大模型的理解能力
缓存机制:使用@lru_cache 减少重复计算,提高效率
重排序步骤使检索结果更符合用户的实际需求,大幅提升了检索质量。
2.4递归检索与迭代查询
这是新增的一个功能,也是实测下来对最终效果有明显提升的一点,由 recursive_retrieval 函数实现:
初始检索:使用原始查询进行首轮检索
结果分析:使用 LLM 分析当前检索结果是否充分回答问题
查询改写:如果信息不足,LLM 会生成一个新的、更具体或从不同角度的查询
迭代过程:使用新查询继续检索,直到获取足够信息或达到最大迭代次数
这个机制解决了单次检索的局限性,能够处理复杂问题、多步骤推理,以及需要从多个角度收集信息的场景。
2.5生成回答
系统支持两种回答生成方式:
本地 Ollama 模型:使用 deepseek-r1:1.5b 或 deepseek-r1:7b 模型在本地生成回答
SiliconFlow API:调用云端 API 使用更强大的模型生成回答
https://cloud.siliconflow.cn/i/cmXGS5IJ
思维链处理:支持分离思考过程,以可折叠的形式展示给用户
之所以选择新增一个商业api选项,也是因为我自己的电脑跑本地模型是在太卡,问了一些复杂问题容易触发超时机制。当然另外还有个重要原因是,在针对不同核心组件做调优时,保持 chat 模型的水准个人实践下来也很重要,否则就变成了过度雕花。
3、优化方向参考
因为只是方便大家练手,大家感兴趣的可以在目前代码基础上做如下优化:
3.1文档格式支持扩展
当前系统仅支持 PDF 文件,可扩展支持:
# 支持更多文档格式def extract_text_from_file(file_path): """根据文件类型选择适当的提取方法""" ext = os.path.splitext(file_path).lower() if ext == '.pdf': return extract_text_from_pdf(file_path) elif ext == '.docx': return extract_text_from_docx(file_path) elif ext == '.pptx': return extract_text_from_pptx(file_path) elif ext in ['.txt', '.md', '.py', '.java', '.js', '.html', '.css']: with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: return f.read() elif ext in ['.csv', '.xlsx', '.xls']: return extract_text_from_spreadsheet(file_path) else: raise ValueError(f"不支持的文件类型: {ext}")
[*]1.
[*]2.
[*]3.
[*]4.
[*]5.
[*]6.
[*]7.
[*]8.
[*]9.
[*]10.
[*]11.
[*]12.
[*]13.
[*]14.
[*]15.
[*]16.
[*]17.
[*]18.
3.2高级分块策略
可以实现更智能的分块策略:
语义分块:基于段落、章节或自然语义边界进行分块
结构感知分块:识别文档结构(标题、列表、表格等)
多粒度分块:同时维护不同粒度的块,灵活应对不同类型的查询
# 多粒度分块示例def create_multi_granularity_chunks(text): """创建多粒度分块""" # 大块(1000字符):适合概述类问题 large_splitter = RecursiveCharacterTextSplitter( chunk_size=1000, chunk_overlap=100 ) # 中块(400字符):平衡粒度 medium_splitter = RecursiveCharacterTextSplitter( chunk_size=400, chunk_overlap=40 ) # 小块(150字符):适合精确问答 small_splitter = RecursiveCharacterTextSplitter( chunk_size=150, chunk_overlap=20 ) large_chunks = large_splitter.split_text(text) medium_chunks = medium_splitter.split_text(text) small_chunks = small_splitter.split_text(text) return { "large": large_chunks, "medium": medium_chunks, "small": small_chunks }
[*]1.
[*]2.
[*]3.
[*]4.
[*]5.
[*]6.
[*]7.
[*]8.
[*]9.
[*]10.
[*]11.
[*]12.
[*]13.
[*]14.
[*]15.
[*]16.
[*]17.
[*]18.
[*]19.
[*]20.
[*]21.
[*]22.
[*]23.
[*]24.
[*]25.
3.3嵌入模型优化
当前系统使用 all-MiniLM-L6-v2,可以考虑:
更强大的多语言模型:如 multilingual-e5-large 提升中文理解能力
领域特定微调:针对特定领域数据微调嵌入模型
多模型集成:使用多个嵌入模型并融合结果
# 多模型嵌入示例def generate_multi_model_embeddings(text): """使用多个模型生成嵌入并融合""" model1 = SentenceTransformer('all-MiniLM-L6-v2') model2 = SentenceTransformer('paraphrase-multilingual-mpnet-base-v2') # 生成嵌入 emb1 = model1.encode(text) emb2 = model2.encode(text) # 简单融合策略:归一化后拼接 emb1_norm = emb1 / np.linalg.norm(emb1) emb2_norm = emb2 / np.linalg.norm(emb2) # 返回融合嵌入 return np.concatenate()
[*]1.
[*]2.
[*]3.
[*]4.
[*]5.
[*]6.
[*]7.
[*]8.
[*]9.
[*]10.
[*]11.
[*]12.
[*]13.
[*]14.
[*]15.
[*]16.
3.4检索与重排序改进
可以考虑以下改进:
查询扩展:使用同义词扩展或关键词扩展增强原始查询
密集检索与稀疏检索结合:更高级的混合检索方法
上下文感知重排序:考虑已检索文档间的关系进行重排序
多轮对话记忆:在多轮对话中利用历史上下文改进检索
# 查询扩展示例def expand_query(query): """使用LLM扩展查询,生成多个角度的查询变体""" prompt = f""" 为以下查询生成3个不同角度的变体,以提高检索召回率: 原始查询: {query} 变体应当包含同义词替换、关键词扩展或问题重构。 仅返回三个变体,每行一个,不要有额外解释。 """ response = call_llm_api(prompt) variants = return + variants# 返回原始查询和变体
[*]1.
[*]2.
[*]3.
[*]4.
[*]5.
[*]6.
[*]7.
[*]8.
[*]9.
[*]10.
[*]11.
[*]12.
[*]13.
[*]14.
[*]15.
3.5其他可能的扩展方向
结果验证机制:使用外部知识或规则验证生成结果的准确性
用户反馈学习:根据用户反馈调整检索策略和参数
多模态支持:处理图像、表格等非文本内容
层次化索引:构建文档的层次化表示,从概览到细节
个性化调整:根据用户历史查询和偏好调整检索策略
4、RAG 学习路线全梳理
最后回到学习路线上来,最开始手搓这个 demo,也是自己为了更好的从底层理解 RAG 系统,不曾想收到了很多正反馈。
https://github.com/weiwill88/Local_Pdf_Chat_RAG
另外正如上篇所说,直接使用封装度高的框架容易让人"知其然不知其所以然"。如果你已经掌握了基础原理,就可以考虑进一步探索 RAGFlow 或者 LlamaIndex 等成熟框架的 Python API,充分利用成熟框架提供的生产力优势。按照个人近几个月的学习和实践经验,整体学习路线参考如下:
4.1从自建到框架的过渡期
对比学习:将手搓实现的组件与成熟框架中对应功能进行比较
框架源码阅读:尝试阅读 LlamaIndex 或 RAGFlow 等框架的核心源码,看看他们如何实现相同功能
增量替换:逐步用框架组件替换自建组件,对比性能和结果差异
4.2成熟框架深入学习
从示例入手:优先研究官方提供的示例代码和教程
构建微型项目:使用框架 API 实现小型但完整的应用
实验各种组件:测试不同的检索器、嵌入模型、重排序策略等
4.3高级应用与定制
探索高级功能:如多模态 RAG、Agent 集成、自定义索引等
性能优化:学习如何调整参数提高检索质量和速度
领域适配:根据特定行业或领域需求定制 RAG 系统
5、Python API vs Web 界面能做什么?
公众号之前有篇文章详细介绍了 RAGFlow 官方 Python API 各个模块使用,后续有些盆友私信我问到,这些 api 除了可以批量的按照自定义方式处理文件外,还可以干啥?结合近期学习和实践,提供以下五个方向供大家参考,具体参考示例代码:
5.1系统集成能力
# 与现有系统集成示例from ragflow import RAGPipelineimport your_company_database# 从公司数据库获取文档documents = your_company_database.get_latest_documents()# 使用RAGFlow处理pipeline = RAGPipeline()for doc in documents: pipeline.add_document(doc) # 将结果同步回公司系统processed_results = pipeline.get_indexed_status()your_company_database.update_document_status(processed_results)
[*]1.
[*]2.
[*]3.
[*]4.
[*]5.
[*]6.
[*]7.
[*]8.
[*]9.
[*]10.
[*]11.
[*]12.
[*]13.
[*]14.
[*]15.
[*]16.
[*]17.
5.2定制化检索策略
# 自定义混合检索策略from ragflow import RetrievalPipelinefrom ragflow.retrievers import VectorRetriever, KeywordRetriever# 创建自定义的检索器组合def custom_hybrid_retriever(query, top_k=10): # 使用向量检索获取语义相关结果 semantic_results = vector_retriever.retrieve(query, top_k=top_k) # 使用关键词检索获取精确匹配 keyword_results = keyword_retriever.retrieve(query, top_k=top_k) # 自定义融合策略(比如考虑文档及时性等) results = custom_merge_function(semantic_results, keyword_results, recency_weight=0.2) return results
[*]1.
[*]2.
[*]3.
[*]4.
[*]5.
[*]6.
[*]7.
[*]8.
[*]9.
[*]10.
[*]11.
[*]12.
[*]13.
[*]14.
[*]15.
[*]16.
[*]17.
5.3高级处理流程自动化
# 创建复杂的数据处理和RAG工作流from ragflow import DocumentProcessor, Indexer, QueryEngineimport scheduleimport timedef daily_knowledge_update(): # 从多个来源获取新文档 new_docs = fetch_documents_from_sources() # 文档预处理(去重、格式转换等) processed_docs = DocumentProcessor().process_batch(new_docs) # 增量更新索引 indexer = Indexer() indexer.update_incrementally(processed_docs) # 生成日报 report = generate_daily_summary() send_email(report)# 设置定时任务schedule.every().day.at("01:00").do(daily_knowledge_update)while True: schedule.run_pending() time.sleep(60)
[*]1.
[*]2.
[*]3.
[*]4.
[*]5.
[*]6.
[*]7.
[*]8.
[*]9.
[*]10.
[*]11.
[*]12.
[*]13.
[*]14.
[*]15.
[*]16.
[*]17.
[*]18.
[*]19.
[*]20.
[*]21.
[*]22.
[*]23.
[*]24.
[*]25.
[*]26.
[*]27.
[*]28.
[*]29.
5.4自定义评估和优化
# 构建RAG系统评估框架from ragflow import RAGEvaluatorimport matplotlib.pyplot as plt# 准备测试数据集test_queries = load_test_queries()ground_truth = load_ground_truth_answers()# 测试不同的配置configurations = [ {"embeddings": "minilm", "chunk_size": 200, "retriever": "hybrid"}, {"embeddings": "e5-large", "chunk_size": 500, "retriever": "vector"}, {"embeddings": "bge-large", "chunk_size": 300, "retriever": "hybrid"}]results = {}for config in configurations: # 构建并测试每种配置 rag_system = build_rag_with_config(config) eval_results = RAGEvaluator.evaluate( rag_system, test_queries, ground_truth ) results = eval_results# 可视化不同配置的性能比较plot_evaluation_results(results)
[*]1.
[*]2.
[*]3.
[*]4.
[*]5.
[*]6.
[*]7.
[*]8.
[*]9.
[*]10.
[*]11.
[*]12.
[*]13.
[*]14.
[*]15.
[*]16.
[*]17.
[*]18.
[*]19.
[*]20.
[*]21.
[*]22.
[*]23.
[*]24.
[*]25.
[*]26.
[*]27.
[*]28.
[*]29.
[*]30.
5.5领域特定的增强
# 医疗领域RAG增强示例from ragflow import RAGPipelinefrom custom_medical_modules import MedicalTermNormalizer, DiseaseEntityExtractor# 创建领域特定的文档处理器medical_processor = DocumentProcessor()medical_processor.add_transformer(MedicalTermNormalizer())medical_processor.add_transformer(DiseaseEntityExtractor())# 构建医疗特定的RAG系统medical_rag = RAGPipeline( document_processor=medical_processor, retrieval_augmentors=[ CitationRetriever(),# 添加医学文献引用 EvidenceLevelClassifier()# 分类证据等级 ], response_formatters=[ MedicalDisclaimerFormatter(),# 添加医疗免责声明 CitationFormatter()# 格式化引用 ])
[*]1.
[*]2.
[*]3.
[*]4.
[*]5.
[*]6.
[*]7.
[*]8.
[*]9.
[*]10.
[*]11.
[*]12.
[*]13.
[*]14.
[*]15.
[*]16.
[*]17.
[*]18.
[*]19.
[*]20.
[*]21.
[*]22.
[*]23.
当然,还有结合多个框架使用的示例,我在Medium看到一个博主使用 LlamaIndex 整合多源数据并构建索引,然后通过 RAGFlow 的深度解析和重排序优化结果。最后利用 RAGFlow 的引用功能增强答案可信度,同时调用 LlamaIndex 的代理进行动态数据补充。不过,我还没试过,感兴趣的可以测试下看看。
RAGFlow 等框架的实际业务价值,不同实践经验的盆友可能有不同的体会,个人总结其实就是一句话,使用这些框架的 Python API 可以让我们快速构建原型并迭代改进,比从 0-1 开发节省了大量时间。从而专注于解决业务问题,而非技术细节。
页:
[1]