eeettt · 2026-03-26
# Deep-Dive Guide to Multi-Tenant Architecture

> Enterprise Agent Memory System — Multi-Tenant Isolation
>
> Last updated: 2026-03-26

---

## Table of Contents

1. [Tenant Isolation Models](#tenant-isolation-models)
2. [Complete Data Isolation Implementation](#complete-data-isolation-implementation)
3. [Tenant Identification and Routing at the API Layer](#tenant-identification-and-routing-at-the-api-layer)
4. [Resource Quotas and Rate Limiting](#resource-quotas-and-rate-limiting)
5. [Security Hardening](#security-hardening)
6. [Monitoring and Auditing](#monitoring-and-auditing)

---

## Tenant Isolation Models

### 1.1 Comparing the Three Isolation Models

Per the [WorkOS multi-tenancy guide](https://workos.com/blog/tenant-isolation-in-multi-tenant-systems) and the [AWS multi-tenant architecture guide](https://docs.aws.amazon.com/prescriptive-guidance/latest/agentic-ai-multitenant/enforcing-tenant-isolation.html):

| Dimension | Physical isolation (Silo) | Logical isolation (Bridge) | Application-level isolation (Pool) |
|------|----------------|------------------|------------------|
| **Database** | Dedicated instance/schema | Shared DB + namespace | Shared tables + `tenant_id` |
| **Compute** | Dedicated containers/VMs | Shared cluster + namespaces | Fully shared |
| **Network** | Dedicated VPC/subnets | Shared VPC + security groups | Shared network |
| **Security** | ⭐⭐⭐⭐⭐ Highest | ⭐⭐⭐⭐ High | ⭐⭐⭐ Medium |
| **Cost** | 💰💰💰💰💰 Very high | 💰💰💰 Medium | 💰 Low |
| **Operational complexity** | High (per-tenant stacks) | Medium (unified management) | Low (single system) |
| **Scalability** | Limited (instance count) | Good (elastic scaling) | Excellent (unbounded tenants) |
| **Typical use cases** | Finance, healthcare, government | Enterprise customers | SMB, SaaS |

**Key risks compared**:

| Isolation model | Main risk | Mitigation |
|---------|---------|---------|
| **Physical isolation** | Runaway cost, management sprawl | Automated deployment, aggregated monitoring |
| **Logical isolation** | Misconfiguration leading to leaks | Strict RBAC, automated testing |
| **Application-level isolation** | SQL injection, leaky code bugs | RLS, defensive programming, auditing |

### 1.2 Recommended Approach: Logical Isolation

**Why logical isolation?**

1. ✅ Sufficient security (99.9% of scenarios)
2. ✅ Controlled cost (70%+ savings vs. physical isolation)
3. ✅ Simple operations (unified management)
4. ✅ Elastic scaling (cloud-native friendly)

**Architecture diagram**:

```
┌──────────────────────────────────────────────────────────────┐
│                      Application Layer                       │
│  ┌────────────────────────────────────────────────────────┐  │
│  │ API Gateway                                            │  │
│  │  - JWT validation → extract tenant_id                  │  │
│  │  - Rate limiting (per tenant)                          │  │
│  │  - Request logging (audit trail)                       │  │
│  └────────────────────────────────────────────────────────┘  │
└──────────────────────────────────────────────────────────────┘
                              ↓
┌──────────────────────────────────────────────────────────────┐
│                    Service Layer (Shared)                    │
│  ┌────────────────┐  ┌────────────────┐  ┌────────────────┐  │
│  │ Memory API     │  │ Search API     │  │ Admin API      │  │
│  │ tenant_id →    │  │ tenant_id →    │  │ tenant_id →    │  │
│  └────────────────┘  └────────────────┘  └────────────────┘  │
└──────────────────────────────────────────────────────────────┘
                              ↓
┌──────────────────────────────────────────────────────────────┐
│               Data Layer (Logically Isolated)                │
│                                                              │
│  PostgreSQL (metadata)          Vector DB (embeddings)       │
│  ┌─────────────────────┐        ┌──────────────────────┐     │
│  │ Table: memories     │        │ Collection: memory   │     │
│  │ - tenant_id (index) │        │ - tenant_id (filter) │     │
│  │ - user_id           │        │ - embedding          │     │
│  │ - content           │        │ - metadata           │     │
│  │ - RLS enabled       │        │ - namespace isolation│     │
│  └─────────────────────┘        └──────────────────────┘     │
│                                                              │
│  Graph DB (relations)           Redis (cache)                │
│  ┌─────────────────────┐        ┌──────────────────────┐     │
│  │ Node: :Tenant       │        │ Keys:                │     │
│  │ - id                │        │   tenant:{id}:*      │     │
│  │ Node: :Memory       │        │ - isolated by prefix │     │
│  │ - tenant_id         │        └──────────────────────┘     │
│  └─────────────────────┘                                     │
└──────────────────────────────────────────────────────────────┘
```

---

## Complete Data Isolation Implementation

### 2.1 PostgreSQL Row-Level Security (RLS)

Per the [OWASP Multi-Tenant Security Cheat Sheet](https://cheatsheetseries.owasp.org/cheatsheets/Multi_Tenant_Security_Cheat_Sheet.html) and [Azure SQL RLS in practice](https://oneuptime.com/blog/post/2026-02-16-how-to-design-a-multi-tenant-data-isolation-strategy-on-azure-sql-database-using-row-level-security/view):

#### Schema Design

```sql
-- ============================================
-- Tenants table
-- ============================================
CREATE TABLE tenants (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name VARCHAR(255) NOT NULL,
    tier VARCHAR(50) NOT NULL,              -- free, pro, enterprise
    status VARCHAR(50) DEFAULT 'active',    -- active, suspended, deleted

    -- Quotas
    max_memories INTEGER DEFAULT 10000,
    max_users INTEGER DEFAULT 10,
    max_api_calls_per_day INTEGER DEFAULT 1000,

    -- Audit
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW(),
    deleted_at TIMESTAMP
);

CREATE INDEX idx_tenants_status ON tenants(status) WHERE status = 'active';

-- ============================================
-- Memories table (core data)
-- ============================================
CREATE TABLE memories (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id UUID NOT NULL REFERENCES tenants(id) ON DELETE CASCADE,
    user_id UUID NOT NULL,
    agent_id UUID,

    -- Content
    content TEXT NOT NULL,
    content_hash VARCHAR(64),       -- SHA-256, used for deduplication

    -- Vector (stored in the vector DB; only a reference is kept here)
    vector_id VARCHAR(255),
    embedding_model VARCHAR(100),

    -- Metadata
    type VARCHAR(50),               -- preference, fact, skill, etc.
    source VARCHAR(100),            -- chat, api, import
    tags TEXT[],
    metadata JSONB DEFAULT '{}'::jsonb,

    -- State
    status VARCHAR(50) DEFAULT 'active',
    access_count INTEGER DEFAULT 0,
    last_accessed_at TIMESTAMP,

    -- Audit
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW(),
    deleted_at TIMESTAMP
);

-- Key indexes
CREATE INDEX idx_memories_tenant_user ON memories(tenant_id, user_id);
CREATE INDEX idx_memories_tenant_created ON memories(tenant_id, created_at DESC);
CREATE INDEX idx_memories_tenant_type ON memories(tenant_id, type);
CREATE INDEX idx_memories_hash ON memories(content_hash) WHERE content_hash IS NOT NULL;
CREATE INDEX idx_memories_metadata ON memories USING GIN (metadata);
CREATE INDEX idx_memories_tags ON memories USING GIN (tags);

-- ============================================
-- Row-Level Security (RLS)
-- ============================================

-- Enable RLS
ALTER TABLE memories ENABLE ROW LEVEL SECURITY;

-- Policy 1: tenant isolation (the most important one!)
CREATE POLICY tenant_isolation ON memories
    FOR ALL
    USING (tenant_id = current_setting('app.current_tenant_id', true)::UUID)
    WITH CHECK (tenant_id = current_setting('app.current_tenant_id', true)::UUID);

-- Policy 2: hide soft-deleted rows.
-- Must be RESTRICTIVE: permissive policies are OR-combined, so a permissive
-- version would expose non-deleted rows regardless of the tenant filter.
CREATE POLICY hide_deleted ON memories
    AS RESTRICTIVE
    FOR SELECT
    USING (deleted_at IS NULL);

-- Policy 3: super-admin bypass (permissive, OR-combined with tenant_isolation)
CREATE POLICY admin_bypass ON memories
    FOR ALL
    USING (current_setting('app.is_admin', true)::BOOLEAN = true);

-- ============================================
-- Performance view (pre-aggregated statistics)
-- ============================================
CREATE MATERIALIZED VIEW tenant_memory_stats AS
SELECT
    tenant_id,
    COUNT(*) AS total_memories,
    COUNT(DISTINCT user_id) AS total_users,
    SUM(access_count) AS total_accesses,
    MAX(created_at) AS last_memory_at,
    pg_size_pretty(SUM(octet_length(content))) AS total_content_size
FROM memories
WHERE deleted_at IS NULL
GROUP BY tenant_id;

CREATE UNIQUE INDEX ON tenant_memory_stats(tenant_id);

-- Refresh periodically (hourly)
CREATE OR REPLACE FUNCTION refresh_tenant_stats()
RETURNS void AS $$
BEGIN
    REFRESH MATERIALIZED VIEW CONCURRENTLY
        tenant_memory_stats;
END;
$$ LANGUAGE plpgsql;

-- ============================================
-- Quota check function
-- ============================================
CREATE OR REPLACE FUNCTION check_memory_quota()
RETURNS TRIGGER AS $$
DECLARE
    current_count INTEGER;
    max_allowed INTEGER;
BEGIN
    -- Count the tenant's current memories
    SELECT COUNT(*) INTO current_count
    FROM memories
    WHERE tenant_id = NEW.tenant_id AND deleted_at IS NULL;

    -- Look up the quota
    SELECT max_memories INTO max_allowed
    FROM tenants
    WHERE id = NEW.tenant_id;

    -- Reject if over the limit
    IF current_count >= max_allowed THEN
        RAISE EXCEPTION 'Memory quota exceeded for tenant %', NEW.tenant_id;
    END IF;

    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER check_quota_before_insert
    BEFORE INSERT ON memories
    FOR EACH ROW
    EXECUTE FUNCTION check_memory_quota();

-- ============================================
-- Audit log
-- ============================================
CREATE TABLE audit_logs (
    id BIGSERIAL,
    tenant_id UUID NOT NULL,
    user_id UUID,
    action VARCHAR(50) NOT NULL,    -- CREATE, READ, UPDATE, DELETE
    resource_type VARCHAR(50),
    resource_id UUID,
    details JSONB,
    ip_address INET,
    user_agent TEXT,
    created_at TIMESTAMP NOT NULL DEFAULT NOW(),
    -- On a partitioned table the primary key must include the partition key
    PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);

CREATE INDEX idx_audit_tenant_time ON audit_logs(tenant_id, created_at DESC);
CREATE INDEX idx_audit_action ON audit_logs(action);

-- Monthly partitions
CREATE TABLE audit_logs_y2026m03 PARTITION OF audit_logs
    FOR VALUES FROM ('2026-03-01') TO ('2026-04-01');
```

#### Using RLS from the Application Layer

```python
import json
from contextlib import asynccontextmanager

import asyncpg

class TenantAwareDB:
    def __init__(self, db_pool: asyncpg.Pool):
        self.pool = db_pool

    @asynccontextmanager
    async def tenant_context(self, tenant_id: str, is_admin: bool = False):
        """
        Set the tenant context: every query in this block is
        automatically filtered by tenant_id.
        """
        async with self.pool.acquire() as conn:
            # Set session variables via parameterized set_config()
            # (f-string interpolation here would itself be an injection risk)
            await conn.execute(
                "SELECT set_config('app.current_tenant_id', $1, false)", tenant_id
            )
            await conn.execute(
                "SELECT set_config('app.is_admin', $1, false)", str(is_admin).lower()
            )
            try:
                yield conn
            finally:
                # Clear session variables before the connection returns to the pool
                await conn.execute("""
                    RESET app.current_tenant_id;
                    RESET app.is_admin;
                """)

    async def create_memory(
        self,
        tenant_id: str,
        user_id: str,
        content: str,
        metadata: dict
    ):
        """
        Create a memory — RLS guarantees the tenant_id is correct.
        """
        async with self.tenant_context(tenant_id) as conn:
            # Note: the RLS WITH CHECK clause verifies tenant_id on insert
            result = await conn.fetchrow("""
                INSERT INTO memories (tenant_id, user_id, content, metadata)
                VALUES ($1, $2, $3, $4)
                RETURNING id, created_at
            """, tenant_id, user_id, content, json.dumps(metadata))

            return result

    async def search_memories(
        self,
        tenant_id: str,
        query: str,
        limit: int = 10
    ):
        """
        Search memories — RLS filters by tenant_id automatically.
        """
        async with self.tenant_context(tenant_id) as conn:
            # No "WHERE tenant_id = $1" needed: the RLS policy adds it!
            results = await conn.fetch("""
                SELECT id, content, metadata, created_at
                FROM memories
                WHERE content ILIKE $1
                ORDER BY created_at DESC
                LIMIT $2
            """, f"%{query}%", limit)

            return results

    async def get_tenant_stats(self, tenant_id: str):
        """
        Tenant statistics (from the materialized view).
        """
        async with self.pool.acquire() as conn:
            stats = await conn.fetchrow("""
                SELECT * FROM tenant_memory_stats
                WHERE tenant_id = $1
            """, tenant_id)

            return stats

# Usage
db = TenantAwareDB(pool)

# Create a memory
await db.create_memory(
    tenant_id="tenant-123",
    user_id="user-456",
    content="User prefers dark mode",
    metadata={"type": "preference"}
)

# Search (automatically isolated)
results = await db.search_memories(
    tenant_id="tenant-123",
    query="dark"
)

# Cross-tenant access fails!
results = await db.search_memories(
    tenant_id="tenant-999",  # a different tenant
    query="dark"
)
# → empty (blocked by RLS)
```

### 2.2 Tenant Isolation in the Vector Database

#### Qdrant Namespace Isolation

```python
from qdrant_client import AsyncQdrantClient
from qdrant_client.models import (
    Distance, VectorParams, Filter, FieldCondition, MatchValue
)

class TenantAwareVectorDB:
    def __init__(self, qdrant_url: str):
        # AsyncQdrantClient, so the awaits below actually work
        self.client = AsyncQdrantClient(url=qdrant_url)

    def _get_collection_name(self, tenant_id: str) -> str:
        """
        Strategy 1: one collection per tenant (small tenant counts).
        """
        return f"tenant_{tenant_id}_memories"

    def _get_shared_collection(self) -> str:
        """
        Strategy 2: shared collection + payload filter (large tenant counts).
        """
        return "shared_memories"

    async def ensure_collection(self, tenant_id: str, strategy="shared"):
        """
        Make sure the tenant's collection exists.
        """
        if strategy == "dedicated":
            collection_name = self._get_collection_name(tenant_id)
            collections = await self.client.get_collections()
            if collection_name not in [c.name for c in collections.collections]:
                await self.client.create_collection(
                    collection_name=collection_name,
                    vectors_config=VectorParams(
                        size=1536,
                        distance=Distance.COSINE
                    )
                )
        elif strategy == "shared":
            # Shared collection
            collection_name = self._get_shared_collection()
            # ... same creation logic

    async def insert_vector(
        self,
        tenant_id: str,
        vector_id: str,
        embedding: list,
        metadata: dict,
        strategy="shared"
    ):
        """
        Insert a vector — tenant_id is added automatically.
        """
        if strategy == "shared":
            collection_name = self._get_shared_collection()
            # Key point: put tenant_id in the payload
            await self.client.upsert(
                collection_name=collection_name,
                points=[{
                    "id": vector_id,
                    "vector": embedding,
                    "payload": {
                        "tenant_id": tenant_id,  # ← the isolation key!
                        **metadata
                    }
                }]
            )
        elif strategy == "dedicated":
            collection_name = self._get_collection_name(tenant_id)
            # No tenant_id needed (physically isolated collection)
            await self.client.upsert(
                collection_name=collection_name,
                points=[{
                    "id": vector_id,
                    "vector": embedding,
                    "payload": metadata
                }]
            )

    async def search(
        self,
        tenant_id: str,
        query_vector: list,
        limit: int = 10,
        strategy="shared"
    ):
        """
        Search vectors — the tenant_id filter is applied automatically.
        """
        if strategy == "shared":
            collection_name = self._get_shared_collection()
            # Key point: the tenant_id filter is mandatory!
            results = await self.client.search(
                collection_name=collection_name,
                query_vector=query_vector,
                query_filter=Filter(
                    must=[
                        FieldCondition(
                            key="tenant_id",
                            match=MatchValue(value=tenant_id)
                        )
                    ]
                ),
                limit=limit
            )
        elif strategy == "dedicated":
            collection_name = self._get_collection_name(tenant_id)
            # Physically isolated: no filter needed
            results = await self.client.search(
                collection_name=collection_name,
                query_vector=query_vector,
                limit=limit
            )

        return results

# Usage
vector_db = TenantAwareVectorDB("http://localhost:6333")

# Insert
await vector_db.insert_vector(
    tenant_id="tenant-123",
    vector_id="mem-001",
    embedding=[0.1, 0.2, ...],
    metadata={"type": "preference"},
    strategy="shared"  # shared mode
)

# Search (tenant_id filter applied automatically)
results = await vector_db.search(
    tenant_id="tenant-123",
    query_vector=[0.1, 0.2, ...],
    limit=10,
    strategy="shared"
)
```

#### Hardening: Double-Check

```python
import logging

logger = logging.getLogger(__name__)

class SecureTenantVectorDB(TenantAwareVectorDB):
    """
    Defensive programming: multiple validation layers.
    """

    async def search(self, tenant_id: str, query_vector: list, limit: int = 10):
        # Layer 1: vector-DB-side filter
        results = await super().search(tenant_id, query_vector, limit)

        # Layer 2: application-side validation (guards against misconfiguration)
        validated_results = []
        for result in results:
            # Verify the tenant_id on every result
            if result.payload.get("tenant_id") != tenant_id:
                # Log a security event!
                logger.critical(
                    f"Tenant isolation breach detected! "
" f"Expected {tenant_id}, got {result.payload.get('tenant_id')}" ) # 不返回该结果 continue validated_results.append(result) # 第三层: 审计 await self.log_access( tenant_id=tenant_id, action="vector_search", result_count=len(validated_results) ) return validated_results async def log_access(self, tenant_id: str, action: str, **details): """记录所有访问""" # 写入审计日志 pass ``` ### 2.3 Redis缓存隔离 ```python import redis.asyncio as redis from typing import Optional class TenantAwareCache: """ Redis缓存的租户隔离 """ def __init__(self, redis_url: str): self.redis = redis.from_url(redis_url) def _make_key(self, tenant_id: str, key: str) -> str: """ 生成带租户前缀的key 格式: tenant:{tenant_id}:{key} """ return f"tenant:{tenant_id}:{key}" async def get(self, tenant_id: str, key: str) -> Optional[str]: """获取缓存 - 自动添加前缀""" full_key = self._make_key(tenant_id, key) value = await self.redis.get(full_key) return value.decode() if value else None async def set( self, tenant_id: str, key: str, value: str, ttl: int = 3600 ): """设置缓存 - 自动添加前缀和TTL""" full_key = self._make_key(tenant_id, key) await self.redis.setex(full_key, ttl, value) async def delete_pattern(self, tenant_id: str, pattern: str): """ 删除租户的所有匹配缓存 例如: 清空tenant-123的所有memory相关缓存 """ full_pattern = self._make_key(tenant_id, pattern) keys = [] # 使用SCAN而不是KEYS (避免阻塞) async for key in self.redis.scan_iter(match=full_pattern): keys.append(key) if keys: await self.redis.delete(*keys) return len(keys) async def get_tenant_cache_size(self, tenant_id: str) -> int: """ 获取租户缓存大小 (用于配额监控) """ pattern = self._make_key(tenant_id, "*") total_size = 0 async for key in self.redis.scan_iter(match=pattern): size = await self.redis.memory_usage(key) total_size += size or 0 return total_size # 使用示例 cache = TenantAwareCache("redis://localhost") # 缓存查询结果 await cache.set( tenant_id="tenant-123", key="search:query_hash_abc", value=json.dumps(search_results), ttl=300 # 5分钟 ) # 读取缓存 cached = await cache.get( tenant_id="tenant-123", key="search:query_hash_abc" ) # 清空租户缓存 await 
await cache.delete_pattern(
    tenant_id="tenant-123",
    pattern="search:*"
)
```

---

## Tenant Identification and Routing at the API Layer

### 3.1 JWT Token Design

```python
from datetime import datetime, timedelta
from typing import Optional

import jwt
from fastapi import HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

# Configuration
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"

security = HTTPBearer()

def create_access_token(
    tenant_id: str,
    user_id: str,
    user_email: str,
    role: str = "user",
    expires_delta: Optional[timedelta] = None
) -> str:
    """
    Create a JWT.
    """
    expire = datetime.utcnow() + (expires_delta or timedelta(hours=24))

    to_encode = {
        # Standard claims
        "exp": expire,
        "iat": datetime.utcnow(),
        "iss": "agent-memory-system",
        # Custom claims
        "tenant_id": tenant_id,  # ← the key claim!
        "user_id": user_id,
        "email": user_email,
        "role": role,
    }

    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

async def get_current_tenant(
    credentials: HTTPAuthorizationCredentials = Security(security)
) -> dict:
    """
    Extract tenant information from the JWT.
    Used as a FastAPI dependency.
    """
    token = credentials.credentials

    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])

        # Validate required claims
        tenant_id = payload.get("tenant_id")
        user_id = payload.get("user_id")

        if not tenant_id or not user_id:
            raise HTTPException(401, "Invalid token: missing tenant_id or user_id")

        # Return the tenant context
        return {
            "tenant_id": tenant_id,
            "user_id": user_id,
            "email": payload.get("email"),
            "role": payload.get("role", "user"),
        }

    except jwt.ExpiredSignatureError:
        raise HTTPException(401, "Token expired")
    except jwt.InvalidTokenError as e:  # PyJWT's base error (jwt.JWTError does not exist)
        raise HTTPException(401, f"Invalid token: {str(e)}")

# Usage
from fastapi import FastAPI, Depends

app = FastAPI()

@app.post("/api/v1/memories")
async def create_memory(
    memory_data: dict,
    tenant_context: dict = Depends(get_current_tenant)  # ← injected automatically
):
    """
    Create a memory — tenant_id comes from the token.
    """
    tenant_id = tenant_context["tenant_id"]
    user_id = tenant_context["user_id"]

    result = await memory_service.create(
        tenant_id=tenant_id,
        user_id=user_id,
        **memory_data
    )

    return result

@app.get("/api/v1/memories/search")
async def search_memories(
    query: str,
    tenant_context: dict = Depends(get_current_tenant)
):
    """
    Search memories — tenant_id injected automatically.
    """
    results = await memory_service.search(
        tenant_id=tenant_context["tenant_id"],
        query=query
    )

    return results
```

### 3.2 Per-Request Auditing

```python
import asyncio
import json
import time
import uuid
from datetime import datetime

from starlette.middleware.base import BaseHTTPMiddleware

class TenantAuditMiddleware(BaseHTTPMiddleware):
    """
    Automatically log every API request.
    """

    async def dispatch(self, request, call_next):
        # Request ID
        request_id = str(uuid.uuid4())
        request.state.request_id = request_id

        # Start time
        start_time = time.time()

        # Best-effort tenant_id extraction
        tenant_id = None
        try:
            if "Authorization" in request.headers:
                token = request.headers["Authorization"].replace("Bearer ", "")
                payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
                tenant_id = payload.get("tenant_id")
        except Exception:
            pass

        # Handle the request
        response = await call_next(request)

        # Duration
        duration = time.time() - start_time

        # Audit log entry
        await log_request(
            request_id=request_id,
            tenant_id=tenant_id,
            method=request.method,
            path=request.url.path,
            status_code=response.status_code,
            duration_ms=duration * 1000,
            ip_address=request.client.host,
            user_agent=request.headers.get("user-agent")
        )

        # Response headers
        response.headers["X-Request-ID"] = request_id
        response.headers["X-Tenant-ID"] = tenant_id or "anonymous"

        return response

async def log_request(request_id, tenant_id, method, path, status_code,
                      duration_ms, ip_address, user_agent):
    """
    Write the audit log asynchronously.
    """
    log_entry = {
        "request_id": request_id,
        "tenant_id": tenant_id,
        "method": method,
        "path": path,
        "status_code": status_code,
        "duration_ms": duration_ms,
        "ip_address": ip_address,
        "user_agent": user_agent,
        "timestamp": datetime.utcnow().isoformat()
    }

    # Write to the database (async)
    asyncio.create_task(db.insert_audit_log(log_entry))

    # Also write to the log file (JSON format)
    logger.info(json.dumps(log_entry))

# Register the middleware
app.add_middleware(TenantAuditMiddleware)
```

---

## Resource Quotas and Rate Limiting

### 4.1 Layered Rate Limiting

```python
from fastapi import HTTPException
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
import redis.asyncio as redis

# Global limiter (per client IP)
limiter = Limiter(key_func=get_remote_address)

class TenantRateLimiter:
    """
    Tenant-level rate limiting.
    """

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def check_limit(
        self,
        tenant_id: str,
        endpoint: str,
        limit_per_minute: int,
        limit_per_day: int
    ):
        """
        Check whether the tenant is over its limits.
        """
        # Key design
        minute_key = f"ratelimit:tenant:{tenant_id}:{endpoint}:minute"
        day_key = f"ratelimit:tenant:{tenant_id}:{endpoint}:day"

        # Per-minute limit
        minute_count = await self.redis.incr(minute_key)
        if minute_count == 1:
            await self.redis.expire(minute_key, 60)

        if minute_count > limit_per_minute:
            raise HTTPException(
                status_code=429,
                detail=f"Rate limit exceeded: {limit_per_minute}/minute"
            )

        # Per-day limit
        day_count = await self.redis.incr(day_key)
        if day_count == 1:
            await self.redis.expire(day_key, 86400)

        if day_count > limit_per_day:
            raise HTTPException(
                status_code=429,
                detail=f"Daily quota exceeded: {limit_per_day}/day"
            )

        # Remaining quota
        return {
            "minute_remaining": limit_per_minute - minute_count,
            "day_remaining": limit_per_day - day_count
        }

# Per-tier tenant limits
TENANT_LIMITS = {
    "free": {
        "search": {"per_minute": 10, "per_day": 100},
        "create": {"per_minute": 5, "per_day": 50}
    },
    "pro": {
        "search": {"per_minute": 100, "per_day": 10000},
        "create": {"per_minute": 50, "per_day": 5000}
    },
    "enterprise": {
        "search": {"per_minute": 1000, "per_day": 100000},
        "create": {"per_minute": 500, "per_day": 50000}
    }
}

# Usage
@app.get("/api/v1/memories/search")
@limiter.limit("100/minute")  # global per-IP limit
async def search_memories(
    query: str,
    tenant_context: dict = Depends(get_current_tenant),
    rate_limiter: TenantRateLimiter = Depends()
):
    """
    Search endpoint with layered rate limiting.
    """
    # Look up the tenant's tier
    tenant = await db.get_tenant(tenant_context["tenant_id"])

    # Tenant-level rate limiting
    limits = TENANT_LIMITS[tenant.tier]["search"]
    quota = await rate_limiter.check_limit(
        tenant_id=tenant_context["tenant_id"],
        endpoint="search",
        limit_per_minute=limits["per_minute"],
        limit_per_day=limits["per_day"]
    )

    # Run the search
    results = await memory_service.search(
        tenant_id=tenant_context["tenant_id"],
        query=query
    )

    # Return quota info alongside the results
    return {
        "results": results,
        "quota": quota
    }

# Register the exception handler
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
```

### 4.2 Real-Time Quota Monitoring

```python
class TenantQuotaMonitor:
    """
    Monitor tenant resource usage in real time.
    """

    def __init__(self, db, redis_client):
        self.db = db
        self.redis = redis_client

    async def check_and_update(
        self,
        tenant_id: str,
        resource_type: str,
        increment: int = 1
    ):
        """
        Check the quota and bump usage.
        resource_type: memories, api_calls, storage_bytes, etc.
        """
        # Current usage from Redis (cache)
        cache_key = f"quota:{tenant_id}:{resource_type}"
        current_usage = await self.redis.get(cache_key)

        if current_usage is None:
            # Cache miss — load from the DB
            tenant = await self.db.get_tenant(tenant_id)
            current_usage = getattr(tenant, f"current_{resource_type}", 0)
            await self.redis.setex(cache_key, 300, current_usage)  # 5-minute cache
        else:
            current_usage = int(current_usage)

        # Quota ceiling
        tenant = await self.db.get_tenant(tenant_id)
        max_allowed = getattr(tenant, f"max_{resource_type}")

        # Over the limit?
        new_usage = current_usage + increment
        if new_usage > max_allowed:
            raise HTTPException(
                status_code=429,
                detail=f"{resource_type} quota exceeded: {new_usage}/{max_allowed}"
            )

        # Update Redis
        await self.redis.incrby(cache_key, increment)

        # Update the DB asynchronously (batched)
        asyncio.create_task(self._batch_update_db(tenant_id, resource_type, increment))

        return {
            "current": new_usage,
            "max": max_allowed,
            "remaining": max_allowed - new_usage,
            "percentage": (new_usage / max_allowed) * 100
        }

    async def _batch_update_db(self, tenant_id, resource_type, increment):
        """
        Batched DB update (once per minute).
        """
        # Enqueue for the batch updater
        pass

# Usage
@app.post("/api/v1/memories")
async def create_memory(
    memory_data: dict,
    tenant_context: dict = Depends(get_current_tenant),
    quota_monitor: TenantQuotaMonitor = Depends()
):
    """
    Create a memory — with quota check.
    """
    # Check the memory-count quota
    quota_info = await quota_monitor.check_and_update(
        tenant_id=tenant_context["tenant_id"],
        resource_type="memories",
        increment=1
    )

    # Warn when close to the quota (>90%)
    if quota_info["percentage"] > 90:
        await send_quota_warning(
            tenant_id=tenant_context["tenant_id"],
            resource="memories",
            usage=quota_info["percentage"]
        )

    # Create the memory
    result = await memory_service.create(
        tenant_id=tenant_context["tenant_id"],
        **memory_data
    )

    return {
        "memory": result,
        "quota": quota_info
    }
```

---

## Security Hardening

### 5.1 SQL Injection Protection

```python
# ❌ Bad — SQL injection vulnerability
async def search_memories_unsafe(tenant_id: str, query: str):
    sql = f"SELECT * FROM memories WHERE tenant_id = '{tenant_id}' AND content LIKE '%{query}%'"
    # Attack: query = "'; DROP TABLE memories; --"
    return await db.execute(sql)

# ✅ Good — parameterized query
async def search_memories_safe(tenant_id: str, query: str):
    sql = """
        SELECT * FROM memories
        WHERE tenant_id = $1 AND content ILIKE $2
    """
    return await db.fetch(sql, tenant_id, f"%{query}%")

# ✅ Better — use an ORM
from tortoise import models, fields

class Memory(models.Model):
    id = fields.UUIDField(pk=True)
    tenant_id = fields.UUIDField()
    content = fields.TextField()

    class Meta:
        table = "memories"

# The ORM parameterizes automatically
async def search_memories_orm(tenant_id: str, query: str):
    return await Memory.filter(
        tenant_id=tenant_id,
        content__icontains=query
    ).all()
```

### 5.2 Preventing Tenant ID Spoofing

```python
class TenantSecurityMiddleware(BaseHTTPMiddleware):
    """
    Prevent tenant_id tampering.
    """

    async def dispatch(self, request, call_next):
        # 1. Extract tenant_id from the JWT
        token_tenant_id = None
        if "Authorization" in request.headers:
            try:
                token = request.headers["Authorization"].replace("Bearer ", "")
                payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
                token_tenant_id = payload.get("tenant_id")
            except Exception:
                pass

        # 2. Check for a tenant_id in the request body / query params
        #    (some APIs may allow passing tenant_id explicitly)
        request_tenant_id = None
        if request.method == "POST":
            body = await request.body()
            try:
                data = json.loads(body)
                request_tenant_id = data.get("tenant_id")
            except Exception:
                pass
        # 3. Verify consistency
        if request_tenant_id and token_tenant_id:
            if request_tenant_id != token_tenant_id:
                # tenant_id mismatch — possible attack!
                logger.warning(
                    f"Tenant ID mismatch detected! "
                    f"Token: {token_tenant_id}, Request: {request_tenant_id}, "
                    f"IP: {request.client.host}"
                )
                return JSONResponse(
                    status_code=403,
                    content={"detail": "Tenant ID mismatch"}
                )

        # Handle the request
        response = await call_next(request)
        return response

app.add_middleware(TenantSecurityMiddleware)
```

### 5.3 GDPR Compliance: Data Export and Deletion

```python
class GDPRService:
    """
    GDPR compliance features.
    """

    async def export_tenant_data(self, tenant_id: str) -> dict:
        """
        Export all of a tenant's data (GDPR Right of Access).
        """
        data = {
            "tenant": await db.get_tenant(tenant_id),
            "memories": await db.get_all_memories(tenant_id),
            "users": await db.get_all_users(tenant_id),
            "audit_logs": await db.get_audit_logs(tenant_id, limit=10000),
            "export_date": datetime.utcnow().isoformat()
        }

        # Serialize to JSON
        return data

    async def delete_tenant_data(self, tenant_id: str, reason: str):
        """
        Delete all of a tenant's data (GDPR Right to Erasure).
        """
        # 1. Soft-delete database rows
        await db.execute("""
            UPDATE memories
            SET deleted_at = NOW(), status = 'deleted'
            WHERE tenant_id = $1
        """, tenant_id)

        await db.execute("""
            UPDATE tenants
            SET deleted_at = NOW(), status = 'deleted'
            WHERE id = $1
        """, tenant_id)

        # 2. Delete vector data
        await vector_db.delete_collection(f"tenant_{tenant_id}_memories")

        # 3. Clear the cache
        await cache.delete_pattern(tenant_id, "*")

        # 4. Record the deletion (retained for auditing)
        await db.insert_audit_log({
            "tenant_id": tenant_id,
            "action": "TENANT_DELETED",
            "reason": reason,
            "timestamp": datetime.utcnow()
        })
        # 5. Physically delete after 30 days (async task)
        await schedule_physical_deletion(tenant_id, days=30)

@app.post("/api/v1/admin/tenants/{tenant_id}/export")
async def export_tenant(
    tenant_id: str,
    admin_context: dict = Depends(get_admin_user)  # admins only
):
    """
    Export a tenant's data.
    """
    data = await gdpr_service.export_tenant_data(tenant_id)

    # Return as a JSON file
    return JSONResponse(content=data)

@app.delete("/api/v1/admin/tenants/{tenant_id}")
async def delete_tenant(
    tenant_id: str,
    reason: str,
    admin_context: dict = Depends(get_admin_user)
):
    """
    Delete a tenant.
    """
    await gdpr_service.delete_tenant_data(tenant_id, reason)
    return {"status": "deleted", "tenant_id": tenant_id}
```

---

## Monitoring and Auditing

### 6.1 Prometheus Metrics

```python
from prometheus_client import Counter, Histogram, Gauge

# Tenant-level metrics
tenant_requests = Counter(
    'tenant_api_requests_total',
    'Total API requests per tenant',
    ['tenant_id', 'endpoint', 'status']
)

tenant_latency = Histogram(
    'tenant_api_latency_seconds',
    'API latency per tenant',
    ['tenant_id', 'endpoint'],
    buckets=[0.01, 0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0]
)

tenant_memory_count = Gauge(
    'tenant_memory_total',
    'Total memories per tenant',
    ['tenant_id']
)

tenant_storage_bytes = Gauge(
    'tenant_storage_bytes',
    'Storage usage per tenant',
    ['tenant_id']
)

tenant_quota_usage = Gauge(
    'tenant_quota_usage_ratio',
    'Quota usage ratio (0-1)',
    ['tenant_id', 'resource_type']
)

# Record in middleware
class MetricsMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        # Extract tenant_id
        tenant_id = extract_tenant_id(request)

        # Measure latency
        start_time = time.time()
        response = await call_next(request)
        duration = time.time() - start_time

        tenant_latency.labels(
            tenant_id=tenant_id,
            endpoint=request.url.path
        ).observe(duration)

        # Count the request once, with its final status
        # (incrementing a second time with a "processing" label would double-count)
        tenant_requests.labels(
            tenant_id=tenant_id,
            endpoint=request.url.path,
            status=response.status_code
        ).inc()

        return response
```

### 6.2 Grafana Dashboard Configuration

`dashboard.json`:

```json
{
  "dashboard": {
    "title":
"Multi-Tenant Memory System", "panels": [ { "title": "Requests per Tenant", "type": "graph", "targets": [ { "expr": "rate(tenant_api_requests_total[5m])", "legendFormat": "{{tenant_id}}" } ] }, { "title": "P95 Latency by Tenant", "type": "graph", "targets": [ { "expr": "histogram_quantile(0.95, tenant_api_latency_seconds)", "legendFormat": "{{tenant_id}}" } ] }, { "title": "Quota Usage", "type": "bar", "targets": [ { "expr": "tenant_quota_usage_ratio * 100", "legendFormat": "{{tenant_id}} - {{resource_type}}" } ], "alert": { "conditions": [ { "evaluator": {"type": "gt", "params": [90]}, "query": {"params": ["A", "5m", "now"]}, "reducer": {"type": "avg"} } ] } }, { "title": "Top Tenants by Storage", "type": "table", "targets": [ { "expr": "topk(10, tenant_storage_bytes)", "format": "table" } ] } ] } } ``` --- ## 总结 ### 核心检查清单 **数据隔离**: - [x] PostgreSQL启用RLS - [x] 向量DB添加tenant_id过滤 - [x] Redis key添加租户前缀 - [x] 所有查询包含tenant_id **API安全**: - [x] JWT包含tenant_id - [x] 请求验证tenant_id一致性 - [x] 防止SQL注入 (参数化查询) - [x] 输入验证与sanitization **配额管理**: - [x] 多层Rate Limiting (IP + 租户) - [x] 实时配额监控 - [x] 超限自动拒绝 - [x] 接近配额告警 **审计合规**: - [x] 所有请求记录audit log - [x] GDPR数据导出 - [x] GDPR数据删除 - [x] 30天保留期 **监控告警**: - [x] Prometheus租户级指标 - [x] Grafana dashboard - [x] 配额超限告警 - [x] 性能异常告警 ### 常见陷阱与防范 | 陷阱 | 后果 | 防范措施 | |------|------|---------| | 忘记tenant_id过滤 | **数据泄漏** | RLS + 代码审查 + 集成测试 | | SQL注入 | **数据泄漏/删除** | 参数化查询 + ORM | | 缓存key冲突 | **跨租户数据混淆** | 强制租户前缀 | | 配额不生效 | **成本失控** | 实时校验 + 告警 | | 缺少审计 | **合规风险** | 中间件自动记录 | --- ## 参考资源 - [OWASP Multi-Tenant Security Cheat Sheet](https://cheatsheetseries.owasp.org/cheatsheets/Multi_Tenant_Security_Cheat_Sheet.html) - [WorkOS Tenant Isolation Guide](https://workos.com/blog/tenant-isolation-in-multi-tenant-systems) - [AWS Multi-Tenant AI Guide](https://docs.aws.amazon.com/prescriptive-guidance/latest/agentic-ai-multitenant/enforcing-tenant-isolation.html) - [Azure SQL RLS Best 
Practices](https://oneuptime.com/blog/post/2026-02-16-how-to-design-a-multi-tenant-data-isolation-strategy-on-azure-sql-database-using-row-level-security/view) --- **文档版本**: v1.0 **最后更新**: 2026-03-26
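The "cache key collision" pitfall can be sanity-checked without a live Redis. Below is a minimal in-memory sketch of the `tenant:{tenant_id}:{key}` prefix scheme from §2.3: a plain dict stands in for Redis, and `InMemoryTenantCache` and `make_key` are illustrative names, not part of the system above. It shows why a prefixed key from one tenant can never match another tenant's reads or pattern deletes.

```python
import fnmatch

def make_key(tenant_id: str, key: str) -> str:
    """Namespace a cache key: tenant:{tenant_id}:{key}."""
    return f"tenant:{tenant_id}:{key}"

class InMemoryTenantCache:
    """Dict-backed stand-in for the Redis-based TenantAwareCache."""

    def __init__(self):
        self._store = {}

    def set(self, tenant_id: str, key: str, value: str) -> None:
        self._store[make_key(tenant_id, key)] = value

    def get(self, tenant_id: str, key: str):
        # Another tenant's entry never matches: its prefix differs
        return self._store.get(make_key(tenant_id, key))

    def delete_pattern(self, tenant_id: str, pattern: str) -> int:
        # Glob-match only inside the tenant's namespace (like SCAN MATCH)
        full_pattern = make_key(tenant_id, pattern)
        doomed = [k for k in self._store if fnmatch.fnmatch(k, full_pattern)]
        for k in doomed:
            del self._store[k]
        return len(doomed)

cache = InMemoryTenantCache()
cache.set("tenant-123", "search:q1", "resultA")
cache.set("tenant-999", "search:q1", "resultB")

print(cache.get("tenant-123", "search:q1"))            # resultA
print(cache.delete_pattern("tenant-123", "search:*"))  # 1 (only tenant-123's key)
print(cache.get("tenant-123", "search:q1"))            # None
print(cache.get("tenant-999", "search:q1"))            # resultB (untouched)
```

The same invariant holds for the real Redis version: as long as every read, write, and `SCAN` goes through the prefixing helper, two tenants can use identical logical keys without ever colliding.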