Introduction
In enterprise AI applications, RAG (Retrieval-Augmented Generation) has become the standard approach for building intelligent Q&A systems. But a RAG system is only as good as its document processing: garbage in, garbage out.
This article walks through a real project to show how to automatically process Word, PPT, and PDF documents into a high-quality RAG knowledge base.
RAG Knowledge Base Architecture
Core Components
┌─────────────────────────────────────────────────────────────┐
│                   RAG System Architecture                    │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│   ┌──────────┐      ┌──────────┐      ┌──────────┐          │
│   │ Document │  →   │ Embedding│  →   │  Vector  │          │
│   │ parsing  │      │  layer   │      │  storage │          │
│   └──────────┘      └──────────┘      └──────────┘          │
│        ↓                 ↓                 ↓                 │
│   ┌──────────┐      ┌──────────┐      ┌──────────┐          │
│   │   Word   │      │ Embedding│      │  FAISS   │          │
│   │   PPT    │      │  model   │      │  Chroma  │          │
│   │   PDF    │      │          │      │ Pinecone │          │
│   └──────────┘      └──────────┘      └──────────┘          │
│                                                              │
│   ┌──────────┐      ┌──────────┐      ┌──────────┐          │
│   │ Retrieval│  ←   │  Query   │  ←   │   User   │          │
│   │ + rerank │      │processing│      │  input   │          │
│   └──────────┘      └──────────┘      └──────────┘          │
│        ↓                                                     │
│   ┌──────────┐                                               │
│   │   LLM    │                                               │
│   │ response │                                               │
│   └──────────┘                                               │
└─────────────────────────────────────────────────────────────┘
Data Processing Flow
graph LR
    A[Raw documents] --> B[Document parsing]
    B --> C[Text cleaning]
    C --> D[Smart chunking]
    D --> E[Embedding]
    E --> F[Vector storage]
    F --> G[Retrieval]
    G --> H[LLM generation]
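Mapped onto the functions built in the rest of this article, the flow looks roughly like this (a sketch only; parse_pdf, clean_text, recursive_chunking, and embedding_model are all defined in the sections below):
# Sketch of the flow above, using helpers defined later in this article
parsed = parse_pdf('document.pdf')                            # document parsing
cleaned = [clean_text(p['text']) for p in parsed['pages']]    # text cleaning
chunks = recursive_chunking(cleaned, chunk_size=500, chunk_overlap=50)  # chunking
embeddings = embedding_model.encode(chunks)                   # embedding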
Step 1: Document Parsing
1.1 Word Documents
Install dependencies:
pip install python-docx
Parse a Word document:
from docx import Document
import os

def parse_docx(file_path):
    """Parse a Word document, extracting text and structure."""
    doc = Document(file_path)
    content = {
        'title': '',
        'sections': [],
        'tables': [],
        'images': []
    }
    # Extract headings and body paragraphs
    for para in doc.paragraphs:
        if para.style.name.startswith('Heading'):
            # Style names look like "Heading 1"; fall back to level 1
            # if the suffix is not a digit (e.g. localized style names)
            suffix = para.style.name.split()[-1]
            content['sections'].append({
                'level': int(suffix) if suffix.isdigit() else 1,
                'text': para.text,
                'style': para.style.name
            })
        elif para.text.strip():
            content['sections'].append({
                'level': 0,
                'text': para.text,
                'style': 'Normal'
            })
    # Extract tables
    for i, table in enumerate(doc.tables):
        table_data = []
        for row in table.rows:
            row_data = [cell.text for cell in row.cells]
            table_data.append(row_data)
        content['tables'].append({
            'index': i,
            'data': table_data
        })
    return content

# Usage
doc_content = parse_docx('document.docx')
print(f"Sections: {len(doc_content['sections'])}")
print(f"Tables: {len(doc_content['tables'])}")
Batch-process multiple documents:
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor

def batch_parse_docx(directory, max_workers=4):
    """Parse every Word document under a directory in parallel."""
    docx_files = list(Path(directory).glob('*.docx'))
    results = {}

    def process_file(file_path):
        try:
            content = parse_docx(str(file_path))
            return file_path.name, content, None
        except Exception as e:
            return file_path.name, None, str(e)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(process_file, f) for f in docx_files]
        for future in futures:
            filename, content, error = future.result()
            if error:
                print(f"Failed to process {filename}: {error}")
            else:
                results[filename] = content
    return results
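For example (assuming a hypothetical ./docs directory of Word files):
# Parse everything under ./docs with four worker threads
all_docs = batch_parse_docx('./docs', max_workers=4)
print(f"Parsed {len(all_docs)} documents")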
1.2 PowerPoint Presentations
Install dependencies:
pip install python-pptx markitdown
Parse a presentation:
import os
from pptx import Presentation
from pptx.enum.shapes import MSO_SHAPE_TYPE

def parse_pptx(file_path):
    """Parse a PPT file, extracting per-slide content."""
    prs = Presentation(file_path)
    slides = []
    for i, slide in enumerate(prs.slides):
        slide_content = {
            'slide_number': i + 1,
            'title': '',
            'text': [],
            'images': [],
            'tables': []
        }
        # Extract the title placeholder, if present
        if slide.shapes.title:
            slide_content['title'] = slide.shapes.title.text
        # Walk every shape on the slide
        for shape in slide.shapes:
            # Text frames (skip the title, already captured)
            if hasattr(shape, "text") and shape.text.strip():
                if shape != slide.shapes.title:
                    slide_content['text'].append(shape.text)
            # Tables
            if shape.has_table:
                table = shape.table
                table_data = []
                for row in table.rows:
                    row_data = [cell.text for cell in row.cells]
                    table_data.append(row_data)
                slide_content['tables'].append(table_data)
            # Pictures
            if shape.shape_type == MSO_SHAPE_TYPE.PICTURE:
                slide_content['images'].append({
                    'name': shape.name,
                    'type': 'image'
                })
        slides.append(slide_content)
    return {
        'filename': os.path.basename(file_path),
        'total_slides': len(slides),
        'slides': slides
    }

# Usage
ppt_content = parse_pptx('presentation.pptx')
print(f"Slides: {ppt_content['total_slides']}")
for slide in ppt_content['slides'][:3]:
    print(f"Slide {slide['slide_number']}: {slide['title']}")
1.3 PDF Documents
Install dependencies:
pip install pypdf pdfplumber pdf2image pytesseract
Parse a PDF:
import pdfplumber
from pypdf import PdfReader

def parse_pdf(file_path, extract_tables=True):
    """Parse a PDF, extracting text and tables."""
    # Basic text extraction
    reader = PdfReader(file_path)
    text_pages = []
    for i, page in enumerate(reader.pages):
        # extract_text() can return None on image-only pages
        text = page.extract_text() or ''
        text_pages.append({
            'page': i + 1,
            'text': text
        })
    # Table extraction (pdfplumber is more accurate for tables)
    tables = []
    if extract_tables:
        with pdfplumber.open(file_path) as pdf:
            for i, page in enumerate(pdf.pages):
                page_tables = page.extract_tables()
                for j, table in enumerate(page_tables):
                    tables.append({
                        'page': i + 1,
                        'table_index': j,
                        'data': table
                    })
    return {
        'filename': os.path.basename(file_path),
        'total_pages': len(text_pages),
        'pages': text_pages,
        'tables': tables
    }

# Usage
pdf_content = parse_pdf('document.pdf')
print(f"Pages: {pdf_content['total_pages']}")
print(f"Tables: {len(pdf_content['tables'])}")
OCR for scanned PDFs:
from pdf2image import convert_from_path
import pytesseract

def ocr_pdf(file_path, lang='chi_sim+eng'):
    """Run OCR over a scanned PDF."""
    # Render PDF pages to images
    images = convert_from_path(file_path, dpi=300)
    text_pages = []
    for i, image in enumerate(images):
        # Recognize text on each page image
        text = pytesseract.image_to_string(image, lang=lang)
        text_pages.append({
            'page': i + 1,
            'text': text
        })
    return {
        'filename': os.path.basename(file_path),
        'total_pages': len(text_pages),
        'pages': text_pages,
        'ocr': True
    }
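Note that convert_from_path requires the poppler utilities and pytesseract requires the Tesseract binary with the chi_sim language data installed. A usage sketch:
# Usage sketch (assumes poppler + tesseract with chi_sim installed)
ocr_content = ocr_pdf('scanned.pdf')
print(f"OCR pages: {ocr_content['total_pages']}")
print(ocr_content['pages'][0]['text'][:200])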
Step 2: Text Cleaning and Preprocessing
2.1 General Cleaning Functions
import re
from datetime import datetime
from typing import List, Dict

def clean_text(text: str) -> str:
    """Clean text: strip noise characters and layout artifacts."""
    # Collapse runs of spaces/tabs but keep newlines,
    # so the line-based filters below still work
    text = re.sub(r'[ \t]+', ' ', text)
    # Drop special characters (keep CJK characters and common punctuation)
    text = re.sub(r'[^\w\s\u4e00-\u9fff,.!?;:()""''-]', '', text)
    # Drop common header/footer patterns (bare page numbers, "第 N 页")
    text = re.sub(r'^\d+\s*$', '', text, flags=re.MULTILINE)
    text = re.sub(r'^第\d+页.*$', '', text, flags=re.MULTILINE)
    # Drop very short lines (likely noise)
    lines = text.split('\n')
    lines = [line for line in lines if len(line.strip()) > 2]
    return '\n'.join(lines).strip()

def extract_metadata(content: Dict) -> Dict:
    """Build a metadata record for a parsed document."""
    return {
        'source': content.get('filename', 'unknown'),
        'total_pages': content.get('total_pages', 0),
        'total_slides': content.get('total_slides', 0),
        'has_tables': len(content.get('tables', [])) > 0,
        'processed_at': datetime.now().isoformat()
    }
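A quick sketch of both helpers against the PDF parsed in Step 1:
# Clean the first page of the parsed PDF and build its metadata record
raw_text = pdf_content['pages'][0]['text']
print(clean_text(raw_text)[:200])
print(extract_metadata(pdf_content))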
2.2 Structured Content Organization
def organize_content(parsed_content: Dict) -> List[Dict]:
    """Organize parsed content into structured passages."""
    chunks = []
    # Word documents
    if 'sections' in parsed_content:
        current_section = ''
        for section in parsed_content['sections']:
            if section['level'] > 0:
                current_section = section['text']
            chunks.append({
                'type': 'section' if section['level'] > 0 else 'paragraph',
                'heading': current_section,
                'content': section['text'],
                'metadata': {'level': section['level']}
            })
    # PPT files
    if 'slides' in parsed_content:
        for slide in parsed_content['slides']:
            text_content = '\n'.join(slide.get('text', []))
            if text_content:
                chunks.append({
                    'type': 'slide',
                    'heading': slide.get('title', ''),
                    'content': text_content,
                    'metadata': {'slide_number': slide['slide_number']}
                })
    # PDFs
    if 'pages' in parsed_content:
        for page in parsed_content['pages']:
            if page['text'].strip():
                chunks.append({
                    'type': 'page',
                    'heading': f"Page {page['page']}",
                    'content': page['text'],
                    'metadata': {'page_number': page['page']}
                })
    return chunks
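For example, flattening the PDF parsed in Step 1:
# Normalize any parsed document into a flat list of structured passages
organized = organize_content(pdf_content)
print(f"{len(organized)} structured passages")
print(organized[0]['type'], organized[0]['heading'])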
Step 3: Intelligent Text Chunking
3.1 Chunking Strategies Compared
| Strategy | Pros | Cons | Best for |
|---|---|---|---|
| Fixed length | Simple, predictable | Breaks semantic boundaries | Short, uniform content |
| By paragraph | Preserves meaning | Uneven chunk sizes | Structured documents |
| Recursive | Flexible, keeps context | More complex | Long or mixed documents |
| Semantic | Best coherence | Computationally expensive | Quality-critical scenarios |
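The recursive and semantic strategies are implemented below. For completeness, a minimal paragraph-based splitter can look like this (a sketch that merges paragraphs up to a size budget):
from typing import List

def paragraph_chunking(text: str, max_chars: int = 500) -> List[str]:
    """Split on blank lines, then merge paragraphs up to max_chars per chunk."""
    paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
    chunks, current = [], ''
    for para in paragraphs:
        if current and len(current) + len(para) > max_chars:
            chunks.append(current)
            current = para
        else:
            current = f"{current}\n{para}" if current else para
    if current:
        chunks.append(current)
    return chunks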
3.2 Recursive Character Chunking
from langchain.text_splitter import RecursiveCharacterTextSplitter
# Newer LangChain versions: from langchain_text_splitters import RecursiveCharacterTextSplitter

def recursive_chunking(texts: List[str], chunk_size=500, chunk_overlap=50):
    """Chunk text with LangChain's recursive splitter."""
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        separators=["\n\n", "\n", "。", "!", "?", ";", " ", ""]
    )
    chunks = []
    for text in texts:
        chunk_texts = text_splitter.split_text(text)
        chunks.extend(chunk_texts)
    return chunks

# Usage
texts = ["文档内容 1", "文档内容 2"]
chunks = recursive_chunking(texts, chunk_size=500, chunk_overlap=50)
print(f"Produced {len(chunks)} chunks")
3.3 Semantic Chunking (Recommended)
import re
import numpy as np
from typing import List
from sentence_transformers import SentenceTransformer

def semantic_chunking(
    text: str,
    embedding_model,
    threshold: float = 0.5,
    min_chunk_size: int = 100
) -> List[str]:
    """
    Semantically aware chunking.

    Idea: embed adjacent sentences and split where their
    cosine similarity drops below the threshold.
    """
    # Accept either a model name or an already-loaded model
    if isinstance(embedding_model, str):
        model = SentenceTransformer(embedding_model)
    else:
        model = embedding_model
    # Split into sentences (Chinese and Western terminators)
    sentences = re.split(r'(?<=[。!?.!?])\s+', text)
    sentences = [s.strip() for s in sentences if s.strip()]
    if len(sentences) < 2:
        return [text]
    # Normalize embeddings so the dot product below is cosine similarity
    embeddings = model.encode(
        sentences, show_progress_bar=False, normalize_embeddings=True
    )
    # Similarity between each pair of adjacent sentences
    similarities = [
        float(np.dot(embeddings[i], embeddings[i + 1]))
        for i in range(len(embeddings) - 1)
    ]
    # Split where similarity is low; keep accumulating while the
    # current chunk is below min_chunk_size so no text is lost
    chunks = []
    current_chunk = [sentences[0]]
    for i, sim in enumerate(similarities):
        if sim < threshold:
            chunk_text = ' '.join(current_chunk)
            if len(chunk_text) >= min_chunk_size:
                chunks.append(chunk_text)
                current_chunk = []
        current_chunk.append(sentences[i + 1])
    # Flush the final chunk (merge into the previous one if too short)
    if current_chunk:
        chunk_text = ' '.join(current_chunk)
        if len(chunk_text) >= min_chunk_size or not chunks:
            chunks.append(chunk_text)
        else:
            chunks[-1] = chunks[-1] + ' ' + chunk_text
    return chunks

# Usage
model = SentenceTransformer('BAAI/bge-large-zh-v1.5')
text = "这是第一段内容。这是第二段内容。..."
chunks = semantic_chunking(text, model, threshold=0.5)
print(f"Produced {len(chunks)} semantic chunks")
3.4 Chunks with Metadata
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class Chunk:
    content: str
    source: str
    chunk_id: str
    metadata: dict
    embedding: Optional[np.ndarray] = None

def create_chunks_with_metadata(
    documents: List[Dict],
    chunk_size: int = 500,
    chunk_overlap: int = 50
) -> List[Chunk]:
    """Create chunks carrying full provenance metadata."""
    all_chunks = []
    chunk_counter = 0
    for doc in documents:
        # Pull raw text out of whichever structure the parser produced
        texts = []
        if 'pages' in doc:
            texts = [page['text'] for page in doc['pages']]
        elif 'slides' in doc:
            texts = ['\n'.join(slide.get('text', [])) for slide in doc['slides']]
        elif 'sections' in doc:
            texts = [section['text'] for section in doc['sections']]
        # Chunk
        chunks = recursive_chunking(texts, chunk_size, chunk_overlap)
        # Wrap each chunk with metadata
        for chunk_text in chunks:
            chunk = Chunk(
                content=chunk_text,
                source=doc.get('filename', 'unknown'),
                chunk_id=f"chunk_{chunk_counter:06d}",
                metadata={
                    'source_type': doc.get('type', 'unknown'),
                    'total_pages': doc.get('total_pages', 0),
                    'processed_at': datetime.now().isoformat()
                }
            )
            all_chunks.append(chunk)
            chunk_counter += 1
    return all_chunks
Step 4: Embedding and Storage
4.1 Choosing an Embedding Model
Recommended models for Chinese:
| Model | Dimensions | Speed | Quality | Best for |
|---|---|---|---|---|
| bge-large-zh-v1.5 | 1024 | Medium | High | High-quality RAG |
| bge-base-zh-v1.5 | 768 | Fast | Medium-high | Balanced performance |
| m3e-base | 768 | Fast | Medium | General purpose |
| text2vec-base-chinese | 768 | Fast | Medium | Quick prototyping |
Install and load:
pip install sentence-transformers
from sentence_transformers import SentenceTransformer

# Load a Chinese embedding model
embedding_model = SentenceTransformer('BAAI/bge-large-zh-v1.5')

# Generate embeddings
texts = ["文本块 1", "文本块 2"]
embeddings = embedding_model.encode(texts, show_progress_bar=True)
print(f"Embedding shape: {embeddings.shape}")
4.2 Choosing a Vector Database
| Database | Type | Scale | Highlights | Best for |
|---|---|---|---|---|
| FAISS | Local | Millions | Fast, lightweight | Local deployments, small scale |
| Chroma | Local/server | Hundreds of thousands | Easy to use, metadata support | Dev/test, small to mid scale |
| Pinecone | Cloud | Billions | Managed, auto-scaling | Production, large scale |
| Milvus | Server | Billions | Open source, full-featured | Enterprise deployments |
| Qdrant | Server | Tens of millions | High performance, filtering | Mid-scale production |
4.3 Storing with FAISS
import faiss
import numpy as np
import pickle

class FAISSStore:
    def __init__(self, dimension=1024):
        self.dimension = dimension
        self.index = faiss.IndexFlatL2(dimension)
        self.chunks = []
        self.metadata = []

    def add(self, chunks: List[Chunk], embeddings: np.ndarray):
        """Add chunks and their embeddings."""
        self.index.add(embeddings.astype('float32'))
        self.chunks.extend([c.content for c in chunks])
        self.metadata.extend([c.metadata for c in chunks])

    def search(self, query_embedding: np.ndarray, k=5):
        """Similarity search."""
        distances, indices = self.index.search(
            query_embedding.reshape(1, -1).astype('float32'),
            k
        )
        results = []
        for i, idx in enumerate(indices[0]):
            if idx < len(self.chunks):
                results.append({
                    'content': self.chunks[idx],
                    'distance': float(distances[0][i]),
                    'metadata': self.metadata[idx]
                })
        return results

    def save(self, path: str):
        """Persist the index and its metadata."""
        faiss.write_index(self.index, f"{path}.index")
        with open(f"{path}.meta.pkl", 'wb') as f:
            pickle.dump({
                'chunks': self.chunks,
                'metadata': self.metadata,
                'dimension': self.dimension
            }, f)

    @classmethod
    def load(cls, path: str):
        """Load a persisted index."""
        store = cls()
        store.index = faiss.read_index(f"{path}.index")
        with open(f"{path}.meta.pkl", 'rb') as f:
            meta = pickle.load(f)
        store.chunks = meta['chunks']
        store.metadata = meta['metadata']
        store.dimension = meta['dimension']
        return store
# Usage
store = FAISSStore(dimension=1024)

# Index the data
chunks = create_chunks_with_metadata(documents)
embeddings = embedding_model.encode([c.content for c in chunks])
store.add(chunks, embeddings)

# Search
query = "如何构建 RAG 系统?"
query_embedding = embedding_model.encode(query)
results = store.search(query_embedding, k=5)
for r in results:
    print(f"Distance: {r['distance']:.4f}")
    print(f"Content: {r['content'][:100]}...")
4.4 Storing with Chroma
import chromadb

class ChromaStore:
    def __init__(self, persist_directory="./chroma_db"):
        self.client = chromadb.PersistentClient(path=persist_directory)
        self.collection = None

    def create_collection(self, name: str, embedding_function=None):
        """Create (or reopen) a collection."""
        if embedding_function is None:
            from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction
            embedding_function = SentenceTransformerEmbeddingFunction(
                model_name="BAAI/bge-large-zh-v1.5"
            )
        # get_or_create avoids an error if the collection already exists
        self.collection = self.client.get_or_create_collection(
            name=name,
            embedding_function=embedding_function,
            metadata={"hnsw:space": "cosine"}
        )

    def add_documents(self, documents: List[str], metadatas: List[dict], ids: List[str]):
        """Add documents (Chroma embeds them via the collection's function)."""
        self.collection.add(
            documents=documents,
            metadatas=metadatas,
            ids=ids
        )

    def query(self, query_texts: List[str], n_results=5):
        """Query the collection."""
        results = self.collection.query(
            query_texts=query_texts,
            n_results=n_results
        )
        return results

    def save(self):
        """PersistentClient persists automatically; nothing to do."""
        pass
# Usage
chroma_store = ChromaStore(persist_directory="./rag_db")
chroma_store.create_collection("knowledge_base")

# Add documents
chroma_store.add_documents(
    documents=[c.content for c in chunks],
    metadatas=[c.metadata for c in chunks],
    ids=[c.chunk_id for c in chunks]
)

# Query
results = chroma_store.query(
    query_texts=["RAG 系统架构"],
    n_results=5
)
for doc, meta in zip(results['documents'][0], results['metadatas'][0]):
    print(f"Source: {meta.get('source', 'unknown')}")
    print(f"Content: {doc[:100]}...")
Step 5: Retrieval Optimization
5.1 Hybrid Retrieval
from rank_bm25 import BM25Okapi

class HybridRetriever:
    """Hybrid retriever: vector search fused with BM25."""
    def __init__(self, chunks: List[Chunk], embeddings: np.ndarray,
                 embedding_model, vector_weight=0.7):
        self.embedding_model = embedding_model  # passed in, not a global
        self.vector_weight = vector_weight
        self.bm25_weight = 1 - vector_weight
        # Vector index
        self.dimension = embeddings.shape[1]
        self.index = faiss.IndexFlatL2(self.dimension)
        self.index.add(embeddings.astype('float32'))
        self.chunks = chunks
        # BM25 index (whitespace tokens; see the jieba note below for Chinese)
        tokenized_docs = [chunk.content.split() for chunk in chunks]
        self.bm25 = BM25Okapi(tokenized_docs)

    def search(self, query: str, k=10):
        """Run the hybrid search."""
        # Vector retrieval
        query_embedding = self.embedding_model.encode(query)
        vector_distances, vector_indices = self.index.search(
            query_embedding.reshape(1, -1).astype('float32'),
            k * 2
        )
        # BM25 retrieval
        query_tokens = query.split()
        bm25_scores = self.bm25.get_scores(query_tokens)
        bm25_indices = np.argsort(bm25_scores)[::-1][:k * 2]
        # Fuse the two score lists
        scores = {}
        for i, idx in enumerate(vector_indices[0]):
            if idx not in scores:
                scores[idx] = 0
            scores[idx] += self.vector_weight * (1 / (1 + vector_distances[0][i]))
        for idx in bm25_indices:
            if idx not in scores:
                scores[idx] = 0
            scores[idx] += self.bm25_weight * bm25_scores[idx]
        # Sort and return the top k
        sorted_indices = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)[:k]
        results = []
        for idx in sorted_indices:
            results.append({
                'content': self.chunks[idx].content,
                'score': scores[idx],
                'metadata': self.chunks[idx].metadata
            })
        return results
# Usage
retriever = HybridRetriever(chunks, embeddings, embedding_model, vector_weight=0.7)
results = retriever.search("RAG 知识库构建流程", k=5)
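One caveat: whitespace tokenization (content.split()) barely works for Chinese, which has no word spaces. A word segmenter such as jieba (an extra dependency, pip install jieba) gives BM25 meaningful tokens; a sketch of the swap:
import jieba

# Segment Chinese text into words so BM25 scores are meaningful
tokenized_docs = [list(jieba.cut(chunk.content)) for chunk in chunks]
bm25 = BM25Okapi(tokenized_docs)
scores = bm25.get_scores(list(jieba.cut("RAG 知识库构建流程")))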
5.2 Reranking
from sentence_transformers import CrossEncoder

class Reranker:
    """Rerank retrieval results with a CrossEncoder."""
    def __init__(self, model_name='BAAI/bge-reranker-large'):
        self.reranker = CrossEncoder(model_name)

    def rerank(self, query: str, documents: List[dict], top_k=5):
        """Rescore and reorder retrieved documents."""
        pairs = [[query, doc['content']] for doc in documents]
        scores = self.reranker.predict(pairs)
        # Sort by rerank score
        for i, score in enumerate(scores):
            documents[i]['rerank_score'] = float(score)
        documents.sort(key=lambda x: x['rerank_score'], reverse=True)
        return documents[:top_k]
# Usage
reranker = Reranker(model_name='BAAI/bge-reranker-large')
reranked_results = reranker.rerank(
    query="如何构建 RAG 系统?",
    documents=results,
    top_k=5
)
print("Reranked results:")
for r in reranked_results:
    print(f"Score: {r['rerank_score']:.4f}")
    print(f"Content: {r['content'][:100]}...")
Step 6: The Complete Workflow
6.1 An Automated Processing Pipeline
from pathlib import Path
from datetime import datetime
import json

class RAGPipeline:
    """End-to-end pipeline for building a RAG knowledge base."""
    def __init__(
        self,
        embedding_model_name='BAAI/bge-large-zh-v1.5',
        chunk_size=500,
        chunk_overlap=50,
        vector_db='faiss'
    ):
        # Load the embedding model (keep the name for the metadata file)
        self.embedding_model_name = embedding_model_name
        self.embedding_model = SentenceTransformer(embedding_model_name)
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.vector_db = vector_db
        self.store = None
        self.chunks = []

    def process_directory(self, directory: str, output_path: str):
        """Process every supported document under a directory."""
        directory = Path(directory)
        documents = []
        # Word documents
        for docx_file in directory.glob('**/*.docx'):
            print(f"Processing Word: {docx_file}")
            content = parse_docx(str(docx_file))
            content['filename'] = str(docx_file)
            content['type'] = 'docx'
            documents.append(content)
        # PPT files
        for pptx_file in directory.glob('**/*.pptx'):
            print(f"Processing PPT: {pptx_file}")
            content = parse_pptx(str(pptx_file))
            content['filename'] = str(pptx_file)
            content['type'] = 'pptx'
            documents.append(content)
        # PDFs
        for pdf_file in directory.glob('**/*.pdf'):
            print(f"Processing PDF: {pdf_file}")
            content = parse_pdf(str(pdf_file))
            content['filename'] = str(pdf_file)
            content['type'] = 'pdf'
            documents.append(content)
        print(f"Parsed {len(documents)} documents")
        # Chunk
        self.chunks = create_chunks_with_metadata(
            documents,
            self.chunk_size,
            self.chunk_overlap
        )
        print(f"Produced {len(self.chunks)} chunks")
        # Embed
        print("Generating embeddings...")
        embeddings = self.embedding_model.encode(
            [c.content for c in self.chunks],
            show_progress_bar=True
        )
        # Store in the vector database
        if self.vector_db == 'faiss':
            self.store = FAISSStore(dimension=embeddings.shape[1])
            self.store.add(self.chunks, embeddings)
            self.store.save(output_path)
        elif self.vector_db == 'chroma':
            # Note: Chroma re-embeds via the collection's embedding function
            self.store = ChromaStore(persist_directory=output_path)
            self.store.create_collection("knowledge_base")
            self.store.add_documents(
                documents=[c.content for c in self.chunks],
                metadatas=[c.metadata for c in self.chunks],
                ids=[c.chunk_id for c in self.chunks]
            )
        print(f"Knowledge base saved to: {output_path}")
        # Save build metadata
        metadata = {
            'total_documents': len(documents),
            'total_chunks': len(self.chunks),
            'embedding_model': self.embedding_model_name,
            'chunk_size': self.chunk_size,
            'chunk_overlap': self.chunk_overlap,
            'created_at': datetime.now().isoformat()
        }
        with open(f"{output_path}_metadata.json", 'w', encoding='utf-8') as f:
            json.dump(metadata, f, ensure_ascii=False, indent=2)
        return metadata

    def query(self, question: str, k=5):
        """Query the knowledge base."""
        if self.store is None:
            raise ValueError("Load or build a knowledge base first")
        # Embed the query
        query_embedding = self.embedding_model.encode(question)
        # Retrieve
        if isinstance(self.store, FAISSStore):
            results = self.store.search(query_embedding, k)
        elif isinstance(self.store, ChromaStore):
            chroma_results = self.store.query([question], n_results=k)
            results = []
            for doc, meta in zip(
                chroma_results['documents'][0],
                chroma_results['metadatas'][0]
            ):
                results.append({
                    'content': doc,
                    'metadata': meta,
                    'distance': 0
                })
        return results
# Usage
pipeline = RAGPipeline(
    embedding_model_name='BAAI/bge-large-zh-v1.5',
    chunk_size=500,
    chunk_overlap=50,
    vector_db='faiss'
)

# Build the knowledge base
metadata = pipeline.process_directory(
    directory="./documents",
    output_path="./knowledge_base"
)
print(f"Done: {json.dumps(metadata, ensure_ascii=False, indent=2)}")

# Query it
results = pipeline.query("如何构建 RAG 知识库?", k=5)
for i, r in enumerate(results, 1):
    print(f"\n{i}. Source: {r['metadata'].get('source', 'unknown')}")
    print(f"   Content: {r['content'][:200]}...")
6.2 Generating Responses with an LLM
from openai import OpenAI

class RAGSystem:
    """A complete RAG question-answering system."""
    def __init__(self, pipeline: RAGPipeline, llm_api_key: str):
        self.pipeline = pipeline
        self.client = OpenAI(api_key=llm_api_key)
        self.model = "gpt-4o"  # or another model

    def generate_response(self, question: str, k=5):
        """Generate a RAG-grounded answer."""
        # Retrieve relevant chunks
        results = self.pipeline.query(question, k=k)
        # Build the context block
        context = "\n\n".join([
            f"[Source: {r['metadata'].get('source', 'unknown')}]\n{r['content']}"
            for r in results
        ])
        # Build the prompt (keeps the original's instruction to answer
        # in Chinese, matching the knowledge base's language)
        prompt = f"""You are a professional assistant. Answer the question using the reference material below.

Reference material:
{context}

Question: {question}

Answer in Chinese. If the reference material is insufficient, say so."""
        # Call the LLM
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": "You are a professional, accurate assistant."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.7,
            max_tokens=1000
        )
        return {
            'answer': response.choices[0].message.content,
            'sources': [r['metadata'].get('source', 'unknown') for r in results],
            'retrieved_chunks': results
        }
# Usage
rag_system = RAGSystem(pipeline, llm_api_key="your-api-key")
response = rag_system.generate_response("如何构建 RAG 知识库?")
print(f"Answer: {response['answer']}")
print(f"Sources: {response['sources']}")
Performance Tuning Tips
Chunking Parameter Tuning
# Recommended parameters per scenario
CHUNK_CONFIGS = {
'technical_docs': {
'chunk_size': 500,
'chunk_overlap': 50,
'strategy': 'recursive'
},
'legal_docs': {
'chunk_size': 800,
'chunk_overlap': 100,
'strategy': 'semantic'
},
'presentation': {
'chunk_size': 300,
'chunk_overlap': 30,
'strategy': 'by_slide'
},
'mixed': {
'chunk_size': 500,
'chunk_overlap': 50,
'strategy': 'hybrid'
}
}
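These presets plug straight into the pipeline (the strategy field is informational here; RAGPipeline only consumes size and overlap):
# Pick a preset and feed it to the pipeline
config = CHUNK_CONFIGS['technical_docs']
pipeline = RAGPipeline(
    chunk_size=config['chunk_size'],
    chunk_overlap=config['chunk_overlap']
)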
Evaluation Metrics
import time

def evaluate_rag(retriever, test_questions, ground_truths):
    """Evaluate retrieval quality and latency."""
    results = {
        'precision': [],
        'recall': [],
        'latency': []
    }
    for question, truth in zip(test_questions, ground_truths):
        start = time.time()
        retrieved = retriever.search(question, k=5)
        latency = time.time() - start
        results['latency'].append(latency)
        retrieved_sources = set([r['metadata'].get('source') for r in retrieved])
        # Precision and recall against the expected sources
        if truth:
            tp = len(retrieved_sources & set(truth))
            precision = tp / len(retrieved_sources) if retrieved_sources else 0
            recall = tp / len(truth) if truth else 0
            results['precision'].append(precision)
            results['recall'].append(recall)
    return {
        'avg_precision': np.mean(results['precision']),
        'avg_recall': np.mean(results['recall']),
        'avg_latency': np.mean(results['latency'])
    }
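A usage sketch with hypothetical test data, where each ground truth lists the source files that should be retrieved:
# Hypothetical evaluation set (file names are made up for illustration)
test_questions = ["如何构建 RAG 知识库?", "RAG 系统架构包含哪些组件?"]
ground_truths = [["rag_guide.docx"], ["architecture.pptx"]]

metrics = evaluate_rag(retriever, test_questions, ground_truths)
print(f"Precision: {metrics['avg_precision']:.2f}, "
      f"Recall: {metrics['avg_recall']:.2f}, "
      f"Latency: {metrics['avg_latency'] * 1000:.0f} ms")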
Summary
Building a high-quality RAG knowledge base comes down to a few key points:
- Document parsing: use the right tool for each format (Word/PPT/PDF)
- Text cleaning: strip the noise, keep the content quality
- Intelligent chunking: pick a strategy that fits the document type
- Embedding: choose a model suited to Chinese text
- Storage: match the vector database to your scale
- Retrieval: combine hybrid search with reranking to boost accuracy
With an automated processing pipeline, enterprise documents can be turned into a usable RAG knowledge base efficiently, giving intelligent Q&A systems a reliable source of knowledge.
Written with the assistance of OpenClaw 🤖