"""
ThingsBoardClient: live telemetry client for the Seymour vineyard at
web.seymouragri.com.

2026 device layout (clean replacement of the 2024-25 sensor fleet)
------------------------------------------------------------------
TREATMENT area — rows 501 / 502 / 503 / 504 / 509, under solar panels.
  Each row hosts one or more Crop_2Soil sensors at cardinal positions
  (north / south / east / center). Per-device payload: dual soil profile
  (shallow + deep), leaf temp, IR ambient, NDVI, 17-band spectrometer,
  17 vegetation indices, 32-point ToF canopy distance grid.

REFERENCE area — row 202, open sky (no panels). Mirrors treatment with
  ~7 Crop_2Soil sensors carrying the same payload.

PANEL SURFACE TEMPERATURE — 12 four-channel thermocouple devices
  (Thermocouples1-12) attached to panels (installation in progress; only
  a subset are active at any time).

TRACKERS — 4 controllers (Tracker501/502/503/509) carrying angle + mode
  telemetry, addressable via shared attributes (setAngle/setMode).

PLANT ASSET — "Yeruham Vineyard" carries plant-level energy (power, production).

AMBIENT — there is no longer an on-site outdoor station; ambient weather
  comes from the IMS Sde Boker station 43 client (see ``ims_client.py``).

Credentials (env vars or .env):
  THINGSBOARD_HOST — default https://web.seymouragri.com
  THINGSBOARD_USERNAME — tenant login email (TB_USERNAME is read as a fallback)
  THINGSBOARD_PASSWORD — tenant login password (TB_PASSWORD is read as a fallback)
  THINGSBOARD_TOKEN — pre-generated JWT (takes priority over user/pass)
"""

from __future__ import annotations

import math
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple

import pandas as pd
import requests


# ---------------------------------------------------------------------------
# Enumerations
# ---------------------------------------------------------------------------

class VineArea(str, Enum):
    """Experimental area a device belongs to.

    Subclasses ``str`` so members compare equal to their plain string values.
    """

    TREATMENT = "treatment"  # under solar panels
    REFERENCE = "reference"  # open sky, no panels
    AMBIENT = "ambient"  # site-level outdoor baseline


# ---------------------------------------------------------------------------
# Device registry
# ---------------------------------------------------------------------------

@dataclass(frozen=True)
class DeviceInfo:
    """Immutable registry record describing one ThingsBoard device."""

    # ThingsBoard entity id; interpolated into the REST paths built by the client.
    uuid: str
    # Numeric device id; every registry entry in this module sets it to 0.
    device_id: int
    # Which experimental area the device is filed under.
    area: VineArea
    # Vineyard row number, or None when the device is not tied to a row.
    row: Optional[int]
    # Human-readable description of the device.
    label: str
    position: Optional[str] = None  # "north", "south", "center", "south-east", etc.


#: 2026 device registry mapping short name → DeviceInfo.
#: Source: TB prod entity groups "Crop 2Soil 2026" and "Thermocouples 2026"
#: (discovered 2026-05-18). The 2024-25 fleet (Air1-4, Crop1-7, Soil1-9,
#: Irrigation1, Thermocouples1-2) was decommissioned in the 2026 refresh.
DEVICE_REGISTRY: Dict[str, DeviceInfo] = {
    # NOTE(review): several reference devices share the same (row, position)
    # pair — Crop_2Soil4/Crop_2Soil3 ("north"), Crop_2Soil2/Crop_2Soil5
    # ("center"), Crop_2Soil14/Crop_2Soil10 ("south"). Any key built as
    # f"{row}-{position}" (as get_vine_snapshot does for its per-position
    # maps) is therefore not unique for them; confirm which device should win.

    # ---------- Crop_2Soil — TREATMENT (rows 501/502/503/504/509) ----------
    "Crop_2Soil11": DeviceInfo(
        uuid="a4cfc5c0-2b64-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=501, label="501 north", position="north",
    ),
    "Crop_2Soil8": DeviceInfo(
        uuid="933b7f70-2b64-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=501, label="501 south", position="south",
    ),
    "Crop_2Soil1": DeviceInfo(
        uuid="38437e20-2b63-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=502, label="502 north", position="north",
    ),
    "Crop_2Soil17": DeviceInfo(
        uuid="d232bae0-2b64-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=502, label="502 south", position="south",
    ),
    "Crop_2Soil6": DeviceInfo(
        uuid="8766eef0-2b64-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=502, label="502 south-east", position="south-east",
    ),
    "Crop_2Soil13": DeviceInfo(  # currently inactive
        uuid="bafcce10-2b64-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=503, label="503", position=None,
    ),
    "Crop_2Soil7": DeviceInfo(
        uuid="8d965680-2b64-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=504, label="504 north", position="north",
    ),
    "Crop_2Soil19": DeviceInfo(
        uuid="dde80390-2b64-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=504, label="504 center", position="center",
    ),
    "Crop_2Soil20": DeviceInfo(
        uuid="e44b2550-2b64-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=504, label="504 center-east", position="center-east",
    ),
    "Crop_2Soil9": DeviceInfo(
        uuid="9908c9d0-2b64-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=509, label="509 south", position="south",
    ),
    # ---------- Crop_2Soil — REFERENCE (row 202, open sky) ----------
    "Crop_2Soil4": DeviceInfo(
        uuid="7bea6980-2b64-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.REFERENCE, row=202, label="202 north (ref)", position="north",
    ),
    "Crop_2Soil3": DeviceInfo(
        uuid="7362e120-2b64-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.REFERENCE, row=202, label="202 north", position="north",
    ),
    "Crop_2Soil18": DeviceInfo(
        uuid="d7b8ea20-2b64-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.REFERENCE, row=202, label="202 north-east", position="north-east",
    ),
    "Crop_2Soil2": DeviceInfo(  # currently inactive
        uuid="79a26ac0-2b63-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.REFERENCE, row=202, label="202 center (ref)", position="center",
    ),
    "Crop_2Soil5": DeviceInfo(
        uuid="81a95c00-2b64-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.REFERENCE, row=202, label="202 center", position="center",
    ),
    "Crop_2Soil14": DeviceInfo(
        uuid="c05cd7b0-2b64-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.REFERENCE, row=202, label="202 south", position="south",
    ),
    "Crop_2Soil10": DeviceInfo(
        uuid="9f4047b0-2b64-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.REFERENCE, row=202, label="202 south (ref)", position="south",
    ),
    # Unlabeled / unallocated (kept for completeness; currently inactive)
    "Crop_2Soil12": DeviceInfo(
        uuid="aa114ae0-2b64-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=None, label="unallocated",
    ),
    "Crop_2Soil15": DeviceInfo(
        uuid="c6574c90-2b64-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=None, label="unallocated",
    ),
    # ---------- Thermocouples — panel surface temps (installation in progress) ----------
    "Thermocouples1": DeviceInfo(
        uuid="d172ffe0-4fac-11f1-829c-09d61d29d108", device_id=0,
        area=VineArea.TREATMENT, row=None, label="Panel surface thermocouples 1",
    ),
    "Thermocouples2": DeviceInfo(
        uuid="e0d87f50-4fac-11f1-829c-09d61d29d108", device_id=0,
        area=VineArea.TREATMENT, row=None, label="Panel surface thermocouples 2",
    ),
    "Thermocouples3": DeviceInfo(
        uuid="e737d080-4fac-11f1-829c-09d61d29d108", device_id=0,
        area=VineArea.TREATMENT, row=None, label="Panel surface thermocouples 3",
    ),
    "Thermocouples4": DeviceInfo(
        uuid="ed4901b0-4fac-11f1-829c-09d61d29d108", device_id=0,
        area=VineArea.TREATMENT, row=None, label="Panel surface thermocouples 4",
    ),
    "Thermocouples5": DeviceInfo(
        uuid="f3f07f70-4fac-11f1-829c-09d61d29d108", device_id=0,
        area=VineArea.TREATMENT, row=None, label="Panel surface thermocouples 5",
    ),
    "Thermocouples6": DeviceInfo(
        uuid="faa591c0-4fac-11f1-829c-09d61d29d108", device_id=0,
        area=VineArea.TREATMENT, row=None, label="Panel surface thermocouples 6",
    ),
    "Thermocouples7": DeviceInfo(
        uuid="0095a660-4fad-11f1-829c-09d61d29d108", device_id=0,
        area=VineArea.TREATMENT, row=None, label="Panel surface thermocouples 7",
    ),
    "Thermocouples8": DeviceInfo(
        uuid="07168950-4fad-11f1-829c-09d61d29d108", device_id=0,
        area=VineArea.TREATMENT, row=None, label="Panel surface thermocouples 8",
    ),
    "Thermocouples9": DeviceInfo(
        uuid="0e69fe80-4fad-11f1-829c-09d61d29d108", device_id=0,
        area=VineArea.TREATMENT, row=None, label="Panel surface thermocouples 9",
    ),
    "Thermocouples10": DeviceInfo(
        uuid="14e36760-4fad-11f1-829c-09d61d29d108", device_id=0,
        area=VineArea.TREATMENT, row=None, label="Panel surface thermocouples 10",
    ),
    "Thermocouples11": DeviceInfo(
        uuid="1b513780-4fad-11f1-829c-09d61d29d108", device_id=0,
        area=VineArea.TREATMENT, row=None, label="Panel surface thermocouples 11",
    ),
    "Thermocouples12": DeviceInfo(
        uuid="2121dd40-4fad-11f1-829c-09d61d29d108", device_id=0,
        area=VineArea.TREATMENT, row=None, label="Panel surface thermocouples 12",
    ),
    # ---------- Tracker controllers (panel angle + mode) ----------
    "Tracker501": DeviceInfo(
        uuid="aac06e50-f769-11f0-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=501, label="Tracker row 501",
    ),
    "Tracker502": DeviceInfo(
        uuid="b99bd630-f769-11f0-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=502, label="Tracker row 502",
    ),
    "Tracker503": DeviceInfo(
        uuid="caffe4c0-f769-11f0-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=503, label="Tracker row 503",
    ),
    "Tracker509": DeviceInfo(
        uuid="bacf7c50-fcdc-11f0-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=509, label="Tracker row 509",
    ),
}

# Convenience subsets — derived once at import time, in registry insertion order.

# Every Crop_2Soil device, including the unallocated ones (row=None).
# Only the names are needed, so iterate the keys rather than .items().
CROP_2SOIL_DEVICES: List[str] = [
    n for n in DEVICE_REGISTRY if n.startswith("Crop_2Soil")
]
# Every panel-surface thermocouple device.
THERMOCOUPLE_DEVICES: List[str] = [
    n for n in DEVICE_REGISTRY if n.startswith("Thermocouples")
]
# Treatment-area Crop_2Soil devices that are assigned to a row; the
# `row is not None` test drops the "unallocated" entries, which are also
# filed under VineArea.TREATMENT.
TREATMENT_DEVICES: List[str] = [
    n for n, d in DEVICE_REGISTRY.items()
    if d.area == VineArea.TREATMENT
    and n.startswith("Crop_2Soil")
    and d.row is not None
]
# Reference-area (row 202) Crop_2Soil devices.
REFERENCE_DEVICES: List[str] = [
    n for n, d in DEVICE_REGISTRY.items()
    if d.area == VineArea.REFERENCE and n.startswith("Crop_2Soil")
]


# ---------------------------------------------------------------------------
# Asset registry (non-device entities — e.g. the plant-level energy asset)
# ---------------------------------------------------------------------------

@dataclass(frozen=True)
class AssetInfo:
    """Immutable registry record describing one ThingsBoard asset."""

    # ThingsBoard entity id; used in ASSET telemetry REST paths.
    uuid: str
    # Human-readable description of the asset.
    label: str


ASSET_REGISTRY: Dict[str, AssetInfo] = {
    "Plant": AssetInfo(
        uuid="dc94ddb0-dbe6-11f0-9352-a53ca0b6a212",
        label="Yeruham Vineyard — plant-level energy",
    ),
}

# Telemetry key names for the plant asset and the tracker controllers.
ENERGY_KEYS: List[str] = ["power", "production"]
TRACKER_KEYS: List[str] = ["angle", "manualMode", "setAngle", "setMode"]


# ---------------------------------------------------------------------------
# Telemetry key sets per device type
# ---------------------------------------------------------------------------
# Core subset of the 2026 Crop_2Soil 118-key payload — chosen to keep VineSnapshot
# size sane while covering every field downstream code reads. Extend as needed.
CROP_2SOIL_KEYS: List[str] = [
    # canopy / leaf
    "leafTemperature", "ambientTemperatureIRT", "NDVI", "leafWetnessErr",
    # soil profile 1 (shallow)
    "soilTemperature", "soilDielectric", "soilBulkEC", "soilMoisture", "soilPoreWaterEC",
    # soil profile 2 (deep)
    "soilTemperature2", "soilDielectric2", "soilBulkEC2", "soilMoisture2", "soilPoreWaterEC2",
    # vegetation indices (subset surfaced via VineSnapshot)
    "PRI", "PSRI", "SIPI", "GCI", "LCI",
    # UV stress proxy
    "DUVI",
    # health
    "batteryPercentage", "rssi",
]

# 4 thermocouple channels per device. The devices also report
# "internalTemperature_n" (the cold-junction reference); it is not part of
# this key list and is therefore not fetched.
THERMOCOUPLE_KEYS: List[str] = [
    "thermocoupleTemperature_1",
    "thermocoupleTemperature_2",
    "thermocoupleTemperature_3",
    "thermocoupleTemperature_4",
    "batteryPercentage",
    "rssi",
]


# ---------------------------------------------------------------------------
# VineSnapshot dataclass
# ---------------------------------------------------------------------------

@dataclass
class VineSnapshot:
    """
    Aggregated real-time vine state from the 2026 ThingsBoard fleet.

    Fields are grouped by area:
    - ambient : reserved (None — sourced from IMS, not from TB)
    - treatment : under solar panels (rows 501/502/503/504/509)
    - reference : open sky / no panels (row 202)

    None means no sensor returned a value (or no sensor exists for that field).

    ``staleness_minutes`` may be NaN when no device returned a timestamp;
    ``to_advisor_text`` and ``to_dict`` both handle that case explicitly.
    """

    snapshot_ts: datetime
    staleness_minutes: float

    # --- Ambient (reserved — sourced from IMS Sde Boker, not TB) ---
    ambient_temp_c: Optional[float] = None
    ambient_humidity_pct: Optional[float] = None
    ambient_wind_speed_ms: Optional[float] = None
    ambient_wind_angle_deg: Optional[float] = None
    ambient_rain_mm: Optional[float] = None

    # --- Treatment microclimate (now derived from Crop_2Soil IRT + leaf temp) ---
    treatment_air_temp_c: Optional[float] = None  # avg ambientTemperatureIRT
    treatment_leaf_temp_c: Optional[float] = None  # avg leafTemperature
    treatment_vpd_kpa: Optional[float] = None  # None — no dedicated VPD sensor
    treatment_co2_ppm: Optional[float] = None  # None — no CO2 sensor in 2026 fleet
    treatment_air_leaf_delta_t: Optional[float] = None  # derived = leaf - irt

    # --- Treatment crop (avg over treatment Crop_2Soil devices) ---
    treatment_crop_ndvi: Optional[float] = None
    treatment_crop_pri: Optional[float] = None
    treatment_crop_psri: Optional[float] = None
    treatment_crop_sipi: Optional[float] = None
    treatment_crop_gci: Optional[float] = None
    treatment_crop_lci: Optional[float] = None
    treatment_crop_duvi: Optional[float] = None
    treatment_leaf_wetness_err: Optional[float] = None
    # Per-position readings keyed by "row-position" (e.g. "502-north")
    treatment_crop_by_position: Dict[str, Dict[str, Optional[float]]] = field(default_factory=dict)

    # --- Reference crop (avg over reference Crop_2Soil devices) ---
    reference_crop_ndvi: Optional[float] = None
    reference_crop_pri: Optional[float] = None
    reference_crop_psri: Optional[float] = None
    reference_crop_sipi: Optional[float] = None
    reference_crop_leaf_temp_c: Optional[float] = None
    reference_air_temp_c: Optional[float] = None
    reference_crop_by_position: Dict[str, Dict[str, Optional[float]]] = field(default_factory=dict)

    # --- Index ratios (treatment / reference) — proxy for shading benefit/cost ---
    ndvi_ratio: Optional[float] = None  # <1 = panels reduce greenness
    par_shading_ratio: Optional[float] = None  # placeholder — no PAR sensor in 2026 fleet

    # --- Treatment soil (avg over treatment Crop_2Soil; dual profile) ---
    treatment_soil_moisture_pct: Optional[float] = None  # shallow avg
    treatment_soil_moisture_deep_pct: Optional[float] = None  # deep avg
    treatment_soil_temp_c: Optional[float] = None  # shallow avg
    treatment_soil_temp_deep_c: Optional[float] = None
    treatment_soil_ec_ds_m: Optional[float] = None  # bulk EC, shallow
    treatment_soil_pore_water_ec: Optional[float] = None  # pore water EC, shallow
    treatment_soil_pore_water_ec_deep: Optional[float] = None  # pore water EC, deep
    treatment_soil_ph: Optional[float] = None  # None — no pH in 2026

    # --- Reference soil ---
    reference_soil_moisture_pct: Optional[float] = None
    reference_soil_moisture_deep_pct: Optional[float] = None
    reference_soil_temp_c: Optional[float] = None
    reference_soil_pore_water_ec: Optional[float] = None
    reference_soil_pore_water_ec_deep: Optional[float] = None

    # --- Irrigation (no dedicated device in 2026 — kept None; see soil moisture trends) ---
    irrigation_last_volume_l: Optional[float] = None
    irrigation_last_minutes: Optional[float] = None
    irrigation_ec: Optional[float] = None
    irrigation_ph: Optional[float] = None
    water_temp_c: Optional[float] = None

    # --- Panel surface temperatures (avg across all active Thermocouples1-12) ---
    treatment_panel_temp_c: Optional[float] = None
    panel_temp_active_count: int = 0  # how many of the 12 devices reported
    reference_panel_temp_c: Optional[float] = None  # always None — no reference TC in 2026

    # --- Compatibility shims for downstream consumers --------------------
    # The 2024-25 schema exposed PAR/DLI fields. The 2026 fleet has no PAR
    # sensor; we expose Nones here so consumers don't crash. New deployments
    # should consume NDVI/PRI/PSRI directly.
    treatment_par_umol: Optional[float] = None
    treatment_dli_mol_m2: Optional[float] = None
    treatment_ndvi: Optional[float] = None  # alias of treatment_crop_ndvi
    treatment_pri: Optional[float] = None  # alias of treatment_crop_pri
    treatment_crop_par_umol: Optional[float] = None
    treatment_crop_leaf_temp_c: Optional[float] = None
    treatment_crop_dli_mol_m2: Optional[float] = None
    treatment_crop_par_avg1h: Optional[float] = None
    reference_crop_par_umol: Optional[float] = None
    reference_crop_dli_mol_m2: Optional[float] = None

    def to_advisor_text(self) -> str:
        """Format snapshot for inclusion in an AI advisory prompt.

        Returns a newline-joined report: a header with the data age, a
        TREATMENT section, a REFERENCE section, and (when available) the
        treatment/reference NDVI ratio. Fields that are None are omitted.

        When ``staleness_minutes`` is not finite (no device reported a
        timestamp) the header says the data age is unknown instead of
        printing ``nan``.
        """
        if math.isfinite(self.staleness_minutes):
            # Ages of two hours or more are prefixed with ">" to flag stale data.
            age = (
                f"{self.staleness_minutes:.0f}"
                if self.staleness_minutes < 120
                else f">{self.staleness_minutes:.0f}"
            )
            header = f"VINE STATE (ThingsBoard 2026 sensors, ~{age} min ago):"
        else:
            header = "VINE STATE (ThingsBoard 2026 sensors, data age unknown):"
        lines = [header]

        lines.append(" TREATMENT area (rows 501/502/503/504/509, under solar panels):")
        if self.treatment_air_temp_c is not None:
            lines.append(f" Air temperature (IRT): {self.treatment_air_temp_c:.1f} C")
        if self.treatment_leaf_temp_c is not None:
            lines.append(f" Leaf temperature: {self.treatment_leaf_temp_c:.1f} C")
        if self.treatment_air_leaf_delta_t is not None:
            lines.append(f" Leaf-air delta-T: {self.treatment_air_leaf_delta_t:+.1f} C (proxy for heat stress)")
        if self.treatment_crop_ndvi is not None:
            lines.append(f" NDVI: {self.treatment_crop_ndvi:.3f}")
        if self.treatment_crop_pri is not None:
            lines.append(f" PRI: {self.treatment_crop_pri:.3f} (light-use efficiency)")
        if self.treatment_crop_psri is not None:
            lines.append(f" PSRI: {self.treatment_crop_psri:.3f} (senescence)")
        if self.treatment_soil_moisture_pct is not None:
            line = f" Soil moisture (shallow): {self.treatment_soil_moisture_pct:.1f}%"
            if self.treatment_soil_moisture_deep_pct is not None:
                line += f" / deep {self.treatment_soil_moisture_deep_pct:.1f}%"
            lines.append(line)
        if self.treatment_soil_temp_c is not None:
            lines.append(f" Soil temperature: {self.treatment_soil_temp_c:.1f} C")
        if self.treatment_soil_pore_water_ec is not None:
            lines.append(f" Soil pore-water EC: {self.treatment_soil_pore_water_ec:.2f} dS/m")
        if self.treatment_panel_temp_c is not None:
            lines.append(
                f" Panel surface temp: {self.treatment_panel_temp_c:.1f} C "
                f"({self.panel_temp_active_count}/12 thermocouples)"
            )
        if self.treatment_crop_by_position:
            lines.append(" Per-position readings:")
            for pos, vals in self.treatment_crop_by_position.items():
                tl = vals.get("leaf_temp")
                nv = vals.get("ndvi")
                sm = vals.get("soil_moisture")
                tl_s = f"leaf {tl:.1f} C" if tl is not None else "leaf N/A"
                nv_s = f" | NDVI {nv:.2f}" if nv is not None else ""
                sm_s = f" | soil {sm:.1f}%" if sm is not None else ""
                lines.append(f" {pos}: {tl_s}{nv_s}{sm_s}")

        lines.append("")
        lines.append(" REFERENCE area (row 202, open sky, no panels):")
        if self.reference_air_temp_c is not None:
            lines.append(f" Air temperature (IRT): {self.reference_air_temp_c:.1f} C")
        if self.reference_crop_leaf_temp_c is not None:
            lines.append(f" Leaf temperature: {self.reference_crop_leaf_temp_c:.1f} C")
        if self.reference_crop_ndvi is not None:
            lines.append(f" NDVI: {self.reference_crop_ndvi:.3f}")
        if self.reference_crop_pri is not None:
            lines.append(f" PRI: {self.reference_crop_pri:.3f}")
        if self.reference_soil_moisture_pct is not None:
            line = f" Soil moisture (shallow): {self.reference_soil_moisture_pct:.1f}%"
            if self.reference_soil_moisture_deep_pct is not None:
                line += f" / deep {self.reference_soil_moisture_deep_pct:.1f}%"
            lines.append(line)
        if self.reference_crop_by_position:
            lines.append(" Per-position readings:")
            for pos, vals in self.reference_crop_by_position.items():
                tl = vals.get("leaf_temp")
                nv = vals.get("ndvi")
                tl_s = f"leaf {tl:.1f} C" if tl is not None else "leaf N/A"
                nv_s = f" | NDVI {nv:.2f}" if nv is not None else ""
                lines.append(f" {pos}: {tl_s}{nv_s}")

        if self.ndvi_ratio is not None:
            # A ratio below 1 means treatment NDVI is lower than reference.
            reduction_pct = (1 - self.ndvi_ratio) * 100
            lines.append("")
            direction = "below" if reduction_pct >= 0 else "above"
            lines.append(
                f" NDVI ratio (treatment/reference): {self.ndvi_ratio:.2f}"
                f" ({abs(reduction_pct):.0f}% {direction} reference)"
            )
        return "\n".join(lines)

    def to_dict(self) -> Dict[str, Any]:
        """Return a flat dict suitable for JSON serialization (e.g., chatbot tool result).

        ``snapshot_ts`` is emitted as an ISO-8601 string, ``staleness_minutes``
        is rounded to 1 decimal and every other numeric field to 3 decimals.
        Non-finite numbers (NaN / infinity) are emitted as None, because they
        have no valid JSON representation and strict encoders reject them.
        The two per-position maps are passed through unmodified.
        """

        def _json_number(value: Optional[float], ndigits: int) -> Optional[float]:
            # None stays None; NaN/inf become None so the result is strict-JSON safe.
            if value is None or not math.isfinite(value):
                return None
            return round(value, ndigits)

        out: Dict[str, Any] = {
            "snapshot_ts": self.snapshot_ts.isoformat(),
            "staleness_minutes": _json_number(self.staleness_minutes, 1),
            "panel_temp_active_count": self.panel_temp_active_count,
        }
        for attr in (
            # ambient (None until IMS bridge wired)
            "ambient_temp_c", "ambient_humidity_pct", "ambient_wind_speed_ms",
            "ambient_wind_angle_deg", "ambient_rain_mm",
            # treatment air/leaf
            "treatment_air_temp_c", "treatment_leaf_temp_c", "treatment_vpd_kpa",
            "treatment_co2_ppm", "treatment_air_leaf_delta_t",
            # treatment indices
            "treatment_crop_ndvi", "treatment_crop_pri", "treatment_crop_psri",
            "treatment_crop_sipi", "treatment_crop_gci", "treatment_crop_lci",
            "treatment_crop_duvi", "treatment_leaf_wetness_err",
            # reference indices
            "reference_crop_ndvi", "reference_crop_pri", "reference_crop_psri",
            "reference_crop_sipi", "reference_crop_leaf_temp_c", "reference_air_temp_c",
            # ratios
            "ndvi_ratio", "par_shading_ratio",
            # treatment soil
            "treatment_soil_moisture_pct", "treatment_soil_moisture_deep_pct",
            "treatment_soil_temp_c", "treatment_soil_temp_deep_c",
            "treatment_soil_ec_ds_m", "treatment_soil_pore_water_ec",
            "treatment_soil_pore_water_ec_deep", "treatment_soil_ph",
            # reference soil
            "reference_soil_moisture_pct", "reference_soil_moisture_deep_pct",
            "reference_soil_temp_c", "reference_soil_pore_water_ec",
            "reference_soil_pore_water_ec_deep",
            # irrigation (None until re-instrumented)
            "irrigation_last_volume_l", "irrigation_last_minutes",
            "irrigation_ec", "irrigation_ph", "water_temp_c",
            # panels
            "treatment_panel_temp_c", "reference_panel_temp_c",
            # legacy shims (None on 2026 hardware)
            "treatment_par_umol", "treatment_dli_mol_m2", "treatment_ndvi", "treatment_pri",
            "treatment_crop_par_umol", "treatment_crop_leaf_temp_c",
            "treatment_crop_dli_mol_m2", "treatment_crop_par_avg1h",
            "reference_crop_par_umol", "reference_crop_dli_mol_m2",
        ):
            out[attr] = _json_number(getattr(self, attr), 3)
        out["treatment_crop_by_position"] = self.treatment_crop_by_position
        out["reference_crop_by_position"] = self.reference_crop_by_position
        return out


# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

@dataclass
class ThingsBoardConfig:
    """ThingsBoard connection settings. Data retrieval always uses prod (Seymour).

    Every default is resolved from the environment when an instance is
    created (not when this module is imported), so variables loaded from a
    ``.env`` file after import are still picked up.
    """

    # Prod only — test (eu.thingsboard.cloud) is for deploying apps, not data
    host: str = field(
        default_factory=lambda: os.environ.get(
            "THINGSBOARD_HOST", "https://web.seymouragri.com/"
        )
    )
    # THINGSBOARD_* names win; the shorter TB_* names are fallbacks.
    username: Optional[str] = field(
        default_factory=lambda: (
            os.environ.get("THINGSBOARD_USERNAME") or os.environ.get("TB_USERNAME")
        )
    )
    password: Optional[str] = field(
        default_factory=lambda: (
            os.environ.get("THINGSBOARD_PASSWORD") or os.environ.get("TB_PASSWORD")
        )
    )
    # Pre-generated JWT; when set, the client never performs a login.
    token: Optional[str] = field(
        default_factory=lambda: os.environ.get("THINGSBOARD_TOKEN")
    )


# ---------------------------------------------------------------------------
# Client
# ---------------------------------------------------------------------------

class ThingsBoardClient:
    """
    Minimal ThingsBoard client for the Seymour vineyard.

    Authentication
    --------------
    Provide THINGSBOARD_TOKEN for a pre-generated JWT, or
    THINGSBOARD_USERNAME + THINGSBOARD_PASSWORD for login-based auth.
    Tokens are cached and refreshed automatically before they expire.

    Usage
    -----
        client = ThingsBoardClient()
        snapshot = client.get_vine_snapshot()
        print(snapshot.to_advisor_text())
    """

    _TOKEN_TTL_SECONDS = 8_000  # ThingsBoard default is 9000 s; be conservative

    def __init__(self, config: Optional[ThingsBoardConfig] = None) -> None:
        """Create a client with its own HTTP session.

        Parameters
        ----------
        config : ThingsBoardConfig, optional
            Connection settings; a default ``ThingsBoardConfig()`` is built
            when omitted. No network request is made here.
        """
        self.config = config or ThingsBoardConfig()
        self._session = requests.Session()
        self._session.headers.update({"Content-Type": "application/json"})
        # Login-derived JWT and the time.monotonic() deadline after which it
        # is treated as expired; unused when a static token is configured.
        self._jwt: Optional[str] = None
        self._jwt_expires_at: float = 0.0

    # ------------------------------------------------------------------
    # Authentication
    # ------------------------------------------------------------------

    def _ensure_jwt(self) -> str:
        """Return a valid JWT, obtaining or refreshing as needed.

        Resolution order:
        1. A configured static token is returned as-is (no expiry tracking);
           the session's X-Authorization header is installed on first use.
        2. A cached login JWT is reused until its local deadline passes.
        3. Otherwise a username/password login is performed, and the new
           token is cached and installed on the session.

        Raises
        ------
        RuntimeError
            If neither a token nor a username/password pair is configured.
        requests.HTTPError
            If the login request returns an error status.
        """
        if self.config.token:
            if "X-Authorization" not in self._session.headers:
                self._session.headers["X-Authorization"] = f"Bearer {self.config.token}"
            return self.config.token

        if self._jwt and time.monotonic() < self._jwt_expires_at:
            return self._jwt

        if not self.config.username or not self.config.password:
            raise RuntimeError(
                "ThingsBoard authentication requires THINGSBOARD_TOKEN "
                "or both THINGSBOARD_USERNAME and THINGSBOARD_PASSWORD."
            )

        url = f"{self.config.host.rstrip('/')}/api/auth/login"
        resp = self._session.post(
            url,
            json={"username": self.config.username, "password": self.config.password},
            timeout=10,
        )
        resp.raise_for_status()
        token = resp.json()["token"]
        self._jwt = token
        # The monotonic clock keeps the deadline immune to wall-clock jumps.
        self._jwt_expires_at = time.monotonic() + self._TOKEN_TTL_SECONDS
        self._session.headers["X-Authorization"] = f"Bearer {token}"
        return token

    # ------------------------------------------------------------------
    # Low-level API calls
    # ------------------------------------------------------------------

    # ------------------------------------------------------------------
    # Shared low-level helpers (DEVICE and ASSET use the same REST API,
    # differing only in the entity-type path segment).
# ------------------------------------------------------------------ def _fetch_latest_raw( self, entity_type: str, uuid: str, keys: List[str], ) -> Tuple[Dict[str, Optional[float]], Optional[datetime]]: """Fetch most-recent telemetry for any entity type (DEVICE or ASSET).""" self._ensure_jwt() url = ( f"{self.config.host.rstrip('/')}/api/plugins/telemetry/{entity_type}" f"/{uuid}/values/timeseries" ) resp = self._session.get(url, params={"keys": ",".join(keys)}, timeout=15) resp.raise_for_status() raw: Dict[str, List[Dict]] = resp.json() values: Dict[str, Optional[float]] = {} newest_ts_ms: Optional[int] = None for key in keys: entries = raw.get(key, []) if entries: values[key] = _safe_float(entries[0]["value"]) ts_ms = entries[0].get("ts") if ts_ms and (newest_ts_ms is None or ts_ms > newest_ts_ms): newest_ts_ms = ts_ms else: values[key] = None newest_ts = ( datetime.fromtimestamp(newest_ts_ms / 1000, tz=timezone.utc) if newest_ts_ms else None ) return values, newest_ts def _fetch_timeseries_raw( self, entity_type: str, uuid: str, keys: List[str], start: datetime, end: datetime, limit: int = 1000, interval_ms: int = 900_000, agg: str = "NONE", ) -> pd.DataFrame: """Fetch time-series telemetry for any entity type (DEVICE or ASSET).""" self._ensure_jwt() start_ms = int(start.timestamp() * 1000) end_ms = int(end.timestamp() * 1000) url = ( f"{self.config.host.rstrip('/')}/api/plugins/telemetry/{entity_type}" f"/{uuid}/values/timeseries" ) params: Dict[str, Any] = { "keys": ",".join(keys), "startTs": start_ms, "endTs": end_ms, "limit": limit, "agg": agg, } if agg != "NONE": params["interval"] = interval_ms resp = self._session.get(url, params=params, timeout=30) resp.raise_for_status() raw: Dict[str, List[Dict]] = resp.json() frames: Dict[str, pd.Series] = {} for key, entries in raw.items(): if key in keys and entries: ts = pd.to_datetime([e["ts"] for e in entries], unit="ms", utc=True) vals = [_safe_float(e["value"]) for e in entries] frames[key] = pd.Series(vals, 
index=ts) if not frames: return pd.DataFrame() return pd.DataFrame(frames).sort_index() # ------------------------------------------------------------------ # Device API (public) # ------------------------------------------------------------------ def _fetch_latest( self, device_name: str, keys: List[str], ) -> Tuple[Dict[str, Optional[float]], Optional[datetime]]: """Fetch most-recent values for a named device.""" info = DEVICE_REGISTRY[device_name] return self._fetch_latest_raw("DEVICE", info.uuid, keys) def get_latest_telemetry( self, device_name: str, keys: List[str], ) -> Dict[str, Optional[float]]: """Return the most recent value for each key. Missing keys return None.""" if device_name not in DEVICE_REGISTRY: raise KeyError( f"Unknown device: {device_name!r}. " f"Valid names: {sorted(DEVICE_REGISTRY)}" ) values, _ = self._fetch_latest(device_name, keys) return values def get_timeseries( self, device_name: str, keys: List[str], start: datetime, end: datetime, limit: int = 1000, interval_ms: int = 900_000, # 15 minutes agg: str = "NONE", ) -> pd.DataFrame: """Fetch time-series telemetry for a named device.""" if device_name not in DEVICE_REGISTRY: raise KeyError(f"Unknown device: {device_name!r}") info = DEVICE_REGISTRY[device_name] return self._fetch_timeseries_raw( "DEVICE", info.uuid, keys, start, end, limit, interval_ms, agg, ) # ------------------------------------------------------------------ # Asset API (public) # ------------------------------------------------------------------ def get_asset_timeseries( self, asset_name: str, keys: List[str], start: datetime, end: datetime, limit: int = 1000, interval_ms: int = 3_600_000, # 1 hour agg: str = "SUM", ) -> pd.DataFrame: """Fetch time-series from a ThingsBoard ASSET (e.g. Plant energy).""" if asset_name not in ASSET_REGISTRY: raise KeyError(f"Unknown asset: {asset_name!r}. 
Valid: {sorted(ASSET_REGISTRY)}") info = ASSET_REGISTRY[asset_name] return self._fetch_timeseries_raw( "ASSET", info.uuid, keys, start, end, limit, interval_ms, agg, ) def get_asset_latest( self, asset_name: str, keys: List[str], ) -> Dict[str, Optional[float]]: """Fetch latest telemetry from a ThingsBoard ASSET.""" if asset_name not in ASSET_REGISTRY: raise KeyError(f"Unknown asset: {asset_name!r}") info = ASSET_REGISTRY[asset_name] values, _ = self._fetch_latest_raw("ASSET", info.uuid, keys) return values # ------------------------------------------------------------------ # Device commands (RPC + attribute writes) # ------------------------------------------------------------------ def send_rpc_command( self, device_name: str, method: str, params: Any = None, timeout: float = 10.0, ) -> Dict[str, Any]: """Send a two-way RPC command to a device. Uses POST /api/plugins/rpc/twoway/{deviceId}. Falls back to one-way if two-way returns 404. """ if device_name not in DEVICE_REGISTRY: raise KeyError(f"Unknown device: {device_name!r}") info = DEVICE_REGISTRY[device_name] self._ensure_jwt() payload = {"method": method, "params": params if params is not None else {}} # Try two-way RPC first url = ( f"{self.config.host.rstrip('/')}/api/plugins/rpc/twoway" f"/{info.uuid}" ) resp = self._session.post(url, json=payload, timeout=timeout) if resp.status_code in (404, 405): # Fallback to one-way RPC url = ( f"{self.config.host.rstrip('/')}/api/plugins/rpc/oneway" f"/{info.uuid}" ) resp = self._session.post(url, json=payload, timeout=timeout) resp.raise_for_status() try: return resp.json() except Exception: return {"status": "ok", "status_code": resp.status_code} def set_device_attributes( self, device_name: str, attributes: Dict[str, Any], scope: str = "SHARED_SCOPE", ) -> None: """Write server-side attributes to a device. Uses POST /api/plugins/telemetry/DEVICE/{id}/attributes/{scope}. This is an alternative to RPC for setting tracker targets. 
""" if device_name not in DEVICE_REGISTRY: raise KeyError(f"Unknown device: {device_name!r}") info = DEVICE_REGISTRY[device_name] self._ensure_jwt() url = ( f"{self.config.host.rstrip('/')}/api/plugins/telemetry/DEVICE" f"/{info.uuid}/attributes/{scope}" ) resp = self._session.post(url, json=attributes, timeout=10) resp.raise_for_status() # ------------------------------------------------------------------ # High-level vine snapshot # ------------------------------------------------------------------ # Dashboard-only: one treatment + one reference Crop_2Soil for farmer view. _DASHBOARD_FETCH_PLAN: Dict[str, List[str]] = { "Crop_2Soil1": CROP_2SOIL_KEYS, # 502 north (treatment) "Crop_2Soil4": CROP_2SOIL_KEYS, # 202 north reference } # Light mode: a couple per area + one thermocouple for the chatbot. _LIGHT_FETCH_PLAN: Dict[str, List[str]] = { "Crop_2Soil1": CROP_2SOIL_KEYS, "Crop_2Soil11": CROP_2SOIL_KEYS, "Crop_2Soil4": CROP_2SOIL_KEYS, "Crop_2Soil10": CROP_2SOIL_KEYS, "Thermocouples3": THERMOCOUPLE_KEYS, } # Full mode: every Crop_2Soil with a known row + every thermocouple. _FULL_FETCH_PLAN: Dict[str, List[str]] = { **{n: CROP_2SOIL_KEYS for n in TREATMENT_DEVICES + REFERENCE_DEVICES}, **{n: THERMOCOUPLE_KEYS for n in THERMOCOUPLE_DEVICES}, } def get_vine_snapshot(self, light: bool = False, mode: Optional[str] = None) -> VineSnapshot: """ Fetch latest telemetry from the 2026 Crop_2Soil + thermocouple fleet and return an aggregated VineSnapshot distinguishing treatment vs reference areas. Uses a thread pool to parallelise HTTP requests. Individual device failures are silently skipped (returns None fields). Parameters ---------- light : bool If True, fetch a 5-device subset instead of the full fleet. mode : str, optional "dashboard" = 2 devices (one treatment, one reference). Overrides `light` when set. 
""" if mode == "dashboard": fetch_plan = self._DASHBOARD_FETCH_PLAN elif light: fetch_plan = self._LIGHT_FETCH_PLAN else: fetch_plan = self._FULL_FETCH_PLAN # Ensure auth token before spawning threads (avoid race on login) self._ensure_jwt() raw_results: Dict[str, Dict[str, Optional[float]]] = {} newest_ts_overall: Optional[datetime] = None with ThreadPoolExecutor(max_workers=8) as pool: future_map = { pool.submit(self._fetch_latest, name, keys): name for name, keys in fetch_plan.items() } for future in as_completed(future_map, timeout=25): name = future_map[future] try: values, ts = future.result() raw_results[name] = values if ts and (newest_ts_overall is None or ts > newest_ts_overall): newest_ts_overall = ts except Exception: raw_results[name] = {} now = datetime.now(tz=timezone.utc) staleness = ( (now - newest_ts_overall).total_seconds() / 60 if newest_ts_overall else float("nan") ) # ---------- Bucket results by area + collect per-position rows ---------- treatment_devs: List[Dict[str, Any]] = [] reference_devs: List[Dict[str, Any]] = [] treatment_by_pos: Dict[str, Dict[str, Optional[float]]] = {} reference_by_pos: Dict[str, Dict[str, Optional[float]]] = {} for name, values in raw_results.items(): if not name.startswith("Crop_2Soil"): continue info = DEVICE_REGISTRY.get(name) if info is None or not values: continue pos_key = ( f"{info.row}-{info.position}" if info.row is not None and info.position else (str(info.row) if info.row is not None else name) ) row_summary = { "leaf_temp": values.get("leafTemperature"), "air_temp_irt": values.get("ambientTemperatureIRT"), "ndvi": values.get("NDVI"), "pri": values.get("PRI"), "psri": values.get("PSRI"), "soil_moisture": values.get("soilMoisture"), "soil_temp": values.get("soilTemperature"), } if info.area == VineArea.TREATMENT: treatment_devs.append(values) treatment_by_pos[pos_key] = row_summary elif info.area == VineArea.REFERENCE: reference_devs.append(values) reference_by_pos[pos_key] = row_summary # ---------- 
Helpers to average a key across devices, with bounds ---------- def _avg(devs: List[Dict[str, Any]], key: str, bounds: Tuple[float, float]) -> Optional[float]: return _bounded_avg(*bounds, *[d.get(key) for d in devs]) def _avg_soil_layer(devs: List[Dict[str, Any]], keys: Tuple[str, ...], bounds: Tuple[float, float]) -> Optional[float]: vals = [d.get(k) for d in devs for k in keys if d.get(k) is not None] return _bounded_avg(*bounds, *vals) if vals else None # ---------- Air / leaf ---------- t_leaf = _avg(treatment_devs, "leafTemperature", _BOUNDS["leaf_temp"]) t_irt = _avg(treatment_devs, "ambientTemperatureIRT", _BOUNDS["air_temp"]) r_leaf = _avg(reference_devs, "leafTemperature", _BOUNDS["leaf_temp"]) r_irt = _avg(reference_devs, "ambientTemperatureIRT", _BOUNDS["air_temp"]) t_delta = (t_leaf - t_irt) if (t_leaf is not None and t_irt is not None) else None # ---------- NDVI ratio (treatment / reference) ---------- t_ndvi = _avg(treatment_devs, "NDVI", _BOUNDS["ndvi"]) r_ndvi = _avg(reference_devs, "NDVI", _BOUNDS["ndvi"]) t_pri = _avg(treatment_devs, "PRI", _BOUNDS["pri"]) ndvi_ratio: Optional[float] = None if t_ndvi is not None and r_ndvi is not None and r_ndvi > 0: ndvi_ratio = t_ndvi / r_ndvi # ---------- Panel surface temps (avg over active thermocouples) ---------- tc_vals: List[float] = [] tc_active_count = 0 for name in THERMOCOUPLE_DEVICES: tc = raw_results.get(name, {}) channels = [tc.get(k) for k in THERMOCOUPLE_KEYS if k.startswith("thermocoupleTemperature_")] if any(v is not None for v in channels): tc_active_count += 1 tc_vals.extend(channels) t_panel_temp = _bounded_avg(*_BOUNDS["panel_temp"], *tc_vals) if tc_vals else None snapshot = VineSnapshot( snapshot_ts=now, staleness_minutes=staleness, # Ambient — reserved (sourced from IMS, not TB) # Treatment climate (now derived from Crop_2Soil IRT + leaf) treatment_air_temp_c=t_irt, treatment_leaf_temp_c=t_leaf, treatment_air_leaf_delta_t=t_delta, # Treatment crop indices treatment_crop_ndvi=t_ndvi, 
treatment_crop_pri=t_pri, treatment_crop_psri=_avg(treatment_devs, "PSRI", _BOUNDS["psri"]), treatment_crop_sipi=_avg(treatment_devs, "SIPI", _BOUNDS["sipi"]), treatment_crop_gci=_avg(treatment_devs, "GCI", _BOUNDS["gci"]), treatment_crop_lci=_avg(treatment_devs, "LCI", _BOUNDS["lci"]), treatment_crop_duvi=_avg(treatment_devs, "DUVI", _BOUNDS["duvi"]), treatment_leaf_wetness_err=_avg( treatment_devs, "leafWetnessErr", _BOUNDS["leaf_wetness"] ), treatment_crop_by_position=treatment_by_pos, # Reference crop indices reference_crop_ndvi=r_ndvi, reference_crop_pri=_avg(reference_devs, "PRI", _BOUNDS["pri"]), reference_crop_psri=_avg(reference_devs, "PSRI", _BOUNDS["psri"]), reference_crop_sipi=_avg(reference_devs, "SIPI", _BOUNDS["sipi"]), reference_crop_leaf_temp_c=r_leaf, reference_air_temp_c=r_irt, reference_crop_by_position=reference_by_pos, # Ratios (par_shading_ratio is None — no PAR sensor in 2026 fleet) ndvi_ratio=ndvi_ratio, # Treatment soil — dual profile (shallow + deep are physically distinct layers) treatment_soil_moisture_pct=_avg_soil_layer(treatment_devs, ("soilMoisture",), _BOUNDS["soil_moisture"]), treatment_soil_moisture_deep_pct=_avg_soil_layer(treatment_devs, ("soilMoisture2",), _BOUNDS["soil_moisture"]), treatment_soil_temp_c=_avg_soil_layer(treatment_devs, ("soilTemperature",), _BOUNDS["soil_temp"]), treatment_soil_temp_deep_c=_avg_soil_layer(treatment_devs, ("soilTemperature2",), _BOUNDS["soil_temp"]), treatment_soil_ec_ds_m=_avg_soil_layer(treatment_devs, ("soilBulkEC",), _BOUNDS["soil_ec"]), treatment_soil_pore_water_ec=_avg_soil_layer(treatment_devs, ("soilPoreWaterEC",), _BOUNDS["soil_ec"]), treatment_soil_pore_water_ec_deep=_avg_soil_layer(treatment_devs, ("soilPoreWaterEC2",), _BOUNDS["soil_ec"]), # Reference soil reference_soil_moisture_pct=_avg_soil_layer(reference_devs, ("soilMoisture",), _BOUNDS["soil_moisture"]), reference_soil_moisture_deep_pct=_avg_soil_layer(reference_devs, ("soilMoisture2",), _BOUNDS["soil_moisture"]), 
reference_soil_temp_c=_avg_soil_layer(reference_devs, ("soilTemperature",), _BOUNDS["soil_temp"]), reference_soil_pore_water_ec=_avg_soil_layer(reference_devs, ("soilPoreWaterEC",), _BOUNDS["soil_ec"]), reference_soil_pore_water_ec_deep=_avg_soil_layer(reference_devs, ("soilPoreWaterEC2",), _BOUNDS["soil_ec"]), # Panel temps treatment_panel_temp_c=t_panel_temp, panel_temp_active_count=tc_active_count, # Legacy shims (None — 2026 fleet has no PAR/DLI/CO2 sensors) treatment_ndvi=t_ndvi, treatment_pri=t_pri, treatment_crop_leaf_temp_c=t_leaf, ) return snapshot # --------------------------------------------------------------------------- # Helpers (module-level so threads can share without self) # --------------------------------------------------------------------------- def _safe_float(val: Any) -> Optional[float]: """Convert a TB telemetry value string/number to float, or None on failure.""" if val is None: return None try: f = float(val) return None if math.isnan(f) or math.isinf(f) else f except (TypeError, ValueError): return None def _safe_avg(*vals: Any) -> Optional[float]: """Return the mean of non-None, finite values, or None if none available.""" valid = [v for v in vals if v is not None and isinstance(v, (int, float)) and not math.isnan(v) and not math.isinf(v)] return sum(valid) / len(valid) if valid else None def _bounded_avg(lo: float, hi: float, *vals: Any) -> Optional[float]: """Return the mean of values within [lo, hi], rejecting sensor faults outside that range.""" valid = [v for v in vals if v is not None and isinstance(v, (int, float)) and not math.isnan(v) and not math.isinf(v) and lo <= v <= hi] return sum(valid) / len(valid) if valid else None # Physical plausibility bounds for Negev site _BOUNDS = { "air_temp": (-5.0, 55.0), # °C — extreme Negev range "leaf_temp": (-5.0, 60.0), # °C — leaves can exceed air under direct sun "soil_temp": (-2.0, 45.0), # °C — soil in Negev "soil_moisture": (0.0, 100.0), # % "soil_ec": (0.0, 10.0), # dS/m — bulk + 
pore water "par": (0.0, 3000.0), # µmol m⁻² s⁻¹ "vpd": (0.0, 10.0), # kPa "co2": (300.0, 2000.0), # ppm "ndvi": (-1.0, 1.0), "pri": (-1.0, 1.0), "psri": (-1.0, 1.0), "sipi": (-1.0, 2.0), # SIPI typical 0..2 (no upper saturation) "gci": (0.0, 30.0), # GCI ranges 0 .. ~25 for healthy canopy "lci": (0.0, 1.0), "duvi": (0.0, 30.0), # daily UV index "leaf_wetness": (0.0, 1.0), # error flag, normalised "dli": (0.0, 80.0), # mol m⁻² day⁻¹ "panel_temp": (-10.0, 100.0), # °C — panel surface } # --------------------------------------------------------------------------- # CLI smoke test # --------------------------------------------------------------------------- if __name__ == "__main__": client = ThingsBoardClient() print("Fetching vine snapshot from ThingsBoard...") try: snap = client.get_vine_snapshot() print(snap.to_advisor_text()) print(f"\nSnapshot age: {snap.staleness_minutes:.1f} min") except Exception as exc: print(f"Error: {exc}") print("Make sure THINGSBOARD_USERNAME/PASSWORD or THINGSBOARD_TOKEN are set in your .env")