# Streaming Data
Streaming enables training on datasets too large to fit in memory. EasyData supports streaming from local files, cloud storage, and HuggingFace Hub.
## Why Streaming?

| Approach | Memory Usage | Startup Time | Best For |
|---|---|---|---|
| Full Load | ~Dataset size | Slow | Small datasets (< 10 GB) |
| Streaming | ~Batch size (plus shuffle/prefetch buffers) | Fast | Large datasets (> 10 GB) |
## HuggingFace Hub Streaming

### Basic Streaming

```python
import easydel as ed
from datasets import load_dataset

# Stream from HuggingFace Hub (the full dataset is never downloaded)
dataset = load_dataset(
    "HuggingFaceFW/fineweb",
    split="train",
    streaming=True,  # Key: returns an IterableDataset that reads lazily
)

# Use with trainer
trainer = ed.SFTTrainer(
    train_dataset=dataset,
    # ... other trainer arguments ...
)
```
### With HuggingFaceShardedSource

```python
from easydel.data import HuggingFaceShardedSource

source = HuggingFaceShardedSource(
    dataset_name="HuggingFaceFW/fineweb",
    split="train",
    streaming=True,
    cache_dir="/data/hf_cache",  # Optional local cache
)

# Iterate without loading the dataset into memory
for example in source.open_shard(source.shard_names[0]):
    process(example)  # process() stands for your own per-example handler
```
### Streaming with Subsets

```python
from easydel.data import HuggingFaceShardedSource

# The Stack with a specific language.
# Note: on the Hub, The Stack stores languages as data directories
# (data_dir="data/python"), not as configs, so check how `subset` is resolved.
source = HuggingFaceShardedSource(
    dataset_name="bigcode/the-stack",
    split="train",
    subset="python",  # Only Python code
    streaming=True,
)

# C4 with a specific configuration
source = HuggingFaceShardedSource(
    dataset_name="allenai/c4",
    split="train",
    subset="en",  # English only
    streaming=True,
)
```
## Cloud Storage Streaming

### Google Cloud Storage (GCS)

```python
from easydel.data import ParquetShardedSource

# Stream from GCS using the GCE/GKE metadata-server credentials
# ("google_default" would use Application Default Credentials instead)
source = ParquetShardedSource(
    "gs://my-bucket/training-data/*.parquet",
    storage_options={"token": "cloud"},
)

# With a service account key file
source = ParquetShardedSource(
    "gs://my-bucket/data/*.parquet",
    storage_options={"token": "/path/to/service-account.json"},
)

# Column projection for efficiency
source = ParquetShardedSource(
    "gs://my-bucket/data/*.parquet",
    storage_options={"token": "cloud"},
    columns=["input_ids", "attention_mask"],  # Only load the columns you need
)
```
### Amazon S3

```python
from easydel.data import ParquetShardedSource

# Stream from S3 (placeholder credentials shown inline; s3fs can also
# pick them up from the standard AWS environment variables or config files)
source = ParquetShardedSource(
    "s3://my-bucket/data/*.parquet",
    storage_options={
        "key": "ACCESS_KEY_ID",
        "secret": "SECRET_ACCESS_KEY",
        "client_kwargs": {"region_name": "us-west-2"},
    },
)

# Public bucket (anonymous access)
source = ParquetShardedSource(
    "s3://public-bucket/data/*.parquet",
    storage_options={"anon": True},
)
```
### Azure Blob Storage

```python
from easydel.data import ParquetShardedSource

source = ParquetShardedSource(
    "az://container/data/*.parquet",
    storage_options={
        "account_name": "myaccount",
        "account_key": "mykey",
    },
)
```
### JSON/JSONL Streaming

```python
from easydel.data import JsonShardedSource

# Stream JSONL from GCS
source = JsonShardedSource(
    "gs://bucket/data/*.jsonl",
    storage_options={"token": "cloud"},
)

# Stream large JSON files
source = JsonShardedSource(
    "gs://bucket/data/*.json",
    jsonl=False,  # JSON array format
    storage_options={"token": "cloud"},
)
```
### Automatic Retry

All cloud sources automatically retry transient failures:

```python
from easydel.data import ParquetShardedSource

# Built-in: 3 retries with exponential backoff
source = ParquetShardedSource("gs://bucket/data/*.parquet")

# Retries happen automatically on network errors
for shard in source.shard_names:
    for example in source.open_shard(shard):  # Retries on failure
        process(example)
```
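You do not need to write retry logic yourself. For intuition, here is the general technique, exponential backoff with jitter, in plain Python. This is an illustration, not EasyData's implementation: the helper name `with_retries`, its defaults, and the choice to retry on `OSError` (which covers `ConnectionError` and `TimeoutError`) are all assumptions.

```python
import random
import time


def with_retries(fn, retries=3, base_delay=0.5, max_delay=8.0,
                 retry_on=(OSError,), sleep=time.sleep):
    """Call fn(); on a retryable error, wait and try again (hypothetical helper).

    The wait doubles after each failure (base_delay, 2*base_delay, ...), is capped
    at max_delay, and is scaled by random jitter so many workers don't retry in
    lockstep. After `retries` failed retries, the last error is re-raised.
    """
    for attempt in range(retries + 1):  # 1 initial try + `retries` retries
        try:
            return fn()
        except retry_on:
            if attempt == retries:
                raise
            delay = min(max_delay, base_delay * (2 ** attempt))
            sleep(delay * random.uniform(0.5, 1.0))


attempts = []


def flaky_read():
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("transient network error")  # subclass of OSError
    return "ok"


# sleep is stubbed out so the demo doesn't actually wait
print(with_retries(flaky_read, sleep=lambda s: None))  # ok
```

For streaming, a retry in the middle of a shard should ideally resume from the last row read rather than restart the shard, which is where resumable reads come in.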
## Prefetching

Use `AsyncDataLoader` to overlap data loading with training:

```python
from easydel.data import AsyncDataLoader, ParquetShardedSource

source = ParquetShardedSource("gs://bucket/data/*.parquet")
loader = AsyncDataLoader(
    source=source,
    batch_size=8,
    prefetch_enabled=True,
    prefetch_workers=2,
    prefetch_buffer_size=4,  # Number of batches to prefetch
)

# Training loop: the next batch loads in the background while the current step computes
for batch in loader:
    train_step(batch)  # train_step is your own training step
```
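Prefetching is a producer/consumer pattern: a background worker fills a bounded queue while the training loop drains it. The sketch below shows the idea with a single thread and `queue.Queue`. It is a simplified illustration, not `AsyncDataLoader`'s implementation; `prefetch` is a hypothetical helper, and its `buffer_size` plays roughly the role of `prefetch_buffer_size`.

```python
import queue
import threading

_DONE = object()  # sentinel: the producer has finished


class _Failure:
    """Wraps an exception raised in the producer thread."""

    def __init__(self, exc):
        self.exc = exc


def prefetch(iterable, buffer_size=4):
    """Yield items from `iterable` while a background thread loads up to `buffer_size` ahead."""
    q = queue.Queue(maxsize=buffer_size)  # put() blocks once `buffer_size` items are waiting

    def producer():
        try:
            for item in iterable:
                q.put(item)
            q.put(_DONE)
        except Exception as exc:
            q.put(_Failure(exc))  # re-raised in the consumer below

    threading.Thread(target=producer, daemon=True).start()
    while True:
        item = q.get()
        if item is _DONE:
            return
        if isinstance(item, _Failure):
            raise item.exc
        yield item


print(list(prefetch(range(5), buffer_size=2)))  # [0, 1, 2, 3, 4]
```

Threads help here because loading from cloud storage is mostly I/O, and blocking network reads release the GIL. The bounded queue caps memory: the producer stops reading when `buffer_size` items are waiting. One simplification: if the consumer stops early, the daemon producer can stay blocked on a full queue, so a production loader would also need a stop signal.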
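## Shuffling Streamed Data

Streaming rules out a global shuffle because the full dataset is never held in memory. A shuffle buffer provides approximate randomization instead:

```python
from datasets import load_dataset

# HuggingFace streaming shuffle ("dataset" is a placeholder name)
dataset = load_dataset("dataset", split="train", streaming=True)
dataset = dataset.shuffle(buffer_size=10000, seed=42)
```

```python
from easydel.data import AsyncDataLoader, ParquetShardedSource

source = ParquetShardedSource("gs://bucket/data/*.parquet")

# EasyData shuffle during loading
loader = AsyncDataLoader(
    source=source,
    shuffle_buffer_size=10000,  # Examples to buffer for shuffling
    seed=42,
)
```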
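A shuffle buffer holds a fixed number of examples and, once full, emits a random one each time a new example arrives, putting the newcomer in its slot. The sketch below shows that technique in plain Python; `buffer_shuffle` is a hypothetical name, not the EasyData or `datasets` implementation.

```python
import random


def buffer_shuffle(iterable, buffer_size=10_000, seed=42):
    """Approximately shuffle a stream using a fixed-size buffer (hypothetical helper)."""
    rng = random.Random(seed)  # seeded so the order is reproducible
    buffer = []
    for item in iterable:
        if len(buffer) < buffer_size:
            buffer.append(item)  # fill phase: nothing is emitted yet
            continue
        # Buffer full: emit a random element and put the new item in its slot
        idx = rng.randrange(buffer_size)
        yield buffer[idx]
        buffer[idx] = item
    # Stream exhausted: drain the remaining items in random order
    rng.shuffle(buffer)
    yield from buffer


# Every element comes out exactly once, just reordered
print(sorted(buffer_shuffle(range(10), buffer_size=4)) == list(range(10)))  # True
```

With this scheme an example can never be emitted more than `buffer_size` positions earlier than its original position. That is why small buffers leave local ordering (for example, examples grouped by source within a shard) largely intact.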
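### Shuffle Buffer Considerations

| Buffer Size | Memory (approx.) | Randomization Quality |
|---|---|---|
| 1,000 | ~10 MB | Low (local patterns remain) |
| 10,000 | ~100 MB | Medium (good for most cases) |
| 100,000 | ~1 GB | High (nearly global shuffle) |

These memory figures correspond to roughly 10 KB per buffered example; scale them by your own average example size.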
## Resumable Streaming

All sources support resuming from checkpoints:

```python
from easydel.data import ParquetShardedSource

source = ParquetShardedSource("gs://bucket/data/*.parquet")

# Checkpoint = shard name + index of the next row to read in that shard
checkpoint = {"shard": source.shard_names[0], "row": 0}

# Resume from the checkpoint
row = checkpoint["row"]
for example in source.open_shard_at_row(checkpoint["shard"], checkpoint["row"]):
    process(example)
    row += 1
    if row % 1000 == 0:
        # Update the checkpoint periodically and save it with your training state
        checkpoint["row"] = row
```
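A resume position only helps if it survives a crash. One common approach is to write it to a temporary file and atomically rename it over the previous one, so a crash mid-write never leaves a half-written checkpoint. The helpers below (`save_stream_state` and `load_stream_state`, hypothetical names, not EasyData APIs) sketch this with the standard library, assuming `row` is the index of the next row to read.

```python
import json
import os
import tempfile


def save_stream_state(path, shard, row):
    """Atomically write the resume position (shard name + next row index)."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp = tempfile.mkstemp(dir=directory, suffix=".tmp")  # same filesystem as `path`
    try:
        with os.fdopen(fd, "w") as f:
            json.dump({"shard": shard, "row": row}, f)
            f.flush()
            os.fsync(f.fileno())  # make sure the bytes reach disk before the rename
        os.replace(tmp, path)  # atomic rename on POSIX
    except BaseException:
        if os.path.exists(tmp):
            os.remove(tmp)  # don't leave stray temp files behind
        raise


def load_stream_state(path, default_shard):
    """Return (shard, row); start at row 0 of `default_shard` if no state was saved."""
    try:
        with open(path) as f:
            state = json.load(f)
    except FileNotFoundError:
        return default_shard, 0
    return state["shard"], state["row"]
```

On restart, the returned `(shard, row)` pair can be passed straight to `source.open_shard_at_row(shard, row)`.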
### Parquet Row-Group Resume

Parquet sources use row-group metadata to seek efficiently:

```python
from easydel.data import ParquetShardedSource

# Parquet file with 100 row groups of 10,000 rows each.
# Resuming at row 550,000 means row group 55 (0-indexed), row 0 within that group.
source = ParquetShardedSource("data.parquet")

# Automatically seeks to the correct row group
for example in source.open_shard_at_row("data.parquet", row=550000):
    process(example)
```
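Seeking is cheap because a Parquet footer records how many rows each row group holds, so a reader can jump straight to the group containing the target row instead of decoding the groups before it. The sketch below shows that lookup in plain Python; `locate_row` is a hypothetical helper, not part of EasyData, and it assumes 0-indexed rows and row groups.

```python
from bisect import bisect_right
from itertools import accumulate


def locate_row(row_group_sizes, row):
    """Map a global row index to (row_group_index, offset_within_group)."""
    # starts[i] is the index of the first row in row group i
    starts = [0, *accumulate(row_group_sizes)]
    total = starts[-1]
    if not 0 <= row < total:
        raise IndexError(f"row {row} out of range for {total} rows")
    group = bisect_right(starts, row) - 1  # last group whose start is <= row
    return group, row - starts[group]


# The example above: 100 row groups of 10,000 rows each
print(locate_row([10_000] * 100, 550_000))  # (55, 0)
```

With PyArrow, the row-group sizes come from `pyarrow.parquet.ParquetFile(path).metadata`: read `metadata.row_group(i).num_rows` for each `i` in `range(metadata.num_row_groups)`. `ParquetFile.read_row_group(group)` then decodes only the group that contains the resume point.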
## Distributed Streaming

Give each worker a disjoint subset of the shards:

```python
from easydel.data import ParquetShardedSource

source = ParquetShardedSource("gs://bucket/data/*.parquet")

# Get all shard names
all_shards = source.shard_names

# Assign shards to workers
# (on multi-host JAX, e.g. jax.process_index() and jax.process_count())
worker_id = 0
num_workers = 8
worker_shards = all_shards[worker_id::num_workers]

# Each worker processes only its own shards
for shard in worker_shards:
    for example in source.open_shard(shard):
        process(example)
```
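Strided slicing gives every shard to exactly one worker, and per-worker shard counts differ by at most one. The helper below wraps that slice; `assign_shards` is a hypothetical name. Sorting before slicing is an added assumption: it guarantees every worker slices the same list even if shard listings come back in different orders on different hosts.

```python
def assign_shards(shard_names, worker_id, num_workers):
    """Return the shards this worker should read (strided assignment)."""
    if not 0 <= worker_id < num_workers:
        raise ValueError(f"worker_id must be in [0, {num_workers}), got {worker_id}")
    # Sort first so all workers agree on the order before slicing
    return sorted(shard_names)[worker_id::num_workers]


shards = [f"part-{i:05d}.parquet" for i in range(10)]
per_worker = [assign_shards(shards, w, 4) for w in range(4)]
print([len(s) for s in per_worker])  # [3, 3, 2, 2]
```

When the shard count is not a multiple of the worker count, some workers finish early; having many more shards than workers keeps that imbalance small, and with fewer shards than workers some workers get nothing at all.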
## Streaming with Transforms

Apply transforms while streaming:

```python
from transformers import AutoTokenizer

from easydel.data import (
    ChatTemplateTransform,
    HuggingFaceShardedSource,
    TransformedShardedSource,
)

tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-2-7b-chat-hf")

# Stream from HuggingFace
source = HuggingFaceShardedSource(
    "tatsu-lab/alpaca",
    streaming=True,
)

# Apply the chat template on the fly
transformed = TransformedShardedSource(
    source,
    transform=ChatTemplateTransform(tokenizer),
)

# Memory-efficient: transforms are applied during iteration
for example in transformed.open_shard(transformed.shard_names[0]):
    print(example["text"])
```
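"Applied during iteration" means the transform is a lazy map over the stream: each example is transformed only when the loop requests it, so nothing is materialized ahead of time. The plain-Python sketch below shows that behaviour; `lazy_map` and the toy `to_text` formatter are hypothetical stand-ins, not `TransformedShardedSource` or `ChatTemplateTransform` (Python's built-in `map` is lazy in the same way).

```python
def lazy_map(transform, examples):
    """Apply `transform` to each example only when it is requested."""
    for example in examples:
        yield transform(example)


calls = []


def to_text(example):
    # Toy stand-in for a chat-template transform (hypothetical prompt format)
    calls.append(example["instruction"])
    return {"text": f"User: {example['instruction']}\nAssistant: {example['output']}"}


stream = iter([
    {"instruction": "Say hi", "output": "Hi!"},
    {"instruction": "Count to 2", "output": "1, 2"},
])
transformed = lazy_map(to_text, stream)
print(len(calls))  # 0: nothing has been transformed yet
print(next(transformed)["text"])
print(len(calls))  # 1: only the first example was processed
```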
## Streaming Pipeline

A complete pipeline in streaming mode:

```python
from easydel.data import (
    DatasetConfig,
    LoadStageConfig,
    Pipeline,
    PipelineConfig,
)

config = PipelineConfig(
    datasets=[
        DatasetConfig(
            data_files="gs://bucket/data/*.parquet",
            type="parquet",
        ),
    ],
    streaming=True,  # Enable streaming mode
    load=LoadStageConfig(
        batch_size=8,
        prefetch_enabled=True,
        prefetch_buffer_size=4,
        shuffle_buffer_size=10000,
    ),
)

pipeline = Pipeline.from_config(config)
for batch in pipeline.source().load().build():
    train_step(batch)  # train_step is your own training step
```
## Performance Optimization

### 1. Use Parquet with Column Projection

```python
from easydel.data import ParquetShardedSource

# Only load the columns you need
source = ParquetShardedSource(
    "gs://bucket/data/*.parquet",
    columns=["input_ids", "attention_mask"],  # Skip unused columns
)
```
### 2. Optimal Shard Size

Aim for shards of roughly 100 MB to 500 MB:

- Too small: too many files, so per-file overhead (listing, opening, reading metadata) dominates.
- Too large: slow resume and high memory use when seeking.
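To apply this guideline, work backwards from the dataset size: the shard count is roughly the total size divided by the target shard size, and it should comfortably exceed the number of data-loading workers so that strided assignment stays balanced. The function below is a hypothetical helper for that arithmetic; the 1 TiB dataset and 64 workers are made-up example numbers, and the 256 MiB default is just one point inside the recommended range.

```python
import math


def plan_shards(total_bytes, target_shard_bytes=256 * 1024**2, num_workers=1):
    """Return (num_shards, shards_per_worker) for a target shard size."""
    num_shards = max(1, math.ceil(total_bytes / target_shard_bytes))
    # Round up to a multiple of num_workers so every worker gets the same count
    num_shards = math.ceil(num_shards / num_workers) * num_workers
    return num_shards, num_shards // num_workers


# Hypothetical example: 1 TiB of Parquet read by 64 workers
print(plan_shards(1024**4, num_workers=64))  # (4096, 64)
```

Rounding up to a multiple of the worker count makes each shard slightly smaller than the target, which keeps it inside the recommended range rather than above it.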
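### 3. Increase Prefetch Buffer

If the training step still waits on data, let more batches load ahead. A larger buffer absorbs storage latency spikes at the cost of holding more batches in memory.

```python
from easydel.data import AsyncDataLoader

loader = AsyncDataLoader(
    source=source,  # any source, e.g. a ParquetShardedSource
    prefetch_buffer_size=8,  # More batches in flight
)
```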
### 4. Use Fast Storage for Cache

```python
from easydel.data import HuggingFaceShardedSource

source = HuggingFaceShardedSource(
    "dataset",  # placeholder dataset name
    streaming=True,
    cache_dir="/nvme/hf_cache",  # Fast local SSD
)
```
## Monitoring Streaming

```python
import time

from easydel.data import ParquetShardedSource

source = ParquetShardedSource("gs://bucket/data/*.parquet")

start = time.monotonic()  # monotonic clock: unaffected by system clock changes
count = 0
for shard in source.shard_names:
    for example in source.open_shard(shard):
        count += 1
        if count % 10000 == 0:
            elapsed = time.monotonic() - start
            rate = count / elapsed
            print(f"Processed {count} examples, {rate:.1f} ex/sec")
```
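The rate above averages over the whole run, so a mid-run slowdown (for example, a throttled bucket) barely moves it. A sliding-window rate reacts faster. The class below is a hypothetical helper, not part of EasyData; it takes a clock function so it can be tested without real waiting.

```python
import time
from collections import deque


class WindowedRate:
    """Examples per second over roughly the last `window` seconds."""

    def __init__(self, window=30.0, clock=time.monotonic):
        self.window = window
        self.clock = clock
        self.events = deque()  # (timestamp, count) pairs inside the window
        self.total = 0  # sum of counts currently in `events`

    def update(self, n=1):
        now = self.clock()
        self.events.append((now, n))
        self.total += n
        # Drop events that have fallen out of the window
        while self.events and now - self.events[0][0] > self.window:
            _, old = self.events.popleft()
            self.total -= old

    def rate(self):
        if len(self.events) < 2:
            return 0.0
        span = self.events[-1][0] - self.events[0][0]
        # The first event only marks the window start; count what arrived after it
        return (self.total - self.events[0][1]) / span if span > 0 else 0.0
```

In the monitoring loop, calling `meter.update(10000)` at each progress print and reporting `meter.rate()` alongside the cumulative rate makes stalls visible quickly; updating in batches rather than per example keeps the deque small.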
## Troubleshooting

### Slow Streaming from Cloud

```python
from easydel.data import AsyncDataLoader, ParquetShardedSource

# Check: are you using column projection?
source = ParquetShardedSource(
    "gs://bucket/*.parquet",
    columns=["input_ids"],  # Add this
)

# Check: is prefetching enabled?
loader = AsyncDataLoader(
    source=source,
    prefetch_enabled=True,  # Should be True
)
```
### Memory Growing During Streaming

```python
# Check: is the shuffle buffer too large?
loader = AsyncDataLoader(
    source=source,
    shuffle_buffer_size=10000,  # Reduce if you run out of memory
)

# Check: are you holding references to examples?
for example in source.open_shard(shard):
    process(example)  # Don't keep examples in a list or other long-lived container
```
### Network Errors

Automatic retry handles most transient issues. For persistent errors, check:

1. Are your credentials valid?
2. Is there network connectivity to the storage endpoint?
3. Are you being rate limited?
## Next Steps

- Dataset Mixing: mix streamed datasets
- Caching: cache streamed data
- Trainer Integration: use with trainers