Files
vexplor/backend-node/src/services/batchService.ts

871 lines
28 KiB
TypeScript

// 배치관리 서비스
// 작성일: 2024-12-24
import { query, queryOne, transaction } from "../database/db";
import {
BatchConfig,
BatchMapping,
BatchConfigFilter,
BatchMappingRequest,
BatchValidationResult,
ApiResponse,
ConnectionInfo,
TableInfo,
ColumnInfo,
CreateBatchConfigRequest,
UpdateBatchConfigRequest,
} from "../types/batchTypes";
import { BatchExternalDbService } from "./batchExternalDbService";
export class BatchService {
/**
* 배치 설정 목록 조회 (회사별)
*/
static async getBatchConfigs(
filter: BatchConfigFilter,
userCompanyCode?: string
): Promise<ApiResponse<BatchConfig[]>> {
try {
const whereConditions: string[] = [];
const values: any[] = [];
let paramIndex = 1;
// 회사별 필터링 (최고 관리자가 아닌 경우 필수)
if (userCompanyCode && userCompanyCode !== "*") {
whereConditions.push(`bc.company_code = $${paramIndex++}`);
values.push(userCompanyCode);
} else if (userCompanyCode === "*" && filter.company_code) {
// 최고 관리자: 필터가 있으면 적용
whereConditions.push(`bc.company_code = $${paramIndex++}`);
values.push(filter.company_code);
}
// 필터 조건 적용
if (filter.is_active) {
whereConditions.push(`bc.is_active = $${paramIndex++}`);
values.push(filter.is_active);
}
// 검색 조건 적용 (OR)
if (filter.search && filter.search.trim()) {
whereConditions.push(
`(bc.batch_name ILIKE $${paramIndex} OR bc.description ILIKE $${paramIndex})`
);
values.push(`%${filter.search.trim()}%`);
paramIndex++;
}
const whereClause =
whereConditions.length > 0
? `WHERE ${whereConditions.join(" AND ")}`
: "";
const page = filter.page || 1;
const limit = filter.limit || 10;
const offset = (page - 1) * limit;
// 전체 카운트 조회
const countResult = await query<{ count: string }>(
`SELECT COUNT(*) as count FROM batch_configs bc ${whereClause}`,
values
);
const total = parseInt(countResult[0].count);
const totalPages = Math.ceil(total / limit);
// 목록 조회
const configs = await query<any>(
`SELECT bc.*
FROM batch_configs bc
${whereClause}
ORDER BY bc.created_date DESC
LIMIT $${paramIndex++} OFFSET $${paramIndex++}`,
[...values, limit, offset]
);
// 매핑 정보 조회 (N+1 문제 해결을 위해 별도 쿼리 대신 여기서는 생략하고 상세 조회에서 처리)
// 하지만 목록에서도 간단한 정보는 필요할 수 있음
return {
success: true,
data: configs as BatchConfig[],
pagination: {
page,
limit,
total,
totalPages,
},
message: `${configs.length}개의 배치 설정을 조회했습니다.`,
};
} catch (error) {
console.error("배치 설정 목록 조회 오류:", error);
return {
success: false,
data: [],
message: "배치 설정 목록 조회에 실패했습니다.",
error: error instanceof Error ? error.message : "알 수 없는 오류",
};
}
}
/**
* 특정 배치 설정 조회 (별칭)
*/
static async getBatchConfig(id: number): Promise<BatchConfig | null> {
const result = await this.getBatchConfigById(id);
if (!result.success || !result.data) {
return null;
}
return result.data;
}
/**
* 배치 설정 상세 조회
*/
static async getBatchConfigById(
id: number
): Promise<ApiResponse<BatchConfig>> {
try {
// 배치 설정 조회
const config = await queryOne<any>(
`SELECT * FROM batch_configs WHERE id = $1`,
[id]
);
if (!config) {
return {
success: false,
message: "배치 설정을 찾을 수 없습니다.",
};
}
// 매핑 정보 조회
const mappings = await query<BatchMapping>(
`SELECT * FROM batch_mappings WHERE batch_config_id = $1 ORDER BY mapping_order ASC`,
[id]
);
const batchConfig: BatchConfig = {
...config,
batch_mappings: mappings,
} as BatchConfig;
return {
success: true,
data: batchConfig,
};
} catch (error) {
console.error("배치 설정 상세 조회 오류:", error);
return {
success: false,
message: "배치 설정 상세 조회에 실패했습니다.",
error: error instanceof Error ? error.message : "알 수 없는 오류",
};
}
}
/**
* 배치 설정 생성
*/
static async createBatchConfig(
data: CreateBatchConfigRequest,
userId?: string
): Promise<ApiResponse<BatchConfig>> {
try {
// 트랜잭션으로 배치 설정과 매핑 생성
const result = await transaction(async (client) => {
// 배치 설정 생성
const batchConfigResult = await client.query(
`INSERT INTO batch_configs
(batch_name, description, cron_schedule, is_active, company_code, save_mode, conflict_key, auth_service_name, data_array_path, created_by, created_date, updated_date)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW(), NOW())
RETURNING *`,
[
data.batchName,
data.description,
data.cronSchedule,
data.isActive || "Y",
data.companyCode,
data.saveMode || "INSERT",
data.conflictKey || null,
data.authServiceName || null,
data.dataArrayPath || null,
userId,
]
);
const batchConfig = batchConfigResult.rows[0];
// 배치 매핑 생성
const mappings = [];
for (let index = 0; index < data.mappings.length; index++) {
const mapping = data.mappings[index];
const mappingResult = await client.query(
`INSERT INTO batch_mappings
(batch_config_id, company_code, from_connection_type, from_connection_id, from_table_name, from_column_name,
from_column_type, from_api_url, from_api_key, from_api_method, from_api_param_type,
from_api_param_name, from_api_param_value, from_api_param_source, from_api_body,
to_connection_type, to_connection_id, to_table_name, to_column_name, to_column_type,
to_api_url, to_api_key, to_api_method, to_api_body, mapping_order, mapping_type, created_by, created_date)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, NOW())
RETURNING *`,
[
batchConfig.id,
data.companyCode, // 멀티테넌시: 배치 설정과 동일한 company_code 사용
mapping.from_connection_type,
mapping.from_connection_id,
mapping.from_table_name,
mapping.from_column_name,
mapping.from_column_type,
mapping.from_api_url,
mapping.from_api_key,
mapping.from_api_method,
mapping.from_api_param_type,
mapping.from_api_param_name,
mapping.from_api_param_value,
mapping.from_api_param_source,
mapping.from_api_body, // FROM REST API Body
mapping.to_connection_type,
mapping.to_connection_id,
mapping.to_table_name,
mapping.to_column_name,
mapping.to_column_type,
mapping.to_api_url,
mapping.to_api_key,
mapping.to_api_method,
mapping.to_api_body,
mapping.mapping_order || index + 1,
mapping.mapping_type || "direct", // 매핑 타입: direct 또는 fixed
userId,
]
);
mappings.push(mappingResult.rows[0]);
}
return {
...batchConfig,
batch_mappings: mappings,
};
});
return {
success: true,
data: result as BatchConfig,
message: "배치 설정이 성공적으로 생성되었습니다.",
};
} catch (error) {
console.error("배치 설정 생성 오류:", error);
return {
success: false,
message: "배치 설정 생성에 실패했습니다.",
error: error instanceof Error ? error.message : "알 수 없는 오류",
};
}
}
/**
* 배치 설정 수정 (회사별)
*/
static async updateBatchConfig(
id: number,
data: UpdateBatchConfigRequest,
userId?: string,
userCompanyCode?: string
): Promise<ApiResponse<BatchConfig>> {
try {
// 기존 설정 확인
const existingResult = await this.getBatchConfigById(id);
if (!existingResult.success || !existingResult.data) {
throw new Error(
existingResult.message || "배치 설정을 찾을 수 없습니다."
);
}
const existingConfig = existingResult.data;
// 권한 체크 (회사 코드가 다르면 수정 불가)
if (
userCompanyCode &&
userCompanyCode !== "*" &&
existingConfig.company_code !== userCompanyCode
) {
throw new Error("수정 권한이 없습니다.");
}
// 트랜잭션으로 업데이트
const result = await transaction(async (client) => {
// 동적 UPDATE 쿼리 생성
const updateFields: string[] = [
"updated_by = $1",
"updated_date = NOW()",
];
const updateValues: any[] = [userId];
let paramIndex = 2;
if (data.batchName) {
updateFields.push(`batch_name = $${paramIndex++}`);
updateValues.push(data.batchName);
}
if (data.description !== undefined) {
updateFields.push(`description = $${paramIndex++}`);
updateValues.push(data.description);
}
if (data.cronSchedule) {
updateFields.push(`cron_schedule = $${paramIndex++}`);
updateValues.push(data.cronSchedule);
}
if (data.isActive !== undefined) {
updateFields.push(`is_active = $${paramIndex++}`);
updateValues.push(data.isActive);
}
if (data.saveMode !== undefined) {
updateFields.push(`save_mode = $${paramIndex++}`);
updateValues.push(data.saveMode);
}
if (data.conflictKey !== undefined) {
updateFields.push(`conflict_key = $${paramIndex++}`);
updateValues.push(data.conflictKey || null);
}
if (data.authServiceName !== undefined) {
updateFields.push(`auth_service_name = $${paramIndex++}`);
updateValues.push(data.authServiceName || null);
}
if (data.dataArrayPath !== undefined) {
updateFields.push(`data_array_path = $${paramIndex++}`);
updateValues.push(data.dataArrayPath || null);
}
// 배치 설정 업데이트
const batchConfigResult = await client.query(
`UPDATE batch_configs
SET ${updateFields.join(", ")}
WHERE id = $${paramIndex}
RETURNING *`,
[...updateValues, id]
);
const batchConfig = batchConfigResult.rows[0];
// 매핑이 제공된 경우 기존 매핑 삭제 후 새로 생성
if (data.mappings) {
await client.query(
`DELETE FROM batch_mappings WHERE batch_config_id = $1`,
[id]
);
const mappings = [];
for (let index = 0; index < data.mappings.length; index++) {
const mapping = data.mappings[index];
const mappingResult = await client.query(
`INSERT INTO batch_mappings
(batch_config_id, company_code, from_connection_type, from_connection_id, from_table_name, from_column_name,
from_column_type, from_api_url, from_api_key, from_api_method, from_api_param_type,
from_api_param_name, from_api_param_value, from_api_param_source, from_api_body,
to_connection_type, to_connection_id, to_table_name, to_column_name, to_column_type,
to_api_url, to_api_key, to_api_method, to_api_body, mapping_order, mapping_type, created_by, created_date)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, NOW())
RETURNING *`,
[
id,
existingConfig.company_code, // 기존 설정의 company_code 유지
mapping.from_connection_type,
mapping.from_connection_id,
mapping.from_table_name,
mapping.from_column_name,
mapping.from_column_type,
mapping.from_api_url,
mapping.from_api_key,
mapping.from_api_method,
mapping.from_api_param_type,
mapping.from_api_param_name,
mapping.from_api_param_value,
mapping.from_api_param_source,
mapping.from_api_body, // FROM REST API Body
mapping.to_connection_type,
mapping.to_connection_id,
mapping.to_table_name,
mapping.to_column_name,
mapping.to_column_type,
mapping.to_api_url,
mapping.to_api_key,
mapping.to_api_method,
mapping.to_api_body,
mapping.mapping_order || index + 1,
mapping.mapping_type || "direct", // 매핑 타입: direct 또는 fixed
userId,
]
);
mappings.push(mappingResult.rows[0]);
}
return {
...batchConfig,
batch_mappings: mappings,
};
} else {
return {
...batchConfig,
batch_mappings: existingConfig.batch_mappings,
};
}
});
return {
success: true,
data: result as BatchConfig,
message: "배치 설정이 성공적으로 수정되었습니다.",
};
} catch (error) {
console.error("배치 설정 수정 오류:", error);
return {
success: false,
message: "배치 설정 수정에 실패했습니다.",
error: error instanceof Error ? error.message : "알 수 없는 오류",
};
}
}
/**
* 배치 설정 삭제 (논리 삭제, 회사별)
*/
static async deleteBatchConfig(
id: number,
userId?: string,
userCompanyCode?: string
): Promise<ApiResponse<void>> {
try {
// 기존 설정 확인
const existingResult = await this.getBatchConfigById(id);
if (!existingResult.success || !existingResult.data) {
throw new Error(
existingResult.message || "배치 설정을 찾을 수 없습니다."
);
}
const existingConfig = existingResult.data;
// 권한 체크
if (
userCompanyCode &&
userCompanyCode !== "*" &&
existingConfig.company_code !== userCompanyCode
) {
throw new Error("삭제 권한이 없습니다.");
}
// 물리 삭제 (CASCADE 설정에 따라 매핑도 삭제됨)
await query(`DELETE FROM batch_configs WHERE id = $1`, [id]);
return {
success: true,
message: "배치 설정이 성공적으로 삭제되었습니다.",
};
} catch (error) {
console.error("배치 설정 삭제 오류:", error);
return {
success: false,
message: "배치 설정 삭제에 실패했습니다.",
error: error instanceof Error ? error.message : "알 수 없는 오류",
};
}
}
/**
* DB 연결 정보 조회
*/
static async getConnections(): Promise<ApiResponse<ConnectionInfo[]>> {
try {
// BatchExternalDbService 사용
const result = await BatchExternalDbService.getAvailableConnections();
return result;
} catch (error) {
console.error("DB 연결 목록 조회 오류:", error);
return {
success: false,
data: [],
message: "DB 연결 목록 조회에 실패했습니다.",
error: error instanceof Error ? error.message : "알 수 없는 오류",
};
}
}
/**
* 테이블 목록 조회
*/
static async getTables(
connectionType: "internal" | "external",
connectionId?: number
): Promise<ApiResponse<TableInfo[]>> {
try {
if (connectionType === "internal") {
// 내부 DB 테이블 조회
const tables = await query<TableInfo>(
`SELECT table_name, table_type, table_schema
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_type = 'BASE TABLE'
ORDER BY table_name`
);
return {
success: true,
data: tables,
message: `${tables.length}개의 테이블을 조회했습니다.`,
};
} else if (connectionId) {
// 외부 DB 테이블 조회
return await BatchExternalDbService.getTables(connectionId);
} else {
throw new Error("외부 연결 ID가 필요합니다.");
}
} catch (error) {
console.error("테이블 목록 조회 오류:", error);
return {
success: false,
data: [],
message: "테이블 목록 조회에 실패했습니다.",
error: error instanceof Error ? error.message : "알 수 없는 오류",
};
}
}
/**
* 컬럼 목록 조회
*/
static async getColumns(
tableName: string,
connectionType: "internal" | "external",
connectionId?: number
): Promise<ApiResponse<ColumnInfo[]>> {
try {
if (connectionType === "internal") {
// 내부 DB 컬럼 조회
const columns = await query<ColumnInfo>(
`SELECT column_name, data_type, is_nullable, column_default
FROM information_schema.columns
WHERE table_schema = 'public' AND table_name = $1
ORDER BY ordinal_position`,
[tableName]
);
return {
success: true,
data: columns,
message: `${columns.length}개의 컬럼을 조회했습니다.`,
};
} else if (connectionId) {
// 외부 DB 컬럼 조회
return await BatchExternalDbService.getColumns(connectionId, tableName);
} else {
throw new Error("외부 연결 ID가 필요합니다.");
}
} catch (error) {
console.error("컬럼 목록 조회 오류:", error);
return {
success: false,
data: [],
message: "컬럼 목록 조회에 실패했습니다.",
error: error instanceof Error ? error.message : "알 수 없는 오류",
};
}
}
/**
* 데이터 미리보기
*/
static async previewData(
tableName: string,
connectionType: "internal" | "external",
connectionId?: number
): Promise<ApiResponse<any[]>> {
try {
if (connectionType === "internal") {
// 내부 DB 데이터 조회
const data = await query<any>(`SELECT * FROM ${tableName} LIMIT 10`);
return {
success: true,
data,
message: "데이터 미리보기 성공",
};
} else if (connectionId) {
// 외부 DB 데이터 조회
return await BatchExternalDbService.getDataFromTable(
connectionId,
tableName
);
} else {
throw new Error("외부 연결 ID가 필요합니다.");
}
} catch (error) {
console.error("데이터 미리보기 오류:", error);
return {
success: false,
data: [],
message: "데이터 미리보기에 실패했습니다.",
error: error instanceof Error ? error.message : "알 수 없는 오류",
};
}
}
/**
* REST API 데이터 미리보기
*/
static async previewRestApiData(
apiUrl: string,
apiKey: string,
endpoint: string,
method: "GET" | "POST" | "PUT" | "DELETE" = "GET",
paramInfo?: {
paramType: "url" | "query";
paramName: string;
paramValue: string;
paramSource: "static" | "dynamic";
},
body?: string
): Promise<ApiResponse<any>> {
try {
return await BatchExternalDbService.previewRestApiData(
apiUrl,
apiKey,
endpoint,
method,
paramInfo,
body
);
} catch (error) {
console.error("REST API 미리보기 오류:", error);
return {
success: false,
message: "REST API 미리보기에 실패했습니다.",
error: error instanceof Error ? error.message : "알 수 없는 오류",
};
}
}
/**
* 배치 유효성 검사
*/
static async validateBatch(
config: Partial<CreateBatchConfigRequest>
): Promise<BatchValidationResult> {
const errors: string[] = [];
if (!config.batchName) errors.push("배치 작업명이 필요합니다.");
if (!config.cronSchedule) errors.push("Cron 스케줄이 필요합니다.");
if (!config.mappings || config.mappings.length === 0) {
errors.push("최소 하나 이상의 매핑이 필요합니다.");
}
// 추가 유효성 검사 로직...
return {
isValid: errors.length === 0,
errors,
};
}
/**
* 테이블에서 데이터 조회 (연결 타입에 따라 내부/외부 DB 구분)
*/
static async getDataFromTable(
tableName: string,
connectionType: "internal" | "external" = "internal",
connectionId?: number
): Promise<any[]> {
try {
console.log(
`[BatchService] 테이블에서 데이터 조회: ${tableName} (${connectionType}${connectionId ? `:${connectionId}` : ""})`
);
if (connectionType === "internal") {
// 내부 DB에서 데이터 조회 (주의: SQL 인젝션 위험 - 실제 프로덕션에서는 테이블명 검증 필요)
const result = await query<any>(`SELECT * FROM ${tableName} LIMIT 100`);
console.log(
`[BatchService] 내부 DB 데이터 조회 결과: ${result.length}개 레코드`
);
return result;
} else if (connectionType === "external" && connectionId) {
// 외부 DB에서 데이터 조회
const result = await BatchExternalDbService.getDataFromTable(
connectionId,
tableName
);
if (result.success && result.data) {
console.log(
`[BatchService] 외부 DB 데이터 조회 결과: ${result.data.length}개 레코드`
);
return result.data;
} else {
console.error(`외부 DB 데이터 조회 실패: ${result.message}`);
return [];
}
} else {
throw new Error(
`잘못된 연결 타입 또는 연결 ID: ${connectionType}, ${connectionId}`
);
}
} catch (error) {
console.error(`테이블 데이터 조회 오류 (${tableName}):`, error);
throw error;
}
}
/**
* 테이블에서 특정 컬럼들만 조회 (연결 타입에 따라 내부/외부 DB 구분)
*/
static async getDataFromTableWithColumns(
tableName: string,
columns: string[],
connectionType: "internal" | "external" = "internal",
connectionId?: number
): Promise<any[]> {
try {
console.log(
`[BatchService] 테이블에서 컬럼 지정 데이터 조회: ${tableName} (${connectionType})`
);
if (connectionType === "internal") {
// 내부 DB
const columnString = columns.join(", ");
const result = await query<any>(
`SELECT ${columnString} FROM ${tableName} LIMIT 100`
);
return result;
} else if (connectionType === "external" && connectionId) {
// 외부 DB
const result = await BatchExternalDbService.getDataFromTableWithColumns(
connectionId,
tableName,
columns
);
if (result.success && result.data) {
return result.data;
} else {
throw new Error(result.message || "외부 DB 조회 실패");
}
} else {
throw new Error("잘못된 연결 설정입니다.");
}
} catch (error) {
console.error(`데이터 조회 오류 (${tableName}):`, error);
throw error;
}
}
/**
* 테이블에 데이터 삽입 (연결 타입에 따라 내부/외부 DB 구분)
* @param tableName 테이블명
* @param data 삽입할 데이터 배열
* @param connectionType 연결 타입 (internal/external)
* @param connectionId 외부 연결 ID
* @param saveMode 저장 모드 (INSERT/UPSERT)
* @param conflictKey UPSERT 시 충돌 기준 컬럼명
*/
static async insertDataToTable(
tableName: string,
data: any[],
connectionType: "internal" | "external" = "internal",
connectionId?: number,
saveMode: "INSERT" | "UPSERT" = "INSERT",
conflictKey?: string
): Promise<{
successCount: number;
failedCount: number;
}> {
try {
console.log(
`[BatchService] 테이블에 데이터 ${saveMode}: ${tableName} (${connectionType}${connectionId ? `:${connectionId}` : ""}), ${data.length}개 레코드${conflictKey ? `, 충돌키: ${conflictKey}` : ""}`
);
if (!data || data.length === 0) {
return { successCount: 0, failedCount: 0 };
}
if (connectionType === "internal") {
// 내부 DB에 데이터 삽입
let successCount = 0;
let failedCount = 0;
// 각 레코드를 개별적으로 삽입
for (const record of data) {
try {
const columns = Object.keys(record);
const values = Object.values(record);
const placeholders = values.map((_, i) => `$${i + 1}`).join(", ");
let queryStr: string;
if (saveMode === "UPSERT" && conflictKey) {
// UPSERT 모드: ON CONFLICT DO UPDATE
// 충돌 키를 제외한 컬럼들만 UPDATE
const updateColumns = columns.filter(
(col) => col !== conflictKey
);
// 업데이트할 컬럼이 없으면 DO NOTHING 사용
if (updateColumns.length === 0) {
queryStr = `INSERT INTO ${tableName} (${columns.join(", ")})
VALUES (${placeholders})
ON CONFLICT (${conflictKey})
DO NOTHING`;
} else {
const updateSet = updateColumns
.map((col) => `${col} = EXCLUDED.${col}`)
.join(", ");
// updated_date 컬럼이 있으면 현재 시간으로 업데이트
const hasUpdatedDate = columns.includes("updated_date");
const finalUpdateSet = hasUpdatedDate
? `${updateSet}, updated_date = NOW()`
: updateSet;
queryStr = `INSERT INTO ${tableName} (${columns.join(", ")})
VALUES (${placeholders})
ON CONFLICT (${conflictKey})
DO UPDATE SET ${finalUpdateSet}`;
}
} else {
// INSERT 모드: 기존 방식
queryStr = `INSERT INTO ${tableName} (${columns.join(", ")}) VALUES (${placeholders})`;
}
await query(queryStr, values);
successCount++;
} catch (insertError) {
console.error(
`내부 DB 데이터 ${saveMode} 실패 (${tableName}):`,
insertError
);
failedCount++;
}
}
return { successCount, failedCount };
} else if (connectionType === "external" && connectionId) {
// 외부 DB에 데이터 삽입 (UPSERT는 내부 DB만 지원)
if (saveMode === "UPSERT") {
console.warn(
`[BatchService] 외부 DB는 UPSERT를 지원하지 않습니다. INSERT로 실행합니다.`
);
}
const result = await BatchExternalDbService.insertDataToTable(
connectionId,
tableName,
data
);
if (result.success && result.data) {
return result.data;
} else {
console.error(`외부 DB 데이터 삽입 실패: ${result.message}`);
// 실패 시 전체 실패로 간주하지 않고 0/전체 로 반환
return { successCount: 0, failedCount: data.length };
}
} else {
throw new Error(
`잘못된 연결 타입 또는 연결 ID: ${connectionType}, ${connectionId}`
);
}
} catch (error) {
console.error(`데이터 ${saveMode} 오류 (${tableName}):`, error);
return { successCount: 0, failedCount: data ? data.length : 0 };
}
}
}