feat: Phase 0-2 complete — auth, machines, equipment parts with full CRUD

Multi-tenant factory inspection system (SpiFox, Enkid, Alpet):
- FastAPI backend with JWT auth, PostgreSQL (asyncpg)
- Next.js 16 frontend with App Router, SWR data fetching
- Machines CRUD with equipment parts management
- Part lifecycle tracking (hours/count/date) with counters
- Partial unique index for soft-delete support
- 24 pytest tests passing, E2E verified

Co-Authored-By: Claude Opus 4 <noreply@anthropic.com>
This commit is contained in:
Johngreen
2026-02-10 12:05:22 +09:00
commit ab2a3e35b2
75 changed files with 13327 additions and 0 deletions

16
src/auth/__init__.py Normal file
View File

@@ -0,0 +1,16 @@
from src.auth.jwt_handler import create_access_token, decode_access_token
from src.auth.password import hash_password, verify_password
from src.auth.dependencies import get_current_user, require_auth, require_superadmin
from src.auth.router import router as auth_router, admin_router as auth_admin_router
__all__ = [
"create_access_token",
"decode_access_token",
"hash_password",
"verify_password",
"get_current_user",
"require_auth",
"require_superadmin",
"auth_router",
"auth_admin_router",
]

69
src/auth/dependencies.py Normal file
View File

@@ -0,0 +1,69 @@
from typing import Optional
from fastapi import Depends, HTTPException, Request
from sqlalchemy.ext.asyncio import AsyncSession
from src.database.config import get_db
from src.auth.jwt_handler import decode_access_token
from src.auth.models import TokenData
from src.auth import service as auth_service
async def get_current_user(
request: Request,
db: AsyncSession = Depends(get_db),
) -> Optional[TokenData]:
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
return None
token = auth_header.split(" ", 1)[1]
payload = decode_access_token(token)
if not payload:
return None
user = await auth_service.get_user_by_id(db, payload.get("user_id", ""))
if not user or not bool(user.is_active):
return None
return TokenData(
user_id=str(user.id),
email=str(user.email),
role=str(user.role),
tenant_id=str(user.tenant_id) if user.tenant_id is not None else None,
)
async def require_auth(
current_user: Optional[TokenData] = Depends(get_current_user),
) -> TokenData:
if not current_user:
raise HTTPException(status_code=401, detail="인증이 필요합니다.")
return current_user
async def require_superadmin(
current_user: TokenData = Depends(require_auth),
) -> TokenData:
if current_user.role != "superadmin":
raise HTTPException(status_code=403, detail="관리자 권한이 필요합니다.")
return current_user
def verify_tenant_access(tenant_id: str, current_user: TokenData) -> None:
if current_user.role == "superadmin":
return
if current_user.tenant_id != tenant_id:
raise HTTPException(
status_code=403, detail="해당 테넌트에 대한 접근 권한이 없습니다."
)
class TenantAccessChecker:
async def __call__(
self,
tenant_id: str,
current_user: TokenData = Depends(require_auth),
) -> TokenData:
verify_tenant_access(tenant_id, current_user)
return current_user

23
src/auth/jwt_handler.py Normal file
View File

@@ -0,0 +1,23 @@
import os
from datetime import datetime, timedelta, timezone
from typing import Optional
import jwt
SECRET_KEY = os.getenv("JWT_SECRET_KEY", "your-super-secret-key")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_HOURS = 24
def create_access_token(data: dict) -> str:
to_encode = data.copy()
expire = datetime.now(timezone.utc) + timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def decode_access_token(token: str) -> Optional[dict]:
try:
return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
except (jwt.ExpiredSignatureError, jwt.InvalidTokenError):
return None

46
src/auth/models.py Normal file
View File

@@ -0,0 +1,46 @@
from typing import Optional
from pydantic import BaseModel
class UserLogin(BaseModel):
email: str
password: str
class UserCreate(BaseModel):
email: str
password: str
name: str
role: str = "user"
tenant_id: Optional[str] = None
class UserUpdate(BaseModel):
name: Optional[str] = None
role: Optional[str] = None
is_active: Optional[bool] = None
class UserResponse(BaseModel):
id: str
email: str
name: str
role: str
tenant_id: Optional[str] = None
is_active: bool
model_config = {"from_attributes": True}
class Token(BaseModel):
access_token: str
token_type: str = "bearer"
user: UserResponse
class TokenData(BaseModel):
user_id: str
email: str
role: str
tenant_id: Optional[str] = None

9
src/auth/password.py Normal file
View File

@@ -0,0 +1,9 @@
import bcrypt
def hash_password(password: str) -> str:
return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
def verify_password(password: str, hashed: str) -> bool:
return bcrypt.checkpw(password.encode("utf-8"), hashed.encode("utf-8"))

102
src/auth/router.py Normal file
View File

@@ -0,0 +1,102 @@
from typing import List
from fastapi import APIRouter, HTTPException, Response, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from src.database.config import get_db
from src.auth.models import UserLogin, UserCreate, UserResponse, Token, TokenData
from src.auth import service as auth_service
from src.auth.dependencies import require_auth, require_superadmin
router = APIRouter(prefix="/api/auth", tags=["auth"])
admin_router = APIRouter(prefix="/api/admin", tags=["admin"])
@router.post("/login", response_model=Token)
async def login(
credentials: UserLogin,
response: Response,
db: AsyncSession = Depends(get_db),
):
"""로그인 - 이메일/비밀번호로 JWT 토큰 발급"""
result = await auth_service.login(db, credentials.email, credentials.password)
if not result:
raise HTTPException(
status_code=401, detail="이메일 또는 비밀번호가 올바르지 않습니다."
)
response.set_cookie(
key="access_token",
value=result.access_token,
httponly=True,
max_age=86400,
samesite="lax",
)
return result
@router.post("/logout")
async def logout(response: Response):
"""로그아웃"""
response.delete_cookie(key="access_token")
return {"status": "success", "message": "로그아웃되었습니다."}
@router.get("/me", response_model=UserResponse)
async def get_current_user_info(
current_user: TokenData = Depends(require_auth),
db: AsyncSession = Depends(get_db),
):
"""현재 로그인한 사용자 정보"""
user = await auth_service.get_user_by_id(db, current_user.user_id)
if not user:
raise HTTPException(status_code=404, detail="사용자를 찾을 수 없습니다.")
return UserResponse(
id=str(user.id),
email=str(user.email),
name=str(user.name),
role=str(user.role),
tenant_id=str(user.tenant_id) if user.tenant_id is not None else None,
is_active=bool(user.is_active),
)
@admin_router.get("/users", response_model=List[UserResponse])
async def list_all_users(
current_user: TokenData = Depends(require_superadmin),
db: AsyncSession = Depends(get_db),
):
"""전체 사용자 목록 (superadmin 전용)"""
users = await auth_service.list_users(db)
return [
UserResponse(
id=str(u.id),
email=str(u.email),
name=str(u.name),
role=str(u.role),
tenant_id=str(u.tenant_id) if u.tenant_id is not None else None,
is_active=bool(u.is_active),
)
for u in users
]
@admin_router.post("/users", response_model=UserResponse)
async def create_user(
user_data: UserCreate,
current_user: TokenData = Depends(require_superadmin),
db: AsyncSession = Depends(get_db),
):
"""사용자 생성 (superadmin 전용)"""
try:
user = await auth_service.create_user(db, user_data)
return UserResponse(
id=str(user.id),
email=str(user.email),
name=str(user.name),
role=str(user.role),
tenant_id=str(user.tenant_id) if user.tenant_id is not None else None,
is_active=bool(user.is_active),
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))

75
src/auth/service.py Normal file
View File

@@ -0,0 +1,75 @@
import uuid
from typing import Optional, List
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from src.database.models import User
from src.auth.jwt_handler import create_access_token
from src.auth.password import hash_password, verify_password
from src.auth.models import UserCreate, UserResponse, Token, TokenData
async def login(db: AsyncSession, email: str, password: str) -> Optional[Token]:
user = await get_user_by_email(db, email)
if not user or not verify_password(password, str(user.password_hash)):
return None
if not bool(user.is_active):
return None
token_data = {
"user_id": str(user.id),
"email": str(user.email),
"role": str(user.role),
"tenant_id": str(user.tenant_id) if user.tenant_id is not None else None,
}
access_token = create_access_token(token_data)
return Token(
access_token=access_token,
user=UserResponse(
id=str(user.id),
email=str(user.email),
name=str(user.name),
role=str(user.role),
tenant_id=str(user.tenant_id) if user.tenant_id is not None else None,
is_active=bool(user.is_active),
),
)
async def get_user_by_id(db: AsyncSession, user_id: str) -> Optional[User]:
result = await db.execute(select(User).where(User.id == user_id))
return result.scalar_one_or_none()
async def get_user_by_email(db: AsyncSession, email: str) -> Optional[User]:
result = await db.execute(select(User).where(User.email == email))
return result.scalar_one_or_none()
async def create_user(db: AsyncSession, data: UserCreate) -> User:
existing = await get_user_by_email(db, data.email)
if existing:
raise ValueError(f"Email already exists: {data.email}")
user = User(
id=uuid.uuid4(),
email=data.email,
password_hash=hash_password(data.password),
name=data.name,
role=data.role,
tenant_id=data.tenant_id,
)
db.add(user)
await db.commit()
await db.refresh(user)
return user
async def list_users(db: AsyncSession, tenant_id: Optional[str] = None) -> List[User]:
stmt = select(User)
if tenant_id:
stmt = stmt.where(User.tenant_id == tenant_id)
result = await db.execute(stmt.order_by(User.created_at.desc()))
return list(result.scalars().all())