前言:Demo 到生产的鸿沟
你做了一个 LLM 应用的 Demo:
# Demo 版本:100 行代码搞定
response = openai.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": user_input}]
)
print(response.choices[0].message.content)

但生产环境需要考虑:
mindmap
root((生产环境考量))
性能
延迟
吞吐量
并发
可靠性
错误处理
重试机制
降级方案
成本
Token 优化
模型选择
缓存策略
安全
输入输出过滤
权限控制
审计日志
可观测
监控
日志
追踪
一、整体架构设计
1.1 典型 LLM 应用架构
flowchart TB
subgraph 客户端
Web[Web 应用]
Mobile[移动应用]
API[API 调用]
end
subgraph 网关层
LB[负载均衡]
Gateway[API Gateway]
Auth[认证授权]
RateLimit[限流]
end
subgraph 业务层
Router[请求路由]
InputProc[输入处理]
OutputProc[输出处理]
Cache[缓存层]
end
subgraph LLM层
Orchestrator[编排层]
LLM1[GPT-4]
LLM2[Claude]
LLM3[本地模型]
RAG[RAG 服务]
Agent[Agent 服务]
end
subgraph 数据层
VectorDB[(向量数据库)]
RelDB[(关系数据库)]
Redis[(Redis)]
MQ[消息队列]
end
subgraph 可观测性
Log[日志]
Metrics[指标]
Trace[追踪]
end
Web --> LB
Mobile --> LB
API --> LB
LB --> Gateway
Gateway --> Auth
Auth --> RateLimit
RateLimit --> Router
Router --> InputProc
InputProc --> Cache
Cache --> Orchestrator
Orchestrator --> LLM1
Orchestrator --> LLM2
Orchestrator --> LLM3
Orchestrator --> RAG
Orchestrator --> Agent
RAG --> VectorDB
OutputProc --> Cache
Router -.-> Log
Orchestrator -.-> Metrics
LLM1 -.-> Trace
1.2 核心组件说明
| 组件 | 作用 | 推荐技术 |
|---|---|---|
| API Gateway | 认证、限流、路由 | Kong, Nginx, AWS API Gateway |
| 编排层 | LLM 调用逻辑 | LangChain, 自研 |
| 缓存层 | 减少重复调用 | Redis, Memcached |
| 向量数据库 | RAG 检索 | Milvus, Pinecone, Chroma |
| 消息队列 | 异步处理 | Kafka, RabbitMQ, Redis |
| 可观测性 | 监控追踪 | Prometheus, Grafana, Jaeger |
1.3 架构模式选择
graph TB
subgraph 同步模式
S1[请求] --> S2[处理] --> S3[响应]
S3 -->|延迟高| S4[用户等待]
end
subgraph 异步模式
A1[请求] --> A2[入队]
A2 --> A3[立即返回任务ID]
A4[Worker处理] --> A5[结果存储]
A6[轮询/回调获取结果]
end
subgraph 流式模式
T1[请求] --> T2[流式处理]
T2 --> T3[持续返回]
T3 --> T4[用户实时看到]
end
style T4 fill:#4ecdc4
选择建议:
- 同步模式:简单请求,延迟要求不高
- 异步模式:复杂任务,可以接受等待
- 流式模式:对话场景,提升用户体验
二、性能优化
2.1 延迟优化
import asyncio
import time
from typing import List
from openai import AsyncOpenAI
class LatencyOptimizer:
"""延迟优化器"""
def __init__(self):
self.client = AsyncOpenAI()
async def parallel_calls(self, prompts: List[str]) -> List[str]:
"""并行调用多个 LLM 请求"""
tasks = [
self.client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": p}]
)
for p in prompts
]
responses = await asyncio.gather(*tasks)
return [r.choices[0].message.content for r in responses]
async def streaming_response(self, prompt: str):
"""流式响应"""
stream = await self.client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}],
stream=True,
)
async for chunk in stream:
if chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
async def speculative_execution(self, prompt: str):
"""投机执行:同时调用快慢模型"""
fast_task = self.client.chat.completions.create(
model="gpt-3.5-turbo", # 快但质量一般
messages=[{"role": "user", "content": prompt}]
)
slow_task = self.client.chat.completions.create(
model="gpt-4", # 慢但质量好
messages=[{"role": "user", "content": prompt}]
)
# 先返回快的结果,后台继续等慢的
done, pending = await asyncio.wait(
[fast_task, slow_task],
return_when=asyncio.FIRST_COMPLETED
)
fast_result = done.pop().result()
# 返回快的结果,同时安排后续替换
return {
"initial": fast_result.choices[0].message.content,
"pending_upgrade": pending,
}2.2 缓存策略
import hashlib
import json
from typing import Optional
import redis
class LLMCache:
    """Exact-match LLM response cache backed by Redis.

    Keys are an MD5 hash of (prompt, model, params); values are JSON
    envelopes so metadata can be added later without breaking readers.
    """

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.default_ttl = 3600  # seconds (1 hour)

    def _get_cache_key(self, prompt: str, model: str, params: dict) -> str:
        """Build a deterministic cache key from the request parameters."""
        # sort_keys keeps the JSON (and therefore the hash) stable for
        # dicts that compare equal but were built in different orders.
        content = json.dumps({
            "prompt": prompt,
            "model": model,
            "params": params,
        }, sort_keys=True)
        # MD5 is acceptable here: the digest is a cache key, not a security control.
        return f"llm:cache:{hashlib.md5(content.encode()).hexdigest()}"

    def get(self, prompt: str, model: str, params: dict) -> Optional[str]:
        """Return the cached response text, or None on a miss."""
        key = self._get_cache_key(prompt, model, params)
        cached = self.redis.get(key)
        if cached:
            return json.loads(cached)["response"]
        return None

    def set(self, prompt: str, model: str, params: dict, response: str,
            ttl: Optional[int] = None):
        """Cache a response.

        Fix: ``ttl`` defaults to None, so it is Optional[int], not int.
        Falls back to ``default_ttl`` when not given.
        """
        key = self._get_cache_key(prompt, model, params)
        value = json.dumps({"response": response})
        self.redis.setex(key, ttl or self.default_ttl, value)

    def get_or_call(self, prompt: str, model: str, params: dict, llm_func):
        """Return (response, cache_hit): serve from cache or call llm_func."""
        cached = self.get(prompt, model, params)
        if cached:
            return cached, True  # cache hit
        response = llm_func(prompt, model, params)
        self.set(prompt, model, params, response)
        return response, False
class SemanticCache:
"""语义缓存:相似问题返回相同答案"""
def __init__(self, embedding_model, similarity_threshold: float = 0.95):
self.embedding_model = embedding_model
self.threshold = similarity_threshold
self.cache = {} # 实际应用中用向量数据库
def get(self, query: str) -> Optional[str]:
"""语义匹配获取缓存"""
query_embedding = self.embedding_model.encode(query)
for cached_query, (cached_embedding, response) in self.cache.items():
similarity = self._cosine_similarity(query_embedding, cached_embedding)
if similarity > self.threshold:
return response
return None
def set(self, query: str, response: str):
"""设置语义缓存"""
embedding = self.embedding_model.encode(query)
self.cache[query] = (embedding, response)2.3 批处理优化
import asyncio
from typing import List, Dict
from dataclasses import dataclass
from collections import defaultdict
@dataclass
class BatchRequest:
"""批处理请求"""
request_id: str
prompt: str
future: asyncio.Future
class BatchProcessor:
"""批处理器:合并多个请求一起处理"""
def __init__(
self,
batch_size: int = 10,
max_wait_time: float = 0.1, # 最多等待 100ms
):
self.batch_size = batch_size
self.max_wait_time = max_wait_time
self.pending_requests: List[BatchRequest] = []
self.lock = asyncio.Lock()
self._processing = False
async def submit(self, prompt: str) -> str:
"""提交请求"""
request_id = str(uuid.uuid4())
future = asyncio.Future()
request = BatchRequest(
request_id=request_id,
prompt=prompt,
future=future,
)
async with self.lock:
self.pending_requests.append(request)
# 达到批次大小,立即处理
if len(self.pending_requests) >= self.batch_size:
asyncio.create_task(self._process_batch())
elif not self._processing:
# 启动定时处理
asyncio.create_task(self._delayed_process())
return await future
async def _delayed_process(self):
"""延迟处理"""
self._processing = True
await asyncio.sleep(self.max_wait_time)
await self._process_batch()
self._processing = False
async def _process_batch(self):
"""处理一批请求"""
async with self.lock:
if not self.pending_requests:
return
batch = self.pending_requests[:self.batch_size]
self.pending_requests = self.pending_requests[self.batch_size:]
# 批量调用 LLM
prompts = [r.prompt for r in batch]
responses = await self._batch_llm_call(prompts)
# 设置结果
for request, response in zip(batch, responses):
request.future.set_result(response)
async def _batch_llm_call(self, prompts: List[str]) -> List[str]:
"""批量 LLM 调用"""
# 使用 vLLM 等支持批处理的框架
from vllm import LLM, SamplingParams
llm = LLM(model="gpt-3.5-turbo")
outputs = llm.generate(prompts, SamplingParams(max_tokens=256))
return [o.outputs[0].text for o in outputs]三、可靠性设计
3.1 错误处理与重试
import asyncio
import random
from typing import Callable, TypeVar
from functools import wraps
# Generic return type for the decorated coroutine function.
T = TypeVar('T')

class RetryConfig:
    """Retry policy settings (class-level defaults; override via subclass
    or by assigning attributes on an instance)."""
    max_retries: int = 3            # extra attempts beyond the first call
    base_delay: float = 1.0         # initial backoff, seconds
    max_delay: float = 60.0         # backoff ceiling, seconds
    exponential_base: float = 2.0   # backoff multiplier per attempt
    jitter: bool = True             # randomize delays to avoid thundering herd
def with_retry(config: RetryConfig = RetryConfig()):
    """Decorator adding retry-with-backoff to an async LLM call.

    Rate-limit errors back off exponentially (with optional jitter),
    transient connection/timeout errors retry with a linear delay, and
    invalid requests fail fast. The last exception is re-raised once
    ``config.max_retries`` extra attempts are exhausted.
    """
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            # Fix: these exception types were referenced but never imported
            # anywhere in this file. They live in the openai v1 SDK, where
            # InvalidRequestError was renamed BadRequestError.
            from openai import (
                APIConnectionError,
                APITimeoutError,
                BadRequestError,
                RateLimitError,
            )

            last_exception = None
            for attempt in range(config.max_retries + 1):
                try:
                    return await func(*args, **kwargs)
                except RateLimitError as e:
                    # Rate limited: exponential backoff, capped at max_delay.
                    delay = min(
                        config.max_delay,
                        config.base_delay * (config.exponential_base ** attempt),
                    )
                    if config.jitter:
                        delay *= (0.5 + random.random())
                    await asyncio.sleep(delay)
                    last_exception = e
                except (APIConnectionError, APITimeoutError) as e:
                    # Transient connectivity problem: quick linear retry.
                    delay = config.base_delay * (attempt + 1)
                    await asyncio.sleep(delay)
                    last_exception = e
                except BadRequestError:
                    # The request itself is invalid: retrying cannot help.
                    raise
            raise last_exception
        return wrapper
    return decorator
class ResilientLLMClient:
    """Resilient LLM client: retry + circuit breaker + fallback chain."""

    def __init__(self, primary_client, fallback_clients: list = None):
        self.primary = primary_client
        self.fallbacks = fallback_clients or []
        self.circuit_breaker = CircuitBreaker()

    @with_retry()
    async def call(self, prompt: str, **kwargs) -> str:
        """Call the primary model, degrading to fallbacks when the breaker is open."""
        if self.circuit_breaker.is_open():
            return await self._fallback_call(prompt, **kwargs)
        try:
            response = await self.primary.chat.completions.create(
                model=kwargs.get("model", "gpt-4"),
                messages=[{"role": "user", "content": prompt}],
            )
            self.circuit_breaker.record_success()
            return response.choices[0].message.content
        except Exception:
            self.circuit_breaker.record_failure()
            # If this failure tripped the breaker, degrade immediately.
            if self.circuit_breaker.is_open():
                return await self._fallback_call(prompt, **kwargs)
            # Fix: bare ``raise`` preserves the original traceback
            # (``raise e`` from inside the handler truncated it).
            raise

    async def _fallback_call(self, prompt: str, **kwargs) -> str:
        """Try each fallback client in order; raise when none succeeds."""
        for client in self.fallbacks:
            try:
                response = await client.chat.completions.create(
                    model=kwargs.get("fallback_model", "gpt-3.5-turbo"),
                    messages=[{"role": "user", "content": prompt}],
                )
                return response.choices[0].message.content
            except Exception:
                # Fix: the bare ``except:`` also swallowed KeyboardInterrupt
                # and SystemExit; catch only real errors, then try the next client.
                continue
        raise Exception("所有服务都不可用")
class CircuitBreaker:
"""熔断器"""
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: float = 30.0,
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failure_count = 0
self.last_failure_time = None
self.state = "closed" # closed, open, half-open
def is_open(self) -> bool:
if self.state == "closed":
return False
if self.state == "open":
# 检查是否可以尝试恢复
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = "half-open"
return False
return True
return False # half-open
def record_success(self):
self.failure_count = 0
self.state = "closed"
def record_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = "open"3.2 请求队列与限流
import asyncio
from asyncio import Queue, Semaphore
from dataclasses import dataclass
from typing import Any
import time
@dataclass
class QueuedRequest:
    """One prompt waiting in the priority queue; the caller awaits ``future``."""
    id: str
    prompt: str
    priority: int       # lower value = served first (PriorityQueue order)
    timestamp: float    # submission time, used as a FIFO tie-breaker
    future: asyncio.Future

    def __lt__(self, other: "QueuedRequest") -> bool:
        """Order by (priority, timestamp).

        Fix: RequestQueue puts ``(priority, request)`` tuples into an
        asyncio.PriorityQueue; when two priorities were equal the tuple
        comparison fell through to the requests themselves and raised
        TypeError. Ordering by timestamp keeps ties FIFO.
        """
        return (self.priority, self.timestamp) < (other.priority, other.timestamp)


class RequestQueue:
    """Priority request queue with bounded concurrency and rate limiting."""

    def __init__(
        self,
        max_concurrent: int = 10,
        max_queue_size: int = 1000,
        rate_limit_rpm: int = 60,
    ):
        self.semaphore = Semaphore(max_concurrent)  # caps in-flight LLM calls
        self.queue = asyncio.PriorityQueue(maxsize=max_queue_size)
        self.rate_limiter = RateLimiter(rate_limit_rpm)
        self._running = True

    async def start(self):
        """Consume the queue until stopped; one task per dequeued request."""
        while self._running:
            priority, request = await self.queue.get()
            # Honor the global rate limit before dispatching.
            await self.rate_limiter.acquire()
            asyncio.create_task(self._process_request(request))

    async def submit(self, prompt: str, priority: int = 5) -> str:
        """Enqueue *prompt* (lower number = earlier) and await its result."""
        import uuid  # fix: uuid was used here but never imported

        # Fix: create the future on the running loop; bare asyncio.Future()
        # can bind the wrong loop.
        future = asyncio.get_running_loop().create_future()
        request = QueuedRequest(
            id=str(uuid.uuid4()),
            prompt=prompt,
            priority=priority,
            timestamp=time.time(),
            future=future,
        )
        await self.queue.put((priority, request))
        return await future

    async def _process_request(self, request: QueuedRequest):
        """Run one request under the concurrency semaphore."""
        async with self.semaphore:
            try:
                response = await self._call_llm(request.prompt)
                request.future.set_result(response)
            except Exception as e:
                request.future.set_exception(e)

    async def _call_llm(self, prompt: str) -> str:
        """LLM call hook.

        Fix: ``_process_request`` called this method but it was never
        defined, so every dequeued request died with AttributeError.
        Override (or monkey-patch) it with a real model call.
        """
        raise NotImplementedError("override _call_llm with a real LLM call")
class RateLimiter:
"""速率限制器(令牌桶)"""
def __init__(self, requests_per_minute: int):
self.rpm = requests_per_minute
self.tokens = requests_per_minute
self.last_update = time.time()
self.lock = asyncio.Lock()
async def acquire(self):
"""获取令牌"""
async with self.lock:
now = time.time()
# 补充令牌
elapsed = now - self.last_update
self.tokens = min(
self.rpm,
self.tokens + elapsed * (self.rpm / 60)
)
self.last_update = now
if self.tokens < 1:
# 等待令牌
wait_time = (1 - self.tokens) * (60 / self.rpm)
await asyncio.sleep(wait_time)
self.tokens = 0
else:
self.tokens -= 1四、成本优化
4.1 模型路由
class ModelRouter:
    """Route each prompt to the cheapest model that can handle it."""

    def __init__(self):
        # Candidate tiers with rough cost/latency figures used for routing.
        self.models = {
            "simple": {
                "name": "gpt-3.5-turbo",
                "cost_per_1k_tokens": 0.002,
                "latency_ms": 200,
            },
            "medium": {
                "name": "gpt-4-turbo",
                "cost_per_1k_tokens": 0.03,
                "latency_ms": 500,
            },
            "complex": {
                "name": "gpt-4",
                "cost_per_1k_tokens": 0.06,
                "latency_ms": 1000,
            },
        }
        self.complexity_classifier = self._load_classifier()

    def _load_classifier(self):
        """Load a learned complexity classifier, if one is available.

        Fix: this method was called from __init__ but never defined, so
        every ModelRouter() raised AttributeError. Routing currently uses
        the keyword heuristics in _classify_complexity; return None until
        a trained classifier is plugged in.
        """
        return None

    def route(self, prompt: str, requirements: dict = None) -> str:
        """Pick a model name for *prompt*, honoring explicit requirements first."""
        if requirements:
            if requirements.get("low_latency"):
                return self.models["simple"]["name"]
            if requirements.get("high_quality"):
                return self.models["complex"]["name"]
        complexity = self._classify_complexity(prompt)
        if complexity == "simple":
            return self.models["simple"]["name"]
        elif complexity == "medium":
            return self.models["medium"]["name"]
        else:
            return self.models["complex"]["name"]

    def _classify_complexity(self, prompt: str) -> str:
        """Heuristic complexity classification (length + keyword rules)."""
        # Short prompts are treated as simple regardless of content.
        if len(prompt) < 100:
            return "simple"
        complex_keywords = ["分析", "推理", "解释", "比较", "evaluate", "analyze"]
        if any(kw in prompt.lower() for kw in complex_keywords):
            return "complex"
        return "medium"
class CostTracker:
"""成本追踪"""
def __init__(self):
self.usage = defaultdict(lambda: {"tokens": 0, "cost": 0.0})
self.pricing = {
"gpt-4": {"input": 0.03, "output": 0.06},
"gpt-4-turbo": {"input": 0.01, "output": 0.03},
"gpt-3.5-turbo": {"input": 0.0005, "output": 0.0015},
"claude-3-opus": {"input": 0.015, "output": 0.075},
}
def record(self, model: str, input_tokens: int, output_tokens: int):
"""记录使用量"""
pricing = self.pricing.get(model, {"input": 0, "output": 0})
cost = (input_tokens * pricing["input"] + output_tokens * pricing["output"]) / 1000
self.usage[model]["tokens"] += input_tokens + output_tokens
self.usage[model]["cost"] += cost
def get_report(self) -> dict:
"""获取成本报告"""
total_cost = sum(u["cost"] for u in self.usage.values())
return {
"total_cost": total_cost,
"by_model": dict(self.usage),
}4.2 Prompt 优化
class PromptOptimizer:
"""Prompt 优化器"""
def __init__(self, tokenizer):
self.tokenizer = tokenizer
def compress(self, prompt: str, max_tokens: int) -> str:
"""压缩 Prompt"""
tokens = self.tokenizer.encode(prompt)
if len(tokens) <= max_tokens:
return prompt
# 策略1:截断
truncated = self.tokenizer.decode(tokens[:max_tokens])
# 策略2:摘要(用小模型)
# summary = self._summarize(prompt)
return truncated
def remove_redundancy(self, prompt: str) -> str:
"""移除冗余"""
import re
# 移除多余空白
prompt = re.sub(r'\s+', ' ', prompt)
# 移除重复内容
lines = prompt.split('\n')
unique_lines = list(dict.fromkeys(lines))
return '\n'.join(unique_lines)
def estimate_cost(self, prompt: str, model: str, expected_output_tokens: int = 500) -> float:
"""估算成本"""
input_tokens = len(self.tokenizer.encode(prompt))
pricing = {
"gpt-4": (0.03, 0.06),
"gpt-3.5-turbo": (0.0005, 0.0015),
}
input_price, output_price = pricing.get(model, (0, 0))
return (input_tokens * input_price + expected_output_tokens * output_price) / 1000五、可观测性
5.1 日志与追踪
import json
import logging
import uuid
from datetime import datetime, timezone
from typing import Any
class LLMLogger:
    """Structured (JSON-lines) logger for LLM requests, responses, errors."""

    def __init__(self, service_name: str):
        self.service = service_name
        self.logger = logging.getLogger(service_name)

    @staticmethod
    def _now() -> str:
        """Aware UTC timestamp for log records.

        Fix: datetime.utcnow() is deprecated (Python 3.12) and yields a
        naive datetime; use a timezone-aware UTC timestamp instead (the
        isoformat string now carries the explicit +00:00 offset).
        """
        return datetime.now(timezone.utc).isoformat()

    def log_request(
        self,
        request_id: str,
        prompt: str,
        model: str,
        params: dict,
    ):
        """Log an outbound request (prompt length only, never the raw text)."""
        self.logger.info(json.dumps({
            "event": "llm_request",
            "request_id": request_id,
            "model": model,
            "prompt_length": len(prompt),
            "params": params,
            "timestamp": self._now(),
        }))

    def log_response(
        self,
        request_id: str,
        response: str,
        latency_ms: float,
        tokens_used: dict,
    ):
        """Log a completed response with latency and token accounting."""
        self.logger.info(json.dumps({
            "event": "llm_response",
            "request_id": request_id,
            "response_length": len(response),
            "latency_ms": latency_ms,
            "tokens": tokens_used,
            "timestamp": self._now(),
        }))

    def log_error(
        self,
        request_id: str,
        error: Exception,
        context: dict = None,
    ):
        """Log a failed call with the exception type, message and context."""
        self.logger.error(json.dumps({
            "event": "llm_error",
            "request_id": request_id,
            "error_type": type(error).__name__,
            "error_message": str(error),
            "context": context,
            "timestamp": self._now(),
        }))
class LLMTracer:
"""LLM 调用追踪"""
def __init__(self):
# 集成 OpenTelemetry 或 Jaeger
from opentelemetry import trace
self.tracer = trace.get_tracer(__name__)
def trace_call(self, func):
"""追踪装饰器"""
@wraps(func)
async def wrapper(*args, **kwargs):
with self.tracer.start_as_current_span("llm_call") as span:
span.set_attribute("model", kwargs.get("model", "unknown"))
span.set_attribute("prompt_length", len(kwargs.get("prompt", "")))
try:
result = await func(*args, **kwargs)
span.set_attribute("status", "success")
return result
except Exception as e:
span.set_attribute("status", "error")
span.set_attribute("error", str(e))
raise
return wrapper5.2 监控指标
from prometheus_client import Counter, Histogram, Gauge
class LLMMetrics:
    """Prometheus metric set for the LLM service."""
    def __init__(self):
        # Request count, labeled by model and success/error status.
        self.requests = Counter(
            'llm_requests_total',
            'Total LLM requests',
            ['model', 'status']
        )
        # Latency distribution per model (seconds).
        self.latency = Histogram(
            'llm_latency_seconds',
            'LLM request latency',
            ['model'],
            buckets=[0.1, 0.5, 1, 2, 5, 10, 30]
        )
        # Token usage, split via the 'type' label.
        self.tokens = Counter(
            'llm_tokens_total',
            'Total tokens used',
            ['model', 'type']  # type: input/output
        )
        # Accumulated spend in dollars.
        self.cost = Counter(
            'llm_cost_dollars',
            'Total cost in dollars',
            ['model']
        )
        # Cache hit counter, labeled by cache flavor (exact/semantic).
        self.cache_hits = Counter(
            'llm_cache_hits_total',
            'Cache hits',
            ['cache_type']
        )
        # Current depth of the request queue.
        self.queue_length = Gauge(
            'llm_queue_length',
            'Current queue length'
        )

    def record_request(self, model: str, status: str, latency: float, tokens: dict):
        """Record one finished request.

        ``latency`` is in seconds; ``tokens`` is a dict with optional
        'input' and 'output' counts.
        """
        self.requests.labels(model=model, status=status).inc()
        self.latency.labels(model=model).observe(latency)
        self.tokens.labels(model=model, type='input').inc(tokens.get('input', 0))
        self.tokens.labels(model=model, type='output').inc(tokens.get('output', 0))


5.3 告警规则
# prometheus_rules.yml
groups:
- name: llm_alerts
rules:
# 错误率过高
- alert: LLMHighErrorRate
expr: rate(llm_requests_total{status="error"}[5m]) / rate(llm_requests_total[5m]) > 0.1
for: 5m
labels:
severity: critical
annotations:
summary: "LLM 错误率过高"
description: "错误率超过 10%"
# 延迟过高
- alert: LLMHighLatency
expr: histogram_quantile(0.95, rate(llm_latency_seconds_bucket[5m])) > 10
for: 5m
labels:
severity: warning
annotations:
summary: "LLM 延迟过高"
description: "P95 延迟超过 10 秒"
# 成本超支
- alert: LLMCostSpike
expr: increase(llm_cost_dollars[1h]) > 100
for: 5m
labels:
severity: warning
annotations:
summary: "LLM 成本异常"
description: "1小时内成本超过 $100"
# 队列积压
- alert: LLMQueueBacklog
expr: llm_queue_length > 100
for: 5m
labels:
severity: warning
annotations:
summary: "LLM 请求队列积压"

六、完整示例:生产级 LLM 服务
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
import asyncio

app = FastAPI()

# Shared singletons, wired together from the components defined above.
cache = LLMCache()
router = ModelRouter()
metrics = LLMMetrics()
logger = LLMLogger("llm-service")
rate_limiter = RateLimiter(requests_per_minute=60)
class ChatRequest(BaseModel):
    """Inbound chat payload."""
    message: str
    # Fix: the default is None, so the annotation must be Optional[str]
    # (a bare ``str`` field defaulting to None is a type lie and is
    # rejected by strict pydantic v2 configurations).
    model: Optional[str] = None
    stream: bool = False   # reserved; the handler below is non-streaming
    priority: int = 5      # lower = more urgent (see RequestQueue)
class ChatResponse(BaseModel):
    """Outbound chat payload."""
    response: str
    model_used: str     # actual model name, or "cached" on a cache hit
    latency_ms: float
    cached: bool        # True when served from the exact-match cache
@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """Chat endpoint: filter -> cache -> route -> call -> filter -> record."""
    request_id = str(uuid.uuid4())
    start_time = time.time()
    try:
        # 1. Structured request log.
        logger.log_request(request_id, request.message, request.model, {})
        # 2. Input safety filtering.
        # NOTE(review): InputFilter/OutputFilter are referenced but not
        # defined anywhere in this file — they must come from a safety module.
        input_filter = InputFilter()
        is_safe, reason = input_filter.filter(request.message)
        if not is_safe:
            raise HTTPException(status_code=400, detail=reason)
        # 3. Exact-match cache lookup.
        #    (Fix: removed an unused locally-built cache_key — LLMCache
        #    derives its own key from (message, model, params).)
        cached = cache.get(request.message, request.model or "auto", {})
        if cached:
            metrics.cache_hits.labels(cache_type="exact").inc()
            return ChatResponse(
                response=cached,
                model_used=request.model or "cached",
                latency_ms=(time.time() - start_time) * 1000,
                cached=True,
            )
        # 4. Pick a model (an explicit request wins over the router).
        model = request.model or router.route(request.message)
        # 5. Global rate limit.
        await rate_limiter.acquire()
        # 6. Call the LLM through the resilient client.
        # NOTE(review): primary_client / fallback_clients are assumed to be
        # module-level singletons — confirm they are initialized at startup.
        client = ResilientLLMClient(primary_client, fallback_clients)
        response_text = await client.call(request.message, model=model)
        # 7. Output safety filtering.
        output_filter = OutputFilter()
        filtered_response, issues = output_filter.filter_output(response_text)
        # 8. Cache the (filtered) result.
        cache.set(request.message, model, {}, filtered_response)
        # 9. Metrics + response log.
        # NOTE(review): token counts are hard-coded placeholders — read the
        # real usage from the provider response in production.
        latency = (time.time() - start_time) * 1000
        metrics.record_request(model, "success", latency / 1000, {"input": 100, "output": 200})
        logger.log_response(request_id, filtered_response, latency, {"input": 100, "output": 200})
        return ChatResponse(
            response=filtered_response,
            model_used=model,
            latency_ms=latency,
            cached=False,
        )
    except HTTPException:
        # Fix: let deliberate HTTP errors (e.g. the 400 above) pass through
        # instead of being re-wrapped as a 500 by the handler below.
        raise
    except Exception as e:
        logger.log_error(request_id, e)
        metrics.requests.labels(model=request.model or "unknown", status="error").inc()
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/health")
async def health():
    """Liveness probe for load balancers / orchestration."""
    return {"status": "healthy"}
@app.get("/metrics")
async def metrics_endpoint():
"""Prometheus 指标"""
from prometheus_client import generate_latest
return Response(generate_latest(), media_type="text/plain")七、总结
架构设计核心要点
mindmap
root((LLM 应用架构))
性能
并行调用
流式响应
缓存策略
批处理
可靠性
重试机制
熔断降级
请求队列
成本
模型路由
Prompt优化
成本追踪
可观测性
日志
指标
追踪
告警
关键 Takeaway
- 分层架构:网关 → 业务 → LLM → 数据,各司其职
- 流式优先:提升用户体验的最有效方式
- 多级缓存:精确缓存 + 语义缓存
- 智能路由:简单任务用便宜模型
- 韧性设计:重试、熔断、降级
- 全面监控:延迟、错误、成本、队列
技术选型建议
| 组件 | 推荐方案 |
|---|---|
| API 框架 | FastAPI |
| 缓存 | Redis |
| 队列 | Redis/Kafka |
| 向量数据库 | Milvus/Pinecone |
| 监控 | Prometheus + Grafana |
| 追踪 | Jaeger/Zipkin |
参考资料
本文首发于 underestimated.cn
"从入门到放弃"系列第十九篇:从 Demo 到生产,路还很长。
架构决定了应用能走多远。 🏗️