AI 代码审查完整教程
目录
AI代码审查概述
为什么需要AI代码审查
传统代码审查依赖人工逐行检查,面临审查效率低、标准不统一、容易遗漏等问题。 AI代码审查通过大语言模型自动分析代码质量、安全性、性能和可维护性,作为人工审查的 有力补充,显著提高审查覆盖率和一致性。
AI审查系统架构
┌─────────────────────────────────────────────────────────────────┐
│ AI 代码审查系统架构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 触发层 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌─────────┐ │ │
│ │ │ Git Push │ │ PR 创建 │ │ 定时扫描 │ │ 手动触发│ │ │
│ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬────┘ │ │
│ │ └──────────────┴──────────────┴──────────────┘ │ │
│ └──────────────────────────┬───────────────────────────────┘ │
│ v │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 处理层 │ │
│ │ ┌────────────────────────────────────────────────────┐ │ │
│ │ │ 代码差异提取 (git diff) │ │ │
│ │ │ - 获取变更文件列表 │ │ │
│ │ │ - 提取新增/修改的代码行 │ │ │
│ │ │ - 收集上下文(函数签名、类结构、导入) │ │ │
│ │ └────────────────────────────────────────────────────┘ │ │
│ │ │ │ │
│ │ v │ │
│ │ ┌────────────────────────────────────────────────────┐ │ │
│ │ │ 多维度分析 │ │ │
│ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │
│ │ │ │ 代码质量 │ │ 安全检测 │ │ 性能分析 │ │ │ │
│ │ │ │ - 命名 │ │ - SQL注入 │ │ - 复杂度 │ │ │ │
│ │ │ │ - 复杂度 │ │ - XSS │ │ - N+1查询 │ │ │ │
│ │ │ │ - 重复 │ │ - 认证 │ │ - 内存 │ │ │ │
│ │ │ │ - 规范 │ │ - 泄露 │ │ - 算法 │ │ │ │
│ │ │ └──────────┘ └──────────┘ └──────────┘ │ │ │
│ │ └────────────────────────────────────────────────────┘ │ │
│ └──────────────────────────┬───────────────────────────────┘ │
│ v │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 输出层 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌─────────┐ │ │
│ │ │ PR评论 │ │ 审查报告 │ │ Slack通知│ │ 阻止合并│ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └─────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘AI审查 vs 人工审查对比
┌────────────────────────────────────────────────────────────────┐
│ AI审查 vs 人工审查 对比分析 │
├──────────────┬───────────────────┬─────────────────────────────┤
│ 维度 │ AI 审查 │ 人工审查 │
├──────────────┼───────────────────┼─────────────────────────────┤
│ 速度 │ 秒级 (几秒到几分) │ 小时到天级 │
│ 一致性 │ 高 (标准统一) │ 因人而异 │
│ 覆盖率 │ 100% 代码行 │ 通常 60-80% │
│ 安全检测 │ 模式匹配 + 语义 │ 依赖审查者经验 │
│ 业务理解 │ 弱 (缺乏上下文) │ 强 (了解业务背景) │
│ 架构判断 │ 有限 │ 强 (经验驱动) │
│ 疲劳 │ 无 │ 审查疲劳导致质量下降 │
│ 成本 │ API调用费用 │ 高级工程师时间成本 │
│ 学习能力 │ 依赖模型更新 │ 持续积累领域知识 │
├──────────────┴───────────────────┴─────────────────────────────┤
│ │
│ 最佳策略: AI 做第一轮筛查, 人工做最终决策 │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ AI 审查 (自动化) │ │
│ │ ├── 安全漏洞检测 (SQL注入、XSS、敏感信息泄露) │ │
│ │ ├── 代码规范检查 (命名、格式、复杂度) │ │
│ │ ├── 常见Bug模式识别 │ │
│ │ └── 性能反模式检测 (N+1、内存泄漏) │ │
│ │ │ │ │
│ │ v │ │
│ │ 人工审查 (终审) │ │
│ │ ├── 业务逻辑正确性 │ │
│ │ ├── 架构设计合理性 │ │
│ │ ├── AI 标记的问题确认/误报排除 │ │
│ │ └── 代码可维护性和团队规范 │ │
│ └──────────────────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────────┘GPT-4代码分析实战
构建代码审查引擎
python
# ============================================================
# AI 代码审查引擎 - 基于 OpenAI GPT-4 API
# 功能: 自动分析代码变更的质量、安全性和性能
# ============================================================
import json
import os
from typing import List, Optional
from dataclasses import dataclass, field
from enum import Enum
import subprocess
class Severity(Enum):
"""问题严重程度"""
CRITICAL = "critical" # 严重: 安全漏洞、数据丢失风险
HIGH = "high" # 高: 明显的Bug或性能问题
MEDIUM = "medium" # 中: 代码质量问题
LOW = "low" # 低: 建议性改进
INFO = "info" # 信息: 风格建议
class Category(Enum):
"""问题分类"""
SECURITY = "security" # 安全问题
PERFORMANCE = "performance" # 性能问题
BUG = "bug" # 潜在Bug
QUALITY = "quality" # 代码质量
STYLE = "style" # 代码风格
MAINTAINABILITY = "maintainability" # 可维护性
@dataclass
class ReviewIssue:
"""审查发现的问题"""
file: str
line: int
severity: Severity
category: Category
title: str
description: str
suggestion: str
code_snippet: Optional[str] = None
@dataclass
class ReviewResult:
"""审查结果"""
issues: List[ReviewIssue] = field(default_factory=list)
summary: str = ""
overall_score: int = 0 # 0-100 分
files_reviewed: int = 0
lines_reviewed: int = 0
class AICodeReviewer:
"""AI 代码审查器"""
def __init__(self, api_key: str = None, model: str = "gpt-4"):
self.api_key = api_key or os.environ.get("OPENAI_API_KEY")
self.model = model
if not self.api_key:
raise ValueError("OPENAI_API_KEY 环境变量未设置")
def get_git_diff(self, base_branch: str = "main") -> str:
"""获取当前分支与基准分支的代码差异"""
try:
result = subprocess.run(
["git", "diff", base_branch, "--unified=5"],
capture_output=True,
text=True,
timeout=30
)
return result.stdout
except subprocess.TimeoutExpired:
return ""
def get_changed_files(self, base_branch: str = "main") -> List[str]:
"""获取变更文件列表"""
try:
result = subprocess.run(
["git", "diff", base_branch, "--name-only"],
capture_output=True,
text=True,
timeout=30
)
return [f for f in result.stdout.strip().split('\n') if f]
except subprocess.TimeoutExpired:
return []
def build_review_prompt(self, diff: str, focus_areas: List[str] = None) -> str:
"""构建审查 Prompt"""
areas = focus_areas or ["安全性", "性能", "代码质量", "可维护性"]
prompt = f"""你是一位资深代码审查专家。请对以下代码变更进行全面审查。
## 审查重点
{', '.join(areas)}
## 输出格式
请以JSON格式输出, 包含以下字段:
{{
"overall_score": 0-100的整体评分,
"summary": "一句话总结审查结果",
"issues": [
{{
"file": "文件名",
"line": 行号,
"severity": "critical/high/medium/low/info",
"category": "security/performance/bug/quality/style/maintainability",
"title": "问题标题",
"description": "问题描述",
"suggestion": "修复建议",
"code_snippet": "相关代码片段(可选)"
}}
]
}}
## 审查标准
1. 安全性: SQL注入、XSS、CSRF、敏感信息泄露、认证绕过
2. 性能: N+1查询、无必要循环、内存泄漏、阻塞操作
3. Bug: 空指针、边界条件、类型错误、逻辑错误
4. 质量: 命名规范、函数长度、圈复杂度、重复代码
5. 可维护性: 文档缺失、魔法数字、紧耦合、缺少错误处理
## 代码变更
{diff}
"""
return prompt
def review(self, base_branch: str = "main",
focus_areas: List[str] = None) -> ReviewResult:
"""执行代码审查"""
# 1. 获取代码差异
diff = self.get_git_diff(base_branch)
if not diff:
return ReviewResult(summary="没有检测到代码变更")
changed_files = self.get_changed_files(base_branch)
# 2. 构建 prompt
prompt = self.build_review_prompt(diff, focus_areas)
# 3. 调用 AI 模型 (此处用伪代码表示API调用)
# response = openai.ChatCompletion.create(
# model=self.model,
# messages=[
# {"role": "system", "content": "你是代码审查专家"},
# {"role": "user", "content": prompt}
# ],
# temperature=0.1,
# max_tokens=4000
# )
# review_data = json.loads(response.choices[0].message.content)
# 4. 解析结果 (示例)
review_data = self._mock_review_result()
# 5. 构建 ReviewResult
result = ReviewResult(
summary=review_data.get("summary", ""),
overall_score=review_data.get("overall_score", 0),
files_reviewed=len(changed_files),
lines_reviewed=len(diff.split('\n'))
)
for issue_data in review_data.get("issues", []):
issue = ReviewIssue(
file=issue_data["file"],
line=issue_data["line"],
severity=Severity(issue_data["severity"]),
category=Category(issue_data["category"]),
title=issue_data["title"],
description=issue_data["description"],
suggestion=issue_data["suggestion"],
code_snippet=issue_data.get("code_snippet")
)
result.issues.append(issue)
return result
def _mock_review_result(self) -> dict:
"""模拟审查结果 (实际项目中替换为API调用)"""
return {
"overall_score": 72,
"summary": "代码质量整体良好,但存在2个安全问题和1个性能问题需要修复",
"issues": [
{
"file": "src/api/users.py",
"line": 45,
"severity": "critical",
"category": "security",
"title": "SQL注入风险",
"description": "使用字符串拼接构建SQL查询",
"suggestion": "使用参数化查询替代字符串拼接",
"code_snippet": "query = f\"SELECT * FROM users WHERE name = '{name}'\""
}
]
}
# ============================================================
# 使用示例
# ============================================================
if __name__ == "__main__":
reviewer = AICodeReviewer()
result = reviewer.review(base_branch="main")
print(f"审查评分: {result.overall_score}/100")
print(f"审查摘要: {result.summary}")
print(f"审查文件数: {result.files_reviewed}")
print(f"发现问题数: {len(result.issues)}")
for issue in result.issues:
print(f"\n[{issue.severity.value.upper()}] {issue.title}")
print(f" 文件: {issue.file}:{issue.line}")
print(f" 描述: {issue.description}")
print(f" 建议: {issue.suggestion}")安全漏洞自动检测
安全扫描架构
┌─────────────────────────────────────────────────────────────────┐
│ AI 安全漏洞检测系统 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ OWASP Top 10 检测矩阵 │ │
│ │ │ │
│ │ A01: 访问控制缺陷 [检测: 权限检查遗漏] │ │
│ │ A02: 加密失败 [检测: 明文存储/弱加密] │ │
│ │ A03: 注入攻击 [检测: SQL/NoSQL/OS注入] │ │
│ │ A04: 不安全设计 [检测: 架构级安全缺陷] │ │
│ │ A05: 安全配置错误 [检测: 默认配置/调试开启] │ │
│ │ A06: 过时组件 [检测: 依赖版本扫描] │ │
│ │ A07: 认证失败 [检测: 弱密码/会话管理] │ │
│ │ A08: 数据完整性 [检测: 反序列化/CI-CD] │ │
│ │ A09: 日志监控不足 [检测: 敏感信息日志] │ │
│ │ A10: SSRF [检测: 服务端请求伪造] │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 检测流程 │ │
│ │ │ │
│ │ 源代码 ──> 静态分析 ──> AI语义分析 ──> 结果合并 ──> 报告 │ │
│ │ │ │ │ │ │
│ │ v v v │ │
│ │ 正则匹配 GPT-4分析 去重排序 │ │
│ │ AST解析 上下文理解 严重性评级 │ │
│ │ 规则引擎 意图推断 修复建议 │ │
│ └──────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘安全漏洞检测器实现
python
# ============================================================
# AI 安全漏洞检测器
# 支持: SQL注入、XSS、敏感信息泄露、不安全的认证
# ============================================================
import re
import ast
from typing import List, Dict, Tuple
from dataclasses import dataclass
@dataclass
class SecurityFinding:
"""安全发现"""
vulnerability_type: str
severity: str
file_path: str
line_number: int
code_line: str
description: str
remediation: str
cwe_id: str # CWE编号
class SecurityScanner:
"""安全漏洞扫描器 - 静态分析 + AI增强"""
# SQL注入检测模式
SQL_INJECTION_PATTERNS = [
# f-string 拼接SQL
(r'f["\'].*(?:SELECT|INSERT|UPDATE|DELETE|DROP|ALTER).*\{.*\}',
"SQL注入: 使用f-string拼接SQL查询"),
# .format() 拼接SQL
(r'(?:SELECT|INSERT|UPDATE|DELETE).*\.format\(',
"SQL注入: 使用.format()拼接SQL查询"),
# % 格式化拼接SQL
(r'(?:SELECT|INSERT|UPDATE|DELETE).*%\s*\(',
"SQL注入: 使用%格式化拼接SQL查询"),
# 字符串拼接SQL
(r'(?:SELECT|INSERT|UPDATE|DELETE).*\+\s*(?:request|user_input|params)',
"SQL注入: 字符串拼接用户输入"),
]
# XSS检测模式
XSS_PATTERNS = [
(r'innerHTML\s*=\s*(?!.*sanitize)',
"XSS: 未经消毒直接设置innerHTML"),
(r'document\.write\(',
"XSS: 使用document.write()"),
(r'dangerouslySetInnerHTML',
"XSS: 使用dangerouslySetInnerHTML需确认已消毒"),
(r'v-html\s*=',
"XSS: Vue v-html指令可能导致XSS"),
]
# 敏感信息泄露模式
SECRET_PATTERNS = [
(r'(?:password|passwd|pwd)\s*=\s*["\'][^"\']+["\']',
"敏感信息: 硬编码密码"),
(r'(?:api_key|apikey|api-key)\s*=\s*["\'][^"\']+["\']',
"敏感信息: 硬编码API密钥"),
(r'(?:secret|token)\s*=\s*["\'][A-Za-z0-9+/=]{20,}["\']',
"敏感信息: 硬编码密钥/令牌"),
(r'(?:aws_access_key|aws_secret)\s*=\s*["\']',
"敏感信息: 硬编码AWS凭证"),
]
# 不安全认证模式
AUTH_PATTERNS = [
(r'verify\s*=\s*False',
"不安全认证: SSL验证被禁用"),
(r'(?:md5|sha1)\s*\(',
"弱加密: MD5/SHA1不适合密码哈希"),
(r'JWT.*(?:none|None)',
"JWT: 使用none算法绕过签名验证"),
]
def scan_file(self, file_path: str, content: str) -> List[SecurityFinding]:
"""扫描单个文件的安全问题"""
findings = []
lines = content.split('\n')
all_patterns = [
(self.SQL_INJECTION_PATTERNS, "SQL Injection", "CWE-89"),
(self.XSS_PATTERNS, "Cross-Site Scripting", "CWE-79"),
(self.SECRET_PATTERNS, "Sensitive Data Exposure", "CWE-200"),
(self.AUTH_PATTERNS, "Broken Authentication", "CWE-287"),
]
for patterns, vuln_type, cwe_id in all_patterns:
for line_num, line in enumerate(lines, 1):
for pattern, description in patterns:
if re.search(pattern, line, re.IGNORECASE):
finding = SecurityFinding(
vulnerability_type=vuln_type,
severity=self._assess_severity(vuln_type),
file_path=file_path,
line_number=line_num,
code_line=line.strip(),
description=description,
remediation=self._get_remediation(vuln_type),
cwe_id=cwe_id
)
findings.append(finding)
return findings
def _assess_severity(self, vuln_type: str) -> str:
"""评估漏洞严重程度"""
severity_map = {
"SQL Injection": "critical",
"Cross-Site Scripting": "high",
"Sensitive Data Exposure": "critical",
"Broken Authentication": "high",
}
return severity_map.get(vuln_type, "medium")
def _get_remediation(self, vuln_type: str) -> str:
"""获取修复建议"""
remediation_map = {
"SQL Injection": "使用参数化查询(PreparedStatement)或ORM。"
"永远不要拼接用户输入到SQL字符串中。",
"Cross-Site Scripting": "对用户输入进行HTML编码/转义。"
"使用CSP(内容安全策略)头部。",
"Sensitive Data Exposure": "将敏感信息存储在环境变量中。"
"使用密钥管理服务(如AWS Secrets Manager)。",
"Broken Authentication": "使用bcrypt/argon2进行密码哈希。"
"启用SSL证书验证。",
}
return remediation_map.get(vuln_type, "请参考OWASP安全指南。")
def generate_report(self, findings: List[SecurityFinding]) -> str:
"""生成安全扫描报告"""
if not findings:
return "安全扫描完成: 未发现安全问题。"
critical = [f for f in findings if f.severity == "critical"]
high = [f for f in findings if f.severity == "high"]
medium = [f for f in findings if f.severity == "medium"]
report = []
report.append("=" * 60)
report.append(" AI 安全漏洞扫描报告")
report.append("=" * 60)
report.append(f"扫描结果: 发现 {len(findings)} 个安全问题")
report.append(f" - 严重 (Critical): {len(critical)}")
report.append(f" - 高危 (High): {len(high)}")
report.append(f" - 中危 (Medium): {len(medium)}")
report.append("")
for finding in sorted(findings,
key=lambda f: {"critical": 0, "high": 1,
"medium": 2}.get(f.severity, 3)):
report.append(f"[{finding.severity.upper()}] {finding.description}")
report.append(f" 文件: {finding.file_path}:{finding.line_number}")
report.append(f" CWE: {finding.cwe_id}")
report.append(f" 代码: {finding.code_line}")
report.append(f" 修复: {finding.remediation}")
report.append("")
return '\n'.join(report)
# ============================================================
# 使用示例
# ============================================================
# 模拟有安全问题的代码
vulnerable_code = '''
import sqlite3
import hashlib
def get_user(username):
# SQL注入漏洞
query = f"SELECT * FROM users WHERE name = '{username}'"
db = sqlite3.connect('app.db')
return db.execute(query).fetchone()
def hash_password(password):
# 弱哈希算法
return hashlib.md5(password.encode()).hexdigest()
# 硬编码密码
DB_PASSWORD = "super_secret_123"
API_KEY = "sk-1234567890abcdef1234567890abcdef"
def fetch_api(url):
import requests
# 禁用SSL验证
return requests.get(url, verify=False)
'''
if __name__ == "__main__":
scanner = SecurityScanner()
findings = scanner.scan_file("app.py", vulnerable_code)
report = scanner.generate_report(findings)
print(report)安全审查Prompt模板
python
# ============================================================
# 专门用于安全审查的 GPT-4 Prompt 模板
# ============================================================
SECURITY_REVIEW_PROMPT = """
你是一位网络安全专家和代码审计师。请对以下代码进行安全审查。
## 审查清单 (必须逐项检查)
### 注入攻击
- [ ] SQL注入 (字符串拼接、未参数化查询)
- [ ] NoSQL注入 (MongoDB查询对象注入)
- [ ] OS命令注入 (subprocess、os.system)
- [ ] LDAP注入
- [ ] XPath注入
### 认证与授权
- [ ] 弱密码策略 (无长度/复杂度要求)
- [ ] 明文存储密码 (未使用bcrypt/argon2)
- [ ] JWT安全性 (算法、过期时间、密钥强度)
- [ ] 会话管理 (固定会话ID、无超时)
- [ ] 权限检查遗漏 (水平/垂直越权)
### 数据保护
- [ ] 敏感信息硬编码 (密码、API密钥、令牌)
- [ ] 日志中泄露敏感信息
- [ ] 响应中暴露内部信息 (堆栈跟踪、调试信息)
- [ ] 不安全的数据传输 (HTTP、未加密)
### 输入验证
- [ ] XSS (反射型、存储型、DOM型)
- [ ] 文件上传漏洞 (类型、大小、路径穿越)
- [ ] 路径穿越 (../../../etc/passwd)
- [ ] 开放重定向
### 配置安全
- [ ] 调试模式开启 (DEBUG=True)
- [ ] CORS配置过宽 (Access-Control-Allow-Origin: *)
- [ ] 缺少安全头部 (CSP、HSTS、X-Frame-Options)
- [ ] SSL/TLS配置 (证书验证、弱密码套件)
## 待审查代码
{code}
## 输出要求
对每个发现的问题, 请给出:
1. 漏洞类型和CWE编号
2. 严重程度 (Critical/High/Medium/Low)
3. 具体位置 (文件名:行号)
4. 问题描述
5. 攻击场景示例
6. 修复代码示例
"""性能问题识别与优化
性能分析维度
┌────────────────────────────────────────────────────────────────┐
│ AI 性能审查维度 │
├────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 1. 算法复杂度 │ │
│ │ ┌─────────────────────────────────────────────────┐ │ │
│ │ │ O(n^2) 嵌套循环 -> 考虑哈希表 O(n) │ │ │
│ │ │ O(n*log(n)) 排序 -> 检查是否必要 │ │ │
│ │ │ 递归无终止条件 -> 栈溢出风险 │ │ │
│ │ │ 重复计算 -> 缓存/记忆化 │ │ │
│ │ └─────────────────────────────────────────────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 2. 数据库性能 │ │
│ │ ┌─────────────────────────────────────────────────┐ │ │
│ │ │ N+1 查询 -> 使用 JOIN/预加载 │ │ │
│ │ │ 缺少索引 -> 添加合适索引 │ │ │
│ │ │ SELECT * -> 只查询需要的列 │ │ │
│ │ │ 循环内查询 -> 批量查询替代 │ │ │
│ │ └─────────────────────────────────────────────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 3. 内存与I/O │ │
│ │ ┌─────────────────────────────────────────────────┐ │ │
│ │ │ 大文件一次性读取 -> 流式处理/分块读取 │ │ │
│ │ │ 无限增长的列表 -> 限制大小/生成器 │ │ │
│ │ │ 同步阻塞I/O -> 异步/线程池 │ │ │
│ │ │ 未关闭的资源 -> with/try-finally │ │ │
│ │ └─────────────────────────────────────────────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 4. 前端性能 │ │
│ │ ┌─────────────────────────────────────────────────┐ │ │
│ │ │ 不必要的重渲染 -> React.memo/useMemo │ │ │
│ │ │ 大列表无虚拟化 -> react-virtualized │ │ │
│ │ │ 未做代码分割 -> 动态 import() │ │ │
│ │ │ 图片未优化 -> WebP/懒加载 │ │ │
│ │ └─────────────────────────────────────────────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────────┘性能问题检测示例
python
# ============================================================
# AI 性能审查: 发现的典型性能问题及修复方案
# ============================================================
# ----- 问题1: O(n^2) 嵌套循环 -----
# [有问题] 在列表中查找重复元素 - O(n^2)
def find_duplicates_slow(items: list) -> list:
duplicates = []
for i in range(len(items)):
for j in range(i + 1, len(items)):
if items[i] == items[j] and items[i] not in duplicates:
duplicates.append(items[i])
return duplicates
# [AI修复] 使用集合优化 - O(n)
def find_duplicates_fast(items: list) -> list:
seen = set()
duplicates = set()
for item in items:
if item in seen:
duplicates.add(item)
seen.add(item)
return list(duplicates)
# ----- 问题2: 循环内数据库查询 (N+1问题) -----
# [有问题] N+1 查询
def get_order_details_slow(order_ids: list):
orders = []
for order_id in order_ids: # N次查询
order = db.query("SELECT * FROM orders WHERE id = %s", (order_id,))
items = db.query("SELECT * FROM order_items WHERE order_id = %s", (order_id,))
order['items'] = items
orders.append(order)
return orders
# [AI修复] 批量查询
def get_order_details_fast(order_ids: list):
if not order_ids:
return []
# 1次查询获取所有订单
placeholders = ','.join(['%s'] * len(order_ids))
orders = db.query(
f"SELECT * FROM orders WHERE id IN ({placeholders})",
tuple(order_ids)
)
# 1次查询获取所有订单项
items = db.query(
f"SELECT * FROM order_items WHERE order_id IN ({placeholders})",
tuple(order_ids)
)
# 内存中组装
items_by_order = {}
for item in items:
items_by_order.setdefault(item['order_id'], []).append(item)
for order in orders:
order['items'] = items_by_order.get(order['id'], [])
return orders # 总共只需2次查询
# ----- 问题3: 大文件一次性读取 -----
# [有问题] 一次性读取整个大文件到内存
def process_large_file_slow(file_path: str) -> int:
with open(file_path, 'r') as f:
content = f.read() # 可能消耗GB级内存
lines = content.split('\n')
return sum(1 for line in lines if 'ERROR' in line)
# [AI修复] 流式逐行处理
def process_large_file_fast(file_path: str) -> int:
error_count = 0
with open(file_path, 'r') as f:
for line in f: # 迭代器,逐行读取,内存恒定
if 'ERROR' in line:
error_count += 1
return error_count
# ----- 问题4: 缓存缺失 -----
# [有问题] 重复计算,无缓存
def get_user_permissions_slow(user_id: int) -> list:
"""每次调用都查询数据库"""
user = db.query("SELECT * FROM users WHERE id = %s", (user_id,))
roles = db.query("SELECT * FROM user_roles WHERE user_id = %s", (user_id,))
permissions = []
for role in roles:
perms = db.query("SELECT * FROM role_permissions WHERE role_id = %s",
(role['id'],))
permissions.extend(perms)
return permissions
# [AI修复] 添加缓存
from functools import lru_cache
import time
_permission_cache = {}
_cache_ttl = 300 # 5分钟过期
def get_user_permissions_fast(user_id: int) -> list:
"""带缓存的权限查询"""
cache_key = f"perms:{user_id}"
now = time.time()
# 检查缓存
if cache_key in _permission_cache:
cached_data, cached_at = _permission_cache[cache_key]
if now - cached_at < _cache_ttl:
return cached_data
# 缓存未命中,查询数据库(使用JOIN一次查完)
permissions = db.query("""
SELECT DISTINCT rp.permission
FROM user_roles ur
JOIN role_permissions rp ON ur.role_id = rp.role_id
WHERE ur.user_id = %s
""", (user_id,))
# 写入缓存
_permission_cache[cache_key] = (permissions, now)
return permissionsCI/CD流水线集成
GitHub Actions 集成
yaml
# ============================================================
# .github/workflows/ai-code-review.yml
# AI 代码审查 GitHub Actions 工作流
# ============================================================
name: AI Code Review
on:
pull_request:
types: [opened, synchronize, reopened]
branches: [main, develop]
permissions:
contents: read
pull-requests: write
jobs:
ai-review:
name: AI 代码审查
runs-on: ubuntu-latest
timeout-minutes: 10
steps:
- name: 检出代码
uses: actions/checkout@v4
with:
fetch-depth: 0 # 需要完整历史来做 diff
- name: 设置 Python 环境
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: 安装依赖
run: |
pip install openai requests
- name: 获取代码变更
id: diff
run: |
# 获取PR的代码差异
git diff origin/${{ github.base_ref }}...HEAD > diff.txt
echo "diff_size=$(wc -c < diff.txt)" >> $GITHUB_OUTPUT
- name: 运行 AI 代码审查
if: steps.diff.outputs.diff_size > 0
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
python scripts/ai_review.py \
--diff-file diff.txt \
--output review_result.json \
--model gpt-4 \
--focus security,performance,quality
- name: 发布审查评论
if: steps.diff.outputs.diff_size > 0
uses: actions/github-script@v7
with:
script: |
const fs = require('fs');
const result = JSON.parse(fs.readFileSync('review_result.json', 'utf8'));
let body = `## AI 代码审查报告\n\n`;
body += `**评分**: ${result.overall_score}/100\n`;
body += `**摘要**: ${result.summary}\n\n`;
if (result.issues.length > 0) {
body += `### 发现的问题 (${result.issues.length})\n\n`;
for (const issue of result.issues) {
const icon = {
critical: '🔴',
high: '🟠',
medium: '🟡',
low: '🔵',
info: '⚪'
}[issue.severity];
body += `${icon} **[${issue.severity.toUpperCase()}]** ${issue.title}\n`;
body += `- 文件: \`${issue.file}:${issue.line}\`\n`;
body += `- 描述: ${issue.description}\n`;
body += `- 建议: ${issue.suggestion}\n\n`;
}
} else {
body += `未发现明显问题。\n`;
}
body += `\n---\n*由 AI 代码审查引擎自动生成*`;
await github.rest.issues.createComment({
owner: context.repo.owner,
repo: context.repo.repo,
issue_number: context.issue.number,
body: body
});
- name: 检查是否有严重问题
if: steps.diff.outputs.diff_size > 0
run: |
# 如果有 critical 级别的问题, 阻止合并
python -c "
import json
with open('review_result.json') as f:
result = json.load(f)
critical = [i for i in result['issues'] if i['severity'] == 'critical']
if critical:
print(f'发现 {len(critical)} 个严重问题, 阻止合并')
for c in critical:
print(f' - {c[\"title\"]}: {c[\"file\"]}:{c[\"line\"]}')
exit(1)
print('无严重安全问题')
"GitLab CI 集成
yaml
# ============================================================
# .gitlab-ci.yml - GitLab CI AI 审查配置
# ============================================================
ai-code-review:
stage: review
image: python:3.11-slim
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
variables:
GIT_DEPTH: 0
before_script:
- pip install openai requests python-gitlab
script:
# 获取MR的代码差异
- git diff origin/$CI_MERGE_REQUEST_TARGET_BRANCH_NAME...HEAD > diff.txt
# 运行AI审查
- python scripts/ai_review.py
--diff-file diff.txt
--output review_result.json
--model gpt-4
# 将审查结果发布为MR评论
- python scripts/post_gitlab_comment.py
--project-id $CI_PROJECT_ID
--mr-iid $CI_MERGE_REQUEST_IID
--result-file review_result.json
artifacts:
reports:
codequality: review_result.json
expire_in: 7 days审查脚本实现
python
# ============================================================
# scripts/ai_review.py - CI/CD中使用的审查脚本
# ============================================================
import argparse
import json
import os
import sys
# 此处复用前面的 AICodeReviewer 类
# from ai_reviewer import AICodeReviewer
def main():
parser = argparse.ArgumentParser(description="AI 代码审查")
parser.add_argument("--diff-file", required=True, help="diff文件路径")
parser.add_argument("--output", required=True, help="输出结果文件路径")
parser.add_argument("--model", default="gpt-4", help="使用的AI模型")
parser.add_argument("--focus", default="security,performance,quality",
help="审查重点(逗号分隔)")
parser.add_argument("--threshold", type=int, default=60,
help="最低通过分数(0-100)")
args = parser.parse_args()
# 读取diff
with open(args.diff_file, 'r', encoding='utf-8') as f:
diff_content = f.read()
if not diff_content.strip():
print("没有代码变更, 跳过审查")
result = {"overall_score": 100, "summary": "无代码变更", "issues": []}
with open(args.output, 'w') as f:
json.dump(result, f, ensure_ascii=False, indent=2)
return
# 执行审查
focus_areas = args.focus.split(',')
print(f"开始AI代码审查...")
print(f" 模型: {args.model}")
print(f" 审查重点: {focus_areas}")
print(f" diff大小: {len(diff_content)} 字符")
# reviewer = AICodeReviewer(model=args.model)
# result = reviewer.review_diff(diff_content, focus_areas)
# 模拟结果 (实际项目替换为API调用)
result = {
"overall_score": 78,
"summary": "代码质量良好,发现1个中等安全问题",
"issues": [
{
"file": "src/api/handler.py",
"line": 23,
"severity": "medium",
"category": "security",
"title": "未验证用户输入长度",
"description": "用户名参数没有长度限制,可能导致缓冲区问题",
"suggestion": "添加最大长度验证: if len(username) > 50: raise ValueError()"
}
]
}
# 写入结果
with open(args.output, 'w', encoding='utf-8') as f:
json.dump(result, f, ensure_ascii=False, indent=2)
# 检查是否通过
print(f"\n审查完成:")
print(f" 评分: {result['overall_score']}/100")
print(f" 问题数: {len(result['issues'])}")
print(f" 摘要: {result['summary']}")
if result['overall_score'] < args.threshold:
print(f"\n审查未通过! 评分 {result['overall_score']} < 阈值 {args.threshold}")
sys.exit(1)
else:
print(f"\n审查通过!")
if __name__ == "__main__":
main()自动化审查工具搭建
完整审查流水线
┌────────────────────────────────────────────────────────────────┐
│ 完整的 AI 审查流水线架构 │
├────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ │
│ │ PR / MR 创建 │ │
│ └──────┬───────┘ │
│ v │
│ ┌──────────────┐ ┌──────────────┐ ┌────────────────┐ │
│ │ 静态分析 │──>│ AI 语义分析 │──>│ 结果聚合 │ │
│ │ - ESLint │ │ - GPT-4 │ │ - 去重 │ │
│ │ - Pylint │ │ - Claude │ │ - 排序 │ │
│ │ - SonarQube │ │ - 安全检查 │ │ - 评分 │ │
│ └──────────────┘ └──────────────┘ └───────┬────────┘ │
│ │ │
│ v │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 输出分发 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌─────────┐ │ │
│ │ │ PR 评论 │ │ Slack │ │ 邮件通知 │ │ 仪表板 │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └─────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ 决策规则: │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Critical 问题 -> 阻止合并 + 通知安全团队 │ │
│ │ High 问题 -> 标记为需修复 + 通知作者 │ │
│ │ Medium 问题 -> 建议修复 + PR评论 │ │
│ │ Low/Info -> 信息提示 + PR评论 │ │
│ └──────────────────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────────┘审查报告生成
审查报告模板
python
# ============================================================
# 审查报告生成器 - 生成结构化的审查报告
# ============================================================
from datetime import datetime
from typing import List
class ReviewReportGenerator:
"""审查报告生成器"""
def generate_markdown_report(self, result: dict,
pr_info: dict = None) -> str:
"""生成 Markdown 格式的审查报告"""
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
report = []
report.append("# AI 代码审查报告")
report.append(f"\n**生成时间**: {now}")
if pr_info:
report.append(f"**PR标题**: {pr_info.get('title', 'N/A')}")
report.append(f"**作者**: {pr_info.get('author', 'N/A')}")
report.append(f"**分支**: {pr_info.get('branch', 'N/A')}")
report.append(f"\n## 总体评估")
report.append(f"\n| 指标 | 值 |")
report.append(f"|------|-----|")
report.append(f"| 总评分 | **{result['overall_score']}**/100 |")
report.append(f"| 审查文件数 | {result.get('files_reviewed', 'N/A')} |")
report.append(f"| 发现问题数 | {len(result.get('issues', []))} |")
# 按严重程度统计
issues = result.get('issues', [])
severity_counts = {}
for issue in issues:
sev = issue.get('severity', 'unknown')
severity_counts[sev] = severity_counts.get(sev, 0) + 1
report.append(f"| Critical | {severity_counts.get('critical', 0)} |")
report.append(f"| High | {severity_counts.get('high', 0)} |")
report.append(f"| Medium | {severity_counts.get('medium', 0)} |")
report.append(f"| Low | {severity_counts.get('low', 0)} |")
report.append(f"\n**摘要**: {result.get('summary', '无')}")
# 问题详情
if issues:
report.append(f"\n## 问题详情\n")
for i, issue in enumerate(issues, 1):
severity_badge = {
"critical": "CRITICAL",
"high": "HIGH",
"medium": "MEDIUM",
"low": "LOW"
}.get(issue.get('severity', ''), 'UNKNOWN')
report.append(f"### {i}. [{severity_badge}] {issue.get('title', '')}")
report.append(f"- **文件**: `{issue.get('file', '')}:{issue.get('line', '')}`")
report.append(f"- **分类**: {issue.get('category', '')}")
report.append(f"- **描述**: {issue.get('description', '')}")
report.append(f"- **建议**: {issue.get('suggestion', '')}")
if issue.get('code_snippet'):
report.append(f"\n```\n{issue['code_snippet']}\n```\n")
else:
report.append(f"\n## 审查结果\n")
report.append("未发现需要关注的问题。代码质量良好!")
report.append(f"\n---")
report.append(f"*此报告由 AI 代码审查引擎自动生成*")
return '\n'.join(report)
# 使用示例
if __name__ == "__main__":
generator = ReviewReportGenerator()
result = {
"overall_score": 72,
"files_reviewed": 5,
"summary": "发现2个安全问题需要修复",
"issues": [
{
"file": "src/api.py",
"line": 45,
"severity": "critical",
"category": "security",
"title": "SQL注入风险",
"description": "使用f-string拼接SQL查询",
"suggestion": "改用参数化查询",
"code_snippet": "query = f\"SELECT * FROM users WHERE id = '{user_id}'\""
},
{
"file": "src/auth.py",
"line": 12,
"severity": "high",
"category": "security",
"title": "弱密码哈希",
"description": "使用MD5进行密码哈希",
"suggestion": "改用bcrypt或argon2"
}
]
}
pr_info = {
"title": "feat: 添加用户管理模块",
"author": "developer",
"branch": "feature/user-management"
}
report = generator.generate_markdown_report(result, pr_info)
print(report)最佳实践与团队协作
AI审查集成策略
┌────────────────────────────────────────────────────────────────┐
│ 团队AI代码审查最佳实践 │
├────────────────────────────────────────────────────────────────┤
│ │
│ 原则1: AI辅助, 人工终审 │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ - AI 审查结果作为参考, 不是最终判断 │ │
│ │ - 人工审查者必须确认 AI 标记的问题 │ │
│ │ - 排除 AI 的误报(False Positive) │ │
│ │ - AI 不理解的业务上下文由人工补充 │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ 原则2: 渐进式采纳 │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ 阶段1: 只做安全扫描 (高确定性, 低误报) │ │
│ │ 阶段2: 添加性能检查 │ │
│ │ 阶段3: 添加代码质量评估 │ │
│ │ 阶段4: 添加架构建议 (需要团队磨合) │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ 原则3: 持续优化审查规则 │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ - 记录误报, 定期调整检测规则 │ │
│ │ - 收集团队反馈, 优化 Prompt │ │
│ │ - 根据项目特点定制审查清单 │ │
│ │ - 建立团队级别的 .reviewrc 配置文件 │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ 原则4: 成本控制 │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ - 只审查变更的代码, 不扫描整个代码库 │ │
│ │ - 大PR分批提交, 控制每次审查的 token 数 │ │
│ │ - 使用缓存避免重复审查相同代码 │ │
│ │ - 按优先级选择模型 (安全用GPT-4, 风格用GPT-3.5) │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ 审查配置文件示例 (.ai-review.yml): │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ model: gpt-4 │ │
│ │ focus: │ │
│ │ - security (weight: 40%) │ │
│ │ - performance (weight: 25%) │ │
│ │ - quality (weight: 20%) │ │
│ │ - maintainability (weight: 15%) │ │
│ │ threshold: 70 │ │
│ │ block_on: [critical] │ │
│ │ ignore_patterns: │ │
│ │ - "*.test.*" │ │
│ │ - "*.spec.*" │ │
│ │ - "migrations/*" │ │
│ └──────────────────────────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────────┘总结
本教程涵盖了 AI 代码审查的核心内容:
- 审查概述: AI审查与人工审查的互补关系,最佳组合策略
- GPT-4分析: 完整的审查引擎实现,多维度代码分析
- 安全检测: OWASP Top 10 检测,SQL注入/XSS/敏感信息泄露自动识别
- 性能优化: N+1查询、算法复杂度、内存泄漏等性能反模式检测
- CI/CD集成: GitHub Actions 和 GitLab CI 完整配置
- 工具搭建: 端到端的自动化审查流水线搭建
- 团队实践: 渐进式采纳策略、成本控制、误报管理
参考资源
创建时间: 2024-01-15 最后更新: 2024-01-15
💬 讨论
使用 GitHub 账号登录后即可参与讨论