Celery Arka Plan Görevleri
Beat Schedule (Periyodik Görevler)
| Görev | Zamanlama | Queue | Açıklama |
|---|---|---|---|
ota.check_scheduled | Her 5 dk | ota | Zamanlanmış OTA'ları tetikle |
ota.check_timeouts | Her 10 dk | ota | 30dk+ süren OTA'ları timeout yap |
alarm.check_snoozed | Her 5 dk | default | Snooze süresi biten alarm'ları reopen |
alarm.check_device_offline | Her 5 dk | default | Veri gelmeyen cihazlar için alarm |
alarm.check_battery_soh_decline | 06:00 | default | Batarya sağlığı düşüş kontrolü |
cleanup.check_unclaimed_devices | 03:00 | cleanup | Unclaimed device temizliği |
reactive_ratio.calculate_daily | 00:15 | default | Günlük reaktif oran hesaplama |
reactive_ratio.calculate_monthly | Ay 1. gün 01:00 | default | Aylık reaktif oran |
energy_summary.calculate_daily | 00:30 | default | Günlük enerji delta özeti |
rawconfig.push_all_gateways | Her 30 dk | default | Gateway config push (retained msg recovery) |
bms_identity.daily_check | 04:00 | default | BMS kimlik bilgisi kontrol |
Görev Detayları
OTA Tasks
ota.process_batch — Batch OTA orkestrasyonu
- Hard timeout: 7200s (2 saat), soft: 7000s
- Gateway'leri batch_size'a böler (default 10)
- Her batch arası batch_delay_seconds bekler (default 300s)
- Her gateway için
_initiate_single_ota()çağırır - Progress DB'ye yazılır
ota.check_scheduled — Zamanlanmış OTA tetikleme (5dk)
ota.check_timeouts — 30 dakikadan uzun süredir in_progress/downloading olan OTA'ları failed olarak işaretler
Alarm Tasks
alarm.send_notification — Alarm bildirimi gönderme
- Kanallar: Email, SMS, WhatsApp, Push
- 3 retry, 60s exponential backoff
alarm.check_device_offline — X dakika veri gelmeyen cihazları tespit eder ve device_offline alarmı oluşturur
Energy Tasks
reactive_ratio.calculate_daily — Önceki güne ait aktif/endüktif/kapasitif tüketim ve reaktif oran yüzdesi hesaplar. reactive_ratio_daily tablosuna yazar.
energy_summary.calculate_daily — İnverter (PV/load/grid/battery) ve analizör (active/reactive import/export) enerji deltalarını hesaplar. daily_energy_summaries tablosuna yazar.
Celery Broker URL Derivation
Celery worker ve beat servisleri Redis'i broker olarak kullanır. URL üretimi _derive_celery_url_from_redis helper'ı ile yapılır.
Öncelik Sırası
CELERY_BROKER_URLenv var tanımlıysa (override) doğrudan kullanılır.- Aksi halde
REDIS_URLüzerinden türetilir; veritabanı index'i0(cache) yerine1seçilir (cache anahtarlarıyla broker mesajlarının çakışmasını engellemek için).
def _derive_celery_url_from_redis(redis_url: str) -> str:
# redis://host:6379/0 → redis://host:6379/1
if redis_url.endswith("/0"):
return redis_url[:-2] + "/1"
return redis_url + "/1"
Container İçinde localhost Tehlikesi
Celery konfigürasyonunda redis://localhost:6379/1 ASLA hard-code edilmemelidir. Container içinde localhost farklı bir host'tur (kendi container'ı), Redis erişilemez ve worker başlatılamaz.
# ❌ Yanlış
broker_url = "redis://localhost:6379/1"
# ✅ Doğru
broker_url = os.getenv("CELERY_BROKER_URL") or _derive_celery_url_from_redis(
os.getenv("REDIS_URL", "redis://redis:6379/0")
)
rawconfig_republish_task (Issue #246)
Gateway konfigürasyonu (devices, templates, desired) değiştiğinde retained MQTT mesajları yeniden yayınlanır. Yüksek değişiklik frekansında broker'ı boğmamak için debounce mekanizması kullanılır.
Trigger Noktaları
- Device CRUD (POST/PUT/DELETE
/api/devices) - Template register-map güncellemesi
- Gateway config değişikliği
- ORM event hook (SQLAlchemy
after_insert,after_update,after_delete)
Redis Debounce Penceresi
key = f"rawconfig:republish:{gateway_id}"
acquired = redis.set(key, "1", nx=True, ex=5) # 5 saniye TTL
if not acquired:
return # Pencere açık, atla
SET NX EX 5 ile 5 saniyelik pencere açılır; aynı pencere içinde gelen tekrar tetikleyiciler atlanır. İlk task 5 saniye içinde toplanmış değişiklikleri tek seferde publish eder.
Payload Size Guards
| Limit | Değer | Davranış |
|---|---|---|
RAWCONFIG_SIZE_SOFT | 12KB | Warning log, publish devam eder |
RAWCONFIG_SIZE_HARD | 20KB | Chunked publish'e düş (parçalı gönderim) |
Chunked Fallback — HARD limit aşıldığında payload chunk_index/chunk_total metadata ile parçalanır; gateway tarafında reassembly yapılır. Detay için bkz. Firmware — Modbus.
energy_summary.calculate_daily (Issue #209)
Schedule
- Her gün 00:30 Europe/Istanbul timezone'unda çalışır
- Bir önceki günün ölçüm verilerini özetler
daily_energy_summariestablosuna(device_id, summary_date)UNIQUE kısıtı ile yazılır
Hesaplanan 10 Enerji Alanı
| Cihaz Tipi | Alan | Birim |
|---|---|---|
| Inverter | PV import (üretim) | kWh |
| Inverter | PV export | kWh |
| Inverter | Load (yük tüketim) | kWh |
| Inverter | Grid import | kWh |
| Inverter | Grid export | kWh |
| Inverter | Battery charge | kWh |
| Analyzer | Active import | kWh |
| Analyzer | Active export | kWh |
| Analyzer | Reactive import | kVArh |
| Analyzer | Reactive export | kVArh |
calculate_energy_deltas() Semantiği
Energy meter'ları kümülatif counter'dır (sürekli artar). Günlük tüketim:
delta = max(end_value, start_value) - start_value
- Counter wraparound (uint32 overflow) → wraparound modulus eklenir
- Counter reset (cihaz değişimi) → 0'dan başlatılır, delta
end_value - Eksik veri (cihaz offline) →
NULLyazılır, downstream rapor "veri yok" gösterir
Tablo Tipi
daily_energy_summaries standart bir PostgreSQL tablosudur (hypertable değildir). Günde cihaz başına 1 satır üretildiği için zaman serisi yoğunluğu hypertable'ı haklı kılmaz. İndeks: (device_id, summary_date DESC).
bms_identity.daily_check (Issue #212)
Schedule
- Her gün 04:00 UTC çalışır (düşük yoğunluk saati)
- BMS register grup
5.10.x(Sofar HYD 13 BMS) okunur
Akış
- Tüm aktif batarya-bağlı inverter'lar listelenir
- Her cihaz için MQTT command gönderilir:
cmd/read_grouppayload'ı{"groupId": "bms_identity"} - ESP32 gateway
read_grouphandler'ı tetiklenir (Modbus polling pause edilir, register'lar okunur) - Response payload backend'de parse edilir
device_bms_identitytablosu güncellenir (UPDATE veya INSERT)- Değişiklik varsa
device_bms_audit_logtablosuna append-only kayıt eklenir
Audit Log Trigger
if old_record.pack_count != new_record.pack_count:
audit_log.create(
identity_id=record.id,
field_changed="pack_count",
old_value=str(old_record.pack_count),
new_value=str(new_record.pack_count),
)
Bu sayede sahada batarya pack ekleme/çıkarma fiziksel müdahaleleri zaman damgalı olarak takip edilir.