Online Data Streaming#
This page documents the online data streaming pipeline used for live training from simulation. The core pieces are:
- OnlineDataEngine: a process-safe shared buffer that stores trajectories coming from live simulation workers.
- OnlineDataset: a PyTorch IterableDataset that samples trajectory chunks from the engine in either item mode or batch mode.
- ChunkSizeSampler: an interface for drawing dynamic chunk sizes per iteration step.
These components live under embodichain/agents/ and are designed to work with standard DataLoader patterns.
OnlineDataEngine#
Module: embodichain/agents/engine/data.py
OnlineDataEngine manages an in-memory, shared buffer for streaming trajectory data. A typical usage pattern is:
1. Build and start the engine with OnlineDataEngineCfg.
2. Run simulation workers that continually push new experience into the engine.
3. Train by sampling trajectory chunks from the engine via OnlineDataset.
Key ideas:
- Shared buffer: multiple producers (simulation workers) and multiple consumers (training workers) can read and write concurrently.
- GPU-friendly: the buffer is designed for efficient sampling and minimal copying.
- Chunked sampling: training samples fixed-length or dynamically sized chunks.
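The shared-buffer idea can be pictured with a toy, lock-guarded ring buffer. This is an illustrative sketch only; `ToyTrajectoryBuffer` and its `push`/`sample_chunk` methods are hypothetical stand-ins, not the engine's actual API:

```python
import threading
from collections import deque

class ToyTrajectoryBuffer:
    """Illustrative stand-in for the engine's shared buffer: a
    lock-guarded ring buffer that evicts the oldest trajectory."""

    def __init__(self, buffer_size: int):
        self._buf = deque(maxlen=buffer_size)  # oldest entry drops out first
        self._lock = threading.Lock()

    def push(self, traj):
        # Producers (simulation workers) append under the lock.
        with self._lock:
            self._buf.append(traj)

    def sample_chunk(self, chunk_size: int):
        # Consumers (training workers) read under the same lock.
        # Toy policy: slice a chunk from the most recent trajectory.
        with self._lock:
            traj = self._buf[-1]
        return traj[:chunk_size]
```

Pushing a third trajectory into a size-2 buffer evicts the first, mirroring the ring-buffer behavior described above.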
Minimal setup#
from embodichain.agents.engine.data import OnlineDataEngine, OnlineDataEngineCfg
cfg = OnlineDataEngineCfg(
    buffer_size=2,           # number of trajectories kept in the ring buffer
    state_dim=6,             # example state dimension
    gym_config=your_gym_cfg, # parsed JSON config for the task
)
engine = OnlineDataEngine(cfg)
engine.start()
Shutdown#
engine.stop()
OnlineDataset#
Module: embodichain/agents/datasets/online_data.py
OnlineDataset wraps a live OnlineDataEngine and exposes a PyTorch IterableDataset. It supports two modes:
Item mode (default)#
- Create the dataset with batch_size=None (the default).
- Each iteration yields a single TensorDict of shape [chunk_size, ...].
- Use DataLoader(dataset, batch_size=B) to let the DataLoader stack items into batches.
from torch.utils.data import DataLoader
from embodichain.agents.datasets import OnlineDataset
dataset = OnlineDataset(engine, chunk_size=64)
loader = DataLoader(
    dataset,
    batch_size=32,
    collate_fn=OnlineDataset.collate_fn,
)
for batch in loader:
    # batch shape: [32, 64, ...]
    train_step(batch)
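What the collate step does in item mode can be pictured as stacking per-item arrays along a new leading batch axis. A minimal numpy sketch, using plain dicts of arrays as a stand-in for TensorDict (`toy_collate` is illustrative, not the library's implementation):

```python
import numpy as np

def toy_collate(items):
    """Stack a list of dict-of-array items along a new leading batch axis,
    turning 32 items of shape [64, ...] into one batch of shape [32, 64, ...]."""
    keys = items[0].keys()
    return {k: np.stack([it[k] for it in items], axis=0) for k in keys}
```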
Batch mode#
- Create the dataset with batch_size=N.
- Each iteration yields a pre-batched TensorDict of shape [N, chunk_size, ...].
- Use DataLoader(dataset, batch_size=None) to bypass auto-collation.
dataset = OnlineDataset(engine, chunk_size=64, batch_size=32)
loader = DataLoader(
    dataset,
    batch_size=None,
    collate_fn=OnlineDataset.passthrough_collate_fn,
)
for batch in loader:
    # batch shape: [32, 64, ...]
    train_step(batch)
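Since the dataset already emits full [N, chunk_size, ...] batches in this mode, the passthrough collate is conceptually just the identity. A sketch of the idea, not necessarily the library's exact implementation:

```python
def toy_passthrough_collate(batch):
    # The dataset already produced a complete batch;
    # return it unchanged instead of stacking items.
    return batch
```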
Dynamic chunk sizes#
Pass a ChunkSizeSampler instead of an int to chunk_size to sample a new length each iteration step.
from embodichain.agents.datasets.sampler import UniformChunkSampler
sampler = UniformChunkSampler(low=16, high=64)
dataset = OnlineDataset(engine, chunk_size=sampler)
In batch mode, the sampler is called once per step so all trajectories in the batch share the same chunk length.
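The one-draw-per-step behavior can be sketched with a toy generator over plain Python lists. The names `batched_chunks` and the sampling policy here are hypothetical, chosen only to illustrate that every trajectory in a step shares one chunk length:

```python
import random

def batched_chunks(trajectories, batch_size, chunk_sampler, steps):
    """Toy illustration: the chunk sampler is called once per step,
    and that single draw is shared by the whole batch."""
    for _ in range(steps):
        chunk = chunk_sampler()  # single draw per step
        picked = random.sample(trajectories, batch_size)
        yield [t[:chunk] for t in picked]  # every item has the same length
```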
ChunkSizeSampler#
Module: embodichain/agents/datasets/sampler.py
ChunkSizeSampler is a small interface that returns a positive integer chunk size each time it is called.
Built-in samplers:
- UniformChunkSampler(low, high): discrete uniform over [low, high].
- GMMChunkSampler(means, stds, weights, low, high): Gaussian mixture with optional bounds.
Example (GMM):
from embodichain.agents.datasets.sampler import GMMChunkSampler
sampler = GMMChunkSampler(
    means=[16.0, 64.0],
    stds=[4.0, 8.0],
    weights=[0.6, 0.4],
    low=8,
    high=96,
)
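Because the interface is just "a callable returning a positive int", custom samplers are easy to write. A hypothetical example (not one of the built-ins), assuming the interface is satisfied by any `__call__` returning a bounded positive integer:

```python
import math
import random

class LogUniformChunkSampler:
    """Hypothetical custom sampler: log-uniform over [low, high],
    biasing draws toward shorter chunks."""

    def __init__(self, low: int, high: int):
        assert 0 < low <= high
        self.low, self.high = low, high

    def __call__(self) -> int:
        # Draw uniformly in log space, then round and clamp to bounds.
        u = random.uniform(math.log(self.low), math.log(self.high))
        size = int(round(math.exp(u)))
        return max(self.low, min(self.high, size))
```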
End-to-end demo#
A runnable example that wires everything together is provided in:
examples/agents/datasets/online_dataset_demo.py
It shows item mode, batch mode, and dynamic chunk sizes. Run it with:
python examples/agents/datasets/online_dataset_demo.py