flowExecutionService 트랜잭션 처리 개선 및 데이터 변경 추적 로직 수정

This commit is contained in:
dohyeons
2025-12-09 11:15:18 +09:00
parent 8d07458c94
commit 0aaab45329
5 changed files with 156 additions and 123 deletions

View File

@@ -72,6 +72,11 @@ export class FlowDataMoveService {
// 내부 DB 처리 (기존 로직)
return await db.transaction(async (client) => {
try {
// 트랜잭션 세션 변수 설정 (트리거에서 changed_by 기록용)
await client.query("SELECT set_config('app.user_id', $1, true)", [
userId || "system",
]);
// 1. 단계 정보 조회
const fromStep = await this.flowStepService.findById(fromStepId);
const toStep = await this.flowStepService.findById(toStepId);
@@ -684,6 +689,14 @@ export class FlowDataMoveService {
dbConnectionId,
async (externalClient, dbType) => {
try {
// 외부 DB가 PostgreSQL인 경우에만 세션 변수 설정 시도
if (dbType.toLowerCase() === "postgresql") {
await externalClient.query(
"SELECT set_config('app.user_id', $1, true)",
[userId || "system"]
);
}
// 1. 단계 정보 조회 (내부 DB에서)
const fromStep = await this.flowStepService.findById(fromStepId);
const toStep = await this.flowStepService.findById(toStepId);

View File

@@ -298,7 +298,9 @@ export class FlowExecutionService {
// 4. Primary Key 컬럼 결정 (기본값: id)
const primaryKeyColumn = flowDef.primaryKey || "id";
console.log(`🔍 [updateStepData] Updating table: ${tableName}, PK: ${primaryKeyColumn}=${recordId}`);
console.log(
`🔍 [updateStepData] Updating table: ${tableName}, PK: ${primaryKeyColumn}=${recordId}`
);
// 5. SET 절 생성
const updateColumns = Object.keys(updateData);
@@ -309,74 +311,86 @@ export class FlowExecutionService {
// 6. 외부 DB vs 내부 DB 구분
if (flowDef.dbSourceType === "external" && flowDef.dbConnectionId) {
// 외부 DB 업데이트
console.log("✅ [updateStepData] Using EXTERNAL DB:", flowDef.dbConnectionId);
console.log(
"✅ [updateStepData] Using EXTERNAL DB:",
flowDef.dbConnectionId
);
// 외부 DB 연결 정보 조회
const connectionResult = await db.query(
"SELECT * FROM external_db_connection WHERE id = $1",
[flowDef.dbConnectionId]
);
if (connectionResult.length === 0) {
throw new Error(`External DB connection not found: ${flowDef.dbConnectionId}`);
throw new Error(
`External DB connection not found: ${flowDef.dbConnectionId}`
);
}
const connection = connectionResult[0];
const dbType = connection.db_type?.toLowerCase();
// DB 타입에 따른 placeholder 및 쿼리 생성
let setClause: string;
let params: any[];
if (dbType === "mysql" || dbType === "mariadb") {
// MySQL/MariaDB: ? placeholder
setClause = updateColumns.map((col) => `\`${col}\` = ?`).join(", ");
params = [...Object.values(updateData), recordId];
} else if (dbType === "mssql") {
// MSSQL: @p1, @p2 placeholder
setClause = updateColumns.map((col, idx) => `[${col}] = @p${idx + 1}`).join(", ");
setClause = updateColumns
.map((col, idx) => `[${col}] = @p${idx + 1}`)
.join(", ");
params = [...Object.values(updateData), recordId];
} else {
// PostgreSQL: $1, $2 placeholder
setClause = updateColumns.map((col, idx) => `"${col}" = $${idx + 1}`).join(", ");
setClause = updateColumns
.map((col, idx) => `"${col}" = $${idx + 1}`)
.join(", ");
params = [...Object.values(updateData), recordId];
}
const updateQuery = `UPDATE ${tableName} SET ${setClause} WHERE ${primaryKeyColumn} = ${dbType === "mysql" || dbType === "mariadb" ? "?" : dbType === "mssql" ? `@p${params.length}` : `$${params.length}`}`;
console.log(`📝 [updateStepData] Query: ${updateQuery}`);
console.log(`📝 [updateStepData] Params:`, params);
await executeExternalQuery(flowDef.dbConnectionId, updateQuery, params);
} else {
// 내부 DB 업데이트
console.log("✅ [updateStepData] Using INTERNAL DB");
const setClause = updateColumns.map((col, idx) => `"${col}" = $${idx + 1}`).join(", ");
const setClause = updateColumns
.map((col, idx) => `"${col}" = $${idx + 1}`)
.join(", ");
const params = [...Object.values(updateData), recordId];
const updateQuery = `UPDATE "${tableName}" SET ${setClause} WHERE "${primaryKeyColumn}" = $${params.length}`;
console.log(`📝 [updateStepData] Query: ${updateQuery}`);
console.log(`📝 [updateStepData] Params:`, params);
// 트랜잭션으로 감싸서 사용자 ID 세션 변수 설정 후 업데이트 실행
// (트리거에서 changed_by를 기록하기 위함)
await db.query("BEGIN");
try {
await db.query(`SET LOCAL app.user_id = '${userId}'`);
await db.query(updateQuery, params);
await db.query("COMMIT");
} catch (txError) {
await db.query("ROLLBACK");
throw txError;
}
await db.transaction(async (client) => {
// 안전한 파라미터 바인딩 방식 사용
await client.query("SELECT set_config('app.user_id', $1, true)", [
userId,
]);
await client.query(updateQuery, params);
});
}
console.log(`✅ [updateStepData] Data updated successfully: ${tableName}.${primaryKeyColumn}=${recordId}`, {
updatedFields: updateColumns,
userId,
});
console.log(
`✅ [updateStepData] Data updated successfully: ${tableName}.${primaryKeyColumn}=${recordId}`,
{
updatedFields: updateColumns,
userId,
}
);
return { success: true };
} catch (error: any) {

View File

@@ -175,6 +175,12 @@ export class NodeFlowExecutionService {
try {
result = await transaction(async (client) => {
// 🔥 사용자 ID 세션 변수 설정 (트리거용)
const userId = context.buttonContext?.userId || "system";
await client.query("SELECT set_config('app.user_id', $1, true)", [
userId,
]);
// 트랜잭션 내에서 레벨별 실행
for (const level of levels) {
await this.executeLevel(level, nodes, edges, context, client);