
LLM Application Architecture: From Prototype to Production

  • August 23, 2025

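Preface: The Gap Between Demo and Production

You have built a demo of an LLM application:

# Demo version: done in well under 100 lines of code
import openai

user_input = "Hello"  # placeholder for the user's message

response = openai.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": user_input}]
)
print(response.choices[0].message.content)

A production environment, however, also has to account for: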

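mindmap
  root((Production concerns))
    Performance
      Latency
      Throughput
      Concurrency
    Reliability
      Error handling
      Retries
      Fallback plans
    Cost
      Token optimization
      Model selection
      Caching strategy
    Security
      Input/output filtering
      Access control
      Audit logging
    Observability
      Monitoring
      Logging
      Tracing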

1. Overall Architecture Design

1.1 Typical LLM Application Architecture

flowchart TB
    subgraph Clients
        Web[Web app]
        Mobile[Mobile app]
        API[API calls]
    end
    subgraph GatewayLayer[Gateway layer]
        LB[Load balancer]
        Gateway[API Gateway]
        Auth[Authentication and authorization]
        RateLimit[Rate limiting]
    end
    subgraph BusinessLayer[Business layer]
        Router[Request routing]
        InputProc[Input processing]
        OutputProc[Output processing]
        Cache[Cache layer]
    end
    subgraph LLMLayer[LLM layer]
        Orchestrator[Orchestration layer]
        LLM1[GPT-4]
        LLM2[Claude]
        LLM3[Local model]
        RAG[RAG service]
        Agent[Agent service]
    end
    subgraph DataLayer[Data layer]
        VectorDB[(Vector database)]
        RelDB[(Relational database)]
        Redis[(Redis)]
        MQ[Message queue]
    end
    subgraph Observability
        Log[Logs]
        Metrics[Metrics]
        Trace[Traces]
    end
    Web --> LB
    Mobile --> LB
    API --> LB
    LB --> Gateway
    Gateway --> Auth
    Auth --> RateLimit
    RateLimit --> Router
    Router --> InputProc
    InputProc --> Cache
    Cache --> Orchestrator
    Orchestrator --> LLM1
    Orchestrator --> LLM2
    Orchestrator --> LLM3
    Orchestrator --> RAG
    Orchestrator --> Agent
    RAG --> VectorDB
    OutputProc --> Cache
    Router -.-> Log
    Orchestrator -.-> Metrics
    LLM1 -.-> Trace

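1.2 Core Components

| Component | Purpose | Recommended technologies |
| --- | --- | --- |
| API Gateway | Authentication, rate limiting, routing | Kong, Nginx, AWS API Gateway |
| Orchestration layer | LLM call logic | LangChain, in-house |
| Cache layer | Avoid repeated calls | Redis, Memcached |
| Vector database | RAG retrieval | Milvus, Pinecone, Chroma |
| Message queue | Asynchronous processing | Kafka, RabbitMQ, Redis |
| Observability | Monitoring and tracing | Prometheus, Grafana, Jaeger |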

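1.3 Choosing an Architecture Pattern

graph TB
    subgraph Synchronous
        S1[Request] --> S2[Process] --> S3[Response]
        S3 -->|High latency| S4[User waits]
    end
    subgraph Asynchronous
        A1[Request] --> A2[Enqueue]
        A2 --> A3[Return task ID immediately]
        A4[Worker processes] --> A5[Store result]
        A6[Poll / callback for result]
    end
    subgraph Streaming
        T1[Request] --> T2[Stream processing]
        T2 --> T3[Continuous output]
        T3 --> T4[User sees output in real time]
    end
    style T4 fill:#4ecdc4

Recommendations

  • Synchronous: simple requests where latency requirements are loose
  • Asynchronous: complex tasks where waiting is acceptable
  • Streaming: conversational scenarios, where it improves the user experience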
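
The asynchronous pattern (enqueue, return a task ID immediately, fetch the result later) can be sketched with nothing but asyncio. This is a minimal illustration under stated assumptions: `InMemoryJobStore`, `fake_llm_call`, and `demo` are hypothetical names, and a real service would keep job state in Redis or a database and run the work in a separate worker.

```python
import asyncio
import uuid
from typing import Awaitable, Callable, Dict, Tuple


class InMemoryJobStore:
    """Hypothetical in-process job store, for illustration only."""

    def __init__(self) -> None:
        self._status: Dict[str, str] = {}
        self._results: Dict[str, str] = {}
        self._tasks: Dict[str, asyncio.Task] = {}

    def submit(self, work: Callable[[], Awaitable[str]]) -> str:
        """Schedule the work and return a task ID without waiting for it."""
        job_id = str(uuid.uuid4())
        self._status[job_id] = "pending"
        # Keep a reference so the task is not garbage-collected mid-flight
        self._tasks[job_id] = asyncio.create_task(self._run(job_id, work))
        return job_id

    async def _run(self, job_id: str, work: Callable[[], Awaitable[str]]) -> None:
        try:
            self._results[job_id] = await work()
            self._status[job_id] = "done"
        except Exception as exc:
            self._results[job_id] = str(exc)
            self._status[job_id] = "failed"
        finally:
            self._tasks.pop(job_id, None)

    def poll(self, job_id: str) -> dict:
        """Report the current status and the result, if there is one."""
        return {
            "status": self._status.get(job_id, "unknown"),
            "result": self._results.get(job_id),
        }


async def fake_llm_call() -> str:
    """Stand-in for a slow LLM request."""
    await asyncio.sleep(0.05)
    return "summary of a long document"


async def demo() -> Tuple[dict, dict]:
    store = InMemoryJobStore()
    job_id = store.submit(fake_llm_call)   # returns immediately
    first = store.poll(job_id)             # still pending at this point
    while store.poll(job_id)["status"] == "pending":
        await asyncio.sleep(0.01)          # the client polls until the job finishes
    return first, store.poll(job_id)
```

Calling `asyncio.run(demo())` returns the status seen right after submission and the final status once the job has finished.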

2. Performance Optimization

2.1 Latency Optimization

import asyncio
import time
from typing import List
from openai import AsyncOpenAI


class LatencyOptimizer:
    """Latency optimizations for LLM calls."""

    def __init__(self):
        self.client = AsyncOpenAI()

    async def parallel_calls(self, prompts: List[str]) -> List[str]:
        """Issue several LLM requests concurrently."""
        tasks = [
            self.client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": p}]
            )
            for p in prompts
        ]
        
        responses = await asyncio.gather(*tasks)
        return [r.choices[0].message.content for r in responses]
    
    async def streaming_response(self, prompt: str):
        """Stream the response as it is generated."""
        stream = await self.client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
        )

        async for chunk in stream:
            # Some chunks carry no choices or an empty delta; skip them
            if chunk.choices and chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content
    
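    async def speculative_execution(self, prompt: str):
        """Speculative execution: call a fast and a slow model at the same time."""
        # Wrap both calls in tasks so they start right away; asyncio.wait()
        # no longer accepts bare coroutines since Python 3.11
        fast_task = asyncio.create_task(self.client.chat.completions.create(
            model="gpt-3.5-turbo",  # fast, average quality
            messages=[{"role": "user", "content": prompt}]
        ))

        slow_task = asyncio.create_task(self.client.chat.completions.create(
            model="gpt-4",  # slow, better quality
            messages=[{"role": "user", "content": prompt}]
        ))

        # Wait for the fast model explicitly: FIRST_COMPLETED would not
        # guarantee which of the two finished first
        fast_result = await fast_task

        # Return the fast answer now; the caller can await the pending task
        # later and swap in the better answer
        return {
            "initial": fast_result.choices[0].message.content,
            "pending_upgrade": slow_task,
        }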
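
The speculative pattern can be tried without any API key by replacing the two model calls with simulated coroutines. A minimal sketch, assuming made-up names (`fake_model`, `speculative`, `demo`) and made-up delays:

```python
import asyncio
from typing import Tuple


async def fake_model(name: str, prompt: str, delay: float) -> str:
    """Stand-in for a chat completion call that takes `delay` seconds."""
    await asyncio.sleep(delay)
    return f"{name}: {prompt}"


async def speculative(prompt: str) -> dict:
    # create_task schedules both calls immediately, so they run concurrently
    fast = asyncio.create_task(fake_model("fast-model", prompt, 0.01))
    slow = asyncio.create_task(fake_model("slow-model", prompt, 0.05))
    initial = await fast                  # answer the user as soon as possible
    return {"initial": initial, "upgrade": slow}


async def demo() -> Tuple[str, str]:
    result = await speculative("hello")
    upgraded = await result["upgrade"]    # later: replace the initial answer
    return result["initial"], upgraded
```

The trade-off is cost: every request pays for two model calls, so this fits best where the first response matters more than the bill.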

2.2 Caching Strategy

import hashlib
import json
from typing import Optional
import redis


class LLMCache:
    """Exact-match cache for LLM responses."""

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.default_ttl = 3600  # 1 hour

    def _get_cache_key(self, prompt: str, model: str, params: dict) -> str:
        """Build the cache key."""
        # Hash the request parameters; MD5 serves only as a fingerprint here
        content = json.dumps({
            "prompt": prompt,
            "model": model,
            "params": params,
        }, sort_keys=True)

        return f"llm:cache:{hashlib.md5(content.encode()).hexdigest()}"

    def get(self, prompt: str, model: str, params: dict) -> Optional[str]:
        """Look up a cached response."""
        key = self._get_cache_key(prompt, model, params)
        cached = self.redis.get(key)

        if cached is not None:
            return json.loads(cached)["response"]
        return None

    def set(self, prompt: str, model: str, params: dict, response: str, ttl: Optional[int] = None):
        """Store a response."""
        key = self._get_cache_key(prompt, model, params)
        value = json.dumps({"response": response})

        self.redis.setex(key, ttl or self.default_ttl, value)

    def get_or_call(self, prompt: str, model: str, params: dict, llm_func):
        """Return the cached response, or call the LLM and cache the result."""
        cached = self.get(prompt, model, params)
        # Compare with None so that an empty cached response still counts as a hit
        if cached is not None:
            return cached, True  # the second element flags a cache hit

        response = llm_func(prompt, model, params)
        self.set(prompt, model, params, response)

        return response, False


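import numpy as np


class SemanticCache:
    """Semantic cache: similar questions return the same answer."""

    def __init__(self, embedding_model, similarity_threshold: float = 0.95):
        self.embedding_model = embedding_model
        self.threshold = similarity_threshold
        self.cache = {}  # use a vector database in a real application

    @staticmethod
    def _cosine_similarity(a, b) -> float:
        """Cosine similarity between two embedding vectors."""
        a, b = np.asarray(a, dtype=float), np.asarray(b, dtype=float)
        denom = np.linalg.norm(a) * np.linalg.norm(b)
        return float(a @ b / denom) if denom else 0.0

    def get(self, query: str) -> Optional[str]:
        """Return a cached response for a semantically similar query."""
        query_embedding = self.embedding_model.encode(query)

        # Linear scan; returns the first entry above the threshold
        for cached_query, (cached_embedding, response) in self.cache.items():
            similarity = self._cosine_similarity(query_embedding, cached_embedding)
            if similarity > self.threshold:
                return response

        return None

    def set(self, query: str, response: str):
        """Store a response under the query's embedding."""
        embedding = self.embedding_model.encode(query)
        self.cache[query] = (embedding, response)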
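
To see how the similarity threshold decides between a hit and a miss, here is a small self-contained sketch. It swaps the embedding model for a toy bag-of-words vector (an assumption made purely so the example runs without any model), and it returns the best match rather than the first one above the threshold, which is a design variation and not the behavior of the listing above. `toy_embed`, `cosine`, and `best_match` are hypothetical names.

```python
import math
from typing import Dict, List, Optional, Tuple


def toy_embed(text: str) -> Dict[str, float]:
    """Toy bag-of-words 'embedding'; a real system would use an embedding model."""
    counts: Dict[str, float] = {}
    for word in text.lower().split():
        word = word.strip("?.,!")
        if word:
            counts[word] = counts.get(word, 0.0) + 1.0
    return counts


def cosine(a: Dict[str, float], b: Dict[str, float]) -> float:
    """Cosine similarity of two sparse vectors stored as dicts."""
    dot = sum(v * b.get(k, 0.0) for k, v in a.items())
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(
        sum(v * v for v in b.values())
    )
    return dot / norm if norm else 0.0


def best_match(
    query: str, entries: List[Tuple[str, str]], threshold: float
) -> Optional[str]:
    """Return the response of the most similar cached query, if similar enough."""
    q = toy_embed(query)
    best_score, best_response = 0.0, None
    for cached_query, response in entries:
        score = cosine(q, toy_embed(cached_query))
        if score > best_score:
            best_score, best_response = score, response
    return best_response if best_score >= threshold else None


entries = [
    ("what is the capital of france", "Paris"),
    ("what is the capital of spain", "Madrid"),
]
hit = best_match("What is the capital of France?", entries, threshold=0.9)
miss = best_match("how do i bake bread", entries, threshold=0.9)
```

The two cached questions differ in one word out of six, which gives them a similarity of about 0.83 under this toy embedding. That is why a high threshold matters: set it too low and the France question would be answered with the Spain entry.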

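2.3 Batching

import asyncio
import uuid
from typing import List
from dataclasses import dataclass


@dataclass
class BatchRequest:
    """A single request waiting to be batched."""
    request_id: str
    prompt: str
    future: asyncio.Future


class BatchProcessor:
    """Batch processor: merges several requests into one LLM call."""

    def __init__(
        self,
        llm,  # a vllm.LLM instance, loaded once at startup
        batch_size: int = 10,
        max_wait_time: float = 0.1,  # wait at most 100 ms
    ):
        self.llm = llm
        self.batch_size = batch_size
        self.max_wait_time = max_wait_time
        self.pending_requests: List[BatchRequest] = []
        self.lock = asyncio.Lock()
        self._llm_lock = asyncio.Lock()  # one generate() call at a time
        self._processing = False

    async def submit(self, prompt: str) -> str:
        """Submit a request and wait for its result."""
        request = BatchRequest(
            request_id=str(uuid.uuid4()),
            prompt=prompt,
            future=asyncio.get_running_loop().create_future(),
        )

        async with self.lock:
            self.pending_requests.append(request)

            if len(self.pending_requests) >= self.batch_size:
                # The batch is full: process it immediately
                asyncio.create_task(self._process_batch())
            elif not self._processing:
                # Otherwise flush after max_wait_time. Set the flag here so
                # concurrent submits do not start several timers
                self._processing = True
                asyncio.create_task(self._delayed_process())

        return await request.future

    async def _delayed_process(self):
        """Flush pending requests after max_wait_time."""
        await asyncio.sleep(self.max_wait_time)
        # Clear the flag before processing so that requests arriving during
        # the LLM call get a timer of their own
        self._processing = False
        await self._process_batch()

    async def _process_batch(self):
        """Process one batch of requests."""
        async with self.lock:
            if not self.pending_requests:
                return

            batch = self.pending_requests[:self.batch_size]
            self.pending_requests = self.pending_requests[self.batch_size:]

        try:
            # Batched LLM call
            responses = await self._batch_llm_call([r.prompt for r in batch])
        except Exception as e:
            # Propagate the failure so callers are not left waiting forever
            for request in batch:
                if not request.future.done():
                    request.future.set_exception(e)
            return

        # Hand each caller its result
        for request, response in zip(batch, responses):
            if not request.future.done():
                request.future.set_result(response)

    async def _batch_llm_call(self, prompts: List[str]) -> List[str]:
        """Batched LLM call."""
        # Use a framework with batching support, such as vLLM. vLLM serves
        # open-weight models, so an OpenAI model name such as
        # "gpt-3.5-turbo" cannot be loaded here
        from vllm import SamplingParams

        # generate() blocks, so run it in a worker thread
        async with self._llm_lock:
            outputs = await asyncio.to_thread(
                self.llm.generate, prompts, SamplingParams(max_tokens=256)
            )

        return [o.outputs[0].text for o in outputs]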

3. Reliability Design

3.1 Error Handling and Retries

import asyncio
import random
import time
from dataclasses import dataclass
from typing import Callable, TypeVar
from functools import wraps

from openai import (
    APIConnectionError,
    APITimeoutError,
    BadRequestError,
    RateLimitError,
)

T = TypeVar('T')


@dataclass
class RetryConfig:
    """Retry settings."""
    max_retries: int = 3
    base_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0
    jitter: bool = True


def with_retry(config: RetryConfig = RetryConfig()):
    """Retry decorator."""
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            last_exception = None

            for attempt in range(config.max_retries + 1):
                try:
                    return await func(*args, **kwargs)

                except RateLimitError as e:
                    # Rate limited: back off exponentially
                    delay = min(
                        config.max_delay,
                        config.base_delay * (config.exponential_base ** attempt)
                    )
                    if config.jitter:
                        delay *= (0.5 + random.random())
                    last_exception = e

                except (APIConnectionError, APITimeoutError) as e:
                    # Connection problem: retry quickly with linear backoff
                    delay = config.base_delay * (attempt + 1)
                    last_exception = e

                except BadRequestError:
                    # Invalid request (InvalidRequestError before openai 1.0):
                    # retrying will not help
                    raise

                # No point in sleeping after the final attempt
                if attempt < config.max_retries:
                    await asyncio.sleep(delay)

            raise last_exception

        return wrapper
    return decorator


class ResilientLLMClient:
    """Resilient LLM client."""

    def __init__(self, primary_client, fallback_clients: list = None):
        self.primary = primary_client
        self.fallbacks = fallback_clients or []
        self.circuit_breaker = CircuitBreaker()

    @with_retry()
    async def call(self, prompt: str, **kwargs) -> str:
        """Call the primary model behind a circuit breaker."""

        # Check the circuit breaker
        if self.circuit_breaker.is_open():
            return await self._fallback_call(prompt, **kwargs)

        try:
            response = await self.primary.chat.completions.create(
                model=kwargs.get("model", "gpt-4"),
                messages=[{"role": "user", "content": prompt}],
            )

            self.circuit_breaker.record_success()
            return response.choices[0].message.content

        except Exception:
            self.circuit_breaker.record_failure()

            if self.circuit_breaker.is_open():
                return await self._fallback_call(prompt, **kwargs)
            raise

    async def _fallback_call(self, prompt: str, **kwargs) -> str:
        """Degraded call: try each fallback client in turn."""
        for client in self.fallbacks:
            try:
                response = await client.chat.completions.create(
                    model=kwargs.get("fallback_model", "gpt-3.5-turbo"),
                    messages=[{"role": "user", "content": prompt}],
                )
                return response.choices[0].message.content
            except Exception:
                # This fallback failed too; move on to the next one
                continue

        raise Exception("All services are unavailable")


class CircuitBreaker:
    """Circuit breaker."""
    
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half-open
    
    def is_open(self) -> bool:
        if self.state == "closed":
            return False
        
        if self.state == "open":
            # Check whether a recovery attempt is allowed
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "half-open"
                return False
            return True
        
        return False  # half-open
    
    def record_success(self):
        self.failure_count = 0
        self.state = "closed"
    
    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        
        if self.failure_count >= self.failure_threshold:
            self.state = "open"

3.2 Request Queue and Rate Limiting

import asyncio
import itertools
import time
import uuid
from asyncio import Semaphore
from dataclasses import dataclass
from typing import Awaitable, Callable


@dataclass
class QueuedRequest:
    """A request waiting in the queue."""
    id: str
    prompt: str
    priority: int
    timestamp: float
    future: asyncio.Future


class RequestQueue:
    """Priority request queue."""

    def __init__(
        self,
        llm_call: Callable[[str], Awaitable[str]],  # coroutine that performs the LLM call
        max_concurrent: int = 10,
        max_queue_size: int = 1000,
        rate_limit_rpm: int = 60,
    ):
        self._call_llm = llm_call
        self.semaphore = Semaphore(max_concurrent)
        self.queue = asyncio.PriorityQueue(maxsize=max_queue_size)
        self.rate_limiter = RateLimiter(rate_limit_rpm)
        self._counter = itertools.count()  # tie-breaker for equal priorities
        self._running = True

    async def start(self):
        """Run the processing loop."""
        while self._running:
            # Take the next request (lowest priority number first)
            _, _, request = await self.queue.get()

            # Wait for the rate limiter
            await self.rate_limiter.acquire()

            # Process the request
            asyncio.create_task(self._process_request(request))

    async def submit(self, prompt: str, priority: int = 5) -> str:
        """Submit a request and wait for its result."""
        future = asyncio.get_running_loop().create_future()

        request = QueuedRequest(
            id=str(uuid.uuid4()),
            prompt=prompt,
            priority=priority,
            timestamp=time.time(),
            future=future,
        )

        # The counter keeps entries comparable when priorities tie;
        # QueuedRequest itself defines no ordering
        await self.queue.put((priority, next(self._counter), request))

        return await future

    async def _process_request(self, request: QueuedRequest):
        """Process a single request."""
        async with self.semaphore:
            try:
                response = await self._call_llm(request.prompt)
                request.future.set_result(response)
            except Exception as e:
                request.future.set_exception(e)

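class RateLimiter:
    """Rate limiter (token bucket)."""

    def __init__(self, requests_per_minute: int):
        self.rpm = requests_per_minute
        self.tokens = requests_per_minute
        self.last_update = time.time()
        self.lock = asyncio.Lock()

    async def acquire(self):
        """Acquire one token, waiting if none is available."""
        async with self.lock:
            now = time.time()

            # Refill tokens for the time elapsed
            elapsed = now - self.last_update
            self.tokens = min(
                self.rpm,
                self.tokens + elapsed * (self.rpm / 60)
            )
            self.last_update = now

            if self.tokens < 1:
                # Wait until one full token has accumulated, then spend it
                wait_time = (1 - self.tokens) * (60 / self.rpm)
                await asyncio.sleep(wait_time)
                self.tokens = 0
                # The sleep already paid for this token; reset the clock so
                # the same interval is not credited again on the next call
                self.last_update = time.time()
            else:
                self.tokens -= 1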
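
The refill arithmetic of a token bucket is easier to verify with a clock you control. Below is a synchronous sketch with an injectable clock; `TokenBucket` and `FakeClock` are illustrative names, and the non-blocking `try_acquire` is a simplification of the awaiting `acquire` in this section.

```python
from typing import Callable


class FakeClock:
    """Manually advanced clock so the example is deterministic."""

    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now

    def advance(self, seconds: float) -> None:
        self.now += seconds


class TokenBucket:
    """Token bucket: capacity = requests per minute, refill = rpm / 60 per second."""

    def __init__(self, requests_per_minute: int, clock: Callable[[], float]) -> None:
        self.capacity = float(requests_per_minute)
        self.rate = requests_per_minute / 60.0   # tokens per second
        self.tokens = self.capacity              # start with a full bucket
        self.clock = clock
        self.last = clock()

    def _refill(self) -> None:
        now = self.clock()
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now

    def try_acquire(self) -> bool:
        """Take one token if available; never blocks."""
        self._refill()
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

    def wait_time(self) -> float:
        """Seconds until the next token becomes available."""
        self._refill()
        return max(0.0, (1 - self.tokens) / self.rate)


clock = FakeClock()
bucket = TokenBucket(60, clock)
burst = [bucket.try_acquire() for _ in range(60)]   # a full bucket allows a burst
rejected = bucket.try_acquire()                      # the 61st request must wait
wait = bucket.wait_time()
clock.advance(1.0)                                   # one second refills one token
after_wait = bucket.try_acquire()
```

At 60 requests per minute the bucket refills one token per second, so an empty bucket admits the next request after exactly one second.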

4. Cost Optimization

4.1 Model Routing

class ModelRouter:
    """Smart model routing."""

    def __init__(self):
        # Costs (USD) and latencies are illustrative; check current figures
        self.models = {
            "simple": {
                "name": "gpt-3.5-turbo",
                "cost_per_1k_tokens": 0.002,
                "latency_ms": 200,
            },
            "medium": {
                "name": "gpt-4-turbo",
                "cost_per_1k_tokens": 0.03,
                "latency_ms": 500,
            },
            "complex": {
                "name": "gpt-4",
                "cost_per_1k_tokens": 0.06,
                "latency_ms": 1000,
            },
        }
        # A trained complexity classifier could be plugged in here; this
        # version uses the simple rules in _classify_complexity

    def route(self, prompt: str, requirements: dict = None) -> str:
        """Route the prompt to a suitable model."""

        # Explicit requirements take precedence
        if requirements:
            if requirements.get("low_latency"):
                return self.models["simple"]["name"]
            if requirements.get("high_quality"):
                return self.models["complex"]["name"]

        # Otherwise classify by complexity
        complexity = self._classify_complexity(prompt)

        if complexity == "simple":
            return self.models["simple"]["name"]
        elif complexity == "medium":
            return self.models["medium"]["name"]
        else:
            return self.models["complex"]["name"]

    def _classify_complexity(self, prompt: str) -> str:
        """Classify task complexity."""
        # Simple rule: short prompts count as simple, whatever they contain
        if len(prompt) < 100:
            return "simple"

        # Chinese and English keywords that suggest analytical work
        complex_keywords = ["分析", "推理", "解释", "比较", "evaluate", "analyze"]
        if any(kw in prompt.lower() for kw in complex_keywords):
            return "complex"

        return "medium"


from collections import defaultdict


class CostTracker:
    """Cost tracking."""

    def __init__(self):
        self.usage = defaultdict(lambda: {"tokens": 0, "cost": 0.0})

        # USD per 1K tokens; a snapshot, so verify against current pricing
        self.pricing = {
            "gpt-4": {"input": 0.03, "output": 0.06},
            "gpt-4-turbo": {"input": 0.01, "output": 0.03},
            "gpt-3.5-turbo": {"input": 0.0005, "output": 0.0015},
            "claude-3-opus": {"input": 0.015, "output": 0.075},
        }

    def record(self, model: str, input_tokens: int, output_tokens: int):
        """Record usage for one request."""
        # Unknown models are counted at zero cost
        pricing = self.pricing.get(model, {"input": 0, "output": 0})

        cost = (input_tokens * pricing["input"] + output_tokens * pricing["output"]) / 1000

        self.usage[model]["tokens"] += input_tokens + output_tokens
        self.usage[model]["cost"] += cost

    def get_report(self) -> dict:
        """Build the cost report."""
        total_cost = sum(u["cost"] for u in self.usage.values())

        return {
            "total_cost": total_cost,
            "by_model": dict(self.usage),
        }
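
A worked example makes the pricing formula concrete. The sketch below reuses the per-1K-token prices listed in this section (treat them as a snapshot, not current pricing); `request_cost` is an illustrative helper name.

```python
# USD per 1K tokens, copied from the pricing table in this section
PRICES_PER_1K = {
    "gpt-4": {"input": 0.03, "output": 0.06},
    "gpt-3.5-turbo": {"input": 0.0005, "output": 0.0015},
}


def request_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Cost of one request: tokens times the per-1K price, divided by 1000."""
    price = PRICES_PER_1K[model]
    return (input_tokens * price["input"] + output_tokens * price["output"]) / 1000


# 1,200 input tokens and 300 output tokens on each model
gpt4_cost = request_cost("gpt-4", 1200, 300)            # (36 + 18) / 1000
gpt35_cost = request_cost("gpt-3.5-turbo", 1200, 300)   # (0.6 + 0.45) / 1000
ratio = gpt4_cost / gpt35_cost
```

At these prices the same request costs about 0.054 USD on gpt-4 and about 0.00105 USD on gpt-3.5-turbo, a ratio of roughly 51 to 1, which is the economic case for routing simple tasks to the cheaper model.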

4.2 Prompt Optimization

import re


class PromptOptimizer:
    """Prompt optimizer."""

    def __init__(self, tokenizer):
        self.tokenizer = tokenizer

    def compress(self, prompt: str, max_tokens: int) -> str:
        """Shrink a prompt to fit a token budget."""
        tokens = self.tokenizer.encode(prompt)

        if len(tokens) <= max_tokens:
            return prompt

        # Strategy 1: truncate
        truncated = self.tokenizer.decode(tokens[:max_tokens])

        # Strategy 2: summarize (with a small model)
        # summary = self._summarize(prompt)

        return truncated

    def remove_redundancy(self, prompt: str) -> str:
        """Remove redundant whitespace and repeated lines."""
        # Collapse runs of spaces and tabs but keep newlines, so the
        # line-level deduplication below still has lines to work with
        prompt = re.sub(r'[ \t]+', ' ', prompt)

        # Drop repeated lines, keeping the first occurrence
        lines = prompt.split('\n')
        unique_lines = list(dict.fromkeys(lines))

        return '\n'.join(unique_lines)

    def estimate_cost(self, prompt: str, model: str, expected_output_tokens: int = 500) -> float:
        """Estimate the cost of one request in USD."""
        input_tokens = len(self.tokenizer.encode(prompt))

        # USD per 1K tokens as (input, output); verify against current pricing
        pricing = {
            "gpt-4": (0.03, 0.06),
            "gpt-3.5-turbo": (0.0005, 0.0015),
        }

        # Unknown models are estimated at zero cost
        input_price, output_price = pricing.get(model, (0, 0))

        return (input_tokens * input_price + expected_output_tokens * output_price) / 1000
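
Line-level deduplication only works if newlines survive the whitespace cleanup. A minimal standalone sketch that normalizes each line separately (`dedupe_lines` is an illustrative name, not part of the class above):

```python
import re


def dedupe_lines(prompt: str) -> str:
    """Normalize whitespace within each line, then drop repeated lines."""
    lines = [re.sub(r"[ \t]+", " ", line).strip() for line in prompt.split("\n")]
    # dict.fromkeys keeps the first occurrence and preserves order
    return "\n".join(dict.fromkeys(lines))


raw = "Be  concise.\nBe concise.\nAnswer in   English."
cleaned = dedupe_lines(raw)
```

Normalizing before deduplicating matters: the first two lines differ only in spacing, so they count as duplicates only after the spaces are collapsed.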

5. Observability

5.1 Logging and Tracing

import logging
import json
from datetime import datetime, timezone
from typing import Any
import uuid


class LLMLogger:
    """Logger specialized for LLM calls."""

    def __init__(self, service_name: str):
        self.service = service_name
        self.logger = logging.getLogger(service_name)

    def log_request(
        self,
        request_id: str,
        prompt: str,
        model: str,
        params: dict,
    ):
        """Log a request (prompt length only, not the prompt text)."""
        self.logger.info(json.dumps({
            "event": "llm_request",
            "request_id": request_id,
            "model": model,
            "prompt_length": len(prompt),
            "params": params,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }))

    def log_response(
        self,
        request_id: str,
        response: str,
        latency_ms: float,
        tokens_used: dict,
    ):
        """Log a response."""
        self.logger.info(json.dumps({
            "event": "llm_response",
            "request_id": request_id,
            "response_length": len(response),
            "latency_ms": latency_ms,
            "tokens": tokens_used,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }))

    def log_error(
        self,
        request_id: str,
        error: Exception,
        context: dict = None,
    ):
        """Log an error."""
        self.logger.error(json.dumps({
            "event": "llm_error",
            "request_id": request_id,
            "error_type": type(error).__name__,
            "error_message": str(error),
            "context": context,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }))


from functools import wraps


class LLMTracer:
    """Tracing for LLM calls."""

    def __init__(self):
        # Integrates with OpenTelemetry; spans can be exported to Jaeger
        from opentelemetry import trace
        self.tracer = trace.get_tracer(__name__)

    def trace_call(self, func):
        """Tracing decorator."""
        @wraps(func)
        async def wrapper(*args, **kwargs):
            with self.tracer.start_as_current_span("llm_call") as span:
                span.set_attribute("model", kwargs.get("model", "unknown"))
                span.set_attribute("prompt_length", len(kwargs.get("prompt", "")))
                
                try:
                    result = await func(*args, **kwargs)
                    span.set_attribute("status", "success")
                    return result
                except Exception as e:
                    span.set_attribute("status", "error")
                    span.set_attribute("error", str(e))
                    raise
        
        return wrapper

5.2 Metrics

from prometheus_client import Counter, Histogram, Gauge


class LLMMetrics:
    """Monitoring metrics for LLM calls."""

    def __init__(self):
        # Request count
        self.requests = Counter(
            'llm_requests_total',
            'Total LLM requests',
            ['model', 'status']
        )

        # Latency distribution
        self.latency = Histogram(
            'llm_latency_seconds',
            'LLM request latency',
            ['model'],
            buckets=[0.1, 0.5, 1, 2, 5, 10, 30]
        )

        # Token usage
        self.tokens = Counter(
            'llm_tokens_total',
            'Total tokens used',
            ['model', 'type']  # type: input/output
        )

        # Cost; the Python client exposes counters with a _total suffix,
        # so this series is scraped as llm_cost_dollars_total
        self.cost = Counter(
            'llm_cost_dollars',
            'Total cost in dollars',
            ['model']
        )

        # Cache hits
        self.cache_hits = Counter(
            'llm_cache_hits_total',
            'Cache hits',
            ['cache_type']
        )

        # Queue length
        self.queue_length = Gauge(
            'llm_queue_length',
            'Current queue length'
        )

    def record_request(self, model: str, status: str, latency: float, tokens: dict):
        """Record the metrics for one request (latency in seconds)."""
        self.requests.labels(model=model, status=status).inc()
        self.latency.labels(model=model).observe(latency)
        self.tokens.labels(model=model, type='input').inc(tokens.get('input', 0))
        self.tokens.labels(model=model, type='output').inc(tokens.get('output', 0))

5.3 Alerting Rules

# prometheus_rules.yml
groups:
  - name: llm_alerts
    rules:
      # Error rate too high. Both sides are summed because the series carry
      # model and status labels; without sum() the division would only match
      # each error series against itself
      - alert: LLMHighErrorRate
        expr: sum(rate(llm_requests_total{status="error"}[5m])) / sum(rate(llm_requests_total[5m])) > 0.1
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "LLM error rate too high"
          description: "Error rate above 10%"

      # Latency too high
      - alert: LLMHighLatency
        expr: histogram_quantile(0.95, rate(llm_latency_seconds_bucket[5m])) > 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "LLM latency too high"
          description: "P95 latency above 10 seconds"

      # Cost overrun (the counter is exposed with a _total suffix)
      - alert: LLMCostSpike
        expr: sum(increase(llm_cost_dollars_total[1h])) > 100
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "LLM cost anomaly"
          description: "Cost above $100 within 1 hour"

      # Queue backlog
      - alert: LLMQueueBacklog
        expr: llm_queue_length > 100
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "LLM request queue backlog"

6. Complete Example: A Production-Grade LLM Service

import time
import uuid
from typing import Optional

from fastapi import FastAPI, HTTPException, Response
from openai import AsyncOpenAI
from pydantic import BaseModel


app = FastAPI()


# Initialize shared components once at startup.
# InputFilter and OutputFilter are not defined in this article and are
# assumed to be provided elsewhere.
cache = LLMCache()
router = ModelRouter()
metrics = LLMMetrics()
logger = LLMLogger("llm-service")
rate_limiter = RateLimiter(requests_per_minute=60)
fallback_clients: list = []  # add clients for backup providers here
# One shared client, so the circuit breaker keeps its state across requests
llm_client = ResilientLLMClient(AsyncOpenAI(), fallback_clients)


class ChatRequest(BaseModel):
    message: str
    model: Optional[str] = None
    stream: bool = False
    priority: int = 5


class ChatResponse(BaseModel):
    response: str
    model_used: str
    latency_ms: float
    cached: bool


@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """Chat endpoint."""
    request_id = str(uuid.uuid4())
    start_time = time.time()
    # 1. Route first, so cache reads and writes use the same model name
    model = request.model or router.route(request.message)

    try:
        # 2. Log the request
        logger.log_request(request_id, request.message, model, {})

        # 3. Filter the input
        input_filter = InputFilter()
        is_safe, reason = input_filter.filter(request.message)
        if not is_safe:
            raise HTTPException(status_code=400, detail=reason)

        # 4. Check the cache (LLMCache uses the blocking Redis client)
        cached = cache.get(request.message, model, {})
        if cached is not None:
            metrics.cache_hits.labels(cache_type="exact").inc()
            return ChatResponse(
                response=cached,
                model_used=model,
                latency_ms=(time.time() - start_time) * 1000,
                cached=True,
            )

        # 5. Apply the rate limit
        await rate_limiter.acquire()

        # 6. Call the LLM
        response_text = await llm_client.call(request.message, model=model)

        # 7. Filter the output
        output_filter = OutputFilter()
        filtered_response, issues = output_filter.filter_output(response_text)

        # 8. Cache the result
        cache.set(request.message, model, {}, filtered_response)

        # 9. Record metrics. The token counts are placeholders: call()
        # returns only the text, so real usage is not available here
        latency = (time.time() - start_time) * 1000
        tokens = {"input": 100, "output": 200}
        metrics.record_request(model, "success", latency / 1000, tokens)

        logger.log_response(request_id, filtered_response, latency, tokens)

        return ChatResponse(
            response=filtered_response,
            model_used=model,
            latency_ms=latency,
            cached=False,
        )

    except HTTPException:
        # Let deliberate HTTP errors (such as the 400 above) pass through
        raise
    except Exception as e:
        logger.log_error(request_id, e)
        metrics.requests.labels(model=model, status="error").inc()
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/health")
async def health():
    """Health check."""
    return {"status": "healthy"}


@app.get("/metrics")
async def metrics_endpoint():
    """Prometheus metrics."""
    from fastapi import Response
    from prometheus_client import CONTENT_TYPE_LATEST, generate_latest
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)

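7. Summary

Key Points of the Architecture

mindmap
  root((LLM application architecture))
    Performance
      Parallel calls
      Streaming responses
      Caching strategy
      Batching
    Reliability
      Retries
      Circuit breaking and fallback
      Request queue
    Cost
      Model routing
      Prompt optimization
      Cost tracking
    Observability
      Logs
      Metrics
      Traces
      Alerts

Key Takeaways

  1. Layered architecture: Gateway → Business → LLM → Data, each with its own responsibility
  2. Streaming first: the most effective way to improve the user experience
  3. Multi-level caching: exact cache plus semantic cache
  4. Smart routing: use cheap models for simple tasks
  5. Resilient design: retries, circuit breaking, fallback
  6. Comprehensive monitoring: latency, errors, cost, queues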
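
Takeaways 3 and 4 interact: if the cache key includes the model name, routing has to happen before the cache lookup, otherwise reads and writes use different keys and the cache never hits. A minimal in-memory sketch of that ordering, where `route`, `CachedRouter`, and the model names are hypothetical stand-ins:

```python
from typing import Callable, Dict, Tuple


def route(prompt: str) -> str:
    """Hypothetical rule: short prompts go to the cheap tier."""
    return "cheap-model" if len(prompt) < 100 else "strong-model"


class CachedRouter:
    """Route first, then look up the exact-match cache under (model, prompt)."""

    def __init__(self, call_llm: Callable[[str, str], str]) -> None:
        self.call_llm = call_llm
        self.cache: Dict[Tuple[str, str], str] = {}
        self.calls = 0  # number of real LLM calls made

    def ask(self, prompt: str) -> Tuple[str, str, bool]:
        """Return (answer, model, cache_hit)."""
        model = route(prompt)
        key = (model, prompt)
        if key in self.cache:
            return self.cache[key], model, True
        self.calls += 1
        answer = self.call_llm(model, prompt)
        self.cache[key] = answer
        return answer, model, False


# Stub LLM so the example runs offline
router = CachedRouter(lambda model, prompt: f"[{model}] {prompt[:5]}")
first = router.ask("hi")
second = router.ask("hi")
```

The second identical request is served from the cache, so only one LLM call is made.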

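Technology Recommendations

| Component | Recommended option |
| --- | --- |
| API framework | FastAPI |
| Cache | Redis |
| Queue | Redis/Kafka |
| Vector database | Milvus/Pinecone |
| Monitoring | Prometheus + Grafana |
| Tracing | Jaeger/Zipkin |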


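This article was first published on underestimated.cn

Part 19 of the "From Getting Started to Giving Up" series: the road from demo to production is still long.

Architecture determines how far an application can go. 🏗️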
