배치 UPSERT 기능 및 고정값 매핑 버그 수정

This commit is contained in:
dohyeons
2025-12-04 17:26:29 +09:00
parent 7a2f80b646
commit ef3b85f343
9 changed files with 1176 additions and 576 deletions

View File

@@ -2,6 +2,7 @@ import cron, { ScheduledTask } from "node-cron";
import { BatchService } from "./batchService";
import { BatchExecutionLogService } from "./batchExecutionLogService";
import { logger } from "../utils/logger";
import { query } from "../database/db";
export class BatchSchedulerService {
private static scheduledTasks: Map<number, ScheduledTask> = new Map();
@@ -214,9 +215,16 @@ export class BatchSchedulerService {
}
// 테이블별로 매핑을 그룹화
// 고정값 매핑(mapping_type === 'fixed')은 별도 그룹으로 분리하지 않고 나중에 처리
const tableGroups = new Map<string, typeof config.batch_mappings>();
const fixedMappingsGlobal: typeof config.batch_mappings = [];
for (const mapping of config.batch_mappings) {
// 고정값 매핑은 별도로 모아둠 (FROM 소스가 필요 없음)
if (mapping.mapping_type === "fixed") {
fixedMappingsGlobal.push(mapping);
continue;
}
const key = `${mapping.from_connection_type}:${mapping.from_connection_id || "internal"}:${mapping.from_table_name}`;
if (!tableGroups.has(key)) {
tableGroups.set(key, []);
@@ -224,6 +232,14 @@ export class BatchSchedulerService {
tableGroups.get(key)!.push(mapping);
}
// 고정값 매핑만 있고 일반 매핑이 없는 경우 처리
if (tableGroups.size === 0 && fixedMappingsGlobal.length > 0) {
logger.warn(
`일반 매핑이 없고 고정값 매핑만 있습니다. 고정값만으로는 배치를 실행할 수 없습니다.`
);
return { totalRecords, successRecords, failedRecords };
}
// 각 테이블 그룹별로 처리
for (const [tableKey, mappings] of tableGroups) {
try {
@@ -244,10 +260,31 @@ export class BatchSchedulerService {
"./batchExternalDbService"
);
// auth_service_name이 설정된 경우 auth_tokens에서 토큰 조회
let apiKey = firstMapping.from_api_key || "";
if (config.auth_service_name) {
const tokenResult = await query<{ access_token: string }>(
`SELECT access_token FROM auth_tokens
WHERE service_name = $1
ORDER BY created_date DESC LIMIT 1`,
[config.auth_service_name]
);
if (tokenResult.length > 0 && tokenResult[0].access_token) {
apiKey = tokenResult[0].access_token;
logger.info(
`auth_tokens에서 토큰 조회 성공: ${config.auth_service_name}`
);
} else {
logger.warn(
`auth_tokens에서 토큰을 찾을 수 없음: ${config.auth_service_name}`
);
}
}
// 👇 Body 파라미터 추가 (POST 요청 시)
const apiResult = await BatchExternalDbService.getDataFromRestApi(
firstMapping.from_api_url!,
firstMapping.from_api_key!,
apiKey,
firstMapping.from_table_name,
(firstMapping.from_api_method as
| "GET"
@@ -266,7 +303,36 @@ export class BatchSchedulerService {
);
if (apiResult.success && apiResult.data) {
fromData = apiResult.data;
// 데이터 배열 경로가 설정되어 있으면 해당 경로에서 배열 추출
if (config.data_array_path) {
const extractArrayByPath = (obj: any, path: string): any[] => {
if (!path) return Array.isArray(obj) ? obj : [obj];
const keys = path.split(".");
let current = obj;
for (const key of keys) {
if (current === null || current === undefined) return [];
current = current[key];
}
return Array.isArray(current)
? current
: current
? [current]
: [];
};
// apiResult.data가 단일 객체인 경우 (API 응답 전체)
const rawData =
Array.isArray(apiResult.data) && apiResult.data.length === 1
? apiResult.data[0]
: apiResult.data;
fromData = extractArrayByPath(rawData, config.data_array_path);
logger.info(
`데이터 배열 경로 '${config.data_array_path}'에서 ${fromData.length}개 레코드 추출`
);
} else {
fromData = apiResult.data;
}
} else {
throw new Error(`REST API 데이터 조회 실패: ${apiResult.message}`);
}
@@ -298,6 +364,11 @@ export class BatchSchedulerService {
const mappedData = fromData.map((row) => {
const mappedRow: any = {};
for (const mapping of mappings) {
// 고정값 매핑은 이미 분리되어 있으므로 여기서는 처리하지 않음
if (mapping.mapping_type === "fixed") {
continue;
}
// DB → REST API 배치인지 확인
if (
firstMapping.to_connection_type === "restapi" &&
@@ -315,6 +386,13 @@ export class BatchSchedulerService {
}
}
// 고정값 매핑 적용 (전역으로 분리된 fixedMappingsGlobal 사용)
for (const fixedMapping of fixedMappingsGlobal) {
// from_column_name에 고정값이 저장되어 있음
mappedRow[fixedMapping.to_column_name] =
fixedMapping.from_column_name;
}
// 멀티테넌시: TO가 DB일 때 company_code 자동 주입
// - 배치 설정에 company_code가 있고
// - 매핑에서 company_code를 명시적으로 다루지 않은 경우만
@@ -384,12 +462,14 @@ export class BatchSchedulerService {
insertResult = { successCount: 0, failedCount: 0 };
}
} else {
// DB에 데이터 삽입
// DB에 데이터 삽입 (save_mode, conflict_key 지원)
insertResult = await BatchService.insertDataToTable(
firstMapping.to_table_name,
mappedData,
firstMapping.to_connection_type as "internal" | "external",
firstMapping.to_connection_id || undefined
firstMapping.to_connection_id || undefined,
(config.save_mode as "INSERT" | "UPSERT") || "INSERT",
config.conflict_key || undefined
);
}

View File

@@ -176,8 +176,8 @@ export class BatchService {
// 배치 설정 생성
const batchConfigResult = await client.query(
`INSERT INTO batch_configs
(batch_name, description, cron_schedule, is_active, company_code, created_by, created_date, updated_date)
VALUES ($1, $2, $3, $4, $5, $6, NOW(), NOW())
(batch_name, description, cron_schedule, is_active, company_code, save_mode, conflict_key, auth_service_name, data_array_path, created_by, created_date, updated_date)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW(), NOW())
RETURNING *`,
[
data.batchName,
@@ -185,6 +185,10 @@ export class BatchService {
data.cronSchedule,
data.isActive || "Y",
data.companyCode,
data.saveMode || "INSERT",
data.conflictKey || null,
data.authServiceName || null,
data.dataArrayPath || null,
userId,
]
);
@@ -201,37 +205,38 @@ export class BatchService {
from_column_type, from_api_url, from_api_key, from_api_method, from_api_param_type,
from_api_param_name, from_api_param_value, from_api_param_source, from_api_body,
to_connection_type, to_connection_id, to_table_name, to_column_name, to_column_type,
to_api_url, to_api_key, to_api_method, to_api_body, mapping_order, created_by, created_date)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, NOW())
to_api_url, to_api_key, to_api_method, to_api_body, mapping_order, mapping_type, created_by, created_date)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, NOW())
RETURNING *`,
[
batchConfig.id,
data.companyCode, // 멀티테넌시: 배치 설정과 동일한 company_code 사용
mapping.from_connection_type,
mapping.from_connection_id,
mapping.from_table_name,
mapping.from_column_name,
mapping.from_column_type,
mapping.from_api_url,
mapping.from_api_key,
mapping.from_api_method,
mapping.from_api_param_type,
mapping.from_api_param_name,
mapping.from_api_param_value,
mapping.from_api_param_source,
mapping.from_api_body, // FROM REST API Body
mapping.to_connection_type,
mapping.to_connection_id,
mapping.to_table_name,
mapping.to_column_name,
mapping.to_column_type,
mapping.to_api_url,
mapping.to_api_key,
mapping.to_api_method,
mapping.to_api_body,
mapping.mapping_order || index + 1,
userId,
]
batchConfig.id,
data.companyCode, // 멀티테넌시: 배치 설정과 동일한 company_code 사용
mapping.from_connection_type,
mapping.from_connection_id,
mapping.from_table_name,
mapping.from_column_name,
mapping.from_column_type,
mapping.from_api_url,
mapping.from_api_key,
mapping.from_api_method,
mapping.from_api_param_type,
mapping.from_api_param_name,
mapping.from_api_param_value,
mapping.from_api_param_source,
mapping.from_api_body, // FROM REST API Body
mapping.to_connection_type,
mapping.to_connection_id,
mapping.to_table_name,
mapping.to_column_name,
mapping.to_column_type,
mapping.to_api_url,
mapping.to_api_key,
mapping.to_api_method,
mapping.to_api_body,
mapping.mapping_order || index + 1,
mapping.mapping_type || "direct", // 매핑 타입: direct 또는 fixed
userId,
]
);
mappings.push(mappingResult.rows[0]);
}
@@ -311,6 +316,18 @@ export class BatchService {
updateFields.push(`is_active = $${paramIndex++}`);
updateValues.push(data.isActive);
}
if (data.saveMode !== undefined) {
updateFields.push(`save_mode = $${paramIndex++}`);
updateValues.push(data.saveMode);
}
if (data.conflictKey !== undefined) {
updateFields.push(`conflict_key = $${paramIndex++}`);
updateValues.push(data.conflictKey || null);
}
if (data.authServiceName !== undefined) {
updateFields.push(`auth_service_name = $${paramIndex++}`);
updateValues.push(data.authServiceName || null);
}
// 배치 설정 업데이트
const batchConfigResult = await client.query(
@@ -339,8 +356,8 @@ export class BatchService {
from_column_type, from_api_url, from_api_key, from_api_method, from_api_param_type,
from_api_param_name, from_api_param_value, from_api_param_source, from_api_body,
to_connection_type, to_connection_id, to_table_name, to_column_name, to_column_type,
to_api_url, to_api_key, to_api_method, to_api_body, mapping_order, created_by, created_date)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, NOW())
to_api_url, to_api_key, to_api_method, to_api_body, mapping_order, mapping_type, created_by, created_date)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, NOW())
RETURNING *`,
[
id,
@@ -368,6 +385,7 @@ export class BatchService {
mapping.to_api_method,
mapping.to_api_body,
mapping.mapping_order || index + 1,
mapping.mapping_type || "direct", // 매핑 타입: direct 또는 fixed
userId,
]
);
@@ -554,9 +572,7 @@ export class BatchService {
try {
if (connectionType === "internal") {
// 내부 DB 데이터 조회
const data = await query<any>(
`SELECT * FROM ${tableName} LIMIT 10`
);
const data = await query<any>(`SELECT * FROM ${tableName} LIMIT 10`);
return {
success: true,
data,
@@ -729,19 +745,27 @@ export class BatchService {
/**
* 테이블에 데이터 삽입 (연결 타입에 따라 내부/외부 DB 구분)
* @param tableName 테이블명
* @param data 삽입할 데이터 배열
* @param connectionType 연결 타입 (internal/external)
* @param connectionId 외부 연결 ID
* @param saveMode 저장 모드 (INSERT/UPSERT)
* @param conflictKey UPSERT 시 충돌 기준 컬럼명
*/
static async insertDataToTable(
tableName: string,
data: any[],
connectionType: "internal" | "external" = "internal",
connectionId?: number
connectionId?: number,
saveMode: "INSERT" | "UPSERT" = "INSERT",
conflictKey?: string
): Promise<{
successCount: number;
failedCount: number;
}> {
try {
console.log(
`[BatchService] 테이블에 데이터 삽입: ${tableName} (${connectionType}${connectionId ? `:${connectionId}` : ""}), ${data.length}개 레코드`
`[BatchService] 테이블에 데이터 ${saveMode}: ${tableName} (${connectionType}${connectionId ? `:${connectionId}` : ""}), ${data.length}개 레코드${conflictKey ? `, 충돌키: ${conflictKey}` : ""}`
);
if (!data || data.length === 0) {
@@ -753,24 +777,45 @@ export class BatchService {
let successCount = 0;
let failedCount = 0;
// 각 레코드를 개별적으로 삽입 (UPSERT 방식으로 중복 처리)
// 각 레코드를 개별적으로 삽입
for (const record of data) {
try {
const columns = Object.keys(record);
const values = Object.values(record);
const placeholders = values
.map((_, i) => `$${i + 1}`)
.join(", ");
const placeholders = values.map((_, i) => `$${i + 1}`).join(", ");
const queryStr = `INSERT INTO ${tableName} (${columns.join(
", "
)}) VALUES (${placeholders})`;
let queryStr: string;
if (saveMode === "UPSERT" && conflictKey) {
// UPSERT 모드: ON CONFLICT DO UPDATE
// 충돌 키를 제외한 컬럼들만 UPDATE
const updateColumns = columns.filter(
(col) => col !== conflictKey
);
const updateSet = updateColumns
.map((col) => `${col} = EXCLUDED.${col}`)
.join(", ");
// updated_date 컬럼이 있으면 현재 시간으로 업데이트
const hasUpdatedDate = columns.includes("updated_date");
const finalUpdateSet = hasUpdatedDate
? `${updateSet}, updated_date = NOW()`
: updateSet;
queryStr = `INSERT INTO ${tableName} (${columns.join(", ")})
VALUES (${placeholders})
ON CONFLICT (${conflictKey})
DO UPDATE SET ${finalUpdateSet}`;
} else {
// INSERT 모드: 기존 방식
queryStr = `INSERT INTO ${tableName} (${columns.join(", ")}) VALUES (${placeholders})`;
}
await query(queryStr, values);
successCount++;
} catch (insertError) {
console.error(
`내부 DB 데이터 삽입 실패 (${tableName}):`,
`내부 DB 데이터 ${saveMode} 실패 (${tableName}):`,
insertError
);
failedCount++;
@@ -779,7 +824,13 @@ export class BatchService {
return { successCount, failedCount };
} else if (connectionType === "external" && connectionId) {
// 외부 DB에 데이터 삽입
// 외부 DB에 데이터 삽입 (UPSERT는 내부 DB만 지원)
if (saveMode === "UPSERT") {
console.warn(
`[BatchService] 외부 DB는 UPSERT를 지원하지 않습니다. INSERT로 실행합니다.`
);
}
const result = await BatchExternalDbService.insertDataToTable(
connectionId,
tableName,
@@ -799,7 +850,7 @@ export class BatchService {
);
}
} catch (error) {
console.error(`데이터 삽입 오류 (${tableName}):`, error);
console.error(`데이터 ${saveMode} 오류 (${tableName}):`, error);
return { successCount: 0, failedCount: data ? data.length : 0 };
}
}