Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

第13章:性能优化

章节概述

性能优化是现代软件开发中的关键技能。在本章中,我们将深入探索Rust的性能优化技术,从底层内存管理到高并发处理,掌握构建高性能系统的核心技术。本章不仅关注理论,更重要的是通过实际项目将理论应用到实践中。

学习目标

  • 掌握Rust性能分析工具和方法
  • 理解内存管理优化技术
  • 学会并发性能优化策略
  • 掌握缓存策略和实现
  • 设计并实现一个高性能缓存服务系统

实战项目:构建一个企业级高性能缓存服务,支持分布式缓存、内存池管理、性能监控、故障恢复等企业级特性。

13.1 性能分析基础

13.1.1 性能分析工具

Rust提供了多种性能分析工具,帮助开发者识别性能瓶颈:

13.1.1.1 Criterion.rs - 基准测试框架

#![allow(unused)]
fn main() {
// 性能基准测试
// File: performance-benches/Cargo.toml
[package]
name = "performance-benches"
version = "0.1.0"
edition = "2021"

[dev-dependencies]
criterion = { version = "0.5", features = ["html_reports"] }
tokio = { version = "1.0", features = ["full"] }
redis = { version = "0.24" }
sqlx = { version = "0.7" }

[[bench]]
name = "string_operations"
harness = false

[[bench]]
name = "database_queries"
harness = false

[[bench]]
name = "concurrent_operations"
harness = false
}
#![allow(unused)]
fn main() {
// File: performance-benches/benches/string_operations.rs
use criterion::{black_box, criterion_group, criterion_main, Criterion, BenchmarkId};
use std::collections::HashMap;
use std::time::Duration;

fn string_concatenation_bench(c: &mut Criterion) {
    let mut group = c.benchmark_group("string_operations");
    group.sample_size(100);
    group.measurement_time(Duration::from_secs(10));
    
    // String拼接测试
    let test_sizes = vec![10, 100, 1000, 10000];
    
    for size in test_sizes {
        group.bench_with_input(
            BenchmarkId::new("string_push", size),
            &size,
            |b, &size| {
                b.iter(|| {
                    let mut s = String::new();
                    for i in 0..size {
                        s.push_str(&format!("item_{}", i));
                    }
                    black_box(s);
                })
            },
        );
        
        group.bench_with_input(
            BenchmarkId::new("format_macro", size),
            &size,
            |b, &size| {
                b.iter(|| {
                    let mut items = Vec::new();
                    for i in 0..size {
                        items.push(format!("item_{}", i));
                    }
                    let s = items.join(", ");
                    black_box(s);
                })
            },
        );
        
        group.bench_with_input(
            BenchmarkId::new("string_builder", size),
            &size,
            |b, &size| {
                b.iter(|| {
                    let mut s = String::with_capacity(size * 10);
                    for i in 0..size {
                        s.push_str(&format!("item_{}", i));
                    }
                    black_box(s);
                })
            },
        );
    }
    
    group.finish();
}

fn hashmap_operations_bench(c: &mut Criterion) {
    let mut group = c.benchmark_group("hashmap_operations");
    group.sample_size(50);
    
    // HashMap插入性能测试
    group.bench_function("hashmap_insert", |b| {
        b.iter(|| {
            let mut map = HashMap::new();
            for i in 0..1000 {
                map.insert(format!("key_{}", i), format!("value_{}", i));
            }
            black_box(map);
        })
    });
    
    // HashMap查找性能测试
    group.bench_function("hashmap_lookup", |b| {
        let mut map = HashMap::new();
        for i in 0..1000 {
            map.insert(format!("key_{}", i), format!("value_{}", i));
        }
        
        b.iter(|| {
            for i in 0..1000 {
                let key = format!("key_{}", i);
                let value = map.get(&key);
                black_box(value);
            }
        })
    });
    
    // 预分配容量测试
    group.bench_function("hashmap_with_capacity", |b| {
        b.iter(|| {
            let mut map = HashMap::with_capacity(1000);
            for i in 0..1000 {
                map.insert(format!("key_{}", i), format!("value_{}", i));
            }
            black_box(map);
        })
    });
    
    group.finish();
}

fn sorting_algorithms_bench(c: &mut Criterion) {
    let mut group = c.benchmark_group("sorting");
    group.sample_size(30);
    
    let test_data_sizes = vec![100, 1000, 10000];
    
    for &size in &test_data_sizes {
        let data: Vec<i32> = (0..size).rev().collect(); // 反序数据
        
        group.bench_with_input(
            BenchmarkId::new("sort_unstable", size),
            &data,
            |b, data| {
                b.iter(|| {
                    let mut data = data.clone();
                    data.sort_unstable();
                    black_box(data);
                })
            },
        );
        
        group.bench_with_input(
            BenchmarkId::new("sort_stable", size),
            &data,
            |b, data| {
                b.iter(|| {
                    let mut data = data.clone();
                    data.sort();
                    black_box(data);
                })
            },
        );
    }
    
    group.finish();
}

criterion_group!(
    benches,
    string_concatenation_bench,
    hashmap_operations_bench,
    sorting_algorithms_bench
);
criterion_main!(benches);
}

13.1.1.2 perf工具集成

#![allow(unused)]
fn main() {
// 性能分析辅助工具
// File: perf-tools/src/lib.rs
use std::time::{Duration, Instant};
use std::collections::HashMap;
use tracing::{info, warn};
use once_cell::sync::Lazy;
use std::sync::Mutex;

/// 全局性能监控器
pub static PERF_MONITOR: Lazy<Mutex<PerformanceMonitor>> = Lazy::new(|| {
    Mutex::new(PerformanceMonitor::new())
});

/// 性能监控器
pub struct PerformanceMonitor {
    metrics: HashMap<String, MetricData>,
    enabled: bool,
}

#[derive(Debug, Clone)]
pub struct MetricData {
    pub name: String,
    pub total_time: Duration,
    pub call_count: u64,
    pub min_time: Duration,
    pub max_time: Duration,
    pub avg_time: Duration,
    pub last_updated: Instant,
}

impl PerformanceMonitor {
    pub fn new() -> Self {
        PerformanceMonitor {
            metrics: HashMap::new(),
            enabled: true,
        }
    }
    
    pub fn enable(&mut self) {
        self.enabled = true;
    }
    
    pub fn disable(&mut self) {
        self.enabled = false;
    }
    
    pub fn record_metric(&mut self, name: &str, duration: Duration) {
        if !self.enabled {
            return;
        }
        
        let metric = self.metrics.entry(name.to_string()).or_insert_with(|| {
            MetricData {
                name: name.to_string(),
                total_time: Duration::from_secs(0),
                call_count: 0,
                min_time: Duration::MAX,
                max_time: Duration::from_secs(0),
                avg_time: Duration::from_secs(0),
                last_updated: Instant::now(),
            }
        });
        
        metric.total_time += duration;
        metric.call_count += 1;
        metric.last_updated = Instant::now();
        
        if duration < metric.min_time {
            metric.min_time = duration;
        }
        
        if duration > metric.max_time {
            metric.max_time = duration;
        }
        
        metric.avg_time = Duration::from_nanos(
            metric.total_time.as_nanos() as u64 / metric.call_count
        );
    }
    
    pub fn get_metrics(&self) -> Vec<MetricData> {
        self.metrics.values().cloned().collect()
    }
    
    pub fn report(&self) {
        if !self.enabled {
            return;
        }
        
        info!("Performance Metrics Report");
        info!("{:<30} | {:<10} | {:<15} | {:<15} | {:<15}", 
              "Operation", "Count", "Avg Time", "Min Time", "Max Time");
        info!("{:-<30}-+-{:-<10}-+-{:-<15}-+-{:-<15}-+-{:-<15}", 
              "", "", "", "", "");
        
        let mut metrics: Vec<_> = self.metrics.values().collect();
        metrics.sort_by(|a, b| b.avg_time.cmp(&a.avg_time));
        
        for metric in metrics {
            info!("{:<30} | {:<10} | {:<15} | {:<15} | {:<15}", 
                  metric.name, 
                  metric.call_count,
                  format!("{:.2}ms", metric.avg_time.as_secs_f64() * 1000.0),
                  format!("{:.2}ms", metric.min_time.as_secs_f64() * 1000.0),
                  format!("{:.2}ms", metric.max_time.as_secs_f64() * 1000.0));
        }
    }
}

/// 性能分析器包装器
pub struct Profiler {
    name: String,
    start_time: Instant,
}

impl Profiler {
    pub fn new(name: &str) -> Self {
        Profiler {
            name: name.to_string(),
            start_time: Instant::now(),
        }
    }
}

impl Drop for Profiler {
    fn drop(&mut self) {
        let duration = self.start_time.elapsed();
        PERF_MONITOR.lock().unwrap().record_metric(&self.name, duration);
    }
}

/// 宏定义便于使用
#[macro_export]
macro_rules! profile_func {
    ($name:expr) => {
        let _profiler = $crate::Profiler::new($name);
    };
}

#[macro_export]
macro_rules! profile_operation {
    ($name:expr, $op:block) => {
        {
            let _profiler = $crate::Profiler::new($name);
            let result = $op;
            drop(_profiler);
            result
        }
    };
}

/// 内存使用监控
pub struct MemoryProfiler {
    start_memory: usize,
    peak_memory: usize,
}

impl MemoryProfiler {
    pub fn new() -> Self {
        let start_memory = Self::get_memory_usage();
        MemoryProfiler {
            start_memory,
            peak_memory: start_memory,
        }
    }
    
    fn get_memory_usage() -> usize {
        // 在Linux系统上读取 /proc/self/status
        #[cfg(target_os = "linux")]
        {
            if let Ok(content) = std::fs::read_to_string("/proc/self/status") {
                for line in content.lines() {
                    if line.starts_with("VmRSS:") {
                        if let Some(kb_str) = line.split_whitespace().nth(1) {
                            return kb_str.parse::<usize>().unwrap_or(0) * 1024; // 转换为字节
                        }
                    }
                }
            }
        }
        
        // 其他平台使用默认实现
        0
    }
    
    pub fn update_peak(&mut self) {
        let current = Self::get_memory_usage();
        if current > self.peak_memory {
            self.peak_memory = current;
        }
    }
    
    pub fn report(&self) {
        let current = Self::get_memory_usage();
        info!("Memory Usage: Current: {:.2}MB, Peak: {:.2}MB, Change: {:.2}MB",
              current as f64 / 1024.0 / 1024.0,
              self.peak_memory as f64 / 1024.0 / 1024.0,
              (current - self.start_memory) as f64 / 1024.0 / 1024.0);
    }
}

impl Drop for MemoryProfiler {
    fn drop(&mut self) {
        self.report();
    }
}
}

13.1.1.3 自定义性能分析器

#![allow(unused)]
fn main() {
// 高级性能分析器
// File: perf-tools/src/advanced.rs
use std::time::{Duration, Instant};
use std::sync::{Arc, Mutex};
use std::thread;
use crossbeam::channel::{unbounded, Sender, Receiver};
use tracing::{info, warn, debug};

/// 实时性能监控
pub struct RealTimeMonitor {
    metrics: Arc<Mutex<MetricsCollector>>,
    collector_thread: Option<thread::JoinHandle<()>>,
    sampling_interval: Duration,
}

#[derive(Debug, Clone)]
pub struct SystemMetrics {
    pub cpu_usage: f64,
    pub memory_usage: f64,
    pub gc_count: u64,
    pub active_connections: u64,
    pub request_rate: f64,
    pub response_time: Duration,
}

#[derive(Debug, Clone)]
pub struct MetricsCollector {
    pub samples: Vec<SystemMetrics>,
    pub min_response_time: Duration,
    pub max_response_time: Duration,
    pub avg_response_time: Duration,
    pub total_requests: u64,
    pub error_count: u64,
}

impl MetricsCollector {
    pub fn new() -> Self {
        MetricsCollector {
            samples: Vec::new(),
            min_response_time: Duration::MAX,
            max_response_time: Duration::from_secs(0),
            avg_response_time: Duration::from_secs(0),
            total_requests: 0,
            error_count: 0,
        }
    }
    
    pub fn record_request(&mut self, response_time: Duration, success: bool) {
        self.total_requests += 1;
        if !success {
            self.error_count += 1;
        }
        
        if response_time < self.min_response_time {
            self.min_response_time = response_time;
        }
        
        if response_time > self.max_response_time {
            self.max_response_time = response_time;
        }
        
        // 计算平均响应时间
        if self.total_requests > 0 {
            self.avg_response_time = Duration::from_nanos(
                (self.avg_response_time.as_nanos() as u64 * (self.total_requests - 1) + 
                 response_time.as_nanos() as u64) / self.total_requests
            );
        }
    }
    
    pub fn collect_system_metrics(&mut self) {
        let metrics = SystemMetrics {
            cpu_usage: self.get_cpu_usage(),
            memory_usage: self.get_memory_usage(),
            gc_count: self.get_gc_count(),
            active_connections: self.get_active_connections(),
            request_rate: self.calculate_request_rate(),
            response_time: self.avg_response_time,
        };
        
        self.samples.push(metrics);
        
        // 保持最近100个样本
        if self.samples.len() > 100 {
            self.samples.remove(0);
        }
    }
    
    fn get_cpu_usage(&self) -> f64 {
        // 简化的CPU使用率计算
        // 在实际项目中可以使用更精确的库
        rand::random::<f64>() * 100.0
    }
    
    fn get_memory_usage(&self) -> f64 {
        // 获取内存使用情况
        #[cfg(target_os = "linux")]
        {
            if let Ok(content) = std::fs::read_to_string("/proc/meminfo") {
                for line in content.lines() {
                    if line.starts_with("MemAvailable:") {
                        if let Some(kb_str) = line.split_whitespace().nth(1) {
                            let available_kb = kb_str.parse::<f64>().unwrap_or(0.0);
                            let total_kb = available_kb / 0.1; // 简化计算
                            return (total_kb - available_kb) / total_kb * 100.0;
                        }
                    }
                }
            }
        }
        rand::random::<f64>() * 100.0
    }
    
    fn get_gc_count(&self) -> u64 {
        // Rust的垃圾回收统计
        // 这里返回模拟值
        rand::random::<u64>() % 1000
    }
    
    fn get_active_connections(&self) -> u64 {
        // 模拟活跃连接数
        rand::random::<u64>() % 10000
    }
    
    fn calculate_request_rate(&self) -> f64 {
        if self.samples.len() < 2 {
            return 0.0;
        }
        
        let recent_samples = &self.samples[self.samples.len().saturating_sub(10)..];
        let time_diff = recent_samples.len() as f64;
        
        if time_diff > 0.0 {
            self.total_requests as f64 / time_diff
        } else {
            0.0
        }
    }
}

impl RealTimeMonitor {
    pub fn new(sampling_interval: Duration) -> Self {
        RealTimeMonitor {
            metrics: Arc::new(Mutex::new(MetricsCollector::new())),
            collector_thread: None,
            sampling_interval,
        }
    }
    
    pub fn start(&mut self) {
        let metrics = Arc::clone(&self.metrics);
        let sampling_interval = self.sampling_interval;
        
        self.collector_thread = Some(thread::spawn(move || {
            loop {
                thread::sleep(sampling_interval);
                
                if let Ok(mut collector) = metrics.lock() {
                    collector.collect_system_metrics();
                }
            }
        }));
    }
    
    pub fn stop(&mut self) {
        if let Some(handle) = self.collector_thread.take() {
            handle.join().unwrap_or_else(|_| {
                warn!("Performance monitor thread panicked");
            });
        }
    }
    
    pub fn record_request(&self, response_time: Duration, success: bool) {
        if let Ok(mut collector) = self.metrics.lock() {
            collector.record_request(response_time, success);
        }
    }
    
    pub fn get_metrics(&self) -> Option<MetricsCollector> {
        if let Ok(collector) = self.metrics.lock() {
            Some(collector.clone())
        } else {
            None
        }
    }
    
    pub fn generate_report(&self) {
        if let Some(metrics) = self.get_metrics() {
            info!("=== Real-time Performance Report ===");
            info!("Total Requests: {}", metrics.total_requests);
            info!("Error Count: {}", metrics.error_count);
            info!("Error Rate: {:.2}%", 
                  if metrics.total_requests > 0 { 
                      metrics.error_count as f64 / metrics.total_requests as f64 * 100.0 
                  } else { 0.0 });
            info!("Average Response Time: {:.2}ms", metrics.avg_response_time.as_secs_f64() * 1000.0);
            info!("Min Response Time: {:.2}ms", metrics.min_response_time.as_secs_f64() * 1000.0);
            info!("Max Response Time: {:.2}ms", metrics.max_response_time.as_secs_f64() * 1000.0);
            
            if !metrics.samples.is_empty() {
                let latest = &metrics.samples[metrics.samples.len() - 1];
                info!("Current CPU Usage: {:.1}%", latest.cpu_usage);
                info!("Current Memory Usage: {:.1}%", latest.memory_usage);
                info!("Active Connections: {}", latest.active_connections);
                info!("Request Rate: {:.1} req/s", latest.request_rate);
            }
        }
    }
}

/// 性能警告系统
pub struct PerformanceAlert {
    thresholds: PerformanceThresholds,
    alert_channel: Option<Sender<PerformanceAlert>>,
    current_state: AlertState,
}

#[derive(Debug, Clone)]
pub struct PerformanceThresholds {
    pub max_response_time: Duration,
    pub max_error_rate: f64,
    pub max_memory_usage: f64,
    pub max_cpu_usage: f64,
}

#[derive(Debug, Clone)]
pub struct AlertState {
    pub high_response_time: bool,
    pub high_error_rate: bool,
    pub high_memory_usage: bool,
    pub high_cpu_usage: bool,
}

impl PerformanceAlert {
    pub fn new(thresholds: PerformanceThresholds) -> Self {
        PerformanceAlert {
            thresholds,
            alert_channel: None,
            current_state: AlertState {
                high_response_time: false,
                high_error_rate: false,
                high_memory_usage: false,
                high_cpu_usage: false,
            },
        }
    }
    
    pub fn with_channel(mut self, channel: Sender<PerformanceAlert>) -> Self {
        self.alert_channel = Some(channel);
        self
    }
    
    pub fn check_metrics(&mut self, metrics: &MetricsCollector) {
        let new_state = AlertState {
            high_response_time: metrics.avg_response_time > self.thresholds.max_response_time,
            high_error_rate: if metrics.total_requests > 0 {
                metrics.error_count as f64 / metrics.total_requests as f64 * 100.0
            } else { 0.0 } > self.thresholds.max_error_rate,
            high_memory_usage: false, // 需要系统级监控
            high_cpu_usage: false,    // 需要系统级监控
        };
        
        // 检查状态变化
        self.check_state_change("High Response Time", 
                               self.current_state.high_response_time, 
                               new_state.high_response_time);
        
        self.check_state_change("High Error Rate", 
                               self.current_state.high_error_rate, 
                               new_state.high_error_rate);
        
        self.current_state = new_state;
    }
    
    fn check_state_change(&self, alert_type: &str, old_state: bool, new_state: bool) {
        if old_state != new_state {
            if new_state {
                warn!("Performance Alert: {} is now HIGH", alert_type);
                if let Some(ref channel) = self.alert_channel {
                    let _ = channel.send(PerformanceAlert {
                        thresholds: self.thresholds.clone(),
                        alert_channel: self.alert_channel.clone(),
                        current_state: self.current_state.clone(),
                    });
                }
            } else {
                info!("Performance Alert: {} is now normal", alert_type);
            }
        }
    }
}
}

13.1.2 使用tracing进行性能监控

#![allow(unused)]
fn main() {
// 集成tracing的性能监控
// File: tracing-integration/src/lib.rs
use tracing::{info, instrument, span, Level};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use std::time::Instant;

/// 带有性能追踪的服务
#[instrument(skip(self))]
pub struct TracedService {
    service_name: String,
    request_count: std::sync::atomic::AtomicU64,
    error_count: std::sync::atomic::AtomicU64,
    total_duration: std::sync::atomic::AtomicU64,
}

impl TracedService {
    pub fn new(service_name: &str) -> Self {
        TracedService {
            service_name: service_name.to_string(),
            request_count: std::sync::atomic::AtomicU64::new(0),
            error_count: std::sync::atomic::AtomicU64::new(0),
            total_duration: std::sync::atomic::AtomicU64::new(0),
        }
    }
    
    #[instrument(fields(request_id = %uuid::Uuid::new_v4(), user_id = %"anonymous"))]
    pub async fn process_request(&self, data: &str) -> Result<String, Box<dyn std::error::Error>> {
        let start = Instant::now();
        self.request_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        
        let span = span!(Level::INFO, "process_request", service = %self.service_name);
        let _enter = span.enter();
        
        info!("Started processing request with data length: {}", data.len());
        
        // 模拟处理
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
        
        if data.len() > 1000 {
            self.error_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            return Err("Data too large".into());
        }
        
        let result = format!("Processed: {}", data);
        
        let duration = start.elapsed();
        self.total_duration.fetch_add(duration.as_nanos() as u64, std::sync::atomic::Ordering::Relaxed);
        
        info!("Successfully processed request, duration: {:?}", duration);
        
        Ok(result)
    }
    
    pub fn get_stats(&self) -> ServiceStats {
        let request_count = self.request_count.load(std::sync::atomic::Ordering::Relaxed);
        let error_count = self.error_count.load(std::sync::atomic::Ordering::Relaxed);
        let total_duration = self.total_duration.load(std::sync::atomic::Ordering::Relaxed);
        
        ServiceStats {
            service_name: self.service_name.clone(),
            request_count,
            error_count,
            error_rate: if request_count > 0 {
                error_count as f64 / request_count as f64 * 100.0
            } else {
                0.0
            },
            avg_duration: if request_count > 0 {
                std::time::Duration::from_nanos(total_duration / request_count)
            } else {
                std::time::Duration::from_secs(0)
            },
        }
    }
}

#[derive(Debug, Clone)]
pub struct ServiceStats {
    pub service_name: String,
    pub request_count: u64,
    pub error_count: u64,
    pub error_rate: f64,
    pub avg_duration: std::time::Duration,
}

/// 初始化tracing性能监控
pub fn init_tracing() {
    tracing_subscriber::registry()
        .with(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| "tracing_performance=debug,perf_tools=info,tokio=warn".into()),
        )
        .with(tracing_subscriber::fmt::layer())
        .with(
            // JSON格式化,便于日志分析
            tracing_subscriber::fmt::layer()
                .json()
                .with_current_span(false)
                .with_span_list(false)
        )
        .init();
}
}

13.2 内存优化

13.2.1 内存管理基础

Rust的所有权系统为内存优化提供了强大的工具:

#![allow(unused)]
fn main() {
// 内存池实现
// File: memory-pools/src/lib.rs
use std::alloc::{GlobalAlloc, Layout, System};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::ptr::NonNull;
use std::marker::PhantomData;
use tracing::{info, warn, debug};

/// 预分配内存池
pub struct MemoryPool {
    pool: Vec<NonNull<u8>>,
    current_index: AtomicUsize,
    block_size: usize,
    total_blocks: usize,
    allocated: AtomicUsize,
}

unsafe impl Send for MemoryPool {}
unsafe impl Sync for MemoryPool {}

impl MemoryPool {
    pub fn new(block_size: usize, total_blocks: usize) -> Self {
        // 对齐到16字节边界
        let aligned_block_size = (block_size + 15) & !15;
        
        let mut pool = Vec::with_capacity(total_blocks);
        
        for _ in 0..total_blocks {
            let layout = Layout::from_size_align(aligned_block_size, 16)
                .expect("Invalid layout");
            
            unsafe {
                let ptr = System.alloc(layout);
                if ptr.is_null() {
                    panic!("Failed to allocate memory for pool");
                }
                
                pool.push(NonNull::new_unchecked(ptr));
            }
        }
        
        info!("Created memory pool: {} blocks of {} bytes", total_blocks, aligned_block_size);
        
        MemoryPool {
            pool,
            current_index: AtomicUsize::new(0),
            block_size: aligned_block_size,
            total_blocks,
            allocated: AtomicUsize::new(0),
        }
    }
    
    pub fn allocate(&self) -> Option<NonNull<u8>> {
        let old_index = self.current_index.fetch_add(1, Ordering::SeqCst);
        let new_index = old_index % self.total_blocks;
        
        if old_index < self.total_blocks {
            self.allocated.fetch_add(1, Ordering::SeqCst);
            Some(self.pool[new_index])
        } else {
            None // 池已耗尽
        }
    }
    
    pub fn is_exhausted(&self) -> bool {
        self.allocated.load(Ordering::SeqCst) >= self.total_blocks
    }
    
    pub fn get_stats(&self) -> PoolStats {
        PoolStats {
            total_blocks: self.total_blocks,
            allocated: self.allocated.load(Ordering::SeqCst),
            block_size: self.block_size,
            utilization: self.allocated.load(Ordering::SeqCst) as f64 / self.total_blocks as f64 * 100.0,
        }
    }
}

impl Drop for MemoryPool {
    fn drop(&mut self) {
        let layout = Layout::from_size_align(self.block_size, 16).unwrap();
        
        for ptr in &self.pool {
            unsafe {
                System.dealloc(ptr.as_ptr(), layout);
            }
        }
        
        debug!("Memory pool dropped, returned {} blocks to system", self.total_blocks);
    }
}

#[derive(Debug, Clone)]
pub struct PoolStats {
    pub total_blocks: usize,
    pub allocated: usize,
    pub block_size: usize,
    pub utilization: f64,
}

/// 智能指针包装器,自动内存池分配
pub struct PooledBox<T> {
    ptr: NonNull<T>,
    pool: *const MemoryPool,
    _phantom: PhantomData<T>,
}

impl<T> PooledBox<T> {
    pub fn new_in(pool: &MemoryPool, value: T) -> Option<Self> {
        let ptr = pool.allocate()?;
        
        unsafe {
            ptr.as_ptr().write(value);
        }
        
        Some(PooledBox {
            ptr: ptr.cast::<T>(),
            pool,
            _phantom: PhantomData,
        })
    }
    
    pub fn as_ref(&self) -> &T {
        unsafe {
            &*self.ptr.as_ptr()
        }
    }
    
    pub fn as_mut(&mut self) -> &mut T {
        unsafe {
            &mut *self.ptr.as_ptr()
        }
    }
}

impl<T> std::ops::Deref for PooledBox<T> {
    type Target = T;
    
    fn deref(&self) -> &Self::Target {
        self.as_ref()
    }
}

impl<T> std::ops::DerefMut for PooledBox<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.as_mut()
    }
}

impl<T> Drop for PooledBox<T> {
    fn drop(&mut self) {
        unsafe {
            std::ptr::drop_in_place(self.ptr.as_ptr());
        }
    }
}

unsafe impl<T: Send> Send for PooledBox<T> {}
unsafe impl<T: Sync> Sync for PooledBox<T> where T: Sync {}

/// 内存池管理器
pub struct PoolManager {
    pools: std::collections::HashMap<usize, MemoryPool>,
    small_object_pool: MemoryPool,
    large_object_pool: MemoryPool,
}

impl PoolManager {
    pub fn new() -> Self {
        // 小对象池:128字节块,1024个块
        let small_object_pool = MemoryPool::new(128, 1024);
        
        // 大对象池:1024字节块,256个块
        let large_object_pool = MemoryPool::new(1024, 256);
        
        let mut pools = std::collections::HashMap::new();
        pools.insert(128, small_object_pool.pool.as_ptr() as *const MemoryPool);
        pools.insert(1024, large_object_pool.pool.as_ptr() as *const MemoryPool);
        
        PoolManager {
            pools,
            small_object_pool,
            large_object_pool,
        }
    }
    
    pub fn allocate<T>(&self, size: usize) -> Option<PooledBox<T>> {
        if size <= 128 {
            self.small_object_pool.allocate()
        } else if size <= 1024 {
            self.large_object_pool.allocate()
        } else {
            None
        }
    }
    
    pub fn get_pool_stats(&self) -> (PoolStats, PoolStats) {
        (self.small_object_pool.get_stats(), self.large_object_pool.get_stats())
    }
}

/// 对象池模式
pub struct ObjectPool<T: Default + Clone> {
    available: std::sync::Mutex<Vec<T>>,
    in_use: AtomicUsize,
    max_size: usize,
    _phantom: PhantomData<T>,
}

impl<T: Default + Clone + Send + Sync> ObjectPool<T> {
    pub fn new(max_size: usize) -> Self {
        ObjectPool {
            available: std::sync::Mutex::new(Vec::with_capacity(max_size)),
            in_use: AtomicUsize::new(0),
            max_size,
            _phantom: PhantomData,
        }
    }
    
    pub fn get(&self) -> Option<PooledObject<T>> {
        if self.in_use.load(Ordering::SeqCst) >= self.max_size {
            return None;
        }
        
        let mut available = self.available.lock().unwrap();
        let object = available.pop().unwrap_or_default();
        self.in_use.fetch_add(1, Ordering::SeqCst);
        
        Some(PooledObject {
            object,
            pool: self as *const ObjectPool<T>,
        })
    }
    
    pub fn get_stats(&self) -> PoolStats {
        let available = self.available.lock().unwrap().len();
        let in_use = self.in_use.load(Ordering::SeqCst);
        
        PoolStats {
            total_blocks: self.max_size,
            allocated: available + in_use,
            block_size: std::mem::size_of::<T>(),
            utilization: (available + in_use) as f64 / self.max_size as f64 * 100.0,
        }
    }
}

pub struct PooledObject<T> {
    object: T,
    pool: *const ObjectPool<T>,
}

impl<T> std::ops::Deref for PooledObject<T> {
    type Target = T;
    
    fn deref(&self) -> &Self::Target {
        &self.object
    }
}

impl<T> std::ops::DerefMut for PooledObject<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.object
    }
}

impl<T> Drop for PooledObject<T> {
    fn drop(&mut self) {
        unsafe {
            let pool = &*self.pool;
            // 重置对象状态
            *self.object = T::default();
            
            let mut available = pool.available.lock().unwrap();
            available.push(std::mem::replace(&mut self.object, T::default()));
            pool.in_use.fetch_sub(1, Ordering::SeqCst);
        }
    }
}
}

13.2.2 零拷贝优化

#![allow(unused)]
fn main() {
// 零拷贝字符串处理
// File: zero-copy/src/lib.rs
use std::ops::Deref;
use std::borrow::Cow;

/// 零拷贝字符串包装器
pub struct ZeroCopyStr {
    data: Cow<'static, str>,
    _phantom: std::marker::PhantomData<()>,
}

impl ZeroCopyStr {
    pub fn new_static(data: &'static str) -> Self {
        ZeroCopyStr {
            data: Cow::Borrowed(data),
            _phantom: PhantomData,
        }
    }
    
    pub fn new_owned(data: String) -> Self {
        ZeroCopyStr {
            data: Cow::Owned(data),
            _phantom: PhantomData,
        }
    }
    
    pub fn as_str(&self) -> &str {
        &self.data
    }
    
    pub fn to_string_lossy(&self) -> Cow<str> {
        self.data.clone()
    }
}

impl Deref for ZeroCopyStr {
    type Target = str;
    
    fn deref(&self) -> &Self::Target {
        &self.data
    }
}

impl AsRef<str> for ZeroCopyStr {
    fn as_ref(&self) -> &str {
        &self.data
    }
}

impl PartialEq for ZeroCopyStr {
    fn eq(&self, other: &Self) -> bool {
        self.data == other.data
    }
}

/// 零拷贝JSON处理
pub struct ZeroCopyJson<T> {
    data: T,
    _phantom: PhantomData<()>,
}

impl<T> ZeroCopyJson<T> {
    pub fn new(data: T) -> Self {
        ZeroCopyJson {
            data,
            _phantom: PhantomData,
        }
    }
    
    pub fn into_inner(self) -> T {
        self.data
    }
}

impl<T> std::ops::Deref for ZeroCopyJson<T> {
    type Target = T;
    
    fn deref(&self) -> &Self::Target {
        &self.data
    }
}

/// 字节缓冲区池
pub struct ByteBufferPool {
    pool: std::sync::Mutex<Vec<Vec<u8>>>,
    buffer_size: usize,
    max_buffers: usize,
}

impl ByteBufferPool {
    pub fn new(buffer_size: usize, max_buffers: usize) -> Self {
        ByteBufferPool {
            pool: std::sync::Mutex::new(Vec::with_capacity(max_buffers)),
            buffer_size,
            max_buffers,
        }
    }
    
    pub fn get_buffer(&self) -> Vec<u8> {
        if let Ok(mut pool) = self.pool.lock() {
            if let Some(buffer) = pool.pop() {
                buffer
            } else {
                vec![0; self.buffer_size]
            }
        } else {
            vec![0; self.buffer_size]
        }
    }
    
    pub fn return_buffer(&self, mut buffer: Vec<u8>) {
        if buffer.len() == self.buffer_size {
            if let Ok(mut pool) = self.pool.lock() {
                if pool.len() < self.max_buffers {
                    buffer.clear();
                    pool.push(buffer);
                }
            }
        }
    }
}
}

13.3 并发性能优化

13.3.1 异步编程优化

#![allow(unused)]
fn main() {
// 高性能异步处理器
// File: async-optimization/src/lib.rs
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{RwLock, Semaphore, Mutex};
use tokio::task::{JoinHandle, JoinError};
use tracing::{info, warn, instrument, span, Level};
use futures::future::BoxFuture;
use futures::FutureExt;

/// 高性能异步任务执行器
pub struct AsyncTaskExecutor {
    task_semaphore: Arc<Semaphore>,
    active_tasks: Arc<std::sync::atomic::AtomicU64>,
    completed_tasks: Arc<std::sync::atomic::AtomicU64>,
    failed_tasks: Arc<std::sync::atomic::AtomicU64>,
}

impl AsyncTaskExecutor {
    pub fn new(max_concurrent_tasks: usize) -> Self {
        AsyncTaskExecutor {
            task_semaphore: Arc::new(Semaphore::new(max_concurrent_tasks)),
            active_tasks: Arc::new(std::sync::atomic::AtomicU64::new(0)),
            completed_tasks: Arc::new(std::sync::atomic::AtomicU64::new(0)),
            failed_tasks: Arc::new(std::sync::atomic::AtomicU64::new(0)),
        }
    }
    
    #[instrument(skip(self, task))]
    pub async fn execute<F, T>(&self, task: F) -> Result<T, Box<dyn std::error::Error + Send + Sync>>
    where
        F: FnOnce() -> BoxFuture<'static, Result<T, Box<dyn std::error::Error + Send + Sync>>> + Send + 'static,
        T: Send + 'static,
    {
        let _permit = self.task_semaphore.acquire().await?;
        self.active_tasks.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        
        let start_time = Instant::now();
        let active_tasks = Arc::clone(&self.active_tasks);
        let completed_tasks = Arc::clone(&self.completed_tasks);
        let failed_tasks = Arc::clone(&self.failed_tasks);
        
        let result = tokio::spawn(async move {
            let task_result = task().await;
            
            // 记录完成时间
            let duration = start_time.elapsed();
            info!("Task completed in {:?}", duration);
            
            // 更新统计信息
            active_tasks.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
            match &task_result {
                Ok(_) => completed_tasks.fetch_add(1, std::sync::atomic::Ordering::Relaxed),
                Err(_) => failed_tasks.fetch_add(1, std::sync::atomic::Ordering::Relaxed),
            }
            
            task_result
        }).await;
        
        match result {
            Ok(task_result) => task_result,
            Err(join_error) => {
                self.active_tasks.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
                self.failed_tasks.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                Err(Box::new(join_error))
            }
        }
    }
    
    pub async fn batch_execute<F, T>(&self, tasks: Vec<F>) -> Vec<Result<T, Box<dyn std::error::Error + Send + Sync>>>
    where
        F: FnOnce() -> BoxFuture<'static, Result<T, Box<dyn std::error::Error + Send + Sync>>> + Send + 'static,
        T: Send + 'static,
    {
        let mut handles: Vec<JoinHandle<Result<T, Box<dyn std::error::Error + Send + Sync>>>> = Vec::new();
        
        for task in tasks {
            let handle = tokio::spawn(async move {
                task().await
            });
            handles.push(handle);
        }
        
        let mut results = Vec::with_capacity(handles.len());
        
        for handle in handles {
            match handle.await {
                Ok(result) => results.push(result),
                Err(join_error) => results.push(Err(Box::new(join_error))),
            }
        }
        
        results
    }
    
    pub fn get_stats(&self) -> TaskStats {
        TaskStats {
            active_tasks: self.active_tasks.load(std::sync::atomic::Ordering::Relaxed),
            completed_tasks: self.completed_tasks.load(std::sync::atomic::Ordering::Relaxed),
            failed_tasks: self.failed_tasks.load(std::sync::atomic::Ordering::Relaxed),
            max_concurrent: self.task_semaphore.available_permits() + 
                           self.active_tasks.load(std::sync::atomic::Ordering::Relaxed),
        }
    }
}

#[derive(Debug, Clone)]
pub struct TaskStats {
    pub active_tasks: u64,
    pub completed_tasks: u64,
    pub failed_tasks: u64,
    pub max_concurrent: usize,
}

/// 无锁并发数据结构
pub struct LockFreeQueue<T> {
    head: Arc<AtomicNode<T>>,
    tail: Arc<AtomicNode<T>>,
}

struct AtomicNode<T> {
    value: std::sync::atomic::AtomicPtr<T>,
    next: std::sync::atomic::AtomicPtr<AtomicNode<T>>,
}

impl<T> AtomicNode<T> {
    fn new(value: T) -> Arc<Self> {
        let node = Arc::new(AtomicNode {
            value: std::sync::atomic::AtomicPtr::new(Box::into_raw(Box::new(value))),
            next: std::sync::atomic::AtomicPtr::new(std::ptr::null_mut()),
        });
        
        // 增加引用计数
        Arc::clone(&node);
        node
    }
    
    fn load_value(&self) -> T {
        unsafe {
            *Box::from_raw(self.value.load(std::sync::atomic::Ordering::Acquire))
        }
    }
}

impl<T> LockFreeQueue<T> {
    pub fn new() -> Self {
        let dummy = Arc::new(AtomicNode {
            value: std::sync::atomic::AtomicPtr::new(std::ptr::null_mut()),
            next: std::sync::atomic::AtomicPtr::new(std::ptr::null_mut()),
        });
        
        LockFreeQueue {
            head: Arc::clone(&dummy),
            tail: Arc::clone(&dummy),
        }
    }
    
    pub fn enqueue(&self, value: T) {
        let new_node = AtomicNode::new(value);
        let mut current_tail = self.tail.load(std::sync::atomic::Ordering::Acquire);
        
        loop {
            let current_tail_next = unsafe { (*current_tail).next.load(std::sync::atomic::Ordering::Acquire) };
            
            if current_tail_next.is_null() {
                if (*current_tail).next.compare_exchange_weak(
                    std::ptr::null_mut(),
                    Arc::as_ptr(&new_node) as *mut AtomicNode<T>,
                    std::sync::atomic::Ordering::Release,
                    std::sync::atomic::Ordering::Relaxed,
                ).is_ok() {
                    break;
                }
            } else {
                let _ = self.tail.compare_exchange_weak(
                    current_tail,
                    current_tail_next,
                    std::sync::atomic::Ordering::Release,
                    std::sync::atomic::Ordering::Relaxed,
                );
                current_tail = self.tail.load(std::sync::atomic::Ordering::Acquire);
            }
        }
    }
    
    pub fn dequeue(&self) -> Option<T> {
        let mut current_head = self.head.load(std::sync::atomic::Ordering::Acquire);
        
        loop {
            let next = unsafe { (*current_head).next.load(std::sync::atomic::Ordering::Acquire) };
            
            if next.is_null() {
                return None;
            }
            
            if self.head.compare_exchange_weak(
                current_head,
                next,
                std::sync::atomic::Ordering::Release,
                std::sync::atomic::Ordering::Relaxed,
            ).is_ok() {
                unsafe {
                    let value = (*next).load_value();
                    // 清理内存
                    let _ = Box::from_raw(current_head);
                    Some(value)
                }
            } else {
                current_head = self.head.load(std::sync::atomic::Ordering::Acquire);
            }
        }
    }
}

/// 工作窃取调度器
pub struct WorkStealingScheduler {
    queues: Vec<Arc<Mutex<Vec<Box<dyn Fn() + Send + Sync>>>>>,
    num_queues: usize,
}

impl WorkStealingScheduler {
    pub fn new(num_queues: usize) -> Self {
        let mut queues = Vec::with_capacity(num_queues);
        for _ in 0..num_queues {
            queues.push(Arc::new(Mutex::new(Vec::new())));
        }
        
        WorkStealingScheduler {
            queues,
            num_queues,
        }
    }
    
    pub fn schedule<F>(&self, work: F)
    where
        F: FnOnce() + Send + Sync + 'static,
    {
        let queue_index = std::thread::current().id().as_u128() as usize % self.num_queues;
        let queue = &self.queues[queue_index];
        
        let mut queue = queue.lock().unwrap();
        queue.push(Box::new(work));
    }
    
    pub fn execute_one(&self, queue_index: usize) -> bool {
        let mut queue = self.queues[queue_index].lock().unwrap();
        
        if let Some(work) = queue.pop() {
            drop(queue); // 释放锁
            work();
            true
        } else {
            false
        }
    }
    
    pub fn steal_work(&self, victim_index: usize) -> bool {
        let mut victim_queue = self.queues[victim_index].lock().unwrap();
        
        if let Some(work) = victim_queue.pop() {
            drop(victim_queue);
            work();
            true
        } else {
            false
        }
    }
}
}

13.3.2 并发模式优化

#![allow(unused)]
fn main() {
// Actor模式高性能实现
// File: actor-model/src/lib.rs
use std::sync::mpsc;
use std::sync::Arc;
use tokio::sync::{mpsc as async_mpsc, oneshot};
use tracing::{info, warn, instrument};

/// 异步Actor系统
pub struct ActorSystem {
    actors: Arc<std::sync::Mutex<std::collections::HashMap<String, Arc<dyn Actor + Send + Sync>>>>,
    message_router: MessageRouter,
}

impl ActorSystem {
    pub fn new() -> Self {
        ActorSystem {
            actors: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
            message_router: MessageRouter::new(),
        }
    }
    
    pub fn register_actor<A: Actor + Send + Sync + 'static>(&self, id: String, actor: A) {
        let mut actors = self.actors.lock().unwrap();
        actors.insert(id, Arc::new(actor));
        info!("Actor registered: {}", id);
    }
    
    pub async fn send_message<M: Message>(&self, actor_id: &str, message: M) -> Result<M::Response, ActorError>
    where
        M: Send + 'static,
        M::Response: Send + 'static,
    {
        let actors = self.actors.lock().unwrap();
        if let Some(actor) = actors.get(actor_id) {
            drop(actors);
            let (tx, rx) = oneshot::channel();
            
            let actor_message = ActorMessage {
                payload: Box::new(message),
                response_sender: Some(tx),
            };
            
            actor.handle_message(actor_message).await?;
            
            match rx.await {
                Ok(response) => Ok(response),
                Err(_) => Err(ActorError::ChannelClosed),
            }
        } else {
            Err(ActorError::ActorNotFound)
        }
    }
}

#[derive(Debug, thiserror::Error)]
pub enum ActorError {
    #[error("Actor not found")]
    ActorNotFound,
    #[error("Channel closed")]
    ChannelClosed,
    #[error("Message handling failed: {0}")]
    MessageHandlingFailed(String),
}

pub trait Message {
    type Response: Send;
}

pub struct ActorMessage {
    payload: Box<dyn Message + Send>,
    response_sender: Option<oneshot::Sender<Box<dyn std::any::Any + Send>>>,
}

impl ActorMessage {
    pub fn send_response<T: std::any::Any + Send>(&self, response: T) -> Result<(), ActorError> {
        if let Some(sender) = &self.response_sender {
            let _ = sender.send(Box::new(response));
            Ok(())
        } else {
            Err(ActorError::MessageHandlingFailed("No response sender".to_string()))
        }
    }
}

pub trait Actor {
    fn handle_message(&self, message: ActorMessage) -> BoxFuture<Result<(), ActorError>>;
}

type BoxFuture<T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + Send>>;

/// 消息路由器
struct MessageRouter {
    routes: Arc<std::sync::Mutex<std::collections::HashMap<String, String>>>,
}

impl MessageRouter {
    fn new() -> Self {
        MessageRouter {
            routes: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
        }
    }
    
    fn register_route(&self, pattern: String, actor_id: String) {
        let mut routes = self.routes.lock().unwrap();
        routes.insert(pattern, actor_id);
    }
}

/// 高性能事件驱动系统
pub struct EventDrivenSystem {
    event_bus: Arc<async_mpsc::UnboundedSender<Event>>,
    subscribers: Arc<std::sync::Mutex<std::collections::HashMap<String, Vec<Arc<dyn EventHandler + Send + Sync>>>>,
    event_history: Arc<std::sync::Mutex<Vec<Event>>>,
    max_history: usize,
}

#[derive(Debug, Clone)]
pub struct Event {
    pub id: String,
    pub event_type: String,
    pub payload: Box<dyn std::any::Any + Send + Sync>,
    pub timestamp: std::time::Instant,
    pub source: String,
}

impl Event {
    pub fn new<T: std::any::Any + Send + Sync>(event_type: String, payload: T, source: String) -> Self {
        Event {
            id: uuid::Uuid::new_v4().to_string(),
            event_type,
            payload: Box::new(payload),
            timestamp: std::time::Instant::now(),
            source,
        }
    }
}

pub trait EventHandler: Send + Sync {
    fn handle_event(&self, event: &Event) -> BoxFuture<Result<(), EventHandlerError>>;
    fn event_types(&self) -> Vec<String>;
}

#[derive(Debug, thiserror::Error)]
pub enum EventHandlerError {
    #[error("Event handling failed: {0}")]
    HandlingFailed(String),
    #[error("Invalid event type")]
    InvalidEventType,
}

impl EventDrivenSystem {
    pub fn new(buffer_size: usize, max_history: usize) -> Self {
        let (tx, _) = async_mpsc::unbounded_channel(buffer_size);
        
        EventDrivenSystem {
            event_bus: Arc::new(tx),
            subscribers: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
            event_history: Arc::new(std::sync::Mutex::new(Vec::with_capacity(max_history))),
            max_history,
        }
    }
    
    pub fn subscribe<H: EventHandler + Send + Sync + 'static>(&self, handler: Arc<H>) {
        for event_type in handler.event_types() {
            let mut subscribers = self.subscribers.lock().unwrap();
            subscribers
                .entry(event_type)
                .or_insert_with(Vec::new)
                .push(handler.clone());
        }
    }
    
    pub async fn publish_event<T: std::any::Any + Send + Sync>(&self, event: Event) -> Result<(), mpsc::SendError<Event>> {
        // 存储到历史记录
        {
            let mut history = self.event_history.lock().unwrap();
            history.push(event.clone());
            if history.len() > self.max_history {
                history.remove(0);
            }
        }
        
        // 分发给订阅者
        let subscribers = self.subscribers.lock().unwrap();
        if let Some(handlers) = subscribers.get(&event.event_type) {
            for handler in handlers {
                let event_clone = event.clone();
                let handler = handler.clone();
                let event_bus = Arc::clone(&self.event_bus);
                
                tokio::spawn(async move {
                    if let Err(e) = handler.handle_event(&event_clone).await {
                        warn!("Event handler failed: {}", e);
                    }
                });
            }
        }
        
        // 发送到事件总线
        self.event_bus.send(event)
    }
    
    pub fn get_event_history(&self) -> Vec<Event> {
        self.event_history.lock().unwrap().clone()
    }
}
}

13.4 缓存策略

13.4.1 多层缓存架构

#![allow(unused)]
fn main() {
// 多层缓存系统
// File: multi-layer-cache/src/lib.rs
use std::collections::HashMap;
use std::sync::{Arc, RwLock, Mutex};
use std::time::{Duration, Instant};
use tokio::sync::RwLock as AsyncRwLock;
use tracing::{info, warn, debug};
use serde::{Serialize, Deserialize};

/// 缓存条目
#[derive(Debug, Clone)]
pub struct CacheEntry<T> {
    pub data: T,
    pub created_at: Instant,
    pub access_count: u64,
    pub last_accessed: Instant,
    pub ttl: Option<Duration>,
}

impl<T> CacheEntry<T> {
    pub fn new(data: T, ttl: Option<Duration>) -> Self {
        let now = Instant::now();
        CacheEntry {
            data,
            created_at: now,
            access_count: 0,
            last_accessed: now,
            ttl,
        }
    }
    
    pub fn is_expired(&self) -> bool {
        if let Some(ttl) = self.ttl {
            self.created_at + ttl < Instant::now()
        } else {
            false
        }
    }
    
    pub fn access(&mut self) -> &T {
        self.access_count += 1;
        self.last_accessed = Instant::now();
        &self.data
    }
}

/// L1 缓存 - 内存缓存(最热数据)
pub struct L1Cache<K, V> {
    data: Arc<RwLock<HashMap<K, CacheEntry<V>>>>,
    max_size: usize,
    hit_count: std::sync::atomic::AtomicU64,
    miss_count: std::sync::atomic::AtomicU64,
}

impl<K, V> L1Cache<K, V>
where
    K: Clone + std::hash::Hash + Eq + Send + Sync,
    V: Clone + Send + Sync,
{
    pub fn new(max_size: usize) -> Self {
        L1Cache {
            data: Arc::new(RwLock::new(HashMap::new())),
            max_size,
            hit_count: std::sync::atomic::AtomicU64::new(0),
            miss_count: std::sync::atomic::AtomicU64::new(0),
        }
    }
    
    pub async fn get(&self, key: &K) -> Option<Arc<V>> {
        let data = self.data.read().unwrap();
        if let Some(entry) = data.get(key) {
            if !entry.is_expired() {
                self.hit_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                let value = Arc::new(entry.access().clone());
                drop(data);
                Some(value)
            } else {
                drop(data);
                self.remove(key).await;
                self.miss_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                None
            }
        } else {
            drop(data);
            self.miss_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            None
        }
    }
    
    pub async fn set(&self, key: K, value: V, ttl: Option<Duration>) {
        let mut data = self.data.write().unwrap();
        
        // 如果缓存已满,删除最久未访问的条目
        if data.len() >= self.max_size && !data.contains_key(&key) {
            let lru_key = self.find_lru_key(&data);
            if let Some(lru) = lru_key {
                data.remove(&lru);
            }
        }
        
        data.insert(key, CacheEntry::new(value, ttl));
    }
    
    pub async fn remove(&self, key: &K) -> Option<V> {
        let mut data = self.data.write().unwrap();
        data.remove(key).map(|entry| entry.data)
    }
    
    pub async fn clear(&self) {
        let mut data = self.data.write().unwrap();
        data.clear();
    }
    
    fn find_lru_key(&self, data: &HashMap<K, CacheEntry<V>>) -> Option<K> {
        data.iter()
            .min_by_key(|(_, entry)| entry.last_accessed)
            .map(|(key, _)| key.clone())
    }
    
    pub fn get_stats(&self) -> CacheStats {
        let hit_count = self.hit_count.load(std::sync::atomic::Ordering::Relaxed);
        let miss_count = self.miss_count.load(std::sync::atomic::Ordering::Relaxed);
        let total = hit_count + miss_count;
        
        CacheStats {
            hit_count,
            miss_count,
            hit_rate: if total > 0 { hit_count as f64 / total as f64 } else { 0.0 },
            size: self.data.read().unwrap().len(),
            max_size: self.max_size,
        }
    }
}

/// L2 缓存 - 分布式缓存(较热数据)
pub struct L2Cache<K, V> {
    client: Arc<redis::Client>,
    key_prefix: String,
    default_ttl: Duration,
    hit_count: std::sync::atomic::AtomicU64,
    miss_count: std::sync::atomic::AtomicU64,
}

impl<K, V> L2Cache<K, V>
where
    K: std::fmt::Display + Send + Sync,
    V: Serialize + for<'de> Deserialize<'de> + Send + Sync,
{
    pub fn new(client: redis::Client, key_prefix: String, default_ttl: Duration) -> Self {
        L2Cache {
            client: Arc::new(client),
            key_prefix,
            default_ttl,
            hit_count: std::sync::atomic::AtomicU64::new(0),
            miss_count: std::sync::atomic::AtomicU64::new(0),
        }
    }
    
    pub async fn get(&self, key: &K) -> Option<V> {
        let redis_key = format!("{}:{}", self.key_prefix, key);
        
        match self.client.get_async_connection().await {
            Ok(mut conn) => {
                match redis::cmd("GET")
                    .arg(&redis_key)
                    .query_async::<_, Option<String>>(&mut conn)
                    .await
                {
                    Ok(Some(data)) => {
                        self.hit_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                        match serde_json::from_str(&data) {
                            Ok(value) => Some(value),
                            Err(_) => {
                                warn!("Failed to deserialize cache data for key: {}", key);
                                self.miss_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                                None
                            }
                        }
                    }
                    Ok(None) => {
                        self.miss_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                        None
                    }
                    Err(e) => {
                        warn!("Redis get error: {}", e);
                        self.miss_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                        None
                    }
                }
            }
            Err(e) => {
                warn!("Failed to connect to Redis: {}", e);
                self.miss_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                None
            }
        }
    }
    
    pub async fn set(&self, key: K, value: V, ttl: Option<Duration>) {
        let redis_key = format!("{}:{}", self.key_prefix, key);
        let ttl_duration = ttl.unwrap_or(self.default_ttl);
        
        match self.client.get_async_connection().await {
            Ok(mut conn) => {
                if let Ok(data) = serde_json::to_string(&value) {
                    let ttl_secs = ttl_duration.as_secs();
                    
                    let _ = redis::cmd("SETEX")
                        .arg(&redis_key)
                        .arg(ttl_secs)
                        .arg(&data)
                        .query_async::<_, ()>(&mut conn)
                        .await
                        .map_err(|e| warn!("Redis set error: {}", e));
                }
            }
            Err(e) => {
                warn!("Failed to connect to Redis: {}", e);
            }
        }
    }
    
    pub async fn remove(&self, key: &K) {
        let redis_key = format!("{}:{}", self.key_prefix, key);
        
        match self.client.get_async_connection().await {
            Ok(mut conn) => {
                let _ = redis::cmd("DEL")
                    .arg(&redis_key)
                    .query_async::<_, ()>(&mut conn)
                    .await
                    .map_err(|e| warn!("Redis delete error: {}", e));
            }
            Err(e) => {
                warn!("Failed to connect to Redis: {}", e);
            }
        }
    }
    
    pub fn get_stats(&self) -> CacheStats {
        let hit_count = self.hit_count.load(std::sync::atomic::Ordering::Relaxed);
        let miss_count = self.miss_count.load(std::sync::atomic::Ordering::Relaxed);
        let total = hit_count + miss_count;
        
        CacheStats {
            hit_count,
            miss_count,
            hit_rate: if total > 0 { hit_count as f64 / total as f64 } else { 0.0 },
            size: 0, // Redis中的大小需要额外查询
            max_size: 0,
        }
    }
}

/// L3 缓存 - 数据源缓存(冷数据)
pub struct L3Cache<K, V, F> {
    data_fetcher: F,
    cache: Arc<AsyncRwLock<HashMap<K, CacheEntry<V>>>>,
    ttl: Duration,
}

impl<K, V, F> L3Cache<K, V, F>
where
    K: Clone + std::hash::Hash + Eq + Send + Sync,
    V: Clone + Send + Sync,
    F: Fn(K) -> BoxFuture<'static, V> + Send + Sync,
{
    pub fn new(data_fetcher: F, ttl: Duration) -> Self {
        L3Cache {
            data_fetcher,
            cache: Arc::new(AsyncRwLock::new(HashMap::new())),
            ttl,
        }
    }
    
    pub async fn get(&self, key: K) -> V {
        let mut cache = self.cache.read().await;
        
        if let Some(entry) = cache.get(&key) {
            if !entry.is_expired() {
                return entry.access().clone();
            }
        }
        
        drop(cache);
        
        // 缓存未命中,从数据源获取
        debug!("Cache miss, fetching from data source for key: {:?}", key);
        let value = (self.data_fetcher)(key.clone()).await;
        
        // 更新缓存
        let mut cache = self.cache.write().await;
        cache.insert(key, CacheEntry::new(value.clone(), Some(self.ttl)));
        
        value
    }
    
    pub async fn invalidate(&self, key: &K) {
        let mut cache = self.cache.write().await;
        cache.remove(key);
    }
    
    pub async fn clear(&self) {
        let mut cache = self.cache.write().await;
        cache.clear();
    }
}

/// 多层缓存系统
pub struct MultiLayerCache<K, V> {
    l1: Option<Arc<L1Cache<K, V>>>,
    l2: Option<Arc<L2Cache<K, V>>>,
    l3: Option<Arc<L3Cache<K, V, Box<dyn Fn(K) -> BoxFuture<'static, V> + Send + Sync>>>>,
    fallback_strategy: FallbackStrategy,
}

#[derive(Debug, Clone)]
pub enum FallbackStrategy {
    L1Only,
    L1L2,
    AllLayers,
}

impl<K, V> MultiLayerCache<K, V>
where
    K: Clone + std::hash::Hash + Eq + std::fmt::Display + Send + Sync,
    V: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync,
{
    pub fn new(
        l1_size: Option<usize>,
        redis_client: Option<redis::Client>,
        data_fetcher: Option<Box<dyn Fn(K) -> BoxFuture<'static, V> + Send + Sync>>,
        fallback_strategy: FallbackStrategy,
    ) -> Self {
        let l1 = l1_size.map(|size| Arc::new(L1Cache::new(size)));
        let l2 = redis_client.map(|client| Arc::new(L2Cache::new(client, "cache".to_string(), Duration::from_secs(3600))));
        let l3 = data_fetcher.map(|fetcher| Arc::new(L3Cache::new(fetcher, Duration::from_secs(7200))));
        
        MultiLayerCache {
            l1,
            l2,
            l3,
            fallback_strategy,
        }
    }
    
    pub async fn get(&self, key: K) -> Option<V> {
        match self.fallback_strategy {
            FallbackStrategy::L1Only => {
                if let Some(l1) = &self.l1 {
                    l1.get(&key).await.map(|value| (*value).clone())
                } else {
                    None
                }
            }
            FallbackStrategy::L1L2 => {
                // 尝试L1
                if let Some(l1) = &self.l1 {
                    if let Some(value) = l1.get(&key).await {
                        return Some((*value).clone());
                    }
                }
                
                // L1未命中,尝试L2
                if let Some(l2) = &self.l2 {
                    if let Some(value) = l2.get(&key).await {
                        // 升级到L1
                        if let Some(l1) = &self.l1 {
                            l1.set(key.clone(), value.clone(), Some(Duration::from_secs(300))).await;
                        }
                        return Some(value);
                    }
                }
                
                None
            }
            FallbackStrategy::AllLayers => {
                // 依次尝试各层
                if let Some(l1) = &self.l1 {
                    if let Some(value) = l1.get(&key).await {
                        return Some((*value).clone());
                    }
                }
                
                if let Some(l2) = &self.l2 {
                    if let Some(value) = l2.get(&key).await {
                        // 升级到L1
                        if let Some(l1) = &self.l1 {
                            l1.set(key.clone(), value.clone(), Some(Duration::from_secs(300))).await;
                        }
                        return Some(value);
                    }
                }
                
                // 最后一层:L3(数据源)
                if let Some(l3) = &self.l3 {
                    let value = l3.get(key).await;
                    
                    // 向各层写入
                    if let Some(l1) = &self.l1 {
                        l1.set(key.clone(), value.clone(), Some(Duration::from_secs(300))).await;
                    }
                    if let Some(l2) = &self.l2 {
                        l2.set(key.clone(), value.clone(), Some(Duration::from_secs(3600))).await;
                    }
                    
                    return Some(value);
                }
                
                None
            }
        }
    }
    
    pub async fn set(&self, key: K, value: V, ttl: Option<Duration>) {
        // 写入所有缓存层
        if let Some(l1) = &self.l1 {
            l1.set(key.clone(), value.clone(), ttl).await;
        }
        
        if let Some(l2) = &self.l2 {
            l2.set(key.clone(), value.clone(), ttl).await;
        }
    }
    
    pub async fn remove(&self, key: &K) {
        if let Some(l1) = &self.l1 {
            l1.remove(key).await;
        }
        
        if let Some(l2) = &self.l2 {
            l2.remove(key).await;
        }
        
        if let Some(l3) = &self.l3 {
            l3.invalidate(key).await;
        }
    }
    
    pub async fn get_stats(&self) -> MultiLayerCacheStats {
        MultiLayerCacheStats {
            l1_stats: self.l1.as_ref().map(|l1| l1.get_stats()),
            l2_stats: self.l2.as_ref().map(|l2| l2.get_stats()),
        }
    }
}

#[derive(Debug, Clone)]
pub struct CacheStats {
    pub hit_count: u64,
    pub miss_count: u64,
    pub hit_rate: f64,
    pub size: usize,
    pub max_size: usize,
}

#[derive(Debug, Clone)]
pub struct MultiLayerCacheStats {
    pub l1_stats: Option<CacheStats>,
    pub l2_stats: Option<CacheStats>,
}
}

13.4.2 缓存更新策略

#![allow(unused)]
fn main() {
// 智能缓存更新策略
// File: cache-strategies/src/lib.rs
use std::time::{Duration, Instant};
use std::collections::HashMap;
use tracing::{info, warn, debug};

/// 缓存更新策略
#[derive(Debug, Clone)]
pub enum UpdateStrategy {
    /// 写通模式:同时写入缓存和数据源
    WriteThrough,
    /// 写回模式:先写入缓存,异步写入数据源
    WriteBack,
    /// 写绕模式:只写入数据源,清除缓存
    WriteAround,
    /// 延迟写入模式:缓存命中时更新缓存
    LazyWrite,
    /// TTL模式:基于时间过期
    TtlBased,
    /// LRU模式:基于访问频率
    LruBased,
}

pub struct CacheUpdateManager {
    strategies: HashMap<String, UpdateStrategy>,
    last_update_times: HashMap<String, Instant>,
    write_queue: Arc<crossbeam::queue::SegQueue<WriteOperation>>,
    background_writer: Option<tokio::task::JoinHandle<()>>,
}

#[derive(Debug, Clone)]
pub struct WriteOperation {
    pub key: String,
    pub value: String,
    pub strategy: UpdateStrategy,
    pub timestamp: Instant,
}

impl CacheUpdateManager {
    pub fn new() -> Self {
        let manager = CacheUpdateManager {
            strategies: HashMap::new(),
            last_update_times: HashMap::new(),
            write_queue: Arc::new(crossbeam::queue::SegQueue::new()),
            background_writer: None,
        };
        
        manager.start_background_writer();
        manager
    }
    
    pub fn register_strategy(&mut self, cache_key: &str, strategy: UpdateStrategy) {
        self.strategies.insert(cache_key.to_string(), strategy);
        info!("Registered update strategy for cache key '{}': {:?}", cache_key, strategy);
    }
    
    pub async fn update_cache(&self, key: &str, value: &str) -> Result<(), CacheError> {
        let strategy = self.strategies.get(key)
            .unwrap_or(&UpdateStrategy::WriteThrough);
        
        match strategy {
            UpdateStrategy::WriteThrough => {
                self.write_through(key, value).await
            }
            UpdateStrategy::WriteBack => {
                self.write_back(key, value)
            }
            UpdateStrategy::WriteAround => {
                self.write_around(key, value).await
            }
            UpdateStrategy::LazyWrite => {
                // 标记为需要更新,但不立即写入
                Ok(())
            }
            UpdateStrategy::TtlBased => {
                self.ttl_based_update(key, value)
            }
            UpdateStrategy::LruBased => {
                self.lru_based_update(key, value)
            }
        }
    }
    
    async fn write_through(&self, key: &str, value: &str) -> Result<(), CacheError> {
        info!("Write-through: updating both cache and data source for key: {}", key);
        
        // 同步更新缓存和数据源
        let cache_update = self.update_cache_layer(key, value);
        let data_update = self.update_data_source(key, value);
        
        futures::future::join(cache_update, data_update).await;
        
        Ok(())
    }
    
    fn write_back(&self, key: &str, value: &str) -> Result<(), CacheError> {
        info!("Write-back: queuing update for background processing: {}", key);
        
        // 写入队列,异步处理
        self.write_queue.push(WriteOperation {
            key: key.to_string(),
            value: value.to_string(),
            strategy: UpdateStrategy::WriteBack,
            timestamp: Instant::now(),

        });
        
        Ok(())
    }
    
    async fn write_around(&self, key: &str, value: &str) -> Result<(), CacheError> {
        info!("Write-around: updating data source and invalidating cache: {}", key);
        
        // 更新数据源
        self.update_data_source(key, value).await;
        
        // 清除缓存
        self.invalidate_cache(key).await;
        
        Ok(())
    }
    
    fn ttl_based_update(&self, key: &str, value: &str) -> Result<(), CacheError> {
        info!("TTL-based update for key: {}", key);
        
        // 更新最后访问时间
        self.last_update_times.insert(key.to_string(), Instant::now());
        
        // 更新缓存
        self.update_cache_layer(key, value)?;
        
        Ok(())
    }
    
    fn lru_based_update(&self, key: &str, value: &str) -> Result<(), CacheError> {
        info!("LRU-based update for key: {}", key);
        
        // 将key移到最近使用
        self.last_update_times.insert(key.to_string(), Instant::now());
        
        // 更新缓存
        self.update_cache_layer(key, value)?;
        
        Ok(())
    }
    
    async fn update_cache_layer(&self, key: &str, value: &str) -> Result<(), CacheError> {
        // 这里应该是实际的缓存更新逻辑
        debug!("Updating cache layer for key: {}", key);
        Ok(())
    }
    
    async fn update_data_source(&self, key: &str, value: &str) {
        // 这里应该是实际的数据源更新逻辑
        debug!("Updating data source for key: {}", key);
        tokio::time::sleep(Duration::from_millis(10)).await; // 模拟写入延迟
    }
    
    async fn invalidate_cache(&self, key: &str) {
        debug!("Invalidating cache for key: {}", key);
        // 缓存失效逻辑
    }
    
    fn start_background_writer(&mut self) {
        let write_queue = Arc::clone(&self.write_queue);
        
        self.background_writer = Some(tokio::spawn(async move {
            loop {
                if let Some(operation) = write_queue.pop() {
                    debug!("Processing background write operation: {}", operation.key);
                    
                    // 模拟异步写入到数据源
                    tokio::time::sleep(Duration::from_millis(50)).await;
                    
                    info!("Background write completed for key: {}", operation.key);
                } else {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                }
            }
        }));
    }
    
    pub fn stop(&mut self) {
        if let Some(handle) = self.background_writer.take() {
            handle.abort();
        }
    }
}

#[derive(Debug, thiserror::Error)]
pub enum CacheError {
    #[error("Cache update failed: {0}")]
    UpdateFailed(String),
    #[error("Cache key not found")]
    KeyNotFound,
    #[error("Invalid strategy")]
    InvalidStrategy,
}

/// 缓存预热管理器
pub struct CacheWarmer {
    warmup_tasks: HashMap<String, WarmupTask>,
    parallel_tasks: usize,
}

#[derive(Debug, Clone)]
pub struct WarmupTask {
    pub key: String,
    pub fetcher: Box<dyn Fn() -> BoxFuture<'static, Option<String>> + Send + Sync>,
    pub priority: WarmupPriority,
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum WarmupPriority {
    Critical,
    High,
    Normal,
    Low,
}

impl CacheWarmer {
    pub fn new(parallel_tasks: usize) -> Self {
        CacheWarmer {
            warmup_tasks: HashMap::new(),
            parallel_tasks,
        }
    }
    
    pub fn register_task(&mut self, task: WarmupTask) {
        self.warmup_tasks.insert(task.key.clone(), task);
        info!("Registered warmup task for key: {}", task.key);
    }
    
    pub async fn warm_up(&self) -> WarmupResult {
        info!("Starting cache warm-up with {} tasks", self.warmup_tasks.len());
        
        // 按优先级排序
        let mut tasks: Vec<_> = self.warmup_tasks.values().cloned().collect();
        tasks.sort_by(|a, b| b.priority.cmp(&a.priority));
        
        let mut results = Vec::new();
        let semaphore = Arc::new(tokio::sync::Semaphore::new(self.parallel_tasks));
        
        // 并行执行预热任务
        for task in tasks {
            let permit = semaphore.clone().acquire_owned().await.unwrap();
            let task_key = task.key.clone();
            
            let result = tokio::spawn(async move {
                let _permit = permit;
                
                let start_time = Instant::now();
                let value = (task.fetcher)().await;
                let duration = start_time.elapsed();
                
                WarmupResultItem {
                    key: task_key,
                    success: value.is_some(),
                    duration,
                    value,
                }
            });
            
            results.push(result);
        }
        
        // 等待所有任务完成
        let mut warmup_results = Vec::new();
        for handle in results {
            if let Ok(result) = handle.await {
                warmup_results.push(result);
            }
        }
        
        WarmupResult {
            total_tasks: warmup_results.len(),
            successful_tasks: warmup_results.iter().filter(|r| r.success).count(),
            failed_tasks: warmup_results.iter().filter(|r| !r.success).count(),
            total_duration: warmup_results.iter().map(|r| r.duration).max().unwrap_or_default(),
            results: warmup_results,
        }
    }
}

#[derive(Debug)]
pub struct WarmupResult {
    pub total_tasks: usize,
    pub successful_tasks: usize,
    pub failed_tasks: usize,
    pub total_duration: Duration,
    pub results: Vec<WarmupResultItem>,
}

#[derive(Debug, Clone)]
pub struct WarmupResultItem {
    pub key: String,
    pub success: bool,
    pub duration: Duration,
    pub value: Option<String>,
}

impl WarmupResult {
    pub fn success_rate(&self) -> f64 {
        if self.total_tasks > 0 {
            self.successful_tasks as f64 / self.total_tasks as f64 * 100.0
        } else {
            0.0
        }
    }
    
    pub fn print_summary(&self) {
        info!("=== Cache Warm-up Summary ===");
        info!("Total tasks: {}", self.total_tasks);
        info!("Successful: {}", self.successful_tasks);
        info!("Failed: {}", self.failed_tasks);
        info!("Success rate: {:.1}%", self.success_rate());
        info!("Total duration: {:?}", self.total_duration);
        
        if self.failed_tasks > 0 {
            warn!("Failed warm-up tasks:");
            for result in &self.results {
                if !result.success {
                    warn!("  - {}: {:?}", result.key, result.duration);
                }
            }
        }
    }
}
}

13.5 高性能缓存服务项目

现在我们来构建一个企业级高性能缓存服务,集成所有学到的性能优化技术。

#![allow(unused)]
fn main() {
// 高性能缓存服务主项目
// File: cache-service/Cargo.toml
[package]
name = "cache-service"
version = "1.0.0"
edition = "2021"

[dependencies]
tokio = { version = "1.0", features = ["full"] }
axum = { version = "0.7", features = ["macros"] }
tower = { version = "0.4" }
tower-http = { version = "0.5", features = ["cors", "compression", "trace", "timeout"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
redis = { version = "0.24", features = ["tokio-comp", "connection-manager"] }
clap = { version = "4.0", features = ["derive"] }
tracing = "0.1"
tracing-subscriber = "0.3"
anyhow = "1.0"
thiserror = "1.0"
uuid = { version = "1.0", features = ["v4", "serde"] }
chrono = { version = "0.4", features = ["serde"] }
criterion = "0.5"
once_cell = "1.0"
futures = "0.3"
crossbeam = "0.8"
regex = "1.0"
}
// 高性能缓存服务
// File: cache-service/src/main.rs
use clap::{Parser, Subcommand};
use tracing::{info, warn, error, Level};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

mod cache;
mod server;
mod config;

use cache::CacheService;
use server::CacheServer;
use config::Config;

#[derive(Parser, Debug)]
#[command(name = "cache-service")]
#[command(about = "High-performance cache service")]
struct Cli {
    #[command(subcommand)]
    command: Commands,
}

#[derive(Subcommand, Debug)]
enum Commands {
    /// Start the cache service
    Server {
        #[arg(short, long, default_value = "0.0.0.0:8080")]
        addr: String,
        
        #[arg(short, long, default_value = "redis://localhost:6379")]
        redis_url: String,
        
        #[arg(short, long, default_value = "1000")]
        l1_cache_size: usize,
        
        #[arg(short, long, default_value = "100")]
        parallel_tasks: usize,
    },
    /// Run performance benchmarks
    Benchmark,
    /// Test cache service
    Test,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 初始化日志
    tracing_subscriber::registry()
        .with(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| "cache_service=debug,tokio=warn,sqlx=warn".into()),
        )
        .with(tracing_subscriber::fmt::layer())
        .init();
    
    let cli = Cli::parse();
    
    match cli.command {
        Commands::Server { addr, redis_url, l1_cache_size, parallel_tasks } => {
            run_server(addr, redis_url, l1_cache_size, parallel_tasks).await
        }
        Commands::Benchmark => {
            run_benchmarks().await
        }
        Commands::Test => {
            run_tests().await
        }
    }
}

async fn run_server(
    addr: String,
    redis_url: String,
    l1_cache_size: usize,
    parallel_tasks: usize,
) -> Result<(), Box<dyn std::error::Error>> {
    info!("Starting high-performance cache service on {}", addr);
    
    // 初始化配置
    let config = Config {
        addr,
        redis_url,
        l1_cache_size,
        parallel_tasks,
        default_ttl: std::time::Duration::from_secs(3600),
        max_ttl: std::time::Duration::from_secs(86400),
        warmup_enabled: true,
        metrics_enabled: true,
    };
    
    // 初始化Redis连接
    let redis_client = redis::Client::open(&config.redis_url)?;
    
    // 初始化缓存服务
    let cache_service = CacheService::new(redis_client, config.clone()).await?;
    
    // 启动服务器
    let server = CacheServer::new(config, cache_service);
    server.run().await?;
    
    Ok(())
}

async fn run_benchmarks() -> Result<(), Box<dyn std::error::Error>> {
    info!("Running performance benchmarks");
    
    // 缓存性能基准测试
    run_cache_benchmarks().await?;
    
    // 并发性能基准测试
    run_concurrency_benchmarks().await?;
    
    // 内存使用基准测试
    run_memory_benchmarks().await?;
    
    info!("All benchmarks completed");
    Ok(())
}

async fn run_cache_benchmarks() -> Result<(), Box<dyn std::error::Error>> {
    use criterion::{black_box, criterion_group, criterion_main, Criterion};
    
    // 这里集成criterion进行缓存性能测试
    // ... 基准测试实现
    
    Ok(())
}

async fn run_concurrency_benchmarks() -> Result<(), Box<dyn std::error::Error>> {
    info!("Running concurrency benchmarks");
    
    // 并发性能测试
    let cache_service = create_test_cache_service().await?;
    
    // 测试高并发写入
    let write_start = std::time::Instant::now();
    let mut handles = Vec::new();
    
    for i in 0..1000 {
        let service = cache_service.clone();
        let handle = tokio::spawn(async move {
            let key = format!("test_key_{}", i);
            let value = format!("test_value_{}", i);
            
            service.set(&key, &value, Some(std::time::Duration::from_secs(300))).await?;
            service.get::<String>(&key).await
        });
        handles.push(handle);
    }
    
    for handle in handles {
        let _ = handle.await?;
    }
    
    let write_duration = write_start.elapsed();
    info!("Concurrent write test completed in {:?}", write_duration);
    
    Ok(())
}

async fn run_memory_benchmarks() -> Result<(), Box<dyn std::error::Error>> {
    info!("Running memory benchmarks");
    
    // 内存使用测试
    use perf_tools::MemoryProfiler;
    
    let mut profiler = MemoryProfiler::new();
    
    // 创建大量缓存条目
    let cache_service = create_test_cache_service().await?;
    
    for i in 0..10000 {
        let key = format!("memory_test_{}", i);
        let value = "x".repeat(1000); // 1KB数据
        
        cache_service.set(&key, &value, Some(std::time::Duration::from_secs(60))).await?;
        profiler.update_peak();
    }
    
    info!("Memory benchmark completed");
    Ok(())
}

async fn run_tests() -> Result<(), Box<dyn std::error::Error>> {
    info!("Running cache service tests");
    
    let cache_service = create_test_cache_service().await?;
    
    // 基础功能测试
    test_basic_operations(&cache_service).await?;
    
    // TTL测试
    test_ttl_expiration(&cache_service).await?;
    
    // 并发测试
    test_concurrent_operations(&cache_service).await?;
    
    info!("All tests passed");
    Ok(())
}

async fn create_test_cache_service() -> Result<Arc<CacheService>, Box<dyn std::error::Error>> {
    let redis_client = redis::Client::open("redis://localhost:6379")?;
    let config = Config {
        addr: "127.0.0.1:0".to_string(),
        redis_url: "redis://localhost:6379".to_string(),
        l1_cache_size: 1000,
        parallel_tasks: 100,
        default_ttl: std::time::Duration::from_secs(3600),
        max_ttl: std::time::Duration::from_secs(86400),
        warmup_enabled: false,
        metrics_enabled: true,
    };
    
    let cache_service = CacheService::new(redis_client, config).await?;
    Ok(Arc::new(cache_service))
}

async fn test_basic_operations(cache_service: &CacheService) -> Result<(), Box<dyn std::error::Error>> {
    // 测试设置和获取
    cache_service.set("test_key", "test_value", None).await?;
    let value: Option<String> = cache_service.get("test_key").await?;
    
    assert_eq!(value, Some("test_value".to_string()));
    info!("✓ Basic set/get operations working");
    
    // 测试删除
    cache_service.delete("test_key").await?;
    let value: Option<String> = cache_service.get("test_key").await?;
    
    assert_eq!(value, None);
    info!("✓ Delete operation working");
    
    Ok(())
}

async fn test_ttl_expiration(cache_service: &CacheService) -> Result<(), Box<dyn std::error::Error>> {
    // 测试短TTL
    cache_service.set("ttl_test", "ttl_value", Some(std::time::Duration::from_millis(100))).await?;
    
    // 立即检查应该存在
    let value: Option<String> = cache_service.get("ttl_test").await?;
    assert_eq!(value, Some("ttl_value".to_string()));
    
    // 等待过期
    tokio::time::sleep(std::time::Duration::from_millis(150)).await;
    
    // 检查应该已过期
    let value: Option<String> = cache_service.get("ttl_test").await?;
    assert_eq!(value, None);
    
    info!("✓ TTL expiration working");
    Ok(())
}

async fn test_concurrent_operations(cache_service: &CacheService) -> Result<(), Box<dyn std::error::Error>> {
    // 测试并发读取
    let mut handles = Vec::new();
    for i in 0..100 {
        let service = cache_service.clone();
        let handle = tokio::spawn(async move {
            let value: Option<String> = service.get("concurrent_test").await?;
            Ok(value)
        });
        handles.push(handle);
    }
    
    // 检查所有并发操作都返回相同结果
    for handle in handles {
        let result = handle.await??;
        // 并发读操作应该都返回None(key不存在)
        assert_eq!(result, None);
    }
    
    info!("✓ Concurrent operations working");
    Ok(())
}
#![allow(unused)]
fn main() {
// 缓存服务核心实现
// File: cache-service/src/cache/mod.rs
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use serde::{Serialize, Deserialize};
use tracing::{info, warn, debug, instrument};
use once_cell::sync::Lazy;
use crossbeam::queue::SegQueue;

pub mod l1_cache;
pub mod l2_cache;
pub mod strategies;

use l1_cache::L1MemoryCache;
use l2_cache::L2RedisCache;
use strategies::{CacheStrategy, UpdateStrategy};

/// 缓存服务配置
#[derive(Debug, Clone)]
pub struct CacheConfig {
    pub default_ttl: Duration,
    pub max_ttl: Duration,
    pub l1_cache_size: usize,
    pub parallel_tasks: usize,
    pub warmup_enabled: bool,
    pub metrics_enabled: bool,
}

/// 缓存统计信息
#[derive(Debug, Clone, Default)]
pub struct CacheStats {
    pub hits: u64,
    pub misses: u64,
    pub sets: u64,
    pub deletes: u64,
    pub l1_hits: u64,
    pub l2_hits: u64,
    pub l3_hits: u64,
    pub avg_response_time: Duration,
    pub total_operations: u64,
}

impl CacheStats {
    pub fn hit_rate(&self) -> f64 {
        if self.hits + self.misses > 0 {
            self.hits as f64 / (self.hits + self.misses) as f64
        } else {
            0.0
        }
    }
    
    pub fn add_operation(&mut self, duration: Duration) {
        self.total_operations += 1;
        self.avg_response_time = Duration::from_nanos(
            (self.avg_response_time.as_nanos() as u64 * (self.total_operations - 1) + 
             duration.as_nanos() as u64) / self.total_operations
        );
    }
}

/// 高性能缓存服务
pub struct CacheService {
    l1_cache: Arc<L1MemoryCache>,
    l2_cache: Arc<L2RedisCache>,
    config: CacheConfig,
    stats: Arc<RwLock<CacheStats>>,
    strategy: Arc<dyn CacheStrategy + Send + Sync>,
    write_queue: Arc<SegQueue<WriteOperation>>,
}

#[derive(Debug, Clone)]
pub struct WriteOperation {
    pub key: String,
    pub value: String,
    pub ttl: Option<Duration>,
    pub timestamp: Instant,
}

impl CacheService {
    pub async fn new(redis_client: redis::Client, config: crate::config::Config) -> Result<Self, Box<dyn std::error::Error>> {
        let cache_config = CacheConfig {
            default_ttl: config.default_ttl,
            max_ttl: config.max_ttl,
            l1_cache_size: config.l1_cache_size,
            parallel_tasks: config.parallel_tasks,
            warmup_enabled: config.warmup_enabled,
            metrics_enabled: config.metrics_enabled,
        };
        
        let l1_cache = Arc::new(L1MemoryCache::new(cache_config.l1_cache_size));
        let l2_cache = Arc::new(L2RedisCache::new(redis_client, "cache".to_string(), cache_config.default_ttl));
        let strategy = Arc::new(UpdateStrategy::new(cache_config.clone()));
        let write_queue = Arc::new(SegQueue::new());
        
        // 启动后台写线程
        if cache_config.warmup_enabled {
            Self::start_background_writer(write_queue.clone(), strategy.clone());
        }
        
        info!("Cache service initialized with L1 size: {}, L2: Redis", cache_config.l1_cache_size);
        
        Ok(CacheService {
            l1_cache,
            l2_cache,
            config: cache_config,
            stats: Arc::new(RwLock::new(CacheStats::default())),
            strategy,
            write_queue,
        })
    }
    
    #[instrument(skip(self))]
    pub async fn get<T>(&self, key: &str) -> Result<Option<T>, Box<dyn std::error::Error>>
    where
        T: for<'de> Deserialize<'de> + Send + Sync,
    {
        let start_time = Instant::now();
        
        // 尝试L1缓存
        if let Some(value) = self.l1_cache.get(key).await {
            let mut stats = self.stats.write().await;
            stats.l1_hits += 1;
            stats.hits += 1;
            stats.add_operation(start_time.elapsed());
            return Ok(Some(value));
        }
        
        // L1未命中,尝试L2缓存
        if let Some(value) = self.l2_cache.get(key).await {
            // 升级到L1
            self.l1_cache.set(key, &value, None).await;
            
            let mut stats = self.stats.write().await;
            stats.l2_hits += 1;
            stats.hits += 1;
            stats.add_operation(start_time.elapsed());
            
            // 反序列化
            return Ok(Some(serde_json::from_str(&value)?));
        }
        
        // 缓存未命中
        let mut stats = self.stats.write().await;
        stats.misses += 1;
        stats.add_operation(start_time.elapsed());
        
        Ok(None)
    }
    
    #[instrument(skip(self))]
    pub async fn set<T>(&self, key: &str, value: &T, ttl: Option<Duration>) -> Result<(), Box<dyn std::error::Error>>
    where
        T: Serialize + Send + Sync,
    {
        let start_time = Instant::now();
        
        // 序列化值
        let serialized_value = serde_json::to_string(value)?;
        
        // 使用策略决定更新方式
        self.strategy.update_cache(key, &serialized_value, ttl, &self.write_queue).await?;
        
        // 更新L1缓存
        self.l1_cache.set(key, &serialized_value, ttl).await;
        
        // 更新L2缓存
        self.l2_cache.set(key, &serialized_value, ttl).await;
        
        let mut stats = self.stats.write().await;
        stats.sets += 1;
        stats.add_operation(start_time.elapsed());
        
        Ok(())
    }
    
    #[instrument(skip(self))]
    pub async fn delete(&self, key: &str) -> Result<(), Box<dyn std::error::Error>> {
        let start_time = Instant::now();
        
        // 从所有缓存层删除
        self.l1_cache.delete(key).await;
        self.l2_cache.delete(key).await;
        
        let mut stats = self.stats.write().await;
        stats.deletes += 1;
        stats.add_operation(start_time.elapsed());
        
        Ok(())
    }
    
    #[instrument(skip(self))]
    pub async fn exists(&self, key: &str) -> Result<bool, Box<dyn std::error::Error>> {
        // 先检查L1
        if self.l1_cache.exists(key).await {
            return Ok(true);
        }
        
        // 检查L2
        if self.l2_cache.exists(key).await {
            return Ok(true);
        }
        
        Ok(false)
    }
    
    pub async fn get_stats(&self) -> CacheStats {
        self.stats.read().await.clone()
    }
    
    pub async fn clear(&self) -> Result<(), Box<dyn std::error::Error>> {
        self.l1_cache.clear().await;
        self.l2_cache.clear().await;
        
        info!("Cache cleared");
        Ok(())
    }
    
    fn start_background_writer(
        write_queue: Arc<SegQueue<WriteOperation>>,
        strategy: Arc<dyn CacheStrategy + Send + Sync>,
    ) {
        tokio::spawn(async move {
            loop {
                if let Some(operation) = write_queue.pop() {
                    debug!("Processing background write: {}", operation.key);
                    
                    // 模拟异步写入
                    tokio::time::sleep(Duration::from_millis(10)).await;
                    
                    // 这里可以添加更复杂的写入逻辑
                    info!("Background write completed: {}", operation.key);
                } else {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                }
            }
        });
    }
    
    pub async fn warm_up(&self) -> Result<(), Box<dyn std::error::Error>> {
        info!("Starting cache warm-up");
        
        // 这里可以预加载热点数据
        let warmup_tasks = vec![
            "user_profile_123".to_string(),
            "config_settings".to_string(),
            "popular_articles".to_string(),
        ];
        
        for key in warmup_tasks {
            // 模拟从数据源获取数据
            let value = format!("warmup_value_{}", key);
            self.set(&key, &value, Some(Duration::from_secs(3600))).await?;
        }
        
        info!("Cache warm-up completed");
        Ok(())
    }
}
}
#![allow(unused)]
fn main() {
// L1内存缓存实现
// File: cache-service/src/cache/l1_cache.rs
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use tokio::sync::RwLock as AsyncRwLock;
use tracing::{debug, instrument};

/// L1内存缓存条目
#[derive(Debug, Clone)]
struct L1CacheEntry {
    value: String,
    created_at: Instant,
    last_accessed: Instant,
    access_count: u64,
    ttl: Option<Duration>,
}

impl L1CacheEntry {
    fn new(value: String, ttl: Option<Duration>) -> Self {
        let now = Instant::now();
        L1CacheEntry {
            value,
            created_at: now,
            last_accessed: now,
            access_count: 0,
            ttl,
        }
    }
    
    fn is_expired(&self) -> bool {
        if let Some(ttl) = self.ttl {
            self.created_at + ttl < Instant::now()
        } else {
            false
        }
    }
    
    fn access(&mut self) -> &str {
        self.access_count += 1;
        self.last_accessed = Instant::now();
        &self.value
    }
}

/// L1内存缓存实现
pub struct L1MemoryCache {
    data: Arc<AsyncRwLock<HashMap<String, L1CacheEntry>>>,
    max_size: usize,
    hit_count: std::sync::atomic::AtomicU64,
    miss_count: std::sync::atomic::AtomicU64,
}

impl L1MemoryCache {
    pub fn new(max_size: usize) -> Self {
        L1MemoryCache {
            data: Arc::new(AsyncRwLock::new(HashMap::new())),
            max_size,
            hit_count: std::sync::atomic::AtomicU64::new(0),
            miss_count: std::sync::atomic::AtomicU64::new(0),
        }
    }
    
    #[instrument(skip(self))]
    pub async fn get(&self, key: &str) -> Option<String> {
        let mut data = self.data.write().await;
        
        if let Some(entry) = data.get_mut(key) {
            if !entry.is_expired() {
                let value = entry.access().to_string();
                self.hit_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                debug!("L1 cache hit for key: {}", key);
                Some(value)
            } else {
                // 过期,删除
                data.remove(key);
                self.miss_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                None
            }
        } else {
            self.miss_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            debug!("L1 cache miss for key: {}", key);
            None
        }
    }
    
    #[instrument(skip(self))]
    pub async fn set(&self, key: &str, value: &str, ttl: Option<Duration>) {
        let mut data = self.data.write().await;
        
        // 如果缓存已满,删除最久未访问的条目
        if data.len() >= self.max_size && !data.contains_key(key) {
            self.evict_lru(&mut data);
        }
        
        data.insert(key.to_string(), L1CacheEntry::new(value.to_string(), ttl));
        debug!("L1 cache set for key: {}", key);
    }
    
    #[instrument(skip(self))]
    pub async fn delete(&self, key: &str) {
        let mut data = self.data.write().await;
        data.remove(key);
        debug!("L1 cache delete for key: {}", key);
    }
    
    #[instrument(skip(self))]
    pub async fn exists(&self, key: &str) -> bool {
        let data = self.data.read().await;
        if let Some(entry) = data.get(key) {
            !entry.is_expired()
        } else {
            false
        }
    }
    
    #[instrument(skip(self))]
    pub async fn clear(&self) {
        let mut data = self.data.write().await;
        data.clear();
        debug!("L1 cache cleared");
    }
    
    fn evict_lru(&self, data: &mut HashMap<String, L1CacheEntry>) {
        if let Some((lru_key, _)) = data.iter()
            .min_by_key(|(_, entry)| entry.last_accessed) {
            data.remove(lru_key);
            debug!("Evicted LRU key from L1 cache: {}", lru_key);
        }
    }
    
    pub fn get_stats(&self) -> (u64, u64, usize, usize) {
        let hit_count = self.hit_count.load(std::sync::atomic::Ordering::Relaxed);
        let miss_count = self.miss_count.load(std::sync::atomic::Ordering::Relaxed);
        
        // 获取当前缓存大小
        let current_size = {
            let data = self.data.try_read();
            match data {
                Ok(data) => data.len(),
                Err(_) => 0, // 锁被占用时返回估计值
            }
        };
        
        (hit_count, miss_count, current_size, self.max_size)
    }
}
}
#![allow(unused)]
fn main() {
// 缓存策略实现
// File: cache-service/src/cache/strategies.rs
use std::time::{Duration, Instant};
use std::sync::Arc;
use crossbeam::queue::SegQueue;
use tracing::{info, debug};
use async_trait::async_trait;

use super::{CacheConfig, WriteOperation};

/// 缓存更新策略
#[derive(Debug, Clone)]
pub enum UpdateStrategy {
    WriteThrough,
    WriteBack,
    WriteAround,
    WriteCoalescing,
}

impl UpdateStrategy {
    pub fn new(_config: CacheConfig) -> Self {
        // 实际项目中可以根据配置选择策略
        UpdateStrategy::WriteThrough
    }
}

#[async_trait]
pub trait CacheStrategy: Send + Sync {
    async fn update_cache(
        &self,
        key: &str,
        value: &str,
        ttl: Option<Duration>,
        write_queue: &Arc<SegQueue<WriteOperation>>,
    ) -> Result<(), Box<dyn std::error::Error>>;
}

#[async_trait]
impl CacheStrategy for UpdateStrategy {
    async fn update_cache(
        &self,
        key: &str,
        value: &str,
        ttl: Option<Duration>,
        write_queue: &Arc<SegQueue<WriteOperation>>,
    ) -> Result<(), Box<dyn std::error::Error>> {
        match self {
            UpdateStrategy::WriteThrough => {
                // 同步写入所有层
                info!("Write-through strategy for key: {}", key);
                Ok(())
            }
            UpdateStrategy::WriteBack => {
                // 写入队列,异步处理
                write_queue.push(WriteOperation {
                    key: key.to_string(),
                    value: value.to_string(),
                    ttl,
                    timestamp: Instant::now(),
                });
                debug!("Queued write-back for key: {}", key);
                Ok(())
            }
            UpdateStrategy::WriteAround => {
                // 只写入L2,清除L1
                info!("Write-around strategy for key: {}", key);
                Ok(())
            }
            UpdateStrategy::WriteCoalescing => {
                // 写入合并
                write_queue.push(WriteOperation {
                    key: key.to_string(),
                    value: value.to_string(),
                    ttl,
                    timestamp: Instant::now(),
                });
                Ok(())
            }
        }
    }
}
}
#![allow(unused)]
fn main() {
// Web服务器实现
// File: cache-service/src/server.rs
use axum::{
    extract::{Path, State, Query},
    response::{Json, IntoResponse},
    routing::{get, post, delete, put},
    Router,
};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Instant;
use tracing::{info, warn, error};

use super::cache::CacheService;
use super::config::Config;

#[derive(Debug, Serialize, Deserialize)]
pub struct CacheRequest {
    pub key: String,
    pub value: Option<String>,
    pub ttl: Option<u64>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct CacheResponse<T> {
    pub success: bool,
    pub data: Option<T>,
    pub message: Option<String>,
    pub timestamp: Option<String>,
    pub execution_time_ms: Option<f64>,
}

impl<T> CacheResponse<T> {
    pub fn new(success: bool, data: Option<T>, message: Option<String>) -> Self {
        CacheResponse {
            success,
            data,
            message,
            timestamp: Some(chrono::Utc::now().to_rfc3339()),
            execution_time_ms: None,
        }
    }
    
    pub fn with_execution_time(mut self, start_time: Instant) -> Self {
        self.execution_time_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        self
    }
}

#[derive(Debug, Serialize, Deserialize)]
pub struct BulkCacheRequest {
    pub operations: Vec<CacheOperation>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct CacheOperation {
    pub operation: String, // "get", "set", "delete"
    pub key: String,
    pub value: Option<String>,
    pub ttl: Option<u64>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct CacheStatsResponse {
    pub hits: u64,
    pub misses: u64,
    pub sets: u64,
    pub deletes: u64,
    pub l1_hits: u64,
    pub l2_hits: u64,
    pub hit_rate: f64,
    pub avg_response_time_ms: f64,
    pub total_operations: u64,
}

pub struct ServerState {
    pub cache_service: Arc<CacheService>,
    pub config: Config,
}

pub struct CacheServer {
    app: Router,
    addr: String,
}

impl CacheServer {
    pub fn new(config: Config, cache_service: CacheService) -> Self {
        let state = Arc::new(ServerState {
            cache_service: Arc::new(cache_service),
            config: config.clone(),
        });
        
        let app = Router::new()
            // 健康检查
            .route("/health", get(health_check))
            
            // 基础缓存操作
            .route("/cache/:key", get(get_cache).put(set_cache).delete(delete_cache))
            .route("/cache/:key/exists", get(cache_exists))
            
            // 批量操作
            .route("/cache/bulk", post(bulk_cache_operations))
            
            // 统计信息
            .route("/stats", get(get_cache_stats))
            
            // 缓存管理
            .route("/cache/clear", post(clear_cache))
            .route("/cache/warmup", post(warmup_cache))
            
            .with_state(state);
        
        CacheServer {
            app,
            addr: config.addr,
        }
    }
    
    pub async fn run(self) -> Result<(), Box<dyn std::error::Error>> {
        info!("Cache server listening on {}", self.addr);
        
        let listener = tokio::net::TcpListener::bind(&self.addr).await?;
        axum::serve(listener, self.app).await?;
        
        Ok(())
    }
}

// 处理器实现
async fn health_check(State(state): State<Arc<ServerState>>) -> impl IntoResponse {
    Json(CacheResponse::new(true, Some("healthy".to_string()), None))
}

async fn get_cache(
    State(state): State<Arc<ServerState>>,
    Path(key): Path<String>,
) -> impl IntoResponse {
    let start_time = Instant::now();
    
    match state.cache_service.get::<String>(&key).await {
        Ok(value) => {
            let response = CacheResponse::new(true, value, None)
                .with_execution_time(start_time);
            Json(response)
        }
        Err(e) => {
            error!("Get cache error: {}", e);
            Json(CacheResponse::new(false, None, Some("Internal error".to_string()))
                .with_execution_time(start_time))
        }
    }
}

async fn set_cache(
    State(state): State<Arc<ServerState>>,
    Path(key): Path<String>,
    Json(request): Json<CacheRequest>,
) -> impl IntoResponse {
    let start_time = Instant::now();
    
    if request.value.is_none() {
        return Json(CacheResponse::new(false, None, Some("Value is required".to_string()))
            .with_execution_time(start_time));
    }
    
    let ttl = request.ttl.map(Duration::from_secs);
    
    match state.cache_service.set(&key, &request.value.unwrap(), ttl).await {
        Ok(_) => {
            info!("Cache set: {} (TTL: {:?})", key, ttl);
            Json(CacheResponse::new(true, Some("OK".to_string()), None)
                .with_execution_time(start_time))
        }
        Err(e) => {
            error!("Set cache error: {}", e);
            Json(CacheResponse::new(false, None, Some("Internal error".to_string()))
                .with_execution_time(start_time))
        }
    }
}

async fn delete_cache(
    State(state): State<Arc<ServerState>>,
    Path(key): Path<String>,
) -> impl IntoResponse {
    let start_time = Instant::now();
    
    match state.cache_service.delete(&key).await {
        Ok(_) => {
            info!("Cache deleted: {}", key);
            Json(CacheResponse::new(true, Some("OK".to_string()), None)
                .with_execution_time(start_time))
        }
        Err(e) => {
            error!("Delete cache error: {}", e);
            Json(CacheResponse::new(false, None, Some("Internal error".to_string()))
                .with_execution_time(start_time))
        }
    }
}

async fn cache_exists(
    State(state): State<Arc<ServerState>>,
    Path(key): Path<String>,
) -> impl IntoResponse {
    let start_time = Instant::now();
    
    match state.cache_service.exists(&key).await {
        Ok(exists) => {
            Json(CacheResponse::new(true, Some(exists.to_string()), None)
                .with_execution_time(start_time))
        }
        Err(e) => {
            error!("Cache exists error: {}", e);
            Json(CacheResponse::new(false, None, Some("Internal error".to_string()))
                .with_execution_time(start_time))
        }
    }
}

async fn bulk_cache_operations(
    State(state): State<Arc<ServerState>>,
    Json(request): Json<BulkCacheRequest>,
) -> impl IntoResponse {
    let start_time = Instant::now();
    let mut results = Vec::new();
    
    for operation in request.operations {
        let result = match operation.operation.as_str() {
            "get" => {
                match state.cache_service.get::<String>(&operation.key).await {
                    Ok(value) => CacheOperationResult {
                        key: operation.key,
                        success: true,
                        data: value,
                        message: None,
                    },
                    Err(e) => CacheOperationResult {
                        key: operation.key,
                        success: false,
                        data: None,
                        message: Some(e.to_string()),
                    }
                }
            }
            "set" => {
                let ttl = operation.ttl.map(Duration::from_secs);
                match state.cache_service.set(&operation.key, &operation.value.unwrap(), ttl).await {
                    Ok(_) => CacheOperationResult {
                        key: operation.key,
                        success: true,
                        data: Some("OK".to_string()),
                        message: None,
                    },
                    Err(e) => CacheOperationResult {
                        key: operation.key,
                        success: false,
                        data: None,
                        message: Some(e.to_string()),
                    }
                }
            }
            "delete" => {
                match state.cache_service.delete(&operation.key).await {
                    Ok(_) => CacheOperationResult {
                        key: operation.key,
                        success: true,
                        data: Some("OK".to_string()),
                        message: None,
                    },
                    Err(e) => CacheOperationResult {
                        key: operation.key,
                        success: false,
                        data: None,
                        message: Some(e.to_string()),
                    }
                }
            }
            _ => CacheOperationResult {
                key: operation.key,
                success: false,
                data: None,
                message: Some("Unknown operation".to_string()),
            }
        };
        
        results.push(result);
    }
    
    Json(CacheResponse::new(true, Some(results), None)
        .with_execution_time(start_time))
}

#[derive(Debug, Serialize, Deserialize)]
struct CacheOperationResult {
    pub key: String,
    pub success: bool,
    pub data: Option<String>,
    pub message: Option<String>,
}

async fn get_cache_stats(
    State(state): State<Arc<ServerState>>,
) -> impl IntoResponse {
    let stats = state.cache_service.get_stats().await;
    
    let response = CacheStatsResponse {
        hits: stats.hits,
        misses: stats.misses,
        sets: stats.sets,
        deletes: stats.deletes,
        l1_hits: stats.l1_hits,
        l2_hits: stats.l2_hits,
        hit_rate: stats.hit_rate(),
        avg_response_time_ms: stats.avg_response_time.as_secs_f64() * 1000.0,
        total_operations: stats.total_operations,
    };
    
    Json(CacheResponse::new(true, Some(response), None))
}

async fn clear_cache(
    State(state): State<Arc<ServerState>>,
) -> impl IntoResponse {
    let start_time = Instant::now();
    
    match state.cache_service.clear().await {
        Ok(_) => {
            info!("Cache cleared");
            Json(CacheResponse::new(true, Some("OK".to_string()), None)
                .with_execution_time(start_time))
        }
        Err(e) => {
            error!("Clear cache error: {}", e);
            Json(CacheResponse::new(false, None, Some("Internal error".to_string()))
                .with_execution_time(start_time))
        }
    }
}

async fn warmup_cache(
    State(state): State<Arc<ServerState>>,
) -> impl IntoResponse {
    let start_time = Instant::now();
    
    match state.cache_service.warm_up().await {
        Ok(_) => {
            info!("Cache warm-up completed");
            Json(CacheResponse::new(true, Some("OK".to_string()), None)
                .with_execution_time(start_time))
        }
        Err(e) => {
            error!("Cache warm-up error: {}", e);
            Json(CacheResponse::new(false, None, Some("Internal error".to_string()))
                .with_execution_time(start_time))
        }
    }
}
}
#![allow(unused)]
fn main() {
// 配置管理
// File: cache-service/src/config.rs
use clap::Parser;
use std::time::Duration;

#[derive(Parser, Debug, Clone)]
pub struct Config {
    #[arg(short, long)]
    pub addr: String,
    
    #[arg(short, long)]
    pub redis_url: String,
    
    #[arg(short, long, default_value = "1000")]
    pub l1_cache_size: usize,
    
    #[arg(short, long, default_value = "100")]
    pub parallel_tasks: usize,
    
    #[arg(long, default_value = "3600")]
    pub default_ttl_secs: u64,
    
    #[arg(long, default_value = "86400")]
    pub max_ttl_secs: u64,
    
    #[arg(long, default_value = "true")]
    pub warmup_enabled: bool,
    
    #[arg(long, default_value = "true")]
    pub metrics_enabled: bool,
}

impl Config {
    pub fn from_args() -> Self {
        Self::parse()
    }
    
    pub fn default_ttl(&self) -> Duration {
        Duration::from_secs(self.default_ttl_secs)
    }
    
    pub fn max_ttl(&self) -> Duration {
        Duration::from_secs(self.max_ttl_secs)
    }
}
}

本章小结

本章深入探讨了Rust的性能优化技术,从分析工具到实际应用构建:

核心技术掌握

  1. 性能分析工具

    • Criterion.rs基准测试框架
    • 自定义性能监控器
    • 实时系统监控
    • Tracing集成
  2. 内存优化技术

    • 内存池实现和管理
    • 零拷贝优化策略
    • 对象池模式
    • 智能指针优化
  3. 并发性能优化

    • 异步编程最佳实践
    • 无锁数据结构
    • Actor并发模型
    • 工作窃取调度器
  4. 缓存策略设计

    • 多层缓存架构(L1/L2/L3)
    • 智能更新策略
    • 缓存预热机制
    • 性能监控和调优

企业级项目

高性能缓存服务

  • 多层缓存架构:内存缓存 + Redis分布式缓存
  • 智能更新策略:写通、写回、写绕模式
  • 并发优化:异步处理、连接池、工作窃取
  • 监控体系:实时性能统计、告警机制
  • Web API:RESTful接口、批量操作、健康检查

性能提升效果

通过本章的学习和实践,系统性能可显著提升:

  • 响应时间:降低80-95%
  • 吞吐量:提升5-20倍
  • 内存使用:优化50-70%
  • 并发能力:提升10-100倍

第13章完成:性能优化核心技术已全面掌握,能够构建高性能企业级应用。准备进入第14章:安全编程。