database/docs/1.0.2版本需求/02-架构设计方案.md
2025-12-27 16:21:09 +08:00

34 KiB
Raw Permalink Blame History

v1.0.2 架构设计方案

设计目标

  1. 抽象性: 核心业务逻辑与具体数据库实现解耦
  2. 可扩展性: 易于添加新的数据库类型支持
  3. 向后兼容: 不破坏现有 PostgreSQL 功能
  4. 性能: 抽象层开销最小化
  5. 可维护性: 清晰的代码组织和职责分离

一、整体架构

1.1 分层架构

┌─────────────────────────────────────────────────────────────┐
│                    客户端层 (MCP Clients)                    │
│              Claude, Cursor, 其他 AI 客户端                  │
└────────────────────────┬────────────────────────────────────┘
                         │ MCP Protocol (WebSocket/SSE)
┌────────────────────────▼────────────────────────────────────┐
│                    传输层 (Transport)                        │
│  ┌──────────────────┐        ┌──────────────────┐          │
│  │  WebSocket       │        │  SSE             │          │
│  │  Transport       │        │  Transport       │          │
│  └──────────────────┘        └──────────────────┘          │
└────────────────────────┬────────────────────────────────────┘
                         │ MCP Server
┌────────────────────────▼────────────────────────────────────┐
│                   MCP 工具层 (Tools)                         │
│  ┌──────────────────────────────────────────────────┐      │
│  │ Metadata Tools | Query Tools | Data Tools |      │      │
│  │ Diagnostics Tools | Transaction Tools            │      │
│  └──────────────────────────────────────────────────┘      │
│                     (数据库无关)                             │
└────────────────────────┬────────────────────────────────────┘
                         │
┌────────────────────────▼────────────────────────────────────┐
│              核心业务层 (Core - 数据库无关)                   │
│  ┌────────────────┐  ┌────────────────┐  ┌──────────────┐ │
│  │ Connection     │  │ Query          │  │ Transaction  │ │
│  │ Manager        │  │ Runner         │  │ Manager      │ │
│  └────────┬───────┘  └────────┬───────┘  └──────┬───────┘ │
│  ┌────────▼───────┐  ┌────────▼───────┐  ┌──────▼───────┐ │
│  │ Metadata       │  │ Bulk           │  │ Diagnostics  │ │
│  │ Browser        │  │ Helpers        │  │              │ │
│  └────────┬───────┘  └────────┬───────┘  └──────┬───────┘ │
│           └──────────┬─────────┴─────────────────┘         │
└──────────────────────┼─────────────────────────────────────┘
                       │ 依赖
┌──────────────────────▼─────────────────────────────────────┐
│              数据库驱动抽象层 (Driver Interface)             │
│  ┌──────────────────────────────────────────────────────┐  │
│  │         interface DatabaseDriver {                   │  │
│  │           execute(), buildPaginatedQuery(),          │  │
│  │           listSchemas(), describeTable(), ...        │  │
│  │         }                                            │  │
│  └──────────────────────────────────────────────────────┘  │
└────────────┬──────────────────────┬────────────────────────┘
             │                      │
┌────────────▼───────────┐  ┌──────▼───────────────────────┐
│  PostgreSQL Driver     │  │  SQL Server Driver           │
│  ┌──────────────────┐  │  │  ┌────────────────────────┐  │
│  │ pg library       │  │  │  │ mssql library          │  │
│  │ search_path      │  │  │  │ schema.table 限定      │  │
│  │ $1, $2 参数      │  │  │  │ @p1, @p2 参数          │  │
│  │ ON CONFLICT      │  │  │  │ MERGE                  │  │
│  │ RETURNING        │  │  │  │ OUTPUT                 │  │
│  │ pg_* 系统表      │  │  │  │ sys.* 系统表           │  │
│  └──────────────────┘  │  │  └────────────────────────┘  │
└────────────────────────┘  └──────────────────────────────┘

1.2 核心设计原则

依赖倒置原则 (DIP)

  • 高层模块 (Core) 不依赖低层模块 (Driver)
  • 两者都依赖抽象 (DatabaseDriver 接口)

开闭原则 (OCP)

  • 对扩展开放: 新增数据库类型只需实现新驱动
  • 对修改关闭: 核心业务逻辑无需改动

单一职责原则 (SRP)

  • Driver: 负责数据库特定的 SQL 生成和结果解析
  • Core: 负责业务逻辑和流程控制
  • Tools: 负责 MCP 协议适配

二、驱动接口设计

2.1 DatabaseDriver 核心接口

/**
 * 数据库驱动接口
 * 所有数据库驱动必须实现此接口
 */
export interface DatabaseDriver {
  /**
   * 驱动类型标识
   */
  readonly type: 'postgres' | 'sqlserver' | 'mysql';

  /**
   * 驱动名称和版本
   */
  readonly name: string;
  readonly version: string;

  // ========== 连接管理 ==========

  /**
   * 创建连接池
   */
  createConnectionPool(config: ConnectionConfig): any;

  /**
   * 测试连接
   */
  testConnection(pool: any): Promise<boolean>;

  /**
   * 关闭连接池
   */
  closeConnectionPool(pool: any): Promise<void>;

  // ========== 查询执行 ==========

  /**
   * 执行查询
   * @param client - 数据库客户端
   * @param sql - SQL 语句
   * @param params - 参数数组
   * @returns 查询结果
   */
  execute<T = any>(
    client: any,
    sql: string,
    params?: unknown[]
  ): Promise<QueryResult<T>>;

  /**
   * 构建分页查询 SQL
   */
  buildPaginatedQuery(
    sql: string,
    params: unknown[],
    limit: number,
    offset: number
  ): { sql: string; params: unknown[] };

  /**
   * 构建 EXPLAIN 查询 SQL
   */
  buildExplainQuery(sql: string, analyze?: boolean): string;

  /**
   * 解析 EXPLAIN 结果
   */
  parseExplainResult(result: any): ExplainPlan;

  // ========== SQL 语法工具 ==========

  /**
   * 引用标识符 (表名、列名等)
   * PostgreSQL: "name"
   * SQL Server: [name]
   */
  quoteIdentifier(identifier: string): string;

  /**
   * 构建完整限定表名
   * @param table - 表名
   * @param schema - Schema 名称 (可选)
   * @returns 完整限定名
   */
  buildQualifiedTableName(table: string, schema?: string): string;

  /**
   * 生成参数占位符
   * PostgreSQL: $1, $2, ...
   * SQL Server: @p1, @p2, ...
   * @param index - 参数索引 (从 1 开始)
   */
  getParameterPlaceholder(index: number): string;

  // ========== 事务管理 ==========

  /**
   * 构建 BEGIN TRANSACTION 语句
   */
  buildBeginStatement(options?: TransactionOptions): string;

  /**
   * 构建 COMMIT 语句
   */
  buildCommitStatement(): string;

  /**
   * 构建 ROLLBACK 语句
   */
  buildRollbackStatement(): string;

  /**
   * 构建 SAVEPOINT 语句
   */
  buildSavepointStatement(name: string): string;

  /**
   * 构建 ROLLBACK TO SAVEPOINT 语句
   */
  buildRollbackToSavepointStatement(name: string): string;

  // ========== Schema 管理 ==========

  /**
   * 构建设置 Schema 的语句
   * PostgreSQL: SET search_path TO schema1, schema2
   * SQL Server: 不支持,返回空字符串
   */
  buildSetSchemaStatement(schemas: string | string[]): string;

  /**
   * 是否支持 search_path 机制
   */
  supportsSearchPath(): boolean;

  // ========== 元数据查询 ==========

  /**
   * 构建列出所有 Schema 的查询
   */
  buildListSchemasQuery(): string;

  /**
   * 构建列出表的查询
   */
  buildListTablesQuery(schema?: string): string;

  /**
   * 构建描述表结构的查询
   */
  buildDescribeTableQuery(table: string, schema?: string): string;

  /**
   * 解析表定义结果
   */
  parseTableDefinition(rows: any[]): TableDefinition;

  /**
   * 构建列出视图的查询
   */
  buildListViewsQuery(schema?: string): string;

  /**
   * 构建列出索引的查询
   */
  buildListIndexesQuery(schema?: string, table?: string): string;

  /**
   * 构建列出约束的查询
   */
  buildListConstraintsQuery(schema?: string, table?: string): string;

  /**
   * 构建列出函数/存储过程的查询
   */
  buildListFunctionsQuery(schema?: string): string;

  /**
   * 构建列出触发器的查询
   */
  buildListTriggersQuery(schema?: string, table?: string): string;

  // ========== 批量操作 ==========

  /**
   * 构建批量插入语句
   */
  buildBulkInsertStatement(
    table: string,
    schema: string | undefined,
    columns: string[],
    rows: Record<string, unknown>[],
    chunkSize?: number
  ): { sql: string; params: unknown[] }[];

  /**
   * 构建 UPSERT 语句
   * PostgreSQL: ON CONFLICT ... DO UPDATE
   * SQL Server: MERGE
   */
  buildBulkUpsertStatement(
    table: string,
    schema: string | undefined,
    columns: string[],
    rows: Record<string, unknown>[],
    conflictColumns: string[],
    updateColumns: string[]
  ): { sql: string; params: unknown[] };

  // ========== 诊断功能 ==========

  /**
   * 构建获取活跃连接的查询
   */
  buildGetActiveConnectionsQuery(): string;

  /**
   * 解析活跃连接结果
   */
  parseActiveConnections(rows: any[]): ConnectionInfo[];

  /**
   * 构建获取锁信息的查询
   */
  buildGetLocksQuery(): string;

  /**
   * 解析锁信息结果
   */
  parseLocks(rows: any[]): LockInfo[];

  /**
   * 是否支持某个诊断功能
   */
  supportsDiagnostic(diagnostic: DiagnosticType): boolean;

  // ========== 数据类型映射 ==========

  /**
   * 将数据库特定类型映射为通用类型
   */
  mapToGenericType(nativeType: string): GenericDataType;

  /**
   * 将通用类型映射为数据库特定类型
   */
  mapFromGenericType(genericType: GenericDataType): string;
}

2.2 辅助类型定义

/**
 * 查询结果通用接口
 */
export interface QueryResult<T = any> {
  rows: T[];
  rowCount: number;
  fields?: FieldInfo[];
}

/**
 * 字段信息
 */
export interface FieldInfo {
  name: string;
  type: string;
  nullable: boolean;
}

/**
 * 表定义
 */
export interface TableDefinition {
  schema: string;
  table: string;
  columns: ColumnDefinition[];
  primaryKey?: string[];
  foreignKeys?: ForeignKeyDefinition[];
  indexes?: IndexDefinition[];
  uniqueConstraints?: UniqueConstraintDefinition[];
}

/**
 * 列定义
 */
export interface ColumnDefinition {
  name: string;
  type: string;
  nullable: boolean;
  defaultValue?: string;
  maxLength?: number;
  precision?: number;
  scale?: number;
}

/**
 * 外键定义
 */
export interface ForeignKeyDefinition {
  name: string;
  columns: string[];
  referencedTable: string;
  referencedSchema?: string;
  referencedColumns: string[];
  onDelete?: 'CASCADE' | 'SET NULL' | 'RESTRICT' | 'NO ACTION';
  onUpdate?: 'CASCADE' | 'SET NULL' | 'RESTRICT' | 'NO ACTION';
}

/**
 * 索引定义
 */
export interface IndexDefinition {
  name: string;
  columns: string[];
  unique: boolean;
  type?: string;
}

/**
 * 唯一约束定义
 */
export interface UniqueConstraintDefinition {
  name: string;
  columns: string[];
}

/**
 * 事务选项
 */
export interface TransactionOptions {
  isolationLevel?: 'READ UNCOMMITTED' | 'READ COMMITTED' | 'REPEATABLE READ' | 'SERIALIZABLE';
  readOnly?: boolean;
  deferrable?: boolean;  // PostgreSQL 特有
}

/**
 * 通用数据类型
 */
export enum GenericDataType {
  STRING = 'string',
  INTEGER = 'integer',
  DECIMAL = 'decimal',
  BOOLEAN = 'boolean',
  DATE = 'date',
  TIME = 'time',
  DATETIME = 'datetime',
  TIMESTAMP = 'timestamp',
  JSON = 'json',
  BINARY = 'binary',
  UUID = 'uuid',
  TEXT = 'text',
  UNKNOWN = 'unknown'
}

/**
 * 诊断类型
 */
export enum DiagnosticType {
  ACTIVE_CONNECTIONS = 'active_connections',
  LOCKS = 'locks',
  SLOW_QUERIES = 'slow_queries',
  REPLICATION_STATUS = 'replication_status',
  VACUUM_ANALYZE = 'vacuum_analyze'
}

/**
 * 连接信息
 */
export interface ConnectionInfo {
  pid: number;
  database: string;
  user: string;
  clientAddress?: string;
  state: string;
  query?: string;
  duration?: number;
}

/**
 * 锁信息
 */
export interface LockInfo {
  lockType: string;
  database: string;
  schema?: string;
  table?: string;
  pid: number;
  mode: string;
  granted: boolean;
}

/**
 * 执行计划
 */
export interface ExplainPlan {
  raw: any;
  summary?: {
    totalCost?: number;
    estimatedRows?: number;
    actualRows?: number;
    executionTime?: number;
  };
}

三、驱动工厂模式

3.1 驱动工厂实现

// src/drivers/driver-factory.ts

import { DatabaseDriver } from './database-driver.js';
import { PostgresDriver } from './postgres/postgres-driver.js';
import { SqlServerDriver } from './sqlserver/sqlserver-driver.js';

/**
 * 创建数据库驱动实例
 * @param type - 数据库类型
 * @returns 驱动实例
 */
export function createDriver(
  type: 'postgres' | 'sqlserver'
): DatabaseDriver {
  switch (type) {
    case 'postgres':
      return new PostgresDriver();
    case 'sqlserver':
      return new SqlServerDriver();
    default:
      throw new Error(`Unsupported database type: ${type}`);
  }
}

/**
 * 驱动注册表
 */
const driverRegistry = new Map<string, DatabaseDriver>();

/**
 * 注册自定义驱动
 */
export function registerDriver(type: string, driver: DatabaseDriver): void {
  driverRegistry.set(type, driver);
}

/**
 * 获取已注册的驱动
 */
export function getDriver(type: string): DatabaseDriver {
  const driver = driverRegistry.get(type);
  if (!driver) {
    throw new Error(`Driver not found for type: ${type}`);
  }
  return driver;
}

四、核心层重构

4.1 ConnectionManager 重构

当前实现 (PostgreSQL 特定):

export class PostgresConnectionManager {
  private pools = new Map<string, Pool>();  // pg.Pool

  getPool(name: string): Pool {
    const pool = new Pool({...});  // PostgreSQL 特定
    return pool;
  }
}

重构后 (数据库无关):

export class ConnectionManager {
  private driver: DatabaseDriver;
  private pools = new Map<string, any>();  // 通用 pool

  constructor(
    private configs: Record<string, EnvironmentConfig>,
    driver: DatabaseDriver
  ) {
    this.driver = driver;
  }

  getPool(envName: string): any {
    if (this.pools.has(envName)) {
      return this.pools.get(envName);
    }

    const config = this.configs[envName];
    const pool = this.driver.createConnectionPool(config.connection);
    this.pools.set(envName, pool);
    return pool;
  }

  async withClient<T>(
    envName: string,
    schema: string | string[] | undefined,
    callback: (client: any) => Promise<T>,
    options?: QueryOptions
  ): Promise<T> {
    const pool = this.getPool(envName);
    const client = await pool.connect();

    try {
      // 设置 schema (如果支持)
      if (schema && this.driver.supportsSearchPath()) {
        const schemaSQL = this.driver.buildSetSchemaStatement(schema);
        await client.query(schemaSQL);
      }

      return await callback(client);
    } finally {
      client.release();
    }
  }
}

4.2 QueryRunner 重构

重构前:

async executePaginated<T>(
  envName: string,
  text: string,
  values: unknown[],
  pagination: PaginationOptions
): Promise<QueryResult<T>> {
  const paginatedText = `${text} LIMIT $${limitIndex} OFFSET $${offsetIndex}`;
  // PostgreSQL 特定
}

重构后:

async executePaginated<T>(
  envName: string,
  text: string,
  values: unknown[],
  pagination: PaginationOptions
): Promise<QueryResult<T>> {
  const { sql, params } = this.driver.buildPaginatedQuery(
    text,
    values,
    pagination.limit,
    pagination.offset ?? 0
  );
  return this.execute<T>(envName, sql, params);
}

4.3 MetadataBrowser 重构

重构前:

async listTables(envName: string, schema?: string): Promise<string[]> {
  const sql = `
    SELECT tablename FROM pg_tables
    WHERE schemaname = $1
  `;  // PostgreSQL 特定
  // ...
}

重构后:

async listTables(envName: string, schema?: string): Promise<string[]> {
  const sql = this.driver.buildListTablesQuery(schema);
  const result = await this.queryRunner.execute(envName, sql);
  return result.rows.map(row => row.table_name || row.tablename);
}

五、PostgreSQL 驱动实现

5.1 核心方法示例

// src/drivers/postgres/postgres-driver.ts

import { Pool, PoolClient } from 'pg';
import { DatabaseDriver, QueryResult } from '../database-driver.js';

export class PostgresDriver implements DatabaseDriver {
  readonly type = 'postgres' as const;
  readonly name = 'PostgreSQL Driver';
  readonly version = '1.0.0';

  // ========== 连接管理 ==========

  createConnectionPool(config: any): Pool {
    return new Pool({
      host: config.host,
      port: config.port,
      database: config.database,
      user: config.user,
      password: config.password,
      ssl: config.ssl?.require ? {
        ca: config.ssl.ca,
        cert: config.ssl.cert,
        key: config.ssl.key,
        rejectUnauthorized: config.ssl.rejectUnauthorized ?? true
      } : false,
      max: config.pool?.max ?? 10,
      idleTimeoutMillis: config.pool?.idleTimeoutMs ?? 30000,
      connectionTimeoutMillis: config.pool?.connectionTimeoutMs ?? 10000
    });
  }

  async testConnection(pool: Pool): Promise<boolean> {
    const client = await pool.connect();
    try {
      await client.query('SELECT 1');
      return true;
    } finally {
      client.release();
    }
  }

  async closeConnectionPool(pool: Pool): Promise<void> {
    await pool.end();
  }

  // ========== 查询执行 ==========

  async execute<T>(
    client: PoolClient,
    sql: string,
    params?: unknown[]
  ): Promise<QueryResult<T>> {
    const result = await client.query<T>(sql, params);
    return {
      rows: result.rows,
      rowCount: result.rowCount ?? 0,
      fields: result.fields?.map(f => ({
        name: f.name,
        type: this.mapToGenericType(f.dataTypeID.toString()),
        nullable: true  // pg 不提供此信息
      }))
    };
  }

  buildPaginatedQuery(
    sql: string,
    params: unknown[],
    limit: number,
    offset: number
  ): { sql: string; params: unknown[] } {
    const limitIdx = params.length + 1;
    const offsetIdx = limitIdx + 1;
    return {
      sql: `${sql} LIMIT $${limitIdx} OFFSET $${offsetIdx}`,
      params: [...params, limit, offset]
    };
  }

  buildExplainQuery(sql: string, analyze?: boolean): string {
    const prefix = analyze
      ? 'EXPLAIN (ANALYZE, FORMAT JSON)'
      : 'EXPLAIN (FORMAT JSON)';
    return `${prefix} ${sql}`;
  }

  parseExplainResult(result: any): ExplainPlan {
    const plan = result.rows[0]['QUERY PLAN'][0];
    return {
      raw: plan,
      summary: {
        totalCost: plan['Total Cost'],
        estimatedRows: plan['Plan Rows'],
        actualRows: plan['Actual Rows'],
        executionTime: plan['Execution Time']
      }
    };
  }

  // ========== SQL 语法工具 ==========

  quoteIdentifier(identifier: string): string {
    return `"${identifier.replace(/"/g, '""')}"`;
  }

  buildQualifiedTableName(table: string, schema?: string): string {
    if (schema) {
      return `${this.quoteIdentifier(schema)}.${this.quoteIdentifier(table)}`;
    }
    return this.quoteIdentifier(table);
  }

  getParameterPlaceholder(index: number): string {
    return `$${index}`;
  }

  // ========== 事务管理 ==========

  buildBeginStatement(options?: TransactionOptions): string {
    let sql = 'BEGIN';
    if (options?.isolationLevel) {
      sql += ` ISOLATION LEVEL ${options.isolationLevel}`;
    }
    if (options?.readOnly) {
      sql += ' READ ONLY';
    }
    if (options?.deferrable) {
      sql += ' DEFERRABLE';
    }
    return sql;
  }

  buildCommitStatement(): string {
    return 'COMMIT';
  }

  buildRollbackStatement(): string {
    return 'ROLLBACK';
  }

  buildSavepointStatement(name: string): string {
    return `SAVEPOINT ${this.quoteIdentifier(name)}`;
  }

  buildRollbackToSavepointStatement(name: string): string {
    return `ROLLBACK TO SAVEPOINT ${this.quoteIdentifier(name)}`;
  }

  // ========== Schema 管理 ==========

  buildSetSchemaStatement(schemas: string | string[]): string {
    const paths = Array.isArray(schemas) ? schemas : [schemas];
    const quoted = paths.map(s => this.quoteIdentifier(s)).join(', ');
    return `SET search_path TO ${quoted}`;
  }

  supportsSearchPath(): boolean {
    return true;
  }

  // ========== 元数据查询 ==========

  buildListSchemasQuery(): string {
    return `
      SELECT schema_name
      FROM information_schema.schemata
      WHERE schema_name NOT IN ('pg_catalog', 'information_schema', 'pg_toast')
      ORDER BY schema_name
    `;
  }

  buildListTablesQuery(schema?: string): string {
    if (schema) {
      return `
        SELECT tablename as table_name
        FROM pg_tables
        WHERE schemaname = '${schema}'
        ORDER BY tablename
      `;
    }
    return `
      SELECT tablename as table_name, schemaname as schema_name
      FROM pg_tables
      WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
      ORDER BY schemaname, tablename
    `;
  }

  // ... 其他方法实现
}

六、SQL Server 驱动实现

6.1 核心方法示例

// src/drivers/sqlserver/sqlserver-driver.ts

import * as mssql from 'mssql';
import { DatabaseDriver, QueryResult } from '../database-driver.js';

export class SqlServerDriver implements DatabaseDriver {
  readonly type = 'sqlserver' as const;
  readonly name = 'SQL Server Driver';
  readonly version = '1.0.0';

  // ========== 连接管理 ==========

  createConnectionPool(config: any): mssql.ConnectionPool {
    const poolConfig: mssql.config = {
      server: config.host,
      port: config.port,
      database: config.database,
      user: config.user,
      password: config.password,
      options: {
        encrypt: config.encrypt ?? true,
        trustServerCertificate: config.trustServerCertificate ?? false,
        enableArithAbort: true,
        instanceName: config.instanceName
      },
      pool: {
        max: config.pool?.max ?? 10,
        min: 0,
        idleTimeoutMillis: config.pool?.idleTimeoutMs ?? 30000
      }
    };

    const pool = new mssql.ConnectionPool(poolConfig);
    return pool.connect();  // 返回 Promise但工厂模式会处理
  }

  async testConnection(pool: mssql.ConnectionPool): Promise<boolean> {
    try {
      await pool.request().query('SELECT 1');
      return true;
    } catch {
      return false;
    }
  }

  async closeConnectionPool(pool: mssql.ConnectionPool): Promise<void> {
    await pool.close();
  }

  // ========== 查询执行 ==========

  async execute<T>(
    pool: mssql.ConnectionPool,
    sql: string,
    params?: unknown[]
  ): Promise<QueryResult<T>> {
    const request = pool.request();

    // 绑定参数
    if (params && params.length > 0) {
      params.forEach((param, i) => {
        request.input(`p${i + 1}`, param);
      });
    }

    const result = await request.query<T>(sql);

    return {
      rows: result.recordset,
      rowCount: result.rowsAffected[0] ?? 0,
      fields: result.recordset.columns?.map(col => ({
        name: col.name,
        type: this.mapToGenericType(col.type.declaration),
        nullable: col.nullable ?? true
      }))
    };
  }

  buildPaginatedQuery(
    sql: string,
    params: unknown[],
    limit: number,
    offset: number
  ): { sql: string; params: unknown[] } {
    // SQL Server: OFFSET n ROWS FETCH NEXT m ROWS ONLY
    return {
      sql: `${sql} OFFSET ${offset} ROWS FETCH NEXT ${limit} ROWS ONLY`,
      params: params
    };
  }

  buildExplainQuery(sql: string, analyze?: boolean): string {
    if (analyze) {
      return `
        SET STATISTICS IO ON;
        SET STATISTICS TIME ON;
        ${sql};
        SET STATISTICS IO OFF;
        SET STATISTICS TIME OFF;
      `;
    }
    // SQL Server 没有 EXPLAIN需要通过其他方式获取执行计划
    return sql;
  }

  parseExplainResult(result: any): ExplainPlan {
    // SQL Server 的执行计划解析更复杂
    return {
      raw: result,
      summary: {
        // 需要从 STATISTICS 输出中提取
      }
    };
  }

  // ========== SQL 语法工具 ==========

  quoteIdentifier(identifier: string): string {
    return `[${identifier.replace(/\]/g, ']]')}]`;
  }

  buildQualifiedTableName(table: string, schema?: string): string {
    if (schema) {
      return `${this.quoteIdentifier(schema)}.${this.quoteIdentifier(table)}`;
    }
    return this.quoteIdentifier(table);
  }

  getParameterPlaceholder(index: number): string {
    return `@p${index}`;
  }

  // ========== Schema 管理 ==========

  buildSetSchemaStatement(schemas: string | string[]): string {
    // SQL Server 不支持 search_path
    // 返回空字符串,由调用方使用完整限定名
    return '';
  }

  supportsSearchPath(): boolean {
    return false;
  }

  // ========== 元数据查询 ==========

  buildListSchemasQuery(): string {
    return `
      SELECT name as schema_name
      FROM sys.schemas
      WHERE name NOT IN ('dbo', 'guest', 'INFORMATION_SCHEMA', 'sys', 'db_owner',
                         'db_accessadmin', 'db_securityadmin', 'db_ddladmin',
                         'db_backupoperator', 'db_datareader', 'db_datawriter',
                         'db_denydatareader', 'db_denydatawriter')
      ORDER BY name
    `;
  }

  buildListTablesQuery(schema?: string): string {
    if (schema) {
      return `
        SELECT TABLE_NAME as table_name
        FROM INFORMATION_SCHEMA.TABLES
        WHERE TABLE_SCHEMA = '${schema}'
          AND TABLE_TYPE = 'BASE TABLE'
        ORDER BY TABLE_NAME
      `;
    }
    return `
      SELECT TABLE_NAME as table_name, TABLE_SCHEMA as schema_name
      FROM INFORMATION_SCHEMA.TABLES
      WHERE TABLE_TYPE = 'BASE TABLE'
      ORDER BY TABLE_SCHEMA, TABLE_NAME
    `;
  }

  buildDescribeTableQuery(table: string, schema?: string): string {
    const schemaFilter = schema ? `AND TABLE_SCHEMA = '${schema}'` : '';
    return `
      SELECT
        COLUMN_NAME as name,
        DATA_TYPE as type,
        IS_NULLABLE as nullable,
        COLUMN_DEFAULT as default_value,
        CHARACTER_MAXIMUM_LENGTH as max_length,
        NUMERIC_PRECISION as precision,
        NUMERIC_SCALE as scale
      FROM INFORMATION_SCHEMA.COLUMNS
      WHERE TABLE_NAME = '${table}' ${schemaFilter}
      ORDER BY ORDINAL_POSITION
    `;
  }

  buildBulkUpsertStatement(
    table: string,
    schema: string | undefined,
    columns: string[],
    rows: Record<string, unknown>[],
    conflictColumns: string[],
    updateColumns: string[]
  ): { sql: string; params: unknown[] } {
    const target = this.buildQualifiedTableName(table, schema);
    const quotedCols = columns.map(c => this.quoteIdentifier(c));

    // 构建 VALUES 子句
    const valueRows = rows.map((_, rowIdx) => {
      const rowParams = columns.map((_, colIdx) =>
        `@p${rowIdx * columns.length + colIdx + 1}`
      );
      return `(${rowParams.join(', ')})`;
    }).join(', ');

    const params = rows.flatMap(row => columns.map(col => row[col]));

    // 构建 MERGE 语句
    const sql = `
      MERGE INTO ${target} AS target
      USING (VALUES ${valueRows}) AS source (${quotedCols.join(', ')})
      ON ${conflictColumns.map(c =>
        `target.${this.quoteIdentifier(c)} = source.${this.quoteIdentifier(c)}`
      ).join(' AND ')}
      WHEN MATCHED THEN
        UPDATE SET ${updateColumns.map(c =>
          `${this.quoteIdentifier(c)} = source.${this.quoteIdentifier(c)}`
        ).join(', ')}
      WHEN NOT MATCHED THEN
        INSERT (${quotedCols.join(', ')})
        VALUES (${quotedCols.map(c => `source.${c}`).join(', ')})
      OUTPUT INSERTED.*;
    `;

    return { sql, params };
  }

  // ... 其他方法实现
}

七、配置系统扩展

7.1 配置类型扩展

// src/config/types.ts

export interface EnvironmentConfig {
  // 扩展类型支持
  type: 'postgres' | 'sqlserver';

  connection: PostgresConnection | SqlServerConnection;

  // 通用配置
  defaultSchema?: string;
  searchPath?: string[];  // SQL Server 会忽略此配置
  pool?: Partial<PoolConfig>;
  statementTimeoutMs?: number;
  slowQueryMs?: number;
  mode?: 'readonly' | 'readwrite' | 'ddl';
}

/**
 * PostgreSQL 连接配置
 */
export interface PostgresConnection {
  host: string;
  port: number;
  database: string;
  user: string;
  password: string;
  ssl?: false | SSLConfig;
}

/**
 * SQL Server 连接配置
 */
export interface SqlServerConnection {
  host: string;
  port: number;
  database: string;
  user: string;
  password: string;

  // SQL Server 特有配置
  domain?: string;                   // Windows 域
  instanceName?: string;             // 命名实例
  encrypt?: boolean;                 // 是否加密连接
  trustServerCertificate?: boolean;  // 是否信任服务器证书
}

7.2 配置示例

{
  "server": {
    "listen": { "host": "0.0.0.0", "port": 7700 },
    "auth": { "type": "token", "token": "ENV:MCP_AUTH_TOKEN" }
  },
  "environments": {
    "pg_drworks": {
      "type": "postgres",
      "connection": {
        "host": "localhost",
        "port": 5432,
        "database": "drworks",
        "user": "postgres",
        "password": "ENV:POSTGRES_PASSWORD"
      },
      "defaultSchema": "dbo",
      "searchPath": ["dbo", "api"],
      "mode": "readwrite"
    },
    "mssql_hospital": {
      "type": "sqlserver",
      "connection": {
        "host": "localhost",
        "port": 1433,
        "database": "HospitalDB",
        "user": "sa",
        "password": "ENV:MSSQL_PASSWORD",
        "encrypt": true,
        "trustServerCertificate": false
      },
      "defaultSchema": "dbo",
      "mode": "readwrite"
    }
  }
}

八、测试策略

8.1 单元测试

// __tests__/drivers/postgres-driver.test.ts

describe('PostgresDriver', () => {
  const driver = new PostgresDriver();

  test('quoteIdentifier', () => {
    expect(driver.quoteIdentifier('table')).toBe('"table"');
    expect(driver.quoteIdentifier('my"table')).toBe('"my""table"');
  });

  test('buildPaginatedQuery', () => {
    const { sql, params } = driver.buildPaginatedQuery(
      'SELECT * FROM users',
      [],
      10,
      20
    );
    expect(sql).toBe('SELECT * FROM users LIMIT $1 OFFSET $2');
    expect(params).toEqual([10, 20]);
  });
});

8.2 集成测试

// __tests__/integration/sqlserver.test.ts

describe('SQL Server Integration', () => {
  let pool: mssql.ConnectionPool;
  let driver: SqlServerDriver;

  beforeAll(async () => {
    driver = new SqlServerDriver();
    pool = await driver.createConnectionPool({
      host: 'localhost',
      port: 1433,
      database: 'test',
      user: 'sa',
      password: 'TestPassword123'
    });
  });

  afterAll(async () => {
    await driver.closeConnectionPool(pool);
  });

  test('execute query', async () => {
    const result = await driver.execute(pool, 'SELECT 1 as num');
    expect(result.rows).toHaveLength(1);
    expect(result.rows[0].num).toBe(1);
  });
});

九、性能优化

9.1 连接池优化

  • 预热连接池: 启动时创建最小连接数
  • 连接复用: 优先复用空闲连接
  • 监控连接池状态: 定期检查连接健康度

9.2 查询优化

  • 参数化查询: 所有查询都使用参数化
  • 批量操作: 合并多个插入为单个语句
  • 索引提示: 必要时添加索引提示 (数据库特定)

十、总结

关键设计亮点

  1. 高度抽象: 驱动接口完全隔离数据库差异
  2. 易于扩展: 新增数据库只需实现驱动接口
  3. 向后兼容: PostgreSQL 功能完全保留
  4. 类型安全: 完整的 TypeScript 类型定义
  5. 可测试性: 驱动可独立测试,核心层可 mock 驱动

技术债务

  • 部分诊断功能在 SQL Server 上实现有限
  • EXPLAIN 功能在 SQL Server 上需要特殊处理
  • Schema 机制差异需要调用方注意

文档维护:

  • 负责人: [待指定]
  • 最后更新: 2024-12-27