Files
vexplor/docs/노드플로우_개선사항.md

592 lines
17 KiB
Markdown

# 노드 플로우 기능 개선 사항
> 작성일: 2024-12-08
> 상태: 분석 완료, 개선 대기
## 현재 구현 상태
### 잘 구현된 기능
| 기능 | 상태 | 설명 |
|------|------|------|
| 위상 정렬 실행 | 완료 | DAG 기반 레벨별 실행 |
| 트랜잭션 관리 | 완료 | 전체 플로우 단일 트랜잭션, 실패 시 자동 롤백 |
| 병렬 실행 | 완료 | 같은 레벨 노드 `Promise.allSettled`로 병렬 처리 |
| CRUD 액션 | 완료 | INSERT, UPDATE, DELETE, UPSERT 지원 |
| 외부 DB 연동 | 완료 | PostgreSQL, MySQL, MSSQL, Oracle 지원 |
| REST API 연동 | 완료 | GET, POST, PUT, DELETE 지원 |
| 조건 분기 | 완료 | 다양한 연산자 지원 |
| 데이터 변환 | 부분 완료 | UPPERCASE, TRIM, EXPLODE 등 기본 변환 |
| 집계 함수 | 완료 | SUM, COUNT, AVG, MIN, MAX, FIRST, LAST |
### 관련 파일
- **백엔드 실행 엔진**: `backend-node/src/services/nodeFlowExecutionService.ts`
- **백엔드 라우트**: `backend-node/src/routes/dataflow/node-flows.ts`
- **프론트엔드 API**: `frontend/lib/api/nodeFlows.ts`
- **프론트엔드 에디터**: `frontend/components/dataflow/node-editor/FlowEditor.tsx`
- **타입 정의**: `backend-node/src/types/flow.ts`
---
## 개선 필요 사항
### 1. [우선순위 높음] 실행 이력 로깅
**현재 상태**: 플로우 실행 이력이 저장되지 않음
**문제점**:
- 언제, 누가, 어떤 플로우를 실행했는지 추적 불가
- 실패 원인 분석 어려움
- 감사(Audit) 요구사항 충족 불가
**개선 방안**:
```sql
-- db/migrations/XXX_add_node_flow_execution_log.sql
CREATE TABLE node_flow_execution_log (
id SERIAL PRIMARY KEY,
flow_id INTEGER NOT NULL REFERENCES node_flows(flow_id) ON DELETE CASCADE,
execution_status VARCHAR(20) NOT NULL, -- 'success', 'failed', 'partial'
execution_time_ms INTEGER,
total_nodes INTEGER,
success_nodes INTEGER,
failed_nodes INTEGER,
skipped_nodes INTEGER,
executed_by VARCHAR(50),
company_code VARCHAR(20),
context_data JSONB,
result_summary JSONB,
error_message TEXT,
created_at TIMESTAMP DEFAULT NOW()
);
CREATE INDEX idx_flow_execution_log_flow_id ON node_flow_execution_log(flow_id);
CREATE INDEX idx_flow_execution_log_created_at ON node_flow_execution_log(created_at DESC);
CREATE INDEX idx_flow_execution_log_company_code ON node_flow_execution_log(company_code);
```
**필요 작업**:
- [ ] 마이그레이션 파일 생성
- [ ] `nodeFlowExecutionService.ts`에 로그 저장 로직 추가
- [ ] 실행 이력 조회 API 추가 (`GET /api/dataflow/node-flows/:flowId/executions`)
- [ ] 프론트엔드 실행 이력 UI 추가
---
### 2. [우선순위 높음] 드라이런(Dry Run) 모드
**현재 상태**: 실제 데이터를 변경하지 않고 테스트할 방법 없음
**문제점**:
- 프로덕션 데이터에 직접 영향
- 플로우 디버깅 어려움
- 신규 플로우 검증 불가
**개선 방안**:
```typescript
// nodeFlowExecutionService.ts
static async executeFlow(
flowId: number,
contextData: Record<string, any>,
options: { dryRun?: boolean } = {}
): Promise<ExecutionResult> {
if (options.dryRun) {
// 트랜잭션 시작 후 항상 롤백
return transaction(async (client) => {
const result = await this.executeFlowInternal(flowId, contextData, client);
// 롤백을 위해 의도적으로 에러 발생
throw new DryRunComplete(result);
}).catch((e) => {
if (e instanceof DryRunComplete) {
return { ...e.result, dryRun: true };
}
throw e;
});
}
// 기존 로직...
}
```
```typescript
// node-flows.ts 라우트 수정
router.post("/:flowId/execute", async (req, res) => {
const dryRun = req.query.dryRun === 'true';
const result = await NodeFlowExecutionService.executeFlow(
parseInt(flowId, 10),
enrichedContextData,
{ dryRun }
);
// ...
});
```
**필요 작업**:
- [ ] `DryRunComplete` 예외 클래스 생성
- [ ] `executeFlow` 메서드에 `dryRun` 옵션 추가
- [ ] 라우트에 쿼리 파라미터 처리 추가
- [ ] 프론트엔드 "테스트 실행" 버튼 추가
---
### 3. [우선순위 높음] 재시도 메커니즘
**현재 상태**: 외부 API/DB 호출 실패 시 재시도 없음
**문제점**:
- 일시적 네트워크 오류로 전체 플로우 실패
- 외부 서비스 불안정 시 신뢰성 저하
**개선 방안**:
```typescript
// utils/retry.ts
export async function withRetry<T>(
fn: () => Promise<T>,
options: {
maxRetries?: number;
delay?: number;
backoffMultiplier?: number;
retryOn?: (error: any) => boolean;
} = {}
): Promise<T> {
const {
maxRetries = 3,
delay = 1000,
backoffMultiplier = 2,
retryOn = () => true
} = options;
for (let attempt = 0; attempt < maxRetries; attempt++) {
try {
return await fn();
} catch (error) {
if (attempt === maxRetries - 1 || !retryOn(error)) {
throw error;
}
const waitTime = delay * Math.pow(backoffMultiplier, attempt);
logger.warn(`재시도 ${attempt + 1}/${maxRetries}, ${waitTime}ms 후...`);
await new Promise(r => setTimeout(r, waitTime));
}
}
throw new Error('재시도 횟수 초과');
}
```
```typescript
// nodeFlowExecutionService.ts에서 사용
const response = await withRetry(
() => axios({ method, url, headers, data, timeout }),
{
maxRetries: 3,
delay: 1000,
retryOn: (err) => err.code === 'ECONNRESET' || err.response?.status >= 500
}
);
```
**필요 작업**:
- [ ] `withRetry` 유틸리티 함수 생성
- [ ] REST API 호출 부분에 재시도 로직 적용
- [ ] 외부 DB 연결 부분에 재시도 로직 적용
- [ ] 노드별 재시도 설정 UI 추가 (선택사항)
---
### 4. [우선순위 높음] 미완성 데이터 변환 함수
**현재 상태**: FORMAT, CALCULATE, JSON_EXTRACT, CUSTOM 변환이 미구현
**문제점**:
- 날짜/숫자 포맷팅 불가
- 계산식 처리 불가
- JSON 데이터 파싱 불가
**개선 방안**:
```typescript
// nodeFlowExecutionService.ts - applyTransformation 메서드 수정
case "FORMAT":
return rows.map((row) => {
const value = row[sourceField];
let formatted = value;
if (transform.formatType === 'date') {
// dayjs 사용
formatted = dayjs(value).format(transform.formatPattern || 'YYYY-MM-DD');
} else if (transform.formatType === 'number') {
// 숫자 포맷팅
const num = parseFloat(value);
if (transform.formatPattern === 'currency') {
formatted = num.toLocaleString('ko-KR', { style: 'currency', currency: 'KRW' });
} else if (transform.formatPattern === 'percent') {
formatted = (num * 100).toFixed(transform.decimals || 0) + '%';
} else {
formatted = num.toLocaleString('ko-KR', { maximumFractionDigits: transform.decimals || 2 });
}
}
return { ...row, [actualTargetField]: formatted };
});
case "CALCULATE":
return rows.map((row) => {
// 간단한 수식 평가 (보안 주의!)
const expression = transform.expression; // 예: "price * quantity"
const result = evaluateExpression(expression, row);
return { ...row, [actualTargetField]: result };
});
case "JSON_EXTRACT":
return rows.map((row) => {
const jsonValue = typeof row[sourceField] === 'string'
? JSON.parse(row[sourceField])
: row[sourceField];
const extracted = jsonPath.query(jsonValue, transform.jsonPath); // JSONPath 라이브러리 사용
return { ...row, [actualTargetField]: extracted[0] || null };
});
```
**필요 작업**:
- [ ] `dayjs` 라이브러리 추가 (날짜 포맷팅)
- [ ] `jsonpath` 라이브러리 추가 (JSON 추출)
- [ ] 안전한 수식 평가 함수 구현 (eval 대신)
- [ ] 각 변환 타입별 UI 설정 패널 추가
---
### 5. [우선순위 중간] 플로우 버전 관리
**현재 상태**: 플로우 수정 시 이전 버전 덮어씀
**문제점**:
- 실수로 수정한 플로우 복구 불가
- 변경 이력 추적 불가
**개선 방안**:
```sql
-- db/migrations/XXX_add_node_flow_versions.sql
CREATE TABLE node_flow_versions (
id SERIAL PRIMARY KEY,
flow_id INTEGER NOT NULL REFERENCES node_flows(flow_id) ON DELETE CASCADE,
version INTEGER NOT NULL,
flow_data JSONB NOT NULL,
change_description TEXT,
created_by VARCHAR(50),
created_at TIMESTAMP DEFAULT NOW(),
UNIQUE(flow_id, version)
);
CREATE INDEX idx_flow_versions_flow_id ON node_flow_versions(flow_id);
```
```typescript
// 플로우 수정 시 버전 저장
async function updateNodeFlow(flowId, flowData, changeDescription, userId) {
// 현재 버전 조회
const currentVersion = await queryOne(
'SELECT COALESCE(MAX(version), 0) as max_version FROM node_flow_versions WHERE flow_id = $1',
[flowId]
);
// 새 버전 저장
await query(
'INSERT INTO node_flow_versions (flow_id, version, flow_data, change_description, created_by) VALUES ($1, $2, $3, $4, $5)',
[flowId, currentVersion.max_version + 1, flowData, changeDescription, userId]
);
// 기존 업데이트 로직...
}
```
**필요 작업**:
- [ ] 버전 테이블 마이그레이션 생성
- [ ] 플로우 수정 시 버전 자동 저장
- [ ] 버전 목록 조회 API (`GET /api/dataflow/node-flows/:flowId/versions`)
- [ ] 특정 버전으로 롤백 API (`POST /api/dataflow/node-flows/:flowId/rollback/:version`)
- [ ] 프론트엔드 버전 히스토리 UI
---
### 6. [우선순위 중간] 복합 조건 지원
**현재 상태**: 조건 노드에서 단일 조건만 지원
**문제점**:
- 복잡한 비즈니스 로직 표현 불가
- 여러 조건을 AND/OR로 조합 불가
**개선 방안**:
```typescript
// 복합 조건 타입 정의
interface ConditionGroup {
type: 'AND' | 'OR';
conditions: (Condition | ConditionGroup)[];
}
interface Condition {
field: string;
operator: string;
value: any;
}
// 조건 평가 함수 수정
function evaluateConditionGroup(group: ConditionGroup, data: any): boolean {
const results = group.conditions.map(condition => {
if ('type' in condition) {
// 중첩된 그룹
return evaluateConditionGroup(condition, data);
} else {
// 단일 조건
return evaluateCondition(data[condition.field], condition.operator, condition.value);
}
});
return group.type === 'AND'
? results.every(r => r)
: results.some(r => r);
}
```
**필요 작업**:
- [ ] 복합 조건 타입 정의
- [ ] `evaluateConditionGroup` 함수 구현
- [ ] 조건 노드 속성 패널 UI 수정 (AND/OR 그룹 빌더)
---
### 7. [우선순위 중간] 비동기 실행
**현재 상태**: 동기 실행만 가능 (HTTP 요청 타임아웃 제한)
**문제점**:
- 대용량 데이터 처리 시 타임아웃
- 장시간 실행 플로우 처리 불가
**개선 방안**:
```sql
-- 실행 큐 테이블
CREATE TABLE node_flow_execution_queue (
id SERIAL PRIMARY KEY,
flow_id INTEGER NOT NULL REFERENCES node_flows(flow_id),
execution_id UUID NOT NULL UNIQUE DEFAULT gen_random_uuid(),
status VARCHAR(20) NOT NULL DEFAULT 'queued', -- queued, running, completed, failed
context_data JSONB,
callback_url TEXT,
result JSONB,
error_message TEXT,
queued_by VARCHAR(50),
company_code VARCHAR(20),
queued_at TIMESTAMP DEFAULT NOW(),
started_at TIMESTAMP,
completed_at TIMESTAMP
);
```
```typescript
// 비동기 실행 API
router.post("/:flowId/execute-async", async (req, res) => {
const { callbackUrl, contextData } = req.body;
// 큐에 추가
const execution = await queryOne(
`INSERT INTO node_flow_execution_queue (flow_id, context_data, callback_url, queued_by, company_code)
VALUES ($1, $2, $3, $4, $5) RETURNING execution_id`,
[flowId, contextData, callbackUrl, req.user?.userId, req.user?.companyCode]
);
// 백그라운드 워커가 처리
return res.json({
success: true,
executionId: execution.execution_id,
status: 'queued'
});
});
// 상태 조회 API
router.get("/executions/:executionId", async (req, res) => {
const execution = await queryOne(
'SELECT * FROM node_flow_execution_queue WHERE execution_id = $1',
[req.params.executionId]
);
return res.json({ success: true, data: execution });
});
```
**필요 작업**:
- [ ] 실행 큐 테이블 마이그레이션
- [ ] 비동기 실행 API 추가
- [ ] 백그라운드 워커 프로세스 구현 (별도 프로세스 또는 Bull 큐)
- [ ] 웹훅 콜백 기능 구현
- [ ] 프론트엔드 비동기 실행 상태 폴링 UI
---
### 8. [우선순위 낮음] 플로우 스케줄링
**현재 상태**: 수동 실행만 가능
**문제점**:
- 정기적인 배치 작업 자동화 불가
- 특정 시간 예약 실행 불가
**개선 방안**:
```sql
-- 스케줄 테이블
CREATE TABLE node_flow_schedules (
id SERIAL PRIMARY KEY,
flow_id INTEGER NOT NULL REFERENCES node_flows(flow_id) ON DELETE CASCADE,
schedule_name VARCHAR(100),
cron_expression VARCHAR(50) NOT NULL, -- '0 9 * * 1-5' (평일 9시)
context_data JSONB,
is_active BOOLEAN DEFAULT true,
last_run_at TIMESTAMP,
next_run_at TIMESTAMP,
created_by VARCHAR(50),
company_code VARCHAR(20),
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
```
**필요 작업**:
- [ ] 스케줄 테이블 마이그레이션
- [ ] 스케줄 CRUD API
- [ ] node-cron 또는 Bull 스케줄러 통합
- [ ] 스케줄 관리 UI
---
### 9. [우선순위 낮음] 플러그인 아키텍처
**현재 상태**: 새 노드 타입 추가 시 `nodeFlowExecutionService.ts` 직접 수정 필요
**문제점**:
- 코드 복잡도 증가
- 확장성 제한
**개선 방안**:
```typescript
// interfaces/NodeHandler.ts
export interface NodeHandler {
type: string;
execute(node: FlowNode, inputData: any, context: ExecutionContext, client?: any): Promise<any>;
validate?(node: FlowNode): { valid: boolean; errors: string[] };
}
// handlers/InsertActionHandler.ts
export class InsertActionHandler implements NodeHandler {
type = 'insertAction';
async execute(node, inputData, context, client) {
// 기존 executeInsertAction 로직
}
}
// NodeHandlerRegistry.ts
class NodeHandlerRegistry {
private handlers = new Map<string, NodeHandler>();
register(handler: NodeHandler) {
this.handlers.set(handler.type, handler);
}
get(type: string): NodeHandler | undefined {
return this.handlers.get(type);
}
}
// 사용
const registry = new NodeHandlerRegistry();
registry.register(new InsertActionHandler());
registry.register(new UpdateActionHandler());
// ...
// executeNodeByType에서
const handler = registry.get(node.type);
if (handler) {
return handler.execute(node, inputData, context, client);
}
```
**필요 작업**:
- [ ] `NodeHandler` 인터페이스 정의
- [ ] 기존 노드 타입별 핸들러 클래스 분리
- [ ] `NodeHandlerRegistry` 구현
- [ ] 커스텀 노드 핸들러 등록 메커니즘
---
### 10. [우선순위 낮음] 프론트엔드 연동 강화
**현재 상태**: 기본 에디터 구현됨
**개선 필요 항목**:
- [ ] 실행 결과 시각화 (노드별 성공/실패 표시)
- [ ] 실시간 실행 진행률 표시
- [ ] 드라이런 모드 UI
- [ ] 실행 이력 조회 UI
- [ ] 버전 히스토리 UI
- [ ] 노드 검증 결과 표시
---
## 프론트엔드 컴포넌트 CRUD 로직 이전 계획
현재 프론트엔드 컴포넌트에서 직접 CRUD를 수행하는 코드들을 노드 플로우로 이전해야 합니다.
### 이전 대상 컴포넌트
| 컴포넌트 | 파일 위치 | 현재 로직 | 이전 우선순위 |
|----------|----------|----------|--------------|
| SplitPanelLayoutComponent | `frontend/lib/registry/components/split-panel-layout/` | createRecord, updateRecord, deleteRecord | 높음 |
| RepeatScreenModalComponent | `frontend/lib/registry/components/repeat-screen-modal/` | 다중 테이블 INSERT/UPDATE/DELETE | 높음 |
| UniversalFormModalComponent | `frontend/lib/registry/components/universal-form-modal/` | 다중 행 저장 | 높음 |
| SelectedItemsDetailInputComponent | `frontend/lib/registry/components/selected-items-detail-input/` | upsertGroupedRecords | 높음 |
| ButtonPrimaryComponent | `frontend/lib/registry/components/button-primary/` | 상태 변경 POST | 중간 |
| SimpleRepeaterTableComponent | `frontend/lib/registry/components/simple-repeater-table/` | 데이터 저장 POST | 중간 |
### 이전 방식
1. **플로우 생성**: 각 컴포넌트의 저장 로직을 노드 플로우로 구현
2. **프론트엔드 수정**: 직접 API 호출 대신 `executeNodeFlow(flowId, contextData)` 호출
3. **화면 설정에 플로우 연결**: 버튼 액션에 실행할 플로우 ID 설정
```typescript
// 현재 (프론트엔드에서 직접 호출)
const result = await dataApi.createRecord(tableName, data);
// 개선 후 (플로우 실행)
const result = await executeNodeFlow(flowId, {
formData: data,
tableName: tableName,
action: 'create'
});
```
---
## 참고 자료
- 노드 플로우 실행 엔진: `backend-node/src/services/nodeFlowExecutionService.ts`
- 플로우 타입 정의: `backend-node/src/types/flow.ts`
- 프론트엔드 플로우 에디터: `frontend/components/dataflow/node-editor/FlowEditor.tsx`
- 프론트엔드 플로우 API: `frontend/lib/api/nodeFlows.ts`