Files
factoryOps-v2/tests/test_equipment_sync.py
Johngreen 278cd9d551
All checks were successful
Deploy to Production / deploy (push) Successful in 1m37s
feat: bidirectional equipment sync with digital-twin
Add import, sync, and push capabilities between factoryOps and the
digital-twin (BaSyx AAS) backend. Includes:

- Equipment sync service with field mapping and LWW conflict resolution
- Import preview modal with already-imported detection
- Bidirectional sync (pull updates + push local changes)
- Sync history tracking via equipment_sync_history table
- Machine detail page shows sync status and change history
- Docker networking for container-to-container communication
- UI fixes: responsive layout (375px), touch targets, section spacing
- 30 test cases for sync service
2026-02-12 12:27:21 +09:00

707 lines
22 KiB
Python

import os
import json
from datetime import datetime, timezone
from unittest.mock import AsyncMock, patch, MagicMock
from uuid import uuid4
import pytest
from httpx import AsyncClient
from tests.conftest import get_auth_headers
MOCK_REMOTE_EQUIPMENT = [
{
"id": "dt-001",
"equipmentName": "Press Machine 1",
"equipmentId": "PM-001",
"model": "XM-200",
"manufacturer": "TestCorp",
"installationDate": "2024-01-15T00:00:00Z",
"location": "Bay 1",
"capacity": "200t",
"powerConsumption": "50kW",
"updatedAt": "2025-01-01T00:00:00Z",
},
{
"id": "dt-002",
"equipmentName": "Press Machine 2",
"equipmentId": "PM-002",
"model": "XM-300",
"manufacturer": "TestCorp",
"installationDate": "2024-06-01T00:00:00Z",
"location": "Bay 2",
"capacity": "300t",
"powerConsumption": "75kW",
"updatedAt": "2025-01-01T00:00:00Z",
},
]
@pytest.mark.asyncio
async def test_import_preview(client: AsyncClient, seeded_db):
headers = await get_auth_headers(client)
with patch(
"src.services.equipment_sync.EquipmentSyncService.fetch_remote_equipment",
new_callable=AsyncMock,
) as mock_fetch:
mock_fetch.return_value = MOCK_REMOTE_EQUIPMENT
with patch.dict(os.environ, {"DIGITAL_TWIN_API_URL": "http://mock-api"}):
resp = await client.get(
"/api/test-co/machines/import/preview", headers=headers
)
assert resp.status_code == 200
data = resp.json()
assert "equipment" in data
assert data["total"] == 2
assert len(data["equipment"]) == 2
assert data["equipment"][0]["id"] == "dt-001"
assert data["equipment"][0]["name"] == "Press Machine 1"
assert data["equipment"][0]["equipment_code"] == "PM-001"
assert data["equipment"][0]["already_imported"] is False
@pytest.mark.asyncio
async def test_import_preview_marks_existing_as_imported(
client: AsyncClient, seeded_db
):
headers = await get_auth_headers(client)
create_resp = await client.post(
"/api/test-co/machines",
json={"name": "Press Machine 1", "equipment_code": "PM-001"},
headers=headers,
)
machine_id = create_resp.json()["id"]
from sqlalchemy import select
from src.database.models import Machine
stmt = select(Machine).where(Machine.id == machine_id)
machine = (await seeded_db.execute(stmt)).scalar_one()
machine.external_id = "dt-001"
machine.source = "digital-twin"
await seeded_db.commit()
with patch(
"src.services.equipment_sync.EquipmentSyncService.fetch_remote_equipment",
new_callable=AsyncMock,
) as mock_fetch:
mock_fetch.return_value = MOCK_REMOTE_EQUIPMENT
with patch.dict(os.environ, {"DIGITAL_TWIN_API_URL": "http://mock-api"}):
resp = await client.get(
"/api/test-co/machines/import/preview", headers=headers
)
assert resp.status_code == 200
data = resp.json()
assert data["equipment"][0]["already_imported"] is True
assert data["equipment"][1]["already_imported"] is False
@pytest.mark.asyncio
async def test_import_equipment(client: AsyncClient, seeded_db):
headers = await get_auth_headers(client)
with patch(
"src.services.equipment_sync.EquipmentSyncService.fetch_remote_equipment",
new_callable=AsyncMock,
) as mock_fetch:
mock_fetch.return_value = MOCK_REMOTE_EQUIPMENT
with patch.dict(os.environ, {"DIGITAL_TWIN_API_URL": "http://mock-api"}):
resp = await client.post(
"/api/test-co/machines/import",
json={},
headers=headers,
)
assert resp.status_code == 200
data = resp.json()
assert data["imported_count"] == 2
assert data["skipped_count"] == 0
assert len(data["errors"]) == 0
list_resp = await client.get("/api/test-co/machines", headers=headers)
machines = list_resp.json()["machines"]
assert len(machines) == 2
assert machines[0]["source"] == "digital-twin"
assert machines[0]["external_id"] == "dt-001"
@pytest.mark.asyncio
async def test_import_equipment_with_specific_ids(client: AsyncClient, seeded_db):
headers = await get_auth_headers(client)
with patch(
"src.services.equipment_sync.EquipmentSyncService.fetch_remote_equipment",
new_callable=AsyncMock,
) as mock_fetch:
mock_fetch.return_value = MOCK_REMOTE_EQUIPMENT
with patch.dict(os.environ, {"DIGITAL_TWIN_API_URL": "http://mock-api"}):
resp = await client.post(
"/api/test-co/machines/import",
json={"external_ids": ["dt-001"]},
headers=headers,
)
assert resp.status_code == 200
data = resp.json()
assert data["imported_count"] == 1
assert data["skipped_count"] == 0
list_resp = await client.get("/api/test-co/machines", headers=headers)
machines = list_resp.json()["machines"]
assert len(machines) == 1
assert machines[0]["name"] == "Press Machine 1"
@pytest.mark.asyncio
async def test_import_skips_duplicates(client: AsyncClient, seeded_db):
headers = await get_auth_headers(client)
with patch(
"src.services.equipment_sync.EquipmentSyncService.fetch_remote_equipment",
new_callable=AsyncMock,
) as mock_fetch:
mock_fetch.return_value = MOCK_REMOTE_EQUIPMENT
with patch.dict(os.environ, {"DIGITAL_TWIN_API_URL": "http://mock-api"}):
resp1 = await client.post(
"/api/test-co/machines/import",
json={},
headers=headers,
)
assert resp1.status_code == 200
assert resp1.json()["imported_count"] == 2
with patch(
"src.services.equipment_sync.EquipmentSyncService.fetch_remote_equipment",
new_callable=AsyncMock,
) as mock_fetch:
mock_fetch.return_value = MOCK_REMOTE_EQUIPMENT
with patch.dict(os.environ, {"DIGITAL_TWIN_API_URL": "http://mock-api"}):
resp2 = await client.post(
"/api/test-co/machines/import",
json={},
headers=headers,
)
assert resp2.status_code == 200
assert resp2.json()["imported_count"] == 0
assert resp2.json()["skipped_count"] == 2
@pytest.mark.asyncio
async def test_sync_endpoint(client: AsyncClient, seeded_db):
headers = await get_auth_headers(client)
with patch(
"src.services.equipment_sync.EquipmentSyncService.fetch_remote_equipment",
new_callable=AsyncMock,
) as mock_fetch:
mock_fetch.return_value = MOCK_REMOTE_EQUIPMENT
with patch.dict(os.environ, {"DIGITAL_TWIN_API_URL": "http://mock-api"}):
await client.post(
"/api/test-co/machines/import",
json={},
headers=headers,
)
updated_equipment = [
{
"id": "dt-001",
"equipmentName": "Press Machine 1 Updated",
"equipmentId": "PM-001",
"model": "XM-200-V2",
"manufacturer": "TestCorp",
"installationDate": "2024-01-15T00:00:00Z",
"location": "Bay 1",
"capacity": "200t",
"powerConsumption": "50kW",
"updatedAt": "2025-02-01T00:00:00Z",
},
]
with patch(
"src.services.equipment_sync.EquipmentSyncService.fetch_remote_equipment",
new_callable=AsyncMock,
) as mock_fetch:
mock_fetch.return_value = updated_equipment
with patch(
"src.services.equipment_sync.EquipmentSyncService.push_to_remote",
new_callable=AsyncMock,
) as mock_push:
from src.services.equipment_sync import PushResult
mock_push.return_value = PushResult(success=True, fields_pushed=["name"])
with patch.dict(os.environ, {"DIGITAL_TWIN_API_URL": "http://mock-api"}):
resp = await client.post(
"/api/test-co/machines/sync",
headers=headers,
)
assert resp.status_code == 200
data = resp.json()
assert "pull" in data
assert "push_count" in data
assert data["pull"]["synced_count"] == 1
assert data["pull"]["fields_updated"] == 1
@pytest.mark.asyncio
async def test_machine_history_endpoint(client: AsyncClient, seeded_db):
headers = await get_auth_headers(client)
create_resp = await client.post(
"/api/test-co/machines",
json={"name": "Test Machine", "equipment_code": "TM-001"},
headers=headers,
)
machine_id = create_resp.json()["id"]
await client.put(
f"/api/test-co/machines/{machine_id}",
json={"name": "Test Machine Updated"},
headers=headers,
)
resp = await client.get(
f"/api/test-co/machines/{machine_id}/history",
headers=headers,
)
assert resp.status_code == 200
data = resp.json()
assert "history" in data
assert "total" in data
assert data["total"] >= 1
assert any(h["field_name"] == "name" for h in data["history"])
@pytest.mark.asyncio
async def test_machine_history_pagination(client: AsyncClient, seeded_db):
headers = await get_auth_headers(client)
create_resp = await client.post(
"/api/test-co/machines",
json={"name": "Test Machine", "equipment_code": "TM-001"},
headers=headers,
)
machine_id = create_resp.json()["id"]
for i in range(5):
await client.put(
f"/api/test-co/machines/{machine_id}",
json={"description": f"Update {i}"},
headers=headers,
)
resp = await client.get(
f"/api/test-co/machines/{machine_id}/history?limit=2&offset=0",
headers=headers,
)
assert resp.status_code == 200
data = resp.json()
assert len(data["history"]) == 2
assert data["total"] >= 5
@pytest.mark.asyncio
async def test_update_machine_records_history(client: AsyncClient, seeded_db):
headers = await get_auth_headers(client)
create_resp = await client.post(
"/api/test-co/machines",
json={"name": "Original Name", "equipment_code": "OM-001"},
headers=headers,
)
machine_id = create_resp.json()["id"]
update_resp = await client.put(
f"/api/test-co/machines/{machine_id}",
json={
"name": "Updated Name",
"equipment_code": "UM-001",
"model": "Model-X",
},
headers=headers,
)
assert update_resp.status_code == 200
history_resp = await client.get(
f"/api/test-co/machines/{machine_id}/history",
headers=headers,
)
history = history_resp.json()["history"]
assert len(history) >= 3
name_change = next((h for h in history if h["field_name"] == "name"), None)
assert name_change is not None
assert name_change["old_value"] == "Original Name"
assert name_change["new_value"] == "Updated Name"
assert name_change["change_source"] == "local"
code_change = next(
(h for h in history if h["field_name"] == "equipment_code"), None
)
assert code_change is not None
assert code_change["old_value"] == "OM-001"
assert code_change["new_value"] == "UM-001"
@pytest.mark.asyncio
async def test_sync_without_config_returns_503(client: AsyncClient, seeded_db):
headers = await get_auth_headers(client)
with patch.dict(os.environ, {}, clear=False):
if "DIGITAL_TWIN_API_URL" in os.environ:
del os.environ["DIGITAL_TWIN_API_URL"]
resp = await client.post(
"/api/test-co/machines/sync",
headers=headers,
)
assert resp.status_code == 503
assert "DIGITAL_TWIN_API_URL" in resp.json()["detail"]
@pytest.mark.asyncio
async def test_import_without_config_returns_503(client: AsyncClient, seeded_db):
headers = await get_auth_headers(client)
with patch.dict(os.environ, {}, clear=False):
if "DIGITAL_TWIN_API_URL" in os.environ:
del os.environ["DIGITAL_TWIN_API_URL"]
resp = await client.post(
"/api/test-co/machines/import",
json={},
headers=headers,
)
assert resp.status_code == 503
assert "DIGITAL_TWIN_API_URL" in resp.json()["detail"]
@pytest.mark.asyncio
async def test_import_preview_without_config_returns_503(
client: AsyncClient, seeded_db
):
headers = await get_auth_headers(client)
with patch.dict(os.environ, {}, clear=False):
if "DIGITAL_TWIN_API_URL" in os.environ:
del os.environ["DIGITAL_TWIN_API_URL"]
resp = await client.get(
"/api/test-co/machines/import/preview",
headers=headers,
)
assert resp.status_code == 503
assert "DIGITAL_TWIN_API_URL" in resp.json()["detail"]
@pytest.mark.asyncio
async def test_tenant_isolation_sync(client: AsyncClient, seeded_db):
headers = await get_auth_headers(client)
with patch(
"src.services.equipment_sync.EquipmentSyncService.fetch_remote_equipment",
new_callable=AsyncMock,
) as mock_fetch:
mock_fetch.return_value = MOCK_REMOTE_EQUIPMENT
with patch.dict(os.environ, {"DIGITAL_TWIN_API_URL": "http://mock-api"}):
resp_test = await client.post(
"/api/test-co/machines/import",
json={},
headers=headers,
)
assert resp_test.status_code == 200
assert resp_test.json()["imported_count"] == 2
list_test = await client.get("/api/test-co/machines", headers=headers)
list_other = await client.get("/api/other-co/machines", headers=headers)
assert len(list_test.json()["machines"]) == 2
assert len(list_other.json()["machines"]) == 0
@pytest.mark.asyncio
async def test_sync_lww_logic_remote_wins(client: AsyncClient, seeded_db):
headers = await get_auth_headers(client)
with patch(
"src.services.equipment_sync.EquipmentSyncService.fetch_remote_equipment",
new_callable=AsyncMock,
) as mock_fetch:
mock_fetch.return_value = MOCK_REMOTE_EQUIPMENT
with patch.dict(os.environ, {"DIGITAL_TWIN_API_URL": "http://mock-api"}):
await client.post(
"/api/test-co/machines/import",
json={"external_ids": ["dt-001"]},
headers=headers,
)
machines_resp = await client.get("/api/test-co/machines", headers=headers)
machine_id = machines_resp.json()["machines"][0]["id"]
await client.put(
f"/api/test-co/machines/{machine_id}",
json={"name": "Local Update"},
headers=headers,
)
remote_updated = [
{
"id": "dt-001",
"equipmentName": "Remote Update",
"equipmentId": "PM-001",
"model": "XM-200",
"manufacturer": "TestCorp",
"installationDate": "2024-01-15T00:00:00Z",
"location": "Bay 1",
"capacity": "200t",
"powerConsumption": "50kW",
"updatedAt": "2025-02-11T12:00:00Z",
},
]
with patch(
"src.services.equipment_sync.EquipmentSyncService.fetch_remote_equipment",
new_callable=AsyncMock,
) as mock_fetch:
mock_fetch.return_value = remote_updated
with patch.dict(os.environ, {"DIGITAL_TWIN_API_URL": "http://mock-api"}):
sync_resp = await client.post(
"/api/test-co/machines/sync",
headers=headers,
)
assert sync_resp.status_code == 200
assert sync_resp.json()["pull"]["synced_count"] == 1
updated_machine = await client.get(
f"/api/test-co/machines/{machine_id}",
headers=headers,
)
assert updated_machine.json()["name"] == "Remote Update"
@pytest.mark.asyncio
async def test_sync_lww_logic_local_wins(client: AsyncClient, seeded_db):
headers = await get_auth_headers(client)
with patch(
"src.services.equipment_sync.EquipmentSyncService.fetch_remote_equipment",
new_callable=AsyncMock,
) as mock_fetch:
mock_fetch.return_value = MOCK_REMOTE_EQUIPMENT
with patch.dict(os.environ, {"DIGITAL_TWIN_API_URL": "http://mock-api"}):
await client.post(
"/api/test-co/machines/import",
json={"external_ids": ["dt-001"]},
headers=headers,
)
machines_resp = await client.get("/api/test-co/machines", headers=headers)
machine_id = machines_resp.json()["machines"][0]["id"]
await client.put(
f"/api/test-co/machines/{machine_id}",
json={"name": "Local Update"},
headers=headers,
)
remote_old = [
{
"id": "dt-001",
"equipmentName": "Remote Old",
"equipmentId": "PM-001",
"model": "XM-200",
"manufacturer": "TestCorp",
"installationDate": "2024-01-15T00:00:00Z",
"location": "Bay 1",
"capacity": "200t",
"powerConsumption": "50kW",
"updatedAt": "2025-01-01T00:00:00Z",
},
]
with patch(
"src.services.equipment_sync.EquipmentSyncService.fetch_remote_equipment",
new_callable=AsyncMock,
) as mock_fetch:
mock_fetch.return_value = remote_old
with patch.dict(os.environ, {"DIGITAL_TWIN_API_URL": "http://mock-api"}):
sync_resp = await client.post(
"/api/test-co/machines/sync",
headers=headers,
)
assert sync_resp.status_code == 200
assert sync_resp.json()["pull"]["synced_count"] == 0
unchanged_machine = await client.get(
f"/api/test-co/machines/{machine_id}",
headers=headers,
)
assert unchanged_machine.json()["name"] == "Local Update"
@pytest.mark.asyncio
async def test_unauthenticated_import_preview(client: AsyncClient, seeded_db):
resp = await client.get("/api/test-co/machines/import/preview")
assert resp.status_code == 401
@pytest.mark.asyncio
async def test_unauthenticated_import(client: AsyncClient, seeded_db):
resp = await client.post("/api/test-co/machines/import", json={})
assert resp.status_code == 401
@pytest.mark.asyncio
async def test_unauthenticated_sync(client: AsyncClient, seeded_db):
resp = await client.post("/api/test-co/machines/sync")
assert resp.status_code == 401
@pytest.mark.asyncio
async def test_tenant_access_denied_import_preview(client: AsyncClient, seeded_db):
headers = await get_auth_headers(client, email="admin@test-co.com")
resp = await client.get("/api/other-co/machines/import/preview", headers=headers)
assert resp.status_code == 403
@pytest.mark.asyncio
async def test_tenant_access_denied_import(client: AsyncClient, seeded_db):
headers = await get_auth_headers(client, email="admin@test-co.com")
resp = await client.post(
"/api/other-co/machines/import",
json={},
headers=headers,
)
assert resp.status_code == 403
@pytest.mark.asyncio
async def test_tenant_access_denied_sync(client: AsyncClient, seeded_db):
headers = await get_auth_headers(client, email="admin@test-co.com")
resp = await client.post("/api/other-co/machines/sync", headers=headers)
assert resp.status_code == 403
@pytest.mark.asyncio
async def test_history_endpoint_empty_machine(client: AsyncClient, seeded_db):
headers = await get_auth_headers(client)
create_resp = await client.post(
"/api/test-co/machines",
json={"name": "No Changes Machine"},
headers=headers,
)
machine_id = create_resp.json()["id"]
resp = await client.get(
f"/api/test-co/machines/{machine_id}/history",
headers=headers,
)
assert resp.status_code == 200
data = resp.json()
assert data["total"] == 0
assert len(data["history"]) == 0
@pytest.mark.asyncio
async def test_history_endpoint_nonexistent_machine(client: AsyncClient, seeded_db):
headers = await get_auth_headers(client)
fake_id = str(uuid4())
resp = await client.get(
f"/api/test-co/machines/{fake_id}/history",
headers=headers,
)
assert resp.status_code == 200
data = resp.json()
assert data["total"] == 0
assert len(data["history"]) == 0
@pytest.mark.asyncio
async def test_sync_version_increments(client: AsyncClient, seeded_db):
headers = await get_auth_headers(client)
with patch(
"src.services.equipment_sync.EquipmentSyncService.fetch_remote_equipment",
new_callable=AsyncMock,
) as mock_fetch:
mock_fetch.return_value = MOCK_REMOTE_EQUIPMENT
with patch.dict(os.environ, {"DIGITAL_TWIN_API_URL": "http://mock-api"}):
await client.post(
"/api/test-co/machines/import",
json={"external_ids": ["dt-001"]},
headers=headers,
)
machines_resp = await client.get("/api/test-co/machines", headers=headers)
machine = machines_resp.json()["machines"][0]
assert machine["sync_version"] == 1
updated_equipment = [
{
"id": "dt-001",
"equipmentName": "Press Machine 1 Updated",
"equipmentId": "PM-001",
"model": "XM-200-V2",
"manufacturer": "TestCorp",
"installationDate": "2024-01-15T00:00:00Z",
"location": "Bay 1",
"capacity": "200t",
"powerConsumption": "50kW",
"updatedAt": "2025-02-11T12:00:00Z",
},
]
with patch(
"src.services.equipment_sync.EquipmentSyncService.fetch_remote_equipment",
new_callable=AsyncMock,
) as mock_fetch:
mock_fetch.return_value = updated_equipment
with patch.dict(os.environ, {"DIGITAL_TWIN_API_URL": "http://mock-api"}):
await client.post(
"/api/test-co/machines/sync",
headers=headers,
)
updated_resp = await client.get("/api/test-co/machines", headers=headers)
updated_machine = updated_resp.json()["machines"][0]
assert updated_machine["sync_version"] == 2