Ana içeriğe geç

Celery Arka Plan Görevleri

Beat Schedule (Periyodik Görevler)

GörevZamanlamaQueueAçıklama
ota.check_scheduledHer 5 dkotaZamanlanmış OTA'ları tetikle
ota.check_timeoutsHer 10 dkota30dk+ süren OTA'ları timeout yap
alarm.check_snoozedHer 5 dkdefaultSnooze süresi biten alarm'ları reopen
alarm.check_device_offlineHer 5 dkdefaultVeri gelmeyen cihazlar için alarm
alarm.check_battery_soh_decline06:00defaultBatarya sağlığı düşüş kontrolü
cleanup.check_unclaimed_devices03:00cleanupUnclaimed device temizliği
reactive_ratio.calculate_daily00:15defaultGünlük reaktif oran hesaplama
reactive_ratio.calculate_monthlyAy 1. gün 01:00defaultAylık reaktif oran
energy_summary.calculate_daily00:30defaultGünlük enerji delta özeti
rawconfig.push_all_gatewaysHer 30 dkdefaultGateway config push (retained msg recovery)
bms_identity.daily_check04:00defaultBMS kimlik bilgisi kontrol

Görev Detayları

OTA Tasks

ota.process_batch — Batch OTA orkestrasyonu

  • Hard timeout: 7200s (2 saat), soft: 7000s
  • Gateway'leri batch_size'a böler (default 10)
  • Her batch arası batch_delay_seconds bekler (default 300s)
  • Her gateway için _initiate_single_ota() çağırır
  • Progress DB'ye yazılır

ota.check_scheduled — Zamanlanmış OTA tetikleme (5dk)

ota.check_timeouts — 30 dakikadan uzun süredir in_progress/downloading olan OTA'ları failed olarak işaretler

Alarm Tasks

alarm.send_notification — Alarm bildirimi gönderme

  • Kanallar: Email, SMS, WhatsApp, Push
  • 3 retry, 60s exponential backoff

alarm.check_device_offline — X dakika veri gelmeyen cihazları tespit eder ve device_offline alarmı oluşturur

Energy Tasks

reactive_ratio.calculate_daily — Önceki güne ait aktif/endüktif/kapasitif tüketim ve reaktif oran yüzdesi hesaplar. reactive_ratio_daily tablosuna yazar.

energy_summary.calculate_daily — İnverter (PV/load/grid/battery) ve analizör (active/reactive import/export) enerji deltalarını hesaplar. daily_energy_summaries tablosuna yazar.


Celery Broker URL Derivation

Celery worker ve beat servisleri Redis'i broker olarak kullanır. URL üretimi _derive_celery_url_from_redis helper'ı ile yapılır.

Öncelik Sırası

  1. CELERY_BROKER_URL env var tanımlıysa (override) doğrudan kullanılır.
  2. Aksi halde REDIS_URL üzerinden türetilir; veritabanı index'i 0 (cache) yerine 1 seçilir (cache anahtarlarıyla broker mesajlarının çakışmasını engellemek için).
def _derive_celery_url_from_redis(redis_url: str) -> str:
# redis://host:6379/0 → redis://host:6379/1
if redis_url.endswith("/0"):
return redis_url[:-2] + "/1"
return redis_url + "/1"

Container İçinde localhost Tehlikesi

ASLA hard-code etme

Celery konfigürasyonunda redis://localhost:6379/1 ASLA hard-code edilmemelidir. Container içinde localhost farklı bir host'tur (kendi container'ı), Redis erişilemez ve worker başlatılamaz.

# ❌ Yanlış
broker_url = "redis://localhost:6379/1"

# ✅ Doğru
broker_url = os.getenv("CELERY_BROKER_URL") or _derive_celery_url_from_redis(
os.getenv("REDIS_URL", "redis://redis:6379/0")
)

rawconfig_republish_task (Issue #246)

Gateway konfigürasyonu (devices, templates, desired) değiştiğinde retained MQTT mesajları yeniden yayınlanır. Yüksek değişiklik frekansında broker'ı boğmamak için debounce mekanizması kullanılır.

Trigger Noktaları

  • Device CRUD (POST/PUT/DELETE /api/devices)
  • Template register-map güncellemesi
  • Gateway config değişikliği
  • ORM event hook (SQLAlchemy after_insert, after_update, after_delete)

Redis Debounce Penceresi

key = f"rawconfig:republish:{gateway_id}"
acquired = redis.set(key, "1", nx=True, ex=5) # 5 saniye TTL
if not acquired:
return # Pencere açık, atla

SET NX EX 5 ile 5 saniyelik pencere açılır; aynı pencere içinde gelen tekrar tetikleyiciler atlanır. İlk task 5 saniye içinde toplanmış değişiklikleri tek seferde publish eder.

Payload Size Guards

LimitDeğerDavranış
RAWCONFIG_SIZE_SOFT12KBWarning log, publish devam eder
RAWCONFIG_SIZE_HARD20KBChunked publish'e düş (parçalı gönderim)

Chunked Fallback — HARD limit aşıldığında payload chunk_index/chunk_total metadata ile parçalanır; gateway tarafında reassembly yapılır. Detay için bkz. Firmware — Modbus.


energy_summary.calculate_daily (Issue #209)

Schedule

  • Her gün 00:30 Europe/Istanbul timezone'unda çalışır
  • Bir önceki günün ölçüm verilerini özetler
  • daily_energy_summaries tablosuna (device_id, summary_date) UNIQUE kısıtı ile yazılır

Hesaplanan 10 Enerji Alanı

Cihaz TipiAlanBirim
InverterPV import (üretim)kWh
InverterPV exportkWh
InverterLoad (yük tüketim)kWh
InverterGrid importkWh
InverterGrid exportkWh
InverterBattery chargekWh
AnalyzerActive importkWh
AnalyzerActive exportkWh
AnalyzerReactive importkVArh
AnalyzerReactive exportkVArh

calculate_energy_deltas() Semantiği

Energy meter'ları kümülatif counter'dır (sürekli artar). Günlük tüketim:

delta = max(end_value, start_value) - start_value
  • Counter wraparound (uint32 overflow) → wraparound modulus eklenir
  • Counter reset (cihaz değişimi) → 0'dan başlatılır, delta end_value
  • Eksik veri (cihaz offline) → NULL yazılır, downstream rapor "veri yok" gösterir

Tablo Tipi

daily_energy_summaries standart bir PostgreSQL tablosudur (hypertable değildir). Günde cihaz başına 1 satır üretildiği için zaman serisi yoğunluğu hypertable'ı haklı kılmaz. İndeks: (device_id, summary_date DESC).


bms_identity.daily_check (Issue #212)

Schedule

  • Her gün 04:00 UTC çalışır (düşük yoğunluk saati)
  • BMS register grup 5.10.x (Sofar HYD 13 BMS) okunur

Akış

  1. Tüm aktif batarya-bağlı inverter'lar listelenir
  2. Her cihaz için MQTT command gönderilir: cmd/read_group payload'ı {"groupId": "bms_identity"}
  3. ESP32 gateway read_group handler'ı tetiklenir (Modbus polling pause edilir, register'lar okunur)
  4. Response payload backend'de parse edilir
  5. device_bms_identity tablosu güncellenir (UPDATE veya INSERT)
  6. Değişiklik varsa device_bms_audit_log tablosuna append-only kayıt eklenir

Audit Log Trigger

if old_record.pack_count != new_record.pack_count:
audit_log.create(
identity_id=record.id,
field_changed="pack_count",
old_value=str(old_record.pack_count),
new_value=str(new_record.pack_count),
)

Bu sayede sahada batarya pack ekleme/çıkarma fiziksel müdahaleleri zaman damgalı olarak takip edilir.