feat: 배치 관리 시스템 구현
✨ 주요 기능: - 배치 설정 관리 (생성/수정/삭제/실행) - 배치 실행 로그 관리 및 모니터링 - 배치 스케줄러 자동 실행 (cron 기반) - 외부 DB 연결을 통한 데이터 동기화 - Oracle, MSSQL, MariaDB 커넥터 지원 🔧 백엔드 구현: - BatchManagementController: 배치 설정 CRUD - BatchExecutionLogController: 실행 로그 관리 - BatchSchedulerService: 자동 스케줄링 - BatchExternalDbService: 외부 DB 연동 - 배치 관련 테이블 스키마 추가 🎨 프론트엔드 구현: - 배치 관리 대시보드 UI - 배치 생성/수정 폼 - 실행 로그 모니터링 화면 - 수동 실행 및 상태 관리 🛡️ 안전성: - 기존 시스템과 독립적 구현 - 트랜잭션 기반 안전한 데이터 처리 - 에러 핸들링 및 로깅 강화
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
// 배치관리 서비스
|
||||
// 작성일: 2024-12-24
|
||||
|
||||
import { PrismaClient } from "@prisma/client";
|
||||
import prisma from "../config/database";
|
||||
import {
|
||||
BatchConfig,
|
||||
BatchMapping,
|
||||
@@ -12,12 +12,12 @@ import {
|
||||
ConnectionInfo,
|
||||
TableInfo,
|
||||
ColumnInfo,
|
||||
CreateBatchConfigRequest,
|
||||
UpdateBatchConfigRequest,
|
||||
} from "../types/batchTypes";
|
||||
import { ExternalDbConnectionService } from "./externalDbConnectionService";
|
||||
import { BatchExternalDbService } from "./batchExternalDbService";
|
||||
import { DbConnectionManager } from "./dbConnectionManager";
|
||||
|
||||
const prisma = new PrismaClient();
|
||||
|
||||
export class BatchService {
|
||||
/**
|
||||
* 배치 설정 목록 조회
|
||||
@@ -55,17 +55,32 @@ export class BatchService {
|
||||
];
|
||||
}
|
||||
|
||||
const batchConfigs = await prisma.batch_configs.findMany({
|
||||
where,
|
||||
include: {
|
||||
batch_mappings: true,
|
||||
},
|
||||
orderBy: [{ is_active: "desc" }, { batch_name: "asc" }],
|
||||
});
|
||||
const page = filter.page || 1;
|
||||
const limit = filter.limit || 10;
|
||||
const skip = (page - 1) * limit;
|
||||
|
||||
const [batchConfigs, total] = await Promise.all([
|
||||
prisma.batch_configs.findMany({
|
||||
where,
|
||||
include: {
|
||||
batch_mappings: true,
|
||||
},
|
||||
orderBy: [{ is_active: "desc" }, { batch_name: "asc" }],
|
||||
skip,
|
||||
take: limit,
|
||||
}),
|
||||
prisma.batch_configs.count({ where }),
|
||||
]);
|
||||
|
||||
return {
|
||||
success: true,
|
||||
data: batchConfigs as BatchConfig[],
|
||||
pagination: {
|
||||
page,
|
||||
limit,
|
||||
total,
|
||||
totalPages: Math.ceil(total / limit),
|
||||
},
|
||||
};
|
||||
} catch (error) {
|
||||
console.error("배치 설정 목록 조회 오류:", error);
|
||||
@@ -122,28 +137,18 @@ export class BatchService {
|
||||
* 배치 설정 생성
|
||||
*/
|
||||
static async createBatchConfig(
|
||||
data: BatchMappingRequest,
|
||||
data: CreateBatchConfigRequest,
|
||||
userId?: string
|
||||
): Promise<ApiResponse<BatchConfig>> {
|
||||
try {
|
||||
// 매핑 유효성 검사
|
||||
const validation = await this.validateBatchMappings(data.mappings);
|
||||
if (!validation.isValid) {
|
||||
return {
|
||||
success: false,
|
||||
message: "매핑 유효성 검사 실패",
|
||||
error: validation.errors.join(", "),
|
||||
};
|
||||
}
|
||||
|
||||
// 트랜잭션으로 배치 설정과 매핑 생성
|
||||
const result = await prisma.$transaction(async (tx) => {
|
||||
// 배치 설정 생성
|
||||
const batchConfig = await tx.batch_configs.create({
|
||||
data: {
|
||||
batch_name: data.batch_name,
|
||||
batch_name: data.batchName,
|
||||
description: data.description,
|
||||
cron_schedule: data.cron_schedule,
|
||||
cron_schedule: data.cronSchedule,
|
||||
created_by: userId,
|
||||
updated_by: userId,
|
||||
},
|
||||
@@ -198,7 +203,7 @@ export class BatchService {
|
||||
*/
|
||||
static async updateBatchConfig(
|
||||
id: number,
|
||||
data: Partial<BatchMappingRequest>,
|
||||
data: UpdateBatchConfigRequest,
|
||||
userId?: string
|
||||
): Promise<ApiResponse<BatchConfig>> {
|
||||
try {
|
||||
@@ -215,18 +220,6 @@ export class BatchService {
|
||||
};
|
||||
}
|
||||
|
||||
// 매핑이 제공된 경우 유효성 검사
|
||||
if (data.mappings) {
|
||||
const validation = await this.validateBatchMappings(data.mappings);
|
||||
if (!validation.isValid) {
|
||||
return {
|
||||
success: false,
|
||||
message: "매핑 유효성 검사 실패",
|
||||
error: validation.errors.join(", "),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// 트랜잭션으로 업데이트
|
||||
const result = await prisma.$transaction(async (tx) => {
|
||||
// 배치 설정 업데이트
|
||||
@@ -234,9 +227,10 @@ export class BatchService {
|
||||
updated_by: userId,
|
||||
};
|
||||
|
||||
if (data.batch_name) updateData.batch_name = data.batch_name;
|
||||
if (data.batchName) updateData.batch_name = data.batchName;
|
||||
if (data.description !== undefined) updateData.description = data.description;
|
||||
if (data.cron_schedule) updateData.cron_schedule = data.cron_schedule;
|
||||
if (data.cronSchedule) updateData.cron_schedule = data.cronSchedule;
|
||||
if (data.isActive !== undefined) updateData.is_active = data.isActive;
|
||||
|
||||
const batchConfig = await tx.batch_configs.update({
|
||||
where: { id },
|
||||
@@ -354,16 +348,14 @@ export class BatchService {
|
||||
});
|
||||
|
||||
// 외부 DB 연결 조회
|
||||
const externalConnections = await ExternalDbConnectionService.getConnections({
|
||||
is_active: 'Y',
|
||||
});
|
||||
const externalConnections = await BatchExternalDbService.getAvailableConnections();
|
||||
|
||||
if (externalConnections.success && externalConnections.data) {
|
||||
externalConnections.data.forEach((conn) => {
|
||||
connections.push({
|
||||
type: 'external',
|
||||
id: conn.id,
|
||||
name: conn.connection_name,
|
||||
name: conn.name,
|
||||
db_type: conn.db_type,
|
||||
});
|
||||
});
|
||||
@@ -389,9 +381,9 @@ export class BatchService {
|
||||
static async getTablesFromConnection(
|
||||
connectionType: 'internal' | 'external',
|
||||
connectionId?: number
|
||||
): Promise<ApiResponse<string[]>> {
|
||||
): Promise<ApiResponse<TableInfo[]>> {
|
||||
try {
|
||||
let tables: string[] = [];
|
||||
let tables: TableInfo[] = [];
|
||||
|
||||
if (connectionType === 'internal') {
|
||||
// 내부 DB 테이블 조회
|
||||
@@ -402,10 +394,13 @@ export class BatchService {
|
||||
AND table_type = 'BASE TABLE'
|
||||
ORDER BY table_name
|
||||
`;
|
||||
tables = result.map(row => row.table_name);
|
||||
tables = result.map(row => ({
|
||||
table_name: row.table_name,
|
||||
columns: []
|
||||
}));
|
||||
} else if (connectionType === 'external' && connectionId) {
|
||||
// 외부 DB 테이블 조회
|
||||
const tablesResult = await ExternalDbConnectionService.getTables(connectionId);
|
||||
const tablesResult = await BatchExternalDbService.getTablesFromConnection(connectionType, connectionId);
|
||||
if (tablesResult.success && tablesResult.data) {
|
||||
tables = tablesResult.data;
|
||||
}
|
||||
@@ -430,14 +425,22 @@ export class BatchService {
|
||||
*/
|
||||
static async getTableColumns(
|
||||
connectionType: 'internal' | 'external',
|
||||
tableName: string,
|
||||
connectionId?: number
|
||||
connectionId: number | undefined,
|
||||
tableName: string
|
||||
): Promise<ApiResponse<ColumnInfo[]>> {
|
||||
try {
|
||||
console.log(`[BatchService] getTableColumns 호출:`, {
|
||||
connectionType,
|
||||
connectionId,
|
||||
tableName
|
||||
});
|
||||
|
||||
let columns: ColumnInfo[] = [];
|
||||
|
||||
|
||||
if (connectionType === 'internal') {
|
||||
// 내부 DB 컬럼 조회
|
||||
console.log(`[BatchService] 내부 DB 컬럼 조회 시작: ${tableName}`);
|
||||
|
||||
const result = await prisma.$queryRaw<Array<{
|
||||
column_name: string;
|
||||
data_type: string;
|
||||
@@ -455,26 +458,31 @@ export class BatchService {
|
||||
ORDER BY ordinal_position
|
||||
`;
|
||||
|
||||
console.log(`[BatchService] 내부 DB 컬럼 조회 결과:`, result);
|
||||
|
||||
columns = result.map(row => ({
|
||||
column_name: row.column_name,
|
||||
data_type: row.data_type,
|
||||
is_nullable: row.is_nullable === 'YES',
|
||||
is_nullable: row.is_nullable,
|
||||
column_default: row.column_default,
|
||||
}));
|
||||
} else if (connectionType === 'external' && connectionId) {
|
||||
// 외부 DB 컬럼 조회
|
||||
const columnsResult = await ExternalDbConnectionService.getTableColumns(
|
||||
console.log(`[BatchService] 외부 DB 컬럼 조회 시작: connectionId=${connectionId}, tableName=${tableName}`);
|
||||
|
||||
const columnsResult = await BatchExternalDbService.getTableColumns(
|
||||
connectionType,
|
||||
connectionId,
|
||||
tableName
|
||||
);
|
||||
|
||||
console.log(`[BatchService] 외부 DB 컬럼 조회 결과:`, columnsResult);
|
||||
|
||||
if (columnsResult.success && columnsResult.data) {
|
||||
columns = columnsResult.data.map(col => ({
|
||||
column_name: col.column_name,
|
||||
data_type: col.data_type,
|
||||
is_nullable: col.is_nullable,
|
||||
column_default: col.column_default,
|
||||
}));
|
||||
columns = columnsResult.data;
|
||||
}
|
||||
|
||||
console.log(`[BatchService] 외부 DB 컬럼:`, columns);
|
||||
}
|
||||
|
||||
return {
|
||||
@@ -491,6 +499,228 @@ export class BatchService {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 배치 실행 로그 생성
|
||||
*/
|
||||
static async createExecutionLog(data: {
|
||||
batch_config_id: number;
|
||||
execution_status: string;
|
||||
start_time: Date;
|
||||
total_records: number;
|
||||
success_records: number;
|
||||
failed_records: number;
|
||||
}): Promise<any> {
|
||||
try {
|
||||
const executionLog = await prisma.batch_execution_logs.create({
|
||||
data: {
|
||||
batch_config_id: data.batch_config_id,
|
||||
execution_status: data.execution_status,
|
||||
start_time: data.start_time,
|
||||
total_records: data.total_records,
|
||||
success_records: data.success_records,
|
||||
failed_records: data.failed_records,
|
||||
},
|
||||
});
|
||||
|
||||
return executionLog;
|
||||
} catch (error) {
|
||||
console.error("배치 실행 로그 생성 오류:", error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 배치 실행 로그 업데이트
|
||||
*/
|
||||
static async updateExecutionLog(
|
||||
id: number,
|
||||
data: {
|
||||
execution_status?: string;
|
||||
end_time?: Date;
|
||||
duration_ms?: number;
|
||||
total_records?: number;
|
||||
success_records?: number;
|
||||
failed_records?: number;
|
||||
error_message?: string;
|
||||
}
|
||||
): Promise<void> {
|
||||
try {
|
||||
await prisma.batch_execution_logs.update({
|
||||
where: { id },
|
||||
data,
|
||||
});
|
||||
} catch (error) {
|
||||
console.error("배치 실행 로그 업데이트 오류:", error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 테이블에서 데이터 조회 (연결 타입에 따라 내부/외부 DB 구분)
|
||||
*/
|
||||
static async getDataFromTable(
|
||||
tableName: string,
|
||||
connectionType: 'internal' | 'external' = 'internal',
|
||||
connectionId?: number
|
||||
): Promise<any[]> {
|
||||
try {
|
||||
console.log(`[BatchService] 테이블에서 데이터 조회: ${tableName} (${connectionType}${connectionId ? `:${connectionId}` : ''})`);
|
||||
|
||||
if (connectionType === 'internal') {
|
||||
// 내부 DB에서 데이터 조회
|
||||
const result = await prisma.$queryRawUnsafe(`SELECT * FROM ${tableName} LIMIT 100`);
|
||||
console.log(`[BatchService] 내부 DB 데이터 조회 결과: ${Array.isArray(result) ? result.length : 0}개 레코드`);
|
||||
return result as any[];
|
||||
} else if (connectionType === 'external' && connectionId) {
|
||||
// 외부 DB에서 데이터 조회
|
||||
const result = await BatchExternalDbService.getDataFromTable(connectionId, tableName);
|
||||
if (result.success && result.data) {
|
||||
console.log(`[BatchService] 외부 DB 데이터 조회 결과: ${result.data.length}개 레코드`);
|
||||
return result.data;
|
||||
} else {
|
||||
console.error(`외부 DB 데이터 조회 실패: ${result.message}`);
|
||||
return [];
|
||||
}
|
||||
} else {
|
||||
throw new Error(`잘못된 연결 타입 또는 연결 ID: ${connectionType}, ${connectionId}`);
|
||||
}
|
||||
} catch (error) {
|
||||
console.error(`테이블 데이터 조회 오류 (${tableName}):`, error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 테이블에서 특정 컬럼들만 조회 (연결 타입에 따라 내부/외부 DB 구분)
|
||||
*/
|
||||
static async getDataFromTableWithColumns(
|
||||
tableName: string,
|
||||
columns: string[],
|
||||
connectionType: 'internal' | 'external' = 'internal',
|
||||
connectionId?: number
|
||||
): Promise<any[]> {
|
||||
try {
|
||||
console.log(`[BatchService] 테이블에서 특정 컬럼 데이터 조회: ${tableName} (${columns.join(', ')}) (${connectionType}${connectionId ? `:${connectionId}` : ''})`);
|
||||
|
||||
if (connectionType === 'internal') {
|
||||
// 내부 DB에서 특정 컬럼만 조회
|
||||
const columnList = columns.join(', ');
|
||||
const result = await prisma.$queryRawUnsafe(`SELECT ${columnList} FROM ${tableName} LIMIT 100`);
|
||||
console.log(`[BatchService] 내부 DB 특정 컬럼 조회 결과: ${Array.isArray(result) ? result.length : 0}개 레코드`);
|
||||
return result as any[];
|
||||
} else if (connectionType === 'external' && connectionId) {
|
||||
// 외부 DB에서 특정 컬럼만 조회
|
||||
const result = await BatchExternalDbService.getDataFromTableWithColumns(connectionId, tableName, columns);
|
||||
if (result.success && result.data) {
|
||||
console.log(`[BatchService] 외부 DB 특정 컬럼 조회 결과: ${result.data.length}개 레코드`);
|
||||
return result.data;
|
||||
} else {
|
||||
console.error(`외부 DB 특정 컬럼 조회 실패: ${result.message}`);
|
||||
return [];
|
||||
}
|
||||
} else {
|
||||
throw new Error(`잘못된 연결 타입 또는 연결 ID: ${connectionType}, ${connectionId}`);
|
||||
}
|
||||
} catch (error) {
|
||||
console.error(`테이블 특정 컬럼 조회 오류 (${tableName}):`, error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 테이블에 데이터 삽입 (연결 타입에 따라 내부/외부 DB 구분)
|
||||
*/
|
||||
static async insertDataToTable(
|
||||
tableName: string,
|
||||
data: any[],
|
||||
connectionType: 'internal' | 'external' = 'internal',
|
||||
connectionId?: number
|
||||
): Promise<{
|
||||
successCount: number;
|
||||
failedCount: number;
|
||||
}> {
|
||||
try {
|
||||
console.log(`[BatchService] 테이블에 데이터 삽입: ${tableName} (${connectionType}${connectionId ? `:${connectionId}` : ''}), ${data.length}개 레코드`);
|
||||
|
||||
if (!data || data.length === 0) {
|
||||
return { successCount: 0, failedCount: 0 };
|
||||
}
|
||||
|
||||
if (connectionType === 'internal') {
|
||||
// 내부 DB에 데이터 삽입
|
||||
let successCount = 0;
|
||||
let failedCount = 0;
|
||||
|
||||
// 각 레코드를 개별적으로 삽입 (UPSERT 방식으로 중복 처리)
|
||||
for (const record of data) {
|
||||
try {
|
||||
// 동적 UPSERT 쿼리 생성 (PostgreSQL ON CONFLICT 사용)
|
||||
const columns = Object.keys(record);
|
||||
const values = Object.values(record).map(value => {
|
||||
// Date 객체를 ISO 문자열로 변환 (PostgreSQL이 자동으로 파싱)
|
||||
if (value instanceof Date) {
|
||||
return value.toISOString();
|
||||
}
|
||||
// JavaScript Date 문자열을 Date 객체로 변환 후 ISO 문자열로
|
||||
if (typeof value === 'string') {
|
||||
const dateRegex = /^(Mon|Tue|Wed|Thu|Fri|Sat|Sun)\s+(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s+\d{2}\s+\d{4}\s+\d{2}:\d{2}:\d{2}/;
|
||||
if (dateRegex.test(value)) {
|
||||
return new Date(value).toISOString();
|
||||
}
|
||||
}
|
||||
return value;
|
||||
});
|
||||
const placeholders = values.map((_, index) => `$${index + 1}`).join(', ');
|
||||
|
||||
// Primary Key 컬럼 추정 (일반적으로 id 또는 첫 번째 컬럼)
|
||||
const primaryKeyColumn = columns.includes('id') ? 'id' :
|
||||
columns.includes('user_id') ? 'user_id' :
|
||||
columns[0];
|
||||
|
||||
// UPDATE SET 절 생성 (Primary Key 제외)
|
||||
const updateColumns = columns.filter(col => col !== primaryKeyColumn);
|
||||
const updateSet = updateColumns.map(col => `${col} = EXCLUDED.${col}`).join(', ');
|
||||
|
||||
let query: string;
|
||||
if (updateSet) {
|
||||
// UPSERT: 중복 시 업데이트
|
||||
query = `INSERT INTO ${tableName} (${columns.join(', ')}) VALUES (${placeholders})
|
||||
ON CONFLICT (${primaryKeyColumn}) DO UPDATE SET ${updateSet}`;
|
||||
} else {
|
||||
// Primary Key만 있는 경우 중복 시 무시
|
||||
query = `INSERT INTO ${tableName} (${columns.join(', ')}) VALUES (${placeholders})
|
||||
ON CONFLICT (${primaryKeyColumn}) DO NOTHING`;
|
||||
}
|
||||
|
||||
await prisma.$executeRawUnsafe(query, ...values);
|
||||
successCount++;
|
||||
} catch (error) {
|
||||
console.error(`레코드 UPSERT 실패:`, error);
|
||||
failedCount++;
|
||||
}
|
||||
}
|
||||
|
||||
console.log(`[BatchService] 내부 DB 데이터 삽입 완료: 성공 ${successCount}개, 실패 ${failedCount}개`);
|
||||
return { successCount, failedCount };
|
||||
} else if (connectionType === 'external' && connectionId) {
|
||||
// 외부 DB에 데이터 삽입
|
||||
const result = await BatchExternalDbService.insertDataToTable(connectionId, tableName, data);
|
||||
if (result.success && result.data) {
|
||||
console.log(`[BatchService] 외부 DB 데이터 삽입 완료: 성공 ${result.data.successCount}개, 실패 ${result.data.failedCount}개`);
|
||||
return result.data;
|
||||
} else {
|
||||
console.error(`외부 DB 데이터 삽입 실패: ${result.message}`);
|
||||
return { successCount: 0, failedCount: data.length };
|
||||
}
|
||||
} else {
|
||||
throw new Error(`잘못된 연결 타입 또는 연결 ID: ${connectionType}, ${connectionId}`);
|
||||
}
|
||||
} catch (error) {
|
||||
console.error(`테이블 데이터 삽입 오류 (${tableName}):`, error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 배치 매핑 유효성 검사
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user