Ana içeriğe geç

Core Modüller

app/core/ altındaki modüller, tüm feature'ların ortak kullandığı altyapı servislerini sağlar.

Config (settings.py)

Pydantic Settings ile ortam değişkenleri yönetimi:

# Temel
ENVIRONMENT = "development" # veya "production"
SECRET_KEY = "..." # JWT imzalama (min 32 karakter)

# Veritabanı
DATABASE_URL = "postgresql+asyncpg://postgres:pass@postgres:5432/energy_monitoring"

# Redis
REDIS_URL = "redis://redis:6379/0"

# MQTT
MQTT_BROKER_HOST = "emqx"
MQTT_BROKER_PORT = 1883
MQTT_EXTERNAL_HOST = "enerji.kepmark.com" # ESP32'ye iletilecek
MQTT_EXTERNAL_PORT = 8883
MQTT_EXTERNAL_TLS = True

# TimescaleDB
TIMESCALE_RETENTION_DAYS = 730 # 2 yıl
TIMESCALE_COMPRESS_AFTER_DAYS = 90 # 3 ay sonra sıkıştır

Database

Async Engine

engine = create_async_engine(
DATABASE_URL,
pool_size=5, # Bağlantı havuzu boyutu
max_overflow=10, # Ek bağlantı limiti
pool_timeout=10, # Bağlantı bekleme süresi (saniye)
pool_pre_ping=True, # Bağlantı sağlığı kontrolü
)

Session Yönetimi

AsyncSessionLocal = sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False, # Commit sonrası lazy loading yok
autocommit=False,
autoflush=False,
)

# Dependency injection
async def get_session() -> AsyncSession:
async with AsyncSessionLocal() as session:
yield session

Models (2220 satır)

65+ SQLAlchemy modeli, UUID primary key ve timestamp mixin ile. Modellerin çoğunluğu core/database/models.py'de, Zigbee modelleri features/zigbee/models.py'de tanımlıdır.

not

Gateway = ModbusGateway alias'ı mevcuttur. Subscriber ve diğer modüller bu alias'ı yoğun kullanır (legacy uyumluluk).

Zigbee modelleri (features/zigbee/models.py):

  • ZigbeeGateway, ZigbeeDevice, ZigbeeGatewayShareToken
  • ZigbeeEnergyMeasurement, ZigbeeSensorMeasurement
  • PendingZigbeeClaim, ZigbeeClaimAudit

Detaylı şema için Veritabanı Şeması sayfasına bakın.

Security

JWT Token

# Access Token (30 dakika)
{"exp": ..., "sub": "user-uuid", "tenant_id": "tenant-uuid", "type": "access"}

# Refresh Token (7 gün)
{"exp": ..., "sub": "user-uuid", "tenant_id": "tenant-uuid", "type": "refresh"}

RBAC Permissions

56 granüler izin tanımı (action:resource formatında):

  • admin — 56 izin (tüm izinler)
  • operator — 26 izin (okuma + yazma + kontrol, silme yok)
  • viewer — 14 izin (okuma + rapor export)

Logging (core/logging/config.py)

Structlog tabanlı yapılandırılmış loglama:

  • configure_logging()main.py startup'ta çağrılır
  • JSON formatında log çıktısı (production)
  • Renkli konsol çıktısı (development)
  • Request ID korelasyonu

Modbus (core/modbus/poller.py)

Modbus TCP polling servisi:

  • IP-based gateway'ler için doğrudan Modbus TCP bağlantısı
  • MQTT gateway'lerden bağımsız, backend tarafından yönetilen polling
  • modbus-poller Docker servisi ile entegre çalışır

Redis Cache

# Async Redis bağlantı havuzu
redis = aioredis.from_url(REDIS_URL)

# Decorator ile cache
@cached(ttl=300) # 5 dakika
async def get_device(device_id: UUID):
...

# Pattern bazlı invalidation
await invalidate_cache("device:*")

MQTT Client

Singleton pattern, multi-worker desteği:

  • Her worker kendi PID'si ile benzersiz client ID alır
  • Backpressure: 10 concurrent message processing (semaphore)
  • Payload decoding: JSON → MessagePack → Partial JSON recovery
  • TLS desteği: Custom CA certificate

MQTT Subscriber (~3065 satır)

15 topic subscription ile tüm cihaz verilerini işler:

# Bootstrap
"/bootstrap/+/hello" → Bootstrap handler
"/bootstrap/+/claim_ready" → Claim ready handler

# Gateway reported (ayrı topic'ler, wildcard değil)
"tenant/+/gw/+/reported/status" → Gateway status report
"tenant/+/gw/+/reported/config" → Gateway config report
"tenant/+/gw/+/reported/rawconfig" → Rawconfig ACK

# Alarm events (spesifik, wildcard değil)
"tenant/+/gw/+/evt/alarm" → Alarm event handler

# Telemetri (5 farklı pattern)
"tenant/+/gw/+/data/device/+/+" → Per-group telemetry (Modbus V2)
"tenant/+/gw/+/data/device/+" → Device data (groupId'siz)
"tenant/+/gw/+/data/raw/+" → Raw Modbus data
"tenant/+/gw/+/data/raw/+/+" → Raw Modbus data (groupId'li)
"tenant/+/gw/+/data/+/timeseries" → Modbus gateway timeseries
"tenant/+/gw/+/data/timeseries" → ESP32 timeseries
"tenant/+/gw/+/telemetry" → ESP32 ana telemetri (legacy)
"tenant/+/gw/+/telemetry/+" → ESP32 cihaz bazlı telemetri (legacy)

# Ownership
"zeus/devices/+/events/ownership_ack" → Ownership ACK

Measurement Pipeline

Decode → Normalize (W→kW) → Validate → DB Upsert → Alarm Evaluate → WebSocket Broadcast

W→kW Normalization: POWER_FIELDS_W_TO_KW listesindeki alanlar (power_active_total, power_reactive_total vb.) otomatik olarak Watt'tan kW'a dönüştürülür (_normalize_power_w_to_kw()).

Güvenlik Önlemleri

MQTT topic injection koruması:

  • _SAFE_MAC_RE — MAC adresi format doğrulama (regex)
  • _MQTT_UNSAFE_CHARS — Wildcard (+, #) ve path traversal karakter reddi
  • _is_safe_topic_segment() — Her topic segmenti doğrulanır

WebSocket Hub

Kiracı izolasyonlu gerçek zamanlı iletişim:

WS /ws/telemetry?token=JWT&devices=uuid1,uuid2
  • TenantRoom: Her kiracı ayrı odada
  • Broadcast tipleri: measurement, device_status, gateway_status, alarm
  • Keepalive: Client ping → server pong

Celery

Redis tabanlı task queue:

  • Broker: Redis 6379/1
  • Result Backend: Redis 6379/2
  • Concurrency: 2 worker
  • Queues: default, ota, cleanup
  • Hard timeout: 600s (10 dakika)
  • Soft timeout: 540s (graceful shutdown)

7 task modülü ve 11 periyodik görev. Detay için Celery Görevleri sayfasına bakın.