"""Alarm API routes and helpers that create alarms for parts and inspections."""

from datetime import datetime, timezone
from typing import Optional
from uuid import UUID

from fastapi import APIRouter, HTTPException, Depends, Path, Query
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from src.database.config import get_db
from src.database.models import (
    Alarm,
    EquipmentPart,
    PartCounter,
    Machine,
    InspectionSession,
    InspectionRecord,
    InspectionTemplate,
    InspectionTemplateItem,
)
from src.auth.models import TokenData
from src.auth.dependencies import require_auth, verify_tenant_access

router = APIRouter(tags=["alarms"])

# Inspection alarm messages longer than this are cut and suffixed with an
# ellipsis. NOTE(review): presumably mirrors the message column length — confirm.
_MAX_ALARM_MESSAGE_LEN = 500
_ELLIPSIS = "..."


def _format_ts(val) -> Optional[str]:
    """Render a timestamp-like value as a string.

    Returns ``None`` for ``None``, ``val.isoformat()`` when the value exposes
    that method, and ``str(val)`` otherwise.
    """
    if val is None:
        return None
    if hasattr(val, "isoformat"):
        return val.isoformat()
    return str(val)


def _alarm_to_dict(alarm: Alarm) -> dict:
    """Serialize an ``Alarm`` row into a JSON-friendly dict.

    Reads ``alarm.machine`` and ``alarm.equipment_part``; the callers in this
    module load both relationships with ``selectinload`` before calling this.
    """
    return {
        "id": str(alarm.id),
        "tenant_id": str(alarm.tenant_id),
        "equipment_part_id": (
            str(alarm.equipment_part_id) if alarm.equipment_part_id else None
        ),
        "machine_id": str(alarm.machine_id) if alarm.machine_id else None,
        "inspection_session_id": (
            str(alarm.inspection_session_id)
            if alarm.inspection_session_id
            else None
        ),
        "alarm_type": str(alarm.alarm_type),
        "severity": str(alarm.severity),
        "message": str(alarm.message),
        "lifecycle_pct_at_trigger": (
            float(alarm.lifecycle_pct_at_trigger)
            if alarm.lifecycle_pct_at_trigger is not None
            else None
        ),
        "is_acknowledged": bool(alarm.is_acknowledged),
        "acknowledged_by": (
            str(alarm.acknowledged_by) if alarm.acknowledged_by else None
        ),
        "acknowledged_at": _format_ts(alarm.acknowledged_at),
        "created_at": _format_ts(alarm.created_at),
        "machine_name": alarm.machine.name if alarm.machine else None,
        "part_name": alarm.equipment_part.name if alarm.equipment_part else None,
    }


@router.get("/api/{tenant_id}/alarms")
async def list_alarms(
    tenant_id: str = Path(...),
    is_acknowledged: Optional[bool] = Query(None),
    severity: Optional[str] = Query(None),
    current_user: TokenData = Depends(require_auth),
    db: AsyncSession = Depends(get_db),
):
    """List a tenant's alarms, newest first.

    Optional query filters: ``is_acknowledged`` (applied when not ``None``)
    and ``severity`` (applied when non-empty). Returns
    ``{"alarms": [...]}`` with each alarm serialized by ``_alarm_to_dict``.
    """
    verify_tenant_access(tenant_id, current_user)

    stmt = (
        select(Alarm)
        .options(
            selectinload(Alarm.equipment_part),
            selectinload(Alarm.machine),
        )
        .where(Alarm.tenant_id == tenant_id)
    )
    if is_acknowledged is not None:
        stmt = stmt.where(Alarm.is_acknowledged == is_acknowledged)
    if severity:
        stmt = stmt.where(Alarm.severity == severity)
    stmt = stmt.order_by(Alarm.created_at.desc())

    result = await db.execute(stmt)
    alarms = result.scalars().all()
    return {"alarms": [_alarm_to_dict(a) for a in alarms]}


@router.get("/api/{tenant_id}/alarms/summary")
async def alarm_summary(
    tenant_id: str = Path(...),
    current_user: TokenData = Depends(require_auth),
    db: AsyncSession = Depends(get_db),
):
    """Return counts of a tenant's unacknowledged alarms.

    The response carries ``active_count`` (all unacknowledged),
    ``critical_count`` and ``warning_count`` (unacknowledged with that
    severity), computed in a single aggregate query.
    """
    verify_tenant_access(tenant_id, current_user)

    # SQL expression, not a Python comparison, hence `== False`.
    unacknowledged = Alarm.is_acknowledged == False  # noqa: E712

    stmt = select(
        func.count().filter(unacknowledged).label("active_count"),
        func.count()
        .filter(Alarm.severity == "critical", unacknowledged)
        .label("critical_count"),
        func.count()
        .filter(Alarm.severity == "warning", unacknowledged)
        .label("warning_count"),
    ).where(Alarm.tenant_id == tenant_id)

    row = (await db.execute(stmt)).one()
    return {
        "active_count": row.active_count,
        "critical_count": row.critical_count,
        "warning_count": row.warning_count,
    }


@router.post("/api/{tenant_id}/alarms/{alarm_id}/acknowledge")
async def acknowledge_alarm(
    tenant_id: str = Path(...),
    alarm_id: UUID = Path(...),
    current_user: TokenData = Depends(require_auth),
    db: AsyncSession = Depends(get_db),
):
    """Mark one alarm as acknowledged by the current user.

    Raises:
        HTTPException: 404 when no alarm with ``alarm_id`` exists for the
            tenant; 400 when the alarm is already acknowledged.

    Returns the updated alarm serialized by ``_alarm_to_dict``.
    """
    verify_tenant_access(tenant_id, current_user)

    lookup = select(Alarm).where(
        Alarm.id == alarm_id, Alarm.tenant_id == tenant_id
    )
    alarm = (await db.execute(lookup)).scalar_one_or_none()
    if not alarm:
        raise HTTPException(status_code=404, detail="알람을 찾을 수 없습니다.")
    if alarm.is_acknowledged:
        raise HTTPException(status_code=400, detail="이미 확인된 알람입니다.")

    alarm.is_acknowledged = True
    alarm.acknowledged_by = UUID(current_user.user_id)
    alarm.acknowledged_at = datetime.now(timezone.utc)
    await db.commit()

    # Expire cached state and re-select with the relationships eagerly loaded
    # so serialization below does not need any further loading. A separate
    # refresh() beforehand would only add a round trip whose result is
    # discarded by expire_all().
    db.expire_all()
    reload_stmt = (
        select(Alarm)
        .options(selectinload(Alarm.equipment_part), selectinload(Alarm.machine))
        .where(Alarm.id == alarm_id, Alarm.tenant_id == tenant_id)
    )
    alarm = (await db.execute(reload_stmt)).scalar_one()
    return _alarm_to_dict(alarm)


async def generate_alarms_for_part(
    db: AsyncSession,
    tenant_id: str,
    part: EquipmentPart,
    counter: PartCounter,
) -> None:
    """Queue a lifecycle alarm for ``part`` if its usage crossed the threshold.

    Nothing is added when the lifecycle percentage is below the part's
    threshold, or when an unacknowledged alarm of the same type already
    exists for the part. At 100% or more the alarm is "critical"; otherwise
    it is a "threshold" alarm with "warning" severity.

    The alarm is only added to the session; this function does not commit.
    """
    pct = float(counter.lifecycle_pct or 0)
    # NOTE(review): `or 90` also replaces an explicit threshold of 0 with 90 —
    # confirm that 0 is meant to be treated as "unset".
    threshold = float(part.alarm_threshold or 90)
    if pct < threshold:
        return

    if pct >= 100:
        alarm_type = "critical"
        severity = "critical"
        message = f"{part.name} 수명 {pct:.1f}% — 즉시 교체 필요"
    else:
        alarm_type = "threshold"
        severity = "warning"
        message = f"{part.name} 수명 {pct:.1f}% — 교체 예정 (기준 {threshold:.0f}%)"

    # Existence probe: limit(1) + first() tolerates several matching rows,
    # whereas scalar_one_or_none() would raise MultipleResultsFound.
    duplicate = await db.execute(
        select(Alarm.id)
        .where(
            Alarm.tenant_id == tenant_id,
            Alarm.equipment_part_id == part.id,
            Alarm.alarm_type == alarm_type,
            Alarm.is_acknowledged == False,  # noqa: E712
        )
        .limit(1)
    )
    if duplicate.scalars().first() is not None:
        return

    db.add(
        Alarm(
            tenant_id=tenant_id,
            equipment_part_id=part.id,
            machine_id=part.machine_id,
            alarm_type=alarm_type,
            severity=severity,
            message=message,
            lifecycle_pct_at_trigger=pct,
        )
    )


async def generate_alarms_for_inspection(
    db: AsyncSession,
    tenant_id: str,
    session: InspectionSession,
    fail_records: list,
    template_name: str,
) -> None:
    """Queue one "inspection_fail" alarm per failed inspection record.

    ``fail_records`` is iterated as ``(record, item_name, spec_info)``
    triples. Every alarm gets "critical" severity when there are three or
    more failures, otherwise "warning". Messages longer than 500 characters
    are truncated to 497 characters plus "...".

    Alarms are only added to the session; this function does not commit.
    """
    if not fail_records:
        return

    severity = "critical" if len(fail_records) >= 3 else "warning"
    keep = _MAX_ALARM_MESSAGE_LEN - len(_ELLIPSIS)

    for _record, item_name, spec_info in fail_records:
        message = (
            f"검사 '{template_name}'의 '{item_name}' 항목이 불합격입니다 ({spec_info})"
        )
        if len(message) > _MAX_ALARM_MESSAGE_LEN:
            message = message[:keep] + _ELLIPSIS

        db.add(
            Alarm(
                tenant_id=tenant_id,
                inspection_session_id=session.id,
                alarm_type="inspection_fail",
                severity=severity,
                message=message,
            )
        )