Merge branch 'main' of http://39.117.244.52:3000/kjs/ERP-node into feature/report
This commit is contained in:
@@ -61,23 +61,31 @@ export class BatchService {
|
||||
|
||||
// 배치 설정 조회 (매핑 포함 - 서브쿼리 사용)
|
||||
const batchConfigs = await query<any>(
|
||||
`SELECT bc.*,
|
||||
`SELECT bc.id, bc.batch_name, bc.description, bc.cron_schedule,
|
||||
bc.is_active, bc.company_code, bc.created_date, bc.created_by,
|
||||
bc.updated_date, bc.updated_by,
|
||||
COALESCE(
|
||||
json_agg(
|
||||
json_build_object(
|
||||
'mapping_id', bm.mapping_id,
|
||||
'batch_id', bm.batch_id,
|
||||
'source_column', bm.source_column,
|
||||
'target_column', bm.target_column,
|
||||
'transformation_rule', bm.transformation_rule
|
||||
'id', bm.id,
|
||||
'batch_config_id', bm.batch_config_id,
|
||||
'from_connection_type', bm.from_connection_type,
|
||||
'from_connection_id', bm.from_connection_id,
|
||||
'from_table_name', bm.from_table_name,
|
||||
'from_column_name', bm.from_column_name,
|
||||
'to_connection_type', bm.to_connection_type,
|
||||
'to_connection_id', bm.to_connection_id,
|
||||
'to_table_name', bm.to_table_name,
|
||||
'to_column_name', bm.to_column_name,
|
||||
'mapping_order', bm.mapping_order
|
||||
)
|
||||
) FILTER (WHERE bm.mapping_id IS NOT NULL),
|
||||
) FILTER (WHERE bm.id IS NOT NULL),
|
||||
'[]'
|
||||
) as batch_mappings
|
||||
FROM batch_configs bc
|
||||
LEFT JOIN batch_mappings bm ON bc.batch_id = bm.batch_id
|
||||
LEFT JOIN batch_mappings bm ON bc.id = bm.batch_config_id
|
||||
${whereClause}
|
||||
GROUP BY bc.batch_id
|
||||
GROUP BY bc.id
|
||||
ORDER BY bc.is_active DESC, bc.batch_name ASC
|
||||
LIMIT $${paramIndex} OFFSET $${paramIndex + 1}`,
|
||||
[...values, limit, offset]
|
||||
@@ -85,7 +93,7 @@ export class BatchService {
|
||||
|
||||
// 전체 개수 조회
|
||||
const countResult = await queryOne<{ count: string }>(
|
||||
`SELECT COUNT(DISTINCT bc.batch_id) as count
|
||||
`SELECT COUNT(DISTINCT bc.id) as count
|
||||
FROM batch_configs bc
|
||||
${whereClause}`,
|
||||
values
|
||||
@@ -121,29 +129,34 @@ export class BatchService {
|
||||
): Promise<ApiResponse<BatchConfig>> {
|
||||
try {
|
||||
const batchConfig = await queryOne<any>(
|
||||
`SELECT bc.*,
|
||||
`SELECT bc.id, bc.batch_name, bc.description, bc.cron_schedule,
|
||||
bc.is_active, bc.company_code, bc.created_date, bc.created_by,
|
||||
bc.updated_date, bc.updated_by,
|
||||
COALESCE(
|
||||
json_agg(
|
||||
json_build_object(
|
||||
'mapping_id', bm.mapping_id,
|
||||
'batch_id', bm.batch_id,
|
||||
'id', bm.id,
|
||||
'batch_config_id', bm.batch_config_id,
|
||||
'from_connection_type', bm.from_connection_type,
|
||||
'from_connection_id', bm.from_connection_id,
|
||||
'from_table_name', bm.from_table_name,
|
||||
'from_column_name', bm.from_column_name,
|
||||
'from_column_type', bm.from_column_type,
|
||||
'to_connection_type', bm.to_connection_type,
|
||||
'to_connection_id', bm.to_connection_id,
|
||||
'to_table_name', bm.to_table_name,
|
||||
'to_column_name', bm.to_column_name,
|
||||
'mapping_order', bm.mapping_order,
|
||||
'source_column', bm.source_column,
|
||||
'target_column', bm.target_column,
|
||||
'transformation_rule', bm.transformation_rule
|
||||
'to_column_type', bm.to_column_type,
|
||||
'mapping_order', bm.mapping_order
|
||||
)
|
||||
ORDER BY bm.from_table_name ASC, bm.from_column_name ASC, bm.mapping_order ASC
|
||||
) FILTER (WHERE bm.mapping_id IS NOT NULL),
|
||||
) FILTER (WHERE bm.id IS NOT NULL),
|
||||
'[]'
|
||||
) as batch_mappings
|
||||
FROM batch_configs bc
|
||||
LEFT JOIN batch_mappings bm ON bc.batch_id = bm.batch_id
|
||||
LEFT JOIN batch_mappings bm ON bc.id = bm.batch_config_id
|
||||
WHERE bc.id = $1
|
||||
GROUP BY bc.batch_id`,
|
||||
GROUP BY bc.id`,
|
||||
[id]
|
||||
);
|
||||
|
||||
@@ -268,16 +281,16 @@ export class BatchService {
|
||||
COALESCE(
|
||||
json_agg(
|
||||
json_build_object(
|
||||
'mapping_id', bm.mapping_id,
|
||||
'batch_id', bm.batch_id
|
||||
'id', bm.id,
|
||||
'batch_config_id', bm.batch_config_id
|
||||
)
|
||||
) FILTER (WHERE bm.mapping_id IS NOT NULL),
|
||||
) FILTER (WHERE bm.id IS NOT NULL),
|
||||
'[]'
|
||||
) as batch_mappings
|
||||
FROM batch_configs bc
|
||||
LEFT JOIN batch_mappings bm ON bc.batch_id = bm.batch_id
|
||||
LEFT JOIN batch_mappings bm ON bc.id = bm.batch_config_id
|
||||
WHERE bc.id = $1
|
||||
GROUP BY bc.batch_id`,
|
||||
GROUP BY bc.id`,
|
||||
[id]
|
||||
);
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
|
||||
import { query, queryOne, transaction } from "../database/db";
|
||||
import { logger } from "../utils/logger";
|
||||
import axios from "axios";
|
||||
|
||||
// ===== 타입 정의 =====
|
||||
|
||||
@@ -410,6 +411,9 @@ export class NodeFlowExecutionService {
|
||||
case "tableSource":
|
||||
return this.executeTableSource(node, context);
|
||||
|
||||
case "restAPISource":
|
||||
return this.executeRestAPISource(node, context);
|
||||
|
||||
case "dataTransform":
|
||||
return this.executeDataTransform(node, inputData, context);
|
||||
|
||||
@@ -440,6 +444,123 @@ export class NodeFlowExecutionService {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* REST API 소스 노드 실행
|
||||
*/
|
||||
private static async executeRestAPISource(
|
||||
node: FlowNode,
|
||||
context: ExecutionContext
|
||||
): Promise<any[]> {
|
||||
const {
|
||||
url,
|
||||
method = "GET",
|
||||
headers = {},
|
||||
body,
|
||||
timeout = 30000,
|
||||
responseMapping,
|
||||
authentication,
|
||||
} = node.data;
|
||||
|
||||
if (!url) {
|
||||
throw new Error("REST API URL이 설정되지 않았습니다.");
|
||||
}
|
||||
|
||||
logger.info(`🌐 REST API 호출: ${method} ${url}`);
|
||||
|
||||
try {
|
||||
// 헤더 설정
|
||||
const requestHeaders: any = { ...headers };
|
||||
|
||||
// 인증 헤더 추가
|
||||
if (authentication) {
|
||||
if (authentication.type === "bearer" && authentication.token) {
|
||||
requestHeaders["Authorization"] = `Bearer ${authentication.token}`;
|
||||
} else if (
|
||||
authentication.type === "basic" &&
|
||||
authentication.username &&
|
||||
authentication.password
|
||||
) {
|
||||
const credentials = Buffer.from(
|
||||
`${authentication.username}:${authentication.password}`
|
||||
).toString("base64");
|
||||
requestHeaders["Authorization"] = `Basic ${credentials}`;
|
||||
} else if (authentication.type === "apikey" && authentication.token) {
|
||||
const headerName = authentication.apiKeyHeader || "X-API-Key";
|
||||
requestHeaders[headerName] = authentication.token;
|
||||
}
|
||||
}
|
||||
|
||||
if (!requestHeaders["Content-Type"]) {
|
||||
requestHeaders["Content-Type"] = "application/json";
|
||||
}
|
||||
|
||||
// API 호출
|
||||
const response = await axios({
|
||||
method: method.toLowerCase(),
|
||||
url,
|
||||
headers: requestHeaders,
|
||||
data: body,
|
||||
timeout,
|
||||
});
|
||||
|
||||
logger.info(`✅ REST API 응답 수신: ${response.status}`);
|
||||
|
||||
let responseData = response.data;
|
||||
|
||||
// 🔥 표준 API 응답 형식 자동 감지 { success, message, data }
|
||||
if (
|
||||
!responseMapping &&
|
||||
responseData &&
|
||||
typeof responseData === "object" &&
|
||||
"success" in responseData &&
|
||||
"data" in responseData
|
||||
) {
|
||||
logger.info("🔍 표준 API 응답 형식 감지, data 속성 자동 추출");
|
||||
responseData = responseData.data;
|
||||
}
|
||||
|
||||
// responseMapping이 있으면 해당 경로의 데이터 추출
|
||||
if (responseMapping && responseData) {
|
||||
logger.info(`🔍 응답 매핑 적용: ${responseMapping}`);
|
||||
const path = responseMapping.split(".");
|
||||
for (const key of path) {
|
||||
if (
|
||||
responseData &&
|
||||
typeof responseData === "object" &&
|
||||
key in responseData
|
||||
) {
|
||||
responseData = responseData[key];
|
||||
} else {
|
||||
logger.warn(
|
||||
`⚠️ 응답 매핑 경로를 찾을 수 없습니다: ${responseMapping}`
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 배열이 아니면 배열로 변환
|
||||
if (!Array.isArray(responseData)) {
|
||||
logger.info("🔄 단일 객체를 배열로 변환");
|
||||
responseData = [responseData];
|
||||
}
|
||||
|
||||
logger.info(`📦 REST API 데이터 ${responseData.length}건 반환`);
|
||||
|
||||
// 첫 번째 데이터 샘플 상세 로깅
|
||||
if (responseData.length > 0) {
|
||||
console.log("🔍 REST API 응답 데이터 샘플 (첫 번째 항목):");
|
||||
console.log(JSON.stringify(responseData[0], null, 2));
|
||||
console.log("🔑 사용 가능한 필드명:", Object.keys(responseData[0]));
|
||||
}
|
||||
|
||||
return responseData;
|
||||
} catch (error: any) {
|
||||
logger.error(`❌ REST API 호출 실패:`, error.message);
|
||||
throw new Error(`REST API 호출 실패: ${error.message}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 테이블 소스 노드 실행
|
||||
*/
|
||||
@@ -521,6 +642,19 @@ export class NodeFlowExecutionService {
|
||||
): Promise<any> {
|
||||
const { targetTable, fieldMappings } = node.data;
|
||||
|
||||
logger.info(`💾 INSERT 노드 실행: ${targetTable}`);
|
||||
console.log(
|
||||
"📥 입력 데이터 타입:",
|
||||
typeof inputData,
|
||||
Array.isArray(inputData) ? `배열(${inputData.length}건)` : "단일 객체"
|
||||
);
|
||||
|
||||
if (inputData && inputData.length > 0) {
|
||||
console.log("📄 첫 번째 입력 데이터:");
|
||||
console.log(JSON.stringify(inputData[0], null, 2));
|
||||
console.log("🔑 입력 데이터 필드명:", Object.keys(inputData[0]));
|
||||
}
|
||||
|
||||
return transaction(async (client) => {
|
||||
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
|
||||
let insertedCount = 0;
|
||||
@@ -529,12 +663,17 @@ export class NodeFlowExecutionService {
|
||||
const fields: string[] = [];
|
||||
const values: any[] = [];
|
||||
|
||||
console.log("🗺️ 필드 매핑 처리 중...");
|
||||
fieldMappings.forEach((mapping: any) => {
|
||||
fields.push(mapping.targetField);
|
||||
const value =
|
||||
mapping.staticValue !== undefined
|
||||
? mapping.staticValue
|
||||
: data[mapping.sourceField];
|
||||
|
||||
console.log(
|
||||
` ${mapping.sourceField} → ${mapping.targetField}: ${value === undefined ? "❌ undefined" : "✅ " + value}`
|
||||
);
|
||||
values.push(value);
|
||||
});
|
||||
|
||||
@@ -543,6 +682,9 @@ export class NodeFlowExecutionService {
|
||||
VALUES (${fields.map((_, i) => `$${i + 1}`).join(", ")})
|
||||
`;
|
||||
|
||||
console.log("📝 실행할 SQL:", sql);
|
||||
console.log("📊 바인딩 값:", values);
|
||||
|
||||
await client.query(sql, values);
|
||||
insertedCount++;
|
||||
}
|
||||
@@ -682,7 +824,6 @@ export class NodeFlowExecutionService {
|
||||
|
||||
logger.info(`🌐 REST API INSERT 시작: ${apiMethod} ${apiEndpoint}`);
|
||||
|
||||
const axios = require("axios");
|
||||
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
|
||||
const results: any[] = [];
|
||||
|
||||
@@ -895,6 +1036,19 @@ export class NodeFlowExecutionService {
|
||||
): Promise<any> {
|
||||
const { targetTable, fieldMappings, whereConditions } = node.data;
|
||||
|
||||
logger.info(`🔄 UPDATE 노드 실행: ${targetTable}`);
|
||||
console.log(
|
||||
"📥 입력 데이터 타입:",
|
||||
typeof inputData,
|
||||
Array.isArray(inputData) ? `배열(${inputData.length}건)` : "단일 객체"
|
||||
);
|
||||
|
||||
if (inputData && inputData.length > 0) {
|
||||
console.log("📄 첫 번째 입력 데이터:");
|
||||
console.log(JSON.stringify(inputData[0], null, 2));
|
||||
console.log("🔑 입력 데이터 필드명:", Object.keys(inputData[0]));
|
||||
}
|
||||
|
||||
return transaction(async (client) => {
|
||||
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
|
||||
let updatedCount = 0;
|
||||
@@ -904,11 +1058,16 @@ export class NodeFlowExecutionService {
|
||||
const values: any[] = [];
|
||||
let paramIndex = 1;
|
||||
|
||||
console.log("🗺️ 필드 매핑 처리 중...");
|
||||
fieldMappings.forEach((mapping: any) => {
|
||||
const value =
|
||||
mapping.staticValue !== undefined
|
||||
? mapping.staticValue
|
||||
: data[mapping.sourceField];
|
||||
|
||||
console.log(
|
||||
` ${mapping.sourceField} → ${mapping.targetField}: ${value === undefined ? "❌ undefined" : "✅ " + value}`
|
||||
);
|
||||
setClauses.push(`${mapping.targetField} = $${paramIndex}`);
|
||||
values.push(value);
|
||||
paramIndex++;
|
||||
@@ -926,6 +1085,9 @@ export class NodeFlowExecutionService {
|
||||
${whereClause}
|
||||
`;
|
||||
|
||||
console.log("📝 실행할 SQL:", sql);
|
||||
console.log("📊 바인딩 값:", values);
|
||||
|
||||
const result = await client.query(sql, values);
|
||||
updatedCount += result.rowCount || 0;
|
||||
}
|
||||
@@ -1086,7 +1248,6 @@ export class NodeFlowExecutionService {
|
||||
|
||||
logger.info(`🌐 REST API UPDATE 시작: ${apiMethod} ${apiEndpoint}`);
|
||||
|
||||
const axios = require("axios");
|
||||
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
|
||||
const results: any[] = [];
|
||||
|
||||
@@ -1197,15 +1358,31 @@ export class NodeFlowExecutionService {
|
||||
): Promise<any> {
|
||||
const { targetTable, whereConditions } = node.data;
|
||||
|
||||
logger.info(`🗑️ DELETE 노드 실행: ${targetTable}`);
|
||||
console.log(
|
||||
"📥 입력 데이터 타입:",
|
||||
typeof inputData,
|
||||
Array.isArray(inputData) ? `배열(${inputData.length}건)` : "단일 객체"
|
||||
);
|
||||
|
||||
if (inputData && inputData.length > 0) {
|
||||
console.log("📄 첫 번째 입력 데이터:");
|
||||
console.log(JSON.stringify(inputData[0], null, 2));
|
||||
console.log("🔑 입력 데이터 필드명:", Object.keys(inputData[0]));
|
||||
}
|
||||
|
||||
return transaction(async (client) => {
|
||||
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
|
||||
let deletedCount = 0;
|
||||
|
||||
for (const data of dataArray) {
|
||||
console.log("🔍 WHERE 조건 처리 중...");
|
||||
const whereClause = this.buildWhereClause(whereConditions, data, 1);
|
||||
|
||||
const sql = `DELETE FROM ${targetTable} ${whereClause}`;
|
||||
|
||||
console.log("📝 실행할 SQL:", sql);
|
||||
|
||||
const result = await client.query(sql, []);
|
||||
deletedCount += result.rowCount || 0;
|
||||
}
|
||||
@@ -1339,7 +1516,6 @@ export class NodeFlowExecutionService {
|
||||
|
||||
logger.info(`🌐 REST API DELETE 시작: ${apiEndpoint}`);
|
||||
|
||||
const axios = require("axios");
|
||||
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
|
||||
const results: any[] = [];
|
||||
|
||||
@@ -1440,6 +1616,20 @@ export class NodeFlowExecutionService {
|
||||
throw new Error("UPSERT 액션에 충돌 키(Conflict Keys)가 필요합니다.");
|
||||
}
|
||||
|
||||
logger.info(`🔀 UPSERT 노드 실행: ${targetTable}`);
|
||||
console.log(
|
||||
"📥 입력 데이터 타입:",
|
||||
typeof inputData,
|
||||
Array.isArray(inputData) ? `배열(${inputData.length}건)` : "단일 객체"
|
||||
);
|
||||
|
||||
if (inputData && inputData.length > 0) {
|
||||
console.log("📄 첫 번째 입력 데이터:");
|
||||
console.log(JSON.stringify(inputData[0], null, 2));
|
||||
console.log("🔑 입력 데이터 필드명:", Object.keys(inputData[0]));
|
||||
}
|
||||
console.log("🔑 충돌 키:", conflictKeys);
|
||||
|
||||
return transaction(async (client) => {
|
||||
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
|
||||
let insertedCount = 0;
|
||||
@@ -1466,7 +1656,10 @@ export class NodeFlowExecutionService {
|
||||
(key: string) => conflictKeyValues[key]
|
||||
);
|
||||
|
||||
const checkSql = `SELECT id FROM ${targetTable} WHERE ${whereConditions} LIMIT 1`;
|
||||
console.log("🔍 존재 여부 확인 - WHERE 조건:", whereConditions);
|
||||
console.log("🔍 존재 여부 확인 - 바인딩 값:", whereValues);
|
||||
|
||||
const checkSql = `SELECT 1 FROM ${targetTable} WHERE ${whereConditions} LIMIT 1`;
|
||||
const existingRow = await client.query(checkSql, whereValues);
|
||||
|
||||
if (existingRow.rows.length > 0) {
|
||||
@@ -1780,7 +1973,6 @@ export class NodeFlowExecutionService {
|
||||
|
||||
logger.info(`🌐 REST API UPSERT 시작: ${apiMethod} ${apiEndpoint}`);
|
||||
|
||||
const axios = require("axios");
|
||||
const dataArray = Array.isArray(inputData) ? inputData : [inputData];
|
||||
const results: any[] = [];
|
||||
|
||||
@@ -1977,6 +2169,20 @@ export class NodeFlowExecutionService {
|
||||
|
||||
const success = summary.failed === 0;
|
||||
|
||||
// 실패한 노드 상세 로깅
|
||||
if (!success) {
|
||||
const failedNodes = nodeSummaries.filter((n) => n.status === "failed");
|
||||
logger.error(
|
||||
`❌ 실패한 노드들:`,
|
||||
failedNodes.map((n) => ({
|
||||
nodeId: n.nodeId,
|
||||
nodeName: n.nodeName,
|
||||
nodeType: n.nodeType,
|
||||
error: n.error,
|
||||
}))
|
||||
);
|
||||
}
|
||||
|
||||
return {
|
||||
success,
|
||||
message: success
|
||||
|
||||
Reference in New Issue
Block a user