플로우 외부연결 중간커밋

This commit is contained in:
kjs
2025-10-21 13:19:18 +09:00
parent 967f9a9f5b
commit 0d96ea566b
12 changed files with 1667 additions and 100 deletions

View File

@@ -34,23 +34,31 @@ export class FlowController {
const { name, description, tableName } = req.body;
const userId = (req as any).user?.userId || "system";
if (!name || !tableName) {
console.log("🔍 createFlowDefinition called with:", {
name,
description,
tableName,
});
if (!name) {
res.status(400).json({
success: false,
message: "Name and tableName are required",
message: "Name is required",
});
return;
}
// 테이블 존재 확인
const tableExists =
await this.flowDefinitionService.checkTableExists(tableName);
if (!tableExists) {
res.status(400).json({
success: false,
message: `Table '${tableName}' does not exist`,
});
return;
// 테이블 이름이 제공된 경우에만 존재 확인
if (tableName) {
const tableExists =
await this.flowDefinitionService.checkTableExists(tableName);
if (!tableExists) {
res.status(400).json({
success: false,
message: `Table '${tableName}' does not exist`,
});
return;
}
}
const flowDef = await this.flowDefinitionService.create(

View File

@@ -0,0 +1,230 @@
/**
* 데이터베이스별 쿼리 빌더
* PostgreSQL, MySQL/MariaDB, MSSQL, Oracle 지원
*/
export type DbType = "postgresql" | "mysql" | "mariadb" | "mssql" | "oracle";
/**
* DB별 파라미터 플레이스홀더 생성
*/
export function getPlaceholder(dbType: string, index: number): string {
const normalizedType = dbType.toLowerCase();
switch (normalizedType) {
case "postgresql":
return `$${index}`;
case "mysql":
case "mariadb":
return "?";
case "mssql":
return `@p${index}`;
case "oracle":
return `:${index}`;
default:
// 기본값은 PostgreSQL
return `$${index}`;
}
}
/**
* UPDATE 쿼리 생성
*/
export function buildUpdateQuery(
dbType: string,
tableName: string,
updates: { column: string; value: any }[],
whereColumn: string = "id"
): { query: string; values: any[] } {
const normalizedType = dbType.toLowerCase();
const values: any[] = [];
// SET 절 생성
const setClause = updates
.map((update, index) => {
values.push(update.value);
const placeholder = getPlaceholder(normalizedType, values.length);
return `${update.column} = ${placeholder}`;
})
.join(", ");
// WHERE 절 생성
values.push(undefined); // whereValue는 나중에 설정
const wherePlaceholder = getPlaceholder(normalizedType, values.length);
// updated_at 처리 (DB별 NOW() 함수)
let updatedAtExpr = "NOW()";
if (normalizedType === "mssql") {
updatedAtExpr = "GETDATE()";
} else if (normalizedType === "oracle") {
updatedAtExpr = "SYSDATE";
}
const query = `
UPDATE ${tableName}
SET ${setClause}, updated_at = ${updatedAtExpr}
WHERE ${whereColumn} = ${wherePlaceholder}
`;
return { query, values };
}
/**
* INSERT 쿼리 생성
*/
export function buildInsertQuery(
dbType: string,
tableName: string,
data: Record<string, any>
): { query: string; values: any[]; returningClause: string } {
const normalizedType = dbType.toLowerCase();
const columns = Object.keys(data);
const values = Object.values(data);
// 플레이스홀더 생성
const placeholders = columns
.map((_, index) => getPlaceholder(normalizedType, index + 1))
.join(", ");
let query = `
INSERT INTO ${tableName} (${columns.join(", ")})
VALUES (${placeholders})
`;
// RETURNING/OUTPUT 절 추가 (DB별로 다름)
let returningClause = "";
if (normalizedType === "postgresql") {
query += " RETURNING id";
returningClause = "RETURNING id";
} else if (normalizedType === "mssql") {
// MSSQL은 OUTPUT 절을 INSERT와 VALUES 사이에
const insertIndex = query.indexOf("VALUES");
query =
query.substring(0, insertIndex) +
"OUTPUT INSERTED.id " +
query.substring(insertIndex);
returningClause = "OUTPUT INSERTED.id";
} else if (normalizedType === "oracle") {
query += " RETURNING id INTO :out_id";
returningClause = "RETURNING id INTO :out_id";
}
// MySQL/MariaDB는 RETURNING 없음, LAST_INSERT_ID() 사용
return { query, values, returningClause };
}
/**
* SELECT 쿼리 생성
*/
export function buildSelectQuery(
dbType: string,
tableName: string,
whereColumn: string = "id"
): { query: string; placeholder: string } {
const normalizedType = dbType.toLowerCase();
const placeholder = getPlaceholder(normalizedType, 1);
const query = `SELECT * FROM ${tableName} WHERE ${whereColumn} = ${placeholder}`;
return { query, placeholder };
}
/**
* LIMIT/OFFSET 쿼리 생성 (페이징)
*/
export function buildPaginationClause(
dbType: string,
limit?: number,
offset?: number
): string {
const normalizedType = dbType.toLowerCase();
if (!limit) {
return "";
}
if (
normalizedType === "postgresql" ||
normalizedType === "mysql" ||
normalizedType === "mariadb"
) {
// PostgreSQL, MySQL, MariaDB: LIMIT ... OFFSET ...
let clause = ` LIMIT ${limit}`;
if (offset) {
clause += ` OFFSET ${offset}`;
}
return clause;
} else if (normalizedType === "mssql") {
// MSSQL: OFFSET ... ROWS FETCH NEXT ... ROWS ONLY
if (offset) {
return ` OFFSET ${offset} ROWS FETCH NEXT ${limit} ROWS ONLY`;
} else {
return ` OFFSET 0 ROWS FETCH NEXT ${limit} ROWS ONLY`;
}
} else if (normalizedType === "oracle") {
// Oracle: ROWNUM 또는 FETCH FIRST (12c+)
if (offset) {
return ` OFFSET ${offset} ROWS FETCH NEXT ${limit} ROWS ONLY`;
} else {
return ` FETCH FIRST ${limit} ROWS ONLY`;
}
}
return "";
}
/**
* 트랜잭션 시작
*/
export function getBeginTransactionQuery(dbType: string): string {
const normalizedType = dbType.toLowerCase();
if (normalizedType === "mssql") {
return "BEGIN TRANSACTION";
}
return "BEGIN";
}
/**
* 트랜잭션 커밋
*/
export function getCommitQuery(dbType: string): string {
return "COMMIT";
}
/**
* 트랜잭션 롤백
*/
export function getRollbackQuery(dbType: string): string {
return "ROLLBACK";
}
/**
* DB 연결 테스트 쿼리
*/
export function getConnectionTestQuery(dbType: string): string {
const normalizedType = dbType.toLowerCase();
switch (normalizedType) {
case "postgresql":
return "SELECT 1";
case "mysql":
case "mariadb":
return "SELECT 1";
case "mssql":
return "SELECT 1";
case "oracle":
return "SELECT 1 FROM DUAL";
default:
return "SELECT 1";
}
}

View File

@@ -0,0 +1,477 @@
/**
* 외부 DB 연결 헬퍼
* 플로우 데이터 이동 시 외부 DB 연결 관리
* PostgreSQL, MySQL/MariaDB, MSSQL, Oracle 지원
*/
import { Pool as PgPool } from "pg";
import * as mysql from "mysql2/promise";
import db from "../database/db";
import { CredentialEncryption } from "../utils/credentialEncryption";
import {
getConnectionTestQuery,
getPlaceholder,
getBeginTransactionQuery,
getCommitQuery,
getRollbackQuery,
} from "./dbQueryBuilder";
interface ExternalDbConnection {
id: number;
connectionName: string;
dbType: string;
host: string;
port: number;
database: string;
username: string;
password: string;
isActive: boolean;
}
// 외부 DB 연결 풀 캐시 (타입별로 다른 풀 객체)
const connectionPools = new Map<number, any>();
// 비밀번호 복호화 유틸
const credentialEncryption = new CredentialEncryption(
process.env.ENCRYPTION_SECRET_KEY || "default-secret-key-change-in-production"
);
/**
* 외부 DB 연결 정보 조회
*/
async function getExternalConnection(
connectionId: number
): Promise<ExternalDbConnection | null> {
const query = `
SELECT
id, connection_name, db_type, host, port,
database_name, username, encrypted_password, is_active
FROM external_db_connections
WHERE id = $1 AND is_active = true
`;
const result = await db.query(query, [connectionId]);
if (result.length === 0) {
return null;
}
const row = result[0];
// 비밀번호 복호화
let decryptedPassword = "";
try {
decryptedPassword = credentialEncryption.decrypt(row.encrypted_password);
} catch (error) {
console.error(`비밀번호 복호화 실패 (ID: ${connectionId}):`, error);
throw new Error("외부 DB 비밀번호 복호화에 실패했습니다");
}
return {
id: row.id,
connectionName: row.connection_name,
dbType: row.db_type,
host: row.host,
port: row.port,
database: row.database_name,
username: row.username,
password: decryptedPassword,
isActive: row.is_active,
};
}
/**
* 외부 DB 연결 풀 생성 또는 재사용
*/
export async function getExternalPool(connectionId: number): Promise<any> {
// 캐시된 연결 풀 확인
if (connectionPools.has(connectionId)) {
const poolInfo = connectionPools.get(connectionId)!;
const connection = await getExternalConnection(connectionId);
// 연결이 유효한지 확인
try {
const testQuery = getConnectionTestQuery(connection!.dbType);
await executePoolQuery(poolInfo.pool, connection!.dbType, testQuery, []);
return poolInfo;
} catch (error) {
console.warn(
`캐시된 외부 DB 연결 풀 무효화 (ID: ${connectionId}), 재생성합니다.`
);
connectionPools.delete(connectionId);
await closePool(poolInfo.pool, connection!.dbType);
}
}
// 새로운 연결 풀 생성
const connection = await getExternalConnection(connectionId);
if (!connection) {
throw new Error(
`외부 DB 연결 정보를 찾을 수 없습니다 (ID: ${connectionId})`
);
}
const dbType = connection.dbType.toLowerCase();
let pool: any;
try {
switch (dbType) {
case "postgresql":
pool = await createPostgreSQLPool(connection);
break;
case "mysql":
case "mariadb":
pool = await createMySQLPool(connection);
break;
case "mssql":
pool = await createMSSQLPool(connection);
break;
case "oracle":
pool = await createOraclePool(connection);
break;
default:
throw new Error(`지원하지 않는 DB 타입입니다: ${connection.dbType}`);
}
// 연결 테스트
const testQuery = getConnectionTestQuery(dbType);
await executePoolQuery(pool, dbType, testQuery, []);
console.log(
`✅ 외부 DB 연결 풀 생성 성공 (ID: ${connectionId}, ${connection.connectionName}, ${connection.dbType})`
);
// 캐시에 저장 (dbType 정보 포함)
const poolInfo = { pool, dbType };
connectionPools.set(connectionId, poolInfo);
return poolInfo;
} catch (error) {
if (pool) {
await closePool(pool, dbType);
}
throw new Error(
`외부 DB 연결 실패 (${connection.connectionName}, ${connection.dbType}): ${error}`
);
}
}
/**
* PostgreSQL 연결 풀 생성
*/
async function createPostgreSQLPool(
connection: ExternalDbConnection
): Promise<PgPool> {
return new PgPool({
host: connection.host,
port: connection.port,
database: connection.database,
user: connection.username,
password: connection.password,
max: 5,
idleTimeoutMillis: 30000,
connectionTimeoutMillis: 5000,
});
}
/**
* MySQL/MariaDB 연결 풀 생성
*/
async function createMySQLPool(
connection: ExternalDbConnection
): Promise<mysql.Pool> {
return mysql.createPool({
host: connection.host,
port: connection.port,
database: connection.database,
user: connection.username,
password: connection.password,
connectionLimit: 5,
waitForConnections: true,
queueLimit: 0,
});
}
/**
* MSSQL 연결 풀 생성
*/
async function createMSSQLPool(connection: ExternalDbConnection): Promise<any> {
// mssql 패키지를 동적으로 import (설치되어 있는 경우만)
try {
const sql = require("mssql");
const config = {
user: connection.username,
password: connection.password,
server: connection.host,
port: connection.port,
database: connection.database,
options: {
encrypt: true,
trustServerCertificate: true,
enableArithAbort: true,
},
pool: {
max: 5,
min: 0,
idleTimeoutMillis: 30000,
},
};
const pool = await sql.connect(config);
return pool;
} catch (error) {
throw new Error(
`MSSQL 연결 실패: mssql 패키지가 설치되어 있는지 확인하세요. (${error})`
);
}
}
/**
* Oracle 연결 풀 생성
*/
async function createOraclePool(
connection: ExternalDbConnection
): Promise<any> {
try {
// oracledb를 동적으로 import
const oracledb = require("oracledb");
// Oracle 클라이언트 초기화 (최초 1회만)
if (!oracledb.oracleClientVersion) {
// Instant Client 경로 설정 (환경변수로 지정 가능)
const instantClientPath = process.env.ORACLE_INSTANT_CLIENT_PATH;
if (instantClientPath) {
oracledb.initOracleClient({ libDir: instantClientPath });
}
}
// 연결 문자열 생성
const connectString = connection.database.includes("/")
? connection.database // 이미 전체 연결 문자열인 경우
: `${connection.host}:${connection.port}/${connection.database}`;
const pool = await oracledb.createPool({
user: connection.username,
password: connection.password,
connectString: connectString,
poolMin: 1,
poolMax: 5,
poolIncrement: 1,
poolTimeout: 60, // 60초 후 유휴 연결 해제
queueTimeout: 5000, // 연결 대기 타임아웃 5초
enableStatistics: true,
});
return pool;
} catch (error: any) {
throw new Error(
`Oracle 연결 실패: ${error.message}. oracledb 패키지와 Oracle Instant Client가 설치되어 있는지 확인하세요.`
);
}
}
/**
* 풀에서 쿼리 실행 (DB 타입별 처리)
*/
async function executePoolQuery(
pool: any,
dbType: string,
query: string,
params: any[]
): Promise<any> {
const normalizedType = dbType.toLowerCase();
switch (normalizedType) {
case "postgresql": {
const result = await pool.query(query, params);
return { rows: result.rows, rowCount: result.rowCount };
}
case "mysql":
case "mariadb": {
const [rows] = await pool.query(query, params);
return {
rows: Array.isArray(rows) ? rows : [rows],
rowCount: rows.length,
};
}
case "mssql": {
const request = pool.request();
// MSSQL은 명명된 파라미터 사용
params.forEach((param, index) => {
request.input(`p${index + 1}`, param);
});
const result = await request.query(query);
return { rows: result.recordset, rowCount: result.rowCount };
}
case "oracle": {
const oracledb = require("oracledb");
const connection = await pool.getConnection();
try {
// Oracle은 :1, :2 형식의 바인드 변수 사용
const result = await connection.execute(query, params, {
autoCommit: false, // 트랜잭션 관리를 위해 false
outFormat: oracledb.OUT_FORMAT_OBJECT, // 객체 형식으로 반환
});
return { rows: result.rows || [], rowCount: result.rowCount || 0 };
} finally {
await connection.close();
}
}
default:
throw new Error(`지원하지 않는 DB 타입: ${dbType}`);
}
}
/**
* 연결 풀 종료 (DB 타입별 처리)
*/
async function closePool(pool: any, dbType: string): Promise<void> {
const normalizedType = dbType.toLowerCase();
try {
switch (normalizedType) {
case "postgresql":
case "mysql":
case "mariadb":
await pool.end();
break;
case "mssql":
case "oracle":
await pool.close();
break;
}
} catch (error) {
console.error(`풀 종료 오류 (${dbType}):`, error);
}
}
/**
* 외부 DB 쿼리 실행
*/
export async function executeExternalQuery(
connectionId: number,
query: string,
params: any[] = []
): Promise<any> {
const poolInfo = await getExternalPool(connectionId);
return await executePoolQuery(poolInfo.pool, poolInfo.dbType, query, params);
}
/**
* 외부 DB 트랜잭션 실행
*/
export async function executeExternalTransaction(
connectionId: number,
callback: (client: any, dbType: string) => Promise<any>
): Promise<any> {
const poolInfo = await getExternalPool(connectionId);
const { pool, dbType } = poolInfo;
const normalizedType = dbType.toLowerCase();
let client: any;
try {
switch (normalizedType) {
case "postgresql": {
client = await pool.connect();
await client.query(getBeginTransactionQuery(dbType));
const result = await callback(client, dbType);
await client.query(getCommitQuery(dbType));
return result;
}
case "mysql":
case "mariadb": {
client = await pool.getConnection();
await client.beginTransaction();
const result = await callback(client, dbType);
await client.commit();
return result;
}
case "mssql": {
const transaction = new pool.constructor.Transaction(pool);
await transaction.begin();
client = transaction;
const result = await callback(client, dbType);
await transaction.commit();
return result;
}
case "oracle": {
client = await pool.getConnection();
// Oracle은 명시적 BEGIN 없이 트랜잭션 시작
const result = await callback(client, dbType);
// 명시적 커밋
await client.commit();
return result;
}
default:
throw new Error(`지원하지 않는 DB 타입: ${dbType}`);
}
} catch (error) {
console.error(`외부 DB 트랜잭션 오류 (ID: ${connectionId}):`, error);
// 롤백 시도
if (client) {
try {
switch (normalizedType) {
case "postgresql":
await client.query(getRollbackQuery(dbType));
break;
case "mysql":
case "mariadb":
await client.rollback();
break;
case "mssql":
case "oracle":
await client.rollback();
break;
}
} catch (rollbackError) {
console.error("트랜잭션 롤백 오류:", rollbackError);
}
}
throw error;
} finally {
// 연결 해제
if (client) {
try {
switch (normalizedType) {
case "postgresql":
client.release();
break;
case "mysql":
case "mariadb":
client.release();
break;
case "oracle":
await client.close();
break;
case "mssql":
// MSSQL Transaction 객체는 자동으로 정리됨
break;
}
} catch (releaseError) {
console.error("클라이언트 해제 오류:", releaseError);
}
}
}
}

View File

@@ -6,10 +6,25 @@
*/
import db from "../database/db";
import { FlowAuditLog, FlowIntegrationContext } from "../types/flow";
import {
FlowAuditLog,
FlowIntegrationContext,
FlowDefinition,
} from "../types/flow";
import { FlowDefinitionService } from "./flowDefinitionService";
import { FlowStepService } from "./flowStepService";
import { FlowExternalDbIntegrationService } from "./flowExternalDbIntegrationService";
import {
getExternalPool,
executeExternalQuery,
executeExternalTransaction,
} from "./externalDbHelper";
import {
getPlaceholder,
buildUpdateQuery,
buildInsertQuery,
buildSelectQuery,
} from "./dbQueryBuilder";
export class FlowDataMoveService {
private flowDefinitionService: FlowDefinitionService;
@@ -33,6 +48,28 @@ export class FlowDataMoveService {
userId: string = "system",
additionalData?: Record<string, any>
): Promise<{ success: boolean; targetDataId?: any; message?: string }> {
// 0. 플로우 정의 조회 (DB 소스 확인)
const flowDefinition = await this.flowDefinitionService.findById(flowId);
if (!flowDefinition) {
throw new Error(`플로우를 찾을 수 없습니다 (ID: ${flowId})`);
}
// 외부 DB인 경우 별도 처리
if (
flowDefinition.dbSourceType === "external" &&
flowDefinition.dbConnectionId
) {
return await this.moveDataToStepExternal(
flowDefinition.dbConnectionId,
fromStepId,
toStepId,
dataId,
userId,
additionalData
);
}
// 내부 DB 처리 (기존 로직)
return await db.transaction(async (client) => {
try {
// 1. 단계 정보 조회
@@ -160,7 +197,14 @@ export class FlowDataMoveService {
dataId: any,
additionalData?: Record<string, any>
): Promise<void> {
const statusColumn = toStep.statusColumn || "flow_status";
// 상태 컬럼이 지정되지 않은 경우 에러
if (!toStep.statusColumn) {
throw new Error(
`단계 "${toStep.stepName}"의 상태 컬럼이 지정되지 않았습니다. 플로우 편집 화면에서 "상태 컬럼명"을 설정해주세요.`
);
}
const statusColumn = toStep.statusColumn;
const tableName = fromStep.tableName;
// 추가 필드 업데이트 준비
@@ -590,4 +634,307 @@ export class FlowDataMoveService {
userId,
]);
}
/**
* 외부 DB 데이터 이동 처리
*/
private async moveDataToStepExternal(
dbConnectionId: number,
fromStepId: number,
toStepId: number,
dataId: any,
userId: string = "system",
additionalData?: Record<string, any>
): Promise<{ success: boolean; targetDataId?: any; message?: string }> {
return await executeExternalTransaction(
dbConnectionId,
async (externalClient, dbType) => {
try {
// 1. 단계 정보 조회 (내부 DB에서)
const fromStep = await this.flowStepService.findById(fromStepId);
const toStep = await this.flowStepService.findById(toStepId);
if (!fromStep || !toStep) {
throw new Error("유효하지 않은 단계입니다");
}
let targetDataId = dataId;
let sourceTable = fromStep.tableName;
let targetTable = toStep.tableName || fromStep.tableName;
// 2. 이동 방식에 따라 처리
switch (toStep.moveType || "status") {
case "status":
// 상태 변경 방식
await this.moveByStatusChangeExternal(
externalClient,
dbType,
fromStep,
toStep,
dataId,
additionalData
);
break;
case "table":
// 테이블 이동 방식
targetDataId = await this.moveByTableTransferExternal(
externalClient,
dbType,
fromStep,
toStep,
dataId,
additionalData
);
targetTable = toStep.targetTable || toStep.tableName;
break;
case "both":
// 하이브리드 방식: 둘 다 수행
await this.moveByStatusChangeExternal(
externalClient,
dbType,
fromStep,
toStep,
dataId,
additionalData
);
targetDataId = await this.moveByTableTransferExternal(
externalClient,
dbType,
fromStep,
toStep,
dataId,
additionalData
);
targetTable = toStep.targetTable || toStep.tableName;
break;
default:
throw new Error(
`지원하지 않는 이동 방식입니다: ${toStep.moveType}`
);
}
// 3. 외부 연동 처리는 생략 (외부 DB 자체가 외부이므로)
// 4. 감사 로그 기록 (내부 DB에)
// 외부 DB는 내부 DB 트랜잭션 외부이므로 직접 쿼리 실행
const auditQuery = `
INSERT INTO flow_audit_log (
flow_definition_id, from_step_id, to_step_id,
move_type, source_table, target_table,
source_data_id, target_data_id,
status_from, status_to,
changed_by, note
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
`;
await db.query(auditQuery, [
toStep.flowDefinitionId,
fromStep.id,
toStep.id,
toStep.moveType || "status",
sourceTable,
targetTable,
dataId,
targetDataId,
null, // statusFrom
toStep.statusValue || null, // statusTo
userId,
`외부 DB (${dbType}) 데이터 이동`,
]);
return {
success: true,
targetDataId,
message: `데이터 이동이 완료되었습니다 (외부 DB: ${dbType})`,
};
} catch (error: any) {
console.error("외부 DB 데이터 이동 오류:", error);
throw error;
}
}
);
}
/**
* 외부 DB 상태 변경 방식으로 데이터 이동
*/
private async moveByStatusChangeExternal(
externalClient: any,
dbType: string,
fromStep: any,
toStep: any,
dataId: any,
additionalData?: Record<string, any>
): Promise<void> {
// 상태 컬럼이 지정되지 않은 경우 에러
if (!toStep.statusColumn) {
throw new Error(
`단계 "${toStep.stepName}"의 상태 컬럼이 지정되지 않았습니다. 플로우 편집 화면에서 "상태 컬럼명"을 설정해주세요.`
);
}
const statusColumn = toStep.statusColumn;
const tableName = fromStep.tableName;
const normalizedDbType = dbType.toLowerCase();
// 업데이트할 필드 준비
const updateFields: { column: string; value: any }[] = [
{ column: statusColumn, value: toStep.statusValue },
];
// 추가 데이터가 있으면 함께 업데이트
if (additionalData) {
for (const [key, value] of Object.entries(additionalData)) {
updateFields.push({ column: key, value });
}
}
// DB별 쿼리 생성
const { query: updateQuery, values } = buildUpdateQuery(
dbType,
tableName,
updateFields,
"id"
);
// WHERE 절 값 설정 (마지막 파라미터)
values[values.length - 1] = dataId;
// 쿼리 실행 (DB 타입별 처리)
let result: any;
if (normalizedDbType === "postgresql") {
result = await externalClient.query(updateQuery, values);
} else if (normalizedDbType === "mysql" || normalizedDbType === "mariadb") {
[result] = await externalClient.query(updateQuery, values);
} else if (normalizedDbType === "mssql") {
const request = externalClient.request();
values.forEach((val: any, idx: number) => {
request.input(`p${idx + 1}`, val);
});
result = await request.query(updateQuery);
} else if (normalizedDbType === "oracle") {
result = await externalClient.execute(updateQuery, values, {
autoCommit: false,
});
}
// 결과 확인
const affectedRows =
normalizedDbType === "postgresql"
? result.rowCount
: normalizedDbType === "mssql"
? result.rowsAffected[0]
: normalizedDbType === "oracle"
? result.rowsAffected
: result.affectedRows;
if (affectedRows === 0) {
throw new Error(`데이터를 찾을 수 없습니다: ${dataId}`);
}
}
/**
* 외부 DB 테이블 이동 방식으로 데이터 이동
*/
private async moveByTableTransferExternal(
externalClient: any,
dbType: string,
fromStep: any,
toStep: any,
dataId: any,
additionalData?: Record<string, any>
): Promise<any> {
const sourceTable = fromStep.tableName;
const targetTable = toStep.targetTable || toStep.tableName;
const fieldMappings = toStep.fieldMappings || {};
const normalizedDbType = dbType.toLowerCase();
// 1. 소스 데이터 조회
const { query: selectQuery, placeholder } = buildSelectQuery(
dbType,
sourceTable,
"id"
);
let sourceResult: any;
if (normalizedDbType === "postgresql") {
sourceResult = await externalClient.query(selectQuery, [dataId]);
} else if (normalizedDbType === "mysql" || normalizedDbType === "mariadb") {
[sourceResult] = await externalClient.query(selectQuery, [dataId]);
} else if (normalizedDbType === "mssql") {
const request = externalClient.request();
request.input("p1", dataId);
sourceResult = await request.query(selectQuery);
sourceResult = { rows: sourceResult.recordset };
} else if (normalizedDbType === "oracle") {
sourceResult = await externalClient.execute(selectQuery, [dataId], {
autoCommit: false,
outFormat: 4001, // oracledb.OUT_FORMAT_OBJECT
});
}
const rows = sourceResult.rows || sourceResult;
if (!rows || rows.length === 0) {
throw new Error(`소스 데이터를 찾을 수 없습니다: ${dataId}`);
}
const sourceData = rows[0];
// 2. 필드 매핑 적용
const targetData: Record<string, any> = {};
for (const [targetField, sourceField] of Object.entries(fieldMappings)) {
const sourceFieldKey = sourceField as string;
if (sourceData[sourceFieldKey] !== undefined) {
targetData[targetField] = sourceData[sourceFieldKey];
}
}
// 추가 데이터 병합
if (additionalData) {
Object.assign(targetData, additionalData);
}
// 3. 대상 테이블에 삽입
const { query: insertQuery, values } = buildInsertQuery(
dbType,
targetTable,
targetData
);
let insertResult: any;
let newDataId: any;
if (normalizedDbType === "postgresql") {
insertResult = await externalClient.query(insertQuery, values);
newDataId = insertResult.rows[0].id;
} else if (normalizedDbType === "mysql" || normalizedDbType === "mariadb") {
[insertResult] = await externalClient.query(insertQuery, values);
newDataId = insertResult.insertId;
} else if (normalizedDbType === "mssql") {
const request = externalClient.request();
values.forEach((val: any, idx: number) => {
request.input(`p${idx + 1}`, val);
});
insertResult = await request.query(insertQuery);
newDataId = insertResult.recordset[0].id;
} else if (normalizedDbType === "oracle") {
// Oracle RETURNING 절 처리
const outBinds: any = { id: { dir: 3003, type: 2001 } }; // OUT, NUMBER
insertResult = await externalClient.execute(insertQuery, values, {
autoCommit: false,
outBinds: outBinds,
});
newDataId = insertResult.outBinds.id[0];
}
// 4. 필요 시 소스 데이터 삭제 (옵션)
// const deletePlaceholder = getPlaceholder(dbType, 1);
// await externalClient.query(`DELETE FROM ${sourceTable} WHERE id = ${deletePlaceholder}`, [dataId]);
return newDataId;
}
}

View File

@@ -17,18 +17,33 @@ export class FlowDefinitionService {
request: CreateFlowDefinitionRequest,
userId: string
): Promise<FlowDefinition> {
console.log("🔥 flowDefinitionService.create called with:", {
name: request.name,
description: request.description,
tableName: request.tableName,
dbSourceType: request.dbSourceType,
dbConnectionId: request.dbConnectionId,
userId,
});
const query = `
INSERT INTO flow_definition (name, description, table_name, created_by)
VALUES ($1, $2, $3, $4)
INSERT INTO flow_definition (name, description, table_name, db_source_type, db_connection_id, created_by)
VALUES ($1, $2, $3, $4, $5, $6)
RETURNING *
`;
const result = await db.query(query, [
const values = [
request.name,
request.description || null,
request.tableName,
request.tableName || null,
request.dbSourceType || "internal",
request.dbConnectionId || null,
userId,
]);
];
console.log("💾 Executing INSERT with values:", values);
const result = await db.query(query, values);
return this.mapToFlowDefinition(result[0]);
}
@@ -162,6 +177,8 @@ export class FlowDefinitionService {
name: row.name,
description: row.description,
tableName: row.table_name,
dbSourceType: row.db_source_type || "internal",
dbConnectionId: row.db_connection_id,
isActive: row.is_active,
createdBy: row.created_by,
createdAt: row.created_at,

View File

@@ -8,6 +8,8 @@ import { FlowStepDataCount, FlowStepDataList } from "../types/flow";
import { FlowDefinitionService } from "./flowDefinitionService";
import { FlowStepService } from "./flowStepService";
import { FlowConditionParser } from "./flowConditionParser";
import { executeExternalQuery } from "./externalDbHelper";
import { getPlaceholder, buildPaginationClause } from "./dbQueryBuilder";
export class FlowExecutionService {
private flowDefinitionService: FlowDefinitionService;
@@ -28,6 +30,13 @@ export class FlowExecutionService {
throw new Error(`Flow definition not found: ${flowId}`);
}
console.log("🔍 [getStepDataCount] Flow Definition:", {
flowId,
dbSourceType: flowDef.dbSourceType,
dbConnectionId: flowDef.dbConnectionId,
tableName: flowDef.tableName,
});
// 2. 플로우 단계 조회
const step = await this.flowStepService.findById(stepId);
if (!step) {
@@ -46,11 +55,40 @@ export class FlowExecutionService {
step.conditionJson
);
// 5. 카운트 쿼리 실행
// 5. 카운트 쿼리 실행 (내부 또는 외부 DB)
const query = `SELECT COUNT(*) as count FROM ${tableName} WHERE ${where}`;
const result = await db.query(query, params);
return parseInt(result[0].count);
console.log("🔍 [getStepDataCount] Query Info:", {
tableName,
query,
params,
isExternal: flowDef.dbSourceType === "external",
connectionId: flowDef.dbConnectionId,
});
let result: any;
if (flowDef.dbSourceType === "external" && flowDef.dbConnectionId) {
// 외부 DB 조회
console.log(
"✅ [getStepDataCount] Using EXTERNAL DB:",
flowDef.dbConnectionId
);
const externalResult = await executeExternalQuery(
flowDef.dbConnectionId,
query,
params
);
console.log("📦 [getStepDataCount] External result:", externalResult);
result = externalResult.rows;
} else {
// 내부 DB 조회
console.log("✅ [getStepDataCount] Using INTERNAL DB");
result = await db.query(query, params);
}
const count = parseInt(result[0].count || result[0].COUNT);
console.log("✅ [getStepDataCount] Final count:", count);
return count;
}
/**
@@ -88,47 +126,98 @@ export class FlowExecutionService {
const offset = (page - 1) * pageSize;
const isExternalDb =
flowDef.dbSourceType === "external" && flowDef.dbConnectionId;
// 5. 전체 카운트
const countQuery = `SELECT COUNT(*) as count FROM ${tableName} WHERE ${where}`;
const countResult = await db.query(countQuery, params);
const total = parseInt(countResult[0].count);
let countResult: any;
let total: number;
// 6. 테이블의 Primary Key 컬럼 찾기
let orderByColumn = "";
try {
const pkQuery = `
SELECT a.attname
FROM pg_index i
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
WHERE i.indrelid = $1::regclass
AND i.indisprimary
LIMIT 1
`;
const pkResult = await db.query(pkQuery, [tableName]);
if (pkResult.length > 0) {
orderByColumn = pkResult[0].attname;
}
} catch (err) {
// Primary Key를 찾지 못하면 ORDER BY 없이 진행
console.warn(`Could not find primary key for table ${tableName}:`, err);
if (isExternalDb) {
const externalCountResult = await executeExternalQuery(
flowDef.dbConnectionId!,
countQuery,
params
);
countResult = externalCountResult.rows;
total = parseInt(countResult[0].count || countResult[0].COUNT);
} else {
countResult = await db.query(countQuery, params);
total = parseInt(countResult[0].count);
}
// 7. 데이터 조회
const orderByClause = orderByColumn ? `ORDER BY ${orderByColumn} DESC` : "";
const dataQuery = `
SELECT * FROM ${tableName}
WHERE ${where}
${orderByClause}
LIMIT $${params.length + 1} OFFSET $${params.length + 2}
`;
const dataResult = await db.query(dataQuery, [...params, pageSize, offset]);
// 6. 데이터 조회 (DB 타입별 페이징 처리)
let dataQuery: string;
let dataParams: any[];
return {
records: dataResult,
total,
page,
pageSize,
};
if (isExternalDb) {
// 외부 DB는 id 컬럼으로 정렬 (가정)
// DB 타입에 따른 페이징 절은 빌더에서 처리하지 않고 직접 작성
// PostgreSQL, MySQL, MSSQL, Oracle 모두 지원하도록 단순화
dataQuery = `
SELECT * FROM ${tableName}
WHERE ${where}
ORDER BY id DESC
LIMIT ${pageSize} OFFSET ${offset}
`;
dataParams = params;
const externalDataResult = await executeExternalQuery(
flowDef.dbConnectionId!,
dataQuery,
dataParams
);
return {
records: externalDataResult.rows,
total,
page,
pageSize,
};
} else {
// 내부 DB (PostgreSQL)
// Primary Key 컬럼 찾기
let orderByColumn = "";
try {
const pkQuery = `
SELECT a.attname
FROM pg_index i
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
WHERE i.indrelid = $1::regclass
AND i.indisprimary
LIMIT 1
`;
const pkResult = await db.query(pkQuery, [tableName]);
if (pkResult.length > 0) {
orderByColumn = pkResult[0].attname;
}
} catch (err) {
console.warn(`Could not find primary key for table ${tableName}:`, err);
}
const orderByClause = orderByColumn
? `ORDER BY ${orderByColumn} DESC`
: "";
dataQuery = `
SELECT * FROM ${tableName}
WHERE ${where}
${orderByClause}
LIMIT $${params.length + 1} OFFSET $${params.length + 2}
`;
const dataResult = await db.query(dataQuery, [
...params,
pageSize,
offset,
]);
return {
records: dataResult,
total,
page,
pageSize,
};
}
}
/**

View File

@@ -8,6 +8,8 @@ export interface FlowDefinition {
name: string;
description?: string;
tableName: string;
dbSourceType?: "internal" | "external"; // 데이터베이스 소스 타입
dbConnectionId?: number; // 외부 DB 연결 ID (external인 경우)
isActive: boolean;
createdBy?: string;
createdAt: Date;
@@ -19,6 +21,8 @@ export interface CreateFlowDefinitionRequest {
name: string;
description?: string;
tableName: string;
dbSourceType?: "internal" | "external"; // 데이터베이스 소스 타입
dbConnectionId?: number; // 외부 DB 연결 ID
}
// 플로우 정의 수정 요청