feat: 노드 기반 데이터 플로우 시스템 구현

- 노드 에디터 UI 구현 (React Flow 기반)
  - TableSource, DataTransform, INSERT, UPDATE, DELETE, UPSERT 노드
  - 드래그앤드롭 노드 추가 및 연결
  - 속성 패널을 통한 노드 설정
  - 실시간 필드 라벨 표시 (column_labels 테이블 연동)

- 데이터 변환 노드 (DataTransform) 기능
  - EXPLODE: 구분자로 1개 행 → 여러 행 확장
  - UPPERCASE, LOWERCASE, TRIM, CONCAT, SPLIT, REPLACE 등 12가지 변환 타입
  - In-place 변환 지원 (타겟 필드 생략 시 소스 필드 덮어쓰기)
  - 변환된 필드가 하위 액션 노드에 자동 전달

- 노드 플로우 실행 엔진
  - 위상 정렬을 통한 노드 실행 순서 결정
  - 레벨별 병렬 실행 (Promise.allSettled)
  - 부분 실패 허용 (한 노드 실패 시 연결된 하위 노드만 스킵)
  - 트랜잭션 기반 안전한 데이터 처리

- UPSERT 액션 로직 구현
  - DB 제약 조건 없이 SELECT → UPDATE or INSERT 방식
  - 복합 충돌 키 지원 (예: sales_no + product_name)
  - 파라미터 인덱스 정확한 매핑

- 데이터 소스 자동 감지
  - 테이블 선택 데이터 (selectedRowsData) 자동 주입
  - 폼 입력 데이터 (formData) 자동 주입
  - TableSource 노드가 외부 데이터 우선 사용

- 버튼 컴포넌트 통합
  - 기존 관계 실행 + 새 노드 플로우 실행 하이브리드 지원
  - 노드 플로우 선택 UI 추가
  - API 클라이언트 통합 (Axios)

- 개발 문서 작성
  - 노드 기반 제어 시스템 개선 계획
  - 노드 연결 규칙 설계
  - 노드 실행 엔진 설계
  - 노드 구조 개선안
  - 버튼 통합 분석
This commit is contained in:
kjs
2025-10-02 16:22:29 +09:00
parent db25b0435f
commit 0743786f9b
51 changed files with 13838 additions and 131 deletions

View File

@@ -0,0 +1,237 @@
/**
* 노드 기반 데이터 플로우 API
*/
import { Router, Request, Response } from "express";
import { query, queryOne } from "../../database/db";
import { logger } from "../../utils/logger";
import { NodeFlowExecutionService } from "../../services/nodeFlowExecutionService";
const router = Router();
/**
* 플로우 목록 조회
*/
router.get("/", async (req: Request, res: Response) => {
try {
const flows = await query(
`
SELECT
flow_id as "flowId",
flow_name as "flowName",
flow_description as "flowDescription",
created_at as "createdAt",
updated_at as "updatedAt"
FROM node_flows
ORDER BY updated_at DESC
`,
[]
);
return res.json({
success: true,
data: flows,
});
} catch (error) {
logger.error("플로우 목록 조회 실패:", error);
return res.status(500).json({
success: false,
message: "플로우 목록을 조회하지 못했습니다.",
});
}
});
/**
* 플로우 상세 조회
*/
router.get("/:flowId", async (req: Request, res: Response) => {
try {
const { flowId } = req.params;
const flow = await queryOne(
`
SELECT
flow_id as "flowId",
flow_name as "flowName",
flow_description as "flowDescription",
flow_data as "flowData",
created_at as "createdAt",
updated_at as "updatedAt"
FROM node_flows
WHERE flow_id = $1
`,
[flowId]
);
if (!flow) {
return res.status(404).json({
success: false,
message: "플로우를 찾을 수 없습니다.",
});
}
return res.json({
success: true,
data: flow,
});
} catch (error) {
logger.error("플로우 조회 실패:", error);
return res.status(500).json({
success: false,
message: "플로우를 조회하지 못했습니다.",
});
}
});
/**
* 플로우 저장 (신규)
*/
router.post("/", async (req: Request, res: Response) => {
try {
const { flowName, flowDescription, flowData } = req.body;
if (!flowName || !flowData) {
return res.status(400).json({
success: false,
message: "플로우 이름과 데이터는 필수입니다.",
});
}
const result = await queryOne(
`
INSERT INTO node_flows (flow_name, flow_description, flow_data)
VALUES ($1, $2, $3)
RETURNING flow_id as "flowId"
`,
[flowName, flowDescription || "", flowData]
);
logger.info(`플로우 저장 성공: ${result.flowId}`);
return res.json({
success: true,
message: "플로우가 저장되었습니다.",
data: {
flowId: result.flowId,
},
});
} catch (error) {
logger.error("플로우 저장 실패:", error);
return res.status(500).json({
success: false,
message: "플로우를 저장하지 못했습니다.",
});
}
});
/**
* 플로우 수정
*/
router.put("/", async (req: Request, res: Response) => {
try {
const { flowId, flowName, flowDescription, flowData } = req.body;
if (!flowId || !flowName || !flowData) {
return res.status(400).json({
success: false,
message: "플로우 ID, 이름, 데이터는 필수입니다.",
});
}
await query(
`
UPDATE node_flows
SET flow_name = $1,
flow_description = $2,
flow_data = $3,
updated_at = NOW()
WHERE flow_id = $4
`,
[flowName, flowDescription || "", flowData, flowId]
);
logger.info(`플로우 수정 성공: ${flowId}`);
return res.json({
success: true,
message: "플로우가 수정되었습니다.",
data: {
flowId,
},
});
} catch (error) {
logger.error("플로우 수정 실패:", error);
return res.status(500).json({
success: false,
message: "플로우를 수정하지 못했습니다.",
});
}
});
/**
* 플로우 삭제
*/
router.delete("/:flowId", async (req: Request, res: Response) => {
try {
const { flowId } = req.params;
await query(
`
DELETE FROM node_flows
WHERE flow_id = $1
`,
[flowId]
);
logger.info(`플로우 삭제 성공: ${flowId}`);
return res.json({
success: true,
message: "플로우가 삭제되었습니다.",
});
} catch (error) {
logger.error("플로우 삭제 실패:", error);
return res.status(500).json({
success: false,
message: "플로우를 삭제하지 못했습니다.",
});
}
});
/**
* 플로우 실행
* POST /api/dataflow/node-flows/:flowId/execute
*/
router.post("/:flowId/execute", async (req: Request, res: Response) => {
try {
const { flowId } = req.params;
const contextData = req.body;
logger.info(`🚀 플로우 실행 요청: flowId=${flowId}`, {
contextDataKeys: Object.keys(contextData),
});
// 플로우 실행
const result = await NodeFlowExecutionService.executeFlow(
parseInt(flowId, 10),
contextData
);
return res.json({
success: result.success,
message: result.message,
data: result,
});
} catch (error) {
logger.error("플로우 실행 실패:", error);
return res.status(500).json({
success: false,
message:
error instanceof Error
? error.message
: "플로우 실행 중 오류가 발생했습니다.",
});
}
});
export default router;

View File

@@ -21,6 +21,7 @@ import {
testConditionalConnection,
executeConditionalActions,
} from "../controllers/conditionalConnectionController";
import nodeFlowsRouter from "./dataflow/node-flows";
const router = express.Router();
@@ -146,4 +147,10 @@ router.post("/diagrams/:diagramId/test-conditions", testConditionalConnection);
*/
router.post("/diagrams/:diagramId/execute-actions", executeConditionalActions);
/**
* 노드 기반 플로우 관리
* /api/dataflow/node-flows/*
*/
router.use("/node-flows", nodeFlowsRouter);
export default router;

File diff suppressed because it is too large Load Diff