REST API→DB 토큰 배치 및 auth_tokens 저장 구현

This commit is contained in:
dohyeons
2025-11-27 11:32:19 +09:00
parent ed56e14aa2
commit 707328e765
16 changed files with 1459 additions and 1964 deletions

View File

@@ -1,258 +1,114 @@
// 배치 스케줄러 서비스
// 작성일: 2024-12-24
import * as cron from "node-cron";
import { query, queryOne } from "../database/db";
import cron from "node-cron";
import { BatchService } from "./batchService";
import { BatchExecutionLogService } from "./batchExecutionLogService";
import { logger } from "../utils/logger";
export class BatchSchedulerService {
private static scheduledTasks: Map<number, cron.ScheduledTask> = new Map();
private static isInitialized = false;
private static executingBatches: Set<number> = new Set(); // 실행 중인 배치 추적
/**
* 스케줄 초기화
* 모든 활성 배치의 스케줄 초기화
*/
static async initialize() {
static async initializeScheduler() {
try {
logger.info("배치 스케줄러 초기화 시작...");
logger.info("배치 스케줄러 초기화 시작");
// 기존 모든 스케줄 정리 (중복 방지)
this.clearAllSchedules();
const batchConfigsResponse = await BatchService.getBatchConfigs({
is_active: "Y",
});
// 활성화된 배치 설정들을 로드하여 스케줄 등록
await this.loadActiveBatchConfigs();
this.isInitialized = true;
logger.info("배치 스케줄러 초기화 완료");
} catch (error) {
logger.error("배치 스케줄러 초기화 실패:", error);
throw error;
}
}
/**
* 모든 스케줄 정리
*/
private static clearAllSchedules() {
logger.info(`기존 스케줄 ${this.scheduledTasks.size}개 정리 중...`);
for (const [id, task] of this.scheduledTasks) {
try {
task.stop();
task.destroy();
logger.info(`스케줄 정리 완료: ID ${id}`);
} catch (error) {
logger.error(`스케줄 정리 실패: ID ${id}`, error);
}
}
this.scheduledTasks.clear();
this.isInitialized = false;
logger.info("모든 스케줄 정리 완료");
}
/**
* 활성화된 배치 설정들을 로드하여 스케줄 등록
*/
private static async loadActiveBatchConfigs() {
try {
const activeConfigs = await query<any>(
`SELECT
bc.*,
json_agg(
json_build_object(
'id', bm.id,
'batch_config_id', bm.batch_config_id,
'from_connection_type', bm.from_connection_type,
'from_connection_id', bm.from_connection_id,
'from_table_name', bm.from_table_name,
'from_column_name', bm.from_column_name,
'from_column_type', bm.from_column_type,
'to_connection_type', bm.to_connection_type,
'to_connection_id', bm.to_connection_id,
'to_table_name', bm.to_table_name,
'to_column_name', bm.to_column_name,
'to_column_type', bm.to_column_type,
'mapping_order', bm.mapping_order,
'from_api_url', bm.from_api_url,
'from_api_key', bm.from_api_key,
'from_api_method', bm.from_api_method,
'from_api_param_type', bm.from_api_param_type,
'from_api_param_name', bm.from_api_param_name,
'from_api_param_value', bm.from_api_param_value,
'from_api_param_source', bm.from_api_param_source,
'to_api_url', bm.to_api_url,
'to_api_key', bm.to_api_key,
'to_api_method', bm.to_api_method,
'to_api_body', bm.to_api_body
)
) FILTER (WHERE bm.id IS NOT NULL) as batch_mappings
FROM batch_configs bc
LEFT JOIN batch_mappings bm ON bc.id = bm.batch_config_id
WHERE bc.is_active = 'Y'
GROUP BY bc.id`,
[]
);
logger.info(`활성화된 배치 설정 ${activeConfigs.length}개 발견`);
for (const config of activeConfigs) {
await this.scheduleBatchConfig(config);
}
} catch (error) {
logger.error("활성화된 배치 설정 로드 실패:", error);
throw error;
}
}
/**
* 배치 설정을 스케줄에 등록
*/
static async scheduleBatchConfig(config: any) {
try {
const { id, batch_name, cron_schedule } = config;
// 기존 스케줄이 있다면 제거
if (this.scheduledTasks.has(id)) {
this.scheduledTasks.get(id)?.stop();
this.scheduledTasks.delete(id);
}
// cron 스케줄 유효성 검사
if (!cron.validate(cron_schedule)) {
logger.error(`잘못된 cron 스케줄: ${cron_schedule} (배치 ID: ${id})`);
if (!batchConfigsResponse.success || !batchConfigsResponse.data) {
logger.warn("스케줄링할 활성 배치 설정이 없습니다.");
return;
}
// 새로운 스케줄 등록
const task = cron.schedule(cron_schedule, async () => {
// 중복 실행 방지 체크
if (this.executingBatches.has(id)) {
logger.warn(
`⚠️ 배치가 이미 실행 중입니다. 건너뜀: ${batch_name} (ID: ${id})`
);
return;
}
const batchConfigs = batchConfigsResponse.data;
logger.info(`${batchConfigs.length}개의 배치 설정 스케줄링 등록`);
logger.info(`🔄 스케줄 배치 실행 시작: ${batch_name} (ID: ${id})`);
for (const config of batchConfigs) {
await this.scheduleBatch(config);
}
// 실행 중 플래그 설정
this.executingBatches.add(id);
logger.info("배치 스케줄러 초기화 완료");
} catch (error) {
logger.error("배치 스케줄러 초기화 중 오류 발생:", error);
}
}
try {
await this.executeBatchConfig(config);
} finally {
// 실행 완료 후 플래그 제거
this.executingBatches.delete(id);
}
/**
* 개별 배치 작업 스케줄링
*/
static async scheduleBatch(config: any) {
try {
// 기존 스케줄이 있으면 제거
if (this.scheduledTasks.has(config.id)) {
this.scheduledTasks.get(config.id)?.stop();
this.scheduledTasks.delete(config.id);
}
if (config.is_active !== "Y") {
logger.info(
`배치 스케줄링 건너뜀 (비활성 상태): ${config.batch_name} (ID: ${config.id})`
);
return;
}
if (!cron.validate(config.cron_schedule)) {
logger.error(
`유효하지 않은 Cron 표현식: ${config.cron_schedule} (Batch ID: ${config.id})`
);
return;
}
logger.info(
`배치 스케줄 등록: ${config.batch_name} (ID: ${config.id}, Cron: ${config.cron_schedule})`
);
const task = cron.schedule(config.cron_schedule, async () => {
logger.info(
`스케줄에 의한 배치 실행 시작: ${config.batch_name} (ID: ${config.id})`
);
await this.executeBatchConfig(config);
});
// 스케줄 시작 (기본적으로 시작되지만 명시적으로 호출)
task.start();
this.scheduledTasks.set(id, task);
logger.info(
`배치 스케줄 등록 완료: ${batch_name} (ID: ${id}, Schedule: ${cron_schedule}) - 스케줄 시작됨`
);
this.scheduledTasks.set(config.id, task);
} catch (error) {
logger.error(`배치 스케줄 등록 실패 (ID: ${config.id}):`, error);
logger.error(`배치 스케줄링 중 오류 발생 (ID: ${config.id}):`, error);
}
}
/**
* 배치 설정 스케줄 제거
*/
static async unscheduleBatchConfig(batchConfigId: number) {
try {
if (this.scheduledTasks.has(batchConfigId)) {
this.scheduledTasks.get(batchConfigId)?.stop();
this.scheduledTasks.delete(batchConfigId);
logger.info(`배치 스케줄 제거 완료 (ID: ${batchConfigId})`);
}
} catch (error) {
logger.error(`배치 스케줄 제거 실패 (ID: ${batchConfigId}):`, error);
}
}
/**
* 배치 설정 업데이트 시 스케줄 재등록
* 배치 스케줄 업데이트 (설정 변경 시 호출)
*/
static async updateBatchSchedule(
configId: number,
executeImmediately: boolean = true
) {
try {
// 기존 스케줄 제거
await this.unscheduleBatchConfig(configId);
// 업데이트된 배치 설정 조회
const configResult = await query<any>(
`SELECT
bc.*,
json_agg(
json_build_object(
'id', bm.id,
'batch_config_id', bm.batch_config_id,
'from_connection_type', bm.from_connection_type,
'from_connection_id', bm.from_connection_id,
'from_table_name', bm.from_table_name,
'from_column_name', bm.from_column_name,
'from_column_type', bm.from_column_type,
'to_connection_type', bm.to_connection_type,
'to_connection_id', bm.to_connection_id,
'to_table_name', bm.to_table_name,
'to_column_name', bm.to_column_name,
'to_column_type', bm.to_column_type,
'mapping_order', bm.mapping_order,
'from_api_url', bm.from_api_url,
'from_api_key', bm.from_api_key,
'from_api_method', bm.from_api_method,
'from_api_param_type', bm.from_api_param_type,
'from_api_param_name', bm.from_api_param_name,
'from_api_param_value', bm.from_api_param_value,
'from_api_param_source', bm.from_api_param_source,
'to_api_url', bm.to_api_url,
'to_api_key', bm.to_api_key,
'to_api_method', bm.to_api_method,
'to_api_body', bm.to_api_body
)
) FILTER (WHERE bm.id IS NOT NULL) as batch_mappings
FROM batch_configs bc
LEFT JOIN batch_mappings bm ON bc.id = bm.batch_config_id
WHERE bc.id = $1
GROUP BY bc.id`,
[configId]
);
const config = configResult[0] || null;
if (!config) {
logger.warn(`배치 설정을 찾을 수 없습니다: ID ${configId}`);
const result = await BatchService.getBatchConfigById(configId);
if (!result.success || !result.data) {
// 설정이 없으면 스케줄 제거
if (this.scheduledTasks.has(configId)) {
this.scheduledTasks.get(configId)?.stop();
this.scheduledTasks.delete(configId);
}
return;
}
// 활성화된 배치만 다시 스케줄 등록
if (config.is_active === "Y") {
await this.scheduleBatchConfig(config);
logger.info(
`배치 스케줄 업데이트 완료: ${config.batch_name} (ID: ${configId})`
);
const config = result.data;
// 활성화 시 즉시 실행 (옵션)
if (executeImmediately) {
logger.info(
`🚀 배치 활성화 즉시 실행: ${config.batch_name} (ID: ${configId})`
);
await this.executeBatchConfig(config);
}
} else {
logger.info(
`비활성화된 배치 스케줄 제거: ${config.batch_name} (ID: ${configId})`
// 스케줄 재등록
await this.scheduleBatch(config);
// 즉시 실행 옵션이 있으면 실행
/*
if (executeImmediately && config.is_active === "Y") {
logger.info(`배치 설정 변경 후 즉시 실행: ${config.batch_name}`);
this.executeBatchConfig(config).catch((err) =>
logger.error(`즉시 실행 중 오류 발생:`, err)
);
}
*/
} catch (error) {
logger.error(`배치 스케줄 업데이트 실패: ID ${configId}`, error);
}
@@ -272,6 +128,7 @@ export class BatchSchedulerService {
const executionLogResponse =
await BatchExecutionLogService.createExecutionLog({
batch_config_id: config.id,
company_code: config.company_code,
execution_status: "RUNNING",
start_time: startTime,
total_records: 0,
@@ -313,21 +170,20 @@ export class BatchSchedulerService {
// 성공 결과 반환
return result;
} catch (error) {
logger.error(`배치 실행 실패: ${config.batch_name}`, error);
logger.error(`배치 실행 중 오류 발생: ${config.batch_name}`, error);
// 실행 로그 업데이트 (실패)
if (executionLog) {
await BatchExecutionLogService.updateExecutionLog(executionLog.id, {
execution_status: "FAILED",
execution_status: "FAILURE",
end_time: new Date(),
duration_ms: Date.now() - startTime.getTime(),
error_message:
error instanceof Error ? error.message : "알 수 없는 오류",
error_details: error instanceof Error ? error.stack : String(error),
});
}
// 실패 시에도 결과 반환
// 실패 결과 반환
return {
totalRecords: 0,
successRecords: 0,
@@ -379,6 +235,8 @@ export class BatchSchedulerService {
const { BatchExternalDbService } = await import(
"./batchExternalDbService"
);
// 👇 Body 파라미터 추가 (POST 요청 시)
const apiResult = await BatchExternalDbService.getDataFromRestApi(
firstMapping.from_api_url!,
firstMapping.from_api_key!,
@@ -394,7 +252,9 @@ export class BatchSchedulerService {
firstMapping.from_api_param_type,
firstMapping.from_api_param_name,
firstMapping.from_api_param_value,
firstMapping.from_api_param_source
firstMapping.from_api_param_source,
// 👇 Body 전달 (FROM - REST API - POST 요청)
firstMapping.from_api_body
);
if (apiResult.success && apiResult.data) {
@@ -416,6 +276,17 @@ export class BatchSchedulerService {
totalRecords += fromData.length;
// 컬럼 매핑 적용하여 TO 테이블 형식으로 변환
// 유틸리티 함수: 점 표기법을 사용하여 중첩된 객체 값 가져오기
const getValueByPath = (obj: any, path: string) => {
if (!path) return undefined;
// path가 'response.access_token' 처럼 점을 포함하는 경우
if (path.includes(".")) {
return path.split(".").reduce((acc, part) => acc && acc[part], obj);
}
// 단순 키인 경우
return obj[path];
};
const mappedData = fromData.map((row) => {
const mappedRow: any = {};
for (const mapping of mappings) {
@@ -428,8 +299,11 @@ export class BatchSchedulerService {
mappedRow[mapping.from_column_name] =
row[mapping.from_column_name];
} else {
// 기존 로직: to_column_name을 키로 사용
mappedRow[mapping.to_column_name] = row[mapping.from_column_name];
// REST API -> DB (POST 요청 포함) 또는 DB -> DB
// row[mapping.from_column_name] 대신 getValueByPath 사용
const value = getValueByPath(row, mapping.from_column_name);
mappedRow[mapping.to_column_name] = value;
}
}
return mappedRow;
@@ -482,22 +356,12 @@ export class BatchSchedulerService {
);
}
} else {
// 기존 REST API 전송 (REST API → DB 배치)
const apiResult = await BatchExternalDbService.sendDataToRestApi(
firstMapping.to_api_url!,
firstMapping.to_api_key!,
firstMapping.to_table_name,
(firstMapping.to_api_method as "POST" | "PUT") || "POST",
mappedData
// 기존 REST API 전송 (REST API → DB 배치) - 사실 이 경우는 거의 없음 (REST to REST)
// 지원하지 않음
logger.warn(
"REST API -> REST API (단순 매핑)은 아직 지원하지 않습니다."
);
if (apiResult.success && apiResult.data) {
insertResult = apiResult.data;
} else {
throw new Error(
`REST API 데이터 전송 실패: ${apiResult.message}`
);
}
insertResult = { successCount: 0, failedCount: 0 };
}
} else {
// DB에 데이터 삽입
@@ -511,167 +375,13 @@ export class BatchSchedulerService {
successRecords += insertResult.successCount;
failedRecords += insertResult.failedCount;
logger.info(
`테이블 처리 완료: ${insertResult.successCount}개 성공, ${insertResult.failedCount}개 실패`
);
} catch (error) {
logger.error(`테이블 처리 실패: ${tableKey}`, error);
failedRecords += 1;
logger.error(`테이블 처리 중 오류 발생: ${tableKey}`, error);
// 해당 테이블 처리 실패는 전체 실패로 간주하지 않고, 실패 카운트만 증가?
// 여기서는 일단 실패 로그만 남기고 계속 진행 (필요시 정책 변경)
}
}
return { totalRecords, successRecords, failedRecords };
}
/**
* 배치 매핑 처리 (기존 메서드 - 사용 안 함)
*/
private static async processBatchMappings(config: any) {
const { batch_mappings } = config;
let totalRecords = 0;
let successRecords = 0;
let failedRecords = 0;
if (!batch_mappings || batch_mappings.length === 0) {
logger.warn(`배치 매핑이 없습니다: ${config.batch_name}`);
return { totalRecords, successRecords, failedRecords };
}
for (const mapping of batch_mappings) {
try {
logger.info(
`매핑 처리 시작: ${mapping.from_table_name} -> ${mapping.to_table_name}`
);
// FROM 테이블에서 데이터 조회
const fromData = await this.getDataFromSource(mapping);
totalRecords += fromData.length;
// TO 테이블에 데이터 삽입
const insertResult = await this.insertDataToTarget(mapping, fromData);
successRecords += insertResult.successCount;
failedRecords += insertResult.failedCount;
logger.info(
`매핑 처리 완료: ${insertResult.successCount}개 성공, ${insertResult.failedCount}개 실패`
);
} catch (error) {
logger.error(
`매핑 처리 실패: ${mapping.from_table_name} -> ${mapping.to_table_name}`,
error
);
failedRecords += 1;
}
}
return { totalRecords, successRecords, failedRecords };
}
/**
* FROM 테이블에서 데이터 조회
*/
private static async getDataFromSource(mapping: any) {
try {
if (mapping.from_connection_type === "internal") {
// 내부 DB에서 조회
const result = await query<any>(
`SELECT * FROM ${mapping.from_table_name}`,
[]
);
return result;
} else {
// 외부 DB에서 조회 (구현 필요)
logger.warn("외부 DB 조회는 아직 구현되지 않았습니다.");
return [];
}
} catch (error) {
logger.error(
`FROM 테이블 데이터 조회 실패: ${mapping.from_table_name}`,
error
);
throw error;
}
}
/**
* TO 테이블에 데이터 삽입
*/
private static async insertDataToTarget(mapping: any, data: any[]) {
let successCount = 0;
let failedCount = 0;
try {
if (mapping.to_connection_type === "internal") {
// 내부 DB에 삽입
for (const record of data) {
try {
// 매핑된 컬럼만 추출
const mappedData = this.mapColumns(record, mapping);
const columns = Object.keys(mappedData);
const values = Object.values(mappedData);
const placeholders = values.map((_, i) => `$${i + 1}`).join(", ");
await query(
`INSERT INTO ${mapping.to_table_name} (${columns.join(", ")}) VALUES (${placeholders})`,
values
);
successCount++;
} catch (error) {
logger.error(`레코드 삽입 실패:`, error);
failedCount++;
}
}
} else {
// 외부 DB에 삽입 (구현 필요)
logger.warn("외부 DB 삽입은 아직 구현되지 않았습니다.");
failedCount = data.length;
}
} catch (error) {
logger.error(
`TO 테이블 데이터 삽입 실패: ${mapping.to_table_name}`,
error
);
throw error;
}
return { successCount, failedCount };
}
/**
* 컬럼 매핑
*/
private static mapColumns(record: any, mapping: any) {
const mappedData: any = {};
// 단순한 컬럼 매핑 (실제로는 더 복잡한 로직 필요)
mappedData[mapping.to_column_name] = record[mapping.from_column_name];
return mappedData;
}
/**
* 모든 스케줄 중지
*/
static async stopAllSchedules() {
try {
for (const [id, task] of this.scheduledTasks) {
task.stop();
logger.info(`배치 스케줄 중지: ID ${id}`);
}
this.scheduledTasks.clear();
this.isInitialized = false;
logger.info("모든 배치 스케줄이 중지되었습니다.");
} catch (error) {
logger.error("배치 스케줄 중지 실패:", error);
}
}
/**
* 현재 등록된 스케줄 목록 조회
*/
static getScheduledTasks() {
return Array.from(this.scheduledTasks.keys());
}
}