Chapter 13: Performance Optimization
Chapter Overview
Performance optimization is a key skill in modern software development. In this chapter we take a deep dive into Rust's performance optimization techniques, from low-level memory management to highly concurrent processing, and build up the core skills needed to construct high-performance systems. The chapter is not only about theory; more importantly, it applies the theory through a hands-on project.
Learning objectives:
- Master Rust's profiling tools and methods
- Understand memory management optimization techniques
- Learn strategies for optimizing concurrent performance
- Master caching strategies and their implementation
- Design and implement a high-performance cache service
Hands-on project: build an enterprise-grade, high-performance cache service with distributed caching, memory pool management, performance monitoring, and failure recovery.
13.1 Profiling Fundamentals
13.1.1 Profiling Tools
Rust has a rich set of profiling tools that help developers locate performance bottlenecks:
13.1.1.1 Criterion.rs - Benchmarking Framework
# File: performance-benches/Cargo.toml
[package]
name = "performance-benches"
version = "0.1.0"
edition = "2021"

[dev-dependencies]
criterion = { version = "0.5", features = ["html_reports"] }
tokio = { version = "1.0", features = ["full"] }
redis = { version = "0.24" }
sqlx = { version = "0.7" }

[[bench]]
name = "string_operations"
harness = false

[[bench]]
name = "database_queries"
harness = false

[[bench]]
name = "concurrent_operations"
harness = false
// File: performance-benches/benches/string_operations.rs
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
use std::collections::HashMap;
use std::time::Duration;

fn string_concatenation_bench(c: &mut Criterion) {
    let mut group = c.benchmark_group("string_operations");
    group.sample_size(100);
    group.measurement_time(Duration::from_secs(10));

    // String concatenation benchmarks at several input sizes
    let test_sizes = vec![10, 100, 1000, 10000];

    for size in test_sizes {
        group.bench_with_input(BenchmarkId::new("string_push", size), &size, |b, &size| {
            b.iter(|| {
                let mut s = String::new();
                for i in 0..size {
                    s.push_str(&format!("item_{}", i));
                }
                black_box(s);
            })
        });

        group.bench_with_input(BenchmarkId::new("format_macro", size), &size, |b, &size| {
            b.iter(|| {
                let mut items = Vec::new();
                for i in 0..size {
                    items.push(format!("item_{}", i));
                }
                let s = items.join(", ");
                black_box(s);
            })
        });

        group.bench_with_input(BenchmarkId::new("string_builder", size), &size, |b, &size| {
            b.iter(|| {
                // Pre-allocating capacity avoids repeated reallocation
                let mut s = String::with_capacity(size * 10);
                for i in 0..size {
                    s.push_str(&format!("item_{}", i));
                }
                black_box(s);
            })
        });
    }
    group.finish();
}

fn hashmap_operations_bench(c: &mut Criterion) {
    let mut group = c.benchmark_group("hashmap_operations");
    group.sample_size(50);

    // HashMap insertion performance
    group.bench_function("hashmap_insert", |b| {
        b.iter(|| {
            let mut map = HashMap::new();
            for i in 0..1000 {
                map.insert(format!("key_{}", i), format!("value_{}", i));
            }
            black_box(map);
        })
    });

    // HashMap lookup performance
    group.bench_function("hashmap_lookup", |b| {
        let mut map = HashMap::new();
        for i in 0..1000 {
            map.insert(format!("key_{}", i), format!("value_{}", i));
        }
        b.iter(|| {
            for i in 0..1000 {
                let key = format!("key_{}", i);
                let value = map.get(&key);
                black_box(value);
            }
        })
    });

    // Pre-allocated capacity
    group.bench_function("hashmap_with_capacity", |b| {
        b.iter(|| {
            let mut map = HashMap::with_capacity(1000);
            for i in 0..1000 {
                map.insert(format!("key_{}", i), format!("value_{}", i));
            }
            black_box(map);
        })
    });

    group.finish();
}

fn sorting_algorithms_bench(c: &mut Criterion) {
    let mut group = c.benchmark_group("sorting");
    group.sample_size(30);

    let test_data_sizes = vec![100, 1000, 10000];

    for &size in &test_data_sizes {
        let data: Vec<i32> = (0..size).rev().collect(); // reverse-ordered input

        group.bench_with_input(BenchmarkId::new("sort_unstable", size), &data, |b, data| {
            b.iter(|| {
                let mut data = data.clone();
                data.sort_unstable();
                black_box(data);
            })
        });

        group.bench_with_input(BenchmarkId::new("sort_stable", size), &data, |b, data| {
            b.iter(|| {
                let mut data = data.clone();
                data.sort();
                black_box(data);
            })
        });
    }
    group.finish();
}

criterion_group!(
    benches,
    string_concatenation_bench,
    hashmap_operations_bench,
    sorting_algorithms_bench
);
criterion_main!(benches);
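With the [[bench]] targets declared in the manifest above, an individual suite runs with cargo bench --bench string_operations; when the html_reports feature is enabled, Criterion also writes HTML comparison reports under target/criterion/.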
13.1.1.2 perf Tool Integration
// Profiling helper utilities
// File: perf-tools/src/lib.rs
use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::{Duration, Instant};
use tracing::info;

/// Global performance monitor
pub static PERF_MONITOR: Lazy<Mutex<PerformanceMonitor>> =
    Lazy::new(|| Mutex::new(PerformanceMonitor::new()));

/// Performance monitor
pub struct PerformanceMonitor {
    metrics: HashMap<String, MetricData>,
    enabled: bool,
}

#[derive(Debug, Clone)]
pub struct MetricData {
    pub name: String,
    pub total_time: Duration,
    pub call_count: u64,
    pub min_time: Duration,
    pub max_time: Duration,
    pub avg_time: Duration,
    pub last_updated: Instant,
}

impl PerformanceMonitor {
    pub fn new() -> Self {
        PerformanceMonitor {
            metrics: HashMap::new(),
            enabled: true,
        }
    }

    pub fn enable(&mut self) {
        self.enabled = true;
    }

    pub fn disable(&mut self) {
        self.enabled = false;
    }

    pub fn record_metric(&mut self, name: &str, duration: Duration) {
        if !self.enabled {
            return;
        }
        let metric = self.metrics.entry(name.to_string()).or_insert_with(|| MetricData {
            name: name.to_string(),
            total_time: Duration::from_secs(0),
            call_count: 0,
            min_time: Duration::MAX,
            max_time: Duration::from_secs(0),
            avg_time: Duration::from_secs(0),
            last_updated: Instant::now(),
        });

        metric.total_time += duration;
        metric.call_count += 1;
        metric.last_updated = Instant::now();
        if duration < metric.min_time {
            metric.min_time = duration;
        }
        if duration > metric.max_time {
            metric.max_time = duration;
        }
        metric.avg_time =
            Duration::from_nanos(metric.total_time.as_nanos() as u64 / metric.call_count);
    }

    pub fn get_metrics(&self) -> Vec<MetricData> {
        self.metrics.values().cloned().collect()
    }

    pub fn report(&self) {
        if !self.enabled {
            return;
        }
        info!("Performance Metrics Report");
        info!("{:<30} | {:<10} | {:<15} | {:<15} | {:<15}",
            "Operation", "Count", "Avg Time", "Min Time", "Max Time");
        info!("{:-<30}-+-{:-<10}-+-{:-<15}-+-{:-<15}-+-{:-<15}", "", "", "", "", "");

        // Slowest average first
        let mut metrics: Vec<_> = self.metrics.values().collect();
        metrics.sort_by(|a, b| b.avg_time.cmp(&a.avg_time));

        for metric in metrics {
            info!("{:<30} | {:<10} | {:<15} | {:<15} | {:<15}",
                metric.name,
                metric.call_count,
                format!("{:.2}ms", metric.avg_time.as_secs_f64() * 1000.0),
                format!("{:.2}ms", metric.min_time.as_secs_f64() * 1000.0),
                format!("{:.2}ms", metric.max_time.as_secs_f64() * 1000.0));
        }
    }
}

/// Scope-based profiler: records elapsed time when dropped
pub struct Profiler {
    name: String,
    start_time: Instant,
}

impl Profiler {
    pub fn new(name: &str) -> Self {
        Profiler {
            name: name.to_string(),
            start_time: Instant::now(),
        }
    }
}

impl Drop for Profiler {
    fn drop(&mut self) {
        let duration = self.start_time.elapsed();
        PERF_MONITOR.lock().unwrap().record_metric(&self.name, duration);
    }
}

/// Convenience macros
#[macro_export]
macro_rules! profile_func {
    ($name:expr) => {
        let _profiler = $crate::Profiler::new($name);
    };
}

#[macro_export]
macro_rules! profile_operation {
    ($name:expr, $op:block) => {{
        let _profiler = $crate::Profiler::new($name);
        let result = $op;
        drop(_profiler);
        result
    }};
}

/// Memory usage monitor
pub struct MemoryProfiler {
    start_memory: usize,
    peak_memory: usize,
}

impl MemoryProfiler {
    pub fn new() -> Self {
        let start_memory = Self::get_memory_usage();
        MemoryProfiler {
            start_memory,
            peak_memory: start_memory,
        }
    }

    fn get_memory_usage() -> usize {
        // On Linux, read the resident set size from /proc/self/status
        #[cfg(target_os = "linux")]
        {
            if let Ok(content) = std::fs::read_to_string("/proc/self/status") {
                for line in content.lines() {
                    if line.starts_with("VmRSS:") {
                        if let Some(kb_str) = line.split_whitespace().nth(1) {
                            return kb_str.parse::<usize>().unwrap_or(0) * 1024; // bytes
                        }
                    }
                }
            }
        }
        // Fallback on other platforms
        0
    }

    pub fn update_peak(&mut self) {
        let current = Self::get_memory_usage();
        if current > self.peak_memory {
            self.peak_memory = current;
        }
    }

    pub fn report(&self) {
        let current = Self::get_memory_usage();
        // Signed arithmetic: usage can shrink below the starting point
        let change = current as f64 - self.start_memory as f64;
        info!("Memory Usage: Current: {:.2}MB, Peak: {:.2}MB, Change: {:.2}MB",
            current as f64 / 1024.0 / 1024.0,
            self.peak_memory as f64 / 1024.0 / 1024.0,
            change / 1024.0 / 1024.0);
    }
}

impl Drop for MemoryProfiler {
    fn drop(&mut self) {
        self.report();
    }
}
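A minimal sketch of how these helpers compose, assuming the crate is named perf_tools as in the file header; the function and workload names are illustrative:

use perf_tools::{profile_func, profile_operation, MemoryProfiler, PERF_MONITOR};

fn expensive_computation() -> u64 {
    // Profiles the entire function body via the scope guard
    profile_func!("expensive_computation");
    (0..1_000_000u64).sum()
}

fn main() {
    // Reports via tracing on drop; initialize a subscriber to see the output
    let _mem = MemoryProfiler::new();

    let total = profile_operation!("sum_block", {
        expensive_computation() + expensive_computation()
    });
    println!("total = {}", total);

    PERF_MONITOR.lock().unwrap().report();
}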
13.1.1.3 A Custom Performance Profiler
// Advanced profiling utilities
// File: perf-tools/src/advanced.rs
// NOTE: the system metrics below are simulated with the `rand` crate where a
// real collector would query the OS; add `rand = "0.8"` to Cargo.toml.
use crossbeam::channel::Sender;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use tracing::{info, warn};

/// Real-time performance monitor
pub struct RealTimeMonitor {
    metrics: Arc<Mutex<MetricsCollector>>,
    collector_thread: Option<thread::JoinHandle<()>>,
    running: Arc<AtomicBool>,
    sampling_interval: Duration,
}

#[derive(Debug, Clone)]
pub struct SystemMetrics {
    pub cpu_usage: f64,
    pub memory_usage: f64,
    pub gc_count: u64,
    pub active_connections: u64,
    pub request_rate: f64,
    pub response_time: Duration,
}

#[derive(Debug, Clone)]
pub struct MetricsCollector {
    pub samples: Vec<SystemMetrics>,
    pub min_response_time: Duration,
    pub max_response_time: Duration,
    pub avg_response_time: Duration,
    pub total_requests: u64,
    pub error_count: u64,
}

impl MetricsCollector {
    pub fn new() -> Self {
        MetricsCollector {
            samples: Vec::new(),
            min_response_time: Duration::MAX,
            max_response_time: Duration::from_secs(0),
            avg_response_time: Duration::from_secs(0),
            total_requests: 0,
            error_count: 0,
        }
    }

    pub fn record_request(&mut self, response_time: Duration, success: bool) {
        self.total_requests += 1;
        if !success {
            self.error_count += 1;
        }
        if response_time < self.min_response_time {
            self.min_response_time = response_time;
        }
        if response_time > self.max_response_time {
            self.max_response_time = response_time;
        }
        // Incremental running average of response time
        self.avg_response_time = Duration::from_nanos(
            (self.avg_response_time.as_nanos() as u64 * (self.total_requests - 1)
                + response_time.as_nanos() as u64)
                / self.total_requests,
        );
    }

    pub fn collect_system_metrics(&mut self) {
        let metrics = SystemMetrics {
            cpu_usage: self.get_cpu_usage(),
            memory_usage: self.get_memory_usage(),
            gc_count: self.get_gc_count(),
            active_connections: self.get_active_connections(),
            request_rate: self.calculate_request_rate(),
            response_time: self.avg_response_time,
        };
        self.samples.push(metrics);
        // Keep only the most recent 100 samples
        if self.samples.len() > 100 {
            self.samples.remove(0);
        }
    }

    fn get_cpu_usage(&self) -> f64 {
        // Simulated CPU usage; a real project would query the OS (e.g. via sysinfo)
        rand::random::<f64>() * 100.0
    }

    fn get_memory_usage(&self) -> f64 {
        // On Linux, derive usage from MemTotal and MemAvailable in /proc/meminfo
        #[cfg(target_os = "linux")]
        {
            if let Ok(content) = std::fs::read_to_string("/proc/meminfo") {
                let mut total_kb = 0.0;
                let mut available_kb = 0.0;
                for line in content.lines() {
                    let value = || {
                        line.split_whitespace()
                            .nth(1)
                            .and_then(|s| s.parse::<f64>().ok())
                            .unwrap_or(0.0)
                    };
                    if line.starts_with("MemTotal:") {
                        total_kb = value();
                    } else if line.starts_with("MemAvailable:") {
                        available_kb = value();
                    }
                }
                if total_kb > 0.0 {
                    return (total_kb - available_kb) / total_kb * 100.0;
                }
            }
        }
        rand::random::<f64>() * 100.0
    }

    fn get_gc_count(&self) -> u64 {
        // Rust has no garbage collector; this simulated value stands in
        // for allocator statistics
        rand::random::<u64>() % 1000
    }

    fn get_active_connections(&self) -> u64 {
        // Simulated active connection count
        rand::random::<u64>() % 10000
    }

    fn calculate_request_rate(&self) -> f64 {
        // Rough estimate over the most recent samples (one sample per tick)
        if self.samples.len() < 2 {
            return 0.0;
        }
        let recent_samples = &self.samples[self.samples.len().saturating_sub(10)..];
        let time_diff = recent_samples.len() as f64;
        if time_diff > 0.0 {
            self.total_requests as f64 / time_diff
        } else {
            0.0
        }
    }
}

impl RealTimeMonitor {
    pub fn new(sampling_interval: Duration) -> Self {
        RealTimeMonitor {
            metrics: Arc::new(Mutex::new(MetricsCollector::new())),
            collector_thread: None,
            running: Arc::new(AtomicBool::new(false)),
            sampling_interval,
        }
    }

    pub fn start(&mut self) {
        let metrics = Arc::clone(&self.metrics);
        let running = Arc::clone(&self.running);
        let sampling_interval = self.sampling_interval;
        running.store(true, Ordering::SeqCst);

        self.collector_thread = Some(thread::spawn(move || {
            while running.load(Ordering::SeqCst) {
                thread::sleep(sampling_interval);
                if let Ok(mut collector) = metrics.lock() {
                    collector.collect_system_metrics();
                }
            }
        }));
    }

    pub fn stop(&mut self) {
        // Signal the sampling loop to exit, then wait for the thread
        self.running.store(false, Ordering::SeqCst);
        if let Some(handle) = self.collector_thread.take() {
            if handle.join().is_err() {
                warn!("Performance monitor thread panicked");
            }
        }
    }

    pub fn record_request(&self, response_time: Duration, success: bool) {
        if let Ok(mut collector) = self.metrics.lock() {
            collector.record_request(response_time, success);
        }
    }

    pub fn get_metrics(&self) -> Option<MetricsCollector> {
        self.metrics.lock().ok().map(|collector| collector.clone())
    }

    pub fn generate_report(&self) {
        if let Some(metrics) = self.get_metrics() {
            info!("=== Real-time Performance Report ===");
            info!("Total Requests: {}", metrics.total_requests);
            info!("Error Count: {}", metrics.error_count);
            info!("Error Rate: {:.2}%",
                if metrics.total_requests > 0 {
                    metrics.error_count as f64 / metrics.total_requests as f64 * 100.0
                } else {
                    0.0
                });
            info!("Average Response Time: {:.2}ms", metrics.avg_response_time.as_secs_f64() * 1000.0);
            info!("Min Response Time: {:.2}ms", metrics.min_response_time.as_secs_f64() * 1000.0);
            info!("Max Response Time: {:.2}ms", metrics.max_response_time.as_secs_f64() * 1000.0);

            if let Some(latest) = metrics.samples.last() {
                info!("Current CPU Usage: {:.1}%", latest.cpu_usage);
                info!("Current Memory Usage: {:.1}%", latest.memory_usage);
                info!("Active Connections: {}", latest.active_connections);
                info!("Request Rate: {:.1} req/s", latest.request_rate);
            }
        }
    }
}

/// Threshold-based alerting
pub struct PerformanceAlert {
    thresholds: PerformanceThresholds,
    alert_channel: Option<Sender<AlertMessage>>,
    current_state: AlertState,
}

/// Message sent on the alert channel when a threshold is crossed
#[derive(Debug, Clone)]
pub struct AlertMessage {
    pub alert_type: String,
    pub state: AlertState,
}

#[derive(Debug, Clone)]
pub struct PerformanceThresholds {
    pub max_response_time: Duration,
    pub max_error_rate: f64,
    pub max_memory_usage: f64,
    pub max_cpu_usage: f64,
}

#[derive(Debug, Clone)]
pub struct AlertState {
    pub high_response_time: bool,
    pub high_error_rate: bool,
    pub high_memory_usage: bool,
    pub high_cpu_usage: bool,
}

impl PerformanceAlert {
    pub fn new(thresholds: PerformanceThresholds) -> Self {
        PerformanceAlert {
            thresholds,
            alert_channel: None,
            current_state: AlertState {
                high_response_time: false,
                high_error_rate: false,
                high_memory_usage: false,
                high_cpu_usage: false,
            },
        }
    }

    pub fn with_channel(mut self, channel: Sender<AlertMessage>) -> Self {
        self.alert_channel = Some(channel);
        self
    }

    pub fn check_metrics(&mut self, metrics: &MetricsCollector) {
        let error_rate = if metrics.total_requests > 0 {
            metrics.error_count as f64 / metrics.total_requests as f64 * 100.0
        } else {
            0.0
        };
        let new_state = AlertState {
            high_response_time: metrics.avg_response_time > self.thresholds.max_response_time,
            high_error_rate: error_rate > self.thresholds.max_error_rate,
            high_memory_usage: false, // requires system-level monitoring
            high_cpu_usage: false,    // requires system-level monitoring
        };

        self.check_state_change("High Response Time",
            self.current_state.high_response_time, new_state.high_response_time, &new_state);
        self.check_state_change("High Error Rate",
            self.current_state.high_error_rate, new_state.high_error_rate, &new_state);

        self.current_state = new_state;
    }

    fn check_state_change(&self, alert_type: &str, old_state: bool, new_state: bool, state: &AlertState) {
        if old_state != new_state {
            if new_state {
                warn!("Performance Alert: {} is now HIGH", alert_type);
                if let Some(ref channel) = self.alert_channel {
                    let _ = channel.send(AlertMessage {
                        alert_type: alert_type.to_string(),
                        state: state.clone(),
                    });
                }
            } else {
                info!("Performance Alert: {} is now normal", alert_type);
            }
        }
    }
}
13.1.2 Performance Monitoring with tracing
// Performance monitoring integrated with tracing
// File: tracing-integration/src/lib.rs
// NOTE: requires tracing-subscriber with the "env-filter" and "json" features.
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;
use tracing::{info, instrument};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

/// A service instrumented for performance tracing
pub struct TracedService {
    service_name: String,
    request_count: AtomicU64,
    error_count: AtomicU64,
    total_duration: AtomicU64,
}

impl TracedService {
    pub fn new(service_name: &str) -> Self {
        TracedService {
            service_name: service_name.to_string(),
            request_count: AtomicU64::new(0),
            error_count: AtomicU64::new(0),
            total_duration: AtomicU64::new(0),
        }
    }

    // #[instrument] creates a span per call; skipped args are still usable
    // in field expressions
    #[instrument(skip(self, data), fields(
        service = %self.service_name,
        request_id = %uuid::Uuid::new_v4(),
        user_id = %"anonymous"
    ))]
    pub async fn process_request(&self, data: &str) -> Result<String, Box<dyn std::error::Error>> {
        let start = Instant::now();
        self.request_count.fetch_add(1, Ordering::Relaxed);

        info!("Started processing request with data length: {}", data.len());

        // Simulated work
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;

        if data.len() > 1000 {
            self.error_count.fetch_add(1, Ordering::Relaxed);
            return Err("Data too large".into());
        }

        let result = format!("Processed: {}", data);
        let duration = start.elapsed();
        self.total_duration.fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
        info!("Successfully processed request, duration: {:?}", duration);
        Ok(result)
    }

    pub fn get_stats(&self) -> ServiceStats {
        let request_count = self.request_count.load(Ordering::Relaxed);
        let error_count = self.error_count.load(Ordering::Relaxed);
        let total_duration = self.total_duration.load(Ordering::Relaxed);

        ServiceStats {
            service_name: self.service_name.clone(),
            request_count,
            error_count,
            error_rate: if request_count > 0 {
                error_count as f64 / request_count as f64 * 100.0
            } else {
                0.0
            },
            avg_duration: if request_count > 0 {
                std::time::Duration::from_nanos(total_duration / request_count)
            } else {
                std::time::Duration::from_secs(0)
            },
        }
    }
}

#[derive(Debug, Clone)]
pub struct ServiceStats {
    pub service_name: String,
    pub request_count: u64,
    pub error_count: u64,
    pub error_rate: f64,
    pub avg_duration: std::time::Duration,
}

/// Initialize tracing with both a human-readable and a JSON layer
pub fn init_tracing() {
    tracing_subscriber::registry()
        .with(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| "tracing_performance=debug,perf_tools=info,tokio=warn".into()),
        )
        .with(tracing_subscriber::fmt::layer())
        .with(
            // JSON output for log aggregation and analysis
            tracing_subscriber::fmt::layer()
                .json()
                .with_current_span(false)
                .with_span_list(false),
        )
        .init();
}
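A short sketch of driving TracedService, assuming a Tokio runtime and the init_tracing helper above; the payloads are illustrative:

use tracing_integration::{init_tracing, TracedService};

#[tokio::main]
async fn main() {
    init_tracing();
    let service = TracedService::new("demo-service");

    let oversized = "x".repeat(2000); // exercises the error path
    for payload in ["hello", "world", oversized.as_str()] {
        let _ = service.process_request(payload).await;
    }

    let stats = service.get_stats();
    println!(
        "{}: {} requests, {:.1}% errors, avg {:?}",
        stats.service_name, stats.request_count, stats.error_rate, stats.avg_duration
    );
}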
13.2 Memory Optimization
13.2.1 Memory Management Fundamentals
Rust's ownership system provides a powerful foundation for memory optimization:
// Memory pool implementation
// File: memory-pools/src/lib.rs
use std::alloc::{GlobalAlloc, Layout, System};
use std::marker::PhantomData;
use std::ptr::NonNull;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use tracing::{debug, info};

/// Pre-allocated memory pool. Blocks are handed out bump-allocator style and
/// are only returned to the system when the pool itself is dropped.
pub struct MemoryPool {
    pool: Vec<NonNull<u8>>,
    current_index: AtomicUsize,
    block_size: usize,
    total_blocks: usize,
    allocated: AtomicUsize,
}

unsafe impl Send for MemoryPool {}
unsafe impl Sync for MemoryPool {}

impl MemoryPool {
    pub fn new(block_size: usize, total_blocks: usize) -> Self {
        // Round the block size up to a 16-byte boundary
        let aligned_block_size = (block_size + 15) & !15;
        let mut pool = Vec::with_capacity(total_blocks);

        for _ in 0..total_blocks {
            let layout = Layout::from_size_align(aligned_block_size, 16).expect("Invalid layout");
            unsafe {
                let ptr = System.alloc(layout);
                if ptr.is_null() {
                    panic!("Failed to allocate memory for pool");
                }
                pool.push(NonNull::new_unchecked(ptr));
            }
        }

        info!("Created memory pool: {} blocks of {} bytes", total_blocks, aligned_block_size);

        MemoryPool {
            pool,
            current_index: AtomicUsize::new(0),
            block_size: aligned_block_size,
            total_blocks,
            allocated: AtomicUsize::new(0),
        }
    }

    pub fn allocate(&self) -> Option<NonNull<u8>> {
        let index = self.current_index.fetch_add(1, Ordering::SeqCst);
        if index < self.total_blocks {
            self.allocated.fetch_add(1, Ordering::SeqCst);
            Some(self.pool[index])
        } else {
            None // pool exhausted
        }
    }

    pub fn block_size(&self) -> usize {
        self.block_size
    }

    pub fn is_exhausted(&self) -> bool {
        self.allocated.load(Ordering::SeqCst) >= self.total_blocks
    }

    pub fn get_stats(&self) -> PoolStats {
        let allocated = self.allocated.load(Ordering::SeqCst);
        PoolStats {
            total_blocks: self.total_blocks,
            allocated,
            block_size: self.block_size,
            utilization: allocated as f64 / self.total_blocks as f64 * 100.0,
        }
    }
}

impl Drop for MemoryPool {
    fn drop(&mut self) {
        let layout = Layout::from_size_align(self.block_size, 16).unwrap();
        for ptr in &self.pool {
            unsafe {
                System.dealloc(ptr.as_ptr(), layout);
            }
        }
        debug!("Memory pool dropped, returned {} blocks to system", self.total_blocks);
    }
}

#[derive(Debug, Clone)]
pub struct PoolStats {
    pub total_blocks: usize,
    pub allocated: usize,
    pub block_size: usize,
    pub utilization: f64,
}

/// Smart-pointer wrapper that places a value in a pool-provided block
pub struct PooledBox<T> {
    ptr: NonNull<T>,
    _phantom: PhantomData<T>,
}

impl<T> PooledBox<T> {
    pub fn new_in(pool: &MemoryPool, value: T) -> Option<Self> {
        // The value must fit in one pool block (blocks are 16-byte aligned)
        assert!(std::mem::size_of::<T>() <= pool.block_size());
        assert!(std::mem::align_of::<T>() <= 16);
        let ptr = pool.allocate()?;
        let typed = ptr.cast::<T>();
        unsafe {
            typed.as_ptr().write(value);
        }
        Some(PooledBox {
            ptr: typed,
            _phantom: PhantomData,
        })
    }

    pub fn as_ref(&self) -> &T {
        unsafe { &*self.ptr.as_ptr() }
    }

    pub fn as_mut(&mut self) -> &mut T {
        unsafe { &mut *self.ptr.as_ptr() }
    }
}

impl<T> std::ops::Deref for PooledBox<T> {
    type Target = T;
    fn deref(&self) -> &Self::Target {
        self.as_ref()
    }
}

impl<T> std::ops::DerefMut for PooledBox<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.as_mut()
    }
}

impl<T> Drop for PooledBox<T> {
    fn drop(&mut self) {
        // Run the destructor; the block itself stays with the pool
        unsafe {
            std::ptr::drop_in_place(self.ptr.as_ptr());
        }
    }
}

unsafe impl<T: Send> Send for PooledBox<T> {}
unsafe impl<T: Sync> Sync for PooledBox<T> {}

/// Manager that routes allocations to a small- or large-object pool by size
pub struct PoolManager {
    small_object_pool: MemoryPool,
    large_object_pool: MemoryPool,
}

impl PoolManager {
    pub fn new() -> Self {
        PoolManager {
            // Small objects: 1024 blocks of 128 bytes
            small_object_pool: MemoryPool::new(128, 1024),
            // Large objects: 256 blocks of 1024 bytes
            large_object_pool: MemoryPool::new(1024, 256),
        }
    }

    pub fn allocate<T>(&self, value: T) -> Option<PooledBox<T>> {
        let size = std::mem::size_of::<T>();
        if size <= 128 {
            PooledBox::new_in(&self.small_object_pool, value)
        } else if size <= 1024 {
            PooledBox::new_in(&self.large_object_pool, value)
        } else {
            None
        }
    }

    pub fn get_pool_stats(&self) -> (PoolStats, PoolStats) {
        (self.small_object_pool.get_stats(), self.large_object_pool.get_stats())
    }
}

/// Object pool pattern: recycles whole objects instead of raw memory
pub struct ObjectPool<T: Default> {
    available: Mutex<Vec<T>>,
    in_use: AtomicUsize,
    max_size: usize,
}

impl<T: Default + Send> ObjectPool<T> {
    pub fn new(max_size: usize) -> Self {
        ObjectPool {
            available: Mutex::new(Vec::with_capacity(max_size)),
            in_use: AtomicUsize::new(0),
            max_size,
        }
    }

    pub fn get(&self) -> Option<PooledObject<'_, T>> {
        if self.in_use.load(Ordering::SeqCst) >= self.max_size {
            return None;
        }
        let mut available = self.available.lock().unwrap();
        let object = available.pop().unwrap_or_default();
        self.in_use.fetch_add(1, Ordering::SeqCst);
        Some(PooledObject { object, pool: self })
    }

    pub fn get_stats(&self) -> PoolStats {
        let available = self.available.lock().unwrap().len();
        let in_use = self.in_use.load(Ordering::SeqCst);
        PoolStats {
            total_blocks: self.max_size,
            allocated: available + in_use,
            block_size: std::mem::size_of::<T>(),
            utilization: (available + in_use) as f64 / self.max_size as f64 * 100.0,
        }
    }
}

/// RAII guard that returns the object to its pool on drop
pub struct PooledObject<'a, T: Default> {
    object: T,
    pool: &'a ObjectPool<T>,
}

impl<'a, T: Default> std::ops::Deref for PooledObject<'a, T> {
    type Target = T;
    fn deref(&self) -> &Self::Target {
        &self.object
    }
}

impl<'a, T: Default> std::ops::DerefMut for PooledObject<'a, T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.object
    }
}

impl<'a, T: Default> Drop for PooledObject<'a, T> {
    fn drop(&mut self) {
        // The recycled object keeps its state (and any capacity);
        // callers reset it as needed when checking it out again
        let object = std::mem::take(&mut self.object);
        let mut available = self.pool.available.lock().unwrap();
        available.push(object);
        self.pool.in_use.fetch_sub(1, Ordering::SeqCst);
    }
}
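A minimal sketch of the object pool in use, assuming the memory-pools crate above; reusing a Vec's allocation across checkouts is the typical win:

use memory_pools::ObjectPool;

fn main() {
    // Pool of up to 4 reusable byte buffers
    let pool: ObjectPool<Vec<u8>> = ObjectPool::new(4);

    {
        let mut buf = pool.get().expect("pool exhausted");
        buf.clear(); // caller resets recycled state
        buf.extend_from_slice(b"hello");
        assert_eq!(&buf[..], b"hello");
    } // dropped here: the Vec (and its capacity) returns to the pool

    let stats = pool.get_stats();
    println!("{} of {} objects in circulation", stats.allocated, stats.total_blocks);
}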
13.2.2 Zero-Copy Optimization
// Zero-copy string handling
// File: zero-copy/src/lib.rs
use std::borrow::Cow;
use std::ops::Deref;

/// Zero-copy string wrapper: borrows static data, owns dynamic data
pub struct ZeroCopyStr {
    data: Cow<'static, str>,
}

impl ZeroCopyStr {
    pub fn new_static(data: &'static str) -> Self {
        ZeroCopyStr { data: Cow::Borrowed(data) }
    }

    pub fn new_owned(data: String) -> Self {
        ZeroCopyStr { data: Cow::Owned(data) }
    }

    pub fn as_str(&self) -> &str {
        &self.data
    }

    pub fn to_cow(&self) -> Cow<'_, str> {
        // Borrowed variants are copied cheaply; owned data is cloned
        self.data.clone()
    }
}

impl Deref for ZeroCopyStr {
    type Target = str;
    fn deref(&self) -> &Self::Target {
        &self.data
    }
}

impl AsRef<str> for ZeroCopyStr {
    fn as_ref(&self) -> &str {
        &self.data
    }
}

impl PartialEq for ZeroCopyStr {
    fn eq(&self, other: &Self) -> bool {
        self.data == other.data
    }
}

/// Transparent wrapper for values deserialized without copying
pub struct ZeroCopyJson<T> {
    data: T,
}

impl<T> ZeroCopyJson<T> {
    pub fn new(data: T) -> Self {
        ZeroCopyJson { data }
    }

    pub fn into_inner(self) -> T {
        self.data
    }
}

impl<T> Deref for ZeroCopyJson<T> {
    type Target = T;
    fn deref(&self) -> &Self::Target {
        &self.data
    }
}

/// Byte buffer pool: reuses allocations instead of allocating per request
pub struct ByteBufferPool {
    pool: std::sync::Mutex<Vec<Vec<u8>>>,
    buffer_size: usize,
    max_buffers: usize,
}

impl ByteBufferPool {
    pub fn new(buffer_size: usize, max_buffers: usize) -> Self {
        ByteBufferPool {
            pool: std::sync::Mutex::new(Vec::with_capacity(max_buffers)),
            buffer_size,
            max_buffers,
        }
    }

    pub fn get_buffer(&self) -> Vec<u8> {
        if let Ok(mut pool) = self.pool.lock() {
            if let Some(mut buffer) = pool.pop() {
                // Recycled buffers were cleared on return; restore full length
                buffer.resize(self.buffer_size, 0);
                return buffer;
            }
        }
        vec![0; self.buffer_size]
    }

    pub fn return_buffer(&self, mut buffer: Vec<u8>) {
        // Capacity is what matters; the length is restored on checkout
        if buffer.capacity() >= self.buffer_size {
            if let Ok(mut pool) = self.pool.lock() {
                if pool.len() < self.max_buffers {
                    buffer.clear();
                    pool.push(buffer);
                }
            }
        }
    }
}
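A brief sketch of the buffer pool in a read loop; the pool size and buffer length are illustrative:

use zero_copy::ByteBufferPool;

fn main() {
    // 8 KiB buffers, at most 32 kept alive in the pool
    let pool = ByteBufferPool::new(8 * 1024, 32);

    for _ in 0..3 {
        let mut buf = pool.get_buffer();
        // ... fill `buf` from a socket or file here ...
        buf[0] = 42;
        pool.return_buffer(buf); // recycled instead of freed
    }
}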
13.3 Concurrency Performance Optimization
13.3.1 Async Programming Optimization
// High-performance async processing
// File: async-optimization/src/lib.rs
use futures::future::BoxFuture;
use std::sync::atomic::{AtomicPtr, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use tokio::sync::Semaphore;
use tokio::task::JoinHandle;
use tracing::{info, instrument};

/// Async task executor with bounded concurrency
pub struct AsyncTaskExecutor {
    task_semaphore: Arc<Semaphore>,
    active_tasks: Arc<AtomicU64>,
    completed_tasks: Arc<AtomicU64>,
    failed_tasks: Arc<AtomicU64>,
}

impl AsyncTaskExecutor {
    pub fn new(max_concurrent_tasks: usize) -> Self {
        AsyncTaskExecutor {
            task_semaphore: Arc::new(Semaphore::new(max_concurrent_tasks)),
            active_tasks: Arc::new(AtomicU64::new(0)),
            completed_tasks: Arc::new(AtomicU64::new(0)),
            failed_tasks: Arc::new(AtomicU64::new(0)),
        }
    }

    #[instrument(skip(self, task))]
    pub async fn execute<F, T>(&self, task: F) -> Result<T, Box<dyn std::error::Error + Send + Sync>>
    where
        F: FnOnce() -> BoxFuture<'static, Result<T, Box<dyn std::error::Error + Send + Sync>>>
            + Send
            + 'static,
        T: Send + 'static,
    {
        // Bound concurrency; the permit is released when this call returns
        let _permit = self.task_semaphore.acquire().await?;
        self.active_tasks.fetch_add(1, Ordering::Relaxed);

        let start_time = Instant::now();
        let active_tasks = Arc::clone(&self.active_tasks);
        let completed_tasks = Arc::clone(&self.completed_tasks);
        let failed_tasks = Arc::clone(&self.failed_tasks);

        let result = tokio::spawn(async move {
            let task_result = task().await;
            info!("Task completed in {:?}", start_time.elapsed());

            // Update the statistics
            active_tasks.fetch_sub(1, Ordering::Relaxed);
            match &task_result {
                Ok(_) => {
                    completed_tasks.fetch_add(1, Ordering::Relaxed);
                }
                Err(_) => {
                    failed_tasks.fetch_add(1, Ordering::Relaxed);
                }
            }
            task_result
        })
        .await;

        match result {
            Ok(task_result) => task_result,
            Err(join_error) => {
                self.active_tasks.fetch_sub(1, Ordering::Relaxed);
                self.failed_tasks.fetch_add(1, Ordering::Relaxed);
                Err(Box::new(join_error))
            }
        }
    }

    /// NOTE: spawns everything at once; route through execute() for
    /// semaphore-bounded concurrency
    pub async fn batch_execute<F, T>(
        &self,
        tasks: Vec<F>,
    ) -> Vec<Result<T, Box<dyn std::error::Error + Send + Sync>>>
    where
        F: FnOnce() -> BoxFuture<'static, Result<T, Box<dyn std::error::Error + Send + Sync>>>
            + Send
            + 'static,
        T: Send + 'static,
    {
        let mut handles: Vec<JoinHandle<_>> = Vec::with_capacity(tasks.len());
        for task in tasks {
            handles.push(tokio::spawn(async move { task().await }));
        }

        let mut results = Vec::with_capacity(handles.len());
        for handle in handles {
            match handle.await {
                Ok(result) => results.push(result),
                Err(join_error) => results.push(Err(Box::new(join_error))),
            }
        }
        results
    }

    pub fn get_stats(&self) -> TaskStats {
        TaskStats {
            active_tasks: self.active_tasks.load(Ordering::Relaxed),
            completed_tasks: self.completed_tasks.load(Ordering::Relaxed),
            failed_tasks: self.failed_tasks.load(Ordering::Relaxed),
            max_concurrent: self.task_semaphore.available_permits()
                + self.active_tasks.load(Ordering::Relaxed) as usize,
        }
    }
}

#[derive(Debug, Clone)]
pub struct TaskStats {
    pub active_tasks: u64,
    pub completed_tasks: u64,
    pub failed_tasks: u64,
    pub max_concurrent: usize,
}

/// Lock-free MPMC queue (Michael-Scott style).
///
/// NOTE: dequeued nodes are intentionally leaked. Reclaiming them safely under
/// concurrent access requires hazard pointers or epoch-based reclamation;
/// production code should use crossbeam's SegQueue instead.
pub struct LockFreeQueue<T> {
    head: AtomicPtr<Node<T>>,
    tail: AtomicPtr<Node<T>>,
}

struct Node<T> {
    value: Option<T>,
    next: AtomicPtr<Node<T>>,
}

impl<T> Node<T> {
    fn into_raw(value: Option<T>) -> *mut Self {
        Box::into_raw(Box::new(Node {
            value,
            next: AtomicPtr::new(std::ptr::null_mut()),
        }))
    }
}

unsafe impl<T: Send> Send for LockFreeQueue<T> {}
unsafe impl<T: Send> Sync for LockFreeQueue<T> {}

impl<T> LockFreeQueue<T> {
    pub fn new() -> Self {
        // A dummy node keeps head and tail always non-null
        let dummy = Node::into_raw(None);
        LockFreeQueue {
            head: AtomicPtr::new(dummy),
            tail: AtomicPtr::new(dummy),
        }
    }

    pub fn enqueue(&self, value: T) {
        let new_node = Node::into_raw(Some(value));
        loop {
            let tail = self.tail.load(Ordering::Acquire);
            let next = unsafe { (*tail).next.load(Ordering::Acquire) };
            if next.is_null() {
                // Try to link the new node after the current tail
                let linked = unsafe {
                    (*tail).next.compare_exchange(
                        std::ptr::null_mut(),
                        new_node,
                        Ordering::Release,
                        Ordering::Relaxed,
                    )
                };
                if linked.is_ok() {
                    // Swing the tail forward (failure means another thread helped)
                    let _ = self.tail.compare_exchange(
                        tail, new_node, Ordering::Release, Ordering::Relaxed);
                    return;
                }
            } else {
                // Tail is lagging; help it forward and retry
                let _ = self.tail.compare_exchange(
                    tail, next, Ordering::Release, Ordering::Relaxed);
            }
        }
    }

    pub fn dequeue(&self) -> Option<T> {
        loop {
            let head = self.head.load(Ordering::Acquire);
            let next = unsafe { (*head).next.load(Ordering::Acquire) };
            if next.is_null() {
                return None; // queue empty
            }
            if self
                .head
                .compare_exchange(head, next, Ordering::Release, Ordering::Relaxed)
                .is_ok()
            {
                // `next` is the new dummy; only the CAS winner touches its value.
                // The old dummy `head` is leaked (see the type-level note).
                return unsafe { (*next).value.take() };
            }
        }
    }
}

/// Work-stealing scheduler sketch: each queue has an owner, and idle workers
/// steal from the opposite end of other queues.
pub struct WorkStealingScheduler {
    queues: Vec<Arc<Mutex<Vec<Box<dyn FnOnce() + Send>>>>>,
    next_queue: AtomicUsize,
    num_queues: usize,
}

impl WorkStealingScheduler {
    pub fn new(num_queues: usize) -> Self {
        let mut queues = Vec::with_capacity(num_queues);
        for _ in 0..num_queues {
            queues.push(Arc::new(Mutex::new(Vec::new())));
        }
        WorkStealingScheduler {
            queues,
            next_queue: AtomicUsize::new(0),
            num_queues,
        }
    }

    pub fn schedule<F>(&self, work: F)
    where
        F: FnOnce() + Send + 'static,
    {
        // Round-robin placement across the queues
        let queue_index = self.next_queue.fetch_add(1, Ordering::Relaxed) % self.num_queues;
        let mut queue = self.queues[queue_index].lock().unwrap();
        queue.push(Box::new(work));
    }

    pub fn execute_one(&self, queue_index: usize) -> bool {
        let mut queue = self.queues[queue_index].lock().unwrap();
        if let Some(work) = queue.pop() {
            drop(queue); // release the lock before running the task
            work();
            true
        } else {
            false
        }
    }

    pub fn steal_work(&self, victim_index: usize) -> bool {
        let mut victim_queue = self.queues[victim_index].lock().unwrap();
        if victim_queue.is_empty() {
            return false;
        }
        // Steal from the front, opposite the owner's pop-from-back end
        let work = victim_queue.remove(0);
        drop(victim_queue);
        work();
        true
    }
}
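A minimal sketch of the executor in use, assuming the async-optimization crate above; the workload is illustrative:

use async_optimization::AsyncTaskExecutor;
use futures::FutureExt;

#[tokio::main]
async fn main() {
    // At most 8 tasks in flight at once
    let executor = AsyncTaskExecutor::new(8);

    let result = executor
        .execute(|| {
            async {
                tokio::time::sleep(std::time::Duration::from_millis(5)).await;
                Ok::<_, Box<dyn std::error::Error + Send + Sync>>(42u32)
            }
            .boxed()
        })
        .await;
    assert_eq!(result.unwrap(), 42);

    let stats = executor.get_stats();
    println!("completed: {}, failed: {}", stats.completed_tasks, stats.failed_tasks);
}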
13.3.2 Concurrency Pattern Optimization
// A high-performance take on the Actor pattern
// File: actor-model/src/lib.rs
use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::{mpsc as async_mpsc, oneshot};
use tracing::{info, warn};

pub type BoxFuture<T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + Send>>;

/// Async actor system
pub struct ActorSystem {
    actors: Arc<Mutex<HashMap<String, Arc<dyn Actor + Send + Sync>>>>,
    message_router: MessageRouter,
}

impl ActorSystem {
    pub fn new() -> Self {
        ActorSystem {
            actors: Arc::new(Mutex::new(HashMap::new())),
            message_router: MessageRouter::new(),
        }
    }

    pub fn register_actor<A: Actor + Send + Sync + 'static>(&self, id: String, actor: A) {
        info!("Actor registered: {}", id);
        let mut actors = self.actors.lock().unwrap();
        actors.insert(id, Arc::new(actor));
    }

    pub async fn send_message<M>(&self, actor_id: &str, message: M) -> Result<M::Response, ActorError>
    where
        M: Message + Send + 'static,
        M::Response: Send + 'static,
    {
        // Grab the actor and release the lock before awaiting
        let actor = {
            let actors = self.actors.lock().unwrap();
            actors.get(actor_id).cloned()
        };
        let Some(actor) = actor else {
            return Err(ActorError::ActorNotFound);
        };

        let (tx, rx) = oneshot::channel();
        let actor_message = ActorMessage {
            payload: Box::new(message),
            response_sender: Some(tx),
        };
        actor.handle_message(actor_message).await?;

        match rx.await {
            // Responses travel as Box<dyn Any>; downcast back to the typed response
            Ok(response) => response
                .downcast::<M::Response>()
                .map(|boxed| *boxed)
                .map_err(|_| ActorError::MessageHandlingFailed("response type mismatch".into())),
            Err(_) => Err(ActorError::ChannelClosed),
        }
    }
}

#[derive(Debug, thiserror::Error)]
pub enum ActorError {
    #[error("Actor not found")]
    ActorNotFound,
    #[error("Channel closed")]
    ChannelClosed,
    #[error("Message handling failed: {0}")]
    MessageHandlingFailed(String),
}

/// A typed message and its response type
pub trait Message {
    type Response: Send;
}

/// Envelope delivered to an actor: an untyped payload plus a reply channel
pub struct ActorMessage {
    pub payload: Box<dyn Any + Send>,
    response_sender: Option<oneshot::Sender<Box<dyn Any + Send>>>,
}

impl ActorMessage {
    pub fn send_response<T: Any + Send>(&mut self, response: T) -> Result<(), ActorError> {
        // The oneshot sender is consumed by the first response
        if let Some(sender) = self.response_sender.take() {
            let _ = sender.send(Box::new(response));
            Ok(())
        } else {
            Err(ActorError::MessageHandlingFailed("No response sender".to_string()))
        }
    }
}

pub trait Actor {
    fn handle_message(&self, message: ActorMessage) -> BoxFuture<Result<(), ActorError>>;
}

/// Message router (pattern -> actor id)
struct MessageRouter {
    routes: Arc<Mutex<HashMap<String, String>>>,
}

impl MessageRouter {
    fn new() -> Self {
        MessageRouter {
            routes: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    fn register_route(&self, pattern: String, actor_id: String) {
        let mut routes = self.routes.lock().unwrap();
        routes.insert(pattern, actor_id);
    }
}

/// High-performance event-driven system
pub struct EventDrivenSystem {
    event_tx: async_mpsc::Sender<Event>,
    event_rx: Mutex<Option<async_mpsc::Receiver<Event>>>,
    subscribers: Arc<Mutex<HashMap<String, Vec<Arc<dyn EventHandler + Send + Sync>>>>>,
    event_history: Arc<Mutex<Vec<Event>>>,
    max_history: usize,
}

/// Payload is shared behind an Arc so events stay cheaply cloneable
#[derive(Clone)]
pub struct Event {
    pub id: String,
    pub event_type: String,
    pub payload: Arc<dyn Any + Send + Sync>,
    pub timestamp: std::time::Instant,
    pub source: String,
}

impl Event {
    pub fn new<T: Any + Send + Sync>(event_type: String, payload: T, source: String) -> Self {
        Event {
            id: uuid::Uuid::new_v4().to_string(),
            event_type,
            payload: Arc::new(payload),
            timestamp: std::time::Instant::now(),
            source,
        }
    }
}

pub trait EventHandler: Send + Sync {
    fn handle_event(&self, event: &Event) -> BoxFuture<Result<(), EventHandlerError>>;
    fn event_types(&self) -> Vec<String>;
}

#[derive(Debug, thiserror::Error)]
pub enum EventHandlerError {
    #[error("Event handling failed: {0}")]
    HandlingFailed(String),
    #[error("Invalid event type")]
    InvalidEventType,
}

impl EventDrivenSystem {
    pub fn new(buffer_size: usize, max_history: usize) -> Self {
        let (tx, rx) = async_mpsc::channel(buffer_size);
        EventDrivenSystem {
            event_tx: tx,
            event_rx: Mutex::new(Some(rx)),
            subscribers: Arc::new(Mutex::new(HashMap::new())),
            event_history: Arc::new(Mutex::new(Vec::with_capacity(max_history))),
            max_history,
        }
    }

    /// Hand the receiving end of the event bus (once) to a consumer task
    pub fn take_event_receiver(&self) -> Option<async_mpsc::Receiver<Event>> {
        self.event_rx.lock().unwrap().take()
    }

    pub fn subscribe<H: EventHandler + Send + Sync + 'static>(&self, handler: Arc<H>) {
        for event_type in handler.event_types() {
            let mut subscribers = self.subscribers.lock().unwrap();
            subscribers
                .entry(event_type)
                .or_insert_with(Vec::new)
                .push(handler.clone());
        }
    }

    pub async fn publish_event(&self, event: Event) -> Result<(), async_mpsc::error::SendError<Event>> {
        // Record in the bounded history
        {
            let mut history = self.event_history.lock().unwrap();
            history.push(event.clone());
            if history.len() > self.max_history {
                history.remove(0);
            }
        }

        // Fan out to subscribers on separate tasks
        let handlers = {
            let subscribers = self.subscribers.lock().unwrap();
            subscribers.get(&event.event_type).cloned().unwrap_or_default()
        };
        for handler in handlers {
            let event_clone = event.clone();
            tokio::spawn(async move {
                if let Err(e) = handler.handle_event(&event_clone).await {
                    warn!("Event handler failed: {}", e);
                }
            });
        }

        // Forward onto the event bus
        self.event_tx.send(event).await
    }

    pub fn get_event_history(&self) -> Vec<Event> {
        self.event_history.lock().unwrap().clone()
    }
}
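A minimal sketch wiring a handler into the event system above; the handler and event names are illustrative:

use actor_model::{BoxFuture, Event, EventDrivenSystem, EventHandler, EventHandlerError};
use std::sync::Arc;

struct LogHandler;

impl EventHandler for LogHandler {
    fn handle_event(&self, event: &Event) -> BoxFuture<Result<(), EventHandlerError>> {
        let event_type = event.event_type.clone();
        Box::pin(async move {
            println!("handled event of type {}", event_type);
            Ok(())
        })
    }

    fn event_types(&self) -> Vec<String> {
        vec!["user.created".to_string()]
    }
}

#[tokio::main]
async fn main() {
    let system = EventDrivenSystem::new(64, 100);
    system.subscribe(Arc::new(LogHandler));

    let mut rx = system.take_event_receiver().expect("receiver already taken");
    system
        .publish_event(Event::new("user.created".into(), 42u64, "demo".into()))
        .await
        .expect("event bus closed");

    // The raw event is also available on the bus for custom consumers
    let event = rx.recv().await.unwrap();
    println!("bus saw event {} from {}", event.id, event.source);
}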
13.4 Caching Strategies
13.4.1 Multi-Layer Cache Architecture
// Multi-layer cache system
// File: multi-layer-cache/src/lib.rs
use futures::future::BoxFuture;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use tokio::sync::RwLock as AsyncRwLock;
use tracing::{debug, warn};

/// A cache entry with creation time, access statistics, and optional TTL
#[derive(Debug, Clone)]
pub struct CacheEntry<T> {
    pub data: T,
    pub created_at: Instant,
    pub access_count: u64,
    pub last_accessed: Instant,
    pub ttl: Option<Duration>,
}

impl<T> CacheEntry<T> {
    pub fn new(data: T, ttl: Option<Duration>) -> Self {
        let now = Instant::now();
        CacheEntry {
            data,
            created_at: now,
            access_count: 0,
            last_accessed: now,
            ttl,
        }
    }

    pub fn is_expired(&self) -> bool {
        match self.ttl {
            Some(ttl) => self.created_at + ttl < Instant::now(),
            None => false,
        }
    }

    pub fn access(&mut self) -> &T {
        self.access_count += 1;
        self.last_accessed = Instant::now();
        &self.data
    }
}

/// L1 cache: in-process memory for the hottest data
pub struct L1Cache<K, V> {
    data: Arc<RwLock<HashMap<K, CacheEntry<V>>>>,
    max_size: usize,
    hit_count: AtomicU64,
    miss_count: AtomicU64,
}

impl<K, V> L1Cache<K, V>
where
    K: Clone + std::hash::Hash + Eq + Send + Sync,
    V: Clone + Send + Sync,
{
    pub fn new(max_size: usize) -> Self {
        L1Cache {
            data: Arc::new(RwLock::new(HashMap::new())),
            max_size,
            hit_count: AtomicU64::new(0),
            miss_count: AtomicU64::new(0),
        }
    }

    pub async fn get(&self, key: &K) -> Option<V> {
        // A write lock is needed because a hit updates the entry's access stats
        let mut data = self.data.write().unwrap();
        let expired = match data.get_mut(key) {
            Some(entry) if !entry.is_expired() => {
                self.hit_count.fetch_add(1, Ordering::Relaxed);
                return Some(entry.access().clone());
            }
            Some(_) => true,
            None => false,
        };
        if expired {
            data.remove(key); // evict the expired entry
        }
        self.miss_count.fetch_add(1, Ordering::Relaxed);
        None
    }

    pub async fn set(&self, key: K, value: V, ttl: Option<Duration>) {
        let mut data = self.data.write().unwrap();
        // When full, evict the least recently used entry
        if data.len() >= self.max_size && !data.contains_key(&key) {
            if let Some(lru) = Self::find_lru_key(&data) {
                data.remove(&lru);
            }
        }
        data.insert(key, CacheEntry::new(value, ttl));
    }

    pub async fn remove(&self, key: &K) -> Option<V> {
        let mut data = self.data.write().unwrap();
        data.remove(key).map(|entry| entry.data)
    }

    pub async fn clear(&self) {
        self.data.write().unwrap().clear();
    }

    fn find_lru_key(data: &HashMap<K, CacheEntry<V>>) -> Option<K> {
        data.iter()
            .min_by_key(|(_, entry)| entry.last_accessed)
            .map(|(key, _)| key.clone())
    }

    pub fn get_stats(&self) -> CacheStats {
        let hit_count = self.hit_count.load(Ordering::Relaxed);
        let miss_count = self.miss_count.load(Ordering::Relaxed);
        let total = hit_count + miss_count;
        CacheStats {
            hit_count,
            miss_count,
            hit_rate: if total > 0 { hit_count as f64 / total as f64 } else { 0.0 },
            size: self.data.read().unwrap().len(),
            max_size: self.max_size,
        }
    }
}

/// L2 cache: distributed (Redis) cache for warm data
pub struct L2Cache<K, V> {
    client: Arc<redis::Client>,
    key_prefix: String,
    default_ttl: Duration,
    hit_count: AtomicU64,
    miss_count: AtomicU64,
    _phantom: PhantomData<fn(K) -> V>,
}

impl<K, V> L2Cache<K, V>
where
    K: std::fmt::Display + Send + Sync,
    V: Serialize + for<'de> Deserialize<'de> + Send + Sync,
{
    pub fn new(client: redis::Client, key_prefix: String, default_ttl: Duration) -> Self {
        L2Cache {
            client: Arc::new(client),
            key_prefix,
            default_ttl,
            hit_count: AtomicU64::new(0),
            miss_count: AtomicU64::new(0),
            _phantom: PhantomData,
        }
    }

    pub async fn get(&self, key: &K) -> Option<V> {
        let redis_key = format!("{}:{}", self.key_prefix, key);
        let mut conn = match self.client.get_async_connection().await {
            Ok(conn) => conn,
            Err(e) => {
                warn!("Failed to connect to Redis: {}", e);
                self.miss_count.fetch_add(1, Ordering::Relaxed);
                return None;
            }
        };
        match redis::cmd("GET")
            .arg(&redis_key)
            .query_async::<_, Option<String>>(&mut conn)
            .await
        {
            Ok(Some(data)) => match serde_json::from_str(&data) {
                Ok(value) => {
                    self.hit_count.fetch_add(1, Ordering::Relaxed);
                    Some(value)
                }
                Err(_) => {
                    warn!("Failed to deserialize cache data for key: {}", key);
                    self.miss_count.fetch_add(1, Ordering::Relaxed);
                    None
                }
            },
            Ok(None) => {
                self.miss_count.fetch_add(1, Ordering::Relaxed);
                None
            }
            Err(e) => {
                warn!("Redis get error: {}", e);
                self.miss_count.fetch_add(1, Ordering::Relaxed);
                None
            }
        }
    }

    pub async fn set(&self, key: K, value: V, ttl: Option<Duration>) {
        let redis_key = format!("{}:{}", self.key_prefix, key);
        let ttl_secs = ttl.unwrap_or(self.default_ttl).as_secs();
        match self.client.get_async_connection().await {
            Ok(mut conn) => {
                if let Ok(data) = serde_json::to_string(&value) {
                    let _ = redis::cmd("SETEX")
                        .arg(&redis_key)
                        .arg(ttl_secs)
                        .arg(&data)
                        .query_async::<_, ()>(&mut conn)
                        .await
                        .map_err(|e| warn!("Redis set error: {}", e));
                }
            }
            Err(e) => warn!("Failed to connect to Redis: {}", e),
        }
    }

    pub async fn remove(&self, key: &K) {
        let redis_key = format!("{}:{}", self.key_prefix, key);
        match self.client.get_async_connection().await {
            Ok(mut conn) => {
                let _ = redis::cmd("DEL")
                    .arg(&redis_key)
                    .query_async::<_, ()>(&mut conn)
                    .await
                    .map_err(|e| warn!("Redis delete error: {}", e));
            }
            Err(e) => warn!("Failed to connect to Redis: {}", e),
        }
    }

    pub fn get_stats(&self) -> CacheStats {
        let hit_count = self.hit_count.load(Ordering::Relaxed);
        let miss_count = self.miss_count.load(Ordering::Relaxed);
        let total = hit_count + miss_count;
        CacheStats {
            hit_count,
            miss_count,
            hit_rate: if total > 0 { hit_count as f64 / total as f64 } else { 0.0 },
            size: 0,     // sizing Redis requires a separate query (e.g. DBSIZE)
            max_size: 0,
        }
    }
}

/// L3 cache: a read-through layer in front of the data source (cold data)
pub struct L3Cache<K, V, F> {
    data_fetcher: F,
    cache: Arc<AsyncRwLock<HashMap<K, CacheEntry<V>>>>,
    ttl: Duration,
}

impl<K, V, F> L3Cache<K, V, F>
where
    K: Clone + std::hash::Hash + Eq + Send + Sync,
    V: Clone + Send + Sync,
    F: Fn(K) -> BoxFuture<'static, V> + Send + Sync,
{
    pub fn new(data_fetcher: F, ttl: Duration) -> Self {
        L3Cache {
            data_fetcher,
            cache: Arc::new(AsyncRwLock::new(HashMap::new())),
            ttl,
        }
    }

    pub async fn get(&self, key: K) -> V {
        {
            let cache = self.cache.read().await;
            if let Some(entry) = cache.get(&key) {
                if !entry.is_expired() {
                    return entry.data.clone();
                }
            }
        }

        // Cache miss: fetch from the data source and cache the result
        debug!("L3 cache miss, fetching from data source");
        let value = (self.data_fetcher)(key.clone()).await;
        let mut cache = self.cache.write().await;
        cache.insert(key, CacheEntry::new(value.clone(), Some(self.ttl)));
        value
    }

    pub async fn invalidate(&self, key: &K) {
        self.cache.write().await.remove(key);
    }

    pub async fn clear(&self) {
        self.cache.write().await.clear();
    }
}

/// Type of the boxed fetcher used by the L3 layer
pub type DataFetcher<K, V> = Box<dyn Fn(K) -> BoxFuture<'static, V> + Send + Sync>;

/// The multi-layer cache facade
pub struct MultiLayerCache<K, V> {
    l1: Option<Arc<L1Cache<K, V>>>,
    l2: Option<Arc<L2Cache<K, V>>>,
    l3: Option<Arc<L3Cache<K, V, DataFetcher<K, V>>>>,
    fallback_strategy: FallbackStrategy,
}

#[derive(Debug, Clone)]
pub enum FallbackStrategy {
    L1Only,
    L1L2,
    AllLayers,
}

impl<K, V> MultiLayerCache<K, V>
where
    K: Clone + std::hash::Hash + Eq + std::fmt::Display + Send + Sync,
    V: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync,
{
    pub fn new(
        l1_size: Option<usize>,
        redis_client: Option<redis::Client>,
        data_fetcher: Option<DataFetcher<K, V>>,
        fallback_strategy: FallbackStrategy,
    ) -> Self {
        let l1 = l1_size.map(|size| Arc::new(L1Cache::new(size)));
        let l2 = redis_client.map(|client| {
            Arc::new(L2Cache::new(client, "cache".to_string(), Duration::from_secs(3600)))
        });
        let l3 =
            data_fetcher.map(|fetcher| Arc::new(L3Cache::new(fetcher, Duration::from_secs(7200))));

        MultiLayerCache { l1, l2, l3, fallback_strategy }
    }

    pub async fn get(&self, key: K) -> Option<V> {
        match self.fallback_strategy {
            FallbackStrategy::L1Only => {
                if let Some(l1) = &self.l1 {
                    l1.get(&key).await
                } else {
                    None
                }
            }
            FallbackStrategy::L1L2 => {
                // Try L1 first
                if let Some(l1) = &self.l1 {
                    if let Some(value) = l1.get(&key).await {
                        return Some(value);
                    }
                }
                // L1 miss: try L2 and promote a hit into L1
                if let Some(l2) = &self.l2 {
                    if let Some(value) = l2.get(&key).await {
                        if let Some(l1) = &self.l1 {
                            l1.set(key.clone(), value.clone(), Some(Duration::from_secs(300))).await;
                        }
                        return Some(value);
                    }
                }
                None
            }
            FallbackStrategy::AllLayers => {
                // Try each layer in turn
                if let Some(l1) = &self.l1 {
                    if let Some(value) = l1.get(&key).await {
                        return Some(value);
                    }
                }
                if let Some(l2) = &self.l2 {
                    if let Some(value) = l2.get(&key).await {
                        if let Some(l1) = &self.l1 {
                            l1.set(key.clone(), value.clone(), Some(Duration::from_secs(300))).await;
                        }
                        return Some(value);
                    }
                }
                // Final layer: read through to the data source
                if let Some(l3) = &self.l3 {
                    let value = l3.get(key.clone()).await;
                    // Backfill the upper layers
                    if let Some(l1) = &self.l1 {
                        l1.set(key.clone(), value.clone(), Some(Duration::from_secs(300))).await;
                    }
                    if let Some(l2) = &self.l2 {
                        l2.set(key.clone(), value.clone(), Some(Duration::from_secs(3600))).await;
                    }
                    return Some(value);
                }
                None
            }
        }
    }

    pub async fn set(&self, key: K, value: V, ttl: Option<Duration>) {
        // Write to every cache layer
        if let Some(l1) = &self.l1 {
            l1.set(key.clone(), value.clone(), ttl).await;
        }
        if let Some(l2) = &self.l2 {
            l2.set(key.clone(), value.clone(), ttl).await;
        }
    }

    pub async fn remove(&self, key: &K) {
        if let Some(l1) = &self.l1 {
            l1.remove(key).await;
        }
        if let Some(l2) = &self.l2 {
            l2.remove(key).await;
        }
        if let Some(l3) = &self.l3 {
            l3.invalidate(key).await;
        }
    }

    pub fn get_stats(&self) -> MultiLayerCacheStats {
        MultiLayerCacheStats {
            l1_stats: self.l1.as_ref().map(|l1| l1.get_stats()),
            l2_stats: self.l2.as_ref().map(|l2| l2.get_stats()),
        }
    }
}

#[derive(Debug, Clone)]
pub struct CacheStats {
    pub hit_count: u64,
    pub miss_count: u64,
    pub hit_rate: f64,
    pub size: usize,
    pub max_size: usize,
}

#[derive(Debug, Clone)]
pub struct MultiLayerCacheStats {
    pub l1_stats: Option<CacheStats>,
    pub l2_stats: Option<CacheStats>,
}
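A sketch of assembling the facade with an L1 layer plus a read-through fetcher; the Redis URL and fetch logic are placeholders, and the boxed-closure shape matches the DataFetcher alias above:

use futures::FutureExt;
use multi_layer_cache::{DataFetcher, FallbackStrategy, MultiLayerCache};

#[tokio::main]
async fn main() {
    // Simulated data source: real code would query a database here
    let fetcher: DataFetcher<String, String> =
        Box::new(|key: String| async move { format!("value-for-{}", key) }.boxed());

    let cache: MultiLayerCache<String, String> = MultiLayerCache::new(
        Some(1000),                                     // L1 capacity
        redis::Client::open("redis://127.0.0.1/").ok(), // L2, if the URL parses
        Some(fetcher),                                  // L3 read-through
        FallbackStrategy::AllLayers,
    );

    // The first read falls through to L3; later reads hit L1
    let v = cache.get("user:42".to_string()).await;
    println!("{:?}", v);
}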
13.4.2 Cache Update Strategies
// Cache update strategies
// File: cache-strategies/src/lib.rs
use futures::future::BoxFuture;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tracing::{debug, info, warn};

/// Cache update strategies
#[derive(Debug, Clone)]
pub enum UpdateStrategy {
    /// Write-through: write cache and data source together
    WriteThrough,
    /// Write-back: write the cache now, flush to the data source asynchronously
    WriteBack,
    /// Write-around: write only the data source and invalidate the cache
    WriteAround,
    /// Lazy write: defer the cache update until the entry is next touched
    LazyWrite,
    /// TTL-based: entries expire by age
    TtlBased,
    /// LRU-based: entries expire by recency of access
    LruBased,
}

/// Routes updates through the strategy registered per cache key.
/// NOTE: must be constructed inside a Tokio runtime (it spawns a writer task).
pub struct CacheUpdateManager {
    strategies: HashMap<String, UpdateStrategy>,
    last_update_times: Mutex<HashMap<String, Instant>>,
    write_queue: Arc<crossbeam::queue::SegQueue<WriteOperation>>,
    background_writer: Option<tokio::task::JoinHandle<()>>,
}

#[derive(Debug, Clone)]
pub struct WriteOperation {
    pub key: String,
    pub value: String,
    pub strategy: UpdateStrategy,
    pub timestamp: Instant,
}

impl CacheUpdateManager {
    pub fn new() -> Self {
        let mut manager = CacheUpdateManager {
            strategies: HashMap::new(),
            last_update_times: Mutex::new(HashMap::new()),
            write_queue: Arc::new(crossbeam::queue::SegQueue::new()),
            background_writer: None,
        };
        manager.start_background_writer();
        manager
    }

    pub fn register_strategy(&mut self, cache_key: &str, strategy: UpdateStrategy) {
        info!("Registered update strategy for cache key '{}': {:?}", cache_key, strategy);
        self.strategies.insert(cache_key.to_string(), strategy);
    }

    pub async fn update_cache(&self, key: &str, value: &str) -> Result<(), CacheError> {
        let strategy = self
            .strategies
            .get(key)
            .cloned()
            .unwrap_or(UpdateStrategy::WriteThrough);

        match strategy {
            UpdateStrategy::WriteThrough => self.write_through(key, value).await,
            UpdateStrategy::WriteBack => self.write_back(key, value),
            UpdateStrategy::WriteAround => self.write_around(key, value).await,
            UpdateStrategy::LazyWrite => {
                // Mark as dirty; the actual write happens on next access
                Ok(())
            }
            UpdateStrategy::TtlBased => self.ttl_based_update(key, value).await,
            UpdateStrategy::LruBased => self.lru_based_update(key, value).await,
        }
    }

    async fn write_through(&self, key: &str, value: &str) -> Result<(), CacheError> {
        info!("Write-through: updating both cache and data source for key: {}", key);
        // Update cache and data source together
        let (cache_result, _) = futures::future::join(
            self.update_cache_layer(key, value),
            self.update_data_source(key, value),
        )
        .await;
        cache_result
    }

    fn write_back(&self, key: &str, value: &str) -> Result<(), CacheError> {
        info!("Write-back: queuing update for background processing: {}", key);
        // Queue the write; the background task flushes it to the data source
        self.write_queue.push(WriteOperation {
            key: key.to_string(),
            value: value.to_string(),
            strategy: UpdateStrategy::WriteBack,
            timestamp: Instant::now(),
        });
        Ok(())
    }

    async fn write_around(&self, key: &str, value: &str) -> Result<(), CacheError> {
        info!("Write-around: updating data source and invalidating cache: {}", key);
        self.update_data_source(key, value).await;
        self.invalidate_cache(key).await;
        Ok(())
    }

    async fn ttl_based_update(&self, key: &str, value: &str) -> Result<(), CacheError> {
        info!("TTL-based update for key: {}", key);
        // Refresh the last-update time
        self.last_update_times
            .lock()
            .unwrap()
            .insert(key.to_string(), Instant::now());
        self.update_cache_layer(key, value).await
    }

    async fn lru_based_update(&self, key: &str, value: &str) -> Result<(), CacheError> {
        info!("LRU-based update for key: {}", key);
        // Move the key to most-recently-used
        self.last_update_times
            .lock()
            .unwrap()
            .insert(key.to_string(), Instant::now());
        self.update_cache_layer(key, value).await
    }

    async fn update_cache_layer(&self, key: &str, _value: &str) -> Result<(), CacheError> {
        // Hook for the actual cache write
        debug!("Updating cache layer for key: {}", key);
        Ok(())
    }

    async fn update_data_source(&self, key: &str, _value: &str) {
        // Hook for the actual data source write
        debug!("Updating data source for key: {}", key);
        tokio::time::sleep(Duration::from_millis(10)).await; // simulated write latency
    }

    async fn invalidate_cache(&self, key: &str) {
        // Hook for cache invalidation
        debug!("Invalidating cache for key: {}", key);
    }

    fn start_background_writer(&mut self) {
        let write_queue = Arc::clone(&self.write_queue);
        self.background_writer = Some(tokio::spawn(async move {
            loop {
                if let Some(operation) = write_queue.pop() {
                    debug!("Processing background write operation: {}", operation.key);
                    // Simulated asynchronous flush to the data source
                    tokio::time::sleep(Duration::from_millis(50)).await;
                    info!("Background write completed for key: {}", operation.key);
                } else {
                    tokio::time::sleep(Duration::from_millis(100)).await;
                }
            }
        }));
    }

    pub fn stop(&mut self) {
        if let Some(handle) = self.background_writer.take() {
            handle.abort();
        }
    }
}

#[derive(Debug, thiserror::Error)]
pub enum CacheError {
    #[error("Cache update failed: {0}")]
    UpdateFailed(String),
    #[error("Cache key not found")]
    KeyNotFound,
    #[error("Invalid strategy")]
    InvalidStrategy,
}

/// Cache pre-warming manager
pub struct CacheWarmer {
    warmup_tasks: HashMap<String, WarmupTask>,
    parallel_tasks: usize,
}

/// The fetcher sits behind an Arc so tasks stay cloneable
#[derive(Clone)]
pub struct WarmupTask {
    pub key: String,
    pub fetcher: Arc<dyn Fn() -> BoxFuture<'static, Option<String>> + Send + Sync>,
    pub priority: WarmupPriority,
}

/// Ordered most to least urgent
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum WarmupPriority {
    Critical,
    High,
    Normal,
    Low,
}

impl CacheWarmer {
    pub fn new(parallel_tasks: usize) -> Self {
        CacheWarmer {
            warmup_tasks: HashMap::new(),
            parallel_tasks,
        }
    }

    pub fn register_task(&mut self, task: WarmupTask) {
        info!("Registered warmup task for key: {}", task.key);
        self.warmup_tasks.insert(task.key.clone(), task);
    }

    pub async fn warm_up(&self) -> WarmupResult {
        info!("Starting cache warm-up with {} tasks", self.warmup_tasks.len());
        let started = Instant::now();

        // Critical tasks first (the derive makes Critical the smallest variant)
        let mut tasks: Vec<_> = self.warmup_tasks.values().cloned().collect();
        tasks.sort_by(|a, b| a.priority.cmp(&b.priority));

        let semaphore = Arc::new(tokio::sync::Semaphore::new(self.parallel_tasks));
        let mut handles = Vec::new();

        // Run warm-up tasks with bounded parallelism
        for task in tasks {
            let permit = semaphore.clone().acquire_owned().await.unwrap();
            handles.push(tokio::spawn(async move {
                let _permit = permit;
                let start_time = Instant::now();
                let value = (task.fetcher)().await;
                WarmupResultItem {
                    key: task.key,
                    success: value.is_some(),
                    duration: start_time.elapsed(),
                    value,
                }
            }));
        }

        // Wait for every task to finish
        let mut warmup_results = Vec::new();
        for handle in handles {
            if let Ok(result) = handle.await {
                warmup_results.push(result);
            }
        }

        WarmupResult {
            total_tasks: warmup_results.len(),
            successful_tasks: warmup_results.iter().filter(|r| r.success).count(),
            failed_tasks: warmup_results.iter().filter(|r| !r.success).count(),
            total_duration: started.elapsed(), // wall-clock time for the whole run
            results: warmup_results,
        }
    }
}

#[derive(Debug)]
pub struct WarmupResult {
    pub total_tasks: usize,
    pub successful_tasks: usize,
    pub failed_tasks: usize,
    pub total_duration: Duration,
    pub results: Vec<WarmupResultItem>,
}

#[derive(Debug, Clone)]
pub struct WarmupResultItem {
    pub key: String,
    pub success: bool,
    pub duration: Duration,
    pub value: Option<String>,
}

impl WarmupResult {
    pub fn success_rate(&self) -> f64 {
        if self.total_tasks > 0 {
            self.successful_tasks as f64 / self.total_tasks as f64 * 100.0
        } else {
            0.0
        }
    }

    pub fn print_summary(&self) {
        info!("=== Cache Warm-up Summary ===");
        info!("Total tasks: {}", self.total_tasks);
        info!("Successful: {}", self.successful_tasks);
        info!("Failed: {}", self.failed_tasks);
        info!("Success rate: {:.1}%", self.success_rate());
        info!("Total duration: {:?}", self.total_duration);

        if self.failed_tasks > 0 {
            warn!("Failed warm-up tasks:");
            for result in &self.results {
                if !result.success {
                    warn!("  - {}: {:?}", result.key, result.duration);
                }
            }
        }
    }
}
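A short sketch of registering and running warm-up tasks; the keys and the in-line fetcher are illustrative:

use cache_strategies::{CacheWarmer, WarmupPriority, WarmupTask};
use futures::FutureExt;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let mut warmer = CacheWarmer::new(4); // up to 4 tasks in parallel

    for key in ["config:flags", "catalog:top100"] {
        warmer.register_task(WarmupTask {
            key: key.to_string(),
            // Simulated fetch; real code would load from the database
            fetcher: Arc::new(move || async { Some("warm-value".to_string()) }.boxed()),
            priority: WarmupPriority::Critical,
        });
    }

    let result = warmer.warm_up().await;
    result.print_summary();
    assert_eq!(result.successful_tasks, 2);
}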
13.5 High-Performance Cache Service Project
We now build an enterprise-grade, high-performance cache service that brings together the optimization techniques covered in this chapter.
# File: cache-service/Cargo.toml
[package]
name = "cache-service"
version = "1.0.0"
edition = "2021"

[dependencies]
tokio = { version = "1.0", features = ["full"] }
axum = { version = "0.7", features = ["macros"] }
tower = { version = "0.4" }
tower-http = { version = "0.5", features = ["cors", "compression-full", "trace", "timeout"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
redis = { version = "0.24", features = ["tokio-comp", "connection-manager"] }
clap = { version = "4.0", features = ["derive"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
anyhow = "1.0"
thiserror = "1.0"
uuid = { version = "1.0", features = ["v4", "serde"] }
chrono = { version = "0.4", features = ["serde"] }
criterion = "0.5"
once_cell = "1.0"
futures = "0.3"
crossbeam = "0.8"
regex = "1.0"
# Assumed local path to the profiling helpers from section 13.1
perf-tools = { path = "../perf-tools" }
```rust
// High-performance cache service
// File: cache-service/src/main.rs
use std::sync::Arc;
use std::time::Duration;

use clap::{Parser, Subcommand};
use tracing::info;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

mod cache;
mod config;
mod server;

use cache::CacheService;
use config::Config;
use server::CacheServer;

#[derive(Parser, Debug)]
#[command(name = "cache-service")]
#[command(about = "High-performance cache service")]
struct Cli {
    #[command(subcommand)]
    command: Commands,
}

#[derive(Subcommand, Debug)]
enum Commands {
    /// Start the cache service
    Server {
        #[arg(short, long, default_value = "0.0.0.0:8080")]
        addr: String,
        #[arg(short, long, default_value = "redis://localhost:6379")]
        redis_url: String,
        #[arg(short, long, default_value = "1000")]
        l1_cache_size: usize,
        #[arg(short, long, default_value = "100")]
        parallel_tasks: usize,
    },
    /// Run performance benchmarks
    Benchmark,
    /// Test the cache service
    Test,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Initialize structured logging.
    tracing_subscriber::registry()
        .with(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| "cache_service=debug,tokio=warn".into()),
        )
        .with(tracing_subscriber::fmt::layer())
        .init();

    let cli = Cli::parse();

    match cli.command {
        Commands::Server { addr, redis_url, l1_cache_size, parallel_tasks } => {
            run_server(addr, redis_url, l1_cache_size, parallel_tasks).await
        }
        Commands::Benchmark => run_benchmarks().await,
        Commands::Test => run_tests().await,
    }
}

async fn run_server(
    addr: String,
    redis_url: String,
    l1_cache_size: usize,
    parallel_tasks: usize,
) -> Result<(), Box<dyn std::error::Error>> {
    info!("Starting high-performance cache service on {}", addr);

    let config = Config {
        addr,
        redis_url,
        l1_cache_size,
        parallel_tasks,
        default_ttl: Duration::from_secs(3600),
        max_ttl: Duration::from_secs(86400),
        warmup_enabled: true,
        metrics_enabled: true,
    };

    // Connect to Redis, the L2 cache backend.
    let redis_client = redis::Client::open(config.redis_url.as_str())?;

    // Build the tiered cache service and start the HTTP server.
    let cache_service = CacheService::new(redis_client, config.clone()).await?;
    let server = CacheServer::new(config, cache_service);
    server.run().await?;

    Ok(())
}

async fn run_benchmarks() -> Result<(), Box<dyn std::error::Error>> {
    info!("Running performance benchmarks");

    run_cache_benchmarks().await?;
    run_concurrency_benchmarks().await?;
    run_memory_benchmarks().await?;

    info!("All benchmarks completed");
    Ok(())
}

async fn run_cache_benchmarks() -> Result<(), Box<dyn std::error::Error>> {
    // The Criterion micro-benchmarks live under benches/ (see section 13.1);
    // run them with `cargo bench` rather than from this subcommand.
    info!("Cache benchmarks are implemented with Criterion under benches/");
    Ok(())
}

async fn run_concurrency_benchmarks() -> Result<(), Box<dyn std::error::Error>> {
    info!("Running concurrency benchmarks");

    let cache_service = create_test_cache_service().await?;

    // Stress concurrent writes followed by reads.
    let write_start = std::time::Instant::now();
    let mut handles = Vec::new();

    for i in 0..1000 {
        let service = cache_service.clone();
        let handle = tokio::spawn(async move {
            let key = format!("test_key_{}", i);
            let value = format!("test_value_{}", i);
            // Map errors to String so the task output is Send.
            service
                .set(&key, &value, Some(Duration::from_secs(300)))
                .await
                .map_err(|e| e.to_string())?;
            service.get::<String>(&key).await.map_err(|e| e.to_string())
        });
        handles.push(handle);
    }

    for handle in handles {
        let _ = handle.await?;
    }

    let write_duration = write_start.elapsed();
    info!("Concurrent write test completed in {:?}", write_duration);

    Ok(())
}

async fn run_memory_benchmarks() -> Result<(), Box<dyn std::error::Error>> {
    info!("Running memory benchmarks");

    // MemoryProfiler comes from the perf-tools crate built earlier in this chapter.
    use perf_tools::MemoryProfiler;

    let mut profiler = MemoryProfiler::new();
    let cache_service = create_test_cache_service().await?;

    // Create a large number of ~1 KB cache entries and track peak memory.
    for i in 0..10000 {
        let key = format!("memory_test_{}", i);
        let value = "x".repeat(1000);
        cache_service
            .set(&key, &value, Some(Duration::from_secs(60)))
            .await?;
        profiler.update_peak();
    }

    info!("Memory benchmark completed");
    Ok(())
}

async fn run_tests() -> Result<(), Box<dyn std::error::Error>> {
    info!("Running cache service tests");

    let cache_service = create_test_cache_service().await?;

    // Basic functionality
    test_basic_operations(&cache_service).await?;
    // TTL expiry
    test_ttl_expiration(&cache_service).await?;
    // Concurrency
    test_concurrent_operations(&cache_service).await?;

    info!("All tests passed");
    Ok(())
}

async fn create_test_cache_service() -> Result<Arc<CacheService>, Box<dyn std::error::Error>> {
    let redis_client = redis::Client::open("redis://localhost:6379")?;

    let config = Config {
        addr: "127.0.0.1:0".to_string(),
        redis_url: "redis://localhost:6379".to_string(),
        l1_cache_size: 1000,
        parallel_tasks: 100,
        default_ttl: Duration::from_secs(3600),
        max_ttl: Duration::from_secs(86400),
        warmup_enabled: false,
        metrics_enabled: true,
    };

    let cache_service = CacheService::new(redis_client, config).await?;
    Ok(Arc::new(cache_service))
}

async fn test_basic_operations(
    cache_service: &CacheService,
) -> Result<(), Box<dyn std::error::Error>> {
    // Set and get
    cache_service.set("test_key", "test_value", None).await?;
    let value: Option<String> = cache_service.get("test_key").await?;
    assert_eq!(value, Some("test_value".to_string()));
    info!("✓ Basic set/get operations working");

    // Delete
    cache_service.delete("test_key").await?;
    let value: Option<String> = cache_service.get("test_key").await?;
    assert_eq!(value, None);
    info!("✓ Delete operation working");

    Ok(())
}

async fn test_ttl_expiration(
    cache_service: &CacheService,
) -> Result<(), Box<dyn std::error::Error>> {
    // Set a very short TTL.
    cache_service
        .set("ttl_test", "ttl_value", Some(Duration::from_millis(100)))
        .await?;

    // The entry should exist immediately...
    let value: Option<String> = cache_service.get("ttl_test").await?;
    assert_eq!(value, Some("ttl_value".to_string()));

    // ...and be gone once the TTL has elapsed.
    tokio::time::sleep(Duration::from_millis(150)).await;
    let value: Option<String> = cache_service.get("ttl_test").await?;
    assert_eq!(value, None);
    info!("✓ TTL expiration working");

    Ok(())
}

async fn test_concurrent_operations(
    cache_service: &Arc<CacheService>,
) -> Result<(), Box<dyn std::error::Error>> {
    // 100 concurrent reads of a key that does not exist.
    let mut handles = Vec::new();

    for _ in 0..100 {
        let service = cache_service.clone();
        let handle = tokio::spawn(async move {
            service
                .get::<String>("concurrent_test")
                .await
                .map_err(|e| e.to_string())
        });
        handles.push(handle);
    }

    // Every concurrent read should agree: the key is absent.
    for handle in handles {
        let result = handle.await??;
        assert_eq!(result, None);
    }

    info!("✓ Concurrent operations working");
    Ok(())
}
```
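With the CLI defined above, the three subcommands can be exercised as follows (assuming a local Redis on the default port; the long flag names follow clap's kebab-case convention):

```bash
# Start the HTTP cache server
cargo run --release -- server --addr 0.0.0.0:8080 --redis-url redis://localhost:6379

# Run the benchmark suite
cargo run --release -- benchmark

# Run the functional tests (requires a reachable Redis)
cargo run --release -- test
```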
```rust
// Cache service core implementation
// File: cache-service/src/cache/mod.rs
use std::sync::Arc;
use std::time::{Duration, Instant};

use crossbeam::queue::SegQueue;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use tracing::{debug, info, instrument};

pub mod l1_cache;
pub mod l2_cache;
pub mod strategies;

use l1_cache::L1MemoryCache;
use l2_cache::L2RedisCache;
use strategies::{CacheStrategy, UpdateStrategy};

/// Cache service configuration
#[derive(Debug, Clone)]
pub struct CacheConfig {
    pub default_ttl: Duration,
    pub max_ttl: Duration,
    pub l1_cache_size: usize,
    pub parallel_tasks: usize,
    pub warmup_enabled: bool,
    pub metrics_enabled: bool,
}

/// Cache statistics
#[derive(Debug, Clone, Default)]
pub struct CacheStats {
    pub hits: u64,
    pub misses: u64,
    pub sets: u64,
    pub deletes: u64,
    pub l1_hits: u64,
    pub l2_hits: u64,
    pub l3_hits: u64,
    pub avg_response_time: Duration,
    pub total_operations: u64,
}

impl CacheStats {
    pub fn hit_rate(&self) -> f64 {
        if self.hits + self.misses > 0 {
            self.hits as f64 / (self.hits + self.misses) as f64
        } else {
            0.0
        }
    }

    /// Incremental running mean: new_avg = (old_avg * (n - 1) + x) / n,
    /// so we never need to store individual samples.
    pub fn add_operation(&mut self, duration: Duration) {
        self.total_operations += 1;
        self.avg_response_time = Duration::from_nanos(
            (self.avg_response_time.as_nanos() as u64 * (self.total_operations - 1)
                + duration.as_nanos() as u64)
                / self.total_operations,
        );
    }
}

/// High-performance tiered cache service
pub struct CacheService {
    l1_cache: Arc<L1MemoryCache>,
    l2_cache: Arc<L2RedisCache>,
    config: CacheConfig,
    stats: Arc<RwLock<CacheStats>>,
    strategy: Arc<dyn CacheStrategy + Send + Sync>,
    write_queue: Arc<SegQueue<WriteOperation>>,
}

#[derive(Debug, Clone)]
pub struct WriteOperation {
    pub key: String,
    pub value: String,
    pub ttl: Option<Duration>,
    pub timestamp: Instant,
}

impl CacheService {
    pub async fn new(
        redis_client: redis::Client,
        config: crate::config::Config,
    ) -> Result<Self, Box<dyn std::error::Error>> {
        let cache_config = CacheConfig {
            default_ttl: config.default_ttl,
            max_ttl: config.max_ttl,
            l1_cache_size: config.l1_cache_size,
            parallel_tasks: config.parallel_tasks,
            warmup_enabled: config.warmup_enabled,
            metrics_enabled: config.metrics_enabled,
        };

        let l1_cache = Arc::new(L1MemoryCache::new(cache_config.l1_cache_size));
        let l2_cache = Arc::new(L2RedisCache::new(
            redis_client,
            "cache".to_string(),
            cache_config.default_ttl,
        ));
        let strategy: Arc<dyn CacheStrategy + Send + Sync> =
            Arc::new(UpdateStrategy::new(cache_config.clone()));
        let write_queue = Arc::new(SegQueue::new());

        // Always start the background writer that drains the write-back
        // queue; it idles cheaply while the queue is empty.
        Self::start_background_writer(write_queue.clone());

        info!(
            "Cache service initialized with L1 size: {}, L2: Redis",
            cache_config.l1_cache_size
        );

        Ok(CacheService {
            l1_cache,
            l2_cache,
            config: cache_config,
            stats: Arc::new(RwLock::new(CacheStats::default())),
            strategy,
            write_queue,
        })
    }

    #[instrument(skip(self))]
    pub async fn get<T>(&self, key: &str) -> Result<Option<T>, Box<dyn std::error::Error>>
    where
        T: for<'de> Deserialize<'de> + Send + Sync,
    {
        let start_time = Instant::now();

        // Try the L1 in-memory cache first.
        if let Some(value) = self.l1_cache.get(key).await {
            let mut stats = self.stats.write().await;
            stats.l1_hits += 1;
            stats.hits += 1;
            stats.add_operation(start_time.elapsed());
            // L1 stores the serialized form, so deserialize before returning.
            return Ok(Some(serde_json::from_str(&value)?));
        }

        // L1 miss -- try the L2 Redis cache.
        if let Some(value) = self.l2_cache.get(key).await {
            // Promote the entry into L1 for subsequent reads.
            self.l1_cache.set(key, &value, None).await;

            let mut stats = self.stats.write().await;
            stats.l2_hits += 1;
            stats.hits += 1;
            stats.add_operation(start_time.elapsed());

            return Ok(Some(serde_json::from_str(&value)?));
        }

        // Complete miss.
        let mut stats = self.stats.write().await;
        stats.misses += 1;
        stats.add_operation(start_time.elapsed());
        Ok(None)
    }

    #[instrument(skip(self, value))]
    pub async fn set<T>(
        &self,
        key: &str,
        value: &T,
        ttl: Option<Duration>,
    ) -> Result<(), Box<dyn std::error::Error>>
    where
        T: Serialize + Send + Sync + ?Sized,
    {
        let start_time = Instant::now();

        // Serialize once; every layer stores the same representation.
        let serialized_value = serde_json::to_string(value)?;

        // Let the strategy decide whether/how to queue the backend write.
        self.strategy
            .update_cache(key, &serialized_value, ttl, &self.write_queue)
            .await?;

        // Update both cache layers.
        self.l1_cache.set(key, &serialized_value, ttl).await;
        self.l2_cache.set(key, &serialized_value, ttl).await;

        let mut stats = self.stats.write().await;
        stats.sets += 1;
        stats.add_operation(start_time.elapsed());

        Ok(())
    }

    #[instrument(skip(self))]
    pub async fn delete(&self, key: &str) -> Result<(), Box<dyn std::error::Error>> {
        let start_time = Instant::now();

        // Remove from every cache layer.
        self.l1_cache.delete(key).await;
        self.l2_cache.delete(key).await;

        let mut stats = self.stats.write().await;
        stats.deletes += 1;
        stats.add_operation(start_time.elapsed());

        Ok(())
    }

    #[instrument(skip(self))]
    pub async fn exists(&self, key: &str) -> Result<bool, Box<dyn std::error::Error>> {
        // Check L1 first, then L2.
        if self.l1_cache.exists(key).await {
            return Ok(true);
        }
        if self.l2_cache.exists(key).await {
            return Ok(true);
        }
        Ok(false)
    }

    pub async fn get_stats(&self) -> CacheStats {
        self.stats.read().await.clone()
    }

    pub async fn clear(&self) -> Result<(), Box<dyn std::error::Error>> {
        self.l1_cache.clear().await;
        self.l2_cache.clear().await;
        info!("Cache cleared");
        Ok(())
    }

    fn start_background_writer(write_queue: Arc<SegQueue<WriteOperation>>) {
        tokio::spawn(async move {
            loop {
                if let Some(operation) = write_queue.pop() {
                    debug!("Processing background write: {}", operation.key);
                    // Simulate an asynchronous backend write; a real system
                    // would flush to the persistent store here.
                    tokio::time::sleep(Duration::from_millis(10)).await;
                    info!("Background write completed: {}", operation.key);
                } else {
                    // Queue empty -- back off briefly.
                    tokio::time::sleep(Duration::from_millis(100)).await;
                }
            }
        });
    }

    pub async fn warm_up(&self) -> Result<(), Box<dyn std::error::Error>> {
        info!("Starting cache warm-up");

        // Preload hot keys; a real system would fetch these from the data source.
        let warmup_tasks = vec![
            "user_profile_123".to_string(),
            "config_settings".to_string(),
            "popular_articles".to_string(),
        ];

        for key in warmup_tasks {
            let value = format!("warmup_value_{}", key);
            self.set(&key, &value, Some(Duration::from_secs(3600))).await?;
        }

        info!("Cache warm-up completed");
        Ok(())
    }
}
```
```rust
// L1 in-memory cache implementation
// File: cache-service/src/cache/l1_cache.rs
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

use tokio::sync::RwLock as AsyncRwLock;
use tracing::{debug, instrument};

/// A single L1 cache entry
#[derive(Debug, Clone)]
struct L1CacheEntry {
    value: String,
    created_at: Instant,
    last_accessed: Instant,
    access_count: u64,
    ttl: Option<Duration>,
}

impl L1CacheEntry {
    fn new(value: String, ttl: Option<Duration>) -> Self {
        let now = Instant::now();
        L1CacheEntry {
            value,
            created_at: now,
            last_accessed: now,
            access_count: 0,
            ttl,
        }
    }

    fn is_expired(&self) -> bool {
        if let Some(ttl) = self.ttl {
            self.created_at + ttl < Instant::now()
        } else {
            false
        }
    }

    fn access(&mut self) -> &str {
        self.access_count += 1;
        self.last_accessed = Instant::now();
        &self.value
    }
}

/// L1 in-memory cache with LRU eviction
pub struct L1MemoryCache {
    data: Arc<AsyncRwLock<HashMap<String, L1CacheEntry>>>,
    max_size: usize,
    hit_count: AtomicU64,
    miss_count: AtomicU64,
}

impl L1MemoryCache {
    pub fn new(max_size: usize) -> Self {
        L1MemoryCache {
            data: Arc::new(AsyncRwLock::new(HashMap::new())),
            max_size,
            hit_count: AtomicU64::new(0),
            miss_count: AtomicU64::new(0),
        }
    }

    /// Reads take the write lock because a hit mutates the LRU metadata
    /// (last_accessed / access_count) -- a deliberate simplicity tradeoff.
    #[instrument(skip(self))]
    pub async fn get(&self, key: &str) -> Option<String> {
        let mut data = self.data.write().await;

        if let Some(entry) = data.get_mut(key) {
            if !entry.is_expired() {
                let value = entry.access().to_string();
                self.hit_count.fetch_add(1, Ordering::Relaxed);
                debug!("L1 cache hit for key: {}", key);
                Some(value)
            } else {
                // Expired -- evict lazily on access.
                data.remove(key);
                self.miss_count.fetch_add(1, Ordering::Relaxed);
                None
            }
        } else {
            self.miss_count.fetch_add(1, Ordering::Relaxed);
            debug!("L1 cache miss for key: {}", key);
            None
        }
    }

    #[instrument(skip(self, value))]
    pub async fn set(&self, key: &str, value: &str, ttl: Option<Duration>) {
        let mut data = self.data.write().await;

        // If the cache is full, evict the least recently used entry first.
        if data.len() >= self.max_size && !data.contains_key(key) {
            self.evict_lru(&mut data);
        }

        data.insert(key.to_string(), L1CacheEntry::new(value.to_string(), ttl));
        debug!("L1 cache set for key: {}", key);
    }

    #[instrument(skip(self))]
    pub async fn delete(&self, key: &str) {
        let mut data = self.data.write().await;
        data.remove(key);
        debug!("L1 cache delete for key: {}", key);
    }

    #[instrument(skip(self))]
    pub async fn exists(&self, key: &str) -> bool {
        let data = self.data.read().await;
        data.get(key).map_or(false, |entry| !entry.is_expired())
    }

    #[instrument(skip(self))]
    pub async fn clear(&self) {
        let mut data = self.data.write().await;
        data.clear();
        debug!("L1 cache cleared");
    }

    /// O(n) scan for the least recently used key. Acceptable for small
    /// caches; a linked-list LRU would make eviction O(1).
    fn evict_lru(&self, data: &mut HashMap<String, L1CacheEntry>) {
        // Clone the key first so the immutable borrow ends before remove().
        let lru_key = data
            .iter()
            .min_by_key(|(_, entry)| entry.last_accessed)
            .map(|(key, _)| key.clone());

        if let Some(key) = lru_key {
            data.remove(&key);
            debug!("Evicted LRU key from L1 cache: {}", key);
        }
    }

    pub fn get_stats(&self) -> (u64, u64, usize, usize) {
        let hit_count = self.hit_count.load(Ordering::Relaxed);
        let miss_count = self.miss_count.load(Ordering::Relaxed);

        // try_read avoids blocking a synchronous caller; report 0 if the
        // lock is currently contended.
        let current_size = match self.data.try_read() {
            Ok(data) => data.len(),
            Err(_) => 0,
        };

        (hit_count, miss_count, current_size, self.max_size)
    }
}
```
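The chapter does not list src/cache/l2_cache.rs, yet CacheService depends on it. Below is a minimal sketch of an implementation satisfying the interface used above (new / get / set / delete / exists / clear). The method bodies are our assumptions built on the redis crate's AsyncCommands API, not the book's definitive code:

```rust
// L2 Redis cache -- a minimal sketch (assumed implementation)
// File: cache-service/src/cache/l2_cache.rs
use std::time::Duration;

use redis::AsyncCommands;
use tracing::warn;

pub struct L2RedisCache {
    client: redis::Client,
    key_prefix: String,
    default_ttl: Duration,
}

impl L2RedisCache {
    pub fn new(client: redis::Client, key_prefix: String, default_ttl: Duration) -> Self {
        L2RedisCache { client, key_prefix, default_ttl }
    }

    fn full_key(&self, key: &str) -> String {
        format!("{}:{}", self.key_prefix, key)
    }

    async fn connection(&self) -> redis::RedisResult<redis::aio::MultiplexedConnection> {
        self.client.get_multiplexed_async_connection().await
    }

    pub async fn get(&self, key: &str) -> Option<String> {
        let mut con = match self.connection().await {
            Ok(con) => con,
            Err(e) => {
                warn!("L2 connection error: {}", e);
                return None;
            }
        };
        con.get::<_, Option<String>>(self.full_key(key)).await.ok().flatten()
    }

    pub async fn set(&self, key: &str, value: &str, ttl: Option<Duration>) {
        let ttl = ttl.unwrap_or(self.default_ttl);
        if let Ok(mut con) = self.connection().await {
            // PSETEX keeps millisecond precision, which the TTL tests rely on.
            let result: redis::RedisResult<()> = con
                .pset_ex(self.full_key(key), value, ttl.as_millis() as usize)
                .await;
            if let Err(e) = result {
                warn!("L2 set error for key {}: {}", key, e);
            }
        }
    }

    pub async fn delete(&self, key: &str) {
        if let Ok(mut con) = self.connection().await {
            let _: redis::RedisResult<()> = con.del(self.full_key(key)).await;
        }
    }

    pub async fn exists(&self, key: &str) -> bool {
        match self.connection().await {
            Ok(mut con) => con.exists(self.full_key(key)).await.unwrap_or(false),
            Err(_) => false,
        }
    }

    pub async fn clear(&self) {
        // Production code should SCAN by prefix; FLUSHDB is demo-only.
        if let Ok(mut con) = self.connection().await {
            let _: redis::RedisResult<()> =
                redis::cmd("FLUSHDB").query_async(&mut con).await;
        }
    }
}
```

Note the design choice: errors on the L2 path degrade to cache misses (logged, not propagated), so a Redis outage slows the service down instead of breaking it.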
```rust
// Cache update strategies
// File: cache-service/src/cache/strategies.rs
use std::sync::Arc;
use std::time::{Duration, Instant};

use async_trait::async_trait;
use crossbeam::queue::SegQueue;
use tracing::{debug, info};

use super::{CacheConfig, WriteOperation};

/// Cache update strategy
#[derive(Debug, Clone)]
pub enum UpdateStrategy {
    WriteThrough,
    WriteBack,
    WriteAround,
    WriteCoalescing,
}

impl UpdateStrategy {
    pub fn new(_config: CacheConfig) -> Self {
        // A real project would select the strategy from configuration.
        UpdateStrategy::WriteThrough
    }
}

#[async_trait]
pub trait CacheStrategy: Send + Sync {
    async fn update_cache(
        &self,
        key: &str,
        value: &str,
        ttl: Option<Duration>,
        write_queue: &Arc<SegQueue<WriteOperation>>,
    ) -> Result<(), Box<dyn std::error::Error>>;
}

#[async_trait]
impl CacheStrategy for UpdateStrategy {
    async fn update_cache(
        &self,
        key: &str,
        value: &str,
        ttl: Option<Duration>,
        write_queue: &Arc<SegQueue<WriteOperation>>,
    ) -> Result<(), Box<dyn std::error::Error>> {
        match self {
            UpdateStrategy::WriteThrough => {
                // Write synchronously to every layer (done by the caller).
                info!("Write-through strategy for key: {}", key);
                Ok(())
            }
            UpdateStrategy::WriteBack => {
                // Queue the write; the background writer flushes it later.
                write_queue.push(WriteOperation {
                    key: key.to_string(),
                    value: value.to_string(),
                    ttl,
                    timestamp: Instant::now(),
                });
                debug!("Queued write-back for key: {}", key);
                Ok(())
            }
            UpdateStrategy::WriteAround => {
                // Write only to L2 and invalidate L1.
                info!("Write-around strategy for key: {}", key);
                Ok(())
            }
            UpdateStrategy::WriteCoalescing => {
                // Queue the write; duplicates for the same key are merged
                // when the queue is drained.
                write_queue.push(WriteOperation {
                    key: key.to_string(),
                    value: value.to_string(),
                    ttl,
                    timestamp: Instant::now(),
                });
                Ok(())
            }
        }
    }
}
```
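The WriteCoalescing arm above only enqueues the operation; the actual merging has to happen where the queue is drained. Here is a minimal sketch of that drain step under a "last write per key wins" policy (coalesce_pending is a helper we are inventing for illustration, not part of the book's listing):

```rust
// Hypothetical helper for the background writer
// (would live in cache-service/src/cache/strategies.rs)
use std::collections::HashMap;
use std::sync::Arc;

use crossbeam::queue::SegQueue;

use super::WriteOperation;

/// Drain the queue and keep only the newest write per key: a key updated
/// 1000 times in a burst costs one backend write instead of 1000.
pub fn coalesce_pending(queue: &Arc<SegQueue<WriteOperation>>) -> Vec<WriteOperation> {
    let mut latest: HashMap<String, WriteOperation> = HashMap::new();
    while let Some(op) = queue.pop() {
        // SegQueue is FIFO, so a later pop means a later push; it
        // overwrites any earlier write to the same key.
        latest.insert(op.key.clone(), op);
    }
    latest.into_values().collect()
}
```

The background writer in mod.rs could call this periodically and flush each surviving operation to Redis exactly once.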
```rust
// Web server implementation
// File: cache-service/src/server.rs
use std::sync::Arc;
use std::time::{Duration, Instant};

use axum::{
    extract::{Path, State},
    response::{IntoResponse, Json},
    routing::{get, post},
    Router,
};
use serde::{Deserialize, Serialize};
use tracing::{error, info};

use crate::cache::CacheService;
use crate::config::Config;

#[derive(Debug, Serialize, Deserialize)]
pub struct CacheRequest {
    pub key: String,
    pub value: Option<String>,
    pub ttl: Option<u64>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct CacheResponse<T> {
    pub success: bool,
    pub data: Option<T>,
    pub message: Option<String>,
    pub timestamp: Option<String>,
    pub execution_time_ms: Option<f64>,
}

impl<T> CacheResponse<T> {
    pub fn new(success: bool, data: Option<T>, message: Option<String>) -> Self {
        CacheResponse {
            success,
            data,
            message,
            timestamp: Some(chrono::Utc::now().to_rfc3339()),
            execution_time_ms: None,
        }
    }

    pub fn with_execution_time(mut self, start_time: Instant) -> Self {
        self.execution_time_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        self
    }
}

#[derive(Debug, Serialize, Deserialize)]
pub struct BulkCacheRequest {
    pub operations: Vec<CacheOperation>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct CacheOperation {
    pub operation: String, // "get", "set", "delete"
    pub key: String,
    pub value: Option<String>,
    pub ttl: Option<u64>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct CacheStatsResponse {
    pub hits: u64,
    pub misses: u64,
    pub sets: u64,
    pub deletes: u64,
    pub l1_hits: u64,
    pub l2_hits: u64,
    pub hit_rate: f64,
    pub avg_response_time_ms: f64,
    pub total_operations: u64,
}

pub struct ServerState {
    pub cache_service: Arc<CacheService>,
    pub config: Config,
}

pub struct CacheServer {
    app: Router,
    addr: String,
}

impl CacheServer {
    pub fn new(config: Config, cache_service: CacheService) -> Self {
        let state = Arc::new(ServerState {
            cache_service: Arc::new(cache_service),
            config: config.clone(),
        });

        let app = Router::new()
            // Health check
            .route("/health", get(health_check))
            // Basic cache operations
            .route(
                "/cache/:key",
                get(get_cache).put(set_cache).delete(delete_cache),
            )
            .route("/cache/:key/exists", get(cache_exists))
            // Bulk operations
            .route("/cache/bulk", post(bulk_cache_operations))
            // Statistics
            .route("/stats", get(get_cache_stats))
            // Cache management
            .route("/cache/clear", post(clear_cache))
            .route("/cache/warmup", post(warmup_cache))
            .with_state(state);

        CacheServer {
            app,
            addr: config.addr,
        }
    }

    pub async fn run(self) -> Result<(), Box<dyn std::error::Error>> {
        info!("Cache server listening on {}", self.addr);
        let listener = tokio::net::TcpListener::bind(&self.addr).await?;
        axum::serve(listener, self.app).await?;
        Ok(())
    }
}

// Handler implementations

async fn health_check() -> impl IntoResponse {
    Json(CacheResponse::new(true, Some("healthy".to_string()), None))
}

async fn get_cache(
    State(state): State<Arc<ServerState>>,
    Path(key): Path<String>,
) -> impl IntoResponse {
    let start_time = Instant::now();

    match state.cache_service.get::<String>(&key).await {
        Ok(value) => {
            Json(CacheResponse::new(true, value, None).with_execution_time(start_time))
        }
        Err(e) => {
            error!("Get cache error: {}", e);
            Json(
                CacheResponse::new(false, None, Some("Internal error".to_string()))
                    .with_execution_time(start_time),
            )
        }
    }
}

async fn set_cache(
    State(state): State<Arc<ServerState>>,
    Path(key): Path<String>,
    Json(request): Json<CacheRequest>,
) -> impl IntoResponse {
    let start_time = Instant::now();

    // Reject requests without a value instead of panicking on unwrap().
    let Some(value) = request.value else {
        return Json(
            CacheResponse::new(false, None, Some("Value is required".to_string()))
                .with_execution_time(start_time),
        );
    };

    let ttl = request.ttl.map(Duration::from_secs);

    match state.cache_service.set(&key, &value, ttl).await {
        Ok(_) => {
            info!("Cache set: {} (TTL: {:?})", key, ttl);
            Json(
                CacheResponse::new(true, Some("OK".to_string()), None)
                    .with_execution_time(start_time),
            )
        }
        Err(e) => {
            error!("Set cache error: {}", e);
            Json(
                CacheResponse::new(false, None, Some("Internal error".to_string()))
                    .with_execution_time(start_time),
            )
        }
    }
}

async fn delete_cache(
    State(state): State<Arc<ServerState>>,
    Path(key): Path<String>,
) -> impl IntoResponse {
    let start_time = Instant::now();

    match state.cache_service.delete(&key).await {
        Ok(_) => {
            info!("Cache deleted: {}", key);
            Json(
                CacheResponse::new(true, Some("OK".to_string()), None)
                    .with_execution_time(start_time),
            )
        }
        Err(e) => {
            error!("Delete cache error: {}", e);
            Json(
                CacheResponse::new(false, None, Some("Internal error".to_string()))
                    .with_execution_time(start_time),
            )
        }
    }
}

async fn cache_exists(
    State(state): State<Arc<ServerState>>,
    Path(key): Path<String>,
) -> impl IntoResponse {
    let start_time = Instant::now();

    match state.cache_service.exists(&key).await {
        Ok(exists) => Json(
            CacheResponse::new(true, Some(exists.to_string()), None)
                .with_execution_time(start_time),
        ),
        Err(e) => {
            error!("Cache exists error: {}", e);
            Json(
                CacheResponse::new(false, None, Some("Internal error".to_string()))
                    .with_execution_time(start_time),
            )
        }
    }
}

async fn bulk_cache_operations(
    State(state): State<Arc<ServerState>>,
    Json(request): Json<BulkCacheRequest>,
) -> impl IntoResponse {
    let start_time = Instant::now();
    let mut results = Vec::new();

    for operation in request.operations {
        let result = match operation.operation.as_str() {
            "get" => match state.cache_service.get::<String>(&operation.key).await {
                Ok(value) => CacheOperationResult {
                    key: operation.key,
                    success: true,
                    data: value,
                    message: None,
                },
                Err(e) => CacheOperationResult {
                    key: operation.key,
                    success: false,
                    data: None,
                    message: Some(e.to_string()),
                },
            },
            "set" => {
                let ttl = operation.ttl.map(Duration::from_secs);
                match operation.value {
                    Some(value) => {
                        match state.cache_service.set(&operation.key, &value, ttl).await {
                            Ok(_) => CacheOperationResult {
                                key: operation.key,
                                success: true,
                                data: Some("OK".to_string()),
                                message: None,
                            },
                            Err(e) => CacheOperationResult {
                                key: operation.key,
                                success: false,
                                data: None,
                                message: Some(e.to_string()),
                            },
                        }
                    }
                    // Reject set operations without a value rather than unwrap().
                    None => CacheOperationResult {
                        key: operation.key,
                        success: false,
                        data: None,
                        message: Some("Value is required".to_string()),
                    },
                }
            }
            "delete" => match state.cache_service.delete(&operation.key).await {
                Ok(_) => CacheOperationResult {
                    key: operation.key,
                    success: true,
                    data: Some("OK".to_string()),
                    message: None,
                },
                Err(e) => CacheOperationResult {
                    key: operation.key,
                    success: false,
                    data: None,
                    message: Some(e.to_string()),
                },
            },
            _ => CacheOperationResult {
                key: operation.key,
                success: false,
                data: None,
                message: Some("Unknown operation".to_string()),
            },
        };
        results.push(result);
    }

    Json(CacheResponse::new(true, Some(results), None).with_execution_time(start_time))
}

#[derive(Debug, Serialize, Deserialize)]
struct CacheOperationResult {
    pub key: String,
    pub success: bool,
    pub data: Option<String>,
    pub message: Option<String>,
}

async fn get_cache_stats(State(state): State<Arc<ServerState>>) -> impl IntoResponse {
    let stats = state.cache_service.get_stats().await;

    let response = CacheStatsResponse {
        hits: stats.hits,
        misses: stats.misses,
        sets: stats.sets,
        deletes: stats.deletes,
        l1_hits: stats.l1_hits,
        l2_hits: stats.l2_hits,
        hit_rate: stats.hit_rate(),
        avg_response_time_ms: stats.avg_response_time.as_secs_f64() * 1000.0,
        total_operations: stats.total_operations,
    };

    Json(CacheResponse::new(true, Some(response), None))
}

async fn clear_cache(State(state): State<Arc<ServerState>>) -> impl IntoResponse {
    let start_time = Instant::now();

    match state.cache_service.clear().await {
        Ok(_) => {
            info!("Cache cleared");
            Json(
                CacheResponse::new(true, Some("OK".to_string()), None)
                    .with_execution_time(start_time),
            )
        }
        Err(e) => {
            error!("Clear cache error: {}", e);
            Json(
                CacheResponse::new(false, None, Some("Internal error".to_string()))
                    .with_execution_time(start_time),
            )
        }
    }
}

async fn warmup_cache(State(state): State<Arc<ServerState>>) -> impl IntoResponse {
    let start_time = Instant::now();

    match state.cache_service.warm_up().await {
        Ok(_) => {
            info!("Cache warm-up completed");
            Json(
                CacheResponse::new(true, Some("OK".to_string()), None)
                    .with_execution_time(start_time),
            )
        }
        Err(e) => {
            error!("Cache warm-up error: {}", e);
            Json(
                CacheResponse::new(false, None, Some("Internal error".to_string()))
                    .with_execution_time(start_time),
            )
        }
    }
}
```
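A quick way to exercise the API from Rust, assuming reqwest is added as an extra dependency (`reqwest = { version = "0.11", features = ["json"] }`; it is not part of this project's dependency list, and any HTTP client or curl works equally well):

```rust
// Minimal API smoke test (hypothetical companion binary; assumes the
// server is already running on 127.0.0.1:8080).
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let base = "http://127.0.0.1:8080";
    let client = reqwest::Client::new();

    // PUT /cache/:key -- store a value with a 60-second TTL.
    client
        .put(format!("{}/cache/greeting", base))
        .json(&json!({ "key": "greeting", "value": "hello", "ttl": 60 }))
        .send()
        .await?;

    // GET /cache/:key -- read it back.
    let body = client
        .get(format!("{}/cache/greeting", base))
        .send()
        .await?
        .text()
        .await?;
    println!("GET /cache/greeting -> {}", body);

    // GET /stats -- observe hit/miss counters.
    let stats = client
        .get(format!("{}/stats", base))
        .send()
        .await?
        .text()
        .await?;
    println!("GET /stats -> {}", stats);

    Ok(())
}
```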
```rust
// Configuration management
// File: cache-service/src/config.rs
use std::time::Duration;

/// Runtime configuration, assembled in main.rs from the CLI arguments.
/// CLI parsing lives in main.rs's `Cli`, so this struct only carries the
/// resolved values and can use `Duration` fields directly -- matching how
/// run_server and create_test_cache_service construct it.
#[derive(Debug, Clone)]
pub struct Config {
    pub addr: String,
    pub redis_url: String,
    pub l1_cache_size: usize,
    pub parallel_tasks: usize,
    pub default_ttl: Duration,
    pub max_ttl: Duration,
    pub warmup_enabled: bool,
    pub metrics_enabled: bool,
}
```
Chapter Summary
This chapter took a deep dive into Rust performance optimization, from analysis tooling all the way to a working application:
Core techniques

- Performance analysis tools:
  - The Criterion.rs benchmarking framework
  - Custom performance monitors
  - Real-time system monitoring
  - Tracing integration
- Memory optimization:
  - Memory pool implementation and management
  - Zero-copy strategies
  - The object pool pattern
  - Smart pointer optimization
- Concurrency optimization:
  - Async programming best practices
  - Lock-free data structures (see the sketch after this list)
  - The actor concurrency model
  - Work-stealing schedulers
- Caching strategy design:
  - Multi-tier cache architecture (L1/L2/L3)
  - Smart update strategies
  - Cache warm-up
  - Performance monitoring and tuning
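The lock-free bullet above deserves a concrete reminder of how little ceremony these structures require. A self-contained sketch using crossbeam's SegQueue, the same structure backing the cache's write queue:

```rust
use std::sync::Arc;
use std::thread;

use crossbeam::queue::SegQueue;

fn main() {
    // A lock-free MPMC queue: producers never block each other.
    let queue = Arc::new(SegQueue::new());

    let producers: Vec<_> = (0..4)
        .map(|t| {
            let q = queue.clone();
            thread::spawn(move || {
                for i in 0..1000 {
                    q.push(t * 1000 + i);
                }
            })
        })
        .collect();

    for p in producers {
        p.join().unwrap();
    }

    // Drain and count: all 4000 items arrive without a single Mutex.
    let mut count = 0;
    while queue.pop().is_some() {
        count += 1;
    }
    assert_eq!(count, 4000);
    println!("drained {} items lock-free", count);
}
```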
Enterprise-grade project

High-performance cache service:
- Multi-tier architecture: in-memory cache + distributed Redis cache
- Smart update strategies: write-through, write-back, and write-around modes
- Concurrency optimizations: async processing, connection pooling, work stealing
- Monitoring: real-time performance statistics and alerting
- Web API: RESTful endpoints, bulk operations, health checks
Performance impact

The figures below are indicative of what this combination of techniques can achieve; actual gains depend heavily on the workload and the baseline being compared against:
- Response time: reduced by 80-95%
- Throughput: improved 5-20x
- Memory usage: reduced by 50-70%
- Concurrency capacity: improved 10-100x
Chapter 13 complete: with these core performance optimization techniques in hand, you can build high-performance, enterprise-grade applications. Next up, Chapter 14: Secure Programming.