Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

第11章:数据库操作

章节概述

数据持久化是现代应用程序的核心需求之一。在本章中,我们将深入探索Rust的数据库编程能力,从基础的SQL操作到复杂的企业级数据管理。本章不仅关注技术实现,更强调性能优化、数据安全和可维护性。

学习目标

  • 掌握Rust数据库编程的核心概念和最佳实践
  • 理解PostgreSQL等关系型数据库的特点和优势
  • 学会构建高性能的异步数据库操作
  • 掌握连接池管理和查询优化技术
  • 学会数据库迁移和版本管理
  • 设计并实现一个企业级任务管理平台

实战项目:构建一个企业级任务管理系统,支持团队协作、任务分配、进度跟踪、项目管理等功能。

11.1 数据库编程基础

11.1.1 Rust数据库生态

Rust在数据库编程方面具有以下优势:

  • 类型安全:编译时类型检查,避免SQL注入
  • 内存安全:防止缓冲区溢出和内存泄露
  • 零成本抽象:接近C++的性能表现
  • 异步支持:优秀的异步/await支持高并发
  • 丰富的生态:多个成熟的数据库驱动和ORM

11.1.2 主要数据库库介绍

sqlx - 异步SQL库

#![allow(unused)]
fn main() {
use sqlx::{Postgres, Row, Column};
use sqlx::postgres::PgPoolOptions;
use std::time::Duration;

#[derive(Debug, sqlx::FromRow)]
struct User {
    id: i64,
    username: String,
    email: String,
    created_at: chrono::DateTime<chrono::Utc>,
}

async fn connect_database() -> Result<sqlx::PgPool, sqlx::Error> {
    let pool = PgPoolOptions::new()
        .max_connections(10)
        .connect_timeout(Duration::from_secs(5))
        .connect("postgresql://user:password@localhost/database")
        .await?;
    
    Ok(pool)
}

async fn fetch_user_by_id(pool: &sqlx::PgPool, user_id: i64) -> Result<Option<User>, sqlx::Error> {
    let user = sqlx::query_as!(
        User,
        "SELECT id, username, email, created_at FROM users WHERE id = $1",
        user_id
    )
    .fetch_optional(pool)
    .await?;
    
    Ok(user)
}
}

diesel - 类型安全的ORM

#![allow(unused)]
fn main() {
use diesel::{PgConnection, Queryable, Insertable, associations::BelongsTo};
use diesel::table;

table! {
    users {
        id -> BigInt,
        username -> Varchar,
        email -> Varchar,
        created_at -> Timestamptz,
    }
}

#[derive(Queryable, BelongsTo)]
#[belongs_to(User)]
struct Task {
    id: i64,
    title: String,
    description: Option<String>,
    user_id: i64,
    created_at: chrono::DateTime<chrono::Utc>,
    updated_at: chrono::DateTime<chrono::Utc>,
}

fn insert_user(conn: &PgConnection, username: &str, email: &str) -> Result<i64, diesel::result::Error> {
    use crate::schema::users;
    
    let new_user = NewUser {
        username,
        email,
    };
    
    let result = diesel::insert_into(users::table)
        .values(&new_user)
        .returning(users::id)
        .get_result(conn)?;
    
    Ok(result)
}
}

11.1.3 数据库连接管理

#![allow(unused)]
fn main() {
use sqlx::{PgPool, Connection};
use std::time::Duration;
use tokio::sync::RwLock;
use std::collections::HashMap;
use tracing::{info, warn, error};

pub struct DatabaseManager {
    pool: PgPool,
    connections: RwLock<HashMap<String, PgPool>>,
    health_check_interval: Duration,
}

impl DatabaseManager {
    pub async fn new(database_url: &str) -> Result<Self, Box<dyn std::error::Error>> {
        let pool = PgPoolOptions::new()
            .max_connections(20)  // 最大连接数
            .min_connections(5)   // 最小连接数
            .max_lifetime(Duration::from_secs(1800))  // 连接生命周期
            .idle_timeout(Duration::from_secs(300))   // 空闲超时
            .connect_timeout(Duration::from_secs(10)) // 连接超时
            .connect(database_url)
            .await?;
        
        info!("Database connection pool initialized");
        
        Ok(DatabaseManager {
            pool,
            connections: RwLock::new(HashMap::new()),
            health_check_interval: Duration::from_secs(30),
        })
    }
    
    pub async fn health_check(&self) -> Result<bool, Box<dyn std::error::Error>> {
        let result: Result<(i64,), sqlx::Error> = sqlx::query_as("SELECT 1 as test")
            .fetch_one(&self.pool)
            .await;
        
        match result {
            Ok(_) => {
                info!("Database health check passed");
                Ok(true)
            }
            Err(e) => {
                error!("Database health check failed: {}", e);
                Ok(false)
            }
        }
    }
    
    pub async fn start_health_monitoring(&self) {
        let manager = self.clone();
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(manager.health_check_interval);
            loop {
                interval.tick().await;
                
                if let Ok(false) = manager.health_check().await {
                    warn!("Database health check failed");
                    // 这里可以添加告警逻辑
                }
            }
        });
    }
    
    pub async fn get_connection(&self) -> Result<sqlx::PgConnection, sqlx::Error> {
        self.pool.acquire().await?.into_owned()
    }
    
    pub async fn execute_query<T>(
        &self,
        query: &str,
        params: impl sqlx::Encode<'_, sqlx::Postgres> + sqlx::Type<sqlx::Postgres> + Send
    ) -> Result<T, sqlx::Error>
    where
        T: for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> + Send
    {
        let mut conn = self.get_connection().await?;
        sqlx::query_as::<_, T>(query)
            .bind(params)
            .fetch_one(&mut *conn)
            .await
    }
    
    pub async fn execute_update(&self, query: &str) -> Result<u64, sqlx::Error> {
        let mut conn = self.get_connection().await?;
        sqlx::query(query)
            .execute(&mut *conn)
            .await
            .map(|result| result.rows_affected())
    }
    
    pub async fn execute_insert<T>(
        &self,
        query: &str
    ) -> Result<T, sqlx::Error>
    where
        T: for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> + Send
    {
        let mut conn = self.get_connection().await?;
        sqlx::query_as::<_, T>(query)
            .fetch_one(&mut *conn)
            .await
    }
}

impl Clone for DatabaseManager {
    fn clone(&self) -> Self {
        DatabaseManager {
            pool: self.pool.clone(),
            connections: self.connections.clone(),
            health_check_interval: self.health_check_interval,
        }
    }
}
}

11.2 SQL基础和PostgreSQL

11.2.1 PostgreSQL特性

PostgreSQL是功能最强大的开源关系型数据库之一,具有以下特点:

  • ACID事务支持
  • 复杂查询支持
  • JSON/JSONB支持
  • 全文搜索
  • 地理位置数据支持
  • 扩展性良好

11.2.2 基础SQL操作

#![allow(unused)]
fn main() {
// 数据定义语言 (DDL)
async fn create_tables(pool: &sqlx::PgPool) -> Result<(), sqlx::Error> {
    // 创建用户表
    sqlx::query(r#"
        CREATE TABLE IF NOT EXISTS users (
            id SERIAL PRIMARY KEY,
            username VARCHAR(50) UNIQUE NOT NULL,
            email VARCHAR(100) UNIQUE NOT NULL,
            password_hash VARCHAR(255) NOT NULL,
            display_name VARCHAR(100),
            avatar_url TEXT,
            is_active BOOLEAN DEFAULT TRUE,
            created_at TIMESTAMPTZ DEFAULT NOW(),
            updated_at TIMESTAMPTZ DEFAULT NOW()
        )
    "#).execute(pool).await?;
    
    // 创建项目表
    sqlx::query(r#"
        CREATE TABLE IF NOT EXISTS projects (
            id SERIAL PRIMARY KEY,
            name VARCHAR(100) NOT NULL,
            description TEXT,
            owner_id INTEGER NOT NULL REFERENCES users(id),
            status project_status DEFAULT 'active',
            created_at TIMESTAMPTZ DEFAULT NOW(),
            updated_at TIMESTAMPTZ DEFAULT NOW()
        )
    "#).execute(pool).await?;
    
    // 创建任务表
    sqlx::query(r#"
        CREATE TABLE IF NOT EXISTS tasks (
            id SERIAL PRIMARY KEY,
            title VARCHAR(200) NOT NULL,
            description TEXT,
            project_id INTEGER NOT NULL REFERENCES projects(id),
            assignee_id INTEGER REFERENCES users(id),
            created_by_id INTEGER NOT NULL REFERENCES users(id),
            status task_status DEFAULT 'todo',
            priority task_priority DEFAULT 'medium',
            estimated_hours DECIMAL(5,2),
            actual_hours DECIMAL(5,2) DEFAULT 0,
            due_date DATE,
            created_at TIMESTAMPTZ DEFAULT NOW(),
            updated_at TIMESTAMPTZ DEFAULT NOW()
        )
    "#).execute(pool).await?;
    
    // 创建任务评论表
    sqlx::query(r#"
        CREATE TABLE IF NOT EXISTS task_comments (
            id SERIAL PRIMARY KEY,
            task_id INTEGER NOT NULL REFERENCES tasks(id),
            user_id INTEGER NOT NULL REFERENCES users(id),
            content TEXT NOT NULL,
            created_at TIMESTAMPTZ DEFAULT NOW()
        )
    "#).execute(pool).await?;
    
    Ok(())
}

// 创建自定义枚举类型
async fn create_custom_types(pool: &sqlx::PgPool) -> Result<(), sqlx::Error> {
    sqlx::query(r#"
        DO $$ BEGIN
            CREATE TYPE project_status AS ENUM ('active', 'completed', 'cancelled', 'on_hold');
        EXCEPTION
            WHEN duplicate_object THEN null;
        END $$;
    "#).execute(pool).await?;
    
    sqlx::query(r#"
        DO $$ BEGIN
            CREATE TYPE task_status AS ENUM ('todo', 'in_progress', 'review', 'completed', 'cancelled');
        EXCEPTION
            WHEN duplicate_object THEN null;
        END $$;
    "#).execute(pool).await?;
    
    sqlx::query(r#"
        DO $$ BEGIN
            CREATE TYPE task_priority AS ENUM ('low', 'medium', 'high', 'urgent');
        EXCEPTION
            WHEN duplicate_object THEN null;
        END $$;
    "#).execute(pool).await?;
    
    Ok(())
}
}

11.2.3 索引优化

#![allow(unused)]
fn main() {
async fn create_indexes(pool: &sqlx::PgPool) -> Result<(), sqlx::Error> {
    // 为用户表创建索引
    sqlx::query("CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_email ON users(email)")
        .execute(pool).await?;
    sqlx::query("CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_username ON users(username)")
        .execute(pool).await?;
    
    // 为项目表创建索引
    sqlx::query("CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_projects_owner ON projects(owner_id)")
        .execute(pool).await?;
    sqlx::query("CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_projects_status ON projects(status)")
        .execute(pool).await?;
    
    // 为任务表创建复合索引
    sqlx::query("CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_tasks_project_status ON tasks(project_id, status)")
        .execute(pool).await?;
    sqlx::query("CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_tasks_assignee ON tasks(assignee_id)")
        .execute(pool).await?;
    sqlx::query("CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_tasks_due_date ON tasks(due_date)")
        .execute(pool).await?;
    sqlx::query("CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_tasks_created_at ON tasks(created_at)")
        .execute(pool).await?;
    
    // 创建全文搜索索引
    sqlx::query("CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_tasks_title_fts ON tasks USING gin(to_tsvector('english', title))")
        .execute(pool).await?;
    sqlx::query("CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_tasks_description_fts ON tasks USING gin(to_tsvector('english', description))")
        .execute(pool).await?;
    
    // 创建部分索引(只对活跃任务)
    sqlx::query("CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_tasks_active_assignee ON tasks(assignee_id) WHERE status != 'completed'")
        .execute(pool).await?;
    
    Ok(())
}
}

11.2.4 数据完整性约束

#![allow(unused)]
fn main() {
async fn add_constraints(pool: &sqlx::PgPool) -> Result<(), sqlx::Error> {
    // 添加外键约束
    sqlx::query("ALTER TABLE projects ADD CONSTRAINT fk_projects_owner FOREIGN KEY (owner_id) REFERENCES users(id) ON DELETE CASCADE")
        .execute(pool).await?;
    
    sqlx::query("ALTER TABLE tasks ADD CONSTRAINT fk_tasks_project FOREIGN KEY (project_id) REFERENCES projects(id) ON DELETE CASCADE")
        .execute(pool).await?;
    sqlx::query("ALTER TABLE tasks ADD CONSTRAINT fk_tasks_assignee FOREIGN KEY (assignee_id) REFERENCES users(id) ON DELETE SET NULL")
        .execute(pool).await?;
    sqlx::query("ALTER TABLE tasks ADD CONSTRAINT fk_tasks_created_by FOREIGN KEY (created_by_id) REFERENCES users(id) ON DELETE CASCADE")
        .execute(pool).await?;
    
    sqlx::query("ALTER TABLE task_comments ADD CONSTRAINT fk_task_comments_task FOREIGN KEY (task_id) REFERENCES tasks(id) ON DELETE CASCADE")
        .execute(pool).await?;
    sqlx::query("ALTER TABLE task_comments ADD CONSTRAINT fk_task_comments_user FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE")
        .execute(pool).await?;
    
    // 添加检查约束
    sqlx::query("ALTER TABLE tasks ADD CONSTRAINT chk_estimated_hours_positive CHECK (estimated_hours > 0 OR estimated_hours IS NULL)")
        .execute(pool).await?;
    sqlx::query("ALTER TABLE tasks ADD CONSTRAINT chk_actual_hours_non_negative CHECK (actual_hours >= 0)")
        .execute(pool).await?;
    sqlx::query("ALTER TABLE tasks ADD CONSTRAINT chk_due_date_after_created CHECK (due_date >= DATE(created_at) OR due_date IS NULL)")
        .execute(pool).await?;
    
    // 添加唯一约束
    sqlx::query("ALTER TABLE task_comments ADD CONSTRAINT unique_task_user_created_at UNIQUE (task_id, user_id, created_at)")
        .execute(pool).await?;
    
    Ok(())
}
}

11.3 异步数据库操作

11.3.1 查询构建器

#![allow(unused)]
fn main() {
use sqlx::Postgres;
use serde::{Serialize, Deserialize};
use std::collections::HashMap;

#[derive(Debug, Serialize, Deserialize)]
pub struct QueryBuilder {
    table: String,
    conditions: Vec<String>,
    params: Vec<Box<dyn sqlx::Encode<'static, Postgres> + Send>>,
    order_by: Option<(String, bool)>,
    limit: Option<i64>,
    offset: Option<i64>,
}

impl QueryBuilder {
    pub fn new(table: &str) -> Self {
        QueryBuilder {
            table: table.to_string(),
            conditions: Vec::new(),
            params: Vec::new(),
            order_by: None,
            limit: None,
            offset: None,
        }
    }
    
    pub fn add_condition(&mut self, condition: &str, param: impl sqlx::Encode<'static, Postgres> + Send + 'static) -> &mut Self {
        self.conditions.push(condition.to_string());
        self.params.push(Box::new(param) as Box<dyn sqlx::Encode<'static, Postgres> + Send>);
        self
    }
    
    pub fn add_or_condition(&mut self, condition: &str, param: impl sqlx::Encode<'static, Postgres> + Send + 'static) -> &mut Self {
        if !self.conditions.is_empty() {
            self.conditions.push(format!("OR {}", condition));
        } else {
            self.conditions.push(condition.to_string());
        }
        self.params.push(Box::new(param) as Box<dyn sqlx::Encode<'static, Postgres> + Send>);
        self
    }
    
    pub fn order_by(&mut self, column: &str, ascending: bool) -> &mut Self {
        self.order_by = Some((column.to_string(), ascending));
        self
    }
    
    pub fn limit(&mut self, limit: i64) -> &mut Self {
        self.limit = Some(limit);
        self
    }
    
    pub fn offset(&mut self, offset: i64) -> &mut Self {
        self.offset = Some(offset);
        self
    }
    
    pub fn build_select(&self) -> String {
        let mut query = format!("SELECT * FROM {}", self.table);
        
        if !self.conditions.is_empty() {
            query.push_str(" WHERE ");
            query.push_str(&self.conditions.join(" AND "));
        }
        
        if let Some((ref column, ascending)) = self.order_by {
            let order = if ascending { "ASC" } else { "DESC" };
            query.push_str(&format!(" ORDER BY {} {}", column, order));
        }
        
        if let Some(limit) = self.limit {
            query.push_str(&format!(" LIMIT {}", limit));
        }
        
        if let Some(offset) = self.offset {
            query.push_str(&format!(" OFFSET {}", offset));
        }
        
        query
    }
    
    pub async fn execute_select<T>(
        &self,
        pool: &sqlx::PgPool
    ) -> Result<Vec<T>, sqlx::Error>
    where
        T: for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> + Send
    {
        let query = self.build_select();
        let mut query_builder = sqlx::query_as::<_, T>(&query);
        
        for param in &self.params {
            // 这里需要一个更复杂的方式来绑定参数
            // 简化实现
        }
        
        query_builder.fetch_all(pool).await
    }
}

// 使用示例
#[cfg(test)]
mod query_builder_tests {
    use super::*;
    
    #[tokio::test]
    async fn test_query_builder() {
        let mut query = QueryBuilder::new("tasks")
            .add_condition("status = $1", "todo")
            .add_condition("priority = $2", "high")
            .order_by("created_at", false)
            .limit(10);
        
        let sql = query.build_select();
        assert!(sql.contains("SELECT * FROM tasks"));
        assert!(sql.contains("WHERE status = $1 AND priority = $2"));
        assert!(sql.contains("ORDER BY created_at DESC"));
        assert!(sql.contains("LIMIT 10"));
    }
}
}

11.3.2 事务管理

#![allow(unused)]
fn main() {
use sqlx::PgPool;
use sqlx::Transaction;
use tokio::sync::Mutex;
use std::sync::Arc;

pub struct TransactionManager {
    pool: PgPool,
}

impl TransactionManager {
    pub fn new(pool: PgPool) -> Self {
        TransactionManager { pool }
    }
    
    pub async fn execute_in_transaction<T, F, R>(
        &self,
        operation: F
    ) -> Result<T, Box<dyn std::error::Error + Send + Sync>>
    where
        F: Fn(&mut sqlx::Transaction<'_, sqlx::Postgres>) -> R,
        R: std::future::Future<Output = Result<T, Box<dyn std::error::Error + Send + Sync>>>,
    {
        let mut tx = self.pool.begin().await?;
        
        let result = operation(&mut tx).await?;
        
        tx.commit().await?;
        Ok(result)
    }
    
    pub async fn batch_insert<T>(
        &self,
        items: &[T],
        insert_query: &str
    ) -> Result<Vec<i64>, Box<dyn std::error::Error + Send + Sync>>
    where
        T: sqlx::Encode<'static, sqlx::Postgres> + Send + Clone
    {
        let mut tx = self.pool.begin().await?;
        let mut ids = Vec::new();
        
        for item in items {
            let result = sqlx::query(insert_query)
                .bind(item)
                .execute(&mut *tx)
                .await?;
            
            ids.push(result.last_insert_id());
        }
        
        tx.commit().await?;
        Ok(ids)
    }
    
    pub async fn ensure_data_consistency<F, R>(
        &self,
        operations: Vec<F>
    ) -> Result<R, Box<dyn std::error::Error + Send + Sync>>
    where
        F: Fn(&mut sqlx::Transaction<'_, sqlx::Postgres>) -> R,
        R: std::future::Future<Output = Result<R, Box<dyn std::error::Error + Send + Sync>>>,
    {
        let mut tx = self.pool.begin().await?;
        
        for operation in operations {
            let _ = operation(&mut tx).await?;
        }
        
        tx.commit().await?;
        Ok(())
    }
}

// 使用事务的示例
async fn create_project_with_tasks(
    pool: &sqlx::PgPool,
    project_data: &ProjectData,
    tasks: &[TaskData]
) -> Result<Project, sqlx::Error> {
    let mut tx = pool.begin().await?;
    
    // 创建项目
    let project = sqlx::query_as!(
        Project,
        "INSERT INTO projects (name, description, owner_id) VALUES ($1, $2, $3) RETURNING *",
        project_data.name,
        project_data.description,
        project_data.owner_id
    )
    .fetch_one(&mut *tx)
    .await?;
    
    // 创建任务
    for task_data in tasks {
        sqlx::query!(
            "INSERT INTO tasks (title, description, project_id, created_by_id) VALUES ($1, $2, $3, $4)",
            task_data.title,
            task_data.description,
            project.id,
            task_data.created_by_id
        )
        .execute(&mut *tx)
        .await?;
    }
    
    tx.commit().await?;
    Ok(project)
}
}

11.3.3 批量操作优化

#![allow(unused)]
fn main() {
use sqlx::{PgPool, Row};
use tokio::time::Instant;

pub struct BatchOperations {
    pool: PgPool,
    batch_size: usize,
}

impl BatchOperations {
    pub fn new(pool: PgPool, batch_size: usize) -> Self {
        BatchOperations { pool, batch_size }
    }
    
    pub async fn batch_insert_users(
        &self,
        users: &[UserData]
    ) -> Result<Vec<i64>, sqlx::Error> {
        let mut ids = Vec::new();
        
        for batch in users.chunks(self.batch_size) {
            let mut tx = self.pool.begin().await?;
            
            for user_data in batch {
                let result = sqlx::query!(
                    "INSERT INTO users (username, email, password_hash) VALUES ($1, $2, $3) RETURNING id",
                    user_data.username,
                    user_data.email,
                    user_data.password_hash
                )
                .fetch_one(&mut *tx)
                .await?;
                
                ids.push(result.id);
            }
            
            tx.commit().await?;
        }
        
        Ok(ids)
    }
    
    pub async fn batch_update_task_statuses(
        &self,
        updates: &[(i64, String)]
    ) -> Result<u64, sqlx::Error> {
        let start_time = Instant::now();
        
        // 使用批量更新语句
        let mut query_builder = String::from("UPDATE tasks SET status = CASE id ");
        let mut params: Vec<Box<dyn sqlx::Encode<'static, Postgres> + Send>> = Vec::new();
        let mut param_counter = 1;
        
        for (task_id, status) in updates {
            query_builder.push_str(&format!("WHEN ${} THEN ${} ", param_counter, param_counter + 1));
            params.push(Box::new(*task_id) as Box<dyn sqlx::Encode<'static, Postgres> + Send>);
            params.push(Box::new(status.clone()) as Box<dyn sqlx::Encode<'static, Postgres> + Send>);
            param_counter += 2;
        }
        
        query_builder.push_str("END WHERE id = ANY($)");
        let task_ids: Vec<i64> = updates.iter().map(|(id, _)| *id).collect();
        params.push(Box::new(task_ids) as Box<dyn sqlx::Encode<'static, Postgres> + Send>);
        
        let mut query = sqlx::query(&query_builder);
        for param in params {
            // 绑定参数
            // 简化实现
        }
        
        let result = query.execute(&self.pool).await?;
        
        let duration = start_time.elapsed();
        tracing::info!("Batch update completed in {:?}", duration);
        
        Ok(result.rows_affected())
    }
    
    pub async fn batch_delete_old_tasks(
        &self,
        before_date: chrono::DateTime<chrono::Utc>
    ) -> Result<u64, sqlx::Error> {
        let result = sqlx::query!(
            "DELETE FROM tasks WHERE created_at < $1",
            before_date
        )
        .execute(&self.pool)
        .await?;
        
        Ok(result.rows_affected())
    }
    
    pub async fn bulk_upsert_projects(
        &self,
        projects: &[ProjectData]
    ) -> Result<Vec<i64>, sqlx::Error> {
        // 使用ON CONFLICT进行批量插入或更新
        let mut values = Vec::new();
        let mut param_index = 1;
        
        for project in projects {
            values.push(format!(
                "(${}, ${}, ${}, ${}, NOW())",
                param_index,
                param_index + 1,
                param_index + 2,
                param_index + 3
            ));
            param_index += 4;
        }
        
        let query = format!(
            r#"
            INSERT INTO projects (name, description, owner_id, created_at)
            VALUES {}
            ON CONFLICT (name) DO UPDATE SET
                description = EXCLUDED.description,
                owner_id = EXCLUDED.owner_id,
                updated_at = NOW()
            RETURNING id
            "#,
            values.join(", ")
        );
        
        // 构建并执行查询
        // 实际实现中需要正确绑定参数
        let mut query_builder = sqlx::query(&query);
        
        // 添加所有参数
        for project in projects {
            query_builder = query_builder
                .bind(&project.name)
                .bind(&project.description)
                .bind(&project.owner_id);
        }
        
        let rows = query_builder.fetch_all(&self.pool).await?;
        let ids: Vec<i64> = rows.iter().map(|row| row.get(0)).collect();
        
        Ok(ids)
    }
}

#[derive(Debug, Clone)]
struct UserData {
    username: String,
    email: String,
    password_hash: String,
}

#[derive(Debug, Clone)]
struct ProjectData {
    name: String,
    description: Option<String>,
    owner_id: i64,
}

#[derive(Debug, Clone)]
struct TaskData {
    title: String,
    description: Option<String>,
    created_by_id: i64,
}
}

11.4 连接池管理

11.4.1 高级连接池配置

#![allow(unused)]
fn main() {
use sqlx::{PgPool, PgPoolOptions};
use tokio::sync::RwLock;
use std::time::Duration;
use std::collections::HashMap;
use tracing::{info, warn, error};

pub struct ConnectionPoolManager {
    pools: RwLock<HashMap<String, PgPool>>,
    pool_configs: HashMap<String, PoolConfig>,
}

#[derive(Debug, Clone)]
pub struct PoolConfig {
    pub max_connections: u32,
    pub min_connections: u32,
    pub max_lifetime: Duration,
    pub idle_timeout: Duration,
    pub connect_timeout: Duration,
    pub acquire_timeout: Duration,
    pub health_check_interval: Duration,
}

impl Default for PoolConfig {
    fn default() -> Self {
        PoolConfig {
            max_connections: 20,
            min_connections: 5,
            max_lifetime: Duration::from_secs(1800),
            idle_timeout: Duration::from_secs(300),
            connect_timeout: Duration::from_secs(10),
            acquire_timeout: Duration::from_secs(30),
            health_check_interval: Duration::from_secs(60),
        }
    }
}

impl ConnectionPoolManager {
    pub fn new() -> Self {
        ConnectionPoolManager {
            pools: RwLock::new(HashMap::new()),
            pool_configs: HashMap::new(),
        }
    }
    
    pub async fn add_pool(
        &self,
        name: &str,
        connection_string: &str,
        config: Option<PoolConfig>
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let pool_config = config.unwrap_or_default();
        
        let pool = PgPoolOptions::new()
            .max_connections(pool_config.max_connections)
            .min_connections(pool_config.min_connections)
            .max_lifetime(pool_config.max_lifetime)
            .idle_timeout(pool_config.idle_timeout)
            .connect_timeout(pool_config.connect_timeout)
            .acquire_timeout(pool_config.acquire_timeout)
            .connect(connection_string)
            .await?;
        
        // 测试连接
        let test_result: Result<(i64,), sqlx::Error> = sqlx::query_as("SELECT 1")
            .fetch_one(&pool)
            .await;
        
        if let Err(e) = test_result {
            return Err(format!("Failed to test connection for pool '{}': {}", name, e).into());
        }
        
        // 存储池和配置
        {
            let mut pools = self.pools.write().await;
            pools.insert(name.to_string(), pool);
        }
        
        self.pool_configs.insert(name.to_string(), pool_config.clone());
        
        // 启动健康检查
        let pool_name = name.to_string();
        let pool_clone = pool.clone();
        let config_clone = pool_config.clone();
        
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(config_clone.health_check_interval);
            loop {
                interval.tick().await;
                
                if let Err(e) = Self::health_check_pool(&pool_clone).await {
                    warn!("Health check failed for pool '{}': {}", pool_name, e);
                    // 这里可以添加告警逻辑
                }
            }
        });
        
        info!("Connection pool '{}' initialized successfully", name);
        Ok(())
    }
    
    async fn health_check_pool(pool: &sqlx::PgPool) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let result: Result<(i64,), sqlx::Error> = sqlx::query_as("SELECT 1")
            .fetch_one(pool)
            .await;
        
        if let Err(e) = result {
            return Err(format!("Health check failed: {}", e).into());
        }
        
        // 检查连接统计
        let stats = pool.statistics();
        if stats.idle_connections() == 0 {
            warn!("No idle connections in pool");
        }
        
        if stats.busy_connections() == pool.size() {
            warn!("All connections in pool are busy");
        }
        
        Ok(())
    }
    
    pub async fn get_pool(&self, name: &str) -> Option<sqlx::PgPool> {
        let pools = self.pools.read().await;
        pools.get(name).cloned()
    }
    
    pub async fn get_pool_with_timeout(
        &self,
        name: &str,
        timeout: Duration
    ) -> Result<sqlx::PgPool, Box<dyn std::error::Error + Send + Sync>> {
        let pool = self.get_pool(name)
            .await
            .ok_or_else(|| format!("Pool '{}' not found", name))?;
        
        // 尝试获取连接,带超时
        let start = std::time::Instant::now();
        loop {
            match tokio::time::timeout(timeout, pool.acquire()).await {
                Ok(Ok(_)) => {
                    return Ok(pool);
                }
                Ok(Err(e)) => {
                    return Err(format!("Failed to acquire connection from pool '{}': {}", name, e).into());
                }
                Err(_) => {
                    if start.elapsed() >= timeout {
                        return Err(format!("Timeout acquiring connection from pool '{}'", name).into());
                    }
                    tokio::time::sleep(Duration::from_millis(100)).await;
                }
            }
        }
    }
    
    pub async fn list_pools(&self) -> Vec<String> {
        let pools = self.pools.read().await;
        pools.keys().cloned().collect()
    }
    
    pub async fn close_pool(&self, name: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let mut pools = self.pools.write().await;
        if let Some(pool) = pools.remove(name) {
            pool.close().await;
            info!("Connection pool '{}' closed", name);
        }
        
        self.pool_configs.remove(name);
        Ok(())
    }
    
    pub async fn get_pool_stats(&self, name: &str) -> Option<PoolStats> {
        let pools = self.pools.read().await;
        if let Some(pool) = pools.get(name) {
            let stats = pool.statistics();
            Some(PoolStats {
                size: pool.size(),
                idle_connections: stats.idle_connections(),
                busy_connections: stats.busy_connections(),
                total_connections: stats.total_connections(),
            })
        } else {
            None
        }
    }
}

#[derive(Debug, Clone)]
pub struct PoolStats {
    pub size: u32,
    pub idle_connections: u32,
    pub busy_connections: u32,
    pub total_connections: u32,
}

// 智能连接分配
pub struct ConnectionRouter {
    manager: ConnectionPoolManager,
    routing_rules: HashMap<String, String>, // table -> pool_name
}

impl ConnectionRouter {
    pub fn new(manager: ConnectionPoolManager) -> Self {
        ConnectionRouter {
            manager,
            routing_rules: HashMap::new(),
        }
    }
    
    pub fn add_routing_rule(&mut self, table: &str, pool_name: &str) {
        self.routing_rules.insert(table.to_string(), pool_name.to_string());
    }
    
    pub async fn get_connection_for_table(&self, table: &str) -> Result<sqlx::PgPool, Box<dyn std::error::Error + Send + Sync>> {
        if let Some(pool_name) = self.routing_rules.get(table) {
            self.manager.get_pool_with_timeout(pool_name, Duration::from_secs(5)).await
        } else {
            // 默认使用第一个可用的池
            let pools = self.manager.list_pools().await;
            if let Some(pool_name) = pools.first() {
                self.manager.get_pool_with_timeout(pool_name, Duration::from_secs(5)).await
            } else {
                Err("No connection pools available".into())
            }
        }
    }
}
}

11.4.2 连接池监控

#![allow(unused)]
fn main() {
use tokio::sync::RwLock;
use std::time::{Duration, Instant};
use std::collections::HashMap;
use serde::{Serialize, Deserialize};

pub struct PoolMonitor {
    pools: Arc<RwLock<ConnectionPoolManager>>,
    metrics: Arc<RwLock<PoolMetrics>>,
    collection_interval: Duration,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PoolMetrics {
    pool_name: String,
    timestamp: Instant,
    connection_count: u32,
    idle_connections: u32,
    busy_connections: u32,
    query_count: u64,
    average_query_time: Duration,
    slow_queries: Vec<SlowQuery>,
    errors: u64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SlowQuery {
    query: String,
    duration: Duration,
    timestamp: Instant,
}

impl PoolMonitor {
    pub fn new(pools: Arc<RwLock<ConnectionPoolManager>>) -> Self {
        PoolMonitor {
            pools,
            metrics: Arc::new(RwLock::new(PoolMetrics {
                pool_name: String::new(),
                timestamp: Instant::now(),
                connection_count: 0,
                idle_connections: 0,
                busy_connections: 0,
                query_count: 0,
                average_query_time: Duration::from_millis(0),
                slow_queries: Vec::new(),
                errors: 0,
            })),
            collection_interval: Duration::from_secs(30),
        }
    }
    
    pub async fn start_monitoring(&self) {
        let pools = Arc::clone(&self.pools);
        let metrics = Arc::clone(&self.metrics);
        
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(30));
            loop {
                interval.tick().await;
                
                // 收集所有池的指标
                if let Ok(pool_list) = pools.read().await.list_pools().await {
                    for pool_name in pool_list {
                        if let Some(stats) = pools.read().await.get_pool_stats(&pool_name).await {
                            let mut metrics_guard = metrics.write().await;
                            metrics_guard.pool_name = pool_name;
                            metrics_guard.connection_count = stats.size;
                            metrics_guard.idle_connections = stats.idle_connections;
                            metrics_guard.busy_connections = stats.busy_connections;
                        }
                    }
                }
            }
        });
    }
    
    pub async fn record_query(&self, pool_name: &str, query: &str, duration: Duration) {
        let mut metrics_guard = self.metrics.write().await;
        
        if metrics_guard.pool_name == pool_name {
            metrics_guard.query_count += 1;
            
            // 更新平均查询时间
            let total_time = metrics_guard.average_query_time * (metrics_guard.query_count - 1);
            metrics_guard.average_query_time = total_time / metrics_guard.query_count + duration / metrics_guard.query_count;
            
            // 记录慢查询
            if duration > Duration::from_millis(1000) {
                metrics_guard.slow_queries.push(SlowQuery {
                    query: query.to_string(),
                    duration,
                    timestamp: Instant::now(),
                });
                
                // 只保留最近的10个慢查询
                if metrics_guard.slow_queries.len() > 10 {
                    metrics_guard.slow_queries.remove(0);
                }
            }
        }
    }
    
    pub async fn record_error(&self, pool_name: &str) {
        let mut metrics_guard = self.metrics.write().await;
        if metrics_guard.pool_name == pool_name {
            metrics_guard.errors += 1;
        }
    }
    
    pub async fn get_metrics(&self) -> PoolMetrics {
        self.metrics.read().await.clone()
    }
    
    pub async fn get_pool_health(&self, pool_name: &str) -> PoolHealth {
        let metrics = self.get_metrics().await;
        
        let connection_usage = if metrics.connection_count > 0 {
            metrics.busy_connections as f64 / metrics.connection_count as f64
        } else {
            0.0
        };
        
        let health_score = if connection_usage > 0.9 {
            PoolHealthStatus::Critical
        } else if connection_usage > 0.7 {
            PoolHealthStatus::Warning
        } else if metrics.errors > 0 {
            PoolHealthStatus::Degraded
        } else {
            PoolHealthStatus::Healthy
        };
        
        PoolHealth {
            pool_name: pool_name.to_string(),
            status: health_score,
            connection_usage,
            error_rate: if metrics.query_count > 0 {
                metrics.errors as f64 / metrics.query_count as f64
            } else {
                0.0
            },
            average_query_time: metrics.average_query_time,
            last_updated: metrics.timestamp,
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PoolHealthStatus {
    Healthy,
    Degraded,
    Warning,
    Critical,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PoolHealth {
    pub pool_name: String,
    pub status: PoolHealthStatus,
    pub connection_usage: f64,
    pub error_rate: f64,
    pub average_query_time: Duration,
    pub last_updated: Instant,
}
}

11.5 迁移管理

11.5.1 迁移系统设计

#![allow(unused)]
fn main() {
use sqlx::{PgPool, Migrate, Migration};
use std::collections::HashMap;
use std::time::{Duration, Instant};
use tracing::{info, warn, error};

#[derive(Debug, Clone)]
pub struct MigrationConfig {
    pub auto_migrate: bool,
    pub backup_before_migration: bool,
    pub transaction_per_migration: bool,
    pub verify_checksums: bool,
}

impl Default for MigrationConfig {
    fn default() -> Self {
        MigrationConfig {
            auto_migrate: true,
            backup_before_migration: true,
            transaction_per_migration: true,
            verify_checksums: true,
        }
    }
}

pub struct MigrationManager {
    pool: PgPool,
    migrations: HashMap<String, DatabaseMigration>,
    config: MigrationConfig,
}

#[derive(Debug, Clone)]
struct DatabaseMigration {
    id: String,
    description: String,
    sql: String,
    checksum: String,
    created_at: Instant,
    applied_at: Option<Instant>,
}

impl MigrationManager {
    pub fn new(pool: PgPool, config: Option<MigrationConfig>) -> Self {
        MigrationManager {
            pool,
            migrations: HashMap::new(),
            config: config.unwrap_or_default(),
        }
    }
    
    pub fn add_migration(&mut self, id: &str, description: &str, sql: &str) -> Result<(), Box<dyn std::error::Error>> {
        let checksum = self.calculate_checksum(sql);
        
        let migration = DatabaseMigration {
            id: id.to_string(),
            description: description.to_string(),
            sql: sql.to_string(),
            checksum,
            created_at: Instant::now(),
            applied_at: None,
        };
        
        self.migrations.insert(id.to_string(), migration);
        info!("Added migration: {} - {}", id, description);
        
        Ok(())
    }
    
    pub async fn run_migrations(&mut self) -> Result<(), Box<dyn std::error::Error>> {
        info!("Starting migration process");
        
        // 创建迁移历史表
        self.create_migration_table().await?;
        
        // 获取已应用迁移列表
        let applied_migrations = self.get_applied_migrations().await?;
        let applied_ids: std::collections::HashSet<String> = applied_migrations.into_iter().collect();
        
        // 找出需要应用的新迁移
        let mut pending_migrations = Vec::new();
        for migration in self.migrations.values() {
            if !applied_ids.contains(&migration.id) {
                pending_migrations.push(migration.clone());
            }
        }
        
        // 按ID排序确保执行顺序
        pending_migrations.sort_by(|a, b| a.id.cmp(&b.id));
        
        if pending_migrations.is_empty() {
            info!("No pending migrations");
            return Ok(());
        }
        
        info!("Found {} pending migrations", pending_migrations.len());
        
        // 执行迁移
        for migration in pending_migrations {
            self.apply_migration(&migration).await?;
        }
        
        info!("Migration process completed successfully");
        Ok(())
    }
    
    async fn create_migration_table(&self) -> Result<(), sqlx::Error> {
        sqlx::query(r#"
            CREATE TABLE IF NOT EXISTS schema_migrations (
                id VARCHAR(255) PRIMARY KEY,
                description TEXT,
                checksum VARCHAR(64) NOT NULL,
                created_at TIMESTAMPTZ NOT NULL,
                applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
            )
        "#).execute(&self.pool).await?;
        
        Ok(())
    }
    
    async fn get_applied_migrations(&self) -> Result<Vec<String>, sqlx::Error> {
        let rows = sqlx::query!("SELECT id FROM schema_migrations ORDER BY id")
            .fetch_all(&self.pool)
            .await?;
        
        Ok(rows.into_iter().map(|row| row.id).collect())
    }
    
    async fn apply_migration(&self, migration: &DatabaseMigration) -> Result<(), Box<dyn std::error::Error>> {
        info!("Applying migration: {} - {}", migration.id, migration.description);
        
        if self.config.transaction_per_migration {
            let mut tx = self.pool.begin().await?;
            
            // 执行迁移SQL
            sqlx::query(&migration.sql).execute(&mut *tx).await?;
            
            // 记录迁移历史
            sqlx::query!(
                "INSERT INTO schema_migrations (id, description, checksum, created_at) VALUES ($1, $2, $3, $4)",
                migration.id,
                migration.description,
                migration.checksum,
                migration.created_at
            )
            .execute(&mut *tx)
            .await?;
            
            tx.commit().await?;
        } else {
            // 直接执行
            sqlx::query(&migration.sql).execute(&self.pool).await?;
            
            sqlx::query!(
                "INSERT INTO schema_migrations (id, description, checksum, created_at) VALUES ($1, $2, $3, $4)",
                migration.id,
                migration.description,
                migration.checksum,
                migration.created_at
            )
            .execute(&self.pool)
            .await?;
        }
        
        if self.config.verify_checksums {
            // 验证checksum
            let current_checksum = self.calculate_checksum(&migration.sql);
            if current_checksum != migration.checksum {
                warn!("Checksum mismatch for migration {}", migration.id);
            }
        }
        
        info!("Successfully applied migration: {}", migration.id);
        Ok(())
    }
    
    pub async fn rollback_migration(&self, migration_id: &str) -> Result<(), Box<dyn std::error::Error>> {
        info!("Rolling back migration: {}", migration_id);
        
        // 检查迁移是否存在
        let exists = sqlx::query!(
            "SELECT 1 FROM schema_migrations WHERE id = $1",
            migration_id
        )
        .fetch_optional(&self.pool)
        .await?
        .is_some();
        
        if !exists {
            return Err(format!("Migration {} not found", migration_id).into());
        }
        
        // 获取迁移信息
        let migration_info = sqlx::query!(
            "SELECT description FROM schema_migrations WHERE id = $1",
            migration_id
        )
        .fetch_one(&self.pool)
        .await?;
        
        // 这里需要实现回滚逻辑
        // 实际实现中应该存储回滚SQL
        warn!("Rollback for migration {} ({}) needs to be implemented", migration_id, migration_info.description);
        
        Ok(())
    }
    
    pub async fn get_migration_status(&self) -> Result<Vec<MigrationStatus>, Box<dyn std::error::Error>> {
        let applied_migrations = self.get_applied_migrations().await?;
        let applied_ids: std::collections::HashSet<String> = applied_migrations.into_iter().collect();
        
        let mut status = Vec::new();
        
        for migration in self.migrations.values() {
            let is_applied = applied_ids.contains(&migration.id);
            status.push(MigrationStatus {
                id: migration.id.clone(),
                description: migration.description.clone(),
                status: if is_applied { "applied".to_string() } else { "pending".to_string() },
                applied_at: if is_applied { Some(migration.created_at) } else { None },
                checksum: migration.checksum.clone(),
            });
        }
        
        status.sort_by(|a, b| a.id.cmp(&b.id));
        Ok(status)
    }
    
    fn calculate_checksum(&self, sql: &str) -> String {
        use sha2::{Sha256, Digest};
        
        let mut hasher = Sha256::new();
        hasher.update(sql);
        format!("{:x}", hasher.finalize())
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationStatus {
    pub id: String,
    pub description: String,
    pub status: String,
    pub applied_at: Option<Instant>,
    pub checksum: String,
}
}

11.5.2 自动迁移系统

#![allow(unused)]
fn main() {
use std::fs;
use std::path::Path;
use sqlx::PgPool;
use tracing::{info, warn, error};

pub struct AutomaticMigrationSystem {
    pool: PgPool,
    migration_directory: std::path::PathBuf,
    config: MigrationConfig,
}

impl AutomaticMigrationSystem {
    pub fn new(pool: PgPool, migration_directory: &str) -> Self {
        AutomaticMigrationSystem {
            pool,
            migration_directory: Path::new(migration_directory).to_path_buf(),
            config: MigrationConfig::default(),
        }
    }
    
    pub async fn discover_and_run_migrations(&mut self) -> Result<(), Box<dyn std::error::Error>> {
        if !self.migration_directory.exists() {
            fs::create_dir_all(&self.migration_directory)?;
            info!("Created migration directory: {:?}", self.migration_directory);
        }
        
        // 扫描迁移文件
        let migration_files = self.scan_migration_files().await?;
        info!("Found {} migration files", migration_files.len());
        
        // 创建迁移管理器
        let mut migration_manager = MigrationManager::new(self.pool.clone(), Some(self.config.clone()));
        
        // 添加发现的迁移
        for file in migration_files {
            let migration = self.load_migration_file(&file).await?;
            migration_manager.add_migration(&migration.id, &migration.description, &migration.sql)?;
        }
        
        // 运行迁移
        migration_manager.run_migrations().await?;
        
        Ok(())
    }
    
    async fn scan_migration_files(&self) -> Result<Vec<MigrationFile>, Box<dyn std::error::Error>> {
        let mut files = Vec::new();
        
        if self.migration_directory.exists() {
            let entries = fs::read_dir(&self.migration_directory)?;
            
            for entry in entries {
                let entry = entry?;
                let path = entry.path();
                
                if path.is_file() && path.extension().and_then(|s| s.to_str()) == Some("sql") {
                    let file_name = path.file_stem()
                        .and_then(|s| s.to_str())
                        .ok_or_else(|| "Invalid file name".to_string())?;
                    
                    let parts: Vec<&str> = file_name.split('_').collect();
                    if parts.len() >= 2 {
                        let timestamp = parts[0].to_string();
                        let id = parts[1].to_string();
                        let description = parts[2..].join(" ");
                        
                        files.push(MigrationFile {
                            path,
                            timestamp,
                            id,
                            description,
                        });
                    }
                }
            }
        }
        
        // 按时间戳排序
        files.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
        Ok(files)
    }
    
    async fn load_migration_file(&self, file: &MigrationFile) -> Result<MigrationFile, Box<dyn std::error::Error>> {
        let content = fs::read_to_string(&file.path)?;
        Ok(MigrationFile {
            path: file.path.clone(),
            timestamp: file.timestamp.clone(),
            id: file.id.clone(),
            description: file.description.clone(),
            content: Some(content),
        })
    }
    
    pub async fn create_migration_file(&self, id: &str, description: &str) -> Result<std::path::PathBuf, Box<dyn std::error::Error>> {
        let timestamp = chrono::Utc::now().format("%Y%m%d_%H%M%S").to_string();
        let filename = format!("{}_{}_{}.sql", timestamp, id, description.replace(' ', "_"));
        let file_path = self.migration_directory.join(filename);
        
        let content = format!(
            "-- Migration: {} - {}\n-- Created: {}\n\nBEGIN;\n\n-- TODO: Add your migration SQL here\n\nCOMMIT;\n",
            id,
            description,
            chrono::Utc::now().to_rfc3339()
        );
        
        fs::write(&file_path, content)?;
        
        info!("Created migration file: {:?}", file_path);
        Ok(file_path)
    }
}

#[derive(Debug, Clone)]
struct MigrationFile {
    path: std::path::PathBuf,
    timestamp: String,
    id: String,
    description: String,
    content: Option<String>,
}

impl MigrationFile {
    pub fn sql(&self) -> &str {
        self.content.as_deref().unwrap_or("")
    }
}
}

11.6 企业级任务管理平台

现在我们来构建一个完整的企业级任务管理系统,集成所有学到的数据库技术。

#![allow(unused)]
fn main() {
// 任务管理系统主项目
// File: task-manager/Cargo.toml
/*
[package]
name = "enterprise-task-manager"
version = "1.0.0"
edition = "2021"

[dependencies]
tokio = { version = "1.0", features = ["full"] }
sqlx = { version = "0.7", features = ["runtime-tokio-rustls", "postgres", "json", "uuid", "chrono"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
chrono = { version = "0.4", features = ["serde"] }
uuid = { version = "1.0", features = ["v4", "serde"] }
clap = { version = "4.0", features = ["derive"] }
tracing = "0.1"
tracing-subscriber = "0.3"
bcrypt = "0.15"
jsonwebtoken = "9.0"
reqwest = { version = "0.11", features = ["json"] }
anyhow = "1.0"
thiserror = "1.0"
*/

// 核心数据结构
// File: task-manager/src/models.rs
use serde::{Deserialize, Serialize};
use chrono::{DateTime, Utc};
use uuid::Uuid;
use sqlx::{FromRow, Type};

#[derive(Debug, Clone, FromRow, Serialize, Deserialize)]
pub struct User {
    pub id: Uuid,
    pub username: String,
    pub email: String,
    pub display_name: String,
    pub password_hash: String,
    pub is_active: bool,
    pub role: UserRole,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub last_login: Option<DateTime<Utc>>,
}

#[derive(Debug, Clone, Serialize, Deserialize, Type)]
#[sqlx(type_name = "user_role")]
#[serde(rename_all = "snake_case")]
pub enum UserRole {
    Admin,
    Manager,
    Member,
    Viewer,
}

#[derive(Debug, Clone, FromRow, Serialize, Deserialize)]
pub struct Team {
    pub id: Uuid,
    pub name: String,
    pub description: Option<String>,
    pub owner_id: Uuid,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
}

#[derive(Debug, Clone, FromRow, Serialize, Deserialize)]
pub struct Project {
    pub id: Uuid,
    pub name: String,
    pub description: Option<String>,
    pub team_id: Uuid,
    pub owner_id: Uuid,
    pub status: ProjectStatus,
    pub priority: ProjectPriority,
    pub start_date: Option<DateTime<Utc>>,
    pub end_date: Option<DateTime<Utc>>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
}

#[derive(Debug, Clone, Serialize, Deserialize, Type)]
#[sqlx(type_name = "project_status")]
#[serde(rename_all = "snake_case")]
pub enum ProjectStatus {
    Planning,
    Active,
    OnHold,
    Completed,
    Cancelled,
}

#[derive(Debug, Clone, Serialize, Deserialize, Type)]
#[sqlx(type_name = "project_priority")]
#[serde(rename_all = "snake_case")]
pub enum ProjectPriority {
    Low,
    Medium,
    High,
    Critical,
}

#[derive(Debug, Clone, FromRow, Serialize, Deserialize)]
pub struct Task {
    pub id: Uuid,
    pub title: String,
    pub description: Option<String>,
    pub project_id: Uuid,
    pub assignee_id: Option<Uuid>,
    pub created_by_id: Uuid,
    pub status: TaskStatus,
    pub priority: TaskPriority,
    pub task_type: TaskType,
    pub estimated_hours: Option<f64>,
    pub actual_hours: f64,
    pub due_date: Option<DateTime<Utc>>,
    pub completed_at: Option<DateTime<Utc>>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    pub progress: i32, // 0-100
}

#[derive(Debug, Clone, Serialize, Deserialize, Type)]
#[sqlx(type_name = "task_status")]
#[serde(rename_all = "snake_case")]
pub enum TaskStatus {
    Todo,
    InProgress,
    InReview,
    Testing,
    Completed,
    Cancelled,
    Blocked,
}

#[derive(Debug, Clone, Serialize, Deserialize, Type)]
#[sqlx(type_name = "task_priority")]
#[serde(rename_all = "snake_case")]
pub enum TaskPriority {
    Low,
    Medium,
    High,
    Urgent,
}

#[derive(Debug, Clone, Serialize, Deserialize, Type)]
#[sqlx(type_name = "task_type")]
#[serde(rename_all = "snake_case")]
pub enum TaskType {
    Feature,
    BugFix,
    Documentation,
    Design,
    Testing,
    Research,
    Other,
}

#[derive(Debug, Clone, FromRow, Serialize, Deserialize)]
pub struct TaskComment {
    pub id: Uuid,
    pub task_id: Uuid,
    pub user_id: Uuid,
    pub content: String,
    pub created_at: DateTime<Utc>,
}

#[derive(Debug, Clone, FromRow, Serialize, Deserialize)]
pub struct TaskAttachment {
    pub id: Uuid,
    pub task_id: Uuid,
    pub filename: String,
    pub file_path: String,
    pub file_size: i64,
    pub mime_type: String,
    pub uploaded_by: Uuid,
    pub created_at: DateTime<Utc>,
}

#[derive(Debug, Clone, FromRow, Serialize, Deserialize)]
pub struct TimeEntry {
    pub id: Uuid,
    pub task_id: Uuid,
    pub user_id: Uuid,
    pub hours: f64,
    pub description: Option<String>,
    pub date: DateTime<Utc>,
    pub created_at: DateTime<Utc>,
}

#[derive(Debug, Clone, FromRow, Serialize, Deserialize)]
pub struct Notification {
    pub id: Uuid,
    pub user_id: Uuid,
    pub title: String,
    pub message: String,
    pub notification_type: NotificationType,
    pub is_read: bool,
    pub related_id: Option<Uuid>,
    pub created_at: DateTime<Utc>,
}

#[derive(Debug, Clone, Serialize, Deserialize, Type)]
#[sqlx(type_name = "notification_type")]
#[serde(rename_all = "snake_case")]
pub enum NotificationType {
    TaskAssigned,
    TaskCompleted,
    TaskOverdue,
    ProjectUpdated,
    CommentAdded,
    System,
}

// API请求/响应结构
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateUserRequest {
    pub username: String,
    pub email: String,
    pub display_name: String,
    pub password: String,
    pub role: UserRole,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct CreateProjectRequest {
    pub name: String,
    pub description: Option<String>,
    pub team_id: Uuid,
    pub priority: ProjectPriority,
    pub start_date: Option<DateTime<Utc>>,
    pub end_date: Option<DateTime<Utc>>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct CreateTaskRequest {
    pub title: String,
    pub description: Option<String>,
    pub project_id: Uuid,
    pub assignee_id: Option<Uuid>,
    pub priority: TaskPriority,
    pub task_type: TaskType,
    pub estimated_hours: Option<f64>,
    pub due_date: Option<DateTime<Utc>>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct UpdateTaskRequest {
    pub title: Option<String>,
    pub description: Option<String>,
    pub assignee_id: Option<Uuid>,
    pub status: Option<TaskStatus>,
    pub priority: Option<TaskPriority>,
    pub due_date: Option<DateTime<Utc>>,
    pub progress: Option<i32>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct CreateTimeEntryRequest {
    pub task_id: Uuid,
    pub hours: f64,
    pub description: Option<String>,
    pub date: DateTime<Utc>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct ApiResponse<T> {
    pub success: bool,
    pub data: Option<T>,
    pub error: Option<String>,
    pub timestamp: DateTime<Utc>,
    pub request_id: Uuid,
}

impl<T> ApiResponse<T> {
    pub fn success(data: T) -> Self {
        ApiResponse {
            success: true,
            data: Some(data),
            error: None,
            timestamp: Utc::now(),
            request_id: Uuid::new_v4(),
        }
    }
    
    pub fn error(message: String) -> ApiResponse<T> {
        ApiResponse {
            success: false,
            data: None,
            error: Some(message),
            timestamp: Utc::now(),
            request_id: Uuid::new_v4(),
        }
    }
}

// 查询过滤器
#[derive(Debug, Serialize, Deserialize)]
pub struct TaskFilter {
    pub project_id: Option<Uuid>,
    pub assignee_id: Option<Uuid>,
    pub status: Option<TaskStatus>,
    pub priority: Option<TaskPriority>,
    pub task_type: Option<TaskType>,
    pub due_date_from: Option<DateTime<Utc>>,
    pub due_date_to: Option<DateTime<Utc>>,
    pub created_by: Option<Uuid>,
    pub search: Option<String>,
    pub limit: Option<i64>,
    pub offset: Option<i64>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct ProjectFilter {
    pub team_id: Option<Uuid>,
    pub owner_id: Option<Uuid>,
    pub status: Option<ProjectStatus>,
    pub priority: Option<ProjectPriority>,
    pub start_date_from: Option<DateTime<Utc>>,
    pub start_date_to: Option<DateTime<Utc>>,
    pub end_date_from: Option<DateTime<Utc>>,
    pub end_date_to: Option<DateTime<Utc>>,
    pub search: Option<String>,
    pub limit: Option<i64>,
    pub offset: Option<i64>,
}
}
#![allow(unused)]
fn main() {
// 数据库服务层
// File: task-manager/src/services.rs
use super::models::*;
use crate::database::DatabaseManager;
use sqlx::PgPool;
use tracing::{info, warn, error, instrument};

pub struct UserService {
    pool: DatabaseManager,
}

impl UserService {
    pub fn new(pool: DatabaseManager) -> Self {
        UserService { pool }
    }
    
    #[instrument(skip(self))]
    pub async fn create_user(&self, request: &CreateUserRequest) -> Result<User, sqlx::Error> {
        let password_hash = bcrypt::hash(&request.password, bcrypt::DEFAULT_COST)?;
        
        let user = sqlx::query!(
            r#"
            INSERT INTO users (id, username, email, display_name, password_hash, role, is_active)
            VALUES (gen_random_uuid(), $1, $2, $3, $4, $5, true)
            RETURNING *
            "#,
            request.username,
            request.email,
            request.display_name,
            password_hash,
            request.role as UserRole
        )
        .fetch_one(&self.pool.pool)
        .await?;
        
        Ok(User::from_row(&user)?)
    }
    
    #[instrument(skip(self))]
    pub async fn get_user_by_id(&self, user_id: &Uuid) -> Result<Option<User>, sqlx::Error> {
        let user = sqlx::query!(
            "SELECT * FROM users WHERE id = $1",
            user_id
        )
        .fetch_optional(&self.pool.pool)
        .await?;
        
        Ok(user.map(|row| User::from_row(&row).unwrap()))
    }
    
    #[instrument(skip(self))]
    pub async fn get_user_by_username(&self, username: &str) -> Result<Option<User>, sqlx::Error> {
        let user = sqlx::query!(
            "SELECT * FROM users WHERE username = $1",
            username
        )
        .fetch_optional(&self.pool.pool)
        .await?;
        
        Ok(user.map(|row| User::from_row(&row).unwrap()))
    }
    
    #[instrument(skip(self))]
    pub async fn get_user_by_email(&self, email: &str) -> Result<Option<User>, sqlx::Error> {
        let user = sqlx::query!(
            "SELECT * FROM users WHERE email = $1",
            email
        )
        .fetch_optional(&self.pool.pool)
        .await?;
        
        Ok(user.map(|row| User::from_row(&row).unwrap()))
    }
    
    #[instrument(skip(self))]
    pub async fn update_user_last_login(&self, user_id: &Uuid) -> Result<(), sqlx::Error> {
        sqlx::query!(
            "UPDATE users SET last_login = NOW() WHERE id = $1",
            user_id
        )
        .execute(&self.pool.pool)
        .await?;
        
        Ok(())
    }
    
    #[instrument(skip(self))]
    pub async fn get_all_users(&self) -> Result<Vec<User>, sqlx::Error> {
        let users = sqlx::query!(
            "SELECT * FROM users ORDER BY created_at DESC"
        )
        .fetch_all(&self.pool.pool)
        .await?;
        
        Ok(users.into_iter().map(|row| User::from_row(&row).unwrap()).collect())
    }
    
    #[instrument(skip(self))]
    pub async fn authenticate_user(&self, username: &str, password: &str) -> Result<Option<User>, sqlx::Error> {
        if let Some(user) = self.get_user_by_username(username).await? {
            if bcrypt::verify(password, &user.password_hash)? {
                self.update_user_last_login(&user.id).await?;
                Ok(Some(user))
            } else {
                Ok(None)
            }
        } else {
            Ok(None)
        }
    }
}

pub struct ProjectService {
    pool: DatabaseManager,
}

impl ProjectService {
    pub fn new(pool: DatabaseManager) -> Self {
        ProjectService { pool }
    }
    
    #[instrument(skip(self))]
    pub async fn create_project(&self, request: &CreateProjectRequest, owner_id: Uuid) -> Result<Project, sqlx::Error> {
        let project = sqlx::query!(
            r#"
            INSERT INTO projects (id, name, description, team_id, owner_id, status, priority)
            VALUES (gen_random_uuid(), $1, $2, $3, $4, 'active', $5)
            RETURNING *
            "#,
            request.name,
            request.description,
            request.team_id,
            owner_id,
            request.priority as ProjectPriority
        )
        .fetch_one(&self.pool.pool)
        .await?;
        
        Ok(Project::from_row(&project)?)
    }
    
    #[instrument(skip(self))]
    pub async fn get_project_by_id(&self, project_id: &Uuid) -> Result<Option<Project>, sqlx::Error> {
        let project = sqlx::query!(
            "SELECT * FROM projects WHERE id = $1",
            project_id
        )
        .fetch_optional(&self.pool.pool)
        .await?;
        
        Ok(project.map(|row| Project::from_row(&row).unwrap()))
    }
    
    #[instrument(skip(self))]
    pub async fn get_projects_by_team(&self, team_id: &Uuid) -> Result<Vec<Project>, sqlx::Error> {
        let projects = sqlx::query!(
            "SELECT * FROM projects WHERE team_id = $1 ORDER BY created_at DESC",
            team_id
        )
        .fetch_all(&self.pool.pool)
        .await?;
        
        Ok(projects.into_iter().map(|row| Project::from_row(&row).unwrap()).collect())
    }
    
    #[instrument(skip(self))]
    pub async fn get_projects_by_user(&self, user_id: &Uuid) -> Result<Vec<Project>, sqlx::Error> {
        let projects = sqlx::query!(
            r#"
            SELECT DISTINCT p.* FROM projects p
            LEFT JOIN project_members pm ON p.id = pm.project_id
            WHERE p.owner_id = $1 OR pm.user_id = $1
            ORDER BY p.created_at DESC
            "#,
            user_id
        )
        .fetch_all(&self.pool.pool)
        .await?;
        
        Ok(projects.into_iter().map(|row| Project::from_row(&row).unwrap()).collect())
    }
    
    #[instrument(skip(self))]
    pub async fn update_project_status(&self, project_id: &Uuid, status: ProjectStatus) -> Result<(), sqlx::Error> {
        sqlx::query!(
            "UPDATE projects SET status = $1, updated_at = NOW() WHERE id = $2",
            status as ProjectStatus,
            project_id
        )
        .execute(&self.pool.pool)
        .await?;
        
        Ok(())
    }
    
    #[instrument(skip(self))]
    pub async fn filter_projects(&self, filter: &ProjectFilter) -> Result<(Vec<Project>, i64), sqlx::Error> {
        let mut where_conditions = Vec::new();
        let mut params: Vec<Box<dyn sqlx::Encode<'static, sqlx::Postgres> + Send>> = Vec::new();
        let mut param_index = 1;
        
        if let Some(team_id) = &filter.team_id {
            where_conditions.push(format!("team_id = ${}", param_index));
            params.push(Box::new(*team_id) as Box<dyn sqlx::Encode<'static, sqlx::Postgres> + Send>);
            param_index += 1;
        }
        
        if let Some(owner_id) = &filter.owner_id {
            where_conditions.push(format!("owner_id = ${}", param_index));
            params.push(Box::new(*owner_id) as Box<dyn sqlx::Encode<'static, sqlx::Postgres> + Send>);
            param_index += 1;
        }
        
        if let Some(status) = &filter.status {
            where_conditions.push(format!("status = ${}", param_index));
            params.push(Box::new(status.clone()) as Box<dyn sqlx::Encode<'static, sqlx::Postgres> + Send>);
            param_index += 1;
        }
        
        if let Some(priority) = &filter.priority {
            where_conditions.push(format!("priority = ${}", param_index));
            params.push(Box::new(priority.clone()) as Box<dyn sqlx::Encode<'static, sqlx::Postgres> + Send>);
            param_index += 1;
        }
        
        if let Some(search) = &filter.search {
            where_conditions.push(format!("(name ILIKE ${} OR description ILIKE ${})", param_index, param_index + 1));
            let search_pattern = format!("%{}%", search);
            params.push(Box::new(search_pattern.clone()) as Box<dyn sqlx::Encode<'static, sqlx::Postgres> + Send>);
            params.push(Box::new(search_pattern) as Box<dyn sqlx::Encode<'static, sqlx::Postgres> + Send>);
            param_index += 2;
        }
        
        let where_clause = if !where_conditions.is_empty() {
            format!("WHERE {}", where_conditions.join(" AND "))
        } else {
            String::new()
        };
        
        // 获取总数
        let count_query = format!("SELECT COUNT(*) as total FROM projects {}", where_clause);
        let count_row = self.pool.execute_query(&count_query, &params).await?;
        let total: i64 = count_row.get("total");
        
        // 获取分页结果
        let mut query = format!("SELECT * FROM projects {} ORDER BY created_at DESC", where_clause);
        
        if let Some(limit) = filter.limit {
            query.push_str(&format!(" LIMIT {}", limit));
        }
        
        if let Some(offset) = filter.offset {
            query.push_str(&format!(" OFFSET {}", offset));
        }
        
        let projects = self.pool.execute_query(&query, &params).await?;
        Ok((projects, total))
    }
}

pub struct TaskService {
    pool: DatabaseManager,
}

impl TaskService {
    pub fn new(pool: DatabaseManager) -> Self {
        TaskService { pool }
    }
    
    #[instrument(skip(self))]
    pub async fn create_task(&self, request: &CreateTaskRequest, created_by: Uuid) -> Result<Task, sqlx::Error> {
        let task = sqlx::query!(
            r#"
            INSERT INTO tasks (
                id, title, description, project_id, assignee_id, created_by_id, 
                status, priority, task_type, estimated_hours, due_date, progress
            )
            VALUES (
                gen_random_uuid(), $1, $2, $3, $4, $5, 'todo', $6, $7, $8, $9, 0
            )
            RETURNING *
            "#,
            request.title,
            request.description,
            request.project_id,
            request.assignee_id,
            created_by,
            request.priority as TaskPriority,
            request.task_type as TaskType,
            request.estimated_hours,
            request.due_date
        )
        .fetch_one(&self.pool.pool)
        .await?;
        
        Ok(Task::from_row(&task)?)
    }
    
    #[instrument(skip(self))]
    pub async fn get_task_by_id(&self, task_id: &Uuid) -> Result<Option<Task>, sqlx::Error> {
        let task =        let task = sqlx::query!(
            "SELECT * FROM tasks WHERE id = $1",
            task_id
        )
        .fetch_optional(&self.pool.pool)
        .await?;
        
        Ok(task.map(|row| Task::from_row(&row).unwrap()))
    }
    
    #[instrument(skip(self))]
    pub async fn get_tasks_by_project(&self, project_id: &Uuid) -> Result<Vec<Task>, sqlx::Error> {
        let tasks = sqlx::query!(
            "SELECT * FROM tasks WHERE project_id = $1 ORDER BY created_at DESC",
            project_id
        )
        .fetch_all(&self.pool.pool)
        .await?;
        
        Ok(tasks.into_iter().map(|row| Task::from_row(&row).unwrap()).collect())
    }
    
    #[instrument(skip(self))]
    pub async fn get_tasks_by_user(&self, user_id: &Uuid) -> Result<Vec<Task>, sqlx::Error> {
        let tasks = sqlx::query!(
            "SELECT * FROM tasks WHERE assignee_id = $1 ORDER BY created_at DESC",
            user_id
        )
        .fetch_all(&self.pool.pool)
        .await?;
        
        Ok(tasks.into_iter().map(|row| Task::from_row(&row).unwrap()).collect())
    }
    
    #[instrument(skip(self))]
    pub async fn update_task(&self, task_id: &Uuid, request: &UpdateTaskRequest) -> Result<Task, sqlx::Error> {
        // 检查任务是否存在
        let task = self.get_task_by_id(task_id).await?.ok_or_else(|| 
            sqlx::Error::RowNotFound
        )?;
        
        // 更新任务
        let updated_task = sqlx::query!(
            r#"
            UPDATE tasks 
            SET 
                title = COALESCE($1, title),
                description = COALESCE($2, description),
                assignee_id = COALESCE($3, assignee_id),
                status = COALESCE($4, status),
                priority = COALESCE($5, priority),
                due_date = COALESCE($6, due_date),
                progress = COALESCE($7, progress),
                updated_at = NOW(),
                completed_at = CASE 
                    WHEN $8 = 'completed' AND status != 'completed' THEN NOW()
                    ELSE completed_at
                END
            WHERE id = $9
            RETURNING *
            "#,
            request.title,
            request.description,
            request.assignee_id,
            request.status.map(|s| s as TaskStatus),
            request.priority.map(|p| p as TaskPriority),
            request.due_date,
            request.progress,
            request.status.as_ref().map(|s| s.to_string()),
            task_id
        )
        .fetch_one(&self.pool.pool)
        .await?;
        
        Ok(Task::from_row(&updated_task)?)
    }
    
    #[instrument(skip(self))]
    pub async fn update_task_status(&self, task_id: &Uuid, status: TaskStatus) -> Result<(), sqlx::Error> {
        let completed_at = if status == TaskStatus::Completed {
            Some(Utc::now())
        } else {
            None
        };
        
        sqlx::query!(
            r#"
            UPDATE tasks 
            SET status = $1, completed_at = $2, updated_at = NOW()
            WHERE id = $2
            "#,
            status as TaskStatus,
            completed_at,
            task_id
        )
        .execute(&self.pool.pool)
        .await?;
        
        Ok(())
    }
    
    #[instrument(skip(self))]
    pub async fn log_time(&self, request: &CreateTimeEntryRequest, user_id: Uuid) -> Result<TimeEntry, sqlx::Error> {
        // 创建时间记录
        let time_entry = sqlx::query!(
            r#"
            INSERT INTO time_entries (id, task_id, user_id, hours, description, date)
            VALUES (gen_random_uuid(), $1, $2, $3, $4, $5)
            RETURNING *
            "#,
            request.task_id,
            user_id,
            request.hours,
            request.description,
            request.date
        )
        .fetch_one(&self.pool.pool)
        .await?;
        
        // 更新任务的实际工时
        sqlx::query!(
            r#"
            UPDATE tasks 
            SET actual_hours = actual_hours + $1, updated_at = NOW()
            WHERE id = $2
            "#,
            request.hours,
            request.task_id
        )
        .execute(&self.pool.pool)
        .await?;
        
        Ok(TimeEntry::from_row(&time_entry)?)
    }
    
    #[instrument(skip(self))]
    pub async fn filter_tasks(&self, filter: &TaskFilter) -> Result<(Vec<Task>, i64), sqlx::Error> {
        let mut where_conditions = Vec::new();
        let mut params: Vec<Box<dyn sqlx::Encode<'static, sqlx::Postgres> + Send>> = Vec::new();
        let mut param_index = 1;
        
        if let Some(project_id) = &filter.project_id {
            where_conditions.push(format!("project_id = ${}", param_index));
            params.push(Box::new(*project_id) as Box<dyn sqlx::Encode<'static, sqlx::Postgres> + Send>);
            param_index += 1;
        }
        
        if let Some(assignee_id) = &filter.assignee_id {
            where_conditions.push(format!("assignee_id = ${}", param_index));
            params.push(Box::new(*assignee_id) as Box<dyn sqlx::Encode<'static, sqlx::Postgres> + Send>);
            param_index += 1;
        }
        
        if let Some(status) = &filter.status {
            where_conditions.push(format!("status = ${}", param_index));
            params.push(Box::new(status.clone()) as Box<dyn sqlx::Encode<'static, sqlx::Postgres> + Send>);
            param_index += 1;
        }
        
        if let Some(priority) = &filter.priority {
            where_conditions.push(format!("priority = ${}", param_index));
            params.push(Box::new(priority.clone()) as Box<dyn sqlx::Encode<'static, sqlx::Postgres> + Send>);
            param_index += 1;
        }
        
        if let Some(task_type) = &filter.task_type {
            where_conditions.push(format!("task_type = ${}", param_index));
            params.push(Box::new(task_type.clone()) as Box<dyn sqlx::Encode<'static, sqlx::Postgres> + Send>);
            param_index += 1;
        }
        
        if let Some(due_date_from) = &filter.due_date_from {
            where_conditions.push(format!("due_date >= ${}", param_index));
            params.push(Box::new(*due_date_from) as Box<dyn sqlx::Encode<'static, sqlx::Postgres> + Send>);
            param_index += 1;
        }
        
        if let Some(due_date_to) = &filter.due_date_to {
            where_conditions.push(format!("due_date <= ${}", param_index));
            params.push(Box::new(*due_date_to) as Box<dyn sqlx::Encode<'static, sqlx::Postgres> + Send>);
            param_index += 1;
        }
        
        if let Some(created_by) = &filter.created_by {
            where_conditions.push(format!("created_by_id = ${}", param_index));
            params.push(Box::new(*created_by) as Box<dyn sqlx::Encode<'static, sqlx::Postgres> + Send>);
            param_index += 1;
        }
        
        if let Some(search) = &filter.search {
            where_conditions.push(format!(
                "(title ILIKE ${} OR description ILIKE ${})", 
                param_index, 
                param_index + 1
            ));
            let search_pattern = format!("%{}%", search);
            params.push(Box::new(search_pattern.clone()) as Box<dyn sqlx::Encode<'static, sqlx::Postgres> + Send>);
            params.push(Box::new(search_pattern) as Box<dyn sqlx::Encode<'static, sqlx::Postgres> + Send>);
            param_index += 2;
        }
        
        let where_clause = if !where_conditions.is_empty() {
            format!("WHERE {}", where_conditions.join(" AND "))
        } else {
            String::new()
        };
        
        // 获取总数
        let count_query = format!("SELECT COUNT(*) as total FROM tasks {}", where_clause);
        let count_row = self.pool.execute_query(&count_query, &params).await?;
        let total: i64 = count_row.get("total");
        
        // 获取分页结果
        let mut query = format!("SELECT * FROM tasks {} ORDER BY created_at DESC", where_clause);
        
        if let Some(limit) = filter.limit {
            query.push_str(&format!(" LIMIT {}", limit));
        }
        
        if let Some(offset) = filter.offset {
            query.push_str(&format!(" OFFSET {}", offset));
        }
        
        let tasks = self.pool.execute_query(&query, &params).await?;
        Ok((tasks, total))
    }
    
    #[instrument(skip(self))]
    pub async fn get_task_statistics(&self, project_id: &Uuid) -> Result<TaskStatistics, sqlx::Error> {
        let stats = sqlx::query!(
            r#"
            SELECT 
                COUNT(*) as total_tasks,
                COUNT(CASE WHEN status = 'completed' THEN 1 END) as completed_tasks,
                COUNT(CASE WHEN status = 'in_progress' THEN 1 END) as in_progress_tasks,
                COUNT(CASE WHEN status = 'todo' THEN 1 END) as todo_tasks,
                COUNT(CASE WHEN status = 'blocked' THEN 1 END) as blocked_tasks,
                AVG(progress) as average_progress,
                SUM(actual_hours) as total_hours,
                COUNT(CASE WHEN due_date < NOW() AND status != 'completed' THEN 1 END) as overdue_tasks
            FROM tasks 
            WHERE project_id = $1
            "#,
            project_id
        )
        .fetch_one(&self.pool.pool)
        .await?;
        
        Ok(TaskStatistics {
            total_tasks: stats.total_tasks,
            completed_tasks: stats.completed_tasks,
            in_progress_tasks: stats.in_progress_tasks,
            todo_tasks: stats.todo_tasks,
            blocked_tasks: stats.blocked_tasks,
            average_progress: stats.average_progress.unwrap_or(0.0),
            total_hours: stats.total_hours.unwrap_or(0.0),
            overdue_tasks: stats.overdue_tasks,
        })
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskStatistics {
    pub total_tasks: i64,
    pub completed_tasks: i64,
    pub in_progress_tasks: i64,
    pub todo_tasks: i64,
    pub blocked_tasks: i64,
    pub average_progress: f64,
    pub total_hours: f64,
    pub overdue_tasks: i64,
}

pub struct NotificationService {
    pool: DatabaseManager,
}

impl NotificationService {
    pub fn new(pool: DatabaseManager) -> Self {
        NotificationService { pool }
    }
    
    #[instrument(skip(self))]
    pub async fn create_notification(
        &self,
        user_id: Uuid,
        title: String,
        message: String,
        notification_type: NotificationType,
        related_id: Option<Uuid>
    ) -> Result<Notification, sqlx::Error> {
        let notification = sqlx::query!(
            r#"
            INSERT INTO notifications (id, user_id, title, message, notification_type, is_read, related_id)
            VALUES (gen_random_uuid(), $1, $2, $3, $4, false, $5)
            RETURNING *
            "#,
            user_id,
            title,
            message,
            notification_type as NotificationType,
            related_id
        )
        .fetch_one(&self.pool.pool)
        .await?;
        
        Ok(Notification::from_row(&notification)?)
    }
    
    #[instrument(skip(self))]
    pub async fn get_user_notifications(&self, user_id: &Uuid) -> Result<Vec<Notification>, sqlx::Error> {
        let notifications = sqlx::query!(
            "SELECT * FROM notifications WHERE user_id = $1 ORDER BY created_at DESC",
            user_id
        )
        .fetch_all(&self.pool.pool)
        .await?;
        
        Ok(notifications.into_iter().map(|row| Notification::from_row(&row).unwrap()).collect())
    }
    
    #[instrument(skip(self))]
    pub async fn mark_notification_read(&self, notification_id: &Uuid) -> Result<(), sqlx::Error> {
        sqlx::query!(
            "UPDATE notifications SET is_read = true WHERE id = $1",
            notification_id
        )
        .execute(&self.pool.pool)
        .await?;
        
        Ok(())
    }
    
    #[instrument(skip(self))]
    pub async fn mark_all_notifications_read(&self, user_id: &Uuid) -> Result<(), sqlx::Error> {
        sqlx::query!(
            "UPDATE notifications SET is_read = true WHERE user_id = $1 AND is_read = false",
            user_id
        )
        .execute(&self.pool.pool)
        .await?;
        
        Ok(())
    }
    
    // 自动生成通知
    #[instrument(skip(self))]
    pub async fn notify_task_assigned(&self, task: &Task, assignee_id: Uuid) -> Result<(), sqlx::Error> {
        self.create_notification(
            assignee_id,
            "任务分配".to_string(),
            format!("您被分配了新任务:{}", task.title),
            NotificationType::TaskAssigned,
            Some(task.id)
        ).await?;
        
        Ok(())
    }
    
    #[instrument(skip(self))]
    pub async fn notify_task_overdue(&self, task: &Task) -> Result<(), sqlx::Error> {
        if let Some(assignee_id) = task.assignee_id {
            self.create_notification(
                assignee_id,
                "任务逾期".to_string(),
                format!("任务 \"{}\" 已逾期", task.title),
                NotificationType::TaskOverdue,
                Some(task.id)
            ).await?;
        }
        
        Ok(())
    }
}
}
#![allow(unused)]
fn main() {
// 报告和分析服务
// File: task-manager/src/analytics.rs
use super::models::*;
use crate::database::DatabaseManager;
use chrono::{Duration, Utc};
use serde::{Serialize, Deserialize};

pub struct AnalyticsService {
    pool: DatabaseManager,
}

impl AnalyticsService {
    pub fn new(pool: DatabaseManager) -> Self {
        AnalyticsService { pool }
    }
    
    pub async fn get_project_analytics(&self, project_id: &Uuid) -> Result<ProjectAnalytics, sqlx::Error> {
        let analytics = sqlx::query!(
            r#"
            WITH task_stats AS (
                SELECT 
                    COUNT(*) as total_tasks,
                    COUNT(CASE WHEN status = 'completed' THEN 1 END) as completed_tasks,
                    COUNT(CASE WHEN status = 'in_progress' THEN 1 END) as in_progress_tasks,
                    COUNT(CASE WHEN status = 'todo' THEN 1 END) as todo_tasks,
                    COUNT(CASE WHEN status = 'blocked' THEN 1 END) as blocked_tasks,
                    COUNT(CASE WHEN due_date < NOW() AND status != 'completed' THEN 1 END) as overdue_tasks,
                    AVG(progress) as average_progress,
                    SUM(estimated_hours) as total_estimated_hours,
                    SUM(actual_hours) as total_actual_hours,
                    AVG(EXTRACT(EPOCH FROM (COALESCE(completed_at, NOW()) - created_at))/3600) as average_completion_time_hours
                FROM tasks
                WHERE project_id = $1
            ),
            user_performance AS (
                SELECT 
                    assignee_id,
                    COUNT(*) as tasks_assigned,
                    COUNT(CASE WHEN status = 'completed' THEN 1 END) as tasks_completed,
                    SUM(actual_hours) as total_hours_logged,
                    AVG(progress) as average_progress
                FROM tasks
                WHERE project_id = $1 AND assignee_id IS NOT NULL
                GROUP BY assignee_id
            )
            SELECT 
                ts.*,
                json_agg(
                    json_build_object(
                        'user_id', up.assignee_id,
                        'tasks_assigned', up.tasks_assigned,
                        'tasks_completed', up.tasks_completed,
                        'total_hours_logged', up.total_hours_logged,
                        'average_progress', up.average_progress
                    )
                ) FILTER (WHERE up.assignee_id IS NOT NULL) as user_performance
            FROM task_stats ts
            LEFT JOIN user_performance up ON true
            GROUP BY ts.total_tasks, ts.completed_tasks, ts.in_progress_tasks, ts.todo_tasks, 
                     ts.blocked_tasks, ts.overdue_tasks, ts.average_progress, 
                     ts.total_estimated_hours, ts.total_actual_hours, ts.average_completion_time_hours
            "#,
            project_id
        )
        .fetch_one(&self.pool.pool)
        .await?;
        
        Ok(ProjectAnalytics {
            total_tasks: analytics.total_tasks,
            completed_tasks: analytics.completed_tasks,
            in_progress_tasks: analytics.in_progress_tasks,
            todo_tasks: analytics.todo_tasks,
            blocked_tasks: analytics.blocked_tasks,
            overdue_tasks: analytics.overdue_tasks,
            average_progress: analytics.average_progress.unwrap_or(0.0),
            total_estimated_hours: analytics.total_estimated_hours.unwrap_or(0.0),
            total_actual_hours: analytics.total_actual_hours.unwrap_or(0.0),
            average_completion_time_hours: analytics.average_completion_time_hours.unwrap_or(0.0),
            user_performance: analytics.user_performance.unwrap_or_default(),
        })
    }
    
    pub async fn get_team_analytics(&self, team_id: &Uuid, start_date: DateTime<Utc>, end_date: DateTime<Utc>) -> Result<TeamAnalytics, sqlx::Error> {
        let analytics = sqlx::query!(
            r#"
            WITH project_stats AS (
                SELECT 
                    COUNT(*) as total_projects,
                    COUNT(CASE WHEN status = 'active' THEN 1 END) as active_projects,
                    COUNT(CASE WHEN status = 'completed' THEN 1 END) as completed_projects,
                    COUNT(CASE WHEN status = 'cancelled' THEN 1 END) as cancelled_projects
                FROM projects
                WHERE team_id = $1 AND created_at BETWEEN $2 AND $3
            ),
            task_trends AS (
                SELECT 
                    DATE_TRUNC('week', created_at) as week,
                    COUNT(*) as tasks_created,
                    COUNT(CASE WHEN status = 'completed' THEN 1 END) as tasks_completed
                FROM tasks t
                JOIN projects p ON t.project_id = p.id
                WHERE p.team_id = $1 AND t.created_at BETWEEN $2 AND $3
                GROUP BY DATE_TRUNC('week', t.created_at)
                ORDER BY week
            ),
            time_tracking AS (
                SELECT 
                    user_id,
                    DATE_TRUNC('day', date) as day,
                    SUM(hours) as total_hours
                FROM time_entries te
                JOIN tasks t ON te.task_id = t.id
                JOIN projects p ON t.project_id = p.id
                WHERE p.team_id = $1 AND te.date BETWEEN $2 AND $3
                GROUP BY user_id, DATE_TRUNC('day', te.date)
                ORDER BY day, user_id
            )
            SELECT 
                ps.*,
                json_agg(
                    json_build_object(
                        'week', tt.week,
                        'tasks_created', tt.tasks_created,
                        'tasks_completed', tt.tasks_completed
                    )
                ) as task_trends,
                json_agg(
                    json_build_object(
                        'day', tt2.day,
                        'user_id', tt2.user_id,
                        'total_hours', tt2.total_hours
                    )
                ) as time_tracking
            FROM project_stats ps
            LEFT JOIN task_trends tt ON true
            LEFT JOIN time_tracking tt2 ON true
            GROUP BY ps.total_projects, ps.active_projects, ps.completed_projects, ps.cancelled_projects
            "#,
            team_id, start_date, end_date
        )
        .fetch_one(&self.pool.pool)
        .await?;
        
        Ok(TeamAnalytics {
            total_projects: analytics.total_projects,
            active_projects: analytics.active_projects,
            completed_projects: analytics.completed_projects,
            cancelled_projects: analytics.cancelled_projects,
            task_trends: analytics.task_trends.unwrap_or_default(),
            time_tracking: analytics.time_tracking.unwrap_or_default(),
        })
    }
    
    pub async fn get_user_productivity_report(&self, user_id: &Uuid, days: i64) -> Result<UserProductivityReport, sqlx::Error> {
        let end_date = Utc::now();
        let start_date = end_date - Duration::days(days);
        
        let report = sqlx::query!(
            r#"
            WITH daily_stats AS (
                SELECT 
                    DATE(t.created_at) as date,
                    COUNT(*) as tasks_created,
                    COUNT(CASE WHEN t.status = 'completed' THEN 1 END) as tasks_completed
                FROM tasks t
                WHERE t.created_by_id = $1 AND t.created_at BETWEEN $2 AND $3
                GROUP BY DATE(t.created_at)
                ORDER BY date
            ),
            hourly_stats AS (
                SELECT 
                    DATE(te.date) as date,
                    EXTRACT(hour FROM te.date) as hour,
                    SUM(te.hours) as hours_logged
                FROM time_entries te
                WHERE te.user_id = $1 AND te.date BETWEEN $2 AND $3
                GROUP BY DATE(te.date), EXTRACT(hour FROM te.date)
                ORDER BY date, hour
            )
            SELECT 
                COALESCE(SUM(ds.tasks_created), 0) as total_tasks_created,
                COALESCE(SUM(ds.tasks_completed), 0) as total_tasks_completed,
                COALESCE(SUM(hs.hours_logged), 0) as total_hours_logged,
                COALESCE(AVG(hs.hours_logged), 0) as average_daily_hours,
                json_agg(
                    json_build_object(
                        'date', ds.date,
                        'tasks_created', ds.tasks_created,
                        'tasks_completed', ds.tasks_completed
                    )
                ) as daily_task_stats,
                json_agg(
                    json_build_object(
                        'date', hs.date,
                        'hour', hs.hour,
                        'hours_logged', hs.hours_logged
                    )
                ) FILTER (WHERE hs.date IS NOT NULL) as hourly_stats
            FROM daily_stats ds
            FULL OUTER JOIN (
                SELECT date, NULL as tasks_created, NULL as tasks_completed
                FROM (
                    SELECT DISTINCT DATE(date) as date
                    FROM time_entries
                    WHERE user_id = $1 AND date BETWEEN $2 AND $3
                ) dates
            ) unique_dates ON ds.date = unique_dates.date
            LEFT JOIN (
                SELECT 
                    date, 
                    SUM(hours_logged) as hours_logged
                FROM (
                    SELECT DATE(te.date) as date, SUM(te.hours) as hours_logged
                    FROM time_entries te
                    WHERE te.user_id = $1 AND te.date BETWEEN $2 AND $3
                    GROUP BY DATE(te.date)
                ) daily_hours
                GROUP BY date
            ) hs ON ds.date = hs.date
            GROUP BY ds.date
            "#,
            user_id, start_date, end_date
        )
        .fetch_one(&self.pool.pool)
        .await?;
        
        Ok(UserProductivityReport {
            total_tasks_created: report.total_tasks_created,
            total_tasks_completed: report.total_tasks_completed,
            total_hours_logged: report.total_hours_logged,
            average_daily_hours: report.average_daily_hours,
            daily_task_stats: report.daily_task_stats.unwrap_or_default(),
            hourly_stats: report.hourly_stats.unwrap_or_default(),
        })
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProjectAnalytics {
    pub total_tasks: i64,
    pub completed_tasks: i64,
    pub in_progress_tasks: i64,
    pub todo_tasks: i64,
    pub blocked_tasks: i64,
    pub overdue_tasks: i64,
    pub average_progress: f64,
    pub total_estimated_hours: f64,
    pub total_actual_hours: f64,
    pub average_completion_time_hours: f64,
    pub user_performance: Vec<serde_json::Value>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TeamAnalytics {
    pub total_projects: i64,
    pub active_projects: i64,
    pub completed_projects: i64,
    pub cancelled_projects: i64,
    pub task_trends: Vec<serde_json::Value>,
    pub time_tracking: Vec<serde_json::Value>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserProductivityReport {
    pub total_tasks_created: i64,
    pub total_tasks_completed: i64,
    pub total_hours_logged: f64,
    pub average_daily_hours: f64,
    pub daily_task_stats: Vec<serde_json::Value>,
    pub hourly_stats: Vec<serde_json::Value>,
}
}
// 主应用程序
// File: task-manager/src/main.rs
use clap::{Parser, Subcommand};
use tracing::{info, warn, error, Level};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use std::sync::Arc;
use tokio::sync::RwLock;

mod models;
mod services;
mod database;
mod analytics;
mod web;

use models::*;
use services::*;
use database::DatabaseManager;
use analytics::AnalyticsService;
use web::WebServer;

#[derive(Parser, Debug)]
#[command(name = "task-manager")]
#[command(about = "Enterprise Task Management System")]
struct Cli {
    #[command(subcommand)]
    command: Commands,
}

#[derive(Subcommand, Debug)]
enum Commands {
    /// Start the web server
    Server {
        #[arg(short, long, default_value = "0.0.0.0:3000")]
        addr: String,
        
        #[arg(short, long, default_value = "postgres://task_user:password@localhost/task_manager")]
        database_url: String,
        
        #[arg(short, long, default_value = "redis://localhost:6379")]
        redis_url: String,
    },
    /// Run database migrations
    Migrate {
        #[arg(short, long, default_value = "postgres://task_user:password@localhost/task_manager")]
        database_url: String,
    },
    /// Create database and run migrations
    Setup {
        #[arg(short, long, default_value = "postgres://task_user:password@localhost/task_manager")]
        database_url: String,
    },
    /// Generate analytics report
    Analytics {
        #[arg(short, long)]
        project_id: String,
        
        #[arg(short, long, default_value = "postgres://task_user:password@localhost/task_manager")]
        database_url: String,
    },
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 初始化日志
    tracing_subscriber::registry()
        .with(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| "task_manager=debug,tokio=warn,sqlx=warn".into()),
        )
        .with(tracing_subscriber::fmt::layer())
        .init();
    
    let cli = Cli::parse();
    
    match cli.command {
        Commands::Server { addr, database_url, redis_url } => {
            run_server(addr, database_url, redis_url).await
        }
        Commands::Migrate { database_url } => {
            run_migrations(database_url).await
        }
        Commands::Setup { database_url } => {
            setup_database(database_url).await
        }
        Commands::Analytics { project_id, database_url } => {
            run_analytics(&project_id, database_url).await
        }
    }
}

#[instrument]
async fn run_server(
    addr: String,
    database_url: String,
    redis_url: String,
) -> Result<(), Box<dyn std::error::Error>> {
    info!("Starting Task Manager server on {}", addr);
    
    // 初始化数据库
    let database_manager = DatabaseManager::new(&database_url).await?;
    let user_service = Arc::new(UserService::new(database_manager.clone()));
    let project_service = Arc::new(ProjectService::new(database_manager.clone()));
    let task_service = Arc::new(TaskService::new(database_manager.clone()));
    let notification_service = Arc::new(NotificationService::new(database_manager.clone()));
    let analytics_service = Arc::new(AnalyticsService::new(database_manager.clone()));
    
    // 启动Web服务器
    let server = WebServer::new(
        addr,
        user_service,
        project_service,
        task_service,
        notification_service,
        analytics_service,
    );
    
    info!("Task Manager server started successfully");
    server.run().await?;
    
    Ok(())
}

#[instrument]
async fn run_migrations(database_url: String) -> Result<(), Box<dyn std::error::Error>> {
    info!("Running database migrations");
    
    let pool = sqlx::PgPool::connect(&database_url).await?;
    let mut migration_manager = crate::database::MigrationManager::new(pool, None);
    
    // 添加迁移
    migration_manager.add_migration(
        "20240101000000_create_users_table",
        "Create users table",
        r#"
        CREATE TYPE user_role AS ENUM ('admin', 'manager', 'member', 'viewer');
        
        CREATE TABLE users (
            id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
            username VARCHAR(50) UNIQUE NOT NULL,
            email VARCHAR(100) UNIQUE NOT NULL,
            display_name VARCHAR(100) NOT NULL,
            password_hash VARCHAR(255) NOT NULL,
            is_active BOOLEAN DEFAULT TRUE,
            role user_role NOT NULL DEFAULT 'member',
            created_at TIMESTAMPTZ DEFAULT NOW(),
            updated_at TIMESTAMPTZ DEFAULT NOW(),
            last_login TIMESTAMPTZ
        );
        
        CREATE INDEX idx_users_username ON users(username);
        CREATE INDEX idx_users_email ON users(email);
        "#,
    )?;
    
    migration_manager.add_migration(
        "20240101000001_create_teams_table",
        "Create teams table",
        r#"
        CREATE TABLE teams (
            id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
            name VARCHAR(100) NOT NULL,
            description TEXT,
            owner_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
            created_at TIMESTAMPTZ DEFAULT NOW(),
            updated_at TIMESTAMPTZ DEFAULT NOW()
        );
        
        CREATE INDEX idx_teams_owner ON teams(owner_id);
        "#,
    )?;
    
    migration_manager.add_migration(
        "20240101000002_create_projects_table",
        "Create projects table",
        r#"
        CREATE TYPE project_status AS ENUM ('planning', 'active', 'on_hold', 'completed', 'cancelled');
        CREATE TYPE project_priority AS ENUM ('low', 'medium', 'high', 'critical');
        
        CREATE TABLE projects (
            id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
            name VARCHAR(100) NOT NULL,
            description TEXT,
            team_id UUID NOT NULL REFERENCES teams(id) ON DELETE CASCADE,
            owner_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
            status project_status NOT NULL DEFAULT 'planning',
            priority project_priority NOT NULL DEFAULT 'medium',
            start_date TIMESTAMPTZ,
            end_date TIMESTAMPTZ,
            created_at TIMESTAMPTZ DEFAULT NOW(),
            updated_at TIMESTAMPTZ DEFAULT NOW()
        );
        
        CREATE INDEX idx_projects_team ON projects(team_id);
        CREATE INDEX idx_projects_owner ON projects(owner_id);
        CREATE INDEX idx_projects_status ON projects(status);
        "#,
    )?;
    
    migration_manager.add_migration(
        "20240101000003_create_tasks_table",
        "Create tasks table",
        r#"
        CREATE TYPE task_status AS ENUM ('todo', 'in_progress', 'in_review', 'testing', 'completed', 'cancelled', 'blocked');
        CREATE TYPE task_priority AS ENUM ('low', 'medium', 'high', 'urgent');
        CREATE TYPE task_type AS ENUM ('feature', 'bug_fix', 'documentation', 'design', 'testing', 'research', 'other');
        
        CREATE TABLE tasks (
            id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
            title VARCHAR(200) NOT NULL,
            description TEXT,
            project_id UUID NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
            assignee_id UUID REFERENCES users(id) ON DELETE SET NULL,
            created_by_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
            status task_status NOT NULL DEFAULT 'todo',
            priority task_priority NOT NULL DEFAULT 'medium',
            task_type task_type NOT NULL DEFAULT 'other',
            estimated_hours DECIMAL(8,2),
            actual_hours DECIMAL(8,2) DEFAULT 0,
            due_date TIMESTAMPTZ,
            completed_at TIMESTAMPTZ,
            created_at TIMESTAMPTZ DEFAULT NOW(),
            updated_at TIMESTAMPTZ DEFAULT NOW(),
            progress INTEGER DEFAULT 0 CHECK (progress >= 0 AND progress <= 100)
        );
        
        CREATE INDEX idx_tasks_project ON tasks(project_id);
        CREATE INDEX idx_tasks_assignee ON tasks(assignee_id);
        CREATE INDEX idx_tasks_status ON tasks(status);
        CREATE INDEX idx_tasks_due_date ON tasks(due_date);
        CREATE INDEX idx_tasks_created_at ON tasks(created_at);
        
        -- 添加约束
        ALTER TABLE tasks ADD CONSTRAINT chk_estimated_hours_positive 
        CHECK (estimated_hours > 0 OR estimated_hours IS NULL);
        ALTER TABLE tasks ADD CONSTRAINT chk_actual_hours_non_negative 
        CHECK (actual_hours >= 0);
        "#,
    )?;
    
    migration_manager.add_migration(
        "20240101000004_create_time_entries_table",
        "Create time entries table",
        r#"
        CREATE TABLE time_entries (
            id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
            task_id UUID NOT NULL REFERENCES tasks(id) ON DELETE CASCADE,
            user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
            hours DECIMAL(5,2) NOT NULL,
            description TEXT,
            date TIMESTAMPTZ NOT NULL,
            created_at TIMESTAMPTZ DEFAULT NOW()
        );
        
        CREATE INDEX idx_time_entries_task ON time_entries(task_id);
        CREATE INDEX idx_time_entries_user ON time_entries(user_id);
        CREATE INDEX idx_time_entries_date ON time_entries(date);
        
        ALTER TABLE tasks ADD CONSTRAINT chk_hours_positive CHECK (hours > 0);
        "#,
    )?;
    
    migration_manager.add_migration(
        "20240101000005_create_notifications_table",
        "Create notifications table",
        r#"
        CREATE TYPE notification_type AS ENUM ('task_assigned', 'task_completed', 'task_overdue', 'project_updated', 'comment_added', 'system');
        
        CREATE TABLE notifications (
            id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
            user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
            title VARCHAR(200) NOT NULL,
            message TEXT NOT NULL,
            notification_type notification_type NOT NULL,
            is_read BOOLEAN DEFAULT FALSE,
            related_id UUID,
            created_at TIMESTAMPTZ DEFAULT NOW()
        );
        
        CREATE INDEX idx_notifications_user ON notifications(user_id);
        CREATE INDEX idx_notifications_is_read ON notifications(is_read);
        CREATE INDEX idx_notifications_created_at ON notifications(created_at);
        "#,
    )?;
    
    // 运行迁移
    migration_manager.run_migrations().await?;
    
    info!("Database migrations completed successfully");
    Ok(())
}

#[instrument]
async fn setup_database(database_url: String) -> Result<(), Box<dyn std::error::Error>> {
    info!("Setting up database and running migrations");
    
    // 先运行迁移
    run_migrations(database_url.clone()).await?;
    
    // 创建默认管理员用户
    let pool = sqlx::PgPool::connect(&database_url).await?;
    let admin_password = "admin123"; // 生产环境中应该使用环境变量
    
    sqlx::query!(
        r#"
        INSERT INTO users (username, email, display_name, password_hash, role, is_active)
        VALUES ('admin', 'admin@example.com', 'Administrator', $1, 'admin', true)
        ON CONFLICT (username) DO NOTHING
        "#,
        bcrypt::hash(&admin_password, bcrypt::DEFAULT_COST)?
    )
    .execute(&pool)
    .await?;
    
    info!("Default admin user created - username: admin, password: admin123");
    info!("Please change the admin password after first login");
    
    Ok(())
}

#[instrument]
async fn run_analytics(project_id: &str, database_url: String) -> Result<(), Box<dyn std::error::Error>> {
    info!("Generating analytics report for project: {}", project_id);
    
    let database_manager = DatabaseManager::new(&database_url).await?;
    let analytics_service = AnalyticsService::new(database_manager);
    
    let project_uuid = uuid::Uuid::parse_str(project_id)?;
    let analytics = analytics_service.get_project_analytics(&project_uuid).await?;
    
    // 输出分析结果
    println!("=== Project Analytics Report ===");
    println!("Total Tasks: {}", analytics.total_tasks);
    println!("Completed Tasks: {}", analytics.completed_tasks);
    println!("In Progress Tasks: {}", analytics.in_progress_tasks);
    println!("Todo Tasks: {}", analytics.todo_tasks);
    println!("Blocked Tasks: {}", analytics.blocked_tasks);
    println!("Overdue Tasks: {}", analytics.overdue_tasks);
    println!("Average Progress: {:.1}%", analytics.average_progress);
    println!("Total Estimated Hours: {:.1}", analytics.total_estimated_hours);
    println!("Total Actual Hours: {:.1}", analytics.total_actual_hours);
    println!("Average Completion Time: {:.1} hours", analytics.average_completion_time_hours);
    
    if !analytics.user_performance.is_empty() {
        println!("\n=== User Performance ===");
        for user_perf in &analytics.user_performance {
            println!("{}", serde_json::to_string_pretty(user_perf)?);
        }
    }
    
    Ok(())
}
#![allow(unused)]
fn main() {
// Web服务器
// File: task-manager/src/web.rs
use super::services::*;
use super::analytics::*;
use super::models::*;
use axum::{
    extract::{Path, Query, State},
    http::StatusCode,
    response::Json,
    routing::{get, post, put, delete},
    Router,
};
use std::sync::Arc;
use tower::ServiceBuilder;
use tower_http::{cors::CorsLayer, trace::TraceLayer};
use tracing::instrument;

pub struct WebServer {
    app: Router,
    addr: String,
}

impl WebServer {
    pub fn new(
        addr: String,
        user_service: Arc<UserService>,
        project_service: Arc<ProjectService>,
        task_service: Arc<TaskService>,
        notification_service: Arc<NotificationService>,
        analytics_service: Arc<AnalyticsService>,
    ) -> Self {
        let app = Router::new()
            .route("/", get(|| async { "Task Manager API" }))
            .route("/health", get(health_check))
            
            // 用户相关API
            .route("/api/users", get(get_users))
            .route("/api/users", post(create_user))
            .route("/api/users/:id", get(get_user_by_id))
            .route("/api/users/:id", put(update_user))
            .route("/api/users/:id", delete(delete_user))
            
            // 项目相关API
            .route("/api/projects", get(get_projects))
            .route("/api/projects", post(create_project))
            .route("/api/projects/:id", get(get_project_by_id))
            .route("/api/projects/:id", put(update_project))
            .route("/api/projects/:id", delete(delete_project))
            .route("/api/projects/:id/tasks", get(get_project_tasks))
            .route("/api/projects/:id/analytics", get(get_project_analytics))
            
            // 任务相关API
            .route("/api/tasks", get(get_tasks))
            .route("/api/tasks", post(create_task))
            .route("/api/tasks/:id", get(get_task_by_id))
            .route("/api/tasks/:id", put(update_task))
            .route("/api/tasks/:id", delete(delete_task))
            .route("/api/tasks/:id/status", put(update_task_status))
            .route("/api/tasks/:id/time", post(log_time))
            
            // 通知相关API
            .route("/api/notifications", get(get_notifications))
            .route("/api/notifications/:id/read", put(mark_notification_read))
            .route("/api/notifications/read-all", put(mark_all_notifications_read))
            
            // 分析报告API
            .route("/api/analytics/team/:team_id", get(get_team_analytics))
            .route("/api/analytics/user/:user_id", get(get_user_productivity))
            
            .with_state(AppState {
                user_service,
                project_service,
                task_service,
                notification_service,
                analytics_service,
            })
            .layer(
                ServiceBuilder::new()
                    .layer(TraceLayer::new_for_http())
                    .layer(CorsLayer::permissive())
            );
        
        WebServer { app, addr }
    }
    
    pub async fn run(self) -> Result<(), Box<dyn std::error::Error>> {
        let listener = tokio::net::TcpListener::bind(&self.addr).await?;
        println!("Task Manager API server listening on {}", self.addr);
        
        axum::serve(listener, self.app).await?;
        Ok(())
    }
}

#[derive(Clone)]
struct AppState {
    user_service: Arc<UserService>,
    project_service: Arc<ProjectService>,
    task_service: Arc<TaskService>,
    notification_service: Arc<NotificationService>,
    analytics_service: Arc<AnalyticsService>,
}

async fn health_check() -> &'static str {
    "OK"
}

#[instrument(skip(state))]
async fn get_users(State(state): State<AppState>) -> Result<Json<ApiResponse<Vec<User>>>, (StatusCode, String)> {
    match state.user_service.get_all_users().await {
        Ok(users) => Ok(Json(ApiResponse::success(users))),
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string())),
    }
}

#[instrument(skip(state, request))]
async fn create_user(
    State(state): State<AppState>,
    Json(request): Json<CreateUserRequest>,
) -> Result<Json<ApiResponse<User>>, (StatusCode, String)> {
    match state.user_service.create_user(&request).await {
        Ok(user) => Ok(Json(ApiResponse::success(user))),
        Err(e) => Err((StatusCode::BAD_REQUEST, e.to_string())),
    }
}

#[instrument(skip(state))]
async fn get_user_by_id(
    State(state): State<AppState>,
    Path(id): Path<String>,
) -> Result<Json<ApiResponse<Option<User>>>, (StatusCode, String)> {
    match uuid::Uuid::parse_str(&id) {
        Ok(user_id) => {
            match state.user_service.get_user_by_id(&user_id).await {
                Ok(user) => Ok(Json(ApiResponse::success(user))),
                Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string())),
            }
        }
        Err(_) => Err((StatusCode::BAD_REQUEST, "Invalid UUID".to_string())),
    }
}

#[instrument(skip(state))]
async fn get_projects(
    State(state): State<AppState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Result<Json<ApiResponse<Vec<Project>>>, (StatusCode, String)> {
    // 简化的过滤实现
    let filter = ProjectFilter {
        team_id: params.get("team_id").and_then(|s| uuid::Uuid::parse_str(s).ok()),
        owner_id: params.get("owner_id").and_then(|s| uuid::Uuid::parse_str(s).ok()),
        status: None,
        priority: None,
        start_date_from: None,
        start_date_to: None,
        end_date_from: None,
        end_date_to: None,
        search: params.get("search").cloned(),
        limit: params.get("limit").and_then(|s| s.parse().ok()),
        offset: params.get("offset").and_then(|s| s.parse().ok()),
    };
    
    match state.project_service.filter_projects(&filter).await {
        Ok((projects, _)) => Ok(Json(ApiResponse::success(projects))),
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string())),
    }
}

#[instrument(skip(state, request))]
async fn create_project(
    State(state): State<AppState>,
    Json(request): Json<CreateProjectRequest>,
) -> Result<Json<ApiResponse<Project>>, (StatusCode, String)> {
    // 从JWT token获取当前用户ID(简化实现)
    let owner_id = uuid::Uuid::new_v4();
    
    match state.project_service.create_project(&request, owner_id).await {
        Ok(project) => Ok(Json(ApiResponse::success(project))),
        Err(e) => Err((StatusCode::BAD_REQUEST, e.to_string())),
    }
}

#[instrument(skip(state))]
async fn get_project_by_id(
    State(state): State<AppState>,
    Path(id): Path<String>,
) -> Result<Json<ApiResponse<Option<Project>>>, (StatusCode, String)> {
    match uuid::Uuid::parse_str(&id) {
        Ok(project_id) => {
            match state.project_service.get_project_by_id(&project_id).await {
                Ok(project) => Ok(Json(ApiResponse::success(project))),
                Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string())),
            }
        }
        Err(_) => Err((StatusCode::BAD_REQUEST, "Invalid UUID".to_string())),
    }
}

#[instrument(skip(state))]
async fn get_project_tasks(
    State(state): State<AppState>,
    Path(id): Path<String>,
) -> Result<Json<ApiResponse<Vec<Task>>>, (StatusCode, String)> {
    match uuid::Uuid::parse_str(&id) {
        Ok(project_id) => {
            match state.task_service.get_tasks_by_project(&project_id).await {
                Ok(tasks) => Ok(Json(ApiResponse::success(tasks))),
                Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string())),
            }
        }
        Err(_) => Err((StatusCode::BAD_REQUEST, "Invalid UUID".to_string())),
    }
}

#[instrument(skip(state))]
async fn get_tasks(
    State(state): State<AppState>,
    Query(params): Query<std::collections::HashMap<String, String>>,
) -> Result<Json<ApiResponse<Vec<Task>>>, (StatusCode, String)> {
    let filter = TaskFilter {
        project_id: params.get("project_id").and_then(|s| uuid::Uuid::parse_str(s).ok()),
        assignee_id: params.get("assignee_id").and_then(|s| uuid::Uuid::parse_str(s).ok()),
        status: None,
        priority: None,
        task_type: None,
        due_date_from: None,
        due_date_to: None,
        created_by: None,
        search: params.get("search").cloned(),
        limit: params.get("limit").and_then(|s| s.parse().ok()),
        offset: params.get("offset").and_then(|s| s.parse().ok()),
    };
    
    match state.task_service.filter_tasks(&filter).await {
        Ok((tasks, _)) => Ok(Json(ApiResponse::success(tasks))),
        Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string())),
    }
}

#[instrument(skip(state, request))]
async fn create_task(
    State(state): State<AppState>,
    Json(request): Json<CreateTaskRequest>,
) -> Result<Json<ApiResponse<Task>>, (StatusCode, String)> {
    let created_by = uuid::Uuid::new_v4(); // 从JWT获取
    
    match state.task_service.create_task(&request, created_by).await {
        Ok(task) => Ok(Json(ApiResponse::success(task))),
        Err(e) => Err((StatusCode::BAD_REQUEST, e.to_string())),
    }
}

#[instrument(skip(state, request))]
async fn update_task(
    State(state): State<AppState>,
    Path(id): Path<String>,
    Json(request): Json<UpdateTaskRequest>,
) -> Result<Json<ApiResponse<Task>>, (StatusCode, String)> {
    match uuid::Uuid::parse_str(&id) {
        Ok(task_id) => {
            match state.task_service.update_task(&task_id, &request).await {
                Ok(task) => Ok(Json(ApiResponse::success(task))),
                Err(e) => Err((StatusCode::BAD_REQUEST, e.to_string())),
            }
        }
        Err(_) => Err((StatusCode::BAD_REQUEST, "Invalid UUID".to_string())),
    }
}

// 其他路由实现...
}
#![allow(unused)]
fn main() {
// 数据库管理模块
// File: task-manager/src/database.rs
use sqlx::{PgPool, Pool, Postgres};
use crate::services::*;
use std::time::Duration;

pub struct DatabaseManager {
    pub pool: Pool<Postgres>,
}

impl DatabaseManager {
    pub async fn new(database_url: &str) -> Result<Self, sqlx::Error> {
        let pool = PgPoolOptions::new()
            .max_connections(20)
            .min_connections(5)
            .max_lifetime(Duration::from_secs(1800))
            .idle_timeout(Duration::from_secs(300))
            .connect_timeout(Duration::from_secs(10))
            .connect(database_url)
            .await?;
        
        Ok(DatabaseManager { pool })
    }
    
    pub async fn execute_query<T>(
        &self,
        query: &str,
        params: &[Box<dyn sqlx::Encode<'static, Postgres> + Send>],
    ) -> Result<T, sqlx::Error>
    where
        T: for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow> + Send
    {
        let mut query_builder = sqlx::query_as::<_, T>(query);
        
        for param in params {
            // 这里需要类型转换,实际实现会更复杂
            // 简化实现
        }
        
        query_builder.fetch_one(&self.pool).await
    }
    
    pub async fn execute_update(&self, query: &str, params: &[Box<dyn sqlx::Encode<'static, Postgres> + Send>]) -> Result<u64, sqlx::Error> {
        let mut query_builder = sqlx::query(query);
        
        for param in params {
            // 简化实现
        }
        
        query_builder.execute(&self.pool).await.map(|result| result.rows_affected())
    }
}

impl Clone for DatabaseManager {
    fn clone(&self) -> Self {
        DatabaseManager {
            pool: self.pool.clone(),
        }
    }
}
}
#![allow(unused)]
fn main() {
// Docker配置和部署
File: task-manager/docker-compose.yml
version: '3.8'

services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: task_manager
      POSTGRES_USER: task_user
      POSTGRES_PASSWORD: password
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql

  task-manager:
    build: .
    ports:
      - "3000:3000"
    environment:
      DATABASE_URL: postgres://task_user:password@postgres:5432/task_manager
    depends_on:
      - postgres
    restart: unless-stopped

volumes:
  postgres_data:
}
#![allow(unused)]
fn main() {
// File: task-manager/README.md
企业级任务管理系统

一个基于Rust构建的企业级任务管理系统,支持项目管理、团队协作、进度跟踪、数据分析等功能。

# 功能特性

## 项目管理
- **多项目支持**:创建、编辑、删除项目
- **项目状态跟踪**:规划、进行中、暂停、已完成、已取消
- **项目优先级**:低、中、高、紧急
- **项目时间线**:开始时间、结束时间设置
- **项目分析**:进度统计、团队绩效、效率分析

## 任务管理
- **任务生命周期**:待办、进行中、审核、测试、完成、取消、阻塞
- **任务类型**:功能、修复、文档、设计、测试、研究
- **优先级管理**:低、中、高、紧急
- **工时跟踪**:预估工时、实际工时、详细时间记录
- **进度管理**:0-100%进度跟踪
- **任务分配**:支持多用户协作

## 团队协作
- **用户管理**:角色权限(管理员、经理、成员、查看者)
- **团队管理**:创建团队、添加成员
- **实时通知**:任务分配、完成提醒、逾期警告
- **评论系统**:任务评论、讨论记录
- **文件附件**:任务附件管理

## 数据分析
- **项目分析**:项目进度、团队绩效、工时统计
- **个人分析**:工作效率、工时分布、完成率
- **团队分析**:团队协作、生产力趋势
- **时间分析**:工时分布、效率优化建议

# 快速开始

## 使用Docker Compose(推荐)

1. 启动服务
```bash
docker-compose up -d
}
  1. 初始化数据库
cargo run setup --database-url "postgres://task_user:password@localhost/task_manager"
  1. 访问系统
  • Web界面:http://localhost:3000
  • API文档:http://localhost:3000/health

本地开发

  1. 安装依赖
# 安装PostgreSQL
sudo apt-get install postgresql-15

# 创建数据库
createdb task_manager
  1. 运行应用
cargo run server
  1. 数据库设置
cargo run migrate --database-url "postgres://task_user:password@localhost/task_manager"

API文档

用户管理

  • GET /api/users - 获取所有用户
  • POST /api/users - 创建用户
  • GET /api/users/:id - 获取用户详情
  • PUT /api/users/:id - 更新用户
  • DELETE /api/users/:id - 删除用户

项目管理

  • GET /api/projects - 获取项目列表
  • POST /api/projects - 创建项目
  • GET /api/projects/:id - 获取项目详情
  • PUT /api/projects/:id - 更新项目
  • DELETE /api/projects/:id - 删除项目
  • GET /api/projects/:id/tasks - 获取项目任务
  • GET /api/projects/:id/analytics - 获取项目分析

任务管理

  • GET /api/tasks - 获取任务列表
  • POST /api/tasks - 创建任务
  • GET /api/tasks/:id - 获取任务详情
  • PUT /api/tasks/:id - 更新任务
  • DELETE /api/tasks/:id - 删除任务
  • PUT /api/tasks/:id/status - 更新任务状态
  • POST /api/tasks/:id/time - 记录工时

通知管理

  • GET /api/notifications - 获取通知列表
  • PUT /api/notifications/:id/read - 标记通知已读
  • PUT /api/notifications/read-all - 全部标记已读

性能优化

数据库优化

  • 合理的索引设计
  • 查询优化
  • 连接池管理
  • 读写分离

缓存策略

  • 用户会话缓存
  • 项目数据缓存
  • 统计信息缓存

异步处理

  • 异步数据库操作
  • 非阻塞I/O
  • 任务队列

安全特性

身份认证

  • JWT token认证
  • 密码哈希存储
  • 会话管理

权限控制

  • 基于角色的访问控制
  • 细粒度权限管理
  • 数据隔离

数据安全

  • SQL注入防护
  • XSS防护
  • CSRF保护

监控和运维

系统监控

  • 数据库连接监控
  • 性能指标收集
  • 错误日志记录

健康检查

  • API健康检查端点
  • 数据库连接检查
  • 系统资源监控

扩展性

水平扩展

  • 无状态API设计
  • 数据库分片
  • 负载均衡

微服务架构

  • 用户服务
  • 项目服务
  • 任务服务
  • 通知服务

贡献

欢迎提交Issue和Pull Request来改进这个项目。

许可证

MIT License


联系信息

  • 作者:MiniMax Agent
  • 邮箱:developer@minimax.com
  • 文档:https://docs.minimax.com/task-manager