Introduction

In enterprise AI applications, RAG (Retrieval-Augmented Generation) has become the standard approach for building intelligent question-answering systems. But how well a RAG system performs depends heavily on the quality of document processing: garbage in, garbage out.

Using a real project as the running example, this article walks through how to automatically process Word, PPT, and PDF documents and build a high-quality RAG knowledge base.

RAG Knowledge Base Architecture

Core Components

┌──────────────────────────────────────────────────────────────┐
│                    RAG System Architecture                    │
├──────────────────────────────────────────────────────────────┤
│                                                                │
│  ┌────────────┐    ┌────────────┐    ┌────────────┐           │
│  │ Document   │ →  │ Embedding  │ →  │ Vector     │           │
│  │ processing │    │ layer      │    │ store      │           │
│  └────────────┘    └────────────┘    └────────────┘           │
│        ↓                 ↓                 ↓                   │
│  ┌────────────┐    ┌────────────┐    ┌────────────┐           │
│  │ Word       │    │ Embedding  │    │ FAISS      │           │
│  │ PPT        │    │ model      │    │ Chroma     │           │
│  │ PDF        │    │            │    │ Pinecone   │           │
│  └────────────┘    └────────────┘    └────────────┘           │
│                                                                │
│  ┌────────────┐    ┌────────────┐    ┌────────────┐           │
│  │ Retrieval  │ ←  │ Query      │ ←  │ User       │           │
│  │ + rerank   │    │ processing │    │ input      │           │
│  └────────────┘    └────────────┘    └────────────┘           │
│        ↓                                                       │
│  ┌────────────┐                                                │
│  │ LLM        │                                                │
│  │ response   │                                                │
│  └────────────┘                                                │
└──────────────────────────────────────────────────────────────┘

Data Processing Flow

graph LR
    A[Raw documents] --> B[Document parsing]
    B --> C[Text cleaning]
    C --> D[Smart chunking]
    D --> E[Embedding]
    E --> F[Vector storage]
    F --> G[Retrieval]
    G --> H[LLM generation]

Step 1: Document Parsing

1.1 Processing Word Documents

Install dependencies:

pip install python-docx

Parse a Word document:

from docx import Document
import os

def parse_docx(file_path):
    """Parse a Word document and extract text and structure"""
    doc = Document(file_path)
    
    content = {
        'title': '',
        'sections': [],
        'tables': [],
        'images': []
    }
    
    # Extract headings and body paragraphs
    for para in doc.paragraphs:
        if para.style.name.startswith('Heading'):
            content['sections'].append({
                'level': int(para.style.name[-1]),
                'text': para.text,
                'style': para.style.name
            })
        elif para.text.strip():
            content['sections'].append({
                'level': 0,
                'text': para.text,
                'style': 'Normal'
            })
    
    # Extract tables
    for i, table in enumerate(doc.tables):
        table_data = []
        for row in table.rows:
            row_data = [cell.text for cell in row.cells]
            table_data.append(row_data)
        content['tables'].append({
            'index': i,
            'data': table_data
        })
    
    return content

# Usage example
doc_content = parse_docx('document.docx')
print(f"Sections: {len(doc_content['sections'])}")
print(f"Tables: {len(doc_content['tables'])}")

Batch-process multiple documents:

from pathlib import Path
from concurrent.futures import ThreadPoolExecutor

def batch_parse_docx(directory, max_workers=4):
    """Parse all Word documents in a directory (in parallel)"""
    docx_files = list(Path(directory).glob('*.docx'))
    results = {}
    
    def process_file(file_path):
        try:
            content = parse_docx(str(file_path))
            return file_path.name, content, None
        except Exception as e:
            return file_path.name, None, str(e)
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(process_file, f) for f in docx_files]
        for future in futures:
            filename, content, error = future.result()
            if error:
                print(f"Failed to process {filename}: {error}")
            else:
                results[filename] = content
    
    return results

1.2 Processing PowerPoint Presentations

Install dependencies:

pip install python-pptx markitdown

Parse a PPT file:

import os

from pptx import Presentation
from pptx.enum.shapes import MSO_SHAPE_TYPE

def parse_pptx(file_path):
    """Parse a PPT file and extract slide content"""
    prs = Presentation(file_path)
    
    slides = []
    for i, slide in enumerate(prs.slides):
        slide_content = {
            'slide_number': i + 1,
            'title': '',
            'text': [],
            'images': [],
            'tables': []
        }
        
        # Extract the title
        if slide.shapes.title:
            slide_content['title'] = slide.shapes.title.text
        
        # Extract text
        for shape in slide.shapes:
            if hasattr(shape, "text") and shape.text.strip():
                if shape != slide.shapes.title:
                    slide_content['text'].append(shape.text)
            
            # Extract tables
            if shape.has_table:
                table = shape.table
                table_data = []
                for row in table.rows:
                    row_data = [cell.text for cell in row.cells]
                    table_data.append(row_data)
                slide_content['tables'].append(table_data)
            
            # Extract images
            if shape.shape_type == MSO_SHAPE_TYPE.PICTURE:
                slide_content['images'].append({
                    'name': shape.name,
                    'type': 'image'
                })
        
        slides.append(slide_content)
    
    return {
        'filename': os.path.basename(file_path),
        'total_slides': len(slides),
        'slides': slides
    }

# Usage example
ppt_content = parse_pptx('presentation.pptx')
print(f"Total slides: {ppt_content['total_slides']}")
for slide in ppt_content['slides'][:3]:
    print(f"Slide {slide['slide_number']}: {slide['title']}")

1.3 Processing PDF Documents

Install dependencies (pdf2image also needs the poppler utilities, and pytesseract needs the Tesseract OCR engine installed on the system):

pip install pypdf pdfplumber pdf2image pytesseract

Parse a PDF:

import os

import pdfplumber
from pypdf import PdfReader

def parse_pdf(file_path, extract_tables=True):
    """Parse a PDF and extract text and tables"""
    
    # Basic text extraction
    reader = PdfReader(file_path)
    text_pages = []
    
    for i, page in enumerate(reader.pages):
        text = page.extract_text()
        text_pages.append({
            'page': i + 1,
            'text': text
        })
    
    # Table extraction (pdfplumber is more accurate for tables)
    tables = []
    if extract_tables:
        with pdfplumber.open(file_path) as pdf:
            for i, page in enumerate(pdf.pages):
                page_tables = page.extract_tables()
                for j, table in enumerate(page_tables):
                    tables.append({
                        'page': i + 1,
                        'table_index': j,
                        'data': table
                    })
    
    return {
        'filename': os.path.basename(file_path),
        'total_pages': len(text_pages),
        'pages': text_pages,
        'tables': tables
    }

# Usage example
pdf_content = parse_pdf('document.pdf')
print(f"Total pages: {pdf_content['total_pages']}")
print(f"Tables: {len(pdf_content['tables'])}")

OCR for scanned PDFs:

import os

from pdf2image import convert_from_path
import pytesseract

def ocr_pdf(file_path, lang='chi_sim+eng'):
    """Run OCR on a scanned PDF"""
    
    # Convert PDF pages to images
    images = convert_from_path(file_path, dpi=300)
    
    text_pages = []
    for i, image in enumerate(images):
        # Run OCR
        text = pytesseract.image_to_string(image, lang=lang)
        text_pages.append({
            'page': i + 1,
            'text': text
        })
    
    return {
        'filename': os.path.basename(file_path),
        'total_pages': len(text_pages),
        'pages': text_pages,
        'ocr': True
    }
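
Scanned PDFs typically yield little or no text from the regular extractor, so a practical pattern is to try plain extraction first and fall back to OCR only when the yield is low. A minimal sketch reusing parse_pdf and ocr_pdf from above; the 50-characters-per-page threshold is an assumption you would tune for your own corpus:

def parse_pdf_with_ocr_fallback(file_path, min_chars_per_page=50):
    """Try regular text extraction first; fall back to OCR for scanned PDFs."""
    result = parse_pdf(file_path)

    # Average number of extracted characters per page; near zero usually means a scan
    total_chars = sum(len(page['text'] or '') for page in result['pages'])
    avg_chars = total_chars / max(result['total_pages'], 1)

    if avg_chars < min_chars_per_page:
        # Hypothetical threshold: pages this sparse are treated as scans
        result = ocr_pdf(file_path)

    return result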

Step 2: Text Cleaning and Preprocessing

2.1 General Cleaning Functions

import re
from datetime import datetime
from typing import List, Dict

def clean_text(text: str) -> str:
    """Clean extracted text: strip noise characters and layout artifacts"""
    
    # Remove page headers/footers first, while line breaks are still intact
    text = re.sub(r'^\d+\s*$', '', text, flags=re.MULTILINE)      # bare page numbers
    text = re.sub(r'^第\d+页.*$', '', text, flags=re.MULTILINE)    # Chinese "Page N" footers
    
    # Drop very short lines (likely noise)
    lines = [line for line in text.split('\n') if len(line.strip()) > 2]
    text = '\n'.join(lines)
    
    # Collapse runs of spaces and tabs, but keep newlines so the
    # line-based filters above stay meaningful
    text = re.sub(r'[ \t]+', ' ', text)
    
    # Remove special characters (keep word chars, whitespace, CJK and common punctuation)
    text = re.sub(r'[^\w\s\u4e00-\u9fff,。!?;:()“”‘’,.!?;:()\-]', '', text)
    
    return text.strip()

def extract_metadata(content: Dict) -> Dict:
    """Extract document-level metadata"""
    return {
        'source': content.get('filename', 'unknown'),
        'total_pages': content.get('total_pages', 0),
        'total_slides': content.get('total_slides', 0),
        'has_tables': len(content.get('tables', [])) > 0,
        'processed_at': datetime.now().isoformat()
    }

2.2 Organizing Structured Content

def organize_content(parsed_content: Dict) -> List[Dict]:
    """Organize parsed content into structured passages"""
    
    chunks = []
    
    # Word documents
    if 'sections' in parsed_content:
        current_section = ''
        for section in parsed_content['sections']:
            if section['level'] > 0:
                current_section = section['text']
            chunks.append({
                'type': 'section' if section['level'] > 0 else 'paragraph',
                'heading': current_section,
                'content': section['text'],
                'metadata': {'level': section['level']}
            })
    
    # PPT presentations
    if 'slides' in parsed_content:
        for slide in parsed_content['slides']:
            text_content = '\n'.join(slide.get('text', []))
            if text_content:
                chunks.append({
                    'type': 'slide',
                    'heading': slide.get('title', ''),
                    'content': text_content,
                    'metadata': {'slide_number': slide['slide_number']}
                })
    
    # PDF documents
    if 'pages' in parsed_content:
        for page in parsed_content['pages']:
            if page['text'].strip():
                chunks.append({
                    'type': 'page',
                    'heading': f"Page {page['page']}",
                    'content': page['text'],
                    'metadata': {'page_number': page['page']}
                })
    
    return chunks

Step 3: Intelligent Text Chunking

3.1 Comparing Chunking Strategies

- Fixed length: simple and predictable, but breaks semantic boundaries; best for short, uniform content
- By paragraph: keeps semantic units intact, but chunk sizes are uneven; best for structured documents
- Recursive: flexible and context-preserving, but more complex to implement; best for long or mixed content
- Semantic: best semantic coherence, but computationally expensive; best when quality requirements are high
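
For reference, the fixed-length strategy in the first row takes only a few lines; a minimal sketch using a sliding window with character overlap (the library-based splitters below are usually preferable):

def fixed_size_chunking(text: str, chunk_size: int = 500, chunk_overlap: int = 50):
    """Split text into fixed-size chunks with a character overlap."""
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunk = text[start:start + chunk_size]
        if chunk.strip():
            chunks.append(chunk)
    return chunks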

3.2 Recursive Character Chunking

from langchain.text_splitter import RecursiveCharacterTextSplitter

def recursive_chunking(texts: List[str], chunk_size=500, chunk_overlap=50):
    """Chunk texts with LangChain's recursive character splitter"""
    
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        separators=["\n\n", "\n", "。", "!", "?", ";", " ", ""]
    )
    
    chunks = []
    for text in texts:
        chunk_texts = text_splitter.split_text(text)
        chunks.extend(chunk_texts)
    
    return chunks

# Usage example
texts = ["Document content 1", "Document content 2"]
chunks = recursive_chunking(texts, chunk_size=500, chunk_overlap=50)
print(f"Generated {len(chunks)} chunks")

3.3 Semantic Chunking (Recommended)

import re
from typing import List

import numpy as np

def semantic_chunking(
    text: str, 
    embedding_model,
    threshold: float = 0.5,
    min_chunk_size: int = 100
) -> List[str]:
    """
    Semantic chunking based on embedding similarity.
    
    Idea: compute the similarity of adjacent sentence embeddings and split
    where the similarity drops below the threshold.
    """
    from sentence_transformers import SentenceTransformer
    
    # Load the model if a model name was passed
    if isinstance(embedding_model, str):
        model = SentenceTransformer(embedding_model)
    else:
        model = embedding_model
    
    # Split into sentences
    sentences = re.split(r'(?<=[。!?.!?])\s+', text)
    sentences = [s.strip() for s in sentences if s.strip()]
    
    if len(sentences) < 2:
        return [text]
    
    # Sentence embeddings (normalized so the dot product below is cosine similarity)
    embeddings = model.encode(sentences, show_progress_bar=False, normalize_embeddings=True)
    
    # Similarity between adjacent sentences
    similarities = []
    for i in range(len(embeddings) - 1):
        sim = np.dot(embeddings[i], embeddings[i + 1])
        similarities.append(sim)
    
    # Split where similarity drops below the threshold
    chunks = []
    current_chunk = [sentences[0]]
    
    for i, sim in enumerate(similarities):
        if sim < threshold:
            # Cut here
            chunk_text = ' '.join(current_chunk)
            if len(chunk_text) >= min_chunk_size:
                chunks.append(chunk_text)
            current_chunk = []
        current_chunk.append(sentences[i + 1])
    
    # Append the final chunk
    if current_chunk:
        chunk_text = ' '.join(current_chunk)
        if len(chunk_text) >= min_chunk_size:
            chunks.append(chunk_text)
    
    return chunks

# Usage example
from sentence_transformers import SentenceTransformer
model = SentenceTransformer('BAAI/bge-large-zh-v1.5')

text = "This is the first passage. This is the second passage. ..."
chunks = semantic_chunking(text, model, threshold=0.5)
print(f"Generated {len(chunks)} semantic chunks")

3.4 Chunking with Metadata

from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List

import numpy as np

@dataclass
class Chunk:
    content: str
    source: str
    chunk_id: str
    metadata: dict
    embedding: np.ndarray = None

def create_chunks_with_metadata(
    documents: List[Dict],
    chunk_size: int = 500,
    chunk_overlap: int = 50
) -> List[Chunk]:
    """Create text chunks with full metadata attached"""
    
    all_chunks = []
    chunk_counter = 0
    
    for doc in documents:
        # Collect raw text
        texts = []
        if 'pages' in doc:
            texts = [page['text'] for page in doc['pages']]
        elif 'slides' in doc:
            texts = ['\n'.join(slide.get('text', [])) for slide in doc['slides']]
        elif 'sections' in doc:
            texts = [section['text'] for section in doc['sections']]
        
        # Chunk the text
        chunks = recursive_chunking(texts, chunk_size, chunk_overlap)
        
        # Wrap into Chunk objects
        for chunk_text in chunks:
            chunk = Chunk(
                content=chunk_text,
                source=doc.get('filename', 'unknown'),
                chunk_id=f"chunk_{chunk_counter:06d}",
                metadata={
                    'source_type': doc.get('type', 'unknown'),
                    'total_pages': doc.get('total_pages', 0),
                    'processed_at': datetime.now().isoformat()
                }
            )
            all_chunks.append(chunk)
            chunk_counter += 1
    
    return all_chunks

Step 4: Embedding and Storage

4.1 Choosing an Embedding Model

Recommended models for Chinese text:

- bge-large-zh-v1.5: 1024 dimensions; high-quality RAG
- bge-base-zh-v1.5: 768 dimensions; balanced speed and accuracy
- m3e-base: 768 dimensions; general-purpose use
- text2vec-base-chinese: 768 dimensions; quick prototyping

Install and load:

pip install sentence-transformers

from sentence_transformers import SentenceTransformer

# Load a Chinese embedding model
embedding_model = SentenceTransformer('BAAI/bge-large-zh-v1.5')

# Generate embeddings
texts = ["Text chunk 1", "Text chunk 2"]
embeddings = embedding_model.encode(texts, show_progress_bar=True)
print(f"Embedding shape: {embeddings.shape}")
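
One detail worth noting: the bge family is usually used with normalized embeddings, so that the dot product equals cosine similarity and an L2 index ranks results the same way a cosine index would. A small sketch using the normalize_embeddings flag of sentence-transformers:

# With unit-length vectors, L2 distance and cosine similarity give the same ranking,
# so the FAISS IndexFlatL2 used below behaves like a cosine index.
embeddings = embedding_model.encode(
    texts,
    normalize_embeddings=True,
    show_progress_bar=True
)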

4.2 Choosing a Vector Database

- FAISS: local library, millions of vectors; fast and lightweight; local deployments, small scale
- Chroma: local or server, hundreds of thousands of vectors; easy to use, metadata support; development and testing, small to medium scale
- Pinecone: cloud service, billions of vectors; managed, auto-scaling; production, large scale
- Milvus: server, billions of vectors; open source, full-featured; enterprise deployments
- Qdrant: server, tens of millions of vectors; high performance, filtering support; medium-scale production

4.3 Storing Vectors with FAISS

import faiss
import numpy as np
import pickle

class FAISSStore:
    def __init__(self, dimension=1024):
        self.dimension = dimension
        self.index = faiss.IndexFlatL2(dimension)
        self.chunks = []
        self.metadata = []
    
    def add(self, chunks: List[Chunk], embeddings: np.ndarray):
        """Add chunks and their embeddings"""
        self.index.add(embeddings.astype('float32'))
        self.chunks.extend([c.content for c in chunks])
        self.metadata.extend([c.metadata for c in chunks])
    
    def search(self, query_embedding: np.ndarray, k=5):
        """Similarity search"""
        distances, indices = self.index.search(
            query_embedding.reshape(1, -1).astype('float32'), 
            k
        )
        
        results = []
        for i, idx in enumerate(indices[0]):
            if idx < len(self.chunks):
                results.append({
                    'content': self.chunks[idx],
                    'distance': float(distances[0][i]),
                    'metadata': self.metadata[idx]
                })
        
        return results
    
    def save(self, path: str):
        """Persist the index and metadata"""
        faiss.write_index(self.index, f"{path}.index")
        with open(f"{path}.meta.pkl", 'wb') as f:
            pickle.dump({
                'chunks': self.chunks,
                'metadata': self.metadata,
                'dimension': self.dimension
            }, f)
    
    @classmethod
    def load(cls, path: str):
        """Load a persisted index"""
        store = cls()
        store.index = faiss.read_index(f"{path}.index")
        with open(f"{path}.meta.pkl", 'rb') as f:
            meta = pickle.load(f)
            store.chunks = meta['chunks']
            store.metadata = meta['metadata']
            store.dimension = meta['dimension']
        return store

# Usage example
store = FAISSStore(dimension=1024)

# Add data
chunks = create_chunks_with_metadata(documents)
embeddings = embedding_model.encode([c.content for c in chunks])
store.add(chunks, embeddings)

# Search
query = "How do I build a RAG system?"
query_embedding = embedding_model.encode(query)
results = store.search(query_embedding, k=5)

for r in results:
    print(f"Distance: {r['distance']:.4f}")
    print(f"Content: {r['content'][:100]}...")

4.4 Storing Vectors with Chroma

import chromadb

class ChromaStore:
    def __init__(self, persist_directory="./chroma_db"):
        self.client = chromadb.PersistentClient(path=persist_directory)
        self.collection = None
    
    def create_collection(self, name: str, embedding_function=None):
        """Create (or open) a collection"""
        if embedding_function is None:
            from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction
            embedding_function = SentenceTransformerEmbeddingFunction(
                model_name="BAAI/bge-large-zh-v1.5"
            )
        
        self.collection = self.client.get_or_create_collection(
            name=name,
            embedding_function=embedding_function,
            metadata={"hnsw:space": "cosine"}
        )
    
    def add_documents(self, documents: List[str], metadatas: List[dict], ids: List[str]):
        """Add documents"""
        self.collection.add(
            documents=documents,
            metadatas=metadatas,
            ids=ids
        )
    
    def query(self, query_texts: List[str], n_results=5):
        """Query the collection"""
        results = self.collection.query(
            query_texts=query_texts,
            n_results=n_results
        )
        return results
    
    def save(self):
        """Chroma persists automatically"""
        pass

# Usage example
chroma_store = ChromaStore(persist_directory="./rag_db")
chroma_store.create_collection("knowledge_base")

# Add documents
chroma_store.add_documents(
    documents=[c.content for c in chunks],
    metadatas=[c.metadata for c in chunks],
    ids=[c.chunk_id for c in chunks]
)

# Query
results = chroma_store.query(
    query_texts=["RAG system architecture"],
    n_results=5
)

for doc, meta in zip(results['documents'][0], results['metadatas'][0]):
    print(f"Source: {meta.get('source', 'unknown')}")
    print(f"Content: {doc[:100]}...")

Step 5: Retrieval Optimization

5.1 Hybrid Retrieval

import faiss
import numpy as np
from rank_bm25 import BM25Okapi

class HybridRetriever:
    """Hybrid retriever: combines dense (vector) retrieval with BM25"""
    
    def __init__(self, chunks: List[Chunk], embeddings: np.ndarray,
                 embedding_model, vector_weight=0.7):
        self.embedding_model = embedding_model
        self.vector_weight = vector_weight
        self.bm25_weight = 1 - vector_weight
        
        # Dense (vector) index
        self.dimension = embeddings.shape[1]
        self.index = faiss.IndexFlatL2(self.dimension)
        self.index.add(embeddings.astype('float32'))
        self.chunks = chunks
        
        # BM25 index (whitespace tokenization; for Chinese text use a word
        # segmenter such as jieba instead of str.split)
        tokenized_docs = [chunk.content.split() for chunk in chunks]
        self.bm25 = BM25Okapi(tokenized_docs)
    
    def search(self, query: str, k=10):
        """Run hybrid retrieval"""
        # Dense (vector) retrieval
        query_embedding = self.embedding_model.encode(query)
        vector_distances, vector_indices = self.index.search(
            query_embedding.reshape(1, -1).astype('float32'), 
            k * 2
        )
        
        # BM25 retrieval
        query_tokens = query.split()
        bm25_scores = self.bm25.get_scores(query_tokens)
        bm25_indices = np.argsort(bm25_scores)[::-1][:k * 2]
        
        # Fuse the two score lists (note: the scores are on different scales here)
        scores = {}
        
        for i, idx in enumerate(vector_indices[0]):
            if idx not in scores:
                scores[idx] = 0
            scores[idx] += self.vector_weight * (1 / (1 + vector_distances[0][i]))
        
        for idx in bm25_indices:
            if idx not in scores:
                scores[idx] = 0
            scores[idx] += self.bm25_weight * bm25_scores[idx]
        
        # Sort and return the top-k
        sorted_indices = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)[:k]
        
        results = []
        for idx in sorted_indices:
            results.append({
                'content': self.chunks[idx].content,
                'score': scores[idx],
                'metadata': self.chunks[idx].metadata
            })
        
        return results

# Usage example
retriever = HybridRetriever(chunks, embeddings, embedding_model, vector_weight=0.7)
results = retriever.search("RAG knowledge base construction workflow", k=5)

5.2 Reranking

from sentence_transformers import CrossEncoder

class Reranker:
    """Rerank retrieved chunks with a CrossEncoder"""
    
    def __init__(self, model_name='BAAI/bge-reranker-large'):
        self.reranker = CrossEncoder(model_name)
    
    def rerank(self, query: str, documents: List[dict], top_k=5):
        """Rerank retrieved results"""
        pairs = [[query, doc['content']] for doc in documents]
        scores = self.reranker.predict(pairs)
        
        # Sort by score
        for i, score in enumerate(scores):
            documents[i]['rerank_score'] = float(score)
        
        documents.sort(key=lambda x: x['rerank_score'], reverse=True)
        
        return documents[:top_k]

# Usage example
reranker = Reranker(model_name='BAAI/bge-reranker-large')
reranked_results = reranker.rerank(
    query="How do I build a RAG system?",
    documents=results,
    top_k=5
)

print("Reranked results:")
for r in reranked_results:
    print(f"Score: {r['rerank_score']:.4f}")
    print(f"Content: {r['content'][:100]}...")

Step 6: The Complete Workflow

6.1 An Automated Processing Pipeline

from pathlib import Path
from datetime import datetime
import json

from sentence_transformers import SentenceTransformer

class RAGPipeline:
    """End-to-end pipeline for building a RAG knowledge base"""
    
    def __init__(
        self,
        embedding_model_name='BAAI/bge-large-zh-v1.5',
        chunk_size=500,
        chunk_overlap=50,
        vector_db='faiss'
    ):
        # Load the embedding model (keep the name for the metadata written later)
        self.embedding_model_name = embedding_model_name
        self.embedding_model = SentenceTransformer(embedding_model_name)
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.vector_db = vector_db
        self.store = None
        self.chunks = []
    
    def process_directory(self, directory: str, output_path: str):
        """Process every document under a directory"""
        
        directory = Path(directory)
        documents = []
        
        # Parse Word documents
        for docx_file in directory.glob('**/*.docx'):
            print(f"Processing Word file: {docx_file}")
            content = parse_docx(str(docx_file))
            content['filename'] = str(docx_file)
            content['type'] = 'docx'
            documents.append(content)
        
        # Parse PowerPoint documents
        for pptx_file in directory.glob('**/*.pptx'):
            print(f"Processing PPT file: {pptx_file}")
            content = parse_pptx(str(pptx_file))
            content['filename'] = str(pptx_file)
            content['type'] = 'pptx'
            documents.append(content)
        
        # Parse PDF documents
        for pdf_file in directory.glob('**/*.pdf'):
            print(f"Processing PDF file: {pdf_file}")
            content = parse_pdf(str(pdf_file))
            content['filename'] = str(pdf_file)
            content['type'] = 'pdf'
            documents.append(content)
        
        print(f"Parsed {len(documents)} documents")
        
        # Create text chunks
        self.chunks = create_chunks_with_metadata(
            documents,
            self.chunk_size,
            self.chunk_overlap
        )
        print(f"Generated {len(self.chunks)} chunks")
        
        # Generate embeddings
        print("Generating embeddings...")
        embeddings = self.embedding_model.encode(
            [c.content for c in self.chunks],
            show_progress_bar=True
        )
        
        # Store in the vector database
        if self.vector_db == 'faiss':
            self.store = FAISSStore(dimension=embeddings.shape[1])
            self.store.add(self.chunks, embeddings)
            self.store.save(output_path)
        elif self.vector_db == 'chroma':
            self.store = ChromaStore(persist_directory=output_path)
            self.store.create_collection("knowledge_base")
            self.store.add_documents(
                documents=[c.content for c in self.chunks],
                metadatas=[c.metadata for c in self.chunks],
                ids=[c.chunk_id for c in self.chunks]
            )
        
        print(f"Knowledge base saved to: {output_path}")
        
        # Save build metadata
        metadata = {
            'total_documents': len(documents),
            'total_chunks': len(self.chunks),
            'embedding_model': self.embedding_model_name,
            'chunk_size': self.chunk_size,
            'chunk_overlap': self.chunk_overlap,
            'created_at': datetime.now().isoformat()
        }
        
        with open(f"{output_path}_metadata.json", 'w', encoding='utf-8') as f:
            json.dump(metadata, f, ensure_ascii=False, indent=2)
        
        return metadata
    
    def query(self, question: str, k=5):
        """Query the knowledge base"""
        if self.store is None:
            raise ValueError("Load or build the knowledge base first")
        
        # Embed the query
        query_embedding = self.embedding_model.encode(question)
        
        # Retrieve
        if isinstance(self.store, FAISSStore):
            results = self.store.search(query_embedding, k)
        elif isinstance(self.store, ChromaStore):
            chroma_results = self.store.query([question], n_results=k)
            results = []
            for doc, meta in zip(
                chroma_results['documents'][0],
                chroma_results['metadatas'][0]
            ):
                results.append({
                    'content': doc,
                    'metadata': meta,
                    'distance': 0
                })
        
        return results

# Usage example
pipeline = RAGPipeline(
    embedding_model_name='BAAI/bge-large-zh-v1.5',
    chunk_size=500,
    chunk_overlap=50,
    vector_db='faiss'
)

# Build the knowledge base
metadata = pipeline.process_directory(
    directory="./documents",
    output_path="./knowledge_base"
)

print(f"Build finished: {json.dumps(metadata, ensure_ascii=False, indent=2)}")

# Query
results = pipeline.query("How do I build a RAG knowledge base?", k=5)
for i, r in enumerate(results, 1):
    print(f"\n{i}. Source: {r['metadata'].get('source', 'unknown')}")
    print(f"   Content: {r['content'][:200]}...")
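
The pipeline above only builds and saves the index; after a restart, the saved FAISS index can be reattached before querying. A minimal sketch, assuming the output_path used above and the faiss backend:

# Reload a previously built FAISS knowledge base into a fresh pipeline
pipeline = RAGPipeline(vector_db='faiss')
pipeline.store = FAISSStore.load("./knowledge_base")

results = pipeline.query("How do I build a RAG knowledge base?", k=5)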

6.2 Generating Responses with an LLM

from openai import OpenAI

class RAGSystem:
    """A complete RAG question-answering system"""
    
    def __init__(self, pipeline: RAGPipeline, llm_api_key: str):
        self.pipeline = pipeline
        self.client = OpenAI(api_key=llm_api_key)
        self.model = "gpt-4o"  # or another model
    
    def generate_response(self, question: str, k=5):
        """Generate a RAG response"""
        
        # Retrieve relevant chunks
        results = self.pipeline.query(question, k=k)
        
        # Build the context from the retrieved chunks
        context = "\n\n".join([
            f"[Source: {r['metadata'].get('source', 'unknown')}]\n{r['content']}"
            for r in results
        ])
        
        # Build the prompt
        prompt = f"""You are a professional assistant. Answer the question based on the reference material below.

Reference material:
{context}

Question: {question}

Answer in Chinese. If the reference material is not sufficient to answer the question, say so."""

        # Call the LLM
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": "You are a professional and accurate assistant."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.7,
            max_tokens=1000
        )
        
        return {
            'answer': response.choices[0].message.content,
            'sources': [r['metadata'].get('source', 'unknown') for r in results],
            'retrieved_chunks': results
        }

# Usage example
rag_system = RAGSystem(pipeline, llm_api_key="your-api-key")

response = rag_system.generate_response("How do I build a RAG knowledge base?")
print(f"Answer: {response['answer']}")
print(f"Sources: {response['sources']}")

Performance Tuning Tips

Tuning chunking parameters

# Recommended parameters for different scenarios
CHUNK_CONFIGS = {
    'technical_docs': {
        'chunk_size': 500,
        'chunk_overlap': 50,
        'strategy': 'recursive'
    },
    'legal_docs': {
        'chunk_size': 800,
        'chunk_overlap': 100,
        'strategy': 'semantic'
    },
    'presentation': {
        'chunk_size': 300,
        'chunk_overlap': 30,
        'strategy': 'by_slide'
    },
    'mixed': {
        'chunk_size': 500,
        'chunk_overlap': 50,
        'strategy': 'hybrid'
    }
}
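
A quick sketch of how these presets might be applied, wiring only the recursive strategy to the recursive_chunking helper defined earlier (the other strategy values are left as labels here):

def chunk_with_config(texts, doc_type='technical_docs'):
    """Pick a preset for the document type and apply it."""
    config = CHUNK_CONFIGS.get(doc_type, CHUNK_CONFIGS['mixed'])
    return recursive_chunking(
        texts,
        chunk_size=config['chunk_size'],
        chunk_overlap=config['chunk_overlap']
    )

chunks = chunk_with_config(["..."], doc_type='technical_docs')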

Evaluation metrics

import time

import numpy as np

def evaluate_rag(retriever, test_questions, ground_truths):
    """Evaluate retrieval quality and latency of a RAG retriever"""
    
    results = {
        'precision': [],
        'recall': [],
        'latency': []
    }
    
    for question, truth in zip(test_questions, ground_truths):
        start = time.time()
        
        retrieved = retriever.search(question, k=5)
        retrieved_sources = set([r['metadata'].get('source') for r in retrieved])
        
        latency = time.time() - start
        results['latency'].append(latency)
        
        # Precision and recall against the expected sources
        if truth:
            tp = len(retrieved_sources & set(truth))
            precision = tp / len(retrieved_sources) if retrieved_sources else 0
            recall = tp / len(truth) if truth else 0
            results['precision'].append(precision)
            results['recall'].append(recall)
    
    return {
        'avg_precision': np.mean(results['precision']),
        'avg_recall': np.mean(results['recall']),
        'avg_latency': np.mean(results['latency'])
    }
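
A usage sketch with a hand-written test set; the questions and the expected source filenames are purely illustrative:

# Each question maps to the source files a correct retrieval should surface
test_questions = [
    "How do I build a RAG knowledge base?",
    "Which chunking strategies are available?"
]
ground_truths = [
    ["rag_guide.docx"],        # hypothetical filenames
    ["chunking_notes.pdf"]
]

metrics = evaluate_rag(retriever, test_questions, ground_truths)
print(f"Precision: {metrics['avg_precision']:.2f}, "
      f"Recall: {metrics['avg_recall']:.2f}, "
      f"Latency: {metrics['avg_latency'] * 1000:.1f} ms")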

Summary

Building a high-quality RAG knowledge base comes down to a few key points:

  1. Document parsing: use the right parser for each format (Word/PPT/PDF)
  2. Text cleaning: remove noise and keep content quality high
  3. Intelligent chunking: pick a chunking strategy suited to the document type
  4. Embedding: choose an embedding model that handles Chinese well
  5. Storage: choose a vector database that matches your scale
  6. Retrieval enhancement: use hybrid retrieval and reranking to improve accuracy

With an automated processing pipeline, enterprise documents can be turned into a usable RAG knowledge base efficiently, giving intelligent question-answering systems a reliable source of knowledge.

This article was written with the assistance of OpenClaw 🤖