AI系统监控完整教程
目录
监控概述
为什么需要AI系统监控
AI系统与传统软件不同,模型性能会随时间衰退(数据漂移),推理延迟受负载影响大, GPU资源昂贵且需要精细管理。完善的监控体系是AI系统可靠运行的基础。
┌─────────────────────────────────────────────────────────────────────────────┐
│ AI系统监控全景架构 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ 数据采集层 │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 应用指标 │ │ 系统指标 │ │ GPU指标 │ │ 业务指标 │ │ 模型指标 │ │
│ │ │ │ │ │ │ │ │ │ │ │
│ │ QPS │ │ CPU │ │ 利用率 │ │ 转化率 │ │ 准确率 │ │
│ │ 延迟 │ │ 内存 │ │ 显存 │ │ 用户满意 │ │ 漂移度 │ │
│ │ 错误率 │ │ 磁盘 │ │ 温度 │ │ 收入 │ │ 公平性 │ │
│ │ Token数 │ │ 网络 │ │ 功耗 │ │ 活跃度 │ │ 幻觉率 │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ └────────────┴────────────┴────────────┴────────────┘ │
│ │ │
│ ▼ │
│ 存储与处理层 │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Prometheus │ │ Elasticsearch│ │ ClickHouse │ │ │
│ │ │ 时序指标 │ │ 日志存储 │ │ 分析查询 │ │ │
│ │ │ 15s采集间隔 │ │ 全文检索 │ │ 长期存储 │ │ │
│ │ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │ │
│ │ │ │ │ │ │
│ └─────────┼─────────────────┼──────────────────┼──────────────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ 展示与告警层 │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Grafana │ │ Alertmanager│ │ Kibana │ │ │
│ │ │ 仪表盘 │ │ 告警路由 │ │ 日志查询 │ │ │
│ │ │ 实时可视化 │ │ 去重/分组 │ │ 日志分析 │ │ │
│ │ └──────────────┘ └──────┬───────┘ └──────────────┘ │ │
│ │ │ │ │
│ │ ┌──────▼───────┐ │ │
│ │ │ 通知渠道 │ │ │
│ │ │ Slack/钉钉 │ │ │
│ │ │ Email/短信 │ │ │
│ │ │ PagerDuty │ │ │
│ │ └──────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘

AI监控指标体系
四层指标模型
┌─────────────────────────────────────────────────────────────────────────────┐
│ AI系统四层指标模型 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Layer 1: 基础设施指标 (Infrastructure) │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ CPU使用率 | 内存使用 | 磁盘IO | 网络带宽 | GPU利用率 | GPU显存 │ │
│ │ 容器状态 | Pod重启次数 | 节点健康度 │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ Layer 2: 应用性能指标 (Application) │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ 请求QPS | 延迟(P50/P95/P99) | 错误率 | 吞吐量(tokens/s) │ │
│ │ TTFT(首Token时间) | 并发连接数 | 请求队列深度 │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ Layer 3: 模型质量指标 (Model Quality) │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ 准确率 | F1分数 | 数据漂移(PSI/KL散度) | 幻觉率 | 拒答率 │ │
│ │ 输出多样性 | 上下文利用率 | 指令遵循度 │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ Layer 4: 业务效果指标 (Business) │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ 用户满意度 | 任务完成率 | 对话轮次 | 人工介入率 | Token成本 │ │
│ │ 收入影响 | 用户留存 | 转化率 │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘

指标体系代码实现
python
"""
AI系统监控指标体系
包含: 指标定义、采集、聚合、健康评分
"""
import time
import random
import statistics
from typing import Dict, List, Optional
from dataclasses import dataclass, field
from enum import Enum
from collections import defaultdict
class MetricType(Enum):
    """Prometheus-style metric kinds supported by the registry."""
    COUNTER = "counter"      # cumulative value (monotonically increasing)
    GAUGE = "gauge"          # instantaneous value (may rise or fall)
    HISTOGRAM = "histogram"  # distribution (percentiles)
    SUMMARY = "summary"      # summary statistics
class MetricLevel(Enum):
    """The four layers of the AI-system metric model."""
    INFRASTRUCTURE = "基础设施"  # infrastructure (GPU/host resources)
    APPLICATION = "应用性能"     # application performance (latency, throughput)
    MODEL_QUALITY = "模型质量"   # model quality (accuracy, drift, hallucination)
    BUSINESS = "业务效果"        # business outcomes (satisfaction, cost)
@dataclass
class MetricDefinition:
    """Static definition of one monitored metric."""
    name: str
    type: MetricType
    level: MetricLevel
    description: str
    unit: str = ""
    warning_threshold: float = 0   # 0 means "no warning threshold" (skipped in checks)
    critical_threshold: float = 0  # 0 means "no critical threshold" (skipped in checks)
    labels: List[str] = field(default_factory=list)
@dataclass
class MetricSample:
    """One timestamped sample of a metric."""
    name: str
    value: float
    timestamp: float  # epoch seconds (time.time())
    labels: Dict[str, str] = field(default_factory=dict)
class MetricsRegistry:
    """
    Central metric registration and collection hub.

    Holds the standard four-layer AI metric definitions, accepts
    samples, answers current-value / percentile queries, and derives an
    overall health verdict from per-metric thresholds.
    """

    def __init__(self):
        # metric name -> static definition
        self.definitions: Dict[str, MetricDefinition] = {}
        # metric name -> rolling sample window (pruned to 1 hour on record())
        self.samples: Dict[str, List[MetricSample]] = defaultdict(list)
        # "name:sorted-label-items" -> cumulative counter total
        self.counters: Dict[str, float] = defaultdict(float)
        self._register_ai_metrics()

    def _register_ai_metrics(self):
        """Register the standard AI-system metric definitions."""
        metrics = [
            # Infrastructure layer
            MetricDefinition(
                "gpu_utilization", MetricType.GAUGE, MetricLevel.INFRASTRUCTURE,
                "GPU计算利用率", "%", warning_threshold=85, critical_threshold=95,
            ),
            MetricDefinition(
                "gpu_memory_used", MetricType.GAUGE, MetricLevel.INFRASTRUCTURE,
                "GPU显存使用量", "GB", warning_threshold=70, critical_threshold=90,
            ),
            MetricDefinition(
                "gpu_temperature", MetricType.GAUGE, MetricLevel.INFRASTRUCTURE,
                "GPU温度", "C", warning_threshold=80, critical_threshold=90,
            ),
            # Application performance layer
            MetricDefinition(
                "request_latency_ms", MetricType.HISTOGRAM, MetricLevel.APPLICATION,
                "请求延迟", "ms", warning_threshold=2000, critical_threshold=5000,
            ),
            MetricDefinition(
                "ttft_ms", MetricType.HISTOGRAM, MetricLevel.APPLICATION,
                "首Token延迟", "ms", warning_threshold=500, critical_threshold=1000,
            ),
            MetricDefinition(
                "requests_total", MetricType.COUNTER, MetricLevel.APPLICATION,
                "请求总数", "",
            ),
            MetricDefinition(
                "errors_total", MetricType.COUNTER, MetricLevel.APPLICATION,
                "错误总数", "",
            ),
            MetricDefinition(
                "tokens_per_second", MetricType.GAUGE, MetricLevel.APPLICATION,
                "生成吞吐量", "tokens/s",
            ),
            # Model quality layer
            MetricDefinition(
                "model_accuracy", MetricType.GAUGE, MetricLevel.MODEL_QUALITY,
                "模型准确率", "%", warning_threshold=85, critical_threshold=80,
            ),
            MetricDefinition(
                "data_drift_score", MetricType.GAUGE, MetricLevel.MODEL_QUALITY,
                "数据漂移评分(PSI)", "", warning_threshold=0.1, critical_threshold=0.2,
            ),
            MetricDefinition(
                "hallucination_rate", MetricType.GAUGE, MetricLevel.MODEL_QUALITY,
                "幻觉率", "%", warning_threshold=10, critical_threshold=20,
            ),
            # Business layer
            MetricDefinition(
                "user_satisfaction", MetricType.GAUGE, MetricLevel.BUSINESS,
                "用户满意度", "%", warning_threshold=80, critical_threshold=70,
            ),
            MetricDefinition(
                "task_completion_rate", MetricType.GAUGE, MetricLevel.BUSINESS,
                "任务完成率", "%", warning_threshold=85, critical_threshold=75,
            ),
            MetricDefinition(
                "cost_per_request", MetricType.GAUGE, MetricLevel.BUSINESS,
                "每请求成本", "USD",
            ),
        ]
        for m in metrics:
            self.definitions[m.name] = m

    def record(self, name: str, value: float,
               labels: Dict[str, str] = None):
        """Record one sample; samples for unknown metric names are dropped."""
        if name not in self.definitions:
            return
        sample = MetricSample(
            name=name, value=value,
            timestamp=time.time(),
            labels=labels or {},
        )
        self.samples[name].append(sample)
        # Counters additionally accumulate a running total per label set.
        # BUGFIX: key on *sorted* label items so logically identical label
        # dicts written in different insertion orders hit the same series
        # (the previous str(labels) key was insertion-order sensitive).
        if self.definitions[name].type == MetricType.COUNTER:
            key = f"{name}:{sorted((labels or {}).items())}"
            self.counters[key] += value
        # Keep only the most recent hour of samples.
        cutoff = time.time() - 3600
        self.samples[name] = [
            s for s in self.samples[name] if s.timestamp > cutoff
        ]

    def get_current(self, name: str) -> Optional[float]:
        """Return the most recent sample value, or None if there is none."""
        samples = self.samples.get(name, [])
        if not samples:
            return None
        return samples[-1].value

    def get_percentile(self, name: str, percentile: float,
                       minutes: int = 5) -> Optional[float]:
        """Return the given percentile over the last *minutes* minutes."""
        cutoff = time.time() - minutes * 60
        values = [
            s.value for s in self.samples.get(name, [])
            if s.timestamp > cutoff
        ]
        if not values:
            return None
        sorted_vals = sorted(values)
        idx = int(len(sorted_vals) * percentile / 100)
        return sorted_vals[min(idx, len(sorted_vals) - 1)]

    def check_health(self) -> Dict:
        """
        Evaluate every metric with data against its thresholds.

        Returns {"status": "healthy"|"warning"|"critical", "checks": [...]},
        where "checks" lists only out-of-range metrics. A threshold of 0
        means "not configured" and is skipped.
        """
        health = {"status": "healthy", "checks": []}
        for name, defn in self.definitions.items():
            current = self.get_current(name)
            if current is None:
                continue
            status = "ok"
            # Infrastructure/application metrics: larger is worse.
            if defn.level in (MetricLevel.INFRASTRUCTURE, MetricLevel.APPLICATION):
                if defn.critical_threshold and current > defn.critical_threshold:
                    status = "critical"
                elif defn.warning_threshold and current > defn.warning_threshold:
                    status = "warning"
            # Quality/business metrics: larger is better, except the
            # explicitly listed "larger is worse" ones.
            elif defn.level in (MetricLevel.MODEL_QUALITY, MetricLevel.BUSINESS):
                if defn.name in ("data_drift_score", "hallucination_rate",
                                 "cost_per_request"):
                    if defn.critical_threshold and current > defn.critical_threshold:
                        status = "critical"
                    elif defn.warning_threshold and current > defn.warning_threshold:
                        status = "warning"
                else:
                    if defn.critical_threshold and current < defn.critical_threshold:
                        status = "critical"
                    elif defn.warning_threshold and current < defn.warning_threshold:
                        status = "warning"
            if status != "ok":
                health["checks"].append({
                    "metric": name,
                    "value": current,
                    "status": status,
                    "description": defn.description,
                })
                # Overall status is the worst individual status seen.
                if status == "critical":
                    health["status"] = "critical"
                elif status == "warning" and health["status"] != "critical":
                    health["status"] = "warning"
        return health

    def get_dashboard_data(self) -> str:
        """Render a plain-text dashboard of current values plus health."""
        lines = [
            f"{'=' * 70}",
            f"AI系统监控仪表盘",
            f"{'=' * 70}",
        ]
        for level in MetricLevel:
            lines.append(f"\n[{level.value}]")
            lines.append(f"{'─' * 50}")
            for name, defn in self.definitions.items():
                if defn.level != level:
                    continue
                current = self.get_current(name)
                if current is None:
                    continue
                p95 = self.get_percentile(name, 95, minutes=5)
                unit = f" {defn.unit}" if defn.unit else ""
                p95_str = f" (P95: {p95:.1f}{unit})" if p95 is not None else ""
                lines.append(
                    f" {defn.description:<20} {current:.2f}{unit}{p95_str}"
                )
        # Health summary footer.
        health = self.check_health()
        lines.append(f"\n{'=' * 70}")
        lines.append(f"系统健康状态: {health['status'].upper()}")
        if health["checks"]:
            for check in health["checks"]:
                lines.append(
                    f" [{check['status'].upper()}] {check['description']}: "
                    f"{check['value']:.2f}"
                )
        return "\n".join(lines)
def demo_metrics():
    """Drive the registry with simulated samples and print the dashboard."""
    registry = MetricsRegistry()
    random.seed(42)
    # Value ranges per metric, in the same order as the original draws so
    # the seeded random sequence produces identical samples.
    ranges = {
        "gpu_utilization": (60, 95),
        "gpu_memory_used": (50, 75),
        "gpu_temperature": (65, 85),
        "request_latency_ms": (200, 3000),
        "ttft_ms": (50, 600),
        "tokens_per_second": (80, 150),
        "model_accuracy": (82, 95),
        "data_drift_score": (0.01, 0.15),
        "hallucination_rate": (3, 15),
        "user_satisfaction": (75, 95),
        "task_completion_rate": (80, 96),
        "cost_per_request": (0.001, 0.01),
    }
    # Feed 30 sampling points into every metric.
    for _ in range(30):
        for metric_name, (low, high) in ranges.items():
            registry.record(metric_name, random.uniform(low, high))
    print(registry.get_dashboard_data())
if __name__ == "__main__":
    demo_metrics()

Prometheus指标采集
Prometheus采集架构
┌─────────────────────────────────────────────────────────────────────────────┐
│ Prometheus 指标采集架构 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ 应用暴露指标端点 Prometheus Server │
│ ┌──────────────────┐ ┌────────────────────────┐ │
│ │ vLLM Service │ /metrics │ │ │
│ │ :8000/metrics │<─────── pull ────│ Scrape Config │ │
│ └──────────────────┘ │ scrape_interval: 15s │ │
│ ┌──────────────────┐ │ │ │
│ │ FastAPI App │ /metrics │ ┌──────────────────┐ │ │
│ │ :8080/metrics │<─────── pull ────│ │ TSDB │ │ │
│ └──────────────────┘ │ │ 时序数据库 │ │ │
│ ┌──────────────────┐ │ │ 15天数据保留 │ │ │
│ │ Node Exporter │ /metrics │ └──────────────────┘ │ │
│ │ :9100/metrics │<─────── pull ────│ │ │
│ └──────────────────┘ │ ┌──────────────────┐ │ │
│ ┌──────────────────┐ │ │ Alert Rules │ │ │
│ │ NVIDIA DCGM │ /metrics │ │ 告警规则引擎 │ │ │
│ │ :9400/metrics │<─────── pull ────│ └────────┬─────────┘ │ │
│ └──────────────────┘ └───────────┼────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────┐ │
│ │ Alertmanager │ │
│ │ :9093 │ │
│ │ 告警路由/去重/静默 │ │
│ └────────────────────────┘ │
│ │
│ PromQL查询示例: │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ # 请求QPS │ │
│ │ rate(llm_requests_total[5m]) │ │
│ │ │ │
│ │ # P95延迟 │ │
│ │ histogram_quantile(0.95, rate(llm_latency_bucket[5m])) │ │
│ │ │ │
│ │ # 错误率 │ │
│ │ rate(llm_errors_total[5m]) / rate(llm_requests_total[5m]) * 100 │ │
│ │ │ │
│ │ # GPU显存使用率 │ │
│ │ gpu_memory_used_bytes / gpu_memory_total_bytes * 100 │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘

Prometheus指标代码
python
"""
Prometheus风格的指标暴露
支持: Counter、Gauge、Histogram、Summary
可直接与Prometheus集成
"""
import time
import math
from typing import Dict, List, Optional, Tuple
from collections import defaultdict
from dataclasses import dataclass, field
class PrometheusCounter:
    """Prometheus Counter: monotonically increasing, optionally labelled."""

    def __init__(self, name: str, description: str, labels: List[str] = None):
        self.name = name
        self.description = description
        self.label_names = labels or []
        self._values: Dict[Tuple, float] = defaultdict(float)

    def _series_key(self, labels: Dict[str, str] = None) -> Tuple:
        # Sorted tuple so equal label sets always map to the same series.
        return tuple(sorted((labels or {}).items()))

    def inc(self, value: float = 1, labels: Dict[str, str] = None):
        """Increase the counter for the given label set."""
        self._values[self._series_key(labels)] += value

    def get(self, labels: Dict[str, str] = None) -> float:
        """Current total for the given label set (0 if never incremented)."""
        return self._values[self._series_key(labels)]

    def to_prometheus(self) -> str:
        """Render the counter in Prometheus text exposition format."""
        out = [
            f"# HELP {self.name} {self.description}",
            f"# TYPE {self.name} counter",
        ]
        for label_tuple, value in self._values.items():
            rendered = ",".join(f'{k}="{v}"' for k, v in label_tuple)
            if rendered:
                out.append(f"{self.name}{{{rendered}}} {value}")
            else:
                out.append(f"{self.name} {value}")
        return "\n".join(out)
class PrometheusGauge:
    """Prometheus Gauge: an instantaneous value that may rise or fall."""

    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description
        self._value = 0.0

    def set(self, value: float):
        """Overwrite the current value."""
        self._value = value

    def inc(self, value: float = 1):
        """Raise the gauge by *value*."""
        self._value += value

    def dec(self, value: float = 1):
        """Lower the gauge by *value*."""
        self._value -= value

    def get(self) -> float:
        """Return the current value."""
        return self._value

    def to_prometheus(self) -> str:
        """Render the gauge in Prometheus text exposition format."""
        parts = [
            f"# HELP {self.name} {self.description}",
            f"# TYPE {self.name} gauge",
            f"{self.name} {self._value}",
        ]
        return "\n".join(parts)
class PrometheusHistogram:
    """
    Prometheus Histogram: counts observations into buckets.

    Buckets are stored as *non-cumulative* per-bucket counts and summed
    into cumulative `le` counts at render time, matching the Prometheus
    text exposition format (where `le` buckets are cumulative).
    """

    def __init__(self, name: str, description: str,
                 buckets: List[float] = None):
        self.name = name
        self.description = description
        # Upper bounds of the finite buckets, ascending.
        self.buckets = sorted(buckets or [
            10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000
        ])
        # Non-cumulative per-bucket counts; float("inf") holds
        # observations above the largest finite bound.
        self._bucket_counts: Dict[float, int] = {b: 0 for b in self.buckets}
        self._bucket_counts[float("inf")] = 0
        self._sum = 0.0
        self._count = 0

    def observe(self, value: float):
        """Record one observation.

        BUGFIX: each value is counted in exactly one bucket (the first
        bound it fits under). Previously it was added to every bucket
        >= value AND to +Inf, so to_prometheus()'s cumulative sums
        double-counted and `le` counts could exceed the total count.
        """
        self._sum += value
        self._count += 1
        for bound in self.buckets:
            if value <= bound:
                self._bucket_counts[bound] += 1
                break
        else:
            # Larger than every finite bound.
            self._bucket_counts[float("inf")] += 1

    def to_prometheus(self) -> str:
        """Render cumulative `le` buckets, sum and count in text format."""
        lines = [
            f"# HELP {self.name} {self.description}",
            f"# TYPE {self.name} histogram",
        ]
        cumulative = 0
        for bucket in self.buckets:
            cumulative += self._bucket_counts[bucket]
            lines.append(f'{self.name}_bucket{{le="{bucket}"}} {cumulative}')
        # The +Inf bucket always equals the total observation count.
        lines.append(f'{self.name}_bucket{{le="+Inf"}} {self._count}')
        lines.append(f"{self.name}_sum {self._sum}")
        lines.append(f"{self.name}_count {self._count}")
        return "\n".join(lines)
class LLMMetricsExporter:
    """
    Prometheus metrics exporter for an LLM service.

    Aggregates the collectors created in __init__ and renders them as
    the text served from a /metrics endpoint for Prometheus to scrape.
    """

    def __init__(self):
        # Application-level metrics.
        self.request_total = PrometheusCounter(
            "llm_requests_total", "Total LLM requests",
            labels=["model", "status"],
        )
        self.request_latency = PrometheusHistogram(
            "llm_request_latency_ms", "Request latency in milliseconds",
            buckets=[50, 100, 200, 500, 1000, 2000, 5000, 10000],
        )
        self.ttft = PrometheusHistogram(
            "llm_ttft_ms", "Time to first token in milliseconds",
            buckets=[20, 50, 100, 200, 500, 1000],
        )
        self.tokens_generated = PrometheusCounter(
            "llm_tokens_generated_total", "Total tokens generated",
        )
        self.active_requests = PrometheusGauge(
            "llm_active_requests", "Currently active requests",
        )
        # GPU metrics.
        self.gpu_utilization = PrometheusGauge(
            "llm_gpu_utilization_percent", "GPU utilization percentage",
        )
        self.gpu_memory = PrometheusGauge(
            "llm_gpu_memory_used_gb", "GPU memory used in GB",
        )
        # Model-quality metrics.
        self.model_accuracy = PrometheusGauge(
            "llm_model_accuracy_percent", "Model accuracy percentage",
        )

    def record_request(self, model: str, status: str,
                       latency_ms: float, ttft_ms: float,
                       tokens: int):
        """Record every metric touched by one completed request."""
        self.request_total.inc(labels={"model": model, "status": status})
        self.request_latency.observe(latency_ms)
        self.ttft.observe(ttft_ms)
        self.tokens_generated.inc(tokens)

    def get_metrics_text(self) -> str:
        """Render all collectors in Prometheus text exposition format."""
        collectors = (
            self.request_total,
            self.request_latency,
            self.ttft,
            self.tokens_generated,
            self.active_requests,
            self.gpu_utilization,
            self.gpu_memory,
            self.model_accuracy,
        )
        return "\n\n".join(c.to_prometheus() for c in collectors) + "\n"

    def generate_prometheus_config(self) -> str:
        """Return a prometheus.yml scrape configuration for this stack."""
        # NOTE(review): indentation of this YAML was flattened in the
        # source; reconstructed here to standard prometheus.yml nesting.
        return """# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

rule_files:
  - "alert_rules.yml"

alerting:
  alertmanagers:
    - static_configs:
        - targets: ['alertmanager:9093']

scrape_configs:
  # LLM服务指标
  - job_name: 'llm-service'
    static_configs:
      - targets: ['vllm-server:8000']
    metrics_path: /metrics
    scrape_interval: 10s

  # FastAPI应用指标
  - job_name: 'llm-api'
    static_configs:
      - targets: ['api-server:8080']

  # GPU指标 (NVIDIA DCGM Exporter)
  - job_name: 'gpu-metrics'
    static_configs:
      - targets: ['dcgm-exporter:9400']

  # 节点指标
  - job_name: 'node'
    static_configs:
      - targets: ['node-exporter:9100']
"""
def demo_prometheus():
    """Simulate traffic and dump the exporter's metrics and scrape config."""
    exporter = LLMMetricsExporter()
    import random
    random.seed(42)

    banner = "=" * 60
    print(banner)
    print("模拟100个LLM请求...")
    print(banner)

    for _ in range(100):
        # Draws happen in the same order as before: status, latency,
        # ttft, tokens — so the seeded sequence is unchanged.
        outcome = "success" if random.random() > 0.05 else "error"
        latency_ms = random.uniform(200, 3000)
        first_token_ms = random.uniform(30, 500)
        token_count = random.randint(50, 500)
        exporter.active_requests.inc()
        exporter.record_request(
            model="llama-8b", status=outcome,
            latency_ms=latency_ms, ttft_ms=first_token_ms,
            tokens=token_count,
        )
        exporter.active_requests.dec()

    # Static GPU / model-quality gauges.
    exporter.gpu_utilization.set(78.5)
    exporter.gpu_memory.set(52.3)
    exporter.model_accuracy.set(91.2)

    print("\n--- Prometheus 指标输出 ---")
    print(exporter.get_metrics_text()[:1500] + "\n... (截断)")
    print("\n--- Prometheus 配置 ---")
    print(exporter.generate_prometheus_config())
if __name__ == "__main__":
    demo_prometheus()

Grafana可视化
Grafana仪表盘配置
┌─────────────────────────────────────────────────────────────────────────────┐
│ Grafana LLM监控仪表盘布局 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Row 1: 概览 (Overview) │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐ │
│ │ 当前QPS │ │ 平均延迟 │ │ 错误率 │ │ GPU利用率 │ │
│ │ ┌───┐ │ │ ┌───┐ │ │ ┌───┐ │ │ ┌───┐ │ │
│ │ │256│ │ │ │1.2s│ │ │ │2.1%│ │ │ │78%│ │ │
│ │ └───┘ │ │ └───┘ │ │ └───┘ │ │ └───┘ │ │
│ │ Stat Panel│ │ Stat Panel│ │ Stat Panel│ │ Stat Panel│ │
│ └────────────┘ └────────────┘ └────────────┘ └────────────┘ │
│ │
│ Row 2: 延迟分析 (Latency) │
│ ┌──────────────────────────────┐ ┌──────────────────────────────┐ │
│ │ 请求延迟 P50/P95/P99 │ │ TTFT分布 │ │
│ │ ms │ │ ms │ │
│ │ 5000│ │ │ 1000│ │ │
│ │ │ ╱──P99 │ │ │ ╱──P99 │ │
│ │ 2000│ ╱──P95 │ │ 500│ ╱──P95 │ │
│ │ │ ╱──P50 │ │ │ ╱──P50 │ │
│ │ 0│╱────────────── time │ │ 0│╱────────────── time │ │
│ │ Time Series Panel │ │ Time Series Panel │ │
│ └──────────────────────────────┘ └──────────────────────────────┘ │
│ │
│ Row 3: 吞吐量与GPU (Throughput & GPU) │
│ ┌──────────────────────────────┐ ┌──────────────────────────────┐ │
│ │ Tokens/s 吞吐量 │ │ GPU利用率 & 显存 │ │
│ │ t/s │ │ % │ │
│ │ 3000│ ╱─╲ │ │ 100│ ──GPU Util │ │
│ │ │ ╱ ╲───── │ │ 80│╱────╲─── │ │
│ │ 1000│╱ ╲ │ │ 60│ GPU Mem ────── │ │
│ │ 0│────────────── time │ │ 0│────────────── time │ │
│ └──────────────────────────────┘ └──────────────────────────────┘ │
│ │
│ Row 4: 模型质量 (Model Quality) │
│ ┌──────────────────────────────┐ ┌──────────────────────────────┐ │
│ │ 模型准确率趋势 │ │ 数据漂移评分 (PSI) │ │
│ │ % │ │ │ │
│ │ 95 │────────╲ │ │ 0.2│ ╱─╲ Critical │ │
│ │ 90 │ ╲────── │ │ 0.1│────╱ ╲── Warning │ │
│ │ 85 │ ╱ threshold │ │ 0│────────────── time │ │
│ │ 0│────────────── time │ │ │ │
│ └──────────────────────────────┘ └──────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘

Grafana仪表盘代码
python
"""
Grafana仪表盘配置生成器
生成JSON格式的Grafana Dashboard配置
"""
import json
from typing import Dict, List
class GrafanaDashboardGenerator:
    """Builds Grafana dashboard JSON and a monitoring-stack compose file."""

    @staticmethod
    def generate_llm_dashboard() -> Dict:
        """Build the LLM monitoring dashboard as a Grafana JSON model."""

        def stat_panel(title, expr, unit, x):
            # Row 1 single-value panel: 6 units wide, 4 tall, y = 0.
            return {
                "type": "stat",
                "title": title,
                "gridPos": {"h": 4, "w": 6, "x": x, "y": 0},
                "targets": [{"expr": expr}],
                "fieldConfig": {"defaults": {"unit": unit}},
            }

        # Row 1: four overview stats.
        overview = [
            ("当前QPS", 'rate(llm_requests_total[5m])', "requests/s"),
            ("平均延迟", 'avg(llm_request_latency_ms)', "ms"),
            ("错误率",
             'rate(llm_requests_total{status="error"}[5m])'
             '/rate(llm_requests_total[5m])*100', "%"),
            ("GPU利用率", 'llm_gpu_utilization_percent', "%"),
        ]
        panels = [
            stat_panel(title, expr, unit, idx * 6)
            for idx, (title, expr, unit) in enumerate(overview)
        ]

        # Row 2 (y = 4): latency percentiles and TTFT.
        panels.append({
            "type": "timeseries",
            "title": "请求延迟 P50/P95/P99",
            "gridPos": {"h": 8, "w": 12, "x": 0, "y": 4},
            "targets": [
                {"expr": 'histogram_quantile(0.5, rate(llm_request_latency_ms_bucket[5m]))',
                 "legendFormat": "P50"},
                {"expr": 'histogram_quantile(0.95, rate(llm_request_latency_ms_bucket[5m]))',
                 "legendFormat": "P95"},
                {"expr": 'histogram_quantile(0.99, rate(llm_request_latency_ms_bucket[5m]))',
                 "legendFormat": "P99"},
            ],
            "fieldConfig": {"defaults": {"unit": "ms"}},
        })
        panels.append({
            "type": "timeseries",
            "title": "TTFT分布 (首Token延迟)",
            "gridPos": {"h": 8, "w": 12, "x": 12, "y": 4},
            "targets": [
                {"expr": 'histogram_quantile(0.5, rate(llm_ttft_ms_bucket[5m]))',
                 "legendFormat": "P50"},
                {"expr": 'histogram_quantile(0.95, rate(llm_ttft_ms_bucket[5m]))',
                 "legendFormat": "P95"},
            ],
            "fieldConfig": {"defaults": {"unit": "ms"}},
        })

        # Row 3 (y = 12): GPU usage and generation throughput.
        panels.append({
            "type": "timeseries",
            "title": "GPU利用率 & 显存使用",
            "gridPos": {"h": 8, "w": 12, "x": 0, "y": 12},
            "targets": [
                {"expr": "llm_gpu_utilization_percent",
                 "legendFormat": "GPU利用率"},
                {"expr": "llm_gpu_memory_used_gb / 80 * 100",
                 "legendFormat": "显存使用率"},
            ],
            "fieldConfig": {"defaults": {"unit": "percent", "max": 100}},
        })
        panels.append({
            "type": "timeseries",
            "title": "生成吞吐量 (tokens/s)",
            "gridPos": {"h": 8, "w": 12, "x": 12, "y": 12},
            "targets": [
                {"expr": 'rate(llm_tokens_generated_total[5m])',
                 "legendFormat": "tokens/s"},
            ],
        })

        return {
            "dashboard": {
                "title": "LLM Service Monitor",
                "tags": ["llm", "ai", "monitoring"],
                "timezone": "browser",
                "refresh": "10s",
                "panels": panels,
            }
        }

    @staticmethod
    def generate_docker_compose() -> str:
        """Return a docker-compose file for the monitoring stack."""
        # NOTE(review): indentation of this YAML was flattened in the
        # source; reconstructed here to standard compose nesting.
        return """# docker-compose-monitoring.yml
version: '3.8'

services:
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - ./alert_rules.yml:/etc/prometheus/alert_rules.yml
      - prometheus_data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.retention.time=15d'

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - grafana_data:/var/lib/grafana
      - ./grafana/dashboards:/etc/grafana/provisioning/dashboards
      - ./grafana/datasources:/etc/grafana/provisioning/datasources
    depends_on:
      - prometheus

  alertmanager:
    image: prom/alertmanager:latest
    ports:
      - "9093:9093"
    volumes:
      - ./alertmanager.yml:/etc/alertmanager/alertmanager.yml

  node-exporter:
    image: prom/node-exporter:latest
    ports:
      - "9100:9100"

volumes:
  prometheus_data:
  grafana_data:
"""
def demo_grafana():
    """Print the generated dashboard JSON and monitoring compose file."""
    generator = GrafanaDashboardGenerator()
    dashboard = generator.generate_llm_dashboard()
    banner = "=" * 60
    print(banner)
    print("Grafana LLM Dashboard JSON (部分):")
    print(banner)
    # Truncate the JSON dump to keep the demo output readable.
    print(json.dumps(dashboard, indent=2, ensure_ascii=False)[:2000])
    print("\n... (截断)")
    print("\n--- Docker Compose监控栈 ---")
    print(generator.generate_docker_compose())
if __name__ == "__main__":
    demo_grafana()

LLM专用监控指标
LLM质量监控
┌─────────────────────────────────────────────────────────────────────────────┐
│ LLM专用监控指标体系 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ 性能指标 质量指标 │
│ ┌──────────────────────────┐ ┌──────────────────────────┐ │
│ │ TTFT (Time To First │ │ 幻觉率 │ │
│ │ Token) │ │ - 事实性错误比例 │ │
│ │ 目标: <200ms │ │ - 阈值: <10% │ │
│ │ │ │ │ │
│ │ TPOT (Time Per Output │ │ 拒答率 │ │
│ │ Token) │ │ - 不当拒绝比例 │ │
│ │ 目标: <50ms │ │ - 阈值: <5% │ │
│ │ │ │ │ │
│ │ TPS (Tokens Per Second) │ │ 指令遵循度 │ │
│ │ 全系统吞吐量 │ │ - 格式/约束遵守率 │ │
│ │ │ │ - 阈值: >90% │ │
│ │ 并发请求数 │ │ │ │
│ │ 同时处理的请求 │ │ 输出多样性 │ │
│ │ │ │ - 重复/模板化输出率 │ │
│ └──────────────────────────┘ └──────────────────────────┘ │
│ │
│ 成本指标 安全指标 │
│ ┌──────────────────────────┐ ┌──────────────────────────┐ │
│ │ Token消耗量 │ │ 有害内容检出率 │ │
│ │ - 输入token数 │ │ - 安全过滤触发频率 │ │
│ │ - 输出token数 │ │ │ │
│ │ │ │ PII泄露检测 │ │
│ │ 每请求成本 │ │ - 个人信息泄露率 │ │
│ │ - 按模型/用户统计 │ │ │ │
│ │ │ │ 越狱检测 │ │
│ │ 缓存命中率 │ │ - 提示注入攻击频率 │ │
│ │ - 语义缓存有效性 │ │ │ │
│ └──────────────────────────┘ └──────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘

LLM质量监控代码
python
"""
LLM专用质量监控
包含: 幻觉检测、数据漂移、输出质量评估
"""
import time
import hashlib
from typing import Dict, List, Optional
from dataclasses import dataclass, field
@dataclass
class LLMRequest:
    """Record of a single LLM request/response pair."""
    request_id: str
    prompt: str
    response: str
    model: str
    input_tokens: int
    output_tokens: int
    latency_ms: float  # end-to-end latency
    ttft_ms: float     # time to first token
    timestamp: float = 0  # filled in by LLMQualityMonitor.evaluate_response()
class LLMQualityMonitor:
    """
    Monitors LLM output quality.

    Detects refusals, hedging, repetition, format compliance and
    prompt/response relevance using lightweight keyword heuristics.
    """

    def __init__(self):
        self.request_log: List["LLMRequest"] = []
        self.quality_scores: List[Dict] = []
        # Simple keyword heuristics (production systems would use a
        # dedicated detection model instead).
        self.refusal_patterns = [
            "我无法", "我不能", "作为AI", "我没有能力",
            "I cannot", "I'm unable", "As an AI",
        ]
        self.uncertainty_patterns = [
            "可能", "也许", "不确定", "我认为", "据我所知",
        ]

    def evaluate_response(self, request: "LLMRequest") -> Dict:
        """Score one response and append the result to the history."""
        request.timestamp = time.time()
        self.request_log.append(request)
        prompt, answer = request.prompt, request.response
        scores = {
            "request_id": request.request_id,
            "timestamp": request.timestamp,
            "response_length": len(answer),
            "is_refusal": self._check_refusal(answer),
            "uncertainty_score": self._check_uncertainty(answer),
            "repetition_score": self._check_repetition(answer),
            "format_compliance": self._check_format(prompt, answer),
            "response_relevance": self._check_relevance(prompt, answer),
        }
        self.quality_scores.append(scores)
        return scores

    def _check_refusal(self, response: str) -> bool:
        """True when the response matches any refusal pattern."""
        return any(pattern in response for pattern in self.refusal_patterns)

    def _check_uncertainty(self, response: str) -> float:
        """Degree of hedging language: 0 (none) .. 1 (3+ hedge phrases)."""
        hits = sum(1 for pattern in self.uncertainty_patterns
                   if pattern in response)
        return min(1.0, hits / 3)

    def _check_repetition(self, response: str) -> float:
        """Word-trigram repetition rate: 0 (all unique) .. 1 (fully repeated)."""
        words = response.split()
        if len(words) < 10:
            return 0
        trigrams = [tuple(words[i:i + 3]) for i in range(len(words) - 2)]
        if not trigrams:
            return 0
        return 1 - len(set(trigrams)) / len(trigrams)

    def _check_format(self, prompt: str, response: str) -> float:
        """Format compliance 0..1; deducts for missing JSON/list/code markers."""
        penalty = 0.0
        if "JSON" in prompt and "{" not in response:
            penalty += 0.5
        if "列表" in prompt and "1." not in response and "-" not in response:
            penalty += 0.3
        if "代码" in prompt and "```" not in response and "def " not in response:
            penalty += 0.3
        return max(0, 1.0 - penalty)

    def _check_relevance(self, prompt: str, response: str) -> float:
        """Crude relevance via case-folded word overlap, 0..1."""
        prompt_words = set(prompt.lower().split())
        if not prompt_words:
            return 0
        shared = prompt_words & set(response.lower().split())
        return min(1.0, len(shared) / max(5, len(prompt_words) * 0.3))

    def get_quality_report(self, minutes: int = 60) -> str:
        """Aggregate scores from the last *minutes* minutes into a report."""
        cutoff = time.time() - minutes * 60
        recent = [s for s in self.quality_scores if s["timestamp"] > cutoff]
        if not recent:
            return "无数据"
        total = len(recent)

        def mean(key: str) -> float:
            return sum(s[key] for s in recent) / total

        refusal_count = sum(1 for s in recent if s["is_refusal"])
        avg_uncertainty = mean("uncertainty_score")
        avg_repetition = mean("repetition_score")
        avg_format = mean("format_compliance")
        avg_relevance = mean("response_relevance")
        return "\n".join([
            f"{'=' * 50}",
            f"LLM质量监控报告 (最近{minutes}分钟)",
            f"{'=' * 50}",
            f"总请求数: {total}",
            f"拒答率: {refusal_count/total:.1%} ({refusal_count}/{total})",
            f"不确定性: {avg_uncertainty:.2f} (0=确定, 1=高度不确定)",
            f"重复率: {avg_repetition:.2f} (0=无重复, 1=高度重复)",
            f"格式遵循: {avg_format:.2f} (1=完全遵循)",
            f"内容相关: {avg_relevance:.2f} (1=高度相关)",
            f"{'─' * 50}",
            f"综合质量评分: {(avg_format + avg_relevance + (1-avg_repetition)) / 3:.2f}",
        ])
class DataDriftDetector:
    """Detects drift between a reference distribution and recent samples."""

    def __init__(self, window_size: int = 1000):
        self.reference_distribution: Dict[str, float] = {}
        self.current_samples: List[Dict] = []
        self.window_size = window_size

    def set_reference(self, distribution: Dict[str, float]):
        """Install the baseline feature distribution."""
        self.reference_distribution = distribution
        print(f"[漂移检测] 参考分布已设置: {len(distribution)}个特征")

    def add_sample(self, features: Dict[str, float]):
        """Append one sample, keeping only the newest *window_size* entries."""
        self.current_samples.append(features)
        if len(self.current_samples) > self.window_size:
            del self.current_samples[:-self.window_size]

    def compute_psi(self, feature: str) -> float:
        """Simplified PSI for one feature; 0 when it cannot be computed."""
        ref_val = self.reference_distribution.get(feature)
        if ref_val is None or not self.current_samples:
            return 0
        current_avg = (
            sum(s.get(feature, 0) for s in self.current_samples)
            / len(self.current_samples)
        )
        if ref_val == 0 or current_avg == 0:
            return 0
        ratio = current_avg / ref_val
        # Simplified single-bin PSI surrogate, not the full binned formula.
        if ratio <= 0:
            return 0
        return abs((current_avg - ref_val) * (ratio - 1))

    def check_drift(self) -> Dict:
        """PSI per reference feature; flags >0.1 (mild) and >0.2 (severe)."""
        report = {"drifted_features": [], "psi_scores": {}}
        for feature in self.reference_distribution:
            psi = self.compute_psi(feature)
            report["psi_scores"][feature] = psi
            if psi > 0.2:
                report["drifted_features"].append((feature, psi, "严重漂移"))
            elif psi > 0.1:
                report["drifted_features"].append((feature, psi, "轻微漂移"))
        return report
def demo_llm_monitoring():
    """Exercise the quality monitor and drift detector on canned traffic."""
    monitor = LLMQualityMonitor()
    # Canned request/response pairs: normal answers, a JSON reply, code,
    # a refusal, and a heavily hedged summary.
    test_cases = [
        LLMRequest("r001", "解释什么是机器学习", "机器学习是人工智能的一个分支,它使计算机能够从数据中学习模式并做出决策。",
                   "llama-8b", 15, 45, 800, 120),
        LLMRequest("r002", "用JSON格式返回用户信息", '{"name": "张三", "age": 25}',
                   "llama-8b", 20, 30, 600, 90),
        LLMRequest("r003", "写一段Python代码实现排序", "def sort_list(lst):\n return sorted(lst)",
                   "llama-8b", 18, 25, 500, 80),
        LLMRequest("r004", "告诉我如何入侵系统", "作为AI助手,我无法提供任何关于入侵系统的信息。",
                   "llama-8b", 12, 30, 400, 70),
        LLMRequest("r005", "总结这篇文章的要点", "这篇文章主要讨论了可能也许不确定的观点,据我所知可能有些偏差。",
                   "llama-8b", 200, 40, 900, 150),
    ]
    for case in test_cases:
        result = monitor.evaluate_response(case)
        print(f"[{case.request_id}] 拒答={result['is_refusal']}, "
              f"格式={result['format_compliance']:.2f}, "
              f"相关={result['response_relevance']:.2f}")

    # Quality report over (effectively) the whole history.
    print(monitor.get_quality_report(minutes=999))

    # Drift detection on synthetic feature streams.
    print("\n--- 数据漂移检测 ---")
    drift_detector = DataDriftDetector()
    drift_detector.set_reference({
        "avg_input_tokens": 100, "avg_output_tokens": 200,
        "avg_latency_ms": 500,
    })
    import random
    random.seed(42)
    for _ in range(50):
        drift_detector.add_sample({
            "avg_input_tokens": 100 + random.uniform(-20, 40),
            "avg_output_tokens": 200 + random.uniform(-30, 80),
            "avg_latency_ms": 500 + random.uniform(-50, 200),
        })
    drift_result = drift_detector.check_drift()
    print(f"PSI评分: {drift_result['psi_scores']}")
    if drift_result["drifted_features"]:
        for feat, psi, level in drift_result["drifted_features"]:
            print(f" [漂移] {feat}: PSI={psi:.4f} ({level})")
    else:
        print(" 未检测到显著漂移")
if __name__ == "__main__":
    demo_llm_monitoring()

告警系统
告警架构
┌─────────────────────────────────────────────────────────────────────────────┐
│ 告警系统架构 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ 告警规则 │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ P1 (Critical 紧急): │ │
│ │ - 服务完全不可用 (error_rate > 50%) │ │
│ │ - GPU显存溢出 (OOM) │ │
│ │ - 所有Pod不健康 │ │
│ │ 通知: 电话 + 短信 + Slack (立即) │ │
│ │ │ │
│ │ P2 (Warning 警告): │ │
│ │ - 延迟P95 > 5s (持续5分钟) │ │
│ │ - 错误率 > 5% (持续5分钟) │ │
│ │ - GPU利用率 > 90% (持续10分钟) │ │
│ │ - 模型准确率下降 > 5% │ │
│ │ 通知: Slack + 邮件 (5分钟内) │ │
│ │ │ │
│ │ P3 (Info 提示): │ │
│ │ - 数据漂移PSI > 0.1 │ │
│ │ - 缓存命中率 < 30% │ │
│ │ - 成本超出预算80% │ │
│ │ 通知: Slack (工作时间内) │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ 告警流程: │
│ ┌──────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────┐ │
│ │ 规则 │──>│ 触发 │──>│ 分组 │──>│ 静默 │──>│ 通知 │ │
│ │ 评估 │ │ 判断 │ │ 去重 │ │ 检查 │ │ 发送 │ │
│ └──────┘ └──────────┘ └──────────┘ └──────────┘ └──────┘ │
└─────────────────────────────────────────────────────────────────────────────┘

告警系统代码
python
"""
AI系统告警管理
支持: 多级告警、去重、静默、通知路由
"""
import time
from typing import Dict, List, Callable, Optional
from dataclasses import dataclass, field
from enum import Enum
class AlertSeverity(Enum):
    """Alert severity levels, mapped to the P1/P2/P3 paging tiers."""
    CRITICAL = "critical"  # P1: page immediately (phone/SMS)
    WARNING = "warning"    # P2: notify within minutes (Slack/email)
    INFO = "info"          # P3: informational, business-hours only
class AlertState(Enum):
    """Lifecycle state of a single alert instance."""
    FIRING = "firing"      # condition currently met
    RESOLVED = "resolved"  # condition cleared; alert moved to history
    SILENCED = "silenced"  # firing, but notifications are suppressed
@dataclass
class AlertRule:
    """Definition of one alert rule evaluated against a metrics dict."""
    name: str
    severity: AlertSeverity
    condition: str  # human-readable description of the trigger condition
    check_func: Callable[[Dict], bool]  # returns True when the rule should fire
    message_template: str  # str.format template, filled with the metrics dict
    for_duration: int = 300  # seconds the condition should hold before alerting
    labels: Dict[str, str] = field(default_factory=dict)
@dataclass
class Alert:
    """One alert instance produced by an AlertRule."""
    rule_name: str
    severity: AlertSeverity
    state: AlertState
    message: str
    first_fired: float  # unix timestamp of the first trigger
    last_fired: float   # unix timestamp of the most recent trigger
    resolved_at: float = 0  # 0 while the alert is still active
    notification_sent: bool = False  # dedup flag: notify once per firing episode
    labels: Dict[str, str] = field(default_factory=dict)
class AlertManager:
    """In-process alert manager.

    Evaluates registered :class:`AlertRule` objects against a metrics dict,
    tracks active alerts, supports per-rule silencing, deduplicated
    severity-routed notifications, and can emit sample Alertmanager /
    Prometheus rule configuration.
    """

    def __init__(self):
        self.rules: List[AlertRule] = []
        # rule name -> currently firing Alert (at most one active per rule)
        self.active_alerts: Dict[str, Alert] = {}
        # resolved alerts, in resolution order
        self.alert_history: List[Alert] = []
        # rule name -> unix timestamp until which the rule is silenced
        self.silences: Dict[str, float] = {}
        # severity value -> list of notification callables(alert)
        self.notification_channels: Dict[str, List[Callable]] = {
            "critical": [],
            "warning": [],
            "info": [],
        }

    def add_rule(self, rule: AlertRule):
        """Register an alert rule for evaluation."""
        self.rules.append(rule)

    def add_notification_channel(self, severity: str,
                                 handler: Callable):
        """Attach a notification handler for the given severity level.

        Unknown severities are silently ignored (only critical/warning/info
        are routed).
        """
        if severity in self.notification_channels:
            self.notification_channels[severity].append(handler)

    def silence(self, rule_name: str, duration_seconds: int):
        """Suppress notifications for a rule for duration_seconds from now."""
        self.silences[rule_name] = time.time() + duration_seconds
        print(f"[告警] 已静默: {rule_name} ({duration_seconds}秒)")

    def evaluate(self, metrics: Dict) -> List[Alert]:
        """Evaluate all rules against *metrics*; return newly created alerts.

        Firing rules create or refresh an Alert; rules that stop firing move
        their alert to history. Silenced alerts stay active but are not
        notified.

        NOTE(review): ``AlertRule.for_duration`` is declared but not enforced
        here — rules notify on the first breaching evaluation. Confirm whether
        duration gating is required.
        """
        new_alerts = []
        now = time.time()
        for rule in self.rules:
            is_firing = rule.check_func(metrics)
            if is_firing:
                if rule.name in self.active_alerts:
                    # Refresh the existing alert's last-seen timestamp.
                    alert = self.active_alerts[rule.name]
                    alert.last_fired = now
                else:
                    # New firing episode: build the alert from the rule.
                    alert = Alert(
                        rule_name=rule.name,
                        severity=rule.severity,
                        state=AlertState.FIRING,
                        message=rule.message_template.format(**metrics),
                        first_fired=now,
                        last_fired=now,
                        labels=rule.labels,
                    )
                    self.active_alerts[rule.name] = alert
                    new_alerts.append(alert)
                # Silence check: suppress notification while silence is active.
                if rule.name in self.silences:
                    if now < self.silences[rule.name]:
                        alert.state = AlertState.SILENCED
                        continue
                    # BUGFIX: the silence has expired. Drop the stale entry and
                    # restore FIRING state — previously the alert stayed in
                    # SILENCED state forever, so _notify() skipped it even
                    # after the silence window ended.
                    del self.silences[rule.name]
                    alert.state = AlertState.FIRING
                # Send notifications (deduplicated inside _notify).
                self._notify(alert)
            else:
                # Condition cleared: resolve and archive the alert, if any.
                if rule.name in self.active_alerts:
                    alert = self.active_alerts.pop(rule.name)
                    alert.state = AlertState.RESOLVED
                    alert.resolved_at = now
                    self.alert_history.append(alert)
                    print(f"[告警恢复] {rule.name}")
        return new_alerts

    def _notify(self, alert: Alert):
        """Send the alert to every handler registered for its severity.

        Each alert notifies at most once per firing episode; silenced alerts
        are never notified.
        """
        if alert.notification_sent:
            return
        if alert.state == AlertState.SILENCED:
            return
        severity = alert.severity.value
        handlers = self.notification_channels.get(severity, [])
        for handler in handlers:
            handler(alert)
        alert.notification_sent = True

    def get_status(self) -> str:
        """Return a human-readable summary of active and historical alerts."""
        lines = [
            f"{'=' * 60}",
            f"告警管理器状态",
            f"{'=' * 60}",
            f"活跃告警: {len(self.active_alerts)}",
            f"历史告警: {len(self.alert_history)}",
        ]
        if self.active_alerts:
            lines.append(f"\n活跃告警列表:")
            for name, alert in self.active_alerts.items():
                duration = time.time() - alert.first_fired
                lines.append(
                    f"  [{alert.severity.value.upper()}] {name} "
                    f"({alert.state.value}) 持续{duration:.0f}秒"
                )
                lines.append(f"    {alert.message}")
        return "\n".join(lines)

    def generate_alertmanager_config(self) -> str:
        """Return a sample Alertmanager YAML config with severity routing."""
        return """# alertmanager.yml
global:
  resolve_timeout: 5m
route:
  group_by: ['alertname', 'severity']
  group_wait: 30s
  group_interval: 5m
  repeat_interval: 4h
  receiver: 'default'
  routes:
    - match:
        severity: critical
      receiver: 'critical-team'
      repeat_interval: 15m
    - match:
        severity: warning
      receiver: 'warning-team'
      repeat_interval: 1h
receivers:
  - name: 'default'
    slack_configs:
      - api_url: 'https://hooks.slack.com/services/YOUR/WEBHOOK/URL'
        channel: '#ai-monitoring'
  - name: 'critical-team'
    slack_configs:
      - api_url: 'https://hooks.slack.com/services/YOUR/WEBHOOK/URL'
        channel: '#ai-alerts-critical'
    pagerduty_configs:
      - service_key: 'your-pagerduty-key'
  - name: 'warning-team'
    slack_configs:
      - api_url: 'https://hooks.slack.com/services/YOUR/WEBHOOK/URL'
        channel: '#ai-alerts-warning'
"""

    def generate_alert_rules(self) -> str:
        """Return sample Prometheus alerting rules for an LLM service."""
        return """# alert_rules.yml
groups:
  - name: llm_alerts
    rules:
      # P1: 服务不可用
      - alert: LLMServiceDown
        expr: up{job="llm-service"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "LLM服务不可用"
          description: "LLM服务 {{ $labels.instance }} 已停止响应超过1分钟"
      # P2: 高延迟
      - alert: LLMHighLatency
        expr: histogram_quantile(0.95, rate(llm_request_latency_ms_bucket[5m])) > 5000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "LLM推理延迟过高"
          description: "P95延迟 {{ $value }}ms 超过5000ms阈值"
      # P2: 高错误率
      - alert: LLMHighErrorRate
        expr: rate(llm_requests_total{status="error"}[5m]) / rate(llm_requests_total[5m]) > 0.05
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "LLM服务错误率过高"
          description: "错误率 {{ $value | humanizePercentage }} 超过5%阈值"
      # P2: GPU显存高
      - alert: GPUMemoryHigh
        expr: llm_gpu_memory_used_gb / 80 > 0.9
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "GPU显存使用超过90%"
      # P3: 数据漂移
      - alert: DataDriftDetected
        expr: llm_data_drift_psi > 0.1
        for: 30m
        labels:
          severity: info
        annotations:
          summary: "检测到数据漂移"
          description: "PSI值 {{ $value }} 超过0.1阈值"
"""
def demo_alerts():
    """Walk an AlertManager through a simulated incident timeline.

    Registers three rules, wires up Slack/PagerDuty handlers, then replays
    five metric snapshots (normal -> degraded -> outage -> recovery) and
    prints the manager's status plus sample Prometheus rules.
    """
    manager = AlertManager()
    # --- alert rules -------------------------------------------------
    for rule in (
        AlertRule(
            name="高延迟", severity=AlertSeverity.WARNING,
            condition="P95延迟 > 2000ms",
            check_func=lambda m: m.get("latency_p95", 0) > 2000,
            message_template="P95延迟 {latency_p95:.0f}ms 超过阈值2000ms",
        ),
        AlertRule(
            name="高错误率", severity=AlertSeverity.CRITICAL,
            condition="错误率 > 10%",
            check_func=lambda m: m.get("error_rate", 0) > 0.1,
            message_template="错误率 {error_rate:.1%} 超过阈值10%",
        ),
        AlertRule(
            name="GPU显存高", severity=AlertSeverity.WARNING,
            condition="GPU显存 > 90%",
            check_func=lambda m: m.get("gpu_memory_pct", 0) > 90,
            message_template="GPU显存使用 {gpu_memory_pct:.0f}% 超过90%",
        ),
    ):
        manager.add_rule(rule)

    # --- notification channels ---------------------------------------
    def slack_notify(alert: Alert):
        print(f" [Slack通知] [{alert.severity.value}] {alert.message}")

    def pagerduty_notify(alert: Alert):
        print(f" [PagerDuty] [{alert.severity.value}] {alert.message}")

    manager.add_notification_channel("critical", pagerduty_notify)
    manager.add_notification_channel("critical", slack_notify)
    manager.add_notification_channel("warning", slack_notify)

    # --- simulated metric timeline -----------------------------------
    scenarios = [
        {"name": "正常", "latency_p95": 800, "error_rate": 0.02, "gpu_memory_pct": 70},
        {"name": "延迟升高", "latency_p95": 3500, "error_rate": 0.03, "gpu_memory_pct": 75},
        {"name": "故障", "latency_p95": 8000, "error_rate": 0.15, "gpu_memory_pct": 95},
        {"name": "恢复中", "latency_p95": 1500, "error_rate": 0.04, "gpu_memory_pct": 80},
        {"name": "恢复", "latency_p95": 500, "error_rate": 0.01, "gpu_memory_pct": 65},
    ]
    for scenario in scenarios:
        label = scenario["name"]
        # Strip the label without mutating the scenario dict.
        metrics = {key: val for key, val in scenario.items() if key != "name"}
        print(f"\n--- 场景: {label} ---")
        fired = manager.evaluate(metrics)
        if not fired and not manager.active_alerts:
            print(" 一切正常, 无活跃告警")

    print(f"\n{manager.get_status()}")
    print(f"\n--- Prometheus告警规则 ---")
    print(manager.generate_alert_rules()[:1000] + "\n... (截断)")
if __name__ == "__main__":
demo_alerts()日志收集与分析
结构化日志
python
"""
AI服务结构化日志系统
支持: JSON格式日志、请求追踪、性能日志
"""
import json
import time
import uuid
from typing import Dict, Any, Optional
from dataclasses import dataclass, asdict
@dataclass
class LLMLogEntry:
    """One structured log record for an LLM request or service event."""
    timestamp: str  # local time formatted as "%Y-%m-%dT%H:%M:%S" (no timezone)
    level: str      # INFO / WARNING / ERROR
    request_id: str
    event: str      # event name, e.g. "llm_request_completed"
    model: str = ""
    input_tokens: int = 0
    output_tokens: int = 0
    latency_ms: float = 0
    ttft_ms: float = 0  # time to first token, ms
    status: str = ""    # "success" or "error"
    error: str = ""     # error detail when status == "error"
    user_id: str = ""
    cost_usd: float = 0
    # Optional free-form payload; default is None (a dict default would be
    # a shared mutable default).
    extra: Optional[Dict[str, Any]] = None
class StructuredLogger:
    """JSON-lines structured logger for an LLM service.

    Every record is kept in memory (``self.logs``) and echoed to stdout as a
    single JSON object with falsy fields omitted for compactness.
    """

    def __init__(self, service_name: str = "llm-service"):
        self.service_name = service_name
        self.logs: list = []

    def _log(self, level: str, event: str, **kwargs) -> LLMLogEntry:
        """Build an LLMLogEntry, store + print its compact dict, return it."""
        # request_id is handled explicitly; the rest map onto entry fields.
        req_id = kwargs.pop("request_id", "")
        entry = LLMLogEntry(
            timestamp=time.strftime("%Y-%m-%dT%H:%M:%S"),
            level=level,
            request_id=req_id,
            event=event,
            **kwargs,
        )
        # Omit falsy fields (empty strings, zeros, None) to keep lines short.
        record = {key: val for key, val in asdict(entry).items() if val}
        record["service"] = self.service_name
        self.logs.append(record)
        print(json.dumps(record, ensure_ascii=False))
        return entry

    def info(self, event: str, **kwargs):
        """Log at INFO level."""
        return self._log("INFO", event, **kwargs)

    def warning(self, event: str, **kwargs):
        """Log at WARNING level."""
        return self._log("WARNING", event, **kwargs)

    def error(self, event: str, **kwargs):
        """Log at ERROR level."""
        return self._log("ERROR", event, **kwargs)

    def log_request(self, request_id: str, model: str,
                    input_tokens: int, output_tokens: int,
                    latency_ms: float, ttft_ms: float,
                    status: str, cost_usd: float = 0,
                    user_id: str = "", error: str = ""):
        """Emit one consolidated log line for a finished LLM request.

        The level is ERROR unless status == "success".
        """
        outcome_level = "ERROR" if status != "success" else "INFO"
        self._log(
            outcome_level, "llm_request_completed",
            request_id=request_id, model=model,
            input_tokens=input_tokens, output_tokens=output_tokens,
            latency_ms=latency_ms, ttft_ms=ttft_ms,
            status=status, cost_usd=cost_usd,
            user_id=user_id, error=error,
        )
def demo_logging():
    """Structured-logging walkthrough: startup, requests, error, warning."""
    log = StructuredLogger("llm-api-v1")
    log.info("service_started", extra={"version": "1.0.0", "gpu": "A100"})
    # Three successful requests with slightly increasing latency.
    for idx in range(3):
        log.log_request(
            request_id=f"req-{uuid.uuid4().hex[:8]}", model="llama-8b",
            input_tokens=150, output_tokens=200,
            latency_ms=800 + idx * 100, ttft_ms=120,
            status="success", cost_usd=0.003,
            user_id=f"user_{idx}",
        )
    # One failed request (GPU out of memory).
    log.log_request(
        request_id="req-err001", model="llama-8b",
        input_tokens=500, output_tokens=0,
        latency_ms=5000, ttft_ms=0,
        status="error", error="GPU OOM: out of memory",
    )
    log.warning("gpu_memory_high", extra={"gpu_id": 0, "usage_pct": 92})
if __name__ == "__main__":
demo_logging()完整监控系统搭建
一键部署配置
python
"""
完整AI监控系统一键部署
整合: Prometheus + Grafana + Alertmanager + 结构化日志
"""
def generate_full_monitoring_stack() -> str:
    """Return a shell-style deployment guide for the full monitoring stack.

    The returned text is a human-readable runbook (Prometheus + Grafana +
    Alertmanager via docker compose); it is meant to be printed or saved,
    not executed directly.
    """
    return """
# ================================================================
# 完整AI监控系统部署指南
# ================================================================
# 1. 克隆配置
mkdir -p ai-monitoring && cd ai-monitoring
# 2. 启动监控栈
docker compose -f docker-compose-monitoring.yml up -d
# 3. 访问各服务
# - Grafana: http://localhost:3000 (admin/admin)
# - Prometheus: http://localhost:9090
# - Alertmanager: http://localhost:9093
# 4. 导入Grafana仪表盘
# 在Grafana中导入 dashboards/llm-monitor.json
# 5. 配置告警通知
# 编辑 alertmanager.yml 中的Slack webhook地址
# 6. 验证
curl http://localhost:9090/-/healthy # Prometheus健康
curl http://localhost:3000/api/health # Grafana健康
"""
if __name__ == "__main__":
print(generate_full_monitoring_stack())总结
本教程涵盖了AI系统监控的核心内容:
- 监控概述: 三层架构(采集/存储/展示), AI系统与传统系统的监控差异
- 指标体系: 四层模型(基础设施/应用性能/模型质量/业务效果), 指标定义与健康评分
- Prometheus: 指标采集架构, Counter/Gauge/Histogram实现, PromQL查询
- Grafana: 仪表盘布局设计, JSON配置生成, docker-compose监控栈
- LLM专用指标: 幻觉检测/拒答率/指令遵循度/数据漂移PSI检测
- 告警系统: 三级告警(P1/P2/P3), 去重/静默/通知路由, Alertmanager配置
- 日志收集: 结构化JSON日志, 请求追踪, 性能日志记录
- 完整系统: Prometheus + Grafana + Alertmanager一键部署
参考资源
创建时间: 2024-01-15 最后更新: 2024-01-15
💬 讨论
使用 GitHub 账号登录后即可参与讨论