feat: Add procedure and function management in flow controller
- Introduced new endpoints in FlowController for listing procedures and retrieving procedure parameters, enhancing the flow management capabilities. - Updated FlowDataMoveService to support procedure calls during data movement, ensuring seamless integration with external and internal databases. - Enhanced NodeFlowExecutionService to execute procedure call actions, allowing for dynamic execution of stored procedures within flow nodes. - Updated frontend components to support procedure selection and parameter management, improving user experience in configuring flow steps. - Added necessary types and API functions for handling procedure-related data, ensuring type safety and clarity in implementation.
This commit is contained in:
@@ -9,6 +9,7 @@ import { FlowStepService } from "../services/flowStepService";
|
||||
import { FlowConnectionService } from "../services/flowConnectionService";
|
||||
import { FlowExecutionService } from "../services/flowExecutionService";
|
||||
import { FlowDataMoveService } from "../services/flowDataMoveService";
|
||||
import { FlowProcedureService } from "../services/flowProcedureService";
|
||||
|
||||
export class FlowController {
|
||||
private flowDefinitionService: FlowDefinitionService;
|
||||
@@ -16,6 +17,7 @@ export class FlowController {
|
||||
private flowConnectionService: FlowConnectionService;
|
||||
private flowExecutionService: FlowExecutionService;
|
||||
private flowDataMoveService: FlowDataMoveService;
|
||||
private flowProcedureService: FlowProcedureService;
|
||||
|
||||
constructor() {
|
||||
this.flowDefinitionService = new FlowDefinitionService();
|
||||
@@ -23,6 +25,7 @@ export class FlowController {
|
||||
this.flowConnectionService = new FlowConnectionService();
|
||||
this.flowExecutionService = new FlowExecutionService();
|
||||
this.flowDataMoveService = new FlowDataMoveService();
|
||||
this.flowProcedureService = new FlowProcedureService();
|
||||
}
|
||||
|
||||
// ==================== 플로우 정의 ====================
|
||||
@@ -936,4 +939,94 @@ export class FlowController {
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
// ==================== 프로시저/함수 ====================
|
||||
|
||||
/**
|
||||
* 프로시저/함수 목록 조회
|
||||
*/
|
||||
listProcedures = async (req: Request, res: Response): Promise<void> => {
|
||||
try {
|
||||
const dbSource = (req.query.dbSource as string) || "internal";
|
||||
const connectionId = req.query.connectionId
|
||||
? parseInt(req.query.connectionId as string)
|
||||
: undefined;
|
||||
const schema = req.query.schema as string | undefined;
|
||||
|
||||
if (dbSource !== "internal" && dbSource !== "external") {
|
||||
res.status(400).json({
|
||||
success: false,
|
||||
message: "dbSource는 internal 또는 external이어야 합니다",
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
if (dbSource === "external" && !connectionId) {
|
||||
res.status(400).json({
|
||||
success: false,
|
||||
message: "외부 DB 조회 시 connectionId가 필요합니다",
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const procedures = await this.flowProcedureService.listProcedures(
|
||||
dbSource,
|
||||
connectionId,
|
||||
schema
|
||||
);
|
||||
|
||||
res.json({ success: true, data: procedures });
|
||||
} catch (error: any) {
|
||||
console.error("프로시저 목록 조회 실패:", error);
|
||||
res.status(500).json({
|
||||
success: false,
|
||||
message: error.message || "프로시저 목록 조회에 실패했습니다",
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* 프로시저/함수 파라미터 조회
|
||||
*/
|
||||
getProcedureParameters = async (req: Request, res: Response): Promise<void> => {
|
||||
try {
|
||||
const { name } = req.params;
|
||||
const dbSource = (req.query.dbSource as string) || "internal";
|
||||
const connectionId = req.query.connectionId
|
||||
? parseInt(req.query.connectionId as string)
|
||||
: undefined;
|
||||
const schema = req.query.schema as string | undefined;
|
||||
|
||||
if (!name) {
|
||||
res.status(400).json({
|
||||
success: false,
|
||||
message: "프로시저 이름이 필요합니다",
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
if (dbSource !== "internal" && dbSource !== "external") {
|
||||
res.status(400).json({
|
||||
success: false,
|
||||
message: "dbSource는 internal 또는 external이어야 합니다",
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const parameters = await this.flowProcedureService.getProcedureParameters(
|
||||
name,
|
||||
dbSource as "internal" | "external",
|
||||
connectionId,
|
||||
schema
|
||||
);
|
||||
|
||||
res.json({ success: true, data: parameters });
|
||||
} catch (error: any) {
|
||||
console.error("프로시저 파라미터 조회 실패:", error);
|
||||
res.status(500).json({
|
||||
success: false,
|
||||
message: error.message || "프로시저 파라미터 조회에 실패했습니다",
|
||||
});
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@@ -50,4 +50,8 @@ router.put("/:flowId/step/:stepId/data/:recordId", flowController.updateStepData
|
||||
router.get("/audit/:flowId/:recordId", flowController.getAuditLogs);
|
||||
router.get("/audit/:flowId", flowController.getFlowAuditLogs);
|
||||
|
||||
// ==================== 프로시저/함수 ====================
|
||||
router.get("/procedures", flowController.listProcedures);
|
||||
router.get("/procedures/:name/parameters", flowController.getProcedureParameters);
|
||||
|
||||
export default router;
|
||||
|
||||
@@ -26,16 +26,20 @@ import {
|
||||
buildSelectQuery,
|
||||
} from "./dbQueryBuilder";
|
||||
import { FlowConditionParser } from "./flowConditionParser";
|
||||
import { FlowProcedureService } from "./flowProcedureService";
|
||||
import { FlowProcedureConfig } from "../types/flow";
|
||||
|
||||
export class FlowDataMoveService {
|
||||
private flowDefinitionService: FlowDefinitionService;
|
||||
private flowStepService: FlowStepService;
|
||||
private externalDbIntegrationService: FlowExternalDbIntegrationService;
|
||||
private flowProcedureService: FlowProcedureService;
|
||||
|
||||
constructor() {
|
||||
this.flowDefinitionService = new FlowDefinitionService();
|
||||
this.flowStepService = new FlowStepService();
|
||||
this.externalDbIntegrationService = new FlowExternalDbIntegrationService();
|
||||
this.flowProcedureService = new FlowProcedureService();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -90,6 +94,64 @@ export class FlowDataMoveService {
|
||||
let sourceTable = fromStep.tableName;
|
||||
let targetTable = toStep.tableName || fromStep.tableName;
|
||||
|
||||
// 1.5. 프로시저 호출 (스텝 이동 전 실행, 실패 시 전체 롤백)
|
||||
if (
|
||||
toStep.integrationType === "procedure" &&
|
||||
toStep.integrationConfig &&
|
||||
(toStep.integrationConfig as FlowProcedureConfig).type === "procedure"
|
||||
) {
|
||||
const procConfig = toStep.integrationConfig as FlowProcedureConfig;
|
||||
// 레코드 데이터 조회 (파라미터 매핑용)
|
||||
let recordData: Record<string, any> = {};
|
||||
try {
|
||||
const recordTable = FlowConditionParser.sanitizeTableName(
|
||||
sourceTable || flowDefinition.tableName
|
||||
);
|
||||
const recordResult = await client.query(
|
||||
`SELECT * FROM ${recordTable} WHERE id = $1 LIMIT 1`,
|
||||
[dataId]
|
||||
);
|
||||
if (recordResult.rows && recordResult.rows.length > 0) {
|
||||
recordData = recordResult.rows[0];
|
||||
}
|
||||
} catch (err: any) {
|
||||
console.warn("프로시저 파라미터용 레코드 조회 실패:", err.message);
|
||||
}
|
||||
|
||||
console.log(`프로시저 호출 시작: ${procConfig.procedureName}`, {
|
||||
flowId,
|
||||
fromStepId,
|
||||
toStepId,
|
||||
dataId,
|
||||
dbSource: procConfig.dbSource,
|
||||
});
|
||||
|
||||
const procResult = await this.flowProcedureService.executeProcedure(
|
||||
procConfig,
|
||||
recordData,
|
||||
procConfig.dbSource === "internal" ? client : undefined
|
||||
);
|
||||
|
||||
console.log(`프로시저 호출 완료: ${procConfig.procedureName}`, {
|
||||
success: procResult.success,
|
||||
});
|
||||
|
||||
// 프로시저 실행 로그 기록
|
||||
await this.logIntegration(
|
||||
flowId,
|
||||
toStep.id,
|
||||
dataId,
|
||||
"procedure",
|
||||
procConfig.connectionId,
|
||||
procConfig,
|
||||
procResult.result,
|
||||
"success",
|
||||
undefined,
|
||||
0,
|
||||
userId
|
||||
);
|
||||
}
|
||||
|
||||
// 2. 이동 방식에 따라 처리
|
||||
switch (toStep.moveType || "status") {
|
||||
case "status":
|
||||
@@ -603,18 +665,19 @@ export class FlowDataMoveService {
|
||||
}
|
||||
break;
|
||||
|
||||
case "procedure":
|
||||
// 프로시저는 데이터 이동 전에 이미 실행됨 (step 1.5)
|
||||
break;
|
||||
|
||||
case "rest_api":
|
||||
// REST API 연동 (추후 구현)
|
||||
console.warn("REST API 연동은 아직 구현되지 않았습니다");
|
||||
break;
|
||||
|
||||
case "webhook":
|
||||
// Webhook 연동 (추후 구현)
|
||||
console.warn("Webhook 연동은 아직 구현되지 않았습니다");
|
||||
break;
|
||||
|
||||
case "hybrid":
|
||||
// 복합 연동 (추후 구현)
|
||||
console.warn("복합 연동은 아직 구현되지 않았습니다");
|
||||
break;
|
||||
|
||||
@@ -716,6 +779,40 @@ export class FlowDataMoveService {
|
||||
let sourceTable = fromStep.tableName;
|
||||
let targetTable = toStep.tableName || fromStep.tableName;
|
||||
|
||||
// 1.5. 프로시저 호출 (외부 DB 경로 - 스텝 이동 전)
|
||||
if (
|
||||
toStep.integrationType === "procedure" &&
|
||||
toStep.integrationConfig &&
|
||||
(toStep.integrationConfig as FlowProcedureConfig).type === "procedure"
|
||||
) {
|
||||
const procConfig = toStep.integrationConfig as FlowProcedureConfig;
|
||||
let recordData: Record<string, any> = {};
|
||||
try {
|
||||
const recordTable = FlowConditionParser.sanitizeTableName(
|
||||
sourceTable || ""
|
||||
);
|
||||
if (recordTable) {
|
||||
const placeholder = getPlaceholder(dbType, 1);
|
||||
const recordResult = await externalClient.query(
|
||||
`SELECT * FROM ${recordTable} WHERE id = ${placeholder}`,
|
||||
[dataId]
|
||||
);
|
||||
const rows = recordResult.rows || recordResult;
|
||||
if (Array.isArray(rows) && rows.length > 0) {
|
||||
recordData = rows[0];
|
||||
}
|
||||
}
|
||||
} catch (err: any) {
|
||||
console.warn("프로시저 파라미터용 레코드 조회 실패 (외부):", err.message);
|
||||
}
|
||||
|
||||
await this.flowProcedureService.executeProcedure(
|
||||
procConfig,
|
||||
recordData,
|
||||
procConfig.dbSource === "external" ? undefined : undefined
|
||||
);
|
||||
}
|
||||
|
||||
// 2. 이동 방식에 따라 처리
|
||||
switch (toStep.moveType || "status") {
|
||||
case "status":
|
||||
|
||||
429
backend-node/src/services/flowProcedureService.ts
Normal file
429
backend-node/src/services/flowProcedureService.ts
Normal file
@@ -0,0 +1,429 @@
|
||||
/**
|
||||
* 플로우 프로시저 호출 서비스
|
||||
* 내부/외부 DB의 프로시저/함수 목록 조회, 파라미터 조회, 실행을 담당
|
||||
*/
|
||||
|
||||
import db from "../database/db";
|
||||
import {
|
||||
getExternalPool,
|
||||
executeExternalQuery,
|
||||
} from "./externalDbHelper";
|
||||
import { getPlaceholder } from "./dbQueryBuilder";
|
||||
import {
|
||||
FlowProcedureConfig,
|
||||
FlowProcedureParam,
|
||||
ProcedureListItem,
|
||||
ProcedureParameterInfo,
|
||||
} from "../types/flow";
|
||||
|
||||
export class FlowProcedureService {
|
||||
/**
|
||||
* 프로시저/함수 목록 조회
|
||||
* information_schema.routines에서 사용 가능한 프로시저/함수를 가져온다
|
||||
*/
|
||||
async listProcedures(
|
||||
dbSource: "internal" | "external",
|
||||
connectionId?: number,
|
||||
schema?: string
|
||||
): Promise<ProcedureListItem[]> {
|
||||
if (dbSource === "external" && connectionId) {
|
||||
return this.listExternalProcedures(connectionId, schema);
|
||||
}
|
||||
return this.listInternalProcedures(schema);
|
||||
}
|
||||
|
||||
private async listInternalProcedures(
|
||||
schema?: string
|
||||
): Promise<ProcedureListItem[]> {
|
||||
const targetSchema = schema || "public";
|
||||
// 트리거 함수(data_type='trigger')는 직접 호출 대상이 아니므로 제외
|
||||
const query = `
|
||||
SELECT
|
||||
routine_name AS name,
|
||||
routine_schema AS schema,
|
||||
routine_type AS type,
|
||||
data_type AS return_type
|
||||
FROM information_schema.routines
|
||||
WHERE routine_schema = $1
|
||||
AND routine_type IN ('PROCEDURE', 'FUNCTION')
|
||||
AND data_type != 'trigger'
|
||||
ORDER BY routine_type, routine_name
|
||||
`;
|
||||
const rows = await db.query(query, [targetSchema]);
|
||||
return rows.map((r: any) => ({
|
||||
name: r.name,
|
||||
schema: r.schema,
|
||||
type: r.type as "PROCEDURE" | "FUNCTION",
|
||||
returnType: r.return_type || undefined,
|
||||
}));
|
||||
}
|
||||
|
||||
private async listExternalProcedures(
|
||||
connectionId: number,
|
||||
schema?: string
|
||||
): Promise<ProcedureListItem[]> {
|
||||
const poolInfo = await getExternalPool(connectionId);
|
||||
const dbType = poolInfo.dbType.toLowerCase();
|
||||
|
||||
let query: string;
|
||||
let params: any[];
|
||||
|
||||
switch (dbType) {
|
||||
case "postgresql": {
|
||||
const targetSchema = schema || "public";
|
||||
query = `
|
||||
SELECT
|
||||
routine_name AS name,
|
||||
routine_schema AS schema,
|
||||
routine_type AS type,
|
||||
data_type AS return_type
|
||||
FROM information_schema.routines
|
||||
WHERE routine_schema = $1
|
||||
AND routine_type IN ('PROCEDURE', 'FUNCTION')
|
||||
AND data_type != 'trigger'
|
||||
ORDER BY routine_type, routine_name
|
||||
`;
|
||||
params = [targetSchema];
|
||||
break;
|
||||
}
|
||||
case "mysql":
|
||||
case "mariadb": {
|
||||
query = `
|
||||
SELECT
|
||||
ROUTINE_NAME AS name,
|
||||
ROUTINE_SCHEMA AS \`schema\`,
|
||||
ROUTINE_TYPE AS type,
|
||||
DATA_TYPE AS return_type
|
||||
FROM information_schema.ROUTINES
|
||||
WHERE ROUTINE_SCHEMA = DATABASE()
|
||||
AND ROUTINE_TYPE IN ('PROCEDURE', 'FUNCTION')
|
||||
ORDER BY ROUTINE_TYPE, ROUTINE_NAME
|
||||
`;
|
||||
params = [];
|
||||
break;
|
||||
}
|
||||
case "mssql": {
|
||||
query = `
|
||||
SELECT
|
||||
ROUTINE_NAME AS name,
|
||||
ROUTINE_SCHEMA AS [schema],
|
||||
ROUTINE_TYPE AS type,
|
||||
DATA_TYPE AS return_type
|
||||
FROM INFORMATION_SCHEMA.ROUTINES
|
||||
WHERE ROUTINE_TYPE IN ('PROCEDURE', 'FUNCTION')
|
||||
ORDER BY ROUTINE_TYPE, ROUTINE_NAME
|
||||
`;
|
||||
params = [];
|
||||
break;
|
||||
}
|
||||
default:
|
||||
throw new Error(`프로시저 목록 조회 미지원 DB: ${dbType}`);
|
||||
}
|
||||
|
||||
const result = await executeExternalQuery(connectionId, query, params);
|
||||
return (result.rows || []).map((r: any) => ({
|
||||
name: r.name || r.NAME,
|
||||
schema: r.schema || r.SCHEMA || "",
|
||||
type: (r.type || r.TYPE || "FUNCTION").toUpperCase() as "PROCEDURE" | "FUNCTION",
|
||||
returnType: r.return_type || r.RETURN_TYPE || undefined,
|
||||
}));
|
||||
}
|
||||
|
||||
/**
|
||||
* 프로시저/함수 파라미터 정보 조회
|
||||
*/
|
||||
async getProcedureParameters(
|
||||
procedureName: string,
|
||||
dbSource: "internal" | "external",
|
||||
connectionId?: number,
|
||||
schema?: string
|
||||
): Promise<ProcedureParameterInfo[]> {
|
||||
if (dbSource === "external" && connectionId) {
|
||||
return this.getExternalProcedureParameters(
|
||||
connectionId,
|
||||
procedureName,
|
||||
schema
|
||||
);
|
||||
}
|
||||
return this.getInternalProcedureParameters(procedureName, schema);
|
||||
}
|
||||
|
||||
private async getInternalProcedureParameters(
|
||||
procedureName: string,
|
||||
schema?: string
|
||||
): Promise<ProcedureParameterInfo[]> {
|
||||
const targetSchema = schema || "public";
|
||||
// PostgreSQL의 specific_name은 routine_name + OID 형태이므로 서브쿼리로 매칭
|
||||
const query = `
|
||||
SELECT
|
||||
p.parameter_name AS name,
|
||||
p.ordinal_position AS position,
|
||||
p.data_type,
|
||||
p.parameter_mode AS mode,
|
||||
p.parameter_default AS default_value
|
||||
FROM information_schema.parameters p
|
||||
WHERE p.specific_schema = $1
|
||||
AND p.specific_name IN (
|
||||
SELECT r.specific_name FROM information_schema.routines r
|
||||
WHERE r.routine_schema = $1 AND r.routine_name = $2
|
||||
LIMIT 1
|
||||
)
|
||||
AND p.parameter_name IS NOT NULL
|
||||
ORDER BY p.ordinal_position
|
||||
`;
|
||||
const rows = await db.query(query, [targetSchema, procedureName]);
|
||||
return rows.map((r: any) => ({
|
||||
name: r.name,
|
||||
position: parseInt(r.position, 10),
|
||||
dataType: r.data_type,
|
||||
mode: this.normalizeParamMode(r.mode),
|
||||
defaultValue: r.default_value || undefined,
|
||||
}));
|
||||
}
|
||||
|
||||
private async getExternalProcedureParameters(
|
||||
connectionId: number,
|
||||
procedureName: string,
|
||||
schema?: string
|
||||
): Promise<ProcedureParameterInfo[]> {
|
||||
const poolInfo = await getExternalPool(connectionId);
|
||||
const dbType = poolInfo.dbType.toLowerCase();
|
||||
|
||||
let query: string;
|
||||
let params: any[];
|
||||
|
||||
switch (dbType) {
|
||||
case "postgresql": {
|
||||
const targetSchema = schema || "public";
|
||||
query = `
|
||||
SELECT
|
||||
p.parameter_name AS name,
|
||||
p.ordinal_position AS position,
|
||||
p.data_type,
|
||||
p.parameter_mode AS mode,
|
||||
p.parameter_default AS default_value
|
||||
FROM information_schema.parameters p
|
||||
WHERE p.specific_schema = $1
|
||||
AND p.specific_name IN (
|
||||
SELECT r.specific_name FROM information_schema.routines r
|
||||
WHERE r.routine_schema = $1 AND r.routine_name = $2
|
||||
LIMIT 1
|
||||
)
|
||||
AND p.parameter_name IS NOT NULL
|
||||
ORDER BY p.ordinal_position
|
||||
`;
|
||||
params = [targetSchema, procedureName];
|
||||
break;
|
||||
}
|
||||
case "mysql":
|
||||
case "mariadb": {
|
||||
query = `
|
||||
SELECT
|
||||
PARAMETER_NAME AS name,
|
||||
ORDINAL_POSITION AS position,
|
||||
DATA_TYPE AS data_type,
|
||||
PARAMETER_MODE AS mode,
|
||||
'' AS default_value
|
||||
FROM information_schema.PARAMETERS
|
||||
WHERE SPECIFIC_SCHEMA = DATABASE()
|
||||
AND SPECIFIC_NAME = ?
|
||||
AND PARAMETER_NAME IS NOT NULL
|
||||
ORDER BY ORDINAL_POSITION
|
||||
`;
|
||||
params = [procedureName];
|
||||
break;
|
||||
}
|
||||
case "mssql": {
|
||||
query = `
|
||||
SELECT
|
||||
PARAMETER_NAME AS name,
|
||||
ORDINAL_POSITION AS position,
|
||||
DATA_TYPE AS data_type,
|
||||
PARAMETER_MODE AS mode,
|
||||
'' AS default_value
|
||||
FROM INFORMATION_SCHEMA.PARAMETERS
|
||||
WHERE SPECIFIC_NAME = @p1
|
||||
AND PARAMETER_NAME IS NOT NULL
|
||||
ORDER BY ORDINAL_POSITION
|
||||
`;
|
||||
params = [procedureName];
|
||||
break;
|
||||
}
|
||||
default:
|
||||
throw new Error(`파라미터 조회 미지원 DB: ${dbType}`);
|
||||
}
|
||||
|
||||
const result = await executeExternalQuery(connectionId, query, params);
|
||||
return (result.rows || []).map((r: any) => ({
|
||||
name: (r.name || r.NAME || "").replace(/^@/, ""),
|
||||
position: parseInt(r.position || r.POSITION || "0", 10),
|
||||
dataType: r.data_type || r.DATA_TYPE || "unknown",
|
||||
mode: this.normalizeParamMode(r.mode || r.MODE),
|
||||
defaultValue: r.default_value || r.DEFAULT_VALUE || undefined,
|
||||
}));
|
||||
}
|
||||
|
||||
/**
|
||||
* 프로시저/함수 실행
|
||||
* 내부 DB는 기존 트랜잭션 client를 사용, 외부 DB는 별도 연결
|
||||
*/
|
||||
async executeProcedure(
|
||||
config: FlowProcedureConfig,
|
||||
recordData: Record<string, any>,
|
||||
client?: any
|
||||
): Promise<{ success: boolean; result?: any; error?: string }> {
|
||||
const paramValues = this.resolveParameters(config.parameters, recordData);
|
||||
|
||||
if (config.dbSource === "internal") {
|
||||
return this.executeInternalProcedure(config, paramValues, client);
|
||||
}
|
||||
|
||||
if (!config.connectionId) {
|
||||
throw new Error("외부 DB 프로시저 호출에 connectionId가 필요합니다");
|
||||
}
|
||||
return this.executeExternalProcedure(config, paramValues);
|
||||
}
|
||||
|
||||
/**
|
||||
* 내부 DB 프로시저 실행 (트랜잭션 client 공유)
|
||||
*/
|
||||
private async executeInternalProcedure(
|
||||
config: FlowProcedureConfig,
|
||||
paramValues: any[],
|
||||
client?: any
|
||||
): Promise<{ success: boolean; result?: any; error?: string }> {
|
||||
const schema = config.procedureSchema || "public";
|
||||
const safeName = this.sanitizeName(config.procedureName);
|
||||
const safeSchema = this.sanitizeName(schema);
|
||||
const qualifiedName = `${safeSchema}.${safeName}`;
|
||||
|
||||
const placeholders = paramValues.map((_, i) => `$${i + 1}`).join(", ");
|
||||
|
||||
let sql: string;
|
||||
if (config.callType === "function") {
|
||||
// SELECT * FROM fn()을 사용하여 OUT 파라미터를 개별 컬럼으로 반환
|
||||
sql = `SELECT * FROM ${qualifiedName}(${placeholders})`;
|
||||
} else {
|
||||
sql = `CALL ${qualifiedName}(${placeholders})`;
|
||||
}
|
||||
|
||||
try {
|
||||
const executor = client || db;
|
||||
const result = client
|
||||
? await client.query(sql, paramValues)
|
||||
: await db.query(sql, paramValues);
|
||||
|
||||
const rows = client ? result.rows : result;
|
||||
return { success: true, result: rows };
|
||||
} catch (error: any) {
|
||||
throw new Error(
|
||||
`프로시저 실행 실패 [${qualifiedName}]: ${error.message}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 외부 DB 프로시저 실행
|
||||
*/
|
||||
private async executeExternalProcedure(
|
||||
config: FlowProcedureConfig,
|
||||
paramValues: any[]
|
||||
): Promise<{ success: boolean; result?: any; error?: string }> {
|
||||
const connectionId = config.connectionId!;
|
||||
const poolInfo = await getExternalPool(connectionId);
|
||||
const dbType = poolInfo.dbType.toLowerCase();
|
||||
const safeName = this.sanitizeName(config.procedureName);
|
||||
const safeSchema = config.procedureSchema
|
||||
? this.sanitizeName(config.procedureSchema)
|
||||
: null;
|
||||
|
||||
let sql: string;
|
||||
|
||||
switch (dbType) {
|
||||
case "postgresql": {
|
||||
const qualifiedName = safeSchema
|
||||
? `${safeSchema}.${safeName}`
|
||||
: safeName;
|
||||
const placeholders = paramValues.map((_, i) => `$${i + 1}`).join(", ");
|
||||
sql =
|
||||
config.callType === "function"
|
||||
? `SELECT * FROM ${qualifiedName}(${placeholders})`
|
||||
: `CALL ${qualifiedName}(${placeholders})`;
|
||||
break;
|
||||
}
|
||||
case "mysql":
|
||||
case "mariadb": {
|
||||
const placeholders = paramValues.map(() => "?").join(", ");
|
||||
sql = `CALL ${safeName}(${placeholders})`;
|
||||
break;
|
||||
}
|
||||
case "mssql": {
|
||||
const paramList = paramValues
|
||||
.map((_, i) => `@p${i + 1}`)
|
||||
.join(", ");
|
||||
sql = `EXEC ${safeName} ${paramList}`;
|
||||
break;
|
||||
}
|
||||
default:
|
||||
throw new Error(`프로시저 실행 미지원 DB: ${dbType}`);
|
||||
}
|
||||
|
||||
try {
|
||||
const result = await executeExternalQuery(connectionId, sql, paramValues);
|
||||
return { success: true, result: result.rows };
|
||||
} catch (error: any) {
|
||||
throw new Error(
|
||||
`외부 프로시저 실행 실패 [${safeName}]: ${error.message}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 설정된 파라미터 매핑에서 실제 값을 추출
|
||||
*/
|
||||
private resolveParameters(
|
||||
params: FlowProcedureParam[],
|
||||
recordData: Record<string, any>
|
||||
): any[] {
|
||||
const inParams = params.filter((p) => p.mode === "IN" || p.mode === "INOUT");
|
||||
return inParams.map((param) => {
|
||||
switch (param.source) {
|
||||
case "record_field":
|
||||
if (!param.field) {
|
||||
throw new Error(`파라미터 ${param.name}: 레코드 필드가 지정되지 않았습니다`);
|
||||
}
|
||||
return recordData[param.field] ?? null;
|
||||
|
||||
case "static":
|
||||
return param.value ?? null;
|
||||
|
||||
case "step_variable":
|
||||
return recordData[param.field || param.name] ?? param.value ?? null;
|
||||
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 이름(스키마/프로시저) SQL Injection 방지용 검증
|
||||
*/
|
||||
private sanitizeName(name: string): string {
|
||||
if (!/^[a-zA-Z0-9_]+$/.test(name)) {
|
||||
throw new Error(`유효하지 않은 이름: ${name}`);
|
||||
}
|
||||
return name;
|
||||
}
|
||||
|
||||
/**
|
||||
* 파라미터 모드 정규화
|
||||
*/
|
||||
private normalizeParamMode(mode: string | null): "IN" | "OUT" | "INOUT" {
|
||||
if (!mode) return "IN";
|
||||
const upper = mode.toUpperCase();
|
||||
if (upper === "OUT") return "OUT";
|
||||
if (upper === "INOUT") return "INOUT";
|
||||
return "IN";
|
||||
}
|
||||
}
|
||||
@@ -11,6 +11,7 @@
|
||||
import { query, queryOne, transaction } from "../database/db";
|
||||
import { logger } from "../utils/logger";
|
||||
import axios from "axios";
|
||||
import { FlowProcedureService } from "./flowProcedureService";
|
||||
|
||||
// ===== 타입 정의 =====
|
||||
|
||||
@@ -36,6 +37,7 @@ export type NodeType =
|
||||
| "emailAction" // 이메일 발송 액션
|
||||
| "scriptAction" // 스크립트 실행 액션
|
||||
| "httpRequestAction" // HTTP 요청 액션
|
||||
| "procedureCallAction" // 프로시저/함수 호출 액션
|
||||
| "comment"
|
||||
| "log";
|
||||
|
||||
@@ -663,6 +665,9 @@ export class NodeFlowExecutionService {
|
||||
case "httpRequestAction":
|
||||
return this.executeHttpRequestAction(node, inputData, context);
|
||||
|
||||
case "procedureCallAction":
|
||||
return this.executeProcedureCallAction(node, inputData, context, client);
|
||||
|
||||
case "comment":
|
||||
case "log":
|
||||
// 로그/코멘트는 실행 없이 통과
|
||||
@@ -4856,4 +4861,105 @@ export class NodeFlowExecutionService {
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 프로시저/함수 호출 액션 노드 실행
|
||||
*/
|
||||
private static async executeProcedureCallAction(
|
||||
node: FlowNode,
|
||||
inputData: any,
|
||||
context: ExecutionContext,
|
||||
client?: any
|
||||
): Promise<any> {
|
||||
const {
|
||||
dbSource = "internal",
|
||||
connectionId,
|
||||
procedureName,
|
||||
procedureSchema = "public",
|
||||
callType = "function",
|
||||
parameters = [],
|
||||
} = node.data;
|
||||
|
||||
logger.info(
|
||||
`🔧 프로시저 호출 노드 실행: ${node.data.displayName || node.id}`
|
||||
);
|
||||
logger.info(
|
||||
` 프로시저: ${procedureSchema}.${procedureName} (${callType}), DB: ${dbSource}`
|
||||
);
|
||||
|
||||
if (!procedureName) {
|
||||
throw new Error("프로시저/함수가 선택되지 않았습니다.");
|
||||
}
|
||||
|
||||
const dataArray = Array.isArray(inputData)
|
||||
? inputData
|
||||
: inputData
|
||||
? [inputData]
|
||||
: [{}];
|
||||
|
||||
const procedureService = new FlowProcedureService();
|
||||
const results: any[] = [];
|
||||
|
||||
const config = {
|
||||
type: "procedure" as const,
|
||||
dbSource: dbSource as "internal" | "external",
|
||||
connectionId,
|
||||
procedureName,
|
||||
procedureSchema,
|
||||
callType: callType as "procedure" | "function",
|
||||
parameters: parameters.map((p: any) => ({
|
||||
name: p.name,
|
||||
dataType: p.dataType,
|
||||
mode: p.mode || "IN",
|
||||
source: p.source || "static",
|
||||
field: p.field,
|
||||
value: p.value,
|
||||
})),
|
||||
};
|
||||
|
||||
for (const record of dataArray) {
|
||||
try {
|
||||
logger.info(` 입력 레코드 키: ${Object.keys(record).join(", ")}`);
|
||||
|
||||
const execResult = await procedureService.executeProcedure(
|
||||
config,
|
||||
record,
|
||||
dbSource === "internal" ? client : undefined
|
||||
);
|
||||
|
||||
logger.info(` ✅ 프로시저 실행 성공: ${procedureName}`);
|
||||
|
||||
// 프로시저 반환값을 레코드에 평탄화하여 다음 노드에서 필드로 참조 가능하게 함
|
||||
let flatResult: Record<string, any> = {};
|
||||
if (Array.isArray(execResult.result) && execResult.result.length > 0) {
|
||||
const row = execResult.result[0];
|
||||
for (const [key, val] of Object.entries(row)) {
|
||||
// 함수명과 동일한 키(SELECT fn() 결과)는 _procedureReturn으로 매핑
|
||||
if (key === procedureName) {
|
||||
flatResult["_procedureReturn"] = val;
|
||||
} else {
|
||||
flatResult[key] = val;
|
||||
}
|
||||
}
|
||||
logger.info(` 반환 필드: ${Object.keys(flatResult).join(", ")}`);
|
||||
}
|
||||
|
||||
results.push({
|
||||
...record,
|
||||
...flatResult,
|
||||
_procedureResult: execResult.result,
|
||||
_procedureSuccess: true,
|
||||
});
|
||||
} catch (error: any) {
|
||||
logger.error(` ❌ 프로시저 실행 실패: ${error.message}`);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
logger.info(
|
||||
`🔧 프로시저 호출 완료: ${results.length}건 처리`
|
||||
);
|
||||
|
||||
return results;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -278,6 +278,7 @@ export interface SqlWhereResult {
|
||||
export type FlowIntegrationType =
|
||||
| "internal" // 내부 DB (기본값)
|
||||
| "external_db" // 외부 DB
|
||||
| "procedure" // 프로시저/함수 호출
|
||||
| "rest_api" // REST API (추후 구현)
|
||||
| "webhook" // Webhook (추후 구현)
|
||||
| "hybrid"; // 복합 연동 (추후 구현)
|
||||
@@ -341,8 +342,48 @@ export interface FlowExternalDbIntegrationConfig {
|
||||
customQuery?: string; // operation이 'custom'인 경우 사용
|
||||
}
|
||||
|
||||
// 프로시저 호출 파라미터 정의
|
||||
export interface FlowProcedureParam {
|
||||
name: string;
|
||||
dataType: string;
|
||||
mode: "IN" | "OUT" | "INOUT";
|
||||
source: "record_field" | "static" | "step_variable";
|
||||
field?: string; // source가 record_field인 경우: 레코드 컬럼명
|
||||
value?: string; // source가 static인 경우: 고정값
|
||||
}
|
||||
|
||||
// 프로시저 호출 설정 (integration_config JSON)
|
||||
export interface FlowProcedureConfig {
|
||||
type: "procedure";
|
||||
dbSource: "internal" | "external";
|
||||
connectionId?: number; // 외부 DB인 경우 external_db_connections.id
|
||||
procedureName: string;
|
||||
procedureSchema?: string; // 스키마명 (기본: public)
|
||||
callType: "procedure" | "function"; // CALL vs SELECT
|
||||
parameters: FlowProcedureParam[];
|
||||
}
|
||||
|
||||
// 프로시저/함수 목록 항목
|
||||
export interface ProcedureListItem {
|
||||
name: string;
|
||||
schema: string;
|
||||
type: "PROCEDURE" | "FUNCTION";
|
||||
returnType?: string;
|
||||
}
|
||||
|
||||
// 프로시저 파라미터 정보
|
||||
export interface ProcedureParameterInfo {
|
||||
name: string;
|
||||
position: number;
|
||||
dataType: string;
|
||||
mode: "IN" | "OUT" | "INOUT";
|
||||
defaultValue?: string;
|
||||
}
|
||||
|
||||
// 연동 설정 통합 타입
|
||||
export type FlowIntegrationConfig = FlowExternalDbIntegrationConfig; // 나중에 다른 타입 추가
|
||||
export type FlowIntegrationConfig =
|
||||
| FlowExternalDbIntegrationConfig
|
||||
| FlowProcedureConfig;
|
||||
|
||||
// 연동 실행 컨텍스트
|
||||
export interface FlowIntegrationContext {
|
||||
|
||||
Reference in New Issue
Block a user