메뉴관리 추가 안되는 버그 수정
This commit is contained in:
@@ -168,22 +168,54 @@ export class NodeFlowExecutionService {
|
||||
const levels = this.topologicalSort(nodes, edges);
|
||||
logger.info(`📋 실행 순서 (레벨별):`, levels);
|
||||
|
||||
// 4. 레벨별 실행
|
||||
for (const level of levels) {
|
||||
await this.executeLevel(level, nodes, edges, context);
|
||||
// 4. 🔥 전체 플로우를 하나의 트랜잭션으로 실행
|
||||
let result: ExecutionResult;
|
||||
|
||||
try {
|
||||
result = await transaction(async (client) => {
|
||||
// 트랜잭션 내에서 레벨별 실행
|
||||
for (const level of levels) {
|
||||
await this.executeLevel(level, nodes, edges, context, client);
|
||||
}
|
||||
|
||||
// 5. 결과 생성
|
||||
const executionTime = Date.now() - startTime;
|
||||
const executionResult = this.generateExecutionResult(
|
||||
nodes,
|
||||
context,
|
||||
executionTime
|
||||
);
|
||||
|
||||
// 실패한 액션 노드가 있으면 롤백
|
||||
const failedActionNodes = Array.from(
|
||||
context.nodeResults.values()
|
||||
).filter(
|
||||
(result) =>
|
||||
result.status === "failed" &&
|
||||
nodes.find(
|
||||
(n: FlowNode) =>
|
||||
n.id === result.nodeId && this.isActionNode(n.type)
|
||||
)
|
||||
);
|
||||
|
||||
if (failedActionNodes.length > 0) {
|
||||
logger.warn(
|
||||
`🔄 액션 노드 실패 감지 (${failedActionNodes.length}개), 트랜잭션 롤백`
|
||||
);
|
||||
throw new Error(
|
||||
`액션 노드 실패: ${failedActionNodes.map((n) => n.nodeId).join(", ")}`
|
||||
);
|
||||
}
|
||||
|
||||
return executionResult;
|
||||
});
|
||||
|
||||
logger.info(`✅ 플로우 실행 완료:`, result.summary);
|
||||
return result;
|
||||
} catch (error) {
|
||||
logger.error(`❌ 플로우 실행 실패, 모든 변경사항 롤백됨:`, error);
|
||||
throw error;
|
||||
}
|
||||
|
||||
// 5. 결과 생성
|
||||
const executionTime = Date.now() - startTime;
|
||||
const result = this.generateExecutionResult(
|
||||
nodes,
|
||||
context,
|
||||
executionTime
|
||||
);
|
||||
|
||||
logger.info(`✅ 플로우 실행 완료:`, result.summary);
|
||||
|
||||
return result;
|
||||
} catch (error) {
|
||||
logger.error(`❌ 플로우 실행 실패:`, error);
|
||||
throw error;
|
||||
@@ -271,13 +303,16 @@ export class NodeFlowExecutionService {
|
||||
nodeIds: string[],
|
||||
nodes: FlowNode[],
|
||||
edges: FlowEdge[],
|
||||
context: ExecutionContext
|
||||
context: ExecutionContext,
|
||||
client?: any // 🔥 트랜잭션 클라이언트 (optional)
|
||||
): Promise<void> {
|
||||
logger.info(`⏳ 레벨 실행 시작: ${nodeIds.length}개 노드`);
|
||||
|
||||
// Promise.allSettled로 병렬 실행
|
||||
const results = await Promise.allSettled(
|
||||
nodeIds.map((nodeId) => this.executeNode(nodeId, nodes, edges, context))
|
||||
nodeIds.map((nodeId) =>
|
||||
this.executeNode(nodeId, nodes, edges, context, client)
|
||||
)
|
||||
);
|
||||
|
||||
// 결과 저장
|
||||
@@ -307,7 +342,8 @@ export class NodeFlowExecutionService {
|
||||
nodeId: string,
|
||||
nodes: FlowNode[],
|
||||
edges: FlowEdge[],
|
||||
context: ExecutionContext
|
||||
context: ExecutionContext,
|
||||
client?: any // 🔥 트랜잭션 클라이언트 (optional)
|
||||
): Promise<NodeResult> {
|
||||
const startTime = Date.now();
|
||||
const node = nodes.find((n) => n.id === nodeId);
|
||||
@@ -341,7 +377,12 @@ export class NodeFlowExecutionService {
|
||||
|
||||
// 3. 노드 타입별 실행
|
||||
try {
|
||||
const result = await this.executeNodeByType(node, inputData, context);
|
||||
const result = await this.executeNodeByType(
|
||||
node,
|
||||
inputData,
|
||||
context,
|
||||
client
|
||||
);
|
||||
|
||||
logger.info(`✅ 노드 실행 성공: ${nodeId}`);
|
||||
|
||||
@@ -405,7 +446,8 @@ export class NodeFlowExecutionService {
|
||||
private static async executeNodeByType(
|
||||
node: FlowNode,
|
||||
inputData: any,
|
||||
context: ExecutionContext
|
||||
context: ExecutionContext,
|
||||
client?: any // 🔥 트랜잭션 클라이언트 (optional)
|
||||
): Promise<any> {
|
||||
switch (node.type) {
|
||||
case "tableSource":
|
||||
@@ -418,16 +460,16 @@ export class NodeFlowExecutionService {
|
||||
return this.executeDataTransform(node, inputData, context);
|
||||
|
||||
case "insertAction":
|
||||
return this.executeInsertAction(node, inputData, context);
|
||||
return this.executeInsertAction(node, inputData, context, client);
|
||||
|
||||
case "updateAction":
|
||||
return this.executeUpdateAction(node, inputData, context);
|
||||
return this.executeUpdateAction(node, inputData, context, client);
|
||||
|
||||
case "deleteAction":
|
||||
return this.executeDeleteAction(node, inputData, context);
|
||||
return this.executeDeleteAction(node, inputData, context, client);
|
||||
|
||||
case "upsertAction":
|
||||
return this.executeUpsertAction(node, inputData, context);
|
||||
return this.executeUpsertAction(node, inputData, context, client);
|
||||
|
||||
case "condition":
|
||||
return this.executeCondition(node, inputData, context);
|
||||
@@ -610,14 +652,15 @@ export class NodeFlowExecutionService {
|
||||
private static async executeInsertAction(
|
||||
node: FlowNode,
|
||||
inputData: any,
|
||||
context: ExecutionContext
|
||||
context: ExecutionContext,
|
||||
client?: any // 🔥 트랜잭션 클라이언트 (optional)
|
||||
): Promise<any> {
|
||||
const { targetType } = node.data;
|
||||
|
||||
// 🔥 타겟 타입별 분기
|
||||
switch (targetType) {
|
||||
case "internal":
|
||||
return this.executeInternalInsert(node, inputData, context);
|
||||
return this.executeInternalInsert(node, inputData, context, client);
|
||||
|
||||
case "external":
|
||||
return this.executeExternalInsert(node, inputData, context);
|
||||
@@ -628,7 +671,7 @@ export class NodeFlowExecutionService {
|
||||
default:
|
||||
// 하위 호환성: targetType이 없으면 internal로 간주
|
||||
logger.warn(`⚠️ targetType이 설정되지 않음, internal로 간주`);
|
||||
return this.executeInternalInsert(node, inputData, context);
|
||||
return this.executeInternalInsert(node, inputData, context, client);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -638,7 +681,8 @@ export class NodeFlowExecutionService {
|
||||
private static async executeInternalInsert(
|
||||
node: FlowNode,
|
||||
inputData: any,
|
||||
context: ExecutionContext
|
||||
context: ExecutionContext,
|
||||
client?: any // 🔥 트랜잭션 클라이언트 (optional)
|
||||
): Promise<any> {
|
||||
const { targetTable, fieldMappings } = node.data;
|
||||
|
||||
@@ -655,7 +699,8 @@ export class NodeFlowExecutionService {
|
||||
console.log("🔑 입력 데이터 필드명:", Object.keys(inputData[0]));
|
||||
}
|
||||
|
||||
return transaction(async (client) => {
|
||||
// 🔥 트랜잭션 클라이언트가 있으면 사용, 없으면 독립 트랜잭션
|
||||
const executeInsert = async (txClient: any) => {
|
||||
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
|
||||
let insertedCount = 0;
|
||||
|
||||
@@ -685,7 +730,7 @@ export class NodeFlowExecutionService {
|
||||
console.log("📝 실행할 SQL:", sql);
|
||||
console.log("📊 바인딩 값:", values);
|
||||
|
||||
await client.query(sql, values);
|
||||
await txClient.query(sql, values);
|
||||
insertedCount++;
|
||||
}
|
||||
|
||||
@@ -694,7 +739,14 @@ export class NodeFlowExecutionService {
|
||||
);
|
||||
|
||||
return { insertedCount };
|
||||
});
|
||||
};
|
||||
|
||||
// 🔥 클라이언트가 전달되었으면 사용, 아니면 독립 트랜잭션 생성
|
||||
if (client) {
|
||||
return executeInsert(client);
|
||||
} else {
|
||||
return transaction(executeInsert);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -1004,14 +1056,15 @@ export class NodeFlowExecutionService {
|
||||
private static async executeUpdateAction(
|
||||
node: FlowNode,
|
||||
inputData: any,
|
||||
context: ExecutionContext
|
||||
context: ExecutionContext,
|
||||
client?: any // 🔥 트랜잭션 클라이언트 (optional)
|
||||
): Promise<any> {
|
||||
const { targetType } = node.data;
|
||||
|
||||
// 🔥 타겟 타입별 분기
|
||||
switch (targetType) {
|
||||
case "internal":
|
||||
return this.executeInternalUpdate(node, inputData, context);
|
||||
return this.executeInternalUpdate(node, inputData, context, client);
|
||||
|
||||
case "external":
|
||||
return this.executeExternalUpdate(node, inputData, context);
|
||||
@@ -1022,7 +1075,7 @@ export class NodeFlowExecutionService {
|
||||
default:
|
||||
// 하위 호환성: targetType이 없으면 internal로 간주
|
||||
logger.warn(`⚠️ targetType이 설정되지 않음, internal로 간주`);
|
||||
return this.executeInternalUpdate(node, inputData, context);
|
||||
return this.executeInternalUpdate(node, inputData, context, client);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1032,7 +1085,8 @@ export class NodeFlowExecutionService {
|
||||
private static async executeInternalUpdate(
|
||||
node: FlowNode,
|
||||
inputData: any,
|
||||
context: ExecutionContext
|
||||
context: ExecutionContext,
|
||||
client?: any // 🔥 트랜잭션 클라이언트 (optional)
|
||||
): Promise<any> {
|
||||
const { targetTable, fieldMappings, whereConditions } = node.data;
|
||||
|
||||
@@ -1049,7 +1103,8 @@ export class NodeFlowExecutionService {
|
||||
console.log("🔑 입력 데이터 필드명:", Object.keys(inputData[0]));
|
||||
}
|
||||
|
||||
return transaction(async (client) => {
|
||||
// 🔥 트랜잭션 클라이언트가 있으면 사용, 없으면 독립 트랜잭션
|
||||
const executeUpdate = async (txClient: any) => {
|
||||
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
|
||||
let updatedCount = 0;
|
||||
|
||||
@@ -1088,7 +1143,7 @@ export class NodeFlowExecutionService {
|
||||
console.log("📝 실행할 SQL:", sql);
|
||||
console.log("📊 바인딩 값:", values);
|
||||
|
||||
const result = await client.query(sql, values);
|
||||
const result = await txClient.query(sql, values);
|
||||
updatedCount += result.rowCount || 0;
|
||||
}
|
||||
|
||||
@@ -1097,7 +1152,14 @@ export class NodeFlowExecutionService {
|
||||
);
|
||||
|
||||
return { updatedCount };
|
||||
});
|
||||
};
|
||||
|
||||
// 🔥 클라이언트가 전달되었으면 사용, 아니면 독립 트랜잭션 생성
|
||||
if (client) {
|
||||
return executeUpdate(client);
|
||||
} else {
|
||||
return transaction(executeUpdate);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -1326,14 +1388,15 @@ export class NodeFlowExecutionService {
|
||||
private static async executeDeleteAction(
|
||||
node: FlowNode,
|
||||
inputData: any,
|
||||
context: ExecutionContext
|
||||
context: ExecutionContext,
|
||||
client?: any // 🔥 트랜잭션 클라이언트 (optional)
|
||||
): Promise<any> {
|
||||
const { targetType } = node.data;
|
||||
|
||||
// 🔥 타겟 타입별 분기
|
||||
switch (targetType) {
|
||||
case "internal":
|
||||
return this.executeInternalDelete(node, inputData, context);
|
||||
return this.executeInternalDelete(node, inputData, context, client);
|
||||
|
||||
case "external":
|
||||
return this.executeExternalDelete(node, inputData, context);
|
||||
@@ -1344,7 +1407,7 @@ export class NodeFlowExecutionService {
|
||||
default:
|
||||
// 하위 호환성: targetType이 없으면 internal로 간주
|
||||
logger.warn(`⚠️ targetType이 설정되지 않음, internal로 간주`);
|
||||
return this.executeInternalDelete(node, inputData, context);
|
||||
return this.executeInternalDelete(node, inputData, context, client);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1354,7 +1417,8 @@ export class NodeFlowExecutionService {
|
||||
private static async executeInternalDelete(
|
||||
node: FlowNode,
|
||||
inputData: any,
|
||||
context: ExecutionContext
|
||||
context: ExecutionContext,
|
||||
client?: any // 🔥 트랜잭션 클라이언트 (optional)
|
||||
): Promise<any> {
|
||||
const { targetTable, whereConditions } = node.data;
|
||||
|
||||
@@ -1371,7 +1435,8 @@ export class NodeFlowExecutionService {
|
||||
console.log("🔑 입력 데이터 필드명:", Object.keys(inputData[0]));
|
||||
}
|
||||
|
||||
return transaction(async (client) => {
|
||||
// 🔥 트랜잭션 클라이언트가 있으면 사용, 없으면 독립 트랜잭션
|
||||
const executeDelete = async (txClient: any) => {
|
||||
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
|
||||
let deletedCount = 0;
|
||||
|
||||
@@ -1383,7 +1448,7 @@ export class NodeFlowExecutionService {
|
||||
|
||||
console.log("📝 실행할 SQL:", sql);
|
||||
|
||||
const result = await client.query(sql, []);
|
||||
const result = await txClient.query(sql, []);
|
||||
deletedCount += result.rowCount || 0;
|
||||
}
|
||||
|
||||
@@ -1392,7 +1457,14 @@ export class NodeFlowExecutionService {
|
||||
);
|
||||
|
||||
return { deletedCount };
|
||||
});
|
||||
};
|
||||
|
||||
// 🔥 클라이언트가 전달되었으면 사용, 아니면 독립 트랜잭션 생성
|
||||
if (client) {
|
||||
return executeDelete(client);
|
||||
} else {
|
||||
return transaction(executeDelete);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -1575,14 +1647,15 @@ export class NodeFlowExecutionService {
|
||||
private static async executeUpsertAction(
|
||||
node: FlowNode,
|
||||
inputData: any,
|
||||
context: ExecutionContext
|
||||
context: ExecutionContext,
|
||||
client?: any // 🔥 트랜잭션 클라이언트 (optional)
|
||||
): Promise<any> {
|
||||
const { targetType } = node.data;
|
||||
|
||||
// 🔥 타겟 타입별 분기
|
||||
switch (targetType) {
|
||||
case "internal":
|
||||
return this.executeInternalUpsert(node, inputData, context);
|
||||
return this.executeInternalUpsert(node, inputData, context, client);
|
||||
|
||||
case "external":
|
||||
return this.executeExternalUpsert(node, inputData, context);
|
||||
@@ -1593,7 +1666,7 @@ export class NodeFlowExecutionService {
|
||||
default:
|
||||
// 하위 호환성: targetType이 없으면 internal로 간주
|
||||
logger.warn(`⚠️ targetType이 설정되지 않음, internal로 간주`);
|
||||
return this.executeInternalUpsert(node, inputData, context);
|
||||
return this.executeInternalUpsert(node, inputData, context, client);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1604,7 +1677,8 @@ export class NodeFlowExecutionService {
|
||||
private static async executeInternalUpsert(
|
||||
node: FlowNode,
|
||||
inputData: any,
|
||||
context: ExecutionContext
|
||||
context: ExecutionContext,
|
||||
client?: any // 🔥 트랜잭션 클라이언트 (optional)
|
||||
): Promise<any> {
|
||||
const { targetTable, fieldMappings, conflictKeys } = node.data;
|
||||
|
||||
@@ -1630,7 +1704,8 @@ export class NodeFlowExecutionService {
|
||||
}
|
||||
console.log("🔑 충돌 키:", conflictKeys);
|
||||
|
||||
return transaction(async (client) => {
|
||||
// 🔥 트랜잭션 클라이언트가 있으면 사용, 없으면 독립 트랜잭션
|
||||
const executeUpsert = async (txClient: any) => {
|
||||
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
|
||||
let insertedCount = 0;
|
||||
let updatedCount = 0;
|
||||
@@ -1660,7 +1735,7 @@ export class NodeFlowExecutionService {
|
||||
console.log("🔍 존재 여부 확인 - 바인딩 값:", whereValues);
|
||||
|
||||
const checkSql = `SELECT 1 FROM ${targetTable} WHERE ${whereConditions} LIMIT 1`;
|
||||
const existingRow = await client.query(checkSql, whereValues);
|
||||
const existingRow = await txClient.query(checkSql, whereValues);
|
||||
|
||||
if (existingRow.rows.length > 0) {
|
||||
// 3-A. 존재하면 UPDATE
|
||||
@@ -1707,7 +1782,7 @@ export class NodeFlowExecutionService {
|
||||
values: updateValues,
|
||||
});
|
||||
|
||||
await client.query(updateSql, updateValues);
|
||||
await txClient.query(updateSql, updateValues);
|
||||
updatedCount++;
|
||||
} else {
|
||||
// 3-B. 없으면 INSERT
|
||||
@@ -1735,7 +1810,7 @@ export class NodeFlowExecutionService {
|
||||
conflictKeyValues,
|
||||
});
|
||||
|
||||
await client.query(insertSql, values);
|
||||
await txClient.query(insertSql, values);
|
||||
insertedCount++;
|
||||
}
|
||||
}
|
||||
@@ -1749,7 +1824,14 @@ export class NodeFlowExecutionService {
|
||||
updatedCount,
|
||||
totalCount: insertedCount + updatedCount,
|
||||
};
|
||||
});
|
||||
};
|
||||
|
||||
// 🔥 클라이언트가 전달되었으면 사용, 아니면 독립 트랜잭션 생성
|
||||
if (client) {
|
||||
return executeUpsert(client);
|
||||
} else {
|
||||
return transaction(executeUpsert);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -2401,4 +2483,16 @@ export class NodeFlowExecutionService {
|
||||
);
|
||||
return expandedRows;
|
||||
}
|
||||
|
||||
/**
|
||||
* 🔥 액션 노드 여부 확인
|
||||
*/
|
||||
private static isActionNode(nodeType: NodeType): boolean {
|
||||
return [
|
||||
"insertAction",
|
||||
"updateAction",
|
||||
"deleteAction",
|
||||
"upsertAction",
|
||||
].includes(nodeType);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user