Image Classification
Transformers
PyTorch
English
sybil
medical
cancer
ct-scan
risk-prediction
healthcare
vision
Instructions for using Lab-Rasool/sybil with libraries, inference providers, notebooks, and local apps. Follow the links below to get started.
- Libraries
  - Transformers
How to use Lab-Rasool/sybil with Transformers:
```python
# Use a pipeline as a high-level helper
from transformers import pipeline

pipe = pipeline("image-classification", model="Lab-Rasool/sybil")
pipe("https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/hub/parrots.png")

# Load model directly
from transformers import AutoModel

model = AutoModel.from_pretrained("Lab-Rasool/sybil", dtype="auto")
```

- Notebooks
  - Google Colab
  - Kaggle
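The repository also ships a self-contained wrapper class, reproduced below, that loads the checkpoint ensemble, applies the ensemble's isotonic calibration, and runs lung cancer risk prediction directly on DICOM series.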
| """ | |
| Self-contained Hugging Face wrapper for Sybil lung cancer risk prediction model. | |
| This version works directly from HF without requiring external Sybil package. | |
| """ | |
| import os | |
| import json | |
| import sys | |
| import torch | |
| import numpy as np | |
| from typing import List, Dict, Optional | |
| from dataclasses import dataclass | |
| from transformers.modeling_outputs import BaseModelOutput | |
| from safetensors.torch import load_file | |
| # Add model path to sys.path for imports | |
| current_dir = os.path.dirname(os.path.abspath(__file__)) | |
| if current_dir not in sys.path: | |
| sys.path.insert(0, current_dir) | |
| try: | |
| from .configuration_sybil import SybilConfig | |
| from .modeling_sybil import SybilForRiskPrediction | |
| from .image_processing_sybil import SybilImageProcessor | |
| except ImportError: | |
| from configuration_sybil import SybilConfig | |
| from modeling_sybil import SybilForRiskPrediction | |
| from image_processing_sybil import SybilImageProcessor | |
@dataclass
class SybilOutput(BaseModelOutput):
    """
    Output class for Sybil model predictions.

    Args:
        risk_scores: Risk scores for each year (1-6 years by default)
        attentions: Optional attention maps if requested
    """

    risk_scores: torch.FloatTensor = None
    attentions: Optional[Dict] = None
class SybilHFWrapper:
    """
    Hugging Face wrapper for the Sybil ensemble model.
    Provides a simple interface for lung cancer risk prediction from CT scans.
    """

    def __init__(self, config: SybilConfig = None, model_dir: str = None):
        """
        Initialize the Sybil model ensemble.

        Args:
            config: Model configuration (will use default if not provided)
            model_dir: Directory containing model files (defaults to file location)
        """
        self.config = config if config is not None else SybilConfig()
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Get the directory where model files are located
        if model_dir is not None:
            self.model_dir = model_dir
        else:
            # Default to where this file is located
            self.model_dir = os.path.dirname(os.path.abspath(__file__))

        # Initialize image processor
        self.image_processor = SybilImageProcessor()
        # Load calibrator
        self.calibrator = self._load_calibrator()
        # Load ensemble models
        self.models = self._load_ensemble_models()

    def _load_calibrator(self) -> Dict:
        """Load ensemble calibrator data."""
        calibrator_path = os.path.join(self.model_dir, "checkpoints", "sybil_ensemble_simple_calibrator.json")
        if os.path.exists(calibrator_path):
            with open(calibrator_path, 'r') as f:
                return json.load(f)
        else:
            # Try alternative location
            calibrator_path = os.path.join(self.model_dir, "calibrator_data.json")
            if os.path.exists(calibrator_path):
                with open(calibrator_path, 'r') as f:
                    return json.load(f)
        return {}
    def _load_ensemble_models(self) -> List[torch.nn.Module]:
        """
        Load all models in the ensemble from original checkpoints.

        Note: We load from .ckpt files instead of safetensors because the safetensors
        were created with the wrong CumulativeProbabilityLayer architecture.
        """
        import glob as glob_module

        models = []
        # Find all .ckpt files in checkpoints directory
        checkpoints_dir = os.path.join(self.model_dir, "checkpoints")
        checkpoint_files = sorted(glob_module.glob(os.path.join(checkpoints_dir, "*.ckpt")))
        print(f"Found {len(checkpoint_files)} checkpoint files")

        # Load each checkpoint file
        for checkpoint_path in checkpoint_files:
            try:
                model = SybilForRiskPrediction(self.config)
                checkpoint = torch.load(checkpoint_path, map_location='cpu', weights_only=False)
                # Extract state dict
                if 'state_dict' in checkpoint:
                    state_dict = checkpoint['state_dict']
                else:
                    state_dict = checkpoint
                # Remove 'model.' prefix if present
                cleaned_state_dict = {}
                for k, v in state_dict.items():
                    if k.startswith('model.'):
                        cleaned_state_dict[k[6:]] = v
                    else:
                        cleaned_state_dict[k] = v
                # Load weights
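                # strict=False ignores keys that do not match the wrapper
                # architecture (missing and unexpected keys alike), so a
                # partially matching checkpoint still loads without raising.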
                model.load_state_dict(cleaned_state_dict, strict=False)
                model.to(self.device)
                model.eval()
                models.append(model)
                print(f" Loaded model from {os.path.basename(checkpoint_path)}")
            except Exception as e:
                print(f" Warning: Could not load {os.path.basename(checkpoint_path)}: {e}")
                continue

        if not models:
            raise ValueError("No models could be loaded from the ensemble. Please ensure model files are present.")
        print(f"Loaded {len(models)} models in ensemble")
        return models
    def _apply_calibration(self, scores: np.ndarray) -> np.ndarray:
        """
        Apply the complete isotonic-regression calibration, matching the original Sybil implementation.

        This method applies the same calibration as the original SimpleClassifierGroup.predict_proba:
        1. For each year, apply each calibrator in the ensemble
        2. Each calibrator applies: linear transform -> clip -> isotonic regression (np.interp)
        3. Average predictions from all calibrators

        Args:
            scores: Raw risk scores from the model (shape: [batch_size, num_years])
        Returns:
            Calibrated risk scores (shape: [batch_size, num_years])
        """
        if not self.calibrator:
            return scores

        calibrated_scores = []
        for year in range(scores.shape[1]):
            year_key = f"Year{year + 1}"
            if year_key not in self.calibrator:
                # No calibrator for this year; use raw scores
                calibrated_scores.append(scores[:, year])
                continue

            cal_list = self.calibrator[year_key]
            if not isinstance(cal_list, list) or len(cal_list) == 0:
                # Invalid calibrator format; use raw scores
                calibrated_scores.append(scores[:, year])
                continue

            # Apply each calibrator and collect predictions
            year_predictions = []
            for cal_data in cal_list:
                if not isinstance(cal_data, dict):
                    continue
                # Extract calibration parameters
                if "coef" not in cal_data or "intercept" not in cal_data:
                    continue
                coef = np.array(cal_data["coef"])  # shape: [[scalar]]
                intercept = np.array(cal_data["intercept"])  # shape: [scalar]
                # Extract isotonic regression points
                if "x0" not in cal_data or "y0" not in cal_data:
                    continue
                x0 = np.array(cal_data["x0"])
                y0 = np.array(cal_data["y0"])
                # Extract clipping bounds
                x_min = cal_data.get("x_min", -np.inf)
                x_max = cal_data.get("x_max", np.inf)

                # Apply the complete calibration pipeline:
                # Step 1: Linear transformation
                probs = scores[:, year].reshape(-1, 1)  # shape: [batch_size, 1]
                T = probs @ coef + intercept  # matrix multiplication
                T = T.flatten()  # shape: [batch_size]
                # Step 2: Clip to valid range
                T = np.clip(T, x_min, x_max)
                # Step 3: Apply isotonic regression via interpolation
                calibrated = np.interp(T, x0, y0)
                year_predictions.append(calibrated)

            if len(year_predictions) == 0:
                # No valid calibrators; use raw scores
                calibrated_scores.append(scores[:, year])
            else:
                # Average predictions from all calibrators (like SimpleClassifierGroup)
                calibrated_scores.append(np.mean(year_predictions, axis=0))

        return np.stack(calibrated_scores, axis=1)
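    # Worked example of a single calibrator step (hypothetical numbers): a raw
    # score s = 0.30 with coef = [[2.0]] and intercept = [-0.5] gives
    # T = 0.30 * 2.0 - 0.5 = 0.10; after clipping, np.interp(0.10,
    # [0.0, 0.2], [0.01, 0.05]) linearly interpolates the isotonic curve
    # and returns 0.03 as the calibrated probability.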
    def preprocess_dicom(self, dicom_paths: List[str]) -> torch.Tensor:
        """
        Preprocess DICOM files for model input.

        Args:
            dicom_paths: List of paths to DICOM files
        Returns:
            Preprocessed tensor ready for model input
        """
        # Use the image processor to handle DICOM files
        result = self.image_processor(dicom_paths, file_type="dicom", return_tensors="pt")
        pixel_values = result["pixel_values"]
        # Ensure we have a 5D tensor (B, C, D, H, W)
        if pixel_values.ndim == 4:
            pixel_values = pixel_values.unsqueeze(0)  # add batch dimension
        return pixel_values.to(self.device)

    def predict(self, dicom_paths: List[str], return_attentions: bool = False) -> SybilOutput:
        """
        Run prediction on a CT scan series.

        Args:
            dicom_paths: List of paths to DICOM files for a single CT series
            return_attentions: Whether to return attention maps
        Returns:
            SybilOutput with risk scores and optional attention maps
        """
        # Preprocess the DICOM files
        pixel_values = self.preprocess_dicom(dicom_paths)

        # Run inference with the ensemble
        all_predictions = []
        all_attentions = []
        with torch.no_grad():
            for model in self.models:
                output = model(
                    pixel_values=pixel_values,
                    return_attentions=return_attentions
                )
                # Extract risk scores
                if hasattr(output, 'risk_scores'):
                    predictions = output.risk_scores
                else:
                    predictions = output[0] if isinstance(output, tuple) else output
                all_predictions.append(predictions.cpu().numpy())
                if return_attentions and hasattr(output, 'image_attention'):
                    all_attentions.append(output.image_attention)

        # Average ensemble predictions
        ensemble_pred = np.mean(all_predictions, axis=0)
        # Apply calibration
        calibrated_pred = self._apply_calibration(ensemble_pred)
        # Convert back to a torch tensor
        risk_scores = torch.from_numpy(calibrated_pred).float()

        # Average attentions if requested
        attentions = None
        if return_attentions and all_attentions:
            attentions = {"image_attention": torch.stack(all_attentions).mean(dim=0)}

        return SybilOutput(risk_scores=risk_scores, attentions=attentions)
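    # Note: ensemble averaging happens on the raw (pre-calibration) scores,
    # and calibration is applied once to the averaged prediction.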
    def __call__(self, dicom_paths: List[str] = None, dicom_series: List[List[str]] = None, **kwargs) -> SybilOutput:
        """
        Convenience method for prediction.

        Args:
            dicom_paths: List of DICOM file paths for a single series
            dicom_series: List of lists of DICOM paths for batch processing
            **kwargs: Additional arguments passed to predict()
        Returns:
            SybilOutput with predictions
        """
        if dicom_series is not None:
            # Batch processing
            all_outputs = []
            for paths in dicom_series:
                output = self.predict(paths, **kwargs)
                all_outputs.append(output.risk_scores)
            risk_scores = torch.stack(all_outputs)
            return SybilOutput(risk_scores=risk_scores)
        elif dicom_paths is not None:
            return self.predict(dicom_paths, **kwargs)
        else:
            raise ValueError("Either dicom_paths or dicom_series must be provided")
    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path: str, **kwargs):
        """
        Load the model from the Hugging Face Hub or a local path.

        Args:
            pretrained_model_name_or_path: HF model ID or local path
            **kwargs: Additional configuration arguments
        Returns:
            SybilHFWrapper instance
        """
        # Load configuration
        config = kwargs.pop("config", None)
        if config is None:
            try:
                config = SybilConfig.from_pretrained(pretrained_model_name_or_path)
            except Exception:
                config = SybilConfig()
        return cls(config=config)
```
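A minimal usage sketch for the wrapper above, assuming the repository files (the wrapper modules plus the `checkpoints` directory) have been downloaded locally; the wrapper's module name and the DICOM paths are hypothetical placeholders:

```python
# Usage sketch (assumptions: the wrapper file is importable as
# `sybil_hf_wrapper`, and series/ holds one CT series of DICOM slices).
import glob
import sys

from huggingface_hub import snapshot_download

local_dir = snapshot_download("Lab-Rasool/sybil")  # download model files
sys.path.insert(0, local_dir)

from sybil_hf_wrapper import SybilHFWrapper  # hypothetical module name

wrapper = SybilHFWrapper(model_dir=local_dir)
dicom_paths = sorted(glob.glob("series/*.dcm"))  # placeholder CT series
output = wrapper(dicom_paths=dicom_paths)
print(output.risk_scores)  # calibrated 1-6 year risk, shape [1, num_years]
```

For several series at once, pass `dicom_series=[paths_a, paths_b]` instead; the wrapper stacks the per-series risk scores.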