rest api get 파라미터 설정 개발중
This commit is contained in:
@@ -10,19 +10,18 @@ import { logger } from '../utils/logger';
|
||||
export class BatchSchedulerService {
|
||||
private static scheduledTasks: Map<number, cron.ScheduledTask> = new Map();
|
||||
private static isInitialized = false;
|
||||
private static executingBatches: Set<number> = new Set(); // 실행 중인 배치 추적
|
||||
|
||||
/**
|
||||
* 스케줄러 초기화
|
||||
*/
|
||||
static async initialize() {
|
||||
if (this.isInitialized) {
|
||||
logger.info('배치 스케줄러가 이미 초기화되었습니다.');
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
logger.info('배치 스케줄러 초기화 시작...');
|
||||
|
||||
// 기존 모든 스케줄 정리 (중복 방지)
|
||||
this.clearAllSchedules();
|
||||
|
||||
// 활성화된 배치 설정들을 로드하여 스케줄 등록
|
||||
await this.loadActiveBatchConfigs();
|
||||
|
||||
@@ -34,6 +33,27 @@ export class BatchSchedulerService {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 모든 스케줄 정리
|
||||
*/
|
||||
private static clearAllSchedules() {
|
||||
logger.info(`기존 스케줄 ${this.scheduledTasks.size}개 정리 중...`);
|
||||
|
||||
for (const [id, task] of this.scheduledTasks) {
|
||||
try {
|
||||
task.stop();
|
||||
task.destroy();
|
||||
logger.info(`스케줄 정리 완료: ID ${id}`);
|
||||
} catch (error) {
|
||||
logger.error(`스케줄 정리 실패: ID ${id}`, error);
|
||||
}
|
||||
}
|
||||
|
||||
this.scheduledTasks.clear();
|
||||
this.isInitialized = false;
|
||||
logger.info('모든 스케줄 정리 완료');
|
||||
}
|
||||
|
||||
/**
|
||||
* 활성화된 배치 설정들을 로드하여 스케줄 등록
|
||||
*/
|
||||
@@ -80,8 +100,23 @@ export class BatchSchedulerService {
|
||||
|
||||
// 새로운 스케줄 등록
|
||||
const task = cron.schedule(cron_schedule, async () => {
|
||||
// 중복 실행 방지 체크
|
||||
if (this.executingBatches.has(id)) {
|
||||
logger.warn(`⚠️ 배치가 이미 실행 중입니다. 건너뜀: ${batch_name} (ID: ${id})`);
|
||||
return;
|
||||
}
|
||||
|
||||
logger.info(`🔄 스케줄 배치 실행 시작: ${batch_name} (ID: ${id})`);
|
||||
await this.executeBatchConfig(config);
|
||||
|
||||
// 실행 중 플래그 설정
|
||||
this.executingBatches.add(id);
|
||||
|
||||
try {
|
||||
await this.executeBatchConfig(config);
|
||||
} finally {
|
||||
// 실행 완료 후 플래그 제거
|
||||
this.executingBatches.delete(id);
|
||||
}
|
||||
});
|
||||
|
||||
// 스케줄 시작 (기본적으로 시작되지만 명시적으로 호출)
|
||||
@@ -149,7 +184,7 @@ export class BatchSchedulerService {
|
||||
/**
|
||||
* 배치 설정 실행
|
||||
*/
|
||||
private static async executeBatchConfig(config: any) {
|
||||
static async executeBatchConfig(config: any) {
|
||||
const startTime = new Date();
|
||||
let executionLog: any = null;
|
||||
|
||||
@@ -168,7 +203,11 @@ export class BatchSchedulerService {
|
||||
|
||||
if (!executionLogResponse.success || !executionLogResponse.data) {
|
||||
logger.error(`배치 실행 로그 생성 실패: ${config.batch_name}`, executionLogResponse.message);
|
||||
return;
|
||||
return {
|
||||
totalRecords: 0,
|
||||
successRecords: 0,
|
||||
failedRecords: 1
|
||||
};
|
||||
}
|
||||
|
||||
executionLog = executionLogResponse.data;
|
||||
@@ -187,6 +226,10 @@ export class BatchSchedulerService {
|
||||
});
|
||||
|
||||
logger.info(`배치 실행 완료: ${config.batch_name} (처리된 레코드: ${result.totalRecords})`);
|
||||
|
||||
// 성공 결과 반환
|
||||
return result;
|
||||
|
||||
} catch (error) {
|
||||
logger.error(`배치 실행 실패: ${config.batch_name}`, error);
|
||||
|
||||
@@ -200,6 +243,13 @@ export class BatchSchedulerService {
|
||||
error_details: error instanceof Error ? error.stack : String(error)
|
||||
});
|
||||
}
|
||||
|
||||
// 실패 시에도 결과 반환
|
||||
return {
|
||||
totalRecords: 0,
|
||||
successRecords: 0,
|
||||
failedRecords: 1
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -722,38 +722,56 @@ export class BatchService {
|
||||
const updateColumns = columns.filter(col => col !== primaryKeyColumn);
|
||||
const updateSet = updateColumns.map(col => `${col} = EXCLUDED.${col}`).join(', ');
|
||||
|
||||
// 먼저 해당 레코드가 존재하는지 확인
|
||||
const checkQuery = `SELECT COUNT(*) as count FROM ${tableName} WHERE ${primaryKeyColumn} = $1`;
|
||||
const existsResult = await prisma.$queryRawUnsafe(checkQuery, record[primaryKeyColumn]);
|
||||
const exists = (existsResult as any)[0]?.count > 0;
|
||||
|
||||
let query: string;
|
||||
if (exists && updateSet) {
|
||||
// 기존 레코드가 있으면 UPDATE (값이 다른 경우에만)
|
||||
const whereConditions = updateColumns.map((col, index) =>
|
||||
`${col} IS DISTINCT FROM $${index + 2}`
|
||||
).join(' OR ');
|
||||
// 트랜잭션 내에서 처리하여 연결 관리 최적화
|
||||
const result = await prisma.$transaction(async (tx) => {
|
||||
// 먼저 해당 레코드가 존재하는지 확인
|
||||
const checkQuery = `SELECT COUNT(*) as count FROM ${tableName} WHERE ${primaryKeyColumn} = $1`;
|
||||
const existsResult = await tx.$queryRawUnsafe(checkQuery, record[primaryKeyColumn]);
|
||||
const exists = (existsResult as any)[0]?.count > 0;
|
||||
|
||||
query = `UPDATE ${tableName} SET ${updateSet.replace(/EXCLUDED\./g, '')}
|
||||
WHERE ${primaryKeyColumn} = $1 AND (${whereConditions})`;
|
||||
let operationResult = 'no_change';
|
||||
|
||||
// 파라미터: [primaryKeyValue, ...updateValues]
|
||||
const updateValues = [record[primaryKeyColumn], ...updateColumns.map(col => record[col])];
|
||||
const updateResult = await prisma.$executeRawUnsafe(query, ...updateValues);
|
||||
|
||||
if (updateResult > 0) {
|
||||
console.log(`[BatchService] 레코드 업데이트: ${primaryKeyColumn}=${record[primaryKeyColumn]}`);
|
||||
if (exists && updateSet) {
|
||||
// 기존 레코드가 있으면 UPDATE (값이 다른 경우에만)
|
||||
const whereConditions = updateColumns.map((col, index) => {
|
||||
// 날짜/시간 컬럼에 대한 타입 캐스팅 처리
|
||||
if (col.toLowerCase().includes('date') ||
|
||||
col.toLowerCase().includes('time') ||
|
||||
col.toLowerCase().includes('created') ||
|
||||
col.toLowerCase().includes('updated') ||
|
||||
col.toLowerCase().includes('reg')) {
|
||||
return `${col} IS DISTINCT FROM $${index + 2}::timestamp`;
|
||||
}
|
||||
return `${col} IS DISTINCT FROM $${index + 2}`;
|
||||
}).join(' OR ');
|
||||
|
||||
const query = `UPDATE ${tableName} SET ${updateSet.replace(/EXCLUDED\./g, '')}
|
||||
WHERE ${primaryKeyColumn} = $1 AND (${whereConditions})`;
|
||||
|
||||
// 파라미터: [primaryKeyValue, ...updateValues]
|
||||
const updateValues = [record[primaryKeyColumn], ...updateColumns.map(col => record[col])];
|
||||
const updateResult = await tx.$executeRawUnsafe(query, ...updateValues);
|
||||
|
||||
if (updateResult > 0) {
|
||||
console.log(`[BatchService] 레코드 업데이트: ${primaryKeyColumn}=${record[primaryKeyColumn]}`);
|
||||
operationResult = 'updated';
|
||||
} else {
|
||||
console.log(`[BatchService] 레코드 변경사항 없음: ${primaryKeyColumn}=${record[primaryKeyColumn]}`);
|
||||
operationResult = 'no_change';
|
||||
}
|
||||
} else if (!exists) {
|
||||
// 새 레코드 삽입
|
||||
const query = `INSERT INTO ${tableName} (${columns.join(', ')}) VALUES (${placeholders})`;
|
||||
await tx.$executeRawUnsafe(query, ...values);
|
||||
console.log(`[BatchService] 새 레코드 삽입: ${primaryKeyColumn}=${record[primaryKeyColumn]}`);
|
||||
operationResult = 'inserted';
|
||||
} else {
|
||||
console.log(`[BatchService] 레코드 변경사항 없음: ${primaryKeyColumn}=${record[primaryKeyColumn]}`);
|
||||
console.log(`[BatchService] 레코드 이미 존재 (변경사항 없음): ${primaryKeyColumn}=${record[primaryKeyColumn]}`);
|
||||
operationResult = 'no_change';
|
||||
}
|
||||
} else if (!exists) {
|
||||
// 새 레코드 삽입
|
||||
query = `INSERT INTO ${tableName} (${columns.join(', ')}) VALUES (${placeholders})`;
|
||||
await prisma.$executeRawUnsafe(query, ...values);
|
||||
console.log(`[BatchService] 새 레코드 삽입: ${primaryKeyColumn}=${record[primaryKeyColumn]}`);
|
||||
} else {
|
||||
console.log(`[BatchService] 레코드 이미 존재 (변경사항 없음): ${primaryKeyColumn}=${record[primaryKeyColumn]}`);
|
||||
}
|
||||
|
||||
return operationResult;
|
||||
});
|
||||
|
||||
successCount++;
|
||||
} catch (error) {
|
||||
|
||||
Reference in New Issue
Block a user