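v1.0.2 Architecture Design
Design Goals
- Abstraction: core business logic is decoupled from any specific database implementation
- Extensibility: support for new database types is easy to add
- Backward compatibility: existing PostgreSQL functionality is not broken
- Performance: the abstraction layer adds minimal overhead
- Maintainability: clear code organization and separation of responsibilities
1. Overall Architecture
1.1 Layered Architecture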
┌─────────────────────────────────────────────────────────────┐
│                 Client Layer (MCP Clients)                  │
│            Claude, Cursor, and other AI clients             │
└────────────────────────┬────────────────────────────────────┘
                         │ MCP Protocol (WebSocket/SSE)
┌────────────────────────▼────────────────────────────────────┐
│                       Transport Layer                       │
│        ┌──────────────────┐     ┌──────────────────┐        │
│        │    WebSocket     │     │       SSE        │        │
│        │    Transport     │     │    Transport     │        │
│        └──────────────────┘     └──────────────────┘        │
└────────────────────────┬────────────────────────────────────┘
                         │ MCP Server
┌────────────────────────▼────────────────────────────────────┐
│                       MCP Tools Layer                       │
│    ┌───────────────────────────────────────────────────┐    │
│    │ Metadata Tools | Query Tools | Data Tools |       │    │
│    │ Diagnostics Tools | Transaction Tools             │    │
│    └───────────────────────────────────────────────────┘    │
│                     (database-agnostic)                     │
└────────────────────────┬────────────────────────────────────┘
                         │
┌────────────────────────▼────────────────────────────────────┐
│           Core Business Layer (database-agnostic)           │
│  ┌────────────────┐  ┌────────────────┐  ┌──────────────┐   │
│  │   Connection   │  │     Query      │  │ Transaction  │   │
│  │    Manager     │  │     Runner     │  │   Manager    │   │
│  └────────┬───────┘  └────────┬───────┘  └──────┬───────┘   │
│  ┌────────▼───────┐  ┌────────▼───────┐  ┌──────▼───────┐   │
│  │    Metadata    │  │      Bulk      │  │ Diagnostics  │   │
│  │    Browser     │  │    Helpers     │  │              │   │
│  └────────┬───────┘  └────────┬───────┘  └──────┬───────┘   │
│           └──────────┬────────┴─────────────────┘           │
└──────────────────────┼──────────────────────────────────────┘
                       │ depends on
┌──────────────────────▼──────────────────────────────────────┐
│    Database Driver Abstraction Layer (Driver Interface)     │
│  ┌───────────────────────────────────────────────────────┐  │
│  │  interface DatabaseDriver {                           │  │
│  │    execute(), buildPaginatedQuery(),                  │  │
│  │    listSchemas(), describeTable(), ...                │  │
│  │  }                                                    │  │
│  └───────────────────────────────────────────────────────┘  │
└────────────┬──────────────────────┬─────────────────────────┘
             │                      │
┌────────────▼───────────┐  ┌───────▼─────────────────────────┐
│   PostgreSQL Driver    │  │        SQL Server Driver        │
│  ┌──────────────────┐  │  │  ┌───────────────────────────┐  │
│  │ pg library       │  │  │  │ mssql library             │  │
│  │ search_path      │  │  │  │ schema.table names        │  │
│  │ $1, $2 params    │  │  │  │ @p1, @p2 params           │  │
│  │ ON CONFLICT      │  │  │  │ MERGE                     │  │
│  │ RETURNING        │  │  │  │ OUTPUT                    │  │
│  │ pg_* catalogs    │  │  │  │ sys.* catalog views       │  │
│  └──────────────────┘  │  │  └───────────────────────────┘  │
└────────────────────────┘  └─────────────────────────────────┘
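1.2 Core Design Principles
Dependency Inversion Principle (DIP)
- High-level modules (Core) do not depend on low-level modules (Driver)
- Both depend on an abstraction (the DatabaseDriver interface)
Open/Closed Principle (OCP)
- Open for extension: adding a database type only requires implementing a new driver
- Closed for modification: core business logic does not need to change
Single Responsibility Principle (SRP)
- Driver: database-specific SQL generation and result parsing
- Core: business logic and flow control
- Tools: MCP protocol adaptation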
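The three principles can be illustrated with a minimal sketch. `DialectSketch` is a hypothetical two-method subset of the DatabaseDriver interface, and `buildSelectById` is an invented core-layer helper; neither name comes from the codebase. The point is only that the core function compiles against the abstraction and never names a concrete driver.

```typescript
// Hypothetical subset of the DatabaseDriver interface, just enough for this sketch.
interface DialectSketch {
  quoteIdentifier(identifier: string): string;
  getParameterPlaceholder(index: number): string;
}

// Low-level modules: one object per database dialect.
const postgresDialect: DialectSketch = {
  quoteIdentifier: (id) => `"${id.replace(/"/g, '""')}"`,
  getParameterPlaceholder: (i) => `$${i}`,
};

const sqlServerDialect: DialectSketch = {
  quoteIdentifier: (id) => `[${id.replace(/\]/g, ']]')}]`,
  getParameterPlaceholder: (i) => `@p${i}`,
};

// High-level (core) code: depends only on the abstraction.
function buildSelectById(dialect: DialectSketch, table: string, idColumn: string): string {
  return (
    `SELECT * FROM ${dialect.quoteIdentifier(table)} ` +
    `WHERE ${dialect.quoteIdentifier(idColumn)} = ${dialect.getParameterPlaceholder(1)}`
  );
}
```

Supporting another database in this sketch means adding one more `DialectSketch` object; `buildSelectById` stays untouched.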
2. Driver Interface Design
2.1 DatabaseDriver Core Interface
/**
 * Database driver interface.
 * Every database driver must implement this interface.
 */
export interface DatabaseDriver {
  /**
   * Driver type identifier
   */
  readonly type: 'postgres' | 'sqlserver' | 'mysql';
  /**
   * Driver name and version
   */
  readonly name: string;
  readonly version: string;
  // ========== Connection management ==========
  /**
   * Create a connection pool.
   * The return type is driver-specific; a driver that connects
   * asynchronously may return a Promise that resolves to the pool.
   */
  createConnectionPool(config: ConnectionConfig): any;
  /**
   * Test the connection
   */
  testConnection(pool: any): Promise<boolean>;
  /**
   * Close the connection pool
   */
  closeConnectionPool(pool: any): Promise<void>;
  // ========== Query execution ==========
  /**
   * Execute a query
   * @param client - database client
   * @param sql - SQL statement
   * @param params - parameter array
   * @returns query result
   */
  execute<T = any>(
    client: any,
    sql: string,
    params?: unknown[]
  ): Promise<QueryResult<T>>;
  /**
   * Build a paginated query
   */
  buildPaginatedQuery(
    sql: string,
    params: unknown[],
    limit: number,
    offset: number
  ): { sql: string; params: unknown[] };
  /**
   * Build an EXPLAIN query
   */
  buildExplainQuery(sql: string, analyze?: boolean): string;
  /**
   * Parse an EXPLAIN result
   */
  parseExplainResult(result: any): ExplainPlan;
  // ========== SQL syntax helpers ==========
  /**
   * Quote an identifier (table name, column name, etc.)
   * PostgreSQL: "name"
   * SQL Server: [name]
   */
  quoteIdentifier(identifier: string): string;
  /**
   * Build a fully qualified table name
   * @param table - table name
   * @param schema - schema name (optional)
   * @returns fully qualified name
   */
  buildQualifiedTableName(table: string, schema?: string): string;
  /**
   * Generate a parameter placeholder
   * PostgreSQL: $1, $2, ...
   * SQL Server: @p1, @p2, ...
   * @param index - parameter index (1-based)
   */
  getParameterPlaceholder(index: number): string;
  // ========== Transaction management ==========
  /**
   * Build a BEGIN TRANSACTION statement
   */
  buildBeginStatement(options?: TransactionOptions): string;
  /**
   * Build a COMMIT statement
   */
  buildCommitStatement(): string;
  /**
   * Build a ROLLBACK statement
   */
  buildRollbackStatement(): string;
  /**
   * Build a SAVEPOINT statement
   */
  buildSavepointStatement(name: string): string;
  /**
   * Build a ROLLBACK TO SAVEPOINT statement
   */
  buildRollbackToSavepointStatement(name: string): string;
  // ========== Schema management ==========
  /**
   * Build the statement that sets the active schema(s)
   * PostgreSQL: SET search_path TO schema1, schema2
   * SQL Server: not supported; returns an empty string
   */
  buildSetSchemaStatement(schemas: string | string[]): string;
  /**
   * Whether the driver supports a search_path mechanism
   */
  supportsSearchPath(): boolean;
  // ========== Metadata queries ==========
  /**
   * Build the query that lists all schemas
   */
  buildListSchemasQuery(): string;
  /**
   * Build the query that lists tables
   */
  buildListTablesQuery(schema?: string): string;
  /**
   * Build the query that describes a table's structure
   */
  buildDescribeTableQuery(table: string, schema?: string): string;
  /**
   * Parse a table definition result
   */
  parseTableDefinition(rows: any[]): TableDefinition;
  /**
   * Build the query that lists views
   */
  buildListViewsQuery(schema?: string): string;
  /**
   * Build the query that lists indexes
   */
  buildListIndexesQuery(schema?: string, table?: string): string;
  /**
   * Build the query that lists constraints
   */
  buildListConstraintsQuery(schema?: string, table?: string): string;
  /**
   * Build the query that lists functions / stored procedures
   */
  buildListFunctionsQuery(schema?: string): string;
  /**
   * Build the query that lists triggers
   */
  buildListTriggersQuery(schema?: string, table?: string): string;
  // ========== Bulk operations ==========
  /**
   * Build bulk INSERT statements (one entry per chunk)
   */
  buildBulkInsertStatement(
    table: string,
    schema: string | undefined,
    columns: string[],
    rows: Record<string, unknown>[],
    chunkSize?: number
  ): { sql: string; params: unknown[] }[];
  /**
   * Build an UPSERT statement
   * PostgreSQL: ON CONFLICT ... DO UPDATE
   * SQL Server: MERGE
   */
  buildBulkUpsertStatement(
    table: string,
    schema: string | undefined,
    columns: string[],
    rows: Record<string, unknown>[],
    conflictColumns: string[],
    updateColumns: string[]
  ): { sql: string; params: unknown[] };
  // ========== Diagnostics ==========
  /**
   * Build the query that returns active connections
   */
  buildGetActiveConnectionsQuery(): string;
  /**
   * Parse the active connections result
   */
  parseActiveConnections(rows: any[]): ConnectionInfo[];
  /**
   * Build the query that returns lock information
   */
  buildGetLocksQuery(): string;
  /**
   * Parse the lock information result
   */
  parseLocks(rows: any[]): LockInfo[];
  /**
   * Whether the driver supports a given diagnostic
   */
  supportsDiagnostic(diagnostic: DiagnosticType): boolean;
  // ========== Data type mapping ==========
  /**
   * Map a database-specific type to a generic type
   */
  mapToGenericType(nativeType: string): GenericDataType;
  /**
   * Map a generic type to a database-specific type
   */
  mapFromGenericType(genericType: GenericDataType): string;
}
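Because `supportsDiagnostic()` sits next to the query builders, the core layer can check a capability before asking for SQL. The sketch below is one way that check might look. `DiagnosticKind`, `DiagnosticsDriverSketch`, `planLocksDiagnostic`, and both stub drivers are hypothetical stand-ins invented for this example, and the SQL strings are placeholders.

```typescript
// Stand-in for a few DiagnosticType values, written as a string union for brevity.
type DiagnosticKind = 'active_connections' | 'locks' | 'slow_queries';

// Hypothetical subset of DatabaseDriver used by this sketch.
interface DiagnosticsDriverSketch {
  readonly type: string;
  supportsDiagnostic(diagnostic: DiagnosticKind): boolean;
  buildGetLocksQuery(): string;
}

// Either the SQL to run, or a structured "unsupported" answer for the tool layer.
type DiagnosticPlan =
  | { supported: true; sql: string }
  | { supported: false; reason: string };

function planLocksDiagnostic(driver: DiagnosticsDriverSketch): DiagnosticPlan {
  if (!driver.supportsDiagnostic('locks')) {
    return { supported: false, reason: `${driver.type} driver does not support locks` };
  }
  return { supported: true, sql: driver.buildGetLocksQuery() };
}

// Stub drivers for illustration only.
const lockAwareDriver: DiagnosticsDriverSketch = {
  type: 'postgres',
  supportsDiagnostic: () => true,
  buildGetLocksQuery: () => 'SELECT * FROM pg_locks',
};

const limitedDriver: DiagnosticsDriverSketch = {
  type: 'limited',
  supportsDiagnostic: (diagnostic) => diagnostic !== 'locks',
  buildGetLocksQuery: () => {
    throw new Error('locks diagnostic is not implemented');
  },
};
```

Returning a tagged result instead of throwing lets an MCP tool report "not available on this database" without treating it as a failure.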
2.2 Supporting Type Definitions
/**
 * Generic query result
 */
export interface QueryResult<T = any> {
  rows: T[];
  rowCount: number;
  fields?: FieldInfo[];
}
/**
 * Field information
 */
export interface FieldInfo {
  name: string;
  type: string;
  nullable: boolean;
}
/**
 * Table definition
 */
export interface TableDefinition {
  schema: string;
  table: string;
  columns: ColumnDefinition[];
  primaryKey?: string[];
  foreignKeys?: ForeignKeyDefinition[];
  indexes?: IndexDefinition[];
  uniqueConstraints?: UniqueConstraintDefinition[];
}
/**
 * Column definition
 */
export interface ColumnDefinition {
  name: string;
  type: string;
  nullable: boolean;
  defaultValue?: string;
  maxLength?: number;
  precision?: number;
  scale?: number;
}
/**
 * Foreign key definition
 */
export interface ForeignKeyDefinition {
  name: string;
  columns: string[];
  referencedTable: string;
  referencedSchema?: string;
  referencedColumns: string[];
  onDelete?: 'CASCADE' | 'SET NULL' | 'RESTRICT' | 'NO ACTION';
  onUpdate?: 'CASCADE' | 'SET NULL' | 'RESTRICT' | 'NO ACTION';
}
/**
 * Index definition
 */
export interface IndexDefinition {
  name: string;
  columns: string[];
  unique: boolean;
  type?: string;
}
/**
 * Unique constraint definition
 */
export interface UniqueConstraintDefinition {
  name: string;
  columns: string[];
}
/**
 * Transaction options
 */
export interface TransactionOptions {
  isolationLevel?: 'READ UNCOMMITTED' | 'READ COMMITTED' | 'REPEATABLE READ' | 'SERIALIZABLE';
  readOnly?: boolean;
  deferrable?: boolean; // PostgreSQL only
}
/**
 * Generic data types
 */
export enum GenericDataType {
  STRING = 'string',
  INTEGER = 'integer',
  DECIMAL = 'decimal',
  BOOLEAN = 'boolean',
  DATE = 'date',
  TIME = 'time',
  DATETIME = 'datetime',
  TIMESTAMP = 'timestamp',
  JSON = 'json',
  BINARY = 'binary',
  UUID = 'uuid',
  TEXT = 'text',
  UNKNOWN = 'unknown'
}
/**
 * Diagnostic types
 */
export enum DiagnosticType {
  ACTIVE_CONNECTIONS = 'active_connections',
  LOCKS = 'locks',
  SLOW_QUERIES = 'slow_queries',
  REPLICATION_STATUS = 'replication_status',
  VACUUM_ANALYZE = 'vacuum_analyze'
}
/**
 * Connection information
 */
export interface ConnectionInfo {
  pid: number;
  database: string;
  user: string;
  clientAddress?: string;
  state: string;
  query?: string;
  duration?: number;
}
/**
 * Lock information
 */
export interface LockInfo {
  lockType: string;
  database: string;
  schema?: string;
  table?: string;
  pid: number;
  mode: string;
  granted: boolean;
}
/**
 * Execution plan
 */
export interface ExplainPlan {
  raw: any;
  summary?: {
    totalCost?: number;
    estimatedRows?: number;
    actualRows?: number;
    executionTime?: number;
  };
}
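The GenericDataType values only become useful once each driver maps its native type names onto them. A minimal sketch of what a PostgreSQL-side `mapToGenericType` might do follows. The lookup table is illustrative and incomplete, `GenericType` is a string-union stand-in for the enum, and `mapPostgresTypeToGeneric` is a name invented here.

```typescript
// String-union stand-in for the GenericDataType enum values.
type GenericType =
  | 'string' | 'integer' | 'decimal' | 'boolean' | 'date' | 'time'
  | 'datetime' | 'timestamp' | 'json' | 'binary' | 'uuid' | 'text' | 'unknown';

// Illustrative, incomplete mapping from PostgreSQL type names to generic types.
const POSTGRES_TYPE_MAP = new Map<string, GenericType>([
  ['varchar', 'string'],
  ['character varying', 'string'],
  ['char', 'string'],
  ['text', 'text'],
  ['smallint', 'integer'],
  ['integer', 'integer'],
  ['bigint', 'integer'],
  ['numeric', 'decimal'],
  ['boolean', 'boolean'],
  ['date', 'date'],
  ['time', 'time'],
  ['timestamp', 'timestamp'],
  ['timestamptz', 'timestamp'],
  ['json', 'json'],
  ['jsonb', 'json'],
  ['bytea', 'binary'],
  ['uuid', 'uuid'],
]);

function mapPostgresTypeToGeneric(nativeType: string): GenericType {
  // Normalize case and drop a trailing modifier such as "(255)" or "(10,2)".
  const normalized = nativeType.trim().toLowerCase().replace(/\s*\(.*\)$/, '');
  return POSTGRES_TYPE_MAP.get(normalized) ?? 'unknown';
}
```

Falling back to `'unknown'` rather than throwing keeps metadata browsing usable when a table contains extension or user-defined types.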
3. Driver Factory Pattern
3.1 Driver Factory Implementation
// src/drivers/driver-factory.ts
import { DatabaseDriver } from './database-driver.js';
import { PostgresDriver } from './postgres/postgres-driver.js';
import { SqlServerDriver } from './sqlserver/sqlserver-driver.js';
/**
 * Create a database driver instance
 * @param type - database type
 * @returns driver instance
 */
export function createDriver(
  type: 'postgres' | 'sqlserver'
): DatabaseDriver {
  switch (type) {
    case 'postgres':
      return new PostgresDriver();
    case 'sqlserver':
      return new SqlServerDriver();
    default:
      throw new Error(`Unsupported database type: ${type}`);
  }
}
/**
 * Driver registry.
 * Note: createDriver() does not consult this registry; custom drivers are
 * looked up through getDriver().
 */
const driverRegistry = new Map<string, DatabaseDriver>();
/**
 * Register a custom driver
 */
export function registerDriver(type: string, driver: DatabaseDriver): void {
  driverRegistry.set(type, driver);
}
/**
 * Get a registered driver
 */
export function getDriver(type: string): DatabaseDriver {
  const driver = driverRegistry.get(type);
  if (!driver) {
    throw new Error(`Driver not found for type: ${type}`);
  }
  return driver;
}
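The factory function and the registry above are two separate lookup paths. One possible way to unify them, sketched under the assumption that drivers are cheap to construct, is to register factory functions instead of instances so built-in and custom drivers share one code path. `DriverSketch`, `registerDriverFactory`, and `createRegisteredDriver` are hypothetical names, not part of the design above.

```typescript
// Minimal stand-in for DatabaseDriver; only the discriminator is needed here.
interface DriverSketch {
  readonly type: string;
}

type DriverFactory = () => DriverSketch;

const driverFactories = new Map<string, DriverFactory>();

// Register a factory; refusing duplicates avoids silently replacing a driver.
function registerDriverFactory(type: string, factory: DriverFactory): void {
  if (driverFactories.has(type)) {
    throw new Error(`Driver already registered for type: ${type}`);
  }
  driverFactories.set(type, factory);
}

// Create a fresh driver instance for a registered type.
function createRegisteredDriver(type: string): DriverSketch {
  const factory = driverFactories.get(type);
  if (!factory) {
    throw new Error(`Unsupported database type: ${type}`);
  }
  return factory();
}

// Built-in drivers would register themselves the same way custom ones do.
registerDriverFactory('postgres', () => ({ type: 'postgres' }));
registerDriverFactory('sqlserver', () => ({ type: 'sqlserver' }));
```

With this shape, adding a database type touches only the registration call, which matches the open/closed goal stated in section 1.2.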
4. Core Layer Refactoring
4.1 ConnectionManager Refactoring
Current implementation (PostgreSQL-specific):
export class PostgresConnectionManager {
  private pools = new Map<string, Pool>(); // pg.Pool
  getPool(name: string): Pool {
    const pool = new Pool({...}); // PostgreSQL-specific
    return pool;
  }
}
After refactoring (database-agnostic):
export class ConnectionManager {
  private driver: DatabaseDriver;
  private pools = new Map<string, any>(); // generic pool handle
  constructor(
    private configs: Record<string, EnvironmentConfig>,
    driver: DatabaseDriver
  ) {
    this.driver = driver;
  }
  getPool(envName: string): any {
    if (this.pools.has(envName)) {
      return this.pools.get(envName);
    }
    const config = this.configs[envName];
    if (!config) {
      throw new Error(`Unknown environment: ${envName}`);
    }
    const pool = this.driver.createConnectionPool(config.connection);
    this.pools.set(envName, pool);
    return pool;
  }
  async withClient<T>(
    envName: string,
    schema: string | string[] | undefined,
    callback: (client: any) => Promise<T>,
    options?: QueryOptions
  ): Promise<T> {
    // await is harmless for a synchronously created pool and unwraps a
    // Promise when the driver connects asynchronously
    const pool = await this.getPool(envName);
    // NOTE: connect() / release() follow the pg client API; a driver whose
    // pool has no per-client checkout needs its own acquire/release handling
    const client = await pool.connect();
    try {
      // Set the schema (if the driver supports it)
      if (schema && this.driver.supportsSearchPath()) {
        const schemaSQL = this.driver.buildSetSchemaStatement(schema);
        await this.driver.execute(client, schemaSQL);
      }
      return await callback(client);
    } finally {
      client.release();
    }
  }
}
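`withClient` still checks a client out with `pool.connect()` and returns it with `client.release()`, which is the pg calling convention. One way to keep that detail out of the core layer is to let the driver hand back a lease. The sketch below assumes a hypothetical `acquireClient()` method that is not part of the DatabaseDriver interface above, and uses an in-memory `FakePool` so it runs without a database.

```typescript
// Hypothetical addition to the driver contract: a client plus its release action.
interface LeasedClient<C> {
  client: C;
  release(): void | Promise<void>;
}

interface ClientLeasingDriver<P, C> {
  acquireClient(pool: P): Promise<LeasedClient<C>>;
}

// Core-layer helper: acquire, run the callback, always release.
async function withLeasedClient<P, C, T>(
  driver: ClientLeasingDriver<P, C>,
  pool: P,
  callback: (client: C) => Promise<T>
): Promise<T> {
  const lease = await driver.acquireClient(pool);
  try {
    return await callback(lease.client);
  } finally {
    await lease.release();
  }
}

// In-memory stand-in for a real pool, used only to make the sketch runnable.
class FakePool {
  checkedOut = 0;
}

const fakeDriver: ClientLeasingDriver<FakePool, string> = {
  async acquireClient(pool) {
    pool.checkedOut += 1;
    return {
      client: 'fake-client',
      release: () => {
        pool.checkedOut -= 1;
      },
    };
  },
};
```

Under this assumption, a pg-backed driver would wrap `pool.connect()` and `client.release()`, while a driver whose pool executes requests directly could return the pool itself with a no-op `release`.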
4.2 QueryRunner Refactoring
Before refactoring:
async executePaginated<T>(
  envName: string,
  text: string,
  values: unknown[],
  pagination: PaginationOptions
): Promise<QueryResult<T>> {
  const paginatedText = `${text} LIMIT $${limitIndex} OFFSET $${offsetIndex}`;
  // PostgreSQL-specific
}
After refactoring:
async executePaginated<T>(
  envName: string,
  text: string,
  values: unknown[],
  pagination: PaginationOptions
): Promise<QueryResult<T>> {
  const { sql, params } = this.driver.buildPaginatedQuery(
    text,
    values,
    pagination.limit,
    pagination.offset ?? 0
  );
  return this.execute<T>(envName, sql, params);
}
4.3 MetadataBrowser Refactoring
Before refactoring:
async listTables(envName: string, schema?: string): Promise<string[]> {
  const sql = `
    SELECT tablename FROM pg_tables
    WHERE schemaname = $1
  `; // PostgreSQL-specific
  // ...
}
After refactoring:
async listTables(envName: string, schema?: string): Promise<string[]> {
  const sql = this.driver.buildListTablesQuery(schema);
  const result = await this.queryRunner.execute(envName, sql);
  // Drivers are expected to alias the column as table_name;
  // tablename is kept as a fallback for raw pg_tables results
  return result.rows.map(row => row.table_name || row.tablename);
}
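The refactored `listTables` receives a plain SQL string, so the schema name has to be embedded in the text, whereas the pre-refactoring query bound it as `$1`. A hedged alternative is for metadata builders to return the SQL together with its bound parameters, as `buildPaginatedQuery` already does. `ParameterizedQuery` and `buildListTablesQueryPg` below are hypothetical names for this sketch.

```typescript
// Same shape that buildPaginatedQuery returns in the interface above.
interface ParameterizedQuery {
  sql: string;
  params: unknown[];
}

// Hypothetical PostgreSQL variant of buildListTablesQuery that binds the schema name.
function buildListTablesQueryPg(schema?: string): ParameterizedQuery {
  if (schema !== undefined) {
    return {
      sql:
        'SELECT tablename AS table_name FROM pg_tables ' +
        'WHERE schemaname = $1 ORDER BY tablename',
      params: [schema],
    };
  }
  return {
    sql:
      'SELECT tablename AS table_name, schemaname AS schema_name FROM pg_tables ' +
      "WHERE schemaname NOT IN ('pg_catalog', 'information_schema') " +
      'ORDER BY schemaname, tablename',
    params: [],
  };
}
```

The core layer would then call `execute(envName, sql, params)`, and a schema name containing quotes never reaches the SQL text.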
5. PostgreSQL Driver Implementation
5.1 Core Method Examples
// src/drivers/postgres/postgres-driver.ts
import { Pool, PoolClient } from 'pg';
import {
  DatabaseDriver,
  ExplainPlan,
  QueryResult,
  TransactionOptions
} from '../database-driver.js';
export class PostgresDriver implements DatabaseDriver {
  readonly type = 'postgres' as const;
  readonly name = 'PostgreSQL Driver';
  readonly version = '1.0.0';
  // ========== Connection management ==========
  createConnectionPool(config: any): Pool {
    return new Pool({
      host: config.host,
      port: config.port,
      database: config.database,
      user: config.user,
      password: config.password,
      ssl: config.ssl?.require ? {
        ca: config.ssl.ca,
        cert: config.ssl.cert,
        key: config.ssl.key,
        rejectUnauthorized: config.ssl.rejectUnauthorized ?? true
      } : false,
      max: config.pool?.max ?? 10,
      idleTimeoutMillis: config.pool?.idleTimeoutMs ?? 30000,
      connectionTimeoutMillis: config.pool?.connectionTimeoutMs ?? 10000
    });
  }
  async testConnection(pool: Pool): Promise<boolean> {
    try {
      const client = await pool.connect();
      try {
        await client.query('SELECT 1');
        return true;
      } finally {
        client.release();
      }
    } catch {
      // Report failure as false, matching the SQL Server driver
      return false;
    }
  }
  async closeConnectionPool(pool: Pool): Promise<void> {
    await pool.end();
  }
  // ========== Query execution ==========
  async execute<T = any>(
    client: PoolClient,
    sql: string,
    params?: unknown[]
  ): Promise<QueryResult<T>> {
    const result = await client.query(sql, params);
    return {
      rows: result.rows as T[],
      rowCount: result.rowCount ?? 0,
      fields: result.fields?.map(f => ({
        name: f.name,
        // dataTypeID is the type OID, so mapToGenericType must accept OIDs here
        type: this.mapToGenericType(f.dataTypeID.toString()),
        nullable: true // pg does not report nullability in field metadata
      }))
    };
  }
  buildPaginatedQuery(
    sql: string,
    params: unknown[],
    limit: number,
    offset: number
  ): { sql: string; params: unknown[] } {
    const limitIdx = params.length + 1;
    const offsetIdx = limitIdx + 1;
    return {
      sql: `${sql} LIMIT $${limitIdx} OFFSET $${offsetIdx}`,
      params: [...params, limit, offset]
    };
  }
  buildExplainQuery(sql: string, analyze?: boolean): string {
    const prefix = analyze
      ? 'EXPLAIN (ANALYZE, FORMAT JSON)'
      : 'EXPLAIN (FORMAT JSON)';
    return `${prefix} ${sql}`;
  }
  parseExplainResult(result: any): ExplainPlan {
    // FORMAT JSON yields [{ "Plan": {...}, "Execution Time": ... }];
    // cost and row figures live on the root "Plan" node
    const explain = result.rows[0]['QUERY PLAN'][0];
    const root = explain['Plan'] ?? {};
    return {
      raw: explain,
      summary: {
        totalCost: root['Total Cost'],
        estimatedRows: root['Plan Rows'],
        actualRows: root['Actual Rows'], // present only with ANALYZE
        executionTime: explain['Execution Time'] // present only with ANALYZE
      }
    };
  }
  // ========== SQL syntax helpers ==========
  quoteIdentifier(identifier: string): string {
    return `"${identifier.replace(/"/g, '""')}"`;
  }
  buildQualifiedTableName(table: string, schema?: string): string {
    if (schema) {
      return `${this.quoteIdentifier(schema)}.${this.quoteIdentifier(table)}`;
    }
    return this.quoteIdentifier(table);
  }
  getParameterPlaceholder(index: number): string {
    return `$${index}`;
  }
  // ========== Transaction management ==========
  buildBeginStatement(options?: TransactionOptions): string {
    let sql = 'BEGIN';
    if (options?.isolationLevel) {
      sql += ` ISOLATION LEVEL ${options.isolationLevel}`;
    }
    if (options?.readOnly) {
      sql += ' READ ONLY';
    }
    if (options?.deferrable) {
      sql += ' DEFERRABLE';
    }
    return sql;
  }
  buildCommitStatement(): string {
    return 'COMMIT';
  }
  buildRollbackStatement(): string {
    return 'ROLLBACK';
  }
  buildSavepointStatement(name: string): string {
    return `SAVEPOINT ${this.quoteIdentifier(name)}`;
  }
  buildRollbackToSavepointStatement(name: string): string {
    return `ROLLBACK TO SAVEPOINT ${this.quoteIdentifier(name)}`;
  }
  // ========== Schema management ==========
  buildSetSchemaStatement(schemas: string | string[]): string {
    const paths = Array.isArray(schemas) ? schemas : [schemas];
    const quoted = paths.map(s => this.quoteIdentifier(s)).join(', ');
    return `SET search_path TO ${quoted}`;
  }
  supportsSearchPath(): boolean {
    return true;
  }
  // ========== Metadata queries ==========
  // Escape a value for use as a SQL string literal (assumes
  // standard_conforming_strings = on, the PostgreSQL default)
  private quoteLiteral(value: string): string {
    return `'${value.replace(/'/g, "''")}'`;
  }
  buildListSchemasQuery(): string {
    return `
      SELECT schema_name
      FROM information_schema.schemata
      WHERE schema_name NOT IN ('pg_catalog', 'information_schema', 'pg_toast')
      ORDER BY schema_name
    `;
  }
  buildListTablesQuery(schema?: string): string {
    if (schema) {
      // The interface returns a plain string, so the schema name is
      // embedded as an escaped literal rather than a bound parameter
      return `
        SELECT tablename as table_name
        FROM pg_tables
        WHERE schemaname = ${this.quoteLiteral(schema)}
        ORDER BY tablename
      `;
    }
    return `
      SELECT tablename as table_name, schemaname as schema_name
      FROM pg_tables
      WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
      ORDER BY schemaname, tablename
    `;
  }
  // ... other method implementations
}
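The interface comment for `buildBulkUpsertStatement` mentions `ON CONFLICT ... DO UPDATE`, but the driver excerpt above does not show that method. A minimal sketch of how such a statement might be assembled follows. `buildPgUpsert` and `quoteIdent` are hypothetical standalone helpers, schema qualification is omitted for brevity, and falling back to `DO NOTHING` when no update columns are given is a design choice of this sketch.

```typescript
// Same quoting rule as PostgresDriver.quoteIdentifier.
function quoteIdent(identifier: string): string {
  return `"${identifier.replace(/"/g, '""')}"`;
}

// Hypothetical standalone version of a PostgreSQL bulk upsert builder.
function buildPgUpsert(
  table: string,
  columns: string[],
  rows: Record<string, unknown>[],
  conflictColumns: string[],
  updateColumns: string[]
): { sql: string; params: unknown[] } {
  if (rows.length === 0) {
    throw new Error('rows must not be empty');
  }
  const params: unknown[] = [];
  // One "($1, $2, ...)" tuple per row; placeholders are numbered across rows.
  const tuples = rows.map((row) => {
    const placeholders = columns.map((col) => {
      params.push(row[col]);
      return `$${params.length}`;
    });
    return `(${placeholders.join(', ')})`;
  });
  // EXCLUDED refers to the row that was proposed for insertion.
  const action =
    updateColumns.length > 0
      ? 'DO UPDATE SET ' +
        updateColumns.map((c) => `${quoteIdent(c)} = EXCLUDED.${quoteIdent(c)}`).join(', ')
      : 'DO NOTHING';
  const sql =
    `INSERT INTO ${quoteIdent(table)} (${columns.map((c) => quoteIdent(c)).join(', ')}) ` +
    `VALUES ${tuples.join(', ')} ` +
    `ON CONFLICT (${conflictColumns.map((c) => quoteIdent(c)).join(', ')}) ${action}`;
  return { sql, params };
}
```

A call such as `buildPgUpsert('users', ['id', 'name'], rows, ['id'], ['name'])` produces one statement whose parameters are flattened row by row in column order.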
6. SQL Server Driver Implementation
6.1 Core Method Examples
// src/drivers/sqlserver/sqlserver-driver.ts
import * as mssql from 'mssql';
import { DatabaseDriver, ExplainPlan, QueryResult } from '../database-driver.js';
export class SqlServerDriver implements DatabaseDriver {
  readonly type = 'sqlserver' as const;
  readonly name = 'SQL Server Driver';
  readonly version = '1.0.0';
  // ========== Connection management ==========
  createConnectionPool(config: any): Promise<mssql.ConnectionPool> {
    const poolConfig: mssql.config = {
      server: config.host,
      port: config.port,
      database: config.database,
      user: config.user,
      password: config.password,
      options: {
        encrypt: config.encrypt ?? true,
        trustServerCertificate: config.trustServerCertificate ?? false,
        enableArithAbort: true,
        instanceName: config.instanceName
      },
      pool: {
        max: config.pool?.max ?? 10,
        min: 0,
        idleTimeoutMillis: config.pool?.idleTimeoutMs ?? 30000
      }
    };
    const pool = new mssql.ConnectionPool(poolConfig);
    // connect() is asynchronous, so callers receive a Promise and must await it
    return pool.connect();
  }
  async testConnection(pool: mssql.ConnectionPool): Promise<boolean> {
    try {
      await pool.request().query('SELECT 1');
      return true;
    } catch {
      return false;
    }
  }
  async closeConnectionPool(pool: mssql.ConnectionPool): Promise<void> {
    await pool.close();
  }
  // ========== Query execution ==========
  async execute<T = any>(
    pool: mssql.ConnectionPool,
    sql: string,
    params?: unknown[]
  ): Promise<QueryResult<T>> {
    const request = pool.request();
    // Bind parameters as @p1, @p2, ...
    if (params && params.length > 0) {
      params.forEach((param, i) => {
        request.input(`p${i + 1}`, param);
      });
    }
    const result = await request.query(sql);
    // recordset is undefined when the statement returns no result set
    const columns = result.recordset?.columns;
    return {
      rows: (result.recordset ?? []) as unknown as T[],
      rowCount: result.rowsAffected[0] ?? 0,
      // recordset.columns is an object keyed by column name, not an array
      fields: columns
        ? Object.values(columns).map((col: any) => ({
            name: col.name,
            type: this.mapToGenericType(col.type.declaration),
            nullable: col.nullable ?? true
          }))
        : undefined
    };
  }
  buildPaginatedQuery(
    sql: string,
    params: unknown[],
    limit: number,
    offset: number
  ): { sql: string; params: unknown[] } {
    // SQL Server: OFFSET n ROWS FETCH NEXT m ROWS ONLY, valid only after ORDER BY.
    // Heuristic: append a no-op ORDER BY when the statement has none.
    const orderBy = /\border\s+by\b/i.test(sql) ? '' : ' ORDER BY (SELECT NULL)';
    const offsetIdx = params.length + 1;
    const limitIdx = offsetIdx + 1;
    return {
      sql: `${sql}${orderBy} OFFSET @p${offsetIdx} ROWS FETCH NEXT @p${limitIdx} ROWS ONLY`,
      params: [...params, offset, limit]
    };
  }
  buildExplainQuery(sql: string, analyze?: boolean): string {
    if (analyze) {
      return `
        SET STATISTICS IO ON;
        SET STATISTICS TIME ON;
        ${sql};
        SET STATISTICS IO OFF;
        SET STATISTICS TIME OFF;
      `;
    }
    // SQL Server has no EXPLAIN statement. An estimated plan requires
    // SET SHOWPLAN_XML ON sent as its own batch before the statement, so it
    // cannot be produced by rewriting the SQL text alone.
    // NOTE: returning the SQL unchanged means the caller would execute it.
    return sql;
  }
  parseExplainResult(result: any): ExplainPlan {
    // Parsing SQL Server execution plans is more involved
    return {
      raw: result,
      summary: {
        // To be extracted from the STATISTICS output
      }
    };
  }
  // ========== SQL syntax helpers ==========
  quoteIdentifier(identifier: string): string {
    return `[${identifier.replace(/\]/g, ']]')}]`;
  }
  buildQualifiedTableName(table: string, schema?: string): string {
    if (schema) {
      return `${this.quoteIdentifier(schema)}.${this.quoteIdentifier(table)}`;
    }
    return this.quoteIdentifier(table);
  }
  getParameterPlaceholder(index: number): string {
    return `@p${index}`;
  }
  // Escape a value for use as a Unicode string literal
  private quoteLiteral(value: string): string {
    return `N'${value.replace(/'/g, "''")}'`;
  }
  // ========== Schema management ==========
  buildSetSchemaStatement(schemas: string | string[]): string {
    // SQL Server has no search_path;
    // return an empty string and let callers use fully qualified names
    return '';
  }
  supportsSearchPath(): boolean {
    return false;
  }
  // ========== Metadata queries ==========
  buildListSchemasQuery(): string {
    // dbo is kept in the result: it is the default schema for user tables
    return `
      SELECT name as schema_name
      FROM sys.schemas
      WHERE name NOT IN ('guest', 'INFORMATION_SCHEMA', 'sys', 'db_owner',
                         'db_accessadmin', 'db_securityadmin', 'db_ddladmin',
                         'db_backupoperator', 'db_datareader', 'db_datawriter',
                         'db_denydatareader', 'db_denydatawriter')
      ORDER BY name
    `;
  }
  buildListTablesQuery(schema?: string): string {
    if (schema) {
      return `
        SELECT TABLE_NAME as table_name
        FROM INFORMATION_SCHEMA.TABLES
        WHERE TABLE_SCHEMA = ${this.quoteLiteral(schema)}
          AND TABLE_TYPE = 'BASE TABLE'
        ORDER BY TABLE_NAME
      `;
    }
    return `
      SELECT TABLE_NAME as table_name, TABLE_SCHEMA as schema_name
      FROM INFORMATION_SCHEMA.TABLES
      WHERE TABLE_TYPE = 'BASE TABLE'
      ORDER BY TABLE_SCHEMA, TABLE_NAME
    `;
  }
  buildDescribeTableQuery(table: string, schema?: string): string {
    const schemaFilter = schema ? `AND TABLE_SCHEMA = ${this.quoteLiteral(schema)}` : '';
    // PRECISION is a reserved word in T-SQL, so that alias is bracket-quoted
    return `
      SELECT
        COLUMN_NAME as name,
        DATA_TYPE as type,
        IS_NULLABLE as nullable,
        COLUMN_DEFAULT as default_value,
        CHARACTER_MAXIMUM_LENGTH as max_length,
        NUMERIC_PRECISION as [precision],
        NUMERIC_SCALE as scale
      FROM INFORMATION_SCHEMA.COLUMNS
      WHERE TABLE_NAME = ${this.quoteLiteral(table)} ${schemaFilter}
      ORDER BY ORDINAL_POSITION
    `;
  }
  buildBulkUpsertStatement(
    table: string,
    schema: string | undefined,
    columns: string[],
    rows: Record<string, unknown>[],
    conflictColumns: string[],
    updateColumns: string[]
  ): { sql: string; params: unknown[] } {
    const target = this.buildQualifiedTableName(table, schema);
    const quotedCols = columns.map(c => this.quoteIdentifier(c));
    // Build the VALUES clause.
    // NOTE: rows are not chunked here; a single request accepts at most
    // 2100 parameters, so callers must keep rows * columns below that limit
    const valueRows = rows.map((_, rowIdx) => {
      const rowParams = columns.map((_, colIdx) =>
        `@p${rowIdx * columns.length + colIdx + 1}`
      );
      return `(${rowParams.join(', ')})`;
    }).join(', ');
    const params = rows.flatMap(row => columns.map(col => row[col]));
    // Build the MERGE statement
    const sql = `
      MERGE INTO ${target} AS target
      USING (VALUES ${valueRows}) AS source (${quotedCols.join(', ')})
      ON ${conflictColumns.map(c =>
        `target.${this.quoteIdentifier(c)} = source.${this.quoteIdentifier(c)}`
      ).join(' AND ')}
      WHEN MATCHED THEN
        UPDATE SET ${updateColumns.map(c =>
          `${this.quoteIdentifier(c)} = source.${this.quoteIdentifier(c)}`
        ).join(', ')}
      WHEN NOT MATCHED THEN
        INSERT (${quotedCols.join(', ')})
        VALUES (${quotedCols.map(c => `source.${c}`).join(', ')})
      OUTPUT INSERTED.*;
    `;
    return { sql, params };
  }
  // ... other method implementations
}
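The MERGE builder binds one parameter per cell, and SQL Server accepts at most 2100 parameters in a single request, so large batches need to be split before the statement is built. The sketch below shows one way to size the chunks. `chunkRowsByParamLimit` and the 2000-parameter default (chosen here to leave headroom) are assumptions of this example, not part of the driver.

```typescript
// Assumed safety margin below SQL Server's 2100-parameter request limit.
const SQLSERVER_MAX_PARAMS = 2000;

// Split rows so that rows-per-chunk * columnCount never exceeds maxParams.
function chunkRowsByParamLimit<T>(
  rows: T[],
  columnCount: number,
  maxParams: number = SQLSERVER_MAX_PARAMS
): T[][] {
  if (columnCount <= 0) {
    throw new Error('columnCount must be positive');
  }
  const rowsPerChunk = Math.floor(maxParams / columnCount);
  if (rowsPerChunk < 1) {
    throw new Error(`A single row needs ${columnCount} parameters, above the limit of ${maxParams}`);
  }
  const chunks: T[][] = [];
  for (let start = 0; start < rows.length; start += rowsPerChunk) {
    chunks.push(rows.slice(start, start + rowsPerChunk));
  }
  return chunks;
}
```

Each chunk could then be passed to `buildBulkUpsertStatement` separately, with all chunks executed inside one transaction if the batch must be atomic.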
7. Configuration System Extensions
7.1 Configuration Type Extensions
// src/config/types.ts
export interface EnvironmentConfig {
  // Extended database type support
  type: 'postgres' | 'sqlserver';
  connection: PostgresConnection | SqlServerConnection;
  // Common settings
  defaultSchema?: string;
  searchPath?: string[]; // ignored by SQL Server
  pool?: Partial<PoolConfig>;
  statementTimeoutMs?: number;
  slowQueryMs?: number;
  mode?: 'readonly' | 'readwrite' | 'ddl';
}
/**
 * PostgreSQL connection settings
 */
export interface PostgresConnection {
  host: string;
  port: number;
  database: string;
  user: string;
  password: string;
  ssl?: false | SSLConfig;
}
/**
 * SQL Server connection settings
 */
export interface SqlServerConnection {
  host: string;
  port: number;
  database: string;
  user: string;
  password: string;
  // SQL Server-specific settings
  domain?: string; // Windows domain
  instanceName?: string; // named instance
  encrypt?: boolean; // whether to encrypt the connection
  trustServerCertificate?: boolean; // whether to trust the server certificate
}
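In `EnvironmentConfig`, `type` and `connection` are declared independently, so nothing stops a `'postgres'` environment from carrying SQL Server connection settings. A discriminated union is one way to tie them together. The sketch below uses trimmed-down, hypothetical type names (`PgConnectionSketch`, `MsConnectionSketch`, `EnvironmentConfigSketch`) and an invented `summarizeEnvironment` helper to show the narrowing.

```typescript
// Trimmed stand-ins for the connection types above.
interface BaseConnectionSketch {
  host: string;
  port: number;
  database: string;
  user: string;
  password: string;
}
interface PgConnectionSketch extends BaseConnectionSketch {
  ssl?: false | { require?: boolean };
}
interface MsConnectionSketch extends BaseConnectionSketch {
  instanceName?: string;
  encrypt?: boolean;
  trustServerCertificate?: boolean;
}

// The `type` field selects which connection shape (and which options) are legal.
type EnvironmentConfigSketch =
  | { type: 'postgres'; connection: PgConnectionSketch; searchPath?: string[] }
  | { type: 'sqlserver'; connection: MsConnectionSketch };

function summarizeEnvironment(env: EnvironmentConfigSketch): string {
  const { host, port, database } = env.connection;
  switch (env.type) {
    case 'postgres': {
      // searchPath is only visible in this branch.
      const path = env.searchPath?.join(',') ?? '(default)';
      return `postgres ${host}:${port}/${database} search_path=${path}`;
    }
    case 'sqlserver': {
      // encrypt is only visible in this branch; true mirrors the driver default.
      const encrypt = env.connection.encrypt ?? true;
      return `sqlserver ${host}:${port}/${database} encrypt=${encrypt}`;
    }
  }
}
```

With this shape, `searchPath` on a SQL Server environment becomes a compile-time error instead of a silently ignored setting.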
7.2 Configuration Example
{
  "server": {
    "listen": { "host": "0.0.0.0", "port": 7700 },
    "auth": { "type": "token", "token": "ENV:MCP_AUTH_TOKEN" }
  },
  "environments": {
    "pg_drworks": {
      "type": "postgres",
      "connection": {
        "host": "localhost",
        "port": 5432,
        "database": "drworks",
        "user": "postgres",
        "password": "ENV:POSTGRES_PASSWORD"
      },
      "defaultSchema": "dbo",
      "searchPath": ["dbo", "api"],
      "mode": "readwrite"
    },
    "mssql_hospital": {
      "type": "sqlserver",
      "connection": {
        "host": "localhost",
        "port": 1433,
        "database": "HospitalDB",
        "user": "sa",
        "password": "ENV:MSSQL_PASSWORD",
        "encrypt": true,
        "trustServerCertificate": false
      },
      "defaultSchema": "dbo",
      "mode": "readwrite"
    }
  }
}
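The example stores secrets as `ENV:NAME` references. How the loader resolves them is not described here, so the following is only a sketch under the assumption that each such string is replaced by the environment variable of that name and that a missing variable is an error. `resolveSecret` and `resolveConfigSecrets` are hypothetical helpers; the environment is passed in explicitly (a caller would typically pass `process.env`).

```typescript
const ENV_PREFIX = 'ENV:';

type EnvMap = Record<string, string | undefined>;

// Resolve a single value; strings without the prefix are returned unchanged.
function resolveSecret(value: string, env: EnvMap): string {
  if (!value.startsWith(ENV_PREFIX)) {
    return value;
  }
  const name = value.slice(ENV_PREFIX.length);
  const resolved = env[name];
  if (resolved === undefined || resolved === '') {
    throw new Error(`Environment variable ${name} is not set`);
  }
  return resolved;
}

// Walk a parsed config and resolve every string value, returning a new object.
function resolveConfigSecrets<T>(node: T, env: EnvMap): T {
  if (typeof node === 'string') {
    return resolveSecret(node, env) as unknown as T;
  }
  if (Array.isArray(node)) {
    return node.map((item) => resolveConfigSecrets(item, env)) as unknown as T;
  }
  if (node !== null && typeof node === 'object') {
    const out: Record<string, unknown> = {};
    for (const [key, value] of Object.entries(node as Record<string, unknown>)) {
      out[key] = resolveConfigSecrets(value, env);
    }
    return out as unknown as T;
  }
  return node; // numbers, booleans, null
}
```

Failing fast on a missing variable surfaces a misconfigured deployment at startup rather than at the first connection attempt.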
8. Testing Strategy
8.1 Unit Tests
// __tests__/drivers/postgres-driver.test.ts
// Import path assumes __tests__ sits next to src at the repository root
import { PostgresDriver } from '../../src/drivers/postgres/postgres-driver.js';
describe('PostgresDriver', () => {
  const driver = new PostgresDriver();
  test('quoteIdentifier', () => {
    expect(driver.quoteIdentifier('table')).toBe('"table"');
    expect(driver.quoteIdentifier('my"table')).toBe('"my""table"');
  });
  test('buildPaginatedQuery', () => {
    const { sql, params } = driver.buildPaginatedQuery(
      'SELECT * FROM users',
      [],
      10,
      20
    );
    expect(sql).toBe('SELECT * FROM users LIMIT $1 OFFSET $2');
    expect(params).toEqual([10, 20]);
  });
});
8.2 Integration Tests
// __tests__/integration/sqlserver.test.ts
// Import path assumes __tests__ sits next to src at the repository root
import * as mssql from 'mssql';
import { SqlServerDriver } from '../../src/drivers/sqlserver/sqlserver-driver.js';
describe('SQL Server Integration', () => {
  let pool: mssql.ConnectionPool;
  let driver: SqlServerDriver;
  beforeAll(async () => {
    driver = new SqlServerDriver();
    // createConnectionPool() connects asynchronously, hence the await
    pool = await driver.createConnectionPool({
      host: 'localhost',
      port: 1433,
      database: 'test',
      user: 'sa',
      password: 'TestPassword123'
    });
  });
  afterAll(async () => {
    await driver.closeConnectionPool(pool);
  });
  test('execute query', async () => {
    const result = await driver.execute<{ num: number }>(pool, 'SELECT 1 as num');
    expect(result.rows).toHaveLength(1);
    expect(result.rows[0].num).toBe(1);
  });
});
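Besides driver unit tests and database-backed integration tests, the core layer can be exercised against a stub driver with no database at all. The sketch below is framework-free so it stands on its own. `PaginatedRunnerSketch` is a simplified stand-in for `QueryRunner.executePaginated`, and `stubDriver` and `recordedCalls` are invented for the example.

```typescript
// The only driver method this piece of core logic needs.
interface PaginatingDriver {
  buildPaginatedQuery(
    sql: string,
    params: unknown[],
    limit: number,
    offset: number
  ): { sql: string; params: unknown[] };
}

type RunQuery = (sql: string, params: unknown[]) => Promise<unknown[]>;

// Simplified stand-in for QueryRunner: delegates SQL shaping to the driver.
class PaginatedRunnerSketch {
  private readonly driver: PaginatingDriver;
  private readonly run: RunQuery;

  constructor(driver: PaginatingDriver, run: RunQuery) {
    this.driver = driver;
    this.run = run;
  }

  async executePaginated(text: string, values: unknown[], limit: number, offset = 0): Promise<unknown[]> {
    const { sql, params } = this.driver.buildPaginatedQuery(text, values, limit, offset);
    return this.run(sql, params);
  }
}

// Stub driver that records how the core layer called it.
const recordedCalls: Array<{ limit: number; offset: number }> = [];
const stubDriver: PaginatingDriver = {
  buildPaginatedQuery(sql, params, limit, offset) {
    recordedCalls.push({ limit, offset });
    return { sql: `${sql} /* page */`, params: [...params, limit, offset] };
  },
};
```

A test can then assert two things: that the runner passed the expected limit and offset to the driver, and that whatever SQL the driver returned is what reached the executor.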
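9. Performance Optimization
9.1 Connection Pool Optimization
- Pool warm-up: create the minimum number of connections at startup
- Connection reuse: prefer idle connections over opening new ones
- Pool monitoring: check connection health periodically
9.2 Query Optimization
- Parameterized queries: pass values as bound parameters instead of interpolating them into SQL
- Bulk operations: merge multiple inserts into a single statement
- Index hints: add index hints where necessary (database-specific)
10. Summary
Key Design Highlights
- High level of abstraction: the driver interface isolates database differences from the core
- Easy to extend: a new database only requires implementing the driver interface
- Backward compatible: existing PostgreSQL functionality is preserved
- Type safety: complete TypeScript type definitions
- Testability: drivers can be tested in isolation, and the core layer can mock the driver
Technical Debt
- Some diagnostics have limited implementations on SQL Server
- EXPLAIN needs special handling on SQL Server
- Callers must account for schema handling differences (search_path vs. schema-qualified names)
Document maintenance:
- Owner: [TBD]
- Last updated: 2024-12-27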