Merge branch 'gbpark-node' of http://39.117.244.52:3000/kjs/ERP-node into jskim-node

This commit is contained in:
kjs
2026-03-19 15:09:05 +09:00
47 changed files with 5340 additions and 3178 deletions

View File

@@ -126,29 +126,41 @@ export class BatchManagementController {
*/
static async createBatchConfig(req: AuthenticatedRequest, res: Response) {
try {
const { batchName, description, cronSchedule, mappings, isActive } =
req.body;
const {
batchName, description, cronSchedule, mappings, isActive,
executionType, nodeFlowId, nodeFlowContext,
} = req.body;
const companyCode = req.user?.companyCode;
if (
!batchName ||
!cronSchedule ||
!mappings ||
!Array.isArray(mappings)
) {
if (!batchName || !cronSchedule) {
return res.status(400).json({
success: false,
message:
"필수 필드가 누락되었습니다. (batchName, cronSchedule, mappings)",
message: "필수 필드가 누락되었습니다. (batchName, cronSchedule)",
});
}
const batchConfig = await BatchService.createBatchConfig({
batchName,
description,
cronSchedule,
mappings,
isActive: isActive !== undefined ? isActive : true,
} as CreateBatchConfigRequest);
// 노드 플로우 타입은 매핑 없이 생성 가능
if (executionType !== "node_flow" && (!mappings || !Array.isArray(mappings))) {
return res.status(400).json({
success: false,
message: "매핑 타입은 mappings 배열이 필요합니다.",
});
}
const batchConfig = await BatchService.createBatchConfig(
{
batchName,
description,
cronSchedule,
mappings: mappings || [],
isActive: isActive === false || isActive === "N" ? "N" : "Y",
companyCode: companyCode || "",
executionType: executionType || "mapping",
nodeFlowId: nodeFlowId || null,
nodeFlowContext: nodeFlowContext || null,
} as CreateBatchConfigRequest,
req.user?.userId
);
return res.status(201).json({
success: true,
@@ -768,4 +780,287 @@ export class BatchManagementController {
});
}
}
/**
* 노드 플로우 목록 조회 (배치 설정에서 플로우 선택용)
* GET /api/batch-management/node-flows
*/
static async getNodeFlows(req: AuthenticatedRequest, res: Response) {
try {
const companyCode = req.user?.companyCode;
let flowQuery: string;
let flowParams: any[] = [];
if (companyCode === "*") {
flowQuery = `
SELECT flow_id, flow_name, flow_description AS description, company_code,
COALESCE(jsonb_array_length(
CASE WHEN flow_data IS NOT NULL AND flow_data::text != ''
THEN (flow_data::jsonb -> 'nodes')
ELSE '[]'::jsonb END
), 0) AS node_count
FROM node_flows
ORDER BY flow_name
`;
} else {
flowQuery = `
SELECT flow_id, flow_name, flow_description AS description, company_code,
COALESCE(jsonb_array_length(
CASE WHEN flow_data IS NOT NULL AND flow_data::text != ''
THEN (flow_data::jsonb -> 'nodes')
ELSE '[]'::jsonb END
), 0) AS node_count
FROM node_flows
WHERE company_code = $1
ORDER BY flow_name
`;
flowParams = [companyCode];
}
const result = await query(flowQuery, flowParams);
return res.json({ success: true, data: result });
} catch (error) {
console.error("노드 플로우 목록 조회 오류:", error);
return res.status(500).json({
success: false,
message: "노드 플로우 목록 조회 실패",
error: error instanceof Error ? error.message : "알 수 없는 오류",
});
}
}
/**
* 배치 대시보드 통계 조회
* GET /api/batch-management/stats
* totalBatches, activeBatches, todayExecutions, todayFailures, prevDayExecutions, prevDayFailures
* 멀티테넌시: company_code 필터링 필수
*/
static async getBatchStats(req: AuthenticatedRequest, res: Response) {
try {
const companyCode = req.user?.companyCode;
// 전체/활성 배치 수
let configQuery: string;
let configParams: any[] = [];
if (companyCode === "*") {
configQuery = `
SELECT
COUNT(*)::int AS total,
COUNT(*) FILTER (WHERE is_active = 'Y')::int AS active
FROM batch_configs
`;
} else {
configQuery = `
SELECT
COUNT(*)::int AS total,
COUNT(*) FILTER (WHERE is_active = 'Y')::int AS active
FROM batch_configs
WHERE company_code = $1
`;
configParams = [companyCode];
}
const configResult = await query<{ total: number; active: number }>(
configQuery,
configParams
);
// 오늘/어제 실행·실패 수 (KST 기준 날짜)
const logParams: any[] = [];
let logWhere = "";
if (companyCode && companyCode !== "*") {
logWhere = " AND company_code = $1";
logParams.push(companyCode);
}
const todayLogQuery = `
SELECT
COUNT(*)::int AS today_executions,
COUNT(*) FILTER (WHERE execution_status = 'FAILED')::int AS today_failures
FROM batch_execution_logs
WHERE (start_time AT TIME ZONE 'Asia/Seoul')::date = (NOW() AT TIME ZONE 'Asia/Seoul')::date
${logWhere}
`;
const prevDayLogQuery = `
SELECT
COUNT(*)::int AS prev_executions,
COUNT(*) FILTER (WHERE execution_status = 'FAILED')::int AS prev_failures
FROM batch_execution_logs
WHERE (start_time AT TIME ZONE 'Asia/Seoul')::date = (NOW() AT TIME ZONE 'Asia/Seoul')::date - INTERVAL '1 day'
${logWhere}
`;
const [todayResult, prevResult] = await Promise.all([
query<{ today_executions: number; today_failures: number }>(
todayLogQuery,
logParams
),
query<{ prev_executions: number; prev_failures: number }>(
prevDayLogQuery,
logParams
),
]);
const config = configResult[0];
const today = todayResult[0];
const prev = prevResult[0];
return res.json({
success: true,
data: {
totalBatches: config?.total ?? 0,
activeBatches: config?.active ?? 0,
todayExecutions: today?.today_executions ?? 0,
todayFailures: today?.today_failures ?? 0,
prevDayExecutions: prev?.prev_executions ?? 0,
prevDayFailures: prev?.prev_failures ?? 0,
},
});
} catch (error) {
console.error("배치 통계 조회 오류:", error);
return res.status(500).json({
success: false,
message: "배치 통계 조회 실패",
error: error instanceof Error ? error.message : "알 수 없는 오류",
});
}
}
/**
* 배치별 최근 24시간 스파크라인 (1시간 단위 집계)
* GET /api/batch-management/batch-configs/:id/sparkline
* 멀티테넌시: company_code 필터링 필수
*/
static async getBatchSparkline(req: AuthenticatedRequest, res: Response) {
try {
const { id } = req.params;
const companyCode = req.user?.companyCode;
const batchId = Number(id);
if (!id || isNaN(batchId)) {
return res.status(400).json({
success: false,
message: "올바른 배치 ID를 제공해주세요.",
});
}
const params: any[] = [batchId];
let companyFilter = "";
if (companyCode && companyCode !== "*") {
companyFilter = " AND bel.company_code = $2";
params.push(companyCode);
}
// KST 기준 최근 24시간 1시간 단위 슬롯 + 집계 (generate_series로 24개 보장)
const sparklineQuery = `
WITH kst_slots AS (
SELECT to_char(s, 'YYYY-MM-DD"T"HH24:00:00') AS hour
FROM generate_series(
(NOW() AT TIME ZONE 'Asia/Seoul') - INTERVAL '23 hours',
(NOW() AT TIME ZONE 'Asia/Seoul'),
INTERVAL '1 hour'
) AS s
),
agg AS (
SELECT
to_char(date_trunc('hour', (bel.start_time AT TIME ZONE 'Asia/Seoul')) AT TIME ZONE 'Asia/Seoul', 'YYYY-MM-DD"T"HH24:00:00') AS hour,
COUNT(*) FILTER (WHERE bel.execution_status = 'SUCCESS')::int AS success,
COUNT(*) FILTER (WHERE bel.execution_status = 'FAILED')::int AS failed
FROM batch_execution_logs bel
WHERE bel.batch_config_id = $1
AND bel.start_time >= (NOW() AT TIME ZONE 'Asia/Seoul') - INTERVAL '24 hours'
${companyFilter}
GROUP BY date_trunc('hour', (bel.start_time AT TIME ZONE 'Asia/Seoul'))
)
SELECT
k.hour,
COALESCE(a.success, 0) AS success,
COALESCE(a.failed, 0) AS failed
FROM kst_slots k
LEFT JOIN agg a ON k.hour = a.hour
ORDER BY k.hour
`;
const data = await query<{
hour: string;
success: number;
failed: number;
}>(sparklineQuery, params);
return res.json({ success: true, data });
} catch (error) {
console.error("스파크라인 조회 오류:", error);
return res.status(500).json({
success: false,
message: "스파크라인 데이터 조회 실패",
error: error instanceof Error ? error.message : "알 수 없는 오류",
});
}
}
/**
* 배치별 최근 실행 로그 (최대 20건)
* GET /api/batch-management/batch-configs/:id/recent-logs
* 멀티테넌시: company_code 필터링 필수
*/
static async getBatchRecentLogs(req: AuthenticatedRequest, res: Response) {
try {
const { id } = req.params;
const companyCode = req.user?.companyCode;
const batchId = Number(id);
const limit = Math.min(Number(req.query.limit) || 20, 20);
if (!id || isNaN(batchId)) {
return res.status(400).json({
success: false,
message: "올바른 배치 ID를 제공해주세요.",
});
}
let logsQuery: string;
let logsParams: any[];
if (companyCode === "*") {
logsQuery = `
SELECT
id,
start_time AS started_at,
end_time AS finished_at,
execution_status AS status,
total_records,
success_records,
failed_records,
error_message,
duration_ms
FROM batch_execution_logs
WHERE batch_config_id = $1
ORDER BY start_time DESC
LIMIT $2
`;
logsParams = [batchId, limit];
} else {
logsQuery = `
SELECT
id,
start_time AS started_at,
end_time AS finished_at,
execution_status AS status,
total_records,
success_records,
failed_records,
error_message,
duration_ms
FROM batch_execution_logs
WHERE batch_config_id = $1 AND company_code = $2
ORDER BY start_time DESC
LIMIT $3
`;
logsParams = [batchId, companyCode, limit];
}
const result = await query(logsQuery, logsParams);
return res.json({ success: true, data: result });
} catch (error) {
console.error("최근 실행 이력 조회 오류:", error);
return res.status(500).json({
success: false,
message: "최근 실행 이력 조회 실패",
error: error instanceof Error ? error.message : "알 수 없는 오류",
});
}
}
}

View File

@@ -7,6 +7,19 @@ import { authenticateToken } from "../middleware/authMiddleware";
const router = Router();
/**
* GET /api/batch-management/stats
* 배치 대시보드 통계 (전체/활성 배치 수, 오늘·어제 실행/실패 수)
* 반드시 /batch-configs 보다 위에 등록 (/:id로 잡히지 않도록)
*/
router.get("/stats", authenticateToken, BatchManagementController.getBatchStats);
/**
* GET /api/batch-management/node-flows
* 배치 설정에서 노드 플로우 선택용 목록 조회
*/
router.get("/node-flows", authenticateToken, BatchManagementController.getNodeFlows);
/**
* GET /api/batch-management/connections
* 사용 가능한 커넥션 목록 조회
@@ -55,6 +68,18 @@ router.get("/batch-configs", authenticateToken, BatchManagementController.getBat
*/
router.get("/batch-configs/:id", authenticateToken, BatchManagementController.getBatchConfigById);
/**
* GET /api/batch-management/batch-configs/:id/sparkline
* 해당 배치 최근 24시간 1시간 단위 실행 집계
*/
router.get("/batch-configs/:id/sparkline", authenticateToken, BatchManagementController.getBatchSparkline);
/**
* GET /api/batch-management/batch-configs/:id/recent-logs
* 해당 배치 최근 실행 로그 (최대 20건)
*/
router.get("/batch-configs/:id/recent-logs", authenticateToken, BatchManagementController.getBatchRecentLogs);
/**
* PUT /api/batch-management/batch-configs/:id
* 배치 설정 업데이트

View File

@@ -13,7 +13,54 @@ import { auditLogService, getClientIp } from "../../services/auditLogService";
const router = Router();
/**
* 플로우 목록 조회
* flow_data에서 요약 정보 추출
*/
function extractFlowSummary(flowData: any) {
try {
const parsed = typeof flowData === "string" ? JSON.parse(flowData) : flowData;
const nodes = parsed?.nodes || [];
const edges = parsed?.edges || [];
const nodeTypes: Record<string, number> = {};
nodes.forEach((n: any) => {
const t = n.type || "unknown";
nodeTypes[t] = (nodeTypes[t] || 0) + 1;
});
// 미니 토폴로지용 간소화된 좌표 (0~1 정규화)
let topology = null;
if (nodes.length > 0) {
const xs = nodes.map((n: any) => n.position?.x || 0);
const ys = nodes.map((n: any) => n.position?.y || 0);
const minX = Math.min(...xs), maxX = Math.max(...xs);
const minY = Math.min(...ys), maxY = Math.max(...ys);
const rangeX = maxX - minX || 1;
const rangeY = maxY - minY || 1;
topology = {
nodes: nodes.map((n: any) => ({
id: n.id,
type: n.type,
x: (((n.position?.x || 0) - minX) / rangeX),
y: (((n.position?.y || 0) - minY) / rangeY),
})),
edges: edges.map((e: any) => [e.source, e.target]),
};
}
return {
nodeCount: nodes.length,
edgeCount: edges.length,
nodeTypes,
topology,
};
} catch {
return { nodeCount: 0, edgeCount: 0, nodeTypes: {}, topology: null };
}
}
/**
* 플로우 목록 조회 (summary 포함)
*/
router.get("/", async (req: AuthenticatedRequest, res: Response) => {
try {
@@ -24,6 +71,7 @@ router.get("/", async (req: AuthenticatedRequest, res: Response) => {
flow_id as "flowId",
flow_name as "flowName",
flow_description as "flowDescription",
flow_data as "flowData",
company_code as "companyCode",
created_at as "createdAt",
updated_at as "updatedAt"
@@ -32,7 +80,6 @@ router.get("/", async (req: AuthenticatedRequest, res: Response) => {
const params: any[] = [];
// 슈퍼 관리자가 아니면 회사별 필터링
if (userCompanyCode && userCompanyCode !== "*") {
sqlQuery += ` WHERE company_code = $1`;
params.push(userCompanyCode);
@@ -42,9 +89,15 @@ router.get("/", async (req: AuthenticatedRequest, res: Response) => {
const flows = await query(sqlQuery, params);
const flowsWithSummary = flows.map((flow: any) => {
const summary = extractFlowSummary(flow.flowData);
const { flowData, ...rest } = flow;
return { ...rest, summary };
});
return res.json({
success: true,
data: flows,
data: flowsWithSummary,
});
} catch (error) {
logger.error("플로우 목록 조회 실패:", error);

View File

@@ -122,20 +122,22 @@ export class BatchSchedulerService {
}
/**
* 배치 설정 실행
* 배치 설정 실행 - execution_type에 따라 매핑 또는 노드 플로우 실행
*/
static async executeBatchConfig(config: any) {
const startTime = new Date();
let executionLog: any = null;
try {
logger.info(`배치 실행 시작: ${config.batch_name} (ID: ${config.id})`);
logger.info(`배치 실행 시작: ${config.batch_name} (ID: ${config.id}, type: ${config.execution_type || "mapping"})`);
// 매핑 정보가 없으면 상세 조회로 다시 가져오기
if (!config.batch_mappings || config.batch_mappings.length === 0) {
const fullConfig = await BatchService.getBatchConfigById(config.id);
if (fullConfig.success && fullConfig.data) {
config = fullConfig.data;
// 상세 조회 (매핑 또는 노드플로우 정보가 없을 수 있음)
if (!config.execution_type || config.execution_type === "mapping") {
if (!config.batch_mappings || config.batch_mappings.length === 0) {
const fullConfig = await BatchService.getBatchConfigById(config.id);
if (fullConfig.success && fullConfig.data) {
config = fullConfig.data;
}
}
}
@@ -165,12 +167,17 @@ export class BatchSchedulerService {
executionLog = executionLogResponse.data;
// 실제 배치 실행 로직 (수동 실행과 동일한 로직 사용)
const result = await this.executeBatchMappings(config);
let result: { totalRecords: number; successRecords: number; failedRecords: number };
if (config.execution_type === "node_flow") {
result = await this.executeNodeFlow(config);
} else {
result = await this.executeBatchMappings(config);
}
// 실행 로그 업데이트 (성공)
await BatchExecutionLogService.updateExecutionLog(executionLog.id, {
execution_status: "SUCCESS",
execution_status: result.failedRecords > 0 ? "PARTIAL" : "SUCCESS",
end_time: new Date(),
duration_ms: Date.now() - startTime.getTime(),
total_records: result.totalRecords,
@@ -182,12 +189,10 @@ export class BatchSchedulerService {
`배치 실행 완료: ${config.batch_name} (처리된 레코드: ${result.totalRecords})`
);
// 성공 결과 반환
return result;
} catch (error) {
logger.error(`배치 실행 중 오류 발생: ${config.batch_name}`, error);
// 실행 로그 업데이트 (실패)
if (executionLog) {
await BatchExecutionLogService.updateExecutionLog(executionLog.id, {
execution_status: "FAILED",
@@ -198,7 +203,6 @@ export class BatchSchedulerService {
});
}
// 실패 결과 반환
return {
totalRecords: 0,
successRecords: 0,
@@ -207,6 +211,43 @@ export class BatchSchedulerService {
}
}
/**
* 노드 플로우 실행 - NodeFlowExecutionService에 위임
*/
private static async executeNodeFlow(config: any) {
if (!config.node_flow_id) {
throw new Error("노드 플로우 ID가 설정되지 않았습니다.");
}
const { NodeFlowExecutionService } = await import(
"./nodeFlowExecutionService"
);
const contextData: Record<string, any> = {
companyCode: config.company_code,
batchConfigId: config.id,
batchName: config.batch_name,
executionSource: "batch_scheduler",
...(config.node_flow_context || {}),
};
logger.info(
`노드 플로우 실행: flowId=${config.node_flow_id}, batch=${config.batch_name}`
);
const flowResult = await NodeFlowExecutionService.executeFlow(
config.node_flow_id,
contextData
);
// 노드 플로우 실행 결과를 배치 로그 형식으로 변환
return {
totalRecords: flowResult.summary.total,
successRecords: flowResult.summary.success,
failedRecords: flowResult.summary.failed,
};
}
/**
* 배치 매핑 실행 (수동 실행과 동일한 로직)
*/

View File

@@ -72,9 +72,12 @@ export class BatchService {
const total = parseInt(countResult[0].count);
const totalPages = Math.ceil(total / limit);
// 목록 조회
// 목록 조회 (최근 실행 정보 포함)
const configs = await query<any>(
`SELECT bc.*
`SELECT bc.*,
(SELECT bel.execution_status FROM batch_execution_logs bel WHERE bel.batch_config_id = bc.id ORDER BY bel.start_time DESC LIMIT 1) as last_status,
(SELECT bel.start_time FROM batch_execution_logs bel WHERE bel.batch_config_id = bc.id ORDER BY bel.start_time DESC LIMIT 1) as last_executed_at,
(SELECT bel.total_records FROM batch_execution_logs bel WHERE bel.batch_config_id = bc.id ORDER BY bel.start_time DESC LIMIT 1) as last_total_records
FROM batch_configs bc
${whereClause}
ORDER BY bc.created_date DESC
@@ -82,9 +85,6 @@ export class BatchService {
[...values, limit, offset]
);
// 매핑 정보 조회 (N+1 문제 해결을 위해 별도 쿼리 대신 여기서는 생략하고 상세 조회에서 처리)
// 하지만 목록에서도 간단한 정보는 필요할 수 있음
return {
success: true,
data: configs as BatchConfig[],
@@ -176,8 +176,8 @@ export class BatchService {
// 배치 설정 생성
const batchConfigResult = await client.query(
`INSERT INTO batch_configs
(batch_name, description, cron_schedule, is_active, company_code, save_mode, conflict_key, auth_service_name, data_array_path, created_by, created_date, updated_date)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW(), NOW())
(batch_name, description, cron_schedule, is_active, company_code, save_mode, conflict_key, auth_service_name, data_array_path, execution_type, node_flow_id, node_flow_context, created_by, created_date, updated_date)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, NOW(), NOW())
RETURNING *`,
[
data.batchName,
@@ -189,6 +189,9 @@ export class BatchService {
data.conflictKey || null,
data.authServiceName || null,
data.dataArrayPath || null,
data.executionType || "mapping",
data.nodeFlowId || null,
data.nodeFlowContext ? JSON.stringify(data.nodeFlowContext) : null,
userId,
]
);
@@ -332,6 +335,22 @@ export class BatchService {
updateFields.push(`data_array_path = $${paramIndex++}`);
updateValues.push(data.dataArrayPath || null);
}
if (data.executionType !== undefined) {
updateFields.push(`execution_type = $${paramIndex++}`);
updateValues.push(data.executionType);
}
if (data.nodeFlowId !== undefined) {
updateFields.push(`node_flow_id = $${paramIndex++}`);
updateValues.push(data.nodeFlowId || null);
}
if (data.nodeFlowContext !== undefined) {
updateFields.push(`node_flow_context = $${paramIndex++}`);
updateValues.push(
data.nodeFlowContext
? JSON.stringify(data.nodeFlowContext)
: null
);
}
// 배치 설정 업데이트
const batchConfigResult = await client.query(

View File

@@ -79,6 +79,9 @@ export interface BatchMapping {
created_date?: Date;
}
// 배치 실행 타입: 기존 매핑 방식 또는 노드 플로우 실행
export type BatchExecutionType = "mapping" | "node_flow";
// 배치 설정 타입
export interface BatchConfig {
id?: number;
@@ -87,15 +90,21 @@ export interface BatchConfig {
cron_schedule: string;
is_active: "Y" | "N";
company_code?: string;
save_mode?: "INSERT" | "UPSERT"; // 저장 모드 (기본: INSERT)
conflict_key?: string; // UPSERT 시 충돌 기준 컬럼명
auth_service_name?: string; // REST API 인증에 사용할 토큰 서비스명
data_array_path?: string; // REST API 응답에서 데이터 배열 경로 (예: response, data.items)
save_mode?: "INSERT" | "UPSERT";
conflict_key?: string;
auth_service_name?: string;
data_array_path?: string;
execution_type?: BatchExecutionType;
node_flow_id?: number;
node_flow_context?: Record<string, any>;
created_by?: string;
created_date?: Date;
updated_by?: string;
updated_date?: Date;
batch_mappings?: BatchMapping[];
last_status?: string;
last_executed_at?: string;
last_total_records?: number;
}
export interface BatchConnectionInfo {
@@ -149,7 +158,10 @@ export interface CreateBatchConfigRequest {
saveMode?: "INSERT" | "UPSERT";
conflictKey?: string;
authServiceName?: string;
dataArrayPath?: string; // REST API 응답에서 데이터 배열 경로
dataArrayPath?: string;
executionType?: BatchExecutionType;
nodeFlowId?: number;
nodeFlowContext?: Record<string, any>;
mappings: BatchMappingRequest[];
}
@@ -161,7 +173,10 @@ export interface UpdateBatchConfigRequest {
saveMode?: "INSERT" | "UPSERT";
conflictKey?: string;
authServiceName?: string;
dataArrayPath?: string; // REST API 응답에서 데이터 배열 경로
dataArrayPath?: string;
executionType?: BatchExecutionType;
nodeFlowId?: number;
nodeFlowContext?: Record<string, any>;
mappings?: BatchMappingRequest[];
}