easydel.inference.vsurge.utils

class easydel.inference.vsurge.utils.ActiveRequest(max_tokens: int, return_channel: easydel.inference.vsurge.utils.AsyncMultifuture[list[easydel.inference.vsurge.utils.ReturnSample]], top_p: float = 1.0, top_k: int = 0, min_p: float = 0.0, temperature: float = 0.7, presence_penalty: float = 0.0, frequency_penalty: float = 0.0, repetition_penalty: float = 1.0, complete: typing.Optional[numpy.ndarray] = None, prefill_result: typing.Any = None, prefill_content: typing.Optional[typing.Union[str, list[int]]] = None, generate_timestep_added: typing.Optional[int] = None, is_client_side_tokenization: typing.Optional[bool] = False, decode_start_time: float | None = None, total_generated_tokens: int = 0, metadata: easydel.inference.vsurge.utils.ActiveRequestMetadata = <factory>, id: str = <factory>)

Bases: object

Current state of a single request being processed by the driver.

complete: Optional[ndarray] = None
decode_start_time: float | None = None
enqueue_samples(generated_samples: list[easydel.inference.vsurge.utils.ReturnSample])

Adds the generated sample(s) for the current step to the return channel.

Parameters

generated_samples – The generated sample(s) for the current step.

This should be called only from within the driver's background thread.

frequency_penalty: float = 0.0
generate_timestep_added: Optional[int] = None
id: str
is_client_side_tokenization: Optional[bool] = False
max_tokens: int
metadata: ActiveRequestMetadata
min_p: float = 0.0
prefill_content: Optional[Union[str, list[int]]] = None
prefill_result: Any = None
presence_penalty: float = 0.0
repetition_penalty: float = 1.0
return_channel: AsyncMultifuture[list[easydel.inference.vsurge.utils.ReturnSample]]
temperature: float = 0.7
top_k: int = 0
top_p: float = 1.0
total_generated_tokens: int = 0
class easydel.inference.vsurge.utils.ActiveRequestMetadata(start_time: Optional[float] = None, prefill_enqueue_time: Optional[float] = None, prefill_dequeue_time: Optional[float] = None, transfer_enqueue_time: Optional[float] = None, transfer_dequeue_time: Optional[float] = None, generate_enqueue_time: Optional[float] = None, generate_dequeue_time: Optional[float] = None, complete_time: Optional[float] = None)

Bases: object

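Inference request metadata: timestamps recorded as a request moves through the prefill, transfer, and generate stages, from start to completion.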

complete_time: Optional[float] = None
generate_dequeue_time: Optional[float] = None
generate_enqueue_time: Optional[float] = None
prefill_dequeue_time: Optional[float] = None
prefill_enqueue_time: Optional[float] = None
start_time: Optional[float] = None
transfer_dequeue_time: Optional[float] = None
transfer_enqueue_time: Optional[float] = None
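As a rough illustration of how these timestamps can be used, the sketch below computes per-stage queue wait times and end-to-end latency. MetadataSketch is a local mirror of the fields above (so the code runs without EasyDeL) and queue_waits is a hypothetical helper; neither is part of the library. It assumes each enqueue/dequeue pair brackets the time a request spends waiting in that stage's queue and that all timestamps come from the same clock, in seconds.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class MetadataSketch:
    """Local mirror of the ActiveRequestMetadata fields (assumed to be seconds)."""

    start_time: Optional[float] = None
    prefill_enqueue_time: Optional[float] = None
    prefill_dequeue_time: Optional[float] = None
    transfer_enqueue_time: Optional[float] = None
    transfer_dequeue_time: Optional[float] = None
    generate_enqueue_time: Optional[float] = None
    generate_dequeue_time: Optional[float] = None
    complete_time: Optional[float] = None


def _span(begin: Optional[float], end: Optional[float]) -> Optional[float]:
    # A stage the request never reached leaves its timestamps as None.
    return None if begin is None or end is None else end - begin


def queue_waits(m: MetadataSketch) -> dict:
    """Hypothetical helper: time spent in each queue plus end-to-end latency."""
    return {
        "prefill_wait": _span(m.prefill_enqueue_time, m.prefill_dequeue_time),
        "transfer_wait": _span(m.transfer_enqueue_time, m.transfer_dequeue_time),
        "generate_wait": _span(m.generate_enqueue_time, m.generate_dequeue_time),
        "end_to_end": _span(m.start_time, m.complete_time),
    }


example = MetadataSketch(
    start_time=0.0,
    prefill_enqueue_time=0.25,
    prefill_dequeue_time=0.5,
    transfer_enqueue_time=0.75,
    transfer_dequeue_time=1.0,
    generate_enqueue_time=1.0,
    generate_dequeue_time=1.5,
    complete_time=4.0,
)
print(queue_waits(example))
```

Because every field defaults to None, the helper returns None for any stage a request has not reached yet instead of raising.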
class easydel.inference.vsurge.utils.AsyncMultifuture

Bases: Generic[V]

AsyncMultifuture is like concurrent.futures.Future but supports returning multiple results. It provides a unidirectional stream with buffering and exception propagation.

It supports delivering results to an async Python event loop and must be constructed inside that event loop.

add_result(result: V) → None

Adds the result to the AsyncMultifuture.

The caller must call .close() once all results have been added.

Parameters

result – The result to add.

cancel(unused: Any = None) → None

Cancels the AsyncMultifuture.

cancelled() → bool

Returns whether the AsyncMultifuture has been cancelled.

close() → None

Notifies the receiver that no more results will be added.

done() → bool

The AsyncMultifuture is done once it has been finalized with close() or set_exception().

set_exception(exception: Exception) → None

Stores the given exception in the AsyncMultifuture.

The exception is delivered after all previously added results have been yielded. set_exception can be called multiple times; however, only the first call takes effect and subsequent calls are ignored.

Parameters

exception – The exception to set.
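To make these semantics concrete, here is a minimal sketch of a multi-result future built on asyncio.Queue. It is not the EasyDeL implementation: MultifutureSketch, demo, and demo_exception are hypothetical names, and the design (producers hand items over with loop.call_soon_threadsafe, and the stream ends with a sentinel) is an assumption. It shows buffering, consumption with async for, delivery of an exception only after earlier results, and a second set_exception being ignored.

```python
import asyncio
import threading
from typing import AsyncIterator, Generic, Optional, TypeVar

V = TypeVar("V")
_END = object()  # sentinel placed on the queue after the last result


class MultifutureSketch(Generic[V]):
    """Hypothetical stand-in mimicking the documented AsyncMultifuture semantics."""

    def __init__(self) -> None:
        # get_running_loop() raises RuntimeError outside a running event loop.
        self._loop = asyncio.get_running_loop()
        self._queue: asyncio.Queue = asyncio.Queue()
        self._exception: Optional[Exception] = None
        self._done = False

    def _put(self, item: object) -> None:
        # Lets a non-loop thread (e.g. a driver thread) hand items to the loop.
        self._loop.call_soon_threadsafe(self._queue.put_nowait, item)

    def add_result(self, result: V) -> None:
        self._put(result)

    def close(self) -> None:
        if not self._done:
            self._done = True
            self._put(_END)

    def set_exception(self, exception: Exception) -> None:
        if self._done:
            return  # only the first finalizing call takes effect
        self._done = True
        self._exception = exception
        self._put(_END)

    def done(self) -> bool:
        return self._done

    def __aiter__(self) -> AsyncIterator[V]:
        return self

    async def __anext__(self) -> V:
        item = await self._queue.get()
        if item is _END:
            self._queue.put_nowait(_END)  # keep later iterations terminating
            if self._exception is not None:
                # Raised only after every earlier result has been yielded.
                raise self._exception
            raise StopAsyncIteration
        return item


async def demo() -> list:
    fut: MultifutureSketch[str] = MultifutureSketch()

    def producer() -> None:  # runs in a background thread
        for piece in ["Hel", "lo"]:
            fut.add_result(piece)
        fut.close()

    worker = threading.Thread(target=producer)
    worker.start()
    pieces = [piece async for piece in fut]
    worker.join()
    return pieces


async def demo_exception() -> tuple:
    fut: MultifutureSketch[int] = MultifutureSketch()
    fut.add_result(1)
    fut.set_exception(ValueError("boom"))
    fut.set_exception(RuntimeError("ignored"))  # ignored: already finalized
    received = []
    try:
        async for item in fut:
            received.append(item)
    except ValueError as exc:
        return received, str(exc)
    return received, None


if __name__ == "__main__":
    print(asyncio.run(demo()))
    print(asyncio.run(demo_exception()))
```

Capturing the running loop in the constructor is what enforces the "must be constructed inside the event loop" rule in this sketch.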

class easydel.inference.vsurge.utils.ReturnSample(text: list[str] | str, token_ids: list[int], tokens_per_second: float | None = None, num_generated_tokens: int | None = None)

Bases: object

Represents a single generated sample with text, token IDs, and metrics.

This dataclass encapsulates the output for one sample (sequence) from a generation step, including the detokenized text, the raw token IDs, and performance metrics like tokens per second and the cumulative number of generated tokens.

text

The text detokenized from the token IDs: either a single string or a list of string pieces (for example, when dealing with byte tokens or streaming output).

Type

list[str] | str

token_ids

A list of integer token IDs generated in this step.

Type

list[int]

tokens_per_second

The cumulative tokens per second achieved for this sample up to the current generation step. Optional.

Type

float | None

num_generated_tokens

The cumulative number of tokens generated for this sample since the start of the decode phase. Optional.

Type

int | None

num_generated_tokens: int | None = None
text: list[str] | str
token_ids: list[int]
tokens_per_second: float | None = None
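Both optional fields are cumulative. The sketch below shows one way they could be filled in for a decode step, assuming tokens_per_second is the running token count divided by the wall-clock time since decoding started (ActiveRequest exposes decode_start_time and total_generated_tokens, which fit this calculation). SampleSketch and with_cumulative_metrics are hypothetical stand-ins, not library APIs.

```python
from dataclasses import dataclass, replace
from typing import Optional, Union


@dataclass
class SampleSketch:
    """Local mirror of the ReturnSample fields."""

    text: Union[list[str], str]
    token_ids: list[int]
    tokens_per_second: Optional[float] = None
    num_generated_tokens: Optional[int] = None


def with_cumulative_metrics(
    sample: SampleSketch,
    total_generated_tokens: int,
    decode_start_time: float,
    now: float,
) -> SampleSketch:
    """Hypothetical helper: return a copy with the cumulative metrics filled in."""
    elapsed = now - decode_start_time
    # Avoid dividing by zero on the very first step.
    tps = total_generated_tokens / elapsed if elapsed > 0 else None
    return replace(
        sample,
        tokens_per_second=tps,
        num_generated_tokens=total_generated_tokens,
    )


step = SampleSketch(text="lo", token_ids=[42])
print(with_cumulative_metrics(step, total_generated_tokens=10, decode_start_time=100.0, now=102.0))
```

dataclasses.replace returns a new object, so the per-step sample stays untouched.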
class easydel.inference.vsurge.utils.SafeThread(group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)

Bases: Thread

Thread that kills the program if it fails.

If a driver thread goes down, we can’t operate.

run()

Executes the thread’s target function.

If the target function raises any exception, this method catches it, prints the traceback, and forcefully kills the entire process using os.kill with signal.SIGKILL. This ensures that if a critical driver thread fails, the whole system stops, preventing potential inconsistent states or hangs.
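A minimal sketch of the fail-fast pattern described above, assuming that wrapping Thread.run in a try/except is sufficient. FailFastThread and RecordingThread are hypothetical names, not the library's classes; the on_failure hook exists only so the demo can record the failure instead of sending SIGKILL, which is POSIX-only and would end the interpreter.

```python
import os
import signal
import threading
import traceback


class FailFastThread(threading.Thread):
    """Hypothetical sketch of the documented SafeThread behaviour."""

    def on_failure(self) -> None:
        # Documented behaviour: kill the whole process with SIGKILL (POSIX only).
        os.kill(os.getpid(), signal.SIGKILL)

    def run(self) -> None:
        try:
            super().run()  # calls the target passed to the constructor
        except Exception:
            traceback.print_exc()  # surface the error before dying
            self.on_failure()


class RecordingThread(FailFastThread):
    """Demo subclass that records the failure instead of killing the process."""

    failed = False

    def on_failure(self) -> None:
        RecordingThread.failed = True


if __name__ == "__main__":
    t = RecordingThread(target=lambda: 1 / 0)
    t.start()
    t.join()
    print("failure detected:", RecordingThread.failed)
```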

easydel.inference.vsurge.utils.is_byte_token(s: str) → bool

Returns True if s is a byte string like “<0xAB>”.

These tokens represent raw bytes and are used in some tokenization schemes to handle multi-byte characters or special symbols.

Parameters

s – The input string to check.

Returns

True if the string matches the byte token format “<0xXX>”, False otherwise.
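A regex-based sketch of this check; looks_like_byte_token is a hypothetical name. It requires exactly two hexadecimal digits after “<0x”, which may be stricter than the library's own test, documented only as matching the “<0xXX>” format.

```python
import re

# "<0x" + exactly two hex digits + ">"
_BYTE_TOKEN = re.compile(r"<0x[0-9A-Fa-f]{2}>")


def looks_like_byte_token(s: str) -> bool:
    """Sketch: True for strings shaped like "<0xAB>"."""
    return _BYTE_TOKEN.fullmatch(s) is not None


print(looks_like_byte_token("<0xAB>"))
print(looks_like_byte_token("<0xABC>"))
print(looks_like_byte_token("hello"))
```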

easydel.inference.vsurge.utils.pad_tokens(tokens: ndarray, valids: ndarray, pad_token_id: int, prefill_lengths: Optional[List[int]] = None, max_prefill_length: Optional[int] = None, jax_padding: bool = True, right_padding: bool = False, bos_token_id: int | None = None, is_bos: bool = True) → Tuple[Union[Array, ndarray], Union[Array, ndarray], int]

Pads token and validity arrays to a specified bucket length.

Takes 1D NumPy arrays of token IDs and validity masks, determines the nearest appropriate padding length from prefill_lengths (or capped by max_prefill_length), and pads or truncates the arrays to that length. Padding uses the pad_token_id for tokens and 0 for validity.

Note: The bos_token_id and is_bos arguments are included for potential future use or consistency with tokenize_and_pad, but they are not currently used within this function’s logic. BOS token handling should be done before calling this function if required.

Parameters
  • tokens – A 1D NumPy array of token IDs.

  • valids – A 1D NumPy array representing the attention mask (1 for valid, 0 for padding). Must be the same size as tokens.

  • pad_token_id – The token ID used for padding.

  • prefill_lengths – A list of integer bucket lengths to choose from. Defaults to DEFAULT_PREFILL_BUCKETS.

  • max_prefill_length – An optional maximum length. If provided, buckets larger than this are ignored, and this value is used as the maximum padding length.

  • jax_padding – If True, converts the padded NumPy arrays to JAX arrays before returning. Defaults to True.

  • bos_token_id – The beginning-of-sequence token ID (currently unused).

  • is_bos – Flag indicating if BOS token handling is expected (currently unused).

Returns

A tuple containing:

  • padded_tokens: The padded/truncated token ID array (JAX or NumPy).

  • padded_valids: The padded/truncated validity mask array (JAX or NumPy).

  • padded_length: The length to which the arrays were padded/truncated.

Return type

Tuple[Union[Array, ndarray], Union[Array, ndarray], int]
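A NumPy-only sketch of the bucketing logic described above, using bisect for the bucket lookup. pad_to_bucket is a hypothetical name, and the sketch makes assumptions the documentation does not settle: padding goes on the right, truncation keeps the trailing tokens, and prefill_lengths is passed explicitly instead of defaulting to DEFAULT_PREFILL_BUCKETS. The library's right_padding flag and its optional conversion to JAX arrays (jax_padding) are not modeled.

```python
import bisect
from typing import Optional

import numpy as np


def pad_to_bucket(
    tokens: np.ndarray,
    valids: np.ndarray,
    pad_token_id: int,
    prefill_lengths: list[int],
    max_prefill_length: Optional[int] = None,
) -> tuple[np.ndarray, np.ndarray, int]:
    """Hypothetical sketch: pad or truncate 1D arrays to the nearest bucket length."""
    buckets = sorted(prefill_lengths)
    if max_prefill_length is not None:
        # Ignore buckets above the cap and use the cap itself as the largest bucket.
        buckets = [b for b in buckets if b < max_prefill_length] + [max_prefill_length]
    true_length = tokens.shape[-1]
    pos = bisect.bisect_left(buckets, true_length)
    padded_length = buckets[min(pos, len(buckets) - 1)]
    padding = padded_length - true_length
    if padding < 0:
        # Longer than the largest bucket: keep the trailing tokens (assumption).
        return tokens[-padded_length:], valids[-padded_length:], padded_length
    # Right-pad (assumption): pad_token_id for tokens, 0 for the validity mask.
    padded_tokens = np.pad(tokens, (0, padding), constant_values=pad_token_id)
    padded_valids = np.pad(valids, (0, padding), constant_values=0)
    return padded_tokens, padded_valids, padded_length


tokens = np.array([5, 6, 7])
valids = np.ones_like(tokens)
# Three tokens land in the 4-token bucket.
print(pad_to_bucket(tokens, valids, pad_token_id=0, prefill_lengths=[4, 8]))
```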

easydel.inference.vsurge.utils.process_result_tokens(processor: Any, slot: int, slot_max_length: int, result_tokens: Any, complete: ndarray, eos_token_id: list[int], is_client_side_tokenization: bool = False) → Tuple[List[ReturnSample], ndarray, list[int]]

Processes the result tokens for a given slot, extracts text and token IDs, updates completion status, and counts valid tokens generated in this step.

Parameters
  • processor – The tokenizer/processor instance.

  • slot – The index of the inference slot being processed.

  • slot_max_length – The maximum allowed length for the sequence in this slot.

  • result_tokens – The ResultTokens object containing the generated tokens and metadata.

  • complete – A boolean NumPy array indicating the completion status of each sample in the batch.

  • eos_token_id – A list of end-of-sequence token IDs.

  • is_client_side_tokenization – A boolean indicating if tokenization is handled client-side.

Returns

A tuple containing:

  • A list of ReturnSample objects (with tokens_per_second and num_generated_tokens not yet populated).

  • The updated completion status array.

  • A list containing the number of valid tokens generated in this step for each corresponding ReturnSample.

Return type

Tuple[List[ReturnSample], ndarray, list[int]]
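The completion bookkeeping can be illustrated with a simplified sketch. update_slot_sketch is hypothetical and assumes a much simpler input than a real ResultTokens object: step_tokens[i] is a plain list of the tokens sample i produced in this step, and lengths[i] is that sample's length before the step. It further assumes that an EOS token ends a sample without being returned, and that a sample is also complete once it reaches slot_max_length.

```python
import numpy as np


def update_slot_sketch(
    step_tokens: list[list[int]],
    lengths: list[int],
    complete: np.ndarray,
    eos_token_id: list[int],
    slot_max_length: int,
):
    """Hypothetical: returns (kept tokens per sample, new completion array, counts)."""
    complete = complete.copy()  # do not mutate the caller's array
    kept_per_sample: list[list[int]] = []
    counts: list[int] = []
    for i, tokens in enumerate(step_tokens):
        kept: list[int] = []
        if not complete[i]:  # finished samples contribute nothing new
            for tok in tokens:
                if tok in eos_token_id:
                    complete[i] = True  # EOS ends the sample and is not returned
                    break
                kept.append(tok)
                if lengths[i] + len(kept) >= slot_max_length:
                    complete[i] = True  # the slot has reached its maximum length
                    break
        kept_per_sample.append(kept)
        counts.append(len(kept))
    return kept_per_sample, complete, counts


kept, done, counts = update_slot_sketch(
    step_tokens=[[5, 6], [7, 2], [8]],
    lengths=[0, 0, 9],
    complete=np.array([False, False, False]),
    eos_token_id=[2],
    slot_max_length=10,
)
print(kept, done.tolist(), counts)
```

The per-sample counts are what a caller would add to a running total before filling in the cumulative ReturnSample metrics.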

easydel.inference.vsurge.utils.take_nearest_length(lengths: list[int], length: int) → int

Gets the nearest length to the right in a set of lengths.

Uses binary search to find the smallest length in the lengths list that is greater than or equal to the input length.

Parameters
  • lengths – A sorted list of integer lengths (e.g., prefill buckets).

  • length – The target length to find the nearest value for.

Returns

The nearest length in lengths that is greater than or equal to length. If length is greater than all lengths in the list, returns the largest length.
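Because the lengths are sorted, the lookup reduces to bisect.bisect_left from the standard library. A minimal sketch, with the hypothetical name nearest_bucket and example bucket values chosen for illustration (they are not DEFAULT_PREFILL_BUCKETS):

```python
import bisect


def nearest_bucket(lengths: list[int], length: int) -> int:
    """Smallest bucket >= length, or the largest bucket if none is big enough."""
    pos = bisect.bisect_left(lengths, length)  # lengths must be sorted ascending
    return lengths[pos] if pos < len(lengths) else lengths[-1]


buckets = [16, 32, 64, 128]
print(nearest_bucket(buckets, 20))
print(nearest_bucket(buckets, 32))
print(nearest_bucket(buckets, 500))
```

bisect_left returns the position of the first element that is not smaller than length, so an exact match such as 32 maps to itself.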

easydel.inference.vsurge.utils.text_tokens_to_string(text_tokens: Iterable[str]) → str

Converts an iterable of text tokens, including byte tokens, to a string.

This function handles tokens that represent raw bytes (e.g., “<0xAB>”) by converting them to their byte values before decoding the entire byte sequence into a UTF-8 string. This is necessary for tokenizers that output byte tokens for special characters or multi-byte sequences.

Iterates through the text tokens. If a token represents a byte (e.g., “<0xAB>”), it’s converted to its byte value; otherwise, the token is treated as a UTF-8 string and encoded to bytes. All resulting bytes are joined and decoded into a single UTF-8 string, with invalid byte sequences replaced rather than raising an error.

Parameters

text_tokens – An iterable (e.g., list) of string tokens, which may include byte tokens in the format “<0xXX>”.

Returns

The decoded string representation of the token sequence.
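The following sketch implements the byte-aware join described above using only the standard library; tokens_to_text and its regex are hypothetical, not the library's code. The example reassembles “é”, which UTF-8 encodes as the two bytes 0xC3 0xA9.

```python
import re
from typing import Iterable

_BYTE_TOKEN = re.compile(r"<0x([0-9A-Fa-f]{2})>")


def tokens_to_text(text_tokens: Iterable[str]) -> str:
    """Sketch of a byte-aware token join (hypothetical name)."""
    chunks = []
    for token in text_tokens:
        match = _BYTE_TOKEN.fullmatch(token)
        if match:
            # "<0xC3>" -> the single raw byte 0xC3
            chunks.append(bytes([int(match.group(1), 16)]))
        else:
            chunks.append(token.encode("utf-8"))
    # Undecodable byte sequences become U+FFFD instead of raising.
    return b"".join(chunks).decode("utf-8", errors="replace")


print(tokens_to_text(["caf", "<0xC3>", "<0xA9>"]))
```

Joining all bytes before decoding matters: decoding “<0xC3>” and “<0xA9>” one at a time would yield two replacement characters instead of “é”.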

easydel.inference.vsurge.utils.tokenize_and_pad(string: str, processor: Any, is_bos: bool = True, prefill_lengths: Optional[List[int]] = None, max_prefill_length: Optional[int] = None, jax_padding: bool = True) → Tuple[Union[Array, ndarray], Union[Array, ndarray], int]

Tokenizes an input string and pads it to a suitable length.

Uses the provided processor to tokenize the input string, then pads the resulting token IDs and attention mask (valids) to the nearest length specified in prefill_lengths or up to max_prefill_length. Optionally prepends the BOS token.

Parameters
  • string – The input string to tokenize.

  • processor – The tokenizer/processor object.

  • is_bos – Whether to prepend the beginning-of-sequence (BOS) token. Defaults to True. (Note: pad_tokens, which this function calls internally, does not currently use its BOS arguments.)

  • prefill_lengths – A list of bucket lengths to pad to. If None, uses DEFAULT_PREFILL_BUCKETS.

  • max_prefill_length – The maximum allowed prefill length. Buckets larger than this value are ignored, and this value is used as the maximum padding length.

  • jax_padding – If True, returns JAX arrays; otherwise, returns NumPy arrays. Defaults to True.

Returns

A tuple containing:

  • padded_tokens: The padded token ID array (JAX or NumPy).

  • padded_valids: The padded attention mask array (JAX or NumPy).

  • padded_length: The length to which the arrays were padded/truncated.

Return type

Tuple[Union[Array, ndarray], Union[Array, ndarray], int]