Core Modüller
app/core/ altındaki modüller, tüm feature'ların ortak kullandığı altyapı servislerini sağlar.
Config (settings.py)
Pydantic Settings ile ortam değişkenleri yönetimi:
# Temel
ENVIRONMENT = "development" # veya "production"
SECRET_KEY = "..." # JWT imzalama (min 32 karakter)
# Veritabanı
DATABASE_URL = "postgresql+asyncpg://postgres:pass@postgres:5432/energy_monitoring"
# Redis
REDIS_URL = "redis://redis:6379/0"
# MQTT
MQTT_BROKER_HOST = "emqx"
MQTT_BROKER_PORT = 1883
MQTT_EXTERNAL_HOST = "enerji.kepmark.com" # ESP32'ye iletilecek
MQTT_EXTERNAL_PORT = 8883
MQTT_EXTERNAL_TLS = True
# TimescaleDB
TIMESCALE_RETENTION_DAYS = 730 # 2 yıl
TIMESCALE_COMPRESS_AFTER_DAYS = 90 # 3 ay sonra sıkıştır
Database
Async Engine
engine = create_async_engine(
DATABASE_URL,
pool_size=5, # Bağlantı havuzu boyutu
max_overflow=10, # Ek bağlantı limiti
pool_timeout=10, # Bağlantı bekleme süresi (saniye)
pool_pre_ping=True, # Bağlantı sağlığı kontrolü
)
Session Yönetimi
AsyncSessionLocal = sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False, # Commit sonrası lazy loading yok
autocommit=False,
autoflush=False,
)
# Dependency injection
async def get_session() -> AsyncSession:
async with AsyncSessionLocal() as session:
yield session
Models (2220 satır)
65+ SQLAlchemy modeli, UUID primary key ve timestamp mixin ile. Modellerin çoğunluğu core/database/models.py'de, Zigbee modelleri features/zigbee/models.py'de tanımlıdır.
Gateway = ModbusGateway alias'ı mevcuttur. Subscriber ve diğer modüller bu alias'ı yoğun kullanır (legacy uyumluluk).
Zigbee modelleri (features/zigbee/models.py):
ZigbeeGateway,ZigbeeDevice,ZigbeeGatewayShareTokenZigbeeEnergyMeasurement,ZigbeeSensorMeasurementPendingZigbeeClaim,ZigbeeClaimAudit
Detaylı şema için Veritabanı Şeması sayfasına bakın.
Security
JWT Token
# Access Token (30 dakika)
{"exp": ..., "sub": "user-uuid", "tenant_id": "tenant-uuid", "type": "access"}
# Refresh Token (7 gün)
{"exp": ..., "sub": "user-uuid", "tenant_id": "tenant-uuid", "type": "refresh"}
RBAC Permissions
56 granüler izin tanımı (action:resource formatında):
- admin — 56 izin (tüm izinler)
- operator — 26 izin (okuma + yazma + kontrol, silme yok)
- viewer — 14 izin (okuma + rapor export)
Logging (core/logging/config.py)
Structlog tabanlı yapılandırılmış loglama:
configure_logging()—main.pystartup'ta çağrılır- JSON formatında log çıktısı (production)
- Renkli konsol çıktısı (development)
- Request ID korelasyonu
Modbus (core/modbus/poller.py)
Modbus TCP polling servisi:
- IP-based gateway'ler için doğrudan Modbus TCP bağlantısı
- MQTT gateway'lerden bağımsız, backend tarafından yönetilen polling
modbus-pollerDocker servisi ile entegre çalışır
Redis Cache
# Async Redis bağlantı havuzu
redis = aioredis.from_url(REDIS_URL)
# Decorator ile cache
@cached(ttl=300) # 5 dakika
async def get_device(device_id: UUID):
...
# Pattern bazlı invalidation
await invalidate_cache("device:*")
MQTT Client
Singleton pattern, multi-worker desteği:
- Her worker kendi PID'si ile benzersiz client ID alır
- Backpressure: 10 concurrent message processing (semaphore)
- Payload decoding: JSON → MessagePack → Partial JSON recovery
- TLS desteği: Custom CA certificate
MQTT Subscriber (~3065 satır)
15 topic subscription ile tüm cihaz verilerini işler:
# Bootstrap
"/bootstrap/+/hello" → Bootstrap handler
"/bootstrap/+/claim_ready" → Claim ready handler
# Gateway reported (ayrı topic'ler, wildcard değil)
"tenant/+/gw/+/reported/status" → Gateway status report
"tenant/+/gw/+/reported/config" → Gateway config report
"tenant/+/gw/+/reported/rawconfig" → Rawconfig ACK
# Alarm events (spesifik, wildcard değil)
"tenant/+/gw/+/evt/alarm" → Alarm event handler
# Telemetri (5 farklı pattern)
"tenant/+/gw/+/data/device/+/+" → Per-group telemetry (Modbus V2)
"tenant/+/gw/+/data/device/+" → Device data (groupId'siz)
"tenant/+/gw/+/data/raw/+" → Raw Modbus data
"tenant/+/gw/+/data/raw/+/+" → Raw Modbus data (groupId'li)
"tenant/+/gw/+/data/+/timeseries" → Modbus gateway timeseries
"tenant/+/gw/+/data/timeseries" → ESP32 timeseries
"tenant/+/gw/+/telemetry" → ESP32 ana telemetri (legacy)
"tenant/+/gw/+/telemetry/+" → ESP32 cihaz bazlı telemetri (legacy)
# Ownership
"zeus/devices/+/events/ownership_ack" → Ownership ACK
Measurement Pipeline
Decode → Normalize (W→kW) → Validate → DB Upsert → Alarm Evaluate → WebSocket Broadcast
W→kW Normalization: POWER_FIELDS_W_TO_KW listesindeki alanlar (power_active_total, power_reactive_total vb.) otomatik olarak Watt'tan kW'a dönüştürülür (_normalize_power_w_to_kw()).
Güvenlik Önlemleri
MQTT topic injection koruması:
_SAFE_MAC_RE— MAC adresi format doğrulama (regex)_MQTT_UNSAFE_CHARS— Wildcard (+,#) ve path traversal karakter reddi_is_safe_topic_segment()— Her topic segmenti doğrulanır
WebSocket Hub
Kiracı izolasyonlu gerçek zamanlı iletişim:
WS /ws/telemetry?token=JWT&devices=uuid1,uuid2
- TenantRoom: Her kiracı ayrı odada
- Broadcast tipleri: measurement, device_status, gateway_status, alarm
- Keepalive: Client ping → server pong
Celery
Redis tabanlı task queue:
- Broker: Redis 6379/1
- Result Backend: Redis 6379/2
- Concurrency: 2 worker
- Queues: default, ota, cleanup
- Hard timeout: 600s (10 dakika)
- Soft timeout: 540s (graceful shutdown)
7 task modülü ve 11 periyodik görev. Detay için Celery Görevleri sayfasına bakın.