Files
factoryOps-v2/src/api/equipment_parts.py
Johngreen 00a17c0b86
All checks were successful
Deploy to Production / deploy (push) Successful in 1m8s
feat: complete 5 GAP requirements — auto counters, inspection alarms, machine specs, responsive CSS
- GAP 2+4: auto_time/date counter auto-computation with manual update blocking
- GAP 3: auto-generate alarms on inspection completion for failed items
- GAP 1: machine spec fields (manufacturer, location, capacity, power, description)
- GAP 5: enhanced mobile responsive CSS (768px/480px breakpoints)
- Alembic migration for new columns, seed data enriched with specs and date-type parts
2026-02-10 15:45:51 +09:00

505 lines
16 KiB
Python

import math
from datetime import datetime, timezone
from typing import Optional
from uuid import UUID

from fastapi import APIRouter, HTTPException, Depends, Path
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from src.database.config import get_db
from src.database.models import Machine, EquipmentPart, PartCounter, PartReplacementLog
from src.auth.models import TokenData
from src.auth.dependencies import require_auth, verify_tenant_access
from src.api.alarms import generate_alarms_for_part
# Every endpoint in this module is registered on this router.
router = APIRouter(tags=["equipment_parts"])
class PartCreate(BaseModel):
    """Request body for registering a new part on a machine.

    ``lifecycle_type`` and ``counter_source`` are declared as plain strings;
    the create endpoint checks them against the allowed values noted below.
    """

    name: str
    part_number: Optional[str] = None
    category: Optional[str] = None
    # Allowed values: hours | count | date
    lifecycle_type: str
    lifecycle_limit: float
    alarm_threshold: float = 90.0
    # Allowed values: auto_plc | auto_time | manual
    counter_source: str = "manual"
    # ISO 8601 timestamp string; parsed by the create endpoint.
    installed_at: Optional[str] = None
class PartUpdate(BaseModel):
    """Request body for a partial part update.

    Every field is optional. The update endpoint only applies fields that are
    not ``None``, so a field cannot be cleared by sending ``null``.
    """

    name: Optional[str] = None
    part_number: Optional[str] = None
    category: Optional[str] = None
    # Allowed values when given: hours | count | date
    lifecycle_type: Optional[str] = None
    lifecycle_limit: Optional[float] = None
    alarm_threshold: Optional[float] = None
    # Allowed values when given: auto_plc | auto_time | manual
    counter_source: Optional[str] = None
class CounterUpdate(BaseModel):
    """Request body carrying the new value for a part counter."""

    # New counter reading; the counter endpoint rejects negative values.
    value: float
class ReplaceRequest(BaseModel):
    """Optional free-text details recorded in the log when a part is replaced."""

    reason: Optional[str] = None
    notes: Optional[str] = None
def _format_ts(val) -> Optional[str]:
    """Render a timestamp-like value as a string for API responses.

    ``None`` is passed through unchanged. Objects exposing ``isoformat`` are
    rendered with it; anything else is converted with ``str``.
    """
    if val is None:
        return None
    if not hasattr(val, "isoformat"):
        return str(val)
    return val.isoformat()
def _compute_auto_counter(part: EquipmentPart, counter: PartCounter) -> tuple:
    """Return ``(current_value, lifecycle_pct)`` for a part's counter.

    Time-driven counters are derived from the wall clock on every call rather
    than read from the stored row:

    * ``counter_source == "auto_time"`` with ``lifecycle_type == "hours"``
      yields the hours elapsed since the reference time.
    * ``lifecycle_type == "date"`` (any counter source) yields the days
      elapsed since the reference time.

    The reference time is ``counter.last_reset_at``, falling back to
    ``part.installed_at``. A naive reference is interpreted as UTC. A
    reference in the future is clamped to zero elapsed time so the counter
    never goes negative.

    In every other case (other type/source combinations, a non-positive
    ``lifecycle_limit``, or no reference time) the stored
    ``counter.current_value`` / ``counter.lifecycle_pct`` are returned, with
    missing values reported as ``0.0``.

    Both computed numbers are rounded to one decimal place.
    """
    source = str(part.counter_source or "manual")
    lifecycle_type = str(part.lifecycle_type)
    limit = float(part.lifecycle_limit) if part.lifecycle_limit else 0.0

    # Seconds per counter unit for the time-driven modes; None means the
    # stored counter columns are authoritative.
    if source == "auto_time" and lifecycle_type == "hours":
        seconds_per_unit = 3600
    elif lifecycle_type == "date":
        seconds_per_unit = 86400
    else:
        seconds_per_unit = None

    if seconds_per_unit is not None and limit > 0:
        ref = counter.last_reset_at or part.installed_at
        if ref:
            if ref.tzinfo is None:
                ref = ref.replace(tzinfo=timezone.utc)
            elapsed_seconds = (datetime.now(timezone.utc) - ref).total_seconds()
            # A reference time in the future must not produce a negative
            # counter or percentage.
            elapsed = max(0.0, elapsed_seconds) / seconds_per_unit
            return (round(elapsed, 1), round((elapsed / limit) * 100, 1))

    return (float(counter.current_value or 0), float(counter.lifecycle_pct or 0))
def _part_to_dict(p: EquipmentPart, counter: Optional[PartCounter] = None) -> dict:
    """Serialize a part, and optionally its counter, into a plain dict.

    The ``"counter"`` key is only present when a truthy ``counter`` is given.
    Its ``current_value`` and ``lifecycle_pct`` come from
    ``_compute_auto_counter``, so they may be derived values rather than the
    stored columns.
    """

    def _text_or_none(value):
        # Falsy values (None, empty string) are reported as None.
        return str(value) if value else None

    payload = {
        "id": str(p.id),
        "tenant_id": str(p.tenant_id),
        "machine_id": str(p.machine_id),
        "name": str(p.name),
        "part_number": _text_or_none(p.part_number),
        "category": _text_or_none(p.category),
        "lifecycle_type": str(p.lifecycle_type),
        "lifecycle_limit": float(p.lifecycle_limit),
        "alarm_threshold": float(p.alarm_threshold or 90.0),
        "counter_source": str(p.counter_source or "manual"),
        "installed_at": _format_ts(p.installed_at),
        "is_active": bool(p.is_active),
        "created_at": _format_ts(p.created_at),
        "updated_at": _format_ts(p.updated_at),
    }
    if not counter:
        return payload

    current_value, lifecycle_pct = _compute_auto_counter(p, counter)
    payload["counter"] = {
        "current_value": current_value,
        "lifecycle_pct": lifecycle_pct,
        "last_reset_at": _format_ts(counter.last_reset_at),
        "last_updated_at": _format_ts(counter.last_updated_at),
        "version": int(counter.version or 0),
    }
    return payload
async def _verify_machine(
    db: AsyncSession, tenant_id: str, machine_id: UUID
) -> Machine:
    """Fetch the machine with this id for this tenant, or raise HTTP 404.

    Raises:
        HTTPException: 404 when no machine matches both ids.
    """
    query = select(Machine).where(
        Machine.id == machine_id, Machine.tenant_id == tenant_id
    )
    machine = (await db.execute(query)).scalar_one_or_none()
    if machine:
        return machine
    raise HTTPException(status_code=404, detail="설비를 찾을 수 없습니다.")
@router.get("/api/{tenant_id}/machines/{machine_id}/parts")
async def list_parts(
    tenant_id: str = Path(...),
    machine_id: UUID = Path(...),
    current_user: TokenData = Depends(require_auth),
    db: AsyncSession = Depends(get_db),
):
    """List the active parts of a machine, ordered by name, with counters.

    Responds 404 (via ``_verify_machine``) when the machine does not exist
    for this tenant.
    """
    verify_tenant_access(tenant_id, current_user)
    await _verify_machine(db, tenant_id, machine_id)

    active_parts_query = (
        select(EquipmentPart)
        .options(selectinload(EquipmentPart.counter))
        .where(EquipmentPart.machine_id == machine_id)
        .where(EquipmentPart.tenant_id == tenant_id)
        .where(EquipmentPart.is_active == True)
        .order_by(EquipmentPart.name)
    )
    rows = await db.execute(active_parts_query)

    serialized = []
    for part in rows.scalars().all():
        serialized.append(_part_to_dict(part, part.counter))
    return {"parts": serialized}
@router.post("/api/{tenant_id}/machines/{machine_id}/parts")
async def create_part(
    body: PartCreate,
    tenant_id: str = Path(...),
    machine_id: UUID = Path(...),
    current_user: TokenData = Depends(require_auth),
    db: AsyncSession = Depends(get_db),
):
    """Create a part on a machine together with its zeroed counter.

    The counter's ``last_reset_at`` starts at the supplied ``installed_at``,
    or at the current UTC time when none is given.

    Raises:
        HTTPException: 404 when the machine does not exist for this tenant
            (via ``_verify_machine``); 400 for an unknown ``lifecycle_type``
            or ``counter_source`` or an unparseable ``installed_at``; 409
            when an active part with the same name already exists on the
            machine.
    """
    verify_tenant_access(tenant_id, current_user)
    await _verify_machine(db, tenant_id, machine_id)

    if body.lifecycle_type not in ("hours", "count", "date"):
        raise HTTPException(
            status_code=400,
            detail="lifecycle_type은 hours, count, date 중 하나여야 합니다.",
        )
    if body.counter_source not in ("auto_plc", "auto_time", "manual"):
        raise HTTPException(
            status_code=400,
            detail="counter_source는 auto_plc, auto_time, manual 중 하나여야 합니다.",
        )

    # Names only need to be unique among the machine's active parts.
    dupe_stmt = select(EquipmentPart).where(
        EquipmentPart.tenant_id == tenant_id,
        EquipmentPart.machine_id == machine_id,
        EquipmentPart.name == body.name,
        EquipmentPart.is_active == True,
    )
    if (await db.execute(dupe_stmt)).scalar_one_or_none():
        raise HTTPException(
            status_code=409, detail=f"같은 이름의 부품 '{body.name}'이 이미 존재합니다."
        )

    installed_dt = None
    if body.installed_at:
        try:
            # fromisoformat on older Pythons rejects a trailing "Z".
            installed_dt = datetime.fromisoformat(
                body.installed_at.replace("Z", "+00:00")
            )
        except ValueError:
            raise HTTPException(
                status_code=400,
                detail="installed_at 형식이 올바르지 않습니다. ISO 8601 형식을 사용하세요.",
            ) from None
        if installed_dt.tzinfo is None:
            # Every other timestamp written by this module is timezone-aware,
            # and _compute_auto_counter already reads naive values as UTC, so
            # make that interpretation explicit before storing.
            installed_dt = installed_dt.replace(tzinfo=timezone.utc)

    part = EquipmentPart(
        tenant_id=tenant_id,
        machine_id=machine_id,
        name=body.name,
        part_number=body.part_number,
        category=body.category,
        lifecycle_type=body.lifecycle_type,
        lifecycle_limit=body.lifecycle_limit,
        alarm_threshold=body.alarm_threshold,
        counter_source=body.counter_source,
        installed_at=installed_dt,
    )
    db.add(part)
    # Flush so part.id is available for the counter row below.
    await db.flush()

    now = datetime.now(timezone.utc)
    counter = PartCounter(
        tenant_id=tenant_id,
        equipment_part_id=part.id,
        current_value=0,
        lifecycle_pct=0,
        last_reset_at=installed_dt or now,
        last_updated_at=now,
    )
    db.add(counter)

    await db.commit()
    await db.refresh(part)
    await db.refresh(counter)
    return _part_to_dict(part, counter)
@router.get("/api/{tenant_id}/parts/{part_id}")
async def get_part(
    tenant_id: str = Path(...),
    part_id: UUID = Path(...),
    current_user: TokenData = Depends(require_auth),
    db: AsyncSession = Depends(get_db),
):
    """Return a single part of the tenant together with its counter.

    The lookup does not filter on ``is_active``.

    Raises:
        HTTPException: 404 when the part does not exist for this tenant.
    """
    verify_tenant_access(tenant_id, current_user)
    query = (
        select(EquipmentPart)
        .options(selectinload(EquipmentPart.counter))
        .where(EquipmentPart.id == part_id)
        .where(EquipmentPart.tenant_id == tenant_id)
    )
    part = (await db.execute(query)).scalar_one_or_none()
    if part:
        return _part_to_dict(part, part.counter)
    raise HTTPException(status_code=404, detail="부품을 찾을 수 없습니다.")
@router.put("/api/{tenant_id}/parts/{part_id}")
async def update_part(
    body: PartUpdate,
    tenant_id: str = Path(...),
    part_id: UUID = Path(...),
    current_user: TokenData = Depends(require_auth),
    db: AsyncSession = Depends(get_db),
):
    """Apply a partial update to a part and return it with its counter.

    Only fields that are not ``None`` in the body are applied. When
    ``lifecycle_limit`` changes, the counter's stored ``lifecycle_pct`` is
    recomputed against the new limit so it does not stay stale until the next
    counter update. The response includes the ``counter`` object when the
    part has one, matching the other part endpoints.

    Raises:
        HTTPException: 404 when the part does not exist for this tenant; 409
            when the new name collides with another active part on the same
            machine; 400 for an unknown ``lifecycle_type`` or
            ``counter_source``.
    """
    verify_tenant_access(tenant_id, current_user)
    stmt = (
        select(EquipmentPart)
        .options(selectinload(EquipmentPart.counter))
        .where(EquipmentPart.id == part_id, EquipmentPart.tenant_id == tenant_id)
    )
    result = await db.execute(stmt)
    part = result.scalar_one_or_none()
    if not part:
        raise HTTPException(status_code=404, detail="부품을 찾을 수 없습니다.")
    # Keep a direct reference: the relationship is read here, while it is
    # known to be loaded, instead of after the commit.
    counter = part.counter

    if body.name is not None:
        dupe_stmt = select(EquipmentPart).where(
            EquipmentPart.tenant_id == tenant_id,
            EquipmentPart.machine_id == part.machine_id,
            EquipmentPart.name == body.name,
            EquipmentPart.is_active == True,
            EquipmentPart.id != part_id,
        )
        if (await db.execute(dupe_stmt)).scalar_one_or_none():
            raise HTTPException(
                status_code=409,
                detail=f"같은 이름의 부품 '{body.name}'이 이미 존재합니다.",
            )
        part.name = body.name
    if body.part_number is not None:
        part.part_number = body.part_number
    if body.category is not None:
        part.category = body.category
    if body.lifecycle_type is not None:
        if body.lifecycle_type not in ("hours", "count", "date"):
            raise HTTPException(
                status_code=400,
                detail="lifecycle_type은 hours, count, date 중 하나여야 합니다.",
            )
        part.lifecycle_type = body.lifecycle_type
    if body.lifecycle_limit is not None:
        part.lifecycle_limit = body.lifecycle_limit
        if counter is not None:
            # Same formula the counter endpoint uses, applied to the new limit.
            # NOTE(review): alarms are not re-evaluated here; confirm whether
            # lowering the limit should also call generate_alarms_for_part.
            new_limit = float(body.lifecycle_limit)
            counter.lifecycle_pct = (
                (float(counter.current_value or 0) / new_limit) * 100
                if new_limit > 0
                else 0
            )
    if body.alarm_threshold is not None:
        part.alarm_threshold = body.alarm_threshold
    if body.counter_source is not None:
        if body.counter_source not in ("auto_plc", "auto_time", "manual"):
            raise HTTPException(
                status_code=400,
                detail="counter_source는 auto_plc, auto_time, manual 중 하나여야 합니다.",
            )
        part.counter_source = body.counter_source

    await db.commit()
    await db.refresh(part)
    if counter is not None:
        await db.refresh(counter)
    return _part_to_dict(part, counter)
@router.delete("/api/{tenant_id}/parts/{part_id}")
async def delete_part(
    tenant_id: str = Path(...),
    part_id: UUID = Path(...),
    current_user: TokenData = Depends(require_auth),
    db: AsyncSession = Depends(get_db),
):
    """Soft-delete a part by clearing its ``is_active`` flag.

    The row itself is kept. The lookup does not filter on ``is_active``, so
    repeating the call on an already inactive part also succeeds.

    Raises:
        HTTPException: 404 when the part does not exist for this tenant.
    """
    verify_tenant_access(tenant_id, current_user)
    query = select(EquipmentPart).where(
        EquipmentPart.id == part_id, EquipmentPart.tenant_id == tenant_id
    )
    part = (await db.execute(query)).scalar_one_or_none()
    if not part:
        raise HTTPException(status_code=404, detail="부품을 찾을 수 없습니다.")

    part.is_active = False
    await db.commit()
    return {"status": "success", "message": "부품이 비활성화되었습니다."}
@router.put("/api/{tenant_id}/parts/{part_id}/counter")
async def update_counter(
    body: CounterUpdate,
    tenant_id: str = Path(...),
    part_id: UUID = Path(...),
    current_user: TokenData = Depends(require_auth),
    db: AsyncSession = Depends(get_db),
):
    """Set the counter of an active part and re-evaluate its alarms.

    Stores the new value, recomputes ``lifecycle_pct`` against the part's
    ``lifecycle_limit`` (0 when the limit is not positive), bumps the counter
    ``version`` and calls ``generate_alarms_for_part`` before committing.

    Raises:
        HTTPException: 404 when the active part or its counter is missing;
            400 when the part is auto-computed (``auto_time`` source or
            ``date`` lifecycle), or when the value is NaN, infinite or
            negative.
    """
    verify_tenant_access(tenant_id, current_user)
    stmt = (
        select(EquipmentPart)
        .options(selectinload(EquipmentPart.counter))
        .where(
            EquipmentPart.id == part_id,
            EquipmentPart.tenant_id == tenant_id,
            EquipmentPart.is_active == True,
        )
    )
    result = await db.execute(stmt)
    part = result.scalar_one_or_none()
    if not part:
        raise HTTPException(status_code=404, detail="부품을 찾을 수 없습니다.")
    counter = part.counter
    if not counter:
        raise HTTPException(status_code=404, detail="카운터를 찾을 수 없습니다.")

    source = str(part.counter_source or "manual")
    lt = str(part.lifecycle_type)
    if source == "auto_time" or lt == "date":
        raise HTTPException(
            status_code=400,
            detail="자동 계산 부품(auto_time/date)은 수동으로 카운터를 업데이트할 수 없습니다.",
        )
    # NaN and +/-Infinity pass a plain "< 0" comparison. Once stored they
    # cannot be rendered as JSON, so every later response that includes this
    # counter would fail; reject them up front.
    if not math.isfinite(body.value):
        raise HTTPException(status_code=400, detail="카운터 값은 유한한 숫자여야 합니다.")
    if body.value < 0:
        raise HTTPException(status_code=400, detail="카운터 값은 0 이상이어야 합니다.")

    now = datetime.now(timezone.utc)
    limit = float(part.lifecycle_limit or 0)
    counter.current_value = body.value
    counter.lifecycle_pct = (body.value / limit) * 100 if limit > 0 else 0
    counter.last_updated_at = now
    counter.version = (counter.version or 0) + 1

    # Flush so the alarm generator sees the new counter state in this
    # transaction.
    await db.flush()
    await generate_alarms_for_part(db, tenant_id, part, counter)
    await db.commit()
    # Reload both objects after the commit, as the create and replace
    # endpoints do, before serializing them.
    await db.refresh(part)
    await db.refresh(counter)
    return _part_to_dict(part, counter)
@router.post("/api/{tenant_id}/parts/{part_id}/replace")
async def replace_part(
    tenant_id: str = Path(...),
    part_id: UUID = Path(...),
    body: Optional[ReplaceRequest] = None,
    current_user: TokenData = Depends(require_auth),
    db: AsyncSession = Depends(get_db),
):
    """Record a replacement of an active part and reset its counter.

    Writes a ``PartReplacementLog`` row with the counter value and lifecycle
    percentage at the moment of replacement, zeroes the counter, moves
    ``last_reset_at`` and the part's ``installed_at`` to now, and bumps the
    counter ``version``.

    Raises:
        HTTPException: 404 when the active part or its counter is missing.
    """
    verify_tenant_access(tenant_id, current_user)
    stmt = (
        select(EquipmentPart)
        .options(selectinload(EquipmentPart.counter))
        .where(
            EquipmentPart.id == part_id,
            EquipmentPart.tenant_id == tenant_id,
            EquipmentPart.is_active == True,
        )
    )
    result = await db.execute(stmt)
    part = result.scalar_one_or_none()
    if not part:
        raise HTTPException(status_code=404, detail="부품을 찾을 수 없습니다.")
    counter = part.counter
    if not counter:
        raise HTTPException(status_code=404, detail="카운터를 찾을 수 없습니다.")

    # Snapshot the effective counter BEFORE resetting it. For auto-computed
    # parts the stored columns are not maintained by the counter endpoint
    # (manual updates are rejected), so the values shown to clients come from
    # _compute_auto_counter; the log must record those same values. For other
    # parts the helper returns the stored columns.
    value_at_replacement, pct_at_replacement = _compute_auto_counter(part, counter)

    now = datetime.now(timezone.utc)
    log = PartReplacementLog(
        tenant_id=tenant_id,
        equipment_part_id=part.id,
        replaced_by=UUID(current_user.user_id),
        replaced_at=now,
        counter_at_replacement=value_at_replacement,
        lifecycle_pct_at_replacement=pct_at_replacement,
        reason=body.reason if body else None,
        notes=body.notes if body else None,
    )
    db.add(log)

    counter.current_value = 0
    counter.lifecycle_pct = 0
    counter.last_reset_at = now
    counter.last_updated_at = now
    counter.version = (counter.version or 0) + 1
    part.installed_at = now

    await db.commit()
    await db.refresh(part)
    await db.refresh(counter)
    return {
        "status": "success",
        "message": "부품이 교체되었습니다.",
        "part": _part_to_dict(part, counter),
    }
def _replacement_to_dict(log: PartReplacementLog) -> dict:
    """Serialize a replacement log row into a plain dict.

    Falsy ``replaced_by``, ``reason`` and ``notes`` values are reported as
    ``None``.
    """

    def _text_or_none(value):
        return str(value) if value else None

    return {
        "id": str(log.id),
        "equipment_part_id": str(log.equipment_part_id),
        "replaced_by": _text_or_none(log.replaced_by),
        "replaced_at": _format_ts(log.replaced_at),
        "counter_at_replacement": float(log.counter_at_replacement),
        "lifecycle_pct_at_replacement": float(log.lifecycle_pct_at_replacement),
        "reason": _text_or_none(log.reason),
        "notes": _text_or_none(log.notes),
    }
@router.get("/api/{tenant_id}/parts/{part_id}/replacements")
async def list_replacements(
    tenant_id: str = Path(...),
    part_id: UUID = Path(...),
    current_user: TokenData = Depends(require_auth),
    db: AsyncSession = Depends(get_db),
):
    """Return the replacement history of a part, newest first.

    The part lookup does not filter on ``is_active``.

    Raises:
        HTTPException: 404 when the part does not exist for this tenant.
    """
    verify_tenant_access(tenant_id, current_user)

    part_query = select(EquipmentPart).where(
        EquipmentPart.id == part_id, EquipmentPart.tenant_id == tenant_id
    )
    if not (await db.execute(part_query)).scalar_one_or_none():
        raise HTTPException(status_code=404, detail="부품을 찾을 수 없습니다.")

    history_query = (
        select(PartReplacementLog)
        .where(PartReplacementLog.equipment_part_id == part_id)
        .where(PartReplacementLog.tenant_id == tenant_id)
        .order_by(PartReplacementLog.replaced_at.desc())
    )
    rows = await db.execute(history_query)

    history = []
    for entry in rows.scalars().all():
        history.append(_replacement_to_dict(entry))
    return {"replacements": history}