# Path: api/src/data/thingsboard_client.py
# Uploaded by: safraeli
# Commit 13fc29d (verified): Deploy: 2026 sensor migration + redesign + bucket B endpoints
"""
ThingsBoardClient: live telemetry client for the Seymour vineyard at
web.seymouragri.com.
2026 device layout (clean replacement of the 2024-25 sensor fleet)
------------------------------------------------------------------
TREATMENT area — rows 501 / 502 / 503 / 504 / 509, under solar panels.
Each row hosts one or more Crop_2Soil sensors at cardinal positions
(north / south / east / center). Per-device payload: dual soil profile
(shallow + deep), leaf temp, IR ambient, NDVI, 17-band spectrometer,
17 vegetation indices, 32-point ToF canopy distance grid.
REFERENCE area — row 202, open sky (no panels). Mirrors treatment with
~7 Crop_2Soil sensors carrying the same payload.
PANEL SURFACE TEMPERATURE — 12 four-channel thermocouple devices
(Thermocouples1-12) attached to panels (installation in progress;
only a subset are active at any time).
TRACKERS — 4 controllers (Tracker501/502/503/509) carrying angle + mode
telemetry, addressable via shared attributes (setAngle/setMode).
PLANT ASSET — "Yeruham Vineyard" carries plant-level energy (power, production).
AMBIENT — there is no longer an on-site outdoor station; ambient weather
comes from the IMS Sde Boker station 43 client (see ``ims_client.py``).
Credentials (env vars or .env):
THINGSBOARD_HOST — default https://web.seymouragri.com
THINGSBOARD_USERNAME — tenant login email (falls back to TB_USERNAME)
THINGSBOARD_PASSWORD — tenant login password (falls back to TB_PASSWORD)
THINGSBOARD_TOKEN — pre-generated JWT (takes priority over user/pass)
"""
from __future__ import annotations
import math
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple
import pandas as pd
import requests
# ---------------------------------------------------------------------------
# Enumerations
# ---------------------------------------------------------------------------
class VineArea(str, Enum):
    """Vineyard zone that a registered device is assigned to.

    The ``str`` mixin makes each member compare equal to its plain string
    value (e.g. ``VineArea.TREATMENT == "treatment"``).
    """

    TREATMENT = "treatment"  # under solar panels
    REFERENCE = "reference"  # open sky, no panels
    # Site-level outdoor baseline. No DEVICE_REGISTRY entry in this file uses
    # this member; the module docstring says ambient data now comes from IMS.
    AMBIENT = "ambient"
# ---------------------------------------------------------------------------
# Device registry
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class DeviceInfo:
    """Immutable registry record describing one ThingsBoard device."""

    uuid: str  # ThingsBoard entity UUID; interpolated into the REST URL paths
    device_id: int  # numeric id; every DEVICE_REGISTRY entry currently sets 0
    area: VineArea  # zone the device is assigned to
    row: Optional[int]  # vineyard row number; None when no row is assigned
    label: str  # human-readable description of the device
    position: Optional[str] = None  # "north", "south", "center", "south-east", etc.
#: 2026 device registry mapping short name → DeviceInfo.
#: Source: TB prod entity groups "Crop 2Soil 2026" and "Thermocouples 2026"
#: (discovered 2026-05-18). The 2024-25 fleet (Air1-4, Crop1-7, Soil1-9,
#: Irrigation1, Thermocouples1-2) was decommissioned in the 2026 refresh.
#:
#: The dict key is the name callers pass to the client's device methods; the
#: ``uuid`` is what is actually sent to the ThingsBoard REST API.
#: ``device_id`` is 0 on every entry (presumably a placeholder — TODO confirm).
#:
#: NOTE(review): some reference devices share the same row *and* position
#: (e.g. Crop_2Soil4 / Crop_2Soil3 are both row 202 "north"). The snapshot
#: code builds per-position keys as "{row}-{position}", so such pairs appear
#: to map to the same key — confirm whether that is intended.
DEVICE_REGISTRY: Dict[str, DeviceInfo] = {
    # ---------- Crop_2Soil — TREATMENT (rows 501/502/503/504/509) ----------
    "Crop_2Soil11": DeviceInfo(
        uuid="a4cfc5c0-2b64-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=501, label="501 north", position="north",
    ),
    "Crop_2Soil8": DeviceInfo(
        uuid="933b7f70-2b64-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=501, label="501 south", position="south",
    ),
    "Crop_2Soil1": DeviceInfo(
        uuid="38437e20-2b63-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=502, label="502 north", position="north",
    ),
    "Crop_2Soil17": DeviceInfo(
        uuid="d232bae0-2b64-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=502, label="502 south", position="south",
    ),
    "Crop_2Soil6": DeviceInfo(
        uuid="8766eef0-2b64-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=502, label="502 south-east", position="south-east",
    ),
    "Crop_2Soil13": DeviceInfo(  # currently inactive
        uuid="bafcce10-2b64-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=503, label="503", position=None,
    ),
    "Crop_2Soil7": DeviceInfo(
        uuid="8d965680-2b64-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=504, label="504 north", position="north",
    ),
    "Crop_2Soil19": DeviceInfo(
        uuid="dde80390-2b64-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=504, label="504 center", position="center",
    ),
    "Crop_2Soil20": DeviceInfo(
        uuid="e44b2550-2b64-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=504, label="504 center-east", position="center-east",
    ),
    "Crop_2Soil9": DeviceInfo(
        uuid="9908c9d0-2b64-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=509, label="509 south", position="south",
    ),
    # ---------- Crop_2Soil — REFERENCE (row 202, open sky) ----------
    "Crop_2Soil4": DeviceInfo(
        uuid="7bea6980-2b64-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.REFERENCE, row=202, label="202 north (ref)", position="north",
    ),
    "Crop_2Soil3": DeviceInfo(
        uuid="7362e120-2b64-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.REFERENCE, row=202, label="202 north", position="north",
    ),
    "Crop_2Soil18": DeviceInfo(
        uuid="d7b8ea20-2b64-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.REFERENCE, row=202, label="202 north-east", position="north-east",
    ),
    "Crop_2Soil2": DeviceInfo(  # currently inactive
        uuid="79a26ac0-2b63-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.REFERENCE, row=202, label="202 center (ref)", position="center",
    ),
    "Crop_2Soil5": DeviceInfo(
        uuid="81a95c00-2b64-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.REFERENCE, row=202, label="202 center", position="center",
    ),
    "Crop_2Soil14": DeviceInfo(
        uuid="c05cd7b0-2b64-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.REFERENCE, row=202, label="202 south", position="south",
    ),
    "Crop_2Soil10": DeviceInfo(
        uuid="9f4047b0-2b64-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.REFERENCE, row=202, label="202 south (ref)", position="south",
    ),
    # Unlabeled / unallocated (kept for completeness; currently inactive).
    # These carry area=TREATMENT but row=None, so the TREATMENT_DEVICES subset
    # (which requires a row) leaves them out.
    "Crop_2Soil12": DeviceInfo(
        uuid="aa114ae0-2b64-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=None, label="unallocated",
    ),
    "Crop_2Soil15": DeviceInfo(
        uuid="c6574c90-2b64-11f1-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=None, label="unallocated",
    ),
    # ---------- Thermocouples — panel surface temps (installation in progress) ----------
    # All thermocouple entries are registered under TREATMENT with no row.
    "Thermocouples1": DeviceInfo(
        uuid="d172ffe0-4fac-11f1-829c-09d61d29d108", device_id=0,
        area=VineArea.TREATMENT, row=None, label="Panel surface thermocouples 1",
    ),
    "Thermocouples2": DeviceInfo(
        uuid="e0d87f50-4fac-11f1-829c-09d61d29d108", device_id=0,
        area=VineArea.TREATMENT, row=None, label="Panel surface thermocouples 2",
    ),
    "Thermocouples3": DeviceInfo(
        uuid="e737d080-4fac-11f1-829c-09d61d29d108", device_id=0,
        area=VineArea.TREATMENT, row=None, label="Panel surface thermocouples 3",
    ),
    "Thermocouples4": DeviceInfo(
        uuid="ed4901b0-4fac-11f1-829c-09d61d29d108", device_id=0,
        area=VineArea.TREATMENT, row=None, label="Panel surface thermocouples 4",
    ),
    "Thermocouples5": DeviceInfo(
        uuid="f3f07f70-4fac-11f1-829c-09d61d29d108", device_id=0,
        area=VineArea.TREATMENT, row=None, label="Panel surface thermocouples 5",
    ),
    "Thermocouples6": DeviceInfo(
        uuid="faa591c0-4fac-11f1-829c-09d61d29d108", device_id=0,
        area=VineArea.TREATMENT, row=None, label="Panel surface thermocouples 6",
    ),
    "Thermocouples7": DeviceInfo(
        uuid="0095a660-4fad-11f1-829c-09d61d29d108", device_id=0,
        area=VineArea.TREATMENT, row=None, label="Panel surface thermocouples 7",
    ),
    "Thermocouples8": DeviceInfo(
        uuid="07168950-4fad-11f1-829c-09d61d29d108", device_id=0,
        area=VineArea.TREATMENT, row=None, label="Panel surface thermocouples 8",
    ),
    "Thermocouples9": DeviceInfo(
        uuid="0e69fe80-4fad-11f1-829c-09d61d29d108", device_id=0,
        area=VineArea.TREATMENT, row=None, label="Panel surface thermocouples 9",
    ),
    "Thermocouples10": DeviceInfo(
        uuid="14e36760-4fad-11f1-829c-09d61d29d108", device_id=0,
        area=VineArea.TREATMENT, row=None, label="Panel surface thermocouples 10",
    ),
    "Thermocouples11": DeviceInfo(
        uuid="1b513780-4fad-11f1-829c-09d61d29d108", device_id=0,
        area=VineArea.TREATMENT, row=None, label="Panel surface thermocouples 11",
    ),
    "Thermocouples12": DeviceInfo(
        uuid="2121dd40-4fad-11f1-829c-09d61d29d108", device_id=0,
        area=VineArea.TREATMENT, row=None, label="Panel surface thermocouples 12",
    ),
    # ---------- Tracker controllers (panel angle + mode) ----------
    # One controller per tracked row; note there is no Tracker504 entry.
    "Tracker501": DeviceInfo(
        uuid="aac06e50-f769-11f0-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=501, label="Tracker row 501",
    ),
    "Tracker502": DeviceInfo(
        uuid="b99bd630-f769-11f0-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=502, label="Tracker row 502",
    ),
    "Tracker503": DeviceInfo(
        uuid="caffe4c0-f769-11f0-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=503, label="Tracker row 503",
    ),
    "Tracker509": DeviceInfo(
        uuid="bacf7c50-fcdc-11f0-b902-5ff1ea8c4cf9", device_id=0,
        area=VineArea.TREATMENT, row=509, label="Tracker row 509",
    ),
}
# Convenience subsets — computed a single time when the module is imported.
# Every list keeps the insertion order of DEVICE_REGISTRY.

# All Crop_2Soil sensors, regardless of area or row allocation.
CROP_2SOIL_DEVICES: List[str] = [
    name for name in DEVICE_REGISTRY if name.startswith("Crop_2Soil")
]

# All panel-surface thermocouple devices.
THERMOCOUPLE_DEVICES: List[str] = [
    name for name in DEVICE_REGISTRY if name.startswith("Thermocouples")
]

# Crop_2Soil sensors under the panels that are allocated to a row
# (the "unallocated" entries with row=None are left out).
TREATMENT_DEVICES: List[str] = [
    name
    for name in CROP_2SOIL_DEVICES
    if DEVICE_REGISTRY[name].area == VineArea.TREATMENT
    and DEVICE_REGISTRY[name].row is not None
]

# Crop_2Soil sensors in the open-sky reference area.
REFERENCE_DEVICES: List[str] = [
    name
    for name in CROP_2SOIL_DEVICES
    if DEVICE_REGISTRY[name].area == VineArea.REFERENCE
]
# ---------------------------------------------------------------------------
# Asset registry (non-device entities — e.g. the plant-level energy asset)
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class AssetInfo:
    """Immutable registry record describing one ThingsBoard asset."""

    uuid: str  # ThingsBoard entity UUID; interpolated into the REST URL paths
    label: str  # human-readable description of the asset
# Asset registry mapping short name → AssetInfo. The key is the name callers
# pass to the client's asset methods.
ASSET_REGISTRY: Dict[str, AssetInfo] = {
    "Plant": AssetInfo(
        uuid="dc94ddb0-dbe6-11f0-9352-a53ca0b6a212",
        label="Yeruham Vineyard — plant-level energy",
    ),
}
# Telemetry keys for the plant-level energy asset.
ENERGY_KEYS: List[str] = ["power", "production"]
# Telemetry / attribute keys for the tracker controllers (angle + mode).
TRACKER_KEYS: List[str] = ["angle", "manualMode", "setAngle", "setMode"]
# ---------------------------------------------------------------------------
# Telemetry key sets per device type
# ---------------------------------------------------------------------------
# Core subset of the 2026 Crop_2Soil 118-key payload — chosen to keep VineSnapshot
# size sane while covering every field downstream code reads. Extend as needed.
# These names are sent verbatim as the comma-joined ``keys`` query parameter.
CROP_2SOIL_KEYS: List[str] = [
    # canopy / leaf
    "leafTemperature", "ambientTemperatureIRT", "NDVI", "leafWetnessErr",
    # soil profile 1 (shallow)
    "soilTemperature", "soilDielectric", "soilBulkEC",
    "soilMoisture", "soilPoreWaterEC",
    # soil profile 2 (deep)
    "soilTemperature2", "soilDielectric2", "soilBulkEC2",
    "soilMoisture2", "soilPoreWaterEC2",
    # vegetation indices (subset surfaced via VineSnapshot)
    "PRI", "PSRI", "SIPI", "GCI", "LCI",
    # UV stress proxy
    "DUVI",
    # health
    "batteryPercentage", "rssi",
]
# 4 thermocouple channels per device; "internalTemperature_n" is the cold-junction reference.
# Note: the internalTemperature_n keys are not part of this list, so they are
# not requested from ThingsBoard.
THERMOCOUPLE_KEYS: List[str] = [
    "thermocoupleTemperature_1", "thermocoupleTemperature_2",
    "thermocoupleTemperature_3", "thermocoupleTemperature_4",
    "batteryPercentage", "rssi",
]
# ---------------------------------------------------------------------------
# VineSnapshot dataclass
# ---------------------------------------------------------------------------
@dataclass
class VineSnapshot:
    """
    Aggregated real-time vine state from the 2026 ThingsBoard fleet.

    Fields are grouped by area:
    - ambient : reserved (None — sourced from IMS, not from TB)
    - treatment : under solar panels (rows 501/502/503/504/509)
    - reference : open sky / no panels (row 202)

    None means no sensor returned a value (or no sensor exists for that field).

    ``staleness_minutes`` may be NaN when no device reported a timestamp;
    both ``to_advisor_text`` and ``to_dict`` handle that case explicitly.
    """

    snapshot_ts: datetime
    staleness_minutes: float
    # --- Ambient (reserved — sourced from IMS Sde Boker, not TB) ---
    ambient_temp_c: Optional[float] = None
    ambient_humidity_pct: Optional[float] = None
    ambient_wind_speed_ms: Optional[float] = None
    ambient_wind_angle_deg: Optional[float] = None
    ambient_rain_mm: Optional[float] = None
    # --- Treatment microclimate (now derived from Crop_2Soil IRT + leaf temp) ---
    treatment_air_temp_c: Optional[float] = None  # avg ambientTemperatureIRT
    treatment_leaf_temp_c: Optional[float] = None  # avg leafTemperature
    treatment_vpd_kpa: Optional[float] = None  # None — no dedicated VPD sensor
    treatment_co2_ppm: Optional[float] = None  # None — no CO2 sensor in 2026 fleet
    treatment_air_leaf_delta_t: Optional[float] = None  # derived = leaf - irt
    # --- Treatment crop (avg over treatment Crop_2Soil devices) ---
    treatment_crop_ndvi: Optional[float] = None
    treatment_crop_pri: Optional[float] = None
    treatment_crop_psri: Optional[float] = None
    treatment_crop_sipi: Optional[float] = None
    treatment_crop_gci: Optional[float] = None
    treatment_crop_lci: Optional[float] = None
    treatment_crop_duvi: Optional[float] = None
    treatment_leaf_wetness_err: Optional[float] = None
    # Per-position readings keyed by "row-position" (e.g. "502-north")
    treatment_crop_by_position: Dict[str, Dict[str, Optional[float]]] = field(default_factory=dict)
    # --- Reference crop (avg over reference Crop_2Soil devices) ---
    reference_crop_ndvi: Optional[float] = None
    reference_crop_pri: Optional[float] = None
    reference_crop_psri: Optional[float] = None
    reference_crop_sipi: Optional[float] = None
    reference_crop_leaf_temp_c: Optional[float] = None
    reference_air_temp_c: Optional[float] = None
    reference_crop_by_position: Dict[str, Dict[str, Optional[float]]] = field(default_factory=dict)
    # --- Index ratios (treatment / reference) — proxy for shading benefit/cost ---
    ndvi_ratio: Optional[float] = None  # <1 = panels reduce greenness
    par_shading_ratio: Optional[float] = None  # placeholder — no PAR sensor in 2026 fleet
    # --- Treatment soil (avg over treatment Crop_2Soil; dual profile) ---
    treatment_soil_moisture_pct: Optional[float] = None  # shallow avg
    treatment_soil_moisture_deep_pct: Optional[float] = None  # deep avg
    treatment_soil_temp_c: Optional[float] = None  # shallow avg
    treatment_soil_temp_deep_c: Optional[float] = None
    treatment_soil_ec_ds_m: Optional[float] = None  # bulk EC, shallow
    treatment_soil_pore_water_ec: Optional[float] = None  # pore water EC, shallow
    treatment_soil_pore_water_ec_deep: Optional[float] = None  # pore water EC, deep
    treatment_soil_ph: Optional[float] = None  # None — no pH in 2026
    # --- Reference soil ---
    reference_soil_moisture_pct: Optional[float] = None
    reference_soil_moisture_deep_pct: Optional[float] = None
    reference_soil_temp_c: Optional[float] = None
    reference_soil_pore_water_ec: Optional[float] = None
    reference_soil_pore_water_ec_deep: Optional[float] = None
    # --- Irrigation (no dedicated device in 2026 — kept None; see soil moisture trends) ---
    irrigation_last_volume_l: Optional[float] = None
    irrigation_last_minutes: Optional[float] = None
    irrigation_ec: Optional[float] = None
    irrigation_ph: Optional[float] = None
    water_temp_c: Optional[float] = None
    # --- Panel surface temperatures (avg across all active Thermocouples1-12) ---
    treatment_panel_temp_c: Optional[float] = None
    panel_temp_active_count: int = 0  # how many of the 12 devices reported
    reference_panel_temp_c: Optional[float] = None  # always None — no reference TC in 2026
    # --- Compatibility shims for downstream consumers --------------------
    # The 2024-25 schema exposed PAR/DLI fields. The 2026 fleet has no PAR
    # sensor; we expose Nones here so consumers don't crash. New deployments
    # should consume NDVI/PRI/PSRI directly.
    treatment_par_umol: Optional[float] = None
    treatment_dli_mol_m2: Optional[float] = None
    treatment_ndvi: Optional[float] = None  # alias of treatment_crop_ndvi
    treatment_pri: Optional[float] = None  # alias of treatment_crop_pri
    treatment_crop_par_umol: Optional[float] = None
    treatment_crop_leaf_temp_c: Optional[float] = None
    treatment_crop_dli_mol_m2: Optional[float] = None
    treatment_crop_par_avg1h: Optional[float] = None
    reference_crop_par_umol: Optional[float] = None
    reference_crop_dli_mol_m2: Optional[float] = None

    def to_advisor_text(self) -> str:
        """Format snapshot for inclusion in an AI advisory prompt.

        Only fields that hold a value are rendered. The header states the
        data age in minutes; when ``staleness_minutes`` is not a finite
        number (no device reported a timestamp) it says "age unknown"
        instead of printing "nan".

        Returns:
            A newline-joined, human-readable block of text.
        """
        staleness = self.staleness_minutes
        if not math.isfinite(staleness):
            # NaN/inf means "no timestamp available" — never print it raw.
            age_note = "age unknown"
        elif staleness < 120:
            age_note = f"~{staleness:.0f} min ago"
        else:
            age_note = f"~>{staleness:.0f} min ago"
        lines = [f"VINE STATE (ThingsBoard 2026 sensors, {age_note}):"]

        lines.append(" TREATMENT area (rows 501/502/503/504/509, under solar panels):")
        if self.treatment_air_temp_c is not None:
            lines.append(f" Air temperature (IRT): {self.treatment_air_temp_c:.1f} C")
        if self.treatment_leaf_temp_c is not None:
            lines.append(f" Leaf temperature: {self.treatment_leaf_temp_c:.1f} C")
        if self.treatment_air_leaf_delta_t is not None:
            lines.append(f" Leaf-air delta-T: {self.treatment_air_leaf_delta_t:+.1f} C (proxy for heat stress)")
        if self.treatment_crop_ndvi is not None:
            lines.append(f" NDVI: {self.treatment_crop_ndvi:.3f}")
        if self.treatment_crop_pri is not None:
            lines.append(f" PRI: {self.treatment_crop_pri:.3f} (light-use efficiency)")
        if self.treatment_crop_psri is not None:
            lines.append(f" PSRI: {self.treatment_crop_psri:.3f} (senescence)")
        if self.treatment_soil_moisture_pct is not None:
            line = f" Soil moisture (shallow): {self.treatment_soil_moisture_pct:.1f}%"
            # The deep reading is only shown alongside a shallow one.
            if self.treatment_soil_moisture_deep_pct is not None:
                line += f" / deep {self.treatment_soil_moisture_deep_pct:.1f}%"
            lines.append(line)
        if self.treatment_soil_temp_c is not None:
            lines.append(f" Soil temperature: {self.treatment_soil_temp_c:.1f} C")
        if self.treatment_soil_pore_water_ec is not None:
            lines.append(f" Soil pore-water EC: {self.treatment_soil_pore_water_ec:.2f} dS/m")
        if self.treatment_panel_temp_c is not None:
            lines.append(
                f" Panel surface temp: {self.treatment_panel_temp_c:.1f} C "
                f"({self.panel_temp_active_count}/12 thermocouples)"
            )
        if self.treatment_crop_by_position:
            lines.append(" Per-position readings:")
            for pos, vals in self.treatment_crop_by_position.items():
                tl = vals.get("leaf_temp")
                nv = vals.get("ndvi")
                sm = vals.get("soil_moisture")
                tl_s = f"leaf {tl:.1f} C" if tl is not None else "leaf N/A"
                nv_s = f" | NDVI {nv:.2f}" if nv is not None else ""
                sm_s = f" | soil {sm:.1f}%" if sm is not None else ""
                lines.append(f" {pos}: {tl_s}{nv_s}{sm_s}")

        lines.append("")
        lines.append(" REFERENCE area (row 202, open sky, no panels):")
        if self.reference_air_temp_c is not None:
            lines.append(f" Air temperature (IRT): {self.reference_air_temp_c:.1f} C")
        if self.reference_crop_leaf_temp_c is not None:
            lines.append(f" Leaf temperature: {self.reference_crop_leaf_temp_c:.1f} C")
        if self.reference_crop_ndvi is not None:
            lines.append(f" NDVI: {self.reference_crop_ndvi:.3f}")
        if self.reference_crop_pri is not None:
            lines.append(f" PRI: {self.reference_crop_pri:.3f}")
        if self.reference_soil_moisture_pct is not None:
            line = f" Soil moisture (shallow): {self.reference_soil_moisture_pct:.1f}%"
            if self.reference_soil_moisture_deep_pct is not None:
                line += f" / deep {self.reference_soil_moisture_deep_pct:.1f}%"
            lines.append(line)
        if self.reference_crop_by_position:
            lines.append(" Per-position readings:")
            for pos, vals in self.reference_crop_by_position.items():
                tl = vals.get("leaf_temp")
                nv = vals.get("ndvi")
                tl_s = f"leaf {tl:.1f} C" if tl is not None else "leaf N/A"
                nv_s = f" | NDVI {nv:.2f}" if nv is not None else ""
                lines.append(f" {pos}: {tl_s}{nv_s}")

        if self.ndvi_ratio is not None:
            # Positive reduction = treatment NDVI is lower than reference.
            reduction_pct = (1 - self.ndvi_ratio) * 100
            lines.append("")
            direction = "below" if reduction_pct >= 0 else "above"
            lines.append(
                f" NDVI ratio (treatment/reference): {self.ndvi_ratio:.2f}"
                f" ({abs(reduction_pct):.0f}% {direction} reference)"
            )
        return "\n".join(lines)

    def to_dict(self) -> Dict[str, Any]:
        """Return a flat dict suitable for JSON serialization (e.g., chatbot tool result).

        Scalar readings are rounded to 3 decimals (staleness to 1). Values
        that are missing *or* not finite (NaN / infinity) are emitted as
        None, because strict JSON has no representation for them and
        encoders running with ``allow_nan=False`` reject them outright.
        The two per-position mappings are returned as copies so callers
        can modify the result without touching this snapshot.
        """
        staleness = self.staleness_minutes
        out: Dict[str, Any] = {
            "snapshot_ts": self.snapshot_ts.isoformat(),
            "staleness_minutes": round(staleness, 1) if math.isfinite(staleness) else None,
            "panel_temp_active_count": self.panel_temp_active_count,
        }
        for attr in (
            # ambient (None until IMS bridge wired)
            "ambient_temp_c", "ambient_humidity_pct", "ambient_wind_speed_ms",
            "ambient_wind_angle_deg", "ambient_rain_mm",
            # treatment air/leaf
            "treatment_air_temp_c", "treatment_leaf_temp_c", "treatment_vpd_kpa",
            "treatment_co2_ppm", "treatment_air_leaf_delta_t",
            # treatment indices
            "treatment_crop_ndvi", "treatment_crop_pri", "treatment_crop_psri",
            "treatment_crop_sipi", "treatment_crop_gci", "treatment_crop_lci",
            "treatment_crop_duvi", "treatment_leaf_wetness_err",
            # reference indices
            "reference_crop_ndvi", "reference_crop_pri", "reference_crop_psri",
            "reference_crop_sipi", "reference_crop_leaf_temp_c", "reference_air_temp_c",
            # ratios
            "ndvi_ratio", "par_shading_ratio",
            # treatment soil
            "treatment_soil_moisture_pct", "treatment_soil_moisture_deep_pct",
            "treatment_soil_temp_c", "treatment_soil_temp_deep_c",
            "treatment_soil_ec_ds_m",
            "treatment_soil_pore_water_ec", "treatment_soil_pore_water_ec_deep",
            "treatment_soil_ph",
            # reference soil
            "reference_soil_moisture_pct", "reference_soil_moisture_deep_pct",
            "reference_soil_temp_c",
            "reference_soil_pore_water_ec", "reference_soil_pore_water_ec_deep",
            # irrigation (None until re-instrumented)
            "irrigation_last_volume_l", "irrigation_last_minutes",
            "irrigation_ec", "irrigation_ph", "water_temp_c",
            # panels
            "treatment_panel_temp_c", "reference_panel_temp_c",
            # legacy shims (None on 2026 hardware)
            "treatment_par_umol", "treatment_dli_mol_m2", "treatment_ndvi", "treatment_pri",
            "treatment_crop_par_umol", "treatment_crop_leaf_temp_c",
            "treatment_crop_dli_mol_m2", "treatment_crop_par_avg1h",
            "reference_crop_par_umol", "reference_crop_dli_mol_m2",
        ):
            val = getattr(self, attr)
            usable = val is not None and math.isfinite(val)
            out[attr] = round(val, 3) if usable else None
        out["treatment_crop_by_position"] = {
            pos: dict(vals) for pos, vals in self.treatment_crop_by_position.items()
        }
        out["reference_crop_by_position"] = {
            pos: dict(vals) for pos, vals in self.reference_crop_by_position.items()
        }
        return out
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
@dataclass
class ThingsBoardConfig:
    """ThingsBoard connection settings. Data retrieval always uses prod (Seymour).

    Every field defaults to an environment variable that is read when the
    config object is *created* (not when this module is imported), so
    variables set after import — e.g. by loading a ``.env`` file — are
    still picked up. Explicit constructor arguments override the environment.

    ``password`` and ``token`` are excluded from ``repr()`` so credentials do
    not leak into logs or tracebacks.
    """

    # Prod only — test (eu.thingsboard.cloud) is for deploying apps, not data
    host: str = field(
        default_factory=lambda: os.environ.get(
            "THINGSBOARD_HOST", "https://web.seymouragri.com/"
        )
    )
    # Tenant login; TB_USERNAME / TB_PASSWORD are accepted as fallbacks.
    username: Optional[str] = field(
        default_factory=lambda: (
            os.environ.get("THINGSBOARD_USERNAME") or os.environ.get("TB_USERNAME")
        )
    )
    password: Optional[str] = field(
        default_factory=lambda: (
            os.environ.get("THINGSBOARD_PASSWORD") or os.environ.get("TB_PASSWORD")
        ),
        repr=False,
    )
    # Pre-generated JWT; when set, the client uses it instead of logging in.
    token: Optional[str] = field(
        default_factory=lambda: os.environ.get("THINGSBOARD_TOKEN"),
        repr=False,
    )
# ---------------------------------------------------------------------------
# Client
# ---------------------------------------------------------------------------
class ThingsBoardClient:
"""
Minimal ThingsBoard client for the Seymour vineyard.
Authentication
--------------
Provide THINGSBOARD_TOKEN for a pre-generated JWT, or
THINGSBOARD_USERNAME + THINGSBOARD_PASSWORD for login-based auth.
Tokens are cached and refreshed automatically before they expire.
Usage
-----
client = ThingsBoardClient()
snapshot = client.get_vine_snapshot()
print(snapshot.to_advisor_text())
"""
_TOKEN_TTL_SECONDS = 8_000 # ThingsBoard default is 9000 s; be conservative
def __init__(self, config: Optional[ThingsBoardConfig] = None) -> None:
    """Create a client with its own HTTP session and an empty JWT cache.

    Args:
        config: Connection settings; a default ``ThingsBoardConfig`` is
            built when omitted.
    """
    self.config = config or ThingsBoardConfig()

    # One shared session so the auth header set later applies to every call.
    http = requests.Session()
    http.headers.update({"Content-Type": "application/json"})
    self._session = http

    # Login-based JWT cache: nothing cached yet, so it counts as expired.
    self._jwt: Optional[str] = None
    self._jwt_expires_at: float = 0.0
# ------------------------------------------------------------------
# Authentication
# ------------------------------------------------------------------
def _ensure_jwt(self) -> str:
    """Return a usable JWT and make sure the session sends it.

    Resolution order:
    1. A pre-generated ``config.token`` (installed on the session once).
    2. A cached login token that has not passed its local expiry time.
    3. A fresh login with ``config.username`` / ``config.password``.

    Raises:
        RuntimeError: when neither a token nor a full username/password
            pair is configured.
        requests.HTTPError: when the login request is rejected.
    """
    cfg = self.config
    headers = self._session.headers

    static_token = cfg.token
    if static_token:
        if "X-Authorization" not in headers:
            headers["X-Authorization"] = f"Bearer {static_token}"
        return static_token

    cached = self._jwt
    if cached and time.monotonic() < self._jwt_expires_at:
        return cached

    if not (cfg.username and cfg.password):
        raise RuntimeError(
            "ThingsBoard authentication requires THINGSBOARD_TOKEN "
            "or both THINGSBOARD_USERNAME and THINGSBOARD_PASSWORD."
        )

    login_url = f"{cfg.host.rstrip('/')}/api/auth/login"
    credentials = {"username": cfg.username, "password": cfg.password}
    response = self._session.post(login_url, json=credentials, timeout=10)
    response.raise_for_status()

    fresh = response.json()["token"]
    self._jwt = fresh
    # Expiry is tracked locally on the monotonic clock, not read from the JWT.
    self._jwt_expires_at = time.monotonic() + self._TOKEN_TTL_SECONDS
    headers["X-Authorization"] = f"Bearer {fresh}"
    return fresh
# ------------------------------------------------------------------
# Low-level API calls
# ------------------------------------------------------------------
# ------------------------------------------------------------------
# Shared low-level helpers (DEVICE and ASSET use the same REST API,
# differing only in the entity-type path segment).
# ------------------------------------------------------------------
def _fetch_latest_raw(
    self,
    entity_type: str,
    uuid: str,
    keys: List[str],
) -> Tuple[Dict[str, Optional[float]], Optional[datetime]]:
    """Read the latest telemetry sample per key for one entity.

    Args:
        entity_type: URL path segment for the entity kind ("DEVICE" or "ASSET").
        uuid: ThingsBoard entity UUID.
        keys: Telemetry keys to request.

    Returns:
        ``(values, newest_ts)``: ``values`` has one entry per requested key
        (None when the server returned nothing for it); ``newest_ts`` is the
        most recent sample timestamp as an aware UTC datetime, or None when
        no sample carried a timestamp.

    Raises:
        requests.HTTPError: on a non-success HTTP status.
    """
    self._ensure_jwt()
    base = self.config.host.rstrip("/")
    endpoint = f"{base}/api/plugins/telemetry/{entity_type}/{uuid}/values/timeseries"
    response = self._session.get(endpoint, params={"keys": ",".join(keys)}, timeout=15)
    response.raise_for_status()
    payload: Dict[str, List[Dict]] = response.json()

    latest: Dict[str, Optional[float]] = {}
    stamps_ms: List[int] = []
    for key in keys:
        samples = payload.get(key, [])
        if not samples:
            latest[key] = None
            continue
        head = samples[0]  # only the first returned entry is used
        latest[key] = _safe_float(head["value"])
        stamp = head.get("ts")
        if stamp:
            stamps_ms.append(stamp)

    if not stamps_ms:
        return latest, None
    # Timestamps are epoch milliseconds; convert the newest to aware UTC.
    newest = datetime.fromtimestamp(max(stamps_ms) / 1000, tz=timezone.utc)
    return latest, newest
def _fetch_timeseries_raw(
    self,
    entity_type: str,
    uuid: str,
    keys: List[str],
    start: datetime,
    end: datetime,
    limit: int = 1000,
    interval_ms: int = 900_000,
    agg: str = "NONE",
) -> pd.DataFrame:
    """Download a telemetry time range for one entity as a DataFrame.

    Args:
        entity_type: URL path segment for the entity kind ("DEVICE" or "ASSET").
        uuid: ThingsBoard entity UUID.
        keys: Telemetry keys to request.
        start: Range start; sent as epoch milliseconds.
        end: Range end; sent as epoch milliseconds.
        limit: Value of the ``limit`` query parameter.
        interval_ms: Aggregation bucket; only sent when ``agg`` is not "NONE".
        agg: Value of the ``agg`` query parameter.

    Returns:
        A DataFrame with one column per key that returned data, indexed by
        UTC timestamp and sorted by index; an empty DataFrame when nothing
        came back.

    Raises:
        requests.HTTPError: on a non-success HTTP status.
    """
    self._ensure_jwt()
    query: Dict[str, Any] = {
        "keys": ",".join(keys),
        "startTs": int(start.timestamp() * 1000),
        "endTs": int(end.timestamp() * 1000),
        "limit": limit,
        "agg": agg,
    }
    if agg != "NONE":
        query["interval"] = interval_ms

    base = self.config.host.rstrip("/")
    endpoint = f"{base}/api/plugins/telemetry/{entity_type}/{uuid}/values/timeseries"
    response = self._session.get(endpoint, params=query, timeout=30)
    response.raise_for_status()
    payload: Dict[str, List[Dict]] = response.json()

    columns: Dict[str, pd.Series] = {}
    for key, samples in payload.items():
        # Skip keys that were not asked for, and keys with no samples.
        if key not in keys or not samples:
            continue
        stamps = pd.to_datetime([s["ts"] for s in samples], unit="ms", utc=True)
        columns[key] = pd.Series([_safe_float(s["value"]) for s in samples], index=stamps)

    if columns:
        return pd.DataFrame(columns).sort_index()
    return pd.DataFrame()
# ------------------------------------------------------------------
# Device API (public)
# ------------------------------------------------------------------
def _fetch_latest(
    self,
    device_name: str,
    keys: List[str],
) -> Tuple[Dict[str, Optional[float]], Optional[datetime]]:
    """Look up a registered device by name and fetch its latest telemetry.

    Raises:
        KeyError: when ``device_name`` is not in ``DEVICE_REGISTRY``.
    """
    device_uuid = DEVICE_REGISTRY[device_name].uuid
    return self._fetch_latest_raw("DEVICE", device_uuid, keys)
def get_latest_telemetry(
    self,
    device_name: str,
    keys: List[str],
) -> Dict[str, Optional[float]]:
    """Return the newest value of each requested key for a named device.

    Keys the server has no data for map to None.

    Raises:
        KeyError: when ``device_name`` is not registered; the message lists
            the valid names.
    """
    if device_name in DEVICE_REGISTRY:
        latest, _newest_ts = self._fetch_latest(device_name, keys)
        return latest
    raise KeyError(
        f"Unknown device: {device_name!r}. "
        f"Valid names: {sorted(DEVICE_REGISTRY)}"
    )
def get_timeseries(
    self,
    device_name: str,
    keys: List[str],
    start: datetime,
    end: datetime,
    limit: int = 1000,
    interval_ms: int = 900_000,  # 15 minutes
    agg: str = "NONE",
) -> pd.DataFrame:
    """Return a telemetry time range for a registered device.

    All arguments after ``device_name`` are passed unchanged to
    ``_fetch_timeseries_raw``.

    Raises:
        KeyError: when ``device_name`` is not registered.
    """
    if device_name in DEVICE_REGISTRY:
        device_uuid = DEVICE_REGISTRY[device_name].uuid
        return self._fetch_timeseries_raw(
            "DEVICE", device_uuid, keys, start, end, limit, interval_ms, agg,
        )
    raise KeyError(f"Unknown device: {device_name!r}")
# ------------------------------------------------------------------
# Asset API (public)
# ------------------------------------------------------------------
def get_asset_timeseries(
    self,
    asset_name: str,
    keys: List[str],
    start: datetime,
    end: datetime,
    limit: int = 1000,
    interval_ms: int = 3_600_000,  # 1 hour
    agg: str = "SUM",
) -> pd.DataFrame:
    """Return a telemetry time range for a registered ASSET (e.g. Plant energy).

    Unlike the device variant, the defaults request hourly ``SUM``
    aggregation.

    Raises:
        KeyError: when ``asset_name`` is not registered; the message lists
            the valid names.
    """
    if asset_name in ASSET_REGISTRY:
        asset_uuid = ASSET_REGISTRY[asset_name].uuid
        return self._fetch_timeseries_raw(
            "ASSET", asset_uuid, keys, start, end, limit, interval_ms, agg,
        )
    raise KeyError(f"Unknown asset: {asset_name!r}. Valid: {sorted(ASSET_REGISTRY)}")
def get_asset_latest(
    self,
    asset_name: str,
    keys: List[str],
) -> Dict[str, Optional[float]]:
    """Return the newest value of each requested key for a registered ASSET.

    Keys the server has no data for map to None.

    Raises:
        KeyError: when ``asset_name`` is not registered.
    """
    if asset_name in ASSET_REGISTRY:
        asset_uuid = ASSET_REGISTRY[asset_name].uuid
        latest, _newest_ts = self._fetch_latest_raw("ASSET", asset_uuid, keys)
        return latest
    raise KeyError(f"Unknown asset: {asset_name!r}")
# ------------------------------------------------------------------
# Device commands (RPC + attribute writes)
# ------------------------------------------------------------------
def send_rpc_command(
    self,
    device_name: str,
    method: str,
    params: Any = None,
    timeout: float = 10.0,
) -> Dict[str, Any]:
    """Send an RPC command to a device.

    Tries two-way RPC first (POST /api/plugins/rpc/twoway/{deviceId}).
    If that endpoint answers 404 or 405, the same payload is re-sent as
    one-way RPC (POST /api/plugins/rpc/oneway/{deviceId}).

    Args:
        device_name: Key in ``DEVICE_REGISTRY``.
        method: RPC method name placed in the payload.
        params: RPC parameters; ``None`` is sent as an empty object.
        timeout: HTTP timeout in seconds, applied to each request.

    Returns:
        The decoded JSON response body, or
        ``{"status": "ok", "status_code": <code>}`` when the successful
        response has no JSON body.

    Raises:
        KeyError: when ``device_name`` is not registered.
        requests.HTTPError: when the final request returns an error status.
    """
    if device_name not in DEVICE_REGISTRY:
        raise KeyError(f"Unknown device: {device_name!r}")
    info = DEVICE_REGISTRY[device_name]
    self._ensure_jwt()
    payload = {"method": method, "params": params if params is not None else {}}
    rpc_base = f"{self.config.host.rstrip('/')}/api/plugins/rpc"

    # Try two-way RPC first
    resp = self._session.post(
        f"{rpc_base}/twoway/{info.uuid}", json=payload, timeout=timeout,
    )
    if resp.status_code in (404, 405):
        # Fallback to one-way RPC
        resp = self._session.post(
            f"{rpc_base}/oneway/{info.uuid}", json=payload, timeout=timeout,
        )
    resp.raise_for_status()
    try:
        return resp.json()
    except ValueError:
        # Body is empty or not JSON. The JSON decode errors raised by
        # ``Response.json()`` are ValueError subclasses, so only that
        # expected case is absorbed; anything else propagates.
        return {"status": "ok", "status_code": resp.status_code}
def set_device_attributes(
    self,
    device_name: str,
    attributes: Dict[str, Any],
    scope: str = "SHARED_SCOPE",
) -> None:
    """Write attributes to a device in the given attribute scope.

    Issues POST /api/plugins/telemetry/DEVICE/{id}/attributes/{scope} with
    ``attributes`` as the JSON body. The default scope is ``SHARED_SCOPE``.
    This is an alternative to RPC for setting tracker targets.

    Raises:
        KeyError: when ``device_name`` is not registered.
        requests.HTTPError: on a non-success HTTP status.
    """
    if device_name not in DEVICE_REGISTRY:
        raise KeyError(f"Unknown device: {device_name!r}")
    device_uuid = DEVICE_REGISTRY[device_name].uuid
    self._ensure_jwt()

    base = self.config.host.rstrip("/")
    endpoint = f"{base}/api/plugins/telemetry/DEVICE/{device_uuid}/attributes/{scope}"
    response = self._session.post(endpoint, json=attributes, timeout=10)
    response.raise_for_status()
# ------------------------------------------------------------------
# High-level vine snapshot
# ------------------------------------------------------------------
# Fetch plans map device name -> list of telemetry keys to request.

# Dashboard-only: one treatment + one reference Crop_2Soil for farmer view.
_DASHBOARD_FETCH_PLAN: Dict[str, List[str]] = dict.fromkeys(
    (
        "Crop_2Soil1",  # 502 north (treatment)
        "Crop_2Soil4",  # 202 north reference
    ),
    CROP_2SOIL_KEYS,
)
# Light mode: a couple per area + one thermocouple for the chatbot.
_LIGHT_FETCH_PLAN: Dict[str, List[str]] = {
    **dict.fromkeys(
        ("Crop_2Soil1", "Crop_2Soil11", "Crop_2Soil4", "Crop_2Soil10"),
        CROP_2SOIL_KEYS,
    ),
    "Thermocouples3": THERMOCOUPLE_KEYS,
}
# Full mode: every Crop_2Soil with a known row + every thermocouple.
_FULL_FETCH_PLAN: Dict[str, List[str]] = dict.fromkeys(
    TREATMENT_DEVICES + REFERENCE_DEVICES, CROP_2SOIL_KEYS,
)
_FULL_FETCH_PLAN.update(dict.fromkeys(THERMOCOUPLE_DEVICES, THERMOCOUPLE_KEYS))
def get_vine_snapshot(self, light: bool = False,
                      mode: Optional[str] = None) -> VineSnapshot:
    """
    Fetch latest telemetry from the 2026 Crop_2Soil + thermocouple fleet
    and return an aggregated VineSnapshot distinguishing treatment vs
    reference areas.

    Uses a thread pool to parallelise HTTP requests. Individual device
    failures are silently skipped (the affected fields come back as None).
    Devices that have not answered within the 25 s collection window are
    treated the same way instead of aborting the whole snapshot.

    Parameters
    ----------
    light : bool
        If True, fetch a 5-device subset instead of the full fleet.
    mode : str, optional
        "dashboard" = 2 devices (one treatment, one reference).
        Overrides `light` when set.

    Returns
    -------
    VineSnapshot
        Per-area averages (each filtered through the plausibility bounds
        in ``_BOUNDS``), per-position summaries, and
        ``staleness_minutes`` = age of the newest telemetry timestamp
        (NaN when no device returned a timestamp).
    """
    # Local import: before Python 3.11 this is a different class from the
    # builtin TimeoutError, and it is what as_completed() raises.
    from concurrent.futures import TimeoutError as FuturesTimeoutError

    if mode == "dashboard":
        fetch_plan = self._DASHBOARD_FETCH_PLAN
    elif light:
        fetch_plan = self._LIGHT_FETCH_PLAN
    else:
        fetch_plan = self._FULL_FETCH_PLAN

    # Ensure auth token before spawning threads (avoid race on login)
    self._ensure_jwt()

    raw_results: Dict[str, Dict[str, Optional[float]]] = {}
    newest_ts_overall: Optional[datetime] = None
    with ThreadPoolExecutor(max_workers=8) as pool:
        future_map = {
            pool.submit(self._fetch_latest, name, keys): name
            for name, keys in fetch_plan.items()
        }
        try:
            for future in as_completed(future_map, timeout=25):
                name = future_map[future]
                try:
                    values, ts = future.result()
                    raw_results[name] = values
                    if ts and (newest_ts_overall is None or ts > newest_ts_overall):
                        newest_ts_overall = ts
                except Exception:
                    # Best-effort by design: a failed device contributes nothing.
                    raw_results[name] = {}
        except FuturesTimeoutError:
            # The collection window elapsed with devices still pending.
            # Treat them like any other failed device rather than losing
            # the results already gathered. cancel() only stops futures
            # that have not started; leaving the ``with`` block still
            # waits for requests that are already running.
            for future, name in future_map.items():
                if name not in raw_results:
                    future.cancel()
                    raw_results[name] = {}

    now = datetime.now(tz=timezone.utc)
    staleness = (
        (now - newest_ts_overall).total_seconds() / 60
        if newest_ts_overall else float("nan")
    )

    # ---------- Bucket results by area + collect per-position rows ----------
    treatment_devs: List[Dict[str, Any]] = []
    reference_devs: List[Dict[str, Any]] = []
    treatment_by_pos: Dict[str, Dict[str, Optional[float]]] = {}
    reference_by_pos: Dict[str, Dict[str, Optional[float]]] = {}

    for name, values in raw_results.items():
        if not name.startswith("Crop_2Soil"):
            continue
        info = DEVICE_REGISTRY.get(name)
        if info is None or not values:
            continue
        # Key is "row-position" when both are known, else the row alone,
        # else the device name.
        pos_key = (
            f"{info.row}-{info.position}"
            if info.row is not None and info.position
            else (str(info.row) if info.row is not None else name)
        )
        row_summary = {
            "leaf_temp": values.get("leafTemperature"),
            "air_temp_irt": values.get("ambientTemperatureIRT"),
            "ndvi": values.get("NDVI"),
            "pri": values.get("PRI"),
            "psri": values.get("PSRI"),
            "soil_moisture": values.get("soilMoisture"),
            "soil_temp": values.get("soilTemperature"),
        }
        if info.area == VineArea.TREATMENT:
            treatment_devs.append(values)
            treatment_by_pos[pos_key] = row_summary
        elif info.area == VineArea.REFERENCE:
            reference_devs.append(values)
            reference_by_pos[pos_key] = row_summary

    # ---------- Helpers to average a key across devices, with bounds ----------
    def _avg(devs: List[Dict[str, Any]], key: str, bounds: Tuple[float, float]) -> Optional[float]:
        return _bounded_avg(*bounds, *[d.get(key) for d in devs])

    def _avg_soil_layer(devs: List[Dict[str, Any]], keys: Tuple[str, ...],
                        bounds: Tuple[float, float]) -> Optional[float]:
        vals = [d.get(k) for d in devs for k in keys if d.get(k) is not None]
        return _bounded_avg(*bounds, *vals) if vals else None

    # ---------- Air / leaf ----------
    t_leaf = _avg(treatment_devs, "leafTemperature", _BOUNDS["leaf_temp"])
    t_irt = _avg(treatment_devs, "ambientTemperatureIRT", _BOUNDS["air_temp"])
    r_leaf = _avg(reference_devs, "leafTemperature", _BOUNDS["leaf_temp"])
    r_irt = _avg(reference_devs, "ambientTemperatureIRT", _BOUNDS["air_temp"])
    t_delta = (t_leaf - t_irt) if (t_leaf is not None and t_irt is not None) else None

    # ---------- NDVI ratio (treatment / reference) ----------
    t_ndvi = _avg(treatment_devs, "NDVI", _BOUNDS["ndvi"])
    r_ndvi = _avg(reference_devs, "NDVI", _BOUNDS["ndvi"])
    t_pri = _avg(treatment_devs, "PRI", _BOUNDS["pri"])
    ndvi_ratio: Optional[float] = None
    # Only defined for a strictly positive reference NDVI.
    if t_ndvi is not None and r_ndvi is not None and r_ndvi > 0:
        ndvi_ratio = t_ndvi / r_ndvi

    # ---------- Panel surface temps (avg over active thermocouples) ----------
    tc_vals: List[float] = []
    tc_active_count = 0
    for name in THERMOCOUPLE_DEVICES:
        tc = raw_results.get(name, {})
        channels = [tc.get(k) for k in THERMOCOUPLE_KEYS if k.startswith("thermocoupleTemperature_")]
        # A device counts as active when at least one channel reported.
        if any(v is not None for v in channels):
            tc_active_count += 1
            tc_vals.extend(v for v in channels if v is not None)
    t_panel_temp = _bounded_avg(*_BOUNDS["panel_temp"], *tc_vals) if tc_vals else None

    snapshot = VineSnapshot(
        snapshot_ts=now,
        staleness_minutes=staleness,
        # Ambient — reserved (sourced from IMS, not TB)
        # Treatment climate (now derived from Crop_2Soil IRT + leaf)
        treatment_air_temp_c=t_irt,
        treatment_leaf_temp_c=t_leaf,
        treatment_air_leaf_delta_t=t_delta,
        # Treatment crop indices
        treatment_crop_ndvi=t_ndvi,
        treatment_crop_pri=t_pri,
        treatment_crop_psri=_avg(treatment_devs, "PSRI", _BOUNDS["psri"]),
        treatment_crop_sipi=_avg(treatment_devs, "SIPI", _BOUNDS["sipi"]),
        treatment_crop_gci=_avg(treatment_devs, "GCI", _BOUNDS["gci"]),
        treatment_crop_lci=_avg(treatment_devs, "LCI", _BOUNDS["lci"]),
        treatment_crop_duvi=_avg(treatment_devs, "DUVI", _BOUNDS["duvi"]),
        treatment_leaf_wetness_err=_avg(
            treatment_devs, "leafWetnessErr", _BOUNDS["leaf_wetness"]
        ),
        treatment_crop_by_position=treatment_by_pos,
        # Reference crop indices
        reference_crop_ndvi=r_ndvi,
        reference_crop_pri=_avg(reference_devs, "PRI", _BOUNDS["pri"]),
        reference_crop_psri=_avg(reference_devs, "PSRI", _BOUNDS["psri"]),
        reference_crop_sipi=_avg(reference_devs, "SIPI", _BOUNDS["sipi"]),
        reference_crop_leaf_temp_c=r_leaf,
        reference_air_temp_c=r_irt,
        reference_crop_by_position=reference_by_pos,
        # Ratios (par_shading_ratio is not passed — no PAR sensor in 2026 fleet)
        ndvi_ratio=ndvi_ratio,
        # Treatment soil — dual profile (shallow + deep are physically distinct layers)
        treatment_soil_moisture_pct=_avg_soil_layer(treatment_devs, ("soilMoisture",), _BOUNDS["soil_moisture"]),
        treatment_soil_moisture_deep_pct=_avg_soil_layer(treatment_devs, ("soilMoisture2",), _BOUNDS["soil_moisture"]),
        treatment_soil_temp_c=_avg_soil_layer(treatment_devs, ("soilTemperature",), _BOUNDS["soil_temp"]),
        treatment_soil_temp_deep_c=_avg_soil_layer(treatment_devs, ("soilTemperature2",), _BOUNDS["soil_temp"]),
        treatment_soil_ec_ds_m=_avg_soil_layer(treatment_devs, ("soilBulkEC",), _BOUNDS["soil_ec"]),
        treatment_soil_pore_water_ec=_avg_soil_layer(treatment_devs, ("soilPoreWaterEC",), _BOUNDS["soil_ec"]),
        treatment_soil_pore_water_ec_deep=_avg_soil_layer(treatment_devs, ("soilPoreWaterEC2",), _BOUNDS["soil_ec"]),
        # Reference soil
        reference_soil_moisture_pct=_avg_soil_layer(reference_devs, ("soilMoisture",), _BOUNDS["soil_moisture"]),
        reference_soil_moisture_deep_pct=_avg_soil_layer(reference_devs, ("soilMoisture2",), _BOUNDS["soil_moisture"]),
        reference_soil_temp_c=_avg_soil_layer(reference_devs, ("soilTemperature",), _BOUNDS["soil_temp"]),
        reference_soil_pore_water_ec=_avg_soil_layer(reference_devs, ("soilPoreWaterEC",), _BOUNDS["soil_ec"]),
        reference_soil_pore_water_ec_deep=_avg_soil_layer(reference_devs, ("soilPoreWaterEC2",), _BOUNDS["soil_ec"]),
        # Panel temps
        treatment_panel_temp_c=t_panel_temp,
        panel_temp_active_count=tc_active_count,
        # Legacy field names kept for older consumers; they mirror the
        # treatment values above. PAR/DLI/CO2 legacy fields are not passed
        # here (the 2026 fleet has no such sensors).
        treatment_ndvi=t_ndvi,
        treatment_pri=t_pri,
        treatment_crop_leaf_temp_c=t_leaf,
    )
    return snapshot
# ---------------------------------------------------------------------------
# Helpers (module-level so threads can share without self)
# ---------------------------------------------------------------------------
def _safe_float(val: Any) -> Optional[float]:
"""Convert a TB telemetry value string/number to float, or None on failure."""
if val is None:
return None
try:
f = float(val)
return None if math.isnan(f) or math.isinf(f) else f
except (TypeError, ValueError):
return None
def _safe_avg(*vals: Any) -> Optional[float]:
"""Return the mean of non-None, finite values, or None if none available."""
valid = [v for v in vals if v is not None and isinstance(v, (int, float))
and not math.isnan(v) and not math.isinf(v)]
return sum(valid) / len(valid) if valid else None
def _bounded_avg(lo: float, hi: float, *vals: Any) -> Optional[float]:
"""Return the mean of values within [lo, hi], rejecting sensor faults outside that range."""
valid = [v for v in vals if v is not None and isinstance(v, (int, float))
and not math.isnan(v) and not math.isinf(v) and lo <= v <= hi]
return sum(valid) / len(valid) if valid else None
# Physical plausibility bounds for Negev site
_BOUNDS = {
"air_temp": (-5.0, 55.0), # °C — extreme Negev range
"leaf_temp": (-5.0, 60.0), # °C — leaves can exceed air under direct sun
"soil_temp": (-2.0, 45.0), # °C — soil in Negev
"soil_moisture": (0.0, 100.0), # %
"soil_ec": (0.0, 10.0), # dS/m — bulk + pore water
"par": (0.0, 3000.0), # µmol m⁻² s⁻¹
"vpd": (0.0, 10.0), # kPa
"co2": (300.0, 2000.0), # ppm
"ndvi": (-1.0, 1.0),
"pri": (-1.0, 1.0),
"psri": (-1.0, 1.0),
"sipi": (-1.0, 2.0), # SIPI typical 0..2 (no upper saturation)
"gci": (0.0, 30.0), # GCI ranges 0 .. ~25 for healthy canopy
"lci": (0.0, 1.0),
"duvi": (0.0, 30.0), # daily UV index
"leaf_wetness": (0.0, 1.0), # error flag, normalised
"dli": (0.0, 80.0), # mol m⁻² day⁻¹
"panel_temp": (-10.0, 100.0), # °C — panel surface
}
# ---------------------------------------------------------------------------
# CLI smoke test
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Manual smoke test: fetch one full snapshot and print the advisor text.
    client = ThingsBoardClient()
    print("Fetching vine snapshot from ThingsBoard...")
    try:
        snap = client.get_vine_snapshot()
        print(snap.to_advisor_text())
        print(f"\nSnapshot age: {snap.staleness_minutes:.1f} min")
    except Exception as exc:
        # Top-level boundary: report the failure and hint at the usual cause.
        print(f"Error: {exc}")
        print("Make sure THINGSBOARD_USERNAME/PASSWORD or THINGSBOARD_TOKEN are set in your .env")
        # Exit non-zero so shells and CI can tell the smoke test failed.
        raise SystemExit(1)