Data Sources#
EasyData supports loading data from multiple sources with a unified ShardedDataSource interface.
Supported Formats#
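| Format | Class | Extensions | Cloud Support |
|---|---|---|---|
| Parquet | `ParquetShardedSource` | `.parquet` | GCS, S3, HTTP |
| JSON/JSONL | `JsonShardedSource` | `.json`, `.jsonl` | GCS, S3, HTTP |
| Arrow IPC | `ArrowShardedSource` | `.arrow` | GCS, S3, HTTP |
| CSV | `CsvShardedSource` | `.csv`, `.tsv` | GCS, S3, HTTP |
| Plain Text | `TextShardedSource` | `.txt` | GCS, S3, HTTP |
| HuggingFace Hub | `HuggingFaceShardedSource` | - | Native |
| HF Dataset | `HFDatasetShardedSource` | - | - |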
ParquetShardedSource#
Best for large-scale training with efficient row-group level seeking.
from easydel.data import ParquetShardedSource
# Local files with glob pattern
source = ParquetShardedSource("data/*.parquet")
# Google Cloud Storage
source = ParquetShardedSource(
"gs://my-bucket/training/*.parquet",
storage_options={"token": "cloud"}, # Default credentials
)
# AWS S3
source = ParquetShardedSource(
"s3://my-bucket/data/*.parquet",
storage_options={
"key": "ACCESS_KEY_ID",
"secret": "SECRET_ACCESS_KEY",
"endpoint_url": "https://s3.amazonaws.com", # Optional
},
)
# Load specific columns only
source = ParquetShardedSource(
"data/*.parquet",
columns=["input_ids", "attention_mask"], # Memory efficient
)
# Iteration
for shard_name in source.shard_names:
    for example in source.open_shard(shard_name):
        print(example)
# Resume from specific row (uses row-group metadata)
for example in source.open_shard_at_row("data/shard_0.parquet", row=10000):
    print(example)
Parquet Features#
Row-group level seeking: Efficient resume without reading entire file
Column projection: Load only needed columns
Automatic retry: 3 retries with exponential backoff for cloud storage
Parallel shard access: Distribute shards across workers
JsonShardedSource#
For JSON array files or JSONL (one JSON object per line).
from easydel.data import JsonShardedSource
# JSONL files (default)
source = JsonShardedSource("data/*.jsonl")
# JSON array files
source = JsonShardedSource("data/*.json", jsonl=False)
# GCS with credentials
source = JsonShardedSource(
"gs://bucket/data/*.jsonl",
storage_options={"token": "/path/to/service-account.json"},
)
# Iteration
for shard in source.shard_names:
    for record in source.open_shard(shard):
        print(record)
JSON vs JSONL#
JSONL (recommended for large datasets):
{"text": "First example"}
{"text": "Second example"}
{"text": "Third example"}
JSON array (simpler but loads entire file):
[
{"text": "First example"},
{"text": "Second example"},
{"text": "Third example"}
]
ArrowShardedSource#
For Arrow IPC format files.
from easydel.data import ArrowShardedSource
source = ArrowShardedSource("data/*.arrow")
# With cloud storage
source = ArrowShardedSource(
"gs://bucket/arrow-data/*.arrow",
storage_options={"token": "cloud"},
)
CsvShardedSource#
For CSV files with headers.
from easydel.data import CsvShardedSource
# Standard CSV
source = CsvShardedSource("data/*.csv")
# TSV (tab-separated)
source = CsvShardedSource("data/*.tsv", delimiter="\t")
# Custom storage options
source = CsvShardedSource(
"s3://bucket/data/*.csv",
storage_options={"key": "...", "secret": "..."},
)
TextShardedSource#
For plain text files where each line becomes a record.
from easydel.data import TextShardedSource
# Each line becomes {"text": "line content"}
source = TextShardedSource("data/*.txt")
# Custom field name
source = TextShardedSource(
"data/*.txt",
text_field="content", # {"content": "line content"}
)
HuggingFaceShardedSource#
For streaming directly from HuggingFace Hub.
from easydel.data import HuggingFaceShardedSource
# Basic usage
source = HuggingFaceShardedSource(
dataset_name="HuggingFaceFW/fineweb",
split="train",
streaming=True,
)
# With subset/configuration
source = HuggingFaceShardedSource(
dataset_name="bigcode/the-stack",
split="train",
subset="python", # Language subset
streaming=True,
cache_dir="/data/hf_cache",
)
# Non-streaming (loads to memory)
source = HuggingFaceShardedSource(
dataset_name="tatsu-lab/alpaca",
split="train",
streaming=False, # Full download
)
HFDatasetShardedSource#
Wraps an existing HuggingFace Dataset as a ShardedDataSource.
from datasets import load_dataset
from easydel.data import HFDatasetShardedSource, wrap_hf_dataset
# Load HuggingFace dataset
hf_ds = load_dataset("trl-lib/ultrafeedback_binarized", split="train")
# Wrap as ShardedDataSource
source = HFDatasetShardedSource(hf_ds)
# Or use the convenience function
source = wrap_hf_dataset(hf_ds)
# Works with IterableDataset too
hf_streaming = load_dataset("HuggingFaceFW/fineweb", split="train", streaming=True)
source = wrap_hf_dataset(hf_streaming)
CompositeShardedSource#
Combine multiple sources of different types.
from easydel.data import (
CompositeShardedSource,
ParquetShardedSource,
JsonShardedSource,
)
# Combine different formats
parquet_source = ParquetShardedSource("data/parquet/*.parquet")
json_source = JsonShardedSource("data/json/*.jsonl")
combined = CompositeShardedSource([parquet_source, json_source])
# All shards from both sources
print(combined.shard_names) # ['data/parquet/0.parquet', 'data/json/0.jsonl', ...]
Auto-detection with create_source#
Automatically detect and create the appropriate source:
from easydel.data import create_source, DatasetConfig
# From config
config = DatasetConfig(
data_files="data/*.parquet", # Auto-detects parquet
)
source = create_source(config)
# HuggingFace detection
config = DatasetConfig(
data_files="trl-lib/ultrafeedback_binarized",
type="huggingface",
split="train",
)
source = create_source(config)
Cloud Storage Authentication#
Google Cloud Storage (GCS)#
# Default credentials (recommended for GCE/Cloud Run)
storage_options = {"token": "cloud"}
# Service account JSON
storage_options = {"token": "/path/to/service-account.json"}
# Inline credentials
storage_options = {"token": {"type": "service_account", ...}}
source = ParquetShardedSource("gs://bucket/data/*.parquet", storage_options=storage_options)
Amazon S3#
# Access keys
storage_options = {
"key": "ACCESS_KEY_ID",
"secret": "SECRET_ACCESS_KEY",
}
# With region
storage_options = {
"key": "ACCESS_KEY_ID",
"secret": "SECRET_ACCESS_KEY",
"client_kwargs": {"region_name": "us-west-2"},
}
# Anonymous access (public buckets)
storage_options = {"anon": True}
source = ParquetShardedSource("s3://bucket/data/*.parquet", storage_options=storage_options)
HTTP/HTTPS#
# Public URLs
source = ParquetShardedSource("https://example.com/data/*.parquet")
# With auth headers
storage_options = {
"headers": {"Authorization": "Bearer TOKEN"},
}
source = JsonShardedSource("https://api.example.com/data.jsonl", storage_options=storage_options)
Glob Patterns#
All sources support glob patterns:
# Single directory
source = ParquetShardedSource("data/*.parquet")
# Recursive
source = ParquetShardedSource("data/**/*.parquet")
# Multiple patterns
source = ParquetShardedSource(["data/2023/*.parquet", "data/2024/*.parquet"])
# Character ranges
source = JsonShardedSource("data/shard_[0-9].jsonl")
Resumable Iteration#
All sources support resuming from a specific position:
# Get checkpoint
checkpoint = {"shard": "data/shard_5.parquet", "row": 12345}
# Resume iteration
source = ParquetShardedSource("data/*.parquet")
for example in source.open_shard_at_row(checkpoint["shard"], checkpoint["row"]):
    # Process example
    # Save new checkpoint periodically
    pass
Performance Tips#
Use Parquet for large datasets: Best compression and column projection
Enable streaming for HuggingFace: Avoids downloading entire dataset
Column projection: Load only needed columns with `columns=`
Shard distribution: Assign different shards to different workers
Prefetch: Use `AsyncDataLoader` for overlap with training
Next Steps#
Transforms - Apply transformations to loaded data
Streaming - Detailed streaming guide
Pipeline API - Chain sources with processing