Files
vexplor/docs/노드플로우_개선사항.md

17 KiB

노드 플로우 기능 개선 사항

작성일: 2024-12-08 상태: 분석 완료, 개선 대기

현재 구현 상태

잘 구현된 기능

기능 상태 설명
위상 정렬 실행 완료 DAG 기반 레벨별 실행
트랜잭션 관리 완료 전체 플로우 단일 트랜잭션, 실패 시 자동 롤백
병렬 실행 완료 같은 레벨 노드 Promise.allSettled로 병렬 처리
CRUD 액션 완료 INSERT, UPDATE, DELETE, UPSERT 지원
외부 DB 연동 완료 PostgreSQL, MySQL, MSSQL, Oracle 지원
REST API 연동 완료 GET, POST, PUT, DELETE 지원
조건 분기 완료 다양한 연산자 지원
데이터 변환 부분 완료 UPPERCASE, TRIM, EXPLODE 등 기본 변환
집계 함수 완료 SUM, COUNT, AVG, MIN, MAX, FIRST, LAST

관련 파일

  • 백엔드 실행 엔진: backend-node/src/services/nodeFlowExecutionService.ts
  • 백엔드 라우트: backend-node/src/routes/dataflow/node-flows.ts
  • 프론트엔드 API: frontend/lib/api/nodeFlows.ts
  • 프론트엔드 에디터: frontend/components/dataflow/node-editor/FlowEditor.tsx
  • 타입 정의: backend-node/src/types/flow.ts

개선 필요 사항

1. [우선순위 높음] 실행 이력 로깅

현재 상태: 플로우 실행 이력이 저장되지 않음

문제점:

  • 언제, 누가, 어떤 플로우를 실행했는지 추적 불가
  • 실패 원인 분석 어려움
  • 감사(Audit) 요구사항 충족 불가

개선 방안:

-- db/migrations/XXX_add_node_flow_execution_log.sql
CREATE TABLE node_flow_execution_log (
  id SERIAL PRIMARY KEY,
  flow_id INTEGER NOT NULL REFERENCES node_flows(flow_id) ON DELETE CASCADE,
  execution_status VARCHAR(20) NOT NULL, -- 'success', 'failed', 'partial'
  execution_time_ms INTEGER,
  total_nodes INTEGER,
  success_nodes INTEGER,
  failed_nodes INTEGER,
  skipped_nodes INTEGER,
  executed_by VARCHAR(50),
  company_code VARCHAR(20),
  context_data JSONB,
  result_summary JSONB,
  error_message TEXT,
  created_at TIMESTAMP DEFAULT NOW()
);

CREATE INDEX idx_flow_execution_log_flow_id ON node_flow_execution_log(flow_id);
CREATE INDEX idx_flow_execution_log_created_at ON node_flow_execution_log(created_at DESC);
CREATE INDEX idx_flow_execution_log_company_code ON node_flow_execution_log(company_code);

필요 작업:

  • 마이그레이션 파일 생성
  • nodeFlowExecutionService.ts에 로그 저장 로직 추가
  • 실행 이력 조회 API 추가 (GET /api/dataflow/node-flows/:flowId/executions)
  • 프론트엔드 실행 이력 UI 추가

2. [우선순위 높음] 드라이런(Dry Run) 모드

현재 상태: 실제 데이터를 변경하지 않고 테스트할 방법 없음

문제점:

  • 프로덕션 데이터에 직접 영향
  • 플로우 디버깅 어려움
  • 신규 플로우 검증 불가

개선 방안:

// nodeFlowExecutionService.ts
static async executeFlow(
  flowId: number,
  contextData: Record<string, any>,
  options: { dryRun?: boolean } = {}
): Promise<ExecutionResult> {
  if (options.dryRun) {
    // 트랜잭션 시작 후 항상 롤백
    return transaction(async (client) => {
      const result = await this.executeFlowInternal(flowId, contextData, client);
      // 롤백을 위해 의도적으로 에러 발생
      throw new DryRunComplete(result);
    }).catch((e) => {
      if (e instanceof DryRunComplete) {
        return { ...e.result, dryRun: true };
      }
      throw e;
    });
  }
  // 기존 로직...
}
// node-flows.ts 라우트 수정
router.post("/:flowId/execute", async (req, res) => {
  const dryRun = req.query.dryRun === 'true';
  const result = await NodeFlowExecutionService.executeFlow(
    parseInt(flowId, 10),
    enrichedContextData,
    { dryRun }
  );
  // ...
});

필요 작업:

  • DryRunComplete 예외 클래스 생성
  • executeFlow 메서드에 dryRun 옵션 추가
  • 라우트에 쿼리 파라미터 처리 추가
  • 프론트엔드 "테스트 실행" 버튼 추가

3. [우선순위 높음] 재시도 메커니즘

현재 상태: 외부 API/DB 호출 실패 시 재시도 없음

문제점:

  • 일시적 네트워크 오류로 전체 플로우 실패
  • 외부 서비스 불안정 시 신뢰성 저하

개선 방안:

// utils/retry.ts
export async function withRetry<T>(
  fn: () => Promise<T>,
  options: {
    maxRetries?: number;
    delay?: number;
    backoffMultiplier?: number;
    retryOn?: (error: any) => boolean;
  } = {}
): Promise<T> {
  const { 
    maxRetries = 3, 
    delay = 1000, 
    backoffMultiplier = 2,
    retryOn = () => true 
  } = options;

  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      return await fn();
    } catch (error) {
      if (attempt === maxRetries - 1 || !retryOn(error)) {
        throw error;
      }
      const waitTime = delay * Math.pow(backoffMultiplier, attempt);
      logger.warn(`재시도 ${attempt + 1}/${maxRetries}, ${waitTime}ms 후...`);
      await new Promise(r => setTimeout(r, waitTime));
    }
  }
  throw new Error('재시도 횟수 초과');
}
// nodeFlowExecutionService.ts에서 사용
const response = await withRetry(
  () => axios({ method, url, headers, data, timeout }),
  { 
    maxRetries: 3, 
    delay: 1000,
    retryOn: (err) => err.code === 'ECONNRESET' || err.response?.status >= 500
  }
);

필요 작업:

  • withRetry 유틸리티 함수 생성
  • REST API 호출 부분에 재시도 로직 적용
  • 외부 DB 연결 부분에 재시도 로직 적용
  • 노드별 재시도 설정 UI 추가 (선택사항)

4. [우선순위 높음] 미완성 데이터 변환 함수

현재 상태: FORMAT, CALCULATE, JSON_EXTRACT, CUSTOM 변환이 미구현

문제점:

  • 날짜/숫자 포맷팅 불가
  • 계산식 처리 불가
  • JSON 데이터 파싱 불가

개선 방안:

// nodeFlowExecutionService.ts - applyTransformation 메서드 수정

case "FORMAT":
  return rows.map((row) => {
    const value = row[sourceField];
    let formatted = value;
    
    if (transform.formatType === 'date') {
      // dayjs 사용
      formatted = dayjs(value).format(transform.formatPattern || 'YYYY-MM-DD');
    } else if (transform.formatType === 'number') {
      // 숫자 포맷팅
      const num = parseFloat(value);
      if (transform.formatPattern === 'currency') {
        formatted = num.toLocaleString('ko-KR', { style: 'currency', currency: 'KRW' });
      } else if (transform.formatPattern === 'percent') {
        formatted = (num * 100).toFixed(transform.decimals || 0) + '%';
      } else {
        formatted = num.toLocaleString('ko-KR', { maximumFractionDigits: transform.decimals || 2 });
      }
    }
    
    return { ...row, [actualTargetField]: formatted };
  });

case "CALCULATE":
  return rows.map((row) => {
    // 간단한 수식 평가 (보안 주의!)
    const expression = transform.expression; // 예: "price * quantity"
    const result = evaluateExpression(expression, row);
    return { ...row, [actualTargetField]: result };
  });

case "JSON_EXTRACT":
  return rows.map((row) => {
    const jsonValue = typeof row[sourceField] === 'string' 
      ? JSON.parse(row[sourceField]) 
      : row[sourceField];
    const extracted = jsonPath.query(jsonValue, transform.jsonPath); // JSONPath 라이브러리 사용
    return { ...row, [actualTargetField]: extracted[0] || null };
  });

필요 작업:

  • dayjs 라이브러리 추가 (날짜 포맷팅)
  • jsonpath 라이브러리 추가 (JSON 추출)
  • 안전한 수식 평가 함수 구현 (eval 대신)
  • 각 변환 타입별 UI 설정 패널 추가

5. [우선순위 중간] 플로우 버전 관리

현재 상태: 플로우 수정 시 이전 버전 덮어씀

문제점:

  • 실수로 수정한 플로우 복구 불가
  • 변경 이력 추적 불가

개선 방안:

-- db/migrations/XXX_add_node_flow_versions.sql
CREATE TABLE node_flow_versions (
  id SERIAL PRIMARY KEY,
  flow_id INTEGER NOT NULL REFERENCES node_flows(flow_id) ON DELETE CASCADE,
  version INTEGER NOT NULL,
  flow_data JSONB NOT NULL,
  change_description TEXT,
  created_by VARCHAR(50),
  created_at TIMESTAMP DEFAULT NOW(),
  UNIQUE(flow_id, version)
);

CREATE INDEX idx_flow_versions_flow_id ON node_flow_versions(flow_id);
// 플로우 수정 시 버전 저장
async function updateNodeFlow(flowId, flowData, changeDescription, userId) {
  // 현재 버전 조회
  const currentVersion = await queryOne(
    'SELECT COALESCE(MAX(version), 0) as max_version FROM node_flow_versions WHERE flow_id = $1',
    [flowId]
  );
  
  // 새 버전 저장
  await query(
    'INSERT INTO node_flow_versions (flow_id, version, flow_data, change_description, created_by) VALUES ($1, $2, $3, $4, $5)',
    [flowId, currentVersion.max_version + 1, flowData, changeDescription, userId]
  );
  
  // 기존 업데이트 로직...
}

필요 작업:

  • 버전 테이블 마이그레이션 생성
  • 플로우 수정 시 버전 자동 저장
  • 버전 목록 조회 API (GET /api/dataflow/node-flows/:flowId/versions)
  • 특정 버전으로 롤백 API (POST /api/dataflow/node-flows/:flowId/rollback/:version)
  • 프론트엔드 버전 히스토리 UI

6. [우선순위 중간] 복합 조건 지원

현재 상태: 조건 노드에서 단일 조건만 지원

문제점:

  • 복잡한 비즈니스 로직 표현 불가
  • 여러 조건을 AND/OR로 조합 불가

개선 방안:

// 복합 조건 타입 정의
interface ConditionGroup {
  type: 'AND' | 'OR';
  conditions: (Condition | ConditionGroup)[];
}

interface Condition {
  field: string;
  operator: string;
  value: any;
}

// 조건 평가 함수 수정
function evaluateConditionGroup(group: ConditionGroup, data: any): boolean {
  const results = group.conditions.map(condition => {
    if ('type' in condition) {
      // 중첩된 그룹
      return evaluateConditionGroup(condition, data);
    } else {
      // 단일 조건
      return evaluateCondition(data[condition.field], condition.operator, condition.value);
    }
  });
  
  return group.type === 'AND' 
    ? results.every(r => r) 
    : results.some(r => r);
}

필요 작업:

  • 복합 조건 타입 정의
  • evaluateConditionGroup 함수 구현
  • 조건 노드 속성 패널 UI 수정 (AND/OR 그룹 빌더)

7. [우선순위 중간] 비동기 실행

현재 상태: 동기 실행만 가능 (HTTP 요청 타임아웃 제한)

문제점:

  • 대용량 데이터 처리 시 타임아웃
  • 장시간 실행 플로우 처리 불가

개선 방안:

-- 실행 큐 테이블
CREATE TABLE node_flow_execution_queue (
  id SERIAL PRIMARY KEY,
  flow_id INTEGER NOT NULL REFERENCES node_flows(flow_id),
  execution_id UUID NOT NULL UNIQUE DEFAULT gen_random_uuid(),
  status VARCHAR(20) NOT NULL DEFAULT 'queued', -- queued, running, completed, failed
  context_data JSONB,
  callback_url TEXT,
  result JSONB,
  error_message TEXT,
  queued_by VARCHAR(50),
  company_code VARCHAR(20),
  queued_at TIMESTAMP DEFAULT NOW(),
  started_at TIMESTAMP,
  completed_at TIMESTAMP
);
// 비동기 실행 API
router.post("/:flowId/execute-async", async (req, res) => {
  const { callbackUrl, contextData } = req.body;
  
  // 큐에 추가
  const execution = await queryOne(
    `INSERT INTO node_flow_execution_queue (flow_id, context_data, callback_url, queued_by, company_code)
     VALUES ($1, $2, $3, $4, $5) RETURNING execution_id`,
    [flowId, contextData, callbackUrl, req.user?.userId, req.user?.companyCode]
  );
  
  // 백그라운드 워커가 처리
  return res.json({
    success: true,
    executionId: execution.execution_id,
    status: 'queued'
  });
});

// 상태 조회 API
router.get("/executions/:executionId", async (req, res) => {
  const execution = await queryOne(
    'SELECT * FROM node_flow_execution_queue WHERE execution_id = $1',
    [req.params.executionId]
  );
  return res.json({ success: true, data: execution });
});

필요 작업:

  • 실행 큐 테이블 마이그레이션
  • 비동기 실행 API 추가
  • 백그라운드 워커 프로세스 구현 (별도 프로세스 또는 Bull 큐)
  • 웹훅 콜백 기능 구현
  • 프론트엔드 비동기 실행 상태 폴링 UI

8. [우선순위 낮음] 플로우 스케줄링

현재 상태: 수동 실행만 가능

문제점:

  • 정기적인 배치 작업 자동화 불가
  • 특정 시간 예약 실행 불가

개선 방안:

-- 스케줄 테이블
CREATE TABLE node_flow_schedules (
  id SERIAL PRIMARY KEY,
  flow_id INTEGER NOT NULL REFERENCES node_flows(flow_id) ON DELETE CASCADE,
  schedule_name VARCHAR(100),
  cron_expression VARCHAR(50) NOT NULL, -- '0 9 * * 1-5' (평일 9시)
  context_data JSONB,
  is_active BOOLEAN DEFAULT true,
  last_run_at TIMESTAMP,
  next_run_at TIMESTAMP,
  created_by VARCHAR(50),
  company_code VARCHAR(20),
  created_at TIMESTAMP DEFAULT NOW(),
  updated_at TIMESTAMP DEFAULT NOW()
);

필요 작업:

  • 스케줄 테이블 마이그레이션
  • 스케줄 CRUD API
  • node-cron 또는 Bull 스케줄러 통합
  • 스케줄 관리 UI

9. [우선순위 낮음] 플러그인 아키텍처

현재 상태: 새 노드 타입 추가 시 nodeFlowExecutionService.ts 직접 수정 필요

문제점:

  • 코드 복잡도 증가
  • 확장성 제한

개선 방안:

// interfaces/NodeHandler.ts
export interface NodeHandler {
  type: string;
  execute(node: FlowNode, inputData: any, context: ExecutionContext, client?: any): Promise<any>;
  validate?(node: FlowNode): { valid: boolean; errors: string[] };
}

// handlers/InsertActionHandler.ts
export class InsertActionHandler implements NodeHandler {
  type = 'insertAction';
  
  async execute(node, inputData, context, client) {
    // 기존 executeInsertAction 로직
  }
}

// NodeHandlerRegistry.ts
class NodeHandlerRegistry {
  private handlers = new Map<string, NodeHandler>();
  
  register(handler: NodeHandler) {
    this.handlers.set(handler.type, handler);
  }
  
  get(type: string): NodeHandler | undefined {
    return this.handlers.get(type);
  }
}

// 사용
const registry = new NodeHandlerRegistry();
registry.register(new InsertActionHandler());
registry.register(new UpdateActionHandler());
// ...

// executeNodeByType에서
const handler = registry.get(node.type);
if (handler) {
  return handler.execute(node, inputData, context, client);
}

필요 작업:

  • NodeHandler 인터페이스 정의
  • 기존 노드 타입별 핸들러 클래스 분리
  • NodeHandlerRegistry 구현
  • 커스텀 노드 핸들러 등록 메커니즘

10. [우선순위 낮음] 프론트엔드 연동 강화

현재 상태: 기본 에디터 구현됨

개선 필요 항목:

  • 실행 결과 시각화 (노드별 성공/실패 표시)
  • 실시간 실행 진행률 표시
  • 드라이런 모드 UI
  • 실행 이력 조회 UI
  • 버전 히스토리 UI
  • 노드 검증 결과 표시

프론트엔드 컴포넌트 CRUD 로직 이전 계획

현재 프론트엔드 컴포넌트에서 직접 CRUD를 수행하는 코드들을 노드 플로우로 이전해야 합니다.

이전 대상 컴포넌트

컴포넌트 파일 위치 현재 로직 이전 우선순위
SplitPanelLayoutComponent frontend/lib/registry/components/split-panel-layout/ createRecord, updateRecord, deleteRecord 높음
RepeatScreenModalComponent frontend/lib/registry/components/repeat-screen-modal/ 다중 테이블 INSERT/UPDATE/DELETE 높음
UniversalFormModalComponent frontend/lib/registry/components/universal-form-modal/ 다중 행 저장 높음
SelectedItemsDetailInputComponent frontend/lib/registry/components/selected-items-detail-input/ upsertGroupedRecords 높음
ButtonPrimaryComponent frontend/lib/registry/components/button-primary/ 상태 변경 POST 중간
SimpleRepeaterTableComponent frontend/lib/registry/components/simple-repeater-table/ 데이터 저장 POST 중간

이전 방식

  1. 플로우 생성: 각 컴포넌트의 저장 로직을 노드 플로우로 구현
  2. 프론트엔드 수정: 직접 API 호출 대신 executeNodeFlow(flowId, contextData) 호출
  3. 화면 설정에 플로우 연결: 버튼 액션에 실행할 플로우 ID 설정
// 현재 (프론트엔드에서 직접 호출)
const result = await dataApi.createRecord(tableName, data);

// 개선 후 (플로우 실행)
const result = await executeNodeFlow(flowId, {
  formData: data,
  tableName: tableName,
  action: 'create'
});

참고 자료

  • 노드 플로우 실행 엔진: backend-node/src/services/nodeFlowExecutionService.ts
  • 플로우 타입 정의: backend-node/src/types/flow.ts
  • 프론트엔드 플로우 에디터: frontend/components/dataflow/node-editor/FlowEditor.tsx
  • 프론트엔드 플로우 API: frontend/lib/api/nodeFlows.ts