Files
vexplor/backend-node/src/services/flowProcedureService.ts
kjs f697e1e897 feat: Add procedure and function management in flow controller
- Introduced new endpoints in FlowController for listing procedures and retrieving procedure parameters, enhancing the flow management capabilities.
- Updated FlowDataMoveService to support procedure calls during data movement, ensuring seamless integration with external and internal databases.
- Enhanced NodeFlowExecutionService to execute procedure call actions, allowing for dynamic execution of stored procedures within flow nodes.
- Updated frontend components to support procedure selection and parameter management, improving user experience in configuring flow steps.
- Added necessary types and API functions for handling procedure-related data, ensuring type safety and clarity in implementation.
2026-03-03 14:33:17 +09:00

430 lines
13 KiB
TypeScript

/**
* 플로우 프로시저 호출 서비스
* 내부/외부 DB의 프로시저/함수 목록 조회, 파라미터 조회, 실행을 담당
*/
import db from "../database/db";
import {
getExternalPool,
executeExternalQuery,
} from "./externalDbHelper";
import { getPlaceholder } from "./dbQueryBuilder";
import {
FlowProcedureConfig,
FlowProcedureParam,
ProcedureListItem,
ProcedureParameterInfo,
} from "../types/flow";
/**
 * Lists stored procedures/functions, looks up their parameters, and executes
 * them against either the internal database (`db`) or an external connection
 * (`getExternalPool` / `executeExternalQuery`).
 */
export class FlowProcedureService {
  /**
   * PostgreSQL: routines of one schema ($1), excluding trigger functions,
   * which cannot be invoked directly.
   *
   * `information_schema.routines.data_type` is NULL for procedures, so a plain
   * `data_type != 'trigger'` evaluates to NULL and silently drops every
   * PROCEDURE row. `IS DISTINCT FROM` treats NULL as "not a trigger".
   */
  private static readonly PG_LIST_ROUTINES_SQL = `
    SELECT
      routine_name AS name,
      routine_schema AS schema,
      routine_type AS type,
      data_type AS return_type
    FROM information_schema.routines
    WHERE routine_schema = $1
      AND routine_type IN ('PROCEDURE', 'FUNCTION')
      AND data_type IS DISTINCT FROM 'trigger'
    ORDER BY routine_type, routine_name
  `;

  /**
   * PostgreSQL: named parameters of the routine $2 in schema $1.
   *
   * `specific_name` is the routine name plus an OID suffix, so it is resolved
   * through a subquery on `information_schema.routines`.
   * NOTE(review): `LIMIT 1` without `ORDER BY` picks an arbitrary overload when
   * the routine name is overloaded.
   */
  private static readonly PG_ROUTINE_PARAMETERS_SQL = `
    SELECT
      p.parameter_name AS name,
      p.ordinal_position AS position,
      p.data_type,
      p.parameter_mode AS mode,
      p.parameter_default AS default_value
    FROM information_schema.parameters p
    WHERE p.specific_schema = $1
      AND p.specific_name IN (
        SELECT r.specific_name FROM information_schema.routines r
        WHERE r.routine_schema = $1 AND r.routine_name = $2
        LIMIT 1
      )
      AND p.parameter_name IS NOT NULL
    ORDER BY p.ordinal_position
  `;

  /**
   * Lists the procedures/functions available in the selected database.
   *
   * @param dbSource - "internal" for the application DB, "external" for a registered connection.
   * @param connectionId - External connection id; only used when `dbSource` is "external".
   * @param schema - Schema to inspect (PostgreSQL only; defaults to "public").
   * @returns The routines found, ordered by type and then name.
   */
  async listProcedures(
    dbSource: "internal" | "external",
    connectionId?: number,
    schema?: string
  ): Promise<ProcedureListItem[]> {
    // NOTE(review): "external" without a connectionId falls through to the
    // internal DB instead of failing; kept for backward compatibility.
    if (dbSource === "external" && connectionId) {
      return this.listExternalProcedures(connectionId, schema);
    }
    return this.listInternalProcedures(schema);
  }

  /** Lists routines of the internal database for `schema` (default "public"). */
  private async listInternalProcedures(
    schema?: string
  ): Promise<ProcedureListItem[]> {
    const targetSchema = schema || "public";
    const rows = await db.query(FlowProcedureService.PG_LIST_ROUTINES_SQL, [
      targetSchema,
    ]);
    return rows.map((r: any) => ({
      name: r.name,
      schema: r.schema,
      type: r.type as "PROCEDURE" | "FUNCTION",
      returnType: r.return_type || undefined,
    }));
  }

  /**
   * Lists routines of an external connection using the catalog query that
   * matches the connection's database type.
   *
   * @throws Error when the database type has no supported catalog query.
   */
  private async listExternalProcedures(
    connectionId: number,
    schema?: string
  ): Promise<ProcedureListItem[]> {
    const poolInfo = await getExternalPool(connectionId);
    const dbType = poolInfo.dbType.toLowerCase();
    let query: string;
    let params: any[];

    switch (dbType) {
      case "postgresql": {
        query = FlowProcedureService.PG_LIST_ROUTINES_SQL;
        params = [schema || "public"];
        break;
      }
      case "mysql":
      case "mariadb": {
        // NOTE(review): `schema` is ignored here; the connection's current
        // database is always used.
        query = `
          SELECT
            ROUTINE_NAME AS name,
            ROUTINE_SCHEMA AS \`schema\`,
            ROUTINE_TYPE AS type,
            DATA_TYPE AS return_type
          FROM information_schema.ROUTINES
          WHERE ROUTINE_SCHEMA = DATABASE()
            AND ROUTINE_TYPE IN ('PROCEDURE', 'FUNCTION')
          ORDER BY ROUTINE_TYPE, ROUTINE_NAME
        `;
        params = [];
        break;
      }
      case "mssql": {
        // NOTE(review): `schema` is ignored here; routines of every schema
        // are returned.
        query = `
          SELECT
            ROUTINE_NAME AS name,
            ROUTINE_SCHEMA AS [schema],
            ROUTINE_TYPE AS type,
            DATA_TYPE AS return_type
          FROM INFORMATION_SCHEMA.ROUTINES
          WHERE ROUTINE_TYPE IN ('PROCEDURE', 'FUNCTION')
          ORDER BY ROUTINE_TYPE, ROUTINE_NAME
        `;
        params = [];
        break;
      }
      default:
        throw new Error(`프로시저 목록 조회 미지원 DB: ${dbType}`);
    }

    const result = await executeExternalQuery(connectionId, query, params);
    // Column-name casing differs between drivers, so both casings are read.
    return (result.rows || []).map((r: any) => ({
      name: r.name || r.NAME,
      schema: r.schema || r.SCHEMA || "",
      type: (r.type || r.TYPE || "FUNCTION").toUpperCase() as
        | "PROCEDURE"
        | "FUNCTION",
      returnType: r.return_type || r.RETURN_TYPE || undefined,
    }));
  }

  /**
   * Returns the named parameters of a procedure/function.
   *
   * @param procedureName - Routine name to look up.
   * @param dbSource - "internal" for the application DB, "external" for a registered connection.
   * @param connectionId - External connection id; only used when `dbSource` is "external".
   * @param schema - Schema of the routine (PostgreSQL only; defaults to "public").
   * @returns Parameter descriptors ordered by ordinal position.
   */
  async getProcedureParameters(
    procedureName: string,
    dbSource: "internal" | "external",
    connectionId?: number,
    schema?: string
  ): Promise<ProcedureParameterInfo[]> {
    // NOTE(review): same silent fallback to the internal DB as listProcedures.
    if (dbSource === "external" && connectionId) {
      return this.getExternalProcedureParameters(
        connectionId,
        procedureName,
        schema
      );
    }
    return this.getInternalProcedureParameters(procedureName, schema);
  }

  /** Looks up routine parameters in the internal database. */
  private async getInternalProcedureParameters(
    procedureName: string,
    schema?: string
  ): Promise<ProcedureParameterInfo[]> {
    const targetSchema = schema || "public";
    const rows = await db.query(
      FlowProcedureService.PG_ROUTINE_PARAMETERS_SQL,
      [targetSchema, procedureName]
    );
    return rows.map((r: any) => ({
      name: r.name,
      position: parseInt(r.position, 10),
      dataType: r.data_type,
      mode: this.normalizeParamMode(r.mode),
      defaultValue: r.default_value || undefined,
    }));
  }

  /**
   * Looks up routine parameters on an external connection.
   *
   * @throws Error when the database type has no supported catalog query.
   */
  private async getExternalProcedureParameters(
    connectionId: number,
    procedureName: string,
    schema?: string
  ): Promise<ProcedureParameterInfo[]> {
    const poolInfo = await getExternalPool(connectionId);
    const dbType = poolInfo.dbType.toLowerCase();
    let query: string;
    let params: any[];

    switch (dbType) {
      case "postgresql": {
        query = FlowProcedureService.PG_ROUTINE_PARAMETERS_SQL;
        params = [schema || "public", procedureName];
        break;
      }
      case "mysql":
      case "mariadb": {
        // The query selects an empty string as default_value, which the
        // mapping below turns into `undefined`.
        query = `
          SELECT
            PARAMETER_NAME AS name,
            ORDINAL_POSITION AS position,
            DATA_TYPE AS data_type,
            PARAMETER_MODE AS mode,
            '' AS default_value
          FROM information_schema.PARAMETERS
          WHERE SPECIFIC_SCHEMA = DATABASE()
            AND SPECIFIC_NAME = ?
            AND PARAMETER_NAME IS NOT NULL
          ORDER BY ORDINAL_POSITION
        `;
        params = [procedureName];
        break;
      }
      case "mssql": {
        query = `
          SELECT
            PARAMETER_NAME AS name,
            ORDINAL_POSITION AS position,
            DATA_TYPE AS data_type,
            PARAMETER_MODE AS mode,
            '' AS default_value
          FROM INFORMATION_SCHEMA.PARAMETERS
          WHERE SPECIFIC_NAME = @p1
            AND PARAMETER_NAME IS NOT NULL
          ORDER BY ORDINAL_POSITION
        `;
        params = [procedureName];
        break;
      }
      default:
        throw new Error(`파라미터 조회 미지원 DB: ${dbType}`);
    }

    const result = await executeExternalQuery(connectionId, query, params);
    return (result.rows || []).map((r: any) => ({
      // A leading "@" (as used by MSSQL parameter names) is stripped.
      name: (r.name || r.NAME || "").replace(/^@/, ""),
      position: parseInt(r.position || r.POSITION || "0", 10),
      dataType: r.data_type || r.DATA_TYPE || "unknown",
      mode: this.normalizeParamMode(r.mode || r.MODE),
      defaultValue: r.default_value || r.DEFAULT_VALUE || undefined,
    }));
  }

  /**
   * Executes the configured procedure/function.
   *
   * Internal calls run on `client` when one is supplied (so they can share the
   * caller's transaction) and on `db` otherwise; external calls go through
   * `executeExternalQuery`.
   *
   * @param config - Routine to call and how its parameters are sourced.
   * @param recordData - Record whose fields feed "record_field"/"step_variable" parameters.
   * @param client - Optional client exposing `query(sql, values)` that resolves to `{ rows }`.
   * @returns `{ success: true, result }` where `result` holds the returned rows.
   * @throws Error when an external call has no `connectionId`, a name is
   *   invalid, the database type is unsupported, or the call itself fails.
   */
  async executeProcedure(
    config: FlowProcedureConfig,
    recordData: Record<string, any>,
    client?: any
  ): Promise<{ success: boolean; result?: any; error?: string }> {
    const paramValues = this.resolveParameters(config.parameters, recordData);
    if (config.dbSource === "internal") {
      return this.executeInternalProcedure(config, paramValues, client);
    }
    if (!config.connectionId) {
      throw new Error("외부 DB 프로시저 호출에 connectionId가 필요합니다");
    }
    return this.executeExternalProcedure(config, paramValues);
  }

  /**
   * Runs the routine on the internal database, on `client` when provided.
   */
  private async executeInternalProcedure(
    config: FlowProcedureConfig,
    paramValues: any[],
    client?: any
  ): Promise<{ success: boolean; result?: any; error?: string }> {
    const safeName = this.sanitizeName(config.procedureName);
    const safeSchema = this.sanitizeName(config.procedureSchema || "public");
    const qualifiedName = `${safeSchema}.${safeName}`;
    const sql = this.buildPostgresCallSql(qualifiedName, config, paramValues);

    try {
      if (client) {
        const result = await client.query(sql, paramValues);
        return { success: true, result: result.rows };
      }
      // `db.query` resolves to the row array directly.
      const rows = await db.query(sql, paramValues);
      return { success: true, result: rows };
    } catch (error: unknown) {
      throw new Error(
        `프로시저 실행 실패 [${qualifiedName}]: ${this.errorMessage(error)}`
      );
    }
  }

  /**
   * Runs the routine on an external connection, building the call statement
   * for the connection's database type.
   */
  private async executeExternalProcedure(
    config: FlowProcedureConfig,
    paramValues: any[]
  ): Promise<{ success: boolean; result?: any; error?: string }> {
    const connectionId = config.connectionId!;
    const poolInfo = await getExternalPool(connectionId);
    const dbType = poolInfo.dbType.toLowerCase();
    const safeName = this.sanitizeName(config.procedureName);
    const safeSchema = config.procedureSchema
      ? this.sanitizeName(config.procedureSchema)
      : null;
    let sql: string;

    switch (dbType) {
      case "postgresql": {
        const qualifiedName = safeSchema
          ? `${safeSchema}.${safeName}`
          : safeName;
        sql = this.buildPostgresCallSql(qualifiedName, config, paramValues);
        break;
      }
      case "mysql":
      case "mariadb": {
        // NOTE(review): OUT parameters are not passed here; confirm whether
        // routines with OUT parameters need to be supported.
        const placeholders = paramValues.map(() => "?").join(", ");
        sql = `CALL ${safeName}(${placeholders})`;
        break;
      }
      case "mssql": {
        const paramList = paramValues.map((_, i) => `@p${i + 1}`).join(", ");
        sql = `EXEC ${safeName} ${paramList}`;
        break;
      }
      default:
        throw new Error(`프로시저 실행 미지원 DB: ${dbType}`);
    }

    try {
      const result = await executeExternalQuery(connectionId, sql, paramValues);
      return { success: true, result: result.rows };
    } catch (error: unknown) {
      throw new Error(
        `외부 프로시저 실행 실패 [${safeName}]: ${this.errorMessage(error)}`
      );
    }
  }

  /**
   * Builds the PostgreSQL statement for a function or procedure call.
   *
   * Functions use `SELECT * FROM fn(...)` so OUT parameters come back as
   * individual columns; procedures use `CALL`.
   */
  private buildPostgresCallSql(
    qualifiedName: string,
    config: FlowProcedureConfig,
    paramValues: any[]
  ): string {
    if (config.callType === "function") {
      const placeholders = paramValues.map((_, i) => `$${i + 1}`).join(", ");
      return `SELECT * FROM ${qualifiedName}(${placeholders})`;
    }
    const args = this.buildPostgresCallArguments(
      config.parameters,
      paramValues
    );
    return `CALL ${qualifiedName}(${args})`;
  }

  /**
   * Builds the argument list of a PostgreSQL `CALL`.
   *
   * PostgreSQL 14+ requires an argument for every OUT parameter of a
   * procedure (conventionally `NULL`), so OUT parameters are emitted as
   * `NULL` in declared order while IN/INOUT parameters consume `$n`
   * placeholders. Without OUT parameters the list is one placeholder per
   * bound value.
   */
  private buildPostgresCallArguments(
    params: FlowProcedureParam[] | undefined,
    paramValues: any[]
  ): string {
    const declared = params ?? [];
    if (!declared.some((p) => p.mode === "OUT")) {
      return paramValues.map((_, i) => `$${i + 1}`).join(", ");
    }
    const args: string[] = [];
    let bound = 0;
    for (const param of declared) {
      if (param.mode === "OUT") {
        args.push("NULL");
      } else if (param.mode === "IN" || param.mode === "INOUT") {
        bound += 1;
        args.push(`$${bound}`);
      }
    }
    return args.join(", ");
  }

  /**
   * Resolves the values to bind for the IN/INOUT parameters, in declared
   * order. A missing parameter list is treated as empty so parameterless
   * routines can be called without a `parameters` array.
   *
   * @throws Error when a "record_field" parameter has no field configured.
   */
  private resolveParameters(
    params: FlowProcedureParam[] | undefined,
    recordData: Record<string, any>
  ): any[] {
    const inParams = (params ?? []).filter(
      (p) => p.mode === "IN" || p.mode === "INOUT"
    );
    return inParams.map((param) => {
      switch (param.source) {
        case "record_field":
          if (!param.field) {
            throw new Error(`파라미터 ${param.name}: 레코드 필드가 지정되지 않았습니다`);
          }
          return recordData[param.field] ?? null;
        case "static":
          return param.value ?? null;
        case "step_variable":
          // Looked up by field (or parameter name), then the static value.
          return recordData[param.field || param.name] ?? param.value ?? null;
        default:
          return null;
      }
    });
  }

  /**
   * Validates a schema/routine name before it is interpolated into SQL.
   * Only ASCII letters, digits and underscores are accepted.
   *
   * @throws Error when the name contains any other character or is empty.
   */
  private sanitizeName(name: string): string {
    if (!/^[a-zA-Z0-9_]+$/.test(name)) {
      throw new Error(`유효하지 않은 이름: ${name}`);
    }
    return name;
  }

  /**
   * Normalizes a catalog parameter mode; anything other than OUT/INOUT
   * (including null) maps to "IN".
   */
  private normalizeParamMode(mode: string | null): "IN" | "OUT" | "INOUT" {
    if (!mode) return "IN";
    const upper = mode.toUpperCase();
    if (upper === "OUT") return "OUT";
    if (upper === "INOUT") return "INOUT";
    return "IN";
  }

  /** Extracts a printable message from an arbitrary thrown value. */
  private errorMessage(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }
}