[agent-pipeline] pipe-20260318044621-56k5 round-2

This commit is contained in:
DDD1542
2026-03-18 13:56:03 +09:00
parent 8e4791c57a
commit 351e57dd31
5 changed files with 136 additions and 4 deletions

View File

@@ -165,8 +165,20 @@ export class BatchSchedulerService {
executionLog = executionLogResponse.data;
// 실제 배치 실행 로직 (수동 실행과 동일한 로직 사용)
const result = await this.executeBatchMappings(config);
// 실행 유형에 따라 분기: node_flow면 노드 플로우 실행, 아니면 매핑 배치 실행
let result: {
totalRecords: number;
successRecords: number;
failedRecords: number;
};
if (
config.execution_type === "node_flow" &&
config.node_flow_id != null
) {
result = await this.executeNodeFlow(config);
} else {
result = await this.executeBatchMappings(config);
}
// 실행 로그 업데이트 (성공)
await BatchExecutionLogService.updateExecutionLog(executionLog.id, {
@@ -207,6 +219,50 @@ export class BatchSchedulerService {
}
}
/**
* 노드 플로우 실행 (execution_type === 'node_flow'일 때)
* node_flows 테이블의 플로우를 NodeFlowExecutionService로 실행하고 결과를 배치 로그 형식으로 반환
*/
private static async executeNodeFlow(config: any): Promise<{
totalRecords: number;
successRecords: number;
failedRecords: number;
}> {
const { NodeFlowExecutionService } = await import(
"./nodeFlowExecutionService"
);
// 플로우 존재 여부 확인
const flowCheck = await query<{ flow_id: number; flow_name: string }>(
"SELECT flow_id, flow_name FROM node_flows WHERE flow_id = $1",
[config.node_flow_id]
);
if (flowCheck.length === 0) {
throw new Error(
`노드 플로우를 찾을 수 없습니다 (flow_id: ${config.node_flow_id})`
);
}
const contextData: Record<string, any> = {
...(config.node_flow_context || {}),
_batchId: config.id,
_batchName: config.batch_name,
_companyCode: config.company_code,
_executedBy: "batch_system",
};
const flowResult = await NodeFlowExecutionService.executeFlow(
config.node_flow_id,
contextData
);
return {
totalRecords: flowResult.summary.total,
successRecords: flowResult.summary.success,
failedRecords: flowResult.summary.failed,
};
}
/**
* 배치 매핑 실행 (수동 실행과 동일한 로직)
*/

View File

@@ -176,8 +176,8 @@ export class BatchService {
// 배치 설정 생성
const batchConfigResult = await client.query(
`INSERT INTO batch_configs
(batch_name, description, cron_schedule, is_active, company_code, save_mode, conflict_key, auth_service_name, data_array_path, created_by, created_date, updated_date)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW(), NOW())
(batch_name, description, cron_schedule, is_active, company_code, save_mode, conflict_key, auth_service_name, data_array_path, execution_type, node_flow_id, node_flow_context, created_by, created_date, updated_date)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, NOW(), NOW())
RETURNING *`,
[
data.batchName,
@@ -189,6 +189,11 @@ export class BatchService {
data.conflictKey || null,
data.authServiceName || null,
data.dataArrayPath || null,
data.executionType || "mapping",
data.nodeFlowId ?? null,
data.nodeFlowContext != null
? JSON.stringify(data.nodeFlowContext)
: null,
userId,
]
);
@@ -332,6 +337,22 @@ export class BatchService {
updateFields.push(`data_array_path = $${paramIndex++}`);
updateValues.push(data.dataArrayPath || null);
}
if (data.executionType !== undefined) {
updateFields.push(`execution_type = $${paramIndex++}`);
updateValues.push(data.executionType);
}
if (data.nodeFlowId !== undefined) {
updateFields.push(`node_flow_id = $${paramIndex++}`);
updateValues.push(data.nodeFlowId ?? null);
}
if (data.nodeFlowContext !== undefined) {
updateFields.push(`node_flow_context = $${paramIndex++}`);
updateValues.push(
data.nodeFlowContext != null
? JSON.stringify(data.nodeFlowContext)
: null
);
}
// 배치 설정 업데이트
const batchConfigResult = await client.query(