제어 집계함수 노드 추가
This commit is contained in:
@@ -528,6 +528,9 @@ export class NodeFlowExecutionService {
|
||||
case "dataTransform":
|
||||
return this.executeDataTransform(node, inputData, context);
|
||||
|
||||
case "aggregate":
|
||||
return this.executeAggregate(node, inputData, context);
|
||||
|
||||
case "insertAction":
|
||||
return this.executeInsertAction(node, inputData, context, client);
|
||||
|
||||
@@ -3197,4 +3200,161 @@ export class NodeFlowExecutionService {
|
||||
"upsertAction",
|
||||
].includes(nodeType);
|
||||
}
|
||||
|
||||
/**
|
||||
* 집계 노드 실행 (SUM, COUNT, AVG, MIN, MAX 등)
|
||||
*/
|
||||
private static async executeAggregate(
|
||||
node: FlowNode,
|
||||
inputData: any,
|
||||
context: ExecutionContext
|
||||
): Promise<any[]> {
|
||||
const { groupByFields = [], aggregations = [], havingConditions = [] } = node.data;
|
||||
|
||||
logger.info(`📊 집계 노드 실행: ${node.data.displayName || node.id}`);
|
||||
|
||||
// 입력 데이터가 없으면 빈 배열 반환
|
||||
if (!inputData || !Array.isArray(inputData) || inputData.length === 0) {
|
||||
logger.warn("⚠️ 집계할 입력 데이터가 없습니다.");
|
||||
return [];
|
||||
}
|
||||
|
||||
logger.info(`📥 입력 데이터: ${inputData.length}건`);
|
||||
logger.info(`📊 그룹 기준: ${groupByFields.length > 0 ? groupByFields.map((f: any) => f.field).join(", ") : "전체"}`);
|
||||
logger.info(`📊 집계 연산: ${aggregations.length}개`);
|
||||
|
||||
// 그룹화 수행
|
||||
const groups = new Map<string, any[]>();
|
||||
|
||||
for (const row of inputData) {
|
||||
// 그룹 키 생성
|
||||
const groupKey = groupByFields.length > 0
|
||||
? groupByFields.map((f: any) => String(row[f.field] ?? "")).join("|||")
|
||||
: "__ALL__";
|
||||
|
||||
if (!groups.has(groupKey)) {
|
||||
groups.set(groupKey, []);
|
||||
}
|
||||
groups.get(groupKey)!.push(row);
|
||||
}
|
||||
|
||||
logger.info(`📊 그룹 수: ${groups.size}개`);
|
||||
|
||||
// 각 그룹에 대해 집계 수행
|
||||
const results: any[] = [];
|
||||
|
||||
for (const [groupKey, groupRows] of groups) {
|
||||
const resultRow: any = {};
|
||||
|
||||
// 그룹 기준 필드값 추가
|
||||
if (groupByFields.length > 0) {
|
||||
const keyValues = groupKey.split("|||");
|
||||
groupByFields.forEach((field: any, idx: number) => {
|
||||
resultRow[field.field] = keyValues[idx];
|
||||
});
|
||||
}
|
||||
|
||||
// 각 집계 연산 수행
|
||||
for (const agg of aggregations) {
|
||||
const { sourceField, function: aggFunc, outputField } = agg;
|
||||
|
||||
if (!outputField) continue;
|
||||
|
||||
let aggregatedValue: any;
|
||||
|
||||
switch (aggFunc) {
|
||||
case "SUM":
|
||||
aggregatedValue = groupRows.reduce((sum: number, row: any) => {
|
||||
const val = parseFloat(row[sourceField]);
|
||||
return sum + (isNaN(val) ? 0 : val);
|
||||
}, 0);
|
||||
break;
|
||||
|
||||
case "COUNT":
|
||||
aggregatedValue = groupRows.length;
|
||||
break;
|
||||
|
||||
case "AVG":
|
||||
const sum = groupRows.reduce((acc: number, row: any) => {
|
||||
const val = parseFloat(row[sourceField]);
|
||||
return acc + (isNaN(val) ? 0 : val);
|
||||
}, 0);
|
||||
aggregatedValue = groupRows.length > 0 ? sum / groupRows.length : 0;
|
||||
break;
|
||||
|
||||
case "MIN":
|
||||
aggregatedValue = groupRows.reduce((min: number | null, row: any) => {
|
||||
const val = parseFloat(row[sourceField]);
|
||||
if (isNaN(val)) return min;
|
||||
return min === null ? val : Math.min(min, val);
|
||||
}, null);
|
||||
break;
|
||||
|
||||
case "MAX":
|
||||
aggregatedValue = groupRows.reduce((max: number | null, row: any) => {
|
||||
const val = parseFloat(row[sourceField]);
|
||||
if (isNaN(val)) return max;
|
||||
return max === null ? val : Math.max(max, val);
|
||||
}, null);
|
||||
break;
|
||||
|
||||
case "FIRST":
|
||||
aggregatedValue = groupRows.length > 0 ? groupRows[0][sourceField] : null;
|
||||
break;
|
||||
|
||||
case "LAST":
|
||||
aggregatedValue = groupRows.length > 0 ? groupRows[groupRows.length - 1][sourceField] : null;
|
||||
break;
|
||||
|
||||
default:
|
||||
logger.warn(`⚠️ 지원하지 않는 집계 함수: ${aggFunc}`);
|
||||
aggregatedValue = null;
|
||||
}
|
||||
|
||||
resultRow[outputField] = aggregatedValue;
|
||||
logger.info(` ${aggFunc}(${sourceField}) → ${outputField}: ${aggregatedValue}`);
|
||||
}
|
||||
|
||||
results.push(resultRow);
|
||||
}
|
||||
|
||||
// HAVING 조건 적용 (집계 후 필터링)
|
||||
let filteredResults = results;
|
||||
if (havingConditions && havingConditions.length > 0) {
|
||||
filteredResults = results.filter((row) => {
|
||||
return havingConditions.every((condition: any) => {
|
||||
const fieldValue = row[condition.field];
|
||||
const compareValue = parseFloat(condition.value);
|
||||
|
||||
switch (condition.operator) {
|
||||
case "=":
|
||||
return fieldValue === compareValue;
|
||||
case "!=":
|
||||
return fieldValue !== compareValue;
|
||||
case ">":
|
||||
return fieldValue > compareValue;
|
||||
case ">=":
|
||||
return fieldValue >= compareValue;
|
||||
case "<":
|
||||
return fieldValue < compareValue;
|
||||
case "<=":
|
||||
return fieldValue <= compareValue;
|
||||
default:
|
||||
return true;
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
logger.info(`📊 HAVING 필터링: ${results.length}건 → ${filteredResults.length}건`);
|
||||
}
|
||||
|
||||
logger.info(`✅ 집계 완료: ${filteredResults.length}건 결과`);
|
||||
|
||||
// 결과 샘플 출력
|
||||
if (filteredResults.length > 0) {
|
||||
logger.info(`📄 결과 샘플:`, JSON.stringify(filteredResults[0], null, 2));
|
||||
}
|
||||
|
||||
return filteredResults;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user