Merge branch 'main' into lhj - Flow management system integration

This commit is contained in:
leeheejin
2025-10-20 18:08:11 +09:00
62 changed files with 12932 additions and 361 deletions

View File

@@ -56,6 +56,9 @@ import todoRoutes from "./routes/todoRoutes"; // To-Do 관리
import bookingRoutes from "./routes/bookingRoutes"; // 예약 요청 관리
import mapDataRoutes from "./routes/mapDataRoutes"; // 지도 데이터 관리
import yardLayoutRoutes from "./routes/yardLayoutRoutes"; // 야드 관리 3D
//import materialRoutes from "./routes/materialRoutes"; // 자재 관리
import flowRoutes from "./routes/flowRoutes"; // 플로우 관리
import flowExternalDbConnectionRoutes from "./routes/flowExternalDbConnectionRoutes"; // 플로우 전용 외부 DB 연결
import workHistoryRoutes from "./routes/workHistoryRoutes"; // 작업 이력 관리
import { BatchSchedulerService } from "./services/batchSchedulerService";
// import collectionRoutes from "./routes/collectionRoutes"; // 임시 주석
@@ -207,6 +210,9 @@ app.use("/api/todos", todoRoutes); // To-Do 관리
app.use("/api/bookings", bookingRoutes); // 예약 요청 관리
app.use("/api/map-data", mapDataRoutes); // 지도 데이터 조회
app.use("/api/yard-layouts", yardLayoutRoutes); // 야드 관리 3D
// app.use("/api/materials", materialRoutes); // 자재 관리 (임시 주석)
app.use("/api/flow-external-db", flowExternalDbConnectionRoutes); // 플로우 전용 외부 DB 연결
app.use("/api/flow", flowRoutes); // 플로우 관리 (마지막에 등록하여 다른 라우트와 충돌 방지)
app.use("/api/work-history", workHistoryRoutes); // 작업 이력 관리
// app.use("/api/collections", collectionRoutes); // 임시 주석
// app.use("/api/batch", batchRoutes); // 임시 주석

View File

@@ -0,0 +1,676 @@
/**
* 플로우 관리 컨트롤러
*/
import { Request, Response } from "express";
import { FlowDefinitionService } from "../services/flowDefinitionService";
import { FlowStepService } from "../services/flowStepService";
import { FlowConnectionService } from "../services/flowConnectionService";
import { FlowExecutionService } from "../services/flowExecutionService";
import { FlowDataMoveService } from "../services/flowDataMoveService";
export class FlowController {
private flowDefinitionService: FlowDefinitionService;
private flowStepService: FlowStepService;
private flowConnectionService: FlowConnectionService;
private flowExecutionService: FlowExecutionService;
private flowDataMoveService: FlowDataMoveService;
constructor() {
this.flowDefinitionService = new FlowDefinitionService();
this.flowStepService = new FlowStepService();
this.flowConnectionService = new FlowConnectionService();
this.flowExecutionService = new FlowExecutionService();
this.flowDataMoveService = new FlowDataMoveService();
}
// ==================== 플로우 정의 ====================
/**
* 플로우 정의 생성
*/
createFlowDefinition = async (req: Request, res: Response): Promise<void> => {
try {
const { name, description, tableName } = req.body;
const userId = (req as any).user?.userId || "system";
if (!name || !tableName) {
res.status(400).json({
success: false,
message: "Name and tableName are required",
});
return;
}
// 테이블 존재 확인
const tableExists =
await this.flowDefinitionService.checkTableExists(tableName);
if (!tableExists) {
res.status(400).json({
success: false,
message: `Table '${tableName}' does not exist`,
});
return;
}
const flowDef = await this.flowDefinitionService.create(
{ name, description, tableName },
userId
);
res.json({
success: true,
data: flowDef,
});
} catch (error: any) {
console.error("Error creating flow definition:", error);
res.status(500).json({
success: false,
message: error.message || "Failed to create flow definition",
});
}
};
/**
* 플로우 정의 목록 조회
*/
getFlowDefinitions = async (req: Request, res: Response): Promise<void> => {
try {
const { tableName, isActive } = req.query;
const flows = await this.flowDefinitionService.findAll(
tableName as string | undefined,
isActive !== undefined ? isActive === "true" : undefined
);
res.json({
success: true,
data: flows,
});
} catch (error: any) {
console.error("Error fetching flow definitions:", error);
res.status(500).json({
success: false,
message: error.message || "Failed to fetch flow definitions",
});
}
};
/**
* 플로우 정의 상세 조회 (단계 및 연결 포함)
*/
getFlowDefinitionDetail = async (
req: Request,
res: Response
): Promise<void> => {
try {
const { id } = req.params;
const flowId = parseInt(id);
const definition = await this.flowDefinitionService.findById(flowId);
if (!definition) {
res.status(404).json({
success: false,
message: "Flow definition not found",
});
return;
}
const steps = await this.flowStepService.findByFlowId(flowId);
const connections = await this.flowConnectionService.findByFlowId(flowId);
res.json({
success: true,
data: {
definition,
steps,
connections,
},
});
} catch (error: any) {
console.error("Error fetching flow definition detail:", error);
res.status(500).json({
success: false,
message: error.message || "Failed to fetch flow definition detail",
});
}
};
/**
* 플로우 정의 수정
*/
updateFlowDefinition = async (req: Request, res: Response): Promise<void> => {
try {
const { id } = req.params;
const flowId = parseInt(id);
const { name, description, isActive } = req.body;
const flowDef = await this.flowDefinitionService.update(flowId, {
name,
description,
isActive,
});
if (!flowDef) {
res.status(404).json({
success: false,
message: "Flow definition not found",
});
return;
}
res.json({
success: true,
data: flowDef,
});
} catch (error: any) {
console.error("Error updating flow definition:", error);
res.status(500).json({
success: false,
message: error.message || "Failed to update flow definition",
});
}
};
/**
* 플로우 정의 삭제
*/
deleteFlowDefinition = async (req: Request, res: Response): Promise<void> => {
try {
const { id } = req.params;
const flowId = parseInt(id);
const success = await this.flowDefinitionService.delete(flowId);
if (!success) {
res.status(404).json({
success: false,
message: "Flow definition not found",
});
return;
}
res.json({
success: true,
message: "Flow definition deleted successfully",
});
} catch (error: any) {
console.error("Error deleting flow definition:", error);
res.status(500).json({
success: false,
message: error.message || "Failed to delete flow definition",
});
}
};
// ==================== 플로우 단계 ====================
/**
* 플로우 단계 목록 조회
*/
getFlowSteps = async (req: Request, res: Response): Promise<void> => {
try {
const { flowId } = req.params;
const flowDefinitionId = parseInt(flowId);
const steps = await this.flowStepService.findByFlowId(flowDefinitionId);
res.json({
success: true,
data: steps,
});
return;
} catch (error: any) {
console.error("Error fetching flow steps:", error);
res.status(500).json({
success: false,
message: error.message || "Failed to fetch flow steps",
});
return;
}
};
/**
* 플로우 단계 생성
*/
createFlowStep = async (req: Request, res: Response): Promise<void> => {
try {
const { flowId } = req.params;
const flowDefinitionId = parseInt(flowId);
const {
stepName,
stepOrder,
tableName,
conditionJson,
color,
positionX,
positionY,
} = req.body;
if (!stepName || stepOrder === undefined) {
res.status(400).json({
success: false,
message: "stepName and stepOrder are required",
});
return;
}
const step = await this.flowStepService.create({
flowDefinitionId,
stepName,
stepOrder,
tableName,
conditionJson,
color,
positionX,
positionY,
});
res.json({
success: true,
data: step,
});
} catch (error: any) {
console.error("Error creating flow step:", error);
res.status(500).json({
success: false,
message: error.message || "Failed to create flow step",
});
}
};
/**
* 플로우 단계 수정
*/
updateFlowStep = async (req: Request, res: Response): Promise<void> => {
try {
const { stepId } = req.params;
const id = parseInt(stepId);
const {
stepName,
stepOrder,
tableName,
conditionJson,
color,
positionX,
positionY,
} = req.body;
const step = await this.flowStepService.update(id, {
stepName,
stepOrder,
tableName,
conditionJson,
color,
positionX,
positionY,
});
if (!step) {
res.status(404).json({
success: false,
message: "Flow step not found",
});
return;
}
res.json({
success: true,
data: step,
});
} catch (error: any) {
console.error("Error updating flow step:", error);
res.status(500).json({
success: false,
message: error.message || "Failed to update flow step",
});
}
};
/**
* 플로우 단계 삭제
*/
deleteFlowStep = async (req: Request, res: Response): Promise<void> => {
try {
const { stepId } = req.params;
const id = parseInt(stepId);
const success = await this.flowStepService.delete(id);
if (!success) {
res.status(404).json({
success: false,
message: "Flow step not found",
});
return;
}
res.json({
success: true,
message: "Flow step deleted successfully",
});
} catch (error: any) {
console.error("Error deleting flow step:", error);
res.status(500).json({
success: false,
message: error.message || "Failed to delete flow step",
});
}
};
// ==================== 플로우 연결 ====================
/**
* 플로우 연결 목록 조회
*/
getFlowConnections = async (req: Request, res: Response): Promise<void> => {
try {
const { flowId } = req.params;
const flowDefinitionId = parseInt(flowId);
const connections =
await this.flowConnectionService.findByFlowId(flowDefinitionId);
res.json({
success: true,
data: connections,
});
return;
} catch (error: any) {
console.error("Error fetching flow connections:", error);
res.status(500).json({
success: false,
message: error.message || "Failed to fetch flow connections",
});
return;
}
};
/**
* 플로우 연결 생성
*/
createConnection = async (req: Request, res: Response): Promise<void> => {
try {
const { flowDefinitionId, fromStepId, toStepId, label } = req.body;
if (!flowDefinitionId || !fromStepId || !toStepId) {
res.status(400).json({
success: false,
message: "flowDefinitionId, fromStepId, and toStepId are required",
});
return;
}
const connection = await this.flowConnectionService.create({
flowDefinitionId,
fromStepId,
toStepId,
label,
});
res.json({
success: true,
data: connection,
});
} catch (error: any) {
console.error("Error creating connection:", error);
res.status(500).json({
success: false,
message: error.message || "Failed to create connection",
});
}
};
/**
* 플로우 연결 삭제
*/
deleteConnection = async (req: Request, res: Response): Promise<void> => {
try {
const { connectionId } = req.params;
const id = parseInt(connectionId);
const success = await this.flowConnectionService.delete(id);
if (!success) {
res.status(404).json({
success: false,
message: "Connection not found",
});
return;
}
res.json({
success: true,
message: "Connection deleted successfully",
});
} catch (error: any) {
console.error("Error deleting connection:", error);
res.status(500).json({
success: false,
message: error.message || "Failed to delete connection",
});
}
};
// ==================== 플로우 실행 ====================
/**
* 단계별 데이터 카운트 조회
*/
getStepDataCount = async (req: Request, res: Response): Promise<void> => {
try {
const { flowId, stepId } = req.params;
const count = await this.flowExecutionService.getStepDataCount(
parseInt(flowId),
parseInt(stepId)
);
res.json({
success: true,
data: { count },
});
} catch (error: any) {
console.error("Error getting step data count:", error);
res.status(500).json({
success: false,
message: error.message || "Failed to get step data count",
});
}
};
/**
* 단계별 데이터 리스트 조회
*/
getStepDataList = async (req: Request, res: Response): Promise<void> => {
try {
const { flowId, stepId } = req.params;
const { page = 1, pageSize = 20 } = req.query;
const data = await this.flowExecutionService.getStepDataList(
parseInt(flowId),
parseInt(stepId),
parseInt(page as string),
parseInt(pageSize as string)
);
res.json({
success: true,
data,
});
} catch (error: any) {
console.error("Error getting step data list:", error);
res.status(500).json({
success: false,
message: error.message || "Failed to get step data list",
});
}
};
/**
* 플로우의 모든 단계별 카운트 조회
*/
getAllStepCounts = async (req: Request, res: Response): Promise<void> => {
try {
const { flowId } = req.params;
const counts = await this.flowExecutionService.getAllStepCounts(
parseInt(flowId)
);
res.json({
success: true,
data: counts,
});
} catch (error: any) {
console.error("Error getting all step counts:", error);
res.status(500).json({
success: false,
message: error.message || "Failed to get all step counts",
});
}
};
/**
* 데이터를 다음 단계로 이동
*/
moveData = async (req: Request, res: Response): Promise<void> => {
try {
const { flowId, recordId, toStepId, note } = req.body;
const userId = (req as any).user?.userId || "system";
if (!flowId || !recordId || !toStepId) {
res.status(400).json({
success: false,
message: "flowId, recordId, and toStepId are required",
});
return;
}
await this.flowDataMoveService.moveDataToStep(
flowId,
recordId,
toStepId,
userId,
note
);
res.json({
success: true,
message: "Data moved successfully",
});
} catch (error: any) {
console.error("Error moving data:", error);
res.status(500).json({
success: false,
message: error.message || "Failed to move data",
});
}
};
/**
* 여러 데이터를 동시에 이동
*/
moveBatchData = async (req: Request, res: Response): Promise<void> => {
try {
const { flowId, fromStepId, toStepId, dataIds } = req.body;
const userId = (req as any).user?.userId || "system";
if (
!flowId ||
!fromStepId ||
!toStepId ||
!dataIds ||
!Array.isArray(dataIds)
) {
res.status(400).json({
success: false,
message:
"flowId, fromStepId, toStepId, and dataIds (array) are required",
});
return;
}
const result = await this.flowDataMoveService.moveBatchData(
flowId,
fromStepId,
toStepId,
dataIds,
userId
);
const successCount = result.results.filter((r) => r.success).length;
const failureCount = result.results.filter((r) => !r.success).length;
res.json({
success: result.success,
message: result.success
? `${successCount}건의 데이터를 성공적으로 이동했습니다`
: `${successCount}건 성공, ${failureCount}건 실패`,
data: {
successCount,
failureCount,
total: dataIds.length,
},
results: result.results,
});
} catch (error: any) {
console.error("Error moving batch data:", error);
res.status(500).json({
success: false,
message: error.message || "Failed to move batch data",
});
}
};
/**
* 데이터의 플로우 이력 조회
*/
getAuditLogs = async (req: Request, res: Response): Promise<void> => {
try {
const { flowId, recordId } = req.params;
const logs = await this.flowDataMoveService.getAuditLogs(
parseInt(flowId),
recordId
);
res.json({
success: true,
data: logs,
});
} catch (error: any) {
console.error("Error getting audit logs:", error);
res.status(500).json({
success: false,
message: error.message || "Failed to get audit logs",
});
}
};
/**
* 플로우의 모든 이력 조회
*/
getFlowAuditLogs = async (req: Request, res: Response): Promise<void> => {
try {
const { flowId } = req.params;
const { limit = 100 } = req.query;
const logs = await this.flowDataMoveService.getFlowAuditLogs(
parseInt(flowId),
parseInt(limit as string)
);
res.json({
success: true,
data: logs,
});
} catch (error: any) {
console.error("Error getting flow audit logs:", error);
res.status(500).json({
success: false,
message: error.message || "Failed to get flow audit logs",
});
}
};
}

View File

@@ -0,0 +1,328 @@
import { Request, Response } from "express";
import { FlowExternalDbConnectionService } from "../services/flowExternalDbConnectionService";
import {
CreateFlowExternalDbConnectionRequest,
UpdateFlowExternalDbConnectionRequest,
} from "../types/flow";
import logger from "../utils/logger";
/**
* 플로우 전용 외부 DB 연결 컨트롤러
*/
export class FlowExternalDbConnectionController {
private service: FlowExternalDbConnectionService;
constructor() {
this.service = new FlowExternalDbConnectionService();
}
/**
* GET /api/flow/external-db-connections
* 모든 외부 DB 연결 목록 조회
*/
async getAll(req: Request, res: Response): Promise<void> {
try {
const activeOnly = req.query.activeOnly === "true";
const connections = await this.service.findAll(activeOnly);
res.json({
success: true,
data: connections,
message: `${connections.length}개의 외부 DB 연결을 조회했습니다`,
});
} catch (error: any) {
logger.error("외부 DB 연결 목록 조회 오류:", error);
res.status(500).json({
success: false,
message: "외부 DB 연결 목록 조회 중 오류가 발생했습니다",
error: error.message,
});
}
}
/**
* GET /api/flow/external-db-connections/:id
* 특정 외부 DB 연결 조회
*/
async getById(req: Request, res: Response): Promise<void> {
try {
const id = parseInt(req.params.id);
if (isNaN(id)) {
res.status(400).json({
success: false,
message: "유효하지 않은 연결 ID입니다",
});
return;
}
const connection = await this.service.findById(id);
if (!connection) {
res.status(404).json({
success: false,
message: "외부 DB 연결을 찾을 수 없습니다",
});
return;
}
res.json({
success: true,
data: connection,
});
} catch (error: any) {
logger.error("외부 DB 연결 조회 오류:", error);
res.status(500).json({
success: false,
message: "외부 DB 연결 조회 중 오류가 발생했습니다",
error: error.message,
});
}
}
/**
* POST /api/flow/external-db-connections
* 새 외부 DB 연결 생성
*/
async create(req: Request, res: Response): Promise<void> {
try {
const request: CreateFlowExternalDbConnectionRequest = req.body;
// 필수 필드 검증
if (
!request.name ||
!request.dbType ||
!request.host ||
!request.port ||
!request.databaseName ||
!request.username ||
!request.password
) {
res.status(400).json({
success: false,
message: "필수 필드가 누락되었습니다",
});
return;
}
const userId = (req as any).user?.userId || "system";
const connection = await this.service.create(request, userId);
logger.info(
`외부 DB 연결 생성: ${connection.name} (ID: ${connection.id})`
);
res.status(201).json({
success: true,
data: connection,
message: "외부 DB 연결이 생성되었습니다",
});
} catch (error: any) {
logger.error("외부 DB 연결 생성 오류:", error);
res.status(500).json({
success: false,
message: "외부 DB 연결 생성 중 오류가 발생했습니다",
error: error.message,
});
}
}
/**
* PUT /api/flow/external-db-connections/:id
* 외부 DB 연결 수정
*/
async update(req: Request, res: Response): Promise<void> {
try {
const id = parseInt(req.params.id);
if (isNaN(id)) {
res.status(400).json({
success: false,
message: "유효하지 않은 연결 ID입니다",
});
return;
}
const request: UpdateFlowExternalDbConnectionRequest = req.body;
const userId = (req as any).user?.userId || "system";
const connection = await this.service.update(id, request, userId);
if (!connection) {
res.status(404).json({
success: false,
message: "외부 DB 연결을 찾을 수 없습니다",
});
return;
}
logger.info(`외부 DB 연결 수정: ${connection.name} (ID: ${id})`);
res.json({
success: true,
data: connection,
message: "외부 DB 연결이 수정되었습니다",
});
} catch (error: any) {
logger.error("외부 DB 연결 수정 오류:", error);
res.status(500).json({
success: false,
message: "외부 DB 연결 수정 중 오류가 발생했습니다",
error: error.message,
});
}
}
/**
* DELETE /api/flow/external-db-connections/:id
* 외부 DB 연결 삭제
*/
async delete(req: Request, res: Response): Promise<void> {
try {
const id = parseInt(req.params.id);
if (isNaN(id)) {
res.status(400).json({
success: false,
message: "유효하지 않은 연결 ID입니다",
});
return;
}
const success = await this.service.delete(id);
if (!success) {
res.status(404).json({
success: false,
message: "외부 DB 연결을 찾을 수 없습니다",
});
return;
}
logger.info(`외부 DB 연결 삭제: ID ${id}`);
res.json({
success: true,
message: "외부 DB 연결이 삭제되었습니다",
});
} catch (error: any) {
logger.error("외부 DB 연결 삭제 오류:", error);
res.status(500).json({
success: false,
message: "외부 DB 연결 삭제 중 오류가 발생했습니다",
error: error.message,
});
}
}
/**
* POST /api/flow/external-db-connections/:id/test
* 외부 DB 연결 테스트
*/
async testConnection(req: Request, res: Response): Promise<void> {
try {
const id = parseInt(req.params.id);
if (isNaN(id)) {
res.status(400).json({
success: false,
message: "유효하지 않은 연결 ID입니다",
});
return;
}
const result = await this.service.testConnection(id);
if (result.success) {
logger.info(`외부 DB 연결 테스트 성공: ID ${id}`);
res.json({
success: true,
message: result.message,
});
} else {
logger.warn(`외부 DB 연결 테스트 실패: ID ${id} - ${result.message}`);
res.status(400).json({
success: false,
message: result.message,
});
}
} catch (error: any) {
logger.error("외부 DB 연결 테스트 오류:", error);
res.status(500).json({
success: false,
message: "외부 DB 연결 테스트 중 오류가 발생했습니다",
error: error.message,
});
}
}
/**
* GET /api/flow/external-db-connections/:id/tables
* 외부 DB의 테이블 목록 조회
*/
async getTables(req: Request, res: Response): Promise<void> {
try {
const id = parseInt(req.params.id);
if (isNaN(id)) {
res.status(400).json({
success: false,
message: "유효하지 않은 연결 ID입니다",
});
return;
}
const result = await this.service.getTables(id);
if (result.success) {
res.json(result);
} else {
res.status(400).json(result);
}
} catch (error: any) {
logger.error("외부 DB 테이블 목록 조회 오류:", error);
res.status(500).json({
success: false,
message: "외부 DB 테이블 목록 조회 중 오류가 발생했습니다",
error: error.message,
});
}
}
/**
* GET /api/flow/external-db-connections/:id/tables/:tableName/columns
* 외부 DB 특정 테이블의 컬럼 목록 조회
*/
async getTableColumns(req: Request, res: Response): Promise<void> {
try {
const id = parseInt(req.params.id);
const tableName = req.params.tableName;
if (isNaN(id)) {
res.status(400).json({
success: false,
message: "유효하지 않은 연결 ID입니다",
});
return;
}
if (!tableName) {
res.status(400).json({
success: false,
message: "테이블명이 필요합니다",
});
return;
}
const result = await this.service.getTableColumns(id, tableName);
if (result.success) {
res.json(result);
} else {
res.status(400).json(result);
}
} catch (error: any) {
logger.error("외부 DB 컬럼 목록 조회 오류:", error);
res.status(500).json({
success: false,
message: "외부 DB 컬럼 목록 조회 중 오류가 발생했습니다",
error: error.message,
});
}
}
}

View File

@@ -0,0 +1,48 @@
import { Router } from "express";
import { FlowExternalDbConnectionController } from "../controllers/flowExternalDbConnectionController";
import { authenticateToken } from "../middleware/authMiddleware";
const router = Router();
const controller = new FlowExternalDbConnectionController();
/**
* 플로우 전용 외부 DB 연결 라우트
* 기존 제어관리 외부 DB 연결과 별도
*/
// 모든 외부 DB 연결 목록 조회 (읽기 전용 - 인증 불필요)
// 민감한 정보(비밀번호)는 반환하지 않으므로 안전
router.get("/", (req, res) => controller.getAll(req, res));
// 특정 외부 DB 연결 조회
router.get("/:id", authenticateToken, (req, res) =>
controller.getById(req, res)
);
// 새 외부 DB 연결 생성
router.post("/", authenticateToken, (req, res) => controller.create(req, res));
// 외부 DB 연결 수정
router.put("/:id", authenticateToken, (req, res) =>
controller.update(req, res)
);
// 외부 DB 연결 삭제
router.delete("/:id", authenticateToken, (req, res) =>
controller.delete(req, res)
);
// 외부 DB 연결 테스트
router.post("/:id/test", authenticateToken, (req, res) =>
controller.testConnection(req, res)
);
// 외부 DB의 테이블 목록 조회 (읽기 전용 - 인증 불필요)
router.get("/:id/tables", (req, res) => controller.getTables(req, res));
// 외부 DB의 특정 테이블의 컬럼 목록 조회 (읽기 전용 - 인증 불필요)
router.get("/:id/tables/:tableName/columns", (req, res) =>
controller.getTableColumns(req, res)
);
export default router;

View File

@@ -0,0 +1,42 @@
/**
* 플로우 관리 라우터
*/
import { Router } from "express";
import { FlowController } from "../controllers/flowController";
const router = Router();
const flowController = new FlowController();
// ==================== 플로우 정의 ====================
router.post("/definitions", flowController.createFlowDefinition);
router.get("/definitions", flowController.getFlowDefinitions);
router.get("/definitions/:id", flowController.getFlowDefinitionDetail);
router.put("/definitions/:id", flowController.updateFlowDefinition);
router.delete("/definitions/:id", flowController.deleteFlowDefinition);
// ==================== 플로우 단계 ====================
router.get("/definitions/:flowId/steps", flowController.getFlowSteps); // 단계 목록 조회
router.post("/definitions/:flowId/steps", flowController.createFlowStep);
router.put("/steps/:stepId", flowController.updateFlowStep);
router.delete("/steps/:stepId", flowController.deleteFlowStep);
// ==================== 플로우 연결 ====================
router.get("/connections/:flowId", flowController.getFlowConnections); // 연결 목록 조회
router.post("/connections", flowController.createConnection);
router.delete("/connections/:connectionId", flowController.deleteConnection);
// ==================== 플로우 실행 ====================
router.get("/:flowId/step/:stepId/count", flowController.getStepDataCount);
router.get("/:flowId/step/:stepId/list", flowController.getStepDataList);
router.get("/:flowId/steps/counts", flowController.getAllStepCounts);
// ==================== 데이터 이동 ====================
router.post("/move", flowController.moveData);
router.post("/move-batch", flowController.moveBatchData);
// ==================== 오딧 로그 ====================
router.get("/audit/:flowId/:recordId", flowController.getAuditLogs);
router.get("/audit/:flowId", flowController.getFlowAuditLogs);
export default router;

View File

@@ -0,0 +1,215 @@
/**
* 플로우 조건 파서
* JSON 조건을 SQL WHERE 절로 변환
*/
import {
FlowCondition,
FlowConditionGroup,
SqlWhereResult,
} from "../types/flow";
export class FlowConditionParser {
/**
* 조건 JSON을 SQL WHERE 절로 변환
*/
static toSqlWhere(
conditionGroup: FlowConditionGroup | null | undefined
): SqlWhereResult {
if (
!conditionGroup ||
!conditionGroup.conditions ||
conditionGroup.conditions.length === 0
) {
return { where: "1=1", params: [] };
}
const conditions: string[] = [];
const params: any[] = [];
let paramIndex = 1;
for (const condition of conditionGroup.conditions) {
const column = this.sanitizeColumnName(condition.column);
switch (condition.operator) {
case "equals":
case "=":
conditions.push(`${column} = $${paramIndex}`);
params.push(condition.value);
paramIndex++;
break;
case "not_equals":
case "!=":
conditions.push(`${column} != $${paramIndex}`);
params.push(condition.value);
paramIndex++;
break;
case "in":
if (Array.isArray(condition.value) && condition.value.length > 0) {
const placeholders = condition.value
.map(() => `$${paramIndex++}`)
.join(", ");
conditions.push(`${column} IN (${placeholders})`);
params.push(...condition.value);
}
break;
case "not_in":
if (Array.isArray(condition.value) && condition.value.length > 0) {
const placeholders = condition.value
.map(() => `$${paramIndex++}`)
.join(", ");
conditions.push(`${column} NOT IN (${placeholders})`);
params.push(...condition.value);
}
break;
case "greater_than":
case ">":
conditions.push(`${column} > $${paramIndex}`);
params.push(condition.value);
paramIndex++;
break;
case "less_than":
case "<":
conditions.push(`${column} < $${paramIndex}`);
params.push(condition.value);
paramIndex++;
break;
case "greater_than_or_equal":
case ">=":
conditions.push(`${column} >= $${paramIndex}`);
params.push(condition.value);
paramIndex++;
break;
case "less_than_or_equal":
case "<=":
conditions.push(`${column} <= $${paramIndex}`);
params.push(condition.value);
paramIndex++;
break;
case "is_null":
conditions.push(`${column} IS NULL`);
break;
case "is_not_null":
conditions.push(`${column} IS NOT NULL`);
break;
case "like":
conditions.push(`${column} LIKE $${paramIndex}`);
params.push(`%${condition.value}%`);
paramIndex++;
break;
case "not_like":
conditions.push(`${column} NOT LIKE $${paramIndex}`);
params.push(`%${condition.value}%`);
paramIndex++;
break;
default:
throw new Error(`Unsupported operator: ${condition.operator}`);
}
}
if (conditions.length === 0) {
return { where: "1=1", params: [] };
}
const joinOperator = conditionGroup.type === "OR" ? " OR " : " AND ";
const where = conditions.join(joinOperator);
return { where, params };
}
/**
* SQL 인젝션 방지를 위한 컬럼명 검증
*/
private static sanitizeColumnName(columnName: string): string {
// 알파벳, 숫자, 언더스코어, 점(.)만 허용 (테이블명.컬럼명 형태 지원)
if (!/^[a-zA-Z0-9_.]+$/.test(columnName)) {
throw new Error(`Invalid column name: ${columnName}`);
}
return columnName;
}
/**
* 조건 검증
*/
static validateConditionGroup(conditionGroup: FlowConditionGroup): void {
if (!conditionGroup) {
throw new Error("Condition group is required");
}
if (!["AND", "OR"].includes(conditionGroup.type)) {
throw new Error("Condition group type must be AND or OR");
}
if (!Array.isArray(conditionGroup.conditions)) {
throw new Error("Conditions must be an array");
}
for (const condition of conditionGroup.conditions) {
this.validateCondition(condition);
}
}
/**
* 개별 조건 검증
*/
private static validateCondition(condition: FlowCondition): void {
if (!condition.column) {
throw new Error("Column name is required");
}
const validOperators = [
"equals",
"=",
"not_equals",
"!=",
"in",
"not_in",
"greater_than",
">",
"less_than",
"<",
"greater_than_or_equal",
">=",
"less_than_or_equal",
"<=",
"is_null",
"is_not_null",
"like",
"not_like",
];
if (!validOperators.includes(condition.operator)) {
throw new Error(`Invalid operator: ${condition.operator}`);
}
// is_null, is_not_null은 value가 필요 없음
if (!["is_null", "is_not_null"].includes(condition.operator)) {
if (condition.value === undefined || condition.value === null) {
throw new Error(
`Value is required for operator: ${condition.operator}`
);
}
}
// in, not_in은 배열이어야 함
if (["in", "not_in"].includes(condition.operator)) {
if (!Array.isArray(condition.value) || condition.value.length === 0) {
throw new Error(
`Operator ${condition.operator} requires a non-empty array value`
);
}
}
}
}

View File

@@ -0,0 +1,166 @@
/**
* 플로우 연결 서비스
*/
import db from "../database/db";
import { FlowStepConnection, CreateFlowConnectionRequest } from "../types/flow";
export class FlowConnectionService {
/**
* 플로우 단계 연결 생성
*/
async create(
request: CreateFlowConnectionRequest
): Promise<FlowStepConnection> {
// 순환 참조 체크
if (
await this.wouldCreateCycle(
request.flowDefinitionId,
request.fromStepId,
request.toStepId
)
) {
throw new Error(
"Creating this connection would create a cycle in the flow"
);
}
const query = `
INSERT INTO flow_step_connection (
flow_definition_id, from_step_id, to_step_id, label
)
VALUES ($1, $2, $3, $4)
RETURNING *
`;
const result = await db.query(query, [
request.flowDefinitionId,
request.fromStepId,
request.toStepId,
request.label || null,
]);
return this.mapToFlowConnection(result[0]);
}
/**
* 특정 플로우의 모든 연결 조회
*/
async findByFlowId(flowDefinitionId: number): Promise<FlowStepConnection[]> {
const query = `
SELECT * FROM flow_step_connection
WHERE flow_definition_id = $1
ORDER BY id ASC
`;
const result = await db.query(query, [flowDefinitionId]);
return result.map(this.mapToFlowConnection);
}
/**
* 플로우 연결 단일 조회
*/
async findById(id: number): Promise<FlowStepConnection | null> {
const query = "SELECT * FROM flow_step_connection WHERE id = $1";
const result = await db.query(query, [id]);
if (result.length === 0) {
return null;
}
return this.mapToFlowConnection(result[0]);
}
/**
* 플로우 연결 삭제
*/
async delete(id: number): Promise<boolean> {
const query = "DELETE FROM flow_step_connection WHERE id = $1 RETURNING id";
const result = await db.query(query, [id]);
return result.length > 0;
}
/**
* 특정 단계에서 나가는 연결 조회
*/
async findOutgoingConnections(stepId: number): Promise<FlowStepConnection[]> {
const query = `
SELECT * FROM flow_step_connection
WHERE from_step_id = $1
ORDER BY id ASC
`;
const result = await db.query(query, [stepId]);
return result.map(this.mapToFlowConnection);
}
/**
* 특정 단계로 들어오는 연결 조회
*/
async findIncomingConnections(stepId: number): Promise<FlowStepConnection[]> {
const query = `
SELECT * FROM flow_step_connection
WHERE to_step_id = $1
ORDER BY id ASC
`;
const result = await db.query(query, [stepId]);
return result.map(this.mapToFlowConnection);
}
/**
* 순환 참조 체크 (DFS)
*/
private async wouldCreateCycle(
flowDefinitionId: number,
fromStepId: number,
toStepId: number
): Promise<boolean> {
// toStepId에서 출발해서 fromStepId에 도달할 수 있는지 확인
const visited = new Set<number>();
const stack = [toStepId];
while (stack.length > 0) {
const current = stack.pop()!;
if (current === fromStepId) {
return true; // 순환 발견
}
if (visited.has(current)) {
continue;
}
visited.add(current);
// 현재 노드에서 나가는 모든 연결 조회
const query = `
SELECT to_step_id
FROM flow_step_connection
WHERE flow_definition_id = $1 AND from_step_id = $2
`;
const result = await db.query(query, [flowDefinitionId, current]);
for (const row of result) {
stack.push(row.to_step_id);
}
}
return false; // 순환 없음
}
/**
* DB 행을 FlowStepConnection 객체로 변환
*/
private mapToFlowConnection(row: any): FlowStepConnection {
return {
id: row.id,
flowDefinitionId: row.flow_definition_id,
fromStepId: row.from_step_id,
toStepId: row.to_step_id,
label: row.label,
createdAt: row.created_at,
};
}
}

View File

@@ -0,0 +1,593 @@
/**
* 플로우 데이터 이동 서비스 (하이브리드 방식 지원)
* - 상태 변경 방식: 같은 테이블 내에서 상태 컬럼 업데이트
* - 테이블 이동 방식: 다른 테이블로 데이터 복사 및 매핑
* - 하이브리드 방식: 두 가지 모두 수행
*/
import db from "../database/db";
import { FlowAuditLog, FlowIntegrationContext } from "../types/flow";
import { FlowDefinitionService } from "./flowDefinitionService";
import { FlowStepService } from "./flowStepService";
import { FlowExternalDbIntegrationService } from "./flowExternalDbIntegrationService";
export class FlowDataMoveService {
private flowDefinitionService: FlowDefinitionService;
private flowStepService: FlowStepService;
private externalDbIntegrationService: FlowExternalDbIntegrationService;
constructor() {
this.flowDefinitionService = new FlowDefinitionService();
this.flowStepService = new FlowStepService();
this.externalDbIntegrationService = new FlowExternalDbIntegrationService();
}
/**
* 데이터를 다음 플로우 단계로 이동 (하이브리드 지원)
*/
async moveDataToStep(
flowId: number,
fromStepId: number,
toStepId: number,
dataId: any,
userId: string = "system",
additionalData?: Record<string, any>
): Promise<{ success: boolean; targetDataId?: any; message?: string }> {
return await db.transaction(async (client) => {
try {
// 1. 단계 정보 조회
const fromStep = await this.flowStepService.findById(fromStepId);
const toStep = await this.flowStepService.findById(toStepId);
if (!fromStep || !toStep) {
throw new Error("유효하지 않은 단계입니다");
}
let targetDataId = dataId;
let sourceTable = fromStep.tableName;
let targetTable = toStep.tableName || fromStep.tableName;
// 2. 이동 방식에 따라 처리
switch (toStep.moveType || "status") {
case "status":
// 상태 변경 방식
await this.moveByStatusChange(
client,
fromStep,
toStep,
dataId,
additionalData
);
break;
case "table":
// 테이블 이동 방식
targetDataId = await this.moveByTableTransfer(
client,
fromStep,
toStep,
dataId,
additionalData
);
targetTable = toStep.targetTable || toStep.tableName;
break;
case "both":
// 하이브리드 방식: 둘 다 수행
await this.moveByStatusChange(
client,
fromStep,
toStep,
dataId,
additionalData
);
targetDataId = await this.moveByTableTransfer(
client,
fromStep,
toStep,
dataId,
additionalData
);
targetTable = toStep.targetTable || toStep.tableName;
break;
default:
throw new Error(`지원하지 않는 이동 방식: ${toStep.moveType}`);
}
// 3. 매핑 테이블 업데이트 (테이블 이동 방식일 때)
if (toStep.moveType === "table" || toStep.moveType === "both") {
await this.updateDataMapping(
client,
flowId,
toStepId,
fromStepId,
dataId,
targetDataId
);
}
// 4. 외부 DB 연동 실행 (설정된 경우)
if (
toStep.integrationType &&
toStep.integrationType !== "internal" &&
toStep.integrationConfig
) {
await this.executeExternalIntegration(
toStep,
flowId,
targetDataId,
sourceTable,
userId,
additionalData
);
}
// 5. 감사 로그 기록
await this.logDataMove(client, {
flowId,
fromStepId,
toStepId,
moveType: toStep.moveType || "status",
sourceTable,
targetTable,
sourceDataId: String(dataId),
targetDataId: String(targetDataId),
statusFrom: fromStep.statusValue,
statusTo: toStep.statusValue,
userId,
});
return {
success: true,
targetDataId,
message: "데이터가 성공적으로 이동되었습니다",
};
} catch (error: any) {
console.error("데이터 이동 실패:", error);
throw error;
}
});
}
/**
* 상태 변경 방식으로 데이터 이동
*/
private async moveByStatusChange(
client: any,
fromStep: any,
toStep: any,
dataId: any,
additionalData?: Record<string, any>
): Promise<void> {
const statusColumn = toStep.statusColumn || "flow_status";
const tableName = fromStep.tableName;
// 추가 필드 업데이트 준비
const updates: string[] = [`${statusColumn} = $2`, `updated_at = NOW()`];
const values: any[] = [dataId, toStep.statusValue];
let paramIndex = 3;
// 추가 데이터가 있으면 함께 업데이트
if (additionalData) {
for (const [key, value] of Object.entries(additionalData)) {
updates.push(`${key} = $${paramIndex}`);
values.push(value);
paramIndex++;
}
}
const updateQuery = `
UPDATE ${tableName}
SET ${updates.join(", ")}
WHERE id = $1
`;
const result = await client.query(updateQuery, values);
if (result.rowCount === 0) {
throw new Error(`데이터를 찾을 수 없습니다: ${dataId}`);
}
}
/**
* 테이블 이동 방식으로 데이터 이동
*/
private async moveByTableTransfer(
client: any,
fromStep: any,
toStep: any,
dataId: any,
additionalData?: Record<string, any>
): Promise<any> {
const sourceTable = fromStep.tableName;
const targetTable = toStep.targetTable || toStep.tableName;
const fieldMappings = toStep.fieldMappings || {};
// 1. 소스 데이터 조회
const selectQuery = `SELECT * FROM ${sourceTable} WHERE id = $1`;
const sourceResult = await client.query(selectQuery, [dataId]);
if (sourceResult.length === 0) {
throw new Error(`소스 데이터를 찾을 수 없습니다: ${dataId}`);
}
const sourceData = sourceResult[0];
// 2. 필드 매핑 적용
const mappedData: Record<string, any> = {};
// 매핑 정의가 있으면 적용
for (const [sourceField, targetField] of Object.entries(fieldMappings)) {
if (sourceData[sourceField] !== undefined) {
mappedData[targetField as string] = sourceData[sourceField];
}
}
// 추가 데이터 병합
if (additionalData) {
Object.assign(mappedData, additionalData);
}
// 3. 타겟 테이블에 데이터 삽입
if (Object.keys(mappedData).length === 0) {
throw new Error("매핑할 데이터가 없습니다");
}
const columns = Object.keys(mappedData);
const values = Object.values(mappedData);
const placeholders = columns.map((_, i) => `$${i + 1}`).join(", ");
const insertQuery = `
INSERT INTO ${targetTable} (${columns.join(", ")})
VALUES (${placeholders})
RETURNING id
`;
const insertResult = await client.query(insertQuery, values);
return insertResult[0].id;
}
/**
* 데이터 매핑 테이블 업데이트
*/
private async updateDataMapping(
client: any,
flowId: number,
currentStepId: number,
prevStepId: number,
sourceDataId: any,
targetDataId: any
): Promise<void> {
// 기존 매핑 조회
const selectQuery = `
SELECT id, step_data_map
FROM flow_data_mapping
WHERE flow_definition_id = $1
AND step_data_map->$2 = $3
`;
const mappingResult = await client.query(selectQuery, [
flowId,
String(prevStepId),
JSON.stringify(String(sourceDataId)),
]);
const stepDataMap: Record<string, string> =
mappingResult.length > 0 ? mappingResult[0].step_data_map : {};
// 새 단계 데이터 추가
stepDataMap[String(currentStepId)] = String(targetDataId);
if (mappingResult.length > 0) {
// 기존 매핑 업데이트
const updateQuery = `
UPDATE flow_data_mapping
SET current_step_id = $1,
step_data_map = $2,
updated_at = NOW()
WHERE id = $3
`;
await client.query(updateQuery, [
currentStepId,
JSON.stringify(stepDataMap),
mappingResult[0].id,
]);
} else {
// 새 매핑 생성
const insertQuery = `
INSERT INTO flow_data_mapping
(flow_definition_id, current_step_id, step_data_map)
VALUES ($1, $2, $3)
`;
await client.query(insertQuery, [
flowId,
currentStepId,
JSON.stringify(stepDataMap),
]);
}
}
/**
* 감사 로그 기록
*/
private async logDataMove(client: any, params: any): Promise<void> {
const query = `
INSERT INTO flow_audit_log (
flow_definition_id, from_step_id, to_step_id,
move_type, source_table, target_table,
source_data_id, target_data_id,
status_from, status_to,
changed_by, note
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
`;
await client.query(query, [
params.flowId,
params.fromStepId,
params.toStepId,
params.moveType,
params.sourceTable,
params.targetTable,
params.sourceDataId,
params.targetDataId,
params.statusFrom,
params.statusTo,
params.userId,
params.note || null,
]);
}
/**
* 여러 데이터를 동시에 다음 단계로 이동
*/
async moveBatchData(
flowId: number,
fromStepId: number,
toStepId: number,
dataIds: any[],
userId: string = "system"
): Promise<{ success: boolean; results: any[] }> {
const results = [];
for (const dataId of dataIds) {
try {
const result = await this.moveDataToStep(
flowId,
fromStepId,
toStepId,
dataId,
userId
);
results.push({ dataId, ...result });
} catch (error: any) {
results.push({ dataId, success: false, message: error.message });
}
}
return {
success: results.every((r) => r.success),
results,
};
}
/**
* 데이터의 플로우 이력 조회
*/
async getAuditLogs(flowId: number, dataId: string): Promise<FlowAuditLog[]> {
const query = `
SELECT
fal.*,
fs_from.step_name as from_step_name,
fs_to.step_name as to_step_name
FROM flow_audit_log fal
LEFT JOIN flow_step fs_from ON fal.from_step_id = fs_from.id
LEFT JOIN flow_step fs_to ON fal.to_step_id = fs_to.id
WHERE fal.flow_definition_id = $1
AND (fal.source_data_id = $2 OR fal.target_data_id = $2)
ORDER BY fal.changed_at DESC
`;
const result = await db.query(query, [flowId, dataId]);
return result.map((row) => ({
id: row.id,
flowDefinitionId: row.flow_definition_id,
tableName: row.table_name || row.source_table,
recordId: row.record_id || row.source_data_id,
fromStepId: row.from_step_id,
toStepId: row.to_step_id,
changedBy: row.changed_by,
changedAt: row.changed_at,
note: row.note,
fromStepName: row.from_step_name,
toStepName: row.to_step_name,
moveType: row.move_type,
sourceTable: row.source_table,
targetTable: row.target_table,
sourceDataId: row.source_data_id,
targetDataId: row.target_data_id,
statusFrom: row.status_from,
statusTo: row.status_to,
}));
}
/**
* 특정 플로우의 모든 이력 조회
*/
async getFlowAuditLogs(
flowId: number,
limit: number = 100
): Promise<FlowAuditLog[]> {
const query = `
SELECT
fal.*,
fs_from.step_name as from_step_name,
fs_to.step_name as to_step_name
FROM flow_audit_log fal
LEFT JOIN flow_step fs_from ON fal.from_step_id = fs_from.id
LEFT JOIN flow_step fs_to ON fal.to_step_id = fs_to.id
WHERE fal.flow_definition_id = $1
ORDER BY fal.changed_at DESC
LIMIT $2
`;
const result = await db.query(query, [flowId, limit]);
return result.map((row) => ({
id: row.id,
flowDefinitionId: row.flow_definition_id,
tableName: row.table_name || row.source_table,
recordId: row.record_id || row.source_data_id,
fromStepId: row.from_step_id,
toStepId: row.to_step_id,
changedBy: row.changed_by,
changedAt: row.changed_at,
note: row.note,
fromStepName: row.from_step_name,
toStepName: row.to_step_name,
moveType: row.move_type,
sourceTable: row.source_table,
targetTable: row.target_table,
sourceDataId: row.source_data_id,
targetDataId: row.target_data_id,
statusFrom: row.status_from,
statusTo: row.status_to,
}));
}
/**
* 외부 DB 연동 실행
*/
private async executeExternalIntegration(
toStep: any,
flowId: number,
dataId: any,
tableName: string | undefined,
userId: string,
additionalData?: Record<string, any>
): Promise<void> {
const startTime = Date.now();
try {
// 연동 컨텍스트 구성
const context: FlowIntegrationContext = {
flowId,
stepId: toStep.id,
dataId,
tableName,
currentUser: userId,
variables: {
...additionalData,
stepName: toStep.stepName,
stepId: toStep.id,
},
};
// 연동 타입별 처리
switch (toStep.integrationType) {
case "external_db":
const result = await this.externalDbIntegrationService.execute(
context,
toStep.integrationConfig
);
// 연동 로그 기록
await this.logIntegration(
flowId,
toStep.id,
dataId,
toStep.integrationType,
toStep.integrationConfig.connectionId,
toStep.integrationConfig,
result.data,
result.success ? "success" : "failed",
result.error?.message,
Date.now() - startTime,
userId
);
if (!result.success) {
throw new Error(
`외부 DB 연동 실패: ${result.error?.message || "알 수 없는 오류"}`
);
}
break;
case "rest_api":
// REST API 연동 (추후 구현)
console.warn("REST API 연동은 아직 구현되지 않았습니다");
break;
case "webhook":
// Webhook 연동 (추후 구현)
console.warn("Webhook 연동은 아직 구현되지 않았습니다");
break;
case "hybrid":
// 복합 연동 (추후 구현)
console.warn("복합 연동은 아직 구현되지 않았습니다");
break;
default:
throw new Error(`지원하지 않는 연동 타입: ${toStep.integrationType}`);
}
} catch (error: any) {
console.error("외부 연동 실행 실패:", error);
// 연동 실패 로그 기록
await this.logIntegration(
flowId,
toStep.id,
dataId,
toStep.integrationType,
toStep.integrationConfig?.connectionId,
toStep.integrationConfig,
null,
"failed",
error.message,
Date.now() - startTime,
userId
);
throw error;
}
}
/**
* 외부 연동 로그 기록
*/
private async logIntegration(
flowId: number,
stepId: number,
dataId: any,
integrationType: string,
connectionId: number | undefined,
requestPayload: any,
responsePayload: any,
status: "success" | "failed" | "timeout" | "rollback",
errorMessage: string | undefined,
executionTimeMs: number,
userId: string
): Promise<void> {
const query = `
INSERT INTO flow_integration_log (
flow_definition_id, step_id, data_id, integration_type, connection_id,
request_payload, response_payload, status, error_message,
execution_time_ms, executed_by, executed_at
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, NOW())
`;
await db.query(query, [
flowId,
stepId,
String(dataId),
integrationType,
connectionId || null,
requestPayload ? JSON.stringify(requestPayload) : null,
responsePayload ? JSON.stringify(responsePayload) : null,
status,
errorMessage || null,
executionTimeMs,
userId,
]);
}
}

View File

@@ -0,0 +1,171 @@
/**
* 플로우 정의 서비스
*/
import db from "../database/db";
import {
FlowDefinition,
CreateFlowDefinitionRequest,
UpdateFlowDefinitionRequest,
} from "../types/flow";
export class FlowDefinitionService {
/**
* 플로우 정의 생성
*/
async create(
request: CreateFlowDefinitionRequest,
userId: string
): Promise<FlowDefinition> {
const query = `
INSERT INTO flow_definition (name, description, table_name, created_by)
VALUES ($1, $2, $3, $4)
RETURNING *
`;
const result = await db.query(query, [
request.name,
request.description || null,
request.tableName,
userId,
]);
return this.mapToFlowDefinition(result[0]);
}
/**
* 플로우 정의 목록 조회
*/
async findAll(
tableName?: string,
isActive?: boolean
): Promise<FlowDefinition[]> {
let query = "SELECT * FROM flow_definition WHERE 1=1";
const params: any[] = [];
let paramIndex = 1;
if (tableName) {
query += ` AND table_name = $${paramIndex}`;
params.push(tableName);
paramIndex++;
}
if (isActive !== undefined) {
query += ` AND is_active = $${paramIndex}`;
params.push(isActive);
paramIndex++;
}
query += " ORDER BY created_at DESC";
const result = await db.query(query, params);
return result.map(this.mapToFlowDefinition);
}
/**
* 플로우 정의 단일 조회
*/
async findById(id: number): Promise<FlowDefinition | null> {
const query = "SELECT * FROM flow_definition WHERE id = $1";
const result = await db.query(query, [id]);
if (result.length === 0) {
return null;
}
return this.mapToFlowDefinition(result[0]);
}
/**
* 플로우 정의 수정
*/
async update(
id: number,
request: UpdateFlowDefinitionRequest
): Promise<FlowDefinition | null> {
const fields: string[] = [];
const params: any[] = [];
let paramIndex = 1;
if (request.name !== undefined) {
fields.push(`name = $${paramIndex}`);
params.push(request.name);
paramIndex++;
}
if (request.description !== undefined) {
fields.push(`description = $${paramIndex}`);
params.push(request.description);
paramIndex++;
}
if (request.isActive !== undefined) {
fields.push(`is_active = $${paramIndex}`);
params.push(request.isActive);
paramIndex++;
}
if (fields.length === 0) {
return this.findById(id);
}
fields.push(`updated_at = NOW()`);
const query = `
UPDATE flow_definition
SET ${fields.join(", ")}
WHERE id = $${paramIndex}
RETURNING *
`;
params.push(id);
const result = await db.query(query, params);
if (result.length === 0) {
return null;
}
return this.mapToFlowDefinition(result[0]);
}
/**
* 플로우 정의 삭제
*/
async delete(id: number): Promise<boolean> {
const query = "DELETE FROM flow_definition WHERE id = $1 RETURNING id";
const result = await db.query(query, [id]);
return result.length > 0;
}
/**
* 테이블 존재 여부 확인
*/
async checkTableExists(tableName: string): Promise<boolean> {
const query = `
SELECT EXISTS (
SELECT FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = $1
) as exists
`;
const result = await db.query(query, [tableName]);
return result[0].exists;
}
/**
* DB 행을 FlowDefinition 객체로 변환
*/
private mapToFlowDefinition(row: any): FlowDefinition {
return {
id: row.id,
name: row.name,
description: row.description,
tableName: row.table_name,
isActive: row.is_active,
createdBy: row.created_by,
createdAt: row.created_at,
updatedAt: row.updated_at,
};
}
}

View File

@@ -0,0 +1,176 @@
/**
* 플로우 실행 서비스
* 단계별 데이터 카운트 및 리스트 조회
*/
import db from "../database/db";
import { FlowStepDataCount, FlowStepDataList } from "../types/flow";
import { FlowDefinitionService } from "./flowDefinitionService";
import { FlowStepService } from "./flowStepService";
import { FlowConditionParser } from "./flowConditionParser";
export class FlowExecutionService {
private flowDefinitionService: FlowDefinitionService;
private flowStepService: FlowStepService;
constructor() {
this.flowDefinitionService = new FlowDefinitionService();
this.flowStepService = new FlowStepService();
}
/**
* 특정 플로우 단계에 해당하는 데이터 카운트
*/
async getStepDataCount(flowId: number, stepId: number): Promise<number> {
// 1. 플로우 정의 조회
const flowDef = await this.flowDefinitionService.findById(flowId);
if (!flowDef) {
throw new Error(`Flow definition not found: ${flowId}`);
}
// 2. 플로우 단계 조회
const step = await this.flowStepService.findById(stepId);
if (!step) {
throw new Error(`Flow step not found: ${stepId}`);
}
if (step.flowDefinitionId !== flowId) {
throw new Error(`Step ${stepId} does not belong to flow ${flowId}`);
}
// 3. 테이블명 결정: 단계에 지정된 테이블이 있으면 사용, 없으면 플로우의 기본 테이블 사용
const tableName = step.tableName || flowDef.tableName;
// 4. 조건 JSON을 SQL WHERE절로 변환
const { where, params } = FlowConditionParser.toSqlWhere(
step.conditionJson
);
// 5. 카운트 쿼리 실행
const query = `SELECT COUNT(*) as count FROM ${tableName} WHERE ${where}`;
const result = await db.query(query, params);
return parseInt(result[0].count);
}
/**
* 특정 플로우 단계에 해당하는 데이터 리스트
*/
async getStepDataList(
flowId: number,
stepId: number,
page: number = 1,
pageSize: number = 20
): Promise<FlowStepDataList> {
// 1. 플로우 정의 조회
const flowDef = await this.flowDefinitionService.findById(flowId);
if (!flowDef) {
throw new Error(`Flow definition not found: ${flowId}`);
}
// 2. 플로우 단계 조회
const step = await this.flowStepService.findById(stepId);
if (!step) {
throw new Error(`Flow step not found: ${stepId}`);
}
if (step.flowDefinitionId !== flowId) {
throw new Error(`Step ${stepId} does not belong to flow ${flowId}`);
}
// 3. 테이블명 결정: 단계에 지정된 테이블이 있으면 사용, 없으면 플로우의 기본 테이블 사용
const tableName = step.tableName || flowDef.tableName;
// 4. 조건 JSON을 SQL WHERE절로 변환
const { where, params } = FlowConditionParser.toSqlWhere(
step.conditionJson
);
const offset = (page - 1) * pageSize;
// 5. 전체 카운트
const countQuery = `SELECT COUNT(*) as count FROM ${tableName} WHERE ${where}`;
const countResult = await db.query(countQuery, params);
const total = parseInt(countResult[0].count);
// 6. 테이블의 Primary Key 컬럼 찾기
let orderByColumn = "";
try {
const pkQuery = `
SELECT a.attname
FROM pg_index i
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
WHERE i.indrelid = $1::regclass
AND i.indisprimary
LIMIT 1
`;
const pkResult = await db.query(pkQuery, [tableName]);
if (pkResult.length > 0) {
orderByColumn = pkResult[0].attname;
}
} catch (err) {
// Primary Key를 찾지 못하면 ORDER BY 없이 진행
console.warn(`Could not find primary key for table ${tableName}:`, err);
}
// 7. 데이터 조회
const orderByClause = orderByColumn ? `ORDER BY ${orderByColumn} DESC` : "";
const dataQuery = `
SELECT * FROM ${tableName}
WHERE ${where}
${orderByClause}
LIMIT $${params.length + 1} OFFSET $${params.length + 2}
`;
const dataResult = await db.query(dataQuery, [...params, pageSize, offset]);
return {
records: dataResult,
total,
page,
pageSize,
};
}
/**
* 플로우의 모든 단계별 데이터 카운트
*/
async getAllStepCounts(flowId: number): Promise<FlowStepDataCount[]> {
const steps = await this.flowStepService.findByFlowId(flowId);
const counts: FlowStepDataCount[] = [];
for (const step of steps) {
const count = await this.getStepDataCount(flowId, step.id);
counts.push({
stepId: step.id,
count,
});
}
return counts;
}
/**
* 특정 레코드의 현재 플로우 상태 조회
*/
async getCurrentStatus(
flowId: number,
recordId: string
): Promise<{ currentStepId: number | null; tableName: string } | null> {
const query = `
SELECT current_step_id, table_name
FROM flow_data_status
WHERE flow_definition_id = $1 AND record_id = $2
`;
const result = await db.query(query, [flowId, recordId]);
if (result.length === 0) {
return null;
}
return {
currentStepId: result[0].current_step_id,
tableName: result[0].table_name,
};
}
}

View File

@@ -0,0 +1,436 @@
import db from "../database/db";
import {
FlowExternalDbConnection,
CreateFlowExternalDbConnectionRequest,
UpdateFlowExternalDbConnectionRequest,
} from "../types/flow";
import { CredentialEncryption } from "../utils/credentialEncryption";
import { Pool } from "pg";
// import mysql from 'mysql2/promise'; // MySQL용 (추후)
// import { ConnectionPool } from 'mssql'; // MSSQL용 (추후)
/**
* 플로우 전용 외부 DB 연결 관리 서비스
* (기존 제어관리 외부 DB 연결과 별도)
*/
export class FlowExternalDbConnectionService {
private encryption: CredentialEncryption;
private connectionPools: Map<number, Pool> = new Map();
constructor() {
// 환경 변수에서 SECRET_KEY를 가져오거나 기본값 설정
const secretKey =
process.env.SECRET_KEY || "flow-external-db-secret-key-2025";
this.encryption = new CredentialEncryption(secretKey);
}
/**
* 외부 DB 연결 생성
*/
async create(
request: CreateFlowExternalDbConnectionRequest,
userId: string = "system"
): Promise<FlowExternalDbConnection> {
// 비밀번호 암호화
const encryptedPassword = this.encryption.encrypt(request.password);
const query = `
INSERT INTO flow_external_db_connection (
name, description, db_type, host, port, database_name, username,
password_encrypted, ssl_enabled, connection_options, created_by, updated_by
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
RETURNING *
`;
const result = await db.query(query, [
request.name,
request.description || null,
request.dbType,
request.host,
request.port,
request.databaseName,
request.username,
encryptedPassword,
request.sslEnabled || false,
request.connectionOptions
? JSON.stringify(request.connectionOptions)
: null,
userId,
userId,
]);
return this.mapToFlowExternalDbConnection(result[0]);
}
/**
* ID로 외부 DB 연결 조회
*/
async findById(id: number): Promise<FlowExternalDbConnection | null> {
const query = "SELECT * FROM flow_external_db_connection WHERE id = $1";
const result = await db.query(query, [id]);
if (result.length === 0) {
return null;
}
return this.mapToFlowExternalDbConnection(result[0]);
}
/**
* 모든 외부 DB 연결 조회
*/
async findAll(
activeOnly: boolean = false
): Promise<FlowExternalDbConnection[]> {
let query = "SELECT * FROM flow_external_db_connection";
if (activeOnly) {
query += " WHERE is_active = true";
}
query += " ORDER BY name ASC";
const result = await db.query(query);
return result.map((row) => this.mapToFlowExternalDbConnection(row));
}
/**
* 외부 DB 연결 수정
*/
async update(
id: number,
request: UpdateFlowExternalDbConnectionRequest,
userId: string = "system"
): Promise<FlowExternalDbConnection | null> {
const fields: string[] = [];
const params: any[] = [];
let paramIndex = 1;
if (request.name !== undefined) {
fields.push(`name = $${paramIndex}`);
params.push(request.name);
paramIndex++;
}
if (request.description !== undefined) {
fields.push(`description = $${paramIndex}`);
params.push(request.description);
paramIndex++;
}
if (request.host !== undefined) {
fields.push(`host = $${paramIndex}`);
params.push(request.host);
paramIndex++;
}
if (request.port !== undefined) {
fields.push(`port = $${paramIndex}`);
params.push(request.port);
paramIndex++;
}
if (request.databaseName !== undefined) {
fields.push(`database_name = $${paramIndex}`);
params.push(request.databaseName);
paramIndex++;
}
if (request.username !== undefined) {
fields.push(`username = $${paramIndex}`);
params.push(request.username);
paramIndex++;
}
if (request.password !== undefined) {
const encryptedPassword = this.encryption.encrypt(request.password);
fields.push(`password_encrypted = $${paramIndex}`);
params.push(encryptedPassword);
paramIndex++;
}
if (request.sslEnabled !== undefined) {
fields.push(`ssl_enabled = $${paramIndex}`);
params.push(request.sslEnabled);
paramIndex++;
}
if (request.connectionOptions !== undefined) {
fields.push(`connection_options = $${paramIndex}`);
params.push(
request.connectionOptions
? JSON.stringify(request.connectionOptions)
: null
);
paramIndex++;
}
if (request.isActive !== undefined) {
fields.push(`is_active = $${paramIndex}`);
params.push(request.isActive);
paramIndex++;
}
if (fields.length === 0) {
return this.findById(id);
}
fields.push(`updated_by = $${paramIndex}`);
params.push(userId);
paramIndex++;
fields.push(`updated_at = NOW()`);
const query = `
UPDATE flow_external_db_connection
SET ${fields.join(", ")}
WHERE id = $${paramIndex}
RETURNING *
`;
params.push(id);
const result = await db.query(query, params);
if (result.length === 0) {
return null;
}
// 연결 풀 갱신 (비밀번호 변경 시)
if (request.password !== undefined || request.host !== undefined) {
this.closeConnection(id);
}
return this.mapToFlowExternalDbConnection(result[0]);
}
/**
* 외부 DB 연결 삭제
*/
async delete(id: number): Promise<boolean> {
// 연결 풀 정리
this.closeConnection(id);
const query =
"DELETE FROM flow_external_db_connection WHERE id = $1 RETURNING id";
const result = await db.query(query, [id]);
return result.length > 0;
}
/**
* 연결 테스트
*/
async testConnection(
id: number
): Promise<{ success: boolean; message: string }> {
try {
const connection = await this.findById(id);
if (!connection) {
return { success: false, message: "연결 정보를 찾을 수 없습니다." };
}
const pool = await this.getConnectionPool(connection);
// 간단한 쿼리로 연결 테스트
const client = await pool.connect();
try {
await client.query("SELECT 1");
return { success: true, message: "연결 성공" };
} finally {
client.release();
}
} catch (error: any) {
return { success: false, message: error.message };
}
}
/**
* 외부 DB의 테이블 목록 조회
*/
async getTables(
id: number
): Promise<{ success: boolean; data?: string[]; message?: string }> {
try {
const connection = await this.findById(id);
if (!connection) {
return { success: false, message: "연결 정보를 찾을 수 없습니다." };
}
const pool = await this.getConnectionPool(connection);
const client = await pool.connect();
try {
let query: string;
switch (connection.dbType) {
case "postgresql":
query =
"SELECT tablename FROM pg_tables WHERE schemaname = 'public' ORDER BY tablename";
break;
case "mysql":
query = `SELECT table_name as tablename FROM information_schema.tables WHERE table_schema = '${connection.databaseName}' ORDER BY table_name`;
break;
default:
return {
success: false,
message: `지원하지 않는 DB 타입: ${connection.dbType}`,
};
}
const result = await client.query(query);
const tables = result.rows.map((row: any) => row.tablename);
return { success: true, data: tables };
} finally {
client.release();
}
} catch (error: any) {
return { success: false, message: error.message };
}
}
/**
* 외부 DB의 특정 테이블 컬럼 목록 조회
*/
async getTableColumns(
id: number,
tableName: string
): Promise<{
success: boolean;
data?: { column_name: string; data_type: string }[];
message?: string;
}> {
try {
const connection = await this.findById(id);
if (!connection) {
return { success: false, message: "연결 정보를 찾을 수 없습니다." };
}
const pool = await this.getConnectionPool(connection);
const client = await pool.connect();
try {
let query: string;
switch (connection.dbType) {
case "postgresql":
query = `SELECT column_name, data_type
FROM information_schema.columns
WHERE table_schema = 'public' AND table_name = $1
ORDER BY ordinal_position`;
break;
case "mysql":
query = `SELECT column_name, data_type
FROM information_schema.columns
WHERE table_schema = '${connection.databaseName}' AND table_name = ?
ORDER BY ordinal_position`;
break;
default:
return {
success: false,
message: `지원하지 않는 DB 타입: ${connection.dbType}`,
};
}
const result = await client.query(query, [tableName]);
return { success: true, data: result.rows };
} finally {
client.release();
}
} catch (error: any) {
return { success: false, message: error.message };
}
}
/**
* 연결 풀 가져오기 (캐싱)
*/
async getConnectionPool(connection: FlowExternalDbConnection): Promise<Pool> {
if (this.connectionPools.has(connection.id)) {
return this.connectionPools.get(connection.id)!;
}
// 비밀번호 복호화
const decryptedPassword = this.encryption.decrypt(
connection.passwordEncrypted
);
let pool: Pool;
switch (connection.dbType) {
case "postgresql":
pool = new Pool({
host: connection.host,
port: connection.port,
database: connection.databaseName,
user: connection.username,
password: decryptedPassword,
ssl: connection.sslEnabled,
// 연결 풀 설정 (고갈 방지)
max: 10, // 최대 연결 수
min: 2, // 최소 연결 수
idleTimeoutMillis: 30000, // 30초 유휴 시간 후 연결 해제
connectionTimeoutMillis: 10000, // 10초 연결 타임아웃
...(connection.connectionOptions || {}),
});
// 에러 핸들러 등록
pool.on("error", (err) => {
console.error(`외부 DB 연결 풀 오류 (ID: ${connection.id}):`, err);
});
break;
// case "mysql":
// pool = mysql.createPool({ ... });
// break;
// case "mssql":
// pool = new ConnectionPool({ ... });
// break;
default:
throw new Error(`지원하지 않는 DB 타입: ${connection.dbType}`);
}
this.connectionPools.set(connection.id, pool);
return pool;
}
/**
* 연결 풀 정리
*/
closeConnection(id: number): void {
const pool = this.connectionPools.get(id);
if (pool) {
pool.end();
this.connectionPools.delete(id);
}
}
/**
* 모든 연결 풀 정리
*/
closeAllConnections(): void {
for (const [id, pool] of this.connectionPools.entries()) {
pool.end();
}
this.connectionPools.clear();
}
/**
* DB row를 FlowExternalDbConnection으로 매핑
*/
private mapToFlowExternalDbConnection(row: any): FlowExternalDbConnection {
return {
id: row.id,
name: row.name,
description: row.description || undefined,
dbType: row.db_type,
host: row.host,
port: row.port,
databaseName: row.database_name,
username: row.username,
passwordEncrypted: row.password_encrypted,
sslEnabled: row.ssl_enabled,
connectionOptions: row.connection_options || undefined,
isActive: row.is_active,
createdBy: row.created_by || undefined,
updatedBy: row.updated_by || undefined,
createdAt: row.created_at,
updatedAt: row.updated_at,
};
}
}

View File

@@ -0,0 +1,353 @@
import {
FlowExternalDbIntegrationConfig,
FlowIntegrationContext,
FlowIntegrationResult,
} from "../types/flow";
import { FlowExternalDbConnectionService } from "./flowExternalDbConnectionService";
import { Pool } from "pg";
/**
* 플로우 외부 DB 연동 실행 서비스
* 외부 데이터베이스에 대한 작업(INSERT, UPDATE, DELETE, CUSTOM QUERY) 수행
*/
export class FlowExternalDbIntegrationService {
private connectionService: FlowExternalDbConnectionService;
constructor() {
this.connectionService = new FlowExternalDbConnectionService();
}
/**
* 외부 DB 연동 실행
*/
async execute(
context: FlowIntegrationContext,
config: FlowExternalDbIntegrationConfig
): Promise<FlowIntegrationResult> {
const startTime = Date.now();
try {
// 1. 연결 정보 조회
const connection = await this.connectionService.findById(
config.connectionId
);
if (!connection) {
return {
success: false,
error: {
code: "CONNECTION_NOT_FOUND",
message: `외부 DB 연결 정보를 찾을 수 없습니다 (ID: ${config.connectionId})`,
},
};
}
if (!connection.isActive) {
return {
success: false,
error: {
code: "CONNECTION_INACTIVE",
message: `외부 DB 연결이 비활성화 상태입니다 (${connection.name})`,
},
};
}
// 2. 쿼리 생성 (템플릿 변수 치환)
const query = this.buildQuery(config, context);
// 3. 실행
const pool = await this.connectionService.getConnectionPool(connection);
const result = await this.executeQuery(pool, query);
const executionTime = Date.now() - startTime;
return {
success: true,
message: `외부 DB 작업 성공 (${config.operation}, ${executionTime}ms)`,
data: result,
rollbackInfo: {
query: this.buildRollbackQuery(config, context, result),
connectionId: config.connectionId,
},
};
} catch (error: any) {
const executionTime = Date.now() - startTime;
return {
success: false,
error: {
code: "EXTERNAL_DB_ERROR",
message: error.message || "외부 DB 작업 실패",
details: {
operation: config.operation,
tableName: config.tableName,
executionTime,
originalError: error,
},
},
};
}
}
/**
* 쿼리 실행
*/
private async executeQuery(
pool: Pool,
query: { sql: string; params: any[] }
): Promise<any> {
const client = await pool.connect();
try {
const result = await client.query(query.sql, query.params);
return result.rows;
} finally {
client.release();
}
}
/**
* 쿼리 빌드 (템플릿 변수 치환 포함)
*/
private buildQuery(
config: FlowExternalDbIntegrationConfig,
context: FlowIntegrationContext
): { sql: string; params: any[] } {
let sql = "";
const params: any[] = [];
let paramIndex = 1;
switch (config.operation) {
case "update":
return this.buildUpdateQuery(config, context, paramIndex);
case "insert":
return this.buildInsertQuery(config, context, paramIndex);
case "delete":
return this.buildDeleteQuery(config, context, paramIndex);
case "custom":
return this.buildCustomQuery(config, context);
default:
throw new Error(`지원하지 않는 작업: ${config.operation}`);
}
}
/**
* UPDATE 쿼리 빌드
*/
private buildUpdateQuery(
config: FlowExternalDbIntegrationConfig,
context: FlowIntegrationContext,
startIndex: number
): { sql: string; params: any[] } {
if (!config.updateFields || Object.keys(config.updateFields).length === 0) {
throw new Error("UPDATE 작업에는 updateFields가 필요합니다");
}
if (
!config.whereCondition ||
Object.keys(config.whereCondition).length === 0
) {
throw new Error("UPDATE 작업에는 whereCondition이 필요합니다");
}
const setClauses: string[] = [];
const params: any[] = [];
let paramIndex = startIndex;
// SET 절 생성
for (const [key, value] of Object.entries(config.updateFields)) {
setClauses.push(`${key} = $${paramIndex}`);
params.push(this.replaceVariables(value, context));
paramIndex++;
}
// WHERE 절 생성
const whereClauses: string[] = [];
for (const [key, value] of Object.entries(config.whereCondition)) {
whereClauses.push(`${key} = $${paramIndex}`);
params.push(this.replaceVariables(value, context));
paramIndex++;
}
const sql = `UPDATE ${config.tableName} SET ${setClauses.join(", ")} WHERE ${whereClauses.join(" AND ")}`;
return { sql, params };
}
/**
* INSERT 쿼리 빌드
*/
private buildInsertQuery(
config: FlowExternalDbIntegrationConfig,
context: FlowIntegrationContext,
startIndex: number
): { sql: string; params: any[] } {
if (!config.updateFields || Object.keys(config.updateFields).length === 0) {
throw new Error("INSERT 작업에는 updateFields가 필요합니다");
}
const columns: string[] = [];
const placeholders: string[] = [];
const params: any[] = [];
let paramIndex = startIndex;
for (const [key, value] of Object.entries(config.updateFields)) {
columns.push(key);
placeholders.push(`$${paramIndex}`);
params.push(this.replaceVariables(value, context));
paramIndex++;
}
const sql = `INSERT INTO ${config.tableName} (${columns.join(", ")}) VALUES (${placeholders.join(", ")}) RETURNING *`;
return { sql, params };
}
/**
* DELETE 쿼리 빌드
*/
private buildDeleteQuery(
config: FlowExternalDbIntegrationConfig,
context: FlowIntegrationContext,
startIndex: number
): { sql: string; params: any[] } {
if (
!config.whereCondition ||
Object.keys(config.whereCondition).length === 0
) {
throw new Error("DELETE 작업에는 whereCondition이 필요합니다");
}
const whereClauses: string[] = [];
const params: any[] = [];
let paramIndex = startIndex;
for (const [key, value] of Object.entries(config.whereCondition)) {
whereClauses.push(`${key} = $${paramIndex}`);
params.push(this.replaceVariables(value, context));
paramIndex++;
}
const sql = `DELETE FROM ${config.tableName} WHERE ${whereClauses.join(" AND ")}`;
return { sql, params };
}
/**
* CUSTOM 쿼리 빌드
*/
private buildCustomQuery(
config: FlowExternalDbIntegrationConfig,
context: FlowIntegrationContext
): { sql: string; params: any[] } {
if (!config.customQuery) {
throw new Error("CUSTOM 작업에는 customQuery가 필요합니다");
}
// 템플릿 변수 치환
const sql = this.replaceVariables(config.customQuery, context);
// 커스텀 쿼리는 파라미터를 직접 관리
// 보안을 위해 가능하면 파라미터 바인딩 사용 권장
return { sql, params: [] };
}
/**
* 템플릿 변수 치환
*/
private replaceVariables(value: any, context: FlowIntegrationContext): any {
if (typeof value !== "string") {
return value;
}
let result = value;
// {{dataId}} 치환
result = result.replace(/\{\{dataId\}\}/g, String(context.dataId));
// {{currentUser}} 치환
result = result.replace(/\{\{currentUser\}\}/g, context.currentUser);
// {{currentTimestamp}} 치환
result = result.replace(
/\{\{currentTimestamp\}\}/g,
new Date().toISOString()
);
// {{flowId}} 치환
result = result.replace(/\{\{flowId\}\}/g, String(context.flowId));
// {{stepId}} 치환
result = result.replace(/\{\{stepId\}\}/g, String(context.stepId));
// {{tableName}} 치환
if (context.tableName) {
result = result.replace(/\{\{tableName\}\}/g, context.tableName);
}
// context.variables의 커스텀 변수 치환
for (const [key, val] of Object.entries(context.variables)) {
const regex = new RegExp(`\\{\\{${key}\\}\\}`, "g");
result = result.replace(regex, String(val));
}
// NOW() 같은 SQL 함수는 그대로 반환
if (result === "NOW()" || result.startsWith("CURRENT_")) {
return result;
}
return result;
}
/**
* 롤백 쿼리 생성
*/
private buildRollbackQuery(
config: FlowExternalDbIntegrationConfig,
context: FlowIntegrationContext,
result: any
): { sql: string; params: any[] } | null {
// 롤백 쿼리 생성 로직 (복잡하므로 실제 구현 시 상세 설계 필요)
// 예: INSERT -> DELETE, UPDATE -> 이전 값으로 UPDATE
switch (config.operation) {
case "insert":
// INSERT를 롤백하려면 삽입된 레코드를 DELETE
if (result && result[0] && result[0].id) {
return {
sql: `DELETE FROM ${config.tableName} WHERE id = $1`,
params: [result[0].id],
};
}
break;
case "delete":
// DELETE 롤백은 매우 어려움 (원본 데이터 필요)
console.warn("DELETE 작업의 롤백은 지원하지 않습니다");
break;
case "update":
// UPDATE 롤백을 위해서는 이전 값을 저장해야 함
console.warn("UPDATE 작업의 롤백은 현재 구현되지 않았습니다");
break;
default:
break;
}
return null;
}
/**
* 롤백 실행
*/
async rollback(
connectionId: number,
rollbackQuery: { sql: string; params: any[] }
): Promise<void> {
const connection = await this.connectionService.findById(connectionId);
if (!connection) {
throw new Error(
`롤백 실패: 연결 정보를 찾을 수 없습니다 (ID: ${connectionId})`
);
}
const pool = await this.connectionService.getConnectionPool(connection);
await this.executeQuery(pool, rollbackQuery);
}
}

View File

@@ -0,0 +1,289 @@
/**
* 플로우 단계 서비스
*/
import db from "../database/db";
import {
FlowStep,
CreateFlowStepRequest,
UpdateFlowStepRequest,
FlowConditionGroup,
} from "../types/flow";
import { FlowConditionParser } from "./flowConditionParser";
export class FlowStepService {
/**
* 플로우 단계 생성
*/
async create(request: CreateFlowStepRequest): Promise<FlowStep> {
// 조건 검증
if (request.conditionJson) {
FlowConditionParser.validateConditionGroup(request.conditionJson);
}
const query = `
INSERT INTO flow_step (
flow_definition_id, step_name, step_order, table_name, condition_json,
color, position_x, position_y, move_type, status_column, status_value,
target_table, field_mappings, required_fields,
integration_type, integration_config
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
RETURNING *
`;
const result = await db.query(query, [
request.flowDefinitionId,
request.stepName,
request.stepOrder,
request.tableName || null,
request.conditionJson ? JSON.stringify(request.conditionJson) : null,
request.color || "#3B82F6",
request.positionX || 0,
request.positionY || 0,
request.moveType || null,
request.statusColumn || null,
request.statusValue || null,
request.targetTable || null,
request.fieldMappings ? JSON.stringify(request.fieldMappings) : null,
request.requiredFields ? JSON.stringify(request.requiredFields) : null,
request.integrationType || "internal",
request.integrationConfig
? JSON.stringify(request.integrationConfig)
: null,
]);
return this.mapToFlowStep(result[0]);
}
/**
* 특정 플로우의 모든 단계 조회
*/
async findByFlowId(flowDefinitionId: number): Promise<FlowStep[]> {
const query = `
SELECT * FROM flow_step
WHERE flow_definition_id = $1
ORDER BY step_order ASC
`;
const result = await db.query(query, [flowDefinitionId]);
return result.map(this.mapToFlowStep);
}
/**
* 플로우 단계 단일 조회
*/
async findById(id: number): Promise<FlowStep | null> {
const query = "SELECT * FROM flow_step WHERE id = $1";
const result = await db.query(query, [id]);
if (result.length === 0) {
return null;
}
return this.mapToFlowStep(result[0]);
}
/**
* 플로우 단계 수정
*/
async update(
id: number,
request: UpdateFlowStepRequest
): Promise<FlowStep | null> {
console.log("🔧 FlowStepService.update called with:", {
id,
statusColumn: request.statusColumn,
statusValue: request.statusValue,
fullRequest: JSON.stringify(request),
});
// 조건 검증
if (request.conditionJson) {
FlowConditionParser.validateConditionGroup(request.conditionJson);
}
const fields: string[] = [];
const params: any[] = [];
let paramIndex = 1;
if (request.stepName !== undefined) {
fields.push(`step_name = $${paramIndex}`);
params.push(request.stepName);
paramIndex++;
}
if (request.stepOrder !== undefined) {
fields.push(`step_order = $${paramIndex}`);
params.push(request.stepOrder);
paramIndex++;
}
if (request.tableName !== undefined) {
fields.push(`table_name = $${paramIndex}`);
params.push(request.tableName);
paramIndex++;
}
if (request.conditionJson !== undefined) {
fields.push(`condition_json = $${paramIndex}`);
params.push(
request.conditionJson ? JSON.stringify(request.conditionJson) : null
);
paramIndex++;
}
if (request.color !== undefined) {
fields.push(`color = $${paramIndex}`);
params.push(request.color);
paramIndex++;
}
if (request.positionX !== undefined) {
fields.push(`position_x = $${paramIndex}`);
params.push(request.positionX);
paramIndex++;
}
if (request.positionY !== undefined) {
fields.push(`position_y = $${paramIndex}`);
params.push(request.positionY);
paramIndex++;
}
// 하이브리드 플로우 필드
if (request.moveType !== undefined) {
fields.push(`move_type = $${paramIndex}`);
params.push(request.moveType);
paramIndex++;
}
if (request.statusColumn !== undefined) {
fields.push(`status_column = $${paramIndex}`);
params.push(request.statusColumn);
paramIndex++;
}
if (request.statusValue !== undefined) {
fields.push(`status_value = $${paramIndex}`);
params.push(request.statusValue);
paramIndex++;
}
if (request.targetTable !== undefined) {
fields.push(`target_table = $${paramIndex}`);
params.push(request.targetTable);
paramIndex++;
}
if (request.fieldMappings !== undefined) {
fields.push(`field_mappings = $${paramIndex}`);
params.push(
request.fieldMappings ? JSON.stringify(request.fieldMappings) : null
);
paramIndex++;
}
if (request.requiredFields !== undefined) {
fields.push(`required_fields = $${paramIndex}`);
params.push(
request.requiredFields ? JSON.stringify(request.requiredFields) : null
);
paramIndex++;
}
// 외부 연동 필드
if (request.integrationType !== undefined) {
fields.push(`integration_type = $${paramIndex}`);
params.push(request.integrationType);
paramIndex++;
}
if (request.integrationConfig !== undefined) {
fields.push(`integration_config = $${paramIndex}`);
params.push(
request.integrationConfig
? JSON.stringify(request.integrationConfig)
: null
);
paramIndex++;
}
if (fields.length === 0) {
return this.findById(id);
}
fields.push(`updated_at = NOW()`);
const query = `
UPDATE flow_step
SET ${fields.join(", ")}
WHERE id = $${paramIndex}
RETURNING *
`;
params.push(id);
const result = await db.query(query, params);
if (result.length === 0) {
return null;
}
return this.mapToFlowStep(result[0]);
}
/**
* 플로우 단계 삭제
*/
async delete(id: number): Promise<boolean> {
const query = "DELETE FROM flow_step WHERE id = $1 RETURNING id";
const result = await db.query(query, [id]);
return result.length > 0;
}
/**
* 단계 순서 재정렬
*/
async reorder(
flowDefinitionId: number,
stepOrders: { id: number; order: number }[]
): Promise<void> {
await db.transaction(async (client) => {
for (const { id, order } of stepOrders) {
await client.query(
"UPDATE flow_step SET step_order = $1, updated_at = NOW() WHERE id = $2 AND flow_definition_id = $3",
[order, id, flowDefinitionId]
);
}
});
}
/**
* DB 행을 FlowStep 객체로 변환
*/
private mapToFlowStep(row: any): FlowStep {
return {
id: row.id,
flowDefinitionId: row.flow_definition_id,
stepName: row.step_name,
stepOrder: row.step_order,
tableName: row.table_name || undefined,
conditionJson: row.condition_json as FlowConditionGroup | undefined,
color: row.color,
positionX: row.position_x,
positionY: row.position_y,
// 하이브리드 플로우 지원 필드
moveType: row.move_type || undefined,
statusColumn: row.status_column || undefined,
statusValue: row.status_value || undefined,
targetTable: row.target_table || undefined,
fieldMappings: row.field_mappings || undefined,
requiredFields: row.required_fields || undefined,
// 외부 연동 필드
integrationType: row.integration_type || "internal",
integrationConfig: row.integration_config || undefined,
createdAt: row.created_at,
updatedAt: row.updated_at,
};
}
}

View File

@@ -0,0 +1,345 @@
/**
* 플로우 관리 시스템 타입 정의
*/
// 플로우 정의
export interface FlowDefinition {
id: number;
name: string;
description?: string;
tableName: string;
isActive: boolean;
createdBy?: string;
createdAt: Date;
updatedAt: Date;
}
// 플로우 정의 생성 요청
export interface CreateFlowDefinitionRequest {
name: string;
description?: string;
tableName: string;
}
// 플로우 정의 수정 요청
export interface UpdateFlowDefinitionRequest {
name?: string;
description?: string;
isActive?: boolean;
}
// 조건 연산자
export type ConditionOperator =
| "equals"
| "="
| "not_equals"
| "!="
| "in"
| "not_in"
| "greater_than"
| ">"
| "less_than"
| "<"
| "greater_than_or_equal"
| ">="
| "less_than_or_equal"
| "<="
| "is_null"
| "is_not_null"
| "like"
| "not_like";
// 플로우 조건
export interface FlowCondition {
column: string;
operator: ConditionOperator;
value: any;
}
// 플로우 조건 그룹
export interface FlowConditionGroup {
type: "AND" | "OR";
conditions: FlowCondition[];
}
// 플로우 단계
export interface FlowStep {
id: number;
flowDefinitionId: number;
stepName: string;
stepOrder: number;
tableName?: string; // 이 단계에서 조회할 테이블명 (NULL이면 flow_definition의 tableName 사용)
conditionJson?: FlowConditionGroup;
color: string;
positionX: number;
positionY: number;
// 하이브리드 플로우 지원 필드
moveType?: "status" | "table" | "both"; // 데이터 이동 방식
statusColumn?: string; // 상태 컬럼명 (상태 변경 방식)
statusValue?: string; // 이 단계의 상태값
targetTable?: string; // 타겟 테이블명 (테이블 이동 방식)
fieldMappings?: Record<string, string>; // 필드 매핑 정보
requiredFields?: string[]; // 필수 입력 필드
// 외부 연동 필드
integrationType?: FlowIntegrationType; // 연동 타입 (기본값: internal)
integrationConfig?: FlowIntegrationConfig; // 연동 설정 (JSONB)
createdAt: Date;
updatedAt: Date;
}
// 플로우 단계 생성 요청
export interface CreateFlowStepRequest {
flowDefinitionId: number;
stepName: string;
stepOrder: number;
tableName?: string; // 이 단계에서 조회할 테이블명
conditionJson?: FlowConditionGroup;
color?: string;
positionX?: number;
positionY?: number;
// 하이브리드 플로우 지원 필드
moveType?: "status" | "table" | "both";
statusColumn?: string;
statusValue?: string;
targetTable?: string;
fieldMappings?: Record<string, string>;
requiredFields?: string[];
// 외부 연동 필드
integrationType?: FlowIntegrationType;
integrationConfig?: FlowIntegrationConfig;
}
// 플로우 단계 수정 요청
export interface UpdateFlowStepRequest {
stepName?: string;
stepOrder?: number;
tableName?: string; // 이 단계에서 조회할 테이블명
conditionJson?: FlowConditionGroup;
color?: string;
positionX?: number;
positionY?: number;
// 하이브리드 플로우 지원 필드
moveType?: "status" | "table" | "both";
statusColumn?: string;
statusValue?: string;
targetTable?: string;
fieldMappings?: Record<string, string>;
requiredFields?: string[];
// 외부 연동 필드
integrationType?: FlowIntegrationType;
integrationConfig?: FlowIntegrationConfig;
}
// 플로우 단계 연결
export interface FlowStepConnection {
id: number;
flowDefinitionId: number;
fromStepId: number;
toStepId: number;
label?: string;
createdAt: Date;
}
// 플로우 단계 연결 생성 요청
export interface CreateFlowConnectionRequest {
flowDefinitionId: number;
fromStepId: number;
toStepId: number;
label?: string;
}
// 플로우 데이터 상태
export interface FlowDataStatus {
id: number;
flowDefinitionId: number;
tableName: string;
recordId: string;
currentStepId?: number;
updatedBy?: string;
updatedAt: Date;
}
// 플로우 오딧 로그
export interface FlowAuditLog {
id: number;
flowDefinitionId: number;
tableName: string;
recordId: string;
fromStepId?: number;
toStepId?: number;
changedBy?: string;
changedAt: Date;
note?: string;
// 하이브리드 플로우 지원 필드
moveType?: "status" | "table" | "both";
sourceTable?: string;
targetTable?: string;
sourceDataId?: string;
targetDataId?: string;
statusFrom?: string;
statusTo?: string;
// 조인 필드
fromStepName?: string;
toStepName?: string;
}
// 플로우 상세 정보
export interface FlowDetailResponse {
definition: FlowDefinition;
steps: FlowStep[];
connections: FlowStepConnection[];
}
// 단계별 데이터 카운트
export interface FlowStepDataCount {
stepId: number;
count: number;
}
// 단계별 데이터 리스트
export interface FlowStepDataList {
records: any[];
total: number;
page: number;
pageSize: number;
}
// 데이터 이동 요청
export interface MoveDataRequest {
flowId: number;
recordId: string;
toStepId: number;
note?: string;
}
// SQL WHERE 절 결과
export interface SqlWhereResult {
where: string;
params: any[];
}
// ==================== 플로우 외부 연동 타입 ====================
// 연동 타입
export type FlowIntegrationType =
| "internal" // 내부 DB (기본값)
| "external_db" // 외부 DB
| "rest_api" // REST API (추후 구현)
| "webhook" // Webhook (추후 구현)
| "hybrid"; // 복합 연동 (추후 구현)
// 플로우 전용 외부 DB 연결 정보
export interface FlowExternalDbConnection {
id: number;
name: string;
description?: string;
dbType: "postgresql" | "mysql" | "mssql" | "oracle";
host: string;
port: number;
databaseName: string;
username: string;
passwordEncrypted: string; // 암호화된 비밀번호
sslEnabled: boolean;
connectionOptions?: Record<string, any>;
isActive: boolean;
createdBy?: string;
updatedBy?: string;
createdAt: Date;
updatedAt: Date;
}
// 외부 DB 연결 생성 요청
export interface CreateFlowExternalDbConnectionRequest {
name: string;
description?: string;
dbType: "postgresql" | "mysql" | "mssql" | "oracle";
host: string;
port: number;
databaseName: string;
username: string;
password: string; // 평문 비밀번호 (저장 시 암호화)
sslEnabled?: boolean;
connectionOptions?: Record<string, any>;
}
// 외부 DB 연결 수정 요청
export interface UpdateFlowExternalDbConnectionRequest {
name?: string;
description?: string;
host?: string;
port?: number;
databaseName?: string;
username?: string;
password?: string; // 평문 비밀번호 (저장 시 암호화)
sslEnabled?: boolean;
connectionOptions?: Record<string, any>;
isActive?: boolean;
}
// 외부 DB 연동 설정 (integration_config JSON)
export interface FlowExternalDbIntegrationConfig {
type: "external_db";
connectionId: number; // flow_external_db_connection.id
operation: "update" | "insert" | "delete" | "custom";
tableName: string;
updateFields?: Record<string, any>; // 업데이트할 필드 (템플릿 변수 지원)
whereCondition?: Record<string, any>; // WHERE 조건 (템플릿 변수 지원)
customQuery?: string; // operation이 'custom'인 경우 사용
}
// 연동 설정 통합 타입
export type FlowIntegrationConfig = FlowExternalDbIntegrationConfig; // 나중에 다른 타입 추가
// 연동 실행 컨텍스트
export interface FlowIntegrationContext {
flowId: number;
stepId: number;
dataId: string | number;
tableName?: string;
currentUser: string;
variables: Record<string, any>; // 템플릿 변수 ({{dataId}}, {{currentUser}} 등)
transactionId?: string;
}
// 연동 실행 결과
export interface FlowIntegrationResult {
success: boolean;
message?: string;
data?: any;
error?: {
code: string;
message: string;
details?: any;
};
rollbackInfo?: any; // 롤백을 위한 정보
}
// 외부 연동 실행 로그
export interface FlowIntegrationLog {
id: number;
flowDefinitionId: number;
stepId: number;
dataId?: string;
integrationType: string;
connectionId?: number;
requestPayload?: Record<string, any>;
responsePayload?: Record<string, any>;
status: "success" | "failed" | "timeout" | "rollback";
errorMessage?: string;
executionTimeMs?: number;
executedBy?: string;
executedAt: Date;
}
// 외부 연결 권한
export interface FlowExternalConnectionPermission {
id: number;
connectionId: number;
userId?: number;
roleName?: string;
canView: boolean;
canUse: boolean;
canEdit: boolean;
canDelete: boolean;
createdAt: Date;
}

View File

@@ -0,0 +1,61 @@
import crypto from "crypto";
/**
* 자격 증명 암호화 유틸리티
* AES-256-GCM 알고리즘 사용
*/
export class CredentialEncryption {
private algorithm = "aes-256-gcm";
private key: Buffer;
constructor(secretKey: string) {
// scrypt로 안전한 키 생성
this.key = crypto.scryptSync(secretKey, "salt", 32);
}
/**
* 평문을 암호화
*/
encrypt(text: string): string {
const iv = crypto.randomBytes(16);
const cipher = crypto.createCipheriv(
this.algorithm,
this.key,
iv
) as crypto.CipherGCM;
let encrypted = cipher.update(text, "utf8", "hex");
encrypted += cipher.final("hex");
const authTag = cipher.getAuthTag();
// IV:AuthTag:EncryptedText 형식으로 반환
return `${iv.toString("hex")}:${authTag.toString("hex")}:${encrypted}`;
}
/**
* 암호문을 복호화
*/
decrypt(encrypted: string): string {
const [ivHex, authTagHex, encryptedText] = encrypted.split(":");
if (!ivHex || !authTagHex || !encryptedText) {
throw new Error("Invalid encrypted string format");
}
const iv = Buffer.from(ivHex, "hex");
const authTag = Buffer.from(authTagHex, "hex");
const decipher = crypto.createDecipheriv(
this.algorithm,
this.key,
iv
) as crypto.DecipherGCM;
decipher.setAuthTag(authTag);
let decrypted = decipher.update(encryptedText, "hex", "utf8");
decrypted += decipher.final("utf8");
return decrypted;
}
}