# Source code for easydel.inference.esurge.scheduler.request_queue

# Copyright 2025 The EasyDeL Author @erfanzar (Erfan Zare Chavoshi).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Request queue implementations for scheduling.

Provides different queueing strategies for managing inference requests,
including FCFS (first-come-first-served) and priority-based scheduling.

Classes:
    RequestQueue: Abstract base class for request queues
    FCFSRequestQueue: First-come-first-served queue
    PriorityRequestQueue: Priority-based request queue
    SchedulingPolicy: Enum of scheduling policies

Functions:
    create_request_queue: Build the queue implementation for a SchedulingPolicy

Example:
    >>> queue = FCFSRequestQueue()
    >>> queue.add_request(request1)
    >>> queue.add_request(request2)
    >>> next_request = queue.pop_request()

    >>> priority_queue = PriorityRequestQueue()
    >>> priority_queue.add_request(high_priority_request)
    >>> priority_queue.add_request(low_priority_request)
    >>> next_request = priority_queue.pop_request()  # Smallest `priority` value is served first
"""

from __future__ import annotations

import heapq
from abc import ABC, abstractmethod
from collections import deque
from collections.abc import Iterable, Iterator
from enum import Enum

from ..request import EngineRequest


class SchedulingPolicy(Enum):
    """Selects which request-queue strategy the scheduler uses.

    Members:
        FCFS: Serve requests strictly in arrival order (value ``"fcfs"``).
        PRIORITY: Serve requests by ascending ``priority`` value, breaking
            ties by arrival time (value ``"priority"``).
    """

    FCFS = "fcfs"
    PRIORITY = "priority"


class RequestQueue(ABC):
    """Common interface shared by every request-queue strategy.

    A concrete queue decides the service order of engine requests. Subclasses
    must implement insertion (``add_request`` / ``prepend_request(s)``),
    retrieval (``pop_request`` / ``peek_request``), targeted removal
    (``remove_request(s)``), and the container protocol (``__bool__``,
    ``__len__``, ``__iter__``, ``__reversed__``).
    """

    @abstractmethod
    def add_request(self, request: EngineRequest) -> None:
        """Insert ``request`` at the position dictated by the queue's policy.

        Args:
            request: The engine request to enqueue.
        """

    @abstractmethod
    def pop_request(self) -> EngineRequest:
        """Remove and return the next request the policy would serve."""

    @abstractmethod
    def peek_request(self) -> EngineRequest:
        """Return the next request the policy would serve, leaving it queued."""

    @abstractmethod
    def prepend_request(self, request: EngineRequest) -> None:
        """Put ``request`` back at the front of the queue."""

    @abstractmethod
    def prepend_requests(self, requests: RequestQueue) -> None:
        """Put every request held by ``requests`` at the front of this queue."""

    @abstractmethod
    def remove_request(self, request: EngineRequest) -> None:
        """Drop one specific request from the queue."""

    @abstractmethod
    def remove_requests(self, requests: Iterable[EngineRequest]) -> None:
        """Drop every request in ``requests`` from the queue."""

    @abstractmethod
    def __bool__(self) -> bool:
        """Return True when at least one request is queued."""

    @abstractmethod
    def __len__(self) -> int:
        """Return how many requests are queued."""

    @abstractmethod
    def __iter__(self) -> Iterator[EngineRequest]:
        """Yield queued requests in the order the policy would serve them."""

    @abstractmethod
    def __reversed__(self) -> Iterator[EngineRequest]:
        """Yield queued requests in the opposite of service order."""


class FCFSRequestQueue(deque[EngineRequest], RequestQueue):
    """First-come-first-served request queue built on :class:`collections.deque`.

    ``add_request`` appends to the right end and ``pop_request`` removes from
    the left end, so requests are served in insertion order. Because the class
    subclasses ``deque``, every native deque operation is also available.
    """

    def add_request(self, request: EngineRequest) -> None:
        """Append ``request`` to the back of the queue (FCFS order).

        Args:
            request: The engine request to enqueue.
        """
        self.append(request)

    def pop_request(self) -> EngineRequest:
        """Remove and return the oldest queued request.

        Raises:
            IndexError: If the queue is empty (propagated from ``deque.popleft``).
        """
        return self.popleft()

    def peek_request(self) -> EngineRequest:
        """Return the oldest queued request without removing it.

        Raises:
            IndexError: If the queue is empty.
        """
        if not self:
            raise IndexError("peek from an empty queue")
        return self[0]

    def prepend_request(self, request: EngineRequest) -> None:
        """Insert ``request`` at the front so it is served next.

        Args:
            request: The engine request to put at the head of the queue.
        """
        self.appendleft(request)

    def prepend_requests(self, requests: RequestQueue) -> None:
        """Insert all of ``requests`` at the front, keeping their relative order.

        Args:
            requests: Queue whose requests should be served before the ones
                already in this queue. It may be this queue itself.
        """
        # ``extendleft`` pushes items one at a time onto the left end, which
        # reverses them; feeding it the reversed sequence keeps the original
        # order. The snapshot (``list``) is taken before any mutation because
        # extending a deque from a live iterator over the same deque raises
        # "RuntimeError: deque mutated during iteration".
        self.extendleft(list(reversed(requests)))

    def remove_request(self, request: EngineRequest) -> None:
        """Remove the first occurrence of ``request`` from the queue.

        Args:
            request: The request to remove.

        Raises:
            ValueError: If ``request`` is not in the queue (from ``deque.remove``).
        """
        self.remove(request)

    def remove_requests(self, requests: Iterable[EngineRequest]) -> None:
        """Remove every queued request that appears in ``requests``.

        Requests not present in the queue are ignored. The relative order of
        the remaining requests is preserved.

        Args:
            requests: Requests to drop. They are collected into a ``set``, so
                they must be hashable.
        """
        requests_to_remove = set(requests)
        if not requests_to_remove:
            return
        kept = [req for req in self if req not in requests_to_remove]
        # deque has no in-place filter, so rebuild it from the kept items.
        self.clear()
        self.extend(kept)

    # ``RequestQueue`` declares the container dunders abstract. ``deque``
    # precedes it in the MRO and already supplies __len__/__iter__/__reversed__,
    # but it defines no __bool__ of its own, so that override is required for
    # the class to be instantiable. The others are kept for an explicit,
    # self-documenting interface.
    def __bool__(self) -> bool:
        """Return True when at least one request is queued."""
        return len(self) > 0

    def __len__(self) -> int:
        """Return the number of queued requests."""
        return super().__len__()

    def __iter__(self) -> Iterator[EngineRequest]:
        """Iterate over queued requests in FCFS (service) order."""
        return super().__iter__()

    def __reversed__(self) -> Iterator[EngineRequest]:
        """Iterate over queued requests from newest to oldest."""
        return super().__reversed__()


class PriorityRequestQueue(RequestQueue):
    """Priority-ordered request queue backed by a binary heap (:mod:`heapq`).

    Requests with a smaller ``priority`` value are processed first. When
    priorities are equal, the request with the earlier ``arrival_time`` is
    processed first. Any remaining ties are broken by insertion order, so two
    ``EngineRequest`` objects are never compared with each other.
    """

    def __init__(self) -> None:
        # Heap entries are (priority, arrival_time, insertion_seq, request).
        # insertion_seq is unique per entry, so tuple comparison always stops
        # before reaching ``request``. Without it, two requests with equal
        # priority and arrival_time would be compared directly, raising
        # TypeError unless the request type defines ordering.
        self._heap: list[tuple[int, float, int, EngineRequest]] = []
        self._next_seq: int = 0

    def add_request(self, request: EngineRequest) -> None:
        """Insert ``request`` ordered by ``(priority, arrival_time)``.

        Args:
            request: The engine request to enqueue; its ``priority`` and
                ``arrival_time`` attributes are read once at insertion time.
        """
        entry = (request.priority, request.arrival_time, self._next_seq, request)
        self._next_seq += 1
        heapq.heappush(self._heap, entry)

    def pop_request(self) -> EngineRequest:
        """Remove and return the request with the smallest ``(priority, arrival_time)``.

        Raises:
            IndexError: If the queue is empty.
        """
        if not self._heap:
            raise IndexError("pop from empty heap")
        return heapq.heappop(self._heap)[-1]

    def peek_request(self) -> EngineRequest:
        """Return the next request to be served without removing it.

        Raises:
            IndexError: If the queue is empty.
        """
        if not self._heap:
            raise IndexError("peek from empty heap")
        return self._heap[0][-1]

    def prepend_request(self, request: EngineRequest) -> None:
        """Add a request according to priority policy.

        Note:
            A priority queue has no notion of "front"; the request is placed
            by ``(priority, arrival_time)`` exactly like ``add_request``.
        """
        self.add_request(request)

    def prepend_requests(self, requests: RequestQueue) -> None:
        """Add every request from ``requests`` according to priority policy.

        Note:
            A priority queue has no notion of "front"; each request is placed
            by ``(priority, arrival_time)`` exactly like ``add_request``.
        """
        for request in requests:
            self.add_request(request)

    def remove_request(self, request: EngineRequest) -> None:
        """Remove every queued entry equal to ``request``.

        Unlike a deque-based queue, this is a silent no-op when ``request`` is
        not present.
        """
        self._heap = [entry for entry in self._heap if entry[-1] != request]
        # Filtering can break the heap invariant, so restore it.
        heapq.heapify(self._heap)

    def remove_requests(self, requests: Iterable[EngineRequest]) -> None:
        """Remove every queued request that appears in ``requests``.

        Args:
            requests: Requests to drop. They are collected into a ``set``, so
                they must be hashable. Absent requests are ignored.
        """
        requests_to_remove = set(requests)
        self._heap = [entry for entry in self._heap if entry[-1] not in requests_to_remove]
        heapq.heapify(self._heap)

    def __bool__(self) -> bool:
        """Return True when at least one request is queued."""
        return bool(self._heap)

    def __len__(self) -> int:
        """Return the number of queued requests."""
        return len(self._heap)

    def __iter__(self) -> Iterator[EngineRequest]:
        """Yield requests in service order without modifying the queue."""
        # Entries are totally ordered by their first three fields (the
        # insertion sequence is unique), so sorting yields the same order as
        # repeatedly popping the heap and never compares requests.
        for *_, request in sorted(self._heap):
            yield request

    def __reversed__(self) -> Iterator[EngineRequest]:
        """Iterate over the queue in reverse priority order."""
        return reversed(list(self))


def create_request_queue(policy: SchedulingPolicy) -> RequestQueue:
    """Build an empty request queue matching ``policy``.

    Args:
        policy: The scheduling policy to realize.

    Returns:
        A new ``PriorityRequestQueue`` for ``SchedulingPolicy.PRIORITY`` or a
        new ``FCFSRequestQueue`` for ``SchedulingPolicy.FCFS``.

    Raises:
        ValueError: If ``policy`` matches neither supported policy.
    """
    # Plain equality checks (not a dict lookup) so that even unhashable
    # inputs end in the ValueError below rather than a TypeError.
    if policy == SchedulingPolicy.PRIORITY:
        return PriorityRequestQueue()
    if policy == SchedulingPolicy.FCFS:
        return FCFSRequestQueue()
    raise ValueError(f"Unknown scheduling policy: {policy}")