Merge branch 'main' of http://39.117.244.52:3000/kjs/ERP-node into feature/report

This commit is contained in:
dohyeons
2025-10-13 09:44:09 +09:00
74 changed files with 20398 additions and 482 deletions

View File

@@ -103,15 +103,34 @@ export class OracleConnector implements DatabaseConnector {
try {
const startTime = Date.now();
// 쿼리 타입 확인 (DML인지 SELECT인지)
// 쿼리 타입 확인
const isDML = /^\s*(INSERT|UPDATE|DELETE|MERGE)/i.test(query);
const isCOMMIT = /^\s*COMMIT/i.test(query);
const isROLLBACK = /^\s*ROLLBACK/i.test(query);
// 🔥 COMMIT/ROLLBACK 명령은 직접 실행
if (isCOMMIT || isROLLBACK) {
if (isCOMMIT) {
await (this.connection as any).commit();
console.log("✅ Oracle COMMIT 실행됨");
} else {
await (this.connection as any).rollback();
console.log("⚠️ Oracle ROLLBACK 실행됨");
}
return {
rows: [],
rowCount: 0,
fields: [],
affectedRows: 0,
};
}
// Oracle XE 21c 쿼리 실행 옵션
const options: any = {
outFormat: (oracledb as any).OUT_FORMAT_OBJECT, // OBJECT format
maxRows: 10000, // XE 제한 고려
fetchArraySize: 100,
autoCommit: isDML, // ✅ DML 쿼리는 자동 커밋
autoCommit: false, // 🔥 수동으로 COMMIT 제어하도록 변경
};
console.log("Oracle 쿼리 실행:", {

View File

@@ -1,6 +1,10 @@
import axios, { AxiosInstance, AxiosResponse } from 'axios';
import { DatabaseConnector, ConnectionConfig, QueryResult } from '../interfaces/DatabaseConnector';
import { ConnectionTestResult, TableInfo } from '../types/externalDbTypes';
import axios, { AxiosInstance, AxiosResponse } from "axios";
import {
DatabaseConnector,
ConnectionConfig,
QueryResult,
} from "../interfaces/DatabaseConnector";
import { ConnectionTestResult, TableInfo } from "../types/externalDbTypes";
export interface RestApiConfig {
baseUrl: string;
@@ -20,16 +24,16 @@ export class RestApiConnector implements DatabaseConnector {
constructor(config: RestApiConfig) {
this.config = config;
// Axios 인스턴스 생성
this.httpClient = axios.create({
baseURL: config.baseUrl,
timeout: config.timeout || 30000,
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${config.apiKey}`,
'Accept': 'application/json'
}
"Content-Type": "application/json",
Authorization: `Bearer ${config.apiKey}`,
Accept: "application/json",
},
});
// 요청/응답 인터셉터 설정
@@ -40,11 +44,13 @@ export class RestApiConnector implements DatabaseConnector {
// 요청 인터셉터
this.httpClient.interceptors.request.use(
(config) => {
console.log(`[RestApiConnector] 요청: ${config.method?.toUpperCase()} ${config.url}`);
console.log(
`[RestApiConnector] 요청: ${config.method?.toUpperCase()} ${config.url}`
);
return config;
},
(error) => {
console.error('[RestApiConnector] 요청 오류:', error);
console.error("[RestApiConnector] 요청 오류:", error);
return Promise.reject(error);
}
);
@@ -52,11 +58,17 @@ export class RestApiConnector implements DatabaseConnector {
// 응답 인터셉터
this.httpClient.interceptors.response.use(
(response) => {
console.log(`[RestApiConnector] 응답: ${response.status} ${response.statusText}`);
console.log(
`[RestApiConnector] 응답: ${response.status} ${response.statusText}`
);
return response;
},
(error) => {
console.error('[RestApiConnector] 응답 오류:', error.response?.status, error.response?.statusText);
console.error(
"[RestApiConnector] 응답 오류:",
error.response?.status,
error.response?.statusText
);
return Promise.reject(error);
}
);
@@ -65,16 +77,23 @@ export class RestApiConnector implements DatabaseConnector {
async connect(): Promise<void> {
try {
// 연결 테스트 - 기본 엔드포인트 호출
await this.httpClient.get('/health', { timeout: 5000 });
await this.httpClient.get("/health", { timeout: 5000 });
console.log(`[RestApiConnector] 연결 성공: ${this.config.baseUrl}`);
} catch (error) {
// health 엔드포인트가 없을 수 있으므로 404는 정상으로 처리
if (axios.isAxiosError(error) && error.response?.status === 404) {
console.log(`[RestApiConnector] 연결 성공 (health 엔드포인트 없음): ${this.config.baseUrl}`);
console.log(
`[RestApiConnector] 연결 성공 (health 엔드포인트 없음): ${this.config.baseUrl}`
);
return;
}
console.error(`[RestApiConnector] 연결 실패: ${this.config.baseUrl}`, error);
throw new Error(`REST API 연결 실패: ${error instanceof Error ? error.message : '알 수 없는 오류'}`);
console.error(
`[RestApiConnector] 연결 실패: ${this.config.baseUrl}`,
error
);
throw new Error(
`REST API 연결 실패: ${error instanceof Error ? error.message : "알 수 없는 오류"}`
);
}
}
@@ -88,39 +107,55 @@ export class RestApiConnector implements DatabaseConnector {
await this.connect();
return {
success: true,
message: 'REST API 연결이 성공했습니다.',
message: "REST API 연결이 성공했습니다.",
details: {
response_time: Date.now()
}
response_time: Date.now(),
},
};
} catch (error) {
return {
success: false,
message: error instanceof Error ? error.message : 'REST API 연결에 실패했습니다.',
message:
error instanceof Error
? error.message
: "REST API 연결에 실패했습니다.",
details: {
response_time: Date.now()
}
response_time: Date.now(),
},
};
}
}
async executeQuery(endpoint: string, method: 'GET' | 'POST' | 'PUT' | 'DELETE' = 'GET', data?: any): Promise<QueryResult> {
// 🔥 DatabaseConnector 인터페이스 호환용 executeQuery (사용하지 않음)
async executeQuery(query: string, params?: any[]): Promise<QueryResult> {
// REST API는 executeRequest를 사용해야 함
throw new Error(
"REST API Connector는 executeQuery를 지원하지 않습니다. executeRequest를 사용하세요."
);
}
// 🔥 실제 REST API 요청을 위한 메서드
async executeRequest(
endpoint: string,
method: "GET" | "POST" | "PUT" | "DELETE" = "GET",
data?: any
): Promise<QueryResult> {
try {
const startTime = Date.now();
let response: AxiosResponse;
// HTTP 메서드에 따른 요청 실행
switch (method.toUpperCase()) {
case 'GET':
case "GET":
response = await this.httpClient.get(endpoint);
break;
case 'POST':
case "POST":
response = await this.httpClient.post(endpoint, data);
break;
case 'PUT':
case "PUT":
response = await this.httpClient.put(endpoint, data);
break;
case 'DELETE':
case "DELETE":
response = await this.httpClient.delete(endpoint);
break;
default:
@@ -133,21 +168,36 @@ export class RestApiConnector implements DatabaseConnector {
console.log(`[RestApiConnector] 원본 응답 데이터:`, {
type: typeof responseData,
isArray: Array.isArray(responseData),
keys: typeof responseData === 'object' ? Object.keys(responseData) : 'not object',
responseData: responseData
keys:
typeof responseData === "object"
? Object.keys(responseData)
: "not object",
responseData: responseData,
});
// 응답 데이터 처리
let rows: any[];
if (Array.isArray(responseData)) {
rows = responseData;
} else if (responseData && responseData.data && Array.isArray(responseData.data)) {
} else if (
responseData &&
responseData.data &&
Array.isArray(responseData.data)
) {
// API 응답이 {success: true, data: [...]} 형태인 경우
rows = responseData.data;
} else if (responseData && responseData.data && typeof responseData.data === 'object') {
} else if (
responseData &&
responseData.data &&
typeof responseData.data === "object"
) {
// API 응답이 {success: true, data: {...}} 형태인 경우 (단일 객체)
rows = [responseData.data];
} else if (responseData && typeof responseData === 'object' && !Array.isArray(responseData)) {
} else if (
responseData &&
typeof responseData === "object" &&
!Array.isArray(responseData)
) {
// 단일 객체 응답인 경우
rows = [responseData];
} else {
@@ -156,8 +206,8 @@ export class RestApiConnector implements DatabaseConnector {
console.log(`[RestApiConnector] 처리된 rows:`, {
rowsLength: rows.length,
firstRow: rows.length > 0 ? rows[0] : 'no data',
allRows: rows
firstRow: rows.length > 0 ? rows[0] : "no data",
allRows: rows,
});
console.log(`[RestApiConnector] API 호출 결과:`, {
@@ -165,22 +215,32 @@ export class RestApiConnector implements DatabaseConnector {
method,
status: response.status,
rowCount: rows.length,
executionTime: `${executionTime}ms`
executionTime: `${executionTime}ms`,
});
return {
rows: rows,
rowCount: rows.length,
fields: rows.length > 0 ? Object.keys(rows[0]).map(key => ({ name: key, type: 'string' })) : []
fields:
rows.length > 0
? Object.keys(rows[0]).map((key) => ({ name: key, type: "string" }))
: [],
};
} catch (error) {
console.error(`[RestApiConnector] API 호출 오류 (${method} ${endpoint}):`, error);
console.error(
`[RestApiConnector] API 호출 오류 (${method} ${endpoint}):`,
error
);
if (axios.isAxiosError(error)) {
throw new Error(`REST API 호출 실패: ${error.response?.status} ${error.response?.statusText}`);
throw new Error(
`REST API 호출 실패: ${error.response?.status} ${error.response?.statusText}`
);
}
throw new Error(`REST API 호출 실패: ${error instanceof Error ? error.message : '알 수 없는 오류'}`);
throw new Error(
`REST API 호출 실패: ${error instanceof Error ? error.message : "알 수 없는 오류"}`
);
}
}
@@ -189,20 +249,20 @@ export class RestApiConnector implements DatabaseConnector {
// 일반적인 REST API 엔드포인트들을 반환
return [
{
table_name: '/api/users',
table_name: "/api/users",
columns: [],
description: '사용자 정보 API'
description: "사용자 정보 API",
},
{
table_name: '/api/data',
table_name: "/api/data",
columns: [],
description: '기본 데이터 API'
description: "기본 데이터 API",
},
{
table_name: '/api/custom',
table_name: "/api/custom",
columns: [],
description: '사용자 정의 엔드포인트'
}
description: "사용자 정의 엔드포인트",
},
];
}
@@ -213,22 +273,25 @@ export class RestApiConnector implements DatabaseConnector {
async getColumns(endpoint: string): Promise<any[]> {
try {
// GET 요청으로 샘플 데이터를 가져와서 필드 구조 파악
const result = await this.executeQuery(endpoint, 'GET');
const result = await this.executeRequest(endpoint, "GET");
if (result.rows.length > 0) {
const sampleRow = result.rows[0];
return Object.keys(sampleRow).map(key => ({
return Object.keys(sampleRow).map((key) => ({
column_name: key,
data_type: typeof sampleRow[key],
is_nullable: 'YES',
is_nullable: "YES",
column_default: null,
description: `${key} 필드`
description: `${key} 필드`,
}));
}
return [];
} catch (error) {
console.error(`[RestApiConnector] 컬럼 정보 조회 오류 (${endpoint}):`, error);
console.error(
`[RestApiConnector] 컬럼 정보 조회 오류 (${endpoint}):`,
error
);
return [];
}
}
@@ -238,24 +301,29 @@ export class RestApiConnector implements DatabaseConnector {
}
// REST API 전용 메서드들
async getData(endpoint: string, params?: Record<string, any>): Promise<any[]> {
const queryString = params ? '?' + new URLSearchParams(params).toString() : '';
const result = await this.executeQuery(endpoint + queryString, 'GET');
async getData(
endpoint: string,
params?: Record<string, any>
): Promise<any[]> {
const queryString = params
? "?" + new URLSearchParams(params).toString()
: "";
const result = await this.executeRequest(endpoint + queryString, "GET");
return result.rows;
}
async postData(endpoint: string, data: any): Promise<any> {
const result = await this.executeQuery(endpoint, 'POST', data);
const result = await this.executeRequest(endpoint, "POST", data);
return result.rows[0];
}
async putData(endpoint: string, data: any): Promise<any> {
const result = await this.executeQuery(endpoint, 'PUT', data);
const result = await this.executeRequest(endpoint, "PUT", data);
return result.rows[0];
}
async deleteData(endpoint: string): Promise<any> {
const result = await this.executeQuery(endpoint, 'DELETE');
const result = await this.executeRequest(endpoint, "DELETE");
return result.rows[0];
}
}

View File

@@ -1,4 +1,4 @@
import { ConnectionTestResult, TableInfo } from '../types/externalDbTypes';
import { ConnectionTestResult, TableInfo } from "../types/externalDbTypes";
export interface ConnectionConfig {
host: string;
@@ -15,13 +15,15 @@ export interface QueryResult {
rows: any[];
rowCount?: number;
fields?: any[];
affectedRows?: number; // MySQL/MariaDB용
length?: number; // 배열 형태로 반환되는 경우
}
export interface DatabaseConnector {
connect(): Promise<void>;
disconnect(): Promise<void>;
testConnection(): Promise<ConnectionTestResult>;
executeQuery(query: string): Promise<QueryResult>;
executeQuery(query: string, params?: any[]): Promise<QueryResult>; // params 추가
getTables(): Promise<TableInfo[]>;
getColumns(tableName: string): Promise<any[]>; // 특정 테이블의 컬럼 정보 조회
}
}

View File

@@ -0,0 +1,231 @@
import { Router, Request, Response } from "express";
import {
authenticateToken,
AuthenticatedRequest,
} from "../../middleware/authMiddleware";
import { ExternalDbConnectionService } from "../../services/externalDbConnectionService";
import { ExternalDbConnectionFilter } from "../../types/externalDbTypes";
import logger from "../../utils/logger";
const router = Router();
/**
* GET /api/dataflow/node-external-connections/tested
* 노드 플로우용: 테스트에 성공한 외부 DB 커넥션 목록 조회
*/
router.get(
"/tested",
authenticateToken,
async (req: AuthenticatedRequest, res: Response) => {
try {
logger.info("🔍 노드 플로우용 테스트 완료된 커넥션 조회 요청");
// 활성 상태의 외부 커넥션 조회
const filter: ExternalDbConnectionFilter = {
is_active: "Y",
};
const externalConnections =
await ExternalDbConnectionService.getConnections(filter);
if (!externalConnections.success) {
return res.status(400).json(externalConnections);
}
// 외부 커넥션들에 대해 연결 테스트 수행 (제한된 병렬 처리 + 타임아웃 관리)
const validExternalConnections: any[] = [];
const connections = externalConnections.data || [];
const MAX_CONCURRENT = 3; // 최대 동시 연결 수
const TIMEOUT_MS = 3000; // 타임아웃 3초
// 청크 단위로 처리 (최대 3개씩)
for (let i = 0; i < connections.length; i += MAX_CONCURRENT) {
const chunk = connections.slice(i, i + MAX_CONCURRENT);
const chunkResults = await Promise.allSettled(
chunk.map(async (connection) => {
let testPromise: Promise<any> | null = null;
let timeoutId: NodeJS.Timeout | null = null;
try {
// 타임아웃과 함께 테스트 실행
testPromise = ExternalDbConnectionService.testConnectionById(
connection.id!
);
const timeoutPromise = new Promise<never>((_, reject) => {
timeoutId = setTimeout(() => {
reject(new Error("연결 테스트 타임아웃"));
}, TIMEOUT_MS);
});
const testResult = await Promise.race([
testPromise,
timeoutPromise,
]);
// 타임아웃 정리
if (timeoutId) clearTimeout(timeoutId);
if (testResult.success) {
return {
id: connection.id,
connection_name: connection.connection_name,
description: connection.description,
db_type: connection.db_type,
host: connection.host,
port: connection.port,
database_name: connection.database_name,
};
}
return null;
} catch (error) {
// 타임아웃 정리
if (timeoutId) clearTimeout(timeoutId);
// 🔥 타임아웃 시 연결 강제 해제
try {
const { DatabaseConnectorFactory } = await import(
"../../database/DatabaseConnectorFactory"
);
await DatabaseConnectorFactory.closeConnector(
connection.id!,
connection.db_type
);
logger.info(
`🧹 타임아웃/실패로 인한 커넥션 정리 완료: ${connection.connection_name}`
);
} catch (cleanupError) {
logger.warn(
`커넥션 정리 실패 (ID: ${connection.id}):`,
cleanupError instanceof Error
? cleanupError.message
: cleanupError
);
}
logger.warn(
`커넥션 테스트 실패 (ID: ${connection.id}):`,
error instanceof Error ? error.message : error
);
return null;
}
})
);
// fulfilled 결과만 수집
chunkResults.forEach((result) => {
if (result.status === "fulfilled" && result.value !== null) {
validExternalConnections.push(result.value);
}
});
// 다음 청크 처리 전 짧은 대기 (연결 풀 안정화)
if (i + MAX_CONCURRENT < connections.length) {
await new Promise((resolve) => setTimeout(resolve, 100));
}
}
logger.info(
`✅ 테스트 성공한 커넥션: ${validExternalConnections.length}/${externalConnections.data?.length || 0}`
);
return res.status(200).json({
success: true,
data: validExternalConnections,
message: `테스트에 성공한 ${validExternalConnections.length}개의 커넥션을 조회했습니다.`,
});
} catch (error) {
logger.error("노드 플로우용 커넥션 조회 오류:", error);
return res.status(500).json({
success: false,
message: "서버 내부 오류가 발생했습니다.",
error: error instanceof Error ? error.message : "알 수 없는 오류",
});
}
}
);
/**
* GET /api/dataflow/node-external-connections/:id/tables
* 특정 외부 DB의 테이블 목록 조회
*/
router.get(
"/:id/tables",
authenticateToken,
async (req: AuthenticatedRequest, res: Response) => {
try {
const id = parseInt(req.params.id);
if (isNaN(id)) {
return res.status(400).json({
success: false,
message: "유효하지 않은 연결 ID입니다.",
});
}
logger.info(`🔍 외부 DB 테이블 목록 조회: connectionId=${id}`);
const result =
await ExternalDbConnectionService.getTablesFromConnection(id);
return res.status(200).json(result);
} catch (error) {
logger.error("외부 DB 테이블 목록 조회 오류:", error);
return res.status(500).json({
success: false,
message: "서버 내부 오류가 발생했습니다.",
error: error instanceof Error ? error.message : "알 수 없는 오류",
});
}
}
);
/**
* GET /api/dataflow/node-external-connections/:id/tables/:tableName/columns
* 특정 외부 DB 테이블의 컬럼 목록 조회
*/
router.get(
"/:id/tables/:tableName/columns",
authenticateToken,
async (req: AuthenticatedRequest, res: Response) => {
try {
const id = parseInt(req.params.id);
const { tableName } = req.params;
if (isNaN(id)) {
return res.status(400).json({
success: false,
message: "유효하지 않은 연결 ID입니다.",
});
}
if (!tableName) {
return res.status(400).json({
success: false,
message: "테이블명이 필요합니다.",
});
}
logger.info(
`🔍 외부 DB 컬럼 목록 조회: connectionId=${id}, table=${tableName}`
);
const result = await ExternalDbConnectionService.getColumnsFromConnection(
id,
tableName
);
return res.status(200).json(result);
} catch (error) {
logger.error("외부 DB 컬럼 목록 조회 오류:", error);
return res.status(500).json({
success: false,
message: "서버 내부 오류가 발생했습니다.",
error: error instanceof Error ? error.message : "알 수 없는 오류",
});
}
}
);
export default router;

View File

@@ -0,0 +1,237 @@
/**
* 노드 기반 데이터 플로우 API
*/
import { Router, Request, Response } from "express";
import { query, queryOne } from "../../database/db";
import { logger } from "../../utils/logger";
import { NodeFlowExecutionService } from "../../services/nodeFlowExecutionService";
const router = Router();
/**
* 플로우 목록 조회
*/
router.get("/", async (req: Request, res: Response) => {
try {
const flows = await query(
`
SELECT
flow_id as "flowId",
flow_name as "flowName",
flow_description as "flowDescription",
created_at as "createdAt",
updated_at as "updatedAt"
FROM node_flows
ORDER BY updated_at DESC
`,
[]
);
return res.json({
success: true,
data: flows,
});
} catch (error) {
logger.error("플로우 목록 조회 실패:", error);
return res.status(500).json({
success: false,
message: "플로우 목록을 조회하지 못했습니다.",
});
}
});
/**
* 플로우 상세 조회
*/
router.get("/:flowId", async (req: Request, res: Response) => {
try {
const { flowId } = req.params;
const flow = await queryOne(
`
SELECT
flow_id as "flowId",
flow_name as "flowName",
flow_description as "flowDescription",
flow_data as "flowData",
created_at as "createdAt",
updated_at as "updatedAt"
FROM node_flows
WHERE flow_id = $1
`,
[flowId]
);
if (!flow) {
return res.status(404).json({
success: false,
message: "플로우를 찾을 수 없습니다.",
});
}
return res.json({
success: true,
data: flow,
});
} catch (error) {
logger.error("플로우 조회 실패:", error);
return res.status(500).json({
success: false,
message: "플로우를 조회하지 못했습니다.",
});
}
});
/**
* 플로우 저장 (신규)
*/
router.post("/", async (req: Request, res: Response) => {
try {
const { flowName, flowDescription, flowData } = req.body;
if (!flowName || !flowData) {
return res.status(400).json({
success: false,
message: "플로우 이름과 데이터는 필수입니다.",
});
}
const result = await queryOne(
`
INSERT INTO node_flows (flow_name, flow_description, flow_data)
VALUES ($1, $2, $3)
RETURNING flow_id as "flowId"
`,
[flowName, flowDescription || "", flowData]
);
logger.info(`플로우 저장 성공: ${result.flowId}`);
return res.json({
success: true,
message: "플로우가 저장되었습니다.",
data: {
flowId: result.flowId,
},
});
} catch (error) {
logger.error("플로우 저장 실패:", error);
return res.status(500).json({
success: false,
message: "플로우를 저장하지 못했습니다.",
});
}
});
/**
* 플로우 수정
*/
router.put("/", async (req: Request, res: Response) => {
try {
const { flowId, flowName, flowDescription, flowData } = req.body;
if (!flowId || !flowName || !flowData) {
return res.status(400).json({
success: false,
message: "플로우 ID, 이름, 데이터는 필수입니다.",
});
}
await query(
`
UPDATE node_flows
SET flow_name = $1,
flow_description = $2,
flow_data = $3,
updated_at = NOW()
WHERE flow_id = $4
`,
[flowName, flowDescription || "", flowData, flowId]
);
logger.info(`플로우 수정 성공: ${flowId}`);
return res.json({
success: true,
message: "플로우가 수정되었습니다.",
data: {
flowId,
},
});
} catch (error) {
logger.error("플로우 수정 실패:", error);
return res.status(500).json({
success: false,
message: "플로우를 수정하지 못했습니다.",
});
}
});
/**
* 플로우 삭제
*/
router.delete("/:flowId", async (req: Request, res: Response) => {
try {
const { flowId } = req.params;
await query(
`
DELETE FROM node_flows
WHERE flow_id = $1
`,
[flowId]
);
logger.info(`플로우 삭제 성공: ${flowId}`);
return res.json({
success: true,
message: "플로우가 삭제되었습니다.",
});
} catch (error) {
logger.error("플로우 삭제 실패:", error);
return res.status(500).json({
success: false,
message: "플로우를 삭제하지 못했습니다.",
});
}
});
/**
* 플로우 실행
* POST /api/dataflow/node-flows/:flowId/execute
*/
router.post("/:flowId/execute", async (req: Request, res: Response) => {
try {
const { flowId } = req.params;
const contextData = req.body;
logger.info(`🚀 플로우 실행 요청: flowId=${flowId}`, {
contextDataKeys: Object.keys(contextData),
});
// 플로우 실행
const result = await NodeFlowExecutionService.executeFlow(
parseInt(flowId, 10),
contextData
);
return res.json({
success: result.success,
message: result.message,
data: result,
});
} catch (error) {
logger.error("플로우 실행 실패:", error);
return res.status(500).json({
success: false,
message:
error instanceof Error
? error.message
: "플로우 실행 중 오류가 발생했습니다.",
});
}
});
export default router;

View File

@@ -21,6 +21,8 @@ import {
testConditionalConnection,
executeConditionalActions,
} from "../controllers/conditionalConnectionController";
import nodeFlowsRouter from "./dataflow/node-flows";
import nodeExternalConnectionsRouter from "./dataflow/node-external-connections";
const router = express.Router();
@@ -146,4 +148,16 @@ router.post("/diagrams/:diagramId/test-conditions", testConditionalConnection);
*/
router.post("/diagrams/:diagramId/execute-actions", executeConditionalActions);
/**
* 노드 기반 플로우 관리
* /api/dataflow/node-flows/*
*/
router.use("/node-flows", nodeFlowsRouter);
/**
* 노드 플로우용 외부 DB 커넥션 관리
* /api/dataflow/node-external-connections/*
*/
router.use("/node-external-connections", nodeExternalConnectionsRouter);
export default router;

View File

@@ -895,13 +895,18 @@ export class BatchExternalDbService {
);
}
// 데이터 조회
const result = await connector.executeQuery(finalEndpoint, method);
// 데이터 조회 (REST API는 executeRequest 사용)
let result;
if ((connector as any).executeRequest) {
result = await (connector as any).executeRequest(finalEndpoint, method);
} else {
result = await connector.executeQuery(finalEndpoint);
}
let data = result.rows;
// 컬럼 필터링 (지정된 컬럼만 추출)
if (columns && columns.length > 0) {
data = data.map((row) => {
data = data.map((row: any) => {
const filteredRow: any = {};
columns.forEach((col) => {
if (row.hasOwnProperty(col)) {
@@ -1039,7 +1044,16 @@ export class BatchExternalDbService {
);
console.log(`[BatchExternalDbService] 전송할 데이터:`, requestData);
await connector.executeQuery(finalEndpoint, method, requestData);
// REST API는 executeRequest 사용
if ((connector as any).executeRequest) {
await (connector as any).executeRequest(
finalEndpoint,
method,
requestData
);
} else {
await connector.executeQuery(finalEndpoint);
}
successCount++;
} catch (error) {
console.error(`REST API 레코드 전송 실패:`, error);
@@ -1104,7 +1118,12 @@ export class BatchExternalDbService {
);
console.log(`[BatchExternalDbService] 전송할 데이터:`, record);
await connector.executeQuery(endpoint, method, record);
// REST API는 executeRequest 사용
if ((connector as any).executeRequest) {
await (connector as any).executeRequest(endpoint, method, record);
} else {
await connector.executeQuery(endpoint);
}
successCount++;
} catch (error) {
console.error(`REST API 레코드 전송 실패:`, error);

View File

@@ -1205,4 +1205,157 @@ export class ExternalDbConnectionService {
};
}
}
/**
* 특정 외부 DB 연결의 테이블 목록 조회
*/
static async getTablesFromConnection(
connectionId: number
): Promise<ApiResponse<TableInfo[]>> {
try {
// 연결 정보 조회
const connection = await queryOne<any>(
`SELECT * FROM external_db_connections WHERE id = $1`,
[connectionId]
);
if (!connection) {
return {
success: false,
message: `연결 ID ${connectionId}를 찾을 수 없습니다.`,
};
}
// 비밀번호 복호화
const password = connection.password
? PasswordEncryption.decrypt(connection.password)
: "";
// 연결 설정 준비
const config = {
host: connection.host,
port: connection.port,
database: connection.database_name,
user: connection.username,
password: password,
connectionTimeoutMillis:
connection.connection_timeout != null
? connection.connection_timeout * 1000
: undefined,
queryTimeoutMillis:
connection.query_timeout != null
? connection.query_timeout * 1000
: undefined,
ssl:
connection.ssl_enabled === "Y"
? { rejectUnauthorized: false }
: false,
};
// 커넥터 생성
const connector = await DatabaseConnectorFactory.createConnector(
connection.db_type,
config,
connectionId
);
try {
const tables = await connector.getTables();
return {
success: true,
data: tables,
message: `${tables.length}개의 테이블을 조회했습니다.`,
};
} finally {
await DatabaseConnectorFactory.closeConnector(
connectionId,
connection.db_type
);
}
} catch (error) {
logger.error("테이블 목록 조회 실패:", error);
return {
success: false,
message: "테이블 목록 조회 중 오류가 발생했습니다.",
error: error instanceof Error ? error.message : "알 수 없는 오류",
};
}
}
/**
* 특정 외부 DB 테이블의 컬럼 목록 조회
*/
static async getColumnsFromConnection(
connectionId: number,
tableName: string
): Promise<ApiResponse<any[]>> {
try {
// 연결 정보 조회
const connection = await queryOne<any>(
`SELECT * FROM external_db_connections WHERE id = $1`,
[connectionId]
);
if (!connection) {
return {
success: false,
message: `연결 ID ${connectionId}를 찾을 수 없습니다.`,
};
}
// 비밀번호 복호화
const password = connection.password
? PasswordEncryption.decrypt(connection.password)
: "";
// 연결 설정 준비
const config = {
host: connection.host,
port: connection.port,
database: connection.database_name,
user: connection.username,
password: password,
connectionTimeoutMillis:
connection.connection_timeout != null
? connection.connection_timeout * 1000
: undefined,
queryTimeoutMillis:
connection.query_timeout != null
? connection.query_timeout * 1000
: undefined,
ssl:
connection.ssl_enabled === "Y"
? { rejectUnauthorized: false }
: false,
};
// 커넥터 생성
const connector = await DatabaseConnectorFactory.createConnector(
connection.db_type,
config,
connectionId
);
try {
const columns = await connector.getColumns(tableName);
return {
success: true,
data: columns,
message: `${columns.length}개의 컬럼을 조회했습니다.`,
};
} finally {
await DatabaseConnectorFactory.closeConnector(
connectionId,
connection.db_type
);
}
} catch (error) {
logger.error("컬럼 목록 조회 실패:", error);
return {
success: false,
message: "컬럼 목록 조회 중 오류가 발생했습니다.",
error: error instanceof Error ? error.message : "알 수 없는 오류",
};
}
}
}

File diff suppressed because it is too large Load Diff