제어관리 개선판
This commit is contained in:
@@ -677,51 +677,100 @@ export class NodeFlowExecutionService {
|
||||
node: FlowNode,
|
||||
context: ExecutionContext
|
||||
): Promise<any[]> {
|
||||
const { connectionId, tableName, schema, whereConditions } = node.data;
|
||||
const { connectionId, tableName, schema, whereConditions, dataSourceType } =
|
||||
node.data;
|
||||
|
||||
if (!connectionId || !tableName) {
|
||||
throw new Error("외부 DB 연결 정보 또는 테이블명이 설정되지 않았습니다.");
|
||||
}
|
||||
// 🆕 노드의 dataSourceType 확인 (기본값: context-data)
|
||||
const nodeDataSourceType = dataSourceType || "context-data";
|
||||
|
||||
logger.info(`🔌 외부 DB 소스 조회: ${connectionId}.${tableName}`);
|
||||
logger.info(
|
||||
`🔌 외부 DB 소스 노드 실행: ${connectionId}.${tableName}, dataSourceType=${nodeDataSourceType}`
|
||||
);
|
||||
|
||||
try {
|
||||
// 연결 풀 서비스 임포트 (동적 임포트로 순환 참조 방지)
|
||||
const { ExternalDbConnectionPoolService } = await import(
|
||||
"./externalDbConnectionPoolService"
|
||||
);
|
||||
const poolService = ExternalDbConnectionPoolService.getInstance();
|
||||
|
||||
// 스키마 접두사 처리
|
||||
const schemaPrefix = schema ? `${schema}.` : "";
|
||||
const fullTableName = `${schemaPrefix}${tableName}`;
|
||||
|
||||
// WHERE 절 생성
|
||||
let sql = `SELECT * FROM ${fullTableName}`;
|
||||
let params: any[] = [];
|
||||
|
||||
if (whereConditions && whereConditions.length > 0) {
|
||||
const whereResult = this.buildWhereClause(whereConditions);
|
||||
sql += ` ${whereResult.clause}`;
|
||||
params = whereResult.values;
|
||||
// 1. context-data 모드: 외부에서 주입된 데이터 사용
|
||||
if (nodeDataSourceType === "context-data") {
|
||||
if (
|
||||
context.sourceData &&
|
||||
Array.isArray(context.sourceData) &&
|
||||
context.sourceData.length > 0
|
||||
) {
|
||||
logger.info(
|
||||
`📊 컨텍스트 데이터 사용: ${context.dataSourceType}, ${context.sourceData.length}건`
|
||||
);
|
||||
return context.sourceData;
|
||||
}
|
||||
|
||||
logger.info(`📊 외부 DB 쿼리 실행: ${sql}`);
|
||||
|
||||
// 연결 풀을 통해 쿼리 실행
|
||||
const result = await poolService.executeQuery(connectionId, sql, params);
|
||||
|
||||
logger.info(
|
||||
`✅ 외부 DB 소스 조회 완료: ${tableName}, ${result.length}건`
|
||||
);
|
||||
|
||||
return result;
|
||||
} catch (error: any) {
|
||||
logger.error(`❌ 외부 DB 소스 조회 실패:`, error);
|
||||
throw new Error(
|
||||
`외부 DB 조회 실패 (연결 ID: ${connectionId}): ${error.message}`
|
||||
logger.warn(
|
||||
`⚠️ context-data 모드이지만 전달된 데이터가 없습니다. 빈 배열 반환.`
|
||||
);
|
||||
return [];
|
||||
}
|
||||
|
||||
// 2. table-all 모드: 외부 DB 테이블 전체 데이터 조회
|
||||
if (nodeDataSourceType === "table-all") {
|
||||
if (!connectionId || !tableName) {
|
||||
throw new Error(
|
||||
"외부 DB 연결 정보 또는 테이블명이 설정되지 않았습니다."
|
||||
);
|
||||
}
|
||||
|
||||
try {
|
||||
// 연결 풀 서비스 임포트 (동적 임포트로 순환 참조 방지)
|
||||
const { ExternalDbConnectionPoolService } = await import(
|
||||
"./externalDbConnectionPoolService"
|
||||
);
|
||||
const poolService = ExternalDbConnectionPoolService.getInstance();
|
||||
|
||||
// 스키마 접두사 처리
|
||||
const schemaPrefix = schema ? `${schema}.` : "";
|
||||
const fullTableName = `${schemaPrefix}${tableName}`;
|
||||
|
||||
// WHERE 절 생성
|
||||
let sql = `SELECT * FROM ${fullTableName}`;
|
||||
let params: any[] = [];
|
||||
|
||||
if (whereConditions && whereConditions.length > 0) {
|
||||
const whereResult = this.buildWhereClause(whereConditions);
|
||||
sql += ` ${whereResult.clause}`;
|
||||
params = whereResult.values;
|
||||
}
|
||||
|
||||
logger.info(`📊 외부 DB 쿼리 실행: ${sql}`);
|
||||
|
||||
// 연결 풀을 통해 쿼리 실행
|
||||
const result = await poolService.executeQuery(
|
||||
connectionId,
|
||||
sql,
|
||||
params
|
||||
);
|
||||
|
||||
logger.info(
|
||||
`✅ 외부 DB 전체 데이터 조회 완료: ${tableName}, ${result.length}건`
|
||||
);
|
||||
|
||||
return result;
|
||||
} catch (error: any) {
|
||||
logger.error(`❌ 외부 DB 소스 조회 실패:`, error);
|
||||
throw new Error(
|
||||
`외부 DB 조회 실패 (연결 ID: ${connectionId}): ${error.message}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// 3. 알 수 없는 모드 (기본값으로 처리)
|
||||
logger.warn(
|
||||
`⚠️ 알 수 없는 dataSourceType: ${nodeDataSourceType}, context-data로 처리`
|
||||
);
|
||||
|
||||
if (
|
||||
context.sourceData &&
|
||||
Array.isArray(context.sourceData) &&
|
||||
context.sourceData.length > 0
|
||||
) {
|
||||
return context.sourceData;
|
||||
}
|
||||
|
||||
return [];
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -731,40 +780,71 @@ export class NodeFlowExecutionService {
|
||||
node: FlowNode,
|
||||
context: ExecutionContext
|
||||
): Promise<any[]> {
|
||||
// 🔥 외부에서 주입된 데이터가 있으면 우선 사용
|
||||
const { tableName, schema, whereConditions, dataSourceType } = node.data;
|
||||
|
||||
// 🆕 노드의 dataSourceType 확인 (기본값: context-data)
|
||||
const nodeDataSourceType = dataSourceType || "context-data";
|
||||
|
||||
logger.info(
|
||||
`📊 테이블 소스 노드 실행: ${tableName}, dataSourceType=${nodeDataSourceType}`
|
||||
);
|
||||
|
||||
// 1. context-data 모드: 외부에서 주입된 데이터 사용
|
||||
if (nodeDataSourceType === "context-data") {
|
||||
if (
|
||||
context.sourceData &&
|
||||
Array.isArray(context.sourceData) &&
|
||||
context.sourceData.length > 0
|
||||
) {
|
||||
logger.info(
|
||||
`📊 컨텍스트 데이터 사용: ${context.dataSourceType}, ${context.sourceData.length}건`
|
||||
);
|
||||
return context.sourceData;
|
||||
}
|
||||
|
||||
logger.warn(
|
||||
`⚠️ context-data 모드이지만 전달된 데이터가 없습니다. 빈 배열 반환.`
|
||||
);
|
||||
return [];
|
||||
}
|
||||
|
||||
// 2. table-all 모드: 테이블 전체 데이터 조회
|
||||
if (nodeDataSourceType === "table-all") {
|
||||
if (!tableName) {
|
||||
logger.warn("⚠️ 테이블 소스 노드에 테이블명이 없습니다.");
|
||||
return [];
|
||||
}
|
||||
|
||||
const schemaPrefix = schema ? `${schema}.` : "";
|
||||
const whereResult = whereConditions
|
||||
? this.buildWhereClause(whereConditions)
|
||||
: { clause: "", values: [] };
|
||||
|
||||
const sql = `SELECT * FROM ${schemaPrefix}${tableName} ${whereResult.clause}`;
|
||||
|
||||
const result = await query(sql, whereResult.values);
|
||||
|
||||
logger.info(
|
||||
`📊 테이블 전체 데이터 조회: ${tableName}, ${result.length}건`
|
||||
);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
// 3. 알 수 없는 모드 (기본값으로 처리)
|
||||
logger.warn(
|
||||
`⚠️ 알 수 없는 dataSourceType: ${nodeDataSourceType}, context-data로 처리`
|
||||
);
|
||||
|
||||
if (
|
||||
context.sourceData &&
|
||||
Array.isArray(context.sourceData) &&
|
||||
context.sourceData.length > 0
|
||||
) {
|
||||
logger.info(
|
||||
`📊 외부 주입 데이터 사용: ${context.dataSourceType}, ${context.sourceData.length}건`
|
||||
);
|
||||
return context.sourceData;
|
||||
}
|
||||
|
||||
// 외부 데이터가 없으면 DB 쿼리 실행
|
||||
const { tableName, schema, whereConditions } = node.data;
|
||||
|
||||
if (!tableName) {
|
||||
logger.warn(
|
||||
"⚠️ 테이블 소스 노드에 테이블명이 없고, 외부 데이터도 없습니다."
|
||||
);
|
||||
return [];
|
||||
}
|
||||
|
||||
const schemaPrefix = schema ? `${schema}.` : "";
|
||||
const whereResult = whereConditions
|
||||
? this.buildWhereClause(whereConditions)
|
||||
: { clause: "", values: [] };
|
||||
|
||||
const sql = `SELECT * FROM ${schemaPrefix}${tableName} ${whereResult.clause}`;
|
||||
|
||||
const result = await query(sql, whereResult.values);
|
||||
|
||||
logger.info(`📊 테이블 소스 조회: ${tableName}, ${result.length}건`);
|
||||
|
||||
return result;
|
||||
return [];
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -1277,9 +1357,16 @@ export class NodeFlowExecutionService {
|
||||
}
|
||||
});
|
||||
|
||||
const whereResult = this.buildWhereClause(
|
||||
// 🆕 WHERE 조건 자동 보강: Primary Key 추가
|
||||
const enhancedWhereConditions = await this.enhanceWhereConditionsWithPK(
|
||||
whereConditions,
|
||||
data,
|
||||
targetTable
|
||||
);
|
||||
|
||||
const whereResult = this.buildWhereClause(
|
||||
enhancedWhereConditions,
|
||||
data,
|
||||
paramIndex
|
||||
);
|
||||
|
||||
@@ -1310,7 +1397,7 @@ export class NodeFlowExecutionService {
|
||||
return updatedDataArray;
|
||||
};
|
||||
|
||||
// 🔥 클라이언트가 전달되었으면 사용, 아니면 독립 트랜잭션 생성
|
||||
// 🔥 클라이언트가 전달되었으면 사용, 없으면 독립 트랜잭션 생성
|
||||
if (client) {
|
||||
return executeUpdate(client);
|
||||
} else {
|
||||
@@ -1605,7 +1692,15 @@ export class NodeFlowExecutionService {
|
||||
|
||||
for (const data of dataArray) {
|
||||
console.log("🔍 WHERE 조건 처리 중...");
|
||||
const whereResult = this.buildWhereClause(whereConditions, data, 1);
|
||||
|
||||
// 🆕 WHERE 조건 자동 보강: Primary Key 추가
|
||||
const enhancedWhereConditions = await this.enhanceWhereConditionsWithPK(
|
||||
whereConditions,
|
||||
data,
|
||||
targetTable
|
||||
);
|
||||
|
||||
const whereResult = this.buildWhereClause(enhancedWhereConditions, data, 1);
|
||||
|
||||
const sql = `DELETE FROM ${targetTable} ${whereResult.clause} RETURNING *`;
|
||||
|
||||
@@ -1629,7 +1724,7 @@ export class NodeFlowExecutionService {
|
||||
return deletedDataArray;
|
||||
};
|
||||
|
||||
// 🔥 클라이언트가 전달되었으면 사용, 아니면 독립 트랜잭션 생성
|
||||
// 🔥 클라이언트가 전달되었으면 사용, 없으면 독립 트랜잭션 생성
|
||||
if (client) {
|
||||
return executeDelete(client);
|
||||
} else {
|
||||
@@ -2439,6 +2534,105 @@ export class NodeFlowExecutionService {
|
||||
/**
|
||||
* WHERE 절 생성
|
||||
*/
|
||||
/**
|
||||
* 테이블의 Primary Key 컬럼 조회 (내부 DB - PostgreSQL)
|
||||
*/
|
||||
private static async getPrimaryKeyColumns(
|
||||
tableName: string,
|
||||
schema: string = "public"
|
||||
): Promise<string[]> {
|
||||
const sql = `
|
||||
SELECT a.attname AS column_name
|
||||
FROM pg_index i
|
||||
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
|
||||
WHERE i.indrelid = $1::regclass
|
||||
AND i.indisprimary
|
||||
ORDER BY array_position(i.indkey, a.attnum);
|
||||
`;
|
||||
|
||||
const fullTableName = schema ? `${schema}.${tableName}` : tableName;
|
||||
|
||||
try {
|
||||
const result = await query(sql, [fullTableName]);
|
||||
const pkColumns = result.map((row: any) => row.column_name);
|
||||
|
||||
if (pkColumns.length > 0) {
|
||||
console.log(`🔑 테이블 ${tableName}의 Primary Key: ${pkColumns.join(", ")}`);
|
||||
} else {
|
||||
console.log(`⚠️ 테이블 ${tableName}에 Primary Key가 없습니다`);
|
||||
}
|
||||
|
||||
return pkColumns;
|
||||
} catch (error) {
|
||||
console.error(`❌ Primary Key 조회 실패 (${tableName}):`, error);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* WHERE 조건에 Primary Key 자동 추가 (컨텍스트 데이터 사용 시)
|
||||
*
|
||||
* 테이블의 실제 Primary Key를 자동으로 감지하여 WHERE 조건에 추가
|
||||
*/
|
||||
private static async enhanceWhereConditionsWithPK(
|
||||
whereConditions: any[],
|
||||
data: any,
|
||||
tableName: string,
|
||||
schema: string = "public"
|
||||
): Promise<any[]> {
|
||||
if (!data) {
|
||||
console.log("⚠️ 입력 데이터가 없어 WHERE 조건 자동 추가 불가");
|
||||
return whereConditions || [];
|
||||
}
|
||||
|
||||
// 🔑 테이블의 실제 Primary Key 컬럼 조회
|
||||
const pkColumns = await this.getPrimaryKeyColumns(tableName, schema);
|
||||
|
||||
if (pkColumns.length === 0) {
|
||||
console.log(`⚠️ 테이블 ${tableName}에 Primary Key가 없어 자동 추가 불가`);
|
||||
return whereConditions || [];
|
||||
}
|
||||
|
||||
// 🔍 데이터에 모든 PK 컬럼이 있는지 확인
|
||||
const missingPKColumns = pkColumns.filter(col =>
|
||||
data[col] === undefined || data[col] === null
|
||||
);
|
||||
|
||||
if (missingPKColumns.length > 0) {
|
||||
console.log(
|
||||
`⚠️ 입력 데이터에 Primary Key 컬럼이 없어 자동 추가 불가: ${missingPKColumns.join(", ")}`
|
||||
);
|
||||
return whereConditions || [];
|
||||
}
|
||||
|
||||
// 🔍 이미 WHERE 조건에 모든 PK가 포함되어 있는지 확인
|
||||
const existingFields = new Set(
|
||||
(whereConditions || []).map((cond: any) => cond.field)
|
||||
);
|
||||
const allPKsExist = pkColumns.every(col =>
|
||||
existingFields.has(col) || existingFields.has(`${tableName}.${col}`)
|
||||
);
|
||||
|
||||
if (allPKsExist) {
|
||||
console.log("✅ WHERE 조건에 이미 모든 Primary Key 포함, 추가하지 않음");
|
||||
return whereConditions || [];
|
||||
}
|
||||
|
||||
// 🔥 Primary Key 조건들을 맨 앞에 추가
|
||||
const pkConditions = pkColumns.map(col => ({
|
||||
field: col,
|
||||
operator: 'EQUALS',
|
||||
value: data[col]
|
||||
}));
|
||||
|
||||
const enhanced = [...pkConditions, ...(whereConditions || [])];
|
||||
|
||||
const pkValues = pkColumns.map(col => `${col} = ${data[col]}`).join(", ");
|
||||
console.log(`🔑 WHERE 조건에 Primary Key 자동 추가: ${pkValues}`);
|
||||
|
||||
return enhanced;
|
||||
}
|
||||
|
||||
private static buildWhereClause(
|
||||
conditions: any[],
|
||||
data?: any,
|
||||
|
||||
Reference in New Issue
Block a user