
Intelligent Customer Service System: A Complete RAG + Agent Walkthrough

Contents

  1. Project Overview
  2. System Architecture
  3. RAG Knowledge Base
  4. Multi-Turn Dialogue Management
  5. Agent Tool Calling
  6. FastAPI Backend Service
  7. Frontend Chat Interface
  8. Deployment and Monitoring

Project Overview

Background

Intelligent customer service is one of the most widely deployed AI applications. Traditional systems rely on keyword matching and rule engines, so they cannot understand what users actually mean. By combining large language models (LLMs), retrieval-augmented generation (RAG), and agents, we can build a service bot that genuinely understands questions, consults a knowledge base, and calls into business systems.

Overall System Architecture

+===========================================================================+
|        Intelligent Customer Service System - Overall Architecture         |
+===========================================================================+
|                                                                           |
|   +-------------------+     +-----------------+     +-----------------+   |
|   |   Web Frontend    |     |  WeChat Mini    |     |   API Clients   |   |
|   |   (React/Vue)     |     |  Program        |     | (3rd-party sys) |   |
|   +--------+----------+     +--------+--------+     +--------+--------+   |
|            |                         |                       |            |
|            +------------+------------+-----------+-----------+            |
|                         |                        |                        |
|                         v                        v                        |
|            +------------+------------------------+-----------+            |
|            |                  API Gateway                    |            |
|            |                 (Nginx / Kong)                  |            |
|            +-------------------------+-----------------------+            |
|                                      |                                    |
|                                      v                                    |
|            +-------------------------+-----------------------+            |
|            |               FastAPI Backend                   |            |
|            |  +---------------+  +----------------+          |            |
|            |  |   Dialogue    |  |    Session     |          |            |
|            |  |   Manager     |  |    Manager     |          |            |
|            |  +-------+-------+  +-------+--------+          |            |
|            |          |                  |                   |            |
|            |          v                  v                   |            |
|            |  +-------+------------------+--------+          |            |
|            |  |           Intent Router           |          |            |
|            |  +--+--------+--------+--------+-----+          |            |
|            |     |        |        |        |                |            |
|            +-----+--------+--------+--------+----------------+            |
|                  |        |        |        |                             |
|         +--------+--+ +---+----+ +-+------+ +---+--------+               |
|         | RAG KB    | | Agent  | | Chit-  | | Human      |               |
|         | Q&A       | | Tool   | | chat   | | Handoff    |               |
|         |           | | Calls  | | Module | | Module     |               |
|         +-----+-----+ +---+----+ +---+----+ +---+--------+               |
|               |           |          |          |                         |
|         +-----+-----+ +---+----+ +---+----+ +---+--------+               |
|         | ChromaDB  | | Order  | | Direct | | Message    |               |
|         | Vector DB | | System | | LLM    | | Queue      |               |
|         |           | | / CRM  | | Output | | (Redis)    |               |
|         +-----------+ +--------+ +--------+ +------------+               |
|                                                                           |
+===========================================================================+

Core Features

| Feature             | Description                                          | Approach                              |
|---------------------|------------------------------------------------------|---------------------------------------|
| RAG Q&A             | Answer questions from the enterprise knowledge base  | ChromaDB + OpenAI Embedding           |
| Multi-turn dialogue | Keep conversational context, resolve references      | Redis session store + sliding window  |
| Agent tools         | Order lookup, refunds, profile updates               | Function Calling + tool chain         |
| Intent recognition  | Classify user intent and route the request           | LLM classifier + rule fallback        |
| Human handoff       | Seamless transfer of hard cases to human agents      | WebSocket + Redis queue               |
| Dialogue evaluation | Automatically score answer quality                   | LLM scoring + user feedback           |

Tech Stack Overview

+------------------------------------------------------------------+
|                          Tech Stack                              |
+------------------------------------------------------------------+
|  Frontend:  React 18 + TypeScript + Tailwind CSS + WebSocket     |
|  Backend:   FastAPI + Pydantic + SQLAlchemy + Celery             |
|  AI:        OpenAI GPT-4 + text-embedding-3-small                |
|  Vectors:   ChromaDB (dev) / Milvus (prod)                       |
|  Cache:     Redis (sessions + message queue)                     |
|  Database:  PostgreSQL (business data) + MongoDB (chat logs)     |
|  Deploy:    Docker Compose + Nginx                               |
+------------------------------------------------------------------+

Environment Setup

python
# requirements.txt - project dependencies
"""
fastapi==0.104.1
uvicorn==0.24.0
openai==1.6.1
chromadb==0.4.22
redis==5.0.1
pydantic==2.5.3
sqlalchemy==2.0.23
python-multipart==0.0.6
websockets==12.0
httpx==0.25.2
tenacity==8.2.3
python-jose==3.3.0
passlib==1.7.4
celery==5.3.6
jinja2==3.1.2
"""

import os
from dataclasses import dataclass


@dataclass
class AppConfig:
    """Application configuration - all settings managed in one place."""

    # OpenAI settings
    openai_api_key: str = os.getenv("OPENAI_API_KEY", "")
    openai_base_url: str = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1")
    chat_model: str = "gpt-4o-mini"
    embedding_model: str = "text-embedding-3-small"

    # ChromaDB settings
    chroma_persist_dir: str = "./data/chroma_db"
    collection_name: str = "customer_service_kb"

    # Redis settings
    redis_host: str = os.getenv("REDIS_HOST", "localhost")
    redis_port: int = int(os.getenv("REDIS_PORT", "6379"))
    redis_db: int = 0

    # Dialogue settings
    max_history_turns: int = 10          # max number of dialogue turns to keep
    session_ttl_seconds: int = 3600      # session expiry (seconds)
    max_rag_results: int = 5             # max number of RAG results
    similarity_threshold: float = 0.75   # similarity cutoff

    # System prompt
    system_prompt: str = """You are a professional customer service assistant. Your duties are to:
1. Answer user questions about products and services accurately
2. Help users look up orders, process refunds, and perform similar operations
3. Proactively hand off to a human agent when you cannot answer
4. Stay friendly, professional, and patient

Important rules:
- Answer only from knowledge base content; never fabricate information
- Operations involving money require a second confirmation
- Sensitive operations (refunds / account deletion) require identity verification"""


# Create the global config instance
config = AppConfig()
print(f"[Config loaded] model: {config.chat_model}, vector store: {config.chroma_persist_dir}")

System Architecture

Layered Design

+=======================================================================+
|                       Layered Architecture                            |
+=======================================================================+
|                                                                       |
|  +--- Access Layer -------------------------------------------------+ |
|  |                                                                  | |
|  |  HTTP REST API    WebSocket connections    Webhook callbacks     | |
|  |  (Q&A requests)   (real-time chat)         (3rd-party events)    | |
|  +------------------------------------------------------------------+ |
|                              |                                        |
|  +--- Business Layer -----------------------------------------------+ |
|  |                                                                  | |
|  |  +-----------+  +------------+  +------------+  +-----------+   | |
|  |  | Intent    |  | Dialogue   |  | Session    |  | Auth      |   | |
|  |  | Classify  |  | Orchestra  |  | Manager    |  | Guard     |   | |
|  |  +-----------+  +------------+  +------------+  +-----------+   | |
|  +------------------------------------------------------------------+ |
|                              |                                        |
|  +--- Capability Layer ---------------------------------------------+ |
|  |                                                                  | |
|  |  +-----------+  +------------+  +------------+  +-----------+   | |
|  |  | RAG       |  | LLM        |  | Agent      |  | Dialogue  |   | |
|  |  | Retrieval |  | Generation |  | Tool Call  |  | Evaluate  |   | |
|  |  +-----------+  +------------+  +------------+  +-----------+   | |
|  +------------------------------------------------------------------+ |
|                              |                                        |
|  +--- Data Layer ---------------------------------------------------+ |
|  |                                                                  | |
|  |  ChromaDB     Redis       PostgreSQL     MongoDB                 | |
|  |  (vectors)    (cache)     (business)     (logs)                  | |
|  +------------------------------------------------------------------+ |
|                                                                       |
+=======================================================================+

Request Processing Flow

User sends a message
       |
       v
+------+----------+
| Receive message |
| Extract         |
| session_id      |
+------+----------+
       |
       v
+------+----------+
| Load session    |  <--- Redis: fetch message history
| context; check  |
| session state   |
+------+----------+
       |
       v
+------+----------+
| Intent          |  <--- LLM: classify user intent
| recognition     |
| - FAQ question  |
| - Business op   |
| - Chitchat      |
| - Complaint /   |
|   human handoff |
+------+----------+
       |
       +------------------+------------------+------------------+
       |                  |                  |                  |
       v                  v                  v                  v
  +----+-----+      +-----+-----+      +----+-----+      +-----+-----+
  | RAG KB   |      | Agent     |      | Chitchat |      | Human     |
  | retrieval|      | tool call |      | direct   |      | handoff:  |
  |          |      | execution |      | LLM gen  |      | WebSocket |
  +----+-----+      +-----+-----+      +----+-----+      +-----+-----+
       |                  |                  |                  |
       +------------------+------------------+------------------+
       |
       v
+------+----------+
| Generate reply  |  <--- LLM: compose final answer from context
| Format output   |
+------+----------+
       |
       v
+------+----------+
| Save dialogue   |  ---> Redis: update session history
| Async log write |  ---> MongoDB: chat logs
+------+----------+
       |
       v
  Return to user
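The branching step in the middle of this flow is just a dispatch on the recognized intent. A minimal sketch, with a trimmed-down `IntentType` and hypothetical stub handlers (the real handlers would call the RAG chain, the Agent executor, the LLM, or the handoff queue):

```python
from enum import Enum


class IntentType(str, Enum):
    FAQ = "faq"
    ORDER_QUERY = "order_query"
    CHITCHAT = "chitchat"
    HUMAN_TRANSFER = "human_transfer"


# Hypothetical stub handlers, for illustration only
def handle_faq(msg: str) -> str: return f"[RAG] answering: {msg}"
def handle_order(msg: str) -> str: return f"[Agent] looking up order for: {msg}"
def handle_chitchat(msg: str) -> str: return f"[LLM] chatting: {msg}"
def handle_transfer(msg: str) -> str: return "[Handoff] connecting you to a human agent"


ROUTES = {
    IntentType.FAQ: handle_faq,
    IntentType.ORDER_QUERY: handle_order,
    IntentType.CHITCHAT: handle_chitchat,
    IntentType.HUMAN_TRANSFER: handle_transfer,
}


def route(intent: IntentType, message: str) -> str:
    # Unrecognized intents fall back to the RAG handler
    handler = ROUTES.get(intent, handle_faq)
    return handler(message)


print(route(IntentType.ORDER_QUERY, "where is my parcel?"))
# -> [Agent] looking up order for: where is my parcel?
```

A dict-based routing table keeps the dispatch declarative: adding a new intent is one enum member plus one table entry, with no if/elif chain to maintain.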

Data Models

python
"""
Data models - all data structures defined with Pydantic.
"""
from datetime import datetime
from enum import Enum
from typing import Optional, List, Dict, Any
from pydantic import BaseModel, Field
import uuid


class MessageRole(str, Enum):
    """Message role"""
    USER = "user"
    ASSISTANT = "assistant"
    SYSTEM = "system"


class IntentType(str, Enum):
    """Intent types"""
    FAQ = "faq"                        # knowledge base Q&A
    ORDER_QUERY = "order_query"        # order lookup
    REFUND = "refund"                  # refund request
    COMPLAINT = "complaint"            # complaints and suggestions
    CHITCHAT = "chitchat"              # small talk
    HUMAN_TRANSFER = "human_transfer"  # hand off to a human agent
    UNKNOWN = "unknown"                # unrecognized intent


class ChatMessage(BaseModel):
    """A single chat message"""
    role: MessageRole
    content: str
    timestamp: datetime = Field(default_factory=datetime.now)
    metadata: Dict[str, Any] = Field(default_factory=dict)


class ChatRequest(BaseModel):
    """Incoming user request"""
    session_id: Optional[str] = Field(default=None, description="Session ID; a new session is created when empty")
    message: str = Field(..., min_length=1, max_length=2000, description="User message")
    user_id: Optional[str] = Field(default=None, description="User ID, used for identity checks")


class SourceReference(BaseModel):
    """A RAG citation source"""
    doc_title: str = Field(description="Document title")
    chunk_text: str = Field(description="Quoted source passage")
    similarity_score: float = Field(description="Similarity score")
    page_number: Optional[int] = Field(default=None, description="Page number")


class ToolCallResult(BaseModel):
    """Result of a tool call"""
    tool_name: str = Field(description="Tool name")
    arguments: Dict[str, Any] = Field(description="Call arguments")
    result: Any = Field(description="Execution result")
    success: bool = Field(default=True, description="Whether the call succeeded")


class ChatResponse(BaseModel):
    """System reply"""
    session_id: str = Field(description="Session ID")
    reply: str = Field(description="Reply text")
    intent: IntentType = Field(description="Recognized intent")
    sources: List[SourceReference] = Field(default_factory=list, description="Citation sources")
    tool_calls: List[ToolCallResult] = Field(default_factory=list, description="Tool calls made")
    confidence: float = Field(default=0.0, description="Confidence score")
    need_human: bool = Field(default=False, description="Whether a human handoff is needed")
    timestamp: datetime = Field(default_factory=datetime.now)


class SessionState(BaseModel):
    """Session state"""
    session_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    user_id: Optional[str] = None
    messages: List[ChatMessage] = Field(default_factory=list)
    intent_history: List[IntentType] = Field(default_factory=list)
    context: Dict[str, Any] = Field(default_factory=dict)
    created_at: datetime = Field(default_factory=datetime.now)
    last_active: datetime = Field(default_factory=datetime.now)
    is_transferred: bool = False      # whether the session was handed to a human


# ============================================================
# Usage example
# ============================================================
if __name__ == "__main__":
    # Build a request
    request = ChatRequest(
        session_id="test-session-001",
        message="I'd like to check my order status",
        user_id="user-123"
    )
    print(f"[Request] session={request.session_id}, message={request.message}")

    # Build a response
    response = ChatResponse(
        session_id="test-session-001",
        reply="Sure - please give me your order number and I'll look it up.",
        intent=IntentType.ORDER_QUERY,
        confidence=0.95,
    )
    print(f"[Response] intent={response.intent}, reply={response.reply}")

    # Build a session state
    session = SessionState(user_id="user-123")
    session.messages.append(ChatMessage(role=MessageRole.USER, content="Hi"))
    session.messages.append(ChatMessage(role=MessageRole.ASSISTANT, content="Hello! How can I help you?"))
    print(f"[Session] id={session.session_id}, messages={len(session.messages)}")

RAG Knowledge Base

How RAG Works

+=======================================================================+
|               RAG (Retrieval-Augmented Generation)                    |
+=======================================================================+
|                                                                       |
|  [Offline phase - building the knowledge base]                        |
|                                                                       |
|  Raw documents      Chunking            Embedding          Storage    |
|  +----------+     +----------+        +----------+      +---------+  |
|  | FAQ.md   | --> | chunk_1  |  --->  | [0.12,   | ---> |         |  |
|  | Product  | --> | chunk_2  |  --->  |  0.45,   | ---> | ChromaDB|  |
|  | manual   | --> | chunk_3  |  --->  |  -0.23,  | ---> | vector  |  |
|  | Refund   | --> | ...      |  --->  |  ...]    | ---> | store   |  |
|  | policy   |     |          |        |          |      |         |  |
|  +----------+     +----------+        +----------+      +---------+  |
|       |                |                    |                         |
|   read files       RecursiveText        OpenAI                       |
|                    Splitter             Embedding                     |
|                                                                       |
|  [Online phase - retrieval and answering]                             |
|                                                                       |
|  User question --> Embedding --> Vector search --> Top-K chunks       |
|                                       |                 |             |
|                                       v                 v             |
|                             similarity scoring    Prompt + Context    |
|                             (cosine similarity)   -> LLM final answer |
|                                                                       |
+=======================================================================+
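The "similarity scoring" step in the online phase boils down to cosine similarity between the query vector and each stored chunk vector. A minimal sketch with toy 3-dimensional vectors (real text-embedding-3-small vectors have 1536 dimensions, and the chunk names here are made up for illustration):

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two vectors: dot(a, b) / (|a| * |b|)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)


query = [0.12, 0.45, -0.23]
chunks = {
    "refund_policy": [0.10, 0.50, -0.20],   # points in a similar direction -> high score
    "shipping_faq":  [-0.40, 0.05, 0.90],   # points elsewhere -> low score
}

# Rank chunks by similarity to the query, highest first (the "Top-K" step)
ranked = sorted(chunks.items(), key=lambda kv: cosine_similarity(query, kv[1]), reverse=True)
for name, vec in ranked:
    print(name, round(cosine_similarity(query, vec), 3))
```

Because cosine similarity only measures direction, not magnitude, two chunks of very different lengths can still score as near-identical if their embeddings point the same way.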

Document Loading and Chunking

python
"""
Document loader - loads documents in several formats and splits them into chunks.
"""
import os
import hashlib
from typing import List, Dict, Optional
from dataclasses import dataclass, field


@dataclass
class DocumentChunk:
    """A document chunk"""
    chunk_id: str              # unique chunk ID
    content: str               # chunk text
    doc_title: str             # source document title
    doc_path: str              # source document path
    chunk_index: int           # position within the document
    page_number: Optional[int] = None  # page number (PDF)
    metadata: Dict = field(default_factory=dict)

    def __post_init__(self):
        if not self.chunk_id:
            # Derive a chunk_id automatically
            content_hash = hashlib.md5(self.content.encode()).hexdigest()[:8]
            self.chunk_id = f"{self.doc_title}_{self.chunk_index}_{content_hash}"


class TextSplitter:
    """
    Recursive text splitter.

    Strategy: try separators in priority order to keep chunks semantically whole.
    Priority: paragraph > sentence > comma > space > character.
    """

    def __init__(
        self,
        chunk_size: int = 500,
        chunk_overlap: int = 50,
        separators: Optional[List[str]] = None,
    ):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.separators = separators or ["\n\n", "\n", "。", "!", "?", ".", "!", "?", " ", ""]

    def split_text(self, text: str) -> List[str]:
        """Split text into chunks."""
        final_chunks: List[str] = []
        # Pick the highest-priority separator that occurs in the text
        separator = self.separators[-1]
        for sep in self.separators:
            if sep == "":
                separator = sep
                break
            if sep in text:
                separator = sep
                break

        # Split on the chosen separator ("" means split into characters)
        if separator:
            splits = text.split(separator)
        else:
            splits = list(text)

        # Merge small pieces back up to chunk_size
        current_chunk = ""
        for part in splits:
            piece = part + separator if separator else part
            if len(current_chunk) + len(piece) <= self.chunk_size:
                current_chunk += piece
            else:
                if current_chunk.strip():
                    final_chunks.append(current_chunk.strip())
                # Carry the overlap into the next chunk
                if self.chunk_overlap > 0 and len(current_chunk) > self.chunk_overlap:
                    overlap_text = current_chunk[-self.chunk_overlap:]
                    current_chunk = overlap_text + piece
                else:
                    current_chunk = piece

        if current_chunk.strip():
            final_chunks.append(current_chunk.strip())

        return final_chunks


class DocumentLoader:
    """
    Multi-format document loader.

    Supported formats: .txt, .md, .csv, .json
    """

    def __init__(self, splitter: Optional[TextSplitter] = None):
        self.splitter = splitter or TextSplitter()

    def load_file(self, file_path: str) -> List[DocumentChunk]:
        """Load a single file and return its chunks."""
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        ext = os.path.splitext(file_path)[1].lower()
        doc_title = os.path.basename(file_path)

        # Read the content according to the file type
        if ext in (".txt", ".md"):
            content = self._load_text(file_path)
        elif ext == ".csv":
            content = self._load_csv(file_path)
        elif ext == ".json":
            content = self._load_json(file_path)
        else:
            raise ValueError(f"Unsupported file format: {ext}")

        # Split into chunks
        chunks_text = self.splitter.split_text(content)

        # Build DocumentChunk objects
        chunks = []
        for i, text in enumerate(chunks_text):
            chunk = DocumentChunk(
                chunk_id="",
                content=text,
                doc_title=doc_title,
                doc_path=file_path,
                chunk_index=i,
                metadata={"file_type": ext, "total_chunks": len(chunks_text)},
            )
            chunks.append(chunk)

        return chunks

    def load_directory(self, dir_path: str, extensions: Optional[List[str]] = None) -> List[DocumentChunk]:
        """Load every matching document under a directory."""
        extensions = extensions or [".txt", ".md", ".csv", ".json"]
        all_chunks: List[DocumentChunk] = []

        for root, _dirs, files in os.walk(dir_path):
            for fname in sorted(files):
                ext = os.path.splitext(fname)[1].lower()
                if ext in extensions:
                    fpath = os.path.join(root, fname)
                    try:
                        chunks = self.load_file(fpath)
                        all_chunks.extend(chunks)
                        print(f"  [Loaded] {fname} -> {len(chunks)} chunks")
                    except Exception as e:
                        print(f"  [Failed] {fname}: {e}")

        print(f"[Loading complete] {len(all_chunks)} chunks in total")
        return all_chunks

    def _load_text(self, path: str) -> str:
        with open(path, "r", encoding="utf-8") as f:
            return f.read()

    def _load_csv(self, path: str) -> str:
        import csv
        rows = []
        with open(path, "r", encoding="utf-8") as f:
            reader = csv.reader(f)
            for row in reader:
                rows.append(" | ".join(row))
        return "\n".join(rows)

    def _load_json(self, path: str) -> str:
        import json
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
        if isinstance(data, list):
            return "\n\n".join(
                json.dumps(item, ensure_ascii=False, indent=2) if isinstance(item, dict)
                else str(item)
                for item in data
            )
        return json.dumps(data, ensure_ascii=False, indent=2)


# ============================================================
# Usage example
# ============================================================
if __name__ == "__main__":
    # Create a splitter
    splitter = TextSplitter(chunk_size=300, chunk_overlap=30)

    # Try splitting some text
    sample_text = """
    Refund Policy

    1. 7-day no-questions-asked refunds: a refund can be requested within 7 days of purchase if the item is in good condition.
    2. Quality-issue refunds: items with quality defects can be returned for a refund within 15 days of receipt.
    3. Refund process: submit the request -> support review -> return the item -> receipt confirmed -> refund issued.
    4. Refund timing: once approved, refunds are returned via the original payment method within 3-5 business days.
    5. Non-refundable: customized items, used virtual goods, and requests past the refund window.
    """
    chunks = splitter.split_text(sample_text.strip())
    for i, chunk in enumerate(chunks):
        print(f"--- Chunk {i+1} (length: {len(chunk)}) ---")
        print(chunk[:100] + "..." if len(chunk) > 100 else chunk)

Vector Storage and Retrieval

python
"""
Vector knowledge base - ChromaDB-backed vector storage and semantic retrieval.
"""
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass
import os

# ChromaDB and OpenAI clients
try:
    import chromadb
    from openai import OpenAI
    HAS_DEPS = True
except ImportError:
    HAS_DEPS = False
    print("[Warning] Missing dependencies, run: pip install chromadb openai")


@dataclass
class RetrievalResult:
    """A retrieval hit"""
    content: str
    doc_title: str
    chunk_id: str
    similarity_score: float
    metadata: Dict


class VectorKnowledgeBase:
    """
    Vector knowledge base.

    Features:
    1. Embed documents and store them in ChromaDB
    2. Semantic search - return the chunks most relevant to a query
    3. Incremental updates and document deletion

    Usage:
        kb = VectorKnowledgeBase()
        kb.add_documents(chunks)                           # add documents
        results = kb.search("What is the refund policy?")  # semantic search
    """

    def __init__(
        self,
        persist_directory: str = "./data/chroma_db",
        collection_name: str = "customer_service_kb",
        embedding_model: str = "text-embedding-3-small",
    ):
        self.embedding_model = embedding_model
        self.collection_name = collection_name

        if not HAS_DEPS:
            raise RuntimeError("Missing dependencies, run: pip install chromadb openai")

        # Set up the OpenAI client
        self.openai_client = OpenAI(
            api_key=os.getenv("OPENAI_API_KEY", "your-api-key"),
            base_url=os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1"),
        )

        # Set up ChromaDB
        os.makedirs(persist_directory, exist_ok=True)
        self.chroma_client = chromadb.PersistentClient(path=persist_directory)
        self.collection = self.chroma_client.get_or_create_collection(
            name=collection_name,
            metadata={"hnsw:space": "cosine"},  # use cosine similarity
        )
        print(f"[Vector KB] collection: {collection_name}, {self.collection.count()} existing records")

    def _get_embedding(self, texts: List[str]) -> List[List[float]]:
        """Fetch embeddings from the OpenAI Embedding API."""
        response = self.openai_client.embeddings.create(
            model=self.embedding_model,
            input=texts,
        )
        return [item.embedding for item in response.data]

    def add_documents(self, chunks: list, batch_size: int = 50) -> int:
        """
        Add documents to the vector store in batches.

        Args:
            chunks: list of DocumentChunk objects
            batch_size: items per batch (avoids API timeouts)
        Returns:
            number of documents added
        """
        added = 0
        for i in range(0, len(chunks), batch_size):
            batch = chunks[i : i + batch_size]
            texts = [c.content for c in batch]
            ids = [c.chunk_id for c in batch]
            metadatas = [
                {
                    "doc_title": c.doc_title,
                    "doc_path": c.doc_path,
                    "chunk_index": c.chunk_index,
                    "page_number": c.page_number or -1,
                }
                for c in batch
            ]

            # Embed the batch
            embeddings = self._get_embedding(texts)

            # Write to ChromaDB
            self.collection.upsert(
                ids=ids,
                embeddings=embeddings,
                documents=texts,
                metadatas=metadatas,
            )
            added += len(batch)
            print(f"  [Stored] {added}/{len(chunks)}")

        print(f"[Ingestion complete] {added} chunks added, {self.collection.count()} total in store")
        return added

    def search(
        self,
        query: str,
        top_k: int = 5,
        score_threshold: float = 0.70,
        filter_metadata: Optional[Dict] = None,
    ) -> List[RetrievalResult]:
        """
        Semantic search.

        Args:
            query: query text
            top_k: max number of results
            score_threshold: minimum similarity to keep a result
            filter_metadata: metadata filter conditions
        """
        # Embed the query
        query_embedding = self._get_embedding([query])[0]

        # Query ChromaDB
        search_params = {
            "query_embeddings": [query_embedding],
            "n_results": top_k,
            "include": ["documents", "metadatas", "distances"],
        }
        if filter_metadata:
            search_params["where"] = filter_metadata

        results = self.collection.query(**search_params)

        # Parse results (ChromaDB always returns the matching ids as well)
        retrieval_results = []
        if results and results["documents"]:
            for chunk_id, doc, meta, dist in zip(
                results["ids"][0],
                results["documents"][0],
                results["metadatas"][0],
                results["distances"][0],
            ):
                # ChromaDB cosine distance: 0 = identical, 2 = opposite
                # Convert to similarity: similarity = 1 - distance / 2
                similarity = 1 - dist / 2

                if similarity >= score_threshold:
                    retrieval_results.append(
                        RetrievalResult(
                            content=doc,
                            doc_title=meta.get("doc_title", ""),
                            chunk_id=chunk_id,
                            similarity_score=round(similarity, 4),
                            metadata=meta,
                        )
                    )

        # Sort by similarity, highest first
        retrieval_results.sort(key=lambda x: x.similarity_score, reverse=True)
        return retrieval_results

    def delete_document(self, doc_title: str) -> int:
        """Delete every chunk of the given document."""
        # Find all chunks belonging to this document
        results = self.collection.get(where={"doc_title": doc_title})
        if results and results["ids"]:
            self.collection.delete(ids=results["ids"])
            print(f"[Deleted] {doc_title}: {len(results['ids'])} chunks removed")
            return len(results["ids"])
        return 0

    def get_stats(self) -> Dict:
        """Knowledge base statistics."""
        count = self.collection.count()
        return {
            "collection_name": self.collection_name,
            "total_chunks": count,
            "embedding_model": self.embedding_model,
        }


# ============================================================
# RAG answer chain
# ============================================================
class RAGChain:
    """
    RAG answer chain - combines retrieval and generation.

    Flow: user question -> vector search -> build prompt -> LLM generation -> answer with citations
    """

    def __init__(self, knowledge_base: VectorKnowledgeBase):
        self.kb = knowledge_base
        self.openai_client = OpenAI(
            api_key=os.getenv("OPENAI_API_KEY", "your-api-key"),
            base_url=os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1"),
        )

    def build_rag_prompt(self, query: str, contexts: List[RetrievalResult]) -> str:
        """Build the prompt containing the retrieved context."""
        context_text = ""
        for i, ctx in enumerate(contexts, 1):
            context_text += f"\n[Reference {i}] (source: {ctx.doc_title}, similarity: {ctx.similarity_score})\n"
            context_text += f"{ctx.content}\n"

        prompt = f"""Answer the user's question using the reference material below.

Requirements:
1. Use only information from the references; do not make anything up
2. If the references are insufficient to answer, say so honestly
3. Keep the answer concise, accurate, and well organized
4. At the end of the answer, note which references you used

===== References =====
{context_text}
===== End of references =====

User question: {query}

Answer:"""
        return prompt

    def answer(
        self,
        query: str,
        chat_history: Optional[List[Dict]] = None,
        top_k: int = 5,
    ) -> Tuple[str, List[RetrievalResult]]:
        """
        Answer with RAG.

        Returns: (answer text, list of cited sources)
        """
        # 1. Retrieve relevant chunks
        results = self.kb.search(query, top_k=top_k)

        if not results:
            return "Sorry, I couldn't find anything relevant in the knowledge base. Could you rephrase your question?", []

        # 2. Build the prompt
        rag_prompt = self.build_rag_prompt(query, results)

        # 3. Build the message list
        messages = [
            {"role": "system", "content": "You are a professional customer service assistant. Answer questions based on the provided references."},
        ]
        # Add recent history for context
        if chat_history:
            for msg in chat_history[-6:]:  # keep at most the last 3 turns
                messages.append(msg)

        messages.append({"role": "user", "content": rag_prompt})

        # 4. Generate the answer with the LLM
        response = self.openai_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages,
            temperature=0.3,       # low temperature for accuracy
            max_tokens=1000,
        )

        answer_text = response.choices[0].message.content
        return answer_text, results


# ============================================================
# Usage example
# ============================================================
if __name__ == "__main__":
    print("=== Vector knowledge base usage ===")
    print()
    print("# 1. Initialize the knowledge base")
    print("kb = VectorKnowledgeBase(persist_directory='./data/chroma_db')")
    print()
    print("# 2. Load documents")
    print("loader = DocumentLoader()")
    print("chunks = loader.load_directory('./knowledge_base/')")
    print("kb.add_documents(chunks)")
    print()
    print("# 3. Test retrieval")
    print("results = kb.search('How long does a refund take?')")
    print("for r in results:")
    print("    print(f'  [{r.similarity_score}] {r.doc_title}: {r.content[:80]}...')")
    print()
    print("# 4. RAG Q&A")
    print("rag = RAGChain(kb)")
    print("answer, sources = rag.answer('How long does a refund take?')")
    print("# answer = 'Once approved, refunds are returned within 3-5 business days...'")
Multi-Turn Conversation Management

Conversation management architecture

+=======================================================================+
|                  Multi-Turn Conversation Management                   |
+=======================================================================+
|                                                                       |
|  +--- Session store (Redis) --------------------------------------+  |
|  |                                                                 |  |
|  |  session:abc123 = {                                             |  |
|  |    "messages": [                                                |  |
|  |      {"role":"user",      "content":"check my order"},          |  |
|  |      {"role":"assistant", "content":"order number please"},     |  |
|  |      {"role":"user",      "content":"A12345"},                  |  |
|  |      {"role":"assistant", "content":"order A12345 status..."}   |  |
|  |    ],                                                           |  |
|  |    "context": {"order_id":"A12345", "verified": true},          |  |
|  |    "intent_history": ["order_query", "order_query"]             |  |
|  |  }                                                              |  |
|  |  TTL: 3600 seconds                                              |  |
|  +-----------------------------------------------------------------+  |
|                                                                       |
|  +--- Context window strategy ------------------------------------+  |
|  |                                                                 |  |
|  |  Sliding window: keep only the most recent N turns              |  |
|  |                                                                 |  |
|  |  [t1] [t2] [t3] [t4] [t5] [t6] [t7] [t8]                        |  |
|  |                 |<---- keep last 5 --->|                        |  |
|  |  |< truncated >|                                                |  |
|  |  (early turns are summarized)                                   |  |
|  |                                                                 |  |
|  |  Summary strategy: summarize early turns and inject the         |  |
|  |  summary as a system message                                    |  |
|  +-----------------------------------------------------------------+  |
|                                                                       |
+=======================================================================+
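
The sliding-window policy in the diagram boils down to a list split: the most recent N turns are kept verbatim and everything earlier becomes a candidate for summarization. A minimal sketch (the `(role, content)` tuple shape is illustrative, not the storage format used elsewhere in this article):

```python
def split_history(turns, keep_last=5):
    """Split conversation turns into (to_summarize, kept).

    `turns` is a list of (role, content) pairs in chronological order;
    the most recent `keep_last` turns are kept verbatim, everything
    earlier becomes input for the summarizer.
    """
    if keep_last <= 0:
        return list(turns), []
    return list(turns[:-keep_last]), list(turns[-keep_last:])

turns = [("user", f"message {i}") for i in range(8)]
old, recent = split_history(turns, keep_last=5)
print(len(old), len(recent))  # 3 5
```

When the history is shorter than the window, `turns[:-keep_last]` is simply empty, so nothing gets summarized.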

Session manager

python
"""
Session manager - Redis-backed state management for multi-turn conversations
"""
import json
import time
import uuid
from typing import List, Dict, Optional, Any
from dataclasses import dataclass, field, asdict
from datetime import datetime

# Try to import Redis; fall back to an in-memory mock for local testing
try:
    import redis
    HAS_REDIS = True
except ImportError:
    HAS_REDIS = False


class InMemoryStore:
    """
    In-memory store - a local stand-in for Redis, used in development
    and testing. The interface mirrors the Redis client.
    """

    def __init__(self):
        self._store: Dict[str, str] = {}
        self._ttl: Dict[str, float] = {}

    def get(self, key: str) -> Optional[str]:
        # Drop the key if its TTL has expired
        if key in self._ttl and time.time() > self._ttl[key]:
            del self._store[key]
            del self._ttl[key]
            return None
        return self._store.get(key)

    def set(self, key: str, value: str, ex: Optional[int] = None):
        self._store[key] = value
        if ex:
            self._ttl[key] = time.time() + ex

    def delete(self, key: str):
        self._store.pop(key, None)
        self._ttl.pop(key, None)

    def exists(self, key: str) -> bool:
        if key in self._ttl and time.time() > self._ttl[key]:
            del self._store[key]
            del self._ttl[key]
            return False
        return key in self._store

    def keys(self, pattern: str = "*") -> List[str]:
        import fnmatch
        return [k for k in self._store.keys() if fnmatch.fnmatch(k, pattern)]


@dataclass
class Message:
    """A single chat message."""
    role: str        # "user" | "assistant" | "system"
    content: str
    timestamp: str = ""
    metadata: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        if not self.timestamp:
            self.timestamp = datetime.now().isoformat()


@dataclass
class Session:
    """A conversation session."""
    session_id: str = ""
    user_id: str = ""
    messages: List[Message] = field(default_factory=list)
    context: Dict[str, Any] = field(default_factory=dict)
    intent_history: List[str] = field(default_factory=list)
    created_at: str = ""
    last_active: str = ""
    is_transferred: bool = False

    def __post_init__(self):
        if not self.session_id:
            self.session_id = str(uuid.uuid4())
        now = datetime.now().isoformat()
        if not self.created_at:
            self.created_at = now
        if not self.last_active:
            self.last_active = now


class SessionManager:
    """
    Session manager.

    Responsibilities:
    1. Create, read, update and delete sessions
    2. Manage conversation history (sliding window + history summary)
    3. Maintain context variables
    4. Expire sessions automatically
    """

    KEY_PREFIX = "chatbot:session:"

    def __init__(
        self,
        redis_host: str = "localhost",
        redis_port: int = 6379,
        redis_db: int = 0,
        session_ttl: int = 3600,
        max_history_turns: int = 10,
        use_memory: bool = False,
    ):
        self.session_ttl = session_ttl
        self.max_history_turns = max_history_turns

        # Pick the storage backend
        if use_memory or not HAS_REDIS:
            self.store = InMemoryStore()
            print("[SessionManager] using in-memory store (development mode)")
        else:
            self.store = redis.Redis(
                host=redis_host, port=redis_port, db=redis_db, decode_responses=True
            )
            print(f"[SessionManager] connected to Redis: {redis_host}:{redis_port}")

    def _key(self, session_id: str) -> str:
        return f"{self.KEY_PREFIX}{session_id}"

    def create_session(self, user_id: str = "") -> Session:
        """Create a new session."""
        session = Session(user_id=user_id)
        self._save(session)
        print(f"[new session] id={session.session_id}, user={user_id}")
        return session

    def get_session(self, session_id: str) -> Optional[Session]:
        """Fetch a session; returns None if it does not exist."""
        data = self.store.get(self._key(session_id))
        if data is None:
            return None
        return self._deserialize(data)

    def get_or_create(self, session_id: Optional[str] = None, user_id: str = "") -> Session:
        """Fetch a session, creating one if necessary."""
        if session_id:
            session = self.get_session(session_id)
            if session:
                return session
        return self.create_session(user_id)

    def add_message(self, session_id: str, role: str, content: str, metadata: Optional[Dict] = None) -> Session:
        """Append a message to the session."""
        session = self.get_session(session_id)
        if not session:
            raise ValueError(f"Session not found: {session_id}")

        msg = Message(role=role, content=content, metadata=metadata or {})
        session.messages.append(msg)
        session.last_active = datetime.now().isoformat()

        # Truncate early messages once the history exceeds the cap
        max_messages = self.max_history_turns * 2  # user + assistant per turn
        if len(session.messages) > max_messages:
            # Keep system messages (exactly once) plus the most recent N messages;
            # filtering the recent slice avoids duplicating system messages
            system_msgs = [m for m in session.messages if m.role == "system"]
            recent_msgs = [m for m in session.messages[-max_messages:] if m.role != "system"]
            session.messages = system_msgs + recent_msgs

        self._save(session)
        return session

    def update_context(self, session_id: str, key: str, value: Any):
        """Update a context variable on the session."""
        session = self.get_session(session_id)
        if session:
            session.context[key] = value
            self._save(session)

    def add_intent(self, session_id: str, intent: str):
        """Record an intent in the session's intent history."""
        session = self.get_session(session_id)
        if session:
            session.intent_history.append(intent)
            self._save(session)

    def get_chat_history(self, session_id: str, max_turns: Optional[int] = None) -> List[Dict]:
        """Return the conversation history in OpenAI message format."""
        session = self.get_session(session_id)
        if not session:
            return []

        messages = session.messages
        if max_turns:
            # Keep only the most recent N turns
            messages = messages[-(max_turns * 2):]

        return [{"role": m.role, "content": m.content} for m in messages]

    def delete_session(self, session_id: str):
        """Delete a session."""
        self.store.delete(self._key(session_id))
        print(f"[session deleted] {session_id}")

    def list_active_sessions(self) -> List[str]:
        """List the IDs of all active sessions."""
        keys = self.store.keys(f"{self.KEY_PREFIX}*")
        return [k.replace(self.KEY_PREFIX, "") for k in keys]

    def _save(self, session: Session):
        """Serialize and persist the session, refreshing its TTL."""
        data = json.dumps(asdict(session), ensure_ascii=False)
        self.store.set(self._key(session.session_id), data, ex=self.session_ttl)

    def _deserialize(self, data: str) -> Session:
        """Deserialize a session from JSON."""
        d = json.loads(data)
        d["messages"] = [Message(**m) for m in d.get("messages", [])]
        return Session(**d)


# ============================================================
# Usage example
# ============================================================
if __name__ == "__main__":
    # Use the in-memory store for testing
    manager = SessionManager(use_memory=True, max_history_turns=5)

    # Create a session
    session = manager.create_session(user_id="user-001")
    sid = session.session_id
    print(f"Session ID: {sid}")

    # Simulate a multi-turn conversation
    conversations = [
        ("user", "Hi, I would like to check an order"),
        ("assistant", "Sure! Please share your order ID and I will look it up."),
        ("user", "The order ID is ORD-20240101-001"),
        ("assistant", "Found order ORD-20240101-001:\n- Product: iPhone 15\n- Status: shipped\n- Expected delivery: Jan 5"),
        ("user", "Where is the package now?"),
        ("assistant", "Your package is at the Beijing sorting hub and should arrive tomorrow."),
        ("user", "Great, thanks"),
        ("assistant", "You're welcome! Feel free to ask anytime."),
    ]

    for role, content in conversations:
        manager.add_message(sid, role, content)

    # Inspect the conversation history
    history = manager.get_chat_history(sid)
    print(f"\nConversation history ({len(history)} messages):")
    for msg in history:
        prefix = "  user: " if msg["role"] == "user" else "  assistant: "
        print(f"{prefix}{msg['content'][:50]}")

    # Update context variables
    manager.update_context(sid, "order_id", "ORD-20240101-001")
    manager.update_context(sid, "verified", True)

    # Inspect session state
    session = manager.get_session(sid)
    print(f"\nSession context: {session.context}")
    print(f"Active sessions: {len(manager.list_active_sessions())}")
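
The session manager above truncates old messages but leaves the diagram's summary strategy unimplemented. Below is a hedged sketch of summary injection, with the summarizer passed in as a callable (in production it would be an LLM call; here a trivial stand-in keeps the logic testable — `compact_history` is illustrative, not part of `SessionManager`):

```python
def compact_history(messages, keep_last, summarize_fn):
    """Replace old turns with a single system summary message.

    messages: list of {"role", "content"} dicts, oldest first.
    summarize_fn: callable(old_messages) -> summary string; in
    production this would call an LLM, here it is injected.
    """
    if len(messages) <= keep_last:
        return list(messages)
    old, recent = messages[:-keep_last], messages[-keep_last:]
    summary = summarize_fn(old)
    header = {"role": "system", "content": f"Earlier conversation summary: {summary}"}
    return [header] + recent

# Trivial stand-in summarizer: just reports how many turns it compressed.
msgs = [{"role": "user", "content": f"m{i}"} for i in range(8)]
compacted = compact_history(msgs, keep_last=4,
                            summarize_fn=lambda old: f"{len(old)} earlier turns")
print(len(compacted))           # 5
print(compacted[0]["role"])     # system
```

The summary message is regenerated whenever the window slides, so the prompt stays bounded no matter how long the conversation runs.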

Agent Tool Calling

Agent architecture

+=======================================================================+
|                          Agent Tool Calling                           |
+=======================================================================+
|                                                                       |
|  User message: "Refund my order A12345, please"                       |
|       |                                                               |
|       v                                                               |
|  +----+--------------------------------------------+                  |
|  |             LLM (function calling)              |                  |
|  |                                                  |                  |
|  |  Analyzes the user's intent and decides which   |                  |
|  |  tools to call; outputs tool_name + arguments   |                  |
|  +----+--------------------------------------------+                  |
|       |                                                               |
|       v                                                               |
|  +----+--------------------------------------------+                  |
|  |             Tool router                         |                  |
|  +----+--------+---------+---------+--------+------+                  |
|       |        |         |         |        |                         |
|       v        v         v         v        v                         |
|  +--------+ +--------+ +---------+ +--------+ +---------+             |
|  | query_ | | submit_| | track_  | | check_ | | update_ |             |
|  | order  | | refund | | shipping| | balance| | info    |             |
|  +----+---+ +---+----+ +----+----+ +---+----+ +----+----+             |
|       |         |           |          |           |                  |
|       v         v           v          v           v                  |
|  +----+---------+-----------+----------+-----------+---+              |
|  |          Business database / external APIs          |              |
|  +------------------------------------------------------+             |
|       |                                                               |
|       v                                                               |
|  +----+--------------------------------------------+                  |
|  |  LLM composes tool results into a final reply   |                  |
|  +----+--------------------------------------------+                  |
|       |                                                               |
|       v                                                               |
|  Reply: "Your refund for order A12345 has been submitted;             |
|          expect the funds within 3 business days."                    |
|                                                                       |
+=======================================================================+

Tool definitions and execution

python
"""
Agent tool system - defines and executes the tools used in customer-service scenarios
"""
import json
import os
import random
import string
from typing import Dict, Any, List, Optional, Callable


# ============================================================
# Tool definitions (OpenAI function-calling format)
# ============================================================

TOOL_DEFINITIONS = [
    {
        "type": "function",
        "function": {
            "name": "query_order",
            "description": "Look up order details by order ID, including product, amount, status and shipping",
            "parameters": {
                "type": "object",
                "properties": {
                    "order_id": {
                        "type": "string",
                        "description": "Order ID, in the form ORD-YYYYMMDD-NNN",
                    }
                },
                "required": ["order_id"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "submit_refund",
            "description": "Submit a refund request. Requires the order ID and a refund reason.",
            "parameters": {
                "type": "object",
                "properties": {
                    "order_id": {
                        "type": "string",
                        "description": "Order ID to refund",
                    },
                    "reason": {
                        "type": "string",
                        "enum": ["quality_issue", "wrong_item", "no_longer_needed", "not_received", "other"],
                        "description": "Refund reason",
                    },
                    "description": {
                        "type": "string",
                        "description": "Detailed description (optional)",
                    },
                },
                "required": ["order_id", "reason"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "track_shipping",
            "description": "Look up the shipping/delivery status of an order",
            "parameters": {
                "type": "object",
                "properties": {
                    "order_id": {
                        "type": "string",
                        "description": "Order ID",
                    }
                },
                "required": ["order_id"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "check_balance",
            "description": "Look up the user's account balance and loyalty points",
            "parameters": {
                "type": "object",
                "properties": {
                    "user_id": {
                        "type": "string",
                        "description": "User ID",
                    }
                },
                "required": ["user_id"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "update_user_info",
            "description": "Update user profile information (shipping address, phone number, etc.)",
            "parameters": {
                "type": "object",
                "properties": {
                    "user_id": {"type": "string", "description": "User ID"},
                    "field": {
                        "type": "string",
                        "enum": ["address", "phone", "email", "name"],
                        "description": "Field to update",
                    },
                    "new_value": {"type": "string", "description": "New value"},
                },
                "required": ["user_id", "field", "new_value"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "search_knowledge_base",
            "description": "Search the knowledge base for product information, policies and similar content",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {"type": "string", "description": "Search keywords"},
                },
                "required": ["query"],
            },
        },
    },
]


# ============================================================
# Tool implementations (mocked business logic)
# ============================================================

# Mock database
MOCK_ORDERS = {
    "ORD-20240101-001": {
        "order_id": "ORD-20240101-001",
        "user_id": "user-001",
        "product": "iPhone 15 128GB Blue",
        "amount": 5999.00,
        "status": "shipped",
        "status_text": "Shipped",
        "created_at": "2024-01-01 10:30:00",
        "shipping_company": "SF Express",
        "tracking_number": "SF1234567890",
    },
    "ORD-20240102-002": {
        "order_id": "ORD-20240102-002",
        "user_id": "user-001",
        "product": "AirPods Pro 2",
        "amount": 1799.00,
        "status": "delivered",
        "status_text": "Delivered",
        "created_at": "2024-01-02 14:20:00",
        "shipping_company": "JD Logistics",
        "tracking_number": "JD9876543210",
    },
}

MOCK_USERS = {
    "user-001": {
        "user_id": "user-001",
        "name": "Zhang San",
        "phone": "138****1234",
        "balance": 200.50,
        "points": 3580,
        "address": "No. xxx, xxx Street, Chaoyang District, Beijing",
    },
}


def query_order(order_id: str) -> Dict[str, Any]:
    """Look up an order."""
    order = MOCK_ORDERS.get(order_id)
    if not order:
        return {"success": False, "error": f"Order not found: {order_id}"}
    return {"success": True, "data": order}


def submit_refund(order_id: str, reason: str, description: str = "") -> Dict[str, Any]:
    """Submit a refund request."""
    order = MOCK_ORDERS.get(order_id)
    if not order:
        return {"success": False, "error": f"Order not found: {order_id}"}

    if order["status"] == "refunded":
        return {"success": False, "error": "This order has already been refunded; please do not submit again"}

    refund_id = "REF-" + "".join(random.choices(string.digits, k=8))
    return {
        "success": True,
        "data": {
            "refund_id": refund_id,
            "order_id": order_id,
            "amount": order["amount"],
            "reason": reason,
            "description": description,
            "status": "pending",
            "estimated_days": "3-5 business days",
        },
    }


def track_shipping(order_id: str) -> Dict[str, Any]:
    """Look up shipping status."""
    order = MOCK_ORDERS.get(order_id)
    if not order:
        return {"success": False, "error": f"Order not found: {order_id}"}

    tracking_info = [
        {"time": "2024-01-03 08:00", "status": "Package picked up", "location": "Shenzhen"},
        {"time": "2024-01-03 15:30", "status": "Arrived at sorting hub", "location": "Guangzhou"},
        {"time": "2024-01-04 06:00", "status": "In transit", "location": "Wuhan"},
        {"time": "2024-01-04 18:00", "status": "Arrived in destination city", "location": "Beijing"},
        {"time": "2024-01-05 09:30", "status": "Out for delivery", "location": "Chaoyang District, Beijing"},
    ]
    return {
        "success": True,
        "data": {
            "order_id": order_id,
            "company": order.get("shipping_company", "unknown"),
            "tracking_number": order.get("tracking_number", ""),
            "latest_status": tracking_info[-1]["status"],
            "tracking_details": tracking_info,
        },
    }


def check_balance(user_id: str) -> Dict[str, Any]:
    """Look up account balance and loyalty points."""
    user = MOCK_USERS.get(user_id)
    if not user:
        return {"success": False, "error": f"User not found: {user_id}"}
    return {
        "success": True,
        "data": {
            "user_id": user_id,
            "balance": user["balance"],
            "points": user["points"],
            "points_value": round(user["points"] / 100, 2),
        },
    }


def update_user_info(user_id: str, field: str, new_value: str) -> Dict[str, Any]:
    """Update user profile information.

    The parameter is named `field` to match the tool schema, because
    ToolExecutor invokes tools with keyword arguments.
    """
    user = MOCK_USERS.get(user_id)
    if not user:
        return {"success": False, "error": f"User not found: {user_id}"}

    old_value = user.get(field, "")
    # Simulated update only; a real implementation would persist it
    return {
        "success": True,
        "data": {
            "field": field,
            "old_value": old_value,
            "new_value": new_value,
            "message": f"Updated {field} to: {new_value}",
        },
    }


def search_knowledge_base(query: str) -> Dict[str, Any]:
    """Search the knowledge base (mocked).

    A real implementation would call VectorKnowledgeBase.search().
    """
    mock_results = {
        "refund": "7-day no-questions-asked refunds; funds arrive 3-5 business days after approval.",
        "shipping": "Orders ship within 24 hours of payment, 48 hours for remote areas.",
        "warranty": "Electronics carry a one-year warranty; damage caused by misuse is not covered.",
    }
    for keyword, answer in mock_results.items():
        if keyword in query.lower():
            return {"success": True, "data": {"query": query, "results": [answer]}}
    return {"success": True, "data": {"query": query, "results": ["No relevant information found"]}}


# ============================================================
# Tool registry
# ============================================================

TOOL_REGISTRY: Dict[str, Callable] = {
    "query_order": query_order,
    "submit_refund": submit_refund,
    "track_shipping": track_shipping,
    "check_balance": check_balance,
    "update_user_info": update_user_info,
    "search_knowledge_base": search_knowledge_base,
}


class ToolExecutor:
    """
    Tool executor - runs tool calls safely.

    Responsibilities:
    1. Argument validation
    2. Permission checks
    3. Tool execution
    4. Logging
    """

    def __init__(self, registry: Optional[Dict[str, Callable]] = None):
        self.registry = registry or TOOL_REGISTRY
        # Destructive operations that require explicit user confirmation
        self.dangerous_tools = {"submit_refund", "update_user_info"}

    def execute(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Execute a tool call."""
        if tool_name not in self.registry:
            return {"success": False, "error": f"Unknown tool: {tool_name}"}

        try:
            func = self.registry[tool_name]
            result = func(**arguments)
            print(f"  [tool call] {tool_name}({arguments}) -> success={result.get('success')}")
            return result
        except Exception as e:
            print(f"  [tool error] {tool_name}: {e}")
            return {"success": False, "error": str(e)}

    def needs_confirmation(self, tool_name: str) -> bool:
        """Whether this tool requires explicit user confirmation first."""
        return tool_name in self.dangerous_tools

    def list_tools(self) -> List[str]:
        """List all available tools."""
        return list(self.registry.keys())


# ============================================================
# Agent main loop
# ============================================================

class CustomerServiceAgent:
    """
    Customer-service agent combining an LLM with tool calling.

    Workflow:
    1. Receive the user message
    2. The LLM analyzes intent and decides whether to call tools
    3. If tools are needed -> execute them -> feed results back to the LLM
    4. The LLM generates the final reply
    """

    def __init__(self):
        self.tool_executor = ToolExecutor()
        try:
            from openai import OpenAI
            self.client = OpenAI(
                api_key=os.getenv("OPENAI_API_KEY", "your-key"),
                base_url=os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1"),
            )
            self.available = True
        except ImportError:
            self.available = False

    def run(
        self,
        user_message: str,
        chat_history: Optional[List[Dict]] = None,
        user_id: str = "",
    ) -> Dict[str, Any]:
        """
        Agent entry point.

        Returns: {"reply": str, "tool_calls": list, "intent": str}
        """
        if not self.available:
            return self._mock_run(user_message, user_id)

        messages = [
            {
                "role": "system",
                "content": f"""You are an intelligent customer-service assistant. Current user ID: {user_id}

You can use tools to help the user. Rules:
1. Execute read-only queries directly
2. Confirm with the user before refunds or profile changes
3. Suggest transferring to a human agent for anything you cannot handle
4. Keep a friendly, professional tone""",
            }
        ]
        if chat_history:
            messages.extend(chat_history[-10:])
        messages.append({"role": "user", "content": user_message})

        # First call: the LLM decides whether to use tools
        response = self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages,
            tools=TOOL_DEFINITIONS,
            tool_choice="auto",
            temperature=0.3,
        )

        assistant_msg = response.choices[0].message
        tool_calls_result = []

        # If the LLM requested tool calls
        if assistant_msg.tool_calls:
            messages.append(assistant_msg)

            for tool_call in assistant_msg.tool_calls:
                func_name = tool_call.function.name
                func_args = json.loads(tool_call.function.arguments)

                # Execute the tool
                result = self.tool_executor.execute(func_name, func_args)
                tool_calls_result.append({
                    "tool": func_name,
                    "args": func_args,
                    "result": result,
                })

                # Append the tool result to the message list
                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": json.dumps(result, ensure_ascii=False),
                })

            # Second call: the LLM turns the tool results into a reply
            final_response = self.client.chat.completions.create(
                model="gpt-4o-mini",
                messages=messages,
                temperature=0.5,
            )
            reply = final_response.choices[0].message.content
        else:
            reply = assistant_msg.content

        return {
            "reply": reply,
            "tool_calls": tool_calls_result,
            "intent": self._classify_intent(tool_calls_result),
        }

    def _mock_run(self, user_message: str, user_id: str) -> Dict[str, Any]:
        """Fallback used when no LLM API is available: simple rule matching."""
        tool_calls_result = []
        text = user_message.lower()

        # Simple keyword rules
        if "order" in text or "check" in text:
            # Try to extract an order ID
            import re
            match = re.search(r"ORD-\d{8}-\d{3}", user_message)
            if match:
                oid = match.group()
                result = self.tool_executor.execute("query_order", {"order_id": oid})
                tool_calls_result.append({"tool": "query_order", "args": {"order_id": oid}, "result": result})
                if result["success"]:
                    data = result["data"]
                    reply = (
                        f"Here is your order:\n"
                        f"- Order ID: {data['order_id']}\n"
                        f"- Product: {data['product']}\n"
                        f"- Amount: {data['amount']}\n"
                        f"- Status: {data['status_text']}"
                    )
                else:
                    reply = f"Sorry, {result['error']}"
            else:
                reply = "Please share your order ID (format: ORD-XXXXXXXX-XXX) and I will look it up."

        elif "refund" in text:
            reply = "Sure - please share the order ID and the reason, and I will submit the refund request."

        elif "shipping" in text or "delivery" in text or "package" in text:
            reply = "Please share your order ID and I will check the shipping status."

        else:
            reply = "Hello! I am the customer-service assistant. I can look up orders, submit refunds, track shipments and more. How can I help?"

        return {
            "reply": reply,
            "tool_calls": tool_calls_result,
            "intent": self._classify_intent(tool_calls_result),
        }

    def _classify_intent(self, tool_calls: List) -> str:
        """Infer the intent from the tools that were called."""
        if not tool_calls:
            return "chitchat"
        tool_names = [tc["tool"] for tc in tool_calls]
        if "query_order" in tool_names:
            return "order_query"
        if "submit_refund" in tool_names:
            return "refund"
        if "track_shipping" in tool_names:
            return "order_query"
        return "faq"


import os  # module-level import used by CustomerServiceAgent.__init__

# ============================================================
# Usage example
# ============================================================
if __name__ == "__main__":
    print("=== Agent tool-calling test ===\n")

    agent = CustomerServiceAgent()

    # Scenario 1: order lookup
    print("[Scenario 1] Order lookup")
    result = agent.run("Could you check the status of order ORD-20240101-001?", user_id="user-001")
    print(f"  Reply: {result['reply']}")
    print(f"  Intent: {result['intent']}")
    print(f"  Tool calls: {len(result['tool_calls'])}")
    print()

    # Scenario 2: refund
    print("[Scenario 2] Refund enquiry")
    result = agent.run("I want a refund", user_id="user-001")
    print(f"  Reply: {result['reply']}")
    print()

    # Scenario 3: small talk
    print("[Scenario 3] Small talk")
    result = agent.run("Hi there", user_id="user-001")
    print(f"  Reply: {result['reply']}")
    print(f"  Intent: {result['intent']}")
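
`ToolExecutor` lists argument validation among its responsibilities, but `execute()` currently passes arguments straight through to the tool function. A minimal required/unknown-field check against the function-calling schema could look like this (a sketch; `validate_args` is illustrative and not part of the code above, and the inline schema mirrors the `query_order` entry in `TOOL_DEFINITIONS`):

```python
def validate_args(schema: dict, arguments: dict):
    """Return a list of problems: missing required fields and
    arguments not declared in the schema's properties."""
    params = schema["function"]["parameters"]
    required = params.get("required", [])
    allowed = set(params.get("properties", {}))
    problems = [f"missing required argument: {name}"
                for name in required if name not in arguments]
    problems += [f"unexpected argument: {name}"
                 for name in arguments if name not in allowed]
    return problems

# Schema shaped like the query_order entry in TOOL_DEFINITIONS.
schema = {
    "type": "function",
    "function": {
        "name": "query_order",
        "parameters": {
            "type": "object",
            "properties": {"order_id": {"type": "string"}},
            "required": ["order_id"],
        },
    },
}

print(validate_args(schema, {"order_id": "ORD-20240101-001"}))  # []
print(validate_args(schema, {"oid": "x"}))
```

Running such a check before `func(**arguments)` turns a hallucinated argument from a `TypeError` into a structured error the LLM can recover from. For full JSON Schema semantics (types, enums), a library like `jsonschema` would be the natural next step.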

FastAPI Backend Service

API architecture

+=======================================================================+
|                      FastAPI Backend Architecture                     |
+=======================================================================+
|                                                                       |
|  +--- API routes -------------------------------------------------+   |
|  |                                                                |   |
|  |  POST   /api/v1/chat            main chat endpoint             |   |
|  |  POST   /api/v1/chat/stream     streaming chat (SSE)           |   |
|  |  GET    /api/v1/session/{id}    fetch session info             |   |
|  |  DELETE /api/v1/session/{id}    delete a session               |   |
|  |  POST   /api/v1/feedback        user feedback                  |   |
|  |  GET    /api/v1/health          health check                   |   |
|  |  POST   /api/v1/kb/upload       upload knowledge-base docs     |   |
|  |  WS     /ws/chat                real-time chat over WebSocket  |   |
|  +----------------------------------------------------------------+   |
|                                                                       |
|  +--- Middleware -------------------------------------------------+   |
|  |                                                                |   |
|  |  CORS        request logging        rate limiting        auth  |   |
|  +----------------------------------------------------------------+   |
|                                                                       |
+=======================================================================+
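
The `POST /api/v1/chat/stream` route in the table would normally emit Server-Sent Events. The wire format itself is simple enough to sketch as a pure function (illustrative; the payload fields such as `delta` are assumptions, not the API contract of the service):

```python
import json
from typing import Optional

def sse_event(data: dict, event: Optional[str] = None) -> str:
    """Format one Server-Sent Events frame: an optional `event:` line,
    a `data:` line carrying a JSON payload, and the blank line that
    terminates the frame."""
    lines = []
    if event:
        lines.append(f"event: {event}")
    lines.append("data: " + json.dumps(data, ensure_ascii=False))
    return "\n".join(lines) + "\n\n"

print(sse_event({"delta": "Hel"}, event="token"), end="")
print(sse_event({"done": True}), end="")
```

In FastAPI, an async generator yielding such frames is typically wrapped in a `StreamingResponse` with `media_type="text/event-stream"`.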

Full backend code

python
"""
FastAPI backend service - the complete API for the customer-service system
"""
import os
import json
import uuid
import time
import asyncio
from typing import Optional, List, Dict, Any, AsyncGenerator
from datetime import datetime
from contextlib import asynccontextmanager

# FastAPI imports
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect, UploadFile, File
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, JSONResponse
from pydantic import BaseModel, Field


# ============================================================
# Request/response models
# ============================================================
class ChatRequestModel(BaseModel):
    session_id: Optional[str] = Field(default=None, description="Session ID")
    message: str = Field(..., min_length=1, max_length=2000, description="User message")
    user_id: Optional[str] = Field(default=None, description="User ID")
    stream: bool = Field(default=False, description="Whether to stream the response")


class ChatResponseModel(BaseModel):
    session_id: str
    reply: str
    intent: str = "unknown"
    sources: List[Dict] = []
    tool_calls: List[Dict] = []
    confidence: float = 0.0
    need_human: bool = False
    timestamp: str = ""


class FeedbackModel(BaseModel):
    session_id: str
    message_index: int = -1
    rating: int = Field(..., ge=1, le=5, description="Rating from 1 to 5")
    comment: Optional[str] = None


class SessionInfoModel(BaseModel):
    session_id: str
    user_id: str
    message_count: int
    created_at: str
    last_active: str
    is_transferred: bool


# ============================================================
# 核心服务类 (简化版,整合前面的模块)
# ============================================================
class ChatService:
    """聊天服务 - 整合所有AI能力"""

    def __init__(self):
        self.sessions: Dict[str, Dict] = {}      # 内存会话存储
        self.feedback_log: List[Dict] = []         # 反馈日志
        print("[ChatService] 初始化完成")

    def get_or_create_session(self, session_id: Optional[str], user_id: str = "") -> str:
        """获取或创建会话"""
        if session_id and session_id in self.sessions:
            self.sessions[session_id]["last_active"] = datetime.now().isoformat()
            return session_id

        new_id = session_id or str(uuid.uuid4())
        self.sessions[new_id] = {
            "session_id": new_id,
            "user_id": user_id,
            "messages": [],
            "context": {},
            "created_at": datetime.now().isoformat(),
            "last_active": datetime.now().isoformat(),
            "is_transferred": False,
        }
        return new_id

    def process_message(self, session_id: str, message: str, user_id: str = "") -> Dict:
        """处理用户消息并返回回复"""
        # 会话不存在时先创建,避免写入缺少 session_id/created_at 等字段的半初始化会话
        if session_id not in self.sessions:
            session_id = self.get_or_create_session(session_id, user_id)
        session = self.sessions[session_id]
        messages = session["messages"]

        # 添加用户消息
        messages.append({"role": "user", "content": message, "time": datetime.now().isoformat()})

        # === 意图识别 (简化版规则) ===
        intent = self._classify_intent(message)

        # === 生成回复 ===
        reply, sources, tool_calls = self._generate_reply(message, intent, messages, user_id)

        # 添加助手回复
        messages.append({"role": "assistant", "content": reply, "time": datetime.now().isoformat()})
        session["messages"] = messages
        self.sessions[session_id] = session

        return {
            "reply": reply,
            "intent": intent,
            "sources": sources,
            "tool_calls": tool_calls,
            "confidence": 0.9 if intent != "unknown" else 0.5,
            "need_human": intent == "complaint",
        }
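
上面的 messages 列表会随对话无限增长,最终超出模型上下文窗口。教程后文总结里提到的"滑动窗口策略"可以用下面的独立示例实现(trim_history 是假设的辅助函数,非 ChatService 已有方法):

```python
def trim_history(messages: list, max_turns: int = 5, keep_system: bool = True) -> list:
    """滑动窗口裁剪: 保留 system 消息和最近 max_turns 轮 (user+assistant) 对话"""
    system_msgs = [m for m in messages if m.get("role") == "system"] if keep_system else []
    dialog_msgs = [m for m in messages if m.get("role") != "system"]
    # 每轮对话约为 user + assistant 两条消息
    return system_msgs + dialog_msgs[-max_turns * 2:]


history = [{"role": "system", "content": "你是客服助手"}]
for i in range(20):
    history.append({"role": "user", "content": f"问题{i}"})
    history.append({"role": "assistant", "content": f"回答{i}"})

trimmed = trim_history(history, max_turns=5)
# 裁剪后: system 1 条 + 最近 5 轮共 10 条 = 11 条
```

生产环境通常还会按 token 数而非消息条数裁剪,或对被裁掉的历史做摘要后拼回上下文。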

    def _classify_intent(self, message: str) -> str:
        """简化意图分类"""
        keywords_map = {
            "order_query": ["订单", "查询", "查一下", "物流", "快递", "发货"],
            "refund": ["退款", "退货", "退钱", "退回"],
            "complaint": ["投诉", "不满", "差评", "垃圾"],
            "faq": ["怎么", "如何", "什么是", "政策", "规则", "保修"],
            "chitchat": ["你好", "谢谢", "再见", "哈哈"],
        }
        for intent, keywords in keywords_map.items():
            for kw in keywords:
                if kw in message:
                    return intent
        return "unknown"

    def _generate_reply(
        self, message: str, intent: str, history: list, user_id: str
    ) -> tuple:
        """根据意图生成回复"""
        sources = []
        tool_calls = []

        reply_map = {
            "order_query": "请提供您的订单号,我帮您查询订单状态和物流信息。",
            "refund": "好的,请提供订单号和退款原因。我们支持7天无理由退款,审核通过后3-5个工作日到账。",
            "complaint": "非常抱歉给您带来不好的体验。我已记录您的反馈,马上为您转接人工客服处理。",
            "faq": "这是一个好问题!让我在知识库中为您查找相关信息...",
            "chitchat": "您好!我是智能客服助手,随时为您服务。您可以问我订单查询、退款、产品信息等问题。",
            "unknown": "感谢您的咨询!请问您是想查询订单、申请退款,还是有其他问题需要帮助?",
        }

        reply = reply_map.get(intent, reply_map["unknown"])

        # 模拟RAG引用来源
        if intent == "faq":
            sources = [
                {"doc_title": "产品FAQ.md", "chunk_text": "相关知识库内容...", "score": 0.92},
            ]

        return reply, sources, tool_calls

    async def process_message_stream(
        self, session_id: str, message: str, user_id: str = ""
    ) -> AsyncGenerator[str, None]:
        """流式处理消息 (SSE)"""
        result = self.process_message(session_id, message, user_id)
        reply = result["reply"]

        # 模拟逐字输出
        for i, char in enumerate(reply):
            chunk_data = {"type": "token", "content": char, "index": i}
            yield f"data: {json.dumps(chunk_data, ensure_ascii=False)}\n\n"
            await asyncio.sleep(0.03)  # 模拟生成延迟

        # 发送完成信号
        done_data = {
            "type": "done",
            "session_id": session_id,
            "intent": result["intent"],
            "sources": result["sources"],
        }
        yield f"data: {json.dumps(done_data, ensure_ascii=False)}\n\n"


# ============================================================
# FastAPI 应用
# ============================================================

# 全局服务实例
chat_service = ChatService()


@asynccontextmanager
async def lifespan(app: FastAPI):
    """应用生命周期管理"""
    print("[启动] 智能客服系统后端")
    yield
    print("[关闭] 清理资源...")


app = FastAPI(
    title="智能客服系统 API",
    description="基于 RAG + Agent 的智能客服后端服务",
    version="1.0.0",
    lifespan=lifespan,
)

# CORS 中间件
# 注意: 按 CORS 规范,浏览器不接受通配符来源与凭据同时使用,
# 演示环境放开所有来源即可,生产环境应将 allow_origins 改为明确的域名白名单
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# --- 健康检查 ---
@app.get("/api/v1/health")
async def health_check():
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "active_sessions": len(chat_service.sessions),
    }


# --- 主对话接口 ---
@app.post("/api/v1/chat", response_model=ChatResponseModel)
async def chat(request: ChatRequestModel):
    """
    主对话接口

    接收用户消息,返回AI回复。
    支持多轮对话,自动管理会话状态。
    """
    try:
        # 获取或创建会话
        session_id = chat_service.get_or_create_session(
            request.session_id, request.user_id or ""
        )

        # 处理消息
        result = chat_service.process_message(
            session_id, request.message, request.user_id or ""
        )

        return ChatResponseModel(
            session_id=session_id,
            reply=result["reply"],
            intent=result["intent"],
            sources=result["sources"],
            tool_calls=result["tool_calls"],
            confidence=result["confidence"],
            need_human=result["need_human"],
            timestamp=datetime.now().isoformat(),
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"处理消息失败: {str(e)}")


# --- 流式对话接口 (SSE) ---
@app.post("/api/v1/chat/stream")
async def chat_stream(request: ChatRequestModel):
    """流式对话接口 - Server-Sent Events"""
    session_id = chat_service.get_or_create_session(
        request.session_id, request.user_id or ""
    )
    return StreamingResponse(
        chat_service.process_message_stream(session_id, request.message, request.user_id or ""),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )


# --- 获取会话信息 ---
@app.get("/api/v1/session/{session_id}", response_model=SessionInfoModel)
async def get_session(session_id: str):
    """获取会话详细信息"""
    session = chat_service.sessions.get(session_id)
    if not session:
        raise HTTPException(status_code=404, detail="会话不存在")

    return SessionInfoModel(
        session_id=session["session_id"],
        user_id=session.get("user_id", ""),
        message_count=len(session.get("messages", [])),
        created_at=session.get("created_at", ""),
        last_active=session.get("last_active", ""),
        is_transferred=session.get("is_transferred", False),
    )


# --- 删除会话 ---
@app.delete("/api/v1/session/{session_id}")
async def delete_session(session_id: str):
    if session_id in chat_service.sessions:
        del chat_service.sessions[session_id]
        return {"message": f"会话 {session_id} 已删除"}
    raise HTTPException(status_code=404, detail="会话不存在")


# --- 用户反馈 ---
@app.post("/api/v1/feedback")
async def submit_feedback(feedback: FeedbackModel):
    """提交对话反馈评价"""
    chat_service.feedback_log.append({
        "session_id": feedback.session_id,
        "rating": feedback.rating,
        "comment": feedback.comment,
        "timestamp": datetime.now().isoformat(),
    })
    return {"message": "感谢您的反馈!", "feedback_id": str(uuid.uuid4())}


# --- WebSocket 实时对话 ---
class ConnectionManager:
    """WebSocket连接管理器"""

    def __init__(self):
        self.active_connections: Dict[str, WebSocket] = {}

    async def connect(self, session_id: str, websocket: WebSocket):
        await websocket.accept()
        self.active_connections[session_id] = websocket

    def disconnect(self, session_id: str):
        self.active_connections.pop(session_id, None)

    async def send_message(self, session_id: str, message: Dict):
        ws = self.active_connections.get(session_id)
        if ws:
            await ws.send_json(message)


ws_manager = ConnectionManager()


@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    """WebSocket实时对话端点"""
    session_id = str(uuid.uuid4())
    await ws_manager.connect(session_id, websocket)
    chat_service.get_or_create_session(session_id)

    # 发送欢迎消息
    await ws_manager.send_message(session_id, {
        "type": "connected",
        "session_id": session_id,
        "message": "连接成功!我是智能客服助手,请问有什么可以帮您?",
    })

    try:
        while True:
            data = await websocket.receive_json()
            user_message = data.get("message", "")

            if not user_message:
                continue

            # 处理消息
            result = chat_service.process_message(session_id, user_message)

            # 返回回复
            await ws_manager.send_message(session_id, {
                "type": "reply",
                "session_id": session_id,
                "reply": result["reply"],
                "intent": result["intent"],
                "sources": result["sources"],
                "need_human": result["need_human"],
            })

    except WebSocketDisconnect:
        ws_manager.disconnect(session_id)
        print(f"[WebSocket断开] session={session_id}")


# ============================================================
# 启动入口
# ============================================================
if __name__ == "__main__":
    import uvicorn

    print("=" * 60)
    print("  智能客服系统 - FastAPI 后端")
    print("  API文档: http://localhost:8000/docs")
    print("  健康检查: http://localhost:8000/api/v1/health")
    print("=" * 60)

    uvicorn.run(
        "01_chatbot:app",  # 或使用 app 直接传入
        host="0.0.0.0",
        port=8000,
        reload=True,
        log_level="info",
    )

前端对话界面

前端架构

+=======================================================================+
|                        前端界面架构                                     |
+=======================================================================+
|                                                                       |
|  +--------------------------------------------------------------+    |
|  |                      对话界面布局                              |    |
|  +--------------------------------------------------------------+    |
|  |  +------------------+  +----------------------------------+  |    |
|  |  |                  |  |          对话区域                 |  |    |
|  |  |  侧边栏          |  |  +----------------------------+  |  |    |
|  |  |  +------------+  |  |  | [系统] 您好,有什么帮您?   |  |  |    |
|  |  |  | 会话列表   |  |  |  +----------------------------+  |  |    |
|  |  |  |            |  |  |  +----------------------------+  |  |    |
|  |  |  | > 会话1    |  |  |  | [用户] 查一下我的订单       |  |  |    |
|  |  |  |   会话2    |  |  |  +----------------------------+  |  |    |
|  |  |  |   会话3    |  |  |  +----------------------------+  |  |    |
|  |  |  +------------+  |  |  | [助手] 好的,订单信息...    |  |  |    |
|  |  |  +------------+  |  |  |  [来源: FAQ.md]             |  |  |    |
|  |  |  | 新建对话   |  |  |  +----------------------------+  |  |    |
|  |  |  +------------+  |  |                                  |  |    |
|  |  +------------------+  |  +----------------------------+  |  |    |
|  |                        |  | 输入框        [发送] [语音] |  |  |    |
|  |                        |  +----------------------------+  |  |    |
|  |                        +----------------------------------+  |    |
|  +--------------------------------------------------------------+    |
|                                                                       |
+=======================================================================+

HTML + JavaScript 前端

python
"""
前端对话界面 - 使用 FastAPI 内置模板引擎提供简单的聊天界面
实际生产中建议使用 React/Vue 独立前端项目
"""

# 以下为嵌入式HTML模板,通过FastAPI的Jinja2渲染
CHAT_HTML_TEMPLATE = """
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>智能客服系统</title>
    <style>
        * { margin: 0; padding: 0; box-sizing: border-box; }
        body {
            font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif;
            background: #f5f5f5;
            height: 100vh;
            display: flex;
            justify-content: center;
            align-items: center;
        }
        .chat-container {
            width: 480px; height: 680px;
            background: white; border-radius: 16px;
            box-shadow: 0 4px 24px rgba(0,0,0,0.1);
            display: flex; flex-direction: column;
            overflow: hidden;
        }
        .chat-header {
            background: linear-gradient(135deg, #667eea, #764ba2);
            color: white; padding: 16px 20px;
            font-size: 16px; font-weight: 600;
        }
        .chat-header small { opacity: 0.8; font-weight: 400; }
        .chat-messages {
            flex: 1; overflow-y: auto; padding: 16px;
            display: flex; flex-direction: column; gap: 12px;
        }
        .message { max-width: 80%; padding: 10px 14px; border-radius: 12px;
                    line-height: 1.6; font-size: 14px; word-break: break-word; }
        .message.user { align-self: flex-end; background: #667eea; color: white; }
        .message.assistant { align-self: flex-start; background: #f0f0f0; color: #333; }
        .message .source { font-size: 12px; color: #888; margin-top: 6px;
                           border-top: 1px solid #ddd; padding-top: 4px; }
        .chat-input {
            padding: 12px 16px; border-top: 1px solid #eee;
            display: flex; gap: 8px;
        }
        .chat-input input {
            flex: 1; padding: 10px 14px; border: 1px solid #ddd;
            border-radius: 8px; font-size: 14px; outline: none;
        }
        .chat-input input:focus { border-color: #667eea; }
        .chat-input button {
            padding: 10px 20px; background: #667eea; color: white;
            border: none; border-radius: 8px; cursor: pointer; font-size: 14px;
        }
        .chat-input button:hover { background: #5a6fd6; }
        .typing { color: #999; font-style: italic; font-size: 13px; }
    </style>
</head>
<body>
    <div class="chat-container">
        <div class="chat-header">
            智能客服助手
            <br><small>在线 | 基于RAG+Agent技术</small>
        </div>
        <div class="chat-messages" id="messages">
            <div class="message assistant">
                您好!我是智能客服助手,可以帮您:<br>
                - 查询订单状态<br>
                - 申请退款退货<br>
                - 解答产品问题<br>
                请问有什么可以帮您?
            </div>
        </div>
        <div class="chat-input">
            <input type="text" id="userInput" placeholder="请输入您的问题..."
                   onkeypress="if(event.key==='Enter') sendMessage()">
            <button onclick="sendMessage()">发送</button>
        </div>
    </div>

    <script>
        let sessionId = null;
        const API_BASE = '/api/v1';

        async function sendMessage() {
            const input = document.getElementById('userInput');
            const message = input.value.trim();
            if (!message) return;

            // 显示用户消息
            appendMessage('user', message);
            input.value = '';

            // 显示"正在输入"
            const typingEl = showTyping();

            try {
                const response = await fetch(`${API_BASE}/chat`, {
                    method: 'POST',
                    headers: { 'Content-Type': 'application/json' },
                    body: JSON.stringify({
                        session_id: sessionId,
                        message: message,
                        user_id: 'web-user'
                    })
                });
                const data = await response.json();
                sessionId = data.session_id;

                // 移除"正在输入"
                typingEl.remove();

                // 显示回复 (先做HTML转义,避免服务端返回的文本被当作HTML执行造成XSS)
                const esc = s => String(s).replace(/&/g, '&amp;')
                    .replace(/</g, '&lt;').replace(/>/g, '&gt;');
                let replyHtml = esc(data.reply);
                if (data.sources && data.sources.length > 0) {
                    replyHtml += '<div class="source">来源: ' +
                        data.sources.map(s => esc(s.doc_title)).join(', ') + '</div>';
                }
                appendMessage('assistant', replyHtml, true);

                if (data.need_human) {
                    appendMessage('assistant',
                        '正在为您转接人工客服,请稍候...', true);
                }
            } catch (error) {
                typingEl.remove();
                appendMessage('assistant', '抱歉,网络连接异常,请稍后重试。', true);
            }
        }

        function appendMessage(role, content, isHtml = false) {
            const div = document.createElement('div');
            div.className = `message ${role}`;
            if (isHtml) div.innerHTML = content;
            else div.textContent = content;
            document.getElementById('messages').appendChild(div);
            div.scrollIntoView({ behavior: 'smooth' });
        }

        function showTyping() {
            const div = document.createElement('div');
            div.className = 'typing';
            div.textContent = '客服助手正在输入...';
            document.getElementById('messages').appendChild(div);
            div.scrollIntoView({ behavior: 'smooth' });
            return div;
        }
    </script>
</body>
</html>
"""


def setup_frontend_route(app):
    """在FastAPI应用上注册前端路由"""
    from fastapi.responses import HTMLResponse

    @app.get("/", response_class=HTMLResponse)
    async def chat_page():
        return CHAT_HTML_TEMPLATE

    print("[前端] 已注册聊天页面路由: GET /")


# 使用示例:
# setup_frontend_route(app)   # 在 FastAPI app 上注册

部署与监控

部署架构

+=======================================================================+
|                         生产部署架构                                    |
+=======================================================================+
|                                                                       |
|                          互联网用户                                    |
|                              |                                        |
|                              v                                        |
|                     +--------+--------+                               |
|                     |   CDN / 域名    |                               |
|                     +--------+--------+                               |
|                              |                                        |
|                              v                                        |
|                     +--------+--------+                               |
|                     |     Nginx       |   SSL终止                     |
|                     |  反向代理+负载均衡|   静态文件                    |
|                     +--------+--------+                               |
|                              |                                        |
|              +---------------+---------------+                        |
|              |                               |                        |
|              v                               v                        |
|     +--------+--------+            +--------+--------+               |
|     |  FastAPI 实例1   |            |  FastAPI 实例2  |               |
|     |  (uvicorn:8001)  |            |  (uvicorn:8002) |               |
|     +--------+--------+            +--------+--------+               |
|              |                               |                        |
|              +---------------+---------------+                        |
|                              |                                        |
|         +--------------------+--------------------+                   |
|         |                    |                    |                   |
|         v                    v                    v                   |
|  +------+------+    +-------+------+    +--------+-----+            |
|  |   Redis     |    | PostgreSQL   |    |  ChromaDB    |            |
|  | (会话/缓存) |    | (业务数据)   |    | (向量存储)   |            |
|  +-------------+    +--------------+    +--------------+            |
|                                                                       |
+=======================================================================+

Docker Compose 配置

python
"""
Docker Compose 配置生成器
生成用于部署智能客服系统的 docker-compose.yml
"""

DOCKER_COMPOSE_YAML = """
# docker-compose.yml - 智能客服系统
version: '3.8'

services:
  # FastAPI 后端服务
  chatbot-api:
    build:
      context: .
      dockerfile: Dockerfile
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - OPENAI_BASE_URL=${OPENAI_BASE_URL:-https://api.openai.com/v1}
      - REDIS_HOST=redis
      - REDIS_PORT=6379
      - DB_URL=postgresql://chatbot:password@postgres:5432/chatbot_db
    depends_on:
      - redis
      - postgres
    volumes:
      - chroma_data:/app/data/chroma_db
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/api/v1/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  # Redis - 会话存储和缓存
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes
    restart: unless-stopped

  # PostgreSQL - 业务数据
  postgres:
    image: postgres:15-alpine
    environment:
      POSTGRES_DB: chatbot_db
      POSTGRES_USER: chatbot
      POSTGRES_PASSWORD: password  # 演示用弱口令,生产环境务必替换,并同步修改上面的 DB_URL
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
    restart: unless-stopped

  # Nginx - 反向代理
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/conf.d/default.conf
    depends_on:
      - chatbot-api
    restart: unless-stopped

volumes:
  chroma_data:
  redis_data:
  postgres_data:
"""

DOCKERFILE_CONTENT = """
# Dockerfile - 智能客服系统
FROM python:3.11-slim

WORKDIR /app

# 安装系统依赖
RUN apt-get update && apt-get install -y --no-install-recommends \\
    curl gcc && \\
    rm -rf /var/lib/apt/lists/*

# 安装Python依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制代码
COPY . .

# 创建数据目录
RUN mkdir -p /app/data/chroma_db /app/logs

EXPOSE 8000

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "2"]
"""

NGINX_CONF = """
# nginx.conf
upstream chatbot_api {
    server chatbot-api:8000;
}

server {
    listen 80;
    server_name localhost;

    # API 代理
    location /api/ {
        proxy_pass http://chatbot_api;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    }

    # WebSocket 代理
    location /ws/ {
        proxy_pass http://chatbot_api;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
    }

    # 前端页面
    location / {
        proxy_pass http://chatbot_api;
    }
}
"""


def generate_deployment_files(output_dir: str = "."):
    """生成所有部署配置文件"""
    import os
    os.makedirs(output_dir, exist_ok=True)

    files = {
        "docker-compose.yml": DOCKER_COMPOSE_YAML.strip(),
        "Dockerfile": DOCKERFILE_CONTENT.strip(),
        "nginx.conf": NGINX_CONF.strip(),
    }

    for filename, content in files.items():
        filepath = os.path.join(output_dir, filename)
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(content)
        print(f"  [生成] {filepath}")

    print(f"\n[部署文件就绪] 运行命令: docker compose up -d")


if __name__ == "__main__":
    print("=== 生成部署配置 ===")
    generate_deployment_files("./deploy")
    print()
    print("=== 部署步骤 ===")
    print("1. 设置环境变量: export OPENAI_API_KEY=your-key")
    print("2. 构建并启动:   docker compose up -d --build")
    print("3. 查看日志:     docker compose logs -f chatbot-api")
    print("4. 健康检查:     curl http://localhost/api/v1/health")
    print("5. 打开界面:     浏览器访问 http://localhost")

监控与日志

python
"""
监控与日志系统 - 对话质量评估和系统指标监控
"""
import time
import json
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from collections import defaultdict


@dataclass
class MetricPoint:
    """指标数据点"""
    name: str
    value: float
    timestamp: str = ""
    labels: Dict[str, str] = field(default_factory=dict)

    def __post_init__(self):
        if not self.timestamp:
            self.timestamp = datetime.now().isoformat()


class ChatbotMetrics:
    """
    客服系统监控指标收集器

    收集的指标:
    - 请求量/QPS
    - 响应延迟
    - 意图分布
    - 工具调用成功率
    - 用户满意度
    - 转人工率
    """

    def __init__(self):
        self.request_count = 0
        self.total_latency = 0.0
        self.intent_counter: Dict[str, int] = defaultdict(int)
        self.tool_call_counter: Dict[str, int] = defaultdict(int)
        self.tool_error_counter: Dict[str, int] = defaultdict(int)
        self.feedback_scores: List[int] = []
        self.human_transfer_count = 0
        self.start_time = time.time()

    def record_request(self, latency_ms: float, intent: str, tool_calls: List[str] = None, need_human: bool = False):
        """记录一次请求"""
        self.request_count += 1
        self.total_latency += latency_ms
        self.intent_counter[intent] += 1
        if need_human:
            self.human_transfer_count += 1
        if tool_calls:
            for tool in tool_calls:
                self.tool_call_counter[tool] += 1

    def record_tool_error(self, tool_name: str):
        """记录工具调用错误"""
        self.tool_error_counter[tool_name] += 1

    def record_feedback(self, score: int):
        """记录用户评分"""
        self.feedback_scores.append(score)

    def get_summary(self) -> Dict[str, Any]:
        """获取监控摘要"""
        uptime = time.time() - self.start_time
        avg_latency = (self.total_latency / self.request_count) if self.request_count > 0 else 0
        avg_score = (sum(self.feedback_scores) / len(self.feedback_scores)) if self.feedback_scores else 0
        qps = self.request_count / uptime if uptime > 0 else 0

        return {
            "uptime_seconds": round(uptime, 1),
            "total_requests": self.request_count,
            "qps": round(qps, 2),
            "avg_latency_ms": round(avg_latency, 1),
            "intent_distribution": dict(self.intent_counter),
            "tool_calls": dict(self.tool_call_counter),
            "tool_errors": dict(self.tool_error_counter),
            "human_transfer_rate": round(
                self.human_transfer_count / max(self.request_count, 1) * 100, 1
            ),
            "avg_feedback_score": round(avg_score, 2),
            "total_feedbacks": len(self.feedback_scores),
        }

    def print_dashboard(self):
        """打印监控面板 (中文字符在终端占两列,故不强行对齐右侧边框)"""
        s = self.get_summary()
        line = "=" * 50
        print(f"\n{line}")
        print("  智能客服系统 - 监控面板")
        print(line)
        print(f"  运行时间:   {s['uptime_seconds']:.0f} s")
        print(f"  总请求数:   {s['total_requests']}")
        print(f"  QPS:        {s['qps']:.2f}")
        print(f"  平均延迟:   {s['avg_latency_ms']:.1f} ms")
        print(f"  转人工率:   {s['human_transfer_rate']:.1f}%")
        print(f"  平均满意度: {s['avg_feedback_score']:.2f} / 5.00")
        print("-" * 50)
        print("  意图分布:")
        for intent, count in s["intent_distribution"].items():
            pct = count / max(s["total_requests"], 1) * 100
            bar = "#" * int(pct / 5)
            print(f"    {intent:<12s} {count:>4d} ({pct:>5.1f}%) {bar}")
        print(line)


# ============================================================
# 使用示例
# ============================================================
if __name__ == "__main__":
    metrics = ChatbotMetrics()

    # 模拟一些请求数据
    import random
    intents = ["faq", "order_query", "refund", "chitchat", "complaint"]
    for i in range(100):
        intent = random.choice(intents)
        latency = random.uniform(100, 800)
        tools = ["query_order"] if intent == "order_query" else []
        need_human = intent == "complaint" and random.random() > 0.5
        metrics.record_request(latency, intent, tools, need_human)

    # 模拟反馈
    for _ in range(30):
        metrics.record_feedback(random.choice([3, 4, 4, 5, 5, 5]))

    # 打印面板
    metrics.print_dashboard()

总结

本教程完整实现了一个基于 RAG + Agent 的智能客服系统,涵盖以下核心内容:

  1. 项目概述: 系统整体架构设计、技术栈选择、核心功能规划
  2. 系统架构: 分层架构(接入层/业务层/能力层/数据层)、请求处理流程、数据模型定义
  3. RAG知识库: 文档加载/分块/向量化、ChromaDB语义检索、RAG问答链
  4. 多轮对话: Redis会话管理、滑动窗口策略、上下文维护
  5. Agent工具: OpenAI Function Calling、工具定义/注册/执行、安全确认机制
  6. FastAPI后端: REST API + SSE流式 + WebSocket实时对话
  7. 前端界面: 嵌入式HTML/CSS/JS聊天界面
  8. 部署监控: Docker Compose部署、Nginx反向代理、指标收集与监控面板

最佳实践

  1. 知识库管理: 定期更新知识库内容,监控检索命中率,对低质量回答进行人工标注
  2. Prompt工程: 系统提示词要明确角色、规则和限制,用few-shot示例提升回答质量
  3. 安全防护: 敏感操作二次确认,用户身份验证,输入内容过滤,防止Prompt注入
  4. 性能优化: 向量检索结果缓存,LLM响应流式输出,异步处理非关键任务
  5. 持续改进: 收集用户反馈,分析Bad Case,定期优化Prompt和检索策略
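
针对第3条中的输入过滤与Prompt注入防护,下面给出一个朴素的关键词预过滤示例(规则列表为演示假设,真实系统需结合模型侧检测与系统提示词加固):

```python
# 常见注入话术的朴素黑名单 (演示规则,需按业务持续扩充)
INJECTION_PATTERNS = [
    "忽略以上指令", "忽略之前的指令", "你现在是", "system prompt",
    "ignore previous instructions", "重新定义你的角色",
]


def prefilter_input(message: str) -> tuple:
    """注入预过滤: 命中可疑模式时返回 (False, 原因),否则 (True, "")"""
    lowered = message.lower()
    for pattern in INJECTION_PATTERNS:
        if pattern.lower() in lowered:
            return False, f"检测到可疑输入模式: {pattern}"
    return True, ""
```

这类过滤只能拦住最直白的注入,应与最小权限的工具设计、敏感操作二次确认配合使用。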

创建时间: 2024-01-01 最后更新: 2024-01-01


基于 MIT 许可发布