# Copyright 2025 The EasyDeL Author @erfanzar (Erfan Zare Chavoshi).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""eSurge Monitoring and Observability System."""
from __future__ import annotations
import logging
import threading
import time
try:
from prometheus_client import Counter, Gauge, Histogram, Info, start_http_server
PROMETHEUS_AVAILABLE = True
except ImportError:
PROMETHEUS_AVAILABLE = False
try:
from rich.console import Console
from rich.layout import Layout
from rich.live import Live
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
RICH_AVAILABLE = True
except ImportError:
RICH_AVAILABLE = False
from .metrics import MetricsCollector, get_metrics_collector
[docs]class PrometheusMetrics:
"""Prometheus metrics exporter for eSurge."""
def __init__(self, prefix: str = "esurge_"):
"""Initialize Prometheus metrics.
Args:
prefix: Prefix for all metric names
"""
if not PROMETHEUS_AVAILABLE:
raise ImportError("prometheus_client not available. Install with: pip install prometheus-client")
self.prefix = prefix
# Request metrics
self.requests_total = Counter(f"{prefix}requests_total", "Total number of requests", ["status"])
self.request_duration = Histogram(
f"{prefix}request_duration_seconds",
"Request duration in seconds",
buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0],
)
self.time_to_first_token = Histogram(
f"{prefix}time_to_first_token_seconds",
"Time to first token in seconds",
buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0],
)
self.tokens_generated_total = Counter(f"{prefix}tokens_generated_total", "Total number of tokens generated")
self.tokens_per_second = Gauge(f"{prefix}tokens_per_second", "Current tokens per second throughput")
# Scheduler metrics
self.waiting_requests = Gauge(f"{prefix}waiting_requests", "Number of requests waiting to be scheduled")
self.running_requests = Gauge(f"{prefix}running_requests", "Number of currently running requests")
self.scheduled_tokens = Gauge(f"{prefix}scheduled_tokens", "Number of tokens scheduled in current batch")
self.preempted_requests_total = Counter(
f"{prefix}preempted_requests_total", "Total number of preempted requests"
)
self.schedule_duration = Histogram(
f"{prefix}schedule_duration_seconds",
"Scheduler operation duration in seconds",
buckets=[0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1],
)
# Model runner metrics
self.model_execution_duration = Histogram(
f"{prefix}model_execution_duration_seconds",
"Model execution duration in seconds",
buckets=[0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0],
)
self.batch_size = Gauge(f"{prefix}batch_size", "Current batch size")
# Cache metrics
self.cache_pages_total = Gauge(f"{prefix}cache_pages_total", "Total number of cache pages")
self.cache_pages_used = Gauge(f"{prefix}cache_pages_used", "Number of used cache pages")
self.cache_hit_rate = Gauge(f"{prefix}cache_hit_rate", "Cache hit rate (0-1)")
# System info
self.system_info = Info(f"{prefix}system_info", "System information")
[docs] def update_from_metrics_collector(self, collector: MetricsCollector) -> None:
"""Update Prometheus metrics from the metrics collector."""
system_metrics = collector.get_system_metrics()
# Update system-level metrics
self.tokens_per_second.set(system_metrics.average_throughput)
# Update from recent completed requests
with collector._lock:
# Update request metrics from recent completions
recent_requests = list(collector.completed_requests)[-10:] # Last 10 requests
for req_metrics in recent_requests:
if req_metrics.error:
self.requests_total.labels(status="failed").inc()
else:
self.requests_total.labels(status="completed").inc()
if req_metrics.total_latency:
self.request_duration.observe(req_metrics.total_latency)
if req_metrics.time_to_first_token:
self.time_to_first_token.observe(req_metrics.time_to_first_token)
self.tokens_generated_total.inc(req_metrics.generated_tokens)
# Update scheduler metrics
if collector.scheduler_metrics:
latest_scheduler = collector.scheduler_metrics[-1]
self.waiting_requests.set(latest_scheduler.num_waiting_requests)
self.running_requests.set(latest_scheduler.num_running_requests)
self.scheduled_tokens.set(latest_scheduler.num_scheduled_tokens)
self.schedule_duration.observe(latest_scheduler.schedule_time)
# Update runner metrics
if collector.runner_metrics:
latest_runner = collector.runner_metrics[-1]
self.model_execution_duration.observe(latest_runner.execution_time)
self.batch_size.set(latest_runner.batch_size)
# Update cache metrics
if collector.cache_metrics:
latest_cache = collector.cache_metrics[-1]
self.cache_pages_total.set(latest_cache.total_pages)
self.cache_pages_used.set(latest_cache.used_pages)
self.cache_hit_rate.set(latest_cache.cache_hit_rate)
[docs]class RichConsoleMonitor:
"""Rich console-based live monitoring for eSurge."""
def __init__(self, refresh_rate: float = 1.0):
"""Initialize console monitor.
Args:
refresh_rate: Update rate in seconds
"""
if not RICH_AVAILABLE:
raise ImportError("rich not available. Install with: pip install rich")
self.console = Console()
self.refresh_rate = refresh_rate
self.running = False
self._thread: threading.Thread | None = None
# Layout for the console display
self.layout = Layout()
self.layout.split_column(Layout(name="header", size=3), Layout(name="main"), Layout(name="footer", size=3))
self.layout["main"].split_row(Layout(name="left"), Layout(name="right"))
def _create_system_metrics_table(self, collector: MetricsCollector) -> Table:
"""Create system metrics table."""
table = Table(title="System Metrics", show_header=True)
table.add_column("Metric", style="cyan")
table.add_column("Value", style="green")
system_metrics = collector.get_system_metrics()
table.add_row("Requests/sec", f"{system_metrics.requests_per_second:.2f}")
table.add_row("Avg Latency", f"{system_metrics.average_latency:.3f}s")
table.add_row("Avg TTFT", f"{system_metrics.average_ttft:.3f}s")
table.add_row("Avg Throughput", f"{system_metrics.average_throughput:.1f} tok/s")
table.add_row("Total Completed", str(system_metrics.total_requests_completed))
table.add_row("Total Failed", str(system_metrics.total_requests_failed))
table.add_row("Total Tokens", str(system_metrics.total_tokens_generated))
return table
def _create_scheduler_metrics_table(self, collector: MetricsCollector) -> Table:
"""Create scheduler metrics table."""
table = Table(title="Scheduler Status", show_header=True)
table.add_column("Metric", style="cyan")
table.add_column("Value", style="yellow")
with collector._lock:
if collector.scheduler_metrics:
latest = collector.scheduler_metrics[-1]
table.add_row("Waiting Requests", str(latest.num_waiting_requests))
table.add_row("Running Requests", str(latest.num_running_requests))
table.add_row("Scheduled Tokens", str(latest.num_scheduled_tokens))
table.add_row("Batch Size", str(latest.batch_size))
table.add_row("Schedule Time", f"{latest.schedule_time * 1000:.2f}ms")
else:
table.add_row("No Data", "Available")
return table
def _create_runner_metrics_table(self, collector: MetricsCollector) -> Table:
"""Create runner metrics table."""
table = Table(title="Model Runner", show_header=True)
table.add_column("Metric", style="cyan")
table.add_column("Value", style="magenta")
with collector._lock:
if collector.runner_metrics:
latest = collector.runner_metrics[-1]
table.add_row("Execution Time", f"{latest.execution_time * 1000:.2f}ms")
table.add_row("Batch Size", str(latest.batch_size))
table.add_row("Tokens", str(latest.num_tokens))
table.add_row("Tokens/sec", f"{latest.tokens_per_second:.1f}")
else:
table.add_row("No Data", "Available")
return table
def _create_cache_metrics_table(self, collector: MetricsCollector) -> Table:
"""Create cache metrics table."""
table = Table(title="Cache Status", show_header=True)
table.add_column("Metric", style="cyan")
table.add_column("Value", style="blue")
with collector._lock:
if collector.cache_metrics:
latest = collector.cache_metrics[-1]
utilization = (latest.used_pages / latest.total_pages * 100) if latest.total_pages > 0 else 0
table.add_row("Total Pages", str(latest.total_pages))
table.add_row("Used Pages", str(latest.used_pages))
table.add_row("Free Pages", str(latest.free_pages))
table.add_row("Utilization", f"{utilization:.1f}%")
table.add_row("Hit Rate", f"{latest.cache_hit_rate:.2%}")
else:
table.add_row("No Data", "Available")
return table
def _create_recent_requests_table(self, collector: MetricsCollector) -> Table:
"""Create recent requests table."""
table = Table(title="Recent Requests", show_header=True)
table.add_column("Request ID", style="dim")
table.add_column("Status", style="bold")
table.add_column("Latency", style="green")
table.add_column("TTFT", style="yellow")
table.add_column("Tokens", style="blue")
with collector._lock:
recent = list(collector.completed_requests)[-5:] # Last 5 requests
for req in recent:
status = "✓" if not req.error else "✗"
status_style = "green" if not req.error else "red"
table.add_row(
req.request_id[:8] + "...",
Text(status, style=status_style),
f"{req.total_latency:.3f}s" if req.total_latency else "N/A",
f"{req.time_to_first_token:.3f}s" if req.time_to_first_token else "N/A",
str(req.generated_tokens),
)
return table
def _update_layout(self) -> None:
"""Update the layout with current metrics."""
collector = get_metrics_collector()
if not collector:
self.layout["header"].update(Panel("❌ No metrics collector initialized", style="red"))
return
# Header
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
self.layout["header"].update(Panel(f"🚀 eSurge Live Monitor - {timestamp}", style="bold blue"))
# Left column
left_layout = Layout()
left_layout.split_column(
Layout(self._create_system_metrics_table(collector)), Layout(self._create_recent_requests_table(collector))
)
self.layout["left"].update(left_layout)
# Right column
right_layout = Layout()
right_layout.split_column(
Layout(self._create_scheduler_metrics_table(collector)),
Layout(self._create_runner_metrics_table(collector)),
Layout(self._create_cache_metrics_table(collector)),
)
self.layout["right"].update(right_layout)
# Footer
self.layout["footer"].update(Panel("Press Ctrl+C to stop monitoring", style="dim"))
[docs] def start(self) -> None:
"""Start the live console monitor."""
if self.running:
return
self.running = True
def _monitor_loop():
with Live(self.layout, console=self.console, refresh_per_second=1 / self.refresh_rate):
while self.running:
try:
self._update_layout()
time.sleep(self.refresh_rate)
except KeyboardInterrupt:
break
self._thread = threading.Thread(target=_monitor_loop, daemon=True)
self._thread.start()
try:
self._thread.join()
except KeyboardInterrupt:
self.stop()
[docs] def stop(self) -> None:
"""Stop the console monitor."""
self.running = False
if self._thread:
self._thread.join(timeout=1.0)
[docs]class eSurgeMonitoringServer:
"""Monitoring server for Prometheus export and console monitoring."""
def __init__(
self,
prometheus_port: int = 8000,
dashboard_port: int | None = None,
metrics_prefix: str = "esurge_",
update_interval: float = 1.0,
):
"""Initialize monitoring server.
Args:
prometheus_port: Port for Prometheus metrics endpoint.
dashboard_port: Deprecated; retained for compatibility (ignored).
metrics_prefix: Prefix for Prometheus metrics.
update_interval: Update interval in seconds.
"""
self.prometheus_port = prometheus_port
self.dashboard_port = dashboard_port
self.update_interval = update_interval
# Initialize Prometheus metrics if available
self.prometheus_metrics = None
if PROMETHEUS_AVAILABLE:
self.prometheus_metrics = PrometheusMetrics(metrics_prefix)
self.running = False
self._update_thread: threading.Thread | None = None
def _update_metrics_loop(self) -> None:
"""Background thread to update Prometheus metrics."""
while self.running:
try:
collector = get_metrics_collector()
if collector and self.prometheus_metrics:
self.prometheus_metrics.update_from_metrics_collector(collector)
time.sleep(self.update_interval)
except Exception as e:
logging.error(f"Error updating metrics: {e}")
time.sleep(self.update_interval)
[docs] def start_prometheus_server(self) -> None:
"""Start Prometheus metrics server."""
if not PROMETHEUS_AVAILABLE:
logging.warning("Prometheus client not available, skipping Prometheus server")
return
start_http_server(self.prometheus_port)
logging.info(f"Prometheus metrics server started on port {self.prometheus_port}")
logging.info(f"Metrics available at: http://localhost:{self.prometheus_port}/metrics")
[docs] def start(self) -> None:
"""Start the monitoring server."""
if self.running:
return
self.running = True
# Start Prometheus server
self.start_prometheus_server()
# Start metrics update thread
self._update_thread = threading.Thread(target=self._update_metrics_loop, daemon=True)
self._update_thread.start()
logging.info("eSurge monitoring server started")
[docs] def stop(self) -> None:
"""Stop the monitoring server."""
self.running = False
if self._update_thread:
self._update_thread.join(timeout=1.0)
logging.info("eSurge monitoring server stopped")
# Global monitoring instances
_monitoring_server: eSurgeMonitoringServer | None = None
_console_monitor: RichConsoleMonitor | None = None
[docs]def start_monitoring_server(
prometheus_port: int = 8000,
dashboard_port: int | None = None,
update_interval: float = 1.0,
) -> eSurgeMonitoringServer:
"""Start the global monitoring server."""
global _monitoring_server
if _monitoring_server is None:
_monitoring_server = eSurgeMonitoringServer(
prometheus_port=prometheus_port,
dashboard_port=dashboard_port,
update_interval=update_interval,
)
_monitoring_server.start()
return _monitoring_server
[docs]def start_console_monitor(refresh_rate: float = 1.0) -> RichConsoleMonitor:
"""Start the global console monitor."""
global _console_monitor
if _console_monitor is None:
_console_monitor = RichConsoleMonitor(refresh_rate=refresh_rate)
_console_monitor.start()
return _console_monitor
[docs]def stop_monitoring() -> None:
"""Stop all monitoring services."""
global _monitoring_server, _console_monitor
if _monitoring_server:
_monitoring_server.stop()
_monitoring_server = None
if _console_monitor:
_console_monitor.stop()
_console_monitor = None