Files
factoryOps-v2/src/auth/dependencies.py
Johngreen ab2a3e35b2 feat: Phase 0-2 complete — auth, machines, equipment parts with full CRUD
Multi-tenant factory inspection system (SpiFox, Enkid, Alpet):
- FastAPI backend with JWT auth, PostgreSQL (asyncpg)
- Next.js 16 frontend with App Router, SWR data fetching
- Machines CRUD with equipment parts management
- Part lifecycle tracking (hours/count/date) with counters
- Partial unique index for soft-delete support
- 24 pytest tests passing, E2E verified

Co-Authored-By: Claude Opus 4 <noreply@anthropic.com>
2026-02-10 12:05:22 +09:00

70 lines
2.0 KiB
Python

from typing import Optional
from fastapi import Depends, HTTPException, Request
from sqlalchemy.ext.asyncio import AsyncSession
from src.database.config import get_db
from src.auth.jwt_handler import decode_access_token
from src.auth.models import TokenData
from src.auth import service as auth_service
async def get_current_user(
    request: Request,
    db: AsyncSession = Depends(get_db),
) -> Optional[TokenData]:
    """Resolve the caller's identity from a Bearer token, if one is usable.

    This is the non-raising authentication dependency: any problem with the
    credential yields ``None`` rather than an HTTP error, so the caller
    decides whether anonymous access is acceptable.

    Args:
        request: Incoming request; only its ``Authorization`` header is read.
        db: Async database session used to load the user record.

    Returns:
        A ``TokenData`` built from the stored user record (not from the token
        claims) when the header carries a Bearer token whose decoded payload
        has a ``user_id`` that maps to an active user; otherwise ``None``.
    """
    auth_header = request.headers.get("Authorization")
    if not auth_header:
        return None

    # HTTP authentication schemes are case-insensitive, so "bearer <token>"
    # is accepted alongside "Bearer <token>".
    scheme, _, token = auth_header.partition(" ")
    token = token.strip()
    if scheme.lower() != "bearer" or not token:
        return None

    # A falsy payload is treated as "token could not be used".
    payload = decode_access_token(token)
    if not payload:
        return None

    # A token without a user_id claim cannot identify anyone; stop here
    # instead of querying the database with an empty identifier.
    user_id = payload.get("user_id")
    if not user_id:
        return None

    user = await auth_service.get_user_by_id(db, user_id)
    if not user or not bool(user.is_active):
        return None

    return TokenData(
        user_id=str(user.id),
        email=str(user.email),
        role=str(user.role),
        # Users without a tenant keep None rather than the string "None".
        tenant_id=str(user.tenant_id) if user.tenant_id is not None else None,
    )
async def require_auth(
    current_user: Optional[TokenData] = Depends(get_current_user),
) -> TokenData:
    """Require an authenticated caller.

    Args:
        current_user: Result of ``get_current_user``; falsy when no usable
            credential was presented.

    Returns:
        The authenticated caller's ``TokenData``, unchanged.

    Raises:
        HTTPException: 401 when ``current_user`` is falsy.
    """
    if not current_user:
        raise HTTPException(
            status_code=401,
            detail="인증이 필요합니다.",
            # A 401 response must carry a WWW-Authenticate challenge
            # (RFC 9110 section 15.5.2) naming the expected scheme.
            headers={"WWW-Authenticate": "Bearer"},
        )
    return current_user
async def require_superadmin(
    current_user: TokenData = Depends(require_auth),
) -> TokenData:
    """Allow only callers whose role is ``"superadmin"``.

    Args:
        current_user: Authenticated caller, as produced by ``require_auth``.

    Returns:
        The same ``TokenData`` when the caller's role is ``"superadmin"``.

    Raises:
        HTTPException: 403 for any other role.
    """
    if current_user.role == "superadmin":
        return current_user
    raise HTTPException(status_code=403, detail="관리자 권한이 필요합니다.")
def verify_tenant_access(tenant_id: str, current_user: TokenData) -> None:
    """Check that ``current_user`` may act on the tenant ``tenant_id``.

    A caller with the ``"superadmin"`` role passes for every tenant. Any
    other caller passes only when their own ``tenant_id`` equals the
    requested one.

    Args:
        tenant_id: Identifier of the tenant being accessed.
        current_user: The caller whose role and tenant are checked.

    Raises:
        HTTPException: 403 when the caller is not a superadmin and belongs
            to a different tenant (or to none).
    """
    # Short-circuit keeps the tenant comparison from running for superadmins.
    if current_user.role != "superadmin" and current_user.tenant_id != tenant_id:
        raise HTTPException(
            status_code=403, detail="해당 테넌트에 대한 접근 권한이 없습니다."
        )
class TenantAccessChecker:
    """Callable wrapper that applies ``verify_tenant_access`` to a request.

    Instances are awaitable callables; presumably they are meant to be used
    as a FastAPI dependency on tenant-scoped routes (verify against callers).
    """

    async def __call__(
        self,
        tenant_id: str,
        current_user: TokenData = Depends(require_auth),
    ) -> TokenData:
        """Return the caller once tenant access has been verified.

        Args:
            tenant_id: Identifier of the tenant being accessed; presumably
                filled in by FastAPI from the route's parameters.
            current_user: Authenticated caller, as produced by
                ``require_auth``.

        Returns:
            ``current_user`` unchanged when the access check passes.

        Raises:
            HTTPException: Whatever ``verify_tenant_access`` raises when the
                check fails.
        """
        verify_tenant_access(tenant_id=tenant_id, current_user=current_user)
        return current_user