Skip to content

API 网关认证授权

目录


认证授权概述

认证 vs 授权

┌─────────────────────────────────────────────┐
│        认证(Authentication) vs              │
│        授权(Authorization)                   │
├─────────────────────────────────────────────┤
│                                             │
│  认证 (Authentication)                      │
│  ┌─────────────────────────────────────┐   │
│  │  "你是谁?"                          │   │
│  │  ├─ 验证用户身份                    │   │
│  │  ├─ 用户名/密码                     │   │
│  │  ├─ Token (JWT)                     │   │
│  │  ├─ OAuth2                          │   │
│  │  └─ 多因素认证 (MFA)                │   │
│  └─────────────────────────────────────┘   │
│                                             │
│  授权 (Authorization)                       │
│  ┌─────────────────────────────────────┐   │
│  │  "你能做什么?"                      │   │
│  │  ├─ 验证访问权限                    │   │
│  │  ├─ RBAC (基于角色)                 │   │
│  │  ├─ ABAC (基于属性)                 │   │
│  │  ├─ ACL (访问控制列表)              │   │
│  │  └─ Scope (OAuth2)                  │   │
│  └─────────────────────────────────────┘   │
└─────────────────────────────────────────────┘

完整认证流程

      API 网关认证授权流程
┌─────────────────────────────────────┐
│  1. 客户端请求                      │
│     POST /api/v1/users              │
│     Authorization: Bearer <token>   │
└──────────────┬──────────────────────┘


┌──────────────────────────────────────┐
│  2. API Gateway 接收                 │
│     提取 Token                       │
└──────────────┬───────────────────────┘


┌──────────────────────────────────────┐
│  3. Token 验证                       │
│     ├─ 签名验证                      │
│     ├─ 过期检查                      │
│     └─ 格式校验                      │
└──────────────┬───────────────────────┘

          ┌────┴────┐
          │         │
        失败       成功
          │         │
          │         ▼
          │    ┌────────────────────────┐
          │    │  4. 提取用户信息       │
          │    │     user_id, roles等   │
          │    └────┬───────────────────┘
          │         │
          │         ▼
          │    ┌────────────────────────┐
          │    │  5. 权限检查 (RBAC)    │
          │    │     检查 roles/scopes  │
          │    └────┬───────────────────┘
          │         │
          │    ┌────┴────┐
          │    │         │
          │   拒绝      允许
          │    │         │
          ▼    ▼         ▼
┌────────────────┐  ┌───────────────────┐
│  6. 返回错误   │  │  7. 转发到后端    │
│     401/403    │  │     添加 Headers  │
└────────────────┘  └───────────────────┘

JWT 认证

JWT 结构

┌──────────────────────────────────────────┐
│            JWT Token 结构                │
├──────────────────────────────────────────┤
│                                          │
│  Header.Payload.Signature                │
│                                          │
│  ┌────────────────────────────────────┐ │
│  │  Header (Base64)                   │ │
│  │  {                                 │ │
│  │    "alg": "HS256",                 │ │
│  │    "typ": "JWT"                    │ │
│  │  }                                 │ │
│  └────────────────────────────────────┘ │
│                   +                      │
│  ┌────────────────────────────────────┐ │
│  │  Payload (Base64)                  │ │
│  │  {                                 │ │
│  │    "sub": "user123",               │ │
│  │    "name": "John Doe",             │ │
│  │    "iat": 1516239022,              │ │
│  │    "exp": 1516242622,              │ │
│  │    "roles": ["admin", "user"]      │ │
│  │  }                                 │ │
│  └────────────────────────────────────┘ │
│                   +                      │
│  ┌────────────────────────────────────┐ │
│  │  Signature                         │ │
│  │  HMACSHA256(                       │ │
│  │    base64(header) + "." +          │ │
│  │    base64(payload),                │ │
│  │    secret                          │ │
│  │  )                                 │ │
│  └────────────────────────────────────┘ │
└──────────────────────────────────────────┘

JWT 生成与验证 (Python)

python
# jwt_handler.py - JWT 生成与验证
import jwt
import datetime
from typing import Dict, Optional
from functools import wraps
from flask import request, jsonify

class JWTHandler:
    def __init__(self, secret_key: str, algorithm: str = "HS256"):
        self.secret_key = secret_key
        self.algorithm = algorithm

    def generate_token(
        self,
        user_id: str,
        username: str,
        roles: list,
        expires_in: int = 3600
    ) -> str:
        """生成 JWT Token"""
        now = datetime.datetime.utcnow()

        payload = {
            # 标准声明
            "iss": "api-gateway",           # 签发者
            "sub": user_id,                 # 主题 (用户ID)
            "aud": "api",                   # 接收方
            "iat": now,                     # 签发时间
            "exp": now + datetime.timedelta(seconds=expires_in),  # 过期时间
            "nbf": now,                     # 生效时间

            # 自定义声明
            "username": username,
            "roles": roles,
            "permissions": self._get_permissions(roles)
        }

        token = jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
        return token

    def verify_token(self, token: str) -> Optional[Dict]:
        """验证 JWT Token"""
        try:
            payload = jwt.decode(
                token,
                self.secret_key,
                algorithms=[self.algorithm],
                audience="api",
                issuer="api-gateway"
            )
            return payload

        except jwt.ExpiredSignatureError:
            return {"error": "Token has expired"}
        except jwt.InvalidTokenError as e:
            return {"error": f"Invalid token: {str(e)}"}

    def _get_permissions(self, roles: list) -> list:
        """根据角色获取权限"""
        permission_map = {
            "admin": ["read", "write", "delete", "manage"],
            "user": ["read", "write"],
            "guest": ["read"]
        }

        permissions = set()
        for role in roles:
            permissions.update(permission_map.get(role, []))

        return list(permissions)

    def refresh_token(self, old_token: str) -> Optional[str]:
        """刷新 Token"""
        payload = self.verify_token(old_token)

        if "error" in payload:
            return None

        # 生成新 Token
        return self.generate_token(
            user_id=payload["sub"],
            username=payload["username"],
            roles=payload["roles"]
        )


# Flask 装饰器
def require_auth(roles: list = None):
    """认证装饰器"""
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            auth_header = request.headers.get("Authorization")

            if not auth_header or not auth_header.startswith("Bearer "):
                return jsonify({"error": "Missing or invalid token"}), 401

            token = auth_header.split(" ")[1]
            jwt_handler = JWTHandler("your-secret-key")

            payload = jwt_handler.verify_token(token)

            if "error" in payload:
                return jsonify(payload), 401

            # 角色检查
            if roles:
                user_roles = payload.get("roles", [])
                if not any(role in user_roles for role in roles):
                    return jsonify({"error": "Insufficient permissions"}), 403

            # 将用户信息添加到请求上下文
            request.user = payload

            return f(*args, **kwargs)

        return decorated_function
    return decorator


# 使用示例
from flask import Flask

app = Flask(__name__)
jwt_handler = JWTHandler("your-secret-key")

@app.route("/api/login", methods=["POST"])
def login():
    """登录接口"""
    data = request.json
    username = data.get("username")
    password = data.get("password")

    # 验证用户名密码 (示例)
    if username == "admin" and password == "admin123":
        token = jwt_handler.generate_token(
            user_id="user_001",
            username=username,
            roles=["admin", "user"]
        )

        return jsonify({
            "access_token": token,
            "token_type": "Bearer",
            "expires_in": 3600
        })

    return jsonify({"error": "Invalid credentials"}), 401


@app.route("/api/v1/users", methods=["GET"])
@require_auth(roles=["admin", "user"])
def get_users():
    """获取用户列表 (需要认证)"""
    user_info = request.user

    return jsonify({
        "current_user": user_info["username"],
        "roles": user_info["roles"],
        "users": [
            {"id": 1, "name": "User 1"},
            {"id": 2, "name": "User 2"}
        ]
    })


@app.route("/api/v1/admin", methods=["POST"])
@require_auth(roles=["admin"])
def admin_only():
    """管理员接口"""
    return jsonify({"message": "Admin access granted"})


if __name__ == "__main__":
    app.run(port=8080)

Kong JWT 插件配置

yaml
# Kong JWT 配置
plugins:
  - name: jwt
    config:
      uri_param_names:
        - jwt
      cookie_names:
        - jwt
      header_names:
        - Authorization
      claims_to_verify:
        - exp
        - nbf
      key_claim_name: iss
      secret_is_base64: false
      anonymous: null
      run_on_preflight: true
      maximum_expiration: 7200

consumers:
  - username: api-client-1
    custom_id: client_001

    jwt_secrets:
      # HS256 (对称加密)
      - key: api-gateway
        algorithm: HS256
        secret: your-256-bit-secret

  - username: api-client-2

    jwt_secrets:
      # RS256 (非对称加密)
      - key: api-gateway-rsa
        algorithm: RS256
        rsa_public_key: |
          -----BEGIN PUBLIC KEY-----
          MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1Z...
          -----END PUBLIC KEY-----

APISIX JWT 插件

bash
#!/bin/bash
# APISIX JWT 配置

ADMIN_API="http://localhost:9180"
API_KEY="your-api-key"

# 1. 创建 Consumer
curl -X PUT "${ADMIN_API}/apisix/admin/consumers" \
  -H "X-API-KEY: ${API_KEY}" \
  -d '{
    "username": "mobile-app",
    "desc": "移动端应用",
    "plugins": {
      "jwt-auth": {
        "key": "mobile-app-key",
        "secret": "my-secret-key",
        "algorithm": "HS256",
        "exp": 86400,
        "base64_secret": false
      }
    }
  }'

# 2. 在路由上启用 JWT
curl -X PUT "${ADMIN_API}/apisix/admin/routes/1" \
  -H "X-API-KEY: ${API_KEY}" \
  -d '{
    "uri": "/api/v1/users/*",
    "plugins": {
      "jwt-auth": {
        "header": "Authorization",
        "query": "jwt",
        "cookie": "jwt"
      }
    },
    "upstream": {
      "type": "roundrobin",
      "nodes": {
        "user-service:8080": 1
      }
    }
  }'

# 3. 获取 Token
curl -X GET "http://localhost:9080/apisix/plugin/jwt/sign?key=mobile-app-key" \
  -H "Content-Type: application/json"

OAuth2 实现

OAuth2 授权流程

        OAuth2 授权码模式流程
┌─────────────────────────────────────────┐
│  1. 用户访问客户端应用                  │
│     User -> Client App                  │
└──────────────┬──────────────────────────┘


┌──────────────────────────────────────────┐
│  2. 重定向到授权服务器                   │
│     GET /oauth/authorize?                │
│       client_id=xxx&                     │
│       redirect_uri=xxx&                  │
│       response_type=code&                │
│       scope=read+write                   │
└──────────────┬───────────────────────────┘


┌──────────────────────────────────────────┐
│  3. 用户登录并授权                       │
│     Authorization Server                 │
│     输入用户名密码,选择授权范围          │
└──────────────┬───────────────────────────┘


┌──────────────────────────────────────────┐
│  4. 返回授权码                           │
│     302 Redirect                         │
│     Location: callback?code=AUTH_CODE    │
└──────────────┬───────────────────────────┘


┌──────────────────────────────────────────┐
│  5. 使用授权码换取 Token                 │
│     POST /oauth/token                    │
│     {                                    │
│       "grant_type": "authorization_code",│
│       "code": "AUTH_CODE",               │
│       "client_id": "xxx",                │
│       "client_secret": "xxx",            │
│       "redirect_uri": "xxx"              │
│     }                                    │
└──────────────┬───────────────────────────┘


┌──────────────────────────────────────────┐
│  6. 返回访问令牌                         │
│     {                                    │
│       "access_token": "ACCESS_TOKEN",    │
│       "token_type": "Bearer",            │
│       "expires_in": 3600,                │
│       "refresh_token": "REFRESH_TOKEN",  │
│       "scope": "read write"              │
│     }                                    │
└──────────────┬───────────────────────────┘


┌──────────────────────────────────────────┐
│  7. 使用 Access Token 访问 API           │
│     GET /api/v1/users                    │
│     Authorization: Bearer ACCESS_TOKEN   │
└──────────────────────────────────────────┘

OAuth2 服务器实现 (Python)

python
# oauth2_server.py - OAuth2 授权服务器
from flask import Flask, request, jsonify, redirect
from authlib.integrations.flask_oauth2 import AuthorizationServer, ResourceProtector
from authlib.integrations.sqla_oauth2 import (
    create_query_client_func,
    create_save_token_func,
    create_bearer_token_validator
)
from authlib.oauth2.rfc6749 import grants
import secrets
import time

app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'

# 简化示例,实际应使用数据库
class Client:
    def __init__(self, client_id, client_secret, redirect_uris, scopes):
        self.client_id = client_id
        self.client_secret = client_secret
        self.redirect_uris = redirect_uris
        self.scope = scopes
        self.grant_types = ['authorization_code', 'refresh_token']
        self.response_types = ['code']
        self.token_endpoint_auth_method = 'client_secret_post'

    def check_redirect_uri(self, redirect_uri):
        return redirect_uri in self.redirect_uris

    def check_client_secret(self, client_secret):
        return self.client_secret == client_secret

    def check_response_type(self, response_type):
        return response_type in self.response_types

    def check_grant_type(self, grant_type):
        return grant_type in self.grant_types

# 存储客户端和令牌
clients = {
    "mobile-app": Client(
        client_id="mobile-app",
        client_secret="mobile-secret",
        redirect_uris=["https://app.example.com/callback"],
        scopes="read write"
    )
}

authorization_codes = {}
access_tokens = {}
refresh_tokens = {}

# 授权码授权
class AuthorizationCodeGrant(grants.AuthorizationCodeGrant):
    def save_authorization_code(self, code, request):
        """保存授权码"""
        authorization_codes[code] = {
            "client_id": request.client.client_id,
            "redirect_uri": request.redirect_uri,
            "scope": request.scope,
            "user_id": request.user.id if hasattr(request, 'user') else "user_001",
            "code_challenge": request.data.get("code_challenge"),
            "code_challenge_method": request.data.get("code_challenge_method"),
            "expires_at": time.time() + 600  # 10分钟过期
        }

    def query_authorization_code(self, code, client):
        """查询授权码"""
        item = authorization_codes.get(code)
        if item and item["client_id"] == client.client_id:
            return item
        return None

    def delete_authorization_code(self, authorization_code):
        """删除授权码"""
        for code, item in list(authorization_codes.items()):
            if item == authorization_code:
                del authorization_codes[code]
                break

    def authenticate_user(self, authorization_code):
        """认证用户"""
        class User:
            def __init__(self, user_id):
                self.id = user_id

        return User(authorization_code.get("user_id"))

def query_client(client_id):
    return clients.get(client_id)

def save_token(token, request):
    """保存访问令牌"""
    access_token = token["access_token"]
    access_tokens[access_token] = {
        "client_id": request.client.client_id,
        "user_id": request.user.id if hasattr(request, 'user') else "user_001",
        "scope": token.get("scope"),
        "expires_at": time.time() + token["expires_in"]
    }

    if "refresh_token" in token:
        refresh_tokens[token["refresh_token"]] = {
            "client_id": request.client.client_id,
            "user_id": request.user.id if hasattr(request, 'user') else "user_001",
            "scope": token.get("scope")
        }

# 初始化授权服务器
server = AuthorizationServer()
server.register_grant(AuthorizationCodeGrant)

@app.route("/oauth/authorize", methods=["GET", "POST"])
def authorize():
    """授权端点"""
    if request.method == "GET":
        # 显示授权页面
        client_id = request.args.get("client_id")
        scope = request.args.get("scope")

        return f"""
        <h2>授权请求</h2>
        <p>应用 {client_id} 请求以下权限:</p>
        <ul>
            <li>{scope}</li>
        </ul>
        <form method="POST">
            <input type="hidden" name="client_id" value="{client_id}">
            <input type="hidden" name="scope" value="{scope}">
            <button type="submit" name="confirm" value="yes">同意</button>
            <button type="submit" name="confirm" value="no">拒绝</button>
        </form>
        """

    # 用户确认授权
    if request.form.get("confirm") == "yes":
        # 生成授权码
        code = secrets.token_urlsafe(32)
        client_id = request.form.get("client_id")
        redirect_uri = request.args.get("redirect_uri")

        grant = AuthorizationCodeGrant()
        grant.save_authorization_code(code, type('obj', (object,), {
            'client': clients[client_id],
            'redirect_uri': redirect_uri,
            'scope': request.form.get("scope"),
            'data': request.args
        }))

        return redirect(f"{redirect_uri}?code={code}")

    return jsonify({"error": "access_denied"}), 403


@app.route("/oauth/token", methods=["POST"])
def issue_token():
    """令牌端点"""
    grant_type = request.form.get("grant_type")

    if grant_type == "authorization_code":
        code = request.form.get("code")
        client_id = request.form.get("client_id")
        client_secret = request.form.get("client_secret")

        # 验证客户端
        client = clients.get(client_id)
        if not client or not client.check_client_secret(client_secret):
            return jsonify({"error": "invalid_client"}), 401

        # 验证授权码
        auth_code = authorization_codes.get(code)
        if not auth_code or auth_code["client_id"] != client_id:
            return jsonify({"error": "invalid_grant"}), 400

        # 检查过期
        if auth_code["expires_at"] < time.time():
            return jsonify({"error": "expired_token"}), 400

        # 生成访问令牌
        access_token = secrets.token_urlsafe(32)
        refresh_token = secrets.token_urlsafe(32)

        token_data = {
            "access_token": access_token,
            "token_type": "Bearer",
            "expires_in": 3600,
            "refresh_token": refresh_token,
            "scope": auth_code["scope"]
        }

        save_token(token_data, type('obj', (object,), {
            'client': client,
            'user': type('user', (object,), {'id': auth_code["user_id"]})
        }))

        # 删除授权码
        del authorization_codes[code]

        return jsonify(token_data)

    elif grant_type == "refresh_token":
        refresh_token = request.form.get("refresh_token")
        client_id = request.form.get("client_id")

        # 验证刷新令牌
        token_info = refresh_tokens.get(refresh_token)
        if not token_info or token_info["client_id"] != client_id:
            return jsonify({"error": "invalid_grant"}), 400

        # 生成新的访问令牌
        new_access_token = secrets.token_urlsafe(32)

        token_data = {
            "access_token": new_access_token,
            "token_type": "Bearer",
            "expires_in": 3600,
            "scope": token_info["scope"]
        }

        access_tokens[new_access_token] = {
            "client_id": client_id,
            "user_id": token_info["user_id"],
            "scope": token_info["scope"],
            "expires_at": time.time() + 3600
        }

        return jsonify(token_data)

    return jsonify({"error": "unsupported_grant_type"}), 400


@app.route("/api/userinfo", methods=["GET"])
def userinfo():
    """用户信息端点"""
    auth_header = request.headers.get("Authorization")

    if not auth_header or not auth_header.startswith("Bearer "):
        return jsonify({"error": "invalid_token"}), 401

    access_token = auth_header.split(" ")[1]
    token_info = access_tokens.get(access_token)

    if not token_info:
        return jsonify({"error": "invalid_token"}), 401

    if token_info["expires_at"] < time.time():
        return jsonify({"error": "expired_token"}), 401

    return jsonify({
        "sub": token_info["user_id"],
        "name": "John Doe",
        "email": "[email protected]",
        "scope": token_info["scope"]
    })


if __name__ == "__main__":
    app.run(port=9000)

RBAC 权限控制

RBAC 模型

        RBAC (Role-Based Access Control)
┌──────────────────────────────────────────┐
│                                          │
│  User ───▶ Role ───▶ Permission         │
│   │         │          │                 │
│   │         │          │                 │
│  用户     角色       权限                │
│                                          │
│  ┌────────────────────────────────────┐ │
│  │  User: [email protected]            │ │
│  │  ├─ Role: admin                    │ │
│  │  └─ Role: user                     │ │
│  └────────────────────────────────────┘ │
│                │                         │
│                ▼                         │
│  ┌────────────────────────────────────┐ │
│  │  Role: admin                       │ │
│  │  ├─ Permission: user:read          │ │
│  │  ├─ Permission: user:write         │ │
│  │  ├─ Permission: user:delete        │ │
│  │  └─ Permission: system:manage      │ │
│  └────────────────────────────────────┘ │
│                                          │
│  ┌────────────────────────────────────┐ │
│  │  Role: user                        │ │
│  │  ├─ Permission: user:read          │ │
│  │  └─ Permission: user:write         │ │
│  └────────────────────────────────────┘ │
└──────────────────────────────────────────┘

RBAC 实现 (Go)

go
// rbac.go - RBAC 权限系统
package main

import (
    "fmt"
    "strings"
)

// Permission 权限
type Permission struct {
    Resource string // 资源: user, order, product
    Action   string // 操作: read, write, delete
}

func (p Permission) String() string {
    return fmt.Sprintf("%s:%s", p.Resource, p.Action)
}

// Role 角色
type Role struct {
    Name        string
    Permissions []Permission
}

// User 用户
type User struct {
    ID    string
    Name  string
    Email string
    Roles []Role
}

// RBAC 权限控制器
type RBAC struct {
    roles map[string]*Role
    users map[string]*User
}

func NewRBAC() *RBAC {
    return &RBAC{
        roles: make(map[string]*Role),
        users: make(map[string]*User),
    }
}

// 添加角色
func (r *RBAC) AddRole(name string, permissions []Permission) {
    r.roles[name] = &Role{
        Name:        name,
        Permissions: permissions,
    }
}

// 添加用户
func (r *RBAC) AddUser(user *User) {
    r.users[user.ID] = user
}

// 给用户分配角色
func (r *RBAC) AssignRole(userID, roleName string) error {
    user, ok := r.users[userID]
    if !ok {
        return fmt.Errorf("user not found: %s", userID)
    }

    role, ok := r.roles[roleName]
    if !ok {
        return fmt.Errorf("role not found: %s", roleName)
    }

    user.Roles = append(user.Roles, *role)
    return nil
}

// 检查用户权限
func (r *RBAC) CheckPermission(userID string, required Permission) bool {
    user, ok := r.users[userID]
    if !ok {
        return false
    }

    for _, role := range user.Roles {
        for _, perm := range role.Permissions {
            if perm.Resource == required.Resource &&
               (perm.Action == required.Action || perm.Action == "*") {
                return true
            }
        }
    }

    return false
}

// 获取用户所有权限
func (r *RBAC) GetUserPermissions(userID string) []Permission {
    user, ok := r.users[userID]
    if !ok {
        return nil
    }

    permMap := make(map[string]Permission)

    for _, role := range user.Roles {
        for _, perm := range role.Permissions {
            key := perm.String()
            permMap[key] = perm
        }
    }

    permissions := make([]Permission, 0, len(permMap))
    for _, perm := range permMap {
        permissions = append(permissions, perm)
    }

    return permissions
}

func main() {
    rbac := NewRBAC()

    // 定义角色和权限
    rbac.AddRole("admin", []Permission{
        {Resource: "user", Action: "*"},
        {Resource: "order", Action: "*"},
        {Resource: "product", Action: "*"},
        {Resource: "system", Action: "manage"},
    })

    rbac.AddRole("user", []Permission{
        {Resource: "user", Action: "read"},
        {Resource: "user", Action: "write"},
        {Resource: "order", Action: "read"},
        {Resource: "order", Action: "write"},
    })

    rbac.AddRole("guest", []Permission{
        {Resource: "product", Action: "read"},
    })

    // 创建用户
    user1 := &User{
        ID:    "user_001",
        Name:  "Admin User",
        Email: "[email protected]",
    }

    rbac.AddUser(user1)
    rbac.AssignRole("user_001", "admin")

    // 权限检查
    canDelete := rbac.CheckPermission("user_001", Permission{
        Resource: "user",
        Action:   "delete",
    })

    fmt.Printf("Admin can delete user: %v\n", canDelete)

    // 获取所有权限
    permissions := rbac.GetUserPermissions("user_001")
    fmt.Println("User permissions:")
    for _, perm := range permissions {
        fmt.Printf("  - %s\n", perm)
    }
}

Kong RBAC 插件

yaml
# Kong ACL (Access Control List) 配置
plugins:
  - name: acl
    config:
      allow:
        - admin-group
        - premium-users
      deny:
        - blacklist
      hide_groups_header: false

consumers:
  - username: admin-user
    acls:
      - group: admin-group

  - username: premium-user
    acls:
      - group: premium-users

  - username: blocked-user
    acls:
      - group: blacklist

多租户认证

多租户架构

       多租户认证隔离模型
┌────────────────────────────────────┐
│  Tenant A (租户 A)                 │
│  ┌──────────────────────────────┐ │
│  │  Users:                      │ │
│  │  ├─ [email protected]       │ │
│  │  └─ [email protected]       │ │
│  │                              │ │
│  │  Resources:                  │ │
│  │  ├─ Database: tenant_a_db    │ │
│  │  └─ Storage: tenant-a-bucket │ │
│  └──────────────────────────────┘ │
└────────────────────────────────────┘

┌────────────────────────────────────┐
│  Tenant B (租户 B)                 │
│  ┌──────────────────────────────┐ │
│  │  Users:                      │ │
│  │  ├─ [email protected]       │ │
│  │  └─ [email protected]       │ │
│  │                              │ │
│  │  Resources:                  │ │
│  │  ├─ Database: tenant_b_db    │ │
│  │  └─ Storage: tenant-b-bucket │ │
│  └──────────────────────────────┘ │
└────────────────────────────────────┘

数据隔离策略:
1. 数据库隔离 (每个租户独立数据库)
2. Schema 隔离 (共享数据库,独立 Schema)
3. 行级隔离 (共享表,tenant_id 区分)

多租户认证实现

python
# multi_tenant_auth.py
from flask import Flask, request, jsonify
import jwt
from functools import wraps

app = Flask(__name__)

# 租户配置
TENANTS = {
    "tenant-a": {
        "name": "Tenant A",
        "secret_key": "tenant-a-secret",
        "database": "tenant_a_db",
        "features": ["feature1", "feature2"]
    },
    "tenant-b": {
        "name": "Tenant B",
        "secret_key": "tenant-b-secret",
        "database": "tenant_b_db",
        "features": ["feature1", "feature3"]
    }
}

def extract_tenant_id(request):
    """从请求中提取租户 ID"""
    # 方式 1: 从子域名提取
    host = request.host
    if "." in host:
        subdomain = host.split(".")[0]
        if subdomain in TENANTS:
            return subdomain

    # 方式 2: 从 Header 提取
    tenant_id = request.headers.get("X-Tenant-ID")
    if tenant_id and tenant_id in TENANTS:
        return tenant_id

    # 方式 3: 从 JWT 提取
    auth_header = request.headers.get("Authorization")
    if auth_header and auth_header.startswith("Bearer "):
        token = auth_header.split(" ")[1]
        try:
            # 先用通用密钥解码获取租户 ID
            payload = jwt.decode(token, options={"verify_signature": False})
            tenant_id = payload.get("tenant_id")
            if tenant_id in TENANTS:
                return tenant_id
        except:
            pass

    return None

def require_tenant_auth(f):
    """多租户认证装饰器"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        tenant_id = extract_tenant_id(request)

        if not tenant_id:
            return jsonify({"error": "Tenant not identified"}), 400

        tenant_config = TENANTS[tenant_id]

        auth_header = request.headers.get("Authorization")
        if not auth_header or not auth_header.startswith("Bearer "):
            return jsonify({"error": "Missing token"}), 401

        token = auth_header.split(" ")[1]

        try:
            # 使用租户专属密钥验证
            payload = jwt.decode(
                token,
                tenant_config["secret_key"],
                algorithms=["HS256"]
            )

            # 验证租户 ID
            if payload.get("tenant_id") != tenant_id:
                return jsonify({"error": "Invalid tenant"}), 403

            # 将租户信息添加到请求上下文
            request.tenant_id = tenant_id
            request.tenant_config = tenant_config
            request.user = payload

            return f(*args, **kwargs)

        except jwt.ExpiredSignatureError:
            return jsonify({"error": "Token expired"}), 401
        except jwt.InvalidTokenError:
            return jsonify({"error": "Invalid token"}), 401

    return decorated_function

@app.route("/api/v1/data", methods=["GET"])
@require_tenant_auth
def get_data():
    """获取租户数据"""
    tenant_id = request.tenant_id
    database = request.tenant_config["database"]

    # 从租户专属数据库获取数据
    # db = get_database(database)
    # data = db.query(...)

    return jsonify({
        "tenant_id": tenant_id,
        "database": database,
        "user": request.user["username"],
        "data": ["item1", "item2"]
    })

if __name__ == "__main__":
    app.run(port=8080)

实战案例

案例: 企业级 API 网关认证系统

yaml
# enterprise_auth_architecture.yml
# 企业级认证架构

architecture:
  # Layer 1: 外部认证
  external_auth:
    - oauth2_providers:
        - Google OAuth2
        - Microsoft Azure AD
        - GitHub OAuth
    - saml_providers:
        - Okta
        - Auth0

  # Layer 2: API 网关认证
  api_gateway:
    - jwt_validation:
        issuer: "https://auth.company.com"
        audience: "api.company.com"
        algorithms: ["RS256"]

    - rate_limiting:
        per_user: 1000/hour
        per_ip: 100/minute

    - ip_whitelisting:
        internal_network: "10.0.0.0/8"
        partner_ips: ["203.0.113.0/24"]

  # Layer 3: 服务间认证
  service_mesh:
    - mtls:
        enabled: true
        cert_rotation: 24h

    - service_accounts:
        - name: "user-service"
          permissions: ["user:read", "user:write"]
        - name: "order-service"
          permissions: ["order:*", "user:read"]

monitoring:
  - failed_auth_attempts:
      threshold: 5/minute
      action: block_ip

  - token_usage:
      track_by: user_id
      alert_on: anomaly

compliance:
  - audit_logs:
      retention: 90_days
      include: ["auth_attempts", "token_issued", "token_revoked"]

  - gdpr:
      data_encryption: true
      right_to_be_forgotten: true

总结

认证方案选择

┌──────────────────────────────────────┐
│      认证方案选择指南                │
├──────────────┬───────────────────────┤
│  场景        │  推荐方案             │
├──────────────┼───────────────────────┤
│ 内部 API     │ JWT (HS256)           │
│ 公开 API     │ JWT (RS256) + OAuth2  │
│ 第三方集成   │ OAuth2                │
│ 移动 APP     │ JWT + Refresh Token   │
│ 微服务间     │ mTLS + JWT            │
│ 企业应用     │ SAML/OIDC             │
└──────────────┴───────────────────────┘

关键要点

  1. JWT: 无状态、跨域友好、适合分布式系统
  2. OAuth2: 标准化授权协议、支持第三方登录
  3. RBAC: 基于角色的权限控制、易于管理
  4. 多租户: 数据隔离、租户专属配置
  5. 安全性: HTTPS、Token 过期、刷新机制

下一步学习

💬 讨论

使用 GitHub 账号登录后即可参与讨论

基于 MIT 许可发布